#!/usr/bin/python import getopt import hashlib import os import scapy.all as scapy import stat import sys ETH_P_HPAV = 0x88e1 OUI_SPIDCOM = 0x0013d7 class Mme(scapy.Packet): name = "MME" fields_desc = [ scapy.XByteField("mmv", 1), scapy.LEShortField("mmtype", 0), scapy.XBitField("nf_mi", 0, 4), scapy.XBitField("fn_mi", 0, 4), scapy.XByteField("fmsn", 0), ] TYPE_REQ = 0x00 TYPE_CNF = 0x01 TYPE_IND = 0x02 TYPE_RSP = 0x03 TYPE_VS_UPDATE_START = 0xA064 TYPE_VS_UPDATE_TRANSFER = 0xA068 TYPE_VS_UPDATE_END = 0xA06C class VsUpdateStartReq(scapy.Packet): name = "VS_UPDATE_START_REQ" mmtype = Mme.TYPE_VS_UPDATE_START | Mme.TYPE_REQ arch_enum = {0:"SPC300"} type_enum = {0:"NORMAL"} fields_desc = [ scapy.X3BytesField("oui", OUI_SPIDCOM), scapy.ByteField("protocol", 1), scapy.StrFixedLenField("version", "SPiDImage\0", 16), scapy.LEIntEnumField("arch", 0, arch_enum), scapy.LEIntEnumField("type", 0, type_enum), ] def mysummary(self): return self.sprintf("%VsUpdateStartReq.mmtype%") class VsUpdateStartCnf(scapy.Packet): name = "VS_UPDATE_START_CNF" mmtype = Mme.TYPE_VS_UPDATE_START | Mme.TYPE_CNF result_enum = {0:"SUCCESS", 1:"ALREADY_EXISTS", 2:"BAD_ARCH", 3:"BAD_TYPE", 4:"BUSY", 5:"PROTOCOL"} fields_desc = [ scapy.X3BytesField("oui", OUI_SPIDCOM), scapy.ByteEnumField("result", 0, result_enum), ] def answers(self, other): return isinstance(other, VsUpdateStartReq) class VsUpdateTransferReq(scapy.Packet): name = "VS_UPDATE_TRANSFER_REQ" mmtype = Mme.TYPE_VS_UPDATE_TRANSFER | Mme.TYPE_REQ fields_desc = [ scapy.X3BytesField("oui", OUI_SPIDCOM), scapy.LEIntField("block_nb", 0), scapy.LEFieldLenField("length", None, length_of="data", fmt="= SpidUpdate.TRANSFER_RETRY_NB: print "\nToo many retries... give up" return # compute MD5 of image and send the End message md5 = hashlib.md5() md5.update(data[:]) retry_nb = 0 end_cnf = None while ((end_cnf is None) and (retry_nb < SpidUpdate.END_RETRY_NB)): end_cnf = self.send_receive(VsUpdateEndReq(md5sum=md5.digest()), VsUpdateEndCnf, timeout=SpidUpdate.END_RETRY_DELAY_S) if end_cnf is None: retry_nb += 1 if end_cnf is None: print "No end confirm... failed" return if VsUpdateEndCnf.result_enum[end_cnf.result] != "SUCCESS": print ("Upgrade failed: %s" % VsUpdateEndCnf.result_enum[end_cnf.result]) return # now wait for the final result print "\nTransfer completed, writing to flash..." p = scapy.sniff(count=1, lfilter=lambda x: x.getlayer(VsUpdateEndInd), timeout=SpidUpdate.IND_TIMEOUT_S, iface=self.iface, filter="ether proto {0}".format(hex(ETH_P_HPAV))) if len(p) == 0: print "Upgrade result timeout" return end_ind = p[0].getlayer(VsUpdateEndInd) if VsUpdateEndInd.result_enum[end_ind.result] != "SUCCESS": print ("Upgrade failed: %s" % VsUpdateEndInd.result_enum[end_ind.result]) return print "Upgrade has been successfull !" def main(): """ spidupdate [options] filename with options: -d | --dest: address of device to upgrade (default: broadcast) -i | --iface: network interface name (default: eth0) -h | --help: this help """ # parse command line options try: opts, args = getopt.getopt(sys.argv[1:], "hd:i:", ["help", "dest=", "iface="]) except getopt.error, msg: print msg print "for help use --help" sys.exit(1) # process options iface = None dest = None for o, a in opts: if o in ("-h", "--help"): print main.__doc__ sys.exit(0) if o in ("-d", "--dest"): dest = a if o in ("-i", "--iface"): iface = a # process arguments if len(args) == 0: print main.__doc__ sys.exit(1) try: update = SpidUpdate(args[0], iface=iface, dest=dest) update.start() except (AttributeError, IOError), msg: print "%s" % msg sys.exit(2) if __name__ == "__main__": main()