diff --git a/README.md b/README.md index 60ac970..84c0348 100644 --- a/README.md +++ b/README.md @@ -137,3 +137,14 @@ for ad-hoc code testing on a real worker %dask also installs `client`, a reference to the client, and `tqdmprogress` from `leylines.dask`, and `upload` which uploads a file and returns a delayed function which will fetch the filename on a worker + +### resources + +there is an abstract idea of nodes having resources which can be controlled by `leylines +add-resource` and `leylines del-resource` (and `leylines status` shows you the resources). currently +this assigns those with quantity 1 when starting the workers. due to a limitation of dask every +worker process inherits the same quantity of resources. you can assign resources in a more ad-hoc +way by opening an ipython session to a worker and then calling `await +distribted.get_worker().set_resources(someresource=1)`, which will _temporarily_ assign that to +the worker. if you modify resources through leylines you will need to run the ansible playbook again +to apply the changes. you can use `--start-at-task "install systemd task"` to save some time diff --git a/leylines/leylines/__init__.py b/leylines/leylines/__init__.py index db4aac9..adc53b2 100644 --- a/leylines/leylines/__init__.py +++ b/leylines/leylines/__init__.py @@ -89,7 +89,7 @@ class NodeImpl(LeylinesApi.Node.Server): ) def getResources(self, _context: _Ctx, **kwargs) -> List[str]: - return db.get_node_resources(self._node.id) + return list(db.get_node_resources(self._node.id)) def addResource(self, resource: str, _context: _Ctx, **kwargs) -> None: db.add_node_resource(self._node.id, resource) diff --git a/leylines/leylines/__main__.py b/leylines/leylines/__main__.py index d2408ce..dd2c85f 100644 --- a/leylines/leylines/__main__.py +++ b/leylines/leylines/__main__.py @@ -31,8 +31,9 @@ async def client_main(args: argparse.Namespace) -> None: for node in nodes: info = (await node.getInfo().a_wait()); public_ip = info.publicIp.some if info.publicIp.which() == "some" else None + resources = (await node.getResources().a_wait()).resources print(f"NODE {info.id}:", info.name, public_ip, info.ip, info.pubkey, - "" if info.sshkey.which() == "some" else "") + "" if info.sshkey.which() == "some" else "", resources) if info.id == SERVER_NODE_ID: server_node = node @@ -55,6 +56,14 @@ async def client_main(args: argparse.Namespace) -> None: print((await node.some.getConfig().a_wait()).config) elif args.cmd == "sync": await api.sync().a_wait() + elif args.cmd == "add-resource" or args.cmd == "del-resource": + node = await api.getNode(args.id).a_wait() + if node.which() == "none": + print("no such node!") + elif args.cmd == "add-resource": + await node.some.addResource(args.resource).a_wait() + else: + await node.some.delResource(args.resource).a_wait() parser = argparse.ArgumentParser(description="wireguard management system for dragons") @@ -79,6 +88,14 @@ cmd_add.add_argument("-i", "--ip", type=str, help="Public IP of the node, if any cmd_add.add_argument("-k", "--ssh-key", type=argparse.FileType("r"), help="SSH private keyfile for the node, if any", required=False) +cmd_add_resource = cmd.add_parser("add-resource") +cmd_add_resource.add_argument("id", type=int, help="Node ID") +cmd_add_resource.add_argument("resource", type=str, help="Resource name") + +cmd_del_resource = cmd.add_parser("del-resource") +cmd_del_resource.add_argument("id", type=int, help="Node ID") +cmd_del_resource.add_argument("resource", type=str, help="Resource name") + cmd_get_conf = cmd.add_parser("get-conf") cmd_get_conf.add_argument("id", type=int, help="Node ID") diff --git a/leylines/leylines/dask.py b/leylines/leylines/dask.py index 673f679..d0ae179 100644 --- a/leylines/leylines/dask.py +++ b/leylines/leylines/dask.py @@ -47,6 +47,15 @@ async def init_dask_async() -> Client: return await Client(dest, asynchronous=True) +def get_settings(settings: List[str]) -> List[str]: + return asyncio.run(get_settings_async(settings)) + + +async def get_settings_async(settings: List[str]) -> List[str]: + async with ClientSession() as api: + return [(await api.getSetting(setting).a_wait()).value for setting in settings] + + class DaskTqdmBar(TextProgressBar): def __init__(self, *args, **kwargs) -> None: self.p = tqdm(desc="scheduling...")