#!/usr/bin/env python
"""Ethernet test script for the Maximus simulator.

The script runs, in order:
  * a module-level smoke test that builds Ethernet frames and sends them
    to a station created through Maximus,
  * a series of scapy packet manipulations,
  * the doctests of the buffer / create / eth / scapy modules,
  * the TestEthFunctions and TestBufferFunctions unit tests.

When run as a program, the exit status is 0 if the unittest suite was
successful and 1 otherwise.
"""

# NOTE(review): the import order is kept exactly as in the original;
# 'startup' presumably prepares the import path for the project modules
# below -- confirm before reordering.
import sys, startup
from maximus.ethernet import *
from maximus.ethernet.buffer import Buffer
from maximus.ethernet.create import create_eth, create_buffer
from maximus.ethernet.eth import MIN_SIZE_OF_HEADER, MAX_SIZE_OF_HEADER
from maximus.mme.mmheader import MIN_SIZE_OF_MMHEADER
from maximus.macframe.msdu import MIN_SIZE_OF_MSDU
from maximus.simu.rx import * # for 'recv()' function
from maximus.utils.exception import Error, OutOfRangeError
from interface import *
from struct import pack, unpack


# ETH TEST

# Create a Maximus instance
m = Maximus()

# Initialize Maximus with command line arguments
m.init(sys.argv)

# Create the destination station
staRx = m.create_sta()
staRx.debug()

# Init ether
fcall = m.create_fcall('init_ether')
fcall.send(staRx)

# Create an Ethernet frame
f = Eth()

# Fill in the Ethernet frame
f.dst = '41:42:43:44:45:46'
f.src = '47:48:49:4a:4b:4c'
f.vlantag = 0x81004D4E
f.type = 0x4F50
s = 'This is the Ethernet Payload'
f.payload = s

# The messages below are printed through a single parenthesised argument
# so that the text is the same as the original Python 2 print statements
# while the file also parses on Python 3.
if len(f) != MIN_SIZE_OF_HEADER + len(s):
    print("expected length =  %s" % (MIN_SIZE_OF_HEADER + len(s),))
    print("length =  %s" % (len(f),))
    raise Error('Ethernet frame payload length')
if str(f) != 'ABCDEFGHIJKLOP' + str(f.payload):
    print("expected payload = %s" % ('ABCDEFGHIJKLOP' + str(f.payload),))
    print("payload =  %s" % (str(f),))
    raise Error('Ethernet frame payload')

# Create an Ethernet frame containing an IP frame
g = Eth(dst=f.dst, src=f.src)/IP()
if len(g) != MIN_SIZE_OF_HEADER + len(str(IP())):
    raise Error('Ethernet IP frame payload length')
h = unpack(34*'B', str(g))
if h != (65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 8, 0,
         69, 0, 0, 20, 0, 1, 0, 0, 64, 0, 124, 231,
         127, 0, 0, 1, 127, 0, 0, 1):
    raise Error('Ethernet IP frame payload')

# Send the Ethernet frame asynchronously
alloc_buffer(m, staRx, buffer_nb=2)
f.send(m, staRx)

# Send the Ethernet frame synchronously
alloc_buffer(m, staRx, buffer_nb=2)
rsp = f.sendnrecv(m, staRx)

# Uninit ether
fcall2 = m.create_fcall('uninit_ether')
fcall2.send(staRx)

# Remove the destination station
del staRx


# SCAPY TEST

#print [p for p in IP()]
target = "www.target.com"
target = "www.target.com/30"
ip = IP(dst=target)
#hexdump(ip)
#print [p for p in ip]

#print [p for p in IP()]
a = IP(dst="172.16.1.40")
#print [p for p in a]
#print a.dst
#print a.ttl
a.ttl = 32
#print [p for p in a]
del(a.ttl)
#print [p for p in a]
#print a.ttl

