megacom/megacom/__init__.py

239 lines
7.7 KiB
Python

import argparse
import asyncio
import contextlib
import errno
import re
import sys
import termios
import tty
from types import TracebackType
from typing import Any, List, Optional, Tuple, Type
from typing_extensions import Literal
import serial
import serial_asyncio
# UART mode string: <data bits><parity><stop bits>, e.g. "8N1" or "8E1.5".
# The dot in "1.5" is escaped so that only a literal "1.5" is accepted; an
# unescaped "." matches any character and would let strings such as "8N1x5"
# through, which then have no entry in the stop-bits lookup table.
MODE_RE = re.compile(r"^([856])([NEOMS])(1|1\.5|2)$")
# Maps each field of a mode string (as captured by MODE_RE) to the matching
# pyserial constant. The outer keys are the keyword names used when the
# values are looked up for opening the port.
MODE_LOOKUP = {
    # group 1 of MODE_RE: number of data bits
    "bytesize": {
        "8": serial.EIGHTBITS,
        "5": serial.FIVEBITS,
        "6": serial.SIXBITS,
    },
    # group 2 of MODE_RE: parity letter
    "parity": {
        "N": serial.PARITY_NONE,
        "E": serial.PARITY_EVEN,
        "O": serial.PARITY_ODD,
        "M": serial.PARITY_MARK,
        "S": serial.PARITY_SPACE,
    },
    # group 3 of MODE_RE: number of stop bits
    "stopbits": {
        "1": serial.STOPBITS_ONE,
        "1.5": serial.STOPBITS_ONE_POINT_FIVE,
        "2": serial.STOPBITS_TWO,
    },
}
class TtyRaw:
    """Context manager that keeps stdin in raw mode for the duration of a block.

    When stdin is not a terminal, entering and leaving the context does
    nothing. Exceptions raised inside the block are never suppressed.
    """

    __slots__ = ["isatty", "fd", "settings"]

    # Set to True by __enter__ when stdin turned out to be a terminal.
    isatty: bool
    # File descriptor of stdin; only meaningful when ``isatty`` is True.
    fd: int
    # Terminal attributes captured by termios.tcgetattr before going raw.
    settings: List[Any]

    def __init__(self) -> None:
        self.isatty, self.fd, self.settings = False, 0, []

    def __enter__(self) -> None:
        """Remember the current terminal attributes, then switch to raw mode."""
        stdin = sys.stdin
        if not stdin.isatty():
            return None

        self.isatty = True
        self.fd = stdin.fileno()
        # Capture the attributes first so __exit__ can put them back.
        self.settings = termios.tcgetattr(self.fd)
        tty.setraw(self.fd)
        return None

    def __exit__(self, exc_type: Optional[Type[BaseException]],
                 exc_value: Optional[BaseException],
                 exc_traceback: Optional[TracebackType]) -> Literal[False]:
        """Restore the attributes saved by ``__enter__`` (if any)."""
        if self.isatty:
            termios.tcsetattr(self.fd, termios.TCSADRAIN, self.settings)
        # False: let any exception from the with-body propagate.
        return False
async def setup_async() -> Tuple[asyncio.StreamReader, asyncio.StreamWriter]:
    """Wrap the process's stdin and stdout in asyncio streams.

    Returns:
        A ``(reader, writer)`` tuple: the reader is fed from
        ``sys.stdin.buffer`` and the writer writes to ``sys.stdout.buffer``.
    """
    event_loop = asyncio.get_event_loop()

    # Output side: attach stdout as a write pipe and wrap it in a StreamWriter.
    out_transport, out_protocol = await event_loop.connect_write_pipe(
        asyncio.streams.FlowControlMixin, sys.stdout.buffer)
    stdout_writer = asyncio.StreamWriter(out_transport, out_protocol, None, event_loop)

    # Input side: attach stdin as a read pipe feeding a StreamReader.
    stdin_reader = asyncio.StreamReader()
    in_protocol = asyncio.StreamReaderProtocol(stdin_reader)
    await event_loop.connect_read_pipe(lambda: in_protocol, sys.stdin.buffer)

    return (stdin_reader, stdout_writer)
# Escape prefix for megacom key commands: CTRL-A
ESC_CHAR = b"\x01"


