From db0dd3f3982909155effb25f5aaa00e914e0a289 Mon Sep 17 00:00:00 2001 From: Nicolas Schodet Date: Tue, 3 Mar 2009 00:55:44 +0100 Subject: * digital/asserv/tools: - added mex interface. - added proto position observable. - added more documentation. --- digital/asserv/tools/asserv/asserv.py | 66 +++++++++++++++++++++++++++++------ 1 file changed, 55 insertions(+), 11 deletions(-) (limited to 'digital/asserv/tools/asserv/asserv.py') diff --git a/digital/asserv/tools/asserv/asserv.py b/digital/asserv/tools/asserv/asserv.py index 31cfce2d..7109e7b3 100644 --- a/digital/asserv/tools/asserv/asserv.py +++ b/digital/asserv/tools/asserv/asserv.py @@ -21,13 +21,18 @@ # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. # # }}} */ +"""Proto interface to asserv.""" + import time import numpy import math import proto +from utils.observable import Observable -class Asserv: +class Proto: + """Provide functions to communicate with asserv using the proto + interface.""" stats_format = { 'C': 'HHH', @@ -58,16 +63,36 @@ class Asserv: 'a0w': ('W', 2), } + class Position (Observable): + """An observable position. To be used with register_pos. + + - pos: (x, y) millimeters. + - angle: radian. + + """ + + def __init__ (self): + Observable.__init__ (self) + self.pos = None + self.angle = None + + def handle (self, x, y, a): + """Update position and notify observers.""" + self.pos = (x, y) + self.angle = a + self.notify () + def __init__ (self, file, time = time.time, **param): + """Initialise communication and send parameters to asserv.""" self.proto = proto.Proto (file, time, 0.1) self.async = False self.mseq = 0 self.mseq_ack = 0 self.a0seq = 0 self.a0seq_ack = 0 - self.proto.register ('A', 'BB', self.handle_ack) + self.proto.register ('A', 'BB', self.__handle_ack) def make_handle (s): - return lambda *args: self.handle_stats (s, *args) + return lambda *args: self.__handle_stats (s, *args) for (s, f) in self.stats_format.iteritems (): self.proto.register (s, f, make_handle (s)) self.stats_enabled = None @@ -87,7 +112,8 @@ class Asserv: self.send_param () def stats (self, *stats_items, **options): - """Activate stats.""" + """Activate stats. Take a list of items to record, and an optional + interval option.""" interval = 1 if 'interval' in options: interval = options['interval'] @@ -115,6 +141,7 @@ class Asserv: self.stats_line = [ ] def get_stats (self, wait = None): + """Get recorded stats. Return an array with every requested stats.""" if wait: self.wait (wait) list = self.stats_list @@ -207,13 +234,19 @@ class Asserv: """Set simulated position.""" self.proto.send ('h', 'BHHH', ord ('X'), x, y, a * 1024) - def register_pos (self, func, interval = 225 / 4): - """Will call func each time a position is received.""" - self.pos_func = func - self.proto.register ('X', 'lll', self.handle_pos) + def register_pos (self, func = None, interval = 225 / 4): + """Will call func each time a position is received. If no function is + provided, use the Position observable object.""" + if func is None: + self.position = self.Position () + self.pos_func = self.position.handle + else: + self.pos_func = func + self.proto.register ('X', 'lll', self.__handle_pos) self.proto.send ('X', 'B', interval) def send_param (self): + """Send all parameters.""" p = self.param self.proto.send ('p', 'BHH', ord ('p'), p['tkp'] * 256, p['akp'] * 256) @@ -239,11 +272,13 @@ class Asserv: self.proto.send ('p', 'BH', ord ('l'), p['l']) def write_eeprom (self): + """Request an EEPROM write.""" self.proto.send ('p', 'BB', ord ('E'), 1) time.sleep (1) self.wait (lambda: True) - def handle_stats (self, stat, *args): + def __handle_stats (self, stat, *args): + """Record received stats.""" if self.stats_enabled is not None: self.stats_line.extend (args) if self.stats_counter == stat: @@ -251,18 +286,22 @@ class Asserv: self.stats_line = [ ] self.stats_count += 1 - def handle_ack (self, mseq, a0seq): + def __handle_ack (self, mseq, a0seq): + """Record current acknowledge level and acknowledge reception.""" self.mseq_ack = mseq & 0x7f self.a0seq_ack = a0seq & 0x7f self.proto.send ('a', 'BB', mseq, a0seq) - def handle_pos (self, x, y, a): + def __handle_pos (self, x, y, a): + """Handle position report.""" x = x / 256 * self.param['scale'] y = y / 256 * self.param['scale'] a = a * 2 * math.pi / (1 << 24) self.pos_func (x, y, a) def wait (self, cond = None, auto = False): + """Wait for a condition to become true, or for a number of recorded + statistics. If auto is True, do not wait in asynchronous mode.""" if auto and self.async: return try: @@ -273,21 +312,26 @@ class Asserv: self.proto.wait (cond) def finished (self): + """Return True if movement commands have been acknowledged.""" return self.mseq == self.mseq_ack and self.a0seq == self.a0seq_ack def free (self): + """Coast motors.""" self.proto.send ('w') def reset (self): + """Coast all motors and reset asserv.""" self.proto.send ('w') self.proto.send ('w', 'H', 0) self.proto.send ('z') self.proto.send ('z') def close (self): + """Gracefully close communications.""" self.reset () self.wait (lambda: True) self.proto.file.close () def fileno (self): + """Return fileno for select() calls.""" return self.proto.fileno () -- cgit v1.2.3