# -*- coding: utf-8 -*-

import os
import re
import pylab
import numpy
import tempfile
import subprocess
import shutil

import list_utils
import plug
import spc300

config = {
    # Path, relative to the home directory, of the directory containing the
    # Ixia results
    "results_path":"ixos/Results",
    # Ixia port of the master, with the syntax X.Y.Z where
    # X is the chassis index, Y is the board index and Z is the port index
    "master_port":"1.3.1",
    # Ixia port of the slaves, with the syntax X.Y.Z where
    # X is the chassis index, Y is the board index and Z is the port index
    # If several ports are used, for "mesh" tests, they must be consecutive
    # and "slave_port" must be the smallest one
    "slave_port":"1.3.2",
    # Minimum VLAN ID used - if several VLAN IDs are used, they must be
    # consecutive - not used for "mesh" tests
    "min_vlan_id":11
}

font = {'color':'k', 'fontweight':'bold', 'fontsize':10}

def results_path():
    return os.path.join(os.getenv("HOME"), config["results_path"])

def update_config(new_config):
    """Update the configuration"""
    global config
    config = new_config
    parse_port(config["master_port"])
    parse_port(config["slave_port"])

def tcl_file_name(stas_nb, direction):
    return "%d_%s" % (stas_nb, direction)

def get_map_add(master_port, slave_port, direction, test_type, stas_nb):
    master_port = parse_port(master_port)
    slave_port = parse_port(slave_port)
    if test_type == "mesh":
        slaves_ports_number = stas_nb
    else:
        slaves_ports_number = 1
    result = []
    for i in range(0, slaves_ports_number):
        current_slave_port = (slave_port[0], slave_port[1], slave_port[2] + i)
        if direction == "down":
            couples = [(master_port, current_slave_port)]
        elif direction == "up":
            couples = [(current_slave_port, master_port)]
        elif direction == "bi":
            couples = [(master_port, current_slave_port),
                       (current_slave_port, master_port)]
        elif direction == "loop":
            if i == slaves_ports_number - 1:
                next_port = master_port
            else:
                next_port = (slave_port[0], slave_port[1],
                             slave_port[2] + i + 1)
            if i == 0:
                couples = [(master_port, current_slave_port),
                           (current_slave_port, next_port)]
            else:
                couples = [(current_slave_port, next_port)]
        result.extend(couples)
    return "\n".join(map(get_map_add_aux, result))

def parse_port(port):
    result = re.search("^([1-8])\.([1-8])\.([1-8])$", port).groups()
    return (int(result[0]), int(result[1]), int(result[2]))

def get_map_add_aux((port_1, port_2)):
    args = (port_1[0], port_1[1], port_1[2], port_2[0], port_2[1], port_2[2])
    return "map add %s %s %s %s %s %s" % args

def execute(test_type, stas_nb, direction, version_directory, parameters,
            ftp_server):
    clean_traces()
    # Retrieve the new parameters deduced from a previous test
    new_parameters_list = get_parameters(test_type, stas_nb, direction,
                                         version_directory)
    for new_parameters in new_parameters_list:
        new_parameters = parameters + new_parameters
        new_parameters = [ (test_type, key, value)
                           for (key, value) in new_parameters]
        execute_aux(test_type, stas_nb, direction, version_directory,
                    new_parameters)
        dump_traces(test_type, stas_nb, direction, version_directory,
                    ftp_server)

def clean_traces():
    for key in spc300.config["plugs"].keys():
        spc300.clean_trace(key)

def get_parameters(test_type, stas_nb, direction, prim_directory):
    if test_type in ["tput", "bcast", "mesh"]:
        return [[]]
    elif test_type in ["latency", "floss"]:
        (frame_sizes, [max_values], _, _) = \
            extract_data("tput", stas_nb, [direction], prim_directory)
        directions = [direction] * len(frame_sizes)
        return map(get_parameters_aux, frame_sizes, max_values, directions)
    else:
        assert False, test_type
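
# Illustrative sketch (only exercised by the self-test at the bottom of this
# file, not used by the measurement flow): what the parameters chained from a
# previous "tput" run look like for one frame size. The 200 Mbps maximum used
# below is a hypothetical measured value.
def _example_chained_parameters():
    # A downstream maximum of 200 Mbps at 1518-byte frames is derated by 10%,
    # giving an mbpsRate of 180.0 for the subsequent latency/frame-loss run.
    assert [("mbpsRate", 180.0), ("framesizeList", "{ 1518 }")] == \
        get_parameters_aux(1518, 200, "down")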

def get_parameters_aux(frame_size, max_value, direction):
    if direction == "bi":
        max_value = max_value / 2
    max_value = round(max_value * 0.9, 2)
    return [("mbpsRate", max_value),
            ("framesizeList", "{ %s }" % frame_size)]

def execute_aux(test_type, stas_nb, direction, version_directory, parameters):
    master_port = config["master_port"]
    slave_port = config["slave_port"]
    min_vlan_id = config["min_vlan_id"]
    (tmp_directory, file_out_name) = create_tcl_file(test_type, stas_nb,
                                                     direction,
                                                     version_directory,
                                                     parameters, master_port,
                                                     slave_port, min_vlan_id)
    file_out_path = os.path.join(tmp_directory, file_out_name)
    subprocess.check_call(["ixwish", file_out_path])
    shutil.rmtree(tmp_directory)

def create_tcl_file(test_type, stas_nb, direction, version_directory,
                    parameters, master_port, slave_port, min_vlan_id):
    file_in_name = test_type + ".tcl"
    file_in_path = os.path.join(os.path.dirname(__file__),
                                "ixia_tcl_templates", file_in_name)
    file_in = open(file_in_path, 'r')
    file_in_content = file_in.read()
    file_in.close()
    directory = "\"" + os.path.join(results_path(), version_directory) + "\""
    file_out_content = replace_parameters(
        file_in_content,
        [("user", "version", version_directory),
         ("results", "directory", directory)] + parameters)
    map_add = get_map_add(master_port, slave_port, direction, test_type,
                          stas_nb)
    (dictionary, _, _) = get_specs(test_type)
    (log_on, test_name, test_name_str, map_type) = \
        get_from_direction(dictionary, direction)
    template_dict = {"min_vlan_id":min_vlan_id,
                     "stas_nb":stas_nb,
                     "master_comma_port":master_port.replace('.', ','),
                     "slave_comma_port":slave_port.replace('.', ','),
                     "master_dot_port":master_port,
                     "slave_dot_port":slave_port,
                     "map_add":map_add,
                     "log_on":log_on,
                     "map_type":map_type,
                     "test_name":test_name,
                     "test_name_str":test_name_str
                     }
    for key, value in template_dict.items():
        file_out_content = file_out_content.replace("{%s}" % key, str(value))
    tmp_directory = tempfile.mkdtemp()
    file_out_name = tcl_file_name(stas_nb, direction) + '.tcl'
    file_out_path = os.path.join(tmp_directory, file_out_name)
    file_out = open(file_out_path, 'w')
    file_out.write(file_out_content)
    file_out.close()
    return (tmp_directory, file_out_name)

def dump_traces(test_type, stas_nb, direction, version_directory, ftp_server):
    """Dump the traces present on the plugs in the directory containing the
    results of the last executed script"""
    results_directory = get_last_results_directory(test_type, stas_nb,
                                                   direction,
                                                   version_directory)
    for key in spc300.config["plugs"].keys():
        spc300.dump_trace(key, results_directory, ftp_server)

def get_color(ref_value, couples, last_color):
    couples.sort()
    couples.reverse()
    for (value, color) in couples:
        if ref_value > value:
            return color
    return last_color

def normalize_latency(x):
    '''Convert a latency in nanoseconds (x) into a latency in milliseconds'''
    return round(x / 1000 / 1000, 2)

def latency_data_fun(_, data_vectors):
    [(min_vector, _), (_, max_vector), (avg_vector, _)] = data_vectors
    min_vector = map(normalize_latency, min_vector)
    max_vector = map(normalize_latency, max_vector)
    avg_vector = map(normalize_latency, avg_vector)
    return avg_vector, [min_vector, avg_vector, max_vector]

def latency_color_fun(vector, _):
    return vector

def throughput_data_fun(_, data_vectors):
    [(min_vector, max_vector)] = data_vectors
    return max_vector, [min_vector, max_vector]

def prim_throughput_color_fun(_, vectors):
    [min_vector, max_vector] = vectors
    return map(delta_value, min_vector, max_vector)

def sec_throughput_color_fun(sec_vector, prim_vector):
    return map(delta_value, sec_vector, prim_vector)
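
# Illustrative sketch (only exercised by the self-test at the bottom of this
# file): how a (min, max) throughput pair is turned into a table cell colour,
# using the thresholds declared for "tput" in get_specs() below. The 90/100
# Mbps figures are hypothetical: a 10% spread exceeds the 2% threshold but not
# the 10% one, so the cell comes out orange.
def _example_throughput_cell_colour():
    spread = prim_throughput_color_fun(None, [[90.0], [100.0]])
    assert [10.0] == spread
    assert "orange" == get_color(spread[0], [(10, "red"), (2, "orange")],
                                 "white")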

def mesh_data_fun(frame_sizes_vector, data_vectors):
    [(min_vector, max_vector)] = data_vectors
    min_vector = map(normalize_fps, min_vector, frame_sizes_vector)
    max_vector = map(normalize_fps, max_vector, frame_sizes_vector)
    return max_vector, [min_vector, max_vector]

def prim_mesh_color_fun(_, vectors):
    [min_vector, max_vector] = vectors
    return map(delta_value, min_vector, max_vector)

def sec_mesh_color_fun(sec_vector, prim_vector):
    return map(delta_value, sec_vector, prim_vector)

def normalize_fps(x, y):
    '''Convert a frames per second rate (x) and a frame size in bytes (y)
    into a megabits per second rate'''
    return round(x * y * 8 / 1024 / 1024, 2)

def broadcast_data_fun(frame_sizes_vector, data_vectors):
    [(min_vector, max_vector)] = data_vectors
    min_vector = map(normalize_fps, min_vector, frame_sizes_vector)
    max_vector = map(normalize_fps, max_vector, frame_sizes_vector)
    return max_vector, [min_vector, max_vector]

def broadcast_color_fun(_, vectors):
    [min_vector, max_vector] = vectors
    return map(delta_value, min_vector, max_vector)

def frame_loss_data_fun(frame_sizes_vector, data_vectors):
    [(min_vector, max_vector)] = data_vectors
    return max_vector, [min_vector, max_vector]

def frame_loss_color_fun(vector, _):
    return vector
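
# Illustrative sketch (only exercised by the self-test at the bottom of this
# file): the *_data_fun helpers of the "mesh" and "bcast" tests convert
# frames-per-second counts into Mbits/sec for the given frame size, and return
# the maximum vector together with the detailed [min, max] vectors. The fps
# figures below are hypothetical.
def _example_fps_to_mbps():
    # 8192 fps and 16384 fps at 1024 bytes per frame are 64 and 128 Mbits/sec.
    assert ([128.0], [[64.0], [128.0]]) == \
        mesh_data_fun([1024], [([8192.0], [16384.0])])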

def get_specs(test_type):
    # The first term contains the parameters to customize the TCL templates
    # The second term contains the parameters to extract the right data from
    # the Ixia measurements files
    # The third term contains the parameters to display the data correctly
    # on the figures
    if test_type == "latency":
        return \
            (
            ({"default":("latency", "latency", "Latency", "one2one")}),
            ("RFC 2544 - IPv6 Benchmark",
             {"default":"Latency"},
             "AggregateResults.csv",
             ("Frame Size",
              {"default":["Min Latency (ns)", "Max Latency (ns)",
                          "Average Latency (ns)"]}),
             latency_data_fun),
            (['Min', 'Avg', 'Max'],
             'upper left',
             "millisec",
             (latency_color_fun, "avg (millisec)",
              [(200, "red"), (100, "orange")], "white"),
             None,)
            )
    elif test_type == "tput":
        return \
            (
            ({"default":("thruput", "tput", "Throughput", "one2one")}),
            ("RFC 2544 - IPv6 Benchmark",
             {"default":"Throughput"},
             "AggregateResults.csv",
             ("Frame Size",
              {"default":["Agg Tput (Mbps)"]}),
             throughput_data_fun),
            (['Min', 'Max'],
             'upper left',
             "Mbits/sec",
             (prim_throughput_color_fun, "(max - min) / max (%)",
              [(10, "red"), (2, "orange")], "white"),
             (sec_throughput_color_fun, "(a - b) / a (%)",
              [(5, "pink"), (3, "palegreen"), (-3, "white"), (-5, "orange")],
              "red"),
            )
            )
    elif test_type == "mesh":
        return \
            (
            ({"up":("many2one", "manyToOne", "Many to one Throughput",
                    "many2one"),
              "down":("one2many", "oneToMany", "One to many Throughput",
                      "one2many"),
              "default":("partiallymeshed", "partiallyMeshed",
                         "Partially Meshed", "many2many")}),
            ("RFC 2889",
             {"up":"Many to one Throughput",
              "down":"One to many Throughput",
              "default":"Partially Meshed",
              },
             "AggregateResults.csv",
             ("Frame Size",
              {"loop":["Agg RxTput(fps)"],
               "default":["Agg TxTput(fps)"]}),
             mesh_data_fun),
            (['Min', 'Max'],
             'upper left',
             "Mbits/sec",
             (prim_mesh_color_fun, "(max - min) / max (%)",
              [(10, "red"), (2, "orange")], "white"),
             (sec_mesh_color_fun, "(a - b) / a (%)",
              [(5, "pink"), (3, "palegreen"), (-3, "white"), (-5, "orange")],
              "red"),
            )
            )
    elif test_type == "bcast":
        return \
            (
            ({"default":("broadcast", "broadcast", "Broadcast Rate",
                         "one2many")}),
            ("RFC 2889",
             {"default":"Broadcast Rate"},
             "AggregateResults.csv",
             ("Frame Size (bytes)",
              {"default":["Agg Avg Forwarding Rate (fps)"]}),
             broadcast_data_fun),
            (['Min', 'Max'],
             'lower left',
             "Mbits/sec",
             (broadcast_color_fun, "(max - min) / max (%)",
              [(2, "red"), (1, "orange")], "white"),
             None,
            )
            )
    elif test_type == "floss":
        return \
            (
            ({"default":("frameloss", "floss", "Frame Loss", "one2one")}),
            ("RFC 2544 - IPv6 Benchmark",
             {"default":"Frame Loss"},
             "iteration.csv",
             ("Frame Size",
              {"default":["Frame Loss (%)"]}),
             frame_loss_data_fun),
            (['Min', 'Max'],
             'lower left',
             "%",
             (frame_loss_color_fun, "max (%)",
              [(1, "red"), (0, "orange")], "white"),
             None,
            )
            )
    else:
        assert False, test_type

def extract_data(test_type, stas_nb, directions, version_directory):
    # Some types of tests depend on other tests
    if test_type in ["tput", "bcast", "mesh"]:
        sec_frame_sizes_vector = None
        directories_nb = 1
    elif test_type in ["latency", "floss"]:
        (sec_frame_sizes_vector, _, _, _) = \
            extract_data("tput", stas_nb, directions, version_directory)
        directories_nb = len(sec_frame_sizes_vector)
    else:
        assert False, test_type
    (_, (main_directory, sub_directories, file_name, file_columns_headers,
         data_fun), _) = get_specs(test_type)
    frame_sizes_matrix = []
    data = []
    detailed_data = []
    traces_files = []
    for direction in directions:
        sub_directory = get_from_direction(sub_directories, direction)
        frame_sizes_vector, data_vectors, traces_files_aux = \
            get_data(version_directory, main_directory, sub_directory,
                     stas_nb, direction, directories_nb, file_name,
                     file_columns_headers)
        traces_files.append(traces_files_aux)
        frame_sizes_matrix.append(frame_sizes_vector)
        data_vector, detailed_data_vectors = \
            data_fun(frame_sizes_vector, data_vectors)
        detailed_data.append(detailed_data_vectors)
        data.append(data_vector)
    # We expect the frame sizes to be the same for all tests
    frame_sizes_vector = frame_sizes_matrix[0]
    for vector in frame_sizes_matrix:
        assert vector == frame_sizes_vector, (vector, frame_sizes_vector)
    assert (sec_frame_sizes_vector is None) or \
           (sec_frame_sizes_vector == frame_sizes_vector), \
           (sec_frame_sizes_vector, frame_sizes_vector)
    return (frame_sizes_vector, data, detailed_data, traces_files)

def get_fig_data(sec_directory, directions):
    figures_number = 0
    # For the secondary table
    if sec_directory is not None:
        figures_number = figures_number + 2
    # For the primary table
    figures_number = figures_number + 2
    # For the figures
    figures_number = figures_number + len(directions)
    # For the first term:
    # 121 means that the figures are displayed in a
    # matrix 1 (vertical) x 2 (horizontal) and that the current
    # figure is the first one, at the upper left
    # For the second term:
    # it represents the size in inches of the figure (width x height)
    height_factor = 0.84
    dictionary = {1:(121, (8, 3 * height_factor)),
                  2:(121, (8, 3 * height_factor)),
                  3:(221, (8, 6 * height_factor)),
                  4:(221, (8, 6 * height_factor)),
                  5:(321, (8, 9 * height_factor)),
                  6:(321, (8, 9 * height_factor)),
                  7:(421, (8, 12 * height_factor)),
                  8:(421, (8, 12 * height_factor))
                  }
    try:
        return dictionary[figures_number]
    except:
        assert False, ("Invalid figures number", figures_number)
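
# Illustrative sketch (only exercised by the self-test at the bottom of this
# file): the layout picked by get_fig_data() for a figure without a secondary
# table and with two directions. Four panels are needed (table, colour legend
# and one bar chart per direction), so they go on a 2 x 2 grid, i.e.
# Matplotlib subplot code 221, on a canvas 8 inches wide.
def _example_figure_layout():
    (fig_index, fig_size) = get_fig_data(None, ["up", "down"])
    assert 221 == fig_index
    assert 8 == fig_size[0]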

def draw(test_type, stas_nb, directions, prim_directory, sec_directory, ylim,
         figure_path):
    """Draw a figure in a file and return the list of the corresponding
    traces"""
    (prim_frame_sizes_vector, prim_data, prim_detailed_data, traces_files) = \
        extract_data(test_type, stas_nb, directions, prim_directory)
    (fig_index, fig_size) = get_fig_data(sec_directory, directions)
    init_pylab(fig_size)
    # Extract the display parameters
    (_, _, (bar_labels, bar_legend_loc, unit, prim_table_colors_spec,
            sec_table_colors_spec)) = get_specs(test_type)
    # Draw the secondary table, if any
    if sec_directory is not None:
        (sec_frame_sizes_vector, sec_data, _, _) = \
            extract_data(test_type, stas_nb, directions, sec_directory)
        assert sec_frame_sizes_vector == prim_frame_sizes_vector, \
            ("The secondary and the primary data do not have the same frame sizes",
             test_type, stas_nb, directions, prim_directory, sec_directory,
             prim_frame_sizes_vector, sec_frame_sizes_vector)
        draw_table(sec_data, prim_data, fig_index, sec_frame_sizes_vector,
                   directions, sec_directory + " (b)", sec_table_colors_spec)
        fig_index = fig_index + 2
    # Draw the primary table
    draw_table(prim_data, prim_detailed_data, fig_index,
               prim_frame_sizes_vector, directions, prim_directory + " (a)",
               prim_table_colors_spec)
    fig_index = fig_index + 2
    # Draw the bars
    for (_, vectors, direction) in \
            zip(prim_data, prim_detailed_data, directions):
        plot_bar(unit, prim_frame_sizes_vector, vectors, bar_labels,
                 bar_legend_loc, direction, fig_index)
        fig_index = fig_index + 1
    if ylim != "default":
        pylab.ylim(ylim)
    pylab.savefig(figure_path)
    return traces_files

def init_pylab(fig_size):
    (fig_width, fig_height) = fig_size
    pylab.clf()
    pylab.figure(1, facecolor = 'w', edgecolor = 'c')
    params = {'backend': 'ps',
              'axes.labelsize': 8,
              'text.fontsize': 8,
              'legend.fontsize': 8,
              'xtick.labelsize': 8,
              'ytick.labelsize': 8,
              'text.usetex': False}
    pylab.rcParams.update(params)
    # For some unknown reason, the figure is not modified when the parameters
    # are passed as arguments to pylab.figure or pylab.rcParams.update
    fig = pylab.gcf()
    fig.set_size_inches(fig_width, fig_height)
    # Necessary to avoid any overlapping between the xlabel and the titles
    # when there are several rows of subplots
    fig.subplots_adjust(hspace = 0.5)

def prepare_axis(fig_index):
    ax = pylab.subplot(fig_index, frame_on = False)
    ax.xaxis.set_ticks_position("none")
    ax.yaxis.set_ticks_position("none")
    pylab.setp(ax.get_xticklabels(), visible = False)
    pylab.setp(ax.get_yticklabels(), visible = False)

def draw_table(ref_data, comparison_data, fig_index, frame_sizes_vector,
               directions, title, table_colors_spec):
    (color_fun, _, couples, last_color) = table_colors_spec
    table_colors = []
    for (ref_datum, comparison_datum) in zip(ref_data, comparison_data):
        vector = color_fun(ref_datum, comparison_datum)
        colors_vector = [ get_color(value, couples, last_color)
                          for value in vector ]
        table_colors.append(colors_vector)
    prepare_axis(fig_index)
    pylab.table(cellText = zip(*ref_data),
                rowLabels = frame_sizes_vector,
                colLabels = directions,
                loc = 'center left',
                cellColours = zip(*table_colors)
                )
    pylab.title(title, font)
    draw_table_color_legend(fig_index + 1, table_colors_spec)

def draw_table_color_legend(fig_index, table_colors_spec):
    prepare_axis(fig_index)
    (_, header, couples, last_color) = table_colors_spec
    couples.sort()
    couples.reverse()
    table_color_legend = get_table_color_legend(header, couples, last_color)
    (cell_text, cell_colours) = zip(*table_color_legend)
    pylab.table(cellText = [ [elem] for elem in cell_text],
                loc = 'center left',
                cellColours = [ [elem] for elem in cell_colours]
                )
    pylab.title("table color legend", font)

def get_table_color_legend(header, couples, last_color):
    result = [("x = " + header, "white")]
    (value, color) = couples[0]
    result.append(("x > %d" % value, color))
    i = 0
    for i in range(1, len(couples)):
        (prev_value, _) = couples[i - 1]
        (value, color) = couples[i]
        result.append(("%d >= x > %d" % (prev_value, value), color))
    result.append(("%d >= x" % couples[len(couples) - 1][0], last_color))
    return result
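
# Note on the expected on-disk layout (the version directory below is a
# hypothetical example): get_results_directories() walks
#   results_path()/<version>/<main_directory>.resDir/<sub_directory>.resDir/
#   <stas_nb>_<direction>.res/RunNNNN.res/
# so for a default throughput run it would look at e.g.
#   ~/ixos/Results/1.0.0/RFC 2544 - IPv6 Benchmark.resDir/Throughput.resDir/
#   8_down.res/Run0001.res/AggregateResults.csv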

def get_results_directories(version_directory, main_directory, sub_directory,
                            stas_nb, direction, directories_nb):
    path = os.path.join(results_path(), version_directory,
                        main_directory + ".resDir",
                        sub_directory + ".resDir",
                        tcl_file_name(stas_nb, direction) + ".res")
    directories = os.listdir(path)
    directories.sort()
    first_file_index = len(directories) - directories_nb
    assert first_file_index > 0
    directories = directories[first_file_index:len(directories)]
    for directory in directories:
        assert re.search("^Run\d{4}\.res$", directory) is not None, \
            directory
    return (path, directories)

def get_last_results_directory(test_type, stas_nb, direction,
                               version_directory):
    (_, (main_directory, sub_directories, _, _, _), _) = get_specs(test_type)
    sub_directory = get_from_direction(sub_directories, direction)
    (path, [directory]) = get_results_directories(version_directory,
                                                  main_directory,
                                                  sub_directory, stas_nb,
                                                  direction, 1)
    return os.path.join(path, directory)

def get_data(version_directory, main_directory, sub_directory, stas_nb,
             direction, directories_nb, file_name, file_columns_headers):
    (path, directories) = get_results_directories(version_directory,
                                                  main_directory,
                                                  sub_directory, stas_nb,
                                                  direction, directories_nb)
    input_results = ""
    traces_files = []
    print ""
    for directory in directories:
        directory_path = os.path.join(path, directory)
        all_files = os.listdir(directory_path)
        filter_fun = lambda x: re.search("^trace_.*_\d+\.gz$", x) is not None
        map_fun = lambda x: os.path.join(path, directory, x)
        traces_files_aux = map(map_fun, filter(filter_fun, all_files))
        traces_files.append(traces_files_aux)
        input_results_path = os.path.join(directory_path, file_name)
        print "Extracting data from", input_results_path, "..."
        input_results_file = open(input_results_path)
        input_results = input_results + input_results_file.read()
        input_results_file.close()
    file_columns_headers = update_headers(file_columns_headers, direction)
    frame_sizes_vector, data_vectors = get_data_aux(input_results,
                                                    file_columns_headers)
    return frame_sizes_vector, data_vectors, traces_files

def update_headers((frame_size_header, data_headers), direction):
    data_headers = get_from_direction(data_headers, direction)
    return (frame_size_header, data_headers)
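
# Note on get_data_aux() below: the CSV files produced by the Ixia scripts
# repeat their header line in the middle of the data (see the sample
# AggregateResults string in the self-test at the bottom of this file); the
# rows of all trials are concatenated and then reduced, per frame size and
# per column, to a (min, max) pair. For instance the three "Agg Tput (Mbps)"
# values measured for 256-byte frames in that sample (81.685, 41.138 and
# 81.685) become the pair (41.14, 81.69).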

def get_data_aux(input_results, file_columns_headers):
    input_results = input_results.split("\n")
    file_frame_size_column_header, file_data_columns_headers = \
        file_columns_headers
    # The first line contains the headers
    headers = input_results[0].split(",")
    frame_size_index = headers.index(file_frame_size_column_header)
    # The last line is normally empty
    assert input_results[len(input_results) - 1] == ""
    input_results = input_results[1:len(input_results) - 1]
    output_results = []
    for input_result in input_results:
        row = input_result.split(",")
        for header in headers:
            try:
                frame_size = int(row[frame_size_index])
                data = float(row[headers.index(header)])
                output_result = (frame_size, header, data)
                output_results.append(output_result)
            except:
                # We ignore the non-numeric data, including the
                # redundant lines of headers
                pass
    # Get the distinct frame sizes, without duplicates, sorted
    frame_sizes_vector = [frame_size for (frame_size, _, _)
                          in output_results]
    frame_sizes_vector = list_utils.drop_duplicates(frame_sizes_vector)
    frame_sizes_vector.sort()
    # Get the maximum and minimum for each frame size and each header
    data_vectors = []
    for header in file_data_columns_headers:
        min_vector = []
        max_vector = []
        for frame_size in frame_sizes_vector:
            lines = filter_lines(frame_size, header, output_results)
            min_vector.append(min_value(lines))
            max_vector.append(max_value(lines))
        assert len(frame_sizes_vector) == len(min_vector)
        assert len(frame_sizes_vector) == len(max_vector)
        data_vectors.append((min_vector, max_vector))
    return frame_sizes_vector, data_vectors

def plot_bar(unit, frame_sizes_vector, data_vectors, data_legends, legend_loc,
             direction, fig_index):
    pylab.subplot(fig_index)
    # bar_width * len(data_vectors) must be < 1
    bar_width = 1. / (1 + len(data_vectors))
    bar_offset = 0.1
    bar_colors = ['red', 'blue', 'orange']
    fs_indexes = numpy.arange(len(frame_sizes_vector))
    data_indexes = range(len(data_vectors))
    bars = [ pylab.bar(bar_offset + fs_indexes + data_index * bar_width,
                       data_vectors[data_index], bar_width,
                       color = bar_colors[data_index])
             for data_index in data_indexes ]
    xtick_offset = bar_offset + bar_width * len(data_vectors) / 2
    pylab.xticks(fs_indexes + xtick_offset, frame_sizes_vector)
    pylab.legend([ bars[data_index][0] for data_index in data_indexes ],
                 data_legends, legend_loc)
    pylab.title(direction, font)
    pylab.ylabel(unit, font)
    pylab.xlabel("frame size (bytes)", font)
    pylab.grid(True)

def filter_lines(frame_size, header, output_results):
    acc = []
    for output_result in output_results:
        if output_result[0] == frame_size and \
           output_result[1] == header:
            acc.append(output_result[2])
    return acc

def min_value(values):
    try:
        return round(max([min(values), 0]), 2)
    except:
        return 0

def max_value(values):
    try:
        return round(max([max(values), 0]), 2)
    except:
        return 0

def delta_value(min_value, max_value):
    result = 100.0 * (max_value - min_value) / max_value
    return round(result, 2)

def replace_parameters(file_content, parameters):
    def fun(line):
        for (level, key, value) in parameters:
            target = level + " config -" + key + " "
            if target in line:
                line = target + str(value)
        return line
    separator = "\n"
    lines = file_content.split(separator)
    return separator.join(map(fun, lines))

def get_from_direction(dictionary, direction):
    try:
        return dictionary[direction]
    except:
        return dictionary["default"]

if __name__ == "__main__":
    assert 1 == get_from_direction({"up":1, "default":2}, "up")
    assert 2 == get_from_direction({"up":1, "default":2}, "down")
    assert 0 == max_value([])
    assert 0 == max_value([0])
    assert 2 == max_value([2, 1])
    assert 0 == max_value([-1, -2])
    assert 0 == min_value([])
    assert 0 == min_value([0])
    assert 1 == min_value([2, 1])
    assert 0 == min_value([-1, -2])
    assert 0 == delta_value(100, 100)
    assert 6.54 == delta_value(100, 107)
    assert 23.08 == delta_value(100, 130)
    assert [] == filter_lines(1, 2, [])
    assert [] == filter_lines(1, 2, [(0, 1, 1)])
    assert [0] == filter_lines(1, 2, [(1, 2, 0)])
    assert [0] == filter_lines(1, 2, [(1, 2, 0), (1, 1, 1)])
    assert [0, 2] == filter_lines(1, 2, [(1, 2, 0), (1, 2, 2)])
    AggregateResults = \
        "Trial,Frame Size,Agg Tput (fps),Max Tput (fps),Agg Tput (Mbps),Max Tput (Mbps),Agg Tput (% Line rate)\n" \
        "1,68,41876.051,20938.026,22.781,11.390,1.474\n" \
        "1,128,41037.432,20518.716,42.022,21.011,2.430\n" \
        "1,256,39885.127,19942.563,81.685,40.842,4.403\n" \
        "1,512,30932.938,15466.469,126.701,63.351,6.583\n" \
        "1,768,34965.035,17482.517,214.826,107.413,11.021\n" \
        "1,1024,20983.716,10491.858,171.899,85.949,8.763\n" \
        "1,1280,29439.474,14719.737,301.460,150.730,15.309\n" \
        "1,1518,25216.864,12608.432,306.234,153.117,15.513\n" \
        "2,68,41876.051,20938.026,22.781,11.390,1.474\n" \
        "2,128,39885.135,19942.567,40.842,20.421,2.361\n" \
        "2,256,20086.775,10043.388,41.138,20.569,2.218\n" \
        "Trial,Frame Size,Agg Tput (fps),Max Tput (fps),Agg Tput (Mbps),Max Tput (Mbps),Agg Tput (% Line rate)\n" \
        "2,512,37346.880,18673.440,152.973,76.486,7.947\n" \
        "2,768,34965.035,17482.517,214.826,107.413,11.021\n" \
        "2,1024,32258.065,16129.032,264.258,132.129,13.471\n" \
        "2,1280,29968.834,14984.417,306.881,153.440,15.584\n" \
        "2,1518,25105.441,12552.721,304.880,152.440,15.445\n" \
        "3,68,41876.051,20938.026,22.781,11.390,1.474\n" \
        "3,128,41037.432,20518.716,42.022,21.011,2.429\n" \
        "3,256,39885.127,19942.563,81.685,40.842,4.403\n" \
        "3,512,37346.880,18673.440,152.973,76.486,7.947\n" \
        "3,768,34965.035,17482.517,214.826,107.413,11.021\n" \
        "3,1024,32258.065,16129.032,264.258,132.129,13.471\n" \
        "3,1280,29832.936,14916.468,305.489,152.745,15.513\n" \
        "3,1518,25216.864,12608.432,306.234,153.117,15.513\n"
    assert \
        ([68, 128, 256, 512, 768, 1024, 1280, 1518],
         [([22.78, 40.84, 41.14, 126.7, 214.83, 171.9, 301.46, 304.88],
           [22.78, 42.02, 81.69, 152.97, 214.83, 264.26, 306.88, 306.23])
          ]) == get_data_aux(AggregateResults,
                             ("Frame Size", ["Agg Tput (Mbps)"]))
    iteration = \
        "Trial,Frame Size,Iteration,Tx Port,Rx Port,Rate FPS,Tx Rate (mbps),Rate %,TX Count,RX Count,Frame Loss,Frame Loss (%),RX Rate (fps),Rx Rate (mbps)\n" \
        "1,68,1,Slaves,Master,30080.511,16.364,2.118,18048312,17567793,480519,2.662,29290,15.934\n" \
        "Trial,Frame Size,Iteration,Tx Port,Rx Port,Rate FPS,Tx Rate (mbps),Rate %,TX Count,RX Count,Frame Loss,Frame Loss (%),RX Rate (fps),Rx Rate (mbps)\n" \
        "1,68,1,Slaves,Master,30080.511,16.364,2.118,18048312,17567793,480519,2.662,29290,15.934\n"
    assert ([68], [([2.66], [2.66])]) == \
        get_data_aux(iteration, ("Frame Size", ["Frame Loss (%)"]))
    assert "tput config -numtrials 3" == \
        replace_parameters("tput config -numtrials 5",
                           [("tput", "numtrials", "3")])
    assert ("user config -version 1.0.0\n"
            "tput config -numtrials 3") == \
        replace_parameters("user config -version 1.0.0\n"
                           "tput config -numtrials 5",
                           [("tput", "numtrials", "3")])
    assert ("user config -version 1.0.1\n"
            "tput config -numtrials 3") == \
        replace_parameters("user config -version 1.0.0\n"
                           "tput config -numtrials 5",
                           [("user", "version", "1.0.1"),
                            ("tput", "numtrials", "3")])
    assert ("tput config -numtrials 3") == \
        replace_parameters("tput config -numtrials 5",
                           [("tput", "numtrials", 3)])
    assert [16.0, 32.0] == map(normalize_fps,
                               [1024 * 1024, 2 * 1024 * 1024], [2, 2])
    assert "red" == get_color(3, [(2, "red"), (1, "orange")], "white")
    assert "orange" == get_color(2, [(2, "red"), (1, "orange")], "white")
    assert "orange" == get_color(1.5, [(2, "red"), (1, "orange")], "white")
    assert "white" == get_color(0.5, [(2, "red"), (1, "orange")], "white")
    assert "map add 1 4 2 1 4 1" == \
        get_map_add("1.4.1", "1.4.2", "up", "tput", 2)
    assert "map add 1 4 1 1 4 2" == \
        get_map_add("1.4.1", "1.4.2", "down", "tput", 1)
    assert "map add 1 4 1 1 4 2\n" \
           "map add 1 4 2 1 4 1" == \
        get_map_add("1.4.1", "1.4.2", "bi", "tput", 1)
    assert "map add 1 4 1 1 4 2\n" \
           "map add 1 4 2 1 4 1" == \
        get_map_add("1.4.1", "1.4.2", "bi", "mesh", 1)
    assert "map add 1 4 1 1 4 2" == \
        get_map_add("1.4.1", "1.4.2", "down", "mesh", 1)
    assert "map add 1 4 2 1 4 1" == \
        get_map_add("1.4.1", "1.4.2", "up", "mesh", 1)
    assert "map add 1 4 2 1 4 1\n" \
           "map add 1 4 3 1 4 1" == \
        get_map_add("1.4.1", "1.4.2", "up", "mesh", 2)
    assert "map add 1 4 1 1 4 2\n" \
           "map add 1 4 1 1 4 3" == \
        get_map_add("1.4.1", "1.4.2", "down", "mesh", 2)
    assert "map add 1 4 1 1 4 2\n" \
           "map add 1 4 2 1 4 1\n" \
           "map add 1 4 1 1 4 3\n" \
           "map add 1 4 3 1 4 1" == \
        get_map_add("1.4.1", "1.4.2", "bi", "mesh", 2)
    assert "map add 1 4 1 1 4 2\n" \
           "map add 1 4 2 1 4 3\n" \
           "map add 1 4 3 1 4 1" == \
        get_map_add("1.4.1", "1.4.2", "loop", "mesh", 2)
    assert "map add 1 4 1 1 4 3\n" \
           "map add 1 4 3 1 4 1" == \
        get_map_add("1.4.1", "1.4.3", "loop", "mesh", 1)
    assert "map add 1 4 1 1 4 3\n" \
           "map add 1 4 3 1 4 4\n" \
           "map add 1 4 4 1 4 1" == \
        get_map_add("1.4.1", "1.4.3", "loop", "mesh", 2)
    assert (1, 2, 3) == parse_port("1.2.3")
    for port in ["aaa", "1.2", "1.2,3", "1.2.30", "1.2.0"]:
        test_failed = False
        try:
            parse_port(port)
        except:
            test_failed = True
        assert test_failed
    assert "map add 1 4 2 1 4 1" == get_map_add_aux(((1, 4, 2), (1, 4, 1)))
    (tmp_directory, file_out_name) = create_tcl_file("tput", 24, "up",
                                                     "version_directory", [],
                                                     "1.3.1", "1.4.1", 11)
    file_out_path = os.path.join(tmp_directory, file_out_name)
    assert "24_up.tcl" == file_out_name
    file_out = open(file_out_path)
    file_out_content = file_out.read()
    for string in ["set testConf(portname,1.3.1) Master",
                   "set VlanID(1,4,1) 11",
                   "set NumVlans(1,4,1) 24",
                   "user config -version version_directory"]:
        assert string in file_out_content
    file_out.close()
    shutil.rmtree(tmp_directory)
    assert [("x = " + "foo", "white"),
            ("x > 10", "orange"),
            ("10 >= x > 5", "blue"),
            ("5 >= x", "red")] == \
        get_table_color_legend("foo", [(10, "orange"), (5, "blue")], "red")
    assert [('mbpsRate', 90.0), ('framesizeList', '{ 68 }')] == \
        get_parameters_aux(68, 100, "up")
    assert [('mbpsRate', 45.0), ('framesizeList', '{ 68 }')] == \
        get_parameters_aux(68, 100, "bi")
    assert ("foo", ["bar"]) == update_headers(("foo", {"up":["bar"]}), "up")
    assert ("foo", ["bar"]) == update_headers(("foo", {"default":["bar"]}),
                                              "up")
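
    # A few extra illustrative checks (values chosen arbitrarily) for helpers
    # not covered above: normalize_latency() converts nanoseconds to
    # milliseconds, and latency_data_fun() returns the averages as the main
    # vector together with the detailed [min, avg, max] vectors.
    assert 1.5 == normalize_latency(1500000.0)
    assert 0.05 == normalize_latency(50000.0)
    assert ([1.5], [[0.5], [1.5], [2.5]]) == \
        latency_data_fun(None, [([500000.0], None), (None, [2500000.0]),
                                ([1500000.0], None)])
    # Run the illustrative sketches defined next to the functions they
    # describe
    _example_chained_parameters()
    _example_throughput_cell_colour()
    _example_fps_to_mbps()
    _example_figure_layout()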