summaryrefslogtreecommitdiff
path: root/cesar/test_general/station/scenario/py/scenario_init.py
blob: 838eb7522f4c67788fe6b06e6447e6691964e63f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
#!/usr/bin/python

#############################################################################
#  Copyright (C) 2011 Spidcom
#############################################################################

import os
import sys
import unittest
sys.path.append ('../../../maximus/python/tools/csi/')
sys.path.append ('../../../maximus/python/')
sys.path.append ('../../../maximus/python/obj')
sys.path.append ('../../../maximus/python/lib/cesar')
sys.path.append ('../../../../common/lib')
sys.path.append ('lib')
from csicore import *
from maximus.mme.mme import MME
from maximus.simu.rx import recv
from csifilter import *
from csitime import sec_to_tck
import scammer
from scapy.all import Ether, hexdump

# Command line arguments extended with the station executable option.
args = sys.argv + ['--station-executable', 'obj/test_scenario.elf']

# Source MAC address used for the requests built in this module (see sniffer).
MAC_DEFAULT = "00:00:00:00:00:01"
# Ethernet broadcast MAC address.
MAC_BCAST = "ff:ff:ff:ff:ff:ff"

def send_check_datas (csi, avln, packet_list = None):
    """Send traffic through the CSI, verify it and return the result.

    - csi = The CSI object used to send and verify the traffic.
    - avln = The AVLN; its station count scales the transmission wait and
      it is used to generate traffic when no packet list is given.
    - packet_list = The packets to send.  When None or empty, the traffic
      is created with csi.avln_create_traffic (avln).
    return: The value returned by csi.process_end_get_result ().
    """
    station_count = avln.get_nb_sta ()
    # Fall back on generated traffic when the caller provides nothing.
    traffic = packet_list if packet_list else csi.avln_create_traffic (avln)
    csi.process_data_send_traffic (traffic)
    csi.process_wait_transmission (station_count * 10)
    csi.process_verify_transmission ()
    outcome = csi.process_end_get_result ()
    # Clear the stored result so the next run starts from a clean state.
    csi.transmission_result_reset ()
    return outcome

def send_data_from_stasAvln1_to_stasAVLN2 (csi, avln1, avln2):
    """Exchange data between every station pair of two AVLNs.

    For each couple (station of AVLN1, station of AVLN2) one packet is
    created in each direction, then the whole traffic is sent, waited for
    and verified through the CSI.

    - csi = The CSI object used to send and verify the traffic.
    - avln1 = The first AVLN.
    - avln2 = The second AVLN.
    return: The value returned by csi.process_end_get_result ().
    """
    nb_stas1 = avln1.get_nb_sta ()
    nb_stas2 = avln2.get_nb_sta ()
    packets = list ()
    for i in range (nb_stas1):
        for j in range (nb_stas2):
            # Create a packet from station of the AVLN 1 to AVLN 2.
            # 60 is the first csiPacket argument (presumably the packet
            # length - confirm against csicore).
            packets.append (csiPacket (
                60, avln1, avln1.get_sta (i),
                mac_addr_dest = avln2.get_sta (j).get_mac_addr ()))
            # Create a packet from station of the AVLN 2 to AVLN 1.
            packets.append (csiPacket (
                60, avln2, avln2.get_sta (j),
                mac_addr_dest = avln1.get_sta (i).get_mac_addr ()))
    csi.process_data_send_traffic (packets)
    # Scale the wait with the total number of stations, the same way
    # send_check_datas does (nb_stas * 10).  The parentheses matter: without
    # them only the station count of the second AVLN was multiplied.
    csi.process_wait_transmission ((nb_stas1 + nb_stas2) * 10)
    csi.process_verify_transmission ()
    result = csi.process_end_get_result ()
    csi.transmission_result_reset ()
    return result

