document and add CLI for resources

This commit is contained in:
xenia 2021-06-18 04:11:01 -04:00
parent eda9fa47b0
commit aec8ce34a7
4 changed files with 39 additions and 2 deletions

View File

@ -137,3 +137,14 @@ for ad-hoc code testing on a real worker
%dask also installs `client`, a reference to the client, and `tqdmprogress` from `leylines.dask`,
and `upload` which uploads a file and returns a delayed function which will fetch the filename on a
worker
### resources
there is an abstract idea of nodes having resources which can be controlled by `leylines
add-resource` and `leylines del-resource` (and `leylines status` shows you the resources). currently
this assigns those with quantity 1 when starting the workers. due to a limitation of dask every
worker process inherits the same quantity of resources. you can assign resources in a more ad-hoc
way by opening an ipython session to a worker and then calling `await
distribted.get_worker().set_resources(someresource=1)`, which will _temporarily_ assign that to
the worker. if you modify resources through leylines you will need to run the ansible playbook again
to apply the changes. you can use `--start-at-task "install systemd task"` to save some time

View File

@ -89,7 +89,7 @@ class NodeImpl(LeylinesApi.Node.Server):
)
def getResources(self, _context: _Ctx, **kwargs) -> List[str]:
return db.get_node_resources(self._node.id)
return list(db.get_node_resources(self._node.id))
def addResource(self, resource: str, _context: _Ctx, **kwargs) -> None:
db.add_node_resource(self._node.id, resource)

View File

@ -31,8 +31,9 @@ async def client_main(args: argparse.Namespace) -> None:
for node in nodes:
info = (await node.getInfo().a_wait());
public_ip = info.publicIp.some if info.publicIp.which() == "some" else None
resources = (await node.getResources().a_wait()).resources
print(f"NODE {info.id}:", info.name, public_ip, info.ip, info.pubkey,
"<Have SSH Key>" if info.sshkey.which() == "some" else "")
"<Have SSH Key>" if info.sshkey.which() == "some" else "", resources)
if info.id == SERVER_NODE_ID:
server_node = node
@ -55,6 +56,14 @@ async def client_main(args: argparse.Namespace) -> None:
print((await node.some.getConfig().a_wait()).config)
elif args.cmd == "sync":
await api.sync().a_wait()
elif args.cmd == "add-resource" or args.cmd == "del-resource":
node = await api.getNode(args.id).a_wait()
if node.which() == "none":
print("no such node!")
elif args.cmd == "add-resource":
await node.some.addResource(args.resource).a_wait()
else:
await node.some.delResource(args.resource).a_wait()
parser = argparse.ArgumentParser(description="wireguard management system for dragons")
@ -79,6 +88,14 @@ cmd_add.add_argument("-i", "--ip", type=str, help="Public IP of the node, if any
cmd_add.add_argument("-k", "--ssh-key", type=argparse.FileType("r"),
help="SSH private keyfile for the node, if any", required=False)
cmd_add_resource = cmd.add_parser("add-resource")
cmd_add_resource.add_argument("id", type=int, help="Node ID")
cmd_add_resource.add_argument("resource", type=str, help="Resource name")
cmd_del_resource = cmd.add_parser("del-resource")
cmd_del_resource.add_argument("id", type=int, help="Node ID")
cmd_del_resource.add_argument("resource", type=str, help="Resource name")
cmd_get_conf = cmd.add_parser("get-conf")
cmd_get_conf.add_argument("id", type=int, help="Node ID")

View File

@ -47,6 +47,15 @@ async def init_dask_async() -> Client:
return await Client(dest, asynchronous=True)
def get_settings(settings: List[str]) -> List[str]:
return asyncio.run(get_settings_async(settings))
async def get_settings_async(settings: List[str]) -> List[str]:
async with ClientSession() as api:
return [(await api.getSetting(setting).a_wait()).value for setting in settings]
class DaskTqdmBar(TextProgressBar):
def __init__(self, *args, **kwargs) -> None:
self.p = tqdm(desc="scheduling...")