initial commit

This commit is contained in:
xenia 2021-05-12 15:41:58 -04:00
parent ba98dfb949
commit e007f7b986
3 changed files with 146 additions and 6 deletions

View File

@ -0,0 +1,137 @@
import argparse
import asyncio
import contextlib
import os
import re
import sys
import termios
import tty
from types import TracebackType
from typing import Optional, Tuple, Type
import serial
import serial_asyncio
MODE_RE = re.compile(r"^([856])([NEOMS])(1|1.5|2)$")
MODE_LOOKUP = {
"bytesize": {
"8": serial.EIGHTBITS, "5": serial.FIVEBITS, "6": serial.SIXBITS
},
"parity": {
"N": serial.PARITY_NONE, "E": serial.PARITY_EVEN, "O": serial.PARITY_ODD,
"M": serial.PARITY_MARK, "S": serial.PARITY_SPACE
},
"stopbits": {
"1": serial.STOPBITS_ONE, "1.5": serial.STOPBITS_ONE_POINT_FIVE, "2": serial.STOPBITS_TWO
}
}
class TtyRaw:
__slots__ = ["isatty", "fd", "settings"]
def __init__(self) -> None:
self.isatty = False
self.fd = None
self.settings = None
def __enter__(self) -> None:
if sys.stdin.isatty():
self.isatty = True
self.fd = sys.stdin.fileno()
self.settings = termios.tcgetattr(self.fd)
tty.setraw(self.fd)
return None
def __exit__(self, exc_type: Optional[Type[BaseException]],
exc_value: Optional[BaseException],
exc_traceback: Optional[TracebackType]) -> bool:
if self.isatty:
termios.tcsetattr(self.fd, termios.TCSADRAIN, self.settings)
async def setup_async() -> Tuple[asyncio.StreamReader, asyncio.StreamWriter]:
loop = asyncio.get_event_loop()
reader = asyncio.StreamReader()
reader_protocol = asyncio.StreamReaderProtocol(reader)
writer_transport, writer_protocol = await loop.connect_write_pipe(
asyncio.streams.FlowControlMixin, sys.stdout.buffer)
writer = asyncio.StreamWriter(writer_transport, writer_protocol, None, loop)
await loop.connect_read_pipe(lambda: reader_protocol, sys.stdin.buffer)
return (reader, writer)
async def megacom(tty: str, baud: int, mode: str) -> None:
(stdin, stdout) = await setup_async()
m = MODE_RE.match(mode)
if m is None:
raise Exception(f"invalid mode: {mode}")
bytesize = MODE_LOOKUP["bytesize"][m.group(1)]
parity = MODE_LOOKUP["parity"][m.group(2)]
stopbits = MODE_LOOKUP["stopbits"][m.group(3)]
(serialin, serialout) = await serial_asyncio.open_serial_connection(
loop=asyncio.get_event_loop(), url=tty, baudrate=baud, bytesize=bytesize, parity=parity,
stopbits=stopbits)
stdout.write(f"megacom connected to {tty}:{baud}:{mode}\r\n".encode())
await stdout.drain()
exit_flag = asyncio.Event()
async def connect_pipe(pin: asyncio.StreamReader, pout: asyncio.StreamWriter,
ctrl: bool = False) -> None:
nonlocal exit_flag
esc: bool = False
while not pin.at_eof():
c: bytes = await pin.read(1)
if len(c) == 0:
continue
if esc:
esc = False
if c == b"\x01":
pout.write(b"\x01")
await pout.drain()
elif c == b"q":
exit_flag.set()
break
if c == b"\x01" and ctrl:
esc = True
continue
pout.write(c)
await pout.drain()
stdin_to_serial = asyncio.create_task(connect_pipe(stdin, serialout, True))
serial_to_stdout = asyncio.create_task(connect_pipe(serialin, stdout))
await exit_flag.wait()
stdin_to_serial.cancel()
serial_to_stdout.cancel()
with contextlib.suppress(asyncio.CancelledError):
await stdin_to_serial
with contextlib.suppress(asyncio.CancelledError):
await serial_to_stdout
stdout.write(b"\r\n\r\nmegacom is exiting\r\n")
await serialout.drain()
serialout.close()
await stdout.drain()
def main() -> None:
parser = argparse.ArgumentParser(description="Alternative console-based UART client")
parser.add_argument("tty", type=str, default="/dev/ttyUSB0", nargs="?",
help="Path to UART device [/dev/ttyUSB0]")
parser.add_argument("-b", "--baud", type=int, default=115200, help="UART baud rate [115200]")
parser.add_argument("-m", "--mode", type=str, default="8N1", help="UART mode string [8N1]")
args = parser.parse_args()
with TtyRaw():
asyncio.run(megacom(args.tty, args.baud, args.mode))

View File

@ -1 +1,2 @@
print("Hello, World!")
import megacom
megacom.main()

View File

@ -2,17 +2,19 @@ from setuptools import setup
setup(name='megacom',
version='0.1',
description='Sample text',
url='https://example.com',
author='John Smith',
author_email='none@example.com',
license='Undecided',
description='Alternative console-based UART client',
url='https://awoo.systems',
author='haskal',
author_email='haskal@awoo.systems',
license='AGPLv3',
packages=['megacom'],
install_requires=[
"pyserial-asyncio"
],
include_package_data=True,
entry_points={
'console_scripts': [
"megacom=megacom:main"
]
},
zip_safe=False)