From 6528ebfe4928a90c6e1cc67ca25b289b7ae93ffc Mon Sep 17 00:00:00 2001 From: Nicolas Schodet Date: Tue, 3 Jun 2008 21:48:19 +0200 Subject: * host, digital/asserv/tools, digital/avr/modules: - cleaned up python scripts hierarchy. --- digital/asserv/tools/asserv.py | 255 ----------------------------------------- 1 file changed, 255 deletions(-) delete mode 100644 digital/asserv/tools/asserv.py (limited to 'digital/asserv/tools/asserv.py') diff --git a/digital/asserv/tools/asserv.py b/digital/asserv/tools/asserv.py deleted file mode 100644 index 1f7c449b..00000000 --- a/digital/asserv/tools/asserv.py +++ /dev/null @@ -1,255 +0,0 @@ -import proto, time -import numpy - -class Asserv: - - stats_format = { - 'C': 'HHH', - 'Z': 'H', - 'S': 'bbb', - 'P': 'hhhhhh', - 'W': 'hhh', - } - # The last occuring stats will increment stats_count, so they have to - # be in the same order than in asserv program. - stats_order = 'CZSPW' - stats_items = { - 'lc': ('C', 0), - 'rc': ('C', 1), - 'a0c': ('C', 2), - 'a0z': ('Z', 0), - 'ts': ('S', 0), - 'as': ('S', 1), - 'a0s': ('S', 2), - 'te': ('P', 0), - 'ti': ('P', 1), - 'ae': ('P', 2), - 'ai': ('P', 3), - 'a0e': ('P', 4), - 'a0i': ('P', 5), - 'lw': ('W', 0), - 'rw': ('W', 1), - 'a0w': ('W', 2), - } - - def __init__ (self, file, **param): - self.proto = proto.Proto (file, time.time, 0.1) - self.async = False - self.seq = 0 - self.seq_ack = 0 - self.proto.register ('A', 'BB', self.handle_ack) - def make_handle (s): - return lambda *args: self.handle_stats (s, *args) - for (s, f) in self.stats_format.iteritems (): - self.proto.register (s, f, make_handle (s)) - self.stats_enabled = None - self.param = dict ( - scale = 1, - tkp = 0, tki = 0, tkd = 0, - akp = 0, aki = 0, akd = 0, - a0kp = 0, a0ki = 0, a0kd = 0, - E = 1023, I = 1023, D = 1023, b = 15000, - ta = 1, aa = 1, a0a = 1, - tsm = 0, asm = 0, tss = 0, ass = 0, a0sm = 0, a0ss = 0, - c = 1, f = 0x1000, - l = 0x2000, - ) - self.param.update (param) - self.send_param () - - def stats (self, *stats_items, **options): - """Activate stats.""" - interval = 1 - if 'interval' in options: - interval = options['interval'] - # Build list of stats letters. - stats = [self.stats_items[i][0] for i in stats_items] - stats = [s for s in self.stats_order if s in stats] - stats_last_pos = 0 - stats_pos = { } - for s in stats: - stats_pos[s] = stats_last_pos - stats_last_pos += len (self.stats_format[s]) - # Build stats item positions. - self.stats_items_pos = [ ] - for i in stats_items: - id = self.stats_items[i] - self.stats_items_pos.append (stats_pos[id[0]] + id[1]) - # Enable stats. - for s in stats: - self.proto.send (s, 'B', interval) - # Prepare aquisition. - self.stats_enabled = stats - self.stats_counter = stats[-1] - self.stats_count = 0 - self.stats_list = [ ] - self.stats_line = [ ] - - def get_stats (self, wait = None): - if wait: - self.wait (wait) - list = self.stats_list - # Drop first line as it might be garbage. - del list[0] - for s in reversed (self.stats_enabled): - self.proto.send (s, 'B', 0) - # Extract asked stats. - array = numpy.array (list) - array = array[:, self.stats_items_pos] - # Cleanup. - self.stats_enabled = None - del self.stats_items_pos - del self.stats_counter - del self.stats_count - del self.stats_list - del self.stats_line - return array - - def consign (self, w, c): - """Consign offset.""" - if w == 't': - self.proto.send ('c', 'hh', c, 0) - elif w == 'a': - self.proto.send ('c', 'hh', 0, c) - else: - assert w == 'a0' - self.proto.send ('c', 'h', c) - - def speed (self, w, s): - """Speed consign.""" - if w == 't': - self.proto.send ('s', 'bb', s, 0) - elif w == 'a': - self.proto.send ('s', 'bb', 0, s) - else: - assert w == 'a0' - self.proto.send ('s', 'b', s) - - def speed_pos (self, w, offset): - """Speed controlled position consign.""" - self.seq += 1 - if w == 't': - self.proto.send ('s', 'LLB', offset, 0, self.seq) - elif w == 'a': - self.proto.send ('s', 'LLB', 0, offset, self.seq) - else: - assert w == 'a0' - self.proto.send ('s', 'LB', offset, self.seq) - self.wait (self.finished, auto = True) - - def set_pos (self, x = None, y = None, a = None): - """Set current position.""" - if x is not None: - self.proto.send ('p', 'BL', ord ('X'), - 256 * x / self.param['scale']) - if y is not None: - self.proto.send ('p', 'BL', ord ('Y'), - 256 * y / self.param['scale']) - if a is not None: - self.proto.send ('p', 'BL', ord ('A'), a * (1 << 24) / 360) - - def goto (self, x, y, backward_ok = False): - """Go to position.""" - self.seq += 1 - self.proto.send (backward_ok and 'r' or 'x', 'LLB', - 256 * x / self.param['scale'], - 256 * y / self.param['scale'], self.seq) - self.wait (self.finished, auto = True) - - def goto_angle (self, a): - """Go to angle.""" - self.seq += 1 - self.proto.send ('x', 'HB', a * (1 << 16) / 360, self.seq) - self.wait (self.finished, auto = True) - - def goto_xya (self, x, y, a, backward_ok = False): - """Go to position, then angle.""" - self.seq += 1 - self.proto.send (backward_ok and 'r' or 'x', 'LLHB', - 256 * x / self.param['scale'], - 256 * y / self.param['scale'], - a * (1 << 16) / 360, self.seq) - self.wait (self.finished, auto = True) - - def register_pos (self, func, interval = 225 / 4): - """Will call func each time a position is received.""" - self.pos_func = func - self.proto.register ('X', 'lll', self.handle_pos) - self.proto.send ('X', 'B', interval) - - def send_param (self): - p = self.param - self.proto.send ('p', 'BHH', ord ('p'), p['tkp'] * 256, - p['akp'] * 256) - self.proto.send ('p', 'BHH', ord ('i'), p['tki'] * 256, - p['aki'] * 256) - self.proto.send ('p', 'BHH', ord ('d'), p['tkd'] * 256, - p['akd'] * 256) - self.proto.send ('p', 'BH', ord ('p'), p['a0kp'] * 256) - self.proto.send ('p', 'BH', ord ('i'), p['a0ki'] * 256) - self.proto.send ('p', 'BH', ord ('d'), p['a0kd'] * 256) - self.proto.send ('p', 'BH', ord ('E'), p['E']) - self.proto.send ('p', 'BH', ord ('I'), p['I']) - self.proto.send ('p', 'BH', ord ('D'), p['D']) - self.proto.send ('p', 'BH', ord ('b'), p['b']) - self.proto.send ('p', 'BHH', ord ('a'), p['ta'] * 256, - p['aa'] * 256) - self.proto.send ('p', 'BH', ord ('a'), p['a0a'] * 256) - self.proto.send ('p', 'BBBBB', ord ('s'), p['tsm'], p['asm'], - p['tss'], p['ass']) - self.proto.send ('p', 'BBB', ord ('s'), p['a0sm'], p['a0ss']) - self.proto.send ('p', 'BL', ord ('c'), p['c'] * 256 * 256 * 256) - self.proto.send ('p', 'BH', ord ('f'), p['f']) - self.proto.send ('p', 'BH', ord ('l'), p['l']) - - def write_eeprom (self): - self.proto.send ('p', 'BB', ord ('E'), 1) - time.sleep (1) - self.wait (lambda: True) - - def handle_stats (self, stat, *args): - if self.stats_enabled is not None: - self.stats_line.extend (args) - if self.stats_counter == stat: - self.stats_list.append (self.stats_line) - self.stats_line = [ ] - self.stats_count += 1 - - def handle_ack (self, mseq, a0seq): - self.seq_ack = mseq & 0x7f - - def handle_pos (self, x, y, a): - x = x / 256 * self.param['scale'] - y = y / 256 * self.param['scale'] - a = a * 360 / (1 << 24) - self.pos_func (x, y, a) - - def wait (self, cond = None, auto = False): - if auto and self.async: - return - try: - cond_count = int (cond) - cond = lambda: self.stats_count > cond_count - except TypeError: - pass - self.proto.wait (cond) - - def finished (self): - return self.seq == self.seq_ack - - def free (self): - self.proto.send ('w') - - def reset (self): - self.proto.send ('w') - self.proto.send ('w', 'H', 0) - self.proto.send ('z') - self.proto.send ('z') - - def close (self): - self.reset () - self.wait (lambda: True) - self.proto.file.close () - - def fileno (self): - return self.proto.fileno () -- cgit v1.2.3