summaryrefslogtreecommitdiffhomepage
path: root/host/mex
diff options
context:
space:
mode:
Diffstat (limited to 'host/mex')
-rw-r--r--host/mex/hub.py268
-rw-r--r--host/mex/msg.py96
-rw-r--r--host/mex/node.py188
-rw-r--r--host/mex/test/test.py20
4 files changed, 286 insertions, 286 deletions
diff --git a/host/mex/hub.py b/host/mex/hub.py
index 5ae88ba7..4dd796a4 100644
--- a/host/mex/hub.py
+++ b/host/mex/hub.py
@@ -30,152 +30,152 @@ from struct import pack, unpack, calcsize
class Hub:
def __init__ (self, addr = mex.DEFAULT_ADDR, min_clients = 0, log = None):
- """Initialise a new Hub and bind to server address."""
- self.addr = addr
- self.min_clients = min_clients
- self.log = log
- self.clients = { }
- self.next_client_id = 1
- self.date = 0
- self.socket = socket.socket ()
- self.socket.setsockopt (socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- self.socket.setsockopt (socket.SOL_TCP, socket.TCP_NODELAY, 1)
- self.socket.bind (self.addr)
- self.socket.listen (5)
+ """Initialise a new Hub and bind to server address."""
+ self.addr = addr
+ self.min_clients = min_clients
+ self.log = log
+ self.clients = { }
+ self.next_client_id = 1
+ self.date = 0
+ self.socket = socket.socket ()
+ self.socket.setsockopt (socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ self.socket.setsockopt (socket.SOL_TCP, socket.TCP_NODELAY, 1)
+ self.socket.bind (self.addr)
+ self.socket.listen (5)
def wait (self, cond = False):
- """Wait until a cond is True and handle message exchanges."""
- while not cond and self.date != None:
- self.select ()
+ """Wait until a cond is True and handle message exchanges."""
+ while not cond and self.date != None:
+ self.select ()
def select (self):
- """Wait until the next event (connection or message) occurs and handle
- it."""
- if not self.min_clients:
- # Check idle and wanted dates.
- idle = True
- min_wait_date = None
- for c in self.clients.itervalues ():
- if not c.idle:
- idle = False
- break
- if c.wait_date != None and (c.wait_date < min_wait_date or
- min_wait_date == None):
- min_wait_date = c.wait_date
- # Abort if everyone waits forever.
- if idle and min_wait_date == None:
- self.date = None
- return
- # Send new date.
- if idle:
- self.date = min_wait_date
- if self.log:
- self.log ('[%d] date' % self.date)
- date = Msg (mex.DATE)
- date.push ('L', self.date)
- for c in self.clients.itervalues ():
- c.send (date)
- # Prepare fdset and select.
- infds = [ self.socket ]
- if not self.min_clients:
- infds += self.clients.values ()
- readyfds = select.select (infds, (), ()) [0]
- for i in readyfds:
- if i is self.socket:
- self.accept ()
- else:
- i.read ()
+ """Wait until the next event (connection or message) occurs and handle
+ it."""
+ if not self.min_clients:
+ # Check idle and wanted dates.
+ idle = True
+ min_wait_date = None
+ for c in self.clients.itervalues ():
+ if not c.idle:
+ idle = False
+ break
+ if c.wait_date != None and (c.wait_date < min_wait_date or
+ min_wait_date == None):
+ min_wait_date = c.wait_date
+ # Abort if everyone waits forever.
+ if idle and min_wait_date == None:
+ self.date = None
+ return
+ # Send new date.
+ if idle:
+ self.date = min_wait_date
+ if self.log:
+ self.log ('[%d] date' % self.date)
+ date = Msg (mex.DATE)
+ date.push ('L', self.date)
+ for c in self.clients.itervalues ():
+ c.send (date)
+ # Prepare fdset and select.
+ infds = [ self.socket ]
+ if not self.min_clients:
+ infds += self.clients.values ()
+ readyfds = select.select (infds, (), ()) [0]
+ for i in readyfds:
+ if i is self.socket:
+ self.accept ()
+ else:
+ i.read ()
def accept (self):
- """Accept a new connection and create a client."""
- # Add client.
- a = self.socket.accept ()
- if self.log:
- self.log ('[%d] connect from %s' % (self.date, str (a[1])))
- c = Hub.Client (self, a[0], self.next_client_id)
- self.next_client_id += 1
- assert self.next_client_id < 256
- self.clients[c.id] = c
- if self.min_clients:
- self.min_clients -= 1
- # Send first date.
- date = Msg (mex.DATE)
- date.push ('L', self.date)
- c.send (date)
+ """Accept a new connection and create a client."""
+ # Add client.
+ a = self.socket.accept ()
+ if self.log:
+ self.log ('[%d] connect from %s' % (self.date, str (a[1])))
+ c = Hub.Client (self, a[0], self.next_client_id)
+ self.next_client_id += 1
+ assert self.next_client_id < 256
+ self.clients[c.id] = c
+ if self.min_clients:
+ self.min_clients -= 1
+ # Send first date.
+ date = Msg (mex.DATE)
+ date.push ('L', self.date)
+ c.send (date)
def broadcast (self, msg, exclude = None):
- """Send a message broadcasted, could exclude a client."""
- for c in self.clients.itervalues ():
- if c is not exclude:
- c.send (msg)
+ """Send a message broadcasted, could exclude a client."""
+ for c in self.clients.itervalues ():
+ if c is not exclude:
+ c.send (msg)
class Client:
- def __init__ (self, hub, socket, id):
- """Initialise a new client."""
- self.hub = hub
- self.socket = socket
- self.id = id
- self.seq = 0
- self.idle = True
- self.wait_date = None
+ def __init__ (self, hub, socket, id):
+ """Initialise a new client."""
+ self.hub = hub
+ self.socket = socket
+ self.id = id
+ self.seq = 0
+ self.idle = True
+ self.wait_date = None
- def read (self):
- """Read and dispatch a message from this client."""
- # Read message.
- head = self.socket.recv (calcsize (mex.HEADER_FMT))
- if head == '':
- self.socket.close ()
- del self.hub.clients[self.id]
- return
- size, seq = unpack (mex.HEADER_FMT, head)
- data = self.socket.recv (size)
- m = Msg (data)
- if self.hub.log:
- self.hub.log ('[%d] received from %d(%d): %s' %
- (self.hub.date, self.id, seq, str (m)))
- # Dispatch.
- if m.mtype == mex.IDLE:
- if seq == self.seq:
- self.idle = True
- if len (m):
- self.wait_date, = m.pop ('L')
- assert self.wait_date >= self.hub.date
- else:
- self.wait_date = None
- elif m.mtype == mex.DATE:
- date = Msg (mex.DATE)
- date.push ('L', self.hub.date)
- self.send (date)
- elif m.mtype == mex.REQ:
- m.pop ('B')
- mr = Msg (mex.REQ)
- mr.push ('B', self.id)
- mr.push (m.pop ())
- self.hub.broadcast (mr, self)
- elif m.mtype == mex.RSP:
- to, = m.pop ('B')
- mr = Msg (mex.RSP)
- mr.push ('B', 0)
- mr.push (m.pop ())
- self.hub.clients[to].send (mr)
- else:
- self.hub.broadcast (m, self)
+ def read (self):
+ """Read and dispatch a message from this client."""
+ # Read message.
+ head = self.socket.recv (calcsize (mex.HEADER_FMT))
+ if head == '':
+ self.socket.close ()
+ del self.hub.clients[self.id]
+ return
+ size, seq = unpack (mex.HEADER_FMT, head)
+ data = self.socket.recv (size)
+ m = Msg (data)
+ if self.hub.log:
+ self.hub.log ('[%d] received from %d(%d): %s' %
+ (self.hub.date, self.id, seq, str (m)))
+ # Dispatch.
+ if m.mtype == mex.IDLE:
+ if seq == self.seq:
+ self.idle = True
+ if len (m):
+ self.wait_date, = m.pop ('L')
+ assert self.wait_date >= self.hub.date
+ else:
+ self.wait_date = None
+ elif m.mtype == mex.DATE:
+ date = Msg (mex.DATE)
+ date.push ('L', self.hub.date)
+ self.send (date)
+ elif m.mtype == mex.REQ:
+ m.pop ('B')
+ mr = Msg (mex.REQ)
+ mr.push ('B', self.id)
+ mr.push (m.pop ())
+ self.hub.broadcast (mr, self)
+ elif m.mtype == mex.RSP:
+ to, = m.pop ('B')
+ mr = Msg (mex.RSP)
+ mr.push ('B', 0)
+ mr.push (m.pop ())
+ self.hub.clients[to].send (mr)
+ else:
+ self.hub.broadcast (m, self)
- def send (self, msg):
- """Send a message to this client."""
- data = msg.data ()
- self.seq += 1
- if self.seq == 256:
- self.seq = 0
- packet = pack (mex.HEADER_FMT, len (data), self.seq) + data
- self.socket.sendall (packet)
- self.idle = False
- if self.hub.log:
- self.hub.log ('[%d] sending to %d(%d): %s' % (self.hub.date,
- self.id, self.seq, str (msg)))
+ def send (self, msg):
+ """Send a message to this client."""
+ data = msg.data ()
+ self.seq += 1
+ if self.seq == 256:
+ self.seq = 0
+ packet = pack (mex.HEADER_FMT, len (data), self.seq) + data
+ self.socket.sendall (packet)
+ self.idle = False
+ if self.hub.log:
+ self.hub.log ('[%d] sending to %d(%d): %s' % (self.hub.date,
+ self.id, self.seq, str (msg)))
- def fileno (self):
- """Return socket filedescriptor."""
- return self.socket.fileno ()
+ def fileno (self):
+ """Return socket filedescriptor."""
+ return self.socket.fileno ()
diff --git a/host/mex/msg.py b/host/mex/msg.py
index 5399a3c9..0cd01c8c 100644
--- a/host/mex/msg.py
+++ b/host/mex/msg.py
@@ -84,64 +84,64 @@ class Msg:
"""
def __init__ (self, f):
- """Initialise a new message, see class documentation for
- signatures."""
- try:
- f[0]
- except TypeError:
- # New empty message.
- self.mtype = f
- self.header = pack ('!B', self.mtype)
- self.payload = ''
- else:
- # Read from a buffer.
- self.header = f[0:1]
- self.payload = f[1:]
- self.mtype = unpack ('!B', self.header)[0]
+ """Initialise a new message, see class documentation for
+ signatures."""
+ try:
+ f[0]
+ except TypeError:
+ # New empty message.
+ self.mtype = f
+ self.header = pack ('!B', self.mtype)
+ self.payload = ''
+ else:
+ # Read from a buffer.
+ self.header = f[0:1]
+ self.payload = f[1:]
+ self.mtype = unpack ('!B', self.header)[0]
def data (self):
- """Get the message data, ready to be sent."""
- return self.header + self.payload
+ """Get the message data, ready to be sent."""
+ return self.header + self.payload
def __str__ (self):
- """Return an text representation."""
- payload = ' '.join (['%02x' % ord (i) for i in self.payload])
- return '<Msg %02x: %s>' % (self.mtype, payload)
+ """Return an text representation."""
+ payload = ' '.join (['%02x' % ord (i) for i in self.payload])
+ return '<Msg %02x: %s>' % (self.mtype, payload)
def push (self, fmt, *args):
- """
- Add data to the payload.
-
- msg.push (string) -> None. Add the given string to the payload.
-
- msg.push (fmt, values...) -> None. Add the given values to the
- payload, using a struct.pack format string.
- """
- if args:
- self.payload += pack ('!' + fmt, *args)
- else:
- self.payload += fmt
+ """
+ Add data to the payload.
+
+ msg.push (string) -> None. Add the given string to the payload.
+
+ msg.push (fmt, values...) -> None. Add the given values to the
+ payload, using a struct.pack format string.
+ """
+ if args:
+ self.payload += pack ('!' + fmt, *args)
+ else:
+ self.payload += fmt
def pop (self, fmt = None):
- """
- Get data from the payload.
-
- msg.pop () -> payload. Get all the remaining payload.
-
- msg.pop (fmt) -> (values, ...). Get values extracted according to a
- struct.unpack format string.
- """
- if fmt:
- nb = calcsize (fmt)
- ex, self.payload = self.payload[0:nb], self.payload[nb:]
- return unpack ('!' + fmt, ex)
- else:
- ex, self.payload = self.payload, ''
- return ex
+ """
+ Get data from the payload.
+
+ msg.pop () -> payload. Get all the remaining payload.
+
+ msg.pop (fmt) -> (values, ...). Get values extracted according to a
+ struct.unpack format string.
+ """
+ if fmt:
+ nb = calcsize (fmt)
+ ex, self.payload = self.payload[0:nb], self.payload[nb:]
+ return unpack ('!' + fmt, ex)
+ else:
+ ex, self.payload = self.payload, ''
+ return ex
def __len__ (self):
- """Get payload remaining length."""
- return len (self.payload)
+ """Get payload remaining length."""
+ return len (self.payload)
def _test ():
import doctest
diff --git a/host/mex/node.py b/host/mex/node.py
index 3307a4d7..2a6a5d22 100644
--- a/host/mex/node.py
+++ b/host/mex/node.py
@@ -30,128 +30,128 @@ from struct import pack, unpack, calcsize
class Node:
class closed:
- """Raised on closed connection."""
- pass
+ """Raised on closed connection."""
+ pass
def __init__ (self, addr = mex.DEFAULT_ADDR):
- """Create a new Node and connect it to given Hub address."""
- self.socket = socket.socket ()
- self.socket.setsockopt (socket.SOL_TCP, socket.TCP_NODELAY, 1)
- self.socket.connect (addr)
- self.date = 0
- self.seq = 0
- self.req = None
- self.handlers = { }
- self.register (mex.DATE, lambda msg: self.handle_DATE (msg))
- self.register (mex.REQ, lambda msg: self.handle_REQ (msg))
- # Synchronise.
- rsp = None
- while rsp == None or rsp.mtype != mex.DATE:
- rsp = self.recv ()
- self.dispatch (rsp)
+ """Create a new Node and connect it to given Hub address."""
+ self.socket = socket.socket ()
+ self.socket.setsockopt (socket.SOL_TCP, socket.TCP_NODELAY, 1)
+ self.socket.connect (addr)
+ self.date = 0
+ self.seq = 0
+ self.req = None
+ self.handlers = { }
+ self.register (mex.DATE, lambda msg: self.handle_DATE (msg))
+ self.register (mex.REQ, lambda msg: self.handle_REQ (msg))
+ # Synchronise.
+ rsp = None
+ while rsp == None or rsp.mtype != mex.DATE:
+ rsp = self.recv ()
+ self.dispatch (rsp)
def wait (self, date = None):
- """Wait forever or until a date is reached."""
- while date == None or self.date != date:
- idle = Msg (mex.IDLE)
- if date != None:
- idle.push ('L', date)
- self.send (idle)
- msg = self.recv ()
- self.dispatch (msg)
+ """Wait forever or until a date is reached."""
+ while date == None or self.date != date:
+ idle = Msg (mex.IDLE)
+ if date != None:
+ idle.push ('L', date)
+ self.send (idle)
+ msg = self.recv ()
+ self.dispatch (msg)
def wait_async (self, date = None):
- """Asynchronous version of wait. This should not be called again
- until sync return True."""
- self.async_waited = date
- synced = self.sync ()
- assert not synced
+ """Asynchronous version of wait. This should not be called again
+ until sync return True."""
+ self.async_waited = date
+ synced = self.sync ()
+ assert not synced
def sync (self):
- """To be called after read or wait_async. Return True if the waited
- date is reached or signal the Hub our waiting status."""
- if self.date == self.async_waited:
- return True
- else:
- idle = Msg (mex.IDLE)
- if self.async_waited != None:
- idle.push ('L', self.async_waited)
- self.send (idle)
+ """To be called after read or wait_async. Return True if the waited
+ date is reached or signal the Hub our waiting status."""
+ if self.date == self.async_waited:
+ return True
+ else:
+ idle = Msg (mex.IDLE)
+ if self.async_waited != None:
+ idle.push ('L', self.async_waited)
+ self.send (idle)
def read (self):
- """Used for asynchronous operations. Handle incoming data. The sync
- method should be called after this one returns."""
- msg = self.recv ()
- self.dispatch (msg)
+ """Used for asynchronous operations. Handle incoming data. The sync
+ method should be called after this one returns."""
+ msg = self.recv ()
+ self.dispatch (msg)
def send (self, msg):
- """Send a message."""
- data = msg.data ()
- packet = pack (mex.HEADER_FMT, len (data), self.seq) + data
- self.socket.sendall (packet)
+ """Send a message."""
+ data = msg.data ()
+ packet = pack (mex.HEADER_FMT, len (data), self.seq) + data
+ self.socket.sendall (packet)
def request (self, msg):
- """Send a request and return response."""
- # Send request.
- req = Msg (mex.REQ)
- req.push ('B', 0)
- req.push (msg.data ())
- self.send (req)
- # Wait for response.
- rsp = self.recv ()
- while rsp.mtype != mex.RSP:
- self.dispatch (rsp)
- rsp = self.recv ()
- # Discard reqid.
- rsp.pop ('B')
- return Msg (rsp.pop ())
+ """Send a request and return response."""
+ # Send request.
+ req = Msg (mex.REQ)
+ req.push ('B', 0)
+ req.push (msg.data ())
+ self.send (req)
+ # Wait for response.
+ rsp = self.recv ()
+ while rsp.mtype != mex.RSP:
+ self.dispatch (rsp)
+ rsp = self.recv ()
+ # Discard reqid.
+ rsp.pop ('B')
+ return Msg (rsp.pop ())
def response (self, msg):
- """Send a response to the currently serviced request."""
- assert self.req != None
- rsp = Msg (mex.RSP)
- rsp.push ('B', self.req)
- self.req = None
- rsp.push (msg.data ())
- self.send (rsp)
+ """Send a response to the currently serviced request."""
+ assert self.req != None
+ rsp = Msg (mex.RSP)
+ rsp.push ('B', self.req)
+ self.req = None
+ rsp.push (msg.data ())
+ self.send (rsp)
def register (self, mtype, handler):
- """Register an handler for the given message type."""
- assert mtype not in self.handlers
- self.handlers[mtype] = handler
+ """Register an handler for the given message type."""
+ assert mtype not in self.handlers
+ self.handlers[mtype] = handler
def close (self):
- """Close connection with the Hub."""
- self.socket.close ()
- self.socket = None
+ """Close connection with the Hub."""
+ self.socket.close ()
+ self.socket = None
def fileno (self):
- """Return socket fileno () for asynchronous operations."""
- return self.socket.fileno ()
+ """Return socket fileno () for asynchronous operations."""
+ return self.socket.fileno ()
def recv (self):
- """Receive one message."""
- head = self.socket.recv (calcsize (mex.HEADER_FMT))
- if head == '':
- self.close ()
- raise Node.closed
- size, self.seq = unpack (mex.HEADER_FMT, head)
- data = self.socket.recv (size)
- return Msg (data)
+ """Receive one message."""
+ head = self.socket.recv (calcsize (mex.HEADER_FMT))
+ if head == '':
+ self.close ()
+ raise Node.closed
+ size, self.seq = unpack (mex.HEADER_FMT, head)
+ data = self.socket.recv (size)
+ return Msg (data)
def dispatch (self, msg):
- """Call the right handler for the given message."""
- if msg.mtype in self.handlers:
- self.handlers[msg.mtype] (msg)
+ """Call the right handler for the given message."""
+ if msg.mtype in self.handlers:
+ self.handlers[msg.mtype] (msg)
def handle_DATE (self, msg):
- """Handle an incoming DATE."""
- self.date, = msg.pop ('L')
+ """Handle an incoming DATE."""
+ self.date, = msg.pop ('L')
def handle_REQ (self, msg):
- """Handle an incoming REQ."""
- self.req, = msg.pop ('B')
- dec = Msg (msg.pop ())
- self.dispatch (dec)
- self.req = None
+ """Handle an incoming REQ."""
+ self.req, = msg.pop ('B')
+ dec = Msg (msg.pop ())
+ self.dispatch (dec)
+ self.req = None
diff --git a/host/mex/test/test.py b/host/mex/test/test.py
index c4382c2c..4be24f49 100644
--- a/host/mex/test/test.py
+++ b/host/mex/test/test.py
@@ -15,12 +15,12 @@ h = Hub (min_clients = 2, log = log)
def c1 ():
n = Node ()
def a (msg):
- print 'oucouc'
- nb, = msg.pop ('B')
- nb += 1
- m = Msg (msg.mtype)
- m.push ('B', nb)
- n.response (m)
+ print 'oucouc'
+ nb, = msg.pop ('B')
+ nb += 1
+ m = Msg (msg.mtype)
+ m.push ('B', nb)
+ n.response (m)
n.register (0x82, a)
m = Msg (0x81)
n.send (m)
@@ -31,7 +31,7 @@ f1 = Forked (c1)
def c2 ():
n = Node ()
def a (msg):
- print 'coucou'
+ print 'coucou'
n.register (0x81, a)
m = Msg (0x82)
m.push ('B', 42)
@@ -40,9 +40,9 @@ def c2 ():
assert r.pop ('B') == (43,)
n.wait_async (42)
while not n.sync ():
- fds = select.select ((n, ), (), ())[0]
- for i in fds:
- i.read ()
+ fds = select.select ((n, ), (), ())[0]
+ for i in fds:
+ i.read ()
n.wait ()
f2 = Forked (c2)