summaryrefslogtreecommitdiffhomepage
path: root/digital
diff options
context:
space:
mode:
authorNicolas Schodet2010-04-29 23:04:09 +0200
committerNicolas Schodet2010-04-29 23:04:09 +0200
commit77b5cf5a300974bf84353fc7d8badbb5cc253a68 (patch)
tree72aa3dc69e83fc4fb8e2e1d7d5c6c92d1c7323bd /digital
parentdf7d0d94ecc69fa9aadd2ab90db89b5ef1a74030 (diff)
digital/mimot/tools: add mimot tools
Diffstat (limited to 'digital')
-rw-r--r--digital/mimot/tools/mimot/__init__.py2
-rw-r--r--digital/mimot/tools/mimot/init.py16
-rw-r--r--digital/mimot/tools/mimot/mex.py60
-rw-r--r--digital/mimot/tools/mimot/mimot.py234
-rw-r--r--digital/mimot/tools/write_eeprom.py16
5 files changed, 328 insertions, 0 deletions
diff --git a/digital/mimot/tools/mimot/__init__.py b/digital/mimot/tools/mimot/__init__.py
new file mode 100644
index 00000000..e6a0915b
--- /dev/null
+++ b/digital/mimot/tools/mimot/__init__.py
@@ -0,0 +1,2 @@
+from mimot import Proto
+from mex import Mex
diff --git a/digital/mimot/tools/mimot/init.py b/digital/mimot/tools/mimot/init.py
new file mode 100644
index 00000000..57c7fc23
--- /dev/null
+++ b/digital/mimot/tools/mimot/init.py
@@ -0,0 +1,16 @@
+"""Default parameters for asserv."""
+host = dict (
+ a0kp = 4,
+ a0a = 0.5, a0sm = 0x0a, a0ss = 0x05,
+ a1kp = 4,
+ a1a = 0.5, a1sm = 0x0a, a1ss = 0x05,
+ E = 0x3ff, D = 0x1ff,
+ )
+target = dict (
+ a0kp = 4,
+ a0a = 0.5, a0sm = 0x0a, a0ss = 0x05,
+ a1kp = 4,
+ a1a = 0.5, a1sm = 0x0a, a1ss = 0x05,
+ E = 0x3ff, D = 0x1ff,
+ w = 0x00,
+ )
diff --git a/digital/mimot/tools/mimot/mex.py b/digital/mimot/tools/mimot/mex.py
new file mode 100644
index 00000000..b32f2925
--- /dev/null
+++ b/digital/mimot/tools/mimot/mex.py
@@ -0,0 +1,60 @@
+# mimot - Mini motor control, with motor driver. {{{
+#
+# Copyright (C) 2010 Nicolas Schodet
+#
+# APBTeam:
+# Web: http://apbteam.org/
+# Email: team AT apbteam DOT org
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+#
+# }}}
+"""Mex interface to mimot."""
+
+from utils.observable import Observable
+
+ID_AUX = 0xc8
+
+class Mex:
+ """Handle communications with simulated mimot."""
+
+ class Aux (Observable):
+ """Auxiliary motor angle.
+
+ - angle: radian.
+
+ """
+
+ def __init__ (self):
+ Observable.__init__ (self)
+ self.angle = None
+
+ class Pack:
+ """Handle reception of several Aux for one message."""
+
+ def __init__ (self, node, list):
+ self.__list = list
+ node.register (ID_AUX, self.__handle)
+
+ def __handle (self, msg):
+ angles = msg.pop ('%dl' % len (self.__list))
+ for aux, angle in zip (self.__list, angles):
+ aux.angle = float (angle) / 1024
+ aux.notify ()
+
+ def __init__ (self, node):
+ self.aux = (self.Aux (), self.Aux ())
+ self.__aux_pack = self.Aux.Pack (node, self.aux)
+
diff --git a/digital/mimot/tools/mimot/mimot.py b/digital/mimot/tools/mimot/mimot.py
new file mode 100644
index 00000000..2844c741
--- /dev/null
+++ b/digital/mimot/tools/mimot/mimot.py
@@ -0,0 +1,234 @@
+# mimot - Mini motor control, with motor driver. {{{
+#
+# Copyright (C) 2010 Nicolas Schodet
+#
+# APBTeam:
+# Web: http://apbteam.org/
+# Email: team AT apbteam DOT org
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+#
+# }}}
+"""Proto interface to mimot."""
+
+import time
+import numpy
+import math
+
+import proto
+from utils.observable import Observable
+
+class Proto:
+ """Provide functions to communicate with mimot using the proto
+ interface."""
+
+ stats_format = {
+ 'C': 'HH',
+ 'S': 'bb',
+ 'P': 'hhhh',
+ 'W': 'hh',
+ }
+ # The last occuring stats will increment stats_count, so they have to
+ # be in the same order than in mimot program.
+ stats_order = 'CSPW'
+ stats_items = {
+ 'a0c': ('C', 0),
+ 'a1c': ('C', 1),
+ 'a0s': ('S', 0),
+ 'a1s': ('S', 1),
+ 'a0e': ('P', 0),
+ 'a0i': ('P', 1),
+ 'a1e': ('P', 2),
+ 'a1i': ('P', 3),
+ 'a0w': ('W', 0),
+ 'a1w': ('W', 1),
+ }
+
+ _index = dict (a0 = 0, a1 = 1)
+
+ def __init__ (self, file, time = time.time, **param):
+ """Initialise communication and send parameters to asserv."""
+ self.proto = proto.Proto (file, time, 0.1)
+ self.async = False
+ self.aseq = [ 0, 0 ]
+ self.aseq_ack = [ 0, 0 ]
+ self.proto.register ('A', 'BB', self.__handle_ack)
+ def make_handle (s):
+ return lambda *args: self.__handle_stats (s, *args)
+ for (s, f) in self.stats_format.iteritems ():
+ self.proto.register (s, f, make_handle (s))
+ self.stats_enabled = None
+ self.param = dict (
+ a0kp = 0, a0ki = 0, a0kd = 0,
+ a0a = 1, a0sm = 0, a0ss = 0,
+ a0be = 2048, a0bs = 0x10, a0bc = 20,
+ a1kp = 0, a1ki = 0, a1kd = 0,
+ a1a = 1, a1sm = 0, a1ss = 0,
+ a1be = 2048, a1bs = 0x10, a1bc = 20,
+ E = 1023, I = 1023, D = 1023,
+ w = 0x00,
+ )
+ self.param.update (param)
+ self.send_param ()
+
+ def stats (self, *stats_items, **options):
+ """Activate stats. Take a list of items to record, and an optional
+ interval option."""
+ interval = 1
+ if 'interval' in options:
+ interval = options['interval']
+ # Build list of stats letters.
+ stats = [self.stats_items[i][0] for i in stats_items]
+ stats = [s for s in self.stats_order if s in stats]
+ stats_last_pos = 0
+ stats_pos = { }
+ for s in stats:
+ stats_pos[s] = stats_last_pos
+ stats_last_pos += len (self.stats_format[s])
+ # Build stats item positions.
+ self.stats_items_pos = [ ]
+ for i in stats_items:
+ id = self.stats_items[i]
+ self.stats_items_pos.append (stats_pos[id[0]] + id[1])
+ # Enable stats.
+ for s in stats:
+ self.proto.send (s, 'B', interval)
+ # Prepare aquisition.
+ self.stats_enabled = stats
+ self.stats_counter = stats[-1]
+ self.stats_count = 0
+ self.stats_list = [ ]
+ self.stats_line = [ ]
+
+ def get_stats (self, wait = None):
+ """Get recorded stats. Return an array with every requested stats."""
+ if wait:
+ self.wait (wait)
+ list = self.stats_list
+ # Drop first line as it might be garbage.
+ del list[0]
+ for s in reversed (self.stats_enabled):
+ self.proto.send (s, 'B', 0)
+ # Extract asked stats.
+ array = numpy.array (list)
+ array = array[:, self.stats_items_pos]
+ # Cleanup.
+ self.stats_enabled = None
+ del self.stats_items_pos
+ del self.stats_counter
+ del self.stats_count
+ del self.stats_list
+ del self.stats_line
+ return array
+
+ def consign (self, w, c):
+ """Consign offset."""
+ self.proto.send ('c', 'Bh', self._index[w], c)
+
+ def speed (self, w, s):
+ """Speed consign."""
+ self.proto.send ('s', 'Bb', self._index[w], s)
+
+ def speed_pos (self, w, offset):
+ """Speed controlled position consign."""
+ i = self._index[w]
+ self.aseq[i] += 1
+ self.proto.send ('s', 'BlB', i, offset, self.aseq[i])
+ self.wait (self.finished, auto = True)
+
+ def goto_pos (self, w, pos):
+ """Go to absolute position."""
+ i = self._index[w]
+ self.aseq[i] += 1
+ self.proto.send ('y', 'BhB', i, pos, self.aseq[i])
+ self.wait (self.finished, auto = True)
+
+ def send_param (self):
+ """Send all parameters."""
+ p = self.param
+ def f88 (x):
+ return int (round (x * (1 << 8)))
+ for m in ('a0', 'a1'):
+ index = self._index [m]
+ self.proto.send ('p', 'cBH', 'p', index, f88 (p[m + 'kp']))
+ self.proto.send ('p', 'cBH', 'i', index, f88 (p[m + 'ki']))
+ self.proto.send ('p', 'cBH', 'd', index, f88 (p[m + 'kd']))
+ self.proto.send ('p', 'cBH', 'a', index, f88 (p[m + 'a']))
+ self.proto.send ('p', 'cBBB', 's', index, p[m + 'sm'], p[m + 'ss'])
+ self.proto.send ('p', 'cBHHB', 'b', index, p[m + 'be'],
+ p[m + 'bs'], p[m + 'bc'])
+ self.proto.send ('p', 'cH', 'E', p['E'])
+ self.proto.send ('p', 'cH', 'I', p['I'])
+ self.proto.send ('p', 'cH', 'D', p['D'])
+ self.proto.send ('p', 'cB', 'w', p['w'])
+
+ def write_eeprom (self):
+ """Request an EEPROM write."""
+ self.proto.send ('p', 'cB', 'E', 1)
+ time.sleep (1)
+ self.wait (lambda: True)
+
+ def __handle_stats (self, stat, *args):
+ """Record received stats."""
+ if self.stats_enabled is not None:
+ self.stats_line.extend (args)
+ if self.stats_counter == stat:
+ self.stats_list.append (self.stats_line)
+ self.stats_line = [ ]
+ self.stats_count += 1
+
+ def __handle_ack (self, a0seq, a1seq):
+ """Record current acknowledge level and acknowledge reception."""
+ self.aseq_ack[0] = a0seq & 0x7f
+ self.aseq_ack[1] = a1seq & 0x7f
+ self.proto.send ('a', 'BB', a0seq, a1seq)
+
+ def wait (self, cond = None, auto = False):
+ """Wait for a condition to become true, or for a number of recorded
+ statistics. If auto is True, do not wait in asynchronous mode."""
+ if auto and self.async:
+ return
+ try:
+ cond_count = int (cond)
+ cond = lambda: self.stats_count > cond_count
+ except TypeError:
+ pass
+ self.proto.wait (cond)
+
+ def finished (self):
+ """Return True if movement commands have been acknowledged."""
+ return (self.aseq[0] == self.aseq_ack[0]
+ and self.aseq[1] == self.aseq_ack[1])
+
+ def free (self):
+ """Coast motors."""
+ self.proto.send ('w')
+
+ def reset (self):
+ """Coast all motors and reset asserv."""
+ self.proto.send ('w')
+ self.proto.send ('z')
+ self.proto.send ('z')
+
+ def close (self):
+ """Gracefully close communications."""
+ self.reset ()
+ self.wait (lambda: True)
+ self.proto.file.close ()
+
+ def fileno (self):
+ """Return fileno for select() calls."""
+ return self.proto.fileno ()
+
diff --git a/digital/mimot/tools/write_eeprom.py b/digital/mimot/tools/write_eeprom.py
new file mode 100644
index 00000000..ab2ae315
--- /dev/null
+++ b/digital/mimot/tools/write_eeprom.py
@@ -0,0 +1,16 @@
+import sys
+
+import mimot
+import mimot.init
+import proto.popen_io
+import serial
+
+if sys.argv[1] == '!':
+ io = proto.popen_io.PopenIO (sys.argv[2:])
+ init = mimot.init.host
+else:
+ io = serial.Serial (sys.argv[1])
+ init = mimot.init.target
+a = mimot.Proto (io, **init)
+a.write_eeprom ()
+a.close ()