From 2e95e3a33bcb34aeec66551503c692c1cb80ab61 Mon Sep 17 00:00:00 2001 From: Nicolas Schodet Date: Mon, 9 Feb 2009 20:05:30 +0100 Subject: * all python: - changed tabs to spaces. --- digital/asserv/tools/asserv/asserv.py | 422 +++++++++++++------------- digital/asserv/tools/asserv/init.py | 30 +- digital/asserv/tools/inter_asserv.py | 126 ++++---- digital/asserv/tools/step.py | 4 +- digital/avr/modules/path/test/test_path.py | 182 +++++------ digital/avr/modules/twi/test/test_twi_host.py | 4 +- digital/io/tools/io/init.py | 8 +- digital/io/tools/io/io.py | 40 +-- digital/io/tools/test_simu.py | 76 ++--- 9 files changed, 446 insertions(+), 446 deletions(-) (limited to 'digital') diff --git a/digital/asserv/tools/asserv/asserv.py b/digital/asserv/tools/asserv/asserv.py index 05a07130..2424a88b 100644 --- a/digital/asserv/tools/asserv/asserv.py +++ b/digital/asserv/tools/asserv/asserv.py @@ -5,264 +5,264 @@ import math class Asserv: stats_format = { - 'C': 'HHH', - 'Z': 'H', - 'S': 'bbb', - 'P': 'hhhhhh', - 'W': 'hhh', - } + 'C': 'HHH', + 'Z': 'H', + 'S': 'bbb', + 'P': 'hhhhhh', + 'W': 'hhh', + } # The last occuring stats will increment stats_count, so they have to # be in the same order than in asserv program. stats_order = 'CZSPW' stats_items = { - 'lc': ('C', 0), - 'rc': ('C', 1), - 'a0c': ('C', 2), - 'a0z': ('Z', 0), - 'ts': ('S', 0), - 'as': ('S', 1), - 'a0s': ('S', 2), - 'te': ('P', 0), - 'ti': ('P', 1), - 'ae': ('P', 2), - 'ai': ('P', 3), - 'a0e': ('P', 4), - 'a0i': ('P', 5), - 'lw': ('W', 0), - 'rw': ('W', 1), - 'a0w': ('W', 2), - } + 'lc': ('C', 0), + 'rc': ('C', 1), + 'a0c': ('C', 2), + 'a0z': ('Z', 0), + 'ts': ('S', 0), + 'as': ('S', 1), + 'a0s': ('S', 2), + 'te': ('P', 0), + 'ti': ('P', 1), + 'ae': ('P', 2), + 'ai': ('P', 3), + 'a0e': ('P', 4), + 'a0i': ('P', 5), + 'lw': ('W', 0), + 'rw': ('W', 1), + 'a0w': ('W', 2), + } def __init__ (self, file, time = time.time, **param): - self.proto = proto.Proto (file, time, 0.1) - self.async = False - self.mseq = 0 - self.mseq_ack = 0 - self.a0seq = 0 - self.a0seq_ack = 0 - self.proto.register ('A', 'BB', self.handle_ack) - def make_handle (s): - return lambda *args: self.handle_stats (s, *args) - for (s, f) in self.stats_format.iteritems (): - self.proto.register (s, f, make_handle (s)) - self.stats_enabled = None - self.param = dict ( - scale = 1, - tkp = 0, tki = 0, tkd = 0, - akp = 0, aki = 0, akd = 0, - a0kp = 0, a0ki = 0, a0kd = 0, - E = 1023, I = 1023, D = 1023, - be = 2048, bs = 0x10, bc = 20, - ta = 1, aa = 1, a0a = 1, - tsm = 0, asm = 0, tss = 0, ass = 0, a0sm = 0, a0ss = 0, - c = 1, f = 0x1000, - l = 0x2000, - ) - self.param.update (param) - self.send_param () + self.proto = proto.Proto (file, time, 0.1) + self.async = False + self.mseq = 0 + self.mseq_ack = 0 + self.a0seq = 0 + self.a0seq_ack = 0 + self.proto.register ('A', 'BB', self.handle_ack) + def make_handle (s): + return lambda *args: self.handle_stats (s, *args) + for (s, f) in self.stats_format.iteritems (): + self.proto.register (s, f, make_handle (s)) + self.stats_enabled = None + self.param = dict ( + scale = 1, + tkp = 0, tki = 0, tkd = 0, + akp = 0, aki = 0, akd = 0, + a0kp = 0, a0ki = 0, a0kd = 0, + E = 1023, I = 1023, D = 1023, + be = 2048, bs = 0x10, bc = 20, + ta = 1, aa = 1, a0a = 1, + tsm = 0, asm = 0, tss = 0, ass = 0, a0sm = 0, a0ss = 0, + c = 1, f = 0x1000, + l = 0x2000, + ) + self.param.update (param) + self.send_param () def stats (self, *stats_items, **options): - """Activate stats.""" - interval = 1 - if 'interval' in options: - interval = options['interval'] - # Build list of stats letters. - stats = [self.stats_items[i][0] for i in stats_items] - stats = [s for s in self.stats_order if s in stats] - stats_last_pos = 0 - stats_pos = { } - for s in stats: - stats_pos[s] = stats_last_pos - stats_last_pos += len (self.stats_format[s]) - # Build stats item positions. - self.stats_items_pos = [ ] - for i in stats_items: - id = self.stats_items[i] - self.stats_items_pos.append (stats_pos[id[0]] + id[1]) - # Enable stats. - for s in stats: - self.proto.send (s, 'B', interval) - # Prepare aquisition. - self.stats_enabled = stats - self.stats_counter = stats[-1] - self.stats_count = 0 - self.stats_list = [ ] - self.stats_line = [ ] + """Activate stats.""" + interval = 1 + if 'interval' in options: + interval = options['interval'] + # Build list of stats letters. + stats = [self.stats_items[i][0] for i in stats_items] + stats = [s for s in self.stats_order if s in stats] + stats_last_pos = 0 + stats_pos = { } + for s in stats: + stats_pos[s] = stats_last_pos + stats_last_pos += len (self.stats_format[s]) + # Build stats item positions. + self.stats_items_pos = [ ] + for i in stats_items: + id = self.stats_items[i] + self.stats_items_pos.append (stats_pos[id[0]] + id[1]) + # Enable stats. + for s in stats: + self.proto.send (s, 'B', interval) + # Prepare aquisition. + self.stats_enabled = stats + self.stats_counter = stats[-1] + self.stats_count = 0 + self.stats_list = [ ] + self.stats_line = [ ] def get_stats (self, wait = None): - if wait: - self.wait (wait) - list = self.stats_list - # Drop first line as it might be garbage. - del list[0] - for s in reversed (self.stats_enabled): - self.proto.send (s, 'B', 0) - # Extract asked stats. - array = numpy.array (list) - array = array[:, self.stats_items_pos] - # Cleanup. - self.stats_enabled = None - del self.stats_items_pos - del self.stats_counter - del self.stats_count - del self.stats_list - del self.stats_line - return array + if wait: + self.wait (wait) + list = self.stats_list + # Drop first line as it might be garbage. + del list[0] + for s in reversed (self.stats_enabled): + self.proto.send (s, 'B', 0) + # Extract asked stats. + array = numpy.array (list) + array = array[:, self.stats_items_pos] + # Cleanup. + self.stats_enabled = None + del self.stats_items_pos + del self.stats_counter + del self.stats_count + del self.stats_list + del self.stats_line + return array def consign (self, w, c): - """Consign offset.""" - if w == 't': - self.proto.send ('c', 'hh', c, 0) - elif w == 'a': - self.proto.send ('c', 'hh', 0, c) - else: - assert w == 'a0' - self.proto.send ('c', 'h', c) + """Consign offset.""" + if w == 't': + self.proto.send ('c', 'hh', c, 0) + elif w == 'a': + self.proto.send ('c', 'hh', 0, c) + else: + assert w == 'a0' + self.proto.send ('c', 'h', c) def speed (self, w, s): - """Speed consign.""" - if w == 't': - self.proto.send ('s', 'bb', s, 0) - elif w == 'a': - self.proto.send ('s', 'bb', 0, s) - else: - assert w == 'a0' - self.proto.send ('s', 'b', s) + """Speed consign.""" + if w == 't': + self.proto.send ('s', 'bb', s, 0) + elif w == 'a': + self.proto.send ('s', 'bb', 0, s) + else: + assert w == 'a0' + self.proto.send ('s', 'b', s) def speed_pos (self, w, offset): - """Speed controlled position consign.""" - if w == 't': - self.mseq += 1 - self.proto.send ('s', 'LLB', offset, 0, self.mseq) - elif w == 'a': - self.mseq += 1 - self.proto.send ('s', 'LLB', 0, offset, self.mseq) - else: - assert w == 'a0' - self.a0seq += 1 - self.proto.send ('s', 'LB', offset, self.a0seq) - self.wait (self.finished, auto = True) + """Speed controlled position consign.""" + if w == 't': + self.mseq += 1 + self.proto.send ('s', 'LLB', offset, 0, self.mseq) + elif w == 'a': + self.mseq += 1 + self.proto.send ('s', 'LLB', 0, offset, self.mseq) + else: + assert w == 'a0' + self.a0seq += 1 + self.proto.send ('s', 'LB', offset, self.a0seq) + self.wait (self.finished, auto = True) def set_pos (self, x = None, y = None, a = None): - """Set current position.""" - if x is not None: - self.proto.send ('p', 'BL', ord ('X'), - 256 * x / self.param['scale']) - if y is not None: - self.proto.send ('p', 'BL', ord ('Y'), - 256 * y / self.param['scale']) - if a is not None: - self.proto.send ('p', 'BL', ord ('A'), a * (1 << 24) / 360) + """Set current position.""" + if x is not None: + self.proto.send ('p', 'BL', ord ('X'), + 256 * x / self.param['scale']) + if y is not None: + self.proto.send ('p', 'BL', ord ('Y'), + 256 * y / self.param['scale']) + if a is not None: + self.proto.send ('p', 'BL', ord ('A'), a * (1 << 24) / 360) def goto (self, x, y, backward_ok = False): - """Go to position.""" - self.mseq += 1 - self.proto.send (backward_ok and 'r' or 'x', 'LLB', - 256 * x / self.param['scale'], - 256 * y / self.param['scale'], self.mseq) - self.wait (self.finished, auto = True) + """Go to position.""" + self.mseq += 1 + self.proto.send (backward_ok and 'r' or 'x', 'LLB', + 256 * x / self.param['scale'], + 256 * y / self.param['scale'], self.mseq) + self.wait (self.finished, auto = True) def goto_angle (self, a): - """Go to angle.""" - self.mseq += 1 - self.proto.send ('x', 'HB', a * (1 << 16) / 360, self.mseq) - self.wait (self.finished, auto = True) + """Go to angle.""" + self.mseq += 1 + self.proto.send ('x', 'HB', a * (1 << 16) / 360, self.mseq) + self.wait (self.finished, auto = True) def goto_xya (self, x, y, a, backward_ok = False): - """Go to position, then angle.""" - self.mseq += 1 - self.proto.send (backward_ok and 'r' or 'x', 'LLHB', - 256 * x / self.param['scale'], - 256 * y / self.param['scale'], - a * (1 << 16) / 360, self.mseq) - self.wait (self.finished, auto = True) + """Go to position, then angle.""" + self.mseq += 1 + self.proto.send (backward_ok and 'r' or 'x', 'LLHB', + 256 * x / self.param['scale'], + 256 * y / self.param['scale'], + a * (1 << 16) / 360, self.mseq) + self.wait (self.finished, auto = True) def set_simu_pos (self, x, y, a): - """Set simulated position.""" - self.proto.send ('h', 'BHHH', ord ('X'), x, y, - math.radians (a) * 1024) + """Set simulated position.""" + self.proto.send ('h', 'BHHH', ord ('X'), x, y, + math.radians (a) * 1024) def register_pos (self, func, interval = 225 / 4): - """Will call func each time a position is received.""" - self.pos_func = func - self.proto.register ('X', 'lll', self.handle_pos) - self.proto.send ('X', 'B', interval) + """Will call func each time a position is received.""" + self.pos_func = func + self.proto.register ('X', 'lll', self.handle_pos) + self.proto.send ('X', 'B', interval) def send_param (self): - p = self.param - self.proto.send ('p', 'BHH', ord ('p'), p['tkp'] * 256, - p['akp'] * 256) - self.proto.send ('p', 'BHH', ord ('i'), p['tki'] * 256, - p['aki'] * 256) - self.proto.send ('p', 'BHH', ord ('d'), p['tkd'] * 256, - p['akd'] * 256) - self.proto.send ('p', 'BH', ord ('p'), p['a0kp'] * 256) - self.proto.send ('p', 'BH', ord ('i'), p['a0ki'] * 256) - self.proto.send ('p', 'BH', ord ('d'), p['a0kd'] * 256) - self.proto.send ('p', 'BH', ord ('E'), p['E']) - self.proto.send ('p', 'BH', ord ('I'), p['I']) - self.proto.send ('p', 'BH', ord ('D'), p['D']) - self.proto.send ('p', 'BHHH', ord ('b'), p['be'], p['bs'], p['bc']) - self.proto.send ('p', 'BHH', ord ('a'), p['ta'] * 256, - p['aa'] * 256) - self.proto.send ('p', 'BH', ord ('a'), p['a0a'] * 256) - self.proto.send ('p', 'BBBBB', ord ('s'), p['tsm'], p['asm'], - p['tss'], p['ass']) - self.proto.send ('p', 'BBB', ord ('s'), p['a0sm'], p['a0ss']) - self.proto.send ('p', 'BL', ord ('c'), p['c'] * 256 * 256 * 256) - self.proto.send ('p', 'BH', ord ('f'), p['f']) - self.proto.send ('p', 'BH', ord ('l'), p['l']) + p = self.param + self.proto.send ('p', 'BHH', ord ('p'), p['tkp'] * 256, + p['akp'] * 256) + self.proto.send ('p', 'BHH', ord ('i'), p['tki'] * 256, + p['aki'] * 256) + self.proto.send ('p', 'BHH', ord ('d'), p['tkd'] * 256, + p['akd'] * 256) + self.proto.send ('p', 'BH', ord ('p'), p['a0kp'] * 256) + self.proto.send ('p', 'BH', ord ('i'), p['a0ki'] * 256) + self.proto.send ('p', 'BH', ord ('d'), p['a0kd'] * 256) + self.proto.send ('p', 'BH', ord ('E'), p['E']) + self.proto.send ('p', 'BH', ord ('I'), p['I']) + self.proto.send ('p', 'BH', ord ('D'), p['D']) + self.proto.send ('p', 'BHHH', ord ('b'), p['be'], p['bs'], p['bc']) + self.proto.send ('p', 'BHH', ord ('a'), p['ta'] * 256, + p['aa'] * 256) + self.proto.send ('p', 'BH', ord ('a'), p['a0a'] * 256) + self.proto.send ('p', 'BBBBB', ord ('s'), p['tsm'], p['asm'], + p['tss'], p['ass']) + self.proto.send ('p', 'BBB', ord ('s'), p['a0sm'], p['a0ss']) + self.proto.send ('p', 'BL', ord ('c'), p['c'] * 256 * 256 * 256) + self.proto.send ('p', 'BH', ord ('f'), p['f']) + self.proto.send ('p', 'BH', ord ('l'), p['l']) def write_eeprom (self): - self.proto.send ('p', 'BB', ord ('E'), 1) - time.sleep (1) - self.wait (lambda: True) + self.proto.send ('p', 'BB', ord ('E'), 1) + time.sleep (1) + self.wait (lambda: True) def handle_stats (self, stat, *args): - if self.stats_enabled is not None: - self.stats_line.extend (args) - if self.stats_counter == stat: - self.stats_list.append (self.stats_line) - self.stats_line = [ ] - self.stats_count += 1 + if self.stats_enabled is not None: + self.stats_line.extend (args) + if self.stats_counter == stat: + self.stats_list.append (self.stats_line) + self.stats_line = [ ] + self.stats_count += 1 def handle_ack (self, mseq, a0seq): - self.mseq_ack = mseq & 0x7f - self.a0seq_ack = a0seq & 0x7f - self.proto.send ('a', 'BB', mseq, a0seq) + self.mseq_ack = mseq & 0x7f + self.a0seq_ack = a0seq & 0x7f + self.proto.send ('a', 'BB', mseq, a0seq) def handle_pos (self, x, y, a): - x = x / 256 * self.param['scale'] - y = y / 256 * self.param['scale'] - a = a * 360 / (1 << 24) - self.pos_func (x, y, a) + x = x / 256 * self.param['scale'] + y = y / 256 * self.param['scale'] + a = a * 360 / (1 << 24) + self.pos_func (x, y, a) def wait (self, cond = None, auto = False): - if auto and self.async: - return - try: - cond_count = int (cond) - cond = lambda: self.stats_count > cond_count - except TypeError: - pass - self.proto.wait (cond) + if auto and self.async: + return + try: + cond_count = int (cond) + cond = lambda: self.stats_count > cond_count + except TypeError: + pass + self.proto.wait (cond) def finished (self): - return self.mseq == self.mseq_ack and self.a0seq == self.a0seq_ack + return self.mseq == self.mseq_ack and self.a0seq == self.a0seq_ack def free (self): - self.proto.send ('w') + self.proto.send ('w') def reset (self): - self.proto.send ('w') - self.proto.send ('w', 'H', 0) - self.proto.send ('z') - self.proto.send ('z') + self.proto.send ('w') + self.proto.send ('w', 'H', 0) + self.proto.send ('z') + self.proto.send ('z') def close (self): - self.reset () - self.wait (lambda: True) - self.proto.file.close () + self.reset () + self.wait (lambda: True) + self.proto.file.close () def fileno (self): - return self.proto.fileno () + return self.proto.fileno () diff --git a/digital/asserv/tools/asserv/init.py b/digital/asserv/tools/asserv/init.py index 42c34f5d..8e4c8d5d 100644 --- a/digital/asserv/tools/asserv/init.py +++ b/digital/asserv/tools/asserv/init.py @@ -1,18 +1,18 @@ """Default parameters for asserv.""" host = dict ( - scale = 0.0395840674352314, f = 0xdd1, - tkp = 1, tkd = 16, akp = 2, akd = 16, - a0kp = 1, a0kd = 16, - E = 0x3ff, D = 0x1ff, - ta = 0.5, aa = 0.5, tsm = 0x60, tss = 0x20, asm = 0x30, ass = 0x10, - a0a = 0.5, a0sm = 0x0a, a0ss = 0x05, - ) + scale = 0.0395840674352314, f = 0xdd1, + tkp = 1, tkd = 16, akp = 2, akd = 16, + a0kp = 1, a0kd = 16, + E = 0x3ff, D = 0x1ff, + ta = 0.5, aa = 0.5, tsm = 0x60, tss = 0x20, asm = 0x30, ass = 0x10, + a0a = 0.5, a0sm = 0x0a, a0ss = 0x05, + ) target = dict ( - scale = 0.0413530725332892, f = 0xcfa, - c = float (0xffefbe) / (1 << 24), - tkp = 1, tkd = 16, akp = 2, akd = 16, - a0kp = 1, a0kd = 0.05, - E = 0x3ff, D = 0x1ff, - ta = 0.5, aa = 0.5, tsm = 0x60, tss = 0x20, asm = 0x30, ass = 0x10, - a0a = 0.5, a0sm = 0x0a, a0ss = 0x05, - ) + scale = 0.0413530725332892, f = 0xcfa, + c = float (0xffefbe) / (1 << 24), + tkp = 1, tkd = 16, akp = 2, akd = 16, + a0kp = 1, a0kd = 0.05, + E = 0x3ff, D = 0x1ff, + ta = 0.5, aa = 0.5, tsm = 0x60, tss = 0x20, asm = 0x30, ass = 0x10, + a0a = 0.5, a0sm = 0x0a, a0ss = 0x05, + ) diff --git a/digital/asserv/tools/inter_asserv.py b/digital/asserv/tools/inter_asserv.py index 44fb8492..1dd47fc3 100644 --- a/digital/asserv/tools/inter_asserv.py +++ b/digital/asserv/tools/inter_asserv.py @@ -40,86 +40,86 @@ class InterAsserv (Inter): """Inter, communicating with the asserv board.""" def __init__ (self, argv): - # Asserv. - if argv[0] == '!': - io = proto.popen_io.PopenIO (argv[1:]) - i = asserv.init.host - else: - io = serial.Serial (argv[0]) - i = asserv.init.target - self.a = Asserv (io, **i) - self.a.async = True - self.a.register_pos (self.pos) - # Inter. - Inter.__init__ (self) - self.tk.createfilehandler (self.a, READABLE, self.read) - self.timeout () + # Asserv. + if argv[0] == '!': + io = proto.popen_io.PopenIO (argv[1:]) + i = asserv.init.host + else: + io = serial.Serial (argv[0]) + i = asserv.init.target + self.a = Asserv (io, **i) + self.a.async = True + self.a.register_pos (self.pos) + # Inter. + Inter.__init__ (self) + self.tk.createfilehandler (self.a, READABLE, self.read) + self.timeout () def createWidgets (self): - Inter.createWidgets (self) - self.ArggButton = Button (self.rightFrame, text = 'Argggg!', - command = self.emergency_stop) - self.ArggButton.pack () - self.actionVar = StringVar () - self.actionVar.set ('goto') - self.actionSetPosButton = Radiobutton (self.rightFrame, - text = 'set pos', value = 'set_pos', - variable = self.actionVar) - self.actionSetPosButton.pack () - self.actionGotoButton = Radiobutton (self.rightFrame, - text = 'goto', value = 'goto', - variable = self.actionVar) - self.actionGotoButton.pack () - self.backwardOkVar = IntVar () - self.backwardOkButton = Checkbutton (self.rightFrame, - text = 'backward ok', variable = self.backwardOkVar) - self.backwardOkButton.pack () + Inter.createWidgets (self) + self.ArggButton = Button (self.rightFrame, text = 'Argggg!', + command = self.emergency_stop) + self.ArggButton.pack () + self.actionVar = StringVar () + self.actionVar.set ('goto') + self.actionSetPosButton = Radiobutton (self.rightFrame, + text = 'set pos', value = 'set_pos', + variable = self.actionVar) + self.actionSetPosButton.pack () + self.actionGotoButton = Radiobutton (self.rightFrame, + text = 'goto', value = 'goto', + variable = self.actionVar) + self.actionGotoButton.pack () + self.backwardOkVar = IntVar () + self.backwardOkButton = Checkbutton (self.rightFrame, + text = 'backward ok', variable = self.backwardOkVar) + self.backwardOkButton.pack () - self.tableview.configure (cursor = 'crosshair') - self.tableview.bind ('<1>', self.button1) - self.tableview.bind ('<3>', self.button3) + self.tableview.configure (cursor = 'crosshair') + self.tableview.bind ('<1>', self.button1) + self.tableview.bind ('<3>', self.button3) def read (self, file, mask): - """Handle asserv events.""" - self.a.proto.read () - self.a.proto.sync () + """Handle asserv events.""" + self.a.proto.read () + self.a.proto.sync () def timeout (self): - self.a.proto.sync () - self.after (100, self.timeout) + self.a.proto.sync () + self.after (100, self.timeout) def pos (self, x, y, a): - self.tableview.robot.pos = (x, y) - self.tableview.robot.angle = float (a) / 180 * math.pi - self.update (self.tableview.robot) - self.update () + self.tableview.robot.pos = (x, y) + self.tableview.robot.angle = float (a) / 180 * math.pi + self.update (self.tableview.robot) + self.update () def button1 (self, ev): - x, y = self.tableview.screen_coord ((ev.x, ev.y)) - action = self.actionVar.get () - if action == 'set_pos': - self.a.set_pos (x, y) - elif action == 'goto': - self.a.goto (x, y, self.backwardOkVar.get ()) - else: - assert 0 + x, y = self.tableview.screen_coord ((ev.x, ev.y)) + action = self.actionVar.get () + if action == 'set_pos': + self.a.set_pos (x, y) + elif action == 'goto': + self.a.goto (x, y, self.backwardOkVar.get ()) + else: + assert 0 def button3 (self, ev): - x, y = self.tableview.screen_coord ((ev.x, ev.y)) - a = math.degrees (math.atan2 (y - self.tableview.robot.pos[1], - x - self.tableview.robot.pos[0])) - action = self.actionVar.get () - if action == 'set_pos': - self.a.set_pos (a = a) - elif action == 'goto': - self.a.goto_angle (a) + x, y = self.tableview.screen_coord ((ev.x, ev.y)) + a = math.degrees (math.atan2 (y - self.tableview.robot.pos[1], + x - self.tableview.robot.pos[0])) + action = self.actionVar.get () + if action == 'set_pos': + self.a.set_pos (a = a) + elif action == 'goto': + self.a.goto_angle (a) def emergency_stop (self): - self.a.free () + self.a.free () if __name__ == '__main__': app = InterAsserv (sys.argv[1:]) try: - app.mainloop () + app.mainloop () finally: - app.a.close () + app.a.close () diff --git a/digital/asserv/tools/step.py b/digital/asserv/tools/step.py index b92311e5..a4538855 100644 --- a/digital/asserv/tools/step.py +++ b/digital/asserv/tools/step.py @@ -8,9 +8,9 @@ import Gnuplot def step (name, offset, kp, ki, kd, plots, **param): if sys.argv[1] == '!': - io = proto.popen_io.PopenIO (sys.argv[2:]) + io = proto.popen_io.PopenIO (sys.argv[2:]) else: - io = serial.Serial (sys.argv[1]) + io = serial.Serial (sys.argv[1]) p = { name + 'kp': kp, name + 'ki': ki, name + 'kd': kd} p.update (param) a = Asserv (io, **p) diff --git a/digital/avr/modules/path/test/test_path.py b/digital/avr/modules/path/test/test_path.py index b7eb63fd..ff9b425c 100644 --- a/digital/avr/modules/path/test/test_path.py +++ b/digital/avr/modules/path/test/test_path.py @@ -31,129 +31,129 @@ import re class Obstacle: def __init__ (self, pos, radius): - self.pos = pos - self.radius = radius + self.pos = pos + self.radius = radius def move (self, pos): - self.pos = pos + self.pos = pos class Area (Drawable): def __init__ (self, onto, border_min, border_max): - Drawable.__init__ (self, onto) - self.border_min = border_min - self.border_max = border_max - self.border = None - self.src = None - self.dst = None - self.obstacles = [ ] - self.path = [ ] + Drawable.__init__ (self, onto) + self.border_min = border_min + self.border_max = border_max + self.border = None + self.src = None + self.dst = None + self.obstacles = [ ] + self.path = [ ] def draw (self): - self.reset () - self.draw_rectangle (self.border_min, self.border_max, fill = 'white') - for o in self.obstacles: - if o.pos is not None: - self.draw_circle (o.pos, o.radius, fill = 'gray25') - if self.src is not None: - self.draw_circle (self.src, 10, fill = 'green') - if self.dst is not None: - self.draw_circle (self.dst, 10, fill = 'red') - if len (self.path) > 1: - fmt = dict (fill = 'blue', arrow = LAST) - self.draw_line (*self.path, **fmt) + self.reset () + self.draw_rectangle (self.border_min, self.border_max, fill = 'white') + for o in self.obstacles: + if o.pos is not None: + self.draw_circle (o.pos, o.radius, fill = 'gray25') + if self.src is not None: + self.draw_circle (self.src, 10, fill = 'green') + if self.dst is not None: + self.draw_circle (self.dst, 10, fill = 'red') + if len (self.path) > 1: + fmt = dict (fill = 'blue', arrow = LAST) + self.draw_line (*self.path, **fmt) def test (self): - self.src = (300, 750) - self.dst = (1200, 750) - self.obstacles.append (Obstacle ((600, 680), 100)) - self.obstacles.append (Obstacle ((900, 820), 100)) + self.src = (300, 750) + self.dst = (1200, 750) + self.obstacles.append (Obstacle ((600, 680), 100)) + self.obstacles.append (Obstacle ((900, 820), 100)) def update (self): - args = [ [ self.border_min[0], self.border_min[1], self.border_max[0], - self.border_max[1] ], self.src, self.dst ] - for o in self.obstacles: - args.append ([o.pos[0], o.pos[1], o.radius]) - args = [ ','.join (str (ai) for ai in a) for a in args ] - args[0:0] = [ './test_path.host' ] - p = Popen (args, stdout = PIPE) - output = p.communicate ()[0] - del p - output = output.split ('\n') - r = re.compile ('^// (\d+), (\d+)$') - self.path = [ ] - for line in output: - m = r.match (line) - if m is not None: - self.path.append (tuple (int (s) for s in m.groups ())) + args = [ [ self.border_min[0], self.border_min[1], self.border_max[0], + self.border_max[1] ], self.src, self.dst ] + for o in self.obstacles: + args.append ([o.pos[0], o.pos[1], o.radius]) + args = [ ','.join (str (ai) for ai in a) for a in args ] + args[0:0] = [ './test_path.host' ] + p = Popen (args, stdout = PIPE) + output = p.communicate ()[0] + del p + output = output.split ('\n') + r = re.compile ('^// (\d+), (\d+)$') + self.path = [ ] + for line in output: + m = r.match (line) + if m is not None: + self.path.append (tuple (int (s) for s in m.groups ())) class AreaView (DrawableCanvas): def __init__ (self, border_min, border_max, master = None): - self.border_min = border_min - self.border_max = border_max - width = border_max[0] - border_min[0] - height = border_max[1] - border_min[0] - DrawableCanvas.__init__ (self, width * 1.1, height * 1.1, -width / 2, - -height / 2, - master, borderwidth = 1, relief = 'sunken', - background = 'white') - self.area = Area (self, border_min, border_max) - self.area.test () - self.area.update () + self.border_min = border_min + self.border_max = border_max + width = border_max[0] - border_min[0] + height = border_max[1] - border_min[0] + DrawableCanvas.__init__ (self, width * 1.1, height * 1.1, -width / 2, + -height / 2, + master, borderwidth = 1, relief = 'sunken', + background = 'white') + self.area = Area (self, border_min, border_max) + self.area.test () + self.area.update () def draw (self): - self.area.draw () + self.area.draw () class TestPath (Frame): def __init__ (self, border_min, border_max, master = None): - Frame.__init__ (self, master) + Frame.__init__ (self, master) self.pack (expand = 1, fill = 'both') self.createWidgets (border_min, border_max) - self.move = None + self.move = None def createWidgets (self, border_min, border_max): - self.rightFrame = Frame (self) - self.rightFrame.pack (side = 'right', fill = 'y') - self.quitButton = Button (self.rightFrame, text = 'Quit', command = self.quit) - self.quitButton.pack (side = 'top', fill = 'x') - self.areaview = AreaView (border_min, border_max, self) - self.areaview.pack (expand = True, fill = 'both') - self.areaview.bind ('<1>', self.click) + self.rightFrame = Frame (self) + self.rightFrame.pack (side = 'right', fill = 'y') + self.quitButton = Button (self.rightFrame, text = 'Quit', command = self.quit) + self.quitButton.pack (side = 'top', fill = 'x') + self.areaview = AreaView (border_min, border_max, self) + self.areaview.pack (expand = True, fill = 'both') + self.areaview.bind ('<1>', self.click) def clear (self): - self.areaview.area.path = [ ] - self.areaview.area.draw () + self.areaview.area.path = [ ] + self.areaview.area.draw () def update (self): - self.areaview.area.update () - self.areaview.area.draw () + self.areaview.area.update () + self.areaview.area.draw () def click (self, ev): - pos = self.areaview.screen_coord ((ev.x, ev.y)) - pos = tuple (int (i) for i in pos) - if self.move is None: - def move_src (pos): - self.areaview.area.src = pos - def move_dst (pos): - self.areaview.area.dst = pos - objs = [ [ self.areaview.area.src, 10, move_src ], - [ self.areaview.area.dst, 10, move_dst ] ] - for o in self.areaview.area.obstacles: - objs.append ([ o.pos, o.radius, o.move ]) - for obj in objs: - dx = obj[0][0] - pos[0] - dy = obj[0][1] - pos[1] - if dx * dx + dy * dy < obj[1] * obj[1]: - self.move = obj[2] - if self.move is not None: - self.move (None) - self.clear () - else: - self.move (pos) - self.update () - self.move = None + pos = self.areaview.screen_coord ((ev.x, ev.y)) + pos = tuple (int (i) for i in pos) + if self.move is None: + def move_src (pos): + self.areaview.area.src = pos + def move_dst (pos): + self.areaview.area.dst = pos + objs = [ [ self.areaview.area.src, 10, move_src ], + [ self.areaview.area.dst, 10, move_dst ] ] + for o in self.areaview.area.obstacles: + objs.append ([ o.pos, o.radius, o.move ]) + for obj in objs: + dx = obj[0][0] - pos[0] + dy = obj[0][1] - pos[1] + if dx * dx + dy * dy < obj[1] * obj[1]: + self.move = obj[2] + if self.move is not None: + self.move (None) + self.clear () + else: + self.move (pos) + self.update () + self.move = None if __name__ == '__main__': app = TestPath ((0, 0), (1500, 1500)) diff --git a/digital/avr/modules/twi/test/test_twi_host.py b/digital/avr/modules/twi/test/test_twi_host.py index 10b3e208..5af0e669 100644 --- a/digital/avr/modules/twi/test/test_twi_host.py +++ b/digital/avr/modules/twi/test/test_twi_host.py @@ -17,8 +17,8 @@ fh = Forked (h.wait) n = Node () def nf (): while True: - time.sleep (1) - n.wait (n.date + 1) + time.sleep (1) + n.wait (n.date + 1) fn = Forked (nf) slave = os.popen ('./test_twi_sl.host', 'w') diff --git a/digital/io/tools/io/init.py b/digital/io/tools/io/init.py index 12c6bb9a..7b8d2b8a 100644 --- a/digital/io/tools/io/init.py +++ b/digital/io/tools/io/init.py @@ -1,6 +1,6 @@ """Default parameters for io.""" host = dict ( - trap = ((1, 255), (1, 255), (1, 255), (1, 255), (1, 255), (1, 255)), - sharp_threshold = ((0x7f, 0x90), (0x7f, 0x90), (0x7f, 0x90), - (0x7f, 0x90), (0x7f, 0x90)), - ) + trap = ((1, 255), (1, 255), (1, 255), (1, 255), (1, 255), (1, 255)), + sharp_threshold = ((0x7f, 0x90), (0x7f, 0x90), (0x7f, 0x90), + (0x7f, 0x90), (0x7f, 0x90)), + ) diff --git a/digital/io/tools/io/io.py b/digital/io/tools/io/io.py index 8b78a8cc..5d2826a1 100644 --- a/digital/io/tools/io/io.py +++ b/digital/io/tools/io/io.py @@ -3,34 +3,34 @@ import proto, time class Io: def __init__ (self, file, time = time.time, **param): - self.proto = proto.Proto (file, time, 0.1) - self.async = False - self.param = param - self.send_param () + self.proto = proto.Proto (file, time, 0.1) + self.async = False + self.param = param + self.send_param () def send_param (self): - p = self.param - for i, t in enumerate (p['trap']): - self.proto.send ('t', 'BBB', i, t[0], t[1]) - for i, t in enumerate (p['sharp_threshold']): - self.proto.send ('h', 'BHH', i, t[0], t[1]) + p = self.param + for i, t in enumerate (p['trap']): + self.proto.send ('t', 'BBB', i, t[0], t[1]) + for i, t in enumerate (p['sharp_threshold']): + self.proto.send ('h', 'BHH', i, t[0], t[1]) def write_eeprom (self): - self.proto.send ('p', 'BB', ord ('E'), ord ('s')) - time.sleep (1) - self.proto.wait (lambda: True) + self.proto.send ('p', 'BB', ord ('E'), ord ('s')) + time.sleep (1) + self.proto.wait (lambda: True) def reset (self): - self.proto.send ('w') - self.proto.send ('w', 'H', 0) - self.proto.send ('z') - self.proto.send ('z') + self.proto.send ('w') + self.proto.send ('w', 'H', 0) + self.proto.send ('z') + self.proto.send ('z') def close (self): - self.reset () - self.wait (lambda: True) - self.proto.file.close () + self.reset () + self.wait (lambda: True) + self.proto.file.close () def fileno (self): - return self.proto.fileno () + return self.proto.fileno () diff --git a/digital/io/tools/test_simu.py b/digital/io/tools/test_simu.py index 7f208911..1d7175a7 100644 --- a/digital/io/tools/test_simu.py +++ b/digital/io/tools/test_simu.py @@ -18,58 +18,58 @@ class TestSimu (InterNode): """Inter, with simulated programs.""" robot_start_pos = ((200, 2100 - 70, -90), - (3000 - 200, 2100 - 70, -90)) + (3000 - 200, 2100 - 70, -90)) def __init__ (self, asserv_cmd, io_cmd): - # Hub. - self.hub = mex.hub.Hub (min_clients = 2) - self.forked_hub = utils.forked.Forked (self.hub.wait) - # InterNode. - InterNode.__init__ (self) - def time (): - return self.node.date / self.TICK - # Asserv. - self.asserv = Asserv (PopenIO (asserv_cmd), time, **asserv.init.host) - self.asserv.async = True - self.tk.createfilehandler (self.asserv, READABLE, self.asserv_read) - # Io. - self.io = Io (PopenIO (io_cmd), time, **io.init.host) - self.io.async = True - self.tk.createfilehandler (self.io, READABLE, self.io_read) - # Color switch. - self.change_color () - self.colorVar.trace_variable ('w', self.change_color) + # Hub. + self.hub = mex.hub.Hub (min_clients = 2) + self.forked_hub = utils.forked.Forked (self.hub.wait) + # InterNode. + InterNode.__init__ (self) + def time (): + return self.node.date / self.TICK + # Asserv. + self.asserv = Asserv (PopenIO (asserv_cmd), time, **asserv.init.host) + self.asserv.async = True + self.tk.createfilehandler (self.asserv, READABLE, self.asserv_read) + # Io. + self.io = Io (PopenIO (io_cmd), time, **io.init.host) + self.io.async = True + self.tk.createfilehandler (self.io, READABLE, self.io_read) + # Color switch. + self.change_color () + self.colorVar.trace_variable ('w', self.change_color) def close (self): - self.forked_hub.kill () - import time - time.sleep (1) - self.asserv.close () - self.io.close () + self.forked_hub.kill () + import time + time.sleep (1) + self.asserv.close () + self.io.close () def asserv_read (self, file, mask): - self.asserv.proto.read () - self.asserv.proto.sync () + self.asserv.proto.read () + self.asserv.proto.sync () def io_read (self, file, mask): - self.io.proto.read () - self.io.proto.sync () + self.io.proto.read () + self.io.proto.sync () def step (self): - """Overide step to handle retransmissions, could be made cleaner using - simulated time.""" - InterNode.step (self) - self.asserv.proto.sync () - self.io.proto.sync () + """Overide step to handle retransmissions, could be made cleaner using + simulated time.""" + InterNode.step (self) + self.asserv.proto.sync () + self.io.proto.sync () def change_color (self, *dummy): - i = self.colorVar.get () - self.asserv.set_simu_pos (*self.robot_start_pos[i]); + i = self.colorVar.get () + self.asserv.set_simu_pos (*self.robot_start_pos[i]); if __name__ == '__main__': app = TestSimu (('../../asserv/src/asserv/asserv.host', '-m', 'giboulee'), - ('../src/io.host')) + ('../src/io.host')) try: - app.mainloop () + app.mainloop () finally: - app.close () + app.close () -- cgit v1.2.3