summaryrefslogtreecommitdiff
path: root/cesar/test_general/station
diff options
context:
space:
mode:
authorNélio Laranjeiro2011-11-10 09:42:03 +0100
committerNélio Laranjeiro2011-11-28 17:12:37 +0100
commita5f76322b91d4974f71c835ebb8318e99e34852e (patch)
tree4361402f2a01a20f85f74ca8fbbb05affb17926e /cesar/test_general/station
parent079a3144b2d48bcb6f0810c857239b832180e58b (diff)
cesar: update and moved cco0/s1 cc_who_ru to scenario directory
Diffstat (limited to 'cesar/test_general/station')
-rw-r--r--cesar/test_general/station/cco0/s1/py/sc4_cc_who_ru.py76
-rw-r--r--cesar/test_general/station/scenario/Makefile2
-rw-r--r--cesar/test_general/station/scenario/py/sc04_cc_whoru.py83
3 files changed, 84 insertions, 77 deletions
diff --git a/cesar/test_general/station/cco0/s1/py/sc4_cc_who_ru.py b/cesar/test_general/station/cco0/s1/py/sc4_cc_who_ru.py
deleted file mode 100644
index 566cad778c..0000000000
--- a/cesar/test_general/station/cco0/s1/py/sc4_cc_who_ru.py
+++ /dev/null
@@ -1,76 +0,0 @@
-#!/usr/bin/python
-
-##############################################################################
-# CC_WHO_ARE_YOU #
-# ----------------------------------------------------------------------- #
-# #
-# Objective: One station which receives the request from the linux and #
-# answers it. #
-##############################################################################
-
-import unittest
-import sys
-
-sys.path.append ('../../../../maximus/python/tools/csi/')
-sys.path.append ('../../../../maximus/python/obj/')
-sys.path.append ('../../../../maximus/python/')
-sys.path.append ('../../../../maximus/python/lib/cesar')
-
-from csicore import *
-MACSDEFAULT = "00:13:d7:00:0%x:%02x"
-csi = csiCore (1234)
-# Creating an AVLN.
-avln1 = csi.avln_add ("Homeplug_AVLN1", "AVLN1")
-avln1_sta_nb = 2
-# Adding the stations.
-avln1_stas = list ()
-for i in range (0, avln1_sta_nb):
- avln1_stas.append (avln1.sta_add (MACSDEFAULT % (1, i+1), False, False,
- "HomePlug_AVLN1_Station%d" % (i+1),
- "av1_spc300_sta%d" % (i+1),
- "avln1_station%d" % (i+1), 1, False))
-csi.process_init (sys.argv + ['-e', 'obj/cco0s1.elf'])
-csi.process_avlns_launch ()
-csi.process_wait_association ()
-
-class TestInitFunctions(unittest.TestCase):
- def setUp(self):
- self.sta = avln1_stas[0]
- self.cco = avln1_stas[avln1_sta_nb - 1]
-
- def tearDown(self):
- pass
-
- def testWhoAreYou(self):
- from maximus.mme.mme import MME
- from maximus.mme.mmheader import MMHeader
- from maximus.mme.mmentry import MMEntry
- from maximus.simu.rx import recv
- from struct import pack, unpack
- from csifilter import frame_filter_msdu_mme
- from csitime import sec_to_tck
- mme = MME(MMHeader=MMHeader(ODA=self.cco.get_mac_addr(),
- OSA=self.cco.get_mac_addr(),
- MTYPE=0x88E1, MMV=1, MMTYPE=0x002C, FMI=0),
- MMEntry=MMEntry(pack ('Q',8910861401412611)))
- mme.send(csi.get_maximus(), self.cco.get_sta_cesar().get())
- rsp = recv(csi.get_maximus(), count=1, filter=frame_filter_msdu_mme,
- timeout = sec_to_tck (2))
- self.failUnless (rsp != None)
- if rsp != None:
- rsp = rsp[0]
- entry = rsp.get_mmentry()
- nid = hex(unpack ('Q', entry[0:7] + '\0')[0])
- cco_mac = hex(unpack ('Q', entry[7:13] + "\0\0")[0])
- hfid = entry[13:13 + len(avln1.get_ahfid())]
- self.failUnless (nid.lower() == '0x1FA86198795403L'.lower())
- self.failUnless (hfid == avln1.get_ahfid())
-
-suite = unittest.TestLoader().loadTestsFromTestCase(TestInitFunctions)
-testResult = unittest.TextTestRunner(verbosity=2).run(suite)
-
-# Delete the station.
-csi.process_avlns_remove ()
-
-# For nightly build errors
-sys.exit ((1, 0)[testResult.wasSuccessful ()])
diff --git a/cesar/test_general/station/scenario/Makefile b/cesar/test_general/station/scenario/Makefile
index c2793fff32..6ee3acad3e 100644
--- a/cesar/test_general/station/scenario/Makefile
+++ b/cesar/test_general/station/scenario/Makefile
@@ -12,7 +12,7 @@ test_scenario_MODULES = lib mac/common mac cl hle interface cp hal station \
include $(BASE)/common/make/top.mk
testbook: py/sc01_assoc_auth.py py/sc02_stas_communication.py \
- py/sc03_two_avln_coexisting.py
+ py/sc03_two_avln_coexisting.py py/sc04_cc_whoru.py
python testbook.py $^ > $@.rst
CLEAN_FILES += testbook.rst
diff --git a/cesar/test_general/station/scenario/py/sc04_cc_whoru.py b/cesar/test_general/station/scenario/py/sc04_cc_whoru.py
new file mode 100644
index 0000000000..a1144944de
--- /dev/null
+++ b/cesar/test_general/station/scenario/py/sc04_cc_whoru.py
@@ -0,0 +1,83 @@
+#!/usr/bin/python
+
+#############################################################################
+# Copyright (C) 2011 Spidcom
+#############################################################################
+
+import sys
+sys.path.append ('py')
+from scenario_init import *
+
+class TestWhoAreYou (unittest.TestCase):
+ """Send a CC_WHO_RU.REQ to each station of the station."""
+
+ def setUp (self):
+ self.csi = csiCore (1234)
+ self.csi.process_init (args)
+ self.csi.init_test_bed ([2])
+ self.computer_mac = "10:10:10:10:10:10"
+ self.get_cco_and_sta ()
+
+ def tearDown (self):
+ self.csi.uninit_test_bed ()
+ self.csi.process_uninit ()
+ del (self.csi)
+
+ def get_cco_and_sta (self):
+ """Parse the AVLN and store in self, the cco and the sta reference.
+ """
+ maximus = self.csi.get_maximus ()
+ avln = self.csi.avln_get (0)
+ sta_data = Station_own_data ()
+ for i in range (avln.get_nb_sta ()):
+ station = avln.get_sta (i)
+ data = sta_data.get_data (maximus, station.get_sta_cesar ())
+ self.avln_nid = data.nid
+ # Too possibilities, the station is CCo or STA.
+ if data.is_cco:
+ cco = station
+ else:
+ sta = station
+ self.failUnless (data.tei > 0 and data.tei < 255)
+ self.failUnless (sta)
+ self.failUnless (cco)
+ self.sta = sta
+ self.cco = cco
+ # Convert the nid from string to int.
+
+ def send_cc_who_ru_req_and_read_cnf (self, sta):
+ """Send a CC_WHO_RU.REQ MME to the sta and read the confirmation."""
+ req = Ether (dst = sta.get_mac_addr (), src = self.computer_mac) \
+ / scammer.MME (mmtype = scammer.HPAV_MMTYPES ['CC_WHO_RU_REQ']) \
+ / scammer.CC_WHO_RU_REQ (nid = int (self.avln_nid))
+ send_mme (self.csi.get_maximus(), sta, req)
+ rsp = recv (self.csi.get_maximus(), timeout = 10,
+ filter = frame_filter_msdu_mme, count = 3)
+ if sta is self.cco:
+ self.failUnless (rsp)
+ cnf = Ether (rsp[0].get ())
+ self.failUnless (cnf.src == sta.get_mac_addr ())
+ self.failUnless (cnf.mac == self.cco.get_mac_addr ())
+ self.failUnless (cnf.nid == req.nid)
+ else:
+ self.failUnless (not rsp)
+
+ def testFromComputer (self):
+ """Sends a CC_WHO_RU.REQ from a computer. Wait for the answer
+ CC_WHO_RU.CNF from the station.
+
+ 1. Start an AVLN with 2 stations.
+ 2. Send a CC_WHO_RU.REQ MME to the CCo.
+ 3. Check the CCo answered to it.
+ 4. Send a CC_WHO_RU.REQ MME to the STA.
+ 5. The STA should not answer.
+
+ Excepted results: Only the CCo should answer.
+ """
+ self.send_cc_who_ru_req_and_read_cnf (self.cco)
+ self.send_cc_who_ru_req_and_read_cnf (self.sta)
+
+if __name__ == '__main__':
+ suite = unittest.TestLoader().loadTestsFromTestCase(TestWhoAreYou)
+ testResult = unittest.TextTestRunner(verbosity=2).run(suite)
+ sys.exit ((1, 0)[testResult.wasSuccessful ()])