t = TCP()
t.flags = "SA"
#print t.flags
#print [p for p in t]
t.flags = 23
#print [p for p in t]
i = IP(flags="DF+MF")
#print i.flags
#print [p for p in i]
i.flags = 6
#print [p for p in i]

#print a.dst
#print a.src
del(a.dst)
#print a.dst
#print a.src
a.dst = "192.168.11.10"
#print a.src
a.dst = target
#print a.src
a.src = "1.2.3.4"
#print [p for p in a]

#print [p for p in IP()]
#print [p for p in IP()/TCP()]
#print [p for p in Ether()/IP()/TCP()]
#print [p for p in IP()/TCP()/"GET / HTTP/1.0\r\n\r\n"]
#print [p for p in Ether()/IP()/IP()/UDP()]
#print [p for p in IP(proto=55)/TCP()]

#print str(IP())
#print [p for p in IP(str(IP()))]
a = Ether()/IP()/TCP()/"GET /index.html HTTP/1.0 \n\n"
#hexdump(a)
b = str(a)
#print b
c = Ether(b)
#print [p for p in c]
c.hide_defaults()
#print [p for p in c]

a = IP(dst="www.slashdot.org/30")
#hexdump(a)
#print [p for p in a]
b = IP(ttl=[1,2,(5,9)])
#hexdump(b)
#print [p for p in b]
c = TCP(dport=[80,443])
#print [p for p in a/c]

d = IP(dst="1.2.3.4")/ICMP()
e = Ether()/IP(dst="1.2.3.4",ttl=(1,4))


# DOC TEST

import doctest
doctest.testmod(buffer)
doctest.testmod(create)
doctest.testmod(eth)
doctest.testmod(scapy)


# UNIT TEST

import unittest

# Test methods below are documented with comments instead of docstrings:
# the runner is started with verbosity=2, which would print the first
# docstring line in place of the test name.

class TestEthFunctions(unittest.TestCase):
    """Unit tests for the Eth frame class and the create_eth() helper."""

    def setUp(self):
        # Fresh frame for every test, with its payload set to the padding
        # returned by Eth.pad() for the missing MIN_SIZE_OF_MMHEADER bytes.
        self.eth = Eth()
        self.eth.payload = self.eth.pad(MIN_SIZE_OF_MMHEADER - len(str(self.eth)))
        # All tests share the module-level Maximus instance.
        self.m = m

    def tearDown(self):
        pass

    def test_set_msdu_attr(self):
        # Test without vlantag
        p = 'ABCDEFGHIJKLMNThis is the Ethernet Payload'
        self.eth.set_msdu_attr(p)
        self.assertEqual(self.eth.dst, '41:42:43:44:45:46')
        self.assertEqual(self.eth.src, '47:48:49:4a:4b:4c')
        self.assertEqual(self.eth.type, 0x4D4E)
        self.assertEqual(str(self.eth.payload), 'This is the Ethernet Payload')

        # Test with vlantag
        p = 'ABCDEFGHIJKL' + pack('!I', 0x81004D4E) + 'OPThis is the Ethernet Payload'
        self.eth.set_msdu_attr(p)
        self.assertEqual(self.eth.dst, '41:42:43:44:45:46')
        self.assertEqual(self.eth.src, '47:48:49:4a:4b:4c')
        self.assertEqual(self.eth.vlantag, 0x81004D4E)
        self.assertEqual(self.eth.type, 0x4F50)
        self.assertEqual(str(self.eth.payload), 'This is the Ethernet Payload')

    def test_sendnrecv(self):
        sta = self.m.create_sta()
        self.m.create_fcall('init_ether').send(sta)
        none_buffer = Buffer('ETHERNET_TYPE_NONE')
        # NOTE(review): 'length' is read twice but never asserted on;
        # kept as in the original.
        length = len(none_buffer.get_buffer_dict())

        # Disable the automatic buffer allocation
        realloc_buffer(False)
        alloc_buffer(self.m, sta, buffer_nb=2)

        # Enable the automatic buffer allocation
        realloc_buffer(True)
        alloc_buffer(self.m, sta, buffer_nb=2)
        length = len(none_buffer.get_buffer_dict())

        self.m.create_fcall('uninit_ether').send(sta)
        del sta

    def test_send(self):
        sta = self.m.create_sta()
        self.m.create_fcall('init_ether').send(sta)
        alloc_buffer(self.m, sta, buffer_nb=2)
        self.eth.send(self.m, sta)
        self.m.create_fcall('uninit_ether').send(sta)
        del sta

    def test_get(self):
        # Test without vlantag
        self.eth.dst = '41:42:43:44:45:46'
        self.eth.src = '47:48:49:4a:4b:4c'
        self.eth.type = 0x4D4E
        s = 'This is the Ethernet Payload'
        self.eth.payload = s
        self.assertEqual(self.eth.get(), 'ABCDEFGHIJKLMNThis is the Ethernet Payload')

        # Test with vlantag
        self.eth.dst = '41:42:43:44:45:46'
        self.eth.src = '47:48:49:4a:4b:4c'
        self.eth.vlantag = 0x81004D4E
        self.eth.type = 0x4F50
        s = 'This is the Ethernet Payload'
        self.eth.payload = s
        self.assertEqual(self.eth.get(), 'ABCDEFGHIJKL' + pack('!I', 0x81004D4E) + 'OPThis is the Ethernet Payload')

        # Test with a bad vlantag: get() must raise OutOfRangeError
        self.eth.vlantag = 0x4D4E4D4E
        self.assertRaises(OutOfRangeError, self.eth.get)

    def test_get_ether_type(self):
        self.assertEqual(self.eth.get_ether_type(), 1)

    def test_get_type(self):
        self.assertEqual(self.eth.get_type(), 'ETHERNET_TYPE_DATA')

    def test_create_eth(self):
        self.assertNotEqual(create_eth(), None)

