# leylines/leylines-support/02-dask.ipy  (99 lines, 3.3 KiB, Python)
# vim: ft=python
ipy = __import__("IPython")
@ipy.core.magic.register_line_magic
@ipy.core.magic.needs_local_scope
def dask(line, local_ns):
    """Initialize dask: connect to the APEX scheduler and report worker status.

    Queries the leylines node registry for the server and worker nodes,
    connects a dask client to the server, and injects ``client``,
    ``upload`` and ``tqdmprogress`` into the caller's namespace.

    Raises:
        RuntimeError: if the registry contains no server node.
    """
    from leylines.client import ClientSession, SERVER_NODE_ID, DEFAULT_PORT
    from leylines.dask import init_dask, tqdmprogress
    local_ns['tqdmprogress'] = tqdmprogress
    import asyncio

    async def get_nodes_info():
        # Fetch all registered nodes; split them into the scheduler node
        # and the SSH-reachable workers.
        async with ClientSession() as api:
            nodes = (await api.getNodes().a_wait()).nodes
            server = None
            workers = []
            for node in nodes:
                info = await node.getInfo().a_wait()
                out_info = (info.ip, info.name)
                if info.id == SERVER_NODE_ID:
                    server = out_info
                elif info.sshkey.which() != "none":
                    # Only nodes with an SSH key are usable workers.
                    workers.append(out_info)
            return server, workers

    server, workers = asyncio.run(get_nodes_info())
    if server is None:
        # Fail with a clear message instead of a TypeError on server[0].
        raise RuntimeError("no server node found in the leylines registry")
    dest = f"{server[0]}:{DEFAULT_PORT}"
    client = init_dask(dest)
    local_ns['client'] = client

    def upload(client, file):
        # Ship a local file to every worker and return a delayed that,
        # when computed on a worker, yields the file's path there.
        import dask
        import distributed
        import os
        name = os.path.basename(file)
        client.upload_file(file)

        def get_file():
            return os.path.join(distributed.get_worker().local_directory, name)

        return dask.delayed(get_file)()

    local_ns['upload'] = lambda file: upload(client, file)
    print("connected to APEX at", dest)

    # Mark each registered worker up/down based on what the scheduler sees.
    workers_by_ip = {str(node[0]): node for node in workers}
    workers_status = {str(node[0]): False for node in workers}
    for info in client.scheduler_info()["workers"].values():
        workers_status[info["host"]] = True
    # Sort by node name for a stable, human-friendly listing.
    for ip, node in sorted(workers_by_ip.items(), key=lambda x: x[1][1]):
        if workers_status[ip]:
            print(f"{node[1]} ({node[0]}): up")
        else:
            print(f"{node[1]} ({node[0]}): down")
@ipy.core.magic.register_line_magic
@ipy.core.magic.needs_local_scope
def daskworker(line, local_ns):
    """Pick the least-loaded dask worker and open a Jupyter console on it.

    Starts an IPython kernel on the chosen worker and launches
    ``jupyter console --existing`` in a new terminal pane (tmux split or
    gnome-terminal, whichever is available).

    Raises:
        RuntimeError: if the connected cluster has no workers.
        Exception: if no supported terminal splitter is found.
    """
    from distributed.client import default_client
    import subprocess
    import json
    import tempfile
    import time
    import os
    import shutil

    # Decide how to open a new terminal pane for the console.
    splitter = None
    if os.environ.get("TMUX", "") != "":
        splitter = ["tmux", "split"]
    elif shutil.which("gnome-terminal") is not None:
        splitter = ["/usr/bin/env", "gnome-terminal", "--"]
    else:
        raise Exception("don't know how to split terminal!")

    client = default_client()
    workers = client.scheduler_info()["workers"].items()
    if not workers:
        # Fail clearly instead of IndexError on sorted_workers[0].
        raise RuntimeError("no dask workers available in the cluster")
    # Rank workers by free memory (memory used minus limit, so more free
    # memory sorts first) and heavily penalize workers that are currently
    # executing tasks so idle workers win.
    BUSY_PENALTY = 100000000000
    sorted_workers = sorted(
        workers,
        key=lambda w: w[1]["metrics"]["memory"] - w[1]["memory_limit"]
        + (BUSY_PENALTY * w[1]["metrics"]["executing"]))
    worker = sorted_workers[0][0]
    print("starting ipython kernel on", sorted_workers[0][1]["id"])

    info = next(iter(client.start_ipython_workers([worker]).values()))
    if isinstance(info["key"], bytes):
        # json can't serialize bytes; the connection key must be a str.
        info["key"] = info["key"].decode()
    with tempfile.NamedTemporaryFile(mode="w", prefix="apex-") as f:
        json.dump(info, f)
        f.flush()
        subprocess.check_call(splitter + ["/usr/bin/env", "jupyter", "console",
                                          "--existing", f.name])
        # The splitter command returns immediately; give the console a
        # moment to read the connection file before it is deleted with
        # the NamedTemporaryFile.
        time.sleep(1)
# The magics were registered with IPython by register_line_magic above;
# drop the plain names so they don't linger in the module namespace
# (where `dask` would shadow the dask package on a later import).
del dask, daskworker, ipy