# writeups/2023/misc/pcap.py
#
# 107 lines, 2.9 KiB, Python

import asyncio
import ctypes
import fcntl
import logging
import signal
import socket
import struct
# Module-level logger; every progress message in this file goes through it.
logger = logging.getLogger(__name__)

# Protocol number handed to both socket() and bind() for the capture socket.
ETH_P_IP = 0x0800

# Bit OR-ed into ifreq.ifr_flags to request promiscuous mode.
IFF_PROMISC = 0x100

# ioctl request numbers, listed in numeric order.
SIOCGSTAMP = 0x8906    # fills a timeval for the packet just received
SIOCGIFFLAGS = 0x8913  # reads the interface flag word into an ifreq
SIOCSIFFLAGS = 0x8914  # writes the interface flag word from an ifreq
class ifreq(ctypes.Structure):
    """Interface-request record passed to the SIOCGIFFLAGS / SIOCSIFFLAGS ioctls.

    Only the two members this script touches are declared: a 16-byte
    interface-name buffer followed by a short holding the flag word.
    NOTE(review): presumably meant to match the start of the kernel's
    ``struct ifreq``; the declared layout is shorter than the full C struct.
    """

    _fields_ = [
        # Interface name; assigned from ``devname.encode()``.
        ("ifr_ifrn", ctypes.c_char * 16),
        # Flag word; IFF_PROMISC is OR-ed into it.
        ("ifr_flags", ctypes.c_short),
    ]
class timeval(ctypes.Structure):
    """Seconds / microseconds pair filled in by the SIOCGSTAMP ioctl.

    Both members are declared as C ``long``.
    """

    _fields_ = [
        ("tv_sec", ctypes.c_long),   # whole seconds
        ("tv_usec", ctypes.c_long),  # microsecond remainder
    ]
# Values written into the pcap global header by do_pcap().
PCAP_MAGIC_MICRO = 0xA1B2C3D4    # file magic; record timestamps carry microseconds
PCAP_MAJ, PCAP_MIN = 2, 4        # file-format version (major, minor)
PCAP_SNAPLEN = 2048              # packets are truncated to this many bytes
LINKTYPE_ETHERNET = 1            # link-layer type field of the header
def _enable_promisc(sock, devname):
    """Set IFF_PROMISC on *devname*; return True only if this call turned it on."""
    ifr = ifreq()
    ifr.ifr_ifrn = devname.encode()
    fcntl.ioctl(sock.fileno(), SIOCGIFFLAGS, ifr)
    if ifr.ifr_flags & IFF_PROMISC:
        # Already promiscuous before we started: leave it alone so that the
        # cleanup path does not switch off something we did not switch on.
        return False
    ifr.ifr_flags |= IFF_PROMISC
    fcntl.ioctl(sock.fileno(), SIOCSIFFLAGS, ifr)
    return True


def _disable_promisc(sock, devname):
    """Clear IFF_PROMISC on *devname*, re-reading the current flags first."""
    ifr = ifreq()
    ifr.ifr_ifrn = devname.encode()
    fcntl.ioctl(sock.fileno(), SIOCGIFFLAGS, ifr)
    ifr.ifr_flags &= ~IFF_PROMISC
    fcntl.ioctl(sock.fileno(), SIOCSIFFLAGS, ifr)


class _PcapRecvProtocol(asyncio.DatagramProtocol):
    """Datagram protocol that appends every received packet to a pcap file."""

    def __init__(self, sock, outfile, closed_evt):
        self.sock = sock              # capture socket, needed for SIOCGSTAMP
        self.outfile = outfile        # binary file positioned after the pcap header
        self.closed_evt = closed_evt  # set once the transport reports connection_lost
        self.transport = None

    def connection_made(self, transport):
        logger.info("conn made!")
        self.transport = transport

    def datagram_received(self, data, addr):
        # Fetch the timestamp for the packet that was just read from the socket.
        tv = timeval()
        fcntl.ioctl(self.sock.fileno(), SIOCGSTAMP, tv)
        logger.info("recv packet")
        trunc = data[:PCAP_SNAPLEN]
        # Per-packet record header: ts_sec, ts_usec, captured length, original length.
        self.outfile.write(
            struct.pack("<IIII", tv.tv_sec, tv.tv_usec, len(trunc), len(data))
        )
        self.outfile.write(trunc)
        # Flush per packet so the file is complete up to the last packet seen.
        self.outfile.flush()

    def error_received(self, exc):
        # Without this method a socket error reported by the transport would
        # surface as an AttributeError instead of being logged.
        logger.warning("socket error during capture: %s", exc)

    def connection_lost(self, exc):
        self.closed_evt.set()


