From 2e95e3a33bcb34aeec66551503c692c1cb80ab61 Mon Sep 17 00:00:00 2001 From: Nicolas Schodet Date: Mon, 9 Feb 2009 20:05:30 +0100 Subject: * all python: - changed tabs to spaces. --- host/mex/hub.py | 268 +++++++++++++++++++++++++------------------------- host/mex/msg.py | 96 +++++++++--------- host/mex/node.py | 188 +++++++++++++++++------------------ host/mex/test/test.py | 20 ++-- 4 files changed, 286 insertions(+), 286 deletions(-) (limited to 'host/mex') diff --git a/host/mex/hub.py b/host/mex/hub.py index 5ae88ba7..4dd796a4 100644 --- a/host/mex/hub.py +++ b/host/mex/hub.py @@ -30,152 +30,152 @@ from struct import pack, unpack, calcsize class Hub: def __init__ (self, addr = mex.DEFAULT_ADDR, min_clients = 0, log = None): - """Initialise a new Hub and bind to server address.""" - self.addr = addr - self.min_clients = min_clients - self.log = log - self.clients = { } - self.next_client_id = 1 - self.date = 0 - self.socket = socket.socket () - self.socket.setsockopt (socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - self.socket.setsockopt (socket.SOL_TCP, socket.TCP_NODELAY, 1) - self.socket.bind (self.addr) - self.socket.listen (5) + """Initialise a new Hub and bind to server address.""" + self.addr = addr + self.min_clients = min_clients + self.log = log + self.clients = { } + self.next_client_id = 1 + self.date = 0 + self.socket = socket.socket () + self.socket.setsockopt (socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.socket.setsockopt (socket.SOL_TCP, socket.TCP_NODELAY, 1) + self.socket.bind (self.addr) + self.socket.listen (5) def wait (self, cond = False): - """Wait until a cond is True and handle message exchanges.""" - while not cond and self.date != None: - self.select () + """Wait until a cond is True and handle message exchanges.""" + while not cond and self.date != None: + self.select () def select (self): - """Wait until the next event (connection or message) occurs and handle - it.""" - if not self.min_clients: - # Check idle and wanted dates. - idle = True - min_wait_date = None - for c in self.clients.itervalues (): - if not c.idle: - idle = False - break - if c.wait_date != None and (c.wait_date < min_wait_date or - min_wait_date == None): - min_wait_date = c.wait_date - # Abort if everyone waits forever. - if idle and min_wait_date == None: - self.date = None - return - # Send new date. - if idle: - self.date = min_wait_date - if self.log: - self.log ('[%d] date' % self.date) - date = Msg (mex.DATE) - date.push ('L', self.date) - for c in self.clients.itervalues (): - c.send (date) - # Prepare fdset and select. - infds = [ self.socket ] - if not self.min_clients: - infds += self.clients.values () - readyfds = select.select (infds, (), ()) [0] - for i in readyfds: - if i is self.socket: - self.accept () - else: - i.read () + """Wait until the next event (connection or message) occurs and handle + it.""" + if not self.min_clients: + # Check idle and wanted dates. + idle = True + min_wait_date = None + for c in self.clients.itervalues (): + if not c.idle: + idle = False + break + if c.wait_date != None and (c.wait_date < min_wait_date or + min_wait_date == None): + min_wait_date = c.wait_date + # Abort if everyone waits forever. + if idle and min_wait_date == None: + self.date = None + return + # Send new date. + if idle: + self.date = min_wait_date + if self.log: + self.log ('[%d] date' % self.date) + date = Msg (mex.DATE) + date.push ('L', self.date) + for c in self.clients.itervalues (): + c.send (date) + # Prepare fdset and select. + infds = [ self.socket ] + if not self.min_clients: + infds += self.clients.values () + readyfds = select.select (infds, (), ()) [0] + for i in readyfds: + if i is self.socket: + self.accept () + else: + i.read () def accept (self): - """Accept a new connection and create a client.""" - # Add client. - a = self.socket.accept () - if self.log: - self.log ('[%d] connect from %s' % (self.date, str (a[1]))) - c = Hub.Client (self, a[0], self.next_client_id) - self.next_client_id += 1 - assert self.next_client_id < 256 - self.clients[c.id] = c - if self.min_clients: - self.min_clients -= 1 - # Send first date. - date = Msg (mex.DATE) - date.push ('L', self.date) - c.send (date) + """Accept a new connection and create a client.""" + # Add client. + a = self.socket.accept () + if self.log: + self.log ('[%d] connect from %s' % (self.date, str (a[1]))) + c = Hub.Client (self, a[0], self.next_client_id) + self.next_client_id += 1 + assert self.next_client_id < 256 + self.clients[c.id] = c + if self.min_clients: + self.min_clients -= 1 + # Send first date. + date = Msg (mex.DATE) + date.push ('L', self.date) + c.send (date) def broadcast (self, msg, exclude = None): - """Send a message broadcasted, could exclude a client.""" - for c in self.clients.itervalues (): - if c is not exclude: - c.send (msg) + """Send a message broadcasted, could exclude a client.""" + for c in self.clients.itervalues (): + if c is not exclude: + c.send (msg) class Client: - def __init__ (self, hub, socket, id): - """Initialise a new client.""" - self.hub = hub - self.socket = socket - self.id = id - self.seq = 0 - self.idle = True - self.wait_date = None + def __init__ (self, hub, socket, id): + """Initialise a new client.""" + self.hub = hub + self.socket = socket + self.id = id + self.seq = 0 + self.idle = True + self.wait_date = None - def read (self): - """Read and dispatch a message from this client.""" - # Read message. - head = self.socket.recv (calcsize (mex.HEADER_FMT)) - if head == '': - self.socket.close () - del self.hub.clients[self.id] - return - size, seq = unpack (mex.HEADER_FMT, head) - data = self.socket.recv (size) - m = Msg (data) - if self.hub.log: - self.hub.log ('[%d] received from %d(%d): %s' % - (self.hub.date, self.id, seq, str (m))) - # Dispatch. - if m.mtype == mex.IDLE: - if seq == self.seq: - self.idle = True - if len (m): - self.wait_date, = m.pop ('L') - assert self.wait_date >= self.hub.date - else: - self.wait_date = None - elif m.mtype == mex.DATE: - date = Msg (mex.DATE) - date.push ('L', self.hub.date) - self.send (date) - elif m.mtype == mex.REQ: - m.pop ('B') - mr = Msg (mex.REQ) - mr.push ('B', self.id) - mr.push (m.pop ()) - self.hub.broadcast (mr, self) - elif m.mtype == mex.RSP: - to, = m.pop ('B') - mr = Msg (mex.RSP) - mr.push ('B', 0) - mr.push (m.pop ()) - self.hub.clients[to].send (mr) - else: - self.hub.broadcast (m, self) + def read (self): + """Read and dispatch a message from this client.""" + # Read message. + head = self.socket.recv (calcsize (mex.HEADER_FMT)) + if head == '': + self.socket.close () + del self.hub.clients[self.id] + return + size, seq = unpack (mex.HEADER_FMT, head) + data = self.socket.recv (size) + m = Msg (data) + if self.hub.log: + self.hub.log ('[%d] received from %d(%d): %s' % + (self.hub.date, self.id, seq, str (m))) + # Dispatch. + if m.mtype == mex.IDLE: + if seq == self.seq: + self.idle = True + if len (m): + self.wait_date, = m.pop ('L') + assert self.wait_date >= self.hub.date + else: + self.wait_date = None + elif m.mtype == mex.DATE: + date = Msg (mex.DATE) + date.push ('L', self.hub.date) + self.send (date) + elif m.mtype == mex.REQ: + m.pop ('B') + mr = Msg (mex.REQ) + mr.push ('B', self.id) + mr.push (m.pop ()) + self.hub.broadcast (mr, self) + elif m.mtype == mex.RSP: + to, = m.pop ('B') + mr = Msg (mex.RSP) + mr.push ('B', 0) + mr.push (m.pop ()) + self.hub.clients[to].send (mr) + else: + self.hub.broadcast (m, self) - def send (self, msg): - """Send a message to this client.""" - data = msg.data () - self.seq += 1 - if self.seq == 256: - self.seq = 0 - packet = pack (mex.HEADER_FMT, len (data), self.seq) + data - self.socket.sendall (packet) - self.idle = False - if self.hub.log: - self.hub.log ('[%d] sending to %d(%d): %s' % (self.hub.date, - self.id, self.seq, str (msg))) + def send (self, msg): + """Send a message to this client.""" + data = msg.data () + self.seq += 1 + if self.seq == 256: + self.seq = 0 + packet = pack (mex.HEADER_FMT, len (data), self.seq) + data + self.socket.sendall (packet) + self.idle = False + if self.hub.log: + self.hub.log ('[%d] sending to %d(%d): %s' % (self.hub.date, + self.id, self.seq, str (msg))) - def fileno (self): - """Return socket filedescriptor.""" - return self.socket.fileno () + def fileno (self): + """Return socket filedescriptor.""" + return self.socket.fileno () diff --git a/host/mex/msg.py b/host/mex/msg.py index 5399a3c9..0cd01c8c 100644 --- a/host/mex/msg.py +++ b/host/mex/msg.py @@ -84,64 +84,64 @@ class Msg: """ def __init__ (self, f): - """Initialise a new message, see class documentation for - signatures.""" - try: - f[0] - except TypeError: - # New empty message. - self.mtype = f - self.header = pack ('!B', self.mtype) - self.payload = '' - else: - # Read from a buffer. - self.header = f[0:1] - self.payload = f[1:] - self.mtype = unpack ('!B', self.header)[0] + """Initialise a new message, see class documentation for + signatures.""" + try: + f[0] + except TypeError: + # New empty message. + self.mtype = f + self.header = pack ('!B', self.mtype) + self.payload = '' + else: + # Read from a buffer. + self.header = f[0:1] + self.payload = f[1:] + self.mtype = unpack ('!B', self.header)[0] def data (self): - """Get the message data, ready to be sent.""" - return self.header + self.payload + """Get the message data, ready to be sent.""" + return self.header + self.payload def __str__ (self): - """Return an text representation.""" - payload = ' '.join (['%02x' % ord (i) for i in self.payload]) - return '' % (self.mtype, payload) + """Return an text representation.""" + payload = ' '.join (['%02x' % ord (i) for i in self.payload]) + return '' % (self.mtype, payload) def push (self, fmt, *args): - """ - Add data to the payload. - - msg.push (string) -> None. Add the given string to the payload. - - msg.push (fmt, values...) -> None. Add the given values to the - payload, using a struct.pack format string. - """ - if args: - self.payload += pack ('!' + fmt, *args) - else: - self.payload += fmt + """ + Add data to the payload. + + msg.push (string) -> None. Add the given string to the payload. + + msg.push (fmt, values...) -> None. Add the given values to the + payload, using a struct.pack format string. + """ + if args: + self.payload += pack ('!' + fmt, *args) + else: + self.payload += fmt def pop (self, fmt = None): - """ - Get data from the payload. - - msg.pop () -> payload. Get all the remaining payload. - - msg.pop (fmt) -> (values, ...). Get values extracted according to a - struct.unpack format string. - """ - if fmt: - nb = calcsize (fmt) - ex, self.payload = self.payload[0:nb], self.payload[nb:] - return unpack ('!' + fmt, ex) - else: - ex, self.payload = self.payload, '' - return ex + """ + Get data from the payload. + + msg.pop () -> payload. Get all the remaining payload. + + msg.pop (fmt) -> (values, ...). Get values extracted according to a + struct.unpack format string. + """ + if fmt: + nb = calcsize (fmt) + ex, self.payload = self.payload[0:nb], self.payload[nb:] + return unpack ('!' + fmt, ex) + else: + ex, self.payload = self.payload, '' + return ex def __len__ (self): - """Get payload remaining length.""" - return len (self.payload) + """Get payload remaining length.""" + return len (self.payload) def _test (): import doctest diff --git a/host/mex/node.py b/host/mex/node.py index 3307a4d7..2a6a5d22 100644 --- a/host/mex/node.py +++ b/host/mex/node.py @@ -30,128 +30,128 @@ from struct import pack, unpack, calcsize class Node: class closed: - """Raised on closed connection.""" - pass + """Raised on closed connection.""" + pass def __init__ (self, addr = mex.DEFAULT_ADDR): - """Create a new Node and connect it to given Hub address.""" - self.socket = socket.socket () - self.socket.setsockopt (socket.SOL_TCP, socket.TCP_NODELAY, 1) - self.socket.connect (addr) - self.date = 0 - self.seq = 0 - self.req = None - self.handlers = { } - self.register (mex.DATE, lambda msg: self.handle_DATE (msg)) - self.register (mex.REQ, lambda msg: self.handle_REQ (msg)) - # Synchronise. - rsp = None - while rsp == None or rsp.mtype != mex.DATE: - rsp = self.recv () - self.dispatch (rsp) + """Create a new Node and connect it to given Hub address.""" + self.socket = socket.socket () + self.socket.setsockopt (socket.SOL_TCP, socket.TCP_NODELAY, 1) + self.socket.connect (addr) + self.date = 0 + self.seq = 0 + self.req = None + self.handlers = { } + self.register (mex.DATE, lambda msg: self.handle_DATE (msg)) + self.register (mex.REQ, lambda msg: self.handle_REQ (msg)) + # Synchronise. + rsp = None + while rsp == None or rsp.mtype != mex.DATE: + rsp = self.recv () + self.dispatch (rsp) def wait (self, date = None): - """Wait forever or until a date is reached.""" - while date == None or self.date != date: - idle = Msg (mex.IDLE) - if date != None: - idle.push ('L', date) - self.send (idle) - msg = self.recv () - self.dispatch (msg) + """Wait forever or until a date is reached.""" + while date == None or self.date != date: + idle = Msg (mex.IDLE) + if date != None: + idle.push ('L', date) + self.send (idle) + msg = self.recv () + self.dispatch (msg) def wait_async (self, date = None): - """Asynchronous version of wait. This should not be called again - until sync return True.""" - self.async_waited = date - synced = self.sync () - assert not synced + """Asynchronous version of wait. This should not be called again + until sync return True.""" + self.async_waited = date + synced = self.sync () + assert not synced def sync (self): - """To be called after read or wait_async. Return True if the waited - date is reached or signal the Hub our waiting status.""" - if self.date == self.async_waited: - return True - else: - idle = Msg (mex.IDLE) - if self.async_waited != None: - idle.push ('L', self.async_waited) - self.send (idle) + """To be called after read or wait_async. Return True if the waited + date is reached or signal the Hub our waiting status.""" + if self.date == self.async_waited: + return True + else: + idle = Msg (mex.IDLE) + if self.async_waited != None: + idle.push ('L', self.async_waited) + self.send (idle) def read (self): - """Used for asynchronous operations. Handle incoming data. The sync - method should be called after this one returns.""" - msg = self.recv () - self.dispatch (msg) + """Used for asynchronous operations. Handle incoming data. The sync + method should be called after this one returns.""" + msg = self.recv () + self.dispatch (msg) def send (self, msg): - """Send a message.""" - data = msg.data () - packet = pack (mex.HEADER_FMT, len (data), self.seq) + data - self.socket.sendall (packet) + """Send a message.""" + data = msg.data () + packet = pack (mex.HEADER_FMT, len (data), self.seq) + data + self.socket.sendall (packet) def request (self, msg): - """Send a request and return response.""" - # Send request. - req = Msg (mex.REQ) - req.push ('B', 0) - req.push (msg.data ()) - self.send (req) - # Wait for response. - rsp = self.recv () - while rsp.mtype != mex.RSP: - self.dispatch (rsp) - rsp = self.recv () - # Discard reqid. - rsp.pop ('B') - return Msg (rsp.pop ()) + """Send a request and return response.""" + # Send request. + req = Msg (mex.REQ) + req.push ('B', 0) + req.push (msg.data ()) + self.send (req) + # Wait for response. + rsp = self.recv () + while rsp.mtype != mex.RSP: + self.dispatch (rsp) + rsp = self.recv () + # Discard reqid. + rsp.pop ('B') + return Msg (rsp.pop ()) def response (self, msg): - """Send a response to the currently serviced request.""" - assert self.req != None - rsp = Msg (mex.RSP) - rsp.push ('B', self.req) - self.req = None - rsp.push (msg.data ()) - self.send (rsp) + """Send a response to the currently serviced request.""" + assert self.req != None + rsp = Msg (mex.RSP) + rsp.push ('B', self.req) + self.req = None + rsp.push (msg.data ()) + self.send (rsp) def register (self, mtype, handler): - """Register an handler for the given message type.""" - assert mtype not in self.handlers - self.handlers[mtype] = handler + """Register an handler for the given message type.""" + assert mtype not in self.handlers + self.handlers[mtype] = handler def close (self): - """Close connection with the Hub.""" - self.socket.close () - self.socket = None + """Close connection with the Hub.""" + self.socket.close () + self.socket = None def fileno (self): - """Return socket fileno () for asynchronous operations.""" - return self.socket.fileno () + """Return socket fileno () for asynchronous operations.""" + return self.socket.fileno () def recv (self): - """Receive one message.""" - head = self.socket.recv (calcsize (mex.HEADER_FMT)) - if head == '': - self.close () - raise Node.closed - size, self.seq = unpack (mex.HEADER_FMT, head) - data = self.socket.recv (size) - return Msg (data) + """Receive one message.""" + head = self.socket.recv (calcsize (mex.HEADER_FMT)) + if head == '': + self.close () + raise Node.closed + size, self.seq = unpack (mex.HEADER_FMT, head) + data = self.socket.recv (size) + return Msg (data) def dispatch (self, msg): - """Call the right handler for the given message.""" - if msg.mtype in self.handlers: - self.handlers[msg.mtype] (msg) + """Call the right handler for the given message.""" + if msg.mtype in self.handlers: + self.handlers[msg.mtype] (msg) def handle_DATE (self, msg): - """Handle an incoming DATE.""" - self.date, = msg.pop ('L') + """Handle an incoming DATE.""" + self.date, = msg.pop ('L') def handle_REQ (self, msg): - """Handle an incoming REQ.""" - self.req, = msg.pop ('B') - dec = Msg (msg.pop ()) - self.dispatch (dec) - self.req = None + """Handle an incoming REQ.""" + self.req, = msg.pop ('B') + dec = Msg (msg.pop ()) + self.dispatch (dec) + self.req = None diff --git a/host/mex/test/test.py b/host/mex/test/test.py index c4382c2c..4be24f49 100644 --- a/host/mex/test/test.py +++ b/host/mex/test/test.py @@ -15,12 +15,12 @@ h = Hub (min_clients = 2, log = log) def c1 (): n = Node () def a (msg): - print 'oucouc' - nb, = msg.pop ('B') - nb += 1 - m = Msg (msg.mtype) - m.push ('B', nb) - n.response (m) + print 'oucouc' + nb, = msg.pop ('B') + nb += 1 + m = Msg (msg.mtype) + m.push ('B', nb) + n.response (m) n.register (0x82, a) m = Msg (0x81) n.send (m) @@ -31,7 +31,7 @@ f1 = Forked (c1) def c2 (): n = Node () def a (msg): - print 'coucou' + print 'coucou' n.register (0x81, a) m = Msg (0x82) m.push ('B', 42) @@ -40,9 +40,9 @@ def c2 (): assert r.pop ('B') == (43,) n.wait_async (42) while not n.sync (): - fds = select.select ((n, ), (), ())[0] - for i in fds: - i.read () + fds = select.select ((n, ), (), ())[0] + for i in fds: + i.read () n.wait () f2 = Forked (c2) -- cgit v1.2.3