summaryrefslogtreecommitdiff
path: root/maximus
diff options
context:
space:
mode:
authorburet2008-03-19 12:52:50 +0000
committerburet2008-03-19 12:52:50 +0000
commit0e953e36003fc64546da9faf29ca6e2913b9cd72 (patch)
tree1a4d1055b642b250ed994cc561fffe4ba672b801 /maximus
parentd404088cd12a636da4f04ce45bd4dd21feb76755 (diff)
Maximus V2: VLAN Tag now is an optional field of Ethernet frames and MMEs.
git-svn-id: svn+ssh://pessac/svn/cesar/trunk@1622 017c9cb6-072f-447c-8318-d5b54f68fe89
Diffstat (limited to 'maximus')
-rwxr-xr-xmaximus/python/doc/fulminata_maximus_scenario_engine.odtbin345658 -> 345949 bytes
-rw-r--r--maximus/python/maximus/ethernet/eth.py51
-rw-r--r--maximus/python/maximus/mme/mme.py23
-rw-r--r--maximus/python/maximus/mme/mmentry.py4
-rw-r--r--maximus/python/maximus/mme/mmheader.py40
-rw-r--r--maximus/python/maximus/station/sta.py4
-rw-r--r--maximus/python/maximus/utils/format.py8
-rw-r--r--maximus/python/py/test_ether.py32
-rw-r--r--maximus/python/py/test_send_mpdu.py2
-rw-r--r--maximus/python/py/test_tx_rx.py4
-rw-r--r--maximus/python/test/test_ethernet.py62
-rw-r--r--maximus/python/test/test_mme.py73
-rw-r--r--maximus/stationtest/src/test_station.c6
13 files changed, 216 insertions, 93 deletions
diff --git a/maximus/python/doc/fulminata_maximus_scenario_engine.odt b/maximus/python/doc/fulminata_maximus_scenario_engine.odt
index e0823a56c8..e5f3c0328c 100755
--- a/maximus/python/doc/fulminata_maximus_scenario_engine.odt
+++ b/maximus/python/doc/fulminata_maximus_scenario_engine.odt
Binary files differ
diff --git a/maximus/python/maximus/ethernet/eth.py b/maximus/python/maximus/ethernet/eth.py
index f9bfb4213c..0c64558652 100644
--- a/maximus/python/maximus/ethernet/eth.py
+++ b/maximus/python/maximus/ethernet/eth.py
@@ -4,7 +4,7 @@
from maximus.ethernet.scapy import *
from maximus.macframe.msdu import MIN_SIZE_OF_MSDU, MSDU_TYPES, MSDU
-from maximus.utils.exception import Error
+from maximus.utils.exception import Error, OutOfRangeError
from maximus.utils.format import *
from struct import pack, unpack
@@ -12,31 +12,29 @@ from struct import pack, unpack
SIZE_OF_DST = SIZE_OF_U48 # in octets
SIZE_OF_SRC = SIZE_OF_DST # in octets
SIZE_OF_VLANTAG = SIZE_OF_U32 # in octets
+MIN_VALUE_OF_VLANTAG = 0x81000000
+MAX_VALUE_OF_VLANTAG = 0x8100FFFF
SIZE_OF_TYPE = SIZE_OF_U16 # in octets
-SIZE_OF_HEADER = SIZE_OF_DST + SIZE_OF_SRC + SIZE_OF_VLANTAG + SIZE_OF_TYPE # in octets
+MIN_SIZE_OF_HEADER = SIZE_OF_DST + SIZE_OF_SRC + SIZE_OF_TYPE # in octets
+MAX_SIZE_OF_HEADER = MIN_SIZE_OF_HEADER + SIZE_OF_VLANTAG # in octets
-# Constants for 'struct.pack()' and 'struct.unpack()' functions
-little_uint16_t = '<H' # little-endian
-little_uint32_t = '<I' # little-endian
-big_uint16_t = '>H' # big-endian
-big_uint32_t = '>I' # big-endian
-
-class VLANTag(Field):
- def __init__(self, name):
- Field.__init__(self, name, 0x00000000, 'I')
+#class VLANTag(Field):
+# def __init__(self, name):
+# Field.__init__(self, name, 0x00000000, 'I')
class Eth(MSDU, Packet):
name = "Eth"
aliastypes = [ Ether ]
- fields_desc = [DestMACField("dst"), SourceMACField("src"), VLANTag("vlantag"), XShortEnumField("type", 0x0000, ETHER_TYPES)]
+ #fields_desc = [DestMACField("dst"), SourceMACField("src"), VLANTag("vlantag"), XShortEnumField("type", 0x0000, ETHER_TYPES)]
+ fields_desc = [DestMACField("dst"), SourceMACField("src"), XShortEnumField("type", 0x0000, ETHER_TYPES)]
def set_msdu_attr(self, payload):
"""Set the Eth attributes from the received Ethernet Frame.
"""
- if len(payload) < SIZE_OF_HEADER:
+ if len(payload) < MIN_SIZE_OF_HEADER:
raise Error('Ethernet Frame length')
- # Set Eth Header: dst, src, vlantag, type
+ # Set Eth Header: dst, src, vlantag (optional), type
#
# Set dst
@@ -61,10 +59,13 @@ class Eth(MSDU, Packet):
src += ':'
self.src = src
- # Set vlantag
+ # Check vlantag
begin = end
- end += SIZE_OF_VLANTAG
- self.vlantag = ntoh32(payload[begin:end])
+ if ntoh32(payload[begin:end + SIZE_OF_VLANTAG]) <= MAX_VALUE_OF_VLANTAG\
+ and ntoh32(payload[begin:end + SIZE_OF_VLANTAG]) >= MIN_VALUE_OF_VLANTAG:
+ # Set vlantag
+ end += SIZE_OF_VLANTAG
+ self.vlantag = ntoh32(payload[begin:end])
# Set type
begin = end
@@ -77,7 +78,21 @@ class Eth(MSDU, Packet):
def get(self):
"""This function returns the Ethernet frame into a string.
"""
- return str(self) + self.pad(MIN_SIZE_OF_MSDU - len(str(self)))
+ eth = str(self)
+
+ # Check vlantag
+ try:
+ if type(self.vlantag) is int or type(self.vlantag) is long:
+ if self.vlantag <= MAX_VALUE_OF_VLANTAG and self.vlantag >= MIN_VALUE_OF_VLANTAG:
+ eth = eth[:SIZE_OF_DST + SIZE_OF_SRC] + hton32(self.vlantag) + eth[SIZE_OF_DST + SIZE_OF_SRC:]
+ else:
+ raise OutOfRangeError("vlantag")
+ else:
+ raise TypeError("vlantag")
+ except AttributeError:
+ pass
+
+ return eth + self.pad(MIN_SIZE_OF_MSDU - len(eth))
def get_ether_type(self):
"""This function returns the Ethernet Frame object type into a Python integer (uint8_t).
diff --git a/maximus/python/maximus/mme/mme.py b/maximus/python/maximus/mme/mme.py
index 792fa7dc32..b4dcdb5326 100644
--- a/maximus/python/maximus/mme/mme.py
+++ b/maximus/python/maximus/mme/mme.py
@@ -3,9 +3,12 @@
#print __name__
from maximus.mme.mmentry import MAX_SIZE_OF_MMENTRY, MMEntry
-from maximus.mme.mmheader import SIZE_OF_MMHEADER, MMHeader
+from maximus.mme.mmheader import MMHeader, MIN_SIZE_OF_MMHEADER, MAX_SIZE_OF_MMHEADER,\
+ SIZE_OF_ODA, SIZE_OF_OSA,\
+ SIZE_OF_VLANTAG, MIN_VALUE_OF_VLANTAG, MAX_VALUE_OF_VLANTAG
from maximus.macframe.msdu import MIN_SIZE_OF_MSDU, MSDU_TYPES, MSDU
from maximus.utils.exception import OutOfRangeError
+from maximus.utils.format import ntoh32
# Define an empty MME structure for Maximus
class MMEStruct:
@@ -31,15 +34,15 @@ class MME(MSDU):
def set_mmheader(self, mmheader):
"""Set the MM Header.
The MM Header can be an MM Header object,
- or a Python string of length equals to 23 octets.
+ or a Python string of length equals to 19 or 23 octets.
"""
self.__mmheader = mmheader
try:
# MM Header is an MM Header object
self.__mme.header = mmheader.get() # set MM Header into a Python string of length equals to 23 octets
except AttributeError:
- if len(mmheader) == SIZE_OF_MMHEADER:
- # MM Header is a Python string of length equals to 23 octets
+ if len(mmheader) == MIN_SIZE_OF_MMHEADER or len(mmheader) == MAX_SIZE_OF_MMHEADER:
+ # MM Header is a Python string of length equals to 19 or 23 octets
self.__mme.header = mmheader
else:
raise OutOfRangeError("MM Header")
@@ -63,9 +66,19 @@ class MME(MSDU):
def set_msdu_attr(self, payload):
"""Set the MME attributes from the received Ethernet Frame.
"""
+ # Check VLAN Tag
+ begin = SIZE_OF_ODA + SIZE_OF_OSA
+ end = begin + SIZE_OF_VLANTAG
+ if ntoh32(payload[begin:end]) <= MAX_VALUE_OF_VLANTAG\
+ and ntoh32(payload[begin:end]) >= MIN_VALUE_OF_VLANTAG:
+ # There is a VLAN Tag
+ end = MAX_SIZE_OF_MMHEADER
+ else:
+ # There is no VLAN Tag
+ end = MIN_SIZE_OF_MMHEADER
+
# Set MM Header
begin = 0
- end = SIZE_OF_MMHEADER
self.set_mmheader(payload[begin:end])
# Set MM Entry
diff --git a/maximus/python/maximus/mme/mmentry.py b/maximus/python/maximus/mme/mmentry.py
index d49bbf9da7..7d69dd0817 100644
--- a/maximus/python/maximus/mme/mmentry.py
+++ b/maximus/python/maximus/mme/mmentry.py
@@ -2,12 +2,12 @@
#print __name__
-from maximus.mme.mmheader import SIZE_OF_MMHEADER
+from maximus.mme.mmheader import MIN_SIZE_OF_MMHEADER
from maximus.macframe.msdu import MAX_SIZE_OF_MSDU
from maximus.utils.exception import OutOfRangeError
# Constants to check arguments validity
-MAX_SIZE_OF_MMENTRY = MAX_SIZE_OF_MSDU - SIZE_OF_MMHEADER
+MAX_SIZE_OF_MMENTRY = MAX_SIZE_OF_MSDU - MIN_SIZE_OF_MMHEADER
MAX_SIZE_OF_DATA = MAX_SIZE_OF_MMENTRY
class MMEntry:
diff --git a/maximus/python/maximus/mme/mmheader.py b/maximus/python/maximus/mme/mmheader.py
index 15d185c434..01588973c0 100644
--- a/maximus/python/maximus/mme/mmheader.py
+++ b/maximus/python/maximus/mme/mmheader.py
@@ -6,15 +6,17 @@ from maximus.utils.exception import OutOfRangeError
from maximus.utils.format import *
# Constants to check arguments validity
-SIZE_OF_MMHEADER = 23 # in octets
+MIN_SIZE_OF_MMHEADER = 19 # in octets
+MAX_SIZE_OF_MMHEADER = 23 # in octets
SIZE_OF_ODA = SIZE_OF_U48
MAX_VALUE_OF_ODA = MAX_VALUE_OF_U48
SIZE_OF_OSA = SIZE_OF_ODA
MAX_VALUE_OF_OSA = MAX_VALUE_OF_ODA
-SIZE_OF_VLANTAG = SIZE_OF_U32
-MAX_VALUE_OF_VLANTAG = MAX_VALUE_OF_U32
+MIN_VALUE_OF_VLANTAG = 0x81000000
+MAX_VALUE_OF_VLANTAG = 0x8100FFFF
SIZE_OF_MTYPE = SIZE_OF_U16
MAX_VALUE_OF_MTYPE = MAX_VALUE_OF_U16
+SIZE_OF_VLANTAG = SIZE_OF_U32
SIZE_OF_MMV = SIZE_OF_U8
MAX_VALUE_OF_MMV = MAX_VALUE_OF_U8
SIZE_OF_MMTYPE = SIZE_OF_U16
@@ -27,7 +29,7 @@ class MMHeaderStruct:
"""The MME structure is composed of the 7 following fields:
oda - a Python string of length equals to 6 octets
osa - a Python string of length equals to 6 octets
- vlantag - a Python string of length equals to 4 octets
+ vlantag (optional) - a Python string of length equals to 4 octets
mtype - a Python string of length equals to 2 octets
mmv - a Python string of length equals to 1 octet
mmtype - a Python string of length equals to 2 octets
@@ -37,7 +39,7 @@ class MMHeaderStruct:
class MMHeader:
- def __init__(self, ODA=MAX_VALUE_OF_ODA, OSA=0, VLANTag=0, MTYPE=0x88e1, MMV=0, MMTYPE=0, FMI=0):
+ def __init__(self, ODA=MAX_VALUE_OF_ODA, OSA=0, VLANTag=None, MTYPE=0x88e1, MMV=0, MMTYPE=0, FMI=0):
# Create an MME structure for Maximus
self.__mmheader = MMHeaderStruct()
@@ -121,23 +123,30 @@ class MMHeader:
raise TypeError("OSA")
def set_vlantag(self, vlantag):
- """Set the VLAN Tag.
- The VLAN Tag can be a Python integer (decimal or hexadecimal value),
+ """Set the VLAN Tag (VLAN Tag is optional).
+ The VLAN Tag can be a Python long (decimal or hexadecimal value),
or a Python string of length equals to 4 octets.
"""
self.__vlantag = vlantag
- try:
+ if type(vlantag) is str:
if len(vlantag) == SIZE_OF_VLANTAG:
# VLAN Tag is a Python string of length equals to 4 octets
- self.__mmheader.vlantag = vlantag
+ if ntoh32(vlantag) <= MAX_VALUE_OF_VLANTAG and ntoh32(vlantag) >= MIN_VALUE_OF_VLANTAG:
+ self.__mmheader.vlantag = vlantag
+ else:
+ raise OutOfRangeError("VLAN Tag")
else:
raise OutOfRangeError("VLAN Tag")
- except TypeError:
- # VLAN Tag is a Python integer
- if vlantag <= MAX_VALUE_OF_VLANTAG and vlantag >= 0:
+ elif type(vlantag) is int or type(vlantag) is long:
+ # VLAN Tag is a Python long
+ if vlantag <= MAX_VALUE_OF_VLANTAG and vlantag >= MIN_VALUE_OF_VLANTAG:
self.__mmheader.vlantag = hton32(vlantag)
else:
raise OutOfRangeError("VLAN Tag")
+ elif vlantag is None:
+ self.__mmheader.vlantag = ''
+ else:
+ raise TypeError("VLAN Tag")
def set_mtype(self, mtype):
"""Set the MTYPE.
@@ -235,8 +244,8 @@ class MMHeader:
return self.__osa
def get_vlantag(self):
- """Get the VLAN Tag.
- The VLAN Tag can be a Python integer,
+ """Get the VLAN Tag (VLAN Tag is optional).
+ The VLAN Tag can be a Python long,
or a Python string of length equals to 4 octets.
"""
return self.__vlantag
@@ -281,7 +290,8 @@ class MMHeader:
print "display the MM Header"
print "ODA =", hex(ntoh48(self.__mmheader.oda))
print "OSA =", hex(ntoh48(self.__mmheader.oda))
- print "VLAN Tag =", hex(ntoh16(self.__mmheader.vlantag))
+ if self.get_vlantag() is not None:
+ print "VLAN Tag =", hex(ntoh16(self.__mmheader.vlantag))
print "MTYPE =", hex(ntoh16(self.__mmheader.mtype))
print "MMV =", hex(hptoh8(self.__mmheader.mmv))
print "MMTYPE =", hex(hptoh16(self.__mmheader.mmtype))
diff --git a/maximus/python/maximus/station/sta.py b/maximus/python/maximus/station/sta.py
index 95d3942ecd..ecd49ba425 100644
--- a/maximus/python/maximus/station/sta.py
+++ b/maximus/python/maximus/station/sta.py
@@ -6,7 +6,7 @@ from maximus.station.config import *
from maximus.ethernet.buffer import alloc_data_buffer, alloc_mme_buffer, alloc_interface_buffer
from maximus.macframe.msdu import MSDU_TYPES
from maximus.mme import *
-from maximus.mme.mmheader import SIZE_OF_MMHEADER, SIZE_OF_MMTYPE, SIZE_OF_FMI
+from maximus.mme.mmheader import MIN_SIZE_OF_MMHEADER, SIZE_OF_MMTYPE, SIZE_OF_FMI
from maximus.utils.exception import Error, OutOfRangeError
from maximus.utils.format import *
@@ -1016,7 +1016,7 @@ class STA:
def __check_cnf(self, rsp, name):
"""Check the MME response validity.
"""
- mmtype = hptoh16(rsp[0].get_mmheader()[SIZE_OF_MMHEADER - SIZE_OF_FMI - SIZE_OF_MMTYPE:SIZE_OF_MMHEADER - SIZE_OF_FMI])
+ mmtype = hptoh16(rsp[0].get_mmheader()[MIN_SIZE_OF_MMHEADER - SIZE_OF_FMI - SIZE_OF_MMTYPE:MIN_SIZE_OF_MMHEADER - SIZE_OF_FMI])
if mmtype != name + MME_TYPES['CNF']:
raise Error("Received unexpected message of type " + hex(mmtype))
result = hptoh8(rsp[0].get_mmentry()[0])
diff --git a/maximus/python/maximus/utils/format.py b/maximus/python/maximus/utils/format.py
index d55d9a5b44..5285a23acd 100644
--- a/maximus/python/maximus/utils/format.py
+++ b/maximus/python/maximus/utils/format.py
@@ -53,6 +53,14 @@ SIZE_OF_U32 = calcsize(u32)
SIZE_OF_U48 = SIZE_OF_U32 + SIZE_OF_U16
SIZE_OF_U64 = calcsize(u64)
+# In bits
+size_of_u8 = 8
+size_of_u16 = 16
+size_of_u24 = 24
+size_of_u32 = 32
+size_of_u48 = 48
+size_of_u64 = 64
+
MAX_VALUE_OF_U8 = 0xFF
MAX_VALUE_OF_U16 = 0xFFFF
MAX_VALUE_OF_U24 = 0xFFFFFF
diff --git a/maximus/python/py/test_ether.py b/maximus/python/py/test_ether.py
index 00c2ca4081..c704bb97b1 100644
--- a/maximus/python/py/test_ether.py
+++ b/maximus/python/py/test_ether.py
@@ -9,8 +9,8 @@ import startup
from interface import *
from maximus import *
-from maximus.ethernet.eth import SIZE_OF_HEADER
-from maximus.mme.mmheader import SIZE_OF_MMHEADER
+from maximus.ethernet.eth import MAX_SIZE_OF_HEADER
+from maximus.mme.mmheader import MAX_SIZE_OF_MMHEADER
from maximus.macframe.msdu import MIN_SIZE_OF_MSDU
from maximus.simu.rx import * # for 'recv()' function
from struct import pack
@@ -39,11 +39,11 @@ fcall.send(sta1)
# Ethernet Frame config
dst = '41:42:43:44:45:46'
src = '47:48:49:4a:4b:4c'
-vlantag = 0x4D4E4F50
-type = 0x5152
+vlantag = 0x81004D4E
+type = 0x4F50
# MME config
-mmheader = 'MMHeader has 23 octets!'
+mmheader = 'MMHeader 19 octets!'
mmentry = '--- This is the Management Message Entry ---'
# Send an Ethernet Frame asynchronously
@@ -72,17 +72,17 @@ if rsp1[i].vlantag != vlantag:
if rsp1[i].type != type:
print "type1 =", hex(rsp1[i].type)
raise Error("type1")
-if str(rsp1[i].payload) != msdu1 + (MIN_SIZE_OF_MSDU - SIZE_OF_HEADER - len(msdu1)) * pack('B', 0):
+if str(rsp1[i].payload) != msdu1 + (MIN_SIZE_OF_MSDU - MAX_SIZE_OF_HEADER - len(msdu1)) * pack('B', 0):
print len(str(rsp1[i].payload))
print "msdu1 =", str(rsp1[i].payload)
raise Error("msdu1")
# Test the 2nd received MSDU (MME) object
i = 2 # i = 2 because the second received frame is the BUFFER RELEASED message
-if rsp1[i].get_mmheader() != 'ABCDEFGHIJKLMNOPQRThis ':
+if rsp1[i].get_mmheader() != 'ABCDEFGHIJKL' + pack('!I', vlantag) + 'OPThis ':
print "mmheader1 =", rsp1[i].get_mmheader()
raise Error("mmheader1")
-if rsp1[i].get_mmentry() != 'is the Ethernet payload' + (MIN_SIZE_OF_MSDU - SIZE_OF_MMHEADER - 23) * pack('B', 0):
+if rsp1[i].get_mmentry() != 'is the Ethernet payload' + (MIN_SIZE_OF_MSDU - MAX_SIZE_OF_MMHEADER - 23) * pack('B', 0):
print "mmentry1 =", rsp1[i].get_mmentry()
raise Error("mmentry1")
@@ -113,16 +113,16 @@ if rsp2[i].vlantag != vlantag:
if rsp2[i].type != type:
print "type2 =", hex(rsp2[i].type)
raise Error("type2")
-if str(rsp2[i].payload) != msdu2 + (MIN_SIZE_OF_MSDU - SIZE_OF_HEADER - len(msdu2)) * pack('B', 0):
+if str(rsp2[i].payload) != msdu2 + (MIN_SIZE_OF_MSDU - MAX_SIZE_OF_HEADER - len(msdu2)) * pack('B', 0):
print "msdu2 =", str(rsp2[i].payload)
raise Error("msdu2")
# Test the 2nd received MSDU (MME) object
i = 2 # i = 2 because the second received frame is the BUFFER RELEASED message
-if rsp2[i].get_mmheader() != 'ABCDEFGHIJKLMNOPQRThis ':
+if rsp2[i].get_mmheader() != 'ABCDEFGHIJKL' + pack('!I', vlantag) + 'OPThis ':
print "mmheader2 =", rsp2[i].get_mmheader()
raise Error("mmheader2")
-if rsp2[i].get_mmentry() != 'is the Ethernet payload' + (MIN_SIZE_OF_MSDU - SIZE_OF_MMHEADER - 23) * pack('B', 0):
+if rsp2[i].get_mmentry() != 'is the Ethernet payload' + (MIN_SIZE_OF_MSDU - MAX_SIZE_OF_MMHEADER - 23) * pack('B', 0):
print "mmentry2 =", rsp2[i].get_mmentry()
raise Error("mmentry2")
@@ -139,12 +139,9 @@ i = 0
if rsp3[i].dst.lower() != '4d:4d:48:65:61:64': # 'MMHead'
print "dst3 =", rsp3[i].dst.lower()
raise Error("dst3")
-if rsp3[i].src.lower() != '65:72:20:68:61:73': # 'er has'
+if rsp3[i].src.lower() != '65:72:20:31:39:20': # 'er 19 '
print "src3 =", rsp3[i].src.lower()
raise Error("src3")
-if rsp3[i].vlantag != 0x20323320: # ' 23 '
- print "vlantag3 =", hex(rsp3[i].vlantag)
- raise Error("vlantag3")
if rsp3[i].type != 0x6F63: # 'oc'
print "type3 =", hex(rsp3[i].type)
raise Error("type3")
@@ -173,12 +170,9 @@ i = 0
if rsp4[i].dst.lower() != '4d:4d:48:65:61:64': # 'MMHead'
print "dst4 =", rsp4[i].dst.lower()
raise Error("dst4")
-if rsp4[i].src.lower() != '65:72:20:68:61:73': # 'er has'
+if rsp4[i].src.lower() != '65:72:20:31:39:20': # 'er 19 '
print "src4 =", rsp4[i].src.lower()
raise Error("src4")
-if rsp4[i].vlantag != 0x20323320: # ' 23 '
- print "vlantag4 =", hex(rsp4[i].vlantag)
- raise Error("vlantag4")
if rsp4[i].type != 0x6F63: # 'oc'
print "type4 =", hex(rsp4[i].type)
raise Error("type4")
diff --git a/maximus/python/py/test_send_mpdu.py b/maximus/python/py/test_send_mpdu.py
index 7e19ed5168..7397fb7b0b 100644
--- a/maximus/python/py/test_send_mpdu.py
+++ b/maximus/python/py/test_send_mpdu.py
@@ -137,7 +137,7 @@ macFrame5.send(maximus)
send_fcall2(sta)
macFrame6 = MACFrame(MACFrameHeader='H!', ICV='ICV!')
macFrame6.set_fc_av(pack('IIII', 123, 456, 789, 10))
-macFrame6.set_msdu(mme.MME(MMHeader='MMHeader has 23 octets!', MMEntry='--- This is the Management Message Entry ---'))
+macFrame6.set_msdu(mme.MME(MMHeader='MMHeader 19 octets!', MMEntry='--- This is the Management Message Entry ---'))
macFrame6.send(maximus)
# Send an MPDU containing 236 PBs
diff --git a/maximus/python/py/test_tx_rx.py b/maximus/python/py/test_tx_rx.py
index b68c8d4d30..42c8c680ad 100644
--- a/maximus/python/py/test_tx_rx.py
+++ b/maximus/python/py/test_tx_rx.py
@@ -196,7 +196,7 @@ icv5 = 'ICV!'
macFrame5 = MACFrame(ICV=icv5)
fc_av_5 = fc_av_1
macFrame5.set_fc_av(fc_av_5)
-mmheader5 = 'MMHeader has 23 octets!'
+mmheader5 = 'MMHeader 19 octets!'
mmentry5 = '--- This is the Management Message Entry ---'
msdu5 = MME(MMHeader=mmheader5, MMEntry=mmentry5)
macFrame5.set_msdu(msdu5)
@@ -205,7 +205,7 @@ rsp5 = macFrame5.sendnrecv(m)
if rsp5[0].get_fc_av() != fc_av_5:
print "fc_av_5 =", rsp5[0].get_fc_av()
raise Error("fc_av_5")
-if rsp5[0].get_pblist()[0] != MACFrameHeader(MFL=len(mmheader5)+len(mmentry5)-1).get() + mmheader5 + mmentry5 + macFrame5.get_icv() + get_pad(55):
+if rsp5[0].get_pblist()[0] != MACFrameHeader(MFL=len(mmheader5)+len(mmentry5)-1).get() + mmheader5 + mmentry5 + macFrame5.get_icv() + get_pad(59):
print "msdu5 =", rsp5[0].get_pblist()[0]
raise Error("msdu5")
#if rsp5[0].get_icv() != icv5:
diff --git a/maximus/python/test/test_ethernet.py b/maximus/python/test/test_ethernet.py
index aba2f62a2b..d55c7e5587 100644
--- a/maximus/python/test/test_ethernet.py
+++ b/maximus/python/test/test_ethernet.py
@@ -7,7 +7,7 @@ import sys, startup
from maximus.ethernet import *
from maximus.ethernet.buffer import Buffer
from maximus.ethernet.create import create_eth, create_buffer, create_sniffer
-from maximus.ethernet.eth import SIZE_OF_HEADER
+from maximus.ethernet.eth import MIN_SIZE_OF_HEADER, MAX_SIZE_OF_HEADER
from maximus.ethernet.sniffer import Sniffer
from maximus.macframe.msdu import MIN_SIZE_OF_MSDU
from maximus.simu.rx import * # for 'recv()' function
@@ -38,21 +38,25 @@ f = Eth()
# Create an Ethernet frame
f.dst = '41:42:43:44:45:46'
f.src = '47:48:49:4a:4b:4c'
-f.vlantag = 0x4D4E4F50
-f.type = 0x5152
+f.vlantag = 0x81004D4E
+f.type = 0x4F50
s = 'This is the Ethernet Payload'
f.payload = s
-if len(f) != SIZE_OF_HEADER + len(s):
+if len(f) != MIN_SIZE_OF_HEADER + len(s):
+ print "expected length = ", MIN_SIZE_OF_HEADER + len(s)
+ print "length = ", len(f)
raise Error('Ethernet frame payload length')
-if str(f) != 'ABCDEFGHIJKLMNOPQR' + str(f.payload):
- raise Error('Ethernet frame payload')
+if str(f) != 'ABCDEFGHIJKLOP' + str(f.payload):
+ print "expected payload =", 'ABCDEFGHIJKLOP' + str(f.payload)
+ print "payload = ", str(f)
+ raise Error('Ethernet frame payload')
# Create an Ethernet frame containing an IP frame
g = Eth(dst=f.dst, src=f.src)/IP()
-if len(g) != SIZE_OF_HEADER + len(str(IP())):
+if len(g) != MIN_SIZE_OF_HEADER + len(str(IP())):
raise Error('Ethernet IP frame payload length')
-h = unpack(38*'B', str(g))
-if h != (65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 0, 0, 0, 0, 8, 0, 69, 0, 0, 20, 0, 1, 0, 0, 64, 0, 124, 231, 127, 0, 0, 1, 127, 0, 0, 1):
+h = unpack(34*'B', str(g))
+if h != (65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 8, 0, 69, 0, 0, 20, 0, 1, 0, 0, 64, 0, 124, 231, 127, 0, 0, 1, 127, 0, 0, 1):
raise Error('Ethernet IP frame payload')
# Send the Ethernet frame asynchronously
@@ -172,12 +176,21 @@ class TestEthFunctions(unittest.TestCase):
pass
def test_set_msdu_attr(self):
- p = 'ABCDEFGHIJKLMNOPQRThis is the Ethernet Payload'
+ # Test without vlantag
+ p = 'ABCDEFGHIJKLMNThis is the Ethernet Payload'
self.eth.set_msdu_attr(p)
self.assertEqual(self.eth.dst.lower(), '41:42:43:44:45:46')
self.assertEqual(self.eth.src.lower(), '47:48:49:4a:4b:4c')
- self.assertEqual(self.eth.vlantag, 0x4D4E4F50)
- self.assertEqual(self.eth.type, 0x5152)
+ self.assertEqual(self.eth.type, 0x4D4E)
+ self.assertEqual(str(self.eth.payload), 'This is the Ethernet Payload')
+
+ # Test with vlantag
+ p = 'ABCDEFGHIJKL' + pack('!I', 0x81004D4E) + 'OPThis is the Ethernet Payload'
+ self.eth.set_msdu_attr(p)
+ self.assertEqual(self.eth.dst.lower(), '41:42:43:44:45:46')
+ self.assertEqual(self.eth.src.lower(), '47:48:49:4a:4b:4c')
+ self.assertEqual(self.eth.vlantag, 0x81004D4E)
+ self.assertEqual(self.eth.type, 0x4F50)
self.assertEqual(str(self.eth.payload), 'This is the Ethernet Payload')
def test_sendnrecv(self):
@@ -191,7 +204,6 @@ class TestEthFunctions(unittest.TestCase):
# 1st received frame is the Ethernet frame
self.assertEqual(rsp[0].dst.lower(), 'ff:ff:ff:ff:ff:ff')
self.assertEqual(rsp[0].src.lower(), '')
- self.assertEqual(rsp[0].vlantag, 0)
self.assertEqual(rsp[0].type, 0)
# 2nd and 4th received frames are BUFFER RELEASED messages
@@ -213,13 +225,31 @@ class TestEthFunctions(unittest.TestCase):
sta.remove()
def test_get(self):
+ # Test without vlantag
self.eth.dst = '41:42:43:44:45:46'
self.eth.src = '47:48:49:4a:4b:4c'
- self.eth.vlantag = 0x4D4E4F50
- self.eth.type = 0x5152
+ self.eth.type = 0x4D4E
s = 'This is the Ethernet Payload'
self.eth.payload = s
- self.assertEqual(self.eth.get(), 'ABCDEFGHIJKLMNOPQRThis is the Ethernet Payload' + (MIN_SIZE_OF_MSDU - SIZE_OF_HEADER - len(s)) * pack('B', 0))
+ self.assertEqual(self.eth.get(), 'ABCDEFGHIJKLMNThis is the Ethernet Payload' + (MIN_SIZE_OF_MSDU - MIN_SIZE_OF_HEADER - len(s)) * pack('B', 0))
+
+ # Test with vlantag
+ self.eth.dst = '41:42:43:44:45:46'
+ self.eth.src = '47:48:49:4a:4b:4c'
+ self.eth.vlantag = 0x81004D4E
+ self.eth.type = 0x4F50
+ s = 'This is the Ethernet Payload'
+ self.eth.payload = s
+ self.assertEqual(self.eth.get(), 'ABCDEFGHIJKL' + pack('!I', 0x81004D4E) + 'OPThis is the Ethernet Payload' + (MIN_SIZE_OF_MSDU - MAX_SIZE_OF_HEADER - len(s)) * pack('B', 0))
+
+ # Test with a bad vlantag
+ test = False
+ self.eth.vlantag = 0x4D4E4D4E
+ try:
+ self.eth.get()
+ except OutOfRangeError:
+ test = True
+ self.assert_(test)
def test_get_ether_type(self):
self.assertEqual(self.eth.get_ether_type(), 1)
diff --git a/maximus/python/test/test_mme.py b/maximus/python/test/test_mme.py
index 57cde75f36..f33c5615b8 100644
--- a/maximus/python/test/test_mme.py
+++ b/maximus/python/test/test_mme.py
@@ -6,6 +6,7 @@ import sys, startup
from maximus.mme import *
from maximus.mme.create import create_mme
+from maximus.utils.exception import OutOfRangeError
from interface import *
from struct import pack
@@ -81,16 +82,34 @@ class TestMMHeaderFunctions(unittest.TestCase):
def test_set_vlantag(self):
# Test with a Python integer
- vlantag = 0xFFFFFFFF
+ #
+ vlantag = 0x8100FFFF
self.mmheader.set_vlantag(vlantag)
self.assertEqual(self.mmheader.get_vlantag(), vlantag)
+
+ # Test with a bad vlantag
+ test = False
+ vlantag = 0xFFFFFFFF
+ try:
+ self.mmheader.set_vlantag(vlantag)
+ except OutOfRangeError:
+ test = True
+ self.assert_(test)
+
# Test with a Python string of length equals to 4 octets
- vlantag = '1234'
- self.mmheader.set_vlantag(vlantag)
- self.assertEqual(self.mmheader.get_vlantag(), vlantag)
- vlantag = pack('I', 0x11223344)
+ #
+ vlantag = pack('!I', 0x81000000)
self.mmheader.set_vlantag(vlantag)
self.assertEqual(self.mmheader.get_vlantag(), vlantag)
+
+ # Test with a bad vlantag
+ test = False
+ vlantag = pack('!I', 0x00000000)
+ try:
+ self.mmheader.set_vlantag(vlantag)
+ except OutOfRangeError:
+ test = True
+ self.assert_(test)
def test_set_mtype(self):
# Test with a Python integer
@@ -145,8 +164,13 @@ class TestMMHeaderFunctions(unittest.TestCase):
self.assertEqual(self.mmheader.get_fmi(), fmi)
def test_get(self):
- mmheader = MMHeader(ODA=0x616263646566, OSA=0x6768696A6B6C, VLANTag=0x6D6E6F70, MTYPE=0x7172, MMV=0x73, MMTYPE=0x7574, FMI=0x7776)
- self.assertEqual(mmheader.get(), 'abcdefghijklmnopqrstuvw')
+ # Test without vlantag
+ mmheader = MMHeader(ODA=0x616263646566, OSA=0x6768696A6B6C, MTYPE=0x6D6E, MMV=0x6F, MMTYPE=0x7170, FMI=0x7372)
+ self.assertEqual(mmheader.get(), 'abcdefghijklmnopqrs')
+
+ # Test with vlantag
+ mmheader = MMHeader(ODA=0x616263646566, OSA=0x6768696A6B6C, VLANTag=0x81006D6E, MTYPE=0x6F70, MMV=0x71, MMTYPE=0x7372, FMI=0x7574)
+ self.assertEqual(mmheader.get(), 'abcdefghijkl' + pack('!I', 0x81006D6E) + 'opqrstu')
suite = unittest.TestLoader().loadTestsFromTestCase(TestMMHeaderFunctions)
@@ -181,10 +205,21 @@ class TestMMEFunctions(unittest.TestCase):
pass
def test_set_mmheader(self):
- # Test with an MM Header object
+ # Test with an MM Header object with VLAN Tag
+ mmheader = MMHeader(VLANTag=0x81000000)
+ self.mme.set_mmheader(mmheader)
+ self.assertEqual(self.mme.get_mmheader(), mmheader)
+
+ # Test with an MM Header object without VLAN Tag
mmheader = MMHeader()
self.mme.set_mmheader(mmheader)
self.assertEqual(self.mme.get_mmheader(), mmheader)
+
+ # Test with a Python string of length equals to 19 octets
+ mmheader = 'abcdefghijklmnopqrs'
+ self.mme.set_mmheader(mmheader)
+ self.assertEqual(self.mme.get_mmheader(), mmheader)
+
# Test with a Python string of length equals to 23 octets
mmheader = 'abcdefghijklmnopqrstuvw'
self.mme.set_mmheader(mmheader)
@@ -197,9 +232,16 @@ class TestMMEFunctions(unittest.TestCase):
self.assertEqual(self.mme.get_mmentry(), mmentry)
def test_set_msdu_attr(self):
- payload = 23*'H' + 'Payload'
+ # Test with VLAN Tag
+ payload = 12*'H' + pack('!I', 0x81000000) + 7*'H' + 'Payload'
+ self.mme.set_msdu_attr(payload)
+ self.assertEqual(self.mme.get_mmheader(), 12*'H' + pack('!I', 0x81000000) + 7*'H')
+ self.assertEqual(self.mme.get_mmentry(), 'Payload')
+
+ # Test without VLAN Tag
+ payload = 19*'H' + 'Payload'
self.mme.set_msdu_attr(payload)
- self.assertEqual(self.mme.get_mmheader(), 23*'H')
+ self.assertEqual(self.mme.get_mmheader(), 19*'H')
self.assertEqual(self.mme.get_mmentry(), 'Payload')
def test_sendnrecv(self):
@@ -219,6 +261,7 @@ class TestMMEFunctions(unittest.TestCase):
sta.remove()
def test_get(self):
+ # Test with VLAN Tag
mme1 = MME(MMHeader=23*'A', MMEntry='B')
self.assertEqual(mme1.get(), 23*'A'+'B'+36*pack('B',0))
mme2 = MME(MMHeader=23*'A', MMEntry=36*'B')
@@ -227,6 +270,16 @@ class TestMMEFunctions(unittest.TestCase):
self.assertEqual(mme3.get(), 23*'A'+37*'B')
mme4 = MME(MMHeader=23*'A', MMEntry=1000*'B')
self.assertEqual(mme4.get(), 23*'A'+1000*'B')
+
+ # Test without VLAN Tag
+ mme1 = MME(MMHeader=19*'A', MMEntry='B')
+ self.assertEqual(mme1.get(), 19*'A'+'B'+40*pack('B',0))
+ mme2 = MME(MMHeader=19*'A', MMEntry=40*'B')
+ self.assertEqual(mme2.get(), 19*'A'+40*'B'+pack('B',0))
+ mme3 = MME(MMHeader=19*'A', MMEntry=41*'B')
+ self.assertEqual(mme3.get(), 19*'A'+41*'B')
+ mme4 = MME(MMHeader=19*'A', MMEntry=1000*'B')
+ self.assertEqual(mme4.get(), 19*'A'+1000*'B')
def test_get_ether_type(self):
self.assertEqual(self.mme.get_ether_type(), 2)
diff --git a/maximus/stationtest/src/test_station.c b/maximus/stationtest/src/test_station.c
index b31ba7c24c..b61977cd72 100644
--- a/maximus/stationtest/src/test_station.c
+++ b/maximus/stationtest/src/test_station.c
@@ -38,9 +38,9 @@ void ipmbox_rx_cb (void *user_data, u32 *first_msg, uint length)
uint data_length = (uint)(hdr->param >> 1);
memcpy(ctx->first_buffer->next->data, (u32 *)ctx->rx.mailbox[1], data_length);
char *data = (char *)ctx->first_buffer->next->data;
- *(data + 19) = *(data + 19) + 1; // REQ => CNF
- *(data + 23) = 0x01; // Success
- memset(ctx->first_buffer->next->data + 23, '\0', data_length - 23);
+ *(data + 15) = *(data + 15) + 1; // REQ => CNF
+ *(data + 19) = 0x01; // Success
+ memset(ctx->first_buffer->next->data + 19, '\0', data_length - 19);
hdr->param |= 0x001;
ctx->rx.mailbox[1] = (u32)ctx->first_buffer->next->data;