#!/usr/bin/env python3
"""PirateBox backend served with aiohttp over a UNIX socket.

Provides JSON endpoints for disk usage, connected-station count and an
in-memory chat, plus HTML endpoints for unlocking the storage, uploading
files and browsing/downloading what was shared.

NOTE(review): the copy of this file that was reviewed had been flattened and
had HTML markup removed from its string literals.  Every literal that had to
be reconstructed is marked with ``NOTE(review)`` below and should be compared
against the upstream file before merging.
"""
import asyncio
import logging
import os
import pkgutil
import re
import secrets
import stat
import time
import urllib.parse

from aiohttp import web
import aiofiles
import aiofiles.os as aios

# Async wrappers for os calls used below.
aios.statvfs = aios.wrap(os.statvfs)
aios.listdir = aios.wrap(os.listdir)

STORAGE_PATH = os.environ.get("PIRATEBOX_STORAGE", "/storage")
WAP_INTERFACE = os.environ.get("PIRATEBOX_IF", "wlan0")
SERVER_SOCK = os.environ.get("PIRATEBOX_SOCK", "/run/piratebox/piratebox.sock")
CHUNK_SIZE = 256 * 1024

routes = web.RouteTableDef()
logger = logging.getLogger(__name__)


class DiskService:
    """Report storage usage and unlock the encrypted storage."""

    def __init__(self, path):
        self._path = path
        # time.monotonic() instead of the event loop clock: this constructor
        # runs in main() before web.run_app() has created/started a loop.
        self._last_run = time.monotonic()
        self._usage = None

    async def get_usage(self):
        """Return the used space of the storage path as an integer percentage.

        The value is cached and recomputed at most once every 60 seconds.
        """
        if self._usage is not None and time.monotonic() - self._last_run < 60:
            return self._usage
        st = await aios.statvfs(self._path)
        if st.f_blocks:
            self._usage = int(100 * (st.f_blocks - st.f_bfree) / st.f_blocks)
        else:
            # A filesystem reporting zero blocks would divide by zero.
            self._usage = 0
        self._last_run = time.monotonic()
        return self._usage

    async def unlock(self, password):
        """Feed *password* to ``e4crypt add_key`` on stdin.

        Returns ``"Success"`` when the command exits with status 0, otherwise
        whatever the command wrote to stderr.
        """
        proc = await asyncio.create_subprocess_exec(
            "e4crypt", "add_key",
            stdout=asyncio.subprocess.PIPE,
            stdin=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE)
        _, stderr = await proc.communicate(password.encode() + b"\n")
        if proc.returncode != 0:
            return stderr.decode(errors="replace")
        return "Success"


class ChatService:
    """In-memory chat history, seeded with a welcome message."""

    def __init__(self):
        self._chat = []
        self._chat.append(
            {"date": "00:00:00",
             "name": "PirateBox",
             "color": "def",
             "data": "Chat and share files anonymously!"})

    def post(self, name, data, color, thetime):
        """Append one message to the history."""
        self._chat.append({
            "date": thetime,
            "name": name,
            "color": color,
            "data": data
        })

    def get(self):
        """Return the full message list (the live list, not a copy)."""
        return self._chat


class StaService:
    """Count stations on a wireless interface via ``iw``."""

    def __init__(self, ifname):
        self.ifname = ifname
        # See DiskService.__init__ for why the event loop clock is not used.
        self._last_run = time.monotonic()
        self._count = None

    async def get_sta_count(self):
        """Return the number of lines containing "Station" in the output of
        ``iw <ifname> station dump``.

        The value is cached and refreshed at most once every 60 seconds.
        """
        if self._count is not None and time.monotonic() - self._last_run < 60:
            return self._count
        proc = await asyncio.create_subprocess_exec(
            "iw", self.ifname, "station", "dump",
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )
        stdout, _ = await proc.communicate()
        lines = stdout.decode(errors="replace").splitlines()
        self._count = sum(1 for line in lines if "Station" in line)
        self._last_run = time.monotonic()
        return self._count


@routes.get("/piratebox/admin/unlock")
async def unlock_form(request):
    """Serve the page from which the unlock password is submitted."""
    header, footer = request.app["index_resources"]
    # NOTE(review): only "\n" survived in the reviewed copy of this literal.
    # The form below is a minimal reconstruction based on what the POST
    # handler reads (form field "password", same URL) - confirm upstream.
    form = (
        '<form method="post" action="/piratebox/admin/unlock">'
        '<input type="password" name="password">'
        '<input type="submit" value="Unlock">'
        '</form>\n'
    )
    return web.Response(text=header + form + footer, content_type="text/html")


