summaryrefslogtreecommitdiff
path: root/host
diff options
context:
space:
mode:
Diffstat (limited to 'host')
-rw-r--r--host/mex/README25
-rw-r--r--host/mex/mex/__init__.py45
-rw-r--r--host/mex/mex/forked.py48
-rw-r--r--host/mex/mex/hub.py177
-rw-r--r--host/mex/mex/msg.py151
-rw-r--r--host/mex/mex/node.py124
-rw-r--r--host/mex/test/test.py51
7 files changed, 621 insertions, 0 deletions
diff --git a/host/mex/README b/host/mex/README
new file mode 100644
index 00000000..69e18f10
--- /dev/null
+++ b/host/mex/README
@@ -0,0 +1,25 @@
+mex - Message Exchange library.
+
+This library permit exchange of messages between several nodes using a central
+hub, and maintain a virtual date, synchronised between all nodes.
+
+
+Copyright (C) 2008 Nicolas Schodet
+
+APBTeam:
+ Web: http://apbteam.org/
+ Email: team AT apbteam DOT org
+
+This program is free software; you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation; either version 2 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program; if not, write to the Free Software
+Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
diff --git a/host/mex/mex/__init__.py b/host/mex/mex/__init__.py
new file mode 100644
index 00000000..17a462a3
--- /dev/null
+++ b/host/mex/mex/__init__.py
@@ -0,0 +1,45 @@
+# mex - Messages exchange library. {{{
+#
+# Copyright (C) 2008 Nicolas Schodet
+#
+# APBTeam:
+# Web: http://apbteam.org/
+# Email: team AT apbteam DOT org
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+#
+# }}}
+"""mex - Messages exchange library."""
+
+IDLE = 0
+"""IDLE message, send by nodes to the hub to signal it had handled all
+messages. It can have an optionnal parameter: the date at which the node will
+not be idle anymore."""
+
+DATE = 1
+"""DATE message, send by the hub to the nodes to update the current date
+value."""
+
+REQ = 2
+"""REQ (request) message, send from a node to other nodes to request a
+response message."""
+
+RSP = 3
+"""RSP (response) message, response to a REQ message, only sent to the
+requesting node."""
+
+DEFAULT_ADDR = ('localhost', 2442)
+
+HEADER_FMT = '!HB'
diff --git a/host/mex/mex/forked.py b/host/mex/mex/forked.py
new file mode 100644
index 00000000..9e72d7d3
--- /dev/null
+++ b/host/mex/mex/forked.py
@@ -0,0 +1,48 @@
+# mex - Messages exchange library. {{{
+#
+# Copyright (C) 2008 Nicolas Schodet
+#
+# APBTeam:
+# Web: http://apbteam.org/
+# Email: team AT apbteam DOT org
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+#
+# }}}
+"""Run a function in a separate process."""
+import os, sys, signal
+
+class Forked:
+ """
+ Usage:
+
+ >>> f = Forked (function)
+ (function runs in background, in a separated process)
+ >>> f.kill ()
+ (send the SIGTERM signal to the process)
+ """
+
+ def __init__ (self, function):
+ """Initialise, fork, and call the given function in the children
+ process."""
+ self.pid = os.fork ()
+ if self.pid == 0:
+ function ()
+ sys.exit (0)
+
+ def kill (self):
+ """Kill the forked function."""
+ os.kill (self.pid, signal.SIGTERM)
+ os.waitpid (self.pid, 0)
diff --git a/host/mex/mex/hub.py b/host/mex/mex/hub.py
new file mode 100644
index 00000000..79b18286
--- /dev/null
+++ b/host/mex/mex/hub.py
@@ -0,0 +1,177 @@
+# mex - Messages exchange library. {{{
+#
+# Copyright (C) 2008 Nicolas Schodet
+#
+# APBTeam:
+# Web: http://apbteam.org/
+# Email: team AT apbteam DOT org
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+#
+# }}}
+"""mex Hub."""
+import mex
+from msg import Msg
+import msg
+import socket, select
+from struct import pack, unpack, calcsize
+
+class Hub:
+
+ def __init__ (self, addr = mex.DEFAULT_ADDR, min_clients = 0, log = None):
+ """Initialise a new Hub and bind to server address."""
+ self.addr = addr
+ self.min_clients = min_clients
+ self.log = log
+ self.clients = { }
+ self.next_client_id = 1
+ self.date = 0
+ self.socket = socket.socket ()
+ self.socket.setsockopt (socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ self.socket.bind (self.addr)
+ self.socket.listen (5)
+
+ def wait (self, cond = False):
+ """Wait until a cond is True and handle message exchanges."""
+ while not cond and self.date != None:
+ self.select ()
+
+ def select (self):
+ """Wait until the next event (connection or message) occurs and handle
+ it."""
+ if not self.min_clients:
+ # Check idle and wanted dates.
+ idle = True
+ min_wait_date = None
+ for c in self.clients.itervalues ():
+ if not c.idle:
+ idle = False
+ break
+ if c.wait_date != None and (c.wait_date < min_wait_date or
+ min_wait_date == None):
+ min_wait_date = c.wait_date
+ # Abort if everyone waits forever.
+ if idle and min_wait_date == None:
+ self.date = None
+ return
+ # Send new date.
+ if idle:
+ self.date = min_wait_date
+ if self.log:
+ self.log ('[%d] date' % self.date)
+ date = Msg (mex.DATE)
+ date.push ('L', self.date)
+ for c in self.clients.itervalues ():
+ c.send (date)
+ # Prepare fdset and select.
+ infds = [ self.socket ]
+ if not self.min_clients:
+ infds += self.clients.values ()
+ readyfds = select.select (infds, (), ()) [0]
+ for i in readyfds:
+ if i is self.socket:
+ self.accept ()
+ else:
+ i.read ()
+
+ def accept (self):
+ """Accept a new connection and create a client."""
+ # Add client.
+ a = self.socket.accept ()
+ if self.log:
+ self.log ('[%d] connect from %s' % (self.date, str (a[1])))
+ c = Hub.Client (self, a[0], self.next_client_id)
+ self.next_client_id += 1
+ assert self.next_client_id < 256
+ self.clients[c.id] = c
+ if self.min_clients:
+ self.min_clients -= 1
+ # Send first date.
+ date = Msg (mex.DATE)
+ date.push ('L', self.date)
+ c.send (date)
+
+ def broadcast (self, msg, exclude = None):
+ """Send a message broadcasted, could exclude a client."""
+ for c in self.clients.itervalues ():
+ if c is not exclude:
+ c.send (msg)
+
+ class Client:
+
+ def __init__ (self, hub, socket, id):
+ """Initialise a new client."""
+ self.hub = hub
+ self.socket = socket
+ self.id = id
+ self.seq = 0
+ self.idle = True
+ self.wait_date = None
+
+ def read (self):
+ """Read and dispatch a message from this client."""
+ # Read message.
+ head = self.socket.recv (calcsize (mex.HEADER_FMT))
+ if head == '':
+ self.socket.close ()
+ del self.hub.clients[self.id]
+ return
+ size, seq = unpack (mex.HEADER_FMT, head)
+ data = self.socket.recv (size)
+ m = Msg (data)
+ if self.hub.log:
+ self.hub.log ('[%d] received from %d(%d): %s' %
+ (self.hub.date, self.id, seq, str (m)))
+ # Dispatch.
+ if m.mtype == mex.IDLE:
+ if seq == self.seq:
+ self.idle = True
+ if len (m):
+ self.wait_date, = m.pop ('L')
+ assert self.wait_date >= self.hub.date
+ else:
+ self.wait_date = None
+ elif m.mtype == mex.REQ:
+ m.pop ('B')
+ mr = Msg (mex.REQ)
+ mr.push ('B', self.id)
+ mr.push (m.pop ())
+ self.hub.broadcast (mr, self)
+ elif m.mtype == mex.RSP:
+ to, = m.pop ('B')
+ mr = Msg (mex.RSP)
+ mr.push ('B', 0)
+ mr.push (m.pop ())
+ self.hub.clients[to].send (mr)
+ else:
+ self.hub.broadcast (m, self)
+
+ def send (self, msg):
+ """Send a message to this client."""
+ data = msg.data ()
+ self.seq += 1
+ if self.seq == 256:
+ self.seq = 0
+ packet = pack (mex.HEADER_FMT, len (data), self.seq) + data
+ self.socket.sendall (packet)
+ self.idle = False
+ if self.hub.log:
+ self.hub.log ('[%d] sending to %d(%d): %s' % (self.hub.date,
+ self.id, self.seq, str (msg)))
+
+ def fileno (self):
+ """Return socket filedescriptor."""
+ return self.socket.fileno ()
+
diff --git a/host/mex/mex/msg.py b/host/mex/mex/msg.py
new file mode 100644
index 00000000..5399a3c9
--- /dev/null
+++ b/host/mex/mex/msg.py
@@ -0,0 +1,151 @@
+# mex - Messages exchange library. {{{
+#
+# Copyright (C) 2008 Nicolas Schodet
+#
+# APBTeam:
+# Web: http://apbteam.org/
+# Email: team AT apbteam DOT org
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+#
+# }}}
+"""mex message module."""
+from struct import pack, unpack, calcsize
+
+class Msg:
+ r"""Msg (int) -> new empty message with given mtype.
+ Msg (buffer) -> new message read from buffer.
+
+ Examples
+ ========
+
+ Send:
+
+ >>> m = Msg (3)
+ >>> print m
+ <Msg 03: >
+ >>> m.mtype
+ 3
+ >>> m.push ('BHl', 1, 2, 3)
+ >>> print m
+ <Msg 03: 01 00 02 00 00 00 03>
+ >>> len (m)
+ 7
+ >>> m.data ()
+ '\x03\x01\x00\x02\x00\x00\x00\x03'
+
+ Receive:
+
+ >>> d = m.data ()
+ >>> m2 = Msg (d)
+ >>> print m2
+ <Msg 03: 01 00 02 00 00 00 03>
+ >>> len (m)
+ 7
+ >>> m2.mtype
+ 3
+ >>> m2.pop ('BHl')
+ (1, 2, 3)
+ >>> len (m2)
+ 0
+
+ Encapsulate:
+
+ >>> menc = Msg (4)
+ >>> menc.push ('B', 5)
+ >>> menc.push (d)
+ >>> print menc
+ <Msg 04: 05 03 01 00 02 00 00 00 03>
+ >>> len (menc)
+ 9
+
+ Decapsulate:
+
+ >>> mdec = Msg (menc.data ())
+ >>> mdec.pop ('B')
+ (5,)
+ >>> mdecd = Msg (mdec.pop ())
+ >>> print mdecd
+ <Msg 03: 01 00 02 00 00 00 03>
+ >>> len (mdecd)
+ 7
+ """
+
+ def __init__ (self, f):
+ """Initialise a new message, see class documentation for
+ signatures."""
+ try:
+ f[0]
+ except TypeError:
+ # New empty message.
+ self.mtype = f
+ self.header = pack ('!B', self.mtype)
+ self.payload = ''
+ else:
+ # Read from a buffer.
+ self.header = f[0:1]
+ self.payload = f[1:]
+ self.mtype = unpack ('!B', self.header)[0]
+
+ def data (self):
+ """Get the message data, ready to be sent."""
+ return self.header + self.payload
+
+ def __str__ (self):
+ """Return an text representation."""
+ payload = ' '.join (['%02x' % ord (i) for i in self.payload])
+ return '<Msg %02x: %s>' % (self.mtype, payload)
+
+ def push (self, fmt, *args):
+ """
+ Add data to the payload.
+
+ msg.push (string) -> None. Add the given string to the payload.
+
+ msg.push (fmt, values...) -> None. Add the given values to the
+ payload, using a struct.pack format string.
+ """
+ if args:
+ self.payload += pack ('!' + fmt, *args)
+ else:
+ self.payload += fmt
+
+ def pop (self, fmt = None):
+ """
+ Get data from the payload.
+
+ msg.pop () -> payload. Get all the remaining payload.
+
+ msg.pop (fmt) -> (values, ...). Get values extracted according to a
+ struct.unpack format string.
+ """
+ if fmt:
+ nb = calcsize (fmt)
+ ex, self.payload = self.payload[0:nb], self.payload[nb:]
+ return unpack ('!' + fmt, ex)
+ else:
+ ex, self.payload = self.payload, ''
+ return ex
+
+ def __len__ (self):
+ """Get payload remaining length."""
+ return len (self.payload)
+
+def _test ():
+ import doctest
+ doctest.testmod ()
+
+if __name__ == '__main__':
+ _test()
diff --git a/host/mex/mex/node.py b/host/mex/mex/node.py
new file mode 100644
index 00000000..15cc5c2e
--- /dev/null
+++ b/host/mex/mex/node.py
@@ -0,0 +1,124 @@
+# mex - Messages exchange library. {{{
+#
+# Copyright (C) 2008 Nicolas Schodet
+#
+# APBTeam:
+# Web: http://apbteam.org/
+# Email: team AT apbteam DOT org
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+#
+# }}}
+"""mex Node."""
+import mex
+from msg import Msg
+import msg
+import socket
+from struct import pack, unpack, calcsize
+
+class Node:
+
+ class closed:
+ """Raised on closed connection."""
+ pass
+
+ def __init__ (self, addr = mex.DEFAULT_ADDR):
+ """Create a new Node and connect it to given Hub address."""
+ self.socket = socket.socket ()
+ self.socket.connect (addr)
+ self.date = 0
+ self.seq = 0
+ self.req = None
+ self.handlers = { }
+ self.register (mex.DATE, lambda msg: self.handle_DATE (msg))
+ self.register (mex.REQ, lambda msg: self.handle_REQ (msg))
+
+ def wait (self, date = None):
+ """Wait forever or until a date is reached."""
+ while date == None or self.date != date:
+ idle = Msg (mex.IDLE)
+ if date != None:
+ idle.push ('L', date)
+ self.send (idle)
+ msg = self.recv ()
+ self.dispatch (msg)
+
+ def send (self, msg):
+ """Send a message."""
+ data = msg.data ()
+ packet = pack (mex.HEADER_FMT, len (data), self.seq) + data
+ self.socket.sendall (packet)
+
+ def request (self, msg):
+ """Send a request and return response."""
+ # Send request.
+ req = Msg (mex.REQ)
+ req.push ('B', 0)
+ req.push (msg.data ())
+ self.send (req)
+ # Wait for response.
+ rsp = self.recv ()
+ while rsp.mtype != mex.RSP:
+ self.dispatch (rsp)
+ rsp = self.recv ()
+ # Discard reqid.
+ rsp.pop ('B')
+ return Msg (rsp.pop ())
+
+ def response (self, msg):
+ """Send a response to the currently serviced request."""
+ assert self.req != None
+ rsp = Msg (mex.RSP)
+ rsp.push ('B', self.req)
+ self.req = None
+ rsp.push (msg.data ())
+ self.send (rsp)
+
+ def register (self, mtype, handler):
+ """Register an handler for the given message type."""
+ assert mtype not in self.handlers
+ self.handlers[mtype] = handler
+
+ def close (self):
+ """Close connection with the Hub."""
+ self.socket.close ()
+ self.socket = None
+
+ def recv (self):
+ """Receive one message."""
+ head = self.socket.recv (calcsize (mex.HEADER_FMT))
+ if head == '':
+ self.close ()
+ raise Node.closed
+ size, self.seq = unpack (mex.HEADER_FMT, head)
+ data = self.socket.recv (size)
+ return Msg (data)
+
+ def dispatch (self, msg):
+ """Call the right handler for the given message."""
+ if msg.mtype in self.handlers:
+ self.handlers[msg.mtype] (msg)
+
+ def handle_DATE (self, msg):
+ """Handle an incoming DATE."""
+ self.date, = msg.pop ('L')
+
+ def handle_REQ (self, msg):
+ """Handle an incoming REQ."""
+ self.req, = msg.pop ('B')
+ dec = Msg (msg.pop ())
+ self.dispatch (dec)
+ self.req = None
+
diff --git a/host/mex/test/test.py b/host/mex/test/test.py
new file mode 100644
index 00000000..48484486
--- /dev/null
+++ b/host/mex/test/test.py
@@ -0,0 +1,51 @@
+import sys
+sys.path.append (sys.path[0] + '/..')
+
+from mex.hub import Hub
+from mex.node import Node
+from mex.msg import Msg
+from mex.forked import Forked
+
+def log (x):
+ print x
+
+h = Hub (min_clients = 2, log = log)
+
+def c1 ():
+ n = Node ()
+ def a (msg):
+ print 'oucouc'
+ nb, = msg.pop ('B')
+ nb += 1
+ m = Msg (msg.mtype)
+ m.push ('B', nb)
+ n.response (m)
+ n.register (0x82, a)
+ m = Msg (0x81)
+ n.send (m)
+ n.wait ()
+
+f1 = Forked (c1)
+
+def c2 ():
+ n = Node ()
+ def a (msg):
+ print 'coucou'
+ n.register (0x81, a)
+ m = Msg (0x82)
+ m.push ('B', 42)
+ r = n.request (m)
+ assert r.mtype == 0x82
+ assert r.pop ('B') == (43,)
+ n.wait (42)
+ n.wait ()
+
+f2 = Forked (c2)
+
+try:
+ h.wait ()
+finally:
+ f1.kill ()
+ f2.kill ()
+ import time
+ time.sleep (1)