summaryrefslogtreecommitdiff
path: root/digital/io/tools/test_simu.py
blob: 73e0259c88cd240ecb21fa2058ee26840d4d4a7c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import sys
sys.path.append (sys.path[0] + '/../../../host')
sys.path.append (sys.path[0] + '/../../asserv/tools')
import math

import mex.hub
import utils.forked

from asserv import Asserv
import asserv.init
from io import Io
import io.init
from proto.popen_io import PopenIO

from inter.inter_node import InterNode
from Tkinter import *

class TestSimu (InterNode):
    """Inter, with simulated programs."""

    robot_start_pos = ((200, 2100 - 70, math.radians (-90)),
            (3000 - 200, 2100 - 70, math.radians (-90)))

    def __init__ (self, asserv_cmd, io_cmd):
        # Hub.
        self.hub = mex.hub.Hub (min_clients = 2)
        self.forked_hub = utils.forked.Forked (self.hub.wait)
        # InterNode.
        InterNode.__init__ (self)
        def time ():
            return self.node.date / self.TICK
        # Asserv.
        self.asserv = Asserv (PopenIO (asserv_cmd), time, **asserv.init.host)
        self.asserv.async = True
        self.tk.createfilehandler (self.asserv, READABLE, self.asserv_read)
        # Io.
        self.io = Io (PopenIO (io_cmd), time, **io.init.host)
        self.io.async = True
        self.tk.createfilehandler (self.io, READABLE, self.io_read)
        # Color switch.
        self.change_color ()
        self.colorVar.trace_variable ('w', self.change_color)

    def close (self):
        self.forked_hub.kill ()
        import time
        time.sleep (1)
        self.asserv.close ()
        self.io.close ()

    def asserv_read (self, file, mask):
        self.asserv.proto.read ()
        self.asserv.proto.sync ()

    def io_read (self, file, mask):
        self.io.proto.read ()
        self.io.proto.sync ()

    def step (self):
        """Overide step to handle retransmissions, could be made cleaner using
        simulated time."""
        InterNode.step (self)
        self.asserv.proto.sync ()
        self.io.proto.sync ()

    def change_color (self, *dummy):
        i = self.colorVar.get ()
        self.asserv.set_simu_pos (*self.robot_start_pos[i]);

if __name__ == '__main__':
    app = TestSimu (('../../asserv/src/asserv/asserv.host', '-m', 'giboulee'),
            ('../src/io.host'))
    try:
        app.mainloop ()
    finally:
        app.close ()