From 3bc81942721dcbba5af957c3822c63b7161b2091 Mon Sep 17 00:00:00 2001 From: Nicolas Schodet Date: Sat, 29 Mar 2008 00:40:27 +0100 Subject: * host/mex: - first mex version. --- host/mex/README | 25 +++++++ host/mex/mex/__init__.py | 45 ++++++++++++ host/mex/mex/forked.py | 48 +++++++++++++ host/mex/mex/hub.py | 177 +++++++++++++++++++++++++++++++++++++++++++++++ host/mex/mex/msg.py | 151 ++++++++++++++++++++++++++++++++++++++++ host/mex/mex/node.py | 124 +++++++++++++++++++++++++++++++++ host/mex/test/test.py | 51 ++++++++++++++ 7 files changed, 621 insertions(+) create mode 100644 host/mex/README create mode 100644 host/mex/mex/__init__.py create mode 100644 host/mex/mex/forked.py create mode 100644 host/mex/mex/hub.py create mode 100644 host/mex/mex/msg.py create mode 100644 host/mex/mex/node.py create mode 100644 host/mex/test/test.py (limited to 'host/mex') diff --git a/host/mex/README b/host/mex/README new file mode 100644 index 00000000..69e18f10 --- /dev/null +++ b/host/mex/README @@ -0,0 +1,25 @@ +mex - Message Exchange library. + +This library permit exchange of messages between several nodes using a central +hub, and maintain a virtual date, synchronised between all nodes. + + +Copyright (C) 2008 Nicolas Schodet + +APBTeam: + Web: http://apbteam.org/ + Email: team AT apbteam DOT org + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; either version 2 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. diff --git a/host/mex/mex/__init__.py b/host/mex/mex/__init__.py new file mode 100644 index 00000000..17a462a3 --- /dev/null +++ b/host/mex/mex/__init__.py @@ -0,0 +1,45 @@ +# mex - Messages exchange library. {{{ +# +# Copyright (C) 2008 Nicolas Schodet +# +# APBTeam: +# Web: http://apbteam.org/ +# Email: team AT apbteam DOT org +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +# +# }}} +"""mex - Messages exchange library.""" + +IDLE = 0 +"""IDLE message, send by nodes to the hub to signal it had handled all +messages. It can have an optionnal parameter: the date at which the node will +not be idle anymore.""" + +DATE = 1 +"""DATE message, send by the hub to the nodes to update the current date +value.""" + +REQ = 2 +"""REQ (request) message, send from a node to other nodes to request a +response message.""" + +RSP = 3 +"""RSP (response) message, response to a REQ message, only sent to the +requesting node.""" + +DEFAULT_ADDR = ('localhost', 2442) + +HEADER_FMT = '!HB' diff --git a/host/mex/mex/forked.py b/host/mex/mex/forked.py new file mode 100644 index 00000000..9e72d7d3 --- /dev/null +++ b/host/mex/mex/forked.py @@ -0,0 +1,48 @@ +# mex - Messages exchange library. {{{ +# +# Copyright (C) 2008 Nicolas Schodet +# +# APBTeam: +# Web: http://apbteam.org/ +# Email: team AT apbteam DOT org +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +# +# }}} +"""Run a function in a separate process.""" +import os, sys, signal + +class Forked: + """ + Usage: + + >>> f = Forked (function) + (function runs in background, in a separated process) + >>> f.kill () + (send the SIGTERM signal to the process) + """ + + def __init__ (self, function): + """Initialise, fork, and call the given function in the children + process.""" + self.pid = os.fork () + if self.pid == 0: + function () + sys.exit (0) + + def kill (self): + """Kill the forked function.""" + os.kill (self.pid, signal.SIGTERM) + os.waitpid (self.pid, 0) diff --git a/host/mex/mex/hub.py b/host/mex/mex/hub.py new file mode 100644 index 00000000..79b18286 --- /dev/null +++ b/host/mex/mex/hub.py @@ -0,0 +1,177 @@ +# mex - Messages exchange library. {{{ +# +# Copyright (C) 2008 Nicolas Schodet +# +# APBTeam: +# Web: http://apbteam.org/ +# Email: team AT apbteam DOT org +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +# +# }}} +"""mex Hub.""" +import mex +from msg import Msg +import msg +import socket, select +from struct import pack, unpack, calcsize + +class Hub: + + def __init__ (self, addr = mex.DEFAULT_ADDR, min_clients = 0, log = None): + """Initialise a new Hub and bind to server address.""" + self.addr = addr + self.min_clients = min_clients + self.log = log + self.clients = { } + self.next_client_id = 1 + self.date = 0 + self.socket = socket.socket () + self.socket.setsockopt (socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.socket.bind (self.addr) + self.socket.listen (5) + + def wait (self, cond = False): + """Wait until a cond is True and handle message exchanges.""" + while not cond and self.date != None: + self.select () + + def select (self): + """Wait until the next event (connection or message) occurs and handle + it.""" + if not self.min_clients: + # Check idle and wanted dates. + idle = True + min_wait_date = None + for c in self.clients.itervalues (): + if not c.idle: + idle = False + break + if c.wait_date != None and (c.wait_date < min_wait_date or + min_wait_date == None): + min_wait_date = c.wait_date + # Abort if everyone waits forever. + if idle and min_wait_date == None: + self.date = None + return + # Send new date. + if idle: + self.date = min_wait_date + if self.log: + self.log ('[%d] date' % self.date) + date = Msg (mex.DATE) + date.push ('L', self.date) + for c in self.clients.itervalues (): + c.send (date) + # Prepare fdset and select. + infds = [ self.socket ] + if not self.min_clients: + infds += self.clients.values () + readyfds = select.select (infds, (), ()) [0] + for i in readyfds: + if i is self.socket: + self.accept () + else: + i.read () + + def accept (self): + """Accept a new connection and create a client.""" + # Add client. + a = self.socket.accept () + if self.log: + self.log ('[%d] connect from %s' % (self.date, str (a[1]))) + c = Hub.Client (self, a[0], self.next_client_id) + self.next_client_id += 1 + assert self.next_client_id < 256 + self.clients[c.id] = c + if self.min_clients: + self.min_clients -= 1 + # Send first date. + date = Msg (mex.DATE) + date.push ('L', self.date) + c.send (date) + + def broadcast (self, msg, exclude = None): + """Send a message broadcasted, could exclude a client.""" + for c in self.clients.itervalues (): + if c is not exclude: + c.send (msg) + + class Client: + + def __init__ (self, hub, socket, id): + """Initialise a new client.""" + self.hub = hub + self.socket = socket + self.id = id + self.seq = 0 + self.idle = True + self.wait_date = None + + def read (self): + """Read and dispatch a message from this client.""" + # Read message. + head = self.socket.recv (calcsize (mex.HEADER_FMT)) + if head == '': + self.socket.close () + del self.hub.clients[self.id] + return + size, seq = unpack (mex.HEADER_FMT, head) + data = self.socket.recv (size) + m = Msg (data) + if self.hub.log: + self.hub.log ('[%d] received from %d(%d): %s' % + (self.hub.date, self.id, seq, str (m))) + # Dispatch. + if m.mtype == mex.IDLE: + if seq == self.seq: + self.idle = True + if len (m): + self.wait_date, = m.pop ('L') + assert self.wait_date >= self.hub.date + else: + self.wait_date = None + elif m.mtype == mex.REQ: + m.pop ('B') + mr = Msg (mex.REQ) + mr.push ('B', self.id) + mr.push (m.pop ()) + self.hub.broadcast (mr, self) + elif m.mtype == mex.RSP: + to, = m.pop ('B') + mr = Msg (mex.RSP) + mr.push ('B', 0) + mr.push (m.pop ()) + self.hub.clients[to].send (mr) + else: + self.hub.broadcast (m, self) + + def send (self, msg): + """Send a message to this client.""" + data = msg.data () + self.seq += 1 + if self.seq == 256: + self.seq = 0 + packet = pack (mex.HEADER_FMT, len (data), self.seq) + data + self.socket.sendall (packet) + self.idle = False + if self.hub.log: + self.hub.log ('[%d] sending to %d(%d): %s' % (self.hub.date, + self.id, self.seq, str (msg))) + + def fileno (self): + """Return socket filedescriptor.""" + return self.socket.fileno () + diff --git a/host/mex/mex/msg.py b/host/mex/mex/msg.py new file mode 100644 index 00000000..5399a3c9 --- /dev/null +++ b/host/mex/mex/msg.py @@ -0,0 +1,151 @@ +# mex - Messages exchange library. {{{ +# +# Copyright (C) 2008 Nicolas Schodet +# +# APBTeam: +# Web: http://apbteam.org/ +# Email: team AT apbteam DOT org +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +# +# }}} +"""mex message module.""" +from struct import pack, unpack, calcsize + +class Msg: + r"""Msg (int) -> new empty message with given mtype. + Msg (buffer) -> new message read from buffer. + + Examples + ======== + + Send: + + >>> m = Msg (3) + >>> print m + + >>> m.mtype + 3 + >>> m.push ('BHl', 1, 2, 3) + >>> print m + + >>> len (m) + 7 + >>> m.data () + '\x03\x01\x00\x02\x00\x00\x00\x03' + + Receive: + + >>> d = m.data () + >>> m2 = Msg (d) + >>> print m2 + + >>> len (m) + 7 + >>> m2.mtype + 3 + >>> m2.pop ('BHl') + (1, 2, 3) + >>> len (m2) + 0 + + Encapsulate: + + >>> menc = Msg (4) + >>> menc.push ('B', 5) + >>> menc.push (d) + >>> print menc + + >>> len (menc) + 9 + + Decapsulate: + + >>> mdec = Msg (menc.data ()) + >>> mdec.pop ('B') + (5,) + >>> mdecd = Msg (mdec.pop ()) + >>> print mdecd + + >>> len (mdecd) + 7 + """ + + def __init__ (self, f): + """Initialise a new message, see class documentation for + signatures.""" + try: + f[0] + except TypeError: + # New empty message. + self.mtype = f + self.header = pack ('!B', self.mtype) + self.payload = '' + else: + # Read from a buffer. + self.header = f[0:1] + self.payload = f[1:] + self.mtype = unpack ('!B', self.header)[0] + + def data (self): + """Get the message data, ready to be sent.""" + return self.header + self.payload + + def __str__ (self): + """Return an text representation.""" + payload = ' '.join (['%02x' % ord (i) for i in self.payload]) + return '' % (self.mtype, payload) + + def push (self, fmt, *args): + """ + Add data to the payload. + + msg.push (string) -> None. Add the given string to the payload. + + msg.push (fmt, values...) -> None. Add the given values to the + payload, using a struct.pack format string. + """ + if args: + self.payload += pack ('!' + fmt, *args) + else: + self.payload += fmt + + def pop (self, fmt = None): + """ + Get data from the payload. + + msg.pop () -> payload. Get all the remaining payload. + + msg.pop (fmt) -> (values, ...). Get values extracted according to a + struct.unpack format string. + """ + if fmt: + nb = calcsize (fmt) + ex, self.payload = self.payload[0:nb], self.payload[nb:] + return unpack ('!' + fmt, ex) + else: + ex, self.payload = self.payload, '' + return ex + + def __len__ (self): + """Get payload remaining length.""" + return len (self.payload) + +def _test (): + import doctest + doctest.testmod () + +if __name__ == '__main__': + _test() diff --git a/host/mex/mex/node.py b/host/mex/mex/node.py new file mode 100644 index 00000000..15cc5c2e --- /dev/null +++ b/host/mex/mex/node.py @@ -0,0 +1,124 @@ +# mex - Messages exchange library. {{{ +# +# Copyright (C) 2008 Nicolas Schodet +# +# APBTeam: +# Web: http://apbteam.org/ +# Email: team AT apbteam DOT org +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +# +# }}} +"""mex Node.""" +import mex +from msg import Msg +import msg +import socket +from struct import pack, unpack, calcsize + +class Node: + + class closed: + """Raised on closed connection.""" + pass + + def __init__ (self, addr = mex.DEFAULT_ADDR): + """Create a new Node and connect it to given Hub address.""" + self.socket = socket.socket () + self.socket.connect (addr) + self.date = 0 + self.seq = 0 + self.req = None + self.handlers = { } + self.register (mex.DATE, lambda msg: self.handle_DATE (msg)) + self.register (mex.REQ, lambda msg: self.handle_REQ (msg)) + + def wait (self, date = None): + """Wait forever or until a date is reached.""" + while date == None or self.date != date: + idle = Msg (mex.IDLE) + if date != None: + idle.push ('L', date) + self.send (idle) + msg = self.recv () + self.dispatch (msg) + + def send (self, msg): + """Send a message.""" + data = msg.data () + packet = pack (mex.HEADER_FMT, len (data), self.seq) + data + self.socket.sendall (packet) + + def request (self, msg): + """Send a request and return response.""" + # Send request. + req = Msg (mex.REQ) + req.push ('B', 0) + req.push (msg.data ()) + self.send (req) + # Wait for response. + rsp = self.recv () + while rsp.mtype != mex.RSP: + self.dispatch (rsp) + rsp = self.recv () + # Discard reqid. + rsp.pop ('B') + return Msg (rsp.pop ()) + + def response (self, msg): + """Send a response to the currently serviced request.""" + assert self.req != None + rsp = Msg (mex.RSP) + rsp.push ('B', self.req) + self.req = None + rsp.push (msg.data ()) + self.send (rsp) + + def register (self, mtype, handler): + """Register an handler for the given message type.""" + assert mtype not in self.handlers + self.handlers[mtype] = handler + + def close (self): + """Close connection with the Hub.""" + self.socket.close () + self.socket = None + + def recv (self): + """Receive one message.""" + head = self.socket.recv (calcsize (mex.HEADER_FMT)) + if head == '': + self.close () + raise Node.closed + size, self.seq = unpack (mex.HEADER_FMT, head) + data = self.socket.recv (size) + return Msg (data) + + def dispatch (self, msg): + """Call the right handler for the given message.""" + if msg.mtype in self.handlers: + self.handlers[msg.mtype] (msg) + + def handle_DATE (self, msg): + """Handle an incoming DATE.""" + self.date, = msg.pop ('L') + + def handle_REQ (self, msg): + """Handle an incoming REQ.""" + self.req, = msg.pop ('B') + dec = Msg (msg.pop ()) + self.dispatch (dec) + self.req = None + diff --git a/host/mex/test/test.py b/host/mex/test/test.py new file mode 100644 index 00000000..48484486 --- /dev/null +++ b/host/mex/test/test.py @@ -0,0 +1,51 @@ +import sys +sys.path.append (sys.path[0] + '/..') + +from mex.hub import Hub +from mex.node import Node +from mex.msg import Msg +from mex.forked import Forked + +def log (x): + print x + +h = Hub (min_clients = 2, log = log) + +def c1 (): + n = Node () + def a (msg): + print 'oucouc' + nb, = msg.pop ('B') + nb += 1 + m = Msg (msg.mtype) + m.push ('B', nb) + n.response (m) + n.register (0x82, a) + m = Msg (0x81) + n.send (m) + n.wait () + +f1 = Forked (c1) + +def c2 (): + n = Node () + def a (msg): + print 'coucou' + n.register (0x81, a) + m = Msg (0x82) + m.push ('B', 42) + r = n.request (m) + assert r.mtype == 0x82 + assert r.pop ('B') == (43,) + n.wait (42) + n.wait () + +f2 = Forked (c2) + +try: + h.wait () +finally: + f1.kill () + f2.kill () + import time + time.sleep (1) -- cgit v1.2.3