# vim: ft=python
# IPython startup script: registers the ``%dask`` and ``%daskworker`` line
# magics.  Imports are kept function-local and the module-level names are
# deleted at the end so that nothing but the magics is left behind in the
# namespace this file is executed in.

ipy = __import__("IPython")


@ipy.core.magic.register_line_magic
@ipy.core.magic.needs_local_scope
def dask(line, local_ns):
    """Initializes dask and reports which cluster nodes are up.

    Injects three names into the caller's namespace (``local_ns``):

    * ``tqdmprogress`` -- re-exported from ``leylines.dask``.
    * ``client`` -- the client returned by ``leylines.dask.init_dask()``.
    * ``upload(file)`` -- uploads ``file`` through the client and returns a
      ``dask.delayed`` value that evaluates, on a worker, to the path of the
      uploaded file inside that worker's local directory.

    It then asks the leylines API for the node list, prints the server
    address, and prints one ``up``/``down`` line per worker node (sorted by
    node name) depending on whether the dask scheduler currently lists a
    worker on that node's IP.

    ``line`` (the text after the magic name) is ignored.

    Raises:
        RuntimeError: if no node with ``SERVER_NODE_ID`` is returned.
    """
    import asyncio

    from leylines.client import ClientSession, SERVER_NODE_ID
    from leylines.dask import init_dask, tqdmprogress

    local_ns['tqdmprogress'] = tqdmprogress

    client = init_dask()
    local_ns['client'] = client

    def upload(client, file):
        """Upload ``file`` and return a delayed worker-local path to it."""
        import dask
        import distributed
        import os

        name = os.path.basename(file)
        client.upload_file(file)

        def get_file():
            # Runs on a worker: resolve the path relative to that worker's
            # own local directory.
            return os.path.join(distributed.get_worker().local_directory, name)

        return dask.delayed(get_file)()

    local_ns['upload'] = lambda file: upload(client, file)

    async def get_nodes_info():
        """Return ``(server, workers)`` as ``(ip, name)`` tuples."""
        async with ClientSession() as api:
            nodes = (await api.getNodes().a_wait()).nodes
            server = None
            workers = []
            for node in nodes:
                info = (await node.getInfo().a_wait()).info
                out_info = (info.ip, info.name)
                if info.id == SERVER_NODE_ID:
                    server = out_info
                elif info.sshkey.which() != "none":
                    # Only nodes that have an ssh key set are listed as
                    # workers.
                    workers.append(out_info)
            return server, workers

    server, workers = asyncio.run(get_nodes_info())
    if server is None:
        # Previously this surfaced as "'NoneType' object is not
        # subscriptable" on the next line.
        raise RuntimeError("server node not found in the node list")

    # NOTE(review): DEFAULT_PORT is not imported or defined anywhere in this
    # file; it must already exist in the namespace this startup script runs
    # in, otherwise this line raises NameError -- confirm where it comes from.
    dest = f"{server[0]}:{DEFAULT_PORT}"
    print("connected to APEX at", dest)

    workers_by_ip = {str(node[0]): node for node in workers}
    # Hosts on which the scheduler currently has at least one worker.
    up_hosts = {
        info["host"] for info in client.scheduler_info()["workers"].values()
    }

    # Each node is an (ip, name) tuple, so sort on element 1 (the name); the
    # former ``x[1].name`` raised AttributeError on the tuple.
    for ip, node in sorted(workers_by_ip.items(), key=lambda item: item[1][1]):
        status = "up" if ip in up_hosts else "down"
        print(f"{node[1]} ({node[0]}): {status}")


@ipy.core.magic.register_line_magic
@ipy.core.magic.needs_local_scope
def daskworker(line, local_ns):
    """Picks a worker to launch ipython on and attaches a console to it.

    Chooses the worker with the lowest load score (see ``load`` below),
    starts an IPython kernel on it via ``client.start_ipython_workers``,
    writes the returned connection info to a temporary JSON file, and opens
    ``jupyter console --existing <file>`` in a new tmux split (when ``$TMUX``
    is set) or a new gnome-terminal window.

    ``line`` and ``local_ns`` are not used.

    Raises:
        Exception: if neither tmux nor gnome-terminal is available, or if
            the scheduler reports no workers.
        subprocess.CalledProcessError: if the terminal command fails.
    """
    from distributed.client import default_client
    import subprocess
    import json
    import tempfile
    import time
    import os
    import shutil

    if os.environ.get("TMUX", "") != "":
        splitter = ["tmux", "split"]
    elif shutil.which("gnome-terminal") is not None:
        splitter = ["/usr/bin/env", "gnome-terminal", "--"]
    else:
        raise Exception("don't know how to split terminal!")

    client = default_client()
    workers = client.scheduler_info()["workers"]
    if not workers:
        # Previously an IndexError from indexing an empty sorted list.
        raise Exception("no dask workers are connected!")

    def load(item):
        """Score a ``(address, state)`` pair; lower is preferred."""
        state = item[1]
        metrics = state["metrics"]
        # memory - memory_limit is the negated headroom, so more free memory
        # scores lower; the large multiplier makes every executing task
        # outweigh any headroom difference below 100 GB.
        return (
            metrics["memory"]
            - state["memory_limit"]
            + (100000000000 * metrics["executing"])
        )

    address, state = min(workers.items(), key=load)
    print("starting ipython kernel on", state["id"])

    info = list(client.start_ipython_workers([address]).values())[0]
    if isinstance(info["key"], bytes):
        # json.dump cannot serialize bytes.
        info["key"] = info["key"].decode()

    with tempfile.NamedTemporaryFile(mode="w", prefix="apex-") as f:
        json.dump(info, f)
        f.flush()
        subprocess.check_call(
            splitter + ["/usr/bin/env", "jupyter", "console", "--existing", f.name]
        )
        # The connection file is deleted when this ``with`` block exits;
        # presumably the pause gives the freshly spawned console time to read
        # it first -- TODO confirm.
        time.sleep(1)


# Keep the namespace clean: the magics stay registered with IPython even
# after these names are removed.
del dask, daskworker, ipy