summaryrefslogtreecommitdiff
path: root/validation/validlib/spc300.py
blob: 22e6915b962d53d7a44d14cfeac3d1fb1053c387 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
# -*- coding: utf-8 -*-

"""Module allowing to command and control the plug SPC300"""

import tarfile
import tempfile
import shutil
import telnetlib
import os
import re

import retrier
import hpav_test
import power_strip
import list_utils
import plug
import ssh

#Must contain only MSTAR plugs
config = {
    "eth_netmask": "10.3.6.",
    "plc_netmask": "10.4.6.",
    "plugs":
        {
        #(Power strip index,
        # Power plug index,
        # Identifier (this parameter is optional and is needed only if there
        # are several plugs connected to the same power plug via another
        # power strip):
        #(
        # br0 address,
        # eth0 address (this parameter is needed only if the bridge is killed
        # on the plug - otherwise, it can be set to None),
        # plc0 address (this parameter is needed only if the bridge is killed
        # on the plug - otherwise, it can be set to None),
        # device model, as indicated in
        # cleopatre/buildroot/target/device/Spidcom, i.e in
        # ["scr300_eoc_master", "scr310", "scr310_eoc_master_8m",
        #  "scr310_eoc_master", "scr310_eoc_slave_8m", "scr310_eoc_slave",
        #  "spc300dini", "spc300dini_eoc_master", "spc300dini_eoc_slave",
        #  "spk300", "spk300g", "spk300g_eoc_master", "spk300g_eoc_slave",
        #  "spk310", "spk310_eoc_master", "spk310_eoc_slave", "spr300",
        #  "spr310"...]
        #)
        (1, 1): ("147", "146", "148", "spr310"),
        (1, 2): ("139", "138", "140", "spr310"),
        (1, 3, "plug_1"): ("150", None, None, "spr310"),
        (1, 3, "plug_2"): ("153", None, None, "spr310"),
        (2, 1): ("136", "135", "137", "spr310"),
        (2, 2): ("145", "144", "146", "spr310")
        }
    }
mac_addresses = {}

#Pattern of the files containing the traces
trace_pattern = "/usr/local/trace/trace_*.gz"

def update_config(new_config):
    """Update the configuration"""
    global config
    config = new_config
    global mac_addresses
    mac_addresses = {}
    hpav_test.reset_interfaces()

def get_mac_address(key):
    """Gives the MAC address corresponding to a power strip key"""
    global mac_addresses
    try:
        return mac_addresses[key]
    except:
        ifconfig = r_execute((key, 0), "ifconfig br0")
        mac_addresses[key] = extract_mac_address(ifconfig)
        return mac_addresses[key]

def extract_mac_address(stdoutdata):
    """Extract the MAC address from a "ifconfig br0" response"""
    pattern = '\w\w:\w\w:\w\w:\w\w:\w\w:\w\w'
    return re.search(pattern, stdoutdata).group()

def get_model(key):
    """Gives the model corresponding to a power strip key"""
    return config["plugs"][key][3]

def plc_rom_is_present(key, interface_index = 0):
    """Checks if a plc.rom is present on the flash"""
    result = r_execute((key, interface_index), "ls /usr/local/lib/modules")
    return "plc.rom" in result

def get_version(key, interface_index = 0):
    """Gives the version corresponding to a power strip key"""
    result = r_execute((key, interface_index), "cat /proc/net/plc/version")
    (driver, firmware) = extract_driver_and_firmware(result)
    result = r_execute((key, interface_index), "uname -a")
    image = extract_image(result)
    return (driver, firmware, image)

def extract_driver_and_firmware(string):
    pattern = "PLC Driver: (.*)\nPLC Firmware: (.*)"
    (driver, firmware) = re.search(pattern, string).groups()
    return (driver.strip("\r"), firmware.strip("\r"))

def extract_image(string):
    pattern = "Linux (.*) 2.6.25.10.*"
    return re.search(pattern, string).groups()[0]

def r_ping(key_origin, key_destination):
    """Ping the plug identified by key_destination from the plug identified by
    key_origin"""
    origin_network = config["eth_netmask"]
    origin = origin_network + config["plugs"][key_origin][1]
    destination_network = config["plc_netmask"]
    destination =  destination_network + config["plugs"][key_destination][2]
    command = ping_command(destination)
    response = r_execute(origin, command)
    return ping(response)

