add ipython boilerplate
This commit is contained in:
parent
ccd20dbc5f
commit
542eef5d12
|
@ -0,0 +1,62 @@
|
|||
ipy = __import__("IPython")
|
||||
@ipy.core.magic.register_line_magic
|
||||
@ipy.core.magic.needs_local_scope
|
||||
def dask(line, local_ns):
|
||||
"initializes dask"
|
||||
from distributed.client import default_client
|
||||
from distributed.diagnostics.progressbar import TextProgressBar
|
||||
from distributed import futures_of
|
||||
from tqdm import tqdm
|
||||
|
||||
class TqdmBar(TextProgressBar):
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.p = tqdm(desc="scheduling...")
|
||||
self.last = None
|
||||
TextProgressBar.__init__(self, *args, **kwargs)
|
||||
def _draw_bar(self, remaining, all, **kwargs):
|
||||
if not all:
|
||||
return
|
||||
if self.last is None:
|
||||
self.last = 0
|
||||
self.p.set_description("🦈")
|
||||
self.p.reset(total=all)
|
||||
self.p.update((all - remaining) - self.last)
|
||||
self.last = (all - remaining)
|
||||
def _draw_stop(self, **kwargs):
|
||||
self.p.close()
|
||||
|
||||
def tqdmprogress(future):
|
||||
futures = futures_of(future)
|
||||
if not isinstance(futures, (set, list)):
|
||||
futures = [futures]
|
||||
TqdmBar(futures, complete=True)
|
||||
|
||||
local_ns['tqdmprogress'] = tqdmprogress
|
||||
|
||||
try:
|
||||
default = default_client()
|
||||
local_ns['client'] = default
|
||||
return
|
||||
except ValueError:
|
||||
pass
|
||||
from distributed import Client
|
||||
import re
|
||||
from leylines import get_server_node, DEFAULT_PORT, db
|
||||
server_node = get_server_node()
|
||||
workers = [node for node in db.get_nodes()
|
||||
if node.id != server_node.id and node.ssh_key is not None]
|
||||
dest = f"{server_node.ip}:{DEFAULT_PORT}"
|
||||
print("Connecting to APEX at", dest)
|
||||
client = Client(dest)
|
||||
workers_by_ip = {str(node.ip):node for node in workers}
|
||||
workers_status = {str(node.ip):False for node in workers}
|
||||
for addr, info in client.scheduler_info()["workers"].items():
|
||||
workers_status[info["host"]] = True
|
||||
for ip, node in sorted(workers_by_ip.items(), key=lambda x:x[1].name):
|
||||
if workers_status[ip]:
|
||||
print(f"{node.name} ({node.ip}): up")
|
||||
else:
|
||||
print(f"{node.name} ({node.ip}): down")
|
||||
local_ns['client'] = client
|
||||
|
||||
del dask, ipy
|
Loading…
Reference in New Issue