@routes.post("/piratebox/admin/unlock")
async def unlock_page(request):
    """Try to unlock the storage with the submitted password.

    Raises HTTPBadRequest when the "password" field is missing or is not a
    plain text field.  The result message is shown between the page header
    and footer.
    """
    form = await request.post()
    password = form.get("password", None)
    # A multipart file field would arrive as a non-str object and break
    # password.encode() inside DiskService.unlock().
    if not isinstance(password, str):
        raise web.HTTPBadRequest()
    msg = await request.app["DiskService"].unlock(password)
    header, footer = request.app["index_resources"]
    # msg may be raw stderr of an external tool; never emit it unescaped.
    content = header + escape(msg) + footer
    return web.Response(text=content, content_type="text/html")


@routes.get("/piratebox/diskusage")
async def diskusage(request):
    """Return ``{"usage": <percent>}`` for the storage path."""
    usage = await request.app["DiskService"].get_usage()
    return web.json_response({"usage": usage})


@routes.get("/piratebox/station-cnt")
async def station_cnt(request):
    """Return ``{"count": <n>}`` with the station count."""
    count = await request.app["StaService"].get_sta_count()
    return web.json_response({"count": count})


@routes.get("/piratebox/chat")
async def chat_get(request):
    """Return the chat history as a JSON list."""
    chat = request.app["ChatService"].get()
    return web.json_response(chat)


@routes.post("/piratebox/chat")
async def chat_post(request):
    """Append a chat message built from the "name", "data" and "color" fields.

    Raises HTTPBadRequest when any of those fields is not plain text.
    """
    form = await request.post()
    name = form.get("name", "Anonymous")
    data = form.get("data", "")
    color = form.get("color", "def")
    # A file field stored here could not be JSON-serialised, which would make
    # every later GET /piratebox/chat fail.
    if not all(isinstance(value, str) for value in (name, data, color)):
        raise web.HTTPBadRequest()
    thetime = time.strftime("%H:%M:%S")
    request.app["ChatService"].post(name, data, color, thetime)
    return web.json_response({"status": "success"})


# Everything outside this character set is removed from uploaded file names.
# NOTE(review): in the reviewed copy the class ended with a space followed by
# a line break; reproduced here as a space and "\n" - confirm upstream.
NAME_ALLOWED = re.compile(r"[^a-zA-Z0-9-_,. \n]+")

# Characters that must not appear inside a quoted header parameter.
_HEADER_UNSAFE = re.compile(r'[\x00-\x1f\x7f"\\]')


async def sanitize_name(name, upload_path):
    """Return a file name that is safe and not yet present in *upload_path*.

    Disallowed characters are stripped; while the resulting name already
    exists, a fresh random 8-hex-digit prefix is tried.
    """
    name = NAME_ALLOWED.sub("", name)
    unique_name = name
    while True:
        try:
            await aios.stat(os.path.join(upload_path, unique_name))
        except FileNotFoundError:
            return unique_name
        unique_name = secrets.token_hex(4) + name


@routes.post("/piratebox/upload")
async def upload(request):
    """Store every file part of a multipart upload in the storage directory."""
    reader = await request.multipart()
    upload_path = request.app["storage_path"]
    while True:
        part = await reader.next()
        if part is None:
            break
        # Plain form fields, empty file inputs and nested multipart readers
        # carry no usable file name; skip them instead of crashing.
        filename = getattr(part, "filename", None)
        if not filename:
            continue
        name = await sanitize_name(filename, upload_path)
        target = os.path.join(upload_path, name)
        async with aiofiles.open(target, "wb") as file:
            while True:
                chunk = await part.read_chunk(CHUNK_SIZE)
                if not chunk:
                    break
                await file.write(chunk)
    header, footer = request.app["index_resources"]
    # NOTE(review): the visible text is original; the tags and link targets
    # were missing from the reviewed copy and are a minimal reconstruction
    # ("/Shared/" is this module's browse route, "/" is a guess).
    body = (
        "\n"
        "<h2>Success!</h2>\n"
        "\n"
        "<p>Thanks for sharing.</p>\n"
        '<a href="/Shared/">Browse files</a>\n'
        '<a href="/">Back</a>\n'
        "\n"
    )
    return web.Response(text=header + body + footer, content_type="text/html")


def escape(htmlstring):
    """Return *htmlstring* with ``& " ' < >`` replaced by HTML entities."""
    escapes = {'"': "&quot;", "'": "&#39;", "<": "&lt;", ">": "&gt;"}
    # "&" first, so the entities produced below are not escaped again.
    htmlstring = htmlstring.replace("&", "&amp;")
    for seq, esc in escapes.items():
        htmlstring = htmlstring.replace(seq, esc)
    return htmlstring


