summaryrefslogtreecommitdiff
path: root/cesar/test_general/maximus/integration/interface-dp/test01.py
blob: 016f447f52dea5709e62e9b9cf8edd777f08c1a9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
#!/usr/bin/python

#############################################################################
#  Copyright (C) 2010 Spidcom
#
#  Test the communication between the Interface module and the Datapath
#  module.
#
#  The MME sent passes through the interface module and is sent to the peer
#  station.
#
#############################################################################

import sys, unittest, os
sys.path.append(sys.path[0] + '/../../../../maximus/python/tools/csi/')
sys.path.append(sys.path[0] + '/../../../../maximus/python/obj/')
sys.path.append(sys.path[0] + '/../../../../maximus/python/')
sys.path.append(sys.path[0] + '/../../../../maximus/python/lib/cesar')
# Import some classes from the CSI module.
from csicore import csiCore, csiSta
from csitime import ms_to_tck
from maximus.mme import MME
from maximus.mme.mmheader import MMHeader
from maximus.ethernet.eth import Eth
from struct import unpack

MAC_ADDR_BASE = '00:13:d7:00:00:%02x'

def filter_mme (msdu):
    """Tell whether msdu.get_type () reports 'ETHERNET_TYPE_MME'.

    Used as the ``filter`` argument of ``sendnrecv`` in this file.
    """
    msdu_type = msdu.get_type ()
    return msdu_type == 'ETHERNET_TYPE_MME'

def filter_data (msdu):
    """Tell whether msdu.get_type () reports 'ETHERNET_TYPE_DATA'.

    Used as the ``filter`` argument of ``sendnrecv`` in this file.
    """
    msdu_type = msdu.get_type ()
    return msdu_type == 'ETHERNET_TYPE_DATA'

# Build the simulation core.
# NOTE(review): the meaning of the 4242 argument is not visible in this
# file -- confirm against csiCore.
csi = csiCore (4242)
# Forward the command line and add '-e' followed by the station executable
# located under this script's directory.
csi.process_init (sys.argv + ['-e', sys.path[0] + '/obj/station.elf'])
avln = csi.avln_add ("HomePlug_AVLN", "AVLN")
# Register two stations: the MAC addresses end in 01 and 02 while the
# station names are numbered 00 and 01.
for i in range (2):
    avln.sta_add (
            MAC_ADDR_BASE % (i + 1),
            False,
            False,
            "Homeplug_Station_%02d" % i,
            "spidcom_spc300_sta_%02d" % i,
            "station_%02d" % i,
            1,
            False)
# Start the AVLN and STA.
csi.process_avlns_launch ()
csi.process_wait_sec (1)
# Keep a handle on both stations for the tests.
sta1, sta2 = avln.get_sta (0), avln.get_sta (1)

class TestInitFunctions(unittest.TestCase):
    """Exchange an MME and a data frame between the two simulated stations.

    Both tests inject traffic through STA 1 (module-level ``sta1``) using
    ``sendnrecv`` with a timeout of ``ms_to_tck (40)`` and then inspect the
    objects that call returns.

    The assertions use ``assertTrue`` / ``assertEqual`` rather than the
    deprecated ``failUnless`` alias, which no longer exists in Python 3.12
    and later; ``assertEqual`` also reports both values on failure.
    """

    def setUp (self):
        """Nothing to prepare: the stations are created at module level."""

    def tearDown (self):
        """Nothing to release: the simulation is stopped at module level."""

    def testMME (self):
        """Send the MME from STA 1 to STA 2."""
        # Station executable is configured to send MME to the peer STA, in a
        # real executable this MME should be handled by the CP.
        header = MMHeader (ODA=sta2.get_mac_addr (),
                OSA=sta1.get_mac_addr (), MTYPE=0x88E1, MMV=1,
                MMTYPE=0x002C, FMI=0)
        mme = MME (MMHeader=header)
        db = mme.sendnrecv (csi.get_maximus (), sta1.get_sta_cesar (),
                filter = filter_mme, timeout = ms_to_tck (40))
        self.assertTrue (db is not None)
        # NOTE(review): if sendnrecv can return an empty collection the loop
        # below checks nothing and the test passes -- confirm whether a
        # length check (as in testDATA) is wanted here.
        for i in db:
            e = Eth ()
            e.set_msdu_attr (i.get ())
            # The reply travels in the opposite direction: STA 2 -> STA 1.
            self.assertEqual (e.src, sta2.get_mac_addr ())
            self.assertEqual (e.dst, sta1.get_mac_addr ())
            self.assertEqual (e.type, 0x88E1)
            # Get the MMType: bytes 15 and 16 of the MM header, low byte
            # first, read as one little-endian unsigned 16-bit value.
            mmheader = i.get_mmheader ()
            mmtype = unpack ('<H', mmheader[15:17])[0]
            # The request above carried MMTYPE 0x002C; the reply must carry
            # 0x002D.
            self.assertEqual (mmtype, 0x002d)

    def testDATA (self):
        """Send data from STA 1 to STA 2."""
        frame = Eth ()
        frame.src = sta1.get_mac_addr ()
        frame.dst = sta2.get_mac_addr ()
        frame.type = 0x4F50
        frame.payload = "aabbccddeeffjj"
        db = frame.sendnrecv (csi.get_maximus (), sta1.get_sta_cesar (),
                timeout = ms_to_tck (40), filter = filter_data)
        self.assertTrue (db is not None)
        # Exactly one data frame is expected back.
        self.assertEqual (len (db), 1)
        for i in db:
            # The received frame must be identical to the one sent.
            self.assertEqual (i.src, frame.src)
            self.assertEqual (i.dst, frame.dst)
            self.assertEqual (i.type, frame.type)
            self.assertEqual (i.payload, frame.payload)

# Collect every test of TestInitFunctions and run it with verbose output.
suite = unittest.TestLoader ().loadTestsFromTestCase (TestInitFunctions)
testResult = unittest.TextTestRunner (verbosity=2).run (suite)

# Stop and remove the AVLNs, then kill any station.elf process left behind.
csi.process_avlns_stop ()
csi.process_avlns_remove ()
os.system ('killall station.elf')

# For nightly build errors: exit status 0 on success, 1 otherwise.
if testResult.wasSuccessful ():
    sys.exit (0)
sys.exit (1)