summaryrefslogtreecommitdiff
path: root/cesar/maximus/python/test/test_macframe.py
diff options
context:
space:
mode:
Diffstat (limited to 'cesar/maximus/python/test/test_macframe.py')
-rw-r--r--cesar/maximus/python/test/test_macframe.py547
1 files changed, 547 insertions, 0 deletions
diff --git a/cesar/maximus/python/test/test_macframe.py b/cesar/maximus/python/test/test_macframe.py
new file mode 100644
index 0000000000..ab5026ead8
--- /dev/null
+++ b/cesar/maximus/python/test/test_macframe.py
@@ -0,0 +1,547 @@
+#! usr/bin/env python
+
+print "\n*** " + __file__ + " ***\n"
+
+import sys, startup
+
+from maximus.macframe import *
+from maximus.macframe.create import create_pb
+from maximus.macframe.mpdu import SIZE_OF_PB520, MAX_PB_NB_PER_MPDU, MPDU
+from maximus.macframe.msdu import MSDU
+from maximus.macframe.pb import PB
+from maximus.mme import *
+from maximus.utils.crc import crc8, crc24, crc32
+from maximus.utils.format import *
+from interface import *
+from struct import pack, unpack
+
+
+# MPDU TEST
+
+# Create a Maximus instance
+m = Maximus()
+
+# Initialize Maximus with command line arguments
+m.init(sys.argv)
+
+def prepare_rx(s, pb_nb=1):
+ fcall = m.create_fcall('prepare_rx')
+ if pb_nb == 1:
+ fc_mode = 2 # PHY_FC_MODE_AV_1
+ short_ppdu = False
+ mod = 2 # PHY_MOD_MINI_ROBO
+ fecrate = 0 # PHY_FEC_RATE_1_2
+ pb_size = 0 # PHY_PB_SIZE_136
+ gil = 1 # PHY_GIL_567
+ else:
+ fc_mode = 2 # PHY_FC_MODE_AV_1
+ short_ppdu = False
+ mod = 0 # PHY_MOD_ROBO
+ fecrate = 0 # PHY_FEC_RATE_1_2
+ pb_size = 1 # PHY_PB_SIZE_520
+ gil = 0 # PHY_GIL_417
+ fcall.add_param_ushort("fc_mode", fc_mode)
+ fcall.add_param_bool("short_ppdu", short_ppdu)
+ fcall.add_param_ushort("mod", mod)
+ fcall.add_param_ushort("fecrate", fecrate)
+ fcall.add_param_ushort("pb_size", pb_size)
+ fcall.add_param_ushort("gil", gil)
+ fcall.add_param_ushort("pb_nb", pb_nb)
+ fcall.send(s)
+
+# Create a MAC Frame (6 octets)
+macFrame1 = MACFrame(FC_10 = FC_10(), FC_AV = FC_AV())
+
+# Create a MAC Frame (6 octets)
+fcav = FC_AV()
+macFrame2 = MACFrame(FC_AV=fcav)
+
+# Create a MAC Frame (14 octets)
+macFrame3 = MACFrame()
+macFrame3.set_msdu(pack('Q', 0x1122334455667788))
+
+# Create a MAC Frame (10 octets)
+f = file('/tmp/data.txt', 'w')
+f.write(pack('I', 0x31323334))
+f.close()
+macFrame4 = MACFrame(FC_AV=FC_AV(DT_AV='SOF'))
+f = file('/tmp/data.txt', 'r')
+macFrame4.set_msdu(f.readline())
+f.close()
+macFrame4.set_fc_10(FC_10(CC=1, DT='SOF_RSP'))
+
+# Maximus needs to receive the TONEMASK from a station
+station = m.create_sta()
+station.debug()
+fcall1 = m.create_fcall('set_tonemask')
+fcall1.send(station)
+
+# Send the MAC Frame asynchronously
+prepare_rx(station)
+macFrame4.send(m)
+
+# Create a MAC Frame containing an MME (66 octets)
+macFrame5 = MACFrame(msdu=MME(), PBHeader=PBHeader(SSN=0xFFF0))
+prepare_rx(station)
+macFrame5.send(m)
+
+# Create a MAC Frame Queue
+list1 = [macFrame1, macFrame2, macFrame3, macFrame4, macFrame5] # 102 octets => 1 PB
+queue = MACFrameQueue(PBHeader=PBHeader(), macframelist=list1)
+list1.reverse()
+queue.add(list1)
+prepare_rx(station, pb_nb=1)
+queue.send(m)
+list2 = [MACFrame(msdu=507*'a'), MACFrame(msdu=1017*'b')] # 1536 octets => 3 PBs
+queue.set_macframelist(list2)
+prepare_rx(station, pb_nb=3)
+queue.send(m)
+
+station.remove()
+
+
+# DOC TEST
+
+import doctest
+doctest.testmod(create)
+doctest.testmod(fc_10)
+doctest.testmod(fc_av)
+doctest.testmod(macframe)
+doctest.testmod(macframeheader)
+doctest.testmod(macframequeue)
+doctest.testmod(mpdu)
+doctest.testmod(msdu)
+doctest.testmod(pb)
+doctest.testmod(pbheader)
+
+# UNIT TEST
+
+import unittest
+
+class TestFC_10Functions(unittest.TestCase):
+
+ def setUp(self):
+ self.fc10 = FC_10()
+
+ def tearDown(self):
+ pass
+
+ def test_set_cc(self):
+ # Test with a Python integer
+ cc = 1
+ self.fc10.set_cc(cc)
+ self.assertEqual(self.fc10.get_cc(),cc)
+
+ def test_set_dt(self):
+ # Test with a Python integer
+ dt = 0x07
+ self.fc10.set_dt(dt)
+ self.assertEqual(self.fc10.get_dt(),dt)
+ # Test with a Python string
+ dt = 'NACK'
+ self.fc10.set_dt(dt)
+ self.assertEqual(self.fc10.get_dt(),0x05)
+
+ def test_set_vf(self):
+ # Test with a Python integer
+ vf = 0x1FFF
+ self.fc10.set_vf(vf)
+ self.assertEqual(self.fc10.get_vf(),vf)
+
+ def test_set_fccs(self):
+ # Test with a Python integer
+ fccs = 0xFF
+ self.fc10.set_fccs(fccs)
+ self.assertEqual(self.fc10.get_fccs(),fccs)
+ # Test with a Python string
+ s = pack('B',fccs)
+ self.fc10.set_fccs(s)
+ self.assertEqual(self.fc10.get_fccs(),fccs)
+
+ def test_get(self):
+ fc10 = FC_10(CC=1, DT='NACK', VF=0x1FFF)
+ self.assertEqual(fc10.get(),0x01BFFF00 + crc8(htohp24(0x01BFFF)))
+ fc10.set_fccs(15)
+ self.assertEqual(fc10.get(),0x01BFFF0F)
+
+suite = unittest.TestLoader().loadTestsFromTestCase(TestFC_10Functions)
+
+class TestFC_AVFunctions(unittest.TestCase):
+
+ def setUp(self):
+ self.fcav = FC_AV()
+
+ def tearDown(self):
+ pass
+
+ def test_set_dt_av(self):
+ # Test with a Python integer
+ dt_av = 3
+ self.fcav.set_dt_av(dt_av)
+ self.assertEqual(self.fcav.get_dt_av(),dt_av)
+ # Test with a Python string
+ dt_av = 'Sound'
+ self.fcav.set_dt_av(dt_av)
+ self.assertEqual(self.fcav.get_dt_av(),4)
+
+ def test_set_access(self):
+ # Test with a Python integer
+ access = 1
+ self.fcav.set_access(access)
+ self.assertEqual(self.fcav.get_access(),access)
+
+ def test_set_snid(self):
+ # Test with a Python integer
+ snid = 0x0F
+ self.fcav.set_snid(snid)
+ self.assertEqual(self.fcav.get_snid(),snid)
+
+ def test_set_vf_av(self):
+ # Test with a Python integer
+ vf_av = 123456789123456789
+ self.fcav.set_vf_av(vf_av)
+ self.assertEqual(self.fcav.get_vf_av(),vf_av)
+ # Test with a Python string
+ vf_av = 'ABCDEFGHIJKL'
+ self.fcav.set_vf_av(vf_av)
+ self.assertEqual(self.fcav.get_vf_av(),0x4C4B4A494847464544434241)
+
+ def test_set_fccs_av(self):
+ # Test with a Python integer
+ fccs_av = 0x123
+ self.fcav.set_fccs_av(fccs_av)
+ self.assertEqual(self.fcav.get_fccs_av(),fccs_av)
+ # Test with a Python string
+ fccs_av = 'ABC'
+ self.fcav.set_fccs_av(fccs_av)
+ self.assertEqual(self.fcav.get_fccs_av(),0x434241)
+
+ def test_get(self):
+ fcav = FC_AV(DT_AV=7, ACCESS=0, SNID=4, VF_AV=0xFFFF)
+ self.assertEqual(fcav.get(),(0x47000000, 0, 0x000000FF,\
+ 0xFF000000 + crc24(htohp32(0x47000000) + htohp32(0) + htohp32(0x000000FF) + htohp8(0xFF))))
+ fcav.set_fccs_av(15)
+ self.assertEqual(fcav.get(),(0x47000000, 0, 0x000000FF, 0xFF00000F))
+
+suite.addTests(unittest.TestLoader().loadTestsFromTestCase(TestFC_AVFunctions))
+
+class TestMACFrameFunctions(unittest.TestCase):
+
+ def setUp(self):
+
+ self.macframe = MACFrame()
+ self.assertEqual(self.macframe.get_fc_10(), 0)
+ self.assertNotEqual(self.macframe.get_fc_av(), None)
+ self.assertNotEqual(self.macframe.get_macframeheader(), None)
+ self.assertEqual(self.macframe.get_ats(), None)
+ self.assertEqual(self.macframe.get_confounder(), None)
+ self.assertEqual(self.macframe.get_icv(), None)
+ self.assertEqual(self.macframe.get_msdu(), '')
+ self.assertEqual(self.macframe.get_iv(), 3*4*'0')
+ self.assertEqual(self.macframe.get_nek(), 4*4*'0')
+
+ self.m = m
+ self.sta = self.m.create_sta()
+ fcall = self.m.create_fcall('set_tonemask')
+ fcall.send(self.sta)
+
+ def tearDown(self):
+ self.sta.remove()
+
+ def test_set_macframeheader(self):
+ # Test with a MAC Frame Header object
+ header = MACFrameHeader()
+ self.macframe.set_macframeheader(header)
+ self.assertEqual(self.macframe.get_macframeheader(),header.get())
+ # Test with a Python integer
+ header = 0x1234
+ self.macframe.set_macframeheader(header)
+ self.assertEqual(self.macframe.get_macframeheader(),pack('H',header))
+ # Test with a Python string
+ header = 'AB'
+ self.macframe.set_macframeheader(header)
+ self.assertEqual(self.macframe.get_macframeheader(),header)
+
+ def test_set_ats(self):
+ # Test with a Python long
+ ats = 0x12345678
+ self.macframe.set_ats(ats)
+ self.assertEqual(self.macframe.get_ats(),pack('I',ats))
+ # Test with a Python string
+ ats = 'ABCD'
+ self.macframe.set_ats(ats)
+ self.assertEqual(self.macframe.get_ats(),ats)
+
+ def test_set_confounder(self):
+ # Test with a Python long
+ confounder = 0x12345678
+ self.macframe.set_confounder(confounder)
+ self.assertEqual(self.macframe.get_confounder(),pack('I',confounder))
+ # Test with a Python string
+ confounder = 'ABCD'
+ self.macframe.set_confounder(confounder)
+ self.assertEqual(self.macframe.get_confounder(),confounder)
+
+ def test_set_icv(self):
+ # Test with a Python long
+ icv = 4294967295;
+ self.macframe.set_icv(icv)
+ self.assertEqual(self.macframe.get_icv(),pack('I',icv))
+ # Test with a Python string
+ icv = pack('I', 1)
+ self.macframe.set_icv(icv)
+ self.assertEqual(self.macframe.get_icv(),icv)
+
+ def test_set_msdu(self):
+ # Test with an MSDU object
+ msdu = MME()
+ self.macframe.set_msdu(msdu)
+ self.assertEqual(self.macframe.get_msdu(),msdu.get())
+ # Test with a Python string
+ msdu = '0123456789ABCDEF'
+ self.macframe.set_msdu(msdu)
+ self.assertEqual(self.macframe.get_msdu(),msdu)
+
+ def test_sendnrecv(self):
+ # Tested in 'py/test_tx_rx.py' because another station is needed for this test
+ # Here, just test the timeout
+ prepare_rx(self.sta)
+ self.macframe.set_fc_10(123)
+ self.macframe.set_fc_av(pack('IIII', 123, 456, 789, 10))
+ self.macframe.set_msdu('This is the MPDU payload')
+ t = (self.m.get_date() + 250000 - 1) / 250000
+ d = 250000
+ rsp = self.macframe.sendnrecv(self.m, timeout=d)
+ self.assertEqual(rsp, None)
+ self.assertEqual(self.m.get_date(), (t + 1) * 250000)
+
+ def test_send(self):
+ prepare_rx(self.sta)
+ self.macframe.set_fc_10(123)
+ self.macframe.set_fc_av(pack('IIII', 123, 456, 789, 10))
+ self.macframe.set_msdu('This is the MPDU payload')
+ self.macframe.send(self.m)
+
+ def test_fill_mpdu_attr(self):
+ payload = 'ABCDEFGHIJKL'
+ header = MACFrameHeader(MFL=len(payload)-1)
+ self.macframe.set_macframeheader(header)
+ self.macframe.set_msdu(payload)
+ self.macframe.fill_mpdu_attr(0, 0, 0)
+ expected_payload = header.get() + payload + htohp32(crc32(payload))
+ self.assertEqual(self.macframe.payload, expected_payload)
+ icv = '1234'
+ self.macframe.set_icv(icv)
+ self.macframe.fill_mpdu_attr(0, 0, 0)
+ expected_payload = header.get() + payload + icv
+ self.assertEqual(self.macframe.payload, expected_payload)
+
+suite.addTests(unittest.TestLoader().loadTestsFromTestCase(TestMACFrameFunctions))
+
+class TestMACFrameHeaderFunctions(unittest.TestCase):
+
+ def setUp(self):
+ self.header = MACFrameHeader()
+
+ def tearDown(self):
+ pass
+
+ def test_set_mft(self):
+ # Test with a Python integer
+ mft = 0x01
+ self.header.set_mft(mft)
+ self.assertEqual(self.header.get_mft(),mft)
+
+ def test_set_mfl(self):
+ # Test with a Python integer
+ mfl = 0x1234
+ self.header.set_mfl(mfl)
+ self.assertEqual(self.header.get_mfl(),mfl)
+
+ def test_set(self):
+ mft = 0x02
+ header = MACFrameHeader(MFT=mft)
+ mfl = 0x2345
+ self.header.set(mft,mfl)
+ self.assertEqual(self.header.get_mft(),mft)
+ self.assertEqual(self.header.get_mfl(),mfl)
+
+ def test_get(self):
+ mft = 0x03
+ mfl = 0x3123
+ get = pack('H', (mfl << 2) + mft)
+ header = MACFrameHeader(MFT=mft, MFL=mfl)
+ self.assertEqual(header.get(),get)
+
+suite.addTests(unittest.TestLoader().loadTestsFromTestCase(TestMACFrameHeaderFunctions))
+
+class TestMACFrameQueueFunctions(unittest.TestCase):
+
+ def setUp(self):
+
+ self.macframequeue = MACFrameQueue()
+ self.assertEqual(self.macframequeue.get_fc_10(), 0)
+ self.assertNotEqual(self.macframequeue.get_fc_av(), None)
+ self.assertEqual(self.macframequeue.get_iv(), 3*4*'0')
+ self.assertEqual(self.macframequeue.get_nek(), 4*4*'0')
+ self.assertEqual(self.macframequeue.get_macframelist(), [])
+
+ self.m = m
+ self.sta = self.m.create_sta()
+ fcall = self.m.create_fcall('set_tonemask')
+ fcall.send(self.sta)
+
+ def tearDown(self):
+ self.sta.remove()
+
+ def test_set_macframelist(self):
+ # Test with an Python list
+ list = [MACFrame(), MACFrame(), MACFrame()]
+ self.macframequeue.set_macframelist(list)
+ self.assertEqual(self.macframequeue.get_macframelist(), list)
+
+ def test_add(self):
+ self.macframequeue.set_macframelist([MACFrame()])
+ # Test with a a MAC Frame
+ self.macframequeue.add(MACFrame())
+ self.assertEqual(len(self.macframequeue.get_macframelist()), 2)
+ # Test with a Python list of MAC Frames
+ self.macframequeue.add([MACFrame(), MACFrame(), MACFrame()])
+ self.assertEqual(len(self.macframequeue.get_macframelist()), 5)
+
+ def test_sendnrecv(self):
+ # Tested in 'py/test_tx_rx.py' because another station is needed for this test
+ # Here, just test the timeout
+ prepare_rx(self.sta, pb_nb=3)
+ self.macframequeue.set_fc_10(123)
+ self.macframequeue.set_fc_av(pack('IIII', 123, 456, 789, 10))
+ list = [MACFrame(), MACFrame(), MACFrame()]
+ self.macframequeue.set_macframelist(list)
+ t = (self.m.get_date() + 250000 - 1) / 250000
+ d = 250000
+ rsp = self.macframequeue.sendnrecv(self.m, timeout=d)
+ self.assertEqual(rsp, None)
+ self.assertEqual(self.m.get_date(), (t + 1) * 250000)
+
+ def test_send(self):
+ prepare_rx(self.sta, pb_nb=3)
+ self.macframequeue.set_fc_10(123)
+ self.macframequeue.set_fc_av(pack('IIII', 123, 456, 789, 10))
+ list = [MACFrame(), MACFrame(), MACFrame()]
+ self.macframequeue.set_macframelist(list)
+ self.macframequeue.send(self.m)
+ self.assertEqual(self.macframequeue.get_macframelist(), [])
+
+suite.addTests(unittest.TestLoader().loadTestsFromTestCase(TestMACFrameQueueFunctions))
+
+class TestMPDUFunctions(unittest.TestCase):
+
+ def setUp(self):
+ self.mpdu = MPDU()
+
+ def tearDown(self):
+ pass
+
+ def test_set_fc_10(self):
+ # Test with an FC 1.0 object
+ fc10 = FC_10()
+ self.mpdu.set_fc_10(fc10)
+ self.assertEqual(self.mpdu.get_fc_10(), fc10.get())
+ # Test with a Python long
+ fc10 = 0x12345678
+ self.mpdu.set_fc_10(fc10)
+ self.assertEqual(self.mpdu.get_fc_10(), fc10)
+ # Test with a Python string
+ fc10 = 'ABCD'
+ self.mpdu.set_fc_10(fc10)
+ self.assertEqual(self.mpdu.get_fc_10(),unpack('I', fc10)[0])
+
+ def test_set_fc_av(self):
+ # Test with an FC AV object
+ fcav = FC_AV()
+ self.mpdu.set_fc_av(fcav)
+ self.assertEqual(self.mpdu.get_fc_av(), fcav.get())
+ # Test with a Python string
+ fcav = 4*'ABCD'
+ self.mpdu.set_fc_av(fcav)
+ self.assertEqual(self.mpdu.get_fc_av(), unpack('IIII',fcav))
+ # Test with a tuple of 4 Python longs
+ fcav = (0, 1, 2, 3)
+ self.mpdu.set_fc_av(fcav)
+ self.assertEqual(self.mpdu.get_fc_av(), fcav)
+
+ def test_set_iv(self):
+ # Test with a Python string
+ iv = 3*'0123'
+ self.mpdu.set_iv(iv)
+ self.assertEqual(self.mpdu.get_iv(), iv)
+
+ def test_set_nek(self):
+ # Test with a Python string
+ nek = 4*'0123'
+ self.mpdu.set_nek(nek)
+ self.assertEqual(self.mpdu.get_nek(), nek)
+
+suite.addTests(unittest.TestLoader().loadTestsFromTestCase(TestMPDUFunctions))
+
+class TestMSDUFunctions(unittest.TestCase):
+
+ def setUp(self):
+ self.msdu = MSDU()
+
+ def tearDown(self):
+ pass
+
+suite.addTests(unittest.TestLoader().loadTestsFromTestCase(TestMSDUFunctions))
+
+class TestPBFunctions(unittest.TestCase):
+
+ def setUp(self):
+ self.pb = PB()
+
+ def tearDown(self):
+ pass
+
+ def test_set_mpdu_attr(self):
+ fc10 = 10
+ fcav = (0, 1, 2, 3)
+ data = 'ABCDEFGH'
+ payload = MAX_PB_NB_PER_MPDU * data * (SIZE_OF_PB520 / len(data))
+ self.pb.set_mpdu_attr(fc10, fcav, payload)
+ self.assertEqual(self.pb.get_fc_10(), fc10)
+ self.assertEqual(self.pb.get_fc_av(), fcav)
+ self.assertEqual(len(self.pb.get_pblist()), MAX_PB_NB_PER_MPDU)
+
+ def test_create_pb(self):
+ self.assertNotEqual(create_pb(), None)
+
+suite.addTests(unittest.TestLoader().loadTestsFromTestCase(TestPBFunctions))
+
+class TestPBHeaderFunctions(unittest.TestCase):
+
+ def setUp(self):
+ self.header = PBHeader()
+
+ def tearDown(self):
+ pass
+
+ def test_get(self):
+ self.assertEqual(self.header.get(), 0x0A000000)
+
+suite.addTests(unittest.TestLoader().loadTestsFromTestCase(TestPBHeaderFunctions))
+
+try:
+ suite.addTest(doctest.DocTestSuite(create))
+ suite.addTest(doctest.DocTestSuite(fc_10))
+ suite.addTest(doctest.DocTestSuite(fc_av))
+ suite.addTest(doctest.DocTestSuite(macframe))
+ suite.addTest(doctest.DocTestSuite(macframeheader))
+ suite.addTest(doctest.DocTestSuite(mpdu))
+ suite.addTest(doctest.DocTestSuite(msdu))
+ suite.addTest(doctest.DocTestSuite(pb))
+ suite.addTest(doctest.DocTestSuite(pbheader))
+except ValueError:
+ print "has no tests"
+
+if __name__ == '__main__':
+ testResult = unittest.TextTestRunner(verbosity=2).run(suite)