def send_mme (maximus, station, scammer_mme):
    """Send a scammer MME to a station through maximus, without waiting
    for any answer.

    - maximus = The maximus object passed to MME.send.
    - station = The station receiving the MME.
    - scammer_mme = The scammer packet; its str () form is the MSDU sent.
    """
    destination = station.get_sta_cesar().get()
    msdu = MME ()
    msdu.set_msdu_attr (str (scammer_mme))
    msdu.send (maximus, destination)

def sendnrecv_mme (maximus, station, scammer_mme):
    """Send a scammer MME to a station through maximus and wait for the
    answer.

    - maximus = The maximus object passed to MME.sendnrecv.
    - station = The station receiving the MME.
    - scammer_mme = The scammer packet; its str () form is the MSDU sent.
    return: What MME.sendnrecv returns for at most one MSDU accepted by the
    filter, with a timeout of sec_to_tck (1).
    """
    def is_expected_answer (msdu):
        # Accept only MME frames whose mmtype is the request mmtype + 1
        # (presumably the confirmation matching the request).
        # The type is compared with '==': 'is' tests object identity and is
        # not reliable on strings.
        return msdu.get_type () == 'ETHERNET_TYPE_MME' \
                and Ether (msdu.get ()).mmtype == scammer_mme.mmtype + 1

    mmemax = MME ()
    mmemax.set_msdu_attr (str (scammer_mme))
    rsp = mmemax.sendnrecv (maximus, station.get_sta_cesar().get(),
            timeout = sec_to_tck (1),
            filter = is_expected_answer,
            count = 1)
    return rsp

def sniffer (maximus, sta, mme_tx = False, mme_rx = False, beacon_tx = False,
        beacon_rx = False):
    """Activate/deactivate the cesar sniffer on the station.

    - maximus = The maximus object passed to MME.sendnrecv.
    - sta = The station to configure.
    - mme_tx, mme_rx, beacon_tx, beacon_rx = Flags copied into the
      VS_SNIFFER_REQ fields of the same name.
    return: True if the first answer is a VS_SNIFFER_CNF with a success
    result, False otherwise.
    """
    request = Ether (dst = sta.get_mac_addr (), src = MAC_DEFAULT) \
        / scammer.MME () \
        / scammer.VS_SNIFFER_REQ (mme_tx = mme_tx, mme_rx = mme_rx,
                beacon_tx = beacon_tx, beacon_rx = beacon_rx)
    msdu = MME ()
    msdu.set_msdu_attr (str (request))
    answers = msdu.sendnrecv (maximus, sta.get_sta_cesar().get(),
            count = 1, filter = frame_filter_sniffer,
            timeout = sec_to_tck (1))
    # Only the first answer is examined.
    confirm = Ether (answers[0].get ())
    is_confirm = confirm.mmtype == scammer.HPAV_MMTYPES['VS_SNIFFER_CNF']
    if not is_confirm:
        return False
    return confirm.result == scammer.HPAV_RESULT['success']