class KeycodeHandler:
    """Filter typed bytes, interpreting two-byte ``ESC_CHAR`` sequences.

    Recognised sequences:
      * ``ESC_CHAR`` then ``q``        -- set ``exit_flag``; nothing is passed on.
      * ``ESC_CHAR`` then ``ESC_CHAR`` -- pass a single ``ESC_CHAR`` on.
      * ``ESC_CHAR`` then anything else -- dropped.
    """

    __slots__ = ["exit_flag", "esc"]

    # Set once the quit sequence has been seen.
    exit_flag: asyncio.Event
    # True while the previous byte was an (unconsumed) ESC_CHAR.
    esc: bool

    def __init__(self) -> None:
        self.esc = False
        self.exit_flag = asyncio.Event()

    def process(self, byte: bytes) -> bytes:
        """Feed one input byte through the filter.

        Args:
            byte: the byte that was read.

        Returns:
            The bytes to pass on: ``byte`` itself for ordinary input, or
            ``b""`` when the byte was consumed as part of an escape sequence.
        """
        if not self.esc:
            if byte != ESC_CHAR:
                return byte
            # Start of an escape sequence: hold the byte back.
            self.esc = True
            return b""

        # Second byte of an escape sequence; the sequence ends here either way.
        self.esc = False
        if byte == b"q":
            self.exit_flag.set()
        return ESC_CHAR if byte == ESC_CHAR else b""
async def megacom(tty: str, baud: int, mode: str) -> None:
    """Run an interactive session between stdin/stdout and a serial port.

    Args:
        tty: path/URL of the serial device to open.
        baud: baud rate.
        mode: mode string such as ``"8N1"`` (data bits, parity, stop bits).

    Raises:
        ValueError: if ``mode`` is not a recognised mode string.
    """
    # Validate the mode before touching stdin/stdout so that a bad argument
    # fails without any side effects on the terminal streams.
    m = MODE_RE.match(mode)
    if m is None:
        raise ValueError(f"invalid mode: {mode}")
    try:
        bytesize = MODE_LOOKUP["bytesize"][m.group(1)]
        parity = MODE_LOOKUP["parity"][m.group(2)]
        stopbits = MODE_LOOKUP["stopbits"][m.group(3)]
    except KeyError:
        # The pattern accepted something the lookup tables do not know about.
        raise ValueError(f"invalid mode: {mode}") from None

    (stdin, stdout) = await setup_async()
    # We are inside a coroutine, so the running loop is the one to use.
    loop = asyncio.get_running_loop()
    keycodes = KeycodeHandler()
    await megacom_main(stdin, stdout, tty, baud, bytesize, parity, stopbits, loop, keycodes)
# Seconds to wait between attempts to open a port that does not exist yet.
_REOPEN_DELAY = 2.0
# Maximum number of bytes moved per read in the serial -> stdout direction.
_READ_CHUNK = 4096


async def _open_port(stdin: asyncio.StreamReader, stdout: asyncio.StreamWriter, tty: str,
                     baud: int, bytesize: Any, parity: Any, stopbits: Any,
                     loop: asyncio.AbstractEventLoop, keycodes: KeycodeHandler
                     ) -> Optional[Tuple[asyncio.StreamReader, asyncio.StreamWriter]]:
    """Open the serial port, polling while the device node does not exist.

    Returns the ``(reader, writer)`` pair for the port, or ``None`` when the
    port could not be opened or the user entered the quit sequence while
    waiting (a message has been written to ``stdout`` in both cases).
    """
    announced_wait = False
    while True:
        try:
            return await serial_asyncio.open_serial_connection(
                loop=loop, url=tty, baudrate=baud, bytesize=bytesize,
                parity=parity, stopbits=stopbits)
        except serial.SerialException as e:
            if e.errno != errno.ENOENT:
                # permanent failure
                stdout.write(f"failed to open port because: {e}\r\n".encode())
                await stdout.drain()
                return None
            # the device might simply not be plugged in yet.. just wait
            if not announced_wait:
                announced_wait = True
                stdout.write(f"waiting for {tty} to become available...\r\n".encode())
                await stdout.drain()
        except Exception as e:
            # permanent failure
            stdout.write(f"failed to open port because: {e}\r\n".encode())
            await stdout.drain()
            return None

        # Wait a bit before the next attempt, still watching stdin so the
        # quit sequence works. Anything else typed meanwhile is discarded.
        deadline = loop.time() + _REOPEN_DELAY
        while not keycodes.exit_flag.is_set():
            remaining = deadline - loop.time()
            if remaining <= 0:
                break
            try:
                byte = await asyncio.wait_for(stdin.read(1), timeout=remaining)
            except asyncio.TimeoutError:
                break
            if not byte:
                # stdin is at EOF, so read() returns immediately from now on;
                # sleep out the rest of the delay instead of spinning.
                await asyncio.sleep(max(0.0, deadline - loop.time()))
                break
            keycodes.process(byte)

        if keycodes.exit_flag.is_set():
            stdout.write(b"connection cancelled\r\n")
            await stdout.drain()
            return None