def switch_on_and_kill_bridge(keys):
    """Switch on the plugs identified by keys.
    For each plug, the bridge is desactivated and a different IP address is
    assigned to eth0 and to plc0.
    The plugs are switched on one by one in order to avoid the Ethernet
    loops."""

    if type(keys) != list:
        keys = [keys]

    if keys == []:
        return

    # This would be inconsistent with the idea of switching the plugs one by
    # one
    assert plug.duplicated_power_plugs_keys_are_absent(keys)

    print "Starting the plug(s)", keys,"... {"

    # We switch off all the plugs in order to be sure to start
    # from a clean situation
    for key in keys:
        plug.switch_off(key)

    # We start the plugs one by one
    for key in keys:
        power_plug_key = plug.get_power_plug_key(key)
        power_strip.switch("on", power_plug_key)
        assert slow_ping(key, 0)[0] == "pong"

        (address_0, address_1, address_2, _) = config["plugs"][key]
        address_0 = config["eth_netmask"] + address_0
        address_1 = config["eth_netmask"] + address_1
        address_2 = config["plc_netmask"] + address_2
        print "Activate the new address of eth0 of the plug" , key
        r_execute(address_0, "ifconfig eth0 " + address_1)
        print "Deactivate the address of br0 of the plug" , key
        r_execute(address_1, "ifdown br0")
        print "Activate the new address of plc0 of the plug" , key
        r_execute(address_1, "ifconfig plc0 " + address_2)
        print "Set the new route for eth0 on the plug", key
        r_execute(address_1, "route add -net " + config["eth_netmask"] +
                  "0 netmask 255.255.255.0 dev eth0")
        print "Set the new route for plc0 on the plug", key
        r_execute(address_1, "route add -net " + config["plc_netmask"] +
                  "0 netmask 255.255.255.0 dev plc0")
        assert l_ping(key, 0) == "pang"
        assert l_ping(key, 1) == "pong"
        assert l_ping(key, 2) == "pang"
        print "OK: the plug", key, "has been successfully started"

    print "}"

def switch_on(keys):
    """Switch on the plugs identified by keys"""

    if type(keys) != list:
        keys = [keys]

    if keys == []:
        return

    print "Starting the plug(s)", keys,"... {"

    # We do not switch off the plugs before switching them on, because
    # there may be several plugs connected to the same power plug via
    # a secondary power strip

    power_plugs_keys = plug.get_power_plugs_keys(keys)

    for power_plug_key in power_plugs_keys:
        power_strip.switch("on", power_plug_key)

    for key in keys:
        assert "pong" == slow_ping(key, 0)[0]
        print "OK: the plug", key, "has been successfully started"

    print "}"

def change_nmk(key, new_nmk):
    """Change the NMK of the plug identified by key"""
    print "Changing the NMK on the plug", key, "...{"
    network = config["eth_netmask"]
    host = network + config["plugs"][key][1]
    command = "spidapp config -npw " + new_nmk
    response = r_execute(host, command)
    assert "NMK: updated" in response, response
    print"OK: the NMK has been successfully changed on the plug", key
    print "}"

def role(key):
    """Return the role of the plug identified by key in its AVLN: UCCO, USTA,
    CCO or STA"""
    network = config["eth_netmask"]
    command = "cat /etc/hpav.info"
    host = network + config["plugs"][key][1]
    hpav_info = r_execute(host, command)
    return get_role(hpav_info)

def get_role(hpav_info):
    """Parse the content of the file /etc/hpav_info"""
    if "CCO = main" in hpav_info:
        cco = "main"
    elif "CCO = station" in hpav_info:
        cco = "station"
    else:
        cco = ("unknown_cco", hpav_info)

    if "STATUS = associated" in hpav_info:
        status = "associated"
    elif "STATUS = authenticated" in hpav_info:
        status = "authenticated"
    elif "STATUS = unassociated" in hpav_info:
        status = "unassociated"
    else:
        status = ("unknown_status", hpav_info)

    dictionnary = {("main", "authenticated"): "CCO",
                   ("main", "unassociated"): "UCCO",
                   ("station", "associated"): "STA",
                   ("station", "unassociated"): "USTA",
                   ("station", "authenticated"): "STA"}

    return dictionnary[(cco, status)]

