summaryrefslogtreecommitdiff
path: root/cesar/maximus/python/py/test_ether.py
diff options
context:
space:
mode:
Diffstat (limited to 'cesar/maximus/python/py/test_ether.py')
-rw-r--r--cesar/maximus/python/py/test_ether.py227
1 files changed, 227 insertions, 0 deletions
diff --git a/cesar/maximus/python/py/test_ether.py b/cesar/maximus/python/py/test_ether.py
new file mode 100644
index 0000000000..db2184c29c
--- /dev/null
+++ b/cesar/maximus/python/py/test_ether.py
@@ -0,0 +1,227 @@
+#!/usr/bin/python
+
+print "\n*** " + __file__ + " ***\n"
+
+import sys
+sys.path.append('./test')
+sys.path.append('../test')
+import startup
+
+from interface import *
+from maximus import *
+from maximus.ethernet.eth import MAX_SIZE_OF_HEADER
+from maximus.mme.mmheader import MAX_SIZE_OF_MMHEADER
+from maximus.macframe.msdu import MIN_SIZE_OF_MSDU
+from maximus.simu.rx import * # for 'recv()' function
+from struct import pack
+
+m = Maximus()
+m.init(sys.argv)
+
+
+# TEST ETHER
+# To be run with "test_ether.elf" station executable
+# (to be compiled under "/trunk/maximus/stationtest").
+
+# Create station(s)
+sta1 = m.create_sta ()
+sta1.debug()
+
+# Init ether
+fcall = m.create_fcall('init_ether')
+fcall.send(sta1)
+
+# Test following functions:
+# - send(self, maximus, file=None)
+# - sendnrecv(self, maximus, file=None, timeout=None, filter=None, count=1)
+# - recv(maximus, timeout=None, filter=None, count=1)
+
+# Disable the automatic buffer allocation
+realloc_buffer(False)
+
+# Ethernet Frame config
+dst = '41:42:43:44:45:46'
+src = '47:48:49:4a:4b:4c'
+vlantag = 0x81004D4E
+type = 0x4F50
+
+# MME config
+mmheader = 'MMHeader 19 octets!'
+mmentry = '--- This is the Management Message Entry ---'
+
+# Send an Ethernet Frame asynchronously
+frame1 = Eth()
+frame1.dst = dst
+frame1.src = src
+frame1.vlantag = vlantag
+frame1.type = type
+msdu1 = 'This is the Ethernet payload'
+frame1.payload = msdu1
+alloc_data_buffer(m, sta1, buffer_nb=2)
+frame1.send(m, sta1)
+rsp1 = recv(m, count=4)
+
+# Test the 1st received MSDU (Eth) object
+i = 0
+if rsp1[i].dst != dst:
+ print "dst1 =", rsp1[i].dst
+ raise Error("dst1")
+if rsp1[i].src != src:
+ print "src1 =", rsp1[i].src
+ raise Error("src1")
+if rsp1[i].vlantag != vlantag:
+ print "vlantag1 =", hex(rsp1[i].vlantag)
+ raise Error("vlantag1")
+if rsp1[i].type != type:
+ print "type1 =", hex(rsp1[i].type)
+ raise Error("type1")
+if str(rsp1[i].payload) != msdu1 + (MIN_SIZE_OF_MSDU - MAX_SIZE_OF_HEADER - len(msdu1)) * pack('B', 0):
+ print len(str(rsp1[i].payload))
+ print "msdu1 =", str(rsp1[i].payload)
+ raise Error("msdu1")
+
+# Test the 2nd received MSDU (MME) object
+i = 2 # i = 2 because the second received frame is the BUFFER RELEASED message
+if rsp1[i].get_mmheader() != 'ABCDEFGHIJKL' + pack('!I', vlantag) + 'OPThis ':
+ print "mmheader1 =", rsp1[i].get_mmheader()
+ raise Error("mmheader1")
+if rsp1[i].get_mmentry() != 'is the Ethernet payload' + (MIN_SIZE_OF_MSDU - MAX_SIZE_OF_MMHEADER - 23) * pack('B', 0):
+ print "mmentry1 =", rsp1[i].get_mmentry()
+ raise Error("mmentry1")
+
+m.wait(100000)
+
+# Send an Ethernet Frame synchronously
+frame2 = Eth()
+frame2.dst = dst
+frame2.src = src
+frame2.vlantag = vlantag
+frame2.type = type
+msdu2 = 'This is the Ethernet payload'
+frame2.payload = msdu2
+alloc_data_buffer(m, sta1, buffer_nb=2)
+rsp2 = frame2.sendnrecv(m, sta1, count=4)
+
+# Test the 1st received MSDU (Eth) object
+i = 0
+if rsp2[i].dst != dst:
+ print "dst2 =", rsp2[i].dst
+ raise Error("dst2")
+if rsp2[i].src != src:
+ print "src2 =", rsp2[i].src
+ raise Error("src2")
+if rsp2[i].vlantag != vlantag:
+ print "vlantag2 =", hex(rsp2[i].vlantag)
+ raise Error("vlantag2")
+if rsp2[i].type != type:
+ print "type2 =", hex(rsp2[i].type)
+ raise Error("type2")
+if str(rsp2[i].payload) != msdu2 + (MIN_SIZE_OF_MSDU - MAX_SIZE_OF_HEADER - len(msdu2)) * pack('B', 0):
+ print "msdu2 =", str(rsp2[i].payload)
+ raise Error("msdu2")
+
+# Test the 2nd received MSDU (MME) object
+i = 2 # i = 2 because the second received frame is the BUFFER RELEASED message
+if rsp2[i].get_mmheader() != 'ABCDEFGHIJKL' + pack('!I', vlantag) + 'OPThis ':
+ print "mmheader2 =", rsp2[i].get_mmheader()
+ raise Error("mmheader2")
+if rsp2[i].get_mmentry() != 'is the Ethernet payload' + (MIN_SIZE_OF_MSDU - MAX_SIZE_OF_MMHEADER - 23) * pack('B', 0):
+ print "mmentry2 =", rsp2[i].get_mmentry()
+ raise Error("mmentry2")
+
+m.wait(100000)
+
+# Send an MME asynchronously
+mme3 = MME(MMHeader=mmheader, MMEntry=mmentry)
+alloc_data_buffer(m, sta1, buffer_nb=2)
+mme3.send(m, sta1)
+rsp3 = recv(m, count=4)
+
+# Test the 1st received MSDU (Eth) object
+i = 0
+if rsp3[i].dst != '4d:4d:48:65:61:64': # 'MMHead'
+ print "dst3 =", rsp3[i].dst
+ raise Error("dst3")
+if rsp3[i].src != '65:72:20:31:39:20': # 'er 19 '
+ print "src3 =", rsp3[i].src
+ raise Error("src3")
+if rsp3[i].type != 0x6F63: # 'oc'
+ print "type3 =", hex(rsp3[i].type)
+ raise Error("type3")
+if str(rsp3[i].payload) != 'tets!--- This is the Management Message Entry ---':
+ print "msdu3 =", str(rsp3[i].payload)
+ raise Error("msdu3")
+
+# Test the 2nd received MSDU (MME) object
+i = 2 # i = 2 because the second received frame is the BUFFER RELEASED message
+if rsp3[i].get_mmheader() != mmheader:
+ print "mmheader3 =", rsp3[i].get_mmheader()
+ raise Error("mmheader3")
+if rsp3[i].get_mmentry() != mmentry:
+ print "mmentry3 =", rsp3[i].get_mmentry()
+ raise Error("mmentry3")
+
+m.wait(100000)
+
+# Send an MME synchronously
+mme4 = MME(MMHeader=mmheader, MMEntry=mmentry)
+alloc_data_buffer(m, sta1, buffer_nb=2)
+rsp4 = mme4.sendnrecv(m, sta1, count=4)
+
+# Test the 1st received MSDU (Eth) object
+i = 0
+if rsp4[i].dst != '4d:4d:48:65:61:64': # 'MMHead'
+ print "dst4 =", rsp4[i].dst
+ raise Error("dst4")
+if rsp4[i].src != '65:72:20:31:39:20': # 'er 19 '
+ print "src4 =", rsp4[i].src
+ raise Error("src4")
+if rsp4[i].type != 0x6F63: # 'oc'
+ print "type4 =", hex(rsp4[i].type)
+ raise Error("type4")
+if str(rsp4[i].payload) != 'tets!--- This is the Management Message Entry ---':
+ print "msdu4 =", str(rsp4[i].payload)
+ raise Error("msdu4")
+
+# Test the 2nd received MSDU (MME) object
+i = 2 # i = 2 because the second received frame is the BUFFER RELEASED message
+if rsp4[i].get_mmheader() != mmheader:
+ print "mmheader4 =", rsp4[i].get_mmheader()
+ raise Error("mmheader4")
+if rsp4[i].get_mmentry() != mmentry:
+ print "mmentry4 =", rsp4[i].get_mmentry()
+ raise Error("mmentry4")
+
+m.wait(100000)
+
+# Test the sniffed received MSDU (Sniffer) object
+alloc_interface_buffer(m, sta1)
+snif = recv(m, count=2)[0] # the second received frame is the BUFFER RELEASED message
+snif.display()
+if snif.get()[0:48] != "This is a sniffed packet coming from the station":
+ print "packet =", snif.get()
+ raise Error("packet")
+if snif.get_ether_type() != 7:
+ print "ether type =", snif.get_ether_type()
+ raise Error("ether type")
+if snif.get_type() != 'ETHERNET_TYPE_SNIFFER':
+ print "type =", snif.get_type()
+ raise Error("type")
+if snif.get_way() != True:
+ print "way =", snif.get_way()
+ raise Error("way")
+if snif.get_encryption() != True:
+ print "encryption =", snif.get_encryption()
+ raise Error("encryption")
+if snif.get_sniffer_type() != 1:
+ print "sniffer type =", snif.get_sniffer_type()
+ raise Error("sniffer type")
+
+# Uninit ether
+fcall2 = m.create_fcall('uninit_ether')
+fcall2.send(sta1)
+
+# Remove station(s)
+sta1.remove()
+
+print "\n*** END ***\n"