summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cesar/common/tests/tests3
-rw-r--r--cesar/test_general/station/cco0/s1/py/sc3_two_sta_communication.py184
-rw-r--r--cesar/test_general/station/cco0/s1/py/sc3_two_sta_communication_multiple_frames.py200
3 files changed, 28 insertions, 359 deletions
diff --git a/cesar/common/tests/tests b/cesar/common/tests/tests
index fe0f0d39ca..7d630e54cf 100644
--- a/cesar/common/tests/tests
+++ b/cesar/common/tests/tests
@@ -400,13 +400,12 @@ core_timer: ./obj/simu/core_timer.elf
test_general/station/cco0/s1:
make
-init: python py/init.py -d false -t 2500000000
+init: python py/init.py -d false -t 2500000000
cm_unassociated_sta: python py/cm_unassociated_sta.py -d false -t 2500000000
sc1_one_sta_mac_start_cco_pref: python py/sc1_one_sta_mac_start_cco_pref.py -d false -t 2500000000
sc1_one_sta_mac_start: python py/sc1_one_sta_mac_start.py -d false -t 2500000000
sc2_two_sta_assoc_one_cco_pref: python py/sc2_two_sta_assoc_one_cco_pref.py -d false -t 2500000000
sc2_two_sta_assoc: python py/sc2_two_sta_assoc.py -d false -t 2500000000
-sc3_two_sta_communication_multiple_frames: python py/sc3_two_sta_communication_multiple_frames.py -d false -t 2500000000
sc3_two_sta_communication: python py/sc3_two_sta_communication.py -d false -t 2500000000
sc4_cc_who_ru: python py/sc4_cc_who_ru.py -d false -t 2500000000
sc5_cc_leave: python py/sc5_cc_leave.py -d false -t 2500000000
diff --git a/cesar/test_general/station/cco0/s1/py/sc3_two_sta_communication.py b/cesar/test_general/station/cco0/s1/py/sc3_two_sta_communication.py
index 55e94dbcd0..211233a57b 100644
--- a/cesar/test_general/station/cco0/s1/py/sc3_two_sta_communication.py
+++ b/cesar/test_general/station/cco0/s1/py/sc3_two_sta_communication.py
@@ -11,173 +11,43 @@
# Hint: The two station have the same NID. #
##############################################################################
-import sys
-sys.path.append('../../../../maximus/python/obj');
-sys.path.append('../../../../maximus/python');
-sys.path.append('../../../../maximus/python/lib/cesar');
-
-from interface import *
-from string import *
-from struct import *
-from maximus import *
-from sta_cesar import STACesar
-from own_data import Station_own_data
-from sniffer import *
-
import unittest
+import sys
-def ms_to_tck (ms):
- return (ms * 25000)
-
-
-def my_mpdu_filter(mpdu):
- if mpdu.get_type() is 'PHY_TYPE_MPDU_PAYLOAD':
- # The received object is an MPDU
- return True
- else:
- return False
-
-def data_msdu_filter (msdu):
- if msdu.get_type () is 'ETHERNET_TYPE_DATA':
- # The received object is an MSDU
- return True
- else:
- return False
-
-# Initialise Maximus with the station.
-maximus = Maximus()
-maximus.init (sys.argv + ['-e', './obj/cco0s1.elf'])
-
-realloc_buffer ( True )
-
-# Configure the first station.
-sta1_conf = Config ()
-sta1_conf.mac_address = (0x10, 0x11, 0x12, 0x13, 0x14, 0x15)
-sta1_conf.cco_preference = True
-sta1_conf.was_cco = True
-sta1_conf.npw = "HomePlugAV0123"
-sta1_conf.dpw = "STATION_DEVICE_PASSWORD"
-sta1_conf.m_sta_hfid = "HPAV_STATION"
-sta1_conf.u_sta_hfid = "STA1"
-sta1_conf.avln_hfid = "AVLN1"
-sta1_conf.tonemask = (85,139,167,214,225,282,302,409,419,569,591,736,748,856,882,1015,1027,1143,1535)
-sta1_conf.sl = 1
-sta1_conf.snid = None
-
-# Configure the second station.
-sta2_conf = Config ()
-sta2_conf.mac_address = (0x20, 0x21, 0x22, 0x23, 0x24, 0x25)
-sta2_conf.cco_preference = False
-sta2_conf.was_cco = False
-sta2_conf.npw = "HomePlugAV0123"
-sta2_conf.dpw = "STATION_DEVICE_PASSWORD"
-sta2_conf.m_sta_hfid = "HPAV_STATION"
-sta2_conf.u_sta_hfid = "STA2"
-sta2_conf.avln_hfid = "AVLN1"
-sta2_conf.tonemask = (85,139,167,214,225,282,302,409,419,569,591,736,748,856,882,1015,1027,1143,1535)
-sta2_conf.sl = 1
-sta2_conf.snid = None
-
-sta1_mac_addr = '10:11:12:13:14:15'
-sta2_mac_addr = '20:21:22:23:24:25'
-
-# Create the station
-print ""
-print "Starting the test ..."
-sta1 = STACesar (maximus, config=sta1_conf, mme_buffer_nb=1, debug=False, config_mode="MME", seed = 0x2345)
-sta2 = STACesar (maximus, config=sta2_conf, mme_buffer_nb=1, debug=False, config_mode="MME", seed = 0x1234)
-
-maximus.wait (50000000)
-
-class TestInitFunctions(unittest.TestCase):
-
- def setUp(self):
- pass
-
- def tearDown(self):
- pass
-
- def testAuthenticateSta1(self):
- sta_data = Station_own_data ()
- data = sta_data.get_data (maximus, sta1)
+sys.path.append ('../../../../maximus/python/tools/csi/')
+sys.path.append ('../../../../maximus/python/obj/')
+sys.path.append ('../../../../maximus/python/')
+sys.path.append ('../../../../maximus/python/lib/cesar')
- self.failUnless (data.mac_address == sta1_conf.mac_address)
- self.failUnless (data.cco_prefered == sta1_conf.cco_preference)
- # The station become CCo so the was_cco status have evolved.
- self.failUnless (data.was_cco == True)
- self.failUnless (data.npw == sta1_conf.npw)
- self.failUnless (data.dpw == sta1_conf.dpw)
- self.failUnless (data.hfid_manufacturer == sta1_conf.m_sta_hfid)
- self.failUnless (data.hfid_user == sta1_conf.u_sta_hfid)
- self.failUnless (data.security_level == sta1_conf.sl)
- # The station is CCo and automatically authenticated.
- self.failUnless (data.tei != 0)
- self.failUnless (data.authenticated != 0)
- self.failUnless (data.is_cco != 0)
+from csicore import *
- def testAuthenticateSta2(self):
- sta_data = Station_own_data ()
- data = sta_data.get_data (maximus, sta2)
+csi = csiCore (1234)
- self.failUnless (data.mac_address == sta2_conf.mac_address)
- self.failUnless (data.cco_prefered == sta2_conf.cco_preference)
- # The station become CCo so the was_cco status have evolved.
- self.failUnless (data.was_cco == sta2_conf.was_cco)
- self.failUnless (data.npw == sta2_conf.npw)
- self.failUnless (data.dpw == sta2_conf.dpw)
- self.failUnless (data.hfid_manufacturer == sta2_conf.m_sta_hfid)
- self.failUnless (data.hfid_user == sta2_conf.u_sta_hfid)
- self.failUnless (data.security_level == sta2_conf.sl)
- self.failUnless (data.tei != 0)
- self.failUnless (data.authenticated != 0)
- self.failUnless (data.is_cco == 0)
+# Creating an AVLN.
+avln = csi.avln_add ("Homeplug_AVLN1", "AVLN1")
- def testDataTransmission1 (self):
- alloc_data_buffer (maximus, sta1)
- frame = Eth()
- frame.dst = sta1_mac_addr
- frame.src = sta2_mac_addr
- frame.type = 0x4F50
- frame.payload = "Hello world, this is only a test to send data over the PWL with two stations."
- frame.send (maximus, sta2)
-
- rsp = recv(maximus, count=1, filter=data_msdu_filter, timeout = 100000)
- self.failUnless (rsp != None)
- if rsp != None:
- rsp = rsp[0]
- self.failUnless (rsp.dst == frame.dst)
- self.failUnless (rsp.src == frame.src)
- self.failUnless (rsp.type == frame.type)
- self.failUnless (rsp.payload == frame.payload)
+sta1_debug = False
+sta2_debug = False
- def testDataTransmission2 (self):
- alloc_data_buffer (maximus, sta2)
- frame = Eth()
- frame.dst = sta2_mac_addr
- frame.src = sta1_mac_addr
- frame.type = 0x4F50
- frame.payload = "Hello I'm the station 1, this is only a test to send data over the PWL with two stations."
- frame.send (maximus, sta1)
-
- rsp = recv(maximus, count=1, filter=data_msdu_filter, timeout = 100000)
- self.failUnless (rsp != None)
- if rsp != None:
- rsp = rsp[0]
- self.failUnless (rsp.dst == frame.dst)
- self.failUnless (rsp.src == frame.src)
- self.failUnless (rsp.type == frame.type)
- self.failUnless (rsp.payload == frame.payload)
+sta1 = avln.sta_add ("10:11:12:13:14:15", False, False, "Homeplug_Station1",
+ "spidcom_spc300_sta1", "station1", 1, sta1_debug)
+sta2 = avln.sta_add ("20:22:22:23:24:25", False, False, "Homeplug_Station2",
+ "spidcom_spc300_sta2", "station2", 1, sta2_debug)
-suite = unittest.TestLoader().loadTestsFromTestCase(TestInitFunctions)
+csi.process_init (sys.argv + ['-e', 'obj/cco0s1.elf'])
+csi.process_avlns_launch ()
+csi.process_wait_association ()
+csi.process_wait_authentication ()
+csi.authentication_status (avln)
-testResult = unittest.TextTestRunner(verbosity=2).run(suite)
+packets = csi.avln_create_traffic (avln, 10)
-# Delete the station.
-print "Removing STA 1 ..."
-sta1.remove()
-print "Removing STA 2 ..."
-sta2.remove()
+csi.process_data_send_traffic (packets)
+csi.process_wait_sec (3)
+csi.process_verify_transmission ()
+csi.process_avlns_remove()
+result = csi.process_end_get_result ()
# For nightly build errors
-sys.exit ((1, 0)[testResult.wasSuccessful ()])
+sys.exit ((1, 0)[result])
diff --git a/cesar/test_general/station/cco0/s1/py/sc3_two_sta_communication_multiple_frames.py b/cesar/test_general/station/cco0/s1/py/sc3_two_sta_communication_multiple_frames.py
deleted file mode 100644
index 7ff102fde5..0000000000
--- a/cesar/test_general/station/cco0/s1/py/sc3_two_sta_communication_multiple_frames.py
+++ /dev/null
@@ -1,200 +0,0 @@
-#!/usr/bin/env python
-
-##############################################################################
-# Two station with the Mac start order #
-# ---------------------------------------------------------------------- #
-# #
-# Objective: Two station which starts at the same time, one will become #
-# the CCo the other one shall become an unassociated station. #
-# At this point the station shall try an association with the #
-# CCo. #
-# Hint: The two station have the same NID. #
-##############################################################################
-
-import sys
-sys.path.append('../../../../maximus/python/obj');
-sys.path.append('../../../../maximus/python');
-sys.path.append('../../../../maximus/python/lib/cesar');
-
-from interface import *
-from string import *
-from struct import *
-from maximus import *
-from sta_cesar import STACesar
-from own_data import Station_own_data
-from sniffer import *
-
-import unittest
-
-def my_mpdu_filter(mpdu):
- if mpdu.get_type() is 'PHY_TYPE_MPDU_PAYLOAD':
- # The received object is an MPDU
- return True
- else:
- return False
-
-def data_msdu_filter (msdu):
- if msdu.get_type () is 'ETHERNET_TYPE_DATA':
- # The received object is an MSDU
- return True
- else:
- return False
-
-# Initialise Maximus with the station.
-maximus = Maximus()
-maximus.init (sys.argv + ['-e', './obj/cco0s1.elf'])
-
-realloc_buffer ( True )
-
-# Configure the first station.
-sta1_conf = Config ()
-sta1_conf.mac_address = (0x10, 0x11, 0x12, 0x13, 0x14, 0x15)
-sta1_conf.cco_preference = True
-sta1_conf.was_cco = True
-sta1_conf.npw = "HomePlugAV0123"
-sta1_conf.dpw = "STATION_DEVICE_PASSWORD"
-sta1_conf.m_sta_hfid = "HPAV_STATION"
-sta1_conf.u_sta_hfid = "STA1"
-sta1_conf.avln_hfid = "AVLN1"
-sta1_conf.tonemask = (85,139,167,214,225,282,302,409,419,569,591,736,748,856,882,1015,1027,1143,1535)
-sta1_conf.sl = 1
-sta1_conf.snid = None
-
-# Configure the second station.
-sta2_conf = Config ()
-sta2_conf.mac_address = (0x20, 0x21, 0x22, 0x23, 0x24, 0x25)
-sta2_conf.cco_preference = False
-sta2_conf.was_cco = False
-sta2_conf.npw = "HomePlugAV0123"
-sta2_conf.dpw = "STATION_DEVICE_PASSWORD"
-sta2_conf.m_sta_hfid = "HPAV_STATION"
-sta2_conf.u_sta_hfid = "STA2"
-sta2_conf.avln_hfid = "AVLN1"
-sta2_conf.tonemask = (85,139,167,214,225,282,302,409,419,569,591,736,748,856,882,1015,1027,1143,1535)
-sta2_conf.sl = 1
-sta2_conf.snid = None
-
-sta1_mac_addr = '10:11:12:13:14:15'
-sta2_mac_addr = '20:21:22:23:24:25'
-
-# Create the station
-print ""
-print "Starting the test ..."
-sta1 = STACesar (maximus, config=sta1_conf, mme_buffer_nb=1, debug=False, config_mode="MME", seed = 0x2345)
-sta2 = STACesar (maximus, config=sta2_conf, mme_buffer_nb=1, debug=False, config_mode="MME", seed = 0x1234)
-
-maximus.wait (50000000)
-
-
-class TestInitFunctions(unittest.TestCase):
-
- def setUp(self):
- pass
-
- def tearDown(self):
- pass
-
- def testAuthenticateSta1(self):
- sta_data = Station_own_data ()
- data = sta_data.get_data (maximus, sta1)
-
- self.failUnless (data.mac_address == sta1_conf.mac_address)
- self.failUnless (data.cco_prefered == sta1_conf.cco_preference)
- # The station become CCo so the was_cco status have evolved.
- self.failUnless (data.was_cco == True)
- self.failUnless (data.npw == sta1_conf.npw)
- self.failUnless (data.dpw == sta1_conf.dpw)
- self.failUnless (data.hfid_manufacturer == sta1_conf.m_sta_hfid)
- self.failUnless (data.hfid_user == sta1_conf.u_sta_hfid)
- self.failUnless (data.security_level == sta1_conf.sl)
- # The station is CCo and automatically authenticated.
- self.failUnless (data.tei != 0)
- self.failUnless (data.authenticated != 0)
- self.failUnless (data.is_cco != 0)
-
- def testAuthenticateSta2(self):
- sta_data = Station_own_data ()
- data = sta_data.get_data (maximus, sta2)
-
- self.failUnless (data.mac_address == sta2_conf.mac_address)
- self.failUnless (data.cco_prefered == sta2_conf.cco_preference)
- # The station become CCo so the was_cco status have evolved.
- self.failUnless (data.was_cco == sta2_conf.was_cco)
- self.failUnless (data.npw == sta2_conf.npw)
- self.failUnless (data.dpw == sta2_conf.dpw)
- self.failUnless (data.hfid_manufacturer == sta2_conf.m_sta_hfid)
- self.failUnless (data.hfid_user == sta2_conf.u_sta_hfid)
- self.failUnless (data.security_level == sta2_conf.sl)
- self.failUnless (data.tei != 0)
- self.failUnless (data.authenticated != 0)
- self.failUnless (data.is_cco == 0)
-
- def testDataTransmission1 (self):
- alloc_data_buffer (maximus, sta1)
- frame = Eth()
- frame.dst = sta1_mac_addr
- frame.src = sta2_mac_addr
- frame.type = 0x4F50
- frame.payload = "Hello world, this is only a test to send data over the PWL with two stations."
- frame.send (maximus, sta2)
-
- rsp = recv(maximus, count=1, filter=data_msdu_filter, timeout = 100000)
- self.failUnless (rsp != None)
- if rsp != None:
- rsp = rsp[0]
- self.failUnless (rsp.dst == frame.dst)
- self.failUnless (rsp.src == frame.src)
- self.failUnless (rsp.type == frame.type)
- self.failUnless (rsp.payload == frame.payload)
-
- def testDataTransmission2 (self):
- alloc_data_buffer (maximus, sta2)
- frame = Eth()
- frame.dst = sta2_mac_addr
- frame.src = sta1_mac_addr
- frame.type = 0x4F50
- frame.payload = "Hello I'm the station 1, this is only a test to send data over the PWL with two stations."
- frame.send (maximus, sta1)
-
- alloc_data_buffer (maximus, sta2)
- frame2 = Eth()
- frame2.dst = sta2_mac_addr
- frame2.src = sta1_mac_addr
- frame2.type = 0x4F50
- frame2.payload = "SPiDCOM Technologies is a fabless semi-conductor company. We specialise in integrated circuits and Linux-based software bundles for broadband Access & In-Home networks applications over any wire (powerline, coax, phoneline). We provide integrated circuits, reference designs, evaluation kits and software to our OEM/ODM partners, and help them develop their end products (SPiDCOM does not manufacture end products itself). PLC (PowerLine Communication) is a technology that enables the transferring of digital data over the classical electrical wires, converting them into a high-speed communication medium. It consists of adding to the low frequency electrical signal (50/60 Hz) a new one, higher in the spectrum (between 2 MHz and 30 Mhz)."
- frame2.send (maximus, sta1)
-
- rsp = recv(maximus, count=2, filter=data_msdu_filter, timeout = 300000)
- self.failUnless (rsp != None)
- if rsp != None:
- self.failUnless (rsp[0].dst == frame.dst)
- self.failUnless (rsp[0].src == frame.src)
- self.failUnless (rsp[0].type == frame.type)
- self.failUnless (rsp[0].payload == frame.payload)
- print ""
- print " -------------- First frame ------------\n"
- print rsp[0].payload
- print " ---------------------------------------\n"
-
- self.failUnless (rsp[1].dst == frame2.dst)
- self.failUnless (rsp[1].src == frame2.src)
- self.failUnless (rsp[1].type == frame2.type)
- self.failUnless (rsp[1].payload == frame2.payload)
- print ""
- print " -------------- Second frame ------------\n"
- print rsp[1].payload
- print " ---------------------------------------\n"
-
-suite = unittest.TestLoader().loadTestsFromTestCase(TestInitFunctions)
-
-testResult = unittest.TextTestRunner(verbosity=2).run(suite)
-
-# Delete the station.
-print "Removing STA 1 ..."
-sta1.remove()
-print "Removing STA 2 ..."
-sta2.remove()
-
-
-# For nightly build errors
-sys.exit ((1, 0)[testResult.wasSuccessful ()])