From 6528ebfe4928a90c6e1cc67ca25b289b7ae93ffc Mon Sep 17 00:00:00 2001 From: Nicolas Schodet Date: Tue, 3 Jun 2008 21:48:19 +0200 Subject: * host, digital/asserv/tools, digital/avr/modules: - cleaned up python scripts hierarchy. --- digital/asserv/tools/asserv/__init__.py | 1 + digital/asserv/tools/asserv/asserv.py | 255 ++++++++++++++++++++++++++++++++ digital/asserv/tools/asserv/init.py | 18 +++ 3 files changed, 274 insertions(+) create mode 100644 digital/asserv/tools/asserv/__init__.py create mode 100644 digital/asserv/tools/asserv/asserv.py create mode 100644 digital/asserv/tools/asserv/init.py (limited to 'digital/asserv/tools/asserv') diff --git a/digital/asserv/tools/asserv/__init__.py b/digital/asserv/tools/asserv/__init__.py new file mode 100644 index 00000000..a498dc6b --- /dev/null +++ b/digital/asserv/tools/asserv/__init__.py @@ -0,0 +1 @@ +from asserv import Asserv diff --git a/digital/asserv/tools/asserv/asserv.py b/digital/asserv/tools/asserv/asserv.py new file mode 100644 index 00000000..1f7c449b --- /dev/null +++ b/digital/asserv/tools/asserv/asserv.py @@ -0,0 +1,255 @@ +import proto, time +import numpy + +class Asserv: + + stats_format = { + 'C': 'HHH', + 'Z': 'H', + 'S': 'bbb', + 'P': 'hhhhhh', + 'W': 'hhh', + } + # The last occuring stats will increment stats_count, so they have to + # be in the same order than in asserv program. + stats_order = 'CZSPW' + stats_items = { + 'lc': ('C', 0), + 'rc': ('C', 1), + 'a0c': ('C', 2), + 'a0z': ('Z', 0), + 'ts': ('S', 0), + 'as': ('S', 1), + 'a0s': ('S', 2), + 'te': ('P', 0), + 'ti': ('P', 1), + 'ae': ('P', 2), + 'ai': ('P', 3), + 'a0e': ('P', 4), + 'a0i': ('P', 5), + 'lw': ('W', 0), + 'rw': ('W', 1), + 'a0w': ('W', 2), + } + + def __init__ (self, file, **param): + self.proto = proto.Proto (file, time.time, 0.1) + self.async = False + self.seq = 0 + self.seq_ack = 0 + self.proto.register ('A', 'BB', self.handle_ack) + def make_handle (s): + return lambda *args: self.handle_stats (s, *args) + for (s, f) in self.stats_format.iteritems (): + self.proto.register (s, f, make_handle (s)) + self.stats_enabled = None + self.param = dict ( + scale = 1, + tkp = 0, tki = 0, tkd = 0, + akp = 0, aki = 0, akd = 0, + a0kp = 0, a0ki = 0, a0kd = 0, + E = 1023, I = 1023, D = 1023, b = 15000, + ta = 1, aa = 1, a0a = 1, + tsm = 0, asm = 0, tss = 0, ass = 0, a0sm = 0, a0ss = 0, + c = 1, f = 0x1000, + l = 0x2000, + ) + self.param.update (param) + self.send_param () + + def stats (self, *stats_items, **options): + """Activate stats.""" + interval = 1 + if 'interval' in options: + interval = options['interval'] + # Build list of stats letters. + stats = [self.stats_items[i][0] for i in stats_items] + stats = [s for s in self.stats_order if s in stats] + stats_last_pos = 0 + stats_pos = { } + for s in stats: + stats_pos[s] = stats_last_pos + stats_last_pos += len (self.stats_format[s]) + # Build stats item positions. + self.stats_items_pos = [ ] + for i in stats_items: + id = self.stats_items[i] + self.stats_items_pos.append (stats_pos[id[0]] + id[1]) + # Enable stats. + for s in stats: + self.proto.send (s, 'B', interval) + # Prepare aquisition. + self.stats_enabled = stats + self.stats_counter = stats[-1] + self.stats_count = 0 + self.stats_list = [ ] + self.stats_line = [ ] + + def get_stats (self, wait = None): + if wait: + self.wait (wait) + list = self.stats_list + # Drop first line as it might be garbage. + del list[0] + for s in reversed (self.stats_enabled): + self.proto.send (s, 'B', 0) + # Extract asked stats. + array = numpy.array (list) + array = array[:, self.stats_items_pos] + # Cleanup. + self.stats_enabled = None + del self.stats_items_pos + del self.stats_counter + del self.stats_count + del self.stats_list + del self.stats_line + return array + + def consign (self, w, c): + """Consign offset.""" + if w == 't': + self.proto.send ('c', 'hh', c, 0) + elif w == 'a': + self.proto.send ('c', 'hh', 0, c) + else: + assert w == 'a0' + self.proto.send ('c', 'h', c) + + def speed (self, w, s): + """Speed consign.""" + if w == 't': + self.proto.send ('s', 'bb', s, 0) + elif w == 'a': + self.proto.send ('s', 'bb', 0, s) + else: + assert w == 'a0' + self.proto.send ('s', 'b', s) + + def speed_pos (self, w, offset): + """Speed controlled position consign.""" + self.seq += 1 + if w == 't': + self.proto.send ('s', 'LLB', offset, 0, self.seq) + elif w == 'a': + self.proto.send ('s', 'LLB', 0, offset, self.seq) + else: + assert w == 'a0' + self.proto.send ('s', 'LB', offset, self.seq) + self.wait (self.finished, auto = True) + + def set_pos (self, x = None, y = None, a = None): + """Set current position.""" + if x is not None: + self.proto.send ('p', 'BL', ord ('X'), + 256 * x / self.param['scale']) + if y is not None: + self.proto.send ('p', 'BL', ord ('Y'), + 256 * y / self.param['scale']) + if a is not None: + self.proto.send ('p', 'BL', ord ('A'), a * (1 << 24) / 360) + + def goto (self, x, y, backward_ok = False): + """Go to position.""" + self.seq += 1 + self.proto.send (backward_ok and 'r' or 'x', 'LLB', + 256 * x / self.param['scale'], + 256 * y / self.param['scale'], self.seq) + self.wait (self.finished, auto = True) + + def goto_angle (self, a): + """Go to angle.""" + self.seq += 1 + self.proto.send ('x', 'HB', a * (1 << 16) / 360, self.seq) + self.wait (self.finished, auto = True) + + def goto_xya (self, x, y, a, backward_ok = False): + """Go to position, then angle.""" + self.seq += 1 + self.proto.send (backward_ok and 'r' or 'x', 'LLHB', + 256 * x / self.param['scale'], + 256 * y / self.param['scale'], + a * (1 << 16) / 360, self.seq) + self.wait (self.finished, auto = True) + + def register_pos (self, func, interval = 225 / 4): + """Will call func each time a position is received.""" + self.pos_func = func + self.proto.register ('X', 'lll', self.handle_pos) + self.proto.send ('X', 'B', interval) + + def send_param (self): + p = self.param + self.proto.send ('p', 'BHH', ord ('p'), p['tkp'] * 256, + p['akp'] * 256) + self.proto.send ('p', 'BHH', ord ('i'), p['tki'] * 256, + p['aki'] * 256) + self.proto.send ('p', 'BHH', ord ('d'), p['tkd'] * 256, + p['akd'] * 256) + self.proto.send ('p', 'BH', ord ('p'), p['a0kp'] * 256) + self.proto.send ('p', 'BH', ord ('i'), p['a0ki'] * 256) + self.proto.send ('p', 'BH', ord ('d'), p['a0kd'] * 256) + self.proto.send ('p', 'BH', ord ('E'), p['E']) + self.proto.send ('p', 'BH', ord ('I'), p['I']) + self.proto.send ('p', 'BH', ord ('D'), p['D']) + self.proto.send ('p', 'BH', ord ('b'), p['b']) + self.proto.send ('p', 'BHH', ord ('a'), p['ta'] * 256, + p['aa'] * 256) + self.proto.send ('p', 'BH', ord ('a'), p['a0a'] * 256) + self.proto.send ('p', 'BBBBB', ord ('s'), p['tsm'], p['asm'], + p['tss'], p['ass']) + self.proto.send ('p', 'BBB', ord ('s'), p['a0sm'], p['a0ss']) + self.proto.send ('p', 'BL', ord ('c'), p['c'] * 256 * 256 * 256) + self.proto.send ('p', 'BH', ord ('f'), p['f']) + self.proto.send ('p', 'BH', ord ('l'), p['l']) + + def write_eeprom (self): + self.proto.send ('p', 'BB', ord ('E'), 1) + time.sleep (1) + self.wait (lambda: True) + + def handle_stats (self, stat, *args): + if self.stats_enabled is not None: + self.stats_line.extend (args) + if self.stats_counter == stat: + self.stats_list.append (self.stats_line) + self.stats_line = [ ] + self.stats_count += 1 + + def handle_ack (self, mseq, a0seq): + self.seq_ack = mseq & 0x7f + + def handle_pos (self, x, y, a): + x = x / 256 * self.param['scale'] + y = y / 256 * self.param['scale'] + a = a * 360 / (1 << 24) + self.pos_func (x, y, a) + + def wait (self, cond = None, auto = False): + if auto and self.async: + return + try: + cond_count = int (cond) + cond = lambda: self.stats_count > cond_count + except TypeError: + pass + self.proto.wait (cond) + + def finished (self): + return self.seq == self.seq_ack + + def free (self): + self.proto.send ('w') + + def reset (self): + self.proto.send ('w') + self.proto.send ('w', 'H', 0) + self.proto.send ('z') + self.proto.send ('z') + + def close (self): + self.reset () + self.wait (lambda: True) + self.proto.file.close () + + def fileno (self): + return self.proto.fileno () diff --git a/digital/asserv/tools/asserv/init.py b/digital/asserv/tools/asserv/init.py new file mode 100644 index 00000000..42c34f5d --- /dev/null +++ b/digital/asserv/tools/asserv/init.py @@ -0,0 +1,18 @@ +"""Default parameters for asserv.""" +host = dict ( + scale = 0.0395840674352314, f = 0xdd1, + tkp = 1, tkd = 16, akp = 2, akd = 16, + a0kp = 1, a0kd = 16, + E = 0x3ff, D = 0x1ff, + ta = 0.5, aa = 0.5, tsm = 0x60, tss = 0x20, asm = 0x30, ass = 0x10, + a0a = 0.5, a0sm = 0x0a, a0ss = 0x05, + ) +target = dict ( + scale = 0.0413530725332892, f = 0xcfa, + c = float (0xffefbe) / (1 << 24), + tkp = 1, tkd = 16, akp = 2, akd = 16, + a0kp = 1, a0kd = 0.05, + E = 0x3ff, D = 0x1ff, + ta = 0.5, aa = 0.5, tsm = 0x60, tss = 0x20, asm = 0x30, ass = 0x10, + a0a = 0.5, a0sm = 0x0a, a0ss = 0x05, + ) -- cgit v1.2.3