# -*- coding: utf-8 -*-

"""Module allowing to command and control the plug SPC300"""

import gzip
import tarfile
import tempfile
import shutil
import telnetlib
import os
import re

import retrier
import hpav_test
import power_strip
import list_utils
import plug

#Must contain only MSTAR plugs
config = {
    "eth_netmask": "10.3.6.",
    "plc_netmask": "10.4.6.",
    "plugs": {
        #(Power strip index,
        # Power plug index,
        # Identifier (this parameter is optional and is needed only if there
        # are several plugs connected to the same power plug via another
        # power strip):
        #(
        # br0 address,
        # eth0 address (this parameter is needed only if the bridge is killed
        # on the plug - otherwise, it can be set to None),
        # plc0 address (this parameter is needed only if the bridge is killed
        # on the plug - otherwise, it can be set to None),
        # device model, as indicated in
        # cleopatre/buildroot/target/device/Spidcom, i.e in
        # ["scr300_eoc_master", "scr310", "scr310_eoc_master_8m",
        # "scr310_eoc_master", "scr310_eoc_slave_8m", "scr310_eoc_slave",
        # "spc300dini", "spc300dini_eoc_master", "spc300dini_eoc_slave",
        # "spk300", "spk300g", "spk300g_eoc_master", "spk300g_eoc_slave",
        # "spk310", "spk310_eoc_master", "spk310_eoc_slave", "spr300",
        # "spr310"...]
        #)
        (1, 1): ("147", "146", "148", "spr310"),
        (1, 2): ("139", "138", "140", "spr310"),
        (1, 3, "plug_1"): ("150", None, None, "spr310"),
        (1, 3, "plug_2"): ("153", None, None, "spr310"),
        (2, 1): ("136", "135", "137", "spr310"),
        (2, 2): ("145", "144", "146", "spr310")
        },
    "ftp_server":{
        #Address of the local FTP server viewed from the plugs
        "address":"192.168.10.254",
        #Root directory of the local FTP server
        "root":"/var/all_ftp"
        }
    }

#Cache of the MAC addresses already read on the plugs, indexed by plug key
mac_addresses = {}

#Pattern of the files containing the traces
trace_pattern = "/usr/local/trace/trace*gz"

def update_config(new_config):
    """Replace the configuration by new_config.

    The cache of MAC addresses is emptied and the interfaces known by
    hpav_test are reset, since both may refer to the previous configuration.
    """
    global config
    global mac_addresses
    config = new_config
    mac_addresses = {}
    hpav_test.reset_interfaces()

def get_mac_address(key):
    """Gives the MAC address corresponding to a power strip key.

    The address is read on the plug (through "ifconfig br0") the first time
    it is requested for a key and served from the cache afterwards.
    """
    try:
        return mac_addresses[key]
    except KeyError:
        ifconfig = r_execute((key, 0), "ifconfig br0")
        mac_addresses[key] = extract_mac_address(ifconfig)
        return mac_addresses[key]

def extract_mac_address(stdoutdata):
    """Extract the MAC address from a "ifconfig br0" response.

    The first "xx:xx:xx:xx:xx:xx" group found in stdoutdata is returned.
    An AttributeError is raised if stdoutdata contains no such group.
    """
    pattern = r'\w\w:\w\w:\w\w:\w\w:\w\w:\w\w'
    return re.search(pattern, stdoutdata).group()

def get_model(key):
    """Gives the model corresponding to a power strip key"""
    return config["plugs"][key][3]

def plc_rom_is_present(key, interface_index = 0):
    """Checks if a plc.rom is present on the flash, i.e. if "plc.rom" appears
    in the listing of /usr/local/lib/modules on the plug"""
    result = r_execute((key, interface_index), "ls /usr/local/lib/modules")
    return "plc.rom" in result

