From 71a3a6d2ef9d66e5a6f48695aff302549d6dd20e Mon Sep 17 00:00:00 2001 From: haskal Date: Sun, 22 Nov 2020 04:34:30 -0500 Subject: [PATCH] migrate rpc system to signature/unit-based --- crossfire/comms.rkt | 61 +++--- crossfire/manifest.rkt | 8 +- crossfire/protocol.rkt | 460 ++--------------------------------------- crossfire/server.rkt | 460 +++++++++++++++++++++++++++++++++++++++++ 4 files changed, 513 insertions(+), 476 deletions(-) create mode 100644 crossfire/server.rkt diff --git a/crossfire/comms.rkt b/crossfire/comms.rkt index ee53527..fe25101 100644 --- a/crossfire/comms.rkt +++ b/crossfire/comms.rkt @@ -17,8 +17,8 @@ ;; along with this program. If not, see . (require racket/async-channel racket/bool racket/engine racket/fasl racket/function racket/list - racket/match racket/tcp syntax/parse/define - (for-syntax racket/base racket/syntax) + racket/match racket/tcp racket/unit syntax/parse/define + (for-syntax racket/base racket/list racket/syntax racket/unit racket/unit-exptime) "not-crypto.rkt") ;; define message types (they must all be prefab for fasl) @@ -503,46 +503,43 @@ ;; utility functions and macros for defining rpcs -;; id generation helpers -(define-for-syntax (rpc-type-id type) - (format-id type "rpc-type-~a" (syntax-e type))) -(define-for-syntax (rpc-impl-id type name) - (format-id type "rpc-impl-~a-~a" (syntax-e type) (syntax-e name))) - ;; parameters for comms, tm, and targeted node (define current-comms (make-parameter #f)) (define current-tm (make-parameter #f)) (define current-to-node (make-parameter #f)) (define current-from-node (make-parameter #f)) -;; defines a class of rpcs -(define-simple-macro (define-rpc-type type:id) - #:with def-id (rpc-type-id #'type) - (define def-id (make-hash))) -;; defines an rpc implementation, registers it with a given class of rpcs and makes a wrapper to -;; call it -(define-simple-macro (define-rpc type:id (name:id args:id ...) body:expr ...) - #:with def-id (rpc-type-id #'type) - #:with impl-id (rpc-impl-id #'type #'name) - (begin - (define (impl-id args ...) body ...) - (define (name args ...) - (tm-transact (current-tm) (node-id (current-to-node)) (quote name) (list args ...))) - (hash-set! def-id (quote name) impl-id))) +;; this is entirely a proc macro because idk how to do this in any fancier way +;; big weh +;; also macro expanding the results of macros is pretty wack because if racket/unit is not required +;; in this file, racket assumes "unit" refers to a runtime procedure instead of a macro and proceeds +;; to expand the "arguments" to it, so you end up with weird errors like "define not permitted in +;; expression context" and "illegal usage of ^" which only make sense once you realize that's +;; what's going on +;; u__u +(define-for-syntax (rpc-wrapper-unit-helper sig-name) + (define-values [parent members vars stxs] (signature-members sig-name sig-name)) + (define unit-out + #`(unit + (import) + (export #,sig-name) + #,@(for/list ([mem (in-list members)]) + #`(define (#,mem . args) + (tm-transact (current-tm) (node-id (current-to-node)) (quote #,mem) args))))) + unit-out) -(define-simple-macro (rpc-impl type:id name:id) - #:with impl-id (rpc-impl-id #'type #'name) - impl-id) - -;; installs all rpcs of a given rpc class into the transaction manager -(define-simple-macro (install-rpc-type type:id) - #:with def-id (rpc-type-id #'type) - (for ([(k v) (in-hash def-id)]) - (tm-register-rpc (current-tm) k v))) +;; this creates a wrapper unit for the given signature that delegates to the transaction manager +;; (current-tm) using (current-to-node) +;; (define-signature mastodon^ [make-post florp eat-hot-chip lie]) +;; (define wrapper@ (make-rpc-wrapper-unit mastodon^)) +;; (define-values/invoke-unit wrapper@ (import) (export mastodon^)) +(define-syntax make-rpc-wrapper-unit + (lambda (stx) + (rpc-wrapper-unit-helper (second (syntax-e stx))))) (provide current-comms current-tm current-to-node current-from-node - define-rpc-type define-rpc rpc-impl install-rpc-type) + make-rpc-wrapper-unit) ; ;; demo code ; (define server-sk #"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa") diff --git a/crossfire/manifest.rkt b/crossfire/manifest.rkt index aa8ae8a..20569d7 100644 --- a/crossfire/manifest.rkt +++ b/crossfire/manifest.rkt @@ -65,11 +65,11 @@ (cons/c 'iset (listof isub/c)) (cons/c 'pattern (listof isub/c))))) -(struct manifest [data pattern size] #:transparent) +(struct manifest [data pattern psize] #:transparent) (provide (contract-out [struct manifest ((data manifest-def/c) (pattern (vector/c integer-set?)) - (size integer?))])) + (psize integer?))])) ;; this "parses" to the extent that is necessary to avoid unnecessary pattern computations @@ -112,8 +112,8 @@ [_ (error "unrecognized pattern element in manifest" x)]))) (define pattern (apply vector-append patterns)) - (define size (pattern-count pattern)) - (manifest manifest-def pattern size)) + (define psize (pattern-count pattern)) + (manifest manifest-def pattern psize)) ;; get data from the manifest (define/contract (manifest-data-get mf key [fail-thunk #f]) diff --git a/crossfire/protocol.rkt b/crossfire/protocol.rkt index 1c0efff..088bd09 100644 --- a/crossfire/protocol.rkt +++ b/crossfire/protocol.rkt @@ -16,443 +16,23 @@ ;; You should have received a copy of the GNU Affero General Public License ;; along with this program. If not, see . -(require db/base db/sqlite3 - data/queue racket/bool racket/fasl racket/file racket/match - racket/path racket/random racket/runtime-path racket/set racket/string - north/base north/adapter/base north/adapter/sqlite - "comms.rkt" "not-crypto.rkt" - ;; port-fsync - (submod "static-support.rkt" misc-calls)) - -;; configuration - -(define PRODUCTION? #f) - -(define SERVER-DATA-DIR (if PRODUCTION? "/var/lib/crossfire/" "lib/")) -(define SERVER-DB-PATH (build-path SERVER-DATA-DIR "crossfire.sqlite")) -(define SERVER-FILES-PATH (build-path SERVER-DATA-DIR "projects/")) -(define AGENT-ARCH-PREFIX "arch_") -(define AGENT-BINARY "crossfire-agent") - -;; comms node for server (without secret key) -(define current-server-public-node (make-parameter #f)) - -;; north migrations -(define-runtime-path migrations-dir "migrations/") - -;; database - -(define current-db (make-parameter #f)) - -(define (open-server-db [mode 'read/write]) - (let ([db (sqlite3-connect #:database SERVER-DB-PATH #:mode mode)]) - (query-exec db "pragma foreign_keys=1;") - db)) - -;; this allows the server to be capable of migrating itself -(define (migrate-server-db [db (current-db)]) - ;; these are the steps taken by the north cli tool (it's a bit verbose but at least it's all - ;; programmatically accessible...) - ;; i also use a runtime path to be a bit more robust - (define base (path->migration migrations-dir)) - (define adapter (sqlite-adapter db)) - (adapter-init adapter) - (define current-revision (adapter-current-revision adapter)) - (define target-revision (migration-revision (migration-most-recent base))) - (define plan (migration-plan base current-revision target-revision)) - (for ([migration (in-list plan)]) - (displayln (format "applying migration: ~a" (migration-revision migration))) - (adapter-apply! adapter (migration-revision migration) (migration-up migration))) - (void)) - -(define-syntax-rule (define-stmt name what) - (define name (virtual-statement what))) - -(define-stmt q-new-node "insert into node (name, arch, type, secret) values (?, ?, ?, ?)") -(define-stmt q-add-node-res "insert or ignore into node_resource (nodeid, resource) values (?, ?)") -(define-stmt q-del-node-res "delete from node_resource where nodeid=? and resource=?") -(define-stmt q-get-nodes "select id, name, arch from node where type=?") -(define-stmt q-get-all-resources - "select nodeid, resource from node_resource inner join node on node.id = node_resource.nodeid - where node.type = ?") -(define-stmt q-get-node-resources "select resource from node_resource where nodeid=?") -(define-stmt q-edit-node "update node set name=? where id=?") -(define-stmt q-delete-node "delete from node where id=?") -(define-stmt q-get-node-type "select type from node where id=?") -(define-stmt q-get-node-info "select name, arch, type, secret from node where id=?") - -(define-stmt q-new-task "insert into task (name, manifest) values (?, ?)") -(define-stmt q-get-task-id-commit "select id, committed from task") -(define-stmt q-set-task-commit "update task set committed=1 where id=?") -(define-stmt q-delete-task "delete from task where id=?") -(define-stmt q-get-tasks "select id, name, manifest, complete from task") - -(define-stmt q-get-task-log - "select worker, time_start, time_end, pattern from task_log where taskid=?") -(define-stmt q-add-task-log - "insert into task_log (taskid, worker, time_start, time_end, pattern) values (?, ?, ?, ?, ?)") - -;; utils - -(define (query/insert-id db stmt . args) - (define info (simple-result-info (apply query db stmt args))) - (cdr (assoc 'insert-id info))) - -;; cleanly recovers from potential crash situations -(define (server-cleanup-unused-files) - (define existing-ids (mutable-set)) - (call-with-transaction (current-db) (lambda () - (define (cleanup id exists? path) - (displayln (format "removing corrupted/incomplete task ~a" id)) - (when exists? (delete-file path)) - (query-exec (current-db) q-delete-task id)) - (for ([(id committed) (in-query (current-db) q-get-task-id-commit)]) - (set-add! existing-ids (number->string id)) - (define path (build-path SERVER-FILES-PATH (number->string id))) - (define exists? (file-exists? path)) - (define committed? (= 1 committed)) - (cond - ;; potentially crashed while file was fsync'd but the directory was not - [(and committed? (not exists?)) (cleanup id exists? path)] - ;; crashed between row insert and file fsync - [(not committed?) (cleanup id exists? path)] - [else (void)])))) - ;; delete any unaffiliated files - (for ([subpath (in-directory SERVER-FILES-PATH)]) - (define name (path->string (file-name-from-path subpath))) - (unless (set-member? existing-ids name) - (delete-file subpath))) - (void)) - -;; commits a file corresponding to the task -(define (server-commit-file taskid data) - (define path (build-path SERVER-FILES-PATH (number->string taskid))) - (call-with-output-file path - (lambda (out) - (write-bytes data out) - (port-fsync out)) - #:mode 'binary - #:exists 'truncate) - (query-exec (current-db) q-set-task-commit taskid)) - -;; rpc calls - -(define-rpc-type server) - -(struct node-info [id name arch type resources online?] #:prefab) -(struct project-info [id name manifest complete?] #:prefab) - -(define (get-nodes type) - (define type-str (symbol->string type)) - (define resources (rows->dict #:key "nodeid" #:value "resource" #:value-mode '(list) - (query (current-db) q-get-all-resources type-str))) - (for/list ([(id name arch) (in-query (current-db) q-get-nodes type-str)]) - (define online? (and (current-comms) (comms-channel-available? (current-comms) id))) - (node-info id name arch type (hash-ref resources id) online?))) - -(define (make-node name arch type resources) - (call-with-transaction (current-db) (lambda () - (define secret (crypto-sign-make-key)) - (define public (crypto-sign-public-key secret)) - (define id (query/insert-id (current-db) q-new-node name arch (symbol->string type) secret)) - (for ([res (in-list resources)]) - (query-exec (current-db) q-add-node-res id res)) - (values id public)))) - -(define (configure-agent-binary agent-node agent-arch server-node) - (define binary - (file->bytes - (build-path SERVER-DATA-DIR (string-append AGENT-ARCH-PREFIX agent-arch) AGENT-BINARY))) - - (define (configure.linux-gnu) - (define trailing-data (s-exp->fasl (list agent-node server-node))) - ;; write 32 bit unsigned big endian trailer size (including size) - (define trailing-size - (integer->integer-bytes (+ 4 (bytes-length trailing-data)) 4 #f #t)) - (bytes-append binary trailing-data trailing-size)) - - (match (string-split agent-arch "-") - [(list _ ... "linux" "gnu") (configure.linux-gnu)] - [_ (error "XXX: don't know how to configure arch" agent-arch)])) - -(define (make-task name manifest tar) - (define manifest-data (s-exp->fasl manifest)) - (define id (query/insert-id (current-db) q-new-task name manifest-data)) - (server-commit-file id tar) - id) - -(define (enforce-subject type) - (unless (symbol=? type (node-type (current-from-node))) - (error "unauthorized"))) - -(define (enforce-object id type) - (match (query-maybe-value (current-db) q-get-node-type id) - [#f (error "node doesn't exist" id)] - [(== (symbol->string type)) (void)] - [x (error "wrong node type" x)])) - -;; client rpcs - -(define-rpc server (get-agents) - (enforce-subject 'client) - (get-nodes 'agent)) - -(define-rpc server (new-agent name arch resources) - (enforce-subject 'client) - (define-values [id public] (make-node name arch 'agent resources)) - (define comms-node (node id name 'agent public #f #f #f)) - (comms-set-node-info (current-comms) comms-node) - id) - -(define-rpc server (edit-agent id name resources) - (enforce-subject 'client) - (call-with-transaction (current-db) (lambda () - (enforce-object id 'agent) - (define existing-resource (list->set (query-list (current-db) q-get-node-resources id))) - (define new-resource (list->set resources)) - (query-exec (current-db) q-edit-node name id) - (for ([res (in-set (set-subtract new-resource existing-resource))]) - (query-exec (current-db) q-add-node-res id res)) - (for ([res (in-set (set-subtract existing-resource new-resource))]) - (query-exec (current-db) q-del-node-res id res)))) - (define comms-node (comms-get-node-info (current-comms) id)) - (comms-set-node-info (current-comms) (struct-copy node comms-node [name name]))) - -(define-rpc server (get-agent-deployment id) - (enforce-subject 'client) - (match (query-maybe-row (current-db) q-get-node-info id) - [(vector name arch "agent" secret) - (configure-agent-binary (node id name 'agent (crypto-sign-public-key secret) secret #f #f) - arch (current-server-public-node))] - [_ (error "invalid id or wrong node type")])) - -(define-rpc server (delete-agent id) - (enforce-subject 'client) - (call-with-transaction (current-db) (lambda () - (enforce-object id 'agent) - (query-exec (current-db) q-delete-node id))) - (comms-delete-node (current-comms) id)) - -;; client rpcs :: projects - -(define-rpc server (new-project name manifest tar) - (enforce-subject 'client) - (define id (make-task name manifest tar)) - (void id) - id) - -(define-rpc server (get-projects) - (enforce-subject 'client) - (for/list ([(id name manifest complete?) (in-query (current-db) q-get-tasks)]) - (project-info id name (fasl->s-exp manifest) complete?))) - - -;; agent handling -;; distributing subtasks - -;; minimum batch of inputs that will be scheduled for a node -;; the batch size scales to how fast the node completes a batch -(define MIN-SUBTASK-SIZE 16) -;; aim to batch every 5 minutes -;; if a task takes shorter, the amount will be doubled, and if it takes longer it will be halved -(define OPTIMAL-COMPLETION-SECS 300) -;; tasks will be reassigned if not completed within this time -(define TASK-TIMEOUT (* 3 OPTIMAL-COMPLETION-SECS)) - -(define (agent-handler) - ;; unlike comms, messages to agent-handler have no responses. just thread-send, it's one-way - (define cust (make-custodian)) - (define this-thread (current-thread)) - ;; make an auto cleanup thread :P - (thread (lambda () (thread-wait this-thread) (custodian-shutdown-all cust))) - - ;; tasks - ;; semaphore guarding mutation (sigh) - ;; manifest: task manifest - ;; work-pattern: the integer-set representing the work left to do - ;; agent-todo: hash of agent id to integer-set representing work the agent is working on - (struct task-state [id sema manifest [work-pattern #:mutable] agent-todo] #:transparent) - - (define (initialize-task! id manifest) - (define sema (make-semaphore 1)) - (define agent-todo (make-hash)) - ;; TODO : manifest processing. 100 hardcoded here u___u - (define init-pattern-range (range->integer-set 0 100)) - ;; subtract the pattern ranges that were already logged as complete - (define pattern-range - (for/fold ([pattern-range init-pattern-range]) - ([(_ _ _ pat-fasl) (in-query (current-db) q-get-task-log id)]) - (define sub (make-integer-set (fasl->s-exp pat-fasl))) - (integer-set-subtract pattern-range sub))) - (task-state id sema manifest pattern-range agent-todo)) - - ;; this doesn't update the database - that only gets updated when the work is complete - (define (task-assign! ts agent-id requested-amount) - (unless (positive? requested-amount) (error "requested amount must be positive")) - (call-with-semaphore (task-state-sema ts) (lambda () - (when (hash-has-key? (task-state-agent-todo) agent-id) - (error "agent already has work assigned")) - (define-values [assignment new-wp] - (pattern-range-take (task-state-work-pattern ts) requested-amount)) - (cond - ;; done! (maybe) - ;; check other agents work - [(zero? (integer-set-count assignment)) #f] - ;; update tracking - [else - (hash-set! (task-state-agent-todo agent-id assignment)) - (set-task-state-work-pattern! ts new-wp) - assignment])))) - - ;; returns work from agent back to the regular work pool - (define (task-unassign! ts agent-id) - (call-with-semaphore (task-state-sema ts) (lambda () - (match (hash-ref (task-state-agent-todo ts) agent-id #f) - [#f (void)] - [assignment - (define new-wp (integer-set-union assignment (task-state-work-pattern ts))) - (set-task-state-work-pattern! ts new-wp) - (hash-remove! (task-state-agent-todo ts) agent-id)])))) - - (define (task-complete! ts agent-id time-start time-end) - (call-with-semaphore (task-state-sema ts) (lambda () - (match (hash-ref (task-state-agent-todo ts) agent-id #f) - [#f (void)] - [assignment - ;; add a new work log - (query-exec (current-db) q-add-task-log (task-state-id ts) agent-id time-start time-end - (s-exp->fasl (integer-set-contents assignment))) - ;; remove tracking - this work is now done - (hash-remove! (task-state-agent-todo ts) agent-id)])))) - - (define (agent-thd id resources-in) - ;; helper to generate assignment ids (a kind of arbitrary number we pass to agents to track - ;; what they're currently working on) - (define make-assignment-id - (let ([counter (random 0 1000)]) - (lambda () (begin0 counter (set! counter (add1 counter)))))) - ;; set of resources - (define resources (list->set resources-in)) - (set! resources-in #f) - - ;; tracks the sizes of task pieces given to the agent - (define task-size (make-hash)) - ;; current tasks, mirrors parent but only contains tasks we have resources for - (define current-tasks (make-hash)) - - ;; assignments - (struct assignment [id taskid assign-data start-time handler] #:transparent) - ;; active assignments, by assignment id - (define assigned-tasks (make-hash)) - - ;; keep running? - (define run-agent-thd? #t) - - (define (send-assignment assignment) - (void "TODO")) - - (define (cancel-assignment assignment) - (void "TODO")) - - (define (handle-thd-msg) - (match (thread-recieve) - [(cons 'new-task ts) - (void "TODO")] - [(cons 'cancel-task ts) (void "TODO")] - ;; got completion report from agent - [(cons 'agent-report (cons assignment-id state)) - (match state - ['complete (error "TODO")] - [success-input (error "TODO")])] - ['shutdown (set! run-agent-thd? #f)])) - - ;; TODO : agent rpc to cancel current task, with error handling - - (let loop () - ;; handle events - (define thd-evt (thread-receive-evt)) - ; (define task-timeout - ; (if task-start-time - ; (- TASK-TIMEOUT (- (current-seconds) task-start-time)) - ; #f)) - (define task-timeout #f) - (match (sync/timeout task-timeout thd-evt) - [#f (void "TODO: task timeout")] - [(== thd-evt) (handle-thd-msg)]) - (when run-agent-thd? - (loop))) - - ;; TODO : cleanup, cancel current task, shut down running tasks - (void)) - - ;; id to task-state - (define current-tasks (make-hash)) - - ;; hash of agents to handler thread - (define agents (make-hash)) - - (define (handle-thd-msg) - (match (thread-receive) - [(cons 'new-agent (cons id resources)) - (parameterize ([current-custodian cust]) - (hash-set! agents id (thread (lambda () (agent-thd id resources)))))] - [(cons 'delete-agent id) - (thread-send (hash-ref agents id) 'shutdown #f) - (hash-remove! agents id)] - [(cons 'new-task (cons id manifest)) - (define ts (initialize-task! id manifest)) - (hash-set! current-tasks id ts) - ;; notify agents - (for ([(id thd) (in-hash agents)]) - (thread-send thd (cons 'new-task ts) #f))] - [(cons 'agent-report (cons agent-id (cons assignment-id state))) - (thread-send (hash-ref agents agent-id) (cons 'agent-report (cons assignment-id state)) #f)] - [(cons 'cancel-task id) - (hash-remove! current-tasks id) - ;; notify agents - (for ([(id thd) (in-hash agents)]) - (thread-send thd (cons 'cancel-task ts) #f))] - [_ (error "unknown agent handler message")])) - - (let loop () - (define thd-evt (thread-receive-evt)) - (match (sync thd-evt) - [(== thd-evt) (handle-thd-msg)]) - (loop))) - -(define (make-agent-handler) - (thread agent-handler)) - -(define (current-agent-handler (make-parameter #f))) - -;; agent rpcs - -;; report state 'complete or a list of integer representing a success result -(define-rpc server (agent-report-state assignment-id state) - (enforce-subject 'agent) - (define agent-id (node-id (current-from-node))) - (thread-send (current-agent-handler) - (cons 'agent-report (cons agent-id (cons assignment-id state))))) - - -;; command line usage -(module+ main - (require racket/cmdline) - (current-db (open-server-db 'create)) - (migrate-server-db) - - (parameterize ([current-from-node (node 100 "meow" 'client #f #f #f #f)]) - ((rpc-impl server get-projects))) - - ; (make-task "meow-task" '((meow . 10)) #"this is some extra data") - ; (define data (configure-agent-binary (node 10 "meow0" 'agent #f #f #f #f) - ; "aarch64-unknown-linux-gnu" - ; (node 0 "server" 'server #f #f "meow.systems" 1337))) - ; (with-output-to-file "/tmp/crossfire-agent.configured" - ; (lambda () (write-bytes data))) - ; (make-node "agent0" "x86_64" 'agent '("gpu" "hifive")) - ; (parameterize ([current-from-node (node 100 "meow" 'client #f #f #f #f)]) - ; ((rpc-impl server edit-agent) 1 "meow0" '("cpu" "hifive"))) - ; (get-nodes 'agent) - ; (get-nodes 'meow) -) +(require racket/unit + "comms.rkt") + +(provide server^ agent^ client^) + +(define-signature server^ + [get-agents + new-agent + edit-agent + get-agent-deployment + delete-agent + new-project + get-projects + agent-report-state]) + +(define-signature agent^ + [todo]) + +(define-signature client^ + [todo]) diff --git a/crossfire/server.rkt b/crossfire/server.rkt new file mode 100644 index 0000000..8a98f0a --- /dev/null +++ b/crossfire/server.rkt @@ -0,0 +1,460 @@ +#lang racket/base +;; crossfire: distributed brute force infrastructure +;; +;; Copyright (C) 2020 haskal +;; +;; This program is free software: you can redistribute it and/or modify +;; it under the terms of the GNU Affero General Public License as published by +;; the Free Software Foundation, either version 3 of the License, or +;; (at your option) any later version. +;; +;; This program is distributed in the hope that it will be useful, +;; but WITHOUT ANY WARRANTY; without even the implied warranty of +;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +;; GNU Affero General Public License for more details. +;; +;; You should have received a copy of the GNU Affero General Public License +;; along with this program. If not, see . + +(require db/base db/sqlite3 + data/queue racket/bool racket/fasl racket/file racket/match racket/path racket/random + racket/runtime-path racket/set racket/string racket/unit + north/base north/adapter/base north/adapter/sqlite + "comms.rkt" "manifest.rkt" "not-crypto.rkt" "pattern.rkt" "protocol.rkt" + ;; port-fsync + (submod "static-support.rkt" misc-calls)) + +;; configuration + +(define PRODUCTION? #f) + +(define SERVER-DATA-DIR (if PRODUCTION? "/var/lib/crossfire/" "lib/")) +(define SERVER-DB-PATH (build-path SERVER-DATA-DIR "crossfire.sqlite")) +(define SERVER-FILES-PATH (build-path SERVER-DATA-DIR "projects/")) +(define AGENT-ARCH-PREFIX "arch_") +(define AGENT-BINARY "crossfire-agent") + +;; comms node for server (without secret key) +(define current-server-public-node (make-parameter #f)) + +;; north migrations +(define-runtime-path migrations-dir "migrations/") + +;; database + +(define current-db (make-parameter #f)) + +(define (open-server-db [mode 'read/write]) + (let ([db (sqlite3-connect #:database SERVER-DB-PATH #:mode mode)]) + (query-exec db "pragma foreign_keys=1;") + db)) + +;; this allows the server to be capable of migrating itself +(define (migrate-server-db [db (current-db)]) + ;; these are the steps taken by the north cli tool (it's a bit verbose but at least it's all + ;; programmatically accessible...) + ;; i also use a runtime path to be a bit more robust + (define base (path->migration migrations-dir)) + (define adapter (sqlite-adapter db)) + (adapter-init adapter) + (define current-revision (adapter-current-revision adapter)) + (define target-revision (migration-revision (migration-most-recent base))) + (define plan (migration-plan base current-revision target-revision)) + (for ([migration (in-list plan)]) + (displayln (format "applying migration: ~a" (migration-revision migration))) + (adapter-apply! adapter (migration-revision migration) (migration-up migration))) + (void)) + +(define-syntax-rule (define-stmt name what) + (define name (virtual-statement what))) + +(define-stmt q-new-node "insert into node (name, arch, type, secret) values (?, ?, ?, ?)") +(define-stmt q-add-node-res "insert or ignore into node_resource (nodeid, resource) values (?, ?)") +(define-stmt q-del-node-res "delete from node_resource where nodeid=? and resource=?") +(define-stmt q-get-nodes "select id, name, arch from node where type=?") +(define-stmt q-get-all-resources + "select nodeid, resource from node_resource inner join node on node.id = node_resource.nodeid + where node.type = ?") +(define-stmt q-get-node-resources "select resource from node_resource where nodeid=?") +(define-stmt q-edit-node "update node set name=? where id=?") +(define-stmt q-delete-node "delete from node where id=?") +(define-stmt q-get-node-type "select type from node where id=?") +(define-stmt q-get-node-info "select name, arch, type, secret from node where id=?") + +(define-stmt q-new-task "insert into task (name, manifest) values (?, ?)") +(define-stmt q-get-task-id-commit "select id, committed from task") +(define-stmt q-set-task-commit "update task set committed=1 where id=?") +(define-stmt q-delete-task "delete from task where id=?") +(define-stmt q-get-tasks "select id, name, manifest, complete from task") + +(define-stmt q-get-task-log + "select worker, time_start, time_end, pattern from task_log where taskid=?") +(define-stmt q-add-task-log + "insert into task_log (taskid, worker, time_start, time_end, pattern) values (?, ?, ?, ?, ?)") + +;; utils + +(define (query/insert-id db stmt . args) + (define info (simple-result-info (apply query db stmt args))) + (cdr (assoc 'insert-id info))) + +;; cleanly recovers from potential crash situations +(define (server-cleanup-unused-files) + (define existing-ids (mutable-set)) + (call-with-transaction (current-db) (lambda () + (define (cleanup id exists? path) + (displayln (format "removing corrupted/incomplete task ~a" id)) + (when exists? (delete-file path)) + (query-exec (current-db) q-delete-task id)) + (for ([(id committed) (in-query (current-db) q-get-task-id-commit)]) + (set-add! existing-ids (number->string id)) + (define path (build-path SERVER-FILES-PATH (number->string id))) + (define exists? (file-exists? path)) + (define committed? (= 1 committed)) + (cond + ;; potentially crashed while file was fsync'd but the directory was not + [(and committed? (not exists?)) (cleanup id exists? path)] + ;; crashed between row insert and file fsync + [(not committed?) (cleanup id exists? path)] + [else (void)])))) + ;; delete any unaffiliated files + (for ([subpath (in-directory SERVER-FILES-PATH)]) + (define name (path->string (file-name-from-path subpath))) + (unless (set-member? existing-ids name) + (delete-file subpath))) + (void)) + +;; commits a file corresponding to the task +(define (server-commit-file taskid data) + (define path (build-path SERVER-FILES-PATH (number->string taskid))) + (call-with-output-file path + (lambda (out) + (write-bytes data out) + (port-fsync out)) + #:mode 'binary + #:exists 'truncate) + (query-exec (current-db) q-set-task-commit taskid)) + +;; rpc calls + +(struct node-info [id name arch type resources online?] #:prefab) +;; manifest is the raw source format +(struct project-info [id name manifest complete?] #:prefab) + +(define (get-nodes type) + (define type-str (symbol->string type)) + (define resources (rows->dict #:key "nodeid" #:value "resource" #:value-mode '(list) + (query (current-db) q-get-all-resources type-str))) + (for/list ([(id name arch) (in-query (current-db) q-get-nodes type-str)]) + (define online? (and (current-comms) (comms-channel-available? (current-comms) id))) + (node-info id name arch type (hash-ref resources id) online?))) + +(define (make-node name arch type resources) + (call-with-transaction (current-db) (lambda () + (define secret (crypto-sign-make-key)) + (define public (crypto-sign-public-key secret)) + (define id (query/insert-id (current-db) q-new-node name arch (symbol->string type) secret)) + (for ([res (in-list resources)]) + (query-exec (current-db) q-add-node-res id res)) + (values id public)))) + +(define (configure-agent-binary agent-node agent-arch server-node) + (define binary + (file->bytes + (build-path SERVER-DATA-DIR (string-append AGENT-ARCH-PREFIX agent-arch) AGENT-BINARY))) + + (define (configure.linux-gnu) + (define trailing-data (s-exp->fasl (list agent-node server-node))) + ;; write 32 bit unsigned big endian trailer size (including size) + (define trailing-size + (integer->integer-bytes (+ 4 (bytes-length trailing-data)) 4 #f #t)) + (bytes-append binary trailing-data trailing-size)) + + (match (string-split agent-arch "-") + [(list _ ... "linux" "gnu") (configure.linux-gnu)] + [_ (error "XXX: don't know how to configure arch" agent-arch)])) + +;; manifest is the raw form +(define (make-task name manifest tar) + (define manifest-data (s-exp->fasl manifest)) + (define id (query/insert-id (current-db) q-new-task name manifest-data)) + (server-commit-file id tar) + id) + +(define (enforce-subject type) + (unless (symbol=? type (node-type (current-from-node))) + (error "unauthorized"))) + +(define (enforce-object id type) + (match (query-maybe-value (current-db) q-get-node-type id) + [#f (error "node doesn't exist" id)] + [(== (symbol->string type)) (void)] + [x (error "wrong node type" x)])) + +;; client rpcs + +(define (get-agents) + (enforce-subject 'client) + (get-nodes 'agent)) + +(define (new-agent name arch resources) + (enforce-subject 'client) + (define-values [id public] (make-node name arch 'agent resources)) + (define comms-node (node id name 'agent public #f #f #f)) + (comms-set-node-info (current-comms) comms-node) + id) + +(define (edit-agent id name resources) + (enforce-subject 'client) + (call-with-transaction (current-db) (lambda () + (enforce-object id 'agent) + (define existing-resource (list->set (query-list (current-db) q-get-node-resources id))) + (define new-resource (list->set resources)) + (query-exec (current-db) q-edit-node name id) + (for ([res (in-set (set-subtract new-resource existing-resource))]) + (query-exec (current-db) q-add-node-res id res)) + (for ([res (in-set (set-subtract existing-resource new-resource))]) + (query-exec (current-db) q-del-node-res id res)))) + (define comms-node (comms-get-node-info (current-comms) id)) + (comms-set-node-info (current-comms) (struct-copy node comms-node [name name]))) + +(define (get-agent-deployment id) + (enforce-subject 'client) + (match (query-maybe-row (current-db) q-get-node-info id) + [(vector name arch "agent" secret) + (configure-agent-binary (node id name 'agent (crypto-sign-public-key secret) secret #f #f) + arch (current-server-public-node))] + [_ (error "invalid id or wrong node type")])) + +(define (delete-agent id) + (enforce-subject 'client) + (call-with-transaction (current-db) (lambda () + (enforce-object id 'agent) + (query-exec (current-db) q-delete-node id))) + (comms-delete-node (current-comms) id)) + +;; client rpcs :: projects + +(define (new-project name manifest tar) + (enforce-subject 'client) + (define id (make-task name manifest tar)) + (void id) + id) + +(define (get-projects) + (enforce-subject 'client) + (for/list ([(id name manifest complete?) (in-query (current-db) q-get-tasks)]) + (project-info id name (fasl->s-exp manifest) complete?))) + + +;; agent handling +;; distributing subtasks + +;; minimum batch of inputs that will be scheduled for a node +;; the batch size scales to how fast the node completes a batch +(define MIN-SUBTASK-SIZE 16) +;; aim to batch every 5 minutes +;; if a task takes shorter, the amount will be doubled, and if it takes longer it will be halved +(define OPTIMAL-COMPLETION-SECS 300) +;; tasks will be reassigned if not completed within this time +(define TASK-TIMEOUT (* 3 OPTIMAL-COMPLETION-SECS)) + +(define (agent-handler) + ;; unlike comms, messages to agent-handler have no responses. just thread-send, it's one-way + (define cust (make-custodian)) + (define this-thread (current-thread)) + ;; make an auto cleanup thread :P + (thread (lambda () (thread-wait this-thread) (custodian-shutdown-all cust))) + + ;; tasks + ;; semaphore guarding mutation (sigh) + ;; manifest: task manifest (parsed version) + ;; work-pattern: the integer-set representing the work left to do + ;; agent-todo: hash of agent id to integer-set representing work the agent is working on + (struct task-state [id sema manifest [work-pattern #:mutable] agent-todo] #:transparent) + + (define (initialize-task! id mf) + (define sema (make-semaphore 1)) + (define agent-todo (make-hash)) + (define init-pattern-range (range->integer-set 0 (manifest-psize mf))) + ;; subtract the pattern ranges that were already logged as complete + (define pattern-range + (for/fold ([pattern-range init-pattern-range]) + ([(_1 _2 _3 pat-fasl) (in-query (current-db) q-get-task-log id)]) + (define sub (make-integer-set (fasl->s-exp pat-fasl))) + (integer-set-subtract pattern-range sub))) + (task-state id sema mf pattern-range agent-todo)) + + ;; this doesn't update the database - that only gets updated when the work is complete + (define (task-assign! ts agent-id requested-amount) + (unless (positive? requested-amount) (error "requested amount must be positive")) + (call-with-semaphore (task-state-sema ts) (lambda () + (when (hash-has-key? (task-state-agent-todo) agent-id) + (error "agent already has work assigned")) + (define-values [assignment new-wp] + (pattern-range-take (task-state-work-pattern ts) requested-amount)) + (cond + ;; done! (maybe) + ;; check other agents work + [(zero? (integer-set-count assignment)) #f] + ;; update tracking + [else + (hash-set! (task-state-agent-todo agent-id assignment)) + (set-task-state-work-pattern! ts new-wp) + assignment])))) + + ;; returns work from agent back to the regular work pool + (define (task-unassign! ts agent-id) + (call-with-semaphore (task-state-sema ts) (lambda () + (match (hash-ref (task-state-agent-todo ts) agent-id #f) + [#f (void)] + [assignment + (define new-wp (integer-set-union assignment (task-state-work-pattern ts))) + (set-task-state-work-pattern! ts new-wp) + (hash-remove! (task-state-agent-todo ts) agent-id)])))) + + (define (task-complete! ts agent-id time-start time-end) + (call-with-semaphore (task-state-sema ts) (lambda () + (match (hash-ref (task-state-agent-todo ts) agent-id #f) + [#f (void)] + [assignment + ;; add a new work log + (query-exec (current-db) q-add-task-log (task-state-id ts) agent-id time-start time-end + (s-exp->fasl (integer-set-contents assignment))) + ;; remove tracking - this work is now done + (hash-remove! (task-state-agent-todo ts) agent-id)])))) + + (define (agent-thd id resources-in) + ;; helper to generate assignment ids (a kind of arbitrary number we pass to agents to track + ;; what they're currently working on) + (define make-assignment-id + (let ([counter (random 0 1000)]) + (lambda () (begin0 counter (set! counter (add1 counter)))))) + ;; set of resources + (define resources (list->set resources-in)) + (set! resources-in #f) + + ;; tracks the sizes of task pieces given to the agent + (define task-size (make-hash)) + ;; current tasks, mirrors parent but only contains tasks we have resources for + (define current-tasks (make-hash)) + + ;; assignments + (struct assignment [id taskid assign-data start-time handler] #:transparent) + ;; active assignments, by assignment id + (define assigned-tasks (make-hash)) + + ;; keep running? + (define run-agent-thd? #t) + + (define (send-assignment assignment) + (void "TODO")) + + (define (cancel-assignment assignment) + (void "TODO")) + + (define (handle-thd-msg) + (match (thread-receive) + [(cons 'new-task ts) + (void "TODO")] + [(cons 'cancel-task ts) (void "TODO")] + ;; got completion report from agent + [(cons 'agent-report (cons assignment-id state)) + (match state + ['complete (error "TODO")] + [success-input (error "TODO")])] + ['shutdown (set! run-agent-thd? #f)])) + + ;; TODO : agent rpc to cancel current task, with error handling + + (let loop () + ;; handle events + (define thd-evt (thread-receive-evt)) + ; (define task-timeout + ; (if task-start-time + ; (- TASK-TIMEOUT (- (current-seconds) task-start-time)) + ; #f)) + (define task-timeout #f) + (match (sync/timeout task-timeout thd-evt) + [#f (void "TODO: task timeout")] + [(== thd-evt) (handle-thd-msg)]) + (when run-agent-thd? + (loop))) + + ;; TODO : cleanup, cancel current task, shut down running tasks + (void)) + + ;; id to task-state + (define current-tasks (make-hash)) + + ;; hash of agents to handler thread + (define agents (make-hash)) + + (define (handle-thd-msg) + (match (thread-receive) + [(cons 'new-agent (cons id resources)) + (parameterize ([current-custodian cust]) + (hash-set! agents id (thread (lambda () (agent-thd id resources)))))] + [(cons 'delete-agent id) + (thread-send (hash-ref agents id) 'shutdown #f) + (hash-remove! agents id)] + [(cons 'new-task (cons id manifest)) + (define ts (initialize-task! id manifest)) + (hash-set! current-tasks id ts) + ;; notify agents + (for ([(id thd) (in-hash agents)]) + (thread-send thd (cons 'new-task ts) #f))] + [(cons 'agent-report (cons agent-id (cons assignment-id state))) + (thread-send (hash-ref agents agent-id) (cons 'agent-report (cons assignment-id state)) #f)] + [(cons 'cancel-task task-id) + (hash-remove! current-tasks task-id) + ;; notify agents + (for ([(id thd) (in-hash agents)]) + (thread-send thd (cons 'cancel-task task-id) #f))] + [_ (error "unknown agent handler message")])) + + (let loop () + (define thd-evt (thread-receive-evt)) + (match (sync thd-evt) + [(== thd-evt) (handle-thd-msg)]) + (loop))) + +(define (make-agent-handler) + (thread agent-handler)) + +(define current-agent-handler (make-parameter #f)) + +;; agent rpcs + +;; report state 'complete or a list of integer representing a success result +(define (agent-report-state assignment-id state) + (enforce-subject 'agent) + (define agent-id (node-id (current-from-node))) + (thread-send (current-agent-handler) + (cons 'agent-report (cons agent-id (cons assignment-id state))))) + + +;; the server impl unit +(define-unit-from-context server-impl@ server^) + +;; command line usage +; (module+ main +; (require racket/cmdline) +; (current-db (open-server-db 'create)) +; (migrate-server-db) +; +; (parameterize ([current-from-node (node 100 "meow" 'client #f #f #f #f)]) +; ((rpc-impl server get-projects))) +; +; ; (make-task "meow-task" '((meow . 10)) #"this is some extra data") +; ; (define data (configure-agent-binary (node 10 "meow0" 'agent #f #f #f #f) +; ; "aarch64-unknown-linux-gnu" +; ; (node 0 "server" 'server #f #f "meow.systems" 1337))) +; ; (with-output-to-file "/tmp/crossfire-agent.configured" +; ; (lambda () (write-bytes data))) +; ; (make-node "agent0" "x86_64" 'agent '("gpu" "hifive")) +; ; (parameterize ([current-from-node (node 100 "meow" 'client #f #f #f #f)]) +; ; ((rpc-impl server edit-agent) 1 "meow0" '("cpu" "hifive"))) +; ; (get-nodes 'agent) +; ; (get-nodes 'meow) +; )