summaryrefslogtreecommitdiff
path: root/digital/io/tools/test_simu.py
blob: 7f208911c2361323fa4eb4b0cf08533933623fd4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import sys
sys.path.append (sys.path[0] + '/../../../host')
sys.path.append (sys.path[0] + '/../../asserv/tools')

import mex.hub
import utils.forked

from asserv import Asserv
import asserv.init
from io import Io
import io.init
from proto.popen_io import PopenIO

from inter.inter_node import InterNode
from Tkinter import *

class TestSimu (InterNode):
    """Inter, with simulated programs."""

    robot_start_pos = ((200, 2100 - 70, -90),
	    (3000 - 200, 2100 - 70, -90))

    def __init__ (self, asserv_cmd, io_cmd):
	# Hub.
	self.hub = mex.hub.Hub (min_clients = 2)
	self.forked_hub = utils.forked.Forked (self.hub.wait)
	# InterNode.
	InterNode.__init__ (self)
	def time ():
	    return self.node.date / self.TICK
	# Asserv.
	self.asserv = Asserv (PopenIO (asserv_cmd), time, **asserv.init.host)
	self.asserv.async = True
	self.tk.createfilehandler (self.asserv, READABLE, self.asserv_read)
	# Io.
	self.io = Io (PopenIO (io_cmd), time, **io.init.host)
	self.io.async = True
	self.tk.createfilehandler (self.io, READABLE, self.io_read)
	# Color switch.
	self.change_color ()
	self.colorVar.trace_variable ('w', self.change_color)

    def close (self):
	self.forked_hub.kill ()
	import time
	time.sleep (1)
	self.asserv.close ()
	self.io.close ()

    def asserv_read (self, file, mask):
	self.asserv.proto.read ()
	self.asserv.proto.sync ()

    def io_read (self, file, mask):
	self.io.proto.read ()
	self.io.proto.sync ()

    def step (self):
	"""Overide step to handle retransmissions, could be made cleaner using
	simulated time."""
	InterNode.step (self)
	self.asserv.proto.sync ()
	self.io.proto.sync ()

    def change_color (self, *dummy):
	i = self.colorVar.get ()
	self.asserv.set_simu_pos (*self.robot_start_pos[i]);

if __name__ == '__main__':
    app = TestSimu (('../../asserv/src/asserv/asserv.host', '-m', 'giboulee'),
	    ('../src/io.host'))
    try:
	app.mainloop ()
    finally:
	app.close ()