summaryrefslogtreecommitdiff
path: root/nxt-python-fantom/nxt/direct.py
blob: 50b7f2acfe49e7585b42e778a22c917977c900a5 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
# nxt.direct module -- LEGO Mindstorms NXT direct telegrams
# Copyright (C) 2006, 2007  Douglas P Lau
# Copyright (C) 2009  Marcus Wanner
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

'Use for direct communication with the NXT ***EXTREMELY ADVANCED USERS ONLY***'

def _create(opcode):
    'Create a simple direct telegram'
    from telegram import Telegram
    return Telegram(True, opcode)

def start_program(opcode, fname):
    tgram = _create(opcode)
    tgram.add_filename(fname)
    return tgram

def _parse_simple(tgram):
    tgram.check_status()

def stop_program(opcode):
    return _create(opcode)

def play_sound_file(opcode, loop, fname):
    tgram = _create(opcode)
    tgram.add_u8(loop)
    tgram.add_filename(fname)
    return tgram

def play_tone(opcode, frequency, duration):
    'Play a tone at frequency (Hz) for duration (ms)'
    tgram = _create(opcode)
    tgram.add_u16(frequency)
    tgram.add_u16(duration)
    return tgram

def set_output_state(opcode, port, power, mode, regulation, turn_ratio,
    run_state, tacho_limit):
    tgram = _create(opcode)
    tgram.add_u8(port)
    tgram.add_s8(power)
    tgram.add_u8(mode)
    tgram.add_u8(regulation)
    tgram.add_s8(turn_ratio)
    tgram.add_u8(run_state)
    tgram.add_u32(tacho_limit)
    return tgram

def set_input_mode(opcode, port, sensor_type, sensor_mode):
    tgram = _create(opcode)
    tgram.add_u8(port)
    tgram.add_u8(sensor_type)
    tgram.add_u8(sensor_mode)
    return tgram

def get_output_state(opcode, port):
    tgram = _create(opcode)
    tgram.add_u8(port)
    return tgram

def _parse_get_output_state(tgram):
    tgram.check_status()
    port = tgram.parse_u8()
    power = tgram.parse_s8()
    mode = tgram.parse_u8()
    regulation = tgram.parse_u8()
    turn_ratio = tgram.parse_s8()
    run_state = tgram.parse_u8()
    tacho_limit = tgram.parse_u32()
    tacho_count = tgram.parse_s32()
    block_tacho_count = tgram.parse_s32()
    rotation_count = tgram.parse_s32()
    return (port, power, mode, regulation, turn_ratio, run_state,
        tacho_limit, tacho_count, block_tacho_count, rotation_count)

def get_input_values(opcode, port):
    tgram = _create(opcode)
    tgram.add_u8(port)
    return tgram

def _parse_get_input_values(tgram):
    tgram.check_status()
    port = tgram.parse_u8()
    valid = tgram.parse_u8()
    calibrated = tgram.parse_u8()
    sensor_type = tgram.parse_u8()
    sensor_mode = tgram.parse_u8()
    raw_ad_value = tgram.parse_u16()
    normalized_ad_value = tgram.parse_u16()
    scaled_value = tgram.parse_s16()
    calibrated_value = tgram.parse_s16()
    return (port, valid, calibrated, sensor_type, sensor_mode, raw_ad_value,
        normalized_ad_value, scaled_value, calibrated_value)

def reset_input_scaled_value(opcode, port):
    tgram = _create(opcode)
    tgram.add_u8(port)
    return tgram

def message_write(opcode, inbox, message):
    tgram = _create(opcode)
    tgram.add_u8(inbox)
    tgram.add_u8(len(message) + 1)
    tgram.add_string(len(message), message)
    tgram.add_u8(0)
    return tgram

def reset_motor_position(opcode, port, relative):
    tgram = _create(opcode)
    tgram.add_u8(port)
    tgram.add_u8(relative)
    return tgram

def get_battery_level(opcode):
    return _create(opcode)

def _parse_get_battery_level(tgram):
    tgram.check_status()
    millivolts = tgram.parse_u16()
    return millivolts

def stop_sound_playback(opcode):
    return _create(opcode)

def keep_alive(opcode):
    return _create(opcode)

def _parse_keep_alive(tgram):
    tgram.check_status()
    sleep_time = tgram.parse_u32()
    return sleep_time

def ls_get_status(opcode, port):
    'Get status of low-speed sensor (ultrasonic)'
    tgram = _create(opcode)
    tgram.add_u8(port)
    return tgram

def _parse_ls_get_status(tgram):
    tgram.check_status()
    n_bytes = tgram.parse_u8()
    return n_bytes

def ls_write(opcode, port, tx_data, rx_bytes):
    'Write a low-speed command to a sensor (ultrasonic)'
    tgram = _create(opcode)
    tgram.add_u8(port)
    tgram.add_u8(len(tx_data))
    tgram.add_u8(rx_bytes)
    tgram.add_string(len(tx_data), tx_data)
    return tgram

def ls_read(opcode, port):
    'Read a low-speed sensor value (ultrasonic)'
    tgram = _create(opcode)
    tgram.add_u8(port)
    return tgram

def _parse_ls_read(tgram):
    tgram.check_status()
    n_bytes = tgram.parse_u8()
    contents = tgram.parse_string()
    return contents[:n_bytes]

def get_current_program_name(opcode):
    return _create(opcode)

def _parse_get_current_program_name(tgram):
    tgram.check_status()
    fname = tgram.parse_string()
    return fname

def message_read(opcode, remote_inbox, local_inbox, remove):
    tgram = _create(opcode)
    tgram.add_u8(remote_inbox)
    tgram.add_u8(local_inbox)
    tgram.add_u8(remove)
    return tgram

def _parse_message_read(tgram):
    tgram.check_status()
    local_inbox = tgram.parse_u8()
    n_bytes = tgram.parse_u8()
    message = tgram.parse_string()
    return (local_inbox, message[:n_bytes])

OPCODES = {
    0x00: (start_program, _parse_simple),
    0x01: (stop_program, _parse_simple),
    0x02: (play_sound_file, _parse_simple),
    0x03: (play_tone, _parse_simple),
    0x04: (set_output_state, _parse_simple),
    0x05: (set_input_mode, _parse_simple),
    0x06: (get_output_state, _parse_get_output_state),
    0x07: (get_input_values, _parse_get_input_values),
    0x08: (reset_input_scaled_value, _parse_simple),
    0x09: (message_write, _parse_simple),
    0x0A: (reset_motor_position, _parse_simple),
    0x0B: (get_battery_level, _parse_get_battery_level),
    0x0C: (stop_sound_playback, _parse_simple),
    0x0D: (keep_alive, _parse_keep_alive),
    0x0E: (ls_get_status, _parse_ls_get_status),
    0x0F: (ls_write, _parse_simple),
    0x10: (ls_read, _parse_ls_read),
    0x11: (get_current_program_name, _parse_get_current_program_name),
    0x13: (message_read, _parse_message_read),
}