diff --git a/host/dmctl/connection.py b/host/dmctl/connection.py index 0d35340..35fd35c 100644 --- a/host/dmctl/connection.py +++ b/host/dmctl/connection.py @@ -5,6 +5,7 @@ import array import os import re import struct +import sys import abc from typing import * @@ -16,16 +17,17 @@ from typing import * class DevConn: + _VER_MIN = 0x0010 + _VER_MAX = 0x00ff # ??? + pass + class UsbConn(DevConn): _USB_DEFAULT_VID = 0xcafe _USB_DEFAULT_PID = 0x1312 _SUBCLASS = 42 _PROTOCOL = 69 - _VER_MIN = 0x0010 - _VER_MAX = 0x00ff # ??? - def _open_dev(dev) -> Union[UsbConn, str]: import usb, usb.core @@ -61,20 +63,21 @@ class UsbConn(DevConn): # is in use by something else epout.write(b'\x00') resp = epin.read(4) - if resp[0] != 0 or resp[1] != 2: - return "Device does not recognise the 'get protocol version' command" - - verno = struct.unpack(' UsbConn._VER_MAX: - return "Version of device (%04x) too new, must be max. %04x" \ - % (hex(verno, USbConn._VER_MAX)) except usb.core.USBError: return "Device is busy, already used by something else? (If you use "+\ "the kernel module, use a character device from /dev instead.)" + if len(resp) < 4 or resp[0] != 0 or resp[1] != 2: + return "Device does not recognise the 'get protocol version' command" + + verno = struct.unpack(' DevConn._VER_MAX: + return "Version of device (%04x) too new, must be max. %04x" \ + % (hex(verno, DevConn._VER_MAX)) + return UsbConn(dev, cfg, itf, epout, epin) def try_find() -> Optional[UsbConn]: @@ -86,7 +89,8 @@ class UsbConn(DevConn): if dev is None or len(dev) != 1: return None - return UsbConn._open_dev(dev[0]) + rv = UsbConn._open_dev(dev[0]) + return None if isinstance(rv, str) else rv def is_usbdev_path(conn: str) -> bool: # eg. cafe:1312 @@ -175,17 +179,65 @@ class UsbConn(DevConn): class ChardevConn(DevConn): + _DEVCLASSNAME = "dmj" + def try_find() -> Optional[ChardevConn]: - return None + if sys.platform != 'linux': + return None + + opts = glob.glob("/dev/%s-*" % ChardevConn._DEVCLASSNAME) + + if len(opts) != 1: + return None + + rv = ChardevConn.try_open(opts[0]) + return None if isinstance(rv, str) else rv def is_chardev_path(conn: str) -> bool: - return None + if sys.platform != 'linux': + return False + + return re.match('^/dev/%s-[0-9]+$' % ChardevConn._DEVCLASSNAME, conn) is not None def try_open(conn: str) -> Union[ChardevConn, str]: - return "Could not open character device '%s'" % conn + if sys.platform != 'linux': + return "Chardev connections not available on %s, as these require "+\ + "a Linux kernel module" % sys.platform + try: + fd = os.open(conn, os.O_RDWR) + if fd < 0: + raise OSError("Negative file descriptor returned") + except OSError as e: + return "Could not open character device '%s': %s" % \ + (conn, e.message if hasattr(e, 'message') else e.strerror) - def __init__(self, fd): + os.write(fd, b'\x00') + resp = os.read(fd, 4) + + if len(resp) < 4 or resp[0] != 0 or resp[1] != 2: + return "Device does not recognise the 'get protocol version' command" + + verno = struct.unpack(' DevConn._VER_MAX: + return "Version of device (%04x) too new, must be max. %04x" \ + % (hex(verno, DevConn._VER_MAX)) + + return ChardevConn(fd) + + def read_raw(self, arr) -> int: + blob = os.read(self._fd, len(arr)) + for i in range(len(blob)): # TODO: memcpy? + arr[i] = blob[i] + return len(blob) + + def write_raw(self, b: bytes) -> int: + return os.write(self._fd, b) + + def __init__(self, fd: int): self._fd = fd def __enter__(self):