piratebox-web/piratebox/__init__.py

263 lines
8.8 KiB
Python

#!/usr/bin/env python3
import asyncio
import logging
import os
import pkgutil
import re
import secrets
import stat
import time
from aiohttp import web
import aiofiles
import aiofiles.os as aios
aios.statvfs = aios.wrap(os.statvfs)
aios.listdir = aios.wrap(os.listdir)
STORAGE_PATH = os.environ.get("PIRATEBOX_STORAGE", "/storage/piratebox")
WAP_INTERFACE = os.environ.get("PIRATEBOX_IF", "piratebox0")
SERVER_SOCK = os.environ.get("PIRATEBOX_SOCK", "/run/piratebox/piratebox.sock")
CHUNK_SIZE = 256 * 1024
routes = web.RouteTableDef()
logger = logging.getLogger(__name__)
class DiskService:
def __init__(self, path):
self._path = path
self._last_run = asyncio.get_event_loop().time()
self._usage = None
async def get_usage(self):
if self._usage is not None and asyncio.get_event_loop().time() - self._last_run < 60:
return self._usage
else:
st = await aios.statvfs(self._path)
self._usage = int(100*(st.f_blocks - st.f_bfree)/st.f_blocks)
self._last_run = asyncio.get_event_loop().time()
return self._usage
async def unlock(self, password):
proc = await asyncio.create_subprocess_exec(
"e4crypt", "add_key",
stdout=asyncio.subprocess.PIPE,
stdin=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE)
stdout, stderr = await proc.communicate(password.encode() + b"\n")
if proc.returncode != 0:
return stderr.decode()
return "Success"
class ChatService:
def __init__(self):
self._chat = []
self._chat.append(
{"date": "00:00:00", "name": "PirateBox", "color": "def", "data": "Chat and share files anonymously!"})
def post(self, name, data, color, thetime):
self._chat.append({
"date": thetime,
"name": name,
"color": color,
"data": data
})
def get(self):
return self._chat
class StaService:
def __init__(self, ifname):
self.ifname = ifname
self._last_run = asyncio.get_event_loop().time()
self._count = None
async def get_sta_count(self):
if self._count is not None and asyncio.get_event_loop().time() - self._last_run < 60:
return self._count
else:
proc = await asyncio.create_subprocess_exec(
"iw", self.ifname, "station", "dump",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
count = 0
stdout, stderr = await proc.communicate()
for line in stdout.decode().splitlines():
if "Station" in line:
count += 1
self._count = count
self._last_run = asyncio.get_event_loop().time()
return self._count
@routes.get("/piratebox/admin/unlock")
async def unlock_page(request):
res = request.app["index_resources"]
content = res[0]
content += "<form action='/piratebox/admin/unlock' method='POST'><input type='password' name='password' /><button type='submit'>Unlock</button></form>"
content += res[1]
return web.Response(text=content, content_type="text/html")
@routes.post("/piratebox/admin/unlock")
async def unlock_page(request):
diskservice = request.app["DiskService"]
form = await request.post()
password = form.get("password", None)
if password is None:
raise web.HTTPBadRequest()
msg = await diskservice.unlock(password)
res = request.app["index_resources"]
content = res[0]
content += msg
content += res[1]
return web.Response(text=content, content_type="text/html")
@routes.get("/piratebox/diskusage")
async def diskusage(request):
diskservice = request.app["DiskService"]
usage = await diskservice.get_usage()
return web.json_response({"usage": usage})
@routes.get("/piratebox/station-cnt")
async def station_cnt(request):
count = await request.app["StaService"].get_sta_count()
return web.json_response({"count": count})
@routes.get("/piratebox/chat")
async def chat_get(request):
chat = request.app["ChatService"].get()
return web.json_response(chat)
@routes.post("/piratebox/chat")
async def chat_post(request):
form = await request.post()
name = form.get("name", "Anonymous")
data = form.get("data", "<Blank message>")
color = form.get("color", "def")
thetime = time.strftime("%H:%M:%S")
request.app["ChatService"].post(name, data, color, thetime)
return web.json_response({"status": "success"})
NAME_ALLOWED = re.compile(r"[^a-zA-Z0-9-_,. ]+")
async def sanitize_name(name, upload_path):
name = NAME_ALLOWED.sub("", name)
unique_name = name
while True:
try:
await aios.stat(os.path.join(upload_path, unique_name))
unique_name = secrets.token_hex(4) + name
except FileNotFoundError:
break
return unique_name
@routes.post("/piratebox/upload")
async def upload(request):
reader = await request.multipart()
upload_path = request.app["storage_path"]
while True:
part = await reader.next()
if part is None:
break
name = await sanitize_name(part.filename, upload_path)
async with aiofiles.open(os.path.join(upload_path, name), "wb") as file:
while True:
chunk = await part.read_chunk(CHUNK_SIZE)
if chunk is None or len(chunk) == 0:
break
await file.write(chunk)
res = request.app["index_resources"]
content = res[0] + "<h1>Success!</h1><p>Thanks for sharing.<br/><a href='/Shared/'>Browse files</a><br/><a href='/'>Back</a></p>" + res[1]
return web.Response(text=content, content_type="text/html")
def escape(htmlstring):
escapes = {'\"': '&quot;',
'\'': '&#39;',
'<': '&lt;',
'>': '&gt;'}
htmlstring = htmlstring.replace('&', '&amp;')
for seq, esc in escapes.items():
htmlstring = htmlstring.replace(seq, esc)
return htmlstring
def sizeof_fmt(num, suffix='B'):
for unit in ['','Ki','Mi','Gi','Ti','Pi','Ei','Zi']:
if abs(num) < 1024.0:
if unit == '':
return "%3.0f%s%s" % (num, unit, suffix)
else:
return "%3.1f%s%s" % (num, unit, suffix)
num /= 1024.0
return "%.1f%s%s" % (num, 'Yi', suffix)
@routes.get("/Shared{path:.*}")
async def browse(request):
upload_path = request.app["storage_path"]
mp = request.match_info["path"]
path = os.path.abspath(upload_path + os.sep + mp)
if not path.startswith(upload_path):
raise web.HTTPBadRequest()
st = await aios.stat(path)
if stat.S_ISDIR(st.st_mode):
if len(mp) > 0 and mp[-1] != "/":
raise web.HTTPFound(location="/Shared" + mp + "/")
flist = await aios.listdir(path)
res = request.app["index_resources"]
content = res[0]
content += f"<h1>Index of {escape(mp)}</h1><br/>"
content += "<ul>"
if path != upload_path:
content += "<a href='..'>..</a>"
for file in flist:
fp = os.path.join(path, file)
st = await aios.stat(fp)
if stat.S_ISDIR(st.st_mode):
size = "-"
link = escape(file) + "/"
name = f"<strong>{link}</strong>"
else:
size = sizeof_fmt(st.st_size)
link = escape(file)
name = link
content += f"<li><a href=\"{link}\">{name}</a><span class='filesize'>{size}</span></li>"
content += "</ul>"
content += res[1]
return web.Response(text=content, content_type="text/html")
else:
return web.FileResponse(path, CHUNK_SIZE, headers={"Content-Disposition": f"attachment; filename=\"{os.path.basename(path)}\""})
def setup_logging():
root = logging.getLogger()
root.setLevel(logging.DEBUG)
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
fm = logging.Formatter(
"%(asctime)s - %(name)-25s - %(funcName)-10s - %(levelname)-5s"
+ " - %(message)s")
ch.setFormatter(fm)
root.addHandler(ch)
async def load_index_resources(app):
hdr = pkgutil.get_data(__name__, "autoindex_header.html").decode()
ftr = pkgutil.get_data(__name__, "autoindex_footer.html").decode()
app["index_resources"] = (hdr, ftr)
def main():
setup_logging()
logger.info("Starting piratebox")
app = web.Application()
app["storage_path"] = STORAGE_PATH
app["wap_if"] = WAP_INTERFACE
app["DiskService"] = DiskService(STORAGE_PATH)
app["ChatService"] = ChatService()
app["StaService"] = StaService(WAP_INTERFACE)
app.add_routes(routes)
app.on_startup.append(load_index_resources)
os.umask(0o002)
web.run_app(app, path=SERVER_SOCK)
if __name__ == "__main__":
main()