# -*- coding: utf-8 -*- """Module allowing to execute performance tests (attenuation, SNR, SJR, frame size and dynamic_jammer) with Iperf between 2 plugs and to generate a PDF report with the results""" import time import subprocess import os import numpy import math import sys import shutil import validlib.attenuator as attenuator import validlib.waveform_generator as waveform_generator import validlib.power_strip as power_strip import validlib.spc300 as spc300 import validlib.non_spc300 as non_spc300 import validlib.iperf as iperf import validlib.list_utils as list_utils import validlib.rst_utils as rst_utils import validlib.config_utils as config_utils import validlib.dia_utils as dia_utils config = None #Peak to Average Ratio of the power, in OFDM par_db = 10 #Units of the "values" for each type of test units = {"attenuation":"dB", "SNR":"dB", "SJR":"dB", "frame_size":"bytes", "dynamic_jammer":"seconds" } def update_config(new_config): """Update the configuration""" global config config = new_config # Configure the 'attenuator' module attenuator.update_config(config["attenuator"]) # Configure the 'waveform_generator' module waveform_generator.update_config(config["waveform_generator"]) # Configure the 'power_strip' module power_strip.update_config(config["power_strip"]) # Configure the 'spc300' module spc300.update_config(config["spc300"]) # Configure the 'non_spc300' module non_spc300.update_config(config["non_spc300"]) # Configure the 'iperf' module iperf.update_config(config["iperf"]) check_config() def set_tokens(new_config): update_config(new_config) iperf.set_tokens() def clear_tokens(new_config): update_config(new_config) iperf.clear_tokens() def get_bench_data_file_path(): reports_directory = get_reports_directory() return os.path.join(reports_directory, "bench_data.py") def write_bench_data(): bench_data = iperf.bench_data() bench_data_file = open(get_bench_data_file_path(), "w") bench_data_file.write(str(bench_data)) bench_data_file.close() def read_bench_data(): bench_data_file_path = get_bench_data_file_path() return config_utils.evaluate(bench_data_file_path) def report(config, config_file): """Generates the PDF report as specified in the configuration. It requires to set up first the test bench as detailed in the test bench description.""" update_config(config) reports_directory = get_reports_directory() # Build the picture of the test bench bench_fig_name = dia_utils.dia2eps(__file__, reports_directory) # Make all the measurements if config["make_measurements"]: iperf.check_tokens() write_bench_data() #We dump the file and not the config, since it may be more #readable and may contain some useful comments shutil.copy(config_file, reports_directory) t0 = time.time() estimated_min = measurements_estimated_time_min() print "\nEstimated measurements time in minutes:", estimated_min, "\n" iperf_plugs_checked = False for (test_protocol, test_type, test_params) in config["fixtures"]: make_measurements(test_protocol, test_type, test_params, iperf_plugs_checked) iperf_plugs_checked = True actual_min = round((time.time() - t0) / 60.0, 2) print "Actual/estimated measurements times in minutes:", \ actual_min, "/", estimated_min # Extract the datarates from the measurements files, generate the # images and retrieve the traces traces_files = [] for (test_protocol, test_type, test_params) in config["fixtures"]: traces_files_aux = get_traces_files(test_protocol, test_type, test_params) traces_files.append(traces_files_aux) datarates = extract_datarates(test_protocol, test_type, test_params) dump_and_draw(test_protocol, test_type, test_params, datarates, str_max_throughput()) if test_type != "dynamic_jammer": for ce_stat_name in config["displayed_CE_stats"]: ce_stats = extract_ce_stats(test_protocol, test_type, test_params, ce_stat_name) dump_and_draw(test_protocol, test_type, test_params, ce_stats, ce_stat_name) # Copy the power measurement figure power_fig_name = "power_measurement.png" power_fig_path = os.path.join(os.path.dirname(__file__), power_fig_name) shutil.copy(power_fig_path, reports_directory) # Build the rst file build_rst_file(bench_fig_name, power_fig_name, traces_files) # Build the pdf file rst_utils.rst2pdf(file_prefix()) def check_config(): waveform_data = get_waveform_data() snr_offset_db = waveform_data["snr_offset_db"] sjr_offset_db = waveform_data["sjr_offset_db"] sjr_attenuator_attenuation_db = \ waveform_data["sjr_attenuator_attenuation_db"] fixtures = config["fixtures"] attenuation_offset = get_attenuation_offset() attenuation_range_db = attenuator.attenuation_range_db() power_range_dbm = waveform_generator.power_range_dbm() ce_stats = config["displayed_CE_stats"] [ iperf.unit(ce_stat) for ce_stat in ce_stats] for fixture in fixtures: (test_protocol, test_type, test_params) = fixture curves = config[test_type]["curves"] test_values = config[test_type]["values"] check_config_aux(attenuation_offset, attenuation_range_db, snr_offset_db, sjr_offset_db, power_range_dbm, sjr_attenuator_attenuation_db, curves, test_values, test_protocol, test_type, test_params) def check_config_aux(attenuation_offset, attenuation_range_db, snr_offset_db, sjr_offset_db, power_range_dbm, sjr_attenuator_attenuation_db, curves, test_values, test_protocol, test_type, test_params): assert test_protocol in ["UDP", "TCP"] assert test_type in ["attenuation", "SNR", "frame_size", "SJR", "dynamic_jammer"] for curve in curves: assert curve in iperf.measurements_specs().keys() if test_type == "attenuation": (min_attenuation_db, max_attenuation_db) = attenuation_range_db for test_value in test_values: attenuation_db = test_value - attenuation_offset #We check that the attenuation is in the attenuation range #of the attenuator assert attenuation_db <= max_attenuation_db, \ ("Attenuation too high", test_value, attenuation_offset, \ max_attenuation_db) assert attenuation_db >= min_attenuation_db, \ ("Attenuation too low", test_value, attenuation_offset, \ min_attenuation_db) if test_type == "SNR": #We check here that the values of SNR are consistent with the #output power range of the waveform generator (min_power_dbm, max_power_dbm) = power_range_dbm for test_value in test_values: power_dbm = snr_offset_db - test_value assert power_dbm <= max_power_dbm, \ ("SNR too low", snr_offset_db, test_value, max_power_dbm) assert power_dbm >= min_power_dbm, \ ("SNR too high", snr_offset_db, test_value, min_power_dbm) assert test_params == None elif test_type == "SJR": #Because of sjr_attenuator_attenuation_db, the bench is not #symetrical on this test assert curve == "a_b_uni", curve (min_attenuation_db, max_attenuation_db) = attenuation_range_db #We check that the attenuation is in the attenuation range #of the attenuator assert sjr_attenuator_attenuation_db <= max_attenuation_db, \ ("Attenuation for SJR too high", sjr_attenuator_attenuation_db) assert sjr_attenuator_attenuation_db >= min_attenuation_db, \ ("Attenuation for SJR too low", sjr_attenuator_attenuation_db) #We check here that the values of SJR are consistent with the #output power range of the waveform generator for test_value in test_values: power_dbm = sjr_offset_db - test_value (min_power_dbm, max_power_dbm) = power_range_dbm assert power_dbm <= max_power_dbm, \ ("SJR too low", sjr_offset_db, test_value, max_power_dbm) assert power_dbm >= min_power_dbm, \ ("SJR too high", sjr_offset_db, test_value, min_power_dbm) elif test_type == "dynamic_jammer": assert test_values == ["time"] (min_attenuation_db, max_attenuation_db) = attenuation_range_db #We check that the attenuation is in the attenuation range #of the attenuator assert sjr_attenuator_attenuation_db <= max_attenuation_db, \ ("Attenuation for SJR too high", sjr_attenuator_attenuation_db) assert sjr_attenuator_attenuation_db >= min_attenuation_db, \ ("Attenuation for SJR too low", sjr_attenuator_attenuation_db) power_dbm = sjr_offset_db - test_params[0] #We check here that the values of SJR are consistent with the #output power range of the waveform generator (min_power_dbm, max_power_dbm) = power_range_dbm assert power_dbm <= max_power_dbm, \ ("SJR too low", sjr_offset_db, test_params[0], max_power_dbm) assert power_dbm >= min_power_dbm, \ ("SJR too high", sjr_offset_db, test_params[0], min_power_dbm) else: assert test_params == None, test_params def build_rst_file(bench_fig_name, power_fig_name, traces_files): bench_data = read_bench_data() waveform_data = get_waveform_data() rst_template_dict = {"bench_fig_name":bench_fig_name, "power_fig_name":power_fig_name} rst_template_dict.update(waveform_data) rst_template_dict.update(bench_data) rst_template_dict.update(config) rst_file = open(os.path.splitext(__file__)[0] + ".rst", "r") rst_template = rst_file.read() rst_file.close() rst_filled_template = rst_template.format (**rst_template_dict) rst_file = rst_filled_template + rst_results() + \ rst_utils.traces(traces_files, get_reports_directory()) report = open(file_prefix() + ".rst", 'w') report.write(rst_file) report.close() def rst_results(): title = "" \ "Results\n" \ "=======\n" \ "\n" results = [] for fixture in config["fixtures"] : results = results + [rst_result(fixture, str_max_throughput())] + \ [rst_result(fixture, ce_stat) for ce_stat in config["displayed_CE_stats"]] return rst_utils.page_break().join(results) def rst_picture_scale(): return ".png\n" + \ " :scale: 65 %\n" + \ "\n" def rst_figure(test_protocol, test_type, test_params, figure_name): return ".. figure:: " + file_prefix(test_protocol, test_type, test_params, None, None, False) + "_" + \ figure_name.replace(' ','_') def rst_params(properties): acc = "" for (key, value) in properties: acc = acc + " * " + key + ": " + str(value) + "\n\n" return acc def rst_result((test_protocol, test_type, test_params), figure_name): if test_type == "SJR": (freq_mhz, am) = test_params if am is None: am_part = "None" else: am_part = "(%s - %s kHz - %s %%)" % am test_params_title = " - (%s MHz - AM = %s)" % (freq_mhz, am_part) elif test_type == "dynamic_jammer": test_params_title = " - (%s dB - %s MHz - %s MHz - %s s)" \ % test_params else: test_params_title = "" if figure_name == str_max_throughput(): return test_protocol + " - " + test_type + test_params_title + \ rst_utils.title_break("-") + \ rst_params([("Duration of each measurement (seconds)", config[test_type]["duration_s"])]) + \ rst_comment(test_type, test_params) + "\n" + \ rst_figure(test_protocol, test_type, test_params, figure_name) + \ rst_picture_scale() else: if test_type == "dynamic_jammer": return "" return figure_name + " - " + test_protocol + " - " + test_type + \ test_params_title + rst_utils.title_break("~") + \ rst_params([("Y-axis value", iperf.ce_stat_description(figure_name))]) + \ rst_figure(test_protocol, test_type, test_params, figure_name) + \ rst_picture_scale() def rst_comment(test_type, test_params): if test_type in ["attenuation", "SNR"]: params = [("Duration of the `Channel Estimation (CE)`_ (seconds)", config["ce_sync_time_s"])] elif test_type == "frame_size": params = [("Input parameter", "(frame size (bytes), max throughput (Mbits/sec))")] elif test_type == "SJR": (freq_mhz, am) = test_params if am is None: am_params = [("AM", None)] else: am_params = [("AM form", am[0]), ("AM frequency (kHz)", am[1]), ("AM depth (%)", am[2])] params = [("Duration of the CE synchronisation before each " \ "measurement (seconds)", config["ce_sync_time_s"]), ("Jammer frequency (MHz)", freq_mhz)] + \ am_params elif test_type == "dynamic_jammer": params = [("SJR (dB)", test_params[0]), ("Sweep start frequency (MHz)", test_params[1]), ("Sweep stop frequency (MHz)", test_params[2]), ("Sweep time (seconds)", test_params[3])] else: assert False, test_type return rst_params(params) def measurements_estimated_time_min(): """Estimation of the time required to complete all measurements""" time_s = 0 for (_, test_type, _) in config["fixtures"] : if config["restart_plugs"]: start_time_s = 15 time_s = time_s + 2 * start_time_s duration_s = config[test_type]["duration_s"] if test_type in ["attenuation", "SNR", "SJR"]: duration_s = duration_s + config["ce_sync_time_s"] elif test_type in ["frame_size", "dynamic_jammer"]: pass else: assert test_type curves = config[test_type]["curves"] test_values = get_test_values(test_type) time_s = time_s + len(curves) * len(test_values) * duration_s # 1.5 is an average of the ratio between actual and estimated times return 1.5 * time_s / 60 def make_measurements(test_protocol, test_type, test_params, iperf_plugs_checked): """Remove the previous measurements, modify the channel, restart the plugs, execute the new measurements and dump the results in files""" print "Removing the previous files of results...\n" files_pattern = file_prefix(test_protocol, test_type, test_params) + "*" subprocess.call("rm " + files_pattern, stderr = subprocess.PIPE, shell = True) time_s = config["ce_sync_time_s"] print "Starting the measurements of the maximum", test_protocol, \ "throughput vs the", test_type, "..." initialize_testing_devices() restart_plugs(iperf_plugs_checked) test_values = get_test_values(test_type) if test_values == ["time"]: iperf.clean_traces() change_channel(test_type, test_params, None) make_measurement(test_protocol, test_type, test_params, None) else: for test_value in test_values: iperf.clean_traces() if change_channel(test_type, test_params, test_value): iperf.update_tone_maps(time_s) make_measurement(test_protocol, test_type, test_params, test_value) iperf.kill_iperfs() initialize_testing_devices() print "End of the measurements of the maximum", test_protocol, \ "throughput vs the", test_type def initialize_testing_devices(): attenuator.set_attenuation(0) waveform_generator.switch("off") def restart_plugs(iperf_plugs_checked): if config["restart_plugs"]: iperf.switch_on_plugs() if iperf_plugs_checked == False: iperf.check_plugs(get_reports_directory()) def get_attenuation_offset(): """Offset allowing to convert the overall attenuation in the attenuation at the attenuator level (attenuation = overall attenuation - offset)""" signal_attenuation_db = config["signal_attenuation_db"] amn_attenuation_db = config["amn_attenuation_db"] t_attenuation_db = config["t_attenuation_db"] return (2 * (signal_attenuation_db + amn_attenuation_db) + \ t_attenuation_db) def change_channel(test_type, test_params, test_value): print "\nSetting", test_value, units[test_type], "of", test_type waveform_data = get_waveform_data() if test_type == "attenuation": attenuation_offset = get_attenuation_offset() attenuator.set_attenuation(test_value - attenuation_offset) return True elif test_type == "SNR": waveform_generator.switch("on") power_dbm = waveform_data["snr_offset_db"] - test_value waveform_generator.configure_white_noise(power_dbm) return True elif test_type == "frame_size": return False elif test_type == "SJR": attenuator.set_attenuation( \ waveform_data["sjr_attenuator_attenuation_db"]) waveform_generator.switch("on") power_dbm = waveform_data["sjr_offset_db"] - test_value (freq_mhz, am) = test_params waveform_generator.configure_sinus( freq_mhz * 1000 * 1000, power_dbm, 0, am, None) return True elif test_type == "dynamic_jammer": attenuator.set_attenuation( \ waveform_data["sjr_attenuator_attenuation_db"]) waveform_generator.switch("on") power_dbm = waveform_data["sjr_offset_db"] - test_params[0] waveform_generator.configure_sinus( 1, power_dbm, 0, None, (test_params[1], test_params[2], test_params[3])) return True assert False, test_type def make_measurement(test_protocol, test_type, test_params, test_value): """Execute the new measurement and dump the results in a file""" duration_s = config[test_type]["duration_s"] curves = config[test_type]["curves"] prefix = file_prefix(test_protocol, test_type, test_params, test_value) if test_type in ["attenuation", "SNR", "SJR", "dynamic_jammer"]: udp_throughputs = config["udp_throughputs_mbits_per_sec"] (unidir_udp_throughput, bidir_udp_throughput) = udp_throughputs udp_parameters = ((iperf.default_frame_size, unidir_udp_throughput), (iperf.default_frame_size, bidir_udp_throughput)) elif test_type == "frame_size": udp_parameters = (test_value, test_value) else: assert False, test_type iperf.make_measurement(test_protocol, udp_parameters, duration_s, curves, prefix, config["get_CE_stats"]) def extract_datarates(test_protocol, test_type, test_params): """Extract the datarates from the measurement files""" datarates = {} for direction in get_all_directions(test_type): test_values = get_test_values(test_type) if test_values == ["time"]: datarates[direction] = \ extract_values_from_report(test_protocol, test_type, test_params, None, direction, str_max_throughput())[0] else: datarates[direction] = \ [ extract_values_from_report(test_protocol, test_type, test_params, test_value, direction, str_max_throughput())[1] for test_value in test_values ] return datarates def extract_ce_stats(test_protocol, test_type, test_params, ce_stat_name): """Extract the CE statistics from the measurement files""" ce_stats = {} for direction in get_all_directions(test_type): test_values = get_test_values(test_type) ce_stats[direction] = \ [ extract_values_from_report(test_protocol, test_type, test_params, test_value, direction, ce_stat_name) for test_value in test_values ] return ce_stats def extract_values_from_report(test_protocol, test_type, test_params, test_value, direction, name): """Extract the data from a measurement file""" prefix = file_prefix(test_protocol, test_type, test_params, test_value) if test_type in ["attenuation", "SNR", "SJR", "dynamic_jammer"]: frame_size = iperf.default_frame_size elif test_type == "frame_size": frame_size = test_value[0] else: assert False, test_type duration_s = config[test_type]["duration_s"] if name == str_max_throughput(): fun = iperf.extract_datarates_from_report else: fun = iperf.extract_ce_stat_from_report return fun(test_protocol, frame_size, prefix, direction, duration_s, name) def dump_and_draw(test_protocol, test_type, test_params, values, yname): """Dump the data and draw the curves""" directions = get_all_directions(test_type) test_values = get_test_values(test_type) if test_values == ["time"]: test_values = range(1, config[test_type]["duration_s"] + 1) file_name_base = file_prefix(test_protocol, test_type, test_params, None, yname, True) title = yname + " vs " + test_type + " in " + test_protocol iperf.dump_measurement(directions, test_type, test_values, file_name_base + ".txt", values) xlabel = test_type + " (" + units[test_type] + ")" ylabel = yname + " (" + iperf.unit(yname) + ")" iperf.draw(title, xlabel, ylabel, directions, test_type, test_values, file_name_base + ".png", values) def get_traces_files(test_protocol, test_type, test_params): test_values = get_test_values(test_type) if test_values == ["time"]: traces_directories = [file_prefix(test_protocol, test_type, test_params, None)] else: traces_directories = [ file_prefix(test_protocol, test_type, test_params, test_value) for test_value in test_values ] traces_files = [] for trace_directory in traces_directories: if os.path.exists(trace_directory): traces_files_paths = \ [ os.path.join(trace_directory, trace_file_name) for trace_file_name in os.listdir(trace_directory) ] traces_files.append(traces_files_paths) return traces_files def get_all_directions(test_type): curves = config[test_type]["curves"] return iperf.get_all_directions(curves) def get_reports_directory(): directory = config["reports_directory"] if os.path.exists(directory) == False: os.makedirs(directory) return directory def file_prefix(test_protocol = None, test_type = None, test_params = None, test_value = None, measure_name = None, directory = True): if directory == False: reports_directory = "" else: reports_directory = get_reports_directory() if test_type is None: test_unit = None else: test_unit = units[test_type] return get_file_prefix(test_protocol, test_type, test_params, test_value, test_unit, measure_name, reports_directory) def get_file_prefix(test_protocol, test_type, test_params, test_value, test_unit, measure_name, reports_directory): base_name = os.path.splitext(os.path.basename(__file__))[0] if test_protocol is not None: base_name += "_" + test_protocol if test_type is not None: base_name += "_" + test_type if test_params is not None: test_params_strings = \ [ str(elem) for elem in list_utils.flatten(test_params) ] base_name += "_%s" % "_".join(test_params_strings) if test_value is not None: base_name += "_%s%s" % (str(test_value), test_unit) if measure_name is not None: base_name += "_" + measure_name.replace(' ','_') return os.path.join(reports_directory, base_name) def get_waveform_data(): ppsd_dbm_per_hz = config["ppsd_dbm_per_hz"] signal_band_mhz = config["signal_band_mhz"] sjr_attenuator_attenuation_db = config["sjr_attenuator_attenuation_db"] signal_attenuation_db = config["signal_attenuation_db"] amn_attenuation_db = config["amn_attenuation_db"] waveform_attenuation_db = config["waveform_attenuation_db"] noise_band_mhz = waveform_generator.noise_band_mhz() return get_waveform_data_aux(ppsd_dbm_per_hz, signal_band_mhz, noise_band_mhz, sjr_attenuator_attenuation_db, amn_attenuation_db, signal_attenuation_db, waveform_attenuation_db) def get_waveform_data_aux(ppsd_dbm_per_hz, signal_band_mhz, noise_band_mhz, sjr_attenuator_attenuation_db, amn_attenuation_db, signal_attenuation_db, waveform_attenuation_db): (signal_freq_min_mhz, signal_freq_max_mhz) = signal_band_mhz signal_bandwith_mhz = signal_freq_max_mhz - signal_freq_min_mhz #the term 60 corresponds to the conversion from MHz to Hz tx_signal_power_dbm = int (ppsd_dbm_per_hz - \ par_db + \ 60 + \ round(10 * math.log10(float(signal_bandwith_mhz)), 0)) (noise_freq_min_mhz, noise_freq_max_mhz) = noise_band_mhz noise_bandwith_mhz = noise_freq_max_mhz - noise_freq_min_mhz bandwiths_ratio_db = int(round(10 * math.log10( \ float(signal_bandwith_mhz) / float(noise_bandwith_mhz)), 0)) snr_offset_db = int(tx_signal_power_dbm - \ amn_attenuation_db - \ signal_attenuation_db + \ waveform_attenuation_db - \ bandwiths_ratio_db) sjr_offset_db = int(tx_signal_power_dbm - \ amn_attenuation_db - \ sjr_attenuator_attenuation_db - \ signal_attenuation_db + \ waveform_attenuation_db) return {"ppsd_dbm_per_hz":ppsd_dbm_per_hz, "par_db":par_db, "signal_freq_min_mhz":signal_freq_min_mhz, "signal_freq_max_mhz":signal_freq_max_mhz, "signal_bandwith_mhz":signal_bandwith_mhz, "noise_freq_min_mhz":noise_freq_min_mhz, "noise_freq_max_mhz":noise_freq_max_mhz, "noise_bandwith_mhz":noise_bandwith_mhz, "tx_signal_power_dbm":tx_signal_power_dbm, "bandwiths_ratio_db":bandwiths_ratio_db, "signal_attenuation_db":signal_attenuation_db, "sjr_attenuator_attenuation_db":sjr_attenuator_attenuation_db, "amn_attenuation_db":amn_attenuation_db, "waveform_attenuation_db":waveform_attenuation_db, "snr_offset_db":snr_offset_db, "sjr_offset_db":sjr_offset_db} def get_test_values(test_type): return config[test_type]["values"] def str_max_throughput(): return iperf.str_max_throughput if __name__ == "__main__": assert {"ppsd_dbm_per_hz":-50, "par_db":10, "signal_freq_min_mhz":2, "signal_freq_max_mhz":28, "signal_bandwith_mhz":26, "noise_freq_min_mhz":0, "noise_freq_max_mhz":50, "noise_bandwith_mhz":50, "tx_signal_power_dbm":14, "bandwiths_ratio_db":-3, "signal_attenuation_db":10, "sjr_attenuator_attenuation_db":17, "amn_attenuation_db":6, "waveform_attenuation_db":10, "snr_offset_db":11, "sjr_offset_db":-9} == \ get_waveform_data_aux(-50, (2, 28), (0, 50), 17, 6, 10, 10) assert {"ppsd_dbm_per_hz":-63, "par_db":10, "signal_freq_min_mhz":1, "signal_freq_max_mhz":38, "signal_bandwith_mhz":37, "noise_freq_min_mhz":0, "noise_freq_max_mhz":50, "noise_bandwith_mhz":50, "tx_signal_power_dbm":3, "bandwiths_ratio_db":-1, "signal_attenuation_db":10, "sjr_attenuator_attenuation_db":17, "amn_attenuation_db":0, "waveform_attenuation_db":10, "snr_offset_db":4, "sjr_offset_db":-14} == \ get_waveform_data_aux(-63, (1, 38), (0, 50), 17, 0, 10, 10) assert "./P2P_throughput_UDP_frame_size_(5, 5)bytes" == \ get_file_prefix("UDP", "frame_size", None, (5, 5), "bytes", None, ".") assert "./P2P_throughput_UDP_SNR_-5dB" == \ get_file_prefix("UDP", "SNR", None, -5, "dB", None, ".") assert "/reports/P2P_throughput_UDP_SNR_-5dB" == \ get_file_prefix("UDP", "SNR", None, -5, "dB", None, "/reports") assert "reports/P2P_throughput_UDP_SNR_-5dB" == \ get_file_prefix("UDP", "SNR", None, -5, "dB", None, "reports") assert "reports/P2P_throughput_UDP_SNR_-5dB" == \ get_file_prefix("UDP", "SNR", None, -5, "dB", None, "reports/") assert "reports/subreports/P2P_throughput_UDP_SNR_-5dB" == \ get_file_prefix("UDP", "SNR", None, -5, "dB", None, "reports/subreports") assert "reports/subreports/P2P_throughput_UDP_SJR_15_-5dB" == \ get_file_prefix("UDP", "SJR", (15,), -5, "dB", None, "reports/subreports") assert "reports/P2P_throughput_TCP_SNR" == \ get_file_prefix("TCP", "SNR", None, None, None, None, "reports") assert "reports/P2P_throughput_TCP" == \ get_file_prefix("TCP", None, None, None, None, None, "reports") assert "reports/P2P_throughput" == \ get_file_prefix(None, None, None, None, None, None, "reports") assert "P2P_throughput" == \ get_file_prefix(None, None, None, None, None, None, "") assert "reports/P2P_throughput_UDP_SNR_-10dB_Throughput" == \ get_file_prefix("UDP", "SNR", None, -10, "dB", "Throughput", "reports") def test_check_config_aux(attenuation_offset, attenuation_range_db, snr_offset_db, sjr_offset_db, power_range_dbm, sjr_attenuator_attenuation_db, curves, test_values, test_protocol, test_type, test_params): try: check_config_aux(attenuation_offset, attenuation_range_db, snr_offset_db, sjr_offset_db, power_range_dbm, sjr_attenuator_attenuation_db, curves, test_values, test_protocol, test_type, test_params) return True except Exception as exception: #To be uncommented when the tests fail #print "Exception:", exception return False assert False == test_check_config_aux(None, None, None, None, None, None, None, None, None, None, None) assert False == test_check_config_aux(None, None, None, None, None, None, None, None, "TCP", None, None) assert False == test_check_config_aux(None, None, None, None, None, None, None, None, "TCP", "attenuation", None) assert False == test_check_config_aux(3, (5, 8), None, None, None, None, ["a_b_uni"], [7], "TCP", "attenuation", None) assert True == test_check_config_aux(3, (5, 8), None, None, None, None, ["a_b_uni"], [8], "TCP", "attenuation", None) assert True == test_check_config_aux(3, (5, 8), None, None, None, None, ["a_b_uni"], [11], "TCP", "attenuation", None) assert False == test_check_config_aux(3, (5, 8), None, None, None, None, ["a_b_uni"], [12], "TCP", "attenuation", None) assert False == test_check_config_aux(None, None, None, None, None, None, ["a_b_uni"], [1], "TCP", "SNR", None) assert False == test_check_config_aux(None, None, 10, None, (15, 25), None, ["a_b_uni"], [-4], "TCP", "SNR", None) assert True == test_check_config_aux(None, None, 10, None, (15, 25), None, ["a_b_uni"], [-5], "TCP", "SNR", None) assert True == test_check_config_aux(None, None, 10, None, (15, 25), None, ["a_b_uni"], [-15], "TCP", "SNR", None) assert False == test_check_config_aux(None, None, 10, None, (15, 25), None, ["a_b_uni"], [-16], "TCP", "SNR", None) assert False == test_check_config_aux(None, (5, 8), None, None, None, 7, ["a_b_uni"], [1], "TCP", "SJR", None) assert False == test_check_config_aux(None, (5, 8), None, 10, (15, 25), 7, ["a_b_uni"], [-4], "TCP", "SJR", None) assert True == test_check_config_aux(None, (5, 8), None, 10, (15, 25), 7, ["a_b_uni"], [-5], "TCP", "SJR", None) assert False == test_check_config_aux(None, (5, 8), None, 10, (15, 25), 3, ["a_b_uni"], [-5], "TCP", "SJR", None) assert True == test_check_config_aux(None, (5, 8), None, 10, (15, 25), 7, ["a_b_uni"], [-15], "TCP", "SJR", None) assert False == test_check_config_aux(None, (5, 8), None, 10, (15, 25), 7, ["a_b_uni"], [-16], "TCP", "SJR", None) assert False == test_check_config_aux(None, (5, 8), None, 10, (15, 25), 7, ["b_a_uni"], [-15], "TCP", "SJR", None) assert False == test_check_config_aux(None, (5, 8), None, None, None, 7, ["a_b_uni"], [1], "TCP", "dynamic_jammer", None) assert True == test_check_config_aux(None, (5, 8), None, 10, (15, 25), 7, ["a_b_uni"], ["time"], "TCP", "dynamic_jammer", (-10,)) assert " * label1: text1\n\n * label2: text2\n\n" == rst_params([("label1", "text1"), ("label2", "text2")])