summaryrefslogtreecommitdiff
path: root/cleopatre/tools/spidupd/spidupdate.py
diff options
context:
space:
mode:
Diffstat (limited to 'cleopatre/tools/spidupd/spidupdate.py')
-rwxr-xr-xcleopatre/tools/spidupd/spidupdate.py116
1 files changed, 59 insertions, 57 deletions
diff --git a/cleopatre/tools/spidupd/spidupdate.py b/cleopatre/tools/spidupd/spidupdate.py
index 823aa7c725..8414fbbd38 100755
--- a/cleopatre/tools/spidupd/spidupdate.py
+++ b/cleopatre/tools/spidupd/spidupdate.py
@@ -3,21 +3,21 @@
import getopt
import hashlib
import os
+import scapy.all as scapy
import sys
-from scapy.all import *
from stat import *
ETH_P_HPAV = 0x88e1
OUI_SPIDCOM = 0x0013d7
-class Mme(Packet):
+class Mme(scapy.Packet):
name = "MME"
fields_desc = [
- XByteField("mmv", 1),
- LEShortField("mmtype", 0),
- XBitField("nf_mi", 0, 4),
- XBitField("fn_mi", 0, 4),
- XByteField("fmsn", 0),
+ scapy.XByteField("mmv", 1),
+ scapy.LEShortField("mmtype", 0),
+ scapy.XBitField("nf_mi", 0, 4),
+ scapy.XBitField("fn_mi", 0, 4),
+ scapy.XByteField("fmsn", 0),
]
TYPE_REQ = 0x00
@@ -30,103 +30,103 @@ class Mme(Packet):
TYPE_VS_UPDATE_END = 0xA06C
-class VsUpdateStartReq(Packet):
+class VsUpdateStartReq(scapy.Packet):
name = "VS_UPDATE_START_REQ"
mmtype = Mme.TYPE_VS_UPDATE_START | Mme.TYPE_REQ
arch_enum = {0:"SPC300"}
type_enum = {0:"NORMAL"}
fields_desc = [
- X3BytesField("oui", OUI_SPIDCOM),
- ByteField("protocol", 1),
- StrFixedLenField("version", "SPiDImage\0", 16),
- LEIntEnumField("arch", 0, arch_enum),
- LEIntEnumField("type", 0, type_enum),
+ scapy.X3BytesField("oui", OUI_SPIDCOM),
+ scapy.ByteField("protocol", 1),
+ scapy.StrFixedLenField("version", "SPiDImage\0", 16),
+ scapy.LEIntEnumField("arch", 0, arch_enum),
+ scapy.LEIntEnumField("type", 0, type_enum),
]
def mysummary(self):
return self.sprintf("%VsUpdateStartReq.mmtype%")
-class VsUpdateStartCnf(Packet):
+class VsUpdateStartCnf(scapy.Packet):
name = "VS_UPDATE_START_CNF"
mmtype = Mme.TYPE_VS_UPDATE_START | Mme.TYPE_CNF
result_enum = {0:"SUCCESS", 1:"ALREADY_EXISTS", 2:"BAD_ARCH", 3:"BAD_TYPE",
4:"BUSY", 5:"PROTOCOL"}
fields_desc = [
- X3BytesField("oui", OUI_SPIDCOM),
- ByteEnumField("result", 0, result_enum),
+ scapy.X3BytesField("oui", OUI_SPIDCOM),
+ scapy.ByteEnumField("result", 0, result_enum),
]
def answers(self, other):
return isinstance(other, VsUpdateStartReq)
-class VsUpdateTransferReq(Packet):
+class VsUpdateTransferReq(scapy.Packet):
name = "VS_UPDATE_TRANSFER_REQ"
mmtype = Mme.TYPE_VS_UPDATE_TRANSFER | Mme.TYPE_REQ
fields_desc = [
- X3BytesField("oui", OUI_SPIDCOM),
- LEIntField("block_nb", 0),
- LEFieldLenField("length", None, length_of="data", fmt="<I"),
- StrLenField("data", "", length_from=lambda pkt:pkt.length),
+ scapy.X3BytesField("oui", OUI_SPIDCOM),
+ scapy.LEIntField("block_nb", 0),
+ scapy.LEFieldLenField("length", None, length_of="data", fmt="<I"),
+ scapy.StrLenField("data", "", length_from=lambda pkt:pkt.length),
]
-class VsUpdateTransferCnf(Packet):
+class VsUpdateTransferCnf(scapy.Packet):
name = "VS_UPDATE_TRANSFER_CNF"
mmtype = Mme.TYPE_VS_UPDATE_TRANSFER | Mme.TYPE_CNF
result_enum = {0:"SUCCESS", 1:"FAILURE"}
fields_desc = [
- X3BytesField("oui", OUI_SPIDCOM),
- ByteEnumField("result", 0, result_enum),
- LEIntField("next", 0),
+ scapy.X3BytesField("oui", OUI_SPIDCOM),
+ scapy.ByteEnumField("result", 0, result_enum),
+ scapy.LEIntField("next", 0),
]
def answers(self, other):
return isinstance(other, VsUpdateTransferReq)
-class VsUpdateEndReq(Packet):
+class VsUpdateEndReq(scapy.Packet):
name = "VS_UPDATE_END_REQ"
mmtype = Mme.TYPE_VS_UPDATE_END | Mme.TYPE_REQ
fields_desc = [
- X3BytesField("oui", OUI_SPIDCOM),
- StrFixedLenField("md5sum", "", length=16),
+ scapy.X3BytesField("oui", OUI_SPIDCOM),
+ scapy.StrFixedLenField("md5sum", "", length=16),
]
-class VsUpdateEndCnf(Packet):
+class VsUpdateEndCnf(scapy.Packet):
name = "VS_UPDATE_END_CNF"
mmtype = Mme.TYPE_VS_UPDATE_END | Mme.TYPE_CNF
result_enum = {0:"SUCCESS", 1:"MD5", 2:"VERSION"}
fields_desc = [
- X3BytesField("oui", OUI_SPIDCOM),
- ByteEnumField("result", 0, result_enum),
+ scapy.X3BytesField("oui", OUI_SPIDCOM),
+ scapy.ByteEnumField("result", 0, result_enum),
]
def answers(self, other):
return isinstance(other, VsUpdateEndReq)
-class VsUpdateEndInd(Packet):
+class VsUpdateEndInd(scapy.Packet):
name = "VS_UPDATE_END_IND"
mmtype = Mme.TYPE_VS_UPDATE_END | Mme.TYPE_IND
result_enum = {0:"SUCCESS", 1:"FLASH", 2:"TIMEOUT"}
fields_desc = [
- X3BytesField("oui", OUI_SPIDCOM),
- ByteEnumField("result", 0, result_enum),
+ scapy.X3BytesField("oui", OUI_SPIDCOM),
+ scapy.ByteEnumField("result", 0, result_enum),
]
-bind_layers(Ether, Mme, type=ETH_P_HPAV)
-bind_layers(Mme, VsUpdateStartReq, mmtype=VsUpdateStartReq.mmtype)
-bind_layers(Mme, VsUpdateStartCnf, mmtype=VsUpdateStartCnf.mmtype)
-bind_layers(Mme, VsUpdateTransferReq, mmtype=VsUpdateTransferReq.mmtype)
-bind_layers(Mme, VsUpdateTransferCnf, mmtype=VsUpdateTransferCnf.mmtype)
-bind_layers(Mme, VsUpdateEndReq, mmtype=VsUpdateEndReq.mmtype)
-bind_layers(Mme, VsUpdateEndCnf, mmtype=VsUpdateEndCnf.mmtype)
-bind_layers(Mme, VsUpdateEndInd, mmtype=VsUpdateEndInd.mmtype)
+scapy.bind_layers(scapy.Ether, Mme, type=ETH_P_HPAV)
+scapy.bind_layers(Mme, VsUpdateStartReq, mmtype=VsUpdateStartReq.mmtype)
+scapy.bind_layers(Mme, VsUpdateStartCnf, mmtype=VsUpdateStartCnf.mmtype)
+scapy.bind_layers(Mme, VsUpdateTransferReq, mmtype=VsUpdateTransferReq.mmtype)
+scapy.bind_layers(Mme, VsUpdateTransferCnf, mmtype=VsUpdateTransferCnf.mmtype)
+scapy.bind_layers(Mme, VsUpdateEndReq, mmtype=VsUpdateEndReq.mmtype)
+scapy.bind_layers(Mme, VsUpdateEndCnf, mmtype=VsUpdateEndCnf.mmtype)
+scapy.bind_layers(Mme, VsUpdateEndInd, mmtype=VsUpdateEndInd.mmtype)
class SpidUpdate:
@@ -149,7 +149,7 @@ class SpidUpdate:
IFACE_DEFAULT = "eth0"
- dst_addr = ETHER_BROADCAST
+ dst_addr = scapy.ETHER_BROADCAST
src_addr = None
iface = IFACE_DEFAULT
@@ -162,22 +162,24 @@ class SpidUpdate:
raise IOError("Failed to open '%s'" % (filename))
if dest is not None:
- self.dst_addr = mac2str(dest)
+ self.dst_addr = scapy.mac2str(dest)
if iface is None:
- self.iface = get_if_list()[0]
- elif iface not in get_if_list():
+ self.iface = scapy.get_if_list()[0]
+ elif iface not in scapy.get_if_list():
raise AttributeError("interface '%s' is unknown" % iface)
else:
self.iface = iface
- self.src_addr = get_if_hwaddr(self.iface)
+ self.src_addr = scapy.get_if_hwaddr(self.iface)
def send_receive(self, mme_packet, filter_class, **kargs):
- s = (Ether(dst=self.dst_addr, src=self.src_addr, type=ETH_P_HPAV) /
- Mme(mmtype=mme_packet.mmtype) / mme_packet)
+ s = (scapy.Ether(
+ dst=self.dst_addr, src=self.src_addr, type=ETH_P_HPAV) /
+ Mme(mmtype=mme_packet.mmtype) / mme_packet)
- r = srp1(s, iface=self.iface, verbose=0, type=ETH_P_HPAV, **kargs)
+ r = scapy.srp1(s, iface=self.iface, verbose=0, type=ETH_P_HPAV,
+ **kargs)
if r:
return r.getlayer(filter_class)
@@ -189,7 +191,7 @@ class SpidUpdate:
# send the START request and wait for the answer,
# maximum START_RETRY_NB times
- print "Starting connection with %s..." % str2mac(self.dst_addr)
+ print "Starting connection with %s..." % scapy.str2mac(self.dst_addr)
start_cnf = None
for i in range(SpidUpdate.START_RETRY_NB):
start_cnf = self.send_receive(
@@ -203,7 +205,7 @@ class SpidUpdate:
if start_cnf is None:
# no answer: exit
- print "No answer from station %s" % str2mac(self.dst_addr)
+ print "No answer from station %s" % scapy.str2mac(self.dst_addr)
return
if ((start_cnf.oui != OUI_SPIDCOM) or
@@ -269,9 +271,9 @@ class SpidUpdate:
# now wait for the final result
print "\nTransfer completed, writing to flash..."
- p = sniff(count=1, lfilter=lambda x: x.getlayer(VsUpdateEndInd),
- timeout=SpidUpdate.IND_TIMEOUT_S, iface=self.iface,
- filter="ether proto {0}".format(hex(ETH_P_HPAV)))
+ p = scapy.sniff(count=1, lfilter=lambda x: x.getlayer(VsUpdateEndInd),
+ timeout=SpidUpdate.IND_TIMEOUT_S, iface=self.iface,
+ filter="ether proto {0}".format(hex(ETH_P_HPAV)))
if len(p) == 0:
print "Upgrade result timeout"
@@ -374,8 +376,8 @@ class SpidTest:
break
print "Found start"
- ether = p[0].getlayer(Ether)
- sendp(Ether(dst=ETHER_BROADCAST, type=ETH_P_HPAV) /
+ ether = p[0].getlayer(scapy.Ether)
+ sendp(scapy.Ether(dst=ETHER_BROADCAST, type=ETH_P_HPAV) /
Mme(mmtype=(Mme.TYPE_VS_UPDATE_START | Mme.TYPE_CNF)) /
VsUpdateStartCnf(), iface=iface)
# if mme.mmtype == Mme.TYPE_VS_UPDATE_START | Mme.TYPE_REQ: