migrate rpc system to signature/unit-based

This commit is contained in:
xenia 2020-11-22 04:34:30 -05:00
parent c0585504ea
commit 71a3a6d2ef
4 changed files with 513 additions and 476 deletions

View File

@ -17,8 +17,8 @@
;; along with this program. If not, see <https://www.gnu.org/licenses/>.
(require racket/async-channel racket/bool racket/engine racket/fasl racket/function racket/list
racket/match racket/tcp syntax/parse/define
(for-syntax racket/base racket/syntax)
racket/match racket/tcp racket/unit syntax/parse/define
(for-syntax racket/base racket/list racket/syntax racket/unit racket/unit-exptime)
"not-crypto.rkt")
;; define message types (they must all be prefab for fasl)
@ -503,46 +503,43 @@
;; utility functions and macros for defining rpcs
;; id generation helpers
;; Builds the identifier rpc-type-<type>, giving it the lexical context of the `type` identifier.
(define-for-syntax (rpc-type-id type)
  (define type-sym (syntax-e type))
  (format-id type "rpc-type-~a" type-sym))
;; Builds the identifier rpc-impl-<type>-<name>, giving it the lexical context of the `type`
;; identifier.
(define-for-syntax (rpc-impl-id type name)
  (define type-sym (syntax-e type))
  (define name-sym (syntax-e name))
  (format-id type "rpc-impl-~a-~a" type-sym name-sym))
