diff --git a/host/dmctl.py b/host/dmctl.py index 3df9e75..3333ea3 100755 --- a/host/dmctl.py +++ b/host/dmctl.py @@ -129,9 +129,9 @@ def main(argv: List[str]) -> int: getmodeinfo = subcmds.add_parser("get-mode-info", help="Shows mode info."+\ " A mode can optionally be specified, "+\ "default is the current mode.") - getmodeinfo.add_argument('mode', default=None, nargs='?', + getmodeinfo.add_argument('mode', default=None, nargs='?', #type=int, help="Mode to get info of. Defaults to the " + \ - "current mode, a single asterisk (*) means all.") + "current mode, 'all' means all modes.") setmode = subcmds.add_parser("set-mode", help="Set the device mode") setmode.add_argument('mode', type=int, help="Mode to switch to, required.") diff --git a/host/dmctl/commands.py b/host/dmctl/commands.py index 807e7dc..36ce735 100644 --- a/host/dmctl/commands.py +++ b/host/dmctl/commands.py @@ -4,54 +4,105 @@ from typing import * from .protocol import * -def get_device_info(conn: DmjDevice) -> int: +def get_device_info(dev: DmjDevice) -> int: + print("%s: protocol version: %02x.%02x, currently in mode %d (%s)" % \ + (dev.infotext, dev.protocol_version >> 8, dev.protocol_version & 0xff, + dev.current_mode, dev.mode_info[dev.current_mode].infotext) + ) + print("available modes: %s" % ', '.join(str(x) for x in dev.mode_info.keys())) + #for mi, mv in dev.mode_info.items(): + # print("\t% 2d: '%s' version %02x.%02x with %sfeatures %s" % \ + # (mi, mv.infotext, mv.version >> 8, mv.version & 0xff, + # ("no " if len(mv.features) == 0 else ""), + # ', '.join(str(x) for x in mv.features)) # TODO: better features display? + # ) + return 0 -def get_mode_info(conn: DmjDevice, mode: Optional[int]) -> int: +def get_mode_info(dev: DmjDevice, mode: Optional[str]) -> int: + def try_parse(s: str): + try: return int(s) + except ValueError: return None + def is_int(s: str): + try: + int(s) + return True + except ValueError: return None + + if mode is None: + mode = dev.current_mode + + if mode == "all" or (is_int(mode) and int(mode) < 0): + for mi, mv in dev.mode_info.items(): + print("mode % 2d: %s: version %02x.%02x with %sfeatures %s" % \ + (mi, mv.infotext, mv.version >> 8, mv.version & 0xff, + ("no " if len(mv.features) == 0 else ""), + ', '.join(str(x) for x in mv.features)) # TODO: better features display? + ) + elif is_int(mode): + mode = int(mode) + if mode in dev.mode_info: + mv = dev.mode_info[mode] + print("mode %d: %s: version %02x.%02x with %sfeatures %s" % \ + (mode, mv.infotext, mv.version >> 8, mv.version & 0xff, + ("no " if len(mv.features) == 0 else ""), + ', '.join(str(x) for x in list(mv.features))) # TODO: better features display? + ) + return 0 + else: + print("No mode %d available" % mode) + return 1 + else: + print("Invalid mode '%s'" % mode) + return 1 + + +def set_mode(dev: DmjDevice, mode: int) -> int: + try: + dev.set_mode(mode) + return 0 + except Exception as e: + print(str(e)) + return 1 + + +# --- + + +def uart_hw_flowctl_get(dev: DmjDevice) -> int: return 0 -def set_mode(conn: DmjDevice, mode: int) -> int: +def uart_hw_flowctl_set(dev: DmjDevice, v: bool) -> int: return 0 # --- -def uart_hw_flowctl_get(conn: DmjDevice) -> int: +def tempsensor_get(dev: DmjDevice) -> int: return 0 -def uart_hw_flowctl_set(conn: DmjDevice, v: bool) -> int: +def tempsensor_set(dev, v: int) -> int: return 0 # --- -def tempsensor_get(conn: DmjDevice) -> int: - return 0 - - -def tempsensor_set(conn, v: int) -> int: +def jtag_scan(dev: DmjDevice) -> int: return 0 # --- -def jtag_scan(conn) -> int: +def sump_overclock_get(dev: DmjDevice) -> int: return 0 -# --- - - -def sump_overclock_get(conn) -> int: - return 0 - - -def sump_overclock_set(conn, v: int) -> int: +def sump_overclock_set(dev: DmjDevice, v: int) -> int: return 0 diff --git a/host/dmctl/connection.py b/host/dmctl/connection.py index f5adebc..7c34e1f 100644 --- a/host/dmctl/connection.py +++ b/host/dmctl/connection.py @@ -1,6 +1,7 @@ from __future__ import annotations +import glob import os import re import struct diff --git a/host/dmctl/protocol.py b/host/dmctl/protocol.py index 57f6f70..0bab62b 100644 --- a/host/dmctl/protocol.py +++ b/host/dmctl/protocol.py @@ -1,5 +1,6 @@ import array +import struct from typing import * @@ -16,10 +17,10 @@ STAT_BADARG = 0x04 def check_statpl(stat, pl, defmsg, minl=None, maxl=None): statmsgs = { STAT_OK: "ok", - STAT_ILLCMD = "Illegal/invalid/unknown command", - STAT_BADMODE = "Bad mode for this command", - STAT_NOSUCHMODE = "No such mode exists or is available", - STAT_BADARG = "Bad argument" + STAT_ILLCMD: "Illegal/invalid/unknown command", + STAT_BADMODE: "Bad mode for this command", + STAT_NOSUCHMODE: "No such mode exists or is available", + STAT_BADARG: "Bad argument" } if stat != STAT_OK: @@ -51,31 +52,33 @@ class DmjDevice: self._buf = array.array('B') self._buf.fromlist([0]*64) self._bufpos = 64 + self._buffill = 0 def read(self, nb: int) -> bytes: - if len(self._buf) - self._bufpos > nb: + #print("==> buffill=%d bufpos=%d nb=%d"%(self._buffill, self._bufpos, nb)) + if self._buffill - self._bufpos >= nb: rv = bytes(self._buf[self._bufpos:self._bufpos+nb]) self._bufpos += nb - print("==> return quick", rv) + #print("==> return quick bufpos=%d"%self._bufpos, rv) return rv - rv = list(self._buf[self._bufpos:]) + rv = list(self._buf[self._bufpos:self._buffill]) while True: # TODO: timeout? - nrd = self.conn.read_raw(self._buf) - print("==> read raw", repr(self._buf[:nrd])) + nrd = self._conn.read_raw(self._buf) + self._buffill = nrd + #print("==> read raw", repr(self._buf[:nrd])) if len(rv) + nrd >= nb: # last read, will have enough now - bytes(rv = rv + list(self._buf[nb - len(rv):])) - self._bufpos = nb - len(rv) - print("==> return", rv) + rvold = len(rv) + nadd = nb - len(rv) + rv = bytes(rv + list(self._buf[:nadd])) + self._bufpos = nadd + #print("==> bufpos=%d rv=%d->%d nadd=%d nb=%d" % (self._bufpos, rvold, len(rv), nadd, nb)) + #print("==> bufpos=%d return"%self._bufpos, rv) return rv else: rv += list(self._buf) - def write(self, b: bytes): - print("==> write", b) - return self._conn.write(b) - def read_resp(self) -> Tuple[int, bytes]: resplen = self.read(2) resp = resplen[0] @@ -90,12 +93,13 @@ class DmjDevice: plen |= self.read(1) << 14 bs = self.read(plen) - print("==> got resp %d res %s" % (resp, repr(bs))) + #print("==> got resp %d res %s" % (resp, repr(bs))) return (resp, bs) # TODO: buffer(/retry) writes as well? def write(self, b: bytes): - return self.conn.write_raw(b) + #print("==> write raw", b) + return self._conn.write_raw(b) def __enter__(self): self._conn.__enter__() @@ -115,7 +119,7 @@ class DmjDevice: return struct.unpack(' Set[int]: - self.write('b\x01') + self.write(b'\x01') stat, pl = self.read_resp() check_statpl(stat, pl, "get modes", 2, 2) @@ -124,7 +128,7 @@ class DmjDevice: return { i for i in range(1,16) if (modemap & (1< int: - self.write('b\x02') + self.write(b'\x02') stat, pl = self.read_resp() check_statpl(stat, pl, "get mode", 1, 1) @@ -139,7 +143,7 @@ class DmjDevice: check_statpl(stat, pl, "set mode", 0, 0) def get_info_text(self) -> str: - self.write('\x04') + self.write(b'\x04') stat, pl = self.read_resp() check_statpl(stat, pl, "get info string", 1) @@ -148,7 +152,7 @@ class DmjDevice: # common mode commands def get_mode_name(self, mode: int) -> str: - cmd = b'\x00' + cmd = bytearray(b'\x00') cmd[0] |= mode << 4 self.write(cmd) stat, pl = self.read_resp() @@ -157,7 +161,7 @@ class DmjDevice: return pl.rstrip(b'\0').decode('utf-8') def get_mode_version(self, mode: int) -> str: - cmd = b'\x01' + cmd = bytearray(b'\x01') cmd[0] |= mode << 4 self.write(cmd) stat, pl = self.read_resp() @@ -166,13 +170,13 @@ class DmjDevice: return struct.unpack(' Set[int]: - cmd = b'\x02' + cmd = bytearray(b'\x02') cmd[0] |= mode << 4 self.write(cmd) stat, pl = self.read_resp() check_statpl(stat, pl, "get mode features", 1, 1) - return pl[0] + return { i for i in range(1, 8) if (pl[0] & (1< Tuple[Optional[int], Optional[int]] + def m1_tempsensor_i2cemul_set(self, addr: Optional[int]) -> Tuple[Optional[int], Optional[int]]: cmd = b'\x15\x01\xff' cmd[2] = 0xff if addr is None else addr self.write(cmd)