summaryrefslogtreecommitdiff
path: root/cesar/maximus/python/utest/script_2/test_ethernet.py
diff options
context:
space:
mode:
Diffstat (limited to 'cesar/maximus/python/utest/script_2/test_ethernet.py')
-rw-r--r--cesar/maximus/python/utest/script_2/test_ethernet.py369
1 files changed, 369 insertions, 0 deletions
diff --git a/cesar/maximus/python/utest/script_2/test_ethernet.py b/cesar/maximus/python/utest/script_2/test_ethernet.py
new file mode 100644
index 0000000000..ff4e208169
--- /dev/null
+++ b/cesar/maximus/python/utest/script_2/test_ethernet.py
@@ -0,0 +1,369 @@
+#! usr/bin/env python
+
+import sys, startup
+
+from maximus.ethernet import *
+from maximus.ethernet.buffer import Buffer
+from maximus.ethernet.create import create_eth, create_buffer
+from maximus.ethernet.eth import MIN_SIZE_OF_HEADER, MAX_SIZE_OF_HEADER
+from maximus.mme.mmheader import MIN_SIZE_OF_MMHEADER
+from maximus.macframe.msdu import MIN_SIZE_OF_MSDU
+from maximus.simu.rx import * # for 'recv()' function
+from maximus.utils.exception import Error, OutOfRangeError
+from interface import *
+from struct import pack, unpack
+
+
+# ETH TEST
+
+# Create a Maximus instance
+m = Maximus()
+
+# Initialize Maximus with command line arguments
+m.init(sys.argv)
+
+# Create the destination station
+staRx = m.create_sta()
+staRx.debug()
+
+# Init ether
+fcall = m.create_fcall('init_ether')
+fcall.send(staRx)
+
+# Create an Ethernet frame
+f = Eth()
+
+# Create an Ethernet frame
+f.dst = '41:42:43:44:45:46'
+f.src = '47:48:49:4a:4b:4c'
+f.vlantag = 0x81004D4E
+f.type = 0x4F50
+s = 'This is the Ethernet Payload'
+f.payload = s
+if len(f) != MIN_SIZE_OF_HEADER + len(s):
+ print "expected length = ", MIN_SIZE_OF_HEADER + len(s)
+ print "length = ", len(f)
+ raise Error('Ethernet frame payload length')
+if str(f) != 'ABCDEFGHIJKLOP' + str(f.payload):
+ print "expected payload =", 'ABCDEFGHIJKLOP' + str(f.payload)
+ print "payload = ", str(f)
+ raise Error('Ethernet frame payload')
+
+# Create an Ethernet frame containing an IP frame
+g = Eth(dst=f.dst, src=f.src)/IP()
+if len(g) != MIN_SIZE_OF_HEADER + len(str(IP())):
+ raise Error('Ethernet IP frame payload length')
+h = unpack(34*'B', str(g))
+if h != (65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 8, 0, 69, 0, 0, 20, 0, 1, 0, 0, 64, 0, 124, 231, 127, 0, 0, 1, 127, 0, 0, 1):
+ raise Error('Ethernet IP frame payload')
+
+# Send the Ethernet frame asynchronously
+alloc_buffer (m, staRx, buffer_nb = 2)
+f.send(m, staRx)
+
+# Send the Ethernet frame synchronously
+alloc_buffer (m, staRx, buffer_nb = 2)
+rsp = f.sendnrecv(m, staRx)
+
+# Uninit ether
+fcall2 = m.create_fcall('uninit_ether')
+fcall2.send(staRx)
+
+# Remove the destination station
+del staRx
+
+
+# SCAPY TEST
+
+#print [p for p in IP()]
+target="www.target.com"
+target="www.target.com/30"
+ip=IP(dst=target)
+#hexdump(ip)
+#print [p for p in ip]
+
+#print [p for p in IP()]
+a=IP(dst="172.16.1.40")
+#print [p for p in a]
+#print a.dst
+#print a.ttl
+
+a.ttl=32
+#print [p for p in a]
+del(a.ttl)
+#print [p for p in a]
+#print a.ttl
+
+t=TCP()
+t.flags="SA"
+#print t.flags
+#print [p for p in t]
+t.flags=23
+#print [p for p in t]
+i=IP(flags="DF+MF")
+#print i.flags
+#print [p for p in i]
+i.flags=6
+#print [p for p in i]
+
+#print a.dst
+#print a.src
+del(a.dst)
+#print a.dst
+#print a.src
+a.dst="192.168.11.10"
+#print a.src
+a.dst=target
+#print a.src
+a.src="1.2.3.4"
+#print [p for p in a]
+
+#print [p for p in IP()]
+#print [p for p in IP()/TCP()]
+#print [p for p in Ether()/IP()/TCP()]
+#print [p for p in IP()/TCP()/"GET / HTTP/1.0\r\n\r\n"]
+#print [p for p in Ether()/IP()/IP()/UDP()]
+#print [p for p in IP(proto=55)/TCP()]
+
+#print str(IP())
+#print [p for p in IP(str(IP()))]
+a=Ether()/IP()/TCP()/"GET /index.html HTTP/1.0 \n\n"
+#hexdump(a)
+b=str(a)
+#print b
+c=Ether(b)
+#print [p for p in c]
+
+c.hide_defaults()
+#print [p for p in c]
+
+a=IP(dst="www.slashdot.org/30")
+#hexdump(a)
+#print [p for p in a]
+b=IP(ttl=[1,2,(5,9)])
+#hexdump(b)
+#print [p for p in b]
+c=TCP(dport=[80,443])
+#print [p for p in a/c]
+
+d = IP(dst="1.2.3.4")/ICMP()
+e = Ether()/IP(dst="1.2.3.4",ttl=(1,4))
+
+
+# DOC TEST
+
+import doctest
+doctest.testmod(buffer)
+doctest.testmod(create)
+doctest.testmod(eth)
+doctest.testmod(scapy)
+
+
+# UNIT TEST
+
+import unittest
+
+class TestEthFunctions(unittest.TestCase):
+
+ def setUp(self):
+ self.eth = Eth()
+ self.eth.payload = self.eth.pad(MIN_SIZE_OF_MMHEADER - len(str(self.eth)))
+ self.m = m
+
+ def tearDown(self):
+ pass
+
+ def test_set_msdu_attr(self):
+ # Test without vlantag
+ p = 'ABCDEFGHIJKLMNThis is the Ethernet Payload'
+ self.eth.set_msdu_attr(p)
+ self.assertEqual(self.eth.dst, '41:42:43:44:45:46')
+ self.assertEqual(self.eth.src, '47:48:49:4a:4b:4c')
+ self.assertEqual(self.eth.type, 0x4D4E)
+ self.assertEqual(str(self.eth.payload), 'This is the Ethernet Payload')
+
+ # Test with vlantag
+ p = 'ABCDEFGHIJKL' + pack('!I', 0x81004D4E) + 'OPThis is the Ethernet Payload'
+ self.eth.set_msdu_attr(p)
+ self.assertEqual(self.eth.dst, '41:42:43:44:45:46')
+ self.assertEqual(self.eth.src, '47:48:49:4a:4b:4c')
+ self.assertEqual(self.eth.vlantag, 0x81004D4E)
+ self.assertEqual(self.eth.type, 0x4F50)
+ self.assertEqual(str(self.eth.payload), 'This is the Ethernet Payload')
+
+ def test_sendnrecv(self):
+ sta = self.m.create_sta()
+ self.m.create_fcall('init_ether').send(sta)
+ none_buffer = Buffer('ETHERNET_TYPE_NONE')
+ length = len(none_buffer.get_buffer_dict())
+
+ # Disable the automatic buffer allocation
+ realloc_buffer(False)
+
+ alloc_buffer (self.m, sta, buffer_nb = 2)
+
+ # Enable the automatic buffer allocation
+ realloc_buffer(True)
+
+ alloc_buffer (self.m, sta, buffer_nb = 2)
+ length = len(none_buffer.get_buffer_dict())
+
+ self.m.create_fcall('uninit_ether').send(sta)
+ del sta
+
+ def test_send(self):
+ sta = self.m.create_sta()
+ self.m.create_fcall('init_ether').send(sta)
+ alloc_buffer (self.m, sta, buffer_nb = 2)
+ self.eth.send(self.m, sta)
+ self.m.create_fcall('uninit_ether').send(sta)
+ del sta
+
+ def test_get(self):
+ # Test without vlantag
+ self.eth.dst = '41:42:43:44:45:46'
+ self.eth.src = '47:48:49:4a:4b:4c'
+ self.eth.type = 0x4D4E
+ s = 'This is the Ethernet Payload'
+ self.eth.payload = s
+ self.assertEqual(self.eth.get(), 'ABCDEFGHIJKLMNThis is the Ethernet Payload')
+
+ # Test with vlantag
+ self.eth.dst = '41:42:43:44:45:46'
+ self.eth.src = '47:48:49:4a:4b:4c'
+ self.eth.vlantag = 0x81004D4E
+ self.eth.type = 0x4F50
+ s = 'This is the Ethernet Payload'
+ self.eth.payload = s
+ self.assertEqual(self.eth.get(), 'ABCDEFGHIJKL' + pack('!I', 0x81004D4E) + 'OPThis is the Ethernet Payload')
+
+ # Test with a bad vlantag
+ test = False
+ self.eth.vlantag = 0x4D4E4D4E
+ try:
+ self.eth.get()
+ except OutOfRangeError:
+ test = True
+ self.assert_(test)
+
+ def test_get_ether_type(self):
+ self.assertEqual(self.eth.get_ether_type(), 1)
+
+ def test_get_type(self):
+ self.assertEqual(self.eth.get_type(), 'ETHERNET_TYPE_DATA')
+
+ def test_create_eth(self):
+ self.assertNotEqual(create_eth(), None)
+
+suite = unittest.TestLoader().loadTestsFromTestCase(TestEthFunctions)
+
+class TestBufferFunctions(unittest.TestCase):
+
+ def setUp(self):
+ self.buffers = Buffer ('ETHERNET_TYPE_BUFFER_ADD', buffer_nb = 8)
+ self.assertEqual (self.buffers.get_ether_type (), 3)
+ self.assertEqual (self.buffers.get_type (), 'ETHERNET_TYPE_BUFFER_ADD')
+ self.assertEqual (self.buffers.get_buffer_nb (), 8)
+ self.m = m
+
+ def tearDown(self):
+ pass
+
+ def test_set_type(self):
+ self.buffers.set_type ('ETHERNET_TYPE_BUFFER_RELEASED')
+ self.assertEqual (self.buffers.get_ether_type (), 4)
+ self.assertEqual (self.buffers.get_type (), 'ETHERNET_TYPE_BUFFER_RELEASED')
+ try:
+ self.buffers.set_type ('ETHERNET_TYPE_MME')
+ except OutOfRangeError:
+ self.assertEqual (self.buffers.get_ether_type (), 0)
+ self.assertEqual (self.buffers.get_type (), 'ETHERNET_TYPE_NONE')
+
+ def test_set_buffer_nb(self):
+ self.buffers.set_buffer_nb (123)
+ self.assertEqual (self.buffers.get_buffer_nb (), 123)
+ try:
+ self.buffers.set_buffer_nb (-123)
+ except OutOfRangeError:
+ self.assertEqual (self.buffers.get_buffer_nb (), 0)
+
+ def test_set_buffer_realloc(self):
+ self.assertEqual (self.buffers.get_buffer_realloc (), False)
+ self.buffers.set_buffer_realloc (True)
+ self.assertEqual (self.buffers.get_buffer_realloc (), True)
+ self.buffers.set_buffer_realloc (False)
+
+ def test_realloc(self):
+ sta = self.m.create_sta()
+ self.m.create_fcall('init_ether').send(sta)
+ realloc_buffer(True)
+ buffer_id = 123
+ self.buffers.get_buffer_dict ()[buffer_id] = 'ETHERNET_TYPE_BUFFER_ADD'
+ self.buffers.realloc (station_id = sta.get_station_id (),
+ payload = pack ('I', buffer_id))
+ realloc_buffer(False)
+ self.m.create_fcall('uninit_ether').send(sta)
+ del sta
+
+ def test_set_msdu_attr(self):
+ id = 123
+ self.assert_ (self.buffers.get_buffer_dict ().has_key (id))
+ self.buffers.set_msdu_attr (pack ('I', id))
+ self.assert_ (not self.buffers.get_buffer_dict ().has_key (id))
+
+ def test_sendnrecv(self):
+ sta = self.m.create_sta()
+ self.m.create_fcall('init_ether').send(sta)
+ buffers = Buffer ('ETHERNET_TYPE_BUFFER_ADD', buffer_nb = 2)
+ self.m.create_fcall('uninit_ether').send(sta)
+ del sta
+
+ def test_send(self):
+ sta = self.m.create_sta()
+ self.m.create_fcall('init_ether').send(sta)
+ buffers = Buffer ('ETHERNET_TYPE_BUFFER_ADD', buffer_nb = 10)
+ buffers.send (self.m, sta)
+ self.m.create_fcall('uninit_ether').send(sta)
+ del sta
+
+ def test_get(self):
+ buffers = Buffer ('ETHERNET_TYPE_BUFFER_ADD', buffer_nb = 3)
+ buf = buffers.get ()
+ res = pack ('I', buffers.get_buffer_nb ())
+ for i in range (0, 3):
+ res += pack ('I', buffers.get_buffer_id () - 2 + i)
+ self.assertEqual(buf, res)
+
+ def test_create_buffer(self):
+ self.assertNotEqual(create_buffer(), None)
+
+ def test_alloc_buffer (self):
+ sta = self.m.create_sta()
+ self.m.create_fcall('init_ether').send(sta)
+ alloc_buffer (self.m, sta)
+ self.m.create_fcall('uninit_ether').send(sta)
+ del sta
+
+ def test_realloc_buffer(self):
+ self.assertEqual (self.buffers.get_buffer_realloc (), False)
+ realloc_buffer(True)
+ self.assertEqual (self.buffers.get_buffer_realloc (), True)
+ realloc_buffer(False)
+
+ def test_get_buffer_dict(self):
+ get_buffer_dict()
+
+suite.addTests(unittest.TestLoader().loadTestsFromTestCase(TestBufferFunctions))
+
+try:
+ suite.addTest(doctest.DocTestSuite(buffer))
+ suite.addTest(doctest.DocTestSuite(create))
+ suite.addTest(doctest.DocTestSuite(eth))
+ suite.addTest(doctest.DocTestSuite(scapy))
+except ValueError:
+ print "has no tests"
+
+if __name__ == '__main__':
+ testResult = unittest.TextTestRunner(verbosity=2).run(suite)
+ # For nightly build errors
+ sys.exit ((1, 0)[testResult.wasSuccessful ()])