suite = unittest.TestLoader().loadTestsFromTestCase(TestEthFunctions)


class TestBufferFunctions(unittest.TestCase):
    """Unit tests for the Buffer class and the buffer helper functions."""

    def setUp(self):
        # Fresh 'buffer add' message for every test; its initial state is
        # checked here so that each test starts from verified values.
        self.buffers = Buffer('ETHERNET_TYPE_BUFFER_ADD', buffer_nb=8)
        self.assertEqual(self.buffers.get_ether_type(), 3)
        self.assertEqual(self.buffers.get_type(), 'ETHERNET_TYPE_BUFFER_ADD')
        self.assertEqual(self.buffers.get_buffer_nb(), 8)
        # All tests share the module-level Maximus instance.
        self.m = m

    def tearDown(self):
        pass

    def test_set_type(self):
        self.buffers.set_type('ETHERNET_TYPE_BUFFER_RELEASED')
        self.assertEqual(self.buffers.get_ether_type(), 4)
        self.assertEqual(self.buffers.get_type(), 'ETHERNET_TYPE_BUFFER_RELEASED')
        # NOTE(review): the assertions below only run if OutOfRangeError is
        # raised; the test passes silently otherwise. Kept as in the
        # original.
        try:
            self.buffers.set_type('ETHERNET_TYPE_MME')
        except OutOfRangeError:
            self.assertEqual(self.buffers.get_ether_type(), 0)
            self.assertEqual(self.buffers.get_type(), 'ETHERNET_TYPE_NONE')

    def test_set_buffer_nb(self):
        self.buffers.set_buffer_nb(123)
        self.assertEqual(self.buffers.get_buffer_nb(), 123)
        # NOTE(review): same lenient pattern as in test_set_type.
        try:
            self.buffers.set_buffer_nb(-123)
        except OutOfRangeError:
            self.assertEqual(self.buffers.get_buffer_nb(), 0)

    def test_set_buffer_realloc(self):
        self.assertEqual(self.buffers.get_buffer_realloc(), False)
        self.buffers.set_buffer_realloc(True)
        self.assertEqual(self.buffers.get_buffer_realloc(), True)
        # Restore the initial value for the following tests.
        self.buffers.set_buffer_realloc(False)

    def test_realloc(self):
        sta = self.m.create_sta()
        self.m.create_fcall('init_ether').send(sta)
        realloc_buffer(True)
        buffer_id = 123
        self.buffers.get_buffer_dict()[buffer_id] = 'ETHERNET_TYPE_BUFFER_ADD'
        self.buffers.realloc(station_id=sta.get_station_id(), payload=pack('I', buffer_id))
        realloc_buffer(False)
        self.m.create_fcall('uninit_ether').send(sta)
        del sta

    def test_set_msdu_attr(self):
        # NOTE(review): buffer 123 is not registered by this test; the
        # first assertion relies on an earlier test (test_realloc stores
        # that key) having left it in the buffer dictionary -- confirm.
        buffer_id = 123
        self.assert_(self.buffers.get_buffer_dict().has_key(buffer_id))
        self.buffers.set_msdu_attr(pack('I', buffer_id))
        self.assert_(not self.buffers.get_buffer_dict().has_key(buffer_id))

    def test_sendnrecv(self):
        sta = self.m.create_sta()
        self.m.create_fcall('init_ether').send(sta)
        # NOTE(review): the buffer is created but never sent here; kept as
        # in the original.
        buffers = Buffer('ETHERNET_TYPE_BUFFER_ADD', buffer_nb=2)
        self.m.create_fcall('uninit_ether').send(sta)
        del sta

    def test_send(self):
        sta = self.m.create_sta()
        self.m.create_fcall('init_ether').send(sta)
        buffers = Buffer('ETHERNET_TYPE_BUFFER_ADD', buffer_nb=10)
        buffers.send(self.m, sta)
        self.m.create_fcall('uninit_ether').send(sta)
        del sta

    def test_get(self):
        buffers = Buffer('ETHERNET_TYPE_BUFFER_ADD', buffer_nb=3)
        buf = buffers.get()
        # Expected encoding: the buffer count followed by the three buffer
        # identifiers ending at get_buffer_id().
        res = pack('I', buffers.get_buffer_nb())
        for i in range(0, 3):
            res += pack('I', buffers.get_buffer_id() - 2 + i)
        self.assertEqual(buf, res)

    def test_create_buffer(self):
        self.assertNotEqual(create_buffer(), None)

    def test_alloc_buffer(self):
        sta = self.m.create_sta()
        self.m.create_fcall('init_ether').send(sta)
        alloc_buffer(self.m, sta)
        self.m.create_fcall('uninit_ether').send(sta)
        del sta

    def test_realloc_buffer(self):
        self.assertEqual(self.buffers.get_buffer_realloc(), False)
        realloc_buffer(True)
        self.assertEqual(self.buffers.get_buffer_realloc(), True)
        # Restore the initial value for the following tests.
        realloc_buffer(False)

    def test_get_buffer_dict(self):
        get_buffer_dict()

suite.addTests(unittest.TestLoader().loadTestsFromTestCase(TestBufferFunctions))

# NOTE(review): a ValueError from one DocTestSuite() call skips the
# registration of all the following modules; kept as in the original.
try:
    suite.addTest(doctest.DocTestSuite(buffer))
    suite.addTest(doctest.DocTestSuite(create))
    suite.addTest(doctest.DocTestSuite(eth))
    suite.addTest(doctest.DocTestSuite(scapy))
except ValueError:
    print("has no tests")

if __name__ == '__main__':
    testResult = unittest.TextTestRunner(verbosity=2).run(suite)
    # For nightly build errors: exit with 0 on success, 1 on failure
    sys.exit(int(not testResult.wasSuccessful()))