This commit is contained in:
xenia 2023-07-04 03:48:12 -04:00
parent 20c93b6e9d
commit dc5c4b5ceb
2 changed files with 317 additions and 0 deletions

211
2023/misc/pcap.md Normal file
View File

@ -0,0 +1,211 @@
# packet capture from scratch
we're going to write a basic packet capture program for linux from scratch, in python, using the direct kernel interface instead of a library like libpcap. also, it's going to attach to `asyncio` allowing for async packet capture that can be integrated into other I/O without blocking
(for the sake of simplicity, the file I/O is not going to be `asyncio`-based, but you can use `aiofiles` instead of the standard file interface if you want)
`libpcap`, the standard packet capture library on linux is absolutely massive and contains a lot of code, but we can get a basic capture system working by simplifying the scope
- no legacy compatibility. we're only targeting the latest linux kernel
- no BPF (for now). BPF (Berkeley Packet Filter) is a VM that allows filtering captured packets in the kernel before they get delivered to our capture application. unfortunately BPF is Complicated so we're skipping it and just receiving every packet with no filter
## how does packet capture work in linux
a lot of linux kernel interfaces actually aren't magic (unless it's netlink, or weird device specific ioctls, or DRI, or..... ignore all that stuff for now)
for example, besides being allowed to make standard TCP and UDP (ie, layer 3) sockets, using the kernel syscalls like `socket`, `setsockopt`, `bind`, etc, linux actually allows you to directly create layer 2 sockets, which rather than being filtered by port number, are filtered by ethertype
in order to do this, you need to possess the `CAP_NET_RAW` capability (and let's add `CAP_NET_ADMIN` too, because we'll eventually need to set promiscuous mode)
```bash
systemd-run -tS --uid=$UID -pAmbientCapabilities="CAP_NET_RAW CAP_NET_ADMIN"
```
(this starts a shell as your normal user, but with the additional capabilities available. it's useful to avoid running things as root needlessly)
ok now, in python,
```python
# the ethertype for ipv4
ETH_P_IP = 0x0800
sock = socket.socket(socket.PF_PACKET, socket.SOCK_RAW, socket.htons(ETH_P_IP))
```
by default, `PF_PACKET` sockets are set up to capture from all interfaces, but we can set a specific interface using the `bind` syscall. note that `setsockopt` with `SO_BINDTODEVICE` will *not* work -- see [socket(7)](https://linux.die.net/man/7/socket)
```python
devname = "eth0"
sock.bind((devname, ETH_P_IP))
```
also note that in this call, `htons` is not required (even though it was for `socket`). python is just weird. don't ask too many questions
## promiscuous mode
now we want to set promiscuous mode, so we can capture all packets we get instead of just ones addressed to us
side note: normal ethernet switches make this kind of nonfunctional by default. you'll either want a much more sophisticated switch (ie managed, where you can explicitly set your port to mirror all other traffic), *or* a much *less* sophisticated switch (ie, not a switch, one of those old school 10/100 hubs, which are basic enough to just mirror all traffic on all ports anyway). and i'm pretty sure if you're using an internal bridge interface rather than a physical connection promiscuous mode doesn't actually matter, but i haven't tested this
so for the record
```python
class ifreq(ctypes.Structure):
_fields_ = [("ifr_ifrn", ctypes.c_char * 16),
("ifr_flags", ctypes.c_short)]
IFF_PROMISC = 0x100
SIOCGIFFLAGS = 0x8913
SIOCSIFFLAGS = 0x8914
```
we love a little tiny bit of boilerplate because this isn't in the python stdlib. now we can just get the cool flags, add the one we want, and set it back
```python
ifr = ifreq()
ifr.ifr_ifrn = devname.encode()
fcntl.ioctl(sock.fileno(), SIOCGIFFLAGS, ifr)
ifr.ifr_flags |= IFF_PROMISC
fcntl.ioctl(sock.fileno(), SIOCSIFFLAGS, ifr)
```
## no blocking allowed
so at this point we're ready to capture. but there's a bit of an issue...... which is that blocking I/O is kinda for losers. we want the cool cooperative multitasking stuff, so it would be nice if there were a way to lift `PF_PACKET` sockets into `asyncio`
now `asyncio` has facilities for packet-based sockets already -- normal UDP stuff. the issue is, the UDP stuff expects the socket to be of type `AF_INET` / `SOCK_DGRAM`, and it checks for this specifically. luckily after digging through cpython i quickly identified the right cool internal function to call the bypasses the checks. here's how you do that
first we need a protocol, just like if we were doing UDP
```python
loop = asyncio.get_event_loop()
if not isinstance(loop, asyncio.selector_events.BaseSelectorEventLoop):
# windows is unsupported, and correspondingly non-selector event loops
# don't have the cool internal function we need :(
raise Exception("you gotta run it on linux")
class PcapRecvProtocol:
def __init__(self, sock):
self.sock = sock
def connection_made(self, transport):
self.transport = transport
def datagram_received(self, data, addr):
print("got packet", data)
sock.setblocking(False)
protocol = PcapRecvProtocol(sock)
waiter = loop.create_future()
transport = loop._make_datagram_transport(sock, protocol, waiter=waiter)
await waiter
```
and..... that's pretty much it. this should print out (layer 2 level) packets to stdout
## writing a pcap file
ok so you might be thinking, writing packets to stdout is cool and all but really it would be nice to put them in like a normal pcap file
it turns out pcap files (not pcapng, i have no idea how those work) are really simple actually
they consist of a file header, and then a sequence of captured packet headers and contents
here's the file header
```
magic_number: u32
major_version: u16
minor_version: u16
reserved1: u32
reserved2: u32
snaplen: u32
linktype: u32
```
there are two magic numbers, one for if the file timestamps are in microseconds and one for nanoseconds. micoseconds are fine for us, so we use magic number `0xA1B2C3D4`. the current version in major 2, minor 4. "snaplen" is the maximum length of a packet: if packets are larger they get truncated. 2048 is more than enough to cover standard packet MSS. finally linktype (and some other stuff in a bitfield we also don't really care about -- if you want the full details you can read the [actual spec](https://datatracker.ietf.org/doc/id/draft-gharris-opsawg-pcap-00.html) [or well like this is a draft of it but whatever]) which we set to 1 for ethernet
```python
PCAP_MAGIC_MICRO = 0xA1B2C3D4
PCAP_MAJ = 2
PCAP_MIN = 4
PCAP_SNAPLEN = 2048
LINKTYPE_ETHERNET = 1
pcapname = "capture.pcap"
outfile = open(pcapname, "wb")
outfile.write(struct.pack("<IHHIIII", PCAP_MAGIC_MICRO, PCAP_MAJ, PCAP_MIN,
0, 0, PCAP_SNAPLEN, LINKTYPE_ETHERNET))
outfile.flush()
```
the format for each captured packet is simple
```
timestamp_seconds: u32
timestamp_microseconds: u32
truncated_length: u32
original_length: u32
... packet data (of length truncated_length) ...
```
## hold on... we need timestamps
so pcaps need timestamps. we could just record `time.time()` when we receive a packet on our userspace side but that's going to be Inaccurate and is therefore Unacceptable. instead, it turns out we can actually ask the kernel when the last packet we got was actually received if you consult [socket(7)](https://linux.die.net/man/7/socket) for the right `ioctl` to use (there's also a way to do this in `recvmsg` control data but that's Hard so even though it lets us do one syscall per packet instead of 2 we're not doing it right now)
first we need more boilerplate
```python
class timeval(ctypes.Structure):
_fields_ = [("tv_sec", ctypes.c_long),
("tv_usec", ctypes.c_long)]
SIOCGSTAMP = 0x8906
```
now we amend the `Protocol` from before
```python
class PcapRecvProtocol:
def __init__(self, sock):
self.sock = sock
def connection_made(self, transport):
self.transport = transport
def datagram_received(self, data, addr):
tv = timeval()
fcntl.ioctl(self.sock.fileno(), SIOCGSTAMP, tv)
print("got packet", tv.tv_sec, tv.tv_usec, data)
```
shrimple as that
so now we're finally able to save the packet contents to the pcap file in the format above
```python
class PcapRecvProtocol:
def __init__(self, sock):
self.sock = sock
def connection_made(self, transport):
self.transport = transport
def datagram_received(self, data, addr):
tv = timeval()
fcntl.ioctl(self.sock.fileno(), SIOCGSTAMP, tv)
trunc = data[0:PCAP_SNAPLEN]
outfile.write(struct.pack("<IIII", tv.tv_sec, tv.tv_usec,
len(trunc), len(data)))
outfile.write(trunc)
outfile.flush()
```
## that's it lmao
overall, not actually super hard right?
i added some logging and an actual ctrl-c handler in the full example code that you can see here: <https://git.lain.faith/haskal/writeups/src/branch/main/2023/misc/pcap.py>
```python
raise [x for x in ().__class__.__base__.__subclasses__() if x.__name__ == 'Codec'][0].decode.__globals__["__builtins__"]["SystemExit"](0)
```

106
2023/misc/pcap.py Normal file
View File

@ -0,0 +1,106 @@
import asyncio
import ctypes
import fcntl
import logging
import signal
import socket
import struct
logger = logging.getLogger(__name__)
ETH_P_IP = 0x0800
IFF_PROMISC = 0x100
SIOCGIFFLAGS = 0x8913
SIOCSIFFLAGS = 0x8914
SIOCGSTAMP = 0x8906
class ifreq(ctypes.Structure):
_fields_ = [("ifr_ifrn", ctypes.c_char * 16),
("ifr_flags", ctypes.c_short)]
class timeval(ctypes.Structure):
_fields_ = [("tv_sec", ctypes.c_long), ("tv_usec", ctypes.c_long)]
PCAP_MAGIC_MICRO = 0xA1B2C3D4
PCAP_MAJ = 2
PCAP_MIN = 4
PCAP_SNAPLEN = 2048
LINKTYPE_ETHERNET = 1
async def do_pcap(devname="wlp2s0", pcapname="capture.pcap"):
loop = asyncio.get_event_loop()
if not isinstance(loop, asyncio.selector_events.BaseSelectorEventLoop):
raise Exception("you gotta run it on linux")
# because we're about to do some evil fuckery
logger.info("opening packet capture")
sock = socket.socket(socket.PF_PACKET, socket.SOCK_RAW, socket.htons(ETH_P_IP))
sock.setblocking(False)
# sock.setsockopt(socket.SOL_SOCKET, socket.SO_BINDTODEVICE, devname)
# NO htons on this call
sock.bind((devname, ETH_P_IP))
logger.info("setting promisc mode")
ifr = ifreq()
ifr.ifr_ifrn = devname.encode()
fcntl.ioctl(sock.fileno(), SIOCGIFFLAGS, ifr)
ifr.ifr_flags |= IFF_PROMISC
fcntl.ioctl(sock.fileno(), SIOCSIFFLAGS, ifr)
logger.info("opening output file")
outfile = open(pcapname, "wb")
outfile.write(struct.pack("<IHHIIII", PCAP_MAGIC_MICRO, PCAP_MAJ, PCAP_MIN, 0, 0,
PCAP_SNAPLEN, LINKTYPE_ETHERNET))
outfile.flush()
conn_evt = asyncio.Event()
logger.info("loading socket into asyncio")
class PcapRecvProtocol:
def __init__(self, sock):
self.sock = sock
def connection_made(self, transport):
logger.info("conn made!")
self.transport = transport
def datagram_received(self, data, addr):
tv = timeval()
fcntl.ioctl(self.sock.fileno(), SIOCGSTAMP, tv)
logger.info("recv packet")
trunc = data[0:PCAP_SNAPLEN]
outfile.write(struct.pack("<IIII", tv.tv_sec, tv.tv_usec, len(trunc), len(data)))
outfile.write(trunc)
outfile.flush()
def connection_lost(self, exc):
conn_evt.set()
protocol = PcapRecvProtocol(sock)
waiter = loop.create_future()
transport = loop._make_datagram_transport(sock, protocol, waiter=waiter)
await waiter
run_capture = asyncio.Event()
def shutdown_capture():
logger.info("ctrl-c")
run_capture.set()
logger.info("running capture loop")
loop.add_signal_handler(signal.SIGINT, shutdown_capture)
await run_capture.wait()
logger.info("closing")
transport.close()
await conn_evt.wait()
outfile.close()
logger.info("done")
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
asyncio.run(do_pcap())