leylines/leylines/leylines/__init__.py

109 lines
3.0 KiB
Python

import binascii
from typing import Optional
from pyroute2 import IPRoute, WireGuard
import monocypher
from .database import Database, Node, SERVER_NODE_ID
# Name of the WireGuard network interface this module creates and manages.
IFNAME = 'leyline-wg'
# UDP port the server interface listens on; also written as the Endpoint port
# in generated node configs.
DEFAULT_PORT = 31337
# Module-level database handle shared by every function below.
# NOTE(review): "leylines.db" looks like a path relative to the working
# directory -- confirm against the Database implementation.
db = Database("leylines.db")
def net_init() -> None:
    """Turn on IPv4 forwarding by writing "1" to the kernel's ip_forward knob."""
    with open("/proc/sys/net/ipv4/ip_forward", "w") as ip_forward:
        ip_forward.write("1")
def seckey_to_pubkey(seckey: str) -> str:
    """Derive the base64 public key that corresponds to a base64 secret key.

    Args:
        seckey: The secret key, base64-encoded.

    Returns:
        The matching public key, base64-encoded, without a trailing newline.

    Raises:
        binascii.Error: If ``seckey`` is not valid base64.
    """
    raw_secret = binascii.a2b_base64(seckey)
    raw_public = monocypher.crypto_key_exchange_public_key(raw_secret)
    # newline=False drops the line feed b2a_base64 would otherwise append.
    return binascii.b2a_base64(raw_public, newline=False).decode()
def get_server_node() -> Node:
    """Fetch the server node from the database.

    Returns:
        The node returned by ``db.get_server_node()``.

    Raises:
        Exception: If the database has no server node.
    """
    node = db.get_server_node()
    if node is not None:
        return node
    raise Exception("no server node defined")
def generate_node_config(id: int) -> Optional[str]:
    """Render the WireGuard configuration text for one node.

    Args:
        id: Database id of the node to render a config for.

    Returns:
        The config text (an ``[Interface]`` section for the node followed by a
        ``[Peer]`` section pointing at the server), or ``None`` when no node
        with that id exists.

    Raises:
        Exception: If no server node is defined.
    """
    node = db.get_node(id)
    if node is None:
        return None

    server_node = get_server_node()
    interface = db.get_subnet()

    # Each piece carries its own trailing newline, so a plain join reproduces
    # the final text exactly.
    pieces = (
        "[Interface]\n",
        f"PrivateKey={node.seckey}\n",
        f"Address={node.ip}/{interface.network.prefixlen}\n",
        "[Peer]\n",
        f"PublicKey={seckey_to_pubkey(server_node.seckey)}\n",
        f"AllowedIPs={interface.network}\n",
        f"Endpoint={server_node.public_ip}:{DEFAULT_PORT}\n",
    )
    return "".join(pieces)
def add_node_to_wg(wg: WireGuard, node: Node) -> None:
    """Register ``node`` as a peer on the IFNAME WireGuard device.

    The server node itself is never added as a peer; for it this is a no-op.

    Args:
        wg: An open pyroute2 ``WireGuard`` handle.
        node: The node to add.
    """
    if node.id != SERVER_NODE_ID:
        wg.set(
            IFNAME,
            peer={
                "public_key": seckey_to_pubkey(node.seckey),
                "persistent_keepalive": 60,
                # Only the node's own address is routed to this peer.
                "allowed_ips": [f"{node.ip}/32"],
            },
        )
def create_or_get_interface() -> int:
    """Return the link index of the IFNAME interface, creating it if needed.

    When the interface already exists its index is returned immediately and
    nothing is reconfigured. Otherwise a WireGuard link is created, given the
    server node's address inside the configured subnet, brought up, and set up
    with the server's private key and DEFAULT_PORT as listen port.

    Returns:
        The interface index of the IFNAME link.

    Raises:
        Exception: If the interface must be created but no server node is
            defined.
    """
    with IPRoute() as ipr:
        existing = ipr.link_lookup(ifname=IFNAME)
        if existing:
            return existing[0]

        server = get_server_node()
        network = db.get_subnet().network

        ipr.link("add", ifname=IFNAME, kind="wireguard")
        index = ipr.link_lookup(ifname=IFNAME)[0]

        # Assign the address while the link is down, then bring it up.
        ipr.link("set", index=index, state="down")
        ipr.addr(
            "add",
            index=index,
            address=str(server.ip),
            mask=network.prefixlen,
            broadcast=str(network.broadcast_address),
        )
        ipr.link("set", index=index, state="up")

    with WireGuard() as wg:
        wg.set(IFNAME, private_key=server.seckey, listen_port=DEFAULT_PORT)

    return index
def delete_interface() -> None:
    """Delete the IFNAME network link."""
    with IPRoute() as netlink:
        netlink.link("del", ifname=IFNAME)
def sync_interface() -> None:
    """Reconcile the peers on the IFNAME device with the nodes in the database.

    Ensures the interface exists, then:

    * removes every peer on the device whose public key does not belong to a
      non-server node in the database, and
    * adds every non-server node that is not yet present as a peer.

    Raises:
        Exception: If the interface must be created but no server node is
            defined.
    """
    create_or_get_interface()

    with WireGuard() as wg:
        # The peers attribute may be missing (None), e.g. when the device has
        # no peers configured yet.
        peers = wg.info(IFNAME)[0].get_attr("WGDEVICE_A_PEERS")
        if peers is None:
            peers = []

        # Nodes that still have to be added, keyed by base64 public key (str).
        pending = {
            seckey_to_pubkey(node.seckey): node
            for node in db.get_nodes()
            if node.id != SERVER_NODE_ID
        }

        for peer in peers:
            pubkey = peer.get_attr("WGPEER_A_PUBLIC_KEY")
            # pyroute2 can report the key as base64 *bytes*, while the keys of
            # `pending` are str. Without normalising, an already-configured
            # peer is never recognised and gets removed and re-added on every
            # sync.
            if isinstance(pubkey, bytes):
                pubkey = pubkey.decode()

            if pubkey in pending:
                # Already configured on the device; nothing to add.
                del pending[pubkey]
            else:
                # Peer is not (or no longer) in the database.
                wg.set(IFNAME, peer={"public_key": pubkey, "remove": True})

        for node in pending.values():
            add_node_to_wg(wg, node)