summaryrefslogtreecommitdiffhomepage
path: root/digital/asserv/tools/asserv
diff options
context:
space:
mode:
authorNicolas Schodet2009-03-03 00:55:44 +0100
committerNicolas Schodet2009-03-03 00:55:44 +0100
commitdb0dd3f3982909155effb25f5aaa00e914e0a289 (patch)
treef545b7cfd68646ae6da405642b53f7ed4647eb05 /digital/asserv/tools/asserv
parenta74aced592651ac6db61e2c46cae6bdcc843b32f (diff)
* digital/asserv/tools:
- added mex interface. - added proto position observable. - added more documentation.
Diffstat (limited to 'digital/asserv/tools/asserv')
-rw-r--r--digital/asserv/tools/asserv/__init__.py3
-rw-r--r--digital/asserv/tools/asserv/asserv.py66
-rw-r--r--digital/asserv/tools/asserv/mex.py100
3 files changed, 157 insertions, 12 deletions
diff --git a/digital/asserv/tools/asserv/__init__.py b/digital/asserv/tools/asserv/__init__.py
index a498dc6b..7a8b5d6f 100644
--- a/digital/asserv/tools/asserv/__init__.py
+++ b/digital/asserv/tools/asserv/__init__.py
@@ -1 +1,2 @@
-from asserv import Asserv
+from asserv import Proto
+from mex import Mex
diff --git a/digital/asserv/tools/asserv/asserv.py b/digital/asserv/tools/asserv/asserv.py
index 31cfce2d..7109e7b3 100644
--- a/digital/asserv/tools/asserv/asserv.py
+++ b/digital/asserv/tools/asserv/asserv.py
@@ -21,13 +21,18 @@
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
# }}} */
+"""Proto interface to asserv."""
+
import time
import numpy
import math
import proto
+from utils.observable import Observable
-class Asserv:
+class Proto:
+ """Provide functions to communicate with asserv using the proto
+ interface."""
stats_format = {
'C': 'HHH',
@@ -58,16 +63,36 @@ class Asserv:
'a0w': ('W', 2),
}
+ class Position (Observable):
+ """An observable position. To be used with register_pos.
+
+ - pos: (x, y) millimeters.
+ - angle: radian.
+
+ """
+
+ def __init__ (self):
+ Observable.__init__ (self)
+ self.pos = None
+ self.angle = None
+
+ def handle (self, x, y, a):
+ """Update position and notify observers."""
+ self.pos = (x, y)
+ self.angle = a
+ self.notify ()
+
def __init__ (self, file, time = time.time, **param):
+ """Initialise communication and send parameters to asserv."""
self.proto = proto.Proto (file, time, 0.1)
self.async = False
self.mseq = 0
self.mseq_ack = 0
self.a0seq = 0
self.a0seq_ack = 0
- self.proto.register ('A', 'BB', self.handle_ack)
+ self.proto.register ('A', 'BB', self.__handle_ack)
def make_handle (s):
- return lambda *args: self.handle_stats (s, *args)
+ return lambda *args: self.__handle_stats (s, *args)
for (s, f) in self.stats_format.iteritems ():
self.proto.register (s, f, make_handle (s))
self.stats_enabled = None
@@ -87,7 +112,8 @@ class Asserv:
self.send_param ()
def stats (self, *stats_items, **options):
- """Activate stats."""
+ """Activate stats. Take a list of items to record, and an optional
+ interval option."""
interval = 1
if 'interval' in options:
interval = options['interval']
@@ -115,6 +141,7 @@ class Asserv:
self.stats_line = [ ]
def get_stats (self, wait = None):
+ """Get recorded stats. Return an array with every requested stats."""
if wait:
self.wait (wait)
list = self.stats_list
@@ -207,13 +234,19 @@ class Asserv:
"""Set simulated position."""
self.proto.send ('h', 'BHHH', ord ('X'), x, y, a * 1024)
- def register_pos (self, func, interval = 225 / 4):
- """Will call func each time a position is received."""
- self.pos_func = func
- self.proto.register ('X', 'lll', self.handle_pos)
+ def register_pos (self, func = None, interval = 225 / 4):
+ """Will call func each time a position is received. If no function is
+ provided, use the Position observable object."""
+ if func is None:
+ self.position = self.Position ()
+ self.pos_func = self.position.handle
+ else:
+ self.pos_func = func
+ self.proto.register ('X', 'lll', self.__handle_pos)
self.proto.send ('X', 'B', interval)
def send_param (self):
+ """Send all parameters."""
p = self.param
self.proto.send ('p', 'BHH', ord ('p'), p['tkp'] * 256,
p['akp'] * 256)
@@ -239,11 +272,13 @@ class Asserv:
self.proto.send ('p', 'BH', ord ('l'), p['l'])
def write_eeprom (self):
+ """Request an EEPROM write."""
self.proto.send ('p', 'BB', ord ('E'), 1)
time.sleep (1)
self.wait (lambda: True)
- def handle_stats (self, stat, *args):
+ def __handle_stats (self, stat, *args):
+ """Record received stats."""
if self.stats_enabled is not None:
self.stats_line.extend (args)
if self.stats_counter == stat:
@@ -251,18 +286,22 @@ class Asserv:
self.stats_line = [ ]
self.stats_count += 1
- def handle_ack (self, mseq, a0seq):
+ def __handle_ack (self, mseq, a0seq):
+ """Record current acknowledge level and acknowledge reception."""
self.mseq_ack = mseq & 0x7f
self.a0seq_ack = a0seq & 0x7f
self.proto.send ('a', 'BB', mseq, a0seq)
- def handle_pos (self, x, y, a):
+ def __handle_pos (self, x, y, a):
+ """Handle position report."""
x = x / 256 * self.param['scale']
y = y / 256 * self.param['scale']
a = a * 2 * math.pi / (1 << 24)
self.pos_func (x, y, a)
def wait (self, cond = None, auto = False):
+ """Wait for a condition to become true, or for a number of recorded
+ statistics. If auto is True, do not wait in asynchronous mode."""
if auto and self.async:
return
try:
@@ -273,21 +312,26 @@ class Asserv:
self.proto.wait (cond)
def finished (self):
+ """Return True if movement commands have been acknowledged."""
return self.mseq == self.mseq_ack and self.a0seq == self.a0seq_ack
def free (self):
+ """Coast motors."""
self.proto.send ('w')
def reset (self):
+ """Coast all motors and reset asserv."""
self.proto.send ('w')
self.proto.send ('w', 'H', 0)
self.proto.send ('z')
self.proto.send ('z')
def close (self):
+ """Gracefully close communications."""
self.reset ()
self.wait (lambda: True)
self.proto.file.close ()
def fileno (self):
+ """Return fileno for select() calls."""
return self.proto.fileno ()
diff --git a/digital/asserv/tools/asserv/mex.py b/digital/asserv/tools/asserv/mex.py
new file mode 100644
index 00000000..c673e8e0
--- /dev/null
+++ b/digital/asserv/tools/asserv/mex.py
@@ -0,0 +1,100 @@
+# asserv - Position & speed motor control on AVR. {{{
+#
+# Copyright (C) 2009 Nicolas Schodet
+#
+# APBTeam:
+# Web: http://apbteam.org/
+# Email: team AT apbteam DOT org
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+#
+# }}} */
+"""Mex interface to asserv."""
+
+from utils.observable import Observable
+
+ID_POSITION = 0xa0
+ID_PWM = 0xa1
+ID_AUX = 0xa8
+
+class Mex:
+ """Handle communications with simulated asserv."""
+
+ class Position (Observable):
+ """Robot position.
+
+ - pos: (x, y) millimeters.
+ - angle: radian.
+
+ """
+
+ def __init__ (self, node):
+ Observable.__init__ (self)
+ self.pos = None
+ self.angle = None
+ node.register (ID_POSITION, self.__handle)
+
+ def __handle (self, msg):
+ x, y, a = msg.pop ('hhl')
+ self.pos = (x, y)
+ self.angle = float (a) / 1024
+ self.notify ()
+
+ class PWM (Observable):
+ """Motor PWM.
+
+ - pwm: current PWM value (hardware unit).
+
+ """
+
+ def __init__ (self, node):
+ Observable.__init__ (self)
+ self.pwm = None
+ node.register (ID_PWM, self.__handle)
+
+ def __handle (self, msg):
+ self.pwm = msg.pop ('hhh')
+ self.notify ()
+
+ class Aux (Observable):
+ """Auxiliary motor angle.
+
+ - angle: radian.
+
+ """
+
+ def __init__ (self):
+ Observable.__init__ (self)
+ self.angle = None
+
+ class Pack:
+ """Handle reception of several Aux for one message."""
+
+ def __init__ (self, node, list):
+ self.__list = list
+ node.register (ID_AUX, self.__handle)
+
+ def __handle (self, msg):
+ angles = msg.pop ('%dl' % len (self.__list))
+ for aux, angle in zip (self.__list, angles):
+ aux.angle = float (angle) / 1024
+ aux.notify ()
+
+ def __init__ (self, node):
+ self.position = self.Position (node)
+ self.pwm = self.PWM (node)
+ self.aux = (self.Aux (), )
+ self.__aux_pack = self.Aux.Pack (node, self.aux)
+