8.9 KiB
packet capture from scratch
we're going to write a basic packet capture program for linux from scratch, in python, using the direct kernel interface instead of a library like libpcap. also, it's going to attach to asyncio
allowing for async packet capture that can be integrated into other I/O without blocking
(for the sake of simplicity, the file I/O is not going to be asyncio
-based, but you can use aiofiles
instead of the standard file interface if you want)
libpcap
, the standard packet capture library on linux is absolutely massive and contains a lot of code, but we can get a basic capture system working by simplifying the scope
- no legacy compatibility. we're only targeting the latest linux kernel
- no BPF (for now). BPF (Berkeley Packet Filter) is a VM that allows filtering captured packets in the kernel before they get delivered to our capture application. unfortunately BPF is Complicated so we're skipping it and just receiving every packet with no filter
how does packet capture work in linux
a lot of linux kernel interfaces actually aren't magic (unless it's netlink, or weird device specific ioctls, or DRI, or..... ignore all that stuff for now)
for example, besides being allowed to make standard TCP and UDP (ie, layer 3) sockets, using the kernel syscalls like socket
, setsockopt
, bind
, etc, linux actually allows you to directly create layer 2 sockets, which rather than being filtered by port number, are filtered by ethertype
in order to do this, you need to possess the CAP_NET_RAW
capability (and let's add CAP_NET_ADMIN
too, because we'll eventually need to set promiscuous mode)
systemd-run -tS --uid=$UID -pAmbientCapabilities="CAP_NET_RAW CAP_NET_ADMIN"
(this starts a shell as your normal user, but with the additional capabilities available. it's useful to avoid running things as root needlessly)
ok now, in python,
# the ethertype for ipv4
ETH_P_IP = 0x0800
sock = socket.socket(socket.PF_PACKET, socket.SOCK_RAW, socket.htons(ETH_P_IP))
by default, PF_PACKET
sockets are set up to capture from all interfaces, but we can set a specific interface using the bind
syscall. note that setsockopt
with SO_BINDTODEVICE
will not work -- see socket(7)
devname = "eth0"
sock.bind((devname, ETH_P_IP))
also note that in this call, htons
is not required (even though it was for socket
). python is just weird. don't ask too many questions
promiscuous mode
now we want to set promiscuous mode, so we can capture all packets we get instead of just ones addressed to us
side note: normal ethernet switches make this kind of nonfunctional by default. you'll either want a much more sophisticated switch (ie managed, where you can explicitly set your port to mirror all other traffic), or a much less sophisticated switch (ie, not a switch, one of those old school 10/100 hubs, which are basic enough to just mirror all traffic on all ports anyway). and i'm pretty sure if you're using an internal bridge interface rather than a physical connection promiscuous mode doesn't actually matter, but i haven't tested this
so for the record
class ifreq(ctypes.Structure):
_fields_ = [("ifr_ifrn", ctypes.c_char * 16),
("ifr_flags", ctypes.c_short)]
IFF_PROMISC = 0x100
SIOCGIFFLAGS = 0x8913
SIOCSIFFLAGS = 0x8914
we love a little tiny bit of boilerplate because this isn't in the python stdlib. now we can just get the cool flags, add the one we want, and set it back
ifr = ifreq()
ifr.ifr_ifrn = devname.encode()
fcntl.ioctl(sock.fileno(), SIOCGIFFLAGS, ifr)
ifr.ifr_flags |= IFF_PROMISC
fcntl.ioctl(sock.fileno(), SIOCSIFFLAGS, ifr)
no blocking allowed
so at this point we're ready to capture. but there's a bit of an issue...... which is that blocking I/O is kinda for losers. we want the cool cooperative multitasking stuff, so it would be nice if there were a way to lift PF_PACKET
sockets into asyncio
now asyncio
has facilities for packet-based sockets already -- normal UDP stuff. the issue is, the UDP stuff expects the socket to be of type AF_INET
/ SOCK_DGRAM
, and it checks for this specifically. luckily after digging through cpython i quickly identified the right cool internal function to call the bypasses the checks. here's how you do that
first we need a protocol, just like if we were doing UDP
loop = asyncio.get_event_loop()
if not isinstance(loop, asyncio.selector_events.BaseSelectorEventLoop):
# windows is unsupported, and correspondingly non-selector event loops
# don't have the cool internal function we need :(
raise Exception("you gotta run it on linux")
class PcapRecvProtocol:
def __init__(self, sock):
self.sock = sock
def connection_made(self, transport):
self.transport = transport
def datagram_received(self, data, addr):
print("got packet", data)
sock.setblocking(False)
protocol = PcapRecvProtocol(sock)
waiter = loop.create_future()
transport = loop._make_datagram_transport(sock, protocol, waiter=waiter)
await waiter
and..... that's pretty much it. this should print out (layer 2 level) packets to stdout
writing a pcap file
ok so you might be thinking, writing packets to stdout is cool and all but really it would be nice to put them in like a normal pcap file
it turns out pcap files (not pcapng, i have no idea how those work) are really simple actually
they consist of a file header, and then a sequence of captured packet headers and contents
here's the file header
magic_number: u32
major_version: u16
minor_version: u16
reserved1: u32
reserved2: u32
snaplen: u32
linktype: u32
there are two magic numbers, one for if the file timestamps are in microseconds and one for nanoseconds. micoseconds are fine for us, so we use magic number 0xA1B2C3D4
. the current version in major 2, minor 4. "snaplen" is the maximum length of a packet: if packets are larger they get truncated. 2048 is more than enough to cover standard packet MSS. finally linktype (and some other stuff in a bitfield we also don't really care about -- if you want the full details you can read the actual spec [or well like this is a draft of it but whatever]) which we set to 1 for ethernet
PCAP_MAGIC_MICRO = 0xA1B2C3D4
PCAP_MAJ = 2
PCAP_MIN = 4
PCAP_SNAPLEN = 2048
LINKTYPE_ETHERNET = 1
pcapname = "capture.pcap"
outfile = open(pcapname, "wb")
outfile.write(struct.pack("<IHHIIII", PCAP_MAGIC_MICRO, PCAP_MAJ, PCAP_MIN,
0, 0, PCAP_SNAPLEN, LINKTYPE_ETHERNET))
outfile.flush()
the format for each captured packet is simple
timestamp_seconds: u32
timestamp_microseconds: u32
truncated_length: u32
original_length: u32
... packet data (of length truncated_length) ...
hold on... we need timestamps
so pcaps need timestamps. we could just record time.time()
when we receive a packet on our userspace side but that's going to be Inaccurate and is therefore Unacceptable. instead, it turns out we can actually ask the kernel when the last packet we got was actually received if you consult socket(7) for the right ioctl
to use (there's also a way to do this in recvmsg
control data but that's Hard so even though it lets us do one syscall per packet instead of 2 we're not doing it right now)
first we need more boilerplate
class timeval(ctypes.Structure):
_fields_ = [("tv_sec", ctypes.c_long),
("tv_usec", ctypes.c_long)]
SIOCGSTAMP = 0x8906
now we amend the Protocol
from before
class PcapRecvProtocol:
def __init__(self, sock):
self.sock = sock
def connection_made(self, transport):
self.transport = transport
def datagram_received(self, data, addr):
tv = timeval()
fcntl.ioctl(self.sock.fileno(), SIOCGSTAMP, tv)
print("got packet", tv.tv_sec, tv.tv_usec, data)
shrimple as that
so now we're finally able to save the packet contents to the pcap file in the format above
class PcapRecvProtocol:
def __init__(self, sock):
self.sock = sock
def connection_made(self, transport):
self.transport = transport
def datagram_received(self, data, addr):
tv = timeval()
fcntl.ioctl(self.sock.fileno(), SIOCGSTAMP, tv)
trunc = data[0:PCAP_SNAPLEN]
outfile.write(struct.pack("<IIII", tv.tv_sec, tv.tv_usec,
len(trunc), len(data)))
outfile.write(trunc)
outfile.flush()
that's it lmao
overall, not actually super hard right?
i added some logging and an actual ctrl-c handler in the full example code that you can see here: https://git.lain.faith/haskal/writeups/src/branch/main/2023/misc/pcap.py
raise [x for x in ().__class__.__base__.__subclasses__() if x.__name__ == 'Codec'][0].decode.__globals__["__builtins__"]["SystemExit"](0)