From b8a9597a5e1fba0b024110e2d42640481c41677f Mon Sep 17 00:00:00 2001 From: Tat-Chee Wan (USM) Date: Wed, 27 Apr 2011 14:52:48 +0800 Subject: wip pyfantom integration with nxt-python Work In Progress: refactor nxt-python to add pyfantom support --- nxt-python-fantom/nxt/bluesock.py | 6 ++- nxt-python-fantom/nxt/fantomglue.py | 78 ++++++++++++++++++++++++++++------ nxt-python-fantom/nxt/pyusbglue.py | 84 +++++++++++++++++++++++++++++++++++++ nxt-python-fantom/nxt/usbsock.py | 45 ++++++++------------ 4 files changed, 172 insertions(+), 41 deletions(-) create mode 100644 nxt-python-fantom/nxt/pyusbglue.py (limited to 'nxt-python-fantom/nxt') diff --git a/nxt-python-fantom/nxt/bluesock.py b/nxt-python-fantom/nxt/bluesock.py index c57894d..b4a2739 100644 --- a/nxt-python-fantom/nxt/bluesock.py +++ b/nxt-python-fantom/nxt/bluesock.py @@ -15,7 +15,11 @@ try: import bluetooth except ImportError: - import lightblueglue as bluetooth + try: + import lightblueglue as bluetooth + except ImportError: + import pyfantom as bluetooth + import os from .brick import Brick diff --git a/nxt-python-fantom/nxt/fantomglue.py b/nxt-python-fantom/nxt/fantomglue.py index 3e3c68f..89c08b2 100644 --- a/nxt-python-fantom/nxt/fantomglue.py +++ b/nxt-python-fantom/nxt/fantomglue.py @@ -16,40 +16,92 @@ import pyfantom -FANTOM_BT='BT' -FANTOM_USB='USB' - +RFCOMM=11 # lightblue const +FANTOM_BT = RFCOMM # For compatibilty with lightblue +FANTOM_USB = 0 +# Bluetooth Iterator def discover_devices(lookup_names=False): # parameter is ignored pairs = [] - d = pyfantom.finddevices(proto = FANTOM_BT) - for p in d: - h = p[0] - n = p[1] + for d in NXTIterator(True): + # h: host, n: name + h = d.get_resource_string() + nxt = d.get_nxt() + device_info = nxt.get_device_info() + n = device_info.name + del nxt pairs.append((h, n)) return pairs class BluetoothSocket: def __init__(self, proto = FANTOM_BT, _sock=None): - if _sock is None: - _sock = pyfantom.socket(proto, passkey = None) + # We instantiate a NXT object only when we connect if none supplied self._sock = _sock self._proto = proto def connect(self, addrport): addr, port = addrport - self._sock.connect( (addr, port )) + if self._sock is None: + # Port is ignored + self._sock = NXT(addr) def send(self, data): - return self._sock.send( data ) + return self._sock.write( data ) def recv(self, numbytes): - return self._sock.recv( numbytes ) + return self._sock.read( numbytes ) def close(self): - return self._sock.close() + del self._sock class BluetoothError(IOError): pass +def _check_brick(arg, value): + return arg is None or arg == value + + +def find_devices(lookup_names=False): # parameter is ignored + devicelist = [] + for d in NXTIterator(False): + nxt = d.get_nxt() + devicelist.append(nxt) + return devicelist + +# FIXME: The semantics of find_devices is different from discover_devices +# +# # h: host, n: name +# hostmatched = None +# namematched = None +# h = d.get_resource_string() +# nxt = d.get_nxt() +# device_info = nxt.get_device_info() +# n = device_info.name +# if _check_brick(host, h) and _check_brick(name, n): +# yield nxt +# else: +# del nxt # Does not match criteria + +class USBSocket: + + def __init__(self, device=None): + # We instantiate a NXT object only when we connect if none supplied + #self.device = device + self._sock = device + self.debug = False + + def connect(self): + addr, port = addrport + if self._sock is None: + # Port is ignored + self._sock = NXT(addr) + + def send(self, data): + return self._sock.write( data ) + + def recv(self, numbytes): + return self._sock.read( numbytes ) + + def close(self): + del self._sock diff --git a/nxt-python-fantom/nxt/pyusbglue.py b/nxt-python-fantom/nxt/pyusbglue.py new file mode 100644 index 0000000..2d64d24 --- /dev/null +++ b/nxt-python-fantom/nxt/pyusbglue.py @@ -0,0 +1,84 @@ +# bluetooth.py module -- Glue code from NXT_Python to Lightblue, allowing +# NXT_Python to run on Mac without modification. Supports subset of +# PyBluez/bluetooth.py used by NXT_Python. +# +# Copyright (C) 2011 Tat-Chee Wan +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +import usb + + +ID_VENDOR_LEGO = 0x0694 +ID_PRODUCT_NXT = 0x0002 + +class USBSocket: + bsize = 60 # USB socket block size + + def __init__(self, device): + self.device = device + self.handle = None + self.debug = False + + def __str__(self): + return 'USB (%s)' % (self.device.filename) + + def connect(self): + 'Use to connect to NXT.' + if self.debug: + print 'PyUSB Connecting...' + config = self.device.configurations[0] + iface = config.interfaces[0][0] + self.blk_out, self.blk_in = iface.endpoints + self.handle = self.device.open() + self.handle.setConfiguration(1) + self.handle.claimInterface(0) + self.handle.reset() + if self.debug: + print 'Connected.' + return Brick(self) + + def close(self): + 'Use to close the connection.' + if self.debug: + print 'Closing USB connection...' + self.device = None + self.handle = None + self.blk_out = None + self.blk_in = None + if self.debug: + print 'USB connection closed.' + + def send(self, data): + 'Use to send raw data over USB connection ***ADVANCED USERS ONLY***' + if self.debug: + print 'Send:', + print ':'.join('%02x' % ord(c) for c in data) + self.handle.bulkWrite(self.blk_out.address, data) + + def recv(self, numbytes): + 'Use to recieve raw data over USB connection ***ADVANCED USERS ONLY***' + data = self.handle.bulkRead(self.blk_in.address, numbytes) + if self.debug: + print 'Recv:', + print ':'.join('%02x' % (c & 0xFF) for c in data) + # NOTE: bulkRead returns a tuple of ints ... make it sane + return ''.join(chr(d & 0xFF) for d in data) + +def find_devices(lookup_names=False): # parameter is ignored + devicelist = [] + for bus in usb.busses(): + for device in bus.devices: + if device.idVendor == ID_VENDOR_LEGO and device.idProduct == ID_PRODUCT_NXT: + devicelist.append(device) + return devicelist + + diff --git a/nxt-python-fantom/nxt/usbsock.py b/nxt-python-fantom/nxt/usbsock.py index 1856a10..3e2ed58 100644 --- a/nxt-python-fantom/nxt/usbsock.py +++ b/nxt-python-fantom/nxt/usbsock.py @@ -12,11 +12,12 @@ # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. -import usb -from nxt.brick import Brick +try: + import pyusbglue as usb +except ImportError: + import pyfantom as usb -ID_VENDOR_LEGO = 0x0694 -ID_PRODUCT_NXT = 0x0002 +from nxt.brick import Brick class USBSock(object): 'Object for USB connection to NXT' @@ -24,36 +25,25 @@ class USBSock(object): bsize = 60 # USB socket block size def __init__(self, device): - self.device = device - self.handle = None + self.sock = USBSocket(device) self.debug = False def __str__(self): - return 'USB (%s)' % (self.device.filename) + # FIXME: This breaks encapsulation + return 'USB (%s)' % (self.sock.filename) def connect(self): 'Use to connect to NXT.' if self.debug: print 'Connecting via USB...' - config = self.device.configurations[0] - iface = config.interfaces[0][0] - self.blk_out, self.blk_in = iface.endpoints - self.handle = self.device.open() - self.handle.setConfiguration(1) - self.handle.claimInterface(0) - self.handle.reset() - if self.debug: - print 'Connected.' + self.sock.connect() return Brick(self) def close(self): 'Use to close the connection.' if self.debug: print 'Closing USB connection...' - self.device = None - self.handle = None - self.blk_out = None - self.blk_in = None + self.sock.close() if self.debug: print 'USB connection closed.' @@ -62,21 +52,22 @@ class USBSock(object): if self.debug: print 'Send:', print ':'.join('%02x' % ord(c) for c in data) - self.handle.bulkWrite(self.blk_out.address, data) + self.sock.send(data) def recv(self): 'Use to recieve raw data over USB connection ***ADVANCED USERS ONLY***' - data = self.handle.bulkRead(self.blk_in.address, 64) + data = self.sock.recv(64) if self.debug: print 'Recv:', print ':'.join('%02x' % (c & 0xFF) for c in data) # NOTE: bulkRead returns a tuple of ints ... make it sane return ''.join(chr(d & 0xFF) for d in data) +def _check_brick(arg, value): + return arg is None or arg == value + def find_bricks(host=None, name=None): 'Use to look for NXTs connected by USB only. ***ADVANCED USERS ONLY***' - # FIXME: probably should check host and name - for bus in usb.busses(): - for device in bus.devices: - if device.idVendor == ID_VENDOR_LEGO and device.idProduct == ID_PRODUCT_NXT: - yield USBSock(device) + for d in usb.find_devices(lookup_names=True): + # FIXME: probably should check host and name + yield USBSock(d) -- cgit v1.2.3