From 542eef5d12b8b3ae4d57458cd3c98f958d8b0b31 Mon Sep 17 00:00:00 2001 From: haskal Date: Wed, 16 Jun 2021 06:00:37 -0400 Subject: [PATCH] add ipython boilerplate --- leylines-support/02-dask.ipy | 62 ++++++++++++++++++++++++++++++++++++ 1 file changed, 62 insertions(+) create mode 100644 leylines-support/02-dask.ipy diff --git a/leylines-support/02-dask.ipy b/leylines-support/02-dask.ipy new file mode 100644 index 0000000..089cf56 --- /dev/null +++ b/leylines-support/02-dask.ipy @@ -0,0 +1,62 @@ +ipy = __import__("IPython") +@ipy.core.magic.register_line_magic +@ipy.core.magic.needs_local_scope +def dask(line, local_ns): + "initializes dask" + from distributed.client import default_client + from distributed.diagnostics.progressbar import TextProgressBar + from distributed import futures_of + from tqdm import tqdm + + class TqdmBar(TextProgressBar): + def __init__(self, *args, **kwargs): + self.p = tqdm(desc="scheduling...") + self.last = None + TextProgressBar.__init__(self, *args, **kwargs) + def _draw_bar(self, remaining, all, **kwargs): + if not all: + return + if self.last is None: + self.last = 0 + self.p.set_description("🦈") + self.p.reset(total=all) + self.p.update((all - remaining) - self.last) + self.last = (all - remaining) + def _draw_stop(self, **kwargs): + self.p.close() + + def tqdmprogress(future): + futures = futures_of(future) + if not isinstance(futures, (set, list)): + futures = [futures] + TqdmBar(futures, complete=True) + + local_ns['tqdmprogress'] = tqdmprogress + + try: + default = default_client() + local_ns['client'] = default + return + except ValueError: + pass + from distributed import Client + import re + from leylines import get_server_node, DEFAULT_PORT, db + server_node = get_server_node() + workers = [node for node in db.get_nodes() + if node.id != server_node.id and node.ssh_key is not None] + dest = f"{server_node.ip}:{DEFAULT_PORT}" + print("Connecting to APEX at", dest) + client = Client(dest) + workers_by_ip = {str(node.ip):node for node in workers} + workers_status = {str(node.ip):False for node in workers} + for addr, info in client.scheduler_info()["workers"].items(): + workers_status[info["host"]] = True + for ip, node in sorted(workers_by_ip.items(), key=lambda x:x[1].name): + if workers_status[ip]: + print(f"{node.name} ({node.ip}): up") + else: + print(f"{node.name} ({node.ip}): down") + local_ns['client'] = client + +del dask, ipy