# -*- coding: utf-8 -*- """Module allowing to command and control the plug SPC300""" import tarfile import tempfile import shutil import telnetlib import os import re import retrier import hpav_test import power_strip import list_utils import plug import ssh #Must contain only MSTAR plugs config = { "eth_netmask": "10.3.6.", "plc_netmask": "10.4.6.", "plugs": { #(Power strip index, # Power plug index, # Identifier (this parameter is optional and is needed only if there # are several plugs connected to the same power plug via another # power strip): #( # br0 address, # eth0 address (this parameter is needed only if the bridge is killed # on the plug - otherwise, it can be set to None), # plc0 address (this parameter is needed only if the bridge is killed # on the plug - otherwise, it can be set to None), # device model, as indicated in # cleopatre/buildroot/target/device/Spidcom, i.e in # ["scr300_eoc_master", "scr310", "scr310_eoc_master_8m", # "scr310_eoc_master", "scr310_eoc_slave_8m", "scr310_eoc_slave", # "spc300dini", "spc300dini_eoc_master", "spc300dini_eoc_slave", # "spk300", "spk300g", "spk300g_eoc_master", "spk300g_eoc_slave", # "spk310", "spk310_eoc_master", "spk310_eoc_slave", "spr300", # "spr310"...] #) (1, 1): ("147", "146", "148", "spr310"), (1, 2): ("139", "138", "140", "spr310"), (1, 3, "plug_1"): ("150", None, None, "spr310"), (1, 3, "plug_2"): ("153", None, None, "spr310"), (2, 1): ("136", "135", "137", "spr310"), (2, 2): ("145", "144", "146", "spr310") } } mac_addresses = {} #Pattern of the files containing the traces trace_pattern = "/usr/local/trace/trace_*.gz" def update_config(new_config): """Update the configuration""" global config config = new_config global mac_addresses mac_addresses = {} hpav_test.reset_interfaces() def get_internal_confs(interface_index = 0): result = [] for key in config["plugs"].keys(): host = (key, interface_index) config_string = r_execute(host, ["cat /etc/internal.conf"]) config_list = dictify(config_string) result.append((key, config_list)) return result def get_invalid_internal_confs(ref_internal_conf, internal_confs): return [ (key, internal_conf) for (key, internal_conf) in internal_confs \ if internal_conf != ref_internal_conf ] def set_internal_confs(internal_conf, interface_index = 0): path = "/usr/local/etc/internal.conf" command = "echo -e \"" + stringify(internal_conf) + "\" > " + path for key in config["plugs"].keys(): host = (key, interface_index) r_execute(host, [command]) def check_internal_confs(ref_internal_conf, first_restarted_plug, interface_index = 0): internal_confs = get_internal_confs(interface_index) invalid_internal_confs = get_invalid_internal_confs(ref_internal_conf, internal_confs) if invalid_internal_confs != []: print "On some plugs, /etc/internal.conf is different of the one " \ "set in the test configuration:" print invalid_internal_confs raw_input("""\n\ !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! !! WARNING: /usr/local/etc/internalconf is going to !! be overwritten on all the plugs and all the !! plugs are going to be restarted !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!""") set_internal_confs(ref_internal_conf, interface_index) plug.restart_all(first_restarted_plug) def dictify(prop_string): result = {} for prop in prop_string.split("\n"): try: [key, value] = prop.split("=") result[key.strip()] = value.strip() except: pass return result def stringify(prop_dict): result = [ key.strip() + "=" + value.strip() for (key, value) in sorted(prop_dict.items()) ] return "\n".join(result) def get_mac_address(key): """Gives the MAC address corresponding to a power strip key""" global mac_addresses try: return mac_addresses[key] except: ifconfig = r_execute((key, 0), "ifconfig br0") mac_addresses[key] = extract_mac_address(ifconfig) return mac_addresses[key] def extract_mac_address(stdoutdata): """Extract the MAC address from a "ifconfig br0" response""" pattern = '\w\w:\w\w:\w\w:\w\w:\w\w:\w\w' return re.search(pattern, stdoutdata).group() def get_model(key): """Gives the model corresponding to a power strip key""" return config["plugs"][key][3] def plc_rom_is_present(key, interface_index = 0): """Checks if a plc.rom is present on the flash""" result = r_execute((key, interface_index), "ls /usr/local/lib/modules") return "plc.rom" in result def get_version(key, interface_index = 0): """Gives the version corresponding to a power strip key""" result = r_execute((key, interface_index), "cat /proc/net/plc/version") (driver, firmware) = extract_driver_and_firmware(result) result = r_execute((key, interface_index), "uname -a") image = extract_image(result) return (driver, firmware, image) def extract_driver_and_firmware(string): pattern = "PLC Driver: (.*)\nPLC Firmware: (.*)" (driver, firmware) = re.search(pattern, string).groups() return (driver.strip(" \r"), firmware.strip(" \r")) def extract_image(string): pattern = "Linux (.*) 2.6.25.10.*" return re.search(pattern, string).groups()[0] def r_ping(key_origin, key_destination): """Ping the plug identified by key_destination from the plug identified by key_origin""" origin_network = config["eth_netmask"] origin = origin_network + config["plugs"][key_origin][1] destination_network = config["plc_netmask"] destination = destination_network + config["plugs"][key_destination][2] command = ping_command(destination) response = r_execute(origin, command) return ping(response) def switch_on_and_kill_bridge(keys): """Switch on the plugs identified by keys. For each plug, the bridge is desactivated and a different IP address is assigned to eth0 and to plc0. The plugs are switched on one by one in order to avoid the Ethernet loops.""" if type(keys) != list: keys = [keys] if keys == []: return # This would be inconsistent with the idea of switching the plugs one by # one assert plug.duplicated_power_plugs_keys_are_absent(keys) print "Starting the plug(s)", keys,"... {" # We switch off all the plugs in order to be sure to start # from a clean situation for key in keys: plug.switch_off(key) # We start the plugs one by one for key in keys: power_plug_key = plug.get_power_plug_key(key) power_strip.switch("on", power_plug_key) assert slow_ping(key, 0)[0] == "pong" (address_0, address_1, address_2, _) = config["plugs"][key] address_0 = config["eth_netmask"] + address_0 address_1 = config["eth_netmask"] + address_1 address_2 = config["plc_netmask"] + address_2 print "Activate the new address of eth0 of the plug" , key r_execute(address_0, "ifconfig eth0 " + address_1) print "Deactivate the address of br0 of the plug" , key r_execute(address_1, "ifdown br0") print "Activate the new address of plc0 of the plug" , key r_execute(address_1, "ifconfig plc0 " + address_2) print "Set the new route for eth0 on the plug", key r_execute(address_1, "route add -net " + config["eth_netmask"] + "0 netmask 255.255.255.0 dev eth0") print "Set the new route for plc0 on the plug", key r_execute(address_1, "route add -net " + config["plc_netmask"] + "0 netmask 255.255.255.0 dev plc0") assert l_ping(key, 0) == "pang" assert l_ping(key, 1) == "pong" assert l_ping(key, 2) == "pang" print "OK: the plug", key, "has been successfully started" print "}" def switch_on(keys): """Switch on the plugs identified by keys""" if type(keys) != list: keys = [keys] if keys == []: return print "Starting the plug(s)", keys,"... {" # We do not switch off the plugs before switching them on, because # there may be several plugs connected to the same power plug via # a secondary power strip power_plugs_keys = plug.get_power_plugs_keys(keys) for power_plug_key in power_plugs_keys: power_strip.switch("on", power_plug_key) for key in keys: assert "pong" == slow_ping(key, 0)[0] print "OK: the plug", key, "has been successfully started" print "}" def change_nmk(key, new_nmk): """Change the NMK of the plug identified by key""" print "Changing the NMK on the plug", key, "...{" network = config["eth_netmask"] host = network + config["plugs"][key][1] command = "spidapp config -npw " + new_nmk response = r_execute(host, command) assert "NMK: updated" in response, response print"OK: the NMK has been successfully changed on the plug", key print "}" def role(key): """Return the role of the plug identified by key in its AVLN: UCCO, USTA, CCO or STA""" network = config["eth_netmask"] command = "cat /etc/hpav.info" host = network + config["plugs"][key][1] hpav_info = r_execute(host, command) return get_role(hpav_info) def get_role(hpav_info): """Parse the content of the file /etc/hpav_info""" if "CCO = main" in hpav_info: cco = "main" elif "CCO = station" in hpav_info: cco = "station" else: cco = ("unknown_cco", hpav_info) if "STATUS = associated" in hpav_info: status = "associated" elif "STATUS = authenticated" in hpav_info: status = "authenticated" elif "STATUS = unassociated" in hpav_info: status = "unassociated" else: status = ("unknown_status", hpav_info) dictionnary = {("main", "authenticated"): "CCO", ("main", "unassociated"): "UCCO", ("station", "associated"): "STA", ("station", "unassociated"): "USTA", ("station", "authenticated"): "STA"} return dictionnary[(cco, status)] def r_execute(host, commands): """Telnet the host and execute a list of commands on it. The host can be an IP address or a couple (key, interface_index) Since the telnet sometimes fails, we retry 5 times.""" return retrier.retry(r_execute_aux, [host, commands], 5) def r_execute_aux(host, commands): try: (key, interface_index) = host host = get_destination(key, interface_index) except: pass if type(commands) != list: commands = [commands] telnet = telnetlib.Telnet(host) telnet.read_until("login: ") telnet.write("root\n") for command in commands: telnet.write(command + "\n") telnet.write("exit\n") result = telnet.read_all() return result def trace_is_present(key, interface_index = 0): result = r_execute((key, interface_index), "ls " + trace_pattern) return "No such file or directory" not in result def clean_trace(key, interface_index = 0): (trace_directory, _) = os.path.split(trace_pattern) r_execute((key, interface_index), "rm " + trace_directory + "/*") def get_trace_file_path(directory, zip_file_name, suffix): assert re.search("^trace_\d+\.gz$", zip_file_name) is not None trace_file_name = zip_file_name.replace("trace", "trace_" + suffix) return os.path.join(directory, trace_file_name) def random_string(): return os.path.basename(tempfile.NamedTemporaryFile().name) def dump_trace(key, dest_directory, ftp_server, interface_index = 0): """Retrieve the traces present on the plug and dump them in the directory""" # No need to dump anything if there are no traces on the plug if not trace_is_present(key, interface_index): return # Create a temporary remote directory ftp_trace_directory = random_string() remote_trace_directory = os.path.join(ftp_server["root"], ftp_trace_directory) ssh.call(ftp_server["user"], ftp_server["non_plc_ip_address"], "mkdir " + remote_trace_directory) # Retrieve the tar file of traces in the remote temporary directory tar_file_name = random_string() (trace_directory_on_plug, file_pattern) = os.path.split(trace_pattern) commands = \ ["cd " + trace_directory_on_plug, "tar -cf " + tar_file_name + " " + file_pattern, "ftpput -u spidcom -p spidcom " + \ ftp_server["plc_ip_address"] + " " + \ os.path.join(ftp_trace_directory, tar_file_name) + " " + \ tar_file_name, "rm " + tar_file_name] r_execute((key, interface_index), commands) # Create a local temporary directory local_trace_directory = os.path.join("/tmp", ftp_trace_directory) os.mkdir(local_trace_directory) # Copy the remote file locally remote_tar_file_path = os.path.join(remote_trace_directory, tar_file_name) ssh.cp(ftp_server["user"], ftp_server["non_plc_ip_address"], remote_tar_file_path, local_trace_directory) # Remove the remote temporary directory ssh.call(ftp_server["user"], ftp_server["non_plc_ip_address"], "rm -rf " + remote_trace_directory) # Untar the traces locally local_tar_file_path = os.path.join(local_trace_directory, tar_file_name) untar(local_tar_file_path, local_trace_directory) # Delete the local tar file so that only the traces remain os.remove(local_tar_file_path) # Check that there are indeed traces files_names = os.listdir(local_trace_directory) assert files_names != [] # Build the suffix try: (_, _, suffix) = key except: suffix = config["plugs"][key][0] # Create the destination directory if it does not exist if os.path.exists(dest_directory) == False: os.mkdir(dest_directory) # Remove the old traces files, if any for file_name in os.listdir(dest_directory): if os.path.splitext(file_name) == os.path.splitext(trace_pattern): os.remove(os.path.join(dest_directory, file_name)) # Copy the new traces files for file_name in files_names: src_file_path = os.path.join(local_trace_directory, file_name) dest_file_path = get_trace_file_path(dest_directory, file_name, suffix) shutil.copy(src_file_path, dest_file_path) # Delete the local temporary directory shutil.rmtree(local_trace_directory) ################################################################################ # Internal functions ################################################################################ def get_destination(key, interface_index): if interface_index == 2: mask = "plc_netmask" else: mask = "eth_netmask" return config[mask] + config["plugs"][key][interface_index] def l_ping(key, interface_index): """Ping the plug identified by key, using the address specified by the interface_index (0 means br0, 1 means eth0 and 2 means plc0)""" destination = get_destination(key, interface_index) command = ping_command(destination) response = os.popen(command).read() return ping(response) def ping(response): """Return pong if the ping succeeded and pand otherwise""" if "ttl" in response: result = "pong" else: result = "pang" return result def ping_command(destination): """Ping the IP address destination""" return "ping -c1 -W1 " + destination def slow_ping(key, interface_index): """Ping the plug identified by key using the address specified by the interface_index (0 means br0, 1 means eth0 and 2 means plc0). One tries 60 times to let the time to the plug to start up.""" n = 0 result = "pang" while (n < 60 and result == "pang"): print "Attempt #", n, "to ping the address", (key, interface_index) result = l_ping(key, interface_index) n = n + 1 return (result, n) def untar(tar_file_path, destination_directory): assert tarfile.is_tarfile(tar_file_path), tar_file_path tar_file_content = tarfile.open(tar_file_path) tar_file_content.extractall(destination_directory) if __name__ == "__main__": assert get_role("CCO = main\nSTATUS = authenticated") == "CCO" try: get_role("CCO = main\nSTATUS = associated") result = True except KeyError: result = False assert result == False assert "00:13:D7:01:00:30" == extract_mac_address( "root\n" "ifconfig br0\n" "\n" "exit\n" " __ __\n" "| \/ |____ _\n" "| __ / ___|| |_ __ _ _ __\n" "| | | \___ \| __/ _` | '__|\n" "| | | |___) | || (_| | |\n" "|_| |_|____/ \__\__,_|_|\n" "# ifconfig br0\n" "br0 Link encap:Ethernet HWaddr 00:13:D7:01:00:30 \n" " inet addr:192.168.5.101 Bcast:192.168.5.255 Mask:255.255.255.0\n" " UP BROADCAST RUNNING MULTICAST MTU:1500 Metric:1\n" " RX packets:4354 errors:0 dropped:0 overruns:0 frame:0\n" " TX packets:4137 errors:0 dropped:0 overruns:0 carrier:0\n" " collisions:0 txqueuelen:0 \n" " RX bytes:285324 (278.6 KiB) TX bytes:632276 (617.4 KiB)\n" "\n" "# \n" "# exit\n") assert ("eoc-0.7.10", "eoc-0.7.11") == \ extract_driver_and_firmware("foo\n" "PLC Driver: eoc-0.7.10 \r\n" "PLC Firmware: eoc-0.7.11 \r\n" "bar") assert ("eoc-0.7.10", "eoc-0.7.11") == \ extract_driver_and_firmware("foo\n" "PLC Driver: eoc-0.7.10 \n" "PLC Firmware: eoc-0.7.11 \n" "bar") assert "spk300g_eoc_slave" == \ extract_image("Linux spk300g_eoc_slave 2.6.25.10 " \ "#1(eoc-drv-2.0.13 based on eoc-drv-2.0.13) " \ "Fri Feb 8 11:37:07 CE armv5tejl unknown") assert "/tmp/trace_slave_13_1.gz" == get_trace_file_path("/tmp", "trace_1.gz", "slave_13") assert {} == dictify("") assert {"a":"b"} == dictify("a=b") assert {"a":"b"} == dictify(" a = b ") assert {"a":"b"} == dictify(" a = b \n") assert {"a":"b"} == dictify(" a = b \n c") assert {"a":"b", "c":"d"} == dictify(" a = b \n c=d") assert {"a":"b", "c":"d"} == dictify(" c=d \n a = b ") assert "" == stringify({}) assert "a=b" == stringify({"a":"b"}) assert "a=b" == stringify({" a ":" b "}) assert "a=b\nc=d" == stringify({" a ":" b ", "c":"d"}) assert "a=b\nc=d" == stringify({"c":"d", " a ":" b "}) assert [] == get_invalid_internal_confs({}, []) assert [] == get_invalid_internal_confs({}, [("key1", {})]) assert [] == get_invalid_internal_confs({"1":"2"}, [("key1", {"1":"2"})]) assert [] == get_invalid_internal_confs({"1":"2", "3":"4"}, [("key1", {"1":"2", "3":"4"})]) assert [] == get_invalid_internal_confs({"3":"4", "1":"2"}, [("key1", {"1":"2", "3":"4"})]) assert [("key1", {})] == get_invalid_internal_confs({"1":"2"}, [("key1", {})]) assert [("key1", {})] == get_invalid_internal_confs({"1":"2"}, [("key1", {}), ("key2", {"1":"2"})]) assert [("key1", {}), ("key2", {"1":"3"})] == get_invalid_internal_confs({"1":"2"}, [("key1", {}), ("key2", {"1":"3"})])