def get_version(key, interface_index = 0):
    """Gives the version corresponding to a power strip key.

    Returns the triplet (driver, firmware, image): the driver and the
    firmware are read in /proc/net/plc/version and the image is extracted
    from the output of "uname -a".
    """
    result = r_execute((key, interface_index), "cat /proc/net/plc/version")
    (driver, firmware) = extract_driver_and_firmware(result)
    result = r_execute((key, interface_index), "uname -a")
    image = extract_image(result)
    return (driver, firmware, image)

def extract_driver_and_firmware(string):
    """Extract the couple (driver, firmware) from the content of
    /proc/net/plc/version. The trailing carriage returns left by the telnet
    session are removed."""
    pattern = "PLC Driver: (.*)\nPLC Firmware: (.*)"
    (driver, firmware) = re.search(pattern, string).groups()
    return (driver.strip("\r"), firmware.strip("\r"))

def extract_image(string):
    """Extract the name of the image from a "uname -a" response, i.e. the
    text located between "Linux " and the kernel version " 2.6.25.10"."""
    # The dots of the kernel version are escaped so that they only match
    # real dots
    pattern = r"Linux (.*) 2\.6\.25\.10.*"
    return re.search(pattern, string).groups()[0]

def r_ping(key_origin, key_destination):
    """Ping the plug identified by key_destination from the plug identified by
    key_origin.

    The origin plug is reached through its eth0 address and the destination
    plug is pinged on its plc0 address. Returns "pong" or "pang".
    """
    origin = config["eth_netmask"] + config["plugs"][key_origin][1]
    destination = config["plc_netmask"] + config["plugs"][key_destination][2]
    response = r_execute(origin, ping_command(destination))
    return ping(response)

def switch_on_and_kill_bridge(keys):
    """Switch on the plugs identified by keys. For each plug, the bridge is
    desactivated and a different IP address is assigned to eth0 and to plc0.
    The plugs are switched on one by one in order to avoid the Ethernet loops.

    keys is either a single key or a list of keys.
    """
    if not isinstance(keys, list):
        keys = [keys]
    if keys == []:
        return
    # This would be inconsistent with the idea of switching the plugs one by
    # one
    assert plug.duplicated_power_plugs_keys_are_absent(keys)
    print("Starting the plug(s) %s ...{" % (keys,))
    # We switch off all the plugs in order to be sure to start
    # from a clean situation
    for key in keys:
        plug.switch_off(key)
    # We start the plugs one by one
    for key in keys:
        power_plug_key = plug.get_power_plug_key(key)
        power_strip.switch("on", power_plug_key)
        assert slow_ping(key, 0)[0] == "pong"
        (address_0, address_1, address_2, _) = config["plugs"][key]
        address_0 = config["eth_netmask"] + address_0
        address_1 = config["eth_netmask"] + address_1
        address_2 = config["plc_netmask"] + address_2
        print("Activate the new address of eth0 of the plug %s" % (key,))
        r_execute(address_0, "ifconfig eth0 " + address_1)
        # From now on the plug is driven through its new eth0 address
        print("Deactivate the address of br0 of the plug %s" % (key,))
        r_execute(address_1, "ifdown br0")
        print("Activate the new address of plc0 of the plug %s" % (key,))
        r_execute(address_1, "ifconfig plc0 " + address_2)
        print("Set the new route for eth0 on the plug %s" % (key,))
        r_execute(address_1,
                  "route add -net " + config["eth_netmask"] +
                  "0 netmask 255.255.255.0 dev eth0")
        print("Set the new route for plc0 on the plug %s" % (key,))
        r_execute(address_1,
                  "route add -net " + config["plc_netmask"] +
                  "0 netmask 255.255.255.0 dev plc0")
        # Only the eth0 address is expected to answer from the local host
        assert l_ping(key, 0) == "pang"
        assert l_ping(key, 1) == "pong"
        assert l_ping(key, 2) == "pang"
        print("OK: the plug %s has been successfully started" % (key,))
    print("}")

