diff --git a/host/dmctl.py b/host/dmctl.py new file mode 100755 index 0000000..cf4a300 --- /dev/null +++ b/host/dmctl.py @@ -0,0 +1,194 @@ +#!/usr/bin/env python3 + +import argparse +import glob, shutil +import sys +import traceback + +from typing import * + +import dmctl.connection as devconn +import dmctl.commands as devcmds + + +def dmctl_do(args: Any) -> int: + def get_device_info(conn, args): return devcmds.get_device_info(conn) + def get_mode_info(conn, args): return devcmds.get_mode_info(conn, args.mode) + def set_mode(conn, args): return devcmds.set_mode(conn, args.mode) + + def uart_hw_flowctl(conn, args): + if args.get: return devcmds.uart_hw_flowctl_get(conn) + fcen = args.set + if fcen is None: + if args.enable: fcen = True + elif args.disable: fcen = False + if fcen is None: + print("Error: none of '--get', '--set', '--enable' or '--disable' specified.") + return 1 + return devcmds.uart_hw_flowctl_set(conn, fcen) + def tempsensor(conn, args): + if args.get: return devcmds.tempsensor_get(conn) + tsen = args.set + if tsen is None: + if args.disable: tsen = 0 + if tsen is None: + print("Error: none of '--get', '--set' or '--disable' specified.") + return 1 + return devcmds.tempsensor_set(conn, tsen) + def jtag_scan(conn, args): + return devcmds.jtag_scan(args.start, args.end) + def sump_ovclk(conn, args): + if args.get: return devcmds.sump_overclock_get(conn) + oven = args.set + if oven is None: + if args.enable: oven = 1 + elif args.disable: oven = 0 + if oven is None: + print("Error: none of '--get', '--set', '--enable' or '--disable' specified.") + return 1 + return devcmds.sump_overclock_set(conn, oven) + + + print(repr(args)) + cmds = { + 'get-device-info': get_device_info, + 'get-mode-info': get_mode_info, + 'set-mode': set_mode, + + 'uart-cts-rts': uart_hw_flowctl, + 'tempsensor': tempsensor, + 'jtag-scan': jtag_scan, + 'sump-overclock': sump_ovclk, + } + + if args.subcmd is None: + print("No subcommand specified?!") + return 1 + + subfn = cmds.get(args.subcmd, None) + if subfn is None: + print("Unknown subcommand '%s'" % args.subcmd) + return 1 + + conn = devconn.connect(args.conn) + if isinstance(conn, str): + print("Could not connect to a device: %s." % conn) + return 1 + + with conn as dev: + return subfn(dev, args) + + +def main(argv: List[str]) -> int: + parser = argparse.ArgumentParser() + + def auto_int(x): + return int(x, 0) + + # commands: + # * get device info + # * get mode info + # * set mode + # + # * mode 1 (general): + # * 0x16 0x??: usb hwflowctl on/off, 0x??=0xc3: get current value + # * 0x15 0x00: get tempsensor active/address + # * 0x15 0x01 0x??: set tempsensor active/address + # + # * mode 2 (isp/jtag/...): probably nothing + # + # * mode 3 (jtag pinout scanner): + # * 0x30: get status + # * 0x31: get result (5 bytes: pin numbers of tck,tms,tdi,tdo,trst) + # * 0x32 0xNN 0xMM: start scan (pins 0xNN..0xMM) + # + # * mode 4 (sump logic analyzer): + # * 0x40: get overclock + # * 0x41: set overclock + # + # * mode 5 (ftdi/fx2 emul): probably nothing + + parser.add_argument('--conn', type=str, default=None, + help="Connection string. Either a dmj-char device in"+\ + " /dev, a USB bus.device number, or a USB VID:PID " + \ + "pair. Defaults to trying /dev/dmj-* (if there is " + \ + "only one), and cafe:1312 otherwise.") + #parser.descripiton = ... + + subcmds = parser.add_subparsers(required=True, metavar="subcommand", + dest="subcmd", help="Command to send to "+\ + "the device", + description="For more info on each " + \ + "subcommand, run the program with " + \ + "'subcommand --help' as arguments.") + + # general subcommands + getdevinfo = subcmds.add_parser("get-device-info", help="Shows device info") + + getmodeinfo = subcmds.add_parser("get-mode-info", help="Shows mode info."+\ + " A mode can optionally be specified, "+\ + "default is the current mode.") + getmodeinfo.add_argument('mode', default=None, nargs='?', + help="Mode to get info of. Defaults to the " + \ + "current mode, a single asterisk (*) means all.") + + setmode = subcmds.add_parser("set-mode", help="Set the device mode") + setmode.add_argument('mode', type=int, help="Mode to switch to, required.") + + # mode 1 commands + usbhwfctl = subcmds.add_parser("uart-cts-rts", help="Get, Enable/disable"+\ + " UART hardware flow control") + uartopts = usbhwfctl.add_mutually_exclusive_group() + uartopts.add_argument('--get', default=False, action='store_true', + help="Get current hardware flow control setting") + uartopts.add_argument('--set', default=None, type=bool, nargs=1, + help="Set hardware flow control") + uartopts.add_argument('--enable', default=False, action='store_true', + help="Enable hardware flow control, short for "+\ + "--set true") + uartopts.add_argument('--disable', default=False, action='store_true', + help="Disable hardware flow control, short for "+\ + "--set false") + + tempsense = subcmds.add_parser("tempsensor", help="Get or set the IRC " + \ + "emulation enable/address of the " + \ + "temperature sensor.") + tsopts = tempsense.add_mutually_exclusive_group() + tsopts.add_argument('--get', default=False, action='store_true', + help="Get current I2C emul state/address") + tsopts.add_argument('--set', default=None, type=auto_int, nargs=1, + help="Set emulated I2C address of the temperature "+\ + "sensor. 0 (or another invalid I2C device address) "+\ + "to disable the emulated I2C sensor device.") + tsopts.add_argument('--disable', default=False, action='store_true', + help="Disable emulated I2C temperature sensor, "+\ + "short for --set true") + + jtagscan = subcmds.add_parser("jtag-scan", help="JTAG pinout scanner") + jtagscan.add_argument("start-pin", type=int, help="Number of the start "+\ + "of the pin range to scan (inclusive)") + jtagscan.add_argument("end-pin", type=int, help="Number of the end of "+\ + "the pin range to scan (inclusive)") + + sumpla = subcmds.add_parser("sump-overclock", + help="SUMP logic analyzer overclock") + sumpopts = sumpla.add_mutually_exclusive_group() + sumpopts.add_argument('--get', default=False, action='store_true', + help="Get current overclocking state") + sumpopts.add_argument('--set', default=None, type=int, nargs=1, + help="Set current overclocking state") + sumpopts.add_argument('--enable', default=False, action='store_true', + help="Enable overclocking, short for --set 1") + sumpopts.add_argument('--disable', default=False, action='store_true', + help="Disable overclocking, short for --set 0") + + args = parser.parse_args()#args=argv) + return dmctl_do(args) + +if __name__ == '__main__': + try: + sys.exit(main(sys.argv) or 0) + except Exception:# as e + traceback.print_exc() + sys.exit(1); + diff --git a/host/dmctl/connection.py b/host/dmctl/connection.py index 84883b7..0d35340 100644 --- a/host/dmctl/connection.py +++ b/host/dmctl/connection.py @@ -1,7 +1,10 @@ from __future__ import annotations +import array import os +import re +import struct import abc from typing import * @@ -12,39 +15,163 @@ from typing import * # * implement device commands -class DevConn(abc.ABC): - def read(self, nb: int) -> bytes: - raise NotImplementedError() - - def write(self, b: bytes): - raise NotImplementedError() - - def __enter__(self): - raise NotImplementedError() - - def __exit__(self, type, value, tb): - raise NotImplementedError() - +class DevConn: + pass class UsbConn(DevConn): + _USB_DEFAULT_VID = 0xcafe + _USB_DEFAULT_PID = 0x1312 + _SUBCLASS = 42 + _PROTOCOL = 69 + _VER_MIN = 0x0010 + _VER_MAX = 0x00ff # ??? + + def _open_dev(dev) -> Union[UsbConn, str]: + import usb, usb.core + + cfg = dev.get_active_configuration() + + if cfg is None: # should be configured already, but eh + dev.set_configuration() + cfg = dev.get_active_configuration() + + if cfg is None: + return "Couldn't get or set device configuration, aaaaa" + + + itf = [i for i in cfg.interfaces() + if i.bInterfaceClass == usb.CLASS_VENDOR_SPEC and + i.bInterfaceSubClass == UsbConn._SUBCLASS and + i.bInterfaceProtocol == UsbConn._PROTOCOL] + + if len(itf) == 0: + return "No vendor control interface found for device" + if len(itf) != 1: + return "Multiple vendor control interfaces found for device, wtf?" + + itf = itf[0] + + epout = usb.util.find_descriptor(itf, custom_match = + lambda e: usb.util.endpoint_direction(e.bEndpointAddress) == usb.util.ENDPOINT_OUT) + epin = usb.util.find_descriptor(itf, custom_match = + lambda e: usb.util.endpoint_direction(e.bEndpointAddress) == usb.util.ENDPOINT_IN) + + try: + # try to read the version number. if it throws, it means the usbdev + # is in use by something else + epout.write(b'\x00') + resp = epin.read(4) + if resp[0] != 0 or resp[1] != 2: + return "Device does not recognise the 'get protocol version' command" + + verno = struct.unpack(' UsbConn._VER_MAX: + return "Version of device (%04x) too new, must be max. %04x" \ + % (hex(verno, USbConn._VER_MAX)) + except usb.core.USBError: + return "Device is busy, already used by something else? (If you use "+\ + "the kernel module, use a character device from /dev instead.)" + + return UsbConn(dev, cfg, itf, epout, epin) + def try_find() -> Optional[UsbConn]: - return None + import usb.core + + dev = list(usb.core.find(find_all=True, idVendor=UsbConn._USB_DEFAULT_VID, + idProduct=UsbConn._USB_DEFAULT_PID)) + + if dev is None or len(dev) != 1: + return None + + return UsbConn._open_dev(dev[0]) def is_usbdev_path(conn: str) -> bool: - return None + # eg. cafe:1312 + match_vidpid = re.match('^[0-9a-fA-F]{4}:[0-9a-fA-F]{4}$', conn) + if match_vidpid is not None: + return True + + # eg. 1.123 or 1.123.1 + match_busdev = re.match('^[0-9]{1}\.[0-9]{1,3}(\.[0-9]{1,3})?$', conn) + return match_busdev is not None def try_open(conn: str) -> Union[UsbConn, str]: - return "Could not open USB device '%s'" % conn + import usb.core + conn_busdev = False + conntup = None - def __init__(self): - pass + # eg. cafe:1312 + match_vidpid = re.match('^([0-9a-fA-F]{4}):([0-9a-fA-F]{4})$', conn) + if match_vidpid is not None: + conntup = tuple(int(x,16) for x in match_vidpid.groups()) + else: + # eg. 1.123 + match_busdev = re.match('^([0-9]{1,3})\.([0-9]{1,3})(\.([0-9]{1,3}))?$', conn) + if match_busdev is not None: + conn_busdev = True + conntup = match_busdev.groups() + if conntup is not None: + if conntup[3] is None: + conntup = tuple(int(x) for x in conntup[0:2]) + else: + conntup = tuple(int(x) for x in (conntup[0:2] + (conntup[3],))) + + if conntup is None: + return "Could not open USB device '%s': not recognised" % conn + + dev = None + if conn_busdev: + if len(conntup) == 2: + dev = list(usb.core.find(find_all=True, bus=conntup[0], address=conntup[1])) + elif len(conntup) == 3: + dev = list(usb.core.find(find_all=True, bus=conntup[0], address=conntup[1], port=conntup[2])) + else: + assert False, ("huh? conntup=%s"%repr(conntup)) + else: + dev = list(usb.core.find(find_all=True, idVendor=conntup[0], idProduct=conntup[1])) + + if len(dev) == 0: + return "Connect to '%s' (%s): no such device found" % \ + (conn, "bus.address(.port)" if conn_busdev else "VID:PID") + if len(dev) != 1: + # TODO: nicer usb device list? + return "Connection string '%s' ambiguous, found more than one device: %s" % (conn, str(dev)) + + return UsbConn._open_dev(dev[0]) + + def read_raw(self, arr) -> int: + return self._epin.read(arr) + + def write_raw(self, b: bytes) -> int: + return self._epout.write(b) + + def __init__(self, dev, cfg, itf, epin, epout): + self._dev = dev + self._cfg = cfg + self._itf = itf + self._epin = epin + self._epout = epout def __enter__(self): return self def __exit__(self, type, value, tb): - pass + import usb.util + + usb.util.release_interface(self._dev, self._itf) + usb.util.dispose_resources(self._dev) + self._epout = None + self._epin = None + self._itf = None + self._cfg = None + self._dev = None + + def __str__(self): + return str(self._dev) class ChardevConn(DevConn): @@ -72,6 +199,30 @@ class ChardevConn(DevConn): class DmjDevice: def __init__(self, conn: DevConn): self._conn = conn + self._buf = array.array('B') + self._buf.fromlist([0]*64) + self._bufpos = 64 + + def read(self, nb: int) -> bytes: + if len(self._buf) - self._bufpos > nb: + rv = bytes(self._buf[self._bufpos:self._bufpos+nb]) + self._bufpos += nb + return rv + + rv = list(self._buf[self._bufpos:]) + + while True: # TODO: timeout? + nrd = self.conn.read_raw(self._buf) + if len(rv) + nrd >= nb: # last read, will have enough now + rv = rv + list(self._buf[nb - len(rv):]) + self._bufpos = nb - len(rv) + return bytes(rv) + else: + rv += list(self._buf) + + # TODO: buffer(/retry) writes as well? + def write(self, b: bytes): + return self.conn.write_raw(b) def __enter__(self): self._conn.__enter__() @@ -82,6 +233,7 @@ class DmjDevice: def connect(conn: Optional[str]) -> Union[DmjDevice, str]: + # TODO: some kind of connection backend registration? shrug if conn is None: attempt = ChardevConn.try_find() if attempt is not None: