summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorThierry Carré2013-04-25 16:41:23 +0200
committerThierry Carré2013-04-26 17:03:20 +0200
commit8dc1cc159e1749659b49a27c592344b74dd2e3a6 (patch)
tree2ea855b9e763eda68b4b1758ff007a78f268e6f3
parent0fe0af0214b8c073bde08527614a9d635e129b76 (diff)
cesar: add maximus test about force_role, closes #3789
-rw-r--r--cesar/maximus/python/maximus/station/config.py14
-rw-r--r--cesar/maximus/python/maximus/station/sta.py62
-rw-r--r--cesar/maximus/python/tools/csi/csiavln.py5
-rw-r--r--cesar/maximus/python/tools/csi/csistation.py7
-rw-r--r--cesar/test_general/station/scenario/av/Makefile3
-rw-r--r--cesar/test_general/station/scenario/av/py/sc16_force_role.py86
-rw-r--r--common/lib/scammer/drv.py13
-rw-r--r--common/tests/tests1
8 files changed, 186 insertions, 5 deletions
diff --git a/cesar/maximus/python/maximus/station/config.py b/cesar/maximus/python/maximus/station/config.py
index 2b49a0b86b..630968c997 100644
--- a/cesar/maximus/python/maximus/station/config.py
+++ b/cesar/maximus/python/maximus/station/config.py
@@ -12,6 +12,7 @@ DEFAULT_M_STA_HFID = "M_STA_HFID_"
DEFAULT_U_STA_HFID = "U_STA_HFID_"
DEFAULT_AVLN_HFID = "AVLN_HomePlugAV0123"
DEFAULT_SL = 0
+DEFAULT_FORCE_ROLE = 0
DEFAULT_TONEMASK = 85,139,167,214,225,282,302,409,419,569,591,736,748,856,882,1015,1027,1143,1535
DEFAULT_SNID = 0
DEFAULT_INTERNAL_CONF = None
@@ -27,11 +28,16 @@ class Config:
u_sta_hfid - a Python string of length from 0 to 64 octets
avln_hfid - a Python string of length from 0 to 64 octets
sl - a Python integer from 0 to 2
+ force_role - a Python integer from 0 to 2 included
tonemask - a Python tuple of Python integers
snid - a Python integer from 0 to 15
internal_conf - a Python string giving the full path and name of the internal.conf file
"""
- def __init__(self, default_config=True, mac_address=None, cco_preference=None, was_cco=None, npw=None, dpw=None, m_sta_hfid=None, u_sta_hfid=None, avln_hfid=None, sl=None, tonemask=None, snid=None, internal_conf=None):
+ def __init__(
+ self, default_config=True, mac_address=None, cco_preference=None,
+ was_cco=None, npw=None, dpw=None, m_sta_hfid=None,
+ u_sta_hfid=None, avln_hfid=None, sl=None, tonemask=None,
+ snid=None, internal_conf=None, force_role=None):
# MAC address
if mac_address is None and default_config is True:
@@ -87,6 +93,12 @@ class Config:
else:
self.sl = sl
+ # Force Role
+ if force_role is None and default_config is True:
+ self.force_role = DEFAULT_FORCE_ROLE
+ else:
+ self.force_role = force_role
+
# Tonemask
if tonemask is None and default_config is True:
self.tonemask = DEFAULT_TONEMASK
diff --git a/cesar/maximus/python/maximus/station/sta.py b/cesar/maximus/python/maximus/station/sta.py
index 12735970a8..c69f0ebcdb 100644
--- a/cesar/maximus/python/maximus/station/sta.py
+++ b/cesar/maximus/python/maximus/station/sta.py
@@ -21,6 +21,7 @@ MAX_SIZE_OF_NPW = 64 # in octets
MAX_SIZE_OF_DPW = MAX_SIZE_OF_NPW
MAX_SIZE_OF_HFID = MAX_SIZE_OF_DPW
MAX_VALUE_OF_SL = 1
+MAX_VALUE_OF_FORCE_ROLE = 2
MAX_VALUE_OF_SNID = 15
# Constants from 'hal/phy/defs.h'
@@ -251,6 +252,7 @@ class STA:
self.set_u_sta_hfid(config.u_sta_hfid)
self.set_avln_hfid(config.avln_hfid)
self.set_sl(config.sl)
+ self.set_force_role (config.force_role)
self.set_tonemask(config.tonemask)
self.set_snid(config.snid)
self.set_internal_conf(config.internal_conf)
@@ -426,6 +428,24 @@ class STA:
# Send a message to the station to configure the SL
self.__send_sl()
+ def set_force_role (self, force_role):
+ """Set the Force Role.
+ The FR must be a Python integer
+ from 0 to MAX_VALUE_OF_FORCE_ROLE.
+ """
+ if type (force_role) is int:
+ if force_role >= 0 and force_role <= MAX_VALUE_OF_FORCE_ROLE:
+ self.__force_role = force_role
+ else:
+ raise OutOfRangeError ("Force Role")
+ elif force_role is None:
+ self.__force_role = force_role
+ else:
+ raise TypeError("FR")
+
+ # Send a message to the station to configure the SL
+ self.__send_force_role ()
+
def set_tonemask(self, carriers):
"""Set the tonemask.
The carriers must be a Python tuple of Python integers.
@@ -605,6 +625,13 @@ class STA:
"""
return self.__sl
+ def get_force_role (self):
+ """Get the Force Role.
+ The FR is a Python integer.
+ """
+ return self.__force_role
+
+
def get_tonemask(self):
"""Get the tonemask.
The tonemask is a bits field.
@@ -964,6 +991,41 @@ class STA:
self.__check_cnf(rsp,
mmtype.DRV_STA_SET_SL_CNF)
+ def __send_force_role (self):
+ """Send a message to the station to configure the FR.
+ """
+ if self.__force_role is not None:
+ if self.get_config_mode() is CONFIG_MODES[2]: # 'fcall_cp_station'
+ m = self.__get_maximus().create_fcall("maximus_set_fr")
+ m.add_param_uchar("fr", self.__force_role)
+ m.send(self.get())
+ else: # we need to build an MME
+
+ # Computes MM Header
+ mmheader = MMHeader(ODA=self.__get_oda(),
+ OSA=DEFAULT_MAC_ADDRESS, MMV=0x01,
+ MMTYPE = mmtype.DRV_STA_FORCE_ROLE_REQ)
+
+ # Computes MM Entry: Security Level for New NMK
+ # Octet Number = 0
+ # Field Size (Octets) = 1
+ FR = htohp8(self.__force_role)
+ FR = data_pad (FR, 41 - len (FR))
+
+ # Create the MME
+ mme = MME(MMHeader=mmheader, MMEntry=FR)
+
+ if self.get_config_mode() is CONFIG_MODES[1]: # 'fcall_process_drv'
+ m = self.__get_maximus().create_fcall("maximus_set_fr")
+ m.add_param("mme", mme.get())
+ m.send(self.get())
+ else: # 'MME'
+ # Send the MME and wait for the response
+ rsp = mme.sendnrecv(maximus=self.__get_maximus(),
+ station=self.get(), filter=mme_filter)
+ self.__check_cnf(rsp,
+ mmtype.DRV_STA_FORCE_ROLE_CNF)
+
def __send_tonemask(self):
"""Send a message to the station to configure the tonemask.
"""
diff --git a/cesar/maximus/python/tools/csi/csiavln.py b/cesar/maximus/python/tools/csi/csiavln.py
index 069c9f1b7a..4d05356bce 100644
--- a/cesar/maximus/python/tools/csi/csiavln.py
+++ b/cesar/maximus/python/tools/csi/csiavln.py
@@ -30,7 +30,7 @@ class csiAvln:
def sta_add (self, mac_addr, cco_pref, was_cco, dpw, mhfid, uhfid, sl,
debug = False, delay_ms = 0, executable = None,
- internal_conf = None):
+ internal_conf = None, force_role = 0):
if self.__hp_sta_max <= self.get_nb_sta():
return None
@@ -48,7 +48,8 @@ class csiAvln:
sta = csiSta (mac_addr, cco_pref, was_cco, self.get_npw(), dpw,
mhfid, uhfid, self.get_ahfid(), sl, debug, delay_ms,
- executable=executable, internal_conf = internal_conf)
+ executable=executable, internal_conf = internal_conf,
+ force_role = force_role)
self.__sta_list.append (sta)
return sta
diff --git a/cesar/maximus/python/tools/csi/csistation.py b/cesar/maximus/python/tools/csi/csistation.py
index 338eddfd76..a9d41229fb 100644
--- a/cesar/maximus/python/tools/csi/csistation.py
+++ b/cesar/maximus/python/tools/csi/csistation.py
@@ -3,7 +3,8 @@ from maximus.station.config import Config
class csiSta:
def __init__(self, mac_addr, cco_pref, was_cco, npw, dpw, mhfid, uhfid,
- ahfid, sl, debug, delay_ms = 0, executable = None, internal_conf=None):
+ ahfid, sl, debug, delay_ms = 0, executable = None,
+ internal_conf=None, force_role = 0):
self.__stacesar = None
self.__debug = debug
self.__config = Config ()
@@ -19,6 +20,7 @@ class csiSta:
self.__config.sl = sl
self.__config.tonemask = (85,139,167,214,225,282,302,409,419,569,591,736,748,856,882,1015,1027,1143,1535)
self.__config.snid = None
+ self.__config.force_role = force_role
self.__config.internal_conf = internal_conf
self.__config.default_config = True if not internal_conf else False
@@ -39,6 +41,9 @@ class csiSta:
def get_dpw (self):
return self.__config.dpw
+ def get_force_role (self):
+ return self.__config.force_role
+
def get_manufacturer_hfid (self):
return self.__config.m_sta_hfid
diff --git a/cesar/test_general/station/scenario/av/Makefile b/cesar/test_general/station/scenario/av/Makefile
index 6dfeb65f78..e0992943b4 100644
--- a/cesar/test_general/station/scenario/av/Makefile
+++ b/cesar/test_general/station/scenario/av/Makefile
@@ -18,7 +18,8 @@ testbook: py/sc01_assoc_auth.py py/sc02_stas_communication.py \
py/sc07_bridge.py py/sc08_bentry_change.py \
py/sc09_simple_connect.py py/sc10_short_messages.py \
py/sc11_cm_nw_info.py py/sc12_change_nmk.py \
- py/sc13_data_rate.py py/sc14_igmp.py
+ py/sc14_igmp.py \
+ py/sc15_hide.py py/sc16_force_role.py
python testbook.py $^ > $@.rst
CLEAN_FILES += testbook.rst
diff --git a/cesar/test_general/station/scenario/av/py/sc16_force_role.py b/cesar/test_general/station/scenario/av/py/sc16_force_role.py
new file mode 100644
index 0000000000..3f456f712f
--- /dev/null
+++ b/cesar/test_general/station/scenario/av/py/sc16_force_role.py
@@ -0,0 +1,86 @@
+#!/usr/bin/python
+
+#############################################################################
+# Copyright (C) 2011 Spidcom
+#############################################################################
+
+import sys
+sys.path.append ('py')
+from scenario_init import *
+
+class TestForceRole(unittest.TestCase):
+ """Test the communication on an AVLN with force role in config.
+ Feature for Zenitel"""
+
+ def setUp(self):
+ self.csi = csiCore (1234)
+ self.csi.process_init (args)
+
+ def tearDown(self):
+ self.csi.process_uninit ()
+ del self.csi
+
+ def testCommunication2StaForced (self):
+ """Communication between 2 stations.
+ Send some data between the stations.
+
+ Expected result: Data should be received from each station.
+
+ Check DRV_MAC_STOP and START also on CCO.
+ """
+ # Generate a NPW.
+ npw = str (random.getrandbits(64))
+ ahfid = str (random.getrandbits(32))
+ avln = self.csi.avln_add (npw, ahfid)
+
+ """ force_role = 1 is CCO_FORCED """
+ cco_mac = "00:13:d7:00:00:10"
+ dpw = "HomePlugAV_station10"
+ mhfid = "HomePlugAV_station10"
+ uhfid = "HomePlugAV_station10"
+ avln.sta_add (
+ cco_mac, False, False, dpw, mhfid, uhfid, 0,
+ force_role = 1)
+
+ """ force_role = 2 is STA_FORCED """
+ sta_mac = "00:13:d7:00:00:60"
+ dpw = "HomePlugAV_station60"
+ mhfid = "HomePlugAV_station60"
+ uhfid = "HomePlugAV_station60"
+ avln.sta_add (
+ sta_mac, False, False, dpw, mhfid, uhfid, 0,
+ force_role = 2)
+
+ self.csi.process_avlns_launch ()
+ self.csi.process_wait_association ()
+ self.csi.process_wait_authentication ()
+ cco = avln.get_cco (self.csi.get_maximus ())
+ self.failUnless (cco_mac == cco.get_mac_addr())
+
+ self.failUnless (send_check_datas (self.csi, avln))
+ cco = avln.get_cco (self.csi.get_maximus ())
+ drv_sta_mac_stop_req = Ether (
+ src = "10:10:10:10:10:10", dst = cco_mac) \
+ / scammer.MME () \
+ / scammer.DRV_STA_MAC_STOP_REQ ()
+
+ send_mme (self.csi.get_maximus (), cco, drv_sta_mac_stop_req)
+ self.csi.process_wait_sec (10)
+ self.failIf (send_check_datas (self.csi, avln))
+
+ drv_sta_mac_start_req = Ether (
+ src = "10:10:10:10:10:10", dst = cco_mac) \
+ / scammer.MME () \
+ / scammer.DRV_STA_MAC_START_REQ ()
+
+ send_mme (self.csi.get_maximus (), cco, drv_sta_mac_start_req)
+ self.csi.process_wait_association ()
+ self.csi.process_wait_authentication ()
+ self.failUnless (cco_mac == cco.get_mac_addr())
+ self.failUnless (send_check_datas (self.csi, avln))
+ self.csi.process_avlns_remove ()
+
+if __name__ == '__main__':
+ suite = unittest.TestLoader().loadTestsFromTestCase(TestForceRole)
+ testResult = unittest.TextTestRunner(verbosity=2).run(suite)
+ sys.exit ((1, 0)[testResult.wasSuccessful ()])
diff --git a/common/lib/scammer/drv.py b/common/lib/scammer/drv.py
index b1ff6f5610..b9879e84bb 100644
--- a/common/lib/scammer/drv.py
+++ b/common/lib/scammer/drv.py
@@ -10,6 +10,19 @@ from commonfields import *
from scammer import MMEPayload
from common import HPAV_RESULT
+class DRV_STA_MAC_START_REQ (MMEPayload):
+ """Handles the DRV_STA_MAC_START.REQ MME."""
+ name = "HomePlug AV DRV_STA_MAC_START.REQ"""
+ fields_desc = [
+ ]
+
+class DRV_STA_MAC_START_CNF (MMEPayload):
+ """Handles the DRV_STA_MAC_START.CNF MME."""
+ name = "HomePlug AV DRV_STA_MAC_START.CNF"""
+ fields_desc = [
+ XByteField ("result", 0),
+ ]
+
class DRV_STA_MAC_STOP_REQ (MMEPayload):
"""Handles the DRV_STA_MAC_STOP.REQ MME."""
name = "HomePlug AV DRV_STA_MAC_STOP.REQ"""
diff --git a/common/tests/tests b/common/tests/tests
index 0124a69a82..16ec51c6ec 100644
--- a/common/tests/tests
+++ b/common/tests/tests
@@ -451,6 +451,7 @@ sc11_cm_nw_info: python py/sc11_cm_nw_info.py -d false -t 25000000000
sc12_change_nmk: python py/sc12_change_nmk.py -d false -t 25000000000
sc14_igmp: python py/sc14_igmp.py -d false -t 25000000000
sc15_hide: python py/sc15_hide.py -d false -t 25000000000
+sc16_force_role: python py/sc16_force_role.py -d false -t 25000000000
cesar/test_general/station/maximus:
make