;; parameters for comms, tm, and targeted node
;; all four default to #f and are expected to be set with parameterize (or called with a value)
;; before the rpc machinery is used
(define current-comms (make-parameter #f))
;; transaction manager that rpc wrappers hand their calls to (via tm-transact)
(define current-tm (make-parameter #f))
;; node that outgoing wrapper calls are addressed to; wrappers read its node-id
(define current-to-node (make-parameter #f))
;; NOTE(review): presumably the node a call is being handled on behalf of - confirm where it is set
(define current-from-node (make-parameter #f))
;; defines a class of rpcs
;; (define-rpc-type type) expands to a definition of rpc-type-<type>, bound to a fresh mutable hash.
;; define-rpc stores name -> implementation entries in that hash and install-rpc-type reads them.
(define-simple-macro (define-rpc-type type:id)
#:with def-id (rpc-type-id #'type)
(define def-id (make-hash)))
;; defines an rpc implementation, registers it with a given class of rpcs and makes a wrapper to
;; call it
;; (define-rpc type (name args ...) body ...) expands to three things:
;;  - rpc-impl-<type>-<name>: a procedure taking args and running body
;;  - name: a procedure with the same args that does not run body; it passes the quoted name and
;;    the list of arguments to tm-transact, using (current-tm) and the id of (current-to-node)
;;  - a hash-set! that registers the implementation under the quoted name in rpc-type-<type>,
;;    so the type must already have been declared with define-rpc-type
(define-simple-macro (define-rpc type:id (name:id args:id ...) body:expr ...)
#:with def-id (rpc-type-id #'type)
#:with impl-id (rpc-impl-id #'type #'name)
(begin
(define (impl-id args ...) body ...)
(define (name args ...)
(tm-transact (current-tm) (node-id (current-to-node)) (quote name) (list args ...)))
(hash-set! def-id (quote name) impl-id)))
;; This is written as a plain procedural macro helper; there may be a fancier way to do it.
;; A caveat about macro-expanding to `unit`: if racket/unit is not required in this file, racket
;; assumes "unit" refers to a runtime procedure instead of a macro and proceeds to expand the
;; "arguments" to it. That yields confusing errors such as "define not permitted in expression
;; context" and "illegal usage of <sig>^", which only make sense once you realize what is going on.
;;
;; Given a signature identifier, returns syntax for a unit with no imports that exports that
;; signature. Every member of the signature is defined as a variadic procedure that forwards its
;; quoted name and argument list to tm-transact with (current-tm) and the id of (current-to-node).
(define-for-syntax (rpc-wrapper-unit-helper sig-name)
  ;; only the member identifiers are needed; the other three results are ignored
  (define-values [_parent member-ids _vars _stxs] (signature-members sig-name sig-name))
  (define wrapper-defs
    (map (lambda (mem)
           #`(define (#,mem . args)
               (tm-transact (current-tm) (node-id (current-to-node)) (quote #,mem) args)))
         member-ids))
  #`(unit
      (import)
      (export #,sig-name)
      #,@wrapper-defs))
;; (rpc-impl type name) expands to the identifier rpc-impl-<type>-<name>, i.e. the implementation
;; procedure that (define-rpc type (name ...) ...) defined, bypassing the tm-transact wrapper
(define-simple-macro (rpc-impl type:id name:id)
#:with impl-id (rpc-impl-id #'type #'name)
impl-id)
;; installs all rpcs of a given rpc class into the transaction manager
;; iterates the rpc-type-<type> hash and calls tm-register-rpc on (current-tm) once per
;; name/implementation pair, so (current-tm) must be set when this runs
(define-simple-macro (install-rpc-type type:id)
#:with def-id (rpc-type-id #'type)
(for ([(k v) (in-hash def-id)])
(tm-register-rpc (current-tm) k v)))
;; (make-rpc-wrapper-unit sig^) creates a wrapper unit for the given signature. Every member of
;; the signature becomes a procedure that delegates to the transaction manager (current-tm),
;; addressed to (current-to-node).
;; example:
;;   (define-signature mastodon^ [make-post florp eat-hot-chip lie])
;;   (define wrapper@ (make-rpc-wrapper-unit mastodon^))
;;   (define-values/invoke-unit wrapper@ (import) (export mastodon^))
;; The use site is validated: anything other than exactly one identifier argument is reported as
;; a syntax error pointing at the offending form, instead of a phase-1 contract failure from
;; list accessors.
(define-syntax-parser make-rpc-wrapper-unit
  [(_ sig:id) (rpc-wrapper-unit-helper #'sig)])
(provide current-comms current-tm current-to-node current-from-node
define-rpc-type define-rpc rpc-impl install-rpc-type)
make-rpc-wrapper-unit)
; ;; demo code
; (define server-sk #"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")

View File

@ -65,11 +65,11 @@
(cons/c 'iset (listof isub/c))
(cons/c 'pattern (listof isub/c)))))
;; parsed manifest: data is the manifest definition, pattern is a vector of integer sets, and
;; size is an integer (field types as stated by the contract-out clause for this struct)
(struct manifest [data pattern size] #:transparent)
;; parsed manifest: data is the manifest definition, pattern is a vector of integer sets, and
;; psize is an integer (field types as stated by the contract-out clause for this struct)
(struct manifest [data pattern psize] #:transparent)
(provide (contract-out
[struct manifest ((data manifest-def/c)
(pattern (vector/c integer-set?))
(size integer?))]))
(psize integer?))]))
;; this "parses" to the extent that is necessary to avoid unnecessary pattern computations
@ -112,8 +112,8 @@
[_ (error "unrecognized pattern element in manifest" x)])))
(define pattern (apply vector-append patterns))
(define size (pattern-count pattern))
(manifest manifest-def pattern size))
(define psize (pattern-count pattern))
(manifest manifest-def pattern psize))
;; get data from the manifest
(define/contract (manifest-data-get mf key [fail-thunk #f])

View File

@ -16,443 +16,23 @@
;; You should have received a copy of the GNU Affero General Public License
;; along with this program. If not, see <https://www.gnu.org/licenses/>.
(require db/base db/sqlite3
data/queue racket/bool racket/fasl racket/file racket/match
racket/path racket/random racket/runtime-path racket/set racket/string
north/base north/adapter/base north/adapter/sqlite
"comms.rkt" "not-crypto.rkt"
;; port-fsync
(submod "static-support.rkt" misc-calls))
;; configuration
;; deployment switch: #t selects the system-wide data directory, #f a relative one
(define PRODUCTION? #f)
;; root directory for all server state
(define SERVER-DATA-DIR
  (cond
    [PRODUCTION? "/var/lib/crossfire/"]
    [else "lib/"]))
;; sqlite database file
(define SERVER-DB-PATH (build-path SERVER-DATA-DIR "crossfire.sqlite"))
;; directory of per-task files, each named by its task id
(define SERVER-FILES-PATH (build-path SERVER-DATA-DIR "projects/"))
;; agent binaries are read from <data dir>/<prefix><arch>/<binary name>
(define AGENT-ARCH-PREFIX "arch_")
(define AGENT-BINARY "crossfire-agent")
;; comms node for server (without secret key)
;; parameter, #f until set; get-agent-deployment passes its value to configure-agent-binary
(define current-server-public-node (make-parameter #f))
;; north migrations
;; directory of migration files, declared with define-runtime-path; read by migrate-server-db
(define-runtime-path migrations-dir "migrations/")
;; database
;; parameter holding the database connection used by the query helpers in this file; #f until set
(define current-db (make-parameter #f))
;; Opens the sqlite database at SERVER-DB-PATH in the given mode (default 'read/write), enables
;; foreign key enforcement on the new connection and returns the connection.
(define (open-server-db [mode 'read/write])
  (define conn (sqlite3-connect #:database SERVER-DB-PATH #:mode mode))
  (query-exec conn "pragma foreign_keys=1;")
  conn)
;; Lets the server migrate its own database: applies every migration between the revision the
;; database is currently at and the most recent revision found in migrations-dir.
;; db defaults to (current-db). Prints one line per applied migration and returns (void).
(define (migrate-server-db [db (current-db)])
  ;; this is the sequence the north cli tool performs, spelled out because it is all
  ;; programmatically accessible; migrations-dir is a runtime path to be a bit more robust
  (define root (path->migration migrations-dir))
  (define adapter (sqlite-adapter db))
  (adapter-init adapter)
  (define from-rev (adapter-current-revision adapter))
  (define to-rev (migration-revision (migration-most-recent root)))
  (for ([step (in-list (migration-plan root from-rev to-rev))])
    (define rev (migration-revision step))
    (displayln (format "applying migration: ~a" rev))
    (adapter-apply! adapter rev (migration-up step)))
  (void))
;; shorthand: binds name to a virtual-statement wrapping the sql text `what`
(define-syntax-rule (define-stmt name what)
(define name (virtual-statement what)))
;; -- node statements; the ? placeholders are filled positionally by the callers --
;; params: name, arch, type, secret
(define-stmt q-new-node "insert into node (name, arch, type, secret) values (?, ?, ?, ?)")
;; params: nodeid, resource ("or ignore": re-adding an existing pair is not an error)
(define-stmt q-add-node-res "insert or ignore into node_resource (nodeid, resource) values (?, ?)")
;; params: nodeid, resource
(define-stmt q-del-node-res "delete from node_resource where nodeid=? and resource=?")
;; params: type
(define-stmt q-get-nodes "select id, name, arch from node where type=?")
;; params: type; inner join, so only nodes that have at least one resource row appear
(define-stmt q-get-all-resources
"select nodeid, resource from node_resource inner join node on node.id = node_resource.nodeid
where node.type = ?")
;; params: nodeid
(define-stmt q-get-node-resources "select resource from node_resource where nodeid=?")
;; params: name, id (note the order)
(define-stmt q-edit-node "update node set name=? where id=?")
;; params: id
(define-stmt q-delete-node "delete from node where id=?")
;; params: id
(define-stmt q-get-node-type "select type from node where id=?")
;; params: id
(define-stmt q-get-node-info "select name, arch, type, secret from node where id=?")
;; -- task statements --
;; params: name, manifest
(define-stmt q-new-task "insert into task (name, manifest) values (?, ?)")
(define-stmt q-get-task-id-commit "select id, committed from task")
;; params: id
(define-stmt q-set-task-commit "update task set committed=1 where id=?")
;; params: id
(define-stmt q-delete-task "delete from task where id=?")
(define-stmt q-get-tasks "select id, name, manifest, complete from task")
;; -- task log statements --
;; params: taskid
(define-stmt q-get-task-log
"select worker, time_start, time_end, pattern from task_log where taskid=?")
;; params: taskid, worker, time_start, time_end, pattern
(define-stmt q-add-task-log
"insert into task_log (taskid, worker, time_start, time_end, pattern) values (?, ?, ?, ?, ?)")
;; utils
;; Runs stmt on db with args and returns the value stored under 'insert-id in the info alist of
;; the result.
(define (query/insert-id db stmt . args)
  (define result (apply query db stmt args))
  (cdr (assoc 'insert-id (simple-result-info result))))
;; Cleanly recovers from potential crash situations by reconciling the task table with the files
;; under SERVER-FILES-PATH. Returns (void).
(define (server-cleanup-unused-files)
  ;; file names (task ids as strings) of every task row seen
  (define known-names (mutable-set))
  ;; removes a task row, and its file when one is present
  (define (discard! id path present?)
    (displayln (format "removing corrupted/incomplete task ~a" id))
    (when present? (delete-file path))
    (query-exec (current-db) q-delete-task id))
  (call-with-transaction (current-db)
    (lambda ()
      (for ([(id committed) (in-query (current-db) q-get-task-id-commit)])
        (define name (number->string id))
        (set-add! known-names name)
        (define path (build-path SERVER-FILES-PATH name))
        (define present? (file-exists? path))
        ;; a task is only kept when it is committed and its file exists. otherwise either
        ;; - it is committed but the file is missing: potentially crashed while the file was
        ;;   fsync'd but the directory was not, or
        ;; - it is not committed: crashed between row insert and file fsync
        (unless (and (= 1 committed) present?)
          (discard! id path present?)))))
  ;; delete any unaffiliated files
  (for ([entry (in-directory SERVER-FILES-PATH)])
    (unless (set-member? known-names (path->string (file-name-from-path entry)))
      (delete-file entry)))
  (void))
;; Commits the file corresponding to a task: writes data (bytes) to the file named after taskid
;; under SERVER-FILES-PATH, truncating any existing file, calls port-fsync on the port, and only
;; then marks the task row as committed.
(define (server-commit-file taskid data)
  (define dest (build-path SERVER-FILES-PATH (number->string taskid)))
  (define (write-and-sync out)
    (write-bytes data out)
    (port-fsync out))
  (call-with-output-file dest write-and-sync #:mode 'binary #:exists 'truncate)
  (query-exec (current-db) q-set-task-commit taskid))
;; rpc calls
;; declares the rpc class "server"; the (define-rpc server ...) forms in this file register
;; their implementations in it
(define-rpc-type server)
;; rpc result describing one node; built by get-nodes, where resources is the list of resource
;; values for the node and online? comes from comms-channel-available?
(struct node-info [id name arch type resources online?] #:prefab)
;; rpc result describing one project; built by get-projects, where manifest is the value decoded
;; from the stored fasl
(struct project-info [id name manifest complete?] #:prefab)
;; Returns a list of node-info structs for every node of the given type (a symbol).
;; online? is #f when no comms instance is set, otherwise whatever comms-channel-available?
;; reports for the node id.
(define (get-nodes type)
  (define type-str (symbol->string type))
  ;; nodeid -> list of resources. the query is an inner join, so nodes without any resource
  ;; rows have no entry in this dict
  (define resources
    (rows->dict #:key "nodeid" #:value "resource" #:value-mode '(list)
                (query (current-db) q-get-all-resources type-str)))
  (for/list ([(id name arch) (in-query (current-db) q-get-nodes type-str)])
    (define online? (and (current-comms) (comms-channel-available? (current-comms) id)))
    ;; default to the empty list so a node with no resources is listed instead of raising
    (node-info id name arch type (hash-ref resources id '()) online?)))
;; Creates a node row plus one node_resource row per entry of resources, all in one transaction.
;; A fresh signing key is generated for the node and stored as its secret.
;; Returns two values: the new node id and the public key derived from the secret.
(define (make-node name arch type resources)
  (define (insert-node+resources)
    (define secret (crypto-sign-make-key))
    (define public (crypto-sign-public-key secret))
    (define id (query/insert-id (current-db) q-new-node name arch (symbol->string type) secret))
    (for-each (lambda (res) (query-exec (current-db) q-add-node-res id res))
              resources)
    (values id public))
  (call-with-transaction (current-db) insert-node+resources))
;; Returns the bytes of the agent binary for agent-arch with the node configuration appended.
;; The binary is read from <SERVER-DATA-DIR>/<AGENT-ARCH-PREFIX><agent-arch>/<AGENT-BINARY>.
;; Only arch strings whose last two dash-separated components are "linux" and "gnu" are
;; supported; anything else raises an error.
(define (configure-agent-binary agent-node agent-arch server-node)
  (define binary-path
    (build-path SERVER-DATA-DIR (string-append AGENT-ARCH-PREFIX agent-arch) AGENT-BINARY))
  (define binary (file->bytes binary-path))
  ;; layout: binary ++ fasl of (list agent-node server-node) ++ trailer size
  (define (append-linux-gnu-trailer)
    (define payload (s-exp->fasl (list agent-node server-node)))
    ;; 32 bit unsigned big endian trailer size; the 4 size bytes themselves are counted
    (define size-field (integer->integer-bytes (+ 4 (bytes-length payload)) 4 #f #t))
    (bytes-append binary payload size-field))
  (match (string-split agent-arch "-")
    [(list _ ... "linux" "gnu") (append-linux-gnu-trailer)]
    [_ (error "XXX: don't know how to configure arch" agent-arch)]))
;; Inserts a task row holding name and the fasl-encoded manifest, then commits tar as the task's
;; file. Returns the new task id.
(define (make-task name manifest tar)
  (let ([id (query/insert-id (current-db) q-new-task name (s-exp->fasl manifest))])
    (server-commit-file id tar)
    id))
;; Raises "unauthorized" unless the node type of (current-from-node) is the given symbol.
(define (enforce-subject type)
  (define caller-type (node-type (current-from-node)))
  (if (symbol=? type caller-type)
      (void)
      (error "unauthorized")))
;; Checks that the node with the given id exists in the database and has the given type (a
;; symbol, compared against the stored type string). Raises an error otherwise.
(define (enforce-object id type)
  (define found (query-maybe-value (current-db) q-get-node-type id))
  (cond
    [(not found) (error "node doesn't exist" id)]
    [(equal? found (symbol->string type)) (void)]
    [else (error "wrong node type" found)]))
;; client rpcs
;; lists all nodes of type 'agent as node-info structs; only callable by a 'client node
(define-rpc server (get-agents)
(enforce-subject 'client)
(get-nodes 'agent))
;; creates an agent node in the database, announces it to comms (public key only, the remaining
;; node fields are #f) and returns the new node id; only callable by a 'client node
(define-rpc server (new-agent name arch resources)
  (enforce-subject 'client)
  (define-values [id public] (make-node name arch 'agent resources))
  (comms-set-node-info (current-comms) (node id name 'agent public #f #f #f))
  id)
;; renames an agent and replaces its resource list, then mirrors the new name into comms.
;; only callable by a 'client node; raises if id is not an existing agent
(define-rpc server (edit-agent id name resources)
  (enforce-subject 'client)
  ;; database part: rename, then add/remove only the resources that actually changed
  (define (update-rows)
    (enforce-object id 'agent)
    (define old-set (list->set (query-list (current-db) q-get-node-resources id)))
    (define new-set (list->set resources))
    (query-exec (current-db) q-edit-node name id)
    (for ([added (in-set (set-subtract new-set old-set))])
      (query-exec (current-db) q-add-node-res id added))
    (for ([removed (in-set (set-subtract old-set new-set))])
      (query-exec (current-db) q-del-node-res id removed)))
  (call-with-transaction (current-db) update-rows)
  (define known-node (comms-get-node-info (current-comms) id))
  (comms-set-node-info (current-comms) (struct-copy node known-node [name name])))
;; returns the configured agent binary (bytes) for the agent with the given id, embedding the
;; agent's node info including its secret, and the server's public node.
;; only callable by a 'client node; raises if id is unknown or not an agent
(define-rpc server (get-agent-deployment id)
  (enforce-subject 'client)
  (define row (query-maybe-row (current-db) q-get-node-info id))
  (match row
    [(vector name arch "agent" secret)
     (define public (crypto-sign-public-key secret))
     (define agent-node (node id name 'agent public secret #f #f))
     (configure-agent-binary agent-node arch (current-server-public-node))]
    [_ (error "invalid id or wrong node type")]))
;; deletes an agent from the database and then from comms.
;; only callable by a 'client node; raises if id is not an existing agent
(define-rpc server (delete-agent id)
  (enforce-subject 'client)
  (define (remove-row)
    (enforce-object id 'agent)
    (query-exec (current-db) q-delete-node id))
  (call-with-transaction (current-db) remove-row)
  (comms-delete-node (current-comms) id))
;; client rpcs :: projects
;; stores a new project (task row plus its file) and returns the new task id;
;; only callable by a 'client node
(define-rpc server (new-project name manifest tar)
  (enforce-subject 'client)
  (make-task name manifest tar))
;; lists every task as a project-info struct, with the manifest decoded from its stored fasl;
;; only callable by a 'client node
(define-rpc server (get-projects)
  (enforce-subject 'client)
  (for/list ([(task-id task-name manifest-fasl complete?) (in-query (current-db) q-get-tasks)])
    (define raw-manifest (fasl->s-exp manifest-fasl))
    (project-info task-id task-name raw-manifest complete?)))
;; agent handling
;; distributing subtasks
;; minimum batch of inputs that will be scheduled for a node
;; the batch size scales to how fast the node completes a batch
(define MIN-SUBTASK-SIZE 16)
;; target duration of one batch in seconds (300 s = 5 minutes)
;; intended policy: if a task takes shorter, the amount will be doubled, and if it takes longer
;; it will be halved
;; NOTE(review): no code visible here reads this value apart from TASK-TIMEOUT below
(define OPTIMAL-COMPLETION-SECS 300)
;; tasks will be reassigned if not completed within this time (3x the target, i.e. 900 seconds)
(define TASK-TIMEOUT (* 3 OPTIMAL-COMPLETION-SECS))
(define (agent-handler)
;; unlike comms, messages to agent-handler have no responses. just thread-send, it's one-way
(define cust (make-custodian))
(define this-thread (current-thread))
;; make an auto cleanup thread :P
(thread (lambda () (thread-wait this-thread) (custodian-shutdown-all cust)))
;; tasks
;; id: task id (used as the taskid when completed work is logged)
;; sema: semaphore guarding mutation (sigh); task-assign!, task-unassign! and task-complete!
;;       run their bodies under it
;; manifest: task manifest
;; work-pattern: the integer-set representing the work left to do (the only mutable field)
;; agent-todo: hash of agent id to integer-set representing work the agent is working on
(struct task-state [id sema manifest [work-pattern #:mutable] agent-todo] #:transparent)
;; Builds the in-memory task-state for a task: a fresh semaphore, an empty agent-todo hash, and a
;; work pattern equal to the full input range minus every range already logged as complete.
(define (initialize-task! id manifest)
  (define sema (make-semaphore 1))
  (define agent-todo (make-hash))
  ;; TODO : manifest processing. 100 hardcoded here u___u
  (define init-pattern-range (range->integer-set 0 100))
  ;; subtract the pattern ranges that were already logged as complete
  (define pattern-range
    (for/fold ([pattern-range init-pattern-range])
              ;; the three ignored columns need distinct names: binding the same identifier
              ;; more than once in a single clause is a syntax error
              ([(_worker _time-start _time-end pat-fasl)
                (in-query (current-db) q-get-task-log id)])
      (define sub (make-integer-set (fasl->s-exp pat-fasl)))
      (integer-set-subtract pattern-range sub)))
  (task-state id sema manifest pattern-range agent-todo))
;; Takes up to requested-amount of work from the task's remaining work pattern and records it as
;; assigned to agent-id. Returns the assignment, or #f when nothing could be taken.
;; This doesn't update the database - that only gets updated when the work is complete.
;; Raises if requested-amount is not positive or if the agent already has work assigned.
(define (task-assign! ts agent-id requested-amount)
  (unless (positive? requested-amount) (error "requested amount must be positive"))
  (call-with-semaphore (task-state-sema ts)
    (lambda ()
      ;; the accessor needs the task state; it was previously called with no argument
      (when (hash-has-key? (task-state-agent-todo ts) agent-id)
        (error "agent already has work assigned"))
      (define-values [assignment new-wp]
        (pattern-range-take (task-state-work-pattern ts) requested-amount))
      (cond
        ;; done! (maybe)
        ;; check other agents work
        [(zero? (integer-set-count assignment)) #f]
        ;; update tracking
        [else
         ;; hash-set! takes the table, key and value as three separate arguments
         (hash-set! (task-state-agent-todo ts) agent-id assignment)
         (set-task-state-work-pattern! ts new-wp)
         assignment]))))
;; Returns the work assigned to agent-id back to the regular work pool and forgets the
;; assignment. Does nothing when the agent has no assignment.
(define (task-unassign! ts agent-id)
  (call-with-semaphore (task-state-sema ts)
    (lambda ()
      (define todo (task-state-agent-todo ts))
      (define pending (hash-ref todo agent-id #f))
      (when pending
        (set-task-state-work-pattern! ts (integer-set-union pending (task-state-work-pattern ts)))
        (hash-remove! todo agent-id)))))
;; Records the work assigned to agent-id as done: writes a task_log row with the given start and
;; end times and the fasl-encoded assignment contents, then drops the assignment from tracking.
;; Does nothing when the agent has no assignment.
(define (task-complete! ts agent-id time-start time-end)
  (call-with-semaphore (task-state-sema ts)
    (lambda ()
      (define todo (task-state-agent-todo ts))
      (define finished (hash-ref todo agent-id #f))
      (when finished
        ;; add a new work log
        (define pattern-fasl (s-exp->fasl (integer-set-contents finished)))
        (query-exec (current-db) q-add-task-log (task-state-id ts) agent-id time-start time-end
                    pattern-fasl)
        ;; remove tracking - this work is now done
        (hash-remove! todo agent-id)))))
;; Body of the per-agent handler thread. Processes messages from the thread mailbox until a
;; 'shutdown message arrives, then returns (void). Most message handling is still TODO.
;; id: agent id; resources-in: list of the agent's resources
(define (agent-thd id resources-in)
  ;; helper to generate assignment ids (a kind of arbitrary number we pass to agents to track
  ;; what they're currently working on); counts up from a random start below 1000
  (define make-assignment-id
    (let ([counter (random 0 1000)])
      (lambda () (begin0 counter (set! counter (add1 counter))))))
  ;; set of resources
  (define resources (list->set resources-in))
  ;; drop the reference to the input list, only the set is used from here on
  (set! resources-in #f)
  ;; tracks the sizes of task pieces given to the agent
  (define task-size (make-hash))
  ;; current tasks, mirrors parent but only contains tasks we have resources for
  (define current-tasks (make-hash))
  ;; assignments
  (struct assignment [id taskid assign-data start-time handler] #:transparent)
  ;; active assignments, by assignment id
  (define assigned-tasks (make-hash))
  ;; keep running?
  (define run-agent-thd? #t)
  (define (send-assignment assignment)
    (void "TODO"))
  (define (cancel-assignment assignment)
    (void "TODO"))
  ;; handles exactly one message from the mailbox
  (define (handle-thd-msg)
    ;; thread-receive was misspelled here, which is an unbound identifier
    (match (thread-receive)
      [(cons 'new-task ts)
       (void "TODO")]
      [(cons 'cancel-task ts) (void "TODO")]
      ;; got completion report from agent
      [(cons 'agent-report (cons assignment-id state))
       (match state
         ['complete (error "TODO")]
         [success-input (error "TODO")])]
      ['shutdown (set! run-agent-thd? #f)]))
  ;; TODO : agent rpc to cancel current task, with error handling
  (let loop ()
    ;; handle events
    (define thd-evt (thread-receive-evt))
    ; (define task-timeout
    ;   (if task-start-time
    ;       (- TASK-TIMEOUT (- (current-seconds) task-start-time))
    ;       #f))
    ;; #f means sync/timeout waits without a time limit
    (define task-timeout #f)
    (match (sync/timeout task-timeout thd-evt)
      [#f (void "TODO: task timeout")]
      [(== thd-evt) (handle-thd-msg)])
    (when run-agent-thd?
      (loop)))
  ;; TODO : cleanup, cancel current task, shut down running tasks
  (void))
;; id to task-state
;; entries are added by the 'new-task message and removed by 'cancel-task
(define current-tasks (make-hash))
;; hash of agents to handler thread
;; keyed by agent id; entries are added by 'new-agent and removed by 'delete-agent
(define agents (make-hash))
;; Receives one message from the agent handler's mailbox and dispatches on it.
;; Messages are pairs whose car is a tag symbol:
;;   (new-agent id . resources)                      start a handler thread for the agent
;;   (delete-agent . id)                             ask the agent's thread to shut down
;;   (new-task id . manifest)                        create task state and notify all agents
;;   (agent-report agent-id assignment-id . state)   forward a report to the agent's thread
;;   (cancel-task . id)                              drop task state and notify all agents
;; Any other message raises an error.
(define (handle-thd-msg)
  (match (thread-receive)
    [(cons 'new-agent (cons id resources))
     ;; agent threads are created under cust so they are shut down together with the handler
     (parameterize ([current-custodian cust])
       (hash-set! agents id (thread (lambda () (agent-thd id resources)))))]
    [(cons 'delete-agent id)
     (thread-send (hash-ref agents id) 'shutdown #f)
     (hash-remove! agents id)]
    [(cons 'new-task (cons id manifest))
     (define ts (initialize-task! id manifest))
     (hash-set! current-tasks id ts)
     ;; notify agents
     (for ([(agent-id thd) (in-hash agents)])
       (thread-send thd (cons 'new-task ts) #f))]
    [(cons 'agent-report (cons agent-id (cons assignment-id state)))
     (thread-send (hash-ref agents agent-id) (cons 'agent-report (cons assignment-id state)) #f)]
    [(cons 'cancel-task id)
     ;; look the task state up before removing it: agents are sent the task-state, and no
     ;; variable holding it was in scope in this branch before
     (define ts (hash-ref current-tasks id #f))
     ;; an unknown task id has nothing to cancel, so nothing is sent
     (when ts
       (hash-remove! current-tasks id)
       ;; notify agents
       (for ([(agent-id thd) (in-hash agents)])
         (thread-send thd (cons 'cancel-task ts) #f)))]
    [_ (error "unknown agent handler message")]))
(let loop ()
(define thd-evt (thread-receive-evt))
(match (sync thd-evt)
[(== thd-evt) (handle-thd-msg)])
(loop)))
;; starts agent-handler in a new thread and returns the thread; messages are delivered to it
;; with thread-send
(define (make-agent-handler)
(thread agent-handler))
;; parameter holding the agent handler thread that rpcs send their messages to; #f until set.
;; (this was written as a function-style define with no body, which does not compile)
(define current-agent-handler (make-parameter #f))
;; agent rpcs
;; report state 'complete or a list of integer representing a success result.
;; forwards (agent-report agent-id assignment-id . state) to the agent handler thread, where
;; agent-id is taken from (current-from-node); only callable by an 'agent node
(define-rpc server (agent-report-state assignment-id state)
  (enforce-subject 'agent)
  (define agent-id (node-id (current-from-node)))
  (define message (list* 'agent-report agent-id assignment-id state))
  (thread-send (current-agent-handler) message))
;; command line usage
;; scratch entry point: opens (creating if needed) and migrates the database, then calls the
;; get-projects implementation directly while acting as a client node
(module+ main
(require racket/cmdline)
;; 'create mode so a missing database file is created
(current-db (open-server-db 'create))
(migrate-server-db)
;; rpc-impl picks the implementation procedure, so nothing goes through the transaction manager
(parameterize ([current-from-node (node 100 "meow" 'client #f #f #f #f)])
((rpc-impl server get-projects)))
;; the remaining lines are disabled manual experiments
; (make-task "meow-task" '((meow . 10)) #"this is some extra data")
; (define data (configure-agent-binary (node 10 "meow0" 'agent #f #f #f #f)
; "aarch64-unknown-linux-gnu"
; (node 0 "server" 'server #f #f "meow.systems" 1337)))
; (with-output-to-file "/tmp/crossfire-agent.configured"
; (lambda () (write-bytes data)))
; (make-node "agent0" "x86_64" 'agent '("gpu" "hifive"))
; (parameterize ([current-from-node (node 100 "meow" 'client #f #f #f #f)])
; ((rpc-impl server edit-agent) 1 "meow0" '("cpu" "hifive")))
; (get-nodes 'agent)
; (get-nodes 'meow)
)
(require racket/unit
"comms.rkt")
(provide server^ agent^ client^)
;; signature naming the rpcs offered by the server: the agent management and project rpcs
;; followed by agent-report-state
;; NOTE(review): member names are expected to match the procedures defined in server.rkt - confirm
(define-signature server^
[get-agents
new-agent
edit-agent
get-agent-deployment
delete-agent
new-project
get-projects
agent-report-state])
;; placeholder signature for agent rpcs; the only member so far is `todo`
(define-signature agent^
[todo])
;; placeholder signature for client rpcs; the only member so far is `todo`
(define-signature client^
[todo])

460
crossfire/server.rkt Normal file
View File

@ -0,0 +1,460 @@
#lang racket/base
;; crossfire: distributed brute force infrastructure
;;
;; Copyright (C) 2020 haskal
;;
;; This program is free software: you can redistribute it and/or modify
;; it under the terms of the GNU Affero General Public License as published by
;; the Free Software Foundation, either version 3 of the License, or
;; (at your option) any later version.
;;
;; This program is distributed in the hope that it will be useful,
;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
;; GNU Affero General Public License for more details.
;;
;; You should have received a copy of the GNU Affero General Public License
;; along with this program. If not, see <https://www.gnu.org/licenses/>.
(require db/base db/sqlite3
data/queue racket/bool racket/fasl racket/file racket/match racket/path racket/random
racket/runtime-path racket/set racket/string racket/unit
north/base north/adapter/base north/adapter/sqlite
"comms.rkt" "manifest.rkt" "not-crypto.rkt" "pattern.rkt" "protocol.rkt"
;; port-fsync
(submod "static-support.rkt" misc-calls))
;; configuration
;; deployment switch: #t selects the system-wide data directory, #f a relative one
(define PRODUCTION? #f)
;; root directory for all server state
(define SERVER-DATA-DIR
  (cond
    [PRODUCTION? "/var/lib/crossfire/"]
    [else "lib/"]))
;; sqlite database file
(define SERVER-DB-PATH (build-path SERVER-DATA-DIR "crossfire.sqlite"))
;; directory of per-task files, each named by its task id
(define SERVER-FILES-PATH (build-path SERVER-DATA-DIR "projects/"))
;; agent binaries are read from <data dir>/<prefix><arch>/<binary name>
(define AGENT-ARCH-PREFIX "arch_")
(define AGENT-BINARY "crossfire-agent")
;; comms node for server (without secret key)
;; parameter, #f until set; get-agent-deployment passes its value to configure-agent-binary
(define current-server-public-node (make-parameter #f))
;; north migrations
;; directory of migration files, declared with define-runtime-path; read by migrate-server-db
(define-runtime-path migrations-dir "migrations/")
;; database
;; parameter holding the database connection used by the query helpers in this file; #f until set
(define current-db (make-parameter #f))
;; Opens the sqlite database at SERVER-DB-PATH in the given mode (default 'read/write), enables
;; foreign key enforcement on the new connection and returns the connection.
(define (open-server-db [mode 'read/write])
  (define conn (sqlite3-connect #:database SERVER-DB-PATH #:mode mode))
  (query-exec conn "pragma foreign_keys=1;")
  conn)
;; Lets the server migrate its own database: applies every migration between the revision the
;; database is currently at and the most recent revision found in migrations-dir.
;; db defaults to (current-db). Prints one line per applied migration and returns (void).
(define (migrate-server-db [db (current-db)])
  ;; this is the sequence the north cli tool performs, spelled out because it is all
  ;; programmatically accessible; migrations-dir is a runtime path to be a bit more robust
  (define root (path->migration migrations-dir))
  (define adapter (sqlite-adapter db))
  (adapter-init adapter)
  (define from-rev (adapter-current-revision adapter))
  (define to-rev (migration-revision (migration-most-recent root)))
  (for ([step (in-list (migration-plan root from-rev to-rev))])
    (define rev (migration-revision step))
    (displayln (format "applying migration: ~a" rev))
    (adapter-apply! adapter rev (migration-up step)))
  (void))
;; shorthand: binds name to a virtual-statement wrapping the sql text `what`
(define-syntax-rule (define-stmt name what)
(define name (virtual-statement what)))
;; -- node statements; the ? placeholders are filled positionally by the callers --
;; params: name, arch, type, secret
(define-stmt q-new-node "insert into node (name, arch, type, secret) values (?, ?, ?, ?)")
;; params: nodeid, resource ("or ignore": re-adding an existing pair is not an error)
(define-stmt q-add-node-res "insert or ignore into node_resource (nodeid, resource) values (?, ?)")
;; params: nodeid, resource
(define-stmt q-del-node-res "delete from node_resource where nodeid=? and resource=?")
;; params: type
(define-stmt q-get-nodes "select id, name, arch from node where type=?")
;; params: type; inner join, so only nodes that have at least one resource row appear
(define-stmt q-get-all-resources
"select nodeid, resource from node_resource inner join node on node.id = node_resource.nodeid
where node.type = ?")
;; params: nodeid
(define-stmt q-get-node-resources "select resource from node_resource where nodeid=?")
;; params: name, id (note the order)
(define-stmt q-edit-node "update node set name=? where id=?")
;; params: id
(define-stmt q-delete-node "delete from node where id=?")
;; params: id
(define-stmt q-get-node-type "select type from node where id=?")
;; params: id
(define-stmt q-get-node-info "select name, arch, type, secret from node where id=?")
;; -- task statements --
;; params: name, manifest
(define-stmt q-new-task "insert into task (name, manifest) values (?, ?)")
(define-stmt q-get-task-id-commit "select id, committed from task")
;; params: id
(define-stmt q-set-task-commit "update task set committed=1 where id=?")
;; params: id
(define-stmt q-delete-task "delete from task where id=?")
(define-stmt q-get-tasks "select id, name, manifest, complete from task")
;; -- task log statements --
;; params: taskid
(define-stmt q-get-task-log
"select worker, time_start, time_end, pattern from task_log where taskid=?")
;; params: taskid, worker, time_start, time_end, pattern
(define-stmt q-add-task-log
"insert into task_log (taskid, worker, time_start, time_end, pattern) values (?, ?, ?, ?, ?)")
;; utils
;; Runs stmt on db with args and returns the value stored under 'insert-id in the info alist of
;; the result.
(define (query/insert-id db stmt . args)
  (define result (apply query db stmt args))
  (cdr (assoc 'insert-id (simple-result-info result))))
;; Cleanly recovers from potential crash situations by reconciling the task table with the files
;; under SERVER-FILES-PATH. Returns (void).
(define (server-cleanup-unused-files)
  ;; file names (task ids as strings) of every task row seen
  (define known-names (mutable-set))
  ;; removes a task row, and its file when one is present
  (define (discard! id path present?)
    (displayln (format "removing corrupted/incomplete task ~a" id))
    (when present? (delete-file path))
    (query-exec (current-db) q-delete-task id))
  (call-with-transaction (current-db)
    (lambda ()
      (for ([(id committed) (in-query (current-db) q-get-task-id-commit)])
        (define name (number->string id))
        (set-add! known-names name)
        (define path (build-path SERVER-FILES-PATH name))
        (define present? (file-exists? path))
        ;; a task is only kept when it is committed and its file exists. otherwise either
        ;; - it is committed but the file is missing: potentially crashed while the file was
        ;;   fsync'd but the directory was not, or
        ;; - it is not committed: crashed between row insert and file fsync
        (unless (and (= 1 committed) present?)
          (discard! id path present?)))))
  ;; delete any unaffiliated files
  (for ([entry (in-directory SERVER-FILES-PATH)])
    (unless (set-member? known-names (path->string (file-name-from-path entry)))
      (delete-file entry)))
  (void))
;; Commits the file corresponding to a task: writes data (bytes) to the file named after taskid
;; under SERVER-FILES-PATH, truncating any existing file, calls port-fsync on the port, and only
;; then marks the task row as committed.
(define (server-commit-file taskid data)
  (define dest (build-path SERVER-FILES-PATH (number->string taskid)))
  (define (write-and-sync out)
    (write-bytes data out)
    (port-fsync out))
  (call-with-output-file dest write-and-sync #:mode 'binary #:exists 'truncate)
  (query-exec (current-db) q-set-task-commit taskid))
;; rpc calls
;; rpc result describing one node; built by get-nodes, where resources is the list of resource
;; values for the node and online? comes from comms-channel-available?
(struct node-info [id name arch type resources online?] #:prefab)
;; rpc result describing one project; built by get-projects
;; manifest is the raw source format
(struct project-info [id name manifest complete?] #:prefab)
;; Returns a list of node-info structs for every node of the given type (a symbol).
;; online? is #f when no comms instance is set, otherwise whatever comms-channel-available?
;; reports for the node id.
(define (get-nodes type)
  (define type-str (symbol->string type))
  ;; nodeid -> list of resources. the query is an inner join, so nodes without any resource
  ;; rows have no entry in this dict
  (define resources
    (rows->dict #:key "nodeid" #:value "resource" #:value-mode '(list)
                (query (current-db) q-get-all-resources type-str)))
  (for/list ([(id name arch) (in-query (current-db) q-get-nodes type-str)])
    (define online? (and (current-comms) (comms-channel-available? (current-comms) id)))
    ;; default to the empty list so a node with no resources is listed instead of raising
    (node-info id name arch type (hash-ref resources id '()) online?)))
;; Creates a node row plus one node_resource row per entry of resources, all in one transaction.
;; A fresh signing key is generated for the node and stored as its secret.
;; Returns two values: the new node id and the public key derived from the secret.
(define (make-node name arch type resources)
  (define (insert-node+resources)
    (define secret (crypto-sign-make-key))
    (define public (crypto-sign-public-key secret))
    (define id (query/insert-id (current-db) q-new-node name arch (symbol->string type) secret))
    (for-each (lambda (res) (query-exec (current-db) q-add-node-res id res))
              resources)
    (values id public))
  (call-with-transaction (current-db) insert-node+resources))
;; Returns the bytes of the agent binary for agent-arch with the node configuration appended.
;; The binary is read from <SERVER-DATA-DIR>/<AGENT-ARCH-PREFIX><agent-arch>/<AGENT-BINARY>.
;; Only arch strings whose last two dash-separated components are "linux" and "gnu" are
;; supported; anything else raises an error.
(define (configure-agent-binary agent-node agent-arch server-node)
  (define binary-path
    (build-path SERVER-DATA-DIR (string-append AGENT-ARCH-PREFIX agent-arch) AGENT-BINARY))
  (define binary (file->bytes binary-path))
  ;; layout: binary ++ fasl of (list agent-node server-node) ++ trailer size
  (define (append-linux-gnu-trailer)
    (define payload (s-exp->fasl (list agent-node server-node)))
    ;; 32 bit unsigned big endian trailer size; the 4 size bytes themselves are counted
    (define size-field (integer->integer-bytes (+ 4 (bytes-length payload)) 4 #f #t))
    (bytes-append binary payload size-field))
  (match (string-split agent-arch "-")
    [(list _ ... "linux" "gnu") (append-linux-gnu-trailer)]
    [_ (error "XXX: don't know how to configure arch" agent-arch)]))
;; Inserts a task row holding name and the fasl-encoded manifest (manifest is the raw form),
;; then commits tar as the task's file. Returns the new task id.
(define (make-task name manifest tar)
  (let ([id (query/insert-id (current-db) q-new-task name (s-exp->fasl manifest))])
    (server-commit-file id tar)
    id))
;; Raises "unauthorized" unless the node type of (current-from-node) is the given symbol.
(define (enforce-subject type)
  (define caller-type (node-type (current-from-node)))
  (if (symbol=? type caller-type)
      (void)
      (error "unauthorized")))
;; Checks that the node with the given id exists in the database and has the given type (a
;; symbol, compared against the stored type string). Raises an error otherwise.
(define (enforce-object id type)
  (define found (query-maybe-value (current-db) q-get-node-type id))
  (cond
    [(not found) (error "node doesn't exist" id)]
    [(equal? found (symbol->string type)) (void)]
    [else (error "wrong node type" found)]))
;; client rpcs
;; lists all nodes of type 'agent as node-info structs; only callable by a 'client node
(define (get-agents)
(enforce-subject 'client)
(get-nodes 'agent))
;; Creates an agent node in the database, announces it to comms (public key only, the remaining
;; node fields are #f) and returns the new node id. Only callable by a 'client node.
(define (new-agent name arch resources)
  (enforce-subject 'client)
  (define-values [id public] (make-node name arch 'agent resources))
  (comms-set-node-info (current-comms) (node id name 'agent public #f #f #f))
  id)
;; Renames an agent and replaces its resource list, then mirrors the new name into comms.
;; Only callable by a 'client node; raises if id is not an existing agent.
(define (edit-agent id name resources)
  (enforce-subject 'client)
  ;; database part: rename, then add/remove only the resources that actually changed
  (define (update-rows)
    (enforce-object id 'agent)
    (define old-set (list->set (query-list (current-db) q-get-node-resources id)))
    (define new-set (list->set resources))
    (query-exec (current-db) q-edit-node name id)
    (for ([added (in-set (set-subtract new-set old-set))])
      (query-exec (current-db) q-add-node-res id added))
    (for ([removed (in-set (set-subtract old-set new-set))])
      (query-exec (current-db) q-del-node-res id removed)))
  (call-with-transaction (current-db) update-rows)
  (define known-node (comms-get-node-info (current-comms) id))
  (comms-set-node-info (current-comms) (struct-copy node known-node [name name])))
;; Returns the configured agent binary (bytes) for the agent with the given id, embedding the
;; agent's node info including its secret, and the server's public node.
;; Only callable by a 'client node; raises if id is unknown or not an agent.
(define (get-agent-deployment id)
  (enforce-subject 'client)
  (define row (query-maybe-row (current-db) q-get-node-info id))
  (match row
    [(vector name arch "agent" secret)
     (define public (crypto-sign-public-key secret))
     (define agent-node (node id name 'agent public secret #f #f))
     (configure-agent-binary agent-node arch (current-server-public-node))]
    [_ (error "invalid id or wrong node type")]))
;; Deletes an agent from the database and then from comms.
;; Only callable by a 'client node; raises if id is not an existing agent.
(define (delete-agent id)
  (enforce-subject 'client)
  (define (remove-row)
    (enforce-object id 'agent)
    (query-exec (current-db) q-delete-node id))
  (call-with-transaction (current-db) remove-row)
  (comms-delete-node (current-comms) id))
;; client rpcs :: projects
;; Stores a new project (task row plus its file) and returns the new task id.
;; Only callable by a 'client node.
(define (new-project name manifest tar)
  (enforce-subject 'client)
  (make-task name manifest tar))
;; Lists every task as a project-info struct, with the manifest decoded from its stored fasl.
;; Only callable by a 'client node.
(define (get-projects)
  (enforce-subject 'client)
  (for/list ([(task-id task-name manifest-fasl complete?) (in-query (current-db) q-get-tasks)])
    (define raw-manifest (fasl->s-exp manifest-fasl))
    (project-info task-id task-name raw-manifest complete?)))
;; agent handling
;; distributing subtasks
;; minimum batch of inputs that will be scheduled for a node
;; the batch size scales to how fast the node completes a batch
(define MIN-SUBTASK-SIZE 16)
;; target duration of one batch in seconds (300 s = 5 minutes)
;; intended policy: if a task takes shorter, the amount will be doubled, and if it takes longer
;; it will be halved
;; NOTE(review): the code that applies this policy is not visible here - confirm
(define OPTIMAL-COMPLETION-SECS 300)
;; tasks will be reassigned if not completed within this time (3x the target, i.e. 900 seconds)
(define TASK-TIMEOUT (* 3 OPTIMAL-COMPLETION-SECS))
(define (agent-handler)
;; unlike comms, messages to agent-handler have no responses. just thread-send, it's one-way
(define cust (make-custodian))
(define this-thread (current-thread))
;; make an auto cleanup thread :P
(thread (lambda () (thread-wait this-thread) (custodian-shutdown-all cust)))
;; tasks
;; id: task id (used as the taskid when completed work is logged)
;; sema: semaphore guarding mutation (sigh); task-assign!, task-unassign! and task-complete!
;;       run their bodies under it
;; manifest: task manifest (parsed version)
;; work-pattern: the integer-set representing the work left to do (the only mutable field)
;; agent-todo: hash of agent id to integer-set representing work the agent is working on
(struct task-state [id sema manifest [work-pattern #:mutable] agent-todo] #:transparent)
;; Builds the in-memory task-state for a task from its parsed manifest mf: a fresh semaphore, an
;; empty agent-todo hash, and a work pattern covering 0 up to the manifest's psize minus every
;; range already logged as complete for this task id.
(define (initialize-task! id mf)
  (define full-range (range->integer-set 0 (manifest-psize mf)))
  ;; fold over the task log, removing each completed range from what is left
  (define remaining
    (for/fold ([left full-range])
              ([(_worker _time-start _time-end pat-fasl)
                (in-query (current-db) q-get-task-log id)])
      (integer-set-subtract left (make-integer-set (fasl->s-exp pat-fasl)))))
  (task-state id (make-semaphore 1) mf remaining (make-hash)))
;; this doesn't update the database - that only gets updated when the work is complete
(define (task-assign! ts agent-id requested-amount)
(unless (positive? requested-amount) (error "requested amount must be positive"))
(call-with-semaphore (task-state-sema ts) (lambda ()
(when (hash-has-key? (task-state-agent-todo) agent-id)
(error "agent already has work assigned"))
(define-values [assignment new-wp]
(pattern-range-take (task-state-work-pattern ts) requested-amount))
(cond
;; done! (maybe)
;; check other agents work
[(zero? (integer-set-count assignment)) #f]
;; update tracking
[else
(hash-set! (task-state-agent-todo agent-id assignment))
(set-task-state-work-pattern! ts new-wp)
assignment]))))
;; returns work from agent back to the regular work pool
(define (task-unassign! ts agent-id)
(call-with-semaphore (task-state-sema ts) (lambda ()
(match (hash-ref (task-state-agent-todo ts) agent-id #f)
[#f (void)]
[assignment
(define new-wp (integer-set-union assignment (task-state-work-pattern ts)))
(set-task-state-work-pattern! ts new-wp)
(hash-remove! (task-state-agent-todo ts) agent-id)]))))
(define (task-complete! ts agent-id time-start time-end)
(call-with-semaphore (task-state-sema ts) (lambda ()
(match (hash-ref (task-state-agent-todo ts) agent-id #f)
[#f (void)]
[assignment
;; add a new work log
(query-exec (current-db) q-add-task-log (task-state-id ts) agent-id time-start time-end
(s-exp->fasl (integer-set-contents assignment)))
;; remove tracking - this work is now done
(hash-remove! (task-state-agent-todo ts) agent-id)]))))
(define (agent-thd id resources-in)
;; helper to generate assignment ids (a kind of arbitrary number we pass to agents to track
;; what they're currently working on)
(define make-assignment-id
(let ([counter (random 0 1000)])
(lambda () (begin0 counter (set! counter (add1 counter))))))
;; set of resources
(define resources (list->set resources-in))
(set! resources-in #f)
;; tracks the sizes of task pieces given to the agent
(define task-size (make-hash))
;; current tasks, mirrors parent but only contains tasks we have resources for
(define current-tasks (make-hash))
;; assignments
(struct assignment [id taskid assign-data start-time handler] #:transparent)
;; active assignments, by assignment id
(define assigned-tasks (make-hash))
;; keep running?
(define run-agent-thd? #t)
(define (send-assignment assignment)
(void "TODO"))
(define (cancel-assignment assignment)
(void "TODO"))
(define (handle-thd-msg)
(match (thread-receive)
[(cons 'new-task ts)
(void "TODO")]
[(cons 'cancel-task ts) (void "TODO")]
;; got completion report from agent
[(cons 'agent-report (cons assignment-id state))
(match state
['complete (error "TODO")]
[success-input (error "TODO")])]
['shutdown (set! run-agent-thd? #f)]))
;; TODO : agent rpc to cancel current task, with error handling
(let loop ()
;; handle events
(define thd-evt (thread-receive-evt))
; (define task-timeout
; (if task-start-time
; (- TASK-TIMEOUT (- (current-seconds) task-start-time))
; #f))
(define task-timeout #f)
(match (sync/timeout task-timeout thd-evt)
[#f (void "TODO: task timeout")]
[(== thd-evt) (handle-thd-msg)])
(when run-agent-thd?
(loop)))
;; TODO : cleanup, cancel current task, shut down running tasks
(void))
;; id to task-state
(define current-tasks (make-hash))
;; hash of agents to handler thread
(define agents (make-hash))
(define (handle-thd-msg)
(match (thread-receive)
[(cons 'new-agent (cons id resources))
(parameterize ([current-custodian cust])
(hash-set! agents id (thread (lambda () (agent-thd id resources)))))]
[(cons 'delete-agent id)
(thread-send (hash-ref agents id) 'shutdown #f)
(hash-remove! agents id)]
[(cons 'new-task (cons id manifest))
(define ts (initialize-task! id manifest))
(hash-set! current-tasks id ts)
;; notify agents
(for ([(id thd) (in-hash agents)])
(thread-send thd (cons 'new-task ts) #f))]
[(cons 'agent-report (cons agent-id (cons assignment-id state)))
(thread-send (hash-ref agents agent-id) (cons 'agent-report (cons assignment-id state)) #f)]
[(cons 'cancel-task task-id)
(hash-remove! current-tasks task-id)
;; notify agents
(for ([(id thd) (in-hash agents)])
(thread-send thd (cons 'cancel-task task-id) #f))]
[_ (error "unknown agent handler message")]))
(let loop ()
(define thd-evt (thread-receive-evt))
(match (sync thd-evt)
[(== thd-evt) (handle-thd-msg)])
(loop)))
;; starts agent-handler in a new thread and returns that thread
(define (make-agent-handler)
  (thread (lambda () (agent-handler))))
;; parameter whose value agent-report-state passes to thread-send as the target thread
;; defaults to #f
(define current-agent-handler (make-parameter #f))
;; agent rpcs
;; agent rpc: reports the state of an assignment
;; state is either 'complete or a list of integers representing a success result
;; the report is forwarded to the agent handler thread, tagged with the id of the calling node
(define (agent-report-state assignment-id state)
  (enforce-subject 'agent)
  (let ([reporting-agent (node-id (current-from-node))])
    (thread-send (current-agent-handler)
                 (list* 'agent-report reporting-agent assignment-id state))))
;; the server impl unit
;; exports the server^ signature, taking each binding from the definitions in this context
(define-unit-from-context server-impl@ server^)
;; command line usage
; (module+ main
; (require racket/cmdline)
; (current-db (open-server-db 'create))
; (migrate-server-db)
;
; (parameterize ([current-from-node (node 100 "meow" 'client #f #f #f #f)])
; ((rpc-impl server get-projects)))
;
; ; (make-task "meow-task" '((meow . 10)) #"this is some extra data")
; ; (define data (configure-agent-binary (node 10 "meow0" 'agent #f #f #f #f)
; ; "aarch64-unknown-linux-gnu"
; ; (node 0 "server" 'server #f #f "meow.systems" 1337)))
; ; (with-output-to-file "/tmp/crossfire-agent.configured"
; ; (lambda () (write-bytes data)))
; ; (make-node "agent0" "x86_64" 'agent '("gpu" "hifive"))
; ; (parameterize ([current-from-node (node 100 "meow" 'client #f #f #f #f)])
; ; ((rpc-impl server edit-agent) 1 "meow0" '("cpu" "hifive")))
; ; (get-nodes 'agent)
; ; (get-nodes 'meow)
; )