class HPAVProtocol:
    """Class to test a protocol exchange.

    The expected exchange is described with add_mme_tx and add_mme_rx, then
    compared by protocol_exchange_check with the MMEs reported by the cesar
    sniffer.
    """

    def __init__ (self):
        # Each entry is a [sniffed MME, repeat count] pair.
        self.protocol_tx = list ()
        self.protocol_rx = list ()

    def __add_mme (self, dst, src, sublayer, repeat, tx):
        """Add the MME to the correct list.
        - dst = The destination mac address.
        - src = The source mac address.
        - sublayer = The sublayer MME filled.
        - repeat = The number of times the MME is expected.
        - tx = True to store it in the TX list, False for the RX list.
        The MME is stored wrapped in a VS_SNIFFER_IND, the way the sniffer
        reports it.
        """
        m = Ether (dst = dst, src = src) \
            / scammer.MME () \
            / scammer.VS_SNIFFER_IND (sniff_type = 0, direction = 0) \
            / Ether (dst = dst, src = src) \
            / scammer.MME () \
            / sublayer
        if tx:
            self.protocol_tx.append ([m, repeat])
        else:
            self.protocol_rx.append ([m, repeat])

    def add_mme_tx (self, dst, src, sublayer, repeat_times = 1):
        """Add a MME expected in the TX list of the protocol.
        - dst = The destination mac address.
        - src = The source mac address.
        - sublayer = The sublayer MME filled.
        - repeat_times = The number of times the MME is expected.
        This function only handles Sniffed MME from Cesar Sniffer.
        """
        self.__add_mme (dst, src, sublayer, repeat_times, True)

    def add_mme_rx (self, dst, src, sublayer, repeat_times = 1):
        """Add a MME expected in the RX list of the protocol.
        - dst = The destination mac address.
        - src = The source mac address.
        - sublayer = The sublayer MME filled.
        - repeat_times = The number of times the MME is expected.
        This function only handles Sniffed MME from Cesar Sniffer.
        """
        self.__add_mme (dst, src, sublayer, repeat_times, False)

    def get_header (self, mme):
        """Return a new packet made of the two first layers of the MME
        (Ethernet and HPAV header) without any payload.
        - mme = The MME to extract the header from.
        """
        # Work on a copy, the payloads are removed in place.
        mmel0 = mme.copy ()
        mmel1 = mmel0.getlayer (1)
        mmel0.remove_payload ()
        mmel1.remove_payload ()
        h = mmel0 / mmel1
        h.mmtype = mme.mmtype
        return h

    def error (self, m1, m2):
        """Print an error
        m1 expected MME
        m2 MME got
        """
        print ("Protocol Error.")
        print ("Expected MME")
        m1.show ()
        print ("I Got MME")
        m2.show ()

    def __check_security (self, m1, m2):
        """Check security points (PEKS, PID and PMN).
        - m1 = The expected MME.
        - m2 = The MME got.
        return: True if the three fields are equal, False otherwise.  Each
        difference is printed.
        """
        peks_ok = m1.peks == m2.peks
        pid_ok = m1.pid == m2.pid
        pmn_ok = m1.pmn == m2.pmn
        if not peks_ok:
            print ("PEKS wrong, expected %d got %d"
                    % (m1.peks, m2.peks))
        if not pid_ok:
            print ("PID wrong, expected %d got %d"
                    % (m1.pid, m2.pid))
        if not pmn_ok:
            print ("PMN wrong, expected %d got %d"
                    % (m1.pmn, m2.pmn))
        return peks_ok and pid_ok and pmn_ok

    def __protocol_exchange_check_mmes (self, m1, m2):
        """Check the MME itself.
        - m1 = The expected MME.
        - m2 = The MME got.
        return: True if the MMEs match.  Only some fields are compared for
        CM_ENCRYPTED_PAYLOAD_IND, CM_GET_KEY_REQ and CM_GET_KEY_CNF, any
        other MME is compared through its string form.
        """
        m1h = self.get_header (m1)
        m2h = self.get_header (m2)
        header_ok = m1h == m2h
        # Compare the two ones.
        # NOTE(review): a header difference is only reported, it does not
        # change the value returned by the branches below - confirm this is
        # intended.
        if not header_ok:
            print ("Header wrong")
            print ("Expected")
            hexdump (m1h)
            print ("Got")
            hexdump (m2h)
        if m1.mmtype == scammer.HPAV_MMTYPES['CM_ENCRYPTED_PAYLOAD_IND']:
            return self.__check_security (m1, m2)
        elif m1.mmtype == scammer.HPAV_MMTYPES['CM_GET_KEY_REQ']:
            security_ok = self.__check_security (m1, m2)
            result_ok = m1.result == m2.result
            nid_ok = m1.nid == m2.nid
            eks_ok = m1.eks == m2.eks
            if not result_ok:
                print ("Result is wrong.")
            if not nid_ok:
                print ("NID is wrong.")
            if not eks_ok:
                print ("EKS is wrong.")
            return security_ok and result_ok and nid_ok and eks_ok
        elif m1.mmtype == scammer.HPAV_MMTYPES['CM_GET_KEY_CNF']:
            security_ok = self.__check_security (m1, m2)
            nid_ok = m1.nid == m2.nid
            eks_ok = m1.eks == m2.eks
            if not nid_ok:
                print ("NID is wrong.")
            if not eks_ok:
                print ("EKS is wrong.")
            return security_ok and nid_ok and eks_ok
        else:
            return str (m1) == str (m2)

    def __protocol_exchange_check_lists (self, protocollist, mmelist):
        """Check the MME list with the one defined.
        - protocollist = The expected [MME, repeat count] entries.
        - mmelist = The sniffed MMEs.
        return: True if every expected entry has been consumed, False on a
        MME mismatch or when expected entries remain once the sniffed list
        is exhausted (the missing MME types are printed).
        """
        # At this point we have two list with sniffed MME Scammer types.
        index_protocol = 0
        index_mmelist = 0
        # Remaining repetitions of the current expected entry, 0 means that
        # the count has to be loaded from the entry.
        repeat = 0
        while index_protocol < len (protocollist) \
            and index_mmelist < len (mmelist):
            [mme, repeat_new] = protocollist[index_protocol]
            if repeat == 0:
                repeat = repeat_new
            m1 = scammer.get_sniffed_mme (mme)
            m2 = scammer.get_sniffed_mme (mmelist[index_mmelist])
            if m1.mmtype == m2.mmtype:
                if not self.__protocol_exchange_check_mmes (m1, m2):
                    self.error (m1, m2)
                    return False
                repeat = repeat - 1
                if repeat == 0:
                    # All repetition of the MME have been received.
                    index_protocol += 1
                index_mmelist += 1
            else:
                # The types differ: give up the current expected entry and
                # compare the same sniffed MME with the next one.
                repeat = 0
                index_protocol += 1
        if index_protocol == len (protocollist):
            return True
        else:
            print ("Expected missing MMEs")
            while index_protocol < len (protocollist):
                [mme, repeat_new] = protocollist[index_protocol]
                m1 = scammer.get_sniffed_mme (mme)
                print (scammer.HPAV_MMTYPES_REVERSE[m1.mmtype])
                index_protocol += 1
            return False

    def protocol_exchange_check (self, mme_list):
        """Verify the protocol exchange.
        - mme_list: The sniffer MME list from cesar sniffer.
        return:  True if the protocol is correct, false otherwise.
        """
        # For now only process VS_SNIFFER.IND with MME embedded.
        mme_embed_list_tx = list ()
        mme_embed_list_rx = list ()
        ok_rx = False
        for msdu in mme_list:
            # Skip anything shorter than 14 bytes (the size of an Ethernet
            # header).
            if len (msdu.get ()) >= 14:
                m = Ether (msdu.get ())
                # Keep the sniffed MMEs, except the DRV_STA_STATUS_IND ones.
                if m.mmtype == scammer.HPAV_MMTYPES ['VS_SNIFFER_IND'] \
                    and m.sniff_type == 0 \
                    and scammer.get_sniffed_mme(m).mmtype \
                        != scammer.HPAV_MMTYPES ['DRV_STA_STATUS_IND']:
                    if hasattr (m, 'load'):
                        del (m.load)
                    # Direction 0 goes to the TX list, anything else to the
                    # RX list.
                    if m.direction == 0:
                        mme_embed_list_tx.append (m)
                    else:
                        mme_embed_list_rx.append (m)
        ok_tx = self.__protocol_exchange_check_lists (self.protocol_tx,
                mme_embed_list_tx)
        # The RX list is only checked when the TX one is correct.
        if ok_tx:
            ok_rx = self.__protocol_exchange_check_lists (self.protocol_rx,
                    mme_embed_list_rx)
        return ok_tx and ok_rx

# Do not add executable code in the script.