summaryrefslogtreecommitdiff
path: root/maximus
diff options
context:
space:
mode:
authorburet2008-02-07 17:55:25 +0000
committerburet2008-02-07 17:55:25 +0000
commitc58689bdba3d9db2568c5bbc716a8be990a7eaaa (patch)
tree25f1844ccee4aa93d469dfae2d3c02b975fdc445 /maximus
parent0ca0238fed35ed8e53f21be637e100b5d75a25ca (diff)
Maximus Python:
- Update station initialisation. - Better handle the little-endian/big-endian problematic between ethernet and homeplug with dedicated functions "ntoh"/"hton" and "htohp"/"hptoh". git-svn-id: svn+ssh://pessac/svn/cesar/trunk@1389 017c9cb6-072f-447c-8318-d5b54f68fe89
Diffstat (limited to 'maximus')
-rw-r--r--maximus/python/maximus/ethernet/buffer.py25
-rw-r--r--maximus/python/maximus/ethernet/eth.py98
-rw-r--r--maximus/python/maximus/macframe/fc_10.py19
-rw-r--r--maximus/python/maximus/macframe/fc_av.py47
-rw-r--r--maximus/python/maximus/macframe/macframe.py72
-rw-r--r--maximus/python/maximus/macframe/macframeheader.py9
-rw-r--r--maximus/python/maximus/macframe/mpdu.py49
-rw-r--r--maximus/python/maximus/macframe/msdu.py9
-rw-r--r--maximus/python/maximus/mme/__init__.py3
-rw-r--r--maximus/python/maximus/mme/mme.py4
-rw-r--r--maximus/python/maximus/mme/mmheader.py141
-rw-r--r--maximus/python/maximus/mme/mmtype.py20
-rw-r--r--maximus/python/maximus/station/config.py94
-rw-r--r--maximus/python/maximus/station/sta.py791
-rw-r--r--maximus/python/maximus/utils/__init__.py3
-rw-r--r--maximus/python/maximus/utils/format.py255
-rw-r--r--maximus/python/py/test_ether.py20
-rw-r--r--maximus/python/test/test.txt83
-rw-r--r--maximus/python/test/test_ethernet.py6
-rw-r--r--maximus/python/test/test_fsm.py2
-rw-r--r--maximus/python/test/test_macframe.py2
-rw-r--r--maximus/python/test/test_mme.py8
-rw-r--r--maximus/python/test/test_station.py73
-rw-r--r--maximus/python/test/test_utils.py2
-rw-r--r--maximus/stationtest/Makefile3
-rw-r--r--maximus/stationtest/src/test_station.c47
-rwxr-xr-xmaximus/test/test.sh18
-rwxr-xr-xmaximus/test/test_python.sh69
28 files changed, 1533 insertions, 439 deletions
diff --git a/maximus/python/maximus/ethernet/buffer.py b/maximus/python/maximus/ethernet/buffer.py
index a145634e6c..b4831abedd 100644
--- a/maximus/python/maximus/ethernet/buffer.py
+++ b/maximus/python/maximus/ethernet/buffer.py
@@ -4,21 +4,16 @@
from maximus.macframe.msdu import MSDU_TYPES, MSDU
from maximus.utils.exception import Error, OutOfRangeError
-from struct import pack, unpack
+from maximus.utils.format import *
# Constants to check arguments validity
-MAX_VALUE_OF_UINT32_T = 0xFFFFFFFF
-MAX_VALUE_OF_BUFFER_NB = MAX_VALUE_OF_UINT32_T
-SIZE_OF_UINT32_T = 4 # in octets
-SIZE_OF_BUFFER_ID = SIZE_OF_UINT32_T
-
-# Constant for 'struct.pack()' and 'struct.unpack()' functions
-uint32_t = '=I' # native
+MAX_VALUE_OF_BUFFER_NB = MAX_VALUE_OF_U32
+SIZE_OF_BUFFER_ID = SIZE_OF_U32
class Buffer(MSDU):
"""The Buffer object is composed of the 2 following fields:
- number of buffers to allocate (uint32_t)
- ID of each buffer to allocate (uint32_t)
+ number of buffers to allocate (u32)
+ ID of each buffer to allocate (u32)
"""
# Static attributes of the class
buffer_id = 0
@@ -42,7 +37,7 @@ class Buffer(MSDU):
def set_buffer_nb(self, buffer_nb):
"""This function sets the number of buffers to allocate.
- The number of buffers must be a Python integer (uint32_t).
+ The number of buffers must be a Python integer (u32).
"""
self.__buffer_nb = 0
if type(buffer_nb) is not int or buffer_nb <= 0 or buffer_nb > MAX_VALUE_OF_BUFFER_NB:
@@ -70,15 +65,15 @@ class Buffer(MSDU):
"""
if len(payload) != SIZE_OF_BUFFER_ID:
raise Error('Buffer ID length')
- self.__remove_buffer(unpack(uint32_t, payload)[0])
+ self.__remove_buffer(hptoh32(payload))
def get(self):
"""This function returns the Buffer object into a string.
"""
- buffer = pack(uint32_t, self.get_buffer_nb())
+ buffer = htohp32(self.get_buffer_nb())
for n in range(0, self.get_buffer_nb()):
self.__inc_buffer_id()
- buffer += pack(uint32_t, self.get_buffer_id())
+ buffer += htohp32(self.get_buffer_id())
for i, j in enumerate(MSDU_TYPES):
if self.get_ether_type() == i:
self.__add_buffer(self.get_buffer_id(), j)
@@ -96,7 +91,7 @@ class Buffer(MSDU):
def get_buffer_nb(self):
"""This function gets the number of buffers to allocate.
- The number of buffers is a Python integer (uint32_t).
+ The number of buffers is a Python integer (u32).
"""
return self.__buffer_nb
diff --git a/maximus/python/maximus/ethernet/eth.py b/maximus/python/maximus/ethernet/eth.py
index f55aad84b1..f9bfb4213c 100644
--- a/maximus/python/maximus/ethernet/eth.py
+++ b/maximus/python/maximus/ethernet/eth.py
@@ -3,18 +3,16 @@
#print __name__
from maximus.ethernet.scapy import *
-from maximus.macframe.msdu import MIN_SIZE_OF_MSDU, uint8_t, MSDU_TYPES, MSDU
+from maximus.macframe.msdu import MIN_SIZE_OF_MSDU, MSDU_TYPES, MSDU
from maximus.utils.exception import Error
+from maximus.utils.format import *
from struct import pack, unpack
# Constants to check arguments validity
-SIZE_OF_UINT8_T = 1 # in octets
-SIZE_OF_UINT16_T = 2 # in octets
-SIZE_OF_UINT32_T = 4 # in octets
-SIZE_OF_DST = SIZE_OF_UINT32_T + SIZE_OF_UINT16_T # in octets
+SIZE_OF_DST = SIZE_OF_U48 # in octets
SIZE_OF_SRC = SIZE_OF_DST # in octets
-SIZE_OF_VLANTAG = SIZE_OF_UINT32_T # in octets
-SIZE_OF_TYPE = SIZE_OF_UINT16_T # in octets
+SIZE_OF_VLANTAG = SIZE_OF_U32 # in octets
+SIZE_OF_TYPE = SIZE_OF_U16 # in octets
SIZE_OF_HEADER = SIZE_OF_DST + SIZE_OF_SRC + SIZE_OF_VLANTAG + SIZE_OF_TYPE # in octets
# Constants for 'struct.pack()' and 'struct.unpack()' functions
@@ -39,68 +37,47 @@ class Eth(MSDU, Packet):
raise Error('Ethernet Frame length')
# Set Eth Header: dst, src, vlantag, type
-
- # Swap received Eth Header MAC addresses
- header = ''
- for i in range (0,2):
- header += self.__swap(payload[0:SIZE_OF_UINT32_T])
- payload = payload[SIZE_OF_UINT32_T:]
- header += self.__swap(payload[0:SIZE_OF_UINT16_T])
- payload = payload[SIZE_OF_UINT16_T:]
- header += payload[0:SIZE_OF_UINT32_T]
- payload = payload[SIZE_OF_UINT32_T:]
- header += payload[0:SIZE_OF_UINT16_T]
- payload = payload[SIZE_OF_UINT16_T:]
+ #
# Set dst
- for begin in range (0, SIZE_OF_DST):
- end = begin + 1
- address = unpack(uint8_t, header[begin:end])[0]
- if begin == 0:
- dst = hex(address).strip('0x')
- else:
- dst += ':' + hex(address).strip('0x')
+ begin = 0
+ end = SIZE_OF_DST
+ address = hex(ntoh48(payload[begin:end])).strip('0x').strip('L')
+ dst = ''
+ for i in range (0, len(address), 2):
+ dst += address[i:i+2]
+ if i < len(address)-2:
+ dst += ':'
self.dst = dst
# Set src
- for begin in range (SIZE_OF_DST, SIZE_OF_DST + SIZE_OF_SRC):
- end = begin + 1
- address = unpack(uint8_t, header[begin:end])[0]
- if begin == SIZE_OF_DST:
- src = hex(address).strip('0x')
- else:
- src += ':' + hex(address).strip('0x')
+ begin = end
+ end += SIZE_OF_SRC
+ address = hex(ntoh48(payload[begin:end])).strip('0x').strip('L')
+ src = ''
+ for i in range (0, len(address), 2):
+ src += address[i:i+2]
+ if i < len(address)-2:
+ src += ':'
self.src = src
# Set vlantag
- begin = SIZE_OF_DST + SIZE_OF_SRC
- end = begin + SIZE_OF_VLANTAG
- self.vlantag = unpack(little_uint32_t, header[begin:end])[0]
+ begin = end
+ end += SIZE_OF_VLANTAG
+ self.vlantag = ntoh32(payload[begin:end])
# Set type
- begin = SIZE_OF_DST + SIZE_OF_SRC + SIZE_OF_VLANTAG
- end = SIZE_OF_HEADER
- self.type = unpack(little_uint16_t, header[begin:end])[0]
+ begin = end
+ end += SIZE_OF_TYPE
+ self.type = ntoh16(payload[begin:end])
# Set Eth Payload
- self.payload = payload
+ self.payload = payload[end:]
def get(self):
"""This function returns the Ethernet frame into a string.
"""
- payload = str(self)
-
- # Swap Eth Header to send
- header = ''
- for i in range (0,3):
- header += self.__swap(payload[0:SIZE_OF_UINT32_T])
- payload = payload[SIZE_OF_UINT32_T:]
- header += self.__swap(payload[0:SIZE_OF_UINT16_T])
- payload = payload[SIZE_OF_UINT16_T:]
-
- ethpad = self.pad(MIN_SIZE_OF_MSDU - len(header) - len(payload))
- eth = header + payload + ethpad
- return eth
+ return str(self) + self.pad(MIN_SIZE_OF_MSDU - len(str(self)))
def get_ether_type(self):
"""This function returns the Ethernet Frame object type into a Python integer (uint8_t).
@@ -115,18 +92,3 @@ class Eth(MSDU, Packet):
"""This function returns the Ethernet Frame object type into a Python string.
"""
return MSDU_TYPES[1] # 'ETHERNET_TYPE_DATA'
-
- def __swap(self, string):
- """This function swaps the contents of a string of length 1, 2 or 4 octets.
- It returns the swapped string.
- """
- swap = None
- if len(string) is SIZE_OF_UINT8_T:
- swap = string
- elif len(string) is SIZE_OF_UINT16_T:
- swap = pack(little_uint16_t, unpack(big_uint16_t, string)[0])
- elif len(string) is SIZE_OF_UINT32_T:
- swap = pack(little_uint32_t, unpack(big_uint32_t, string)[0])
- else:
- raise Error("Swap arguments")
- return swap
diff --git a/maximus/python/maximus/macframe/fc_10.py b/maximus/python/maximus/macframe/fc_10.py
index 54c789b2b5..1b60614636 100644
--- a/maximus/python/maximus/macframe/fc_10.py
+++ b/maximus/python/maximus/macframe/fc_10.py
@@ -4,23 +4,20 @@
from maximus.utils.crc import crc8
from maximus.utils.exception import OutOfRangeError
-from struct import unpack
+from maximus.utils.format import *
# Constants to check arguments validity
MAX_VALUE_OF_CC = 0x01
DT_DICTIONARY = {'SOF':0x00, 'SOF_RSP':0x01, 'EOF':0x02, 'EOF_RSP':0x03, 'ACK':0x04, 'NACK':0x05}
MAX_VALUE_OF_DT = 0x07
MAX_VALUE_OF_VF = 0x1FFF
-MAX_VALUE_OF_FCCS = 0xFF
-SIZE_OF_FCCS = 1 # in octets
-
-# Constant for 'struct.unpack()' function
-uint8_t = '=B' # native
+MAX_VALUE_OF_FCCS = MAX_VALUE_OF_U8
+SIZE_OF_FCCS = SIZE_OF_U8 # in octets
# Constants for 'self.get()' function
size_of_dt = 3 # in bits
size_of_vf = 13 # in bits
-size_of_fccs = 8*SIZE_OF_FCCS # in bits
+size_of_fccs = 8 * SIZE_OF_FCCS # in bits
class FC_10:
"""The FC 1.0 is composed of the 4 following fields:
@@ -68,13 +65,13 @@ class FC_10:
def set_fccs(self, fccs):
"""Set the FCCS.
The FCCS can be a Python integer (decimal or hexadecimal value),
- or a Python string of length equals to sizeof(uint8_t).
+ or a Python string of length equals to sizeof(u8).
"""
if fccs != None:
try:
if len(fccs) == SIZE_OF_FCCS:
- # FCCS is a Python string of length equals to sizeof(uint8_t)
- self.__fccs = unpack(uint8_t, fccs)[0]
+ # FCCS is a Python string of length equals to sizeof(u8)
+ self.__fccs = hptoh8(fccs)
else:
raise OutOfRangeError("FCCS")
except TypeError:
@@ -111,7 +108,7 @@ class FC_10:
return self.__fccs
def get(self):
- """This function returns the FC 1.0 into a Python long (uint32_t).
+ """This function returns the FC 1.0 into a Python long (u32).
"""
if self.get_fccs() == None:
# The 8-bit CRC must be calculated
diff --git a/maximus/python/maximus/macframe/fc_av.py b/maximus/python/maximus/macframe/fc_av.py
index f6703bdb87..1535193910 100644
--- a/maximus/python/maximus/macframe/fc_av.py
+++ b/maximus/python/maximus/macframe/fc_av.py
@@ -4,35 +4,25 @@
from maximus.utils.crc import crc32
from maximus.utils.exception import OutOfRangeError
-from struct import unpack
+from maximus.utils.format import *
# Constants to check arguments validity
-
-SIZE_OF_UINT8_T = 1 # in octets
-SIZE_OF_UINT32_T = 4 # in octets
-size_of_uint8_t = 8*SIZE_OF_UINT8_T # in bits
-size_of_uint32_t = 8*SIZE_OF_UINT32_T # in bits
-MAX_VALUE_OF_UINT8_T = 0xFF
-MAX_VALUE_OF_UINT32_T = 0xFFFFFFFF
-
+size_of_u8 = 8 * SIZE_OF_U8 # in bits
+size_of_u32 = 8 * SIZE_OF_U32 # in bits
DT_AV_DICTIONARY = {'Beacon':0x00, 'SOF':0x01, 'SACK':0x02, 'RTS_CTS':0x03, 'Sound':0x04, 'RSOF':0x05}
MAX_VALUE_OF_DT_AV = 0x07
MAX_VALUE_OF_ACCESS = 0x01
MAX_VALUE_OF_SNID = 0x0F
MAX_VALUE_OF_VF_AV = 0xFFFFFFFFFFFFFFFFFFFFFFFF
-SIZE_OF_VF_AV = 3*SIZE_OF_UINT32_T # in octets
+SIZE_OF_VF_AV = 3 * SIZE_OF_U32 # in octets
MAX_VALUE_OF_FCCS_AV = 0xFFFFFF
-SIZE_OF_FCCS_AV = 3*SIZE_OF_UINT8_T # in octets
-
-# Constants for 'struct.unpack()' function
-uint8_t = '=B' # native
-uint32_t = '=I' # native
+SIZE_OF_FCCS_AV = 3 * SIZE_OF_U8 # in octets
# Constants for 'self.get()' function
size_of_dt_av = 3 # in bits
size_of_access = 1 # in bits
-size_of_vf_av = 8*SIZE_OF_VF_AV # in bits
-size_of_fccs_av = 8*SIZE_OF_FCCS_AV # in bits
+size_of_vf_av = 8 * SIZE_OF_VF_AV # in bits
+size_of_fccs_av = 8 * SIZE_OF_FCCS_AV # in bits
class FC_AV:
"""The FC AV is composed of the 5 following fields:
@@ -87,8 +77,8 @@ class FC_AV:
try:
if len(vf_av) == SIZE_OF_VF_AV:
# VF AV is a Python string of length equals to 12 octets
- vf_av = unpack(uint32_t + 2*'I', vf_av)
- self.__vf_av = (vf_av[2] << 2*size_of_uint32_t) + (vf_av[1] << size_of_uint32_t) + vf_av[0]
+ vf_av = unpack(hp + 3 * u32, vf_av)
+ self.__vf_av = (vf_av[2] << 2*size_of_u32) + (vf_av[1] << size_of_u32) + vf_av[0]
else:
raise OutOfRangeError("VF AV")
except TypeError:
@@ -107,8 +97,7 @@ class FC_AV:
try:
if len(fccs_av) == SIZE_OF_FCCS_AV:
# FCCS AV is a Python string of length equals to 3 octets
- fccs_av = unpack(uint8_t + 2*'B', fccs_av)
- self.__fccs_av = (fccs_av[2] << 2*size_of_uint8_t) + (fccs_av[1] << size_of_uint8_t) + fccs_av[0]
+ self.__fccs_av = hptoh24(fccs_av)
else:
raise OutOfRangeError("FCCS AV")
except TypeError:
@@ -151,20 +140,20 @@ class FC_AV:
return self.__fccs_av
def get(self):
- """This function returns the FC AV into a tuple of 4 Python longs (uint32_t).
+ """This function returns the FC AV into a tuple of 4 Python longs (u32).
"""
if self.get_fccs_av() == None:
# The 24-bit CRC must be calculated
fccs_av = crc32(0) # TODO
else:
fccs_av = self.get_fccs_av()
- tuple0 = self.get_snid() << (size_of_access + size_of_dt_av + 3*size_of_uint8_t)
- tuple0 += self.get_access() << (size_of_dt_av + 3*size_of_uint8_t)
- tuple0 += self.get_dt_av() << (3*size_of_uint8_t)
- tuple0 += self.get_vf_av() >> (2*size_of_uint32_t + size_of_uint8_t)
- tuple1 = ((self.get_vf_av() >> (size_of_uint32_t + size_of_uint8_t))) & MAX_VALUE_OF_UINT32_T
- tuple2 = (self.get_vf_av() >> size_of_uint8_t) & MAX_VALUE_OF_UINT32_T
- tuple3 = ((self.get_vf_av() & MAX_VALUE_OF_UINT8_T) << size_of_fccs_av) + fccs_av
+ tuple0 = self.get_snid() << (size_of_access + size_of_dt_av + 3*size_of_u8)
+ tuple0 += self.get_access() << (size_of_dt_av + 3*size_of_u8)
+ tuple0 += self.get_dt_av() << (3*size_of_u8)
+ tuple0 += self.get_vf_av() >> (2*size_of_u32 + size_of_u8)
+ tuple1 = ((self.get_vf_av() >> (size_of_u32 + size_of_u8))) & MAX_VALUE_OF_U32
+ tuple2 = (self.get_vf_av() >> size_of_u8) & MAX_VALUE_OF_U32
+ tuple3 = ((self.get_vf_av() & MAX_VALUE_OF_U8) << size_of_fccs_av) + fccs_av
tuple = tuple0, tuple1, tuple2, tuple3
return tuple
diff --git a/maximus/python/maximus/macframe/macframe.py b/maximus/python/maximus/macframe/macframe.py
index 288780533e..fef26e8060 100644
--- a/maximus/python/maximus/macframe/macframe.py
+++ b/maximus/python/maximus/macframe/macframe.py
@@ -8,34 +8,22 @@ from maximus.macframe.mpdu import SIZE_OF_IV, SIZE_OF_NEK, SIZE_OF_PB136, SIZE_O
from maximus.macframe.msdu import MAX_SIZE_OF_MSDU
from maximus.utils.crc import crc32
from maximus.utils.exception import Error, OutOfRangeError
-from struct import pack
+from maximus.utils.format import *
# Constants to check arguments validity
-
-SIZE_OF_UINT16_T = 2 # in octets
-SIZE_OF_UINT32_T = 4 # in octets
-MAX_VALUE_OF_UINT16_T = 0xFFFF
-MAX_VALUE_OF_UINT32_T = 0xFFFFFFFF
-
-SIZE_OF_FC_10 = SIZE_OF_UINT32_T # in octets
-SIZE_OF_FC_AV = 4*SIZE_OF_UINT32_T # in octets
-SIZE_OF_MACFRAMEHEADER = SIZE_OF_UINT16_T # in octets
-SIZE_OF_ATS_CONFOUNDER = SIZE_OF_UINT32_T # in octets
-SIZE_OF_ICV = SIZE_OF_UINT32_T # in octets
-
-SIZE_OF_IV = 3*SIZE_OF_UINT32_T # in octets
-SIZE_OF_NEK = 4*SIZE_OF_UINT32_T # in octets
-MAX_VALUE_OF_FC_10 = MAX_VALUE_OF_UINT32_T
-MAX_VALUE_OF_MACFRAMEHEADER = MAX_VALUE_OF_UINT16_T
-MAX_VALUE_OF_ATS_CONFOUNDER = MAX_VALUE_OF_UINT32_T
-MAX_VALUE_OF_ICV = MAX_VALUE_OF_UINT32_T
-
+SIZE_OF_FC_10 = SIZE_OF_U32 # in octets
+SIZE_OF_FC_AV = 4 * SIZE_OF_U32 # in octets
+SIZE_OF_MACFRAMEHEADER = SIZE_OF_U16 # in octets
+SIZE_OF_ATS_CONFOUNDER = SIZE_OF_U32 # in octets
+SIZE_OF_ICV = SIZE_OF_U32 # in octets
+SIZE_OF_IV = 3 * SIZE_OF_U32 # in octets
+SIZE_OF_NEK = 4 * SIZE_OF_U32 # in octets
+MAX_VALUE_OF_FC_10 = MAX_VALUE_OF_U32
+MAX_VALUE_OF_MACFRAMEHEADER = MAX_VALUE_OF_U16
+MAX_VALUE_OF_ATS_CONFOUNDER = MAX_VALUE_OF_U32
+MAX_VALUE_OF_ICV = MAX_VALUE_OF_U32
MAX_SIZE_OF_MACFRAME = MAX_SIZE_OF_MSDU + SIZE_OF_MACFRAMEHEADER + SIZE_OF_ATS_CONFOUNDER + SIZE_OF_ICV # 1528 octets
-# Constants for 'struct.pack()' function
-uint16_t = '=H' # native
-uint32_t = '=I' # native
-
class MACFrame(MPDU):
"""The MAC Frame is composed of the 4 following fields:
MACFrameHeader (2-octet)
@@ -69,23 +57,23 @@ class MACFrame(MPDU):
"""Set the MAC Frame Header.
The MAC Frame Header can be a MAC Frame Header object,
a Python integer (decimal or hexadecimal value),
- or a Python string of length equals to sizeof(uint16_t).
+ or a Python string of length equals to sizeof(u16).
"""
if macframeheader is not None:
try:
# MAC Frame Header is a MAC Frame Header object
- self.__macframeheader = macframeheader.get() # set MAC Frame Header into a Python string of length equals to sizeof(uint16_t)
+ self.__macframeheader = macframeheader.get() # set MAC Frame Header into a Python string of length equals to sizeof(u16)
except AttributeError:
try:
if len(macframeheader) == SIZE_OF_MACFRAMEHEADER:
- # MAC Frame Header is a Python string of length equals to sizeof(uint16_t)
+ # MAC Frame Header is a Python string of length equals to sizeof(u16)
self.__macframeheader = macframeheader
else:
raise OutOfRangeError("MAC Frame Header")
except TypeError:
# MAC Frame Header is a Python integer
if macframeheader <= MAX_VALUE_OF_MACFRAMEHEADER and macframeheader >= 0:
- self.__macframeheader = pack(uint16_t, macframeheader) # set MAC Frame Header into a Python string of length equals to sizeof(uint16_t)
+ self.__macframeheader = htohp16(macframeheader) # set MAC Frame Header into a Python string of length equals to sizeof(u16)
else:
raise OutOfRangeError("MAC Frame Header")
else:
@@ -94,19 +82,19 @@ class MACFrame(MPDU):
def set_ats(self, ats):
"""Set the ATS.
The ATS can be a Python long (decimal or hexadecimal value),
- or a Python string of length equals to sizeof(uint32_t).
+ or a Python string of length equals to sizeof(u32).
"""
if ats != None:
try:
if len(ats) == SIZE_OF_ATS_CONFOUNDER:
- # ATS is a Python string of length equals to sizeof(uint32_t)
+ # ATS is a Python string of length equals to sizeof(u32)
self.__ats = ats
else:
raise OutOfRangeError("ATS")
except TypeError:
# ATS is a Python long
if ats <= MAX_VALUE_OF_ATS_CONFOUNDER and ats >= 0:
- self.__ats = pack(uint32_t, ats) # set ATS into a Python string of length equals to sizeof(uint32_t)
+ self.__ats = htohp32(ats) # set ATS into a Python string of length equals to sizeof(u32)
else:
raise OutOfRangeError("ATS")
else:
@@ -115,19 +103,19 @@ class MACFrame(MPDU):
def set_confounder(self, confounder):
"""Set the Confounder.
The Confounder can be a Python long (decimal or hexadecimal value),
- or a Python string of length equals to sizeof(uint32_t).
+ or a Python string of length equals to sizeof(u32).
"""
if confounder != None:
try:
if len(confounder) == SIZE_OF_ATS_CONFOUNDER:
- # Confounder is a Python string of length equals to sizeof(uint32_t)
+ # Confounder is a Python string of length equals to sizeof(u32)
self.__confounder = confounder
else:
raise OutOfRangeError("Confounder")
except TypeError:
# Confounder is a Python long
if confounder <= MAX_VALUE_OF_ATS_CONFOUNDER and confounder >= 0:
- self.__confounder = pack(uint32_t, confounder) # set Confounder into a Python string of length equals to sizeof(uint32_t)
+ self.__confounder = htohp32(confounder) # set Confounder into a Python string of length equals to sizeof(u32)
else:
raise OutOfRangeError("Confounder")
else:
@@ -136,19 +124,19 @@ class MACFrame(MPDU):
def set_icv(self, icv):
"""Set the ICV.
The ICV can be a Python long (decimal or hexadecimal value),
- or a Python string of length equals to sizeof(uint32_t).
+ or a Python string of length equals to sizeof(u32).
"""
if icv != None:
try:
if len(icv) == SIZE_OF_ICV:
- # ICV is a Python string of length equals to sizeof(uint32_t)
+ # ICV is a Python string of length equals to sizeof(u32)
self.__icv = icv
else:
raise OutOfRangeError("ICV")
except TypeError:
# ICV is a Python long
if icv <= MAX_VALUE_OF_ICV and icv >= 0:
- self.__icv = pack(uint32_t, icv) # set ICV into a Python string of length equals to sizeof(uint32_t)
+ self.__icv = htohp32(icv) # set ICV into a Python string of length equals to sizeof(u32)
else:
raise OutOfRangeError("ICV")
else:
@@ -174,25 +162,25 @@ class MACFrame(MPDU):
def get_macframeheader(self):
"""Get the MAC Frame Header.
- The MAC Frame Header is a Python string of length equals to sizeof(uint16_t).
+ The MAC Frame Header is a Python string of length equals to sizeof(u16).
"""
return self.__macframeheader
def get_ats(self):
"""Get the ATS.
- The ATS is Python string of length equals to sizeof(uint32_t).
+ The ATS is Python string of length equals to sizeof(u32).
"""
return self.__ats
def get_confounder(self):
"""Get the Confounder.
- The Confounder is a Python string of length equals to sizeof(uint32_t).
+ The Confounder is a Python string of length equals to sizeof(u32).
"""
return self.__confounder
def get_icv(self):
"""Get the ICV.
- The ICV is a Python string of length equals to sizeof(uint32_t).
+ The ICV is a Python string of length equals to sizeof(u32).
"""
return self.__icv
@@ -229,7 +217,7 @@ class MACFrame(MPDU):
# Set ICV
if self.__icv == None:
# The 32-bit CRC must be calculated
- self.__icv = pack(uint32_t, crc32(self.__msdu)) # TODO
+ self.__icv = htohp32(crc32(self.__msdu)) # TODO
# Set the MFT and the MFL of the MAC Frame Header
# "MAC Frame Length (MFL) is a 14-bit field that specifies the MAC Frame length in octets,
diff --git a/maximus/python/maximus/macframe/macframeheader.py b/maximus/python/maximus/macframe/macframeheader.py
index c893bddae1..e0ce5b1014 100644
--- a/maximus/python/maximus/macframe/macframeheader.py
+++ b/maximus/python/maximus/macframe/macframeheader.py
@@ -3,15 +3,12 @@
#print __name__
from maximus.utils.exception import OutOfRangeError, MACFrameSendError
-from struct import pack
+from maximus.utils.format import *
# Constants to check arguments validity
MAX_VALUE_OF_MFT = 0x03 # MFT is coded on 2 bits (max value is 0b11)
MAX_VALUE_OF_MFL = 0x3FFF # MFL is coded on 14 bits
-# Constant for 'struct.pack()' function
-uint16_t = '=H' # native
-
# Constant for 'self.get()' function
size_of_mft = 2 # in bits
@@ -70,10 +67,10 @@ class MACFrameHeader:
def get(self):
"""This function returns the MAC Frame Header into a string
- of length equals to sizeof(uint16_t).
+ of length equals to sizeof(u16).
"""
header = (self.get_mfl() << size_of_mft) + self.get_mft()
- return pack(uint16_t, header)
+ return htohp16(header)
def display(self):
"""This function displays the MAC Frame Header in hexadecimal values.
diff --git a/maximus/python/maximus/macframe/mpdu.py b/maximus/python/maximus/macframe/mpdu.py
index 5b663e2816..0377364ccb 100644
--- a/maximus/python/maximus/macframe/mpdu.py
+++ b/maximus/python/maximus/macframe/mpdu.py
@@ -4,26 +4,21 @@
from maximus.macframe.fc_10 import FC_10
from maximus.macframe.fc_av import FC_AV
+from maximus.utils.format import *
from interface import *
-from struct import unpack
# Constants to check arguments validity
-SIZE_OF_UINT32_T = 4 # in octets
-MAX_VALUE_OF_UINT32_T = 0xFFFFFFFF
-SIZE_OF_FC_10 = SIZE_OF_UINT32_T # in octets
-MAX_VALUE_OF_FC_10 = MAX_VALUE_OF_UINT32_T
-SIZE_OF_FC_AV = 4*SIZE_OF_UINT32_T # in octets
-SIZE_OF_IV = 3*SIZE_OF_UINT32_T # in octets
-SIZE_OF_NEK = 4*SIZE_OF_UINT32_T # in octets
+SIZE_OF_FC_10 = SIZE_OF_U32 # in octets
+MAX_VALUE_OF_FC_10 = MAX_VALUE_OF_U32
+SIZE_OF_FC_AV = 4 * SIZE_OF_U32 # in octets
+SIZE_OF_IV = 3 * SIZE_OF_U32 # in octets
+SIZE_OF_NEK = 4 * SIZE_OF_U32 # in octets
SIZE_OF_PB136 = 128 # in octets
SIZE_OF_PB520 = 512 # in octets
MAX_PB_NB_PER_MPDU = 236
-# Constant for 'struct.unpack()' function
-uint32_t = '=I' # native
-
# MPDU object type
MPDU_TYPES = ['PHY_TYPE_NONE',\
'PHY_TYPE_MACFrame',\
@@ -41,17 +36,17 @@ class MPDU:
"""Set the FC 1.0 (FC 1.0 is optional).
The FC 1.0 can be an FC 1.0 object,
a Python long (decimal or hexadecimal value),
- or a Python string of length equals to sizeof(uint32_t).
+ or a Python string of length equals to sizeof(u32).
"""
if fc_10 != None:
try:
# FC 1.0 is an FC 1.0 object
- self.fc_10 = fc_10.get() # set FC 1.0 into a Python long (uint32_t)
+ self.fc_10 = fc_10.get() # set FC 1.0 into a Python long (u32)
except AttributeError:
try:
if len(fc_10) == SIZE_OF_FC_10:
- # FC 1.0 is a Python string of length equals to sizeof(uint32_t)
- self.fc_10 = unpack(uint32_t, fc_10) # set FC 1.0 into a Python long (uint32_t)
+ # FC 1.0 is a Python string of length equals to sizeof(u32)
+ self.fc_10 = hptoh32(fc_10) # set FC 1.0 into a Python long (u32)
else:
raise OutOfRangeError("FC 1.0")
except TypeError:
@@ -66,22 +61,22 @@ class MPDU:
def set_fc_av(self, fc_av):
"""Set the FC AV.
The FC AV can be an FC AV object,
- a Python string of length equals to 4*sizeof(uint32_t),
- or a tuple of 4 Python longs (uint32_t).
+ a Python string of length equals to 4 * sizeof(u32),
+ or a tuple of 4 Python longs (u32).
"""
try:
# FC AV is an FC AV object
- self.fc_av = fc_av.get() # set FC AV into a tuple of 4 Python longs (uint32_t)
+ self.fc_av = fc_av.get() # set FC AV into a tuple of 4 Python longs (u32)
except AttributeError:
if type(fc_av) is str:
if len(fc_av) == SIZE_OF_FC_AV:
- # FC AV is a Python string of length equals to 4*sizeof(uint32_t)
- self.fc_av = unpack(uint32_t + 3*'I', fc_av) # set FC AV into a tuple of 4 Python longs (uint32_t)
+ # FC AV is a Python string of length equals to 4 * sizeof(32)
+ self.fc_av = unpack(hp + 4 * u32, fc_av) # set FC AV into a tuple of 4 Python longs (u32)
else:
raise OutOfRangeError("FC AV")
elif type(fc_av) is tuple:
if len(fc_av) == 4:
- # FC AV is a tuple of 4 Python longs (uint32_t)
+ # FC AV is a tuple of 4 Python longs (u32)
self.fc_av = fc_av
else:
raise OutOfRangeError("FC AV")
@@ -90,7 +85,7 @@ class MPDU:
def set_iv(self, iv):
"""Set the IV.
- IV must be a Python string of length equals to 3*sizeof(uint32_t).
+ IV must be a Python string of length equals to 3 * sizeof(u32).
"""
if len(iv) == SIZE_OF_IV:
self.iv = iv
@@ -99,7 +94,7 @@ class MPDU:
def set_nek(self, nek):
"""Set the NEK.
- NEK must be a Python string of length equals to 4*sizeof(uint32_t).
+ NEK must be a Python string of length equals to 4 * sizeof(u32).
"""
if len(nek) == SIZE_OF_NEK:
self.nek = nek
@@ -108,25 +103,25 @@ class MPDU:
def get_fc_10(self):
"""Get the FC 1.0.
- The FC 1.0 is a Python long (uint32_t).
+ The FC 1.0 is a Python long (u32).
"""
return self.fc_10
def get_fc_av(self):
"""Get the FC AV.
- The FC AV is a tuple of 4 Python longs (uint32_t).
+ The FC AV is a tuple of 4 Python longs (u32).
"""
return self.fc_av
def get_iv(self):
"""Get the IV.
- IV is a string of length equals to 3*sizeof(uint32_t).
+ IV is a string of length equals to 3 * sizeof(u32).
"""
return self.iv
def get_nek(self):
"""Get the NEK.
- NEK is a string of length equals to 4*sizeof(uint32_t).
+ NEK is a string of length equals to 4 * sizeof(u32).
"""
return self.nek
diff --git a/maximus/python/maximus/macframe/msdu.py b/maximus/python/maximus/macframe/msdu.py
index 2c84369bbf..4861bec604 100644
--- a/maximus/python/maximus/macframe/msdu.py
+++ b/maximus/python/maximus/macframe/msdu.py
@@ -2,15 +2,12 @@
#print __name__
-from struct import pack
+from maximus.utils.format import *
# Constants to check arguments validity
MIN_SIZE_OF_MSDU = 60 # in octets
MAX_SIZE_OF_MSDU = 1518 # in octets
-# Constant for 'struct.pack()' function
-uint8_t = '=B' # native
-
# MSDU object type
MSDU_TYPES = ['ETHERNET_TYPE_NONE',\
'ETHERNET_TYPE_DATA',\
@@ -51,7 +48,7 @@ class MSDU:
"""
def get_ether_type(self):
- """This function returns the MSDU object type into a Python integer (uint8_t).
+ """This function returns the MSDU object type into a Python integer (u8).
"""
return 0
@@ -65,5 +62,5 @@ class MSDU:
"""
pad = ''
for i in range (0, length):
- pad += pack(uint8_t, 0)
+ pad += htohp8(0)
return pad
diff --git a/maximus/python/maximus/mme/__init__.py b/maximus/python/maximus/mme/__init__.py
index 0179afe199..a1178d6a80 100644
--- a/maximus/python/maximus/mme/__init__.py
+++ b/maximus/python/maximus/mme/__init__.py
@@ -2,8 +2,9 @@
#print __name__
-#__all__ = ["create", "mme", "mmentry", "mmheader"]
+#__all__ = ["create", "mme", "mmentry", "mmheader", "mmtype"]
from mme import MME
from mmentry import MMEntry
from mmheader import MMHeader
+from mmtype import *
diff --git a/maximus/python/maximus/mme/mme.py b/maximus/python/maximus/mme/mme.py
index b3b4515c99..cc1b8a2a4c 100644
--- a/maximus/python/maximus/mme/mme.py
+++ b/maximus/python/maximus/mme/mme.py
@@ -20,13 +20,11 @@ class MME(MSDU):
MMHeader (23-octet)
MMEntry
"""
- def __init__(self, name=None, type="REQ", MMHeader=MMHeader(), MMEntry=MMEntry()):
+ def __init__(self, MMHeader=MMHeader(), MMEntry=MMEntry()):
# Create an MME structure for Maximus
self.__mme = MMEStruct()
- self.__name = name
- self.__type = type
self.set_mmheader(MMHeader)
self.set_mmentry(MMEntry)
diff --git a/maximus/python/maximus/mme/mmheader.py b/maximus/python/maximus/mme/mmheader.py
index b55208054d..15d185c434 100644
--- a/maximus/python/maximus/mme/mmheader.py
+++ b/maximus/python/maximus/mme/mmheader.py
@@ -3,41 +3,24 @@
#print __name__
from maximus.utils.exception import OutOfRangeError
-from struct import pack, unpack
+from maximus.utils.format import *
# Constants to check arguments validity
-
SIZE_OF_MMHEADER = 23 # in octets
-
-SIZE_OF_UINT8_T = 1 # in octets
-MAX_VALUE_OF_UINT8_T = 0xFF
-SIZE_OF_UINT16_T = 2 # in octets
-MAX_VALUE_OF_UINT16_T = 0xFFFF
-SIZE_OF_UINT32_T = 4 # in octets
-MAX_VALUE_OF_UINT32_T = 0xFFFFFFFF
-
-SIZE_OF_ODA = 6 # in octets
-MAX_VALUE_OF_ODA = 0xFFFFFFFFFFFF
+SIZE_OF_ODA = SIZE_OF_U48
+MAX_VALUE_OF_ODA = MAX_VALUE_OF_U48
SIZE_OF_OSA = SIZE_OF_ODA
MAX_VALUE_OF_OSA = MAX_VALUE_OF_ODA
-SIZE_OF_VLANTAG = SIZE_OF_UINT32_T
-MAX_VALUE_OF_VLANTAG = MAX_VALUE_OF_UINT32_T
-SIZE_OF_MTYPE = SIZE_OF_UINT16_T
-MAX_VALUE_OF_MTYPE = MAX_VALUE_OF_UINT16_T
-SIZE_OF_MMV = SIZE_OF_UINT8_T
-MAX_VALUE_OF_MMV = MAX_VALUE_OF_UINT8_T
-SIZE_OF_MMTYPE = SIZE_OF_UINT16_T
-MAX_VALUE_OF_MMTYPE = MAX_VALUE_OF_UINT16_T
-SIZE_OF_FMI = SIZE_OF_UINT16_T
-MAX_VALUE_OF_FMI = MAX_VALUE_OF_UINT16_T
-
-# Constants for 'struct.pack()' and 'struct.unpack()' functions
-uint8_t = '=B' # native
-uint16_t = '=H' # native
-uint32_t = '=I' # native
-uint64_t = '=Q' # native
-u8 = '!B' # network (= big-endian)
-u16 = '!H' # network (= big-endian)
+SIZE_OF_VLANTAG = SIZE_OF_U32
+MAX_VALUE_OF_VLANTAG = MAX_VALUE_OF_U32
+SIZE_OF_MTYPE = SIZE_OF_U16
+MAX_VALUE_OF_MTYPE = MAX_VALUE_OF_U16
+SIZE_OF_MMV = SIZE_OF_U8
+MAX_VALUE_OF_MMV = MAX_VALUE_OF_U8
+SIZE_OF_MMTYPE = SIZE_OF_U16
+MAX_VALUE_OF_MMTYPE = MAX_VALUE_OF_U16
+SIZE_OF_FMI = SIZE_OF_U16
+MAX_VALUE_OF_FMI = MAX_VALUE_OF_U16
# Define an empty MM Header structure for Maximus
class MMHeaderStruct:
@@ -54,7 +37,7 @@ class MMHeaderStruct:
class MMHeader:
- def __init__(self, ODA=MAX_VALUE_OF_ODA, OSA=0, VLANTag=0, MTYPE=0xe188, MMV=0, MMTYPE=0, FMI=0):
+ def __init__(self, ODA=MAX_VALUE_OF_ODA, OSA=0, VLANTag=0, MTYPE=0x88e1, MMV=0, MMTYPE=0, FMI=0):
# Create an MME structure for Maximus
self.__mmheader = MMHeaderStruct()
@@ -69,43 +52,73 @@ class MMHeader:
def set_oda(self, oda):
"""Set the ODA.
- The ODA can be a Python long (decimal or hexadecimal value),
- or a Python string of length equals to 6 octets.
+ The ODA can be a Python tuple of 6 Python integers,
+ a Python long (decimal or hexadecimal value),
+ a Python string of length equals to 6 octets,
+ or a Python string of length equals to 17 octets.
"""
self.__oda = oda
- try:
- if len(oda) == SIZE_OF_ODA:
+ if type(oda) is str:
+ if len(oda) == 3 * SIZE_OF_ODA - 1:
+ # ODA is a Python string of length equals to 17 octets ('XX:XX:XX:XX:XX:XX')
+ if oda.count(':') == SIZE_OF_ODA - 1:
+ self.__mmheader.oda = pack(n + u64, int(oda.replace(':',''), 16))[SIZE_OF_U16:]
+ else:
+ raise OutOfRangeError("ODA")
+ elif len(oda) == SIZE_OF_ODA:
# ODA is a Python string of length equals to 6 octets
self.__mmheader.oda = oda
else:
raise OutOfRangeError("ODA")
- except TypeError:
+ elif type(oda) is long or type(oda) is int:
# ODA is a Python long
if oda <= MAX_VALUE_OF_ODA and oda >= 0:
- s = pack(uint64_t, oda)
- self.__mmheader.oda = s[SIZE_OF_UINT16_T:SIZE_OF_UINT16_T + SIZE_OF_UINT32_T] + s[:SIZE_OF_UINT16_T]
+ self.__mmheader.oda = hton48(oda)
+ else:
+ raise OutOfRangeError("ODA")
+ elif type(oda) is tuple:
+ # ODA is a Python tuple
+ if len(oda) == SIZE_OF_ODA:
+ self.__mmheader.oda = hton48_tuple(oda)
else:
raise OutOfRangeError("ODA")
+ else:
+ raise TypeError("ODA")
def set_osa(self, osa):
"""Set the OSA.
- The OSA can be a Python long (decimal or hexadecimal value),
- or a Python string of length equals to 6 octets.
+ The OSA can be a Python tuple of 6 Python integers,
+ a Python long (decimal or hexadecimal value),
+ a Python string of length equals to 6 octets,
+ or a Python string of length equals to 17 octets.
"""
self.__osa = osa
- try:
- if len(osa) == SIZE_OF_OSA:
+ if type(osa) is str:
+ if len(osa) == 3 * SIZE_OF_OSA - 1:
+ # OSA is a Python string of length equals to 17 octets ('XX:XX:XX:XX:XX:XX')
+ if osa.count(':') == SIZE_OF_OSA - 1:
+ self.__mmheader.osa = pack(n + u64, int(osa.replace(':',''), 16))[SIZE_OF_U16:]
+ else:
+ raise OutOfRangeError("OSA")
+ elif len(osa) == SIZE_OF_OSA:
# OSA is a Python string of length equals to 6 octets
self.__mmheader.osa = osa
else:
raise OutOfRangeError("OSA")
- except TypeError:
+ elif type(osa) is long or type(osa) is int:
# OSA is a Python long
if osa <= MAX_VALUE_OF_OSA and osa >= 0:
- s = pack(uint64_t, osa)
- self.__mmheader.osa = s[SIZE_OF_UINT16_T:SIZE_OF_UINT16_T + SIZE_OF_UINT32_T] + s[:SIZE_OF_UINT16_T]
+ self.__mmheader.osa = hton48(osa)
+ else:
+ raise OutOfRangeError("OSA")
+ elif type(osa) is tuple:
+ # OSA is a Python tuple
+ if len(osa) == SIZE_OF_OSA:
+ self.__mmheader.osa = hton48_tuple(osa)
else:
raise OutOfRangeError("OSA")
+ else:
+ raise TypeError("OSA")
def set_vlantag(self, vlantag):
"""Set the VLAN Tag.
@@ -122,7 +135,7 @@ class MMHeader:
except TypeError:
# VLAN Tag is a Python integer
if vlantag <= MAX_VALUE_OF_VLANTAG and vlantag >= 0:
- self.__mmheader.vlantag = pack(uint32_t, vlantag)
+ self.__mmheader.vlantag = hton32(vlantag)
else:
raise OutOfRangeError("VLAN Tag")
@@ -141,7 +154,7 @@ class MMHeader:
except TypeError:
# MTYPE is a Python integer
if mtype <= MAX_VALUE_OF_MTYPE and mtype >= 0:
- self.__mmheader.mtype = pack(uint16_t, mtype)
+ self.__mmheader.mtype = hton16(mtype)
else:
raise OutOfRangeError("MTYPE")
@@ -160,7 +173,7 @@ class MMHeader:
except TypeError:
# MMV is a Python integer
if mmv <= MAX_VALUE_OF_MMV and mmv >= 0:
- self.__mmheader.mmv = pack(u8, mmv)
+ self.__mmheader.mmv = htohp8(mmv)
else:
raise OutOfRangeError("MMV")
@@ -180,7 +193,7 @@ class MMHeader:
except TypeError:
# MMTYPE is a Python integer
if mmtype <= MAX_VALUE_OF_MMTYPE and mmtype >= 0:
- self.__mmheader.mmtype = pack(u16, mmtype)
+ self.__mmheader.mmtype = htohp16(mmtype)
else:
raise OutOfRangeError("MMTYPE")
@@ -199,21 +212,25 @@ class MMHeader:
except TypeError:
# FMI is a Python integer
if fmi <= MAX_VALUE_OF_FMI and fmi >= 0:
- self.__mmheader.fmi = pack(u16, fmi)
+ self.__mmheader.fmi = htohp16(fmi)
else:
raise OutOfRangeError("FMI")
def get_oda(self):
"""Get the ODA.
- The ODA can be a Python long,
- or a Python string of length equals to 6 octets.
+ The ODA can be a Python tuple of 6 Python integers,
+ a Python long (decimal or hexadecimal value),
+ a Python string of length equals to 6 octets,
+ or a Python string of length equals to 17 octets.
"""
return self.__oda
def get_osa(self):
"""Get the OSA.
- The OSA can be a Python long,
- or a Python string of length equals to 6 octets.
+ The OSA can be a Python tuple of 6 Python integers,
+ a Python long (decimal or hexadecimal value),
+ a Python string of length equals to 6 octets,
+ or a Python string of length equals to 17 octets.
"""
return self.__osa
@@ -262,12 +279,10 @@ class MMHeader:
"""This function displays the MM Header in hexadecimal values.
"""
print "display the MM Header"
- oda = unpack('=IH', self.__mmheader.oda)
- print "ODA =", hex(oda[0])[:len('0x') + 2*SIZE_OF_UINT32_T] + hex(oda[1])[len('0x'):]
- osa = unpack('=IH', self.__mmheader.osa)
- print "OSA =", hex(osa[0])[:len('0x') + 2*SIZE_OF_UINT32_T] + hex(osa[1])[len('0x'):]
- print "VLAN Tag =", hex(unpack(uint32_t, self.__mmheader.vlantag)[0])
- print "MTYPE =", hex(unpack(uint16_t, self.__mmheader.mtype)[0])
- print "MMV =", hex(unpack(u8, self.__mmheader.mmv)[0])
- print "MMTYPE =", hex(unpack(u16, self.__mmheader.mmtype)[0])
- print "FMI =", hex(unpack(u16, self.__mmheader.fmi)[0])
+ print "ODA =", hex(ntoh48(self.__mmheader.oda))
+ print "OSA =", hex(ntoh48(self.__mmheader.oda))
+ print "VLAN Tag =", hex(ntoh16(self.__mmheader.vlantag))
+ print "MTYPE =", hex(ntoh16(self.__mmheader.mtype))
+ print "MMV =", hex(hptoh8(self.__mmheader.mmv))
+ print "MMTYPE =", hex(hptoh16(self.__mmheader.mmtype))
+ print "FMI =", hex(hptoh16(self.__mmheader.fmi))
diff --git a/maximus/python/maximus/mme/mmtype.py b/maximus/python/maximus/mme/mmtype.py
new file mode 100644
index 0000000000..7571513d51
--- /dev/null
+++ b/maximus/python/maximus/mme/mmtype.py
@@ -0,0 +1,20 @@
+#! usr/bin/env python
+
+#print __name__
+
+# MME name constants to compute MMTYPE
+DR_STA_SET_MAC_ADDR = 0xA000
+DR_STA_SET_CCO_PREF = 0xA004
+DR_STA_SET_WAS_CCO = 0xA008
+DR_STA_SET_NPW = 0xA00C
+DR_STA_SET_DPW = 0xA010
+DR_STA_SET_SL = 0xA014
+DR_STA_SET_M_STA_HFID = 0xA018
+DR_STA_SET_U_STA_HFID = 0xA01C
+DR_STA_SET_AVLN_HFID = 0xA020
+DR_STA_SET_TONEMASK = 0xA024
+DR_STA_START = 0xA028
+DR_STA_STOP = 0xA02C
+
+# MME types dictionnary to compute MMTYPE
+MME_TYPES = {'REQ':0x00, 'CNF':0x01, 'IND':0x02, 'RSP':0x03}
diff --git a/maximus/python/maximus/station/config.py b/maximus/python/maximus/station/config.py
index 822f3dc8cc..1a1b0cf7b0 100644
--- a/maximus/python/maximus/station/config.py
+++ b/maximus/python/maximus/station/config.py
@@ -2,21 +2,89 @@
#print __name__
+# Default values of station parameters
+DEFAULT_MAC_ADDRESS = 0x123456789ABC
+DEFAULT_CCO_PREFERENCE = False
+DEFAULT_WAS_CCO = False
+DEFAULT_NPW = "HomePlugAV0123"
+DEFAULT_DPW = "DPW_"
+DEFAULT_M_STA_HFID = "M_STA_HFID_"
+DEFAULT_U_STA_HFID = "U_STA_HFID_"
+DEFAULT_AVLN_HFID = "AVLN_HomePlugAV0123"
+DEFAULT_SL = 0
+DEFAULT_TONEMASK = 85,139,167,214,225,282,302,409,419,569,591,736,748,856,882,1015,1027,1143,1535
+
class Config:
- """The Config structure is composed of the 4 following fields:
+ """The Config structure is composed of the 10 following fields:
mac_address - a Python tuple of 6 Python integers
cco_preference - a Python boolean
was_cco - a Python boolean
- nid - a Python long
- snid - a Python integer
- npw - a Python string
- sl - a Python integer
+ npw - a Python string of length from 8 to 64 octets
+ dpw - a Python string of length from 0 to 64 octets
+ m_sta_hfid - a Python string of length from 0 to 64 octets
+ u_sta_hfid - a Python string of length from 0 to 64 octets
+ avln_hfid - a Python string of length from 0 to 64 octets
+ sl - a Python integer from 0 to 2
+ tonemask - a Python tuple of Python integers
"""
- def __init__(self, mac_address=None, cco_preference=None, was_cco=None, nid=None, snid=None, npw=None, sl=None):
- self.mac_address = mac_address
- self.cco_preference = cco_preference
- self.was_cco = was_cco
- self.nid = nid
- self.snid = snid
- self.npw = npw
- self.sl = sl
+ def __init__(self, default_config=True, mac_address=None, cco_preference=None, was_cco=None, npw=None, dpw=None, m_sta_hfid=None, u_sta_hfid=None, avln_hfid=None, sl=None, tonemask=None):
+
+ # MAC address
+ if mac_address is None and default_config is True:
+ self.mac_address = DEFAULT_MAC_ADDRESS
+ else:
+ self.mac_address = mac_address
+
+ # CCo preference
+ if cco_preference is None and default_config is True:
+ self.cco_preference = DEFAULT_CCO_PREFERENCE
+ else:
+ self.cco_preference = cco_preference
+
+ # Was CCo
+ if was_cco is None and default_config is True:
+ self.was_cco = DEFAULT_WAS_CCO
+ else:
+ self.was_cco = was_cco
+
+ # Network password
+ if npw is None and default_config is True:
+ self.npw = DEFAULT_NPW
+ else:
+ self.npw = npw
+
+ # Device password
+ if dpw is None and default_config is True:
+ self.dpw = DEFAULT_DPW
+ else:
+ self.dpw = dpw
+
+ # Manufacturer STA HFID
+ if m_sta_hfid is None and default_config is True:
+ self.m_sta_hfid = DEFAULT_M_STA_HFID
+ else:
+ self.m_sta_hfid = m_sta_hfid
+
+ # User STA HFID
+ if u_sta_hfid is None and default_config is True:
+ self.u_sta_hfid = DEFAULT_U_STA_HFID
+ else:
+ self.u_sta_hfid = u_sta_hfid
+
+ # AVLN HFID
+ if avln_hfid is None and default_config is True:
+ self.avln_hfid = DEFAULT_AVLN_HFID
+ else:
+ self.avln_hfid = avln_hfid
+
+ # Security level
+ if sl is None and default_config is True:
+ self.sl = DEFAULT_SL
+ else:
+ self.sl = sl
+
+ # Tonemask
+ if tonemask is None and default_config is True:
+ self.tonemask = DEFAULT_TONEMASK
+ else:
+ self.tonemask = tonemask
diff --git a/maximus/python/maximus/station/sta.py b/maximus/python/maximus/station/sta.py
index 7bd5540f60..5091e83b75 100644
--- a/maximus/python/maximus/station/sta.py
+++ b/maximus/python/maximus/station/sta.py
@@ -2,42 +2,88 @@
#print __name__
-from config import Config
-from maximus.utils.exception import OutOfRangeError
-from struct import pack, unpack
-
-SIZE_OF_MAC_ADDRESS = 6 # in octets
-MAX_VALUE_OF_MAC_ADDRESS = 0xFFFFFFFFFFFF
+from maximus.station.config import *
+from maximus.ethernet.buffer import alloc_mme_buffer
+from maximus.macframe.msdu import MSDU_TYPES
+from maximus.mme import *
+from maximus.mme.mmheader import SIZE_OF_MMHEADER, SIZE_OF_MMTYPE, SIZE_OF_FMI
+from maximus.utils.exception import Error, OutOfRangeError
+from maximus.utils.format import *
+
+# Constants to check arguments validity
+SIZE_OF_MAC_ADDRESS = SIZE_OF_U48 # in octets
+MAX_VALUE_OF_MAC_ADDRESS = MAX_VALUE_OF_U48
+MIN_SIZE_OF_NPW = 8 # in octets
+MAX_SIZE_OF_NPW = 64 # in octets
+MAX_SIZE_OF_DPW = MAX_SIZE_OF_NPW
+MAX_SIZE_OF_HFID = MAX_SIZE_OF_DPW
+MAX_VALUE_OF_SL = 2
+
+# Constants from 'hal/phy/defs.h'
+PHY_CARRIER_NB = 1155 # number of OFDM carrier, defined by the hardware
+PHY_CARRIER_OFFSET = 74 # number of first OFDM carrier, defined by the hardware
+PHY_TONEMASK_SIZE = ((PHY_CARRIER_NB + 8 - 1) / 8) # number of bytes needed to define a tonemask
+
+# Possible configuration modes to set station parameters
+CONFIG_MODES = ['MME', 'fcall_process_drv', 'fcall_cp_station']
+
+# Constants for MME responses
+FAILURE = 0x00
+SUCCESS = 0x01
+RESERVED = 0x02 # to 0xFF
+
+# Filter function to receive only MME responses
+def mme_filter(rsp):
+ if rsp.get_type() is MSDU_TYPES[2]: # ETHERNET_TYPE_MME
+ return True
+ elif rsp.get_type() is MSDU_TYPES[5]: # ETHERNET_TYPE_BUFFER_RELEASED
+ return False
+ else:
+ raise Error("Received unexpected message of type " + rsp.get_type())
+ return False
class STA:
- """The STA class is composed of the 7 following private attributes:
+ """The STA class is composed of the 15 following private attributes:
maximus - a Maximus object
station - a Sta object
name - a Python string
+ mme_buffer_nb - a Python integer
+ mme_config - a Python boolean
mac_address - a Python tuple of 6 Python integers
cco_preference - a Python boolean
was_cco - a Python boolean
- nid - a Python long
- snid - a Python integer
- npw - a Python string
- sl - a Python integer
+ npw - a Python string of length from 8 to 64 octets
+ dpw - a Python string of length from 0 to 64 octets
+ m_sta_hfid - a Python string of length from 0 to 64 octets
+ u_sta_hfid - a Python string of length from 0 to 64 octets
+ avln_hfid - a Python string of length from 0 to 64 octets
+ sl - a Python integer from 0 to 2
+ tonemask - a Python tuple of Python integers
"""
- def __init__(self, maximus, executable=None, name=None, config=Config()):
+ def __init__(self, maximus, executable=None, name=None, mme_buffer_nb=1, config_mode=CONFIG_MODES[0], config=Config()):
"""Initialize the STA with following attributes:
maximus - a Maximus object (already initialized)
+ executable - a Python string
name - a Python string
+ mme_buffer_nb - a Python integer
+ mme_config - a Python boolean
config - a Python Config object
"""
# Set Maximus
self.__set_maximus(maximus)
- # Create and set the station
+ # Create station
self.__create_station(executable)
# Set station configuration
self.set_name(name)
+ self.set_mme_buffer_nb(mme_buffer_nb)
+ self.set_config_mode(config_mode)
self.set_config(config)
+
+ # Start station
+ self.__start()
def __set_maximus(self, maximus):
"""Set Maximus.
@@ -46,7 +92,7 @@ class STA:
self.__maximus = maximus
def __create_station(self, executable):
- """Create and set the station.
+ """Create the station.
The station is a Sta object.
"""
if executable is None:
@@ -54,6 +100,46 @@ class STA:
else:
self.__station = self.__get_maximus().create_sta(executable)
+ def __start(self):
+ """Send the MAC_START message to the station.
+ """
+ if self.get_config_mode() is CONFIG_MODES[0]: # 'MME'
+
+ # Computes MM Header
+ if self.get_mac_address() is not None:
+ osa = self.get_mac_address()
+ else:
+ osa = DEFAULT_MAC_ADDRESS
+ mmheader = MMHeader(ODA=DEFAULT_MAC_ADDRESS, OSA=osa, MMV=0x01, MMTYPE = DR_STA_START + MME_TYPES['REQ'])
+
+ # Create the MME and wait for the response
+ rsp = MME(MMHeader=mmheader).sendnrecv(maximus=self.__get_maximus(), station=self.__get_station(), filter=mme_filter)
+ self.__check_cnf(rsp, DR_STA_START)
+
+ else: # 'fcall_cp_station' or 'fcall_process_drv'
+ self.__get_maximus().create_fcall("maximus_mac_start") \
+ .send(self.__get_station())
+
+ def stop(self):
+ """Send the MAC_STOP message to the station.
+ """
+ if self.get_config_mode() is CONFIG_MODES[0]: # 'MME'
+
+ # Computes MM Header
+ if self.get_mac_address() is not None:
+ osa = self.get_mac_address()
+ else:
+ osa = DEFAULT_MAC_ADDRESS
+ mmheader = MMHeader(ODA=DEFAULT_MAC_ADDRESS, OSA=osa, MMV=0x01, MMTYPE = DR_STA_STOP + MME_TYPES['REQ'])
+
+ # Create the MME and wait for the response
+ rsp = MME(MMHeader=mmheader).sendnrecv(maximus=self.__get_maximus(), station=self.__get_station(), filter=mme_filter)
+ self.__check_cnf(rsp, DR_STA_STOP)
+
+ else: # 'fcall_cp_station' or 'fcall_process_drv'
+ self.__get_maximus().create_fcall("maximus_mac_stop") \
+ .send(self.__get_station())
+
def set_name(self, name):
"""Set the name.
The name must be a Python string.
@@ -66,17 +152,47 @@ class STA:
self.__get_maximus().process()
self.__get_station().set_name(name)
+ def set_mme_buffer_nb(self, mme_buffer_nb):
+ """Set the number of MME buffers to allocate into the station.
+ The number of MME buffers must be a Python integer.
+ """
+ if type(mme_buffer_nb) is int:
+ if mme_buffer_nb >= 0:
+ self.__mme_buffer_nb = mme_buffer_nb
+ else:
+ raise OutOfRangeError("Number of MME buffers")
+ else:
+ raise TypeError
+
+ # Allocate MME buffers
+ alloc_mme_buffer(maximus=self.__get_maximus(), station=self.__get_station(), buffer_nb=mme_buffer_nb)
+
+ def set_config_mode(self, config_mode):
+ """Set the configuration mode.
+ The configuration mode must be a Python string of the CONFIG_MODES Python list.
+ """
+ if type(config_mode) is str:
+ if config_mode in CONFIG_MODES:
+ self.__config_mode = config_mode
+ else:
+ raise OutOfRangeError("Configuration mode")
+ else:
+ raise TypeError
+
def set_config(self, config):
- """Set the config.
- The config must be a Python Config structure.
+ """Set the station parameters.
+ The station configuration must be a Python Config structure.
"""
self.set_mac_address(config.mac_address)
self.set_cco_preference(config.cco_preference)
self.set_was_cco(config.was_cco)
- self.set_nid(config.nid)
- self.set_snid(config.snid)
self.set_npw(config.npw)
+ self.set_dpw(config.dpw)
+ self.set_m_sta_hfid(config.m_sta_hfid)
+ self.set_u_sta_hfid(config.u_sta_hfid)
+ self.set_avln_hfid(config.avln_hfid)
self.set_sl(config.sl)
+ self.set_tonemask(config.tonemask)
def set_mac_address(self, mac_address):
"""Set the MAC address.
@@ -89,18 +205,18 @@ class STA:
if len(mac_address) == 3 * SIZE_OF_MAC_ADDRESS - 1:
# MAC address is a Python string of length equals to 17 octets ('XX:XX:XX:XX:XX:XX')
if mac_address.count(':') == SIZE_OF_MAC_ADDRESS - 1:
- self.__mac_address = unpack('!BBBBBB', pack('!Q', int(mac_address.replace(':',''), 16))[2:8])
+ self.__mac_address = unpack(6 * u8, pack(n + u64, int(mac_address.replace(':',''), 16))[SIZE_OF_U16:])
else:
raise OutOfRangeError("MAC address")
elif len(mac_address) == SIZE_OF_MAC_ADDRESS:
# MAC address is a Python string of length equals to 6 octets
- self.__mac_address = unpack('!BBBBBB', mac_address)
+ self.__mac_address = ntoh48_tuple(mac_address)
else:
raise OutOfRangeError("MAC address")
- elif type(mac_address) is long:
+ elif type(mac_address) is long or type(mac_address) is int:
# MAC address is a Python long
if mac_address <= MAX_VALUE_OF_MAC_ADDRESS and mac_address >= 0:
- self.__mac_address = unpack('!BBBBBB', pack('!Q', 0x414243444546)[2:8])
+ self.__mac_address = ntoh48_tuple(hton48(mac_address))
else:
raise OutOfRangeError("MAC address")
elif type(mac_address) is tuple:
@@ -114,7 +230,7 @@ class STA:
else:
raise TypeError("MAC address")
- # Send a function call to the station to configure the MAC address
+ # Send a message to the station to configure the MAC address
self.__send_mac_address()
def set_cco_preference(self, cco_preference):
@@ -126,7 +242,7 @@ class STA:
else:
raise TypeError("CCo preference")
- # Send a function call to the station to configure the CCo preference
+ # Send a message to the station to configure the CCo preference
self.__send_cco_preference()
def set_was_cco(self, was_cco):
@@ -138,57 +254,149 @@ class STA:
else:
raise TypeError("Was CCo")
- # Send a function call to the station to configure the previous CCo status
+ # Send a message to the station to configure the previous CCo status
self.__send_was_cco()
- def set_nid(self, nid):
- """Set the NID.
- The NID must be a Python long.
+ def set_npw(self, npw):
+ """Set the Network PassWord.
+ The NPW must be a Python string of length from 8 to 64 octets.
+ """
+ if type(npw) is str:
+ if len(npw) >= MIN_SIZE_OF_NPW and len(npw) <= MAX_SIZE_OF_NPW:
+ self.__npw = npw
+ else:
+ raise OutOfRangeError("NPW")
+ elif npw is None:
+ self.__npw = npw
+ else:
+ raise TypeError("NPW")
+
+ # Send a message to the station to configure the NPW
+ self.__send_npw()
+
+ def set_dpw(self, dpw):
+ """Set the Device PassWord.
+ The DPW must be a Python string of length from 0 to 64 octets.
+ """
+ if type(dpw) is str:
+ if dpw is DEFAULT_DPW:
+ self.__dpw = DEFAULT_DPW + str(self.get_station_id())
+ elif len(dpw) <= MAX_SIZE_OF_DPW:
+ self.__dpw = dpw
+ else:
+ raise OutOfRangeError("DPW")
+ elif dpw is None:
+ self.__dpw = dpw
+ else:
+ raise TypeError("DPW")
+
+ # Send a message to the station to configure the DPW
+ self.__send_dpw()
+
+ def set_m_sta_hfid(self, m_sta_hfid):
+ """Set the Manufacturer STA HFID.
+ The Manufacturer STA HFID must be a Python string of length from 0 to 64 octets.
"""
- if type(nid) is int or long or nid is None:
- self.__nid = nid
+ if type(m_sta_hfid) is str:
+ if m_sta_hfid is DEFAULT_M_STA_HFID:
+ self.__m_sta_hfid = DEFAULT_M_STA_HFID + str(self.get_station_id())
+ elif len(m_sta_hfid) <= MAX_SIZE_OF_HFID:
+ self.__m_sta_hfid = m_sta_hfid
+ else:
+ raise OutOfRangeError("Manufacturer STA HFID")
+ elif m_sta_hfid is None:
+ self.__m_sta_hfid = m_sta_hfid
else:
- raise TypeError("NID")
+ raise TypeError("Manufacturer STA HFID")
- # Send a function call to the station to configure the NID
- self.__send_nid()
+ # Send a message to the station to configure the Manufacturer STA HFID
+ self.__send_m_sta_hfid()
- def set_snid(self, snid):
- """Set the SNID.
- The SNID must be a Python integer.
+ def set_u_sta_hfid(self, u_sta_hfid):
+ """Set the User STA HFID.
+ The User STA HFID must be a Python string of length from 0 to 64 octets.
"""
- if type(snid) is int or snid is None:
- self.__snid = snid
+ if type(u_sta_hfid) is str:
+ if u_sta_hfid is DEFAULT_U_STA_HFID:
+ self.__u_sta_hfid = DEFAULT_U_STA_HFID + str(self.get_station_id())
+ elif len(u_sta_hfid) <= MAX_SIZE_OF_HFID:
+ self.__u_sta_hfid = u_sta_hfid
+ else:
+ raise OutOfRangeError("User STA HFID")
+ elif u_sta_hfid is None:
+ self.__u_sta_hfid = u_sta_hfid
else:
- raise TypeError("SNID")
+ raise TypeError("User STA HFID")
- # Send a function call to the station to configure the SNID
- self.__send_snid()
+ # Send a message to the station to configure the User STA HFID
+ self.__send_u_sta_hfid()
- def set_npw(self, npw):
- """Set the NPW.
- The NPW must be a Python string.
+ def set_avln_hfid(self, avln_hfid):
+ """Set the AVLN HFID.
+ The AVLN HFID must be a Python string of length from 0 to 64 octets.
"""
- if type(npw) is str or npw is None:
- self.__npw = npw
+ if type(avln_hfid) is str:
+ if len(avln_hfid) <= MAX_SIZE_OF_HFID:
+ self.__avln_hfid = avln_hfid
+ else:
+ raise OutOfRangeError("AVLN HFID")
+ elif avln_hfid is None:
+ self.__avln_hfid = avln_hfid
else:
- raise TypeError("NPW")
+ raise TypeError("AVLN HFID")
- # Send a function call to the station to configure the NPW
- self.__send_npw()
+ # Send a message to the station to configure the AVLN HFID
+ self.__send_avln_hfid()
def set_sl(self, sl):
- """Set the SL.
- The SL must be a Python integer.
+ """Set the Security Level.
+ The SL must be a Python integer from 0 to 2.
"""
- if type(sl) is int or sl is None:
+ if type(sl) is int:
+ if sl >= 0 and sl <= MAX_VALUE_OF_SL:
+ self.__sl = sl
+ else:
+ raise OutOfRangeError("Security Level")
+ elif sl is None:
self.__sl = sl
else:
raise TypeError("SL")
- # Send a function call to the station to configure the SL
+ # Send a message to the station to configure the SL
self.__send_sl()
+ def set_tonemask(self, carriers):
+ """Set the tonemask.
+ The carriers must be a Python tuple of Python integers.
+ """
+ if type(carriers) is tuple:
+ # Use the algorithm from 'mac/common/src/tonemask.c'
+ dtc_idx = 0
+ dtc_stop = carriers[dtc_idx] - PHY_CARRIER_OFFSET
+ dtc_on = False
+ tonemask = [] # Python list
+ for l in range(0, PHY_TONEMASK_SIZE):
+ tonemask.append(0)
+ for i in range(0, PHY_TONEMASK_SIZE):
+ for j in range(0, 8):
+ if i * 8 + j > dtc_stop:
+ dtc_on = not dtc_on
+ dtc_idx += 1
+ dtc_stop = carriers[dtc_idx] - PHY_CARRIER_OFFSET
+ if dtc_on is True:
+ tonemask[i] |= 1 << j
+
+ self.__tonemask = ''
+ for t in tonemask:
+ self.__tonemask += htohp8(t)
+ elif carriers is None:
+ self.__tonemask = carriers
+ else:
+ raise TypeError("Tonemask")
+
+ # Send a message to the station to configure the tonemask
+ self.__send_tonemask()
+
def __get_maximus(self):
"""Get Maximus.
Maximus is a Maximus object.
@@ -207,17 +415,32 @@ class STA:
"""
return self.__name
+ def get_mme_buffer_nb(self):
+ """Get the number of MME buffers allocated into the station.
+ The number of MME buffers is a Python integer.
+ """
+ return self.__mme_buffer_nb
+
+ def get_config_mode(self):
+ """Get the configuration mode.
+ The configuration mode is a Python string of the CONFIG_MODES Python list.
+ """
+ return self.__config_mode
+
def get_config(self):
- """Get the config.
- The config is a Python Config structure.
+ """Get the station parameters.
+ The station configuration is a Python Config structure.
"""
return Config(mac_address=self.get_mac_address(), \
cco_preference=self.get_cco_preference(), \
was_cco=self.get_was_cco(), \
- nid=self.get_nid(), \
- snid=self.get_snid(), \
npw=self.get_npw(), \
- sl=self.get_sl())
+ dpw=self.get_dpw(), \
+ m_sta_hfid=self.get_m_sta_hfid(), \
+ u_sta_hfid=self.get_u_sta_hfid(), \
+ avln_hfid=self.get_avln_hfid(), \
+ sl=self.get_sl(), \
+ tonemask=self.get_tonemask())
def get_mac_address(self):
"""Get the MAC address.
@@ -237,85 +460,439 @@ class STA:
"""
return self.__was_cco
- def get_nid(self):
- """Get the NID.
- The NID is a Python long.
+ def get_npw(self):
+ """Get the Network PassWord.
+ The NPW is a Python string of length from 8 to 64 octets.
+ """
+ return self.__npw
+
+ def get_dpw(self):
+ """Get the Device PassWord.
+ The DPW is a Python string of length from 0 to 64 octets.
"""
- return self.__nid
+ return self.__dpw
- def get_snid(self):
- """Get the SNID.
- The SNID is a Python integer.
+ def get_m_sta_hfid(self):
+ """Get the Manufacturer STA HFID.
+ The Manufacturer STA HFID is a Python string of length from 0 to 64 octets.
"""
- return self.__snid
+ return self.__m_sta_hfid
- def get_npw(self):
- """Get the NPW.
- The NPW is a Python string.
+ def get_u_sta_hfid(self):
+ """Get the User STA HFID.
+ The User STA HFID is a Python string of length from 0 to 64 octets.
"""
- return self.__npw
+ return self.__u_sta_hfid
+
+ def get_avln_hfid(self):
+ """Get the AVLN HFID.
+ The AVLN HFID is a Python string of length from 0 to 64 octets.
+ """
+ return self.__avln_hfid
def get_sl(self):
- """Get the SL.
+ """Get the Security Level.
The SL is a Python integer.
"""
return self.__sl
+ def get_tonemask(self):
+ """Get the tonemask.
+ The tonemask is a bits field.
+ """
+ return self.__tonemask
+
def __send_mac_address(self):
- """Send a function call to the station to configure the MAC address.
+ """Send a message to the station to configure the MAC address.
"""
if self.get_mac_address() is not None:
- self.__get_maximus().create_fcall("maximus_set_mac_address") \
- .add_param_n_u8("mac_address", self.get_mac_address()) \
- .send(self.__get_station())
+ if self.get_config_mode() is CONFIG_MODES[2]: # 'fcall_cp_station'
+ self.__get_maximus().create_fcall("maximus_set_mac_address") \
+ .add_param_n_u8("mac_address", self.get_mac_address()) \
+ .send(self.__get_station())
+ else: # we need to build an MME
+
+ # Computes MM Header
+ mmheader = MMHeader(ODA=DEFAULT_MAC_ADDRESS, OSA=self.get_mac_address(), MMV=0x01, MMTYPE = DR_STA_SET_MAC_ADDR + MME_TYPES['REQ'])
+
+ # Computes MM Entry: Mac address of the station
+ # Octet Number = 0 - 5
+ # Field Size (Octets) = 6
+ StaMAC = ''
+ for t in self.get_mac_address():
+ StaMAC += htohp8(t)
+
+ # Create the MME
+ mme = MME(MMHeader=mmheader, MMEntry=StaMAC)
+
+ if self.get_config_mode() is CONFIG_MODES[1]: # 'fcall_process_drv'
+ self.__get_maximus().create_fcall("maximus_set_mac_address") \
+ .add_param("mme", mme.get()) \
+ .send(self.__get_station())
+ else: # 'MME'
+ # Allocate an MME buffer
+ alloc_mme_buffer(maximus=self.__get_maximus(), station=self.__get_station())
+
+ # Send the MME and wait for the response
+ rsp = mme.sendnrecv(maximus=self.__get_maximus(), station=self.__get_station(), filter=mme_filter)
+ self.__check_cnf(rsp, DR_STA_SET_MAC_ADDR)
def __send_cco_preference(self):
- """Send a function call to the station to configure the CCo preference.
+ """Send a message to the station to configure the CCo preference.
"""
if self.get_cco_preference() is not None:
- self.__get_maximus().create_fcall("maximus_set_cco_preference") \
- .add_param_bool("cco_preference", self.get_cco_preference()) \
- .send(self.__get_station())
+ if self.get_config_mode() is CONFIG_MODES[2]: # 'fcall_cp_station'
+ self.__get_maximus().create_fcall("maximus_set_cco_preference") \
+ .add_param_bool("cco_preference", self.get_cco_preference()) \
+ .send(self.__get_station())
+ else: # we need to build an MME
+
+ # Computes MM Header
+ if self.get_mac_address() is not None:
+ osa = self.get_mac_address()
+ else:
+ osa = DEFAULT_MAC_ADDRESS
+ mmheader = MMHeader(ODA=DEFAULT_MAC_ADDRESS, OSA=osa, MMV=0x01, MMTYPE = DR_STA_SET_CCO_PREF + MME_TYPES['REQ'])
+
+ # Computes MM Entry:
+ # - 0x00 = Station is not CCo
+ # - 0x01 = Station is CCo
+ # - 0x02 - 0xFF = Reserved
+ # Octet Number = 0
+ # Field Size (Octets) = 1
+ if self.get_cco_preference() is True:
+ CCoPref = htohp8(0x01)
+ else:
+ CCoPref = htohp8(0x00)
+
+ # Create the MME
+ mme = MME(MMHeader=mmheader, MMEntry=CCoPref)
+
+ if self.get_config_mode() is CONFIG_MODES[1]: # 'fcall_process_drv'
+ self.__get_maximus().create_fcall("maximus_set_cco_preference") \
+ .add_param("mme", mme.get()) \
+ .send(self.__get_station())
+ else: # 'MME'
+ # Allocate an MME buffer
+ alloc_mme_buffer(maximus=self.__get_maximus(), station=self.__get_station())
+
+ # Send the MME and wait for the response
+ rsp = mme.sendnrecv(maximus=self.__get_maximus(), station=self.__get_station(), filter=mme_filter)
+ self.__check_cnf(rsp, DR_STA_SET_CCO_PREF)
def __send_was_cco(self):
- """Send a function call to the station to configure the previous CCo status.
+ """Send a message to the station to configure the previous CCo status.
"""
if self.get_was_cco() is not None:
- self.__get_maximus().create_fcall("maximus_set_was_cco") \
- .add_param_bool("was_cco", self.get_was_cco()) \
- .send(self.__get_station())
-
- def __send_nid(self):
- """Send a function call to the station to configure the NID.
- """
- if self.get_nid() is not None:
- self.__get_maximus().create_fcall("maximus_set_nid") \
- .add_param_ulong("nid", self.get_nid()) \
- .send(self.__get_station())
-
- def __send_snid(self):
- """Send a function call to the station to configure the SNID.
- """
- if self.get_snid() is not None:
- self.__get_maximus().create_fcall("maximus_set_snid") \
- .add_param_uchar("snid", self.get_snid()) \
- .send(self.__get_station())
+ if self.get_config_mode() is CONFIG_MODES[2]: # 'fcall_cp_station'
+ self.__get_maximus().create_fcall("maximus_set_was_cco") \
+ .add_param_bool("was_cco", self.get_was_cco()) \
+ .send(self.__get_station())
+ else: # we need to build an MME
+
+ # Computes MM Header
+ if self.get_mac_address() is not None:
+ osa = self.get_mac_address()
+ else:
+ osa = DEFAULT_MAC_ADDRESS
+ mmheader = MMHeader(ODA=DEFAULT_MAC_ADDRESS, OSA=osa, MMV=0x01, MMTYPE = DR_STA_SET_WAS_CCO + MME_TYPES['REQ'])
+
+ # Computes MM Entry:
+ # - 0x00 = Station was not CCo
+ # - 0x01 = Station was previously CCo
+ # - 0x02 - 0xFF = Reserved
+ # Octet Number = 0
+ # Field Size (Octets) = 1
+ if self.get_was_cco() is True:
+ WasCCo = htohp8(0x01)
+ else:
+ WasCCo = htohp8(0x00)
+
+ # Create the MME
+ mme = MME(MMHeader=mmheader, MMEntry=WasCCo)
+
+ if self.get_config_mode() is CONFIG_MODES[1]: # 'fcall_process_drv'
+ self.__get_maximus().create_fcall("maximus_set_was_cco") \
+ .add_param("mme", mme.get()) \
+ .send(self.__get_station())
+ else: # 'MME'
+ # Allocate an MME buffer
+ alloc_mme_buffer(maximus=self.__get_maximus(), station=self.__get_station())
+
+ # Send the MME and wait for the response
+ rsp = mme.sendnrecv(maximus=self.__get_maximus(), station=self.__get_station(), filter=mme_filter)
+ self.__check_cnf(rsp, DR_STA_SET_WAS_CCO)
def __send_npw(self):
- """Send a function call to the station to configure the NPW.
+ """Send a message to the station to configure the NPW.
"""
if self.get_npw() is not None:
- self.__get_maximus().create_fcall("maximus_set_npw") \
- .add_param("npw", self.get_npw()) \
- .send(self.__get_station())
+ if self.get_config_mode() is CONFIG_MODES[2]: # 'fcall_cp_station'
+ self.__get_maximus().create_fcall("maximus_set_npw") \
+ .add_param("npw", self.get_npw()) \
+ .send(self.__get_station())
+ else: # we need to build an MME
+
+ # Computes MM Header
+ if self.get_mac_address() is not None:
+ osa = self.get_mac_address()
+ else:
+ osa = DEFAULT_MAC_ADDRESS
+ mmheader = MMHeader(ODA=DEFAULT_MAC_ADDRESS, OSA=osa, MMV=0x01, MMTYPE = DR_STA_SET_NPW + MME_TYPES['REQ'])
+
+ # Computes MM Entry: Human-Readable (ASCII) Station Network Password
+ # Octet Number = 0 - 63
+ # Field Size (Octets) = 64
+ StaNPW = self.get_npw()
+
+ # Create the MME
+ mme = MME(MMHeader=mmheader, MMEntry=StaNPW)
+
+ if self.get_config_mode() is CONFIG_MODES[1]: # 'fcall_process_drv'
+ self.__get_maximus().create_fcall("maximus_set_npw") \
+ .add_param("mme", mme.get()) \
+ .send(self.__get_station())
+ else: # 'MME'
+ # Allocate an MME buffer
+ alloc_mme_buffer(maximus=self.__get_maximus(), station=self.__get_station())
+
+ # Send the MME and wait for the response
+ rsp = mme.sendnrecv(maximus=self.__get_maximus(), station=self.__get_station(), filter=mme_filter)
+ self.__check_cnf(rsp, DR_STA_SET_NPW)
+
+ def __send_dpw(self):
+ """Send a message to the station to configure the DPW.
+ """
+ if self.get_dpw() is not None:
+ if self.get_config_mode() is CONFIG_MODES[2]: # 'fcall_cp_station'
+ self.__get_maximus().create_fcall("maximus_set_dpw") \
+ .add_param("dpw", self.get_dpw()) \
+ .send(self.__get_station())
+ else: # we need to build an MME
+
+ # Computes MM Header
+ if self.get_mac_address() is not None:
+ osa = self.get_mac_address()
+ else:
+ osa = DEFAULT_MAC_ADDRESS
+ mmheader = MMHeader(ODA=DEFAULT_MAC_ADDRESS, OSA=osa, MMV=0x01, MMTYPE = DR_STA_SET_DPW + MME_TYPES['REQ'])
+
+ # Computes MM Entry: Human-Readable (ASCII) Station Device Password
+ # Octet Number = 0 - 63
+ # Field Size (Octets) = 64
+ StaDPW = self.get_dpw()
+
+ # Create the MME
+ mme = MME(MMHeader=mmheader, MMEntry=StaDPW)
+
+ if self.get_config_mode() is CONFIG_MODES[1]: # 'fcall_process_drv'
+ self.__get_maximus().create_fcall("maximus_set_dpw") \
+ .add_param("mme", mme.get()) \
+ .send(self.__get_station())
+ else: # 'MME'
+ # Allocate an MME buffer
+ alloc_mme_buffer(maximus=self.__get_maximus(), station=self.__get_station())
+
+ # Send the MME and wait for the response
+ rsp = mme.sendnrecv(maximus=self.__get_maximus(), station=self.__get_station(), filter=mme_filter)
+ self.__check_cnf(rsp, DR_STA_SET_DPW)
+
+ def __send_m_sta_hfid(self):
+ """Send a message to the station to configure the Manufacturer STA HFID.
+ """
+ if self.get_m_sta_hfid() is not None:
+ if self.get_config_mode() is CONFIG_MODES[2]: # 'fcall_cp_station'
+ self.__get_maximus().create_fcall("maximus_set_m_sta_hfid") \
+ .add_param("m_sta_hfid", self.get_m_sta_hfid()) \
+ .send(self.__get_station())
+ else: # we need to build an MME
+
+ # Computes MM Header
+ if self.get_mac_address() is not None:
+ osa = self.get_mac_address()
+ else:
+ osa = DEFAULT_MAC_ADDRESS
+ mmheader = MMHeader(ODA=DEFAULT_MAC_ADDRESS, OSA=osa, MMV=0x01, MMTYPE = DR_STA_SET_M_STA_HFID + MME_TYPES['REQ'])
+
+ # Computes MM Entry: Manufacturer Station Human-Friendly ID in ASCII format
+ # Octet Number = 0 - 63
+ # Field Size (Octets) = 64
+ Sta_m_HFID = self.get_m_sta_hfid()
+
+ # Create the MME
+ mme = MME(MMHeader=mmheader, MMEntry=Sta_m_HFID)
+
+ if self.get_config_mode() is CONFIG_MODES[1]: # 'fcall_process_drv'
+ self.__get_maximus().create_fcall("maximus_set_m_sta_hfid") \
+ .add_param("mme", mme.get()) \
+ .send(self.__get_station())
+ else: # 'MME'
+ # Allocate an MME buffer
+ alloc_mme_buffer(maximus=self.__get_maximus(), station=self.__get_station())
+
+ # Send the MME and wait for the response
+ rsp = mme.sendnrecv(maximus=self.__get_maximus(), station=self.__get_station(), filter=mme_filter)
+ self.__check_cnf(rsp, DR_STA_SET_M_STA_HFID)
+
+ def __send_u_sta_hfid(self):
+ """Send a message to the station to configure the User STA HFID.
+ """
+ if self.get_u_sta_hfid() is not None:
+ if self.get_config_mode() is CONFIG_MODES[2]: # 'fcall_cp_station'
+ self.__get_maximus().create_fcall("maximus_set_u_sta_hfid") \
+ .add_param("u_sta_hfid", self.get_u_sta_hfid()) \
+ .send(self.__get_station())
+ else: # we need to build an MME
+
+ # Computes MM Header
+ if self.get_mac_address() is not None:
+ osa = self.get_mac_address()
+ else:
+ osa = DEFAULT_MAC_ADDRESS
+ mmheader = MMHeader(ODA=DEFAULT_MAC_ADDRESS, OSA=osa, MMV=0x01, MMTYPE = DR_STA_SET_U_STA_HFID + MME_TYPES['REQ'])
+
+ # Computes MM Entry: User Station Human-Friendly ID in ASCII format
+ # Octet Number = 0 - 63
+ # Field Size (Octets) = 64
+ Sta_u_HFID = self.get_u_sta_hfid()
+
+ # Create the MME
+ mme = MME(MMHeader=mmheader, MMEntry=Sta_u_HFID)
+
+ if self.get_config_mode() is CONFIG_MODES[1]: # 'fcall_process_drv'
+ self.__get_maximus().create_fcall("maximus_set_u_sta_hfid") \
+ .add_param("mme", mme.get()) \
+ .send(self.__get_station())
+ else: # 'MME'
+ # Allocate an MME buffer
+ alloc_mme_buffer(maximus=self.__get_maximus(), station=self.__get_station())
+
+ # Send the MME and wait for the response
+ rsp = mme.sendnrecv(maximus=self.__get_maximus(), station=self.__get_station(), filter=mme_filter)
+ self.__check_cnf(rsp, DR_STA_SET_U_STA_HFID)
+
+ def __send_avln_hfid(self):
+ """Send a message to the station to configure the AVLN HFID.
+ """
+ if self.get_avln_hfid() is not None:
+ if self.get_config_mode() is CONFIG_MODES[2]: # 'fcall_cp_station'
+ self.__get_maximus().create_fcall("maximus_set_avln_hfid") \
+ .add_param("avln_hfid", self.get_avln_hfid()) \
+ .send(self.__get_station())
+ else: # we need to build an MME
+
+ # Computes MM Header
+ if self.get_mac_address() is not None:
+ osa = self.get_mac_address()
+ else:
+ osa = DEFAULT_MAC_ADDRESS
+ mmheader = MMHeader(ODA=DEFAULT_MAC_ADDRESS, OSA=osa, MMV=0x01, MMTYPE = DR_STA_SET_AVLN_HFID + MME_TYPES['REQ'])
+
+ # Computes MM Entry: AVLN Station Human-Readable ID in ASCII format
+ # Octet Number = 0 - 63
+ # Field Size (Octets) = 64
+ Sta_AVLN_HFID = self.get_avln_hfid()
+
+ # Create the MME
+ mme = MME(MMHeader=mmheader, MMEntry=Sta_AVLN_HFID)
+
+ if self.get_config_mode() is CONFIG_MODES[1]: # 'fcall_process_drv'
+ self.__get_maximus().create_fcall("maximus_set_avln_hfid") \
+ .add_param("mme", mme.get()) \
+ .send(self.__get_station())
+ else: # 'MME'
+ # Allocate an MME buffer
+ alloc_mme_buffer(maximus=self.__get_maximus(), station=self.__get_station())
+
+ # Send the MME and wait for the response
+ rsp = mme.sendnrecv(maximus=self.__get_maximus(), station=self.__get_station(), filter=mme_filter)
+ self.__check_cnf(rsp, DR_STA_SET_AVLN_HFID)
def __send_sl(self):
- """Send a function call to the station to configure the SL.
+ """Send a message to the station to configure the SL.
"""
if self.get_sl() is not None:
- self.__get_maximus().create_fcall("maximus_set_sl") \
- .add_param_uchar("sl", self.get_sl()) \
- .send(self.__get_station())
+ if self.get_config_mode() is CONFIG_MODES[2]: # 'fcall_cp_station'
+ self.__get_maximus().create_fcall("maximus_set_sl") \
+ .add_param_uchar("sl", self.get_sl()) \
+ .send(self.__get_station())
+ else: # we need to build an MME
+
+ # Computes MM Header
+ if self.get_mac_address() is not None:
+ osa = self.get_mac_address()
+ else:
+ osa = DEFAULT_MAC_ADDRESS
+ mmheader = MMHeader(ODA=DEFAULT_MAC_ADDRESS, OSA=osa, MMV=0x01, MMTYPE = DR_STA_SET_SL + MME_TYPES['REQ'])
+
+ # Computes MM Entry: Security Level for New NMK
+ # Octet Number = 0
+ # Field Size (Octets) = 1
+ SL = htohp8(self.get_sl())
+
+ # Create the MME
+ mme = MME(MMHeader=mmheader, MMEntry=SL)
+
+ if self.get_config_mode() is CONFIG_MODES[1]: # 'fcall_process_drv'
+ self.__get_maximus().create_fcall("maximus_set_sl") \
+ .add_param("mme", mme.get()) \
+ .send(self.__get_station())
+ else: # 'MME'
+ # Allocate an MME buffer
+ alloc_mme_buffer(maximus=self.__get_maximus(), station=self.__get_station())
+
+ # Send the MME and wait for the response
+ rsp = mme.sendnrecv(maximus=self.__get_maximus(), station=self.__get_station(), filter=mme_filter)
+ self.__check_cnf(rsp, DR_STA_SET_SL)
+
+ def __send_tonemask(self):
+ """Send a message to the station to configure the tonemask.
+ """
+ if self.get_tonemask() is not None:
+ if self.get_config_mode() is CONFIG_MODES[2]: # 'fcall_cp_station'
+ self.__get_maximus().create_fcall("maximus_set_tonemask") \
+ .add_param("tonemask", self.get_tonemask()) \
+ .send(self.__get_station())
+ else: # we need to build an MME
+
+ # Computes MM Header
+ if self.get_mac_address() is not None:
+ osa = self.get_mac_address()
+ else:
+ osa = DEFAULT_MAC_ADDRESS
+ mmheader = MMHeader(ODA=DEFAULT_MAC_ADDRESS, OSA=osa, MMV=0x01, MMTYPE = DR_STA_SET_TONEMASK + MME_TYPES['REQ'])
+
+ # Computes MM Entry: Station Tonemask bitfield
+ # Octet Number = 0 - 191
+ # Field Size (Octets) = 192
+ Sta_Tonemask = self.get_tonemask()
+
+ # Create the MME
+ mme = MME(MMHeader=mmheader, MMEntry=Sta_Tonemask)
+
+ if self.get_config_mode() is CONFIG_MODES[1]: # 'fcall_process_drv'
+ self.__get_maximus().create_fcall("maximus_set_tonemask") \
+ .add_param("mme", mme.get()) \
+ .send(self.__get_station())
+ else: # 'MME'
+ # Allocate an MME buffer
+ alloc_mme_buffer(maximus=self.__get_maximus(), station=self.__get_station())
+
+ # Send the MME and wait for the response
+ rsp = mme.sendnrecv(maximus=self.__get_maximus(), station=self.__get_station(), filter=mme_filter)
+ self.__check_cnf(rsp, DR_STA_SET_TONEMASK)
+
+ def __check_cnf(self, rsp, name):
+ """Check the MME response validity.
+ """
+ mmtype = hptoh16(rsp[0].get_mmheader()[SIZE_OF_MMHEADER - SIZE_OF_FMI - SIZE_OF_MMTYPE:SIZE_OF_MMHEADER - SIZE_OF_FMI])
+ if mmtype != name + MME_TYPES['CNF']:
+ raise Error("Received unexpected message of type " + hex(mmtype))
+ mmentry = hptoh8(rsp[0].get_mmentry()[0])
+ if mmentry == FAILURE:
+ raise Error(hex(name) + ".CNF => Failure")
+ elif mmentry >= RESERVED:
+ raise Error(hex(name) + ".CNF => Reserved: " + hex(mmentry))
# Following functions just are encapsulation of interface module Boost Python functions.
#
diff --git a/maximus/python/maximus/utils/__init__.py b/maximus/python/maximus/utils/__init__.py
index 259e27a033..817bb5db8a 100644
--- a/maximus/python/maximus/utils/__init__.py
+++ b/maximus/python/maximus/utils/__init__.py
@@ -2,8 +2,9 @@
#print __name__
-#__all__ = ["converter", "crc", "exception"]
+#__all__ = ["converter", "crc", "exception", "format"]
from converter import *
from crc import *
from exception import *
+from format import *
diff --git a/maximus/python/maximus/utils/format.py b/maximus/python/maximus/utils/format.py
new file mode 100644
index 0000000000..d55d9a5b44
--- /dev/null
+++ b/maximus/python/maximus/utils/format.py
@@ -0,0 +1,255 @@
+#! usr/bin/env python
+
+#print __name__
+
+from struct import calcsize, pack, unpack
+
+
+# Define needed constants for 'struct.pack()' and 'struct.unpack()' functions
+#
+
+# pack(fmt, v1, v2, ...)
+# Return a string containing the values v1, v2, ..., packed according to the given format.
+# The arguments must match the values required by the format exactly.
+
+# unpack(fmt, string)
+# Unpack the string (presumably packed by pack(fmt, ... )) according to the given format.
+# The result is a tuple even if it contains exactly one item.
+# The string must contain exactly the amount of data required by the format (len(string) must equal calcsize(fmt)).
+
+# calcsize(fmt)
+# Return the size of the struct (and hence of the string) corresponding to the given format.
+
+
+# Format C Type Python
+# "x" pad byte no value
+# "c" char string of length 1
+# "b" signed char integer
+# "B" unsigned char integer
+# "h" short integer
+# "H" unsigned short integer
+# "i" int integer
+# "I" unsigned int long
+# "l" long integer
+# "L" unsigned long long
+# "q" long long long
+# "Q" unsigned long long long
+# "f" float float
+# "d" double float
+# "s" char[] string
+# "p" char[] string
+# "P" void * integer
+
+u8 = 'B'
+u16 = 'H'
+u32 = 'L' # or 'I'
+u64 = 'Q'
+
+# In octets
+SIZE_OF_U8 = calcsize(u8)
+SIZE_OF_U16 = calcsize(u16)
+SIZE_OF_U24 = 3 * SIZE_OF_U8
+SIZE_OF_U32 = calcsize(u32)
+SIZE_OF_U48 = SIZE_OF_U32 + SIZE_OF_U16
+SIZE_OF_U64 = calcsize(u64)
+
+MAX_VALUE_OF_U8 = 0xFF
+MAX_VALUE_OF_U16 = 0xFFFF
+MAX_VALUE_OF_U24 = 0xFFFFFF
+MAX_VALUE_OF_U32 = 0xFFFFFFFF
+MAX_VALUE_OF_U48 = 0xFFFFFFFFFFFF
+MAX_VALUE_OF_U64 = 0xFFFFFFFFFFFFFFFF
+
+
+# Character Byte order Size and alignment
+# "@" native native
+# "=" native standard
+# "<" little-endian standard
+# ">" big-endian standard
+# "!" network (= big-endian) standard
+
+h = '=' # hardware (= native = little-endian)
+le = '<' # little-endian
+be = '>' # big-endian
+n = '!' # network (= big-endian)
+hp = le # HomePlug (= little-endian)
+
+
+# Following functions respectively pack a u8, u16, u24, u32, u48 or u64 value into a string
+#
+
+def htohp8(value):
+ """Hardware to HomePlug,
+ i.e. little-endian to little-endian.
+ """
+ return pack(hp + u8, value)
+
+def htohp16(value):
+ """Hardware to HomePlug,
+ i.e. little-endian to little-endian.
+ """
+ return pack(hp + u16, value)
+
+def htohp24(value):
+ """Hardware to HomePlug,
+ i.e. little-endian to little-endian.
+ """
+ string = pack(hp + u32, value)
+ return string[:SIZE_OF_U24]
+
+def htohp32(value):
+ """Hardware to HomePlug,
+ i.e. little-endian to little-endian.
+ """
+ return pack(hp + u32, value)
+
+def htohp48(value):
+ """Hardware to HomePlug,
+ i.e. little-endian to little-endian.
+ """
+ string = pack(hp + u64, value)
+ return string[SIZE_OF_U16:SIZE_OF_U48] + string[:SIZE_OF_U16]
+
+def htohp48_tuple(tuple):
+ """Hardware to HomePlug,
+ i.e. little-endian to little-endian.
+ """
+ string = ''
+ for t in tuple:
+ string += htohp8(t)
+ result = ''
+ for i in range(0, len(string)):
+ result += string[len(string)-1-i]
+ return result[SIZE_OF_U16:SIZE_OF_U48] + result[:SIZE_OF_U16]
+
+def htohp64(value):
+ """Hardware to HomePlug,
+ i.e. little-endian to little-endian.
+ """
+ return pack(hp + u64, value)
+
+def hton8(value):
+ """Hardware to network,
+ i.e. little-endian to big-endian.
+ """
+ return pack(n + u8, value)
+
+def hton16(value):
+ """Hardware to network,
+ i.e. little-endian to big-endian.
+ """
+ return pack(n + u16, value)
+
+def hton32(value):
+ """Hardware to network,
+ i.e. little-endian to big-endian.
+ """
+ return pack(n + u32, value)
+
+def hton48(value):
+ """Hardware to network,
+ i.e. little-endian to big-endian.
+ """
+ return pack(n + u64, value)[SIZE_OF_U16:]
+
+def hton48_tuple(tuple):
+ """Hardware to HomePlug,
+ i.e. little-endian to big-endian.
+ """
+ string = ''
+ for t in tuple:
+ string += htohp8(t)
+ return string
+
+def hton64(value):
+ """Hardware to network,
+ i.e. little-endian to big-endian.
+ """
+ return pack(n + u64, value)
+
+
+# Following functions respectively unpack a string into a u8, u16, u24, u32, u48 or u64 value
+#
+
+def ntoh8(string):
+ """Network to hardware,
+ i.e. big-endian to little-endian.
+ """
+ return unpack(n + u8, string)[0]
+
+def ntoh16(string):
+ """Network to hardware,
+ i.e. big-endian to little-endian.
+ """
+ return unpack(n + u16, string)[0]
+
+def ntoh32(string):
+ """Network to hardware,
+ i.e. big-endian to little-endian.
+ """
+ return unpack(n + u32, string)[0]
+
+def ntoh48(string):
+ """Network to hardware,
+ i.e. big-endian to little-endian.
+ """
+ return unpack(n + u64, '\0\0' + string)[0]
+
+def ntoh48_tuple(string):
+ """HomePlug to hardware,
+ i.e. big-endian to little-endian.
+ """
+ return unpack(hp + len(string) * u8, string)[:len(string)]
+
+def ntoh64(string):
+ """Network to hardware,
+ i.e. big-endian to little-endian.
+ """
+ return unpack(n + u64, string)[0]
+
+def hptoh8(string):
+ """HomePlug to hardware,
+ i.e. little-endian to little-endian.
+ """
+ return unpack(hp + u8, string)[0]
+
+def hptoh16(string):
+ """HomePlug to hardware,
+ i.e. little-endian to little-endian.
+ """
+ return unpack(hp + u16, string)[0]
+
+def hptoh24(string):
+ """HomePlug to hardware,
+ i.e. little-endian to little-endian.
+ """
+ return unpack(hp + u32, string + '\0')[0]
+
+def hptoh32(string):
+ """HomePlug to hardware,
+ i.e. little-endian to little-endian.
+ """
+ return unpack(hp + u32, string)[0]
+
+def hptoh48(string):
+ """HomePlug to hardware,
+ i.e. little-endian to little-endian.
+ """
+ return unpack(hp + u64, string[SIZE_OF_U32:SIZE_OF_U48] + string[:SIZE_OF_U32] + '\0\0')[0]
+
+def hptoh48_tuple(string):
+ """HomePlug to hardware,
+ i.e. little-endian to little-endian.
+ """
+ string = string[SIZE_OF_U32:SIZE_OF_U48] + string[:SIZE_OF_U32]
+ tuple = unpack(hp + len(string) * u8, string)[:len(string)]
+ result = ()
+ for i in range(0, len(tuple)):
+ result += (tuple[len(tuple)-1-i]),
+ return result
+
+def hptoh64(string):
+ """HomePlug to hardware,
+ i.e. little-endian to little-endian.
+ """
+ return unpack(hp + u64, string)[0]
diff --git a/maximus/python/py/test_ether.py b/maximus/python/py/test_ether.py
index ba36174d4c..4e35cbe497 100644
--- a/maximus/python/py/test_ether.py
+++ b/maximus/python/py/test_ether.py
@@ -78,7 +78,7 @@ if str(rsp1[i].payload) != msdu1 + (MIN_SIZE_OF_MSDU - SIZE_OF_HEADER - len(msdu
# Test the 2nd received MSDU (MME) object
i = 2 # i = 2 because the second received frame is the BUFFER RELEASED message
-if rsp1[i].get_mmheader() != 'DCBAFEJIHGLKPONMRQThis ':
+if rsp1[i].get_mmheader() != 'ABCDEFGHIJKLMNOPQRThis ':
print "mmheader1 =", rsp1[i].get_mmheader()
raise Error("mmheader1")
if rsp1[i].get_mmentry() != 'is the Ethernet payload' + (MIN_SIZE_OF_MSDU - SIZE_OF_MMHEADER - 23) * pack('B', 0):
@@ -118,7 +118,7 @@ if str(rsp2[i].payload) != msdu2 + (MIN_SIZE_OF_MSDU - SIZE_OF_HEADER - len(msdu
# Test the 2nd received MSDU (MME) object
i = 2 # i = 2 because the second received frame is the BUFFER RELEASED message
-if rsp2[i].get_mmheader() != 'DCBAFEJIHGLKPONMRQThis ':
+if rsp2[i].get_mmheader() != 'ABCDEFGHIJKLMNOPQRThis ':
print "mmheader2 =", rsp2[i].get_mmheader()
raise Error("mmheader2")
if rsp2[i].get_mmentry() != 'is the Ethernet payload' + (MIN_SIZE_OF_MSDU - SIZE_OF_MMHEADER - 23) * pack('B', 0):
@@ -135,16 +135,16 @@ rsp3 = recv(m, count=4)
# Test the 1st received MSDU (Eth) object
i = 0
-if rsp3[i].dst != '65:48:4d:4d:64:61': # 'MMHead'
+if rsp3[i].dst != '4d:4d:48:65:61:64': # 'MMHead'
print "dst3 =", rsp3[i].dst
raise Error("dst3")
-if rsp3[i].src != '68:2:72:65:73:61': # 'er has'
+if rsp3[i].src != '65:72:20:68:61:73': # 'er has'
print "src3 =", rsp3[i].src
raise Error("src3")
-if rsp3[i].vlantag != 0x20333220: # ' 23 '
+if rsp3[i].vlantag != 0x20323320: # ' 23 '
print "vlantag3 =", hex(rsp3[i].vlantag)
raise Error("vlantag3")
-if rsp3[i].type != 0x636F: # 'oc'
+if rsp3[i].type != 0x6F63: # 'oc'
print "type3 =", hex(rsp3[i].type)
raise Error("type3")
if str(rsp3[i].payload) != 'tets!--- This is the Management Message Entry ---':
@@ -169,16 +169,16 @@ rsp4 = mme4.sendnrecv(m, sta1, count=4)
# Test the 1st received MSDU (Eth) object
i = 0
-if rsp4[i].dst != '65:48:4d:4d:64:61': # 'MMHead'
+if rsp4[i].dst != '4d:4d:48:65:61:64': # 'MMHead'
print "dst4 =", rsp4[i].dst
raise Error("dst4")
-if rsp4[i].src != '68:2:72:65:73:61': # 'er has'
+if rsp4[i].src != '65:72:20:68:61:73': # 'er has'
print "src4 =", rsp4[i].src
raise Error("src4")
-if rsp4[i].vlantag != 0x20333220: # ' 23 '
+if rsp4[i].vlantag != 0x20323320: # ' 23 '
print "vlantag4 =", hex(rsp4[i].vlantag)
raise Error("vlantag4")
-if rsp4[i].type != 0x636F: # 'oc'
+if rsp4[i].type != 0x6F63: # 'oc'
print "type4 =", hex(rsp4[i].type)
raise Error("type4")
if str(rsp4[i].payload) != 'tets!--- This is the Management Message Entry ---':
diff --git a/maximus/python/test/test.txt b/maximus/python/test/test.txt
index a1b8bba42f..fdf4907302 100644
--- a/maximus/python/test/test.txt
+++ b/maximus/python/test/test.txt
@@ -9,3 +9,86 @@ Allow to get values in different formats.
>>> d.get_hex()
'7b'
+
+FORMAT TEST
+-----------
+
+>>> from maximus.utils.format import *
+>>> hton8(0x41)
+'A'
+>>> hex(ntoh8('A'))
+'0x41'
+>>> hex(ntoh8(hton8(0x41)))
+'0x41'
+>>> hton16(0x4142)
+'AB'
+>>> hex(ntoh16('AB'))
+'0x4142'
+>>> hex(ntoh16(hton16(0x4142)))
+'0x4142'
+>>> hton32(0x41424344)
+'ABCD'
+>>> hex(ntoh32('ABCD'))
+'0x41424344'
+>>> hex(ntoh32(hton32(0x41424344)))
+'0x41424344'
+>>> hton48(0x414243444546)
+'ABCDEF'
+>>> hex(ntoh48('ABCDEF'))
+'0x414243444546L'
+>>> hex(ntoh48(hton48(0x414243444546)))
+'0x414243444546L'
+>>> hton48_tuple((0x41, 0x42, 0x43, 0x44, 0x45, 0x46))
+'ABCDEF'
+>>> ntoh48_tuple('ABCDEF')
+(65, 66, 67, 68, 69, 70)
+>>> ntoh48_tuple(hton48_tuple((65, 66, 67, 68, 69, 70)))
+(65, 66, 67, 68, 69, 70)
+>>> hton64(0x4142434445464748)
+'ABCDEFGH'
+>>> hex(ntoh64('ABCDEFGH'))
+'0x4142434445464748L'
+>>> hex(ntoh64(hton64(0x4142434445464748)))
+'0x4142434445464748L'
+>>> htohp8(0x41)
+'A'
+>>> hex(hptoh8('A'))
+'0x41'
+>>> hex(hptoh8(htohp8(0x41)))
+'0x41'
+>>> htohp16(0x4142)
+'BA'
+>>> hex(hptoh16('BA'))
+'0x4142'
+>>> hex(hptoh16(htohp16(0x4142)))
+'0x4142'
+>>> htohp24(0x414243)
+'CBA'
+>>> hex(hptoh24('CBA'))
+'0x414243'
+>>> hex(hptoh24(htohp24(0x414243)))
+'0x414243'
+>>> htohp32(0x41424344)
+'DCBA'
+>>> hex(hptoh32('DCBA'))
+'0x41424344'
+>>> hex(hptoh32(htohp32(0x41424344)))
+'0x41424344'
+>>> htohp48(0x414243444546)
+'DCBAFE'
+>>> hex(hptoh48('DCBAFE'))
+'0x414243444546L'
+>>> hex(hptoh48(htohp48(0x414243444546)))
+'0x414243444546L'
+>>> htohp48_tuple((0x41, 0x42, 0x43, 0x44, 0x45, 0x46))
+'DCBAFE'
+>>> hptoh48_tuple('DCBAFE')
+(65, 66, 67, 68, 69, 70)
+>>> hptoh48_tuple(htohp48_tuple((65, 66, 67, 68, 69, 70)))
+(65, 66, 67, 68, 69, 70)
+>>> htohp64(0x4142434445464748)
+'HGFEDCBA'
+>>> hex(hptoh64('HGFEDCBA'))
+'0x4142434445464748L'
+>>> hex(hptoh64(htohp64(0x4142434445464748)))
+'0x4142434445464748L'
diff --git a/maximus/python/test/test_ethernet.py b/maximus/python/test/test_ethernet.py
index 2e5150485c..e0105407ba 100644
--- a/maximus/python/test/test_ethernet.py
+++ b/maximus/python/test/test_ethernet.py
@@ -169,7 +169,7 @@ class TestEthFunctions(unittest.TestCase):
pass
def test_set_msdu_attr(self):
- p = 'DCBAFEJIHGLKPONMRQThis is the Ethernet Payload'
+ p = 'ABCDEFGHIJKLMNOPQRThis is the Ethernet Payload'
self.eth.set_msdu_attr(p)
self.assertEqual(self.eth.dst, '41:42:43:44:45:46')
self.assertEqual(self.eth.src, '47:48:49:4a:4b:4c')
@@ -187,7 +187,7 @@ class TestEthFunctions(unittest.TestCase):
# 1st received frame is the Ethernet frame
self.assertEqual(rsp[0].dst, 'ff:ff:ff:ff:ff:ff')
- self.assertEqual(rsp[0].src, ':::::')
+ self.assertEqual(rsp[0].src, '')
self.assertEqual(rsp[0].vlantag, 0)
self.assertEqual(rsp[0].type, 0)
@@ -214,7 +214,7 @@ class TestEthFunctions(unittest.TestCase):
self.eth.type = 0x5152
s = 'This is the Ethernet Payload'
self.eth.payload = s
- self.assertEqual(self.eth.get(), 'DCBAFEJIHGLKPONMRQThis is the Ethernet Payload' + (MIN_SIZE_OF_MSDU - SIZE_OF_HEADER - len(s)) * pack('B', 0))
+ self.assertEqual(self.eth.get(), 'ABCDEFGHIJKLMNOPQRThis is the Ethernet Payload' + (MIN_SIZE_OF_MSDU - SIZE_OF_HEADER - len(s)) * pack('B', 0))
def test_get_ether_type(self):
self.assertEqual(self.eth.get_ether_type(), 1)
diff --git a/maximus/python/test/test_fsm.py b/maximus/python/test/test_fsm.py
index 6cbda7d406..008dee4f2a 100644
--- a/maximus/python/test/test_fsm.py
+++ b/maximus/python/test/test_fsm.py
@@ -31,7 +31,7 @@ f.set_default_transition (error, 'IDLE')
f.add_transition_any ('INIT', None, 'END')
f.add_transition_any ('IDLE', None, 'END')
f.add_transition_any ('BUSY', None, 'END')
-f.add_transition ('create_station', 'INIT', create, 'IDLE')
+f.add_transition ('create_station', 'INIT', create, 'IDLE')
f.add_transition ('send_message', 'IDLE', send, 'BUSY')
f.add_transition ('receive_response', 'BUSY', receive, 'IDLE')
f.add_transition ('remove_station', 'IDLE', remove, 'INIT')
diff --git a/maximus/python/test/test_macframe.py b/maximus/python/test/test_macframe.py
index 21dfa2fb43..42d504ee7c 100644
--- a/maximus/python/test/test_macframe.py
+++ b/maximus/python/test/test_macframe.py
@@ -419,7 +419,7 @@ class TestMPDUFunctions(unittest.TestCase):
# Test with a Python string
fc10 = 'ABCD'
self.mpdu.set_fc_10(fc10)
- self.assertEqual(self.mpdu.get_fc_10(),unpack('I', fc10))
+ self.assertEqual(self.mpdu.get_fc_10(),unpack('I', fc10)[0])
def test_set_fc_av(self):
# Test with an FC AV object
diff --git a/maximus/python/test/test_mme.py b/maximus/python/test/test_mme.py
index 24e98b4abb..e1936e55e9 100644
--- a/maximus/python/test/test_mme.py
+++ b/maximus/python/test/test_mme.py
@@ -29,7 +29,7 @@ mme1 = MME(MMHeader=MMHeader(MMV=0x01), MMEntry=MMEntry('This is the MM Entry'))
mme1.send(m, staRx)
# Create an MME
-mme2 = MME(name='CC_ACCESS_NEW', type='REQ')
+mme2 = MME()
# Send the MME asynchronously
#rsp = mme2.sendnrecv(m)
@@ -44,6 +44,7 @@ import doctest
doctest.testmod(mme)
doctest.testmod(mmentry)
doctest.testmod(mmheader)
+doctest.testmod(mmtype)
# UNIT TEST
@@ -144,10 +145,8 @@ class TestMMHeaderFunctions(unittest.TestCase):
self.assertEqual(self.mmheader.get_fmi(), fmi)
def test_get(self):
- mmheader = MMHeader(ODA=0x646362616665, OSA=0x6A6968676C6B, VLANTag=0x706F6E6D, MTYPE=0x7271, MMV=0x73, MMTYPE=0x7475, FMI=0x7677)
- self.assertEqual(mmheader.get(), 'abcdefghijklmnopqrstuvw')
mmheader = MMHeader(ODA=0x616263646566, OSA=0x6768696A6B6C, VLANTag=0x6D6E6F70, MTYPE=0x7172, MMV=0x73, MMTYPE=0x7574, FMI=0x7776)
- self.assertEqual(mmheader.get(), 'dcbafejihglkponmrqsutwv')
+ self.assertEqual(mmheader.get(), 'abcdefghijklmnopqrstuvw')
suite = unittest.TestLoader().loadTestsFromTestCase(TestMMHeaderFunctions)
@@ -237,6 +236,7 @@ try:
suite.addTest(doctest.DocTestSuite(mme))
suite.addTest(doctest.DocTestSuite(mmentry))
suite.addTest(doctest.DocTestSuite(mmheader))
+ suite.addTest(doctest.DocTestSuite(mmtype))
except ValueError:
print "has no tests"
diff --git a/maximus/python/test/test_station.py b/maximus/python/test/test_station.py
index 9dcb6eaf9c..e847053cb7 100644
--- a/maximus/python/test/test_station.py
+++ b/maximus/python/test/test_station.py
@@ -6,8 +6,8 @@ import startup, sys
from maximus.station import *
from maximus.utils.exception import Error
+from maximus.utils.format import *
from interface import *
-from struct import pack
# STATION TEST
@@ -23,22 +23,23 @@ file_desc_nb = 3 # number of file descriptors per station (3 for pipe, 2 for soc
sta1 = STA(m, name="my_sta")
station_nb += 1
+sta1.debug()
sta1.set_mac_address(0x123456789ABC)
sta2 = STA(m, config=Config(mac_address='12:34:56:78:9A:BC'))
station_nb += 1
+sta2.debug()
sta2.set_cco_preference(False)
-sta3 = STA(m, name="my_sta", executable='../stationtest/obj/stationtest.elf')
+sta3 = STA(m, name="my_sta", executable='../stationtest/obj/test_station.elf')
sta3.debug()
-conf = Config(cco_preference=True, nid=0x12345678, snid=255)
+conf = Config(cco_preference=True)
sta4 = STA(m, name="my_sta", config=conf)
station_nb += 1
-sta4.set_nid(3)
-sta4.set_snid(3)
+sta4.debug()
-conf = Config(was_cco=True, npw="netclock_password", sl=3)
+conf = Config(was_cco=True, npw="netclock_password", sl=2)
sta5 = STA(m, name="This is the name of my station", config=conf)
station_nb += 1
sta5.debug()
@@ -83,17 +84,23 @@ class TestSTAFunctions(unittest.TestCase):
self.station.set_name(name)
self.assertEqual(self.station.get_name(), name)
+ def test_set_mme_buffer_nb(self):
+ mme_buffer_nb = 4
+ self.station.set_mme_buffer_nb(mme_buffer_nb)
+ self.assertEqual(self.station.get_mme_buffer_nb(), mme_buffer_nb)
+
+ def test_set_config_mode(self):
+ config_mode = 'fcall_process_drv'
+ self.station.set_config_mode(config_mode)
+ self.assertEqual(self.station.get_config_mode(), config_mode)
+
def test_set_config(self):
conf = Config()
conf.mac_address = (0x11, 0x22, 0x33, 0x44, 0x55, 0x66)
conf.cco_preference = True
- conf.nid = 0x12345678
- conf.snid = None
self.station.set_config(conf)
self.assertEqual(self.station.get_config().mac_address, conf.mac_address)
self.assertEqual(self.station.get_config().cco_preference, conf.cco_preference)
- self.assertEqual(self.station.get_config().nid, conf.nid)
- self.assertEqual(self.station.get_config().snid, conf.snid)
def test_set_mac_address(self):
mac_address = (0x41, 0x42, 0x43, 0x44, 0x45, 0x46)
@@ -104,7 +111,7 @@ class TestSTAFunctions(unittest.TestCase):
self.station.set_mac_address(0x414243444546)
self.assertEqual(self.station.get_mac_address(), mac_address)
# Test with a Python string of length equals to 6 octets
- self.station.set_mac_address(pack('!Q', 0x414243444546)[2:8])
+ self.station.set_mac_address(pack(n + u64, 0x414243444546)[SIZE_OF_U16:])
self.assertEqual(self.station.get_mac_address(), mac_address)
# Test with a Python string of length equals to 17 octets ('XX:XX:XX:XX:XX:XX')
self.station.set_mac_address('41:42:43:44:45:46')
@@ -120,26 +127,48 @@ class TestSTAFunctions(unittest.TestCase):
self.station.set_was_cco(was_cco)
self.assertEqual(self.station.get_was_cco(), was_cco)
- def test_set_nid(self):
- nid = 0x12345678
- self.station.set_nid(nid)
- self.assertEqual(self.station.get_nid(), nid)
-
- def test_set_snid(self):
- snid = 0x12
- self.station.set_snid(snid)
- self.assertEqual(self.station.get_snid(), snid)
-
def test_set_npw(self):
npw = "This is the network password"
self.station.set_npw(npw)
self.assertEqual(self.station.get_npw(), npw)
+ def test_set_dpw(self):
+ dpw = "This is the device password"
+ self.station.set_dpw(dpw)
+ self.assertEqual(self.station.get_dpw(), dpw)
+
+ def test_set_m_sta_hfid(self):
+ m_sta_hfid = "This is the manufacturer sta hfid"
+ self.station.set_m_sta_hfid(m_sta_hfid)
+ self.assertEqual(self.station.get_m_sta_hfid(), m_sta_hfid)
+
+ def test_set_u_sta_hfid(self):
+ u_sta_hfid = "This is the user sta hfid"
+ self.station.set_u_sta_hfid(u_sta_hfid)
+ self.assertEqual(self.station.get_u_sta_hfid(), u_sta_hfid)
+
+ def test_set_avln_hfid(self):
+ avln_hfid = "This is the avln hfid"
+ self.station.set_avln_hfid(avln_hfid)
+ self.assertEqual(self.station.get_avln_hfid(), avln_hfid)
+
def test_set_sl(self):
- sl = 0x12
+ sl = 0x02
self.station.set_sl(sl)
self.assertEqual(self.station.get_sl(), sl)
+ def test_set_tonemask(self):
+ tonemask = 85,139,167,214,225,282,302,409,419,569,591,736,748,856,882,1015,1027,1143,1535
+ self.station.set_tonemask(tonemask)
+ self.assertEqual(self.station.get_tonemask()[0 / 8], pack('B', 0)) # 74
+ self.assertEqual(self.station.get_tonemask()[11 / 8], pack('B', 0xf0)) # 85
+ self.assertEqual(self.station.get_tonemask()[345 / 8], pack('B', 0xfc)) # 419
+ self.assertEqual(self.station.get_tonemask()[495 / 8], pack('B', 0xff)) # 569
+ self.assertEqual(self.station.get_tonemask()[782 / 8], pack('B', 0x7f)) # 856
+ self.assertEqual(self.station.get_tonemask()[1069 / 8], pack('B', 0x3f)) # 1143
+ self.assertEqual(self.station.get_tonemask()[1070 / 8], pack('B', 0x3f)) # 1144
+ self.assertEqual(self.station.get_tonemask()[1155 / 8], pack('B', 0x00)) # 1229
+
def test_remove(self):
self.station.remove()
diff --git a/maximus/python/test/test_utils.py b/maximus/python/test/test_utils.py
index b4d41fca0a..0e13c17199 100644
--- a/maximus/python/test/test_utils.py
+++ b/maximus/python/test/test_utils.py
@@ -46,6 +46,7 @@ import doctest
doctest.testmod(converter)
doctest.testmod(crc)
doctest.testmod(exception)
+doctest.testmod(format)
# UNIT TEST
@@ -85,6 +86,7 @@ try:
suite.addTest(doctest.DocTestSuite(converter))
suite.addTest(doctest.DocTestSuite(crc))
suite.addTest(doctest.DocTestSuite(exception))
+ suite.addTest(doctest.DocTestSuite(format))
except ValueError:
print "has no tests"
diff --git a/maximus/stationtest/Makefile b/maximus/stationtest/Makefile
index 2dbc46668a..1784c2cced 100644
--- a/maximus/stationtest/Makefile
+++ b/maximus/stationtest/Makefile
@@ -32,7 +32,8 @@ test_cb_MODULES = lib host hal/phy/maximus
test_ether_SOURCES = test_ether.c
test_ether_MODULES = lib host hal/hle/maximus
+cp_beacon_MODULE_SOURCES = beacon.c
test_station_SOURCES = test_station.c
-test_station_MODULES = lib host cp/station/maximus
+test_station_MODULES = lib host hal/hle/maximus cp/station/maximus
include $(BASE)/common/make/top.mk
diff --git a/maximus/stationtest/src/test_station.c b/maximus/stationtest/src/test_station.c
index c529057e79..3e8e8c4003 100644
--- a/maximus/stationtest/src/test_station.c
+++ b/maximus/stationtest/src/test_station.c
@@ -14,8 +14,49 @@
#include "common/std.h"
#include "host/station.h" // for 'station_ctx_t'
#include "cp/station/maximus/inc/maximus_cp_station.h" // for 'maximus_cp_station_init()'
+#include "hal/hle/ipmbox.h"
+#include "hal/hle/defs.h" // for 'HLE_MSG_TYPE_...'
+#include "hal/hle/maximus/inc/maximus_ipmbox_ctx.h" // for 'ipmbox_t'
+#include "hal/hle/maximus/inc/maximus_interrupts.h" // for 'HAL_HLE_INTERRUPT_IPMBOX'
+#include "hal/hle/maximus/inc/maximus_ether.h" // for 'maximus_ether_send()'
+#include "maximus/common/types/ethernet_types.h" // for 'ETHERNET_TYPE_...'
extern station_ctx_t my_station;
+ipmbox_t * ctx;
+int user_data = 123;
+
+void ipmbox_rx_cb (void *user_data, u32 *first_msg, uint length)
+{
+ // Reset IT
+ maximus_pending_isrs &= (0 << HAL_HLE_INTERRUPT_IPMBOX);
+
+ ipmbox_msg_hdr_t *hdr = (ipmbox_msg_hdr_t *)&ctx->rx.mailbox[0];
+ if (HLE_MSG_TYPE_DATA == hdr->type)
+ {
+ /* When receiving an Ether SCI message of type MME REQ from Maximus,
+ * send the answer (an Ether SCI message of type MME CNF). */
+
+ uint data_length = (uint)(hdr->param >> 1);
+ memcpy(ctx->first_buffer->next->data, (u32 *)ctx->rx.mailbox[1], data_length);
+ char *data = (char *)ctx->first_buffer->next->data;
+ *(data + 19) = *(data + 19) + 1; // REQ => CNF
+ *(data + 23) = 0x01; // Success
+ memset(ctx->first_buffer->next->data + 23, '\0', data_length - 23);
+
+ hdr->param |= 0x001;
+ ctx->rx.mailbox[1] = (u32)ctx->first_buffer->next->data;
+ ipmbox_tx (ctx, ctx->rx.mailbox, ctx->rx.length);
+ }
+ else if (HLE_MSG_TYPE_BUFFER_ADD == hdr->type)
+ {
+ /* Receive an Ether SCI message of type MME_BUFFER_ADD from Maximus. */
+
+ hdr->type = HLE_MSG_TYPE_SEND_DONE;
+ ipmbox_tx (ctx, ctx->rx.mailbox, 2); // do nothing
+ }
+
+ return;
+}
int main(void)
{
@@ -25,5 +66,11 @@ int main(void)
maximus_cp_station_init(&my_station);
+ // Initialize the HAL HLE ipmbox
+ ctx = ipmbox_init ((void *)&user_data, &ipmbox_rx_cb);
+
+ // Enable assertions on warnings
+ ctx->warning_assert = true;
+
return 0;
}
diff --git a/maximus/test/test.sh b/maximus/test/test.sh
index 39f8d846fd..f6bd133d80 100755
--- a/maximus/test/test.sh
+++ b/maximus/test/test.sh
@@ -32,7 +32,7 @@ obj/usertest -e ../stationtest/obj/stationtest.elf -d false -t 2500000000
# Maximus Python unitary tests
cd $WORKSPACE/maximus/python
make clean; make
-python test/test_channel.py -e ../stationtest/obj/stationtest.elf -d false -t 2500000000
+python test/test_channel.py -e ../stationtest/obj/test_station.elf -d false -t 2500000000
python test/test_cli.py
python test/test_ethernet.py -e ../stationtest/obj/test_ether.elf -d false -t 2500000000
python test/test_fsm.py
@@ -69,7 +69,7 @@ make clean; make
obj/Maximus_sar -e obj/Sta_sar.elf -d false -t 2500000000
# CE tests
-cd $WORKSPACE/ce/test/maximus
+cd $WORKSPACE/ce/test/rx/maximus
make clean; make
python send_noise.py -e obj/test_rx.elf -d false -t 2500000000
@@ -78,10 +78,13 @@ cd $WORKSPACE/test_general/integration/cl-sar-pbproc
make clean; make
python src/Maximus.py -e obj/cl-sar-pbproc.elf -d false -t 2500000000
-# SAR - PB proc tests
-cd $WORKSPACE/test_general/integration/sar-pbproc
+# CP BEACON - DP tests
+cd $WORKSPACE/test_general/integration/cp_beacon-dp
make clean; make
-python src/Maximus.py -e obj/sar-pbproc.elf -d false -t 2500000000
+python src/max.py -e obj/cp_beacon.elf -d false -t 2500000000
+python src/max_cco.py -e obj/cp_beacon.elf -d false -t 2500000000
+python src/max_cco_send.py -e obj/cp_beacon.elf -d false -t 2500000000
+python src/max_ucco.py -e obj/cp_beacon.elf -d false -t 2500000000
# HLE - CL - SAR - PB proc tests
cd $WORKSPACE/test_general/integration/hle-cl-sar-pbproc
@@ -93,3 +96,8 @@ cd $WORKSPACE/test_general/integration/ipmbox-hle-cl-sar-pbproc
make clean; make
python src/Maximus.py -e obj/ipmbox-hle-cl-sar-pbproc.elf -d false -t 2500000000
python src/Maximus_mme.py -e obj/ipmbox-hle-cl-sar-pbproc.elf -d false -t 2500000000
+
+# SAR - PB proc tests
+cd $WORKSPACE/test_general/integration/sar-pbproc
+make clean; make
+python src/Maximus.py -e obj/sar-pbproc.elf -d false -t 2500000000
diff --git a/maximus/test/test_python.sh b/maximus/test/test_python.sh
new file mode 100755
index 0000000000..6d5046dc3e
--- /dev/null
+++ b/maximus/test/test_python.sh
@@ -0,0 +1,69 @@
+#! /bin/bash
+
+WORKSPACE='/home/buret/workspace/maximus'
+
+cd $WORKSPACE/maximus/stationtest/
+make clean; make
+
+# Maximus Python unitary tests
+cd $WORKSPACE/maximus/python
+make clean; make
+python test/test_channel.py -e ../stationtest/obj/test_station.elf -d false -t 2500000000
+python test/test_cli.py
+python test/test_ethernet.py -e ../stationtest/obj/test_ether.elf -d false -t 2500000000
+python test/test_fsm.py
+python test/test_interface.py -e ../stationtest/obj/stationtest.elf -d false -t 2500000000
+python test/test_macframe.py -e ../stationtest/obj/test_send.elf -d false -t 2500000000
+python test/test_mme.py -e ../stationtest/obj/stationtest.elf -d false -t 2500000000
+python test/test_result.py
+python test/test_simu.py -e ../stationtest/obj/stationtest.elf -d false -t 2500000000
+python test/test_station.py -e ../stationtest/obj/test_station.elf -d false -t 2500000000
+python test/test_utils.py
+#python test/test_maximus.py -e ../stationtest/obj/stationtest.elf -d false -t 2500000000
+
+# Maximus Python scripts tests
+python py/script_example.py -e ../stationtest/obj/stationtest.elf -d false -t 2500000000
+python py/test_cb.py -e ../stationtest/obj/test_cb.elf -d false -t 2500000000
+python py/test_send_mpdu.py -e ../stationtest/obj/test_send.elf -d false -t 2500000000
+python py/test_send_noise.py -e ../stationtest/obj/test_send.elf -d false -t 2500000000
+python py/test_tx_rx.py -e ../stationtest/obj/test_tx_rx.elf -d false -t 2500000000
+python py/test_ether.py -e ../stationtest/obj/test_ether.elf -d false -t 2500000000
+
+# PB proc tests
+cd $WORKSPACE/mac/pbproc/test/maximus
+make clean; make
+python py/host_test_pbproc.py -e obj/test_pbproc.elf -d false -t 2500000000
+
+# CE tests
+cd $WORKSPACE/ce/test/rx/maximus
+make clean; make
+python send_noise.py -e obj/test_rx.elf -d false -t 2500000000
+
+# CL - SAR - PB proc tests
+cd $WORKSPACE/test_general/integration/cl-sar-pbproc
+make clean; make
+python src/Maximus.py -e obj/cl-sar-pbproc.elf -d false -t 2500000000
+
+# CP BEACON - DP tests
+cd $WORKSPACE/test_general/integration/cp_beacon-dp
+make clean; make
+python src/max.py -e obj/cp_beacon.elf -d false -t 2500000000
+python src/max_cco.py -e obj/cp_beacon.elf -d false -t 2500000000
+python src/max_cco_send.py -e obj/cp_beacon.elf -d false -t 2500000000
+python src/max_ucco.py -e obj/cp_beacon.elf -d false -t 2500000000
+
+# HLE - CL - SAR - PB proc tests
+cd $WORKSPACE/test_general/integration/hle-cl-sar-pbproc
+make clean; make
+python src/Maximus.py -e obj/hle-cl-sar-pbproc.elf -d false -t 2500000000
+
+# IPMBOX - HLE - CL - SAR - PB proc tests
+cd $WORKSPACE/test_general/integration/ipmbox-hle-cl-sar-pbproc
+make clean; make
+python src/Maximus.py -e obj/ipmbox-hle-cl-sar-pbproc.elf -d false -t 2500000000
+python src/Maximus_mme.py -e obj/ipmbox-hle-cl-sar-pbproc.elf -d false -t 2500000000
+
+# SAR - PB proc tests
+cd $WORKSPACE/test_general/integration/sar-pbproc
+make clean; make
+python src/Maximus.py -e obj/sar-pbproc.elf -d false -t 2500000000