aboutsummaryrefslogtreecommitdiffhomepage
path: root/pyfantom.py
diff options
context:
space:
mode:
Diffstat (limited to 'pyfantom.py')
-rw-r--r--pyfantom.py165
1 files changed, 147 insertions, 18 deletions
diff --git a/pyfantom.py b/pyfantom.py
index f5ddbb2..3e20574 100644
--- a/pyfantom.py
+++ b/pyfantom.py
@@ -1,9 +1,15 @@
"""NXT Fantom driver wrapper."""
-from ctypes import c_int, c_uint, c_ushort, c_ubyte, c_char_p, byref, POINTER
+from ctypes import c_int, c_uint, c_ushort, c_ubyte, c_char_p, c_char, byref, POINTER, Structure, cast
import ctypes.util
import platform
import collections
+kMaxFileNameLength = 101
+BT_PIN = "1234"
+VI_ERROR_IO = -0x4000ffc2 # Equivalent to 0xBFFF003E
+
+DEBUG = True
+
# Check platform.
if platform.system() == 'Darwin':
import sys
@@ -12,6 +18,7 @@ if platform.system() == 'Darwin':
"You can run python in 32 bit mode using:\n"
"arch -i386 python2.6\n")
libpath = '/Library/Frameworks/Fantom.framework/Fantom'
+ from AppKit import NSAutoreleasePool, NSApplication
#libpath = ctypes.util.find_library('Fantom')
else:
raise RuntimeError('unsupported platform')
@@ -44,11 +51,26 @@ dll.nFANTOM100_iNXT_write.restype = c_uint
dll.nFANTOM100_iNXT_read.argtypes = [c_uint, c_char_p, c_uint,
POINTER(c_int)]
dll.nFANTOM100_iNXT_read.restype = c_uint
+dll.nFANTOM100_iNXT_pollAvailableLength.argtypes = [c_uint, c_uint, POINTER(c_int)]
+dll.nFANTOM100_iNXT_pollAvailableLength.restype = c_uint
+dll.nFANTOM100_iNXT_readBufferData.argtypes = [c_uint, c_char_p, c_uint, c_uint,
+ POINTER(c_int)]
+dll.nFANTOM100_iNXT_readBufferData.restype = c_uint
+dll.nFANTOM100_iNXT_sendDirectCommand.argtypes = [c_uint, c_ushort, c_char_p, c_uint,
+ c_char_p, c_uint, POINTER(c_int)]
+dll.nFANTOM100_iNXT_sendDirectCommand.restype = c_uint
dll.nFANTOM100_iNXT_getDeviceInfo.restype = None
dll.nFANTOM100_iNXT_getResourceString.argtypes = [c_uint, c_char_p,
POINTER(c_int)]
dll.nFANTOM100_iNXT_getResourceString.restype = None
+dll.nFANTOM100_pairBluetooth.argtypes = [c_char_p, c_char_p, c_char_p, POINTER(c_int)]
+dll.nFANTOM100_pairBluetooth.restype = None
+dll.nFANTOM100_unpairBluetooth.argtypes = [c_char_p, POINTER(c_int)]
+dll.nFANTOM100_unpairBluetooth.restype = None
+dll.nFANTOM100_isPaired.argtypes = [c_char_p, POINTER(c_int)]
+dll.nFANTOM100_isPaired.restype = c_ushort
+
class FantomException(RuntimeError):
"""Exception thrown on Fantom library error."""
pass
@@ -162,7 +184,7 @@ class Status:
if status.value in Status.description:
description = Status.description[status.value]
else:
- description = 'error %d' % status.value
+ description = 'error %x (hex)' % status.value
raise FantomException(description)
class NXTIterator:
@@ -170,14 +192,24 @@ class NXTIterator:
def __init__(self, search_bluetooth, bluetooth_search_timeout_s=5):
"""Initialize iterator."""
+ self.debug = DEBUG
self.search_bluetooth = search_bluetooth
self.bluetooth_search_timeout_s = bluetooth_search_timeout_s
self.handle = None
self.stop = False
+ self.nsapp = NSApplication.sharedApplication()
+ self.pool = NSAutoreleasePool.alloc().init()
def destroy():
"""To be used in destructor."""
+ if self.debug:
+ print "pyfantom: Destroying NXTIterator."
status = c_int(0)
- dll.nFANTOM100_destroyNXTIterator(self.handle, byref(status))
+ dll.nFANTOM100_destroyNXTIterator(self.handle, cast(byref(status), POINTER(c_int)))
+ Status.check(status)
+ del self.pool
+ del self.nsapp
+ if self.debug:
+ print "pyfantom: NXTIterator destroyed."
self.__destroy = destroy
def __iter__(self):
@@ -192,10 +224,10 @@ class NXTIterator:
status = c_int(0)
if self.handle is None:
handle = dll.nFANTOM100_createNXTIterator(self.search_bluetooth,
- self.bluetooth_search_timeout_s, byref(status))
+ self.bluetooth_search_timeout_s, cast(byref(status), POINTER(c_int)))
else:
handle = self.handle
- dll.nFANTOM100_iNXTIterator_advance(handle, byref(status))
+ dll.nFANTOM100_iNXTIterator_advance(handle, cast(byref(status), POINTER(c_int)))
# Check result.
if status.value == Status.NoMoreItemsFound:
self.stop = True
@@ -211,9 +243,9 @@ class NXTIterator:
if self.handle is None or self.stop:
raise FantomException('invalid iterator')
status = c_int(0)
- handle = dll.nFANTOM100_iNXTIterator_getNXT(self.handle, byref(status))
+ handle = dll.nFANTOM100_iNXTIterator_getNXT(self.handle, cast(byref(status), POINTER(c_int)))
Status.check(status)
- return NXT(handle)
+ return NXT(handle, self.search_bluetooth)
def get_name(self):
"""Get the NXT resource name."""
@@ -221,7 +253,7 @@ class NXTIterator:
raise FantomException('invalid iterator')
status = c_int(0)
name = ctypes.create_string_buffer(256)
- dll.nFANTOM100_iNXTIterator_getName(self.handle, name, byref(status))
+ dll.nFANTOM100_iNXTIterator_getName(self.handle, name, cast(byref(status), POINTER(c_int)))
Status.check(status)
return name.value
@@ -231,6 +263,9 @@ class NXTIterator:
"""Destroy iterator."""
if self.handle is not None:
self.__destroy()
+ else:
+ if self.debug:
+ print "pyfantom: No NXTIterator in __del__."
class NXT:
"""Interface to the NXT brick."""
@@ -238,20 +273,36 @@ class NXT:
DeviceInfo = collections.namedtuple('DeviceInfo',
'name bluetooth_address signal_strength available_flash')
- def __init__(self, name_or_handle):
+ def __init__(self, name_or_handle, bluetooth=False, pin=BT_PIN):
"""Initialize interface."""
+ self.debug = DEBUG
+ self.handle = None
+ self.isBTLink = bluetooth # Not used for now
if isinstance(name_or_handle, basestring):
status = c_int(0)
- handle = dll.nFANTOM100_createNXT(name_or_handle, byref(status),
+ handle = dll.nFANTOM100_createNXT(name_or_handle, cast(byref(status), POINTER(c_int)),
True)
Status.check(status)
self.handle = handle
else:
self.handle = name_or_handle
+ # Initialize self.pool here to avoid dll.nFANTOM100_createNXT() triggering
+ # an autorelease pool error message if it failed
+ self.pool = NSAutoreleasePool.alloc().init()
def destroy():
"""To be used in destructor."""
- status = c_int(0)
- dll.nFANTOM100_destroyNXT(self.handle, byref(status))
+ if self.debug:
+ print "pyfantom: Destroying NXT."
+ if self.handle is not None:
+ status = c_int(0)
+ dll.nFANTOM100_destroyNXT(self.handle, cast(byref(status), POINTER(c_int)))
+ Status.check(status)
+ del self.pool
+ if self.debug:
+ print "pyfantom: NXT destroyed."
+ else:
+ if self.debug:
+ print "pyfantom: NXT handle is None!"
self.__destroy = destroy
def get_firmware_version(self):
@@ -264,7 +315,7 @@ class NXT:
dll.nFANTOM100_iNXT_getFirmwareVersion(self.handle,
byref(protocol_major), byref(protocol_minor),
byref(firmware_major), byref(firmware_minor),
- byref(status))
+ cast(byref(status), POINTER(c_int)))
Status.check(status)
return (protocol_major.value, protocol_minor.value,
firmware_major.value, firmware_minor.value)
@@ -274,7 +325,7 @@ class NXT:
status = c_int(0)
data_buffer = ctypes.create_string_buffer(data)
ret = dll.nFANTOM100_iNXT_write(self.handle, data_buffer, len(data),
- byref(status))
+ cast(byref(status), POINTER(c_int)))
Status.check(status)
return ret
@@ -283,7 +334,49 @@ class NXT:
status = c_int(0)
data_buffer = ctypes.create_string_buffer(length)
ret = dll.nFANTOM100_iNXT_read(self.handle, data_buffer, length,
- byref(status))
+ cast(byref(status), POINTER(c_int)))
+ if self.debug:
+ print "pyfantom: read() returned ", ret, " of ", length, " bytes."
+ try:
+ Status.check(status)
+ except FantomException as err:
+ if status.value != VI_ERROR_IO:
+ raise err
+ assert ret <= length
+ return data_buffer.raw[0:ret]
+
+ def send_direct_command(self, data, response=None, requireResponse=False):
+ """Sends the specified direct command to this NXT."""
+ status = c_int(0)
+ data_buffer = ctypes.create_string_buffer(data)
+ if requireResponse:
+ assert response is not None
+ response_buffer = ctypes.create_string_buffer(response)
+ responseLen = len(response)
+ else:
+ response_buffer = None
+ responseLen = 0
+ ret = dll.nFANTOM100_iNXT_sendDirectCommand(self.handle, requireResponse, data_buffer, len(data),
+ response_buffer, responseLen, cast(byref(status), POINTER(c_int)))
+ Status.check(status)
+ return ret
+
+ def poll_command_length(self, buffer=0):
+ """Polls the data buffer on this NXT for the number of bytes available to be read."""
+ buffer_selector = c_uint(buffer)
+ status = c_int(0)
+ ret = dll.nFANTOM100_iNXT_pollAvailableLength(self.handle, buffer_selector,
+ cast(byref(status), POINTER(c_int)))
+ Status.check(status)
+ return ret
+
+ def poll_command(self, length, buffer=0):
+ """Reads data from the data buffer on this NXT."""
+ status = c_int(0)
+ buffer_selector = c_uint(buffer)
+ data_buffer = ctypes.create_string_buffer(length)
+ ret = dll.nFANTOM100_iNXT_readBufferData(self.handle, data_buffer, buffer_selector, length,
+ cast(byref(status), POINTER(c_int)))
Status.check(status)
assert ret <= length
return data_buffer.raw[0:ret]
@@ -297,11 +390,11 @@ class NXT:
available_flash = c_uint(0)
dll.nFANTOM100_iNXT_getDeviceInfo(self.handle, name,
bluetooth_address, signal_strength, byref(available_flash),
- byref(status))
+ cast(byref(status), POINTER(c_int)))
return self.DeviceInfo(
name = name.value,
bluetooth_address = ':'.join('%02x' % c
- for c in bluetooth_address[0:5]),
+ for c in bluetooth_address[0:6]),
signal_strength = tuple(c for c in signal_strength),
available_flash = available_flash.value,
)
@@ -311,14 +404,50 @@ class NXT:
status = c_int(0)
name = ctypes.create_string_buffer(256)
dll.nFANTOM100_iNXT_getResourceString(self.handle, name,
- byref(status))
+ cast(byref(status), POINTER(c_int)))
Status.check(status)
return name.value
+ def close(self):
+ if self.debug:
+ print "pyfantom: close()"
+ if self.handle is not None:
+ self.__destroy()
+ self.handle = None
+ else:
+ if self.debug:
+ print "pyfantom: No NXT in close()."
+
+ def pair_bluetooth(self, resource, pin=BT_PIN):
+ btpin = ctypes.create_string_buffer(256)
+ btpin.value = pin
+ paired_name = ctypes.create_string_buffer(256)
+ #status = c_int(0)
+ status = StatusVar(0)
+ dll.nFANTOM100_pairBluetooth(resource, btpin, paired_name, cast(byref(status), POINTER(c_int)))
+ Status.check(status)
+ return paired_name.value
+
+ def unpair_bluetooth(self, resource):
+ #status = c_int(0)
+ status = StatusVar(0)
+ dll.nFANTOM100_pairBluetooth(resource, cast(byref(status), POINTER(c_int)))
+ Status.check(status)
+
+ def ispaired_bluetooth(self, resource):
+ #status = c_int(0)
+ status = StatusVar(0)
+ paired = dll.nFANTOM100_isPaired(resource, cast(byref(status), POINTER(c_int)))
+ Status.check(status)
+ return paired
+
def __del__(self):
"""Destroy interface."""
if self.handle is not None:
self.__destroy()
+ else:
+ if self.debug:
+ print "pyfantom: No NXT in __del__."
if __name__ == '__main__':
get_info = False