def sizeof_fmt(num, suffix='B'):
    """Format a byte count using binary prefixes (Ki, Mi, ...)."""
    for unit in ['', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi']:
        if abs(num) < 1024.0:
            if unit == '':
                return "%3.0f%s%s" % (num, unit, suffix)
            return "%3.1f%s%s" % (num, unit, suffix)
        num /= 1024.0
    return "%.1f%s%s" % (num, 'Yi', suffix)


def _attachment_header(filename):
    """Build a Content-Disposition value that cannot be broken by *filename*."""
    safe = _HEADER_UNSAFE.sub("_", filename)
    return f"attachment; filename=\"{safe}\""


@routes.get("/Shared{path:.*}")
async def browse(request):
    """Serve a file as a download, or an index page for a directory.

    Raises HTTPBadRequest for paths outside the storage directory,
    HTTPNotFound for paths that do not exist, and redirects directory URLs
    that lack a trailing slash.
    """
    mp = request.match_info["path"]
    root = os.path.abspath(request.app["storage_path"])
    path = os.path.abspath(root + os.sep + mp)
    # Compare on a component boundary: a bare prefix test would also accept
    # sibling directories such as "/storage2" for a root of "/storage".
    if path != root and not path.startswith(root.rstrip(os.sep) + os.sep):
        raise web.HTTPBadRequest()
    try:
        st = await aios.stat(path)
    except (FileNotFoundError, NotADirectoryError):
        raise web.HTTPNotFound() from None

    if not stat.S_ISDIR(st.st_mode):
        return web.FileResponse(
            path, chunk_size=CHUNK_SIZE,
            headers={
                "Content-Disposition":
                    _attachment_header(os.path.basename(path))})

    if len(mp) > 0 and mp[-1] != "/":
        # mp is URL-decoded; quote it again before it goes into a header.
        raise web.HTTPFound(
            location="/Shared" + urllib.parse.quote(mp) + "/")

    # From here on mp is either "" or ends with "/".
    base = "/Shared" + urllib.parse.quote(mp if mp else "/")
    items = []
    for entry in sorted(await aios.listdir(path)):
        try:
            entry_st = await aios.stat(os.path.join(path, entry))
        except OSError:
            # Entry vanished or is unreadable (e.g. dangling symlink).
            continue
        href = base + urllib.parse.quote(entry)
        if stat.S_ISDIR(entry_st.st_mode):
            label, href, size = entry + "/", href + "/", ""
        else:
            label, size = entry, sizeof_fmt(entry_st.st_size).strip()
        items.append(
            f"<li><a href=\"{escape(href)}\">{escape(label)}</a>"
            f" {size}</li>\n")

    header, footer = request.app["index_resources"]
    # NOTE(review): "Index of ..." is original text; the surrounding tags and
    # the entry list were absent from the reviewed copy (the directory was
    # read but never rendered) and are a minimal reconstruction.
    content = header
    content += f"\n<h2>Index of {escape(mp)}</h2>\n<ul>\n"
    content += "".join(items)
    content += "</ul>\n"
    content += footer
    return web.Response(text=content, content_type="text/html")


def setup_logging():
    """Send DEBUG-and-above records of the root logger to a stream handler."""
    root = logging.getLogger()
    root.setLevel(logging.DEBUG)
    ch = logging.StreamHandler()
    ch.setLevel(logging.DEBUG)
    fm = logging.Formatter(
        "%(asctime)s - %(name)-25s - %(funcName)-10s - %(levelname)-5s"
        + " - %(message)s")
    ch.setFormatter(fm)
    root.addHandler(ch)


async def load_index_resources(app):
    """Startup hook: load the shared page header and footer into the app."""
    hdr = pkgutil.get_data(__name__, "autoindex_header.html").decode()
    ftr = pkgutil.get_data(__name__, "autoindex_footer.html").decode()
    app["index_resources"] = (hdr, ftr)


def main():
    """Configure logging, build the application and serve it on SERVER_SOCK."""
    setup_logging()
    logger.info("Starting piratebox")
    app = web.Application()
    app["storage_path"] = STORAGE_PATH
    app["wap_if"] = WAP_INTERFACE
    app["DiskService"] = DiskService(STORAGE_PATH)
    app["ChatService"] = ChatService()
    app["StaService"] = StaService(WAP_INTERFACE)
    app.add_routes(routes)
    app.on_startup.append(load_index_resources)
    os.umask(0o002)
    web.run_app(app, path=SERVER_SOCK)


if __name__ == "__main__":
    main()