#!/usr/bin/python ############################################################################# # Copyright (C) 2011 Spidcom ############################################################################# import os import sys import unittest sys.path.append ('../../../maximus/python/tools/csi/') sys.path.append ('../../../maximus/python/') sys.path.append ('../../../maximus/python/obj') sys.path.append ('../../../maximus/python/lib/cesar') sys.path.append ('../../../../common/lib') sys.path.append ('lib') from csicore import * from maximus.mme.mme import MME from maximus.simu.rx import recv from csifilter import * from csitime import sec_to_tck import scammer from scapy.all import Ether, hexdump args = sys.argv + ['--station-executable', 'obj/test_scenario.elf'] MAC_DEFAULT = "00:00:00:00:00:01" MAC_BCAST = "ff:ff:ff:ff:ff:ff" def send_check_datas (csi, avln, packet_list = None): """Create some Ethernet packets to be sent.""" nb_stas = avln.get_nb_sta () if not packet_list: packets = csi.avln_create_traffic (avln) else: packets = packet_list csi.process_data_send_traffic (packets) csi.process_wait_transmission (nb_stas * 10) csi.process_verify_transmission () result = csi.process_end_get_result () csi.transmission_result_reset () return result def send_data_from_stasAvln1_to_stasAVLN2 (csi, avln1, avln2): """Send data from all the station of AVLN1 to the stations of AVLN2. Send data from all the station of AVLN2 to the stations of AVLN1. """ packets = list () for i in range (avln1.get_nb_sta ()): for j in range (avln2.get_nb_sta ()): # Create a packet from station of the AVLN 1 to AVLN 2. packets.append (csiPacket ( 60, avln1, avln1.get_sta (i), mac_addr_dest = avln2.get_sta (j).get_mac_addr ())) # Create a packet from station of the AVLN 2 to AVLN 1. packets.append (csiPacket ( 60, avln2, avln2.get_sta (j), mac_addr_dest = avln1.get_sta (i).get_mac_addr ())) csi.process_data_send_traffic (packets) csi.process_wait_transmission ( avln1.get_nb_sta () + avln2.get_nb_sta () * 10) csi.process_verify_transmission () result = csi.process_end_get_result () csi.transmission_result_reset () return result def send_mme (maximus, station, scammer_mme): """Send a scammer MME to maximus.""" mmemax = MME () mmemax.set_msdu_attr (str (scammer_mme)) mmemax.send (maximus, station.get_sta_cesar().get()) def sendnrecv_mme (maximus, station, scammer_mme): """Send a scammer MME to maximus.""" mmemax = MME () mmemax.set_msdu_attr (str (scammer_mme)) rsp = mmemax.sendnrecv (maximus, station.get_sta_cesar().get(), timeout = sec_to_tck (1), filter = lambda msdu: msdu.get_type () is 'ETHERNET_TYPE_MME' \ and Ether (msdu.get ()).mmtype == \ scammer_mme.mmtype + 1, count = 1) return rsp def sniffer (maximus, sta, mme_tx = False, mme_rx = False, beacon_tx = False, beacon_rx = False): """Activate/deactivate the cesar sniffer on the station.""" sniffer = Ether (dst = sta.get_mac_addr (), src = MAC_DEFAULT) \ / scammer.MME () \ / scammer.VS_SNIFFER_REQ (mme_tx = mme_tx, mme_rx = mme_rx, beacon_tx = beacon_tx, beacon_rx = beacon_rx) mmemax = MME () mmemax.set_msdu_attr (str (sniffer)) rsp = mmemax.sendnrecv (maximus, sta.get_sta_cesar().get(), count = 1, filter = frame_filter_sniffer, timeout = sec_to_tck (1)) m = Ether (rsp[0].get ()) if m.mmtype == scammer.HPAV_MMTYPES['VS_SNIFFER_CNF']: return m.result == scammer.HPAV_RESULT['success'] return False class HPAVProtocol: """Class to test a protocol exchange.""" def __init__ (self): self.protocol_tx = list () self.protocol_rx = list () def __add_mme (self, dst, src, sublayer, repeat, tx): """Add the MME to the correct list.""" m = Ether (dst = dst, src = src) \ / scammer.MME () \ / scammer.VS_SNIFFER_IND (sniff_type = 0, direction = 0) \ / Ether (dst = dst, src = src) \ / scammer.MME () \ / sublayer if tx: self.protocol_tx.append ([m, repeat]) else: self.protocol_rx.append ([m, repeat]) def add_mme_tx (self, dst, src, sublayer, repeat_times = 1): """Add a MME to check the protocol. - dst = The destination mac address. - src = The source mac address. - mmtype = The MME MMtype - sublayer = The sublayer MME filled. This function only handles Sniffed MME from Cesar Sniffer. """ self.__add_mme (dst, src, sublayer, repeat_times, True) def add_mme_rx (self, dst, src, sublayer, repeat_times = 1): """Add a MME to check the protocol. - dst = The destination mac address. - src = The source mac address. - mmtype = The MME MMtype - sublayer = The sublayer MME filled. This function only handles Sniffed MME from Cesar Sniffer. """ self.__add_mme (dst, src, sublayer, repeat_times, False) def get_header (self, mme): # Create an Packet with the Ethernet and HPAV header. mmel0 = mme.copy () mmel1 = mmel0.getlayer (1) mmel0.remove_payload () mmel1.remove_payload () h = mmel0 / mmel1 h.mmtype = mme.mmtype return h def error (self, m1, m2): """Print an error m1 expected MME m2 MME got """ print "Protocol Error." print "Expected MME" m1.show () print "I Got MME" m2.show () def __check_security (self, m1, m2): """Check security points.""" peks_ok = m1.peks == m2.peks pid_ok = m1.pid == m2.pid pmn_ok = m1.pmn == m2.pmn if not peks_ok: print "PEKS wrong, expected %d got %d" \ % (m1.peks, m2.peks) if not pid_ok: print "PID wrong, expected %d got %d" \ % (m1.pid, m2.pid) if not pmn_ok: print "PMN wrong, expected %d git %d" \ % (m1.pmn. m2.pmn) return peks_ok and pid_ok and pmn_ok def __protocol_exchange_check_mmes (self, m1, m2): """Check the MME itself.""" m1h = self.get_header (m1) m2h = self.get_header (m2) header_ok = m1h == m2h # Compare the two ones. if not header_ok: print "Header wrong" print "Expected" hexdump (m1h) print "Got" hexdump (m2h) if m1.mmtype == scammer.HPAV_MMTYPES['CM_ENCRYPTED_PAYLOAD_IND']: return self.__check_security (m1, m2) elif m1.mmtype == scammer.HPAV_MMTYPES['CM_GET_KEY_REQ']: security_ok = self.__check_security (m1, m2) result_ok = m1.result == m2.result nid_ok = m1.nid == m2.nid eks_ok = m1.eks == m2.eks if not result_ok: print "Result is wrong." if not nid_ok: print "NID is wrong." if not eks_ok: print "EKS is wrong." return security_ok and result_ok and nid_ok and eks_ok elif m1.mmtype == scammer.HPAV_MMTYPES['CM_GET_KEY_CNF']: security_ok = self.__check_security (m1, m2) nid_ok = m1.nid == m2.nid eks_ok = m1.eks == m2.eks if not nid_ok: print "NID is wrong." if not eks_ok: print "EKS is wrong." return security_ok and nid_ok and eks_ok else: return str (m1) == str (m2) def __protocol_exchange_check_lists (self, protocollist, mmelist): """Check the MME list with the one defined.""" # At this point we have two list with sniffed MME Scammer types. index_protocol = 0 index_mmelist = 0 repeat = 0 # Count the number of MMEs expected. mme_nb = 0 for i in protocollist: mme_nb += i[1] i = 0 while index_protocol < len (protocollist) \ and index_mmelist < len (mmelist): [mme, repeat_new] = protocollist[index_protocol] if repeat == 0: repeat = repeat_new m1 = scammer.get_sniffed_mme (mme) m2 = scammer.get_sniffed_mme (mmelist[index_mmelist]) if m1.mmtype == m2.mmtype: if not self.__protocol_exchange_check_mmes (m1, m2): self.error (m1, m2) return False repeat = repeat - 1 if repeat == 0: # All repetition of the MME have been received. index_protocol += 1 index_mmelist += 1 else: repeat = 0 index_protocol += 1 if index_protocol == len (protocollist): return True else: print "Expected missing MMEs" while index_protocol < len (protocollist): [mme, repeat_new] = protocollist[index_protocol] m1 = scammer.get_sniffed_mme (mme) print scammer.HPAV_MMTYPES_REVERSE[m1.mmtype] index_protocol += 1 return False def protocol_exchange_check (self, mme_list): """Verify the protocol exchange. - mme_list: The sniffer MME list from cesar sniffer. return: True if the protocol is correct, false otherwise. """ # For now only process VS_SNIFFER.INF with MME embedded. mme_embed_list_tx = list () mme_embed_list_rx = list () ok_tx = False ok_rx = False for i in mme_list: if len (i.get ()) >= 14: m = Ether (i.get ()) if m.mmtype == scammer.HPAV_MMTYPES ['VS_SNIFFER_IND'] \ and m.sniff_type == 0 \ and scammer.get_sniffed_mme(m).mmtype \ != scammer.HPAV_MMTYPES ['DRV_STA_STATUS_IND']: if hasattr (m, 'load'): del (m.load) if m.direction == 0: mme_embed_list_tx.append (m) else: mme_embed_list_rx.append (m) ok_tx = self.__protocol_exchange_check_lists (self.protocol_tx, mme_embed_list_tx) if ok_tx: ok_rx = self.__protocol_exchange_check_lists (self.protocol_rx, mme_embed_list_rx) return ok_tx and ok_rx # Do not add executable code in the script.