def switch_on(keys):
    """Switch on the plugs identified by keys.

    keys is either a single key or a list of keys. Each plug must answer to
    a ping on its br0 address, otherwise an AssertionError is raised.
    """
    if not isinstance(keys, list):
        keys = [keys]
    if keys == []:
        return
    print("Starting the plug(s) %s ...{" % (keys,))
    # We do not switch off the plugs before switching them on, because
    # there may be several plugs connected to the same power plug via
    # a secondary power strip
    for power_plug_key in plug.get_power_plugs_keys(keys):
        power_strip.switch("on", power_plug_key)
    for key in keys:
        assert "pong" == slow_ping(key, 0)[0]
        print("OK: the plug %s has been successfully started" % (key,))
    print("}")

def change_nmk(key, new_nmk):
    """Change the NMK of the plug identified by key.

    The plug is reached through its eth0 address. An AssertionError carrying
    the response of the plug is raised if the plug does not confirm the
    update.
    """
    print("Changing the NMK on the plug %s ...{" % (key,))
    host = config["eth_netmask"] + config["plugs"][key][1]
    command = "spidapp config -npw " + new_nmk
    response = r_execute(host, command)
    assert "NMK: updated" in response, response
    print("OK: the NMK has been successfully changed on the plug %s" % (key,))
    print("}")

def role(key):
    """Return the role of the plug identified by key in its AVLN:
    UCCO, USTA, CCO or STA"""
    host = config["eth_netmask"] + config["plugs"][key][1]
    hpav_info = r_execute(host, "cat /etc/hpav.info")
    # get_role is the parser defined in this module (the previous code
    # called an undefined role_aux)
    return get_role(hpav_info)

def get_role(hpav_info):
    """Parse the content of the file /etc/hpav_info.

    Returns "CCO", "UCCO", "STA" or "USTA". A KeyError is raised when the
    combination of CCO and STATUS found in hpav_info corresponds to no known
    role; the content of hpav_info is then part of the key of the error.
    """
    if "CCO = main" in hpav_info:
        cco = "main"
    elif "CCO = station" in hpav_info:
        cco = "station"
    else:
        cco = ("unknown_cco", hpav_info)
    if "STATUS = associated" in hpav_info:
        status = "associated"
    elif "STATUS = authenticated" in hpav_info:
        status = "authenticated"
    elif "STATUS = unassociated" in hpav_info:
        status = "unassociated"
    else:
        status = ("unknown_status", hpav_info)
    roles = {("main", "authenticated"): "CCO",
             ("main", "unassociated"): "UCCO",
             ("station", "associated"): "STA",
             ("station", "unassociated"): "USTA",
             ("station", "authenticated"): "STA"}
    return roles[(cco, status)]

def r_execute(host, commands):
    """Telnet the host and execute a list of commands on it.
    The host can be an IP address or a couple (key, interface_index)
    Since the telnet sometimes fails, we retry 5 times."""
    return retrier.retry(r_execute_aux, [host, commands], 5)

def r_execute_aux(host, commands):
    """Open one telnet session on host, log in as root, send the commands
    followed by "exit" and return everything the host sent back.

    host is an IP address or a couple (key, interface_index); commands is a
    single command or a list of commands.
    """
    # An IP address is a string: unpacking it would raise a ValueError, so
    # the couple is detected explicitly
    if isinstance(host, (tuple, list)):
        (key, interface_index) = host
        host = get_destination(key, interface_index)
    if not isinstance(commands, list):
        commands = [commands]
    telnet = telnetlib.Telnet(host)
    try:
        telnet.read_until("login: ")
        telnet.write("root\n")
        for command in commands:
            telnet.write(command + "\n")
        telnet.write("exit\n")
        return telnet.read_all()
    finally:
        # Release the socket even if the dialogue fails
        telnet.close()

def trace_is_present(key, interface_index = 0):
    """Checks if at least one file matching trace_pattern exists on the
    plug"""
    result = r_execute((key, interface_index), "ls " + trace_pattern)
    return "No such file or directory" not in result

def clean_trace(key, interface_index = 0):
    """Remove all the files of the directory of traces of the plug"""
    (trace_directory, _) = os.path.split(trace_pattern)
    r_execute((key, interface_index), "rm " + trace_directory + "/*")

def get_trace_file_path(directory, zip_file_name, suffix):
    """Gives the path under which a trace must be stored locally: the file is
    put in directory and suffix is inserted after "trace_" in its name.

    zip_file_name must look like "trace_<digits>.gz".
    """
    assert re.search(r"^trace_\d+\.gz$", zip_file_name) is not None
    trace_file_name = zip_file_name.replace("trace", "trace_" + suffix)
    return os.path.join(directory, trace_file_name)

def dump_trace(key, directory, interface_index = 0):
    """Retrieve the traces present on the plug and dump them in the
    directory.

    The plug sends a tar file of its traces to the local FTP server; the
    traces are then copied in directory under a name containing either the
    identifier of the key, if any, or the br0 address of the plug.
    """
    # No need to dump anything if there are no traces on the plug
    if not trace_is_present(key, interface_index):
        return
    # Create a temporary directory under the FTP directory
    trace_directory = tempfile.mkdtemp(dir = config["ftp_server"]["root"])
    try:
        ftp_trace_directory = os.path.basename(trace_directory)
        # Only the unique name of the temporary file is of interest
        temporary_file = tempfile.NamedTemporaryFile()
        try:
            tar_file_name = os.path.basename(temporary_file.name)
        finally:
            temporary_file.close()
        # Retrieve the tar file of traces in the temporary directory
        (trace_directory_on_plug, file_pattern) = os.path.split(trace_pattern)
        commands = [
            "cd " + trace_directory_on_plug,
            "tar -cf " + tar_file_name + " " + file_pattern,
            "ftpput -u spidcom -p spidcom " +
            config["ftp_server"]["address"] + " " +
            os.path.join(ftp_trace_directory, tar_file_name) + " " +
            tar_file_name,
            "rm " + tar_file_name]
        r_execute((key, interface_index), commands)
        # Untar the traces
        tar_file_path = os.path.join(trace_directory, tar_file_name)
        untar(tar_file_path, trace_directory)
        # Delete the tar file so that only the traces remain
        os.remove(tar_file_path)
        # Check that there are indeed traces
        files_names = os.listdir(trace_directory)
        assert files_names != []
        # Build the suffix: the identifier when the key has one, the br0
        # address otherwise
        try:
            (_, _, suffix) = key
        except (TypeError, ValueError):
            suffix = config["plugs"][key][0]
        # Copy the files
        for file_name in files_names:
            src_file_path = os.path.join(trace_directory, file_name)
            dest_file_path = get_trace_file_path(directory, file_name, suffix)
            shutil.copy(src_file_path, dest_file_path)
    finally:
        # Delete the temporary directory, even if the dump failed
        shutil.rmtree(trace_directory)

################################################################################
# Internal functions
################################################################################
def get_destination(key, interface_index):
    """Gives the IP address of the interface of the plug identified by key
    (0 means br0, 1 means eth0 and 2 means plc0). Only plc0 belongs to the
    PLC network."""
    if interface_index == 2:
        mask = "plc_netmask"
    else:
        mask = "eth_netmask"
    return config[mask] + config["plugs"][key][interface_index]

def l_ping(key, interface_index):
    """Ping the plug identified by key, using the address specified by
    the interface_index (0 means br0, 1 means eth0 and 2 means plc0)"""
    destination = get_destination(key, interface_index)
    pipe = os.popen(ping_command(destination))
    try:
        response = pipe.read()
    finally:
        pipe.close()
    return ping(response)