async def _forward(pin: asyncio.StreamReader, pout: asyncio.StreamWriter,
                   keycodes: Optional[KeycodeHandler] = None) -> None:
    """Copy bytes from ``pin`` to ``pout`` until ``pin`` reaches EOF.

    When ``keycodes`` is given, input is read one byte at a time and passed
    through ``keycodes.process`` so escape sequences are interpreted; without
    it the data is forwarded untouched.
    """
    # The escape filter works on single bytes; raw forwarding can batch.
    chunk_size = 1 if keycodes is not None else _READ_CHUNK
    while not pin.at_eof():
        data: bytes = await pin.read(chunk_size)
        if data and keycodes is not None:
            data = keycodes.process(data)
        if not data:
            continue
        pout.write(data)
        await pout.drain()


async def megacom_main(stdin: asyncio.StreamReader, stdout: asyncio.StreamWriter, tty: str,
                       baud: int, bytesize: Any, parity: Any, stopbits: Any,
                       loop: asyncio.AbstractEventLoop, keycodes: KeycodeHandler) -> None:
    """Bridge stdin/stdout and the serial port until the session ends.

    Opens ``tty`` (waiting for it to appear if necessary), then forwards
    stdin to the port and the port to stdout. The session ends when the quit
    sequence is entered, when either stream reaches EOF, or when a forwarding
    task fails. If the failure is a ``serial.SerialException`` the port is
    reopened and the session restarts; other failures are re-raised.

    Args:
        stdin: stream supplying the user's keystrokes.
        stdout: stream receiving port output and status messages.
        tty: path/URL of the serial device.
        baud: baud rate.
        bytesize: pyserial byte-size constant.
        parity: pyserial parity constant.
        stopbits: pyserial stop-bits constant.
        loop: event loop used for opening the port and for timing.
        keycodes: escape-sequence filter applied to stdin only.
    """
    # Reconnects are handled by looping rather than recursing, so a port
    # that drops repeatedly cannot grow the call stack.
    while True:
        connection = await _open_port(stdin, stdout, tty, baud, bytesize, parity, stopbits,
                                      loop, keycodes)
        if connection is None:
            return
        (serialin, serialout) = connection

        stdout.write(f"megacom connected to {tty}\r\n".encode())
        await stdout.drain()

        # Only the user's keystrokes go through the escape filter; bytes sent
        # by the device must reach stdout untouched.
        stdin_to_serial: asyncio.Task = asyncio.create_task(
            _forward(stdin, serialout, keycodes))
        serial_to_stdout: asyncio.Task = asyncio.create_task(_forward(serialin, stdout))
        time_to_exit: asyncio.Task = asyncio.create_task(keycodes.exit_flag.wait())

        await asyncio.wait([time_to_exit, stdin_to_serial, serial_to_stdout],
                           return_when=asyncio.FIRST_COMPLETED)

        # Inspect the first finished task, in priority order, to decide
        # whether this was a quit, an EOF, or an error worth a reconnect.
        do_retry = False
        for task in (time_to_exit, stdin_to_serial, serial_to_stdout):
            if not task.done():
                continue
            exc = task.exception()
            if exc is not None:
                stdout.write(f"\r\n\r\nmegacom encountered error: {exc}\r\n".encode())
                do_retry = isinstance(exc, serial.SerialException)
            break

        # Stop everything that is still running. The exit watcher is included
        # so a reconnect does not leave a stale task behind.
        time_to_exit.cancel()
        stdin_to_serial.cancel()
        serial_to_stdout.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await time_to_exit
        for task in (stdin_to_serial, serial_to_stdout):
            with contextlib.suppress(asyncio.CancelledError, serial.SerialException):
                await task

        if not do_retry:
            stdout.write(b"\r\n\r\nmegacom is exiting\r\n")
        # Best-effort flush and close: the port may already be gone.
        with contextlib.suppress(serial.SerialException):
            await serialout.drain()
        with contextlib.suppress(serial.SerialException):
            serialout.close()
        await stdout.drain()

        if not do_retry:
            return
def main(argv: Optional[List[str]] = None) -> None:
    """Command-line entry point: parse arguments and run the UART session.

    Args:
        argv: argument list to parse; ``None`` (the default) makes argparse
            read ``sys.argv[1:]``.
    """
    parser = argparse.ArgumentParser(prog="megacom",
                                     description="Alternative console-based UART client")
    parser.add_argument("tty", type=str, default="/dev/ttyUSB0", nargs="?",
                        help="Path to UART device [/dev/ttyUSB0]")
    parser.add_argument("-b", "--baud", type=int, default=115200,
                        help="UART baud rate [115200]")
    parser.add_argument("-m", "--mode", type=str, default="8N1",
                        help="UART mode string [8N1]")
    args = parser.parse_args(argv)

    # Reject a malformed mode here, with a normal usage message, instead of
    # letting it surface as a traceback after the terminal is in raw mode.
    if MODE_RE.match(args.mode) is None:
        parser.error(f"invalid mode: {args.mode}")

    with TtyRaw():
        asyncio.run(megacom(args.tty, args.baud, args.mode))