chardev connection

This commit is contained in:
Triss 2021-07-25 03:43:38 +02:00
parent 1d890a2aa2
commit cd3bea13f2
1 changed files with 70 additions and 18 deletions

View File

@ -5,6 +5,7 @@ import array
import os import os
import re import re
import struct import struct
import sys
import abc import abc
from typing import * from typing import *
@ -16,16 +17,17 @@ from typing import *
class DevConn: class DevConn:
_VER_MIN = 0x0010
_VER_MAX = 0x00ff # ???
pass pass
class UsbConn(DevConn): class UsbConn(DevConn):
_USB_DEFAULT_VID = 0xcafe _USB_DEFAULT_VID = 0xcafe
_USB_DEFAULT_PID = 0x1312 _USB_DEFAULT_PID = 0x1312
_SUBCLASS = 42 _SUBCLASS = 42
_PROTOCOL = 69 _PROTOCOL = 69
_VER_MIN = 0x0010
_VER_MAX = 0x00ff # ???
def _open_dev(dev) -> Union[UsbConn, str]: def _open_dev(dev) -> Union[UsbConn, str]:
import usb, usb.core import usb, usb.core
@ -61,20 +63,21 @@ class UsbConn(DevConn):
# is in use by something else # is in use by something else
epout.write(b'\x00') epout.write(b'\x00')
resp = epin.read(4) resp = epin.read(4)
if resp[0] != 0 or resp[1] != 2:
return "Device does not recognise the 'get protocol version' command"
verno = struct.unpack('<H', resp[2:])[0]
if verno < UsbConn._VER_MIN:
return "Version of device (%04x) too old, must be at least %04x" \
% (hex(verno, USbConn._VER_MIN))
if verno > UsbConn._VER_MAX:
return "Version of device (%04x) too new, must be max. %04x" \
% (hex(verno, USbConn._VER_MAX))
except usb.core.USBError: except usb.core.USBError:
return "Device is busy, already used by something else? (If you use "+\ return "Device is busy, already used by something else? (If you use "+\
"the kernel module, use a character device from /dev instead.)" "the kernel module, use a character device from /dev instead.)"
if len(resp) < 4 or resp[0] != 0 or resp[1] != 2:
return "Device does not recognise the 'get protocol version' command"
verno = struct.unpack('<H', resp[2:])[0]
if verno < DevConn._VER_MIN:
return "Version of device (%04x) too old, must be at least %04x" \
% (hex(verno, DevConn._VER_MIN))
if verno > DevConn._VER_MAX:
return "Version of device (%04x) too new, must be max. %04x" \
% (hex(verno, DevConn._VER_MAX))
return UsbConn(dev, cfg, itf, epout, epin) return UsbConn(dev, cfg, itf, epout, epin)
def try_find() -> Optional[UsbConn]: def try_find() -> Optional[UsbConn]:
@ -86,7 +89,8 @@ class UsbConn(DevConn):
if dev is None or len(dev) != 1: if dev is None or len(dev) != 1:
return None return None
return UsbConn._open_dev(dev[0]) rv = UsbConn._open_dev(dev[0])
return None if isinstance(rv, str) else rv
def is_usbdev_path(conn: str) -> bool: def is_usbdev_path(conn: str) -> bool:
# eg. cafe:1312 # eg. cafe:1312
@ -175,17 +179,65 @@ class UsbConn(DevConn):
class ChardevConn(DevConn): class ChardevConn(DevConn):
_DEVCLASSNAME = "dmj"
def try_find() -> Optional[ChardevConn]: def try_find() -> Optional[ChardevConn]:
if sys.platform != 'linux':
return None return None
opts = glob.glob("/dev/%s-*" % ChardevConn._DEVCLASSNAME)
if len(opts) != 1:
return None
rv = ChardevConn.try_open(opts[0])
return None if isinstance(rv, str) else rv
def is_chardev_path(conn: str) -> bool: def is_chardev_path(conn: str) -> bool:
return None if sys.platform != 'linux':
return False
return re.match('^/dev/%s-[0-9]+$' % ChardevConn._DEVCLASSNAME, conn) is not None
def try_open(conn: str) -> Union[ChardevConn, str]: def try_open(conn: str) -> Union[ChardevConn, str]:
return "Could not open character device '%s'" % conn if sys.platform != 'linux':
return "Chardev connections not available on %s, as these require "+\
"a Linux kernel module" % sys.platform
try:
fd = os.open(conn, os.O_RDWR)
if fd < 0:
raise OSError("Negative file descriptor returned")
except OSError as e:
return "Could not open character device '%s': %s" % \
(conn, e.message if hasattr(e, 'message') else e.strerror)
def __init__(self, fd): os.write(fd, b'\x00')
resp = os.read(fd, 4)
if len(resp) < 4 or resp[0] != 0 or resp[1] != 2:
return "Device does not recognise the 'get protocol version' command"
verno = struct.unpack('<H', resp[2:])[0]
if verno < DevConn._VER_MIN:
return "Version of device (%04x) too old, must be at least %04x" \
% (hex(verno, DevConn._VER_MIN))
if verno > DevConn._VER_MAX:
return "Version of device (%04x) too new, must be max. %04x" \
% (hex(verno, DevConn._VER_MAX))
return ChardevConn(fd)
def read_raw(self, arr) -> int:
blob = os.read(self._fd, len(arr))
for i in range(len(blob)): # TODO: memcpy?
arr[i] = blob[i]
return len(blob)
def write_raw(self, b: bytes) -> int:
return os.write(self._fd, b)
def __init__(self, fd: int):
self._fd = fd self._fd = fd
def __enter__(self): def __enter__(self):