def ping(response):
    """Return pong if the ping succeeded and pang otherwise. The ping is
    considered as successful if its response contains "ttl"."""
    if "ttl" in response:
        return "pong"
    return "pang"

def ping_command(destination):
    """Gives the command pinging once the IP address destination"""
    return "ping -c1 -W1 " + destination

def slow_ping(key, interface_index):
    """Ping the plug identified by key using the address specified by the
    interface_index (0 means br0, 1 means eth0 and 2 means plc0).
    One tries 60 times to let the time to the plug to start up.

    Returns the couple (result, number of attempts).
    """
    n = 0
    result = "pang"
    while n < 60 and result == "pang":
        print("Attempt # %s to ping the address %s"
              % (n, (key, interface_index)))
        result = l_ping(key, interface_index)
        n = n + 1
    return (result, n)

def unzip(zip_file_path):
    """Return the uncompressed content of the gzip file zip_file_path"""
    zip_file_handler = gzip.open(zip_file_path, 'rb')
    try:
        return zip_file_handler.read()
    finally:
        zip_file_handler.close()

def untar(tar_file_path, destination_directory):
    """Extract the content of the tar file tar_file_path in
    destination_directory"""
    assert tarfile.is_tarfile(tar_file_path)
    tar_file_content = tarfile.open(tar_file_path)
    try:
        tar_file_content.extractall(destination_directory)
    finally:
        tar_file_content.close()

if __name__ == "__main__":
    assert get_role("CCO = main\nSTATUS = authenticated") == "CCO"
    # A CCO cannot be merely associated: this combination is unknown
    try:
        get_role("CCO = main\nSTATUS = associated")
        result = True
    except KeyError:
        result = False
    assert result == False
    assert "00:13:D7:01:00:30" == extract_mac_address(
        "root\n"
        "ifconfig br0\n"
        "\n"
        "exit\n"
        " __ __\n"
        "| \\/ |____ _\n"
        "| __ / ___|| |_ __ _ _ __\n"
        "| | | \\___ \\| __/ _` | '__|\n"
        "| | | |___) | || (_| | |\n"
        "|_| |_|____/ \\__\\__,_|_|\n"
        "# ifconfig br0\n"
        "br0 Link encap:Ethernet HWaddr 00:13:D7:01:00:30 \n"
        " inet addr:192.168.5.101 Bcast:192.168.5.255 Mask:255.255.255.0\n"
        " UP BROADCAST RUNNING MULTICAST MTU:1500 Metric:1\n"
        " RX packets:4354 errors:0 dropped:0 overruns:0 frame:0\n"
        " TX packets:4137 errors:0 dropped:0 overruns:0 carrier:0\n"
        " collisions:0 txqueuelen:0 \n"
        " RX bytes:285324 (278.6 KiB) TX bytes:632276 (617.4 KiB)\n"
        "\n"
        "# \n"
        "# exit\n")
    assert ("eoc-0.7.10", "eoc-0.7.11") == \
        extract_driver_and_firmware("foo\n"
                                    "PLC Driver: eoc-0.7.10\r\n"
                                    "PLC Firmware: eoc-0.7.11\r\n"
                                    "bar")
    assert ("eoc-0.7.10", "eoc-0.7.11") == \
        extract_driver_and_firmware("foo\n"
                                    "PLC Driver: eoc-0.7.10\n"
                                    "PLC Firmware: eoc-0.7.11\n"
                                    "bar")
    assert "spk300g_eoc_slave" == \
        extract_image("Linux spk300g_eoc_slave 2.6.25.10 "
                      "#1(eoc-drv-2.0.13 based on eoc-drv-2.0.13) "
                      "Fri Feb 8 11:37:07 CE armv5tejl unknown")
    assert "/tmp/trace_slave_13_1.gz" == \
        get_trace_file_path("/tmp", "trace_1.gz", "slave_13")