def r_execute(host, commands):
    """Telnet the host and execute a list of commands on it.
    The host can be an IP address or a couple (key, interface_index)
    Since the telnet sometimes fails, we retry 5 times."""
    return retrier.retry(r_execute_aux, [host, commands], 5)

def r_execute_aux(host, commands):
    try:
        (key, interface_index) = host
        host = get_destination(key, interface_index)
    except:
        pass
    if type(commands) != list:
        commands = [commands]
    telnet = telnetlib.Telnet(host)
    telnet.read_until("login: ")
    telnet.write("root\n")
    for command in commands:
        telnet.write(command + "\n")
    telnet.write("exit\n")
    result = telnet.read_all()
    return result

def trace_is_present(key, interface_index = 0):
    result = r_execute((key, interface_index), "ls " + trace_pattern)
    return "No such file or directory" not in result

def clean_trace(key, interface_index = 0):
    (trace_directory, _) = os.path.split(trace_pattern)
    r_execute((key, interface_index), "rm " + trace_directory + "/*")

def get_trace_file_path(directory, zip_file_name, suffix):
    assert re.search("^trace_\d+\.gz$", zip_file_name) is not None
    trace_file_name = zip_file_name.replace("trace", "trace_" + suffix)
    return os.path.join(directory, trace_file_name)

def random_string():
    return os.path.basename(tempfile.NamedTemporaryFile().name)

def dump_trace(key, dest_directory, ftp_server, interface_index = 0):
    """Retrieve the traces present on the plug and dump them in the directory"""

    # No need to dump anything if there are no traces on the plug
    if not trace_is_present(key, interface_index):
        return

    # Create a temporary remote directory
    ftp_trace_directory = random_string()
    remote_trace_directory = os.path.join(ftp_server["root"],
                                          ftp_trace_directory)
    ssh.call(ftp_server["user"], ftp_server["non_plc_ip_address"],
             "mkdir " + remote_trace_directory)

    # Retrieve the tar file of traces in the remote temporary directory
    tar_file_name = random_string()
    (trace_directory_on_plug, file_pattern) = os.path.split(trace_pattern)
    commands = \
        ["cd " + trace_directory_on_plug,
         "tar -cf " + tar_file_name + " " + file_pattern,
         "ftpput -u spidcom -p spidcom " + \
             ftp_server["plc_ip_address"] + " " + \
             os.path.join(ftp_trace_directory, tar_file_name) + " " + \
             tar_file_name,
         "rm " + tar_file_name]
    r_execute((key, interface_index), commands)

    # Create a local temporary directory
    local_trace_directory = os.path.join("/tmp", ftp_trace_directory)
    os.mkdir(local_trace_directory)

    # Copy the remote file locally
    remote_tar_file_path = os.path.join(remote_trace_directory, tar_file_name)
    ssh.cp(ftp_server["user"], ftp_server["non_plc_ip_address"],
           remote_tar_file_path, local_trace_directory)

    # Remove the remote temporary directory
    ssh.call(ftp_server["user"], ftp_server["non_plc_ip_address"],
             "rm -rf " + remote_trace_directory)

    # Untar the traces locally
    local_tar_file_path = os.path.join(local_trace_directory, tar_file_name)
    untar(local_tar_file_path, local_trace_directory)

    # Delete the local tar file so that only the traces remain
    os.remove(local_tar_file_path)

    # Check that there are indeed traces
    files_names = os.listdir(local_trace_directory)
    assert files_names != []

    # Build the suffix
    try:
        (_, _, suffix) = key
    except:
        suffix = config["plugs"][key][0]

    # Create the destination directory if it does not exist
    if os.path.exists(dest_directory) == False:
        os.mkdir(dest_directory)

    # Remove the old traces files, if any
    for file_name in os.listdir(dest_directory):
        if os.path.splitext(file_name) == os.path.splitext(trace_pattern):
            os.remove(os.path.join(dest_directory, file_name))

    # Copy the new traces files
    for file_name in files_names:
        src_file_path = os.path.join(local_trace_directory, file_name)
        dest_file_path = get_trace_file_path(dest_directory, file_name, suffix)
        shutil.copy(src_file_path, dest_file_path)

    # Delete the local temporary directory
    shutil.rmtree(local_trace_directory)

################################################################################
# Internal functions
################################################################################

def get_destination(key, interface_index):
    if interface_index == 2:
        mask = "plc_netmask"
    else:
        mask = "eth_netmask"
    return config[mask] + config["plugs"][key][interface_index]

def l_ping(key, interface_index):
    """Ping the plug identified by key, using the address specified by the
    interface_index (0 means br0, 1 means eth0 and 2 means plc0)"""
    destination = get_destination(key, interface_index)
    command = ping_command(destination)
    response = os.popen(command).read()
    return ping(response)

def ping(response):
    """Return pong if the ping succeeded and pand otherwise"""
    if "ttl" in response:
        result = "pong"
    else:
        result = "pang"
    return result

def ping_command(destination):
    """Ping the IP address destination"""
    return "ping -c1 -W1 " + destination

def slow_ping(key, interface_index):
    """Ping the plug identified by key using the address specified by the
    interface_index (0 means br0, 1 means eth0 and 2 means plc0).
    One tries 60 times to let the time to the plug to start up."""
    n = 0
    result = "pang"
    while (n < 60 and result == "pang"):
        print "Attempt #", n, "to ping the address", (key, interface_index)
        result = l_ping(key, interface_index)
        n = n + 1
    return (result, n)

def untar(tar_file_path, destination_directory):
    assert tarfile.is_tarfile(tar_file_path)
    tar_file_content = tarfile.open(tar_file_path)
    tar_file_content.extractall(destination_directory)

if __name__ == "__main__":
    assert get_role("CCO = main\nSTATUS = authenticated") == "CCO"
    try:
        get_role("CCO = main\nSTATUS = associated")
        result = True
    except KeyError:
        result = False
    assert result == False
    assert "00:13:D7:01:00:30" == extract_mac_address(
        "root\n"
        "ifconfig br0\n"
        "\n"
        "exit\n"
        " __  __\n"
        "|  \/  |____  _\n"
        "|  __  / ___|| |_ __ _ _ __\n"
        "| |  | \___ \| __/ _` | '__|\n"
        "| |  | |___) | || (_| | |\n"
        "|_|  |_|____/ \__\__,_|_|\n"
        "# ifconfig br0\n"
        "br0       Link encap:Ethernet  HWaddr 00:13:D7:01:00:30  \n"
        "          inet addr:192.168.5.101  Bcast:192.168.5.255  Mask:255.255.255.0\n"
        "          UP BROADCAST RUNNING MULTICAST  MTU:1500  Metric:1\n"
        "          RX packets:4354 errors:0 dropped:0 overruns:0 frame:0\n"
        "          TX packets:4137 errors:0 dropped:0 overruns:0 carrier:0\n"
        "          collisions:0 txqueuelen:0 \n"
        "          RX bytes:285324 (278.6 KiB)  TX bytes:632276 (617.4 KiB)\n"
        "\n"
        "# \n"
        "# exit\n")

    assert ("eoc-0.7.10", "eoc-0.7.11") == \
        extract_driver_and_firmware("foo\n"
                                    "PLC Driver: eoc-0.7.10\r\n"
                                    "PLC Firmware: eoc-0.7.11\r\n"
                                    "bar")

    assert ("eoc-0.7.10", "eoc-0.7.11") == \
        extract_driver_and_firmware("foo\n"
                                    "PLC Driver: eoc-0.7.10\n"
                                    "PLC Firmware: eoc-0.7.11\n"
                                    "bar")

    assert "spk300g_eoc_slave" == \
        extract_image("Linux spk300g_eoc_slave 2.6.25.10 " \
                          "#1(eoc-drv-2.0.13 based on eoc-drv-2.0.13) " \
                          "Fri Feb 8 11:37:07 CE armv5tejl unknown")

    assert "/tmp/trace_slave_13_1.gz" == get_trace_file_path("/tmp",
                                                             "trace_1.gz",
                                                             "slave_13")