summaryrefslogtreecommitdiff
path: root/cesar/test_general
diff options
context:
space:
mode:
authorJean-Philippe SAVE2011-03-07 13:55:35 +0100
committerJean-Philippe SAVE2011-03-07 13:55:35 +0100
commit0d478fa18ccfd9e4b5611d865bb23c3fec0dd17d (patch)
tree5457d8e2a6174f80c16175def0ca65d6b0092f41 /cesar/test_general
parent65da0099ae4a5bfdfc9dc817ef64999abd5905a1 (diff)
parentc7d6cbfdd7b58339ba3eda51ef001b8674f3a0a1 (diff)
Merge branch 'master' into av
Conflicts: cesar/mac/pbproc/src/fsm_tx_data.c
Diffstat (limited to 'cesar/test_general')
-rw-r--r--cesar/test_general/station/cco0/s2/py/sc16_igmp.py202
1 files changed, 202 insertions, 0 deletions
diff --git a/cesar/test_general/station/cco0/s2/py/sc16_igmp.py b/cesar/test_general/station/cco0/s2/py/sc16_igmp.py
new file mode 100644
index 0000000000..99dda843f3
--- /dev/null
+++ b/cesar/test_general/station/cco0/s2/py/sc16_igmp.py
@@ -0,0 +1,202 @@
+#!/usr/bin/python
+
+#############################################################################
+# Copyright (C) 2011 Spidcom
+#
+# Test of the multicast behavior
+#############################################################################
+
+import os
+import unittest
+import sys
+
+# Add Maximus and CSI to PATH module lookup.
+sys.path.append ('../../../../maximus/python/tools/csi/')
+sys.path.append ('../../../../maximus/python/obj/')
+sys.path.append ('../../../../maximus/python/')
+sys.path.append ('../../../../maximus/python/lib/cesar')
+
+from csicore import *
+from sta_mgr import Sta_mgr
+from maximus import *
+
+csi = csiCore (1234)
+
+# Create the AVLN.
+avln_sta_nb = 5
+avln = csi.avln_add ("Homeplug_AVLN", "AVLN")
+avln_stas = list ()
+for i in range (0, avln_sta_nb):
+ mac = "00:13:d7:00:01:%02x" % (i)
+ avln_stas.append (avln.sta_add (mac, False, False,
+ "HomePlug_AVLN_Station%d" % (i), "av_spc300_sta%d" % (i),
+ "avln_station%d" % (i), 1, False))
+
+# Launch the AVLN
+csi.process_init (sys.argv + ['-e', 'obj/cco0s2.elf'])
+csi.process_avln_launch (avln)
+csi.process_wait_association (15)
+csi.process_wait_authentication (15)
+
+# Create a sta manager.
+sta_data = Sta_mgr()
+# Get maximus.
+maximus = csi.get_maximus()
+
+# our reference stations will be 00:13:d7:00:01:00 -> ref_sta_cesar
+for i in range (0, avln_sta_nb):
+ mac_ref = "00:13:d7:00:01:%02x" % (0)
+ if avln.get_sta(i).get_mac_addr() == mac_ref:
+ ref_sta_cesar = avln.get_sta(i).get_sta_cesar()
+ break
+
+# wait for the mactotei table to be completed
+mac2tei = {}
+print ""
+for i in range (1, avln_sta_nb):
+ mac = (0x00, 0x13, 0xd7, 00, 1, i)
+ tei = sta_data.get_tei_from_mac(maximus, ref_sta_cesar, mac)
+ # check the mactotei is completed before copying it
+ while (tei == 0):
+ csi.process_wait_sec (1)
+ tei = sta_data.get_tei_from_mac(maximus, ref_sta_cesar, mac)
+ mac2tei[mac] = tei
+
+def test_mactotei_is_valid(self):
+ """ check the mactotei contains the mac and tei of the neighbors"""
+ for i in range (1, avln_sta_nb):
+ mac = (0x00, 0x13, 0xd7, 00, 1, i)
+ tei = sta_data.get_tei_from_mac(maximus, ref_sta_cesar, mac)
+ self.failUnless (tei == mac2tei[mac])
+
+def test_mactotei_no_group(self):
+ """ check there is no group in the mactotei table """
+ for i in range (1, avln_sta_nb):
+ mac_group = (0x01, 0x00, 0x5E, i, i, i)
+ tei = sta_data.get_tei_from_mac(maximus, ref_sta_cesar, mac_group)
+ self.failUnless (tei == 0)
+
+def data_msdu_filter (msdu):
+ if msdu.get_type () is 'ETHERNET_TYPE_MME':
+ # The received object is an MSDU
+ return True
+ else:
+ return False
+
+def send_mme_DRV_MCAST_SET_LIST (self, payload):
+ """ send DRV_MCAST_SET_REQ to ref_sta_cesar """
+ # Create and send mme message
+ dst_mac = "00:13:d7:00:01:00"
+ src_mac = "10:12:13:14:15:16"
+ MMTYPE_DRV_MCAST_SET_LIST = 0xB0A8
+
+ mme = MME (MMHeader=MMHeader (dst_mac, src_mac,
+ MTYPE=0x88E1,
+ MMV=1,
+ MMTYPE=MMTYPE_DRV_MCAST_SET_LIST,
+ FMI=0),
+ MMEntry=MMEntry(payload))
+ mme.send (maximus, ref_sta_cesar)
+
+ # Wait for response
+ rsp = recv (maximus, count=1, filter=data_msdu_filter,
+ timeout=50000000)
+ # Fail in case of no response
+ self.failUnless (rsp != None)
+ rsp = rsp[0]
+ entry = rsp.get_mmentry()
+ head = rsp.get_mmheader()
+ mmtype = unpack ("H", head[15:17])[0]
+
+ # Check that mmtype is correct
+ self.failUnless (mmtype == MMTYPE_DRV_MCAST_SET_LIST + 1)
+ result = unpack ("B", entry[0:1])[0]
+ return (result)
+
+class TestIGMP(unittest.TestCase):
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def testIT (self):
+ # test mactotei contains the neighbors
+ test_mactotei_is_valid(self)
+ # check the mactotei table doesn't contain any group yet
+ test_mactotei_no_group(self)
+
+ # Send an empty list to the station
+ payload = pack ('B', 0)
+ (result) = send_mme_DRV_MCAST_SET_LIST (self, payload)
+ self.failUnless (result == 0)
+
+ # test mactotei contains the neighbors
+ test_mactotei_is_valid(self)
+ # check the mactotei table doesn't contain any group yet
+ test_mactotei_no_group(self)
+
+ # send an invalid list to the reference station
+ data = [avln_sta_nb - 1]
+ for i in range (1, avln_sta_nb):
+ mac_group = (0x01, 0x01, 0x5E, i, i, i)
+ mac_member = (0x00, 0x13, 0xD7, 0, 1, i)
+ data.extend (mac_group)
+ data.append (0x01)
+ data.extend (mac_member)
+ #Convert entry data into a single string
+ payload = ""
+ for i in data:
+ payload += pack('B', i)
+
+ (result) = send_mme_DRV_MCAST_SET_LIST (self, payload)
+ self.failUnless (result == 1)
+
+ # test mactotei contains the neighbors
+ test_mactotei_is_valid(self)
+ # check the mactotei table doesn't contain any group yet
+ test_mactotei_no_group(self)
+
+ # send a valid list to the reference station
+ data = [avln_sta_nb - 1]
+ for i in range (1, avln_sta_nb):
+ mac_group = (0x01, 0x00, 0x5E, i, i, i)
+ mac_member = (0x00, 0x13, 0xD7, 0x00, 0x01, i)
+ data.extend (mac_group)
+ data.append (0x01)
+ data.extend (mac_member)
+ #Convert entry data into a single string
+ payload = ""
+ for i in data:
+ payload += pack('B', i)
+
+ (result) = send_mme_DRV_MCAST_SET_LIST (self, payload)
+ self.failUnless (result == 0)
+
+ # test mactotei contains the neighbors
+ test_mactotei_is_valid(self)
+ # Check the groups have been added to the mactotei
+ for i in range (1, avln_sta_nb):
+ mac_group = (0x01, 0x00, 0x5E, i, i , i)
+ mac_member = (0x00, 0x13, 0xd7, 0x00, 0x01, i)
+ tei = sta_data.get_tei_from_mac(maximus, ref_sta_cesar, mac_group)
+ self.failUnless (tei == mac2tei[mac_member])
+
+ # Send an empty list to remove the multicast entries
+ data = pack ('B', 0)
+ (result) = send_mme_DRV_MCAST_SET_LIST (self, data)
+ self.failUnless (result == 0)
+
+ # test mactotei contains the neighbors
+ test_mactotei_is_valid(self)
+ # check the mactotei table doesn't contain any group again
+ test_mactotei_no_group(self)
+
+suite = unittest.TestLoader().loadTestsFromTestCase(TestIGMP)
+testResult = unittest.TextTestRunner(verbosity=2).run(suite)
+csi.process_avlns_stop ()
+# For nightly build errors
+sys.exit ((1, 0)[testResult.wasSuccessful ()])
+# Killall stations.
+os.system ('killall cco0s2.elf')