summaryrefslogtreecommitdiff
path: root/digital/mimot/tools/mimot/mimot.py
diff options
context:
space:
mode:
Diffstat (limited to 'digital/mimot/tools/mimot/mimot.py')
-rw-r--r--digital/mimot/tools/mimot/mimot.py201
1 files changed, 159 insertions, 42 deletions
diff --git a/digital/mimot/tools/mimot/mimot.py b/digital/mimot/tools/mimot/mimot.py
index 013a85ff..665947f5 100644
--- a/digital/mimot/tools/mimot/mimot.py
+++ b/digital/mimot/tools/mimot/mimot.py
@@ -44,40 +44,62 @@ class Proto:
# be in the same order than in mimot program.
stats_order = 'CSPW'
stats_items = {
- 'a0c': ('C', 0),
- 'a1c': ('C', 1),
- 'a0s': ('S', 0),
- 'a1s': ('S', 1),
- 'a0e': ('P', 0),
- 'a0i': ('P', 1),
- 'a1e': ('P', 2),
- 'a1i': ('P', 3),
- 'a0w': ('W', 0),
- 'a1w': ('W', 1),
+ 'lc': ('C', 0),
+ 'rc': ('C', 1),
+ 'ts': ('S', 0),
+ 'as': ('S', 1),
+ 'te': ('P', 0),
+ 'ti': ('P', 1),
+ 'ae': ('P', 2),
+ 'ai': ('P', 3),
+ 'lw': ('W', 0),
+ 'rw': ('W', 1),
}
- _index = dict (a0 = 0, a1 = 1)
+ _index = dict (t = ord ('t'), a = ord ('a'))
+
+ class Position (Observable):
+ """An observable position. To be used with register_pos.
+
+ - pos: (x, y) millimeters.
+ - angle: radian.
+
+ """
+
+ def __init__ (self):
+ Observable.__init__ (self)
+ self.pos = None
+ self.angle = None
+
+ def handle (self, x, y, a):
+ """Update position and notify observers."""
+ self.pos = (x, y)
+ self.angle = a
+ self.notify ()
def __init__ (self, file, time = time.time, **param):
"""Initialise communication and send parameters to asserv."""
self.proto = proto.Proto (file, time, 0.1)
self.async = False
- self.aseq = [ 0, 0 ]
- self.aseq_ack = [ 0, 0 ]
- self.proto.register ('A', 'BB', self.__handle_ack)
+ self.mseq = 0
+ self.mseq_ack = 0
+ self.proto.register ('A', 'B', self.__handle_ack)
def make_handle (s):
return lambda *args: self.__handle_stats (s, *args)
for (s, f) in self.stats_format.iteritems ():
self.proto.register (s, f, make_handle (s))
self.stats_enabled = None
self.param = dict (
- a0kp = 0, a0ki = 0, a0kd = 0,
- a0a = 1, a0sm = 0, a0ss = 0,
- a0be = 2048, a0bs = 0x10, a0bc = 20,
- a1kp = 0, a1ki = 0, a1kd = 0,
- a1a = 1, a1sm = 0, a1ss = 0,
- a1be = 2048, a1bs = 0x10, a1bc = 20,
+ scale = 1,
+ tkp = 0, tki = 0, tkd = 0,
+ ta = 1, tsm = 0, tss = 0,
+ tbe = 2048, tbs = 0x10, tbc = 20,
+ akp = 0, aki = 0, akd = 0,
+ aa = 1, asm = 0, ass = 0,
+ abe = 2048, abs = 0x10, abc = 20,
E = 1023, I = 1023, D = 1023,
+ c = 1, f = 0x1000,
+ l = 0x2000,
w = 0x00,
)
self.param.update (param)
@@ -135,39 +157,114 @@ class Proto:
def consign (self, w, c):
"""Consign offset."""
- self.proto.send ('c', 'Bh', self._index[w], c)
+ if w == 't':
+ self.proto.send ('c', 'hh', c, 0)
+ elif w == 'a':
+ self.proto.send ('c', 'hh', 0, c)
+ else:
+ raise IndexError
def speed (self, w, s):
"""Speed consign."""
- self.proto.send ('s', 'Bb', self._index[w], s)
+ if w == 't':
+ self.proto.send ('s', 'bb', s, 0)
+ elif w == 'a':
+ self.proto.send ('s', 'bb', 0, s)
+ else:
+ raise IndexError
def speed_pos (self, w, offset):
"""Speed controlled position consign."""
- i = self._index[w]
- self.aseq[i] += 1
- self.proto.send ('s', 'BlB', i, offset, self.aseq[i])
+ if w == 't':
+ self.mseq += 1
+ self.proto.send ('s', 'llB', self._dist (offset), 0, self.mseq)
+ elif w == 'a':
+ self.mseq += 1
+ self.proto.send ('s', 'llB', 0, self.dist (offset), self.mseq)
+ else:
+ raise IndexError
+ self.wait (self.finished, auto = True)
+
+ def speed_angle (self, w, angle):
+ """Speed controlled angle consign."""
+ if w == 'a':
+ self.mseq += 1
+ self.proto.send ('s', 'llB',
+ 0, int (round (angle * self.param['f'])), self.mseq)
+ else:
+ raise IndexError
+ self.wait (self.finished, auto = True)
+
+ def set_pos (self, x = None, y = None, a = None):
+ """Set current position."""
+ if x is not None:
+ self.proto.send ('p', 'cl', 'X', self._dist_f248 (x))
+ if y is not None:
+ self.proto.send ('p', 'cl', 'Y', self._dist_f248 (y))
+ if a is not None:
+ self.proto.send ('p', 'cl', 'A', self._angle_f824 (a))
+
+ def goto (self, x, y, backward = False, revert_ok = False):
+ """Go to position."""
+ self.mseq += 1
+ b = 0
+ if backward:
+ b |= 1
+ if revert_ok:
+ b |= 2
+ self.proto.send ('x', 'llBB',
+ self._dist_f248 (x), self._dist_f248 (y), b, self.mseq)
self.wait (self.finished, auto = True)
- def goto_pos (self, w, pos):
- """Go to absolute position."""
- i = self._index[w]
- self.aseq[i] += 1
- self.proto.send ('y', 'BhB', i, pos, self.aseq[i])
+ def goto_angle (self, a):
+ """Go to angle."""
+ self.mseq += 1
+ self.proto.send ('x', 'HB', self._angle_f16 (a), self.mseq)
self.wait (self.finished, auto = True)
- def clamp (self, w, s, pwm):
- """Clamp (speed control, then open loop PWM)."""
- i = self._index[w]
- self.aseq[i] += 1
- self.proto.send ('y', 'BBhB', i, s, pwm, self.aseq[i])
+ def goto_xya (self, x, y, a, backward = False, revert_ok = False):
+ """Go to position, then angle."""
+ self.mseq += 1
+ b = 0
+ if backward:
+ b |= 1
+ if revert_ok:
+ b |= 2
+ self.proto.send ('x', 'llHBB',
+ self._dist_f248 (x), self._dist_f248 (y),
+ self._angle_f16 (a), b, self.mseq)
self.wait (self.finished, auto = True)
+ def ftw (self, backward = True):
+ """Go to the wall."""
+ self.mseq += 1
+ self.proto.send ('f', 'BB', backward and 1 or 0, self.mseq)
+ self.wait (self.finished, auto = True)
+
+ def set_simu_pos (self, x, y, a):
+ """Set simulated position."""
+ self.proto.send ('h', 'chhh', 'X', int (round (x)), int (round (y)),
+ int (round (a * 1024)))
+
+ def register_pos (self, func = None, interval = 225 / 4):
+ """Will call func each time a position is received. If no function is
+ provided, use the Position observable object."""
+ if func is None:
+ self.position = self.Position ()
+ self.pos_func = self.position.handle
+ else:
+ self.pos_func = func
+ self.proto.register ('X', 'lll', self.__handle_pos)
+ self.proto.send ('X', 'B', interval)
+
def send_param (self):
"""Send all parameters."""
p = self.param
def f88 (x):
return int (round (x * (1 << 8)))
- for m in ('a0', 'a1'):
+ def f824 (x):
+ return int (round (x * (1 << 24)))
+ for m in ('t', 'a'):
index = self._index [m]
self.proto.send ('p', 'cBH', 'p', index, f88 (p[m + 'kp']))
self.proto.send ('p', 'cBH', 'i', index, f88 (p[m + 'ki']))
@@ -179,6 +276,9 @@ class Proto:
self.proto.send ('p', 'cH', 'E', p['E'])
self.proto.send ('p', 'cH', 'I', p['I'])
self.proto.send ('p', 'cH', 'D', p['D'])
+ self.proto.send ('p', 'cL', 'c', f824 (p['c']))
+ self.proto.send ('p', 'cH', 'f', p['f'])
+ self.proto.send ('p', 'cH', 'l', p['l'])
self.proto.send ('p', 'cB', 'w', p['w'])
def write_eeprom (self):
@@ -196,11 +296,17 @@ class Proto:
self.stats_line = [ ]
self.stats_count += 1
- def __handle_ack (self, a0seq, a1seq):
+ def __handle_ack (self, mseq):
"""Record current acknowledge level and acknowledge reception."""
- self.aseq_ack[0] = a0seq & 0x7f
- self.aseq_ack[1] = a1seq & 0x7f
- self.proto.send ('a', 'BB', a0seq, a1seq)
+ self.mseq_ack = mseq & 0x7f
+ self.proto.send ('a', 'B', mseq)
+
+ def __handle_pos (self, x, y, a):
+ """Handle position report."""
+ x = x / 256 * self.param['scale']
+ y = y / 256 * self.param['scale']
+ a = a * 2 * math.pi / (1 << 24)
+ self.pos_func (x, y, a)
def wait (self, cond = None, auto = False):
"""Wait for a condition to become true, or for a number of recorded
@@ -216,8 +322,7 @@ class Proto:
def finished (self):
"""Return True if movement commands have been acknowledged."""
- return (self.aseq[0] == self.aseq_ack[0]
- and self.aseq[1] == self.aseq_ack[1])
+ return self.mseq == self.mseq_ack
def free (self):
"""Coast motors."""
@@ -239,3 +344,15 @@ class Proto:
"""Return fileno for select() calls."""
return self.proto.fileno ()
+ def _dist (self, d):
+ return int (round (d / self.param['scale']))
+
+ def _dist_f248 (self, d):
+ return int (round ((1 << 8) * d / self.param['scale']))
+
+ def _angle_f16 (self, a):
+ return int (round ((1 << 16) * a / (2 * math.pi))) & 0xffff
+
+ def _angle_f824 (self, a):
+ return int (round ((1 << 24) * a / (2 * math.pi)))
+