summaryrefslogtreecommitdiff
path: root/cesar/maximus/python/test/test_ethernet.py
diff options
context:
space:
mode:
Diffstat (limited to 'cesar/maximus/python/test/test_ethernet.py')
-rw-r--r--cesar/maximus/python/test/test_ethernet.py489
1 files changed, 489 insertions, 0 deletions
diff --git a/cesar/maximus/python/test/test_ethernet.py b/cesar/maximus/python/test/test_ethernet.py
new file mode 100644
index 0000000000..fec75fe623
--- /dev/null
+++ b/cesar/maximus/python/test/test_ethernet.py
@@ -0,0 +1,489 @@
+#! usr/bin/env python
+
+print "\n*** " + __file__ + " ***\n"
+
+import sys, startup
+
+from maximus.ethernet import *
+from maximus.ethernet.buffer import Buffer
+from maximus.ethernet.create import create_eth, create_buffer, create_sniffer
+from maximus.ethernet.eth import MIN_SIZE_OF_HEADER, MAX_SIZE_OF_HEADER
+from maximus.ethernet.sniffer import Sniffer
+from maximus.macframe.msdu import MIN_SIZE_OF_MSDU
+from maximus.simu.rx import * # for 'recv()' function
+from maximus.utils.exception import Error, OutOfRangeError
+from interface import *
+from struct import pack, unpack
+
+
+# ETH TEST
+
+# Create a Maximus instance
+m = Maximus()
+
+# Initialize Maximus with command line arguments
+m.init(sys.argv)
+
+# Create the destination station
+staRx = m.create_sta()
+staRx.debug()
+
+# Init ether
+fcall = m.create_fcall('init_ether')
+fcall.send(staRx)
+
+# Create an Ethernet frame
+f = Eth()
+
+# Create an Ethernet frame
+f.dst = '41:42:43:44:45:46'
+f.src = '47:48:49:4a:4b:4c'
+f.vlantag = 0x81004D4E
+f.type = 0x4F50
+s = 'This is the Ethernet Payload'
+f.payload = s
+if len(f) != MIN_SIZE_OF_HEADER + len(s):
+ print "expected length = ", MIN_SIZE_OF_HEADER + len(s)
+ print "length = ", len(f)
+ raise Error('Ethernet frame payload length')
+if str(f) != 'ABCDEFGHIJKLOP' + str(f.payload):
+ print "expected payload =", 'ABCDEFGHIJKLOP' + str(f.payload)
+ print "payload = ", str(f)
+ raise Error('Ethernet frame payload')
+
+# Create an Ethernet frame containing an IP frame
+g = Eth(dst=f.dst, src=f.src)/IP()
+if len(g) != MIN_SIZE_OF_HEADER + len(str(IP())):
+ raise Error('Ethernet IP frame payload length')
+h = unpack(34*'B', str(g))
+if h != (65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 8, 0, 69, 0, 0, 20, 0, 1, 0, 0, 64, 0, 124, 231, 127, 0, 0, 1, 127, 0, 0, 1):
+ raise Error('Ethernet IP frame payload')
+
+# Send the Ethernet frame asynchronously
+alloc_data_buffer(m, staRx, buffer_nb=2)
+f.send(m, staRx)
+
+# Send the Ethernet frame synchronously
+alloc_data_buffer(m, staRx, buffer_nb=2)
+rsp = f.sendnrecv(m, staRx)
+
+# Uninit ether
+fcall2 = m.create_fcall('uninit_ether')
+fcall2.send(staRx)
+
+# Remove the destination station
+staRx.remove()
+
+
+# SCAPY TEST
+
+#print [p for p in IP()]
+target="www.target.com"
+target="www.target.com/30"
+ip=IP(dst=target)
+#hexdump(ip)
+#print [p for p in ip]
+
+#print [p for p in IP()]
+a=IP(dst="172.16.1.40")
+#print [p for p in a]
+#print a.dst
+#print a.ttl
+
+a.ttl=32
+#print [p for p in a]
+del(a.ttl)
+#print [p for p in a]
+#print a.ttl
+
+t=TCP()
+t.flags="SA"
+#print t.flags
+#print [p for p in t]
+t.flags=23
+#print [p for p in t]
+i=IP(flags="DF+MF")
+#print i.flags
+#print [p for p in i]
+i.flags=6
+#print [p for p in i]
+
+#print a.dst
+#print a.src
+del(a.dst)
+#print a.dst
+#print a.src
+a.dst="192.168.11.10"
+#print a.src
+a.dst=target
+#print a.src
+a.src="1.2.3.4"
+#print [p for p in a]
+
+#print [p for p in IP()]
+#print [p for p in IP()/TCP()]
+#print [p for p in Ether()/IP()/TCP()]
+#print [p for p in IP()/TCP()/"GET / HTTP/1.0\r\n\r\n"]
+#print [p for p in Ether()/IP()/IP()/UDP()]
+#print [p for p in IP(proto=55)/TCP()]
+
+#print str(IP())
+#print [p for p in IP(str(IP()))]
+a=Ether()/IP()/TCP()/"GET /index.html HTTP/1.0 \n\n"
+#hexdump(a)
+b=str(a)
+#print b
+c=Ether(b)
+#print [p for p in c]
+
+c.hide_defaults()
+#print [p for p in c]
+
+a=IP(dst="www.slashdot.org/30")
+#hexdump(a)
+#print [p for p in a]
+b=IP(ttl=[1,2,(5,9)])
+#hexdump(b)
+#print [p for p in b]
+c=TCP(dport=[80,443])
+#print [p for p in a/c]
+
+d = IP(dst="1.2.3.4")/ICMP()
+e = Ether()/IP(dst="1.2.3.4",ttl=(1,4))
+
+
+# DOC TEST
+
+import doctest
+doctest.testmod(buffer)
+doctest.testmod(create)
+doctest.testmod(eth)
+doctest.testmod(scapy)
+doctest.testmod(sniffer)
+
+
+# UNIT TEST
+
+import unittest
+
+class TestEthFunctions(unittest.TestCase):
+
+ def setUp(self):
+ self.eth = Eth()
+ self.m = m
+
+ def tearDown(self):
+ pass
+
+ def test_set_msdu_attr(self):
+ # Test without vlantag
+ p = 'ABCDEFGHIJKLMNThis is the Ethernet Payload'
+ self.eth.set_msdu_attr(p)
+ self.assertEqual(self.eth.dst, '41:42:43:44:45:46')
+ self.assertEqual(self.eth.src, '47:48:49:4a:4b:4c')
+ self.assertEqual(self.eth.type, 0x4D4E)
+ self.assertEqual(str(self.eth.payload), 'This is the Ethernet Payload')
+
+ # Test with vlantag
+ p = 'ABCDEFGHIJKL' + pack('!I', 0x81004D4E) + 'OPThis is the Ethernet Payload'
+ self.eth.set_msdu_attr(p)
+ self.assertEqual(self.eth.dst, '41:42:43:44:45:46')
+ self.assertEqual(self.eth.src, '47:48:49:4a:4b:4c')
+ self.assertEqual(self.eth.vlantag, 0x81004D4E)
+ self.assertEqual(self.eth.type, 0x4F50)
+ self.assertEqual(str(self.eth.payload), 'This is the Ethernet Payload')
+
+ def test_sendnrecv(self):
+ sta = self.m.create_sta()
+ self.m.create_fcall('init_ether').send(sta)
+ none_buffer = Buffer('ETHERNET_TYPE_NONE')
+ length = len(none_buffer.get_buffer_dict())
+
+ # Disable the automatic buffer allocation
+ realloc_buffer(False)
+
+ alloc_data_buffer(self.m, sta, buffer_nb=2)
+ rsp = self.eth.sendnrecv(self.m, sta, count=4)
+
+ # 1st received frame is the Ethernet frame
+ self.assertEqual(rsp[0].dst, 'ff:ff:ff:ff:ff:ff')
+ self.assertEqual(rsp[0].src, '')
+ self.assertEqual(rsp[0].type, 0)
+
+ # 2nd and 4th received frames are BUFFER RELEASED messages
+ self.assertEqual(rsp[1].get_ether_type(), 6)
+ self.assertEqual(rsp[1].get_type(), 'ETHERNET_TYPE_BUFFER_RELEASED')
+ self.assertEqual(rsp[3].get_ether_type(), 6)
+ self.assertEqual(rsp[3].get_type(), 'ETHERNET_TYPE_BUFFER_RELEASED')
+ self.assertEqual(len(rsp[1].get_buffer_dict()), length)
+
+ # Enable the automatic buffer allocation
+ realloc_buffer(True)
+
+ alloc_data_buffer(self.m, sta, buffer_nb=2)
+ length = len(none_buffer.get_buffer_dict())
+ rsp = self.eth.sendnrecv(self.m, sta, count=4)
+
+ # 1st received frame is the Ethernet frame
+ self.assertEqual(rsp[0].dst, 'ff:ff:ff:ff:ff:ff')
+ self.assertEqual(rsp[0].src, '')
+ self.assertEqual(rsp[0].type, 0)
+
+ # 3rd and 4th received frames are BUFFER RELEASED messages
+ self.assertEqual(rsp[2].get_ether_type(), 6)
+ self.assertEqual(rsp[2].get_type(), 'ETHERNET_TYPE_BUFFER_RELEASED')
+ self.assertEqual(rsp[3].get_ether_type(), 6)
+ self.assertEqual(rsp[3].get_type(), 'ETHERNET_TYPE_BUFFER_RELEASED')
+ self.assertEqual(len(rsp[3].get_buffer_dict()), length)
+
+ self.m.create_fcall('uninit_ether').send(sta)
+ sta.remove()
+
+ def test_send(self):
+ sta = self.m.create_sta()
+ self.m.create_fcall('init_ether').send(sta)
+ alloc_data_buffer(self.m, sta, buffer_nb=2)
+ self.eth.send(self.m, sta)
+ self.m.create_fcall('uninit_ether').send(sta)
+ sta.remove()
+
+ def test_get(self):
+ # Test without vlantag
+ self.eth.dst = '41:42:43:44:45:46'
+ self.eth.src = '47:48:49:4a:4b:4c'
+ self.eth.type = 0x4D4E
+ s = 'This is the Ethernet Payload'
+ self.eth.payload = s
+ self.assertEqual(self.eth.get(), 'ABCDEFGHIJKLMNThis is the Ethernet Payload' + (MIN_SIZE_OF_MSDU - MIN_SIZE_OF_HEADER - len(s)) * pack('B', 0))
+
+ # Test with vlantag
+ self.eth.dst = '41:42:43:44:45:46'
+ self.eth.src = '47:48:49:4a:4b:4c'
+ self.eth.vlantag = 0x81004D4E
+ self.eth.type = 0x4F50
+ s = 'This is the Ethernet Payload'
+ self.eth.payload = s
+ self.assertEqual(self.eth.get(), 'ABCDEFGHIJKL' + pack('!I', 0x81004D4E) + 'OPThis is the Ethernet Payload' + (MIN_SIZE_OF_MSDU - MAX_SIZE_OF_HEADER - len(s)) * pack('B', 0))
+
+ # Test with a bad vlantag
+ test = False
+ self.eth.vlantag = 0x4D4E4D4E
+ try:
+ self.eth.get()
+ except OutOfRangeError:
+ test = True
+ self.assert_(test)
+
+ def test_get_ether_type(self):
+ self.assertEqual(self.eth.get_ether_type(), 1)
+
+ def test_get_type(self):
+ self.assertEqual(self.eth.get_type(), 'ETHERNET_TYPE_DATA')
+
+ def test_create_eth(self):
+ self.assertNotEqual(create_eth(), None)
+
+suite = unittest.TestLoader().loadTestsFromTestCase(TestEthFunctions)
+
+class TestBufferFunctions(unittest.TestCase):
+
+ def setUp(self):
+ self.data_buffer = Buffer('ETHERNET_TYPE_DATA_BUFFER_ADD')
+ self.assertEqual(self.data_buffer.get_ether_type(), 3)
+ self.assertEqual(self.data_buffer.get_type(), 'ETHERNET_TYPE_DATA_BUFFER_ADD')
+ self.assertEqual(self.data_buffer.get_buffer_nb(), 1)
+ self.mme_buffer = Buffer('ETHERNET_TYPE_MME_BUFFER_ADD', buffer_nb=2)
+ self.assertEqual(self.mme_buffer.get_ether_type(), 4)
+ self.assertEqual(self.mme_buffer.get_type(), 'ETHERNET_TYPE_MME_BUFFER_ADD')
+ self.assertEqual(self.mme_buffer.get_buffer_nb(), 2)
+ self.interface_buffer = Buffer('ETHERNET_TYPE_INTERFACE_BUFFER_ADD', buffer_nb=3)
+ self.assertEqual(self.interface_buffer.get_ether_type(), 5)
+ self.assertEqual(self.interface_buffer.get_type(), 'ETHERNET_TYPE_INTERFACE_BUFFER_ADD')
+ self.assertEqual(self.interface_buffer.get_buffer_nb(), 3)
+ self.m = m
+
+ def tearDown(self):
+ pass
+
+ def test_set_type(self):
+ self.data_buffer.set_type('ETHERNET_TYPE_BUFFER_RELEASED')
+ self.assertEqual(self.data_buffer.get_ether_type(), 6)
+ self.assertEqual(self.data_buffer.get_type(), 'ETHERNET_TYPE_BUFFER_RELEASED')
+ try:
+ self.mme_buffer.set_type('ETHERNET_TYPE_MME')
+ except OutOfRangeError:
+ self.assertEqual(self.mme_buffer.get_ether_type(), 0)
+ self.assertEqual(self.mme_buffer.get_type(), 'ETHERNET_TYPE_NONE')
+
+ def test_set_buffer_nb(self):
+ self.data_buffer.set_buffer_nb(123)
+ self.assertEqual(self.data_buffer.get_buffer_nb(), 123)
+ try:
+ self.mme_buffer.set_buffer_nb(-123)
+ except OutOfRangeError:
+ self.assertEqual(self.mme_buffer.get_buffer_nb(), 0)
+
+ def test_set_buffer_realloc(self):
+ self.assertEqual(self.data_buffer.get_buffer_realloc(), False)
+ self.data_buffer.set_buffer_realloc(True)
+ self.assertEqual(self.data_buffer.get_buffer_realloc(), True)
+ self.data_buffer.set_buffer_realloc(False)
+
+ def test_realloc(self):
+ sta = self.m.create_sta()
+ self.m.create_fcall('init_ether').send(sta)
+ realloc_buffer(True)
+ buffer_id = 123
+ self.interface_buffer.get_buffer_dict()[buffer_id] = 'ETHERNET_TYPE_DATA_BUFFER_ADD'
+ self.interface_buffer.realloc(station_id=sta.get_station_id(), payload=pack('I', buffer_id))
+ buffer_id = 456
+ self.interface_buffer.get_buffer_dict()[buffer_id] = 'ETHERNET_TYPE_MME_BUFFER_ADD'
+ self.interface_buffer.realloc(station_id=sta.get_station_id(), payload=pack('I', buffer_id))
+ buffer_id = 789
+ self.interface_buffer.get_buffer_dict()[buffer_id] = 'ETHERNET_TYPE_INTERFACE_BUFFER_ADD'
+ self.interface_buffer.realloc(station_id=sta.get_station_id(), payload=pack('I', buffer_id))
+ rsp = recv(self.m, count=2)
+ self.assertEqual(rsp[0].get_type(), 'ETHERNET_TYPE_SNIFFER')
+ self.assertEqual(rsp[1].get_type(), 'ETHERNET_TYPE_BUFFER_RELEASED')
+ realloc_buffer(False)
+ self.m.create_fcall('uninit_ether').send(sta)
+ sta.remove()
+
+ def test_set_msdu_attr(self):
+ id = 123
+ self.assert_(self.data_buffer.get_buffer_dict().has_key(id))
+ self.data_buffer.set_msdu_attr(pack('I', id))
+ self.assert_(not self.data_buffer.get_buffer_dict().has_key(id))
+
+ def test_sendnrecv(self):
+ sta = self.m.create_sta()
+ self.m.create_fcall('init_ether').send(sta)
+ interface_buffer = Buffer('ETHERNET_TYPE_INTERFACE_BUFFER_ADD', buffer_nb=2)
+ rsp = interface_buffer.sendnrecv(self.m, sta, count=2)
+ self.assertEqual(rsp[0].get_type(), 'ETHERNET_TYPE_SNIFFER')
+ self.assertEqual(rsp[1].get_type(), 'ETHERNET_TYPE_BUFFER_RELEASED')
+ self.m.create_fcall('uninit_ether').send(sta)
+ sta.remove()
+
+ def test_send(self):
+ sta = self.m.create_sta()
+ self.m.create_fcall('init_ether').send(sta)
+ mme_buffer = Buffer('ETHERNET_TYPE_MME_BUFFER_ADD', buffer_nb=10)
+ mme_buffer.send(self.m, sta)
+ self.m.create_fcall('uninit_ether').send(sta)
+ sta.remove()
+
+ def test_get(self):
+ mme_buffer = Buffer('ETHERNET_TYPE_MME_BUFFER_ADD', buffer_nb=3)
+ buf = mme_buffer.get()
+ res = pack('I', mme_buffer.get_buffer_nb())
+ for i in range (0, 3):
+ res += pack('I', mme_buffer.get_buffer_id() - 2 + i)
+ self.assertEqual(buf, res)
+
+ def test_create_buffer(self):
+ self.assertNotEqual(create_buffer(), None)
+
+ def test_alloc_data_buffer(self):
+ sta = self.m.create_sta()
+ self.m.create_fcall('init_ether').send(sta)
+ alloc_data_buffer(self.m, sta)
+ self.m.create_fcall('uninit_ether').send(sta)
+ sta.remove()
+
+ def test_alloc_mme_buffer(self):
+ sta = self.m.create_sta()
+ self.m.create_fcall('init_ether').send(sta)
+ alloc_mme_buffer(self.m, sta)
+ self.m.create_fcall('uninit_ether').send(sta)
+ sta.remove()
+
+ def test_alloc_interface_buffer(self):
+ sta = self.m.create_sta()
+ self.m.create_fcall('init_ether').send(sta)
+ realloc_buffer(True)
+ alloc_interface_buffer(self.m, sta)
+ rsp = recv(self.m, count=2)
+ self.assertEqual(rsp[0].get_type(), 'ETHERNET_TYPE_SNIFFER')
+ self.assertEqual(rsp[1].get_type(), 'ETHERNET_TYPE_BUFFER_RELEASED')
+ realloc_buffer(False)
+ alloc_interface_buffer(self.m, sta)
+ rsp = recv(self.m, count=2)
+ self.assertEqual(rsp[0].get_type(), 'ETHERNET_TYPE_SNIFFER')
+ self.assertEqual(rsp[1].get_type(), 'ETHERNET_TYPE_BUFFER_RELEASED')
+ self.m.create_fcall('uninit_ether').send(sta)
+ sta.remove()
+
+ def test_realloc_buffer(self):
+ self.assertEqual(self.interface_buffer.get_buffer_realloc(), False)
+ realloc_buffer(True)
+ self.assertEqual(self.interface_buffer.get_buffer_realloc(), True)
+ realloc_buffer(False)
+
+ def test_get_buffer_dict(self):
+ get_buffer_dict()
+
+suite.addTests(unittest.TestLoader().loadTestsFromTestCase(TestBufferFunctions))
+
+class TestSnifferFunctions(unittest.TestCase):
+
+ def setUp(self):
+ self.sniffer = Sniffer(way=False, encryption=False, sniffer_type=0)
+ self.assertEqual(self.sniffer.get(), None)
+ self.assertEqual(self.sniffer.get_ether_type(), 7)
+ self.assertEqual(self.sniffer.get_type(), 'ETHERNET_TYPE_SNIFFER')
+ self.assertEqual(self.sniffer.get_way(), False)
+ self.assertEqual(self.sniffer.get_encryption(), False)
+ self.assertEqual(self.sniffer.get_sniffer_type(), 0)
+ self.m = m
+
+ def tearDown(self):
+ pass
+
+ def test_set_way(self):
+ way = True
+ self.sniffer.set_way(way)
+ self.assertEqual(self.sniffer.get_way(), way)
+
+ def test_set_encryption(self):
+ encryption = True
+ self.sniffer.set_encryption(encryption)
+ self.assertEqual(self.sniffer.get_encryption(), encryption)
+
+ def test_set_sniffer_type(self):
+ sniffer_type = 1
+ self.sniffer.set_sniffer_type(sniffer_type)
+ self.assertEqual(self.sniffer.get_sniffer_type(), sniffer_type)
+
+ def test_set_msdu_attr(self):
+ payload = "This is the sniffed packed"
+ self.sniffer.set_msdu_attr(payload)
+ self.assertEqual(self.sniffer.get(), payload)
+
+ def test_display(self):
+ self.sniffer.display()
+ self.sniffer.set_way(True)
+ self.sniffer.set_encryption(True)
+ self.sniffer.set_sniffer_type(1)
+ self.sniffer.set_msdu_attr("ABCD")
+ self.sniffer.display()
+
+ def test_create_sniffer(self):
+ way = True
+ encryption = True
+ sniffer_type = 1
+ self.assertNotEqual(create_sniffer(way, encryption, sniffer_type), None)
+
+suite.addTests(unittest.TestLoader().loadTestsFromTestCase(TestSnifferFunctions))
+
+try:
+ suite.addTest(doctest.DocTestSuite(buffer))
+ suite.addTest(doctest.DocTestSuite(create))
+ suite.addTest(doctest.DocTestSuite(eth))
+ suite.addTest(doctest.DocTestSuite(scapy))
+ suite.addTest(doctest.DocTestSuite(sniffer))
+except ValueError:
+ print "has no tests"
+
+if __name__ == '__main__':
+ testResult = unittest.TextTestRunner(verbosity=2).run(suite)