async def do_pcap(devname: str = "wlp2s0", pcapname: str = "capture.pcap") -> None:
    """Capture packets from *devname* into the pcap file *pcapname* until SIGINT.

    Opens a non-blocking PF_PACKET/SOCK_RAW socket bound to the device, puts
    the interface into promiscuous mode, writes a pcap global header, and then
    appends one record per received packet until SIGINT (Ctrl-C) arrives.

    On every exit path, including errors, the promiscuous flag is cleared
    again if this call set it, the SIGINT handler is removed, and the socket
    and output file are closed.

    Args:
        devname: Name of the network interface to capture on.
        pcapname: Path of the pcap file to create (overwritten if it exists).

    Raises:
        RuntimeError: If the running event loop is not selector-based.
        OSError: If the socket, the ioctls, or the output file fail.
    """
    loop = asyncio.get_running_loop()
    if not isinstance(loop, asyncio.selector_events.BaseSelectorEventLoop):
        raise RuntimeError("you gotta run it on linux")

    logger.info("opening packet capture")
    sock = socket.socket(socket.PF_PACKET, socket.SOCK_RAW, socket.htons(ETH_P_IP))

    outfile = None
    transport = None
    conn_evt = asyncio.Event()
    promisc_changed = False
    sigint_installed = False
    try:
        sock.setblocking(False)
        # NO htons on this call (unlike the socket() call above).
        sock.bind((devname, ETH_P_IP))

        logger.info("setting promisc mode")
        promisc_changed = _enable_promisc(sock, devname)

        logger.info("opening output file")
        outfile = open(pcapname, "wb")
        # Global header: magic, version major/minor, two zeroed fields,
        # snapshot length, link-layer type.
        outfile.write(
            struct.pack(
                "<IHHIIII",
                PCAP_MAGIC_MICRO,
                PCAP_MAJ,
                PCAP_MIN,
                0,
                0,
                PCAP_SNAPLEN,
                LINKTYPE_ETHERNET,
            )
        )
        outfile.flush()

        logger.info("loading socket into asyncio")
        protocol = _PcapRecvProtocol(sock, outfile, conn_evt)
        waiter = loop.create_future()
        # HACK: private selector-loop API, which is why the loop type is
        # checked above. NOTE(review): presumably used because the public
        # create_datagram_endpoint() does not accept a SOCK_RAW socket - confirm.
        transport = loop._make_datagram_transport(sock, protocol, waiter=waiter)
        await waiter

        run_capture = asyncio.Event()

        def shutdown_capture():
            logger.info("ctrl-c")
            run_capture.set()

        logger.info("running capture loop")
        loop.add_signal_handler(signal.SIGINT, shutdown_capture)
        sigint_installed = True
        await run_capture.wait()
        logger.info("closing")
    finally:
        if sigint_installed:
            loop.remove_signal_handler(signal.SIGINT)
        if promisc_changed:
            # Must run before the transport is closed: closing it also closes
            # the socket whose descriptor the ioctl needs.
            try:
                _disable_promisc(sock, devname)
            except OSError as exc:
                logger.warning("could not clear promisc mode on %s: %s", devname, exc)
        try:
            if transport is not None:
                transport.close()
                await conn_evt.wait()
        finally:
            # close() is safe to call even if the transport already closed it.
            sock.close()
            if outfile is not None:
                outfile.close()
    logger.info("done")
if __name__ == "__main__":
    # Script entry point: emit INFO-level progress messages, then run one
    # capture with do_pcap()'s default device and output file.
    logging.basicConfig(level=logging.INFO)
    capture = do_pcap()
    asyncio.run(capture)