diff --git a/crossfire/comms.rkt b/crossfire/comms.rkt
index ee53527..fe25101 100644
--- a/crossfire/comms.rkt
+++ b/crossfire/comms.rkt
@@ -17,8 +17,8 @@
;; along with this program. If not, see .
(require racket/async-channel racket/bool racket/engine racket/fasl racket/function racket/list
- racket/match racket/tcp syntax/parse/define
- (for-syntax racket/base racket/syntax)
+ racket/match racket/tcp racket/unit syntax/parse/define
+ (for-syntax racket/base racket/list racket/syntax racket/unit racket/unit-exptime)
"not-crypto.rkt")
;; define message types (they must all be prefab for fasl)
@@ -503,46 +503,43 @@
;; utility functions and macros for defining rpcs
-;; id generation helpers
-(define-for-syntax (rpc-type-id type)
- (format-id type "rpc-type-~a" (syntax-e type)))
-(define-for-syntax (rpc-impl-id type name)
- (format-id type "rpc-impl-~a-~a" (syntax-e type) (syntax-e name)))
-
;; parameters for comms, tm, and targeted node
(define current-comms (make-parameter #f))
(define current-tm (make-parameter #f))
(define current-to-node (make-parameter #f))
(define current-from-node (make-parameter #f))
-;; defines a class of rpcs
-(define-simple-macro (define-rpc-type type:id)
- #:with def-id (rpc-type-id #'type)
- (define def-id (make-hash)))
-;; defines an rpc implementation, registers it with a given class of rpcs and makes a wrapper to
-;; call it
-(define-simple-macro (define-rpc type:id (name:id args:id ...) body:expr ...)
- #:with def-id (rpc-type-id #'type)
- #:with impl-id (rpc-impl-id #'type #'name)
- (begin
- (define (impl-id args ...) body ...)
- (define (name args ...)
- (tm-transact (current-tm) (node-id (current-to-node)) (quote name) (list args ...)))
- (hash-set! def-id (quote name) impl-id)))
+;; this is entirely a proc macro because idk how to do this in any fancier way
+;; big weh
+;; also macro expanding the results of macros is pretty wack because if racket/unit is not required
+;; in this file, racket assumes "unit" refers to a runtime procedure instead of a macro and proceeds
+;; to expand the "arguments" to it, so you end up with weird errors like "define not permitted in
+;; expression context" and "illegal usage of ^" which only make sense once you realize that's
+;; what's going on
+;; u__u
+;; phase-1 helper: takes a signature identifier and returns syntax for a `unit` that exports it,
+;; where every signature member is a procedure forwarding its arguments to tm-transact
+(define-for-syntax (rpc-wrapper-unit-helper sig-name)
+  (define-values [_parent member-ids _vars _stxs] (signature-members sig-name sig-name))
+  (define (wrapper-def member-id) ; one forwarding definition; rest args are passed on as a list
+    #`(define (#,member-id . args)
+        (tm-transact (current-tm) (node-id (current-to-node)) (quote #,member-id) args)))
+  #`(unit (import)
+          (export #,sig-name)
+          #,@(map wrapper-def member-ids)))
-(define-simple-macro (rpc-impl type:id name:id)
- #:with impl-id (rpc-impl-id #'type #'name)
- impl-id)
-
-;; installs all rpcs of a given rpc class into the transaction manager
-(define-simple-macro (install-rpc-type type:id)
- #:with def-id (rpc-type-id #'type)
- (for ([(k v) (in-hash def-id)])
- (tm-register-rpc (current-tm) k v)))
+;; this creates a wrapper unit for the given signature that delegates to the transaction manager
+;; (current-tm) using (current-to-node)
+;; (define-signature mastodon^ [make-post florp eat-hot-chip lie])
+;; (define wrapper@ (make-rpc-wrapper-unit mastodon^))
+;; (define-values/invoke-unit wrapper@ (import) (export mastodon^))
+(define-syntax (make-rpc-wrapper-unit stx)
+  (syntax-case stx () ; accept exactly one identifier; any other shape is a "bad syntax" error
+    [(_ sig) (identifier? #'sig) (rpc-wrapper-unit-helper #'sig)]))
(provide current-comms current-tm current-to-node current-from-node
- define-rpc-type define-rpc rpc-impl install-rpc-type)
+ make-rpc-wrapper-unit)
; ;; demo code
; (define server-sk #"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")
diff --git a/crossfire/manifest.rkt b/crossfire/manifest.rkt
index aa8ae8a..20569d7 100644
--- a/crossfire/manifest.rkt
+++ b/crossfire/manifest.rkt
@@ -65,11 +65,11 @@
(cons/c 'iset (listof isub/c))
(cons/c 'pattern (listof isub/c)))))
-(struct manifest [data pattern size] #:transparent)
+(struct manifest [data pattern psize] #:transparent) ; psize is an integer per the contract below
(provide (contract-out
[struct manifest ((data manifest-def/c)
(pattern (vector/c integer-set?))
- (size integer?))]))
+ (psize integer?))]))
;; this "parses" to the extent that is necessary to avoid unnecessary pattern computations
@@ -112,8 +112,8 @@
[_ (error "unrecognized pattern element in manifest" x)])))
(define pattern (apply vector-append patterns))
- (define size (pattern-count pattern))
- (manifest manifest-def pattern size))
+ (define psize (pattern-count pattern))
+ (manifest manifest-def pattern psize))
;; get data from the manifest
(define/contract (manifest-data-get mf key [fail-thunk #f])
diff --git a/crossfire/protocol.rkt b/crossfire/protocol.rkt
index 1c0efff..088bd09 100644
--- a/crossfire/protocol.rkt
+++ b/crossfire/protocol.rkt
@@ -16,443 +16,23 @@
;; You should have received a copy of the GNU Affero General Public License
;; along with this program. If not, see .
-(require db/base db/sqlite3
- data/queue racket/bool racket/fasl racket/file racket/match
- racket/path racket/random racket/runtime-path racket/set racket/string
- north/base north/adapter/base north/adapter/sqlite
- "comms.rkt" "not-crypto.rkt"
- ;; port-fsync
- (submod "static-support.rkt" misc-calls))
-
-;; configuration
-
-(define PRODUCTION? #f)
-
-(define SERVER-DATA-DIR (if PRODUCTION? "/var/lib/crossfire/" "lib/"))
-(define SERVER-DB-PATH (build-path SERVER-DATA-DIR "crossfire.sqlite"))
-(define SERVER-FILES-PATH (build-path SERVER-DATA-DIR "projects/"))
-(define AGENT-ARCH-PREFIX "arch_")
-(define AGENT-BINARY "crossfire-agent")
-
-;; comms node for server (without secret key)
-(define current-server-public-node (make-parameter #f))
-
-;; north migrations
-(define-runtime-path migrations-dir "migrations/")
-
-;; database
-
-(define current-db (make-parameter #f))
-
-(define (open-server-db [mode 'read/write])
- (let ([db (sqlite3-connect #:database SERVER-DB-PATH #:mode mode)])
- (query-exec db "pragma foreign_keys=1;")
- db))
-
-;; this allows the server to be capable of migrating itself
-(define (migrate-server-db [db (current-db)])
- ;; these are the steps taken by the north cli tool (it's a bit verbose but at least it's all
- ;; programmatically accessible...)
- ;; i also use a runtime path to be a bit more robust
- (define base (path->migration migrations-dir))
- (define adapter (sqlite-adapter db))
- (adapter-init adapter)
- (define current-revision (adapter-current-revision adapter))
- (define target-revision (migration-revision (migration-most-recent base)))
- (define plan (migration-plan base current-revision target-revision))
- (for ([migration (in-list plan)])
- (displayln (format "applying migration: ~a" (migration-revision migration)))
- (adapter-apply! adapter (migration-revision migration) (migration-up migration)))
- (void))
-
-(define-syntax-rule (define-stmt name what)
- (define name (virtual-statement what)))
-
-(define-stmt q-new-node "insert into node (name, arch, type, secret) values (?, ?, ?, ?)")
-(define-stmt q-add-node-res "insert or ignore into node_resource (nodeid, resource) values (?, ?)")
-(define-stmt q-del-node-res "delete from node_resource where nodeid=? and resource=?")
-(define-stmt q-get-nodes "select id, name, arch from node where type=?")
-(define-stmt q-get-all-resources
- "select nodeid, resource from node_resource inner join node on node.id = node_resource.nodeid
- where node.type = ?")
-(define-stmt q-get-node-resources "select resource from node_resource where nodeid=?")
-(define-stmt q-edit-node "update node set name=? where id=?")
-(define-stmt q-delete-node "delete from node where id=?")
-(define-stmt q-get-node-type "select type from node where id=?")
-(define-stmt q-get-node-info "select name, arch, type, secret from node where id=?")
-
-(define-stmt q-new-task "insert into task (name, manifest) values (?, ?)")
-(define-stmt q-get-task-id-commit "select id, committed from task")
-(define-stmt q-set-task-commit "update task set committed=1 where id=?")
-(define-stmt q-delete-task "delete from task where id=?")
-(define-stmt q-get-tasks "select id, name, manifest, complete from task")
-
-(define-stmt q-get-task-log
- "select worker, time_start, time_end, pattern from task_log where taskid=?")
-(define-stmt q-add-task-log
- "insert into task_log (taskid, worker, time_start, time_end, pattern) values (?, ?, ?, ?, ?)")
-
-;; utils
-
-(define (query/insert-id db stmt . args)
- (define info (simple-result-info (apply query db stmt args)))
- (cdr (assoc 'insert-id info)))
-
-;; cleanly recovers from potential crash situations
-(define (server-cleanup-unused-files)
- (define existing-ids (mutable-set))
- (call-with-transaction (current-db) (lambda ()
- (define (cleanup id exists? path)
- (displayln (format "removing corrupted/incomplete task ~a" id))
- (when exists? (delete-file path))
- (query-exec (current-db) q-delete-task id))
- (for ([(id committed) (in-query (current-db) q-get-task-id-commit)])
- (set-add! existing-ids (number->string id))
- (define path (build-path SERVER-FILES-PATH (number->string id)))
- (define exists? (file-exists? path))
- (define committed? (= 1 committed))
- (cond
- ;; potentially crashed while file was fsync'd but the directory was not
- [(and committed? (not exists?)) (cleanup id exists? path)]
- ;; crashed between row insert and file fsync
- [(not committed?) (cleanup id exists? path)]
- [else (void)]))))
- ;; delete any unaffiliated files
- (for ([subpath (in-directory SERVER-FILES-PATH)])
- (define name (path->string (file-name-from-path subpath)))
- (unless (set-member? existing-ids name)
- (delete-file subpath)))
- (void))
-
-;; commits a file corresponding to the task
-(define (server-commit-file taskid data)
- (define path (build-path SERVER-FILES-PATH (number->string taskid)))
- (call-with-output-file path
- (lambda (out)
- (write-bytes data out)
- (port-fsync out))
- #:mode 'binary
- #:exists 'truncate)
- (query-exec (current-db) q-set-task-commit taskid))
-
-;; rpc calls
-
-(define-rpc-type server)
-
-(struct node-info [id name arch type resources online?] #:prefab)
-(struct project-info [id name manifest complete?] #:prefab)
-
-(define (get-nodes type)
- (define type-str (symbol->string type))
- (define resources (rows->dict #:key "nodeid" #:value "resource" #:value-mode '(list)
- (query (current-db) q-get-all-resources type-str)))
- (for/list ([(id name arch) (in-query (current-db) q-get-nodes type-str)])
- (define online? (and (current-comms) (comms-channel-available? (current-comms) id)))
- (node-info id name arch type (hash-ref resources id) online?)))
-
-(define (make-node name arch type resources)
- (call-with-transaction (current-db) (lambda ()
- (define secret (crypto-sign-make-key))
- (define public (crypto-sign-public-key secret))
- (define id (query/insert-id (current-db) q-new-node name arch (symbol->string type) secret))
- (for ([res (in-list resources)])
- (query-exec (current-db) q-add-node-res id res))
- (values id public))))
-
-(define (configure-agent-binary agent-node agent-arch server-node)
- (define binary
- (file->bytes
- (build-path SERVER-DATA-DIR (string-append AGENT-ARCH-PREFIX agent-arch) AGENT-BINARY)))
-
- (define (configure.linux-gnu)
- (define trailing-data (s-exp->fasl (list agent-node server-node)))
- ;; write 32 bit unsigned big endian trailer size (including size)
- (define trailing-size
- (integer->integer-bytes (+ 4 (bytes-length trailing-data)) 4 #f #t))
- (bytes-append binary trailing-data trailing-size))
-
- (match (string-split agent-arch "-")
- [(list _ ... "linux" "gnu") (configure.linux-gnu)]
- [_ (error "XXX: don't know how to configure arch" agent-arch)]))
-
-(define (make-task name manifest tar)
- (define manifest-data (s-exp->fasl manifest))
- (define id (query/insert-id (current-db) q-new-task name manifest-data))
- (server-commit-file id tar)
- id)
-
-(define (enforce-subject type)
- (unless (symbol=? type (node-type (current-from-node)))
- (error "unauthorized")))
-
-(define (enforce-object id type)
- (match (query-maybe-value (current-db) q-get-node-type id)
- [#f (error "node doesn't exist" id)]
- [(== (symbol->string type)) (void)]
- [x (error "wrong node type" x)]))
-
-;; client rpcs
-
-(define-rpc server (get-agents)
- (enforce-subject 'client)
- (get-nodes 'agent))
-
-(define-rpc server (new-agent name arch resources)
- (enforce-subject 'client)
- (define-values [id public] (make-node name arch 'agent resources))
- (define comms-node (node id name 'agent public #f #f #f))
- (comms-set-node-info (current-comms) comms-node)
- id)
-
-(define-rpc server (edit-agent id name resources)
- (enforce-subject 'client)
- (call-with-transaction (current-db) (lambda ()
- (enforce-object id 'agent)
- (define existing-resource (list->set (query-list (current-db) q-get-node-resources id)))
- (define new-resource (list->set resources))
- (query-exec (current-db) q-edit-node name id)
- (for ([res (in-set (set-subtract new-resource existing-resource))])
- (query-exec (current-db) q-add-node-res id res))
- (for ([res (in-set (set-subtract existing-resource new-resource))])
- (query-exec (current-db) q-del-node-res id res))))
- (define comms-node (comms-get-node-info (current-comms) id))
- (comms-set-node-info (current-comms) (struct-copy node comms-node [name name])))
-
-(define-rpc server (get-agent-deployment id)
- (enforce-subject 'client)
- (match (query-maybe-row (current-db) q-get-node-info id)
- [(vector name arch "agent" secret)
- (configure-agent-binary (node id name 'agent (crypto-sign-public-key secret) secret #f #f)
- arch (current-server-public-node))]
- [_ (error "invalid id or wrong node type")]))
-
-(define-rpc server (delete-agent id)
- (enforce-subject 'client)
- (call-with-transaction (current-db) (lambda ()
- (enforce-object id 'agent)
- (query-exec (current-db) q-delete-node id)))
- (comms-delete-node (current-comms) id))
-
-;; client rpcs :: projects
-
-(define-rpc server (new-project name manifest tar)
- (enforce-subject 'client)
- (define id (make-task name manifest tar))
- (void id)
- id)
-
-(define-rpc server (get-projects)
- (enforce-subject 'client)
- (for/list ([(id name manifest complete?) (in-query (current-db) q-get-tasks)])
- (project-info id name (fasl->s-exp manifest) complete?)))
-
-
-;; agent handling
-;; distributing subtasks
-
-;; minimum batch of inputs that will be scheduled for a node
-;; the batch size scales to how fast the node completes a batch
-(define MIN-SUBTASK-SIZE 16)
-;; aim to batch every 5 minutes
-;; if a task takes shorter, the amount will be doubled, and if it takes longer it will be halved
-(define OPTIMAL-COMPLETION-SECS 300)
-;; tasks will be reassigned if not completed within this time
-(define TASK-TIMEOUT (* 3 OPTIMAL-COMPLETION-SECS))
-
-(define (agent-handler)
- ;; unlike comms, messages to agent-handler have no responses. just thread-send, it's one-way
- (define cust (make-custodian))
- (define this-thread (current-thread))
- ;; make an auto cleanup thread :P
- (thread (lambda () (thread-wait this-thread) (custodian-shutdown-all cust)))
-
- ;; tasks
- ;; semaphore guarding mutation (sigh)
- ;; manifest: task manifest
- ;; work-pattern: the integer-set representing the work left to do
- ;; agent-todo: hash of agent id to integer-set representing work the agent is working on
- (struct task-state [id sema manifest [work-pattern #:mutable] agent-todo] #:transparent)
-
- (define (initialize-task! id manifest)
- (define sema (make-semaphore 1))
- (define agent-todo (make-hash))
- ;; TODO : manifest processing. 100 hardcoded here u___u
- (define init-pattern-range (range->integer-set 0 100))
- ;; subtract the pattern ranges that were already logged as complete
- (define pattern-range
- (for/fold ([pattern-range init-pattern-range])
- ([(_ _ _ pat-fasl) (in-query (current-db) q-get-task-log id)])
- (define sub (make-integer-set (fasl->s-exp pat-fasl)))
- (integer-set-subtract pattern-range sub)))
- (task-state id sema manifest pattern-range agent-todo))
-
- ;; this doesn't update the database - that only gets updated when the work is complete
- (define (task-assign! ts agent-id requested-amount)
- (unless (positive? requested-amount) (error "requested amount must be positive"))
- (call-with-semaphore (task-state-sema ts) (lambda ()
- (when (hash-has-key? (task-state-agent-todo) agent-id)
- (error "agent already has work assigned"))
- (define-values [assignment new-wp]
- (pattern-range-take (task-state-work-pattern ts) requested-amount))
- (cond
- ;; done! (maybe)
- ;; check other agents work
- [(zero? (integer-set-count assignment)) #f]
- ;; update tracking
- [else
- (hash-set! (task-state-agent-todo agent-id assignment))
- (set-task-state-work-pattern! ts new-wp)
- assignment]))))
-
- ;; returns work from agent back to the regular work pool
- (define (task-unassign! ts agent-id)
- (call-with-semaphore (task-state-sema ts) (lambda ()
- (match (hash-ref (task-state-agent-todo ts) agent-id #f)
- [#f (void)]
- [assignment
- (define new-wp (integer-set-union assignment (task-state-work-pattern ts)))
- (set-task-state-work-pattern! ts new-wp)
- (hash-remove! (task-state-agent-todo ts) agent-id)]))))
-
- (define (task-complete! ts agent-id time-start time-end)
- (call-with-semaphore (task-state-sema ts) (lambda ()
- (match (hash-ref (task-state-agent-todo ts) agent-id #f)
- [#f (void)]
- [assignment
- ;; add a new work log
- (query-exec (current-db) q-add-task-log (task-state-id ts) agent-id time-start time-end
- (s-exp->fasl (integer-set-contents assignment)))
- ;; remove tracking - this work is now done
- (hash-remove! (task-state-agent-todo ts) agent-id)]))))
-
- (define (agent-thd id resources-in)
- ;; helper to generate assignment ids (a kind of arbitrary number we pass to agents to track
- ;; what they're currently working on)
- (define make-assignment-id
- (let ([counter (random 0 1000)])
- (lambda () (begin0 counter (set! counter (add1 counter))))))
- ;; set of resources
- (define resources (list->set resources-in))
- (set! resources-in #f)
-
- ;; tracks the sizes of task pieces given to the agent
- (define task-size (make-hash))
- ;; current tasks, mirrors parent but only contains tasks we have resources for
- (define current-tasks (make-hash))
-
- ;; assignments
- (struct assignment [id taskid assign-data start-time handler] #:transparent)
- ;; active assignments, by assignment id
- (define assigned-tasks (make-hash))
-
- ;; keep running?
- (define run-agent-thd? #t)
-
- (define (send-assignment assignment)
- (void "TODO"))
-
- (define (cancel-assignment assignment)
- (void "TODO"))
-
- (define (handle-thd-msg)
- (match (thread-recieve)
- [(cons 'new-task ts)
- (void "TODO")]
- [(cons 'cancel-task ts) (void "TODO")]
- ;; got completion report from agent
- [(cons 'agent-report (cons assignment-id state))
- (match state
- ['complete (error "TODO")]
- [success-input (error "TODO")])]
- ['shutdown (set! run-agent-thd? #f)]))
-
- ;; TODO : agent rpc to cancel current task, with error handling
-
- (let loop ()
- ;; handle events
- (define thd-evt (thread-receive-evt))
- ; (define task-timeout
- ; (if task-start-time
- ; (- TASK-TIMEOUT (- (current-seconds) task-start-time))
- ; #f))
- (define task-timeout #f)
- (match (sync/timeout task-timeout thd-evt)
- [#f (void "TODO: task timeout")]
- [(== thd-evt) (handle-thd-msg)])
- (when run-agent-thd?
- (loop)))
-
- ;; TODO : cleanup, cancel current task, shut down running tasks
- (void))
-
- ;; id to task-state
- (define current-tasks (make-hash))
-
- ;; hash of agents to handler thread
- (define agents (make-hash))
-
- (define (handle-thd-msg)
- (match (thread-receive)
- [(cons 'new-agent (cons id resources))
- (parameterize ([current-custodian cust])
- (hash-set! agents id (thread (lambda () (agent-thd id resources)))))]
- [(cons 'delete-agent id)
- (thread-send (hash-ref agents id) 'shutdown #f)
- (hash-remove! agents id)]
- [(cons 'new-task (cons id manifest))
- (define ts (initialize-task! id manifest))
- (hash-set! current-tasks id ts)
- ;; notify agents
- (for ([(id thd) (in-hash agents)])
- (thread-send thd (cons 'new-task ts) #f))]
- [(cons 'agent-report (cons agent-id (cons assignment-id state)))
- (thread-send (hash-ref agents agent-id) (cons 'agent-report (cons assignment-id state)) #f)]
- [(cons 'cancel-task id)
- (hash-remove! current-tasks id)
- ;; notify agents
- (for ([(id thd) (in-hash agents)])
- (thread-send thd (cons 'cancel-task ts) #f))]
- [_ (error "unknown agent handler message")]))
-
- (let loop ()
- (define thd-evt (thread-receive-evt))
- (match (sync thd-evt)
- [(== thd-evt) (handle-thd-msg)])
- (loop)))
-
-(define (make-agent-handler)
- (thread agent-handler))
-
-(define (current-agent-handler (make-parameter #f)))
-
-;; agent rpcs
-
-;; report state 'complete or a list of integer representing a success result
-(define-rpc server (agent-report-state assignment-id state)
- (enforce-subject 'agent)
- (define agent-id (node-id (current-from-node)))
- (thread-send (current-agent-handler)
- (cons 'agent-report (cons agent-id (cons assignment-id state)))))
-
-
-;; command line usage
-(module+ main
- (require racket/cmdline)
- (current-db (open-server-db 'create))
- (migrate-server-db)
-
- (parameterize ([current-from-node (node 100 "meow" 'client #f #f #f #f)])
- ((rpc-impl server get-projects)))
-
- ; (make-task "meow-task" '((meow . 10)) #"this is some extra data")
- ; (define data (configure-agent-binary (node 10 "meow0" 'agent #f #f #f #f)
- ; "aarch64-unknown-linux-gnu"
- ; (node 0 "server" 'server #f #f "meow.systems" 1337)))
- ; (with-output-to-file "/tmp/crossfire-agent.configured"
- ; (lambda () (write-bytes data)))
- ; (make-node "agent0" "x86_64" 'agent '("gpu" "hifive"))
- ; (parameterize ([current-from-node (node 100 "meow" 'client #f #f #f #f)])
- ; ((rpc-impl server edit-agent) 1 "meow0" '("cpu" "hifive")))
- ; (get-nodes 'agent)
- ; (get-nodes 'meow)
-)
+(require racket/unit
+ "comms.rkt")
+
+(provide server^ agent^ client^)
+
+(define-signature server^ ; names only; the definitions must be supplied by an exporting unit
+ [get-agents ; get-agents .. get-projects: server.rkt guards these with (enforce-subject 'client)
+ new-agent
+ edit-agent
+ get-agent-deployment
+ delete-agent
+ new-project
+ get-projects
+ agent-report-state]) ; presumably the agent-facing rpc — confirm against server.rkt
+
+(define-signature agent^ ; NOTE(review): `todo` looks like a placeholder member — confirm
+ [todo])
+
+(define-signature client^ ; NOTE(review): `todo` looks like a placeholder member — confirm
+ [todo])
diff --git a/crossfire/server.rkt b/crossfire/server.rkt
new file mode 100644
index 0000000..8a98f0a
--- /dev/null
+++ b/crossfire/server.rkt
@@ -0,0 +1,460 @@
+#lang racket/base
+;; crossfire: distributed brute force infrastructure
+;;
+;; Copyright (C) 2020 haskal
+;;
+;; This program is free software: you can redistribute it and/or modify
+;; it under the terms of the GNU Affero General Public License as published by
+;; the Free Software Foundation, either version 3 of the License, or
+;; (at your option) any later version.
+;;
+;; This program is distributed in the hope that it will be useful,
+;; but WITHOUT ANY WARRANTY; without even the implied warranty of
+;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+;; GNU Affero General Public License for more details.
+;;
+;; You should have received a copy of the GNU Affero General Public License
+;; along with this program. If not, see .
+
+(require db/base db/sqlite3
+ data/queue racket/bool racket/fasl racket/file racket/match racket/path racket/random
+ racket/runtime-path racket/set racket/string racket/unit
+ north/base north/adapter/base north/adapter/sqlite
+ "comms.rkt" "manifest.rkt" "not-crypto.rkt" "pattern.rkt" "protocol.rkt"
+ ;; port-fsync
+ (submod "static-support.rkt" misc-calls))
+
+;; configuration
+
+(define PRODUCTION? #f) ; #f => data lives under the relative "lib/" directory
+
+(define SERVER-DATA-DIR (if PRODUCTION? "/var/lib/crossfire/" "lib/"))
+(define SERVER-DB-PATH (build-path SERVER-DATA-DIR "crossfire.sqlite"))
+(define SERVER-FILES-PATH (build-path SERVER-DATA-DIR "projects/")) ; one file per task id
+(define AGENT-ARCH-PREFIX "arch_") ; agent binaries are read from SERVER-DATA-DIR/arch_<arch>/
+(define AGENT-BINARY "crossfire-agent")
+
+;; comms node for server (without secret key)
+(define current-server-public-node (make-parameter #f)) ; read by get-agent-deployment
+
+;; north migrations
+(define-runtime-path migrations-dir "migrations/") ; consumed by migrate-server-db
+
+;; database
+
+(define current-db (make-parameter #f)) ; connection used by the queries in this file; #f until set
+
+(define (open-server-db [mode 'read/write]) ; connects, turns on foreign keys, returns the db
+  (define db (sqlite3-connect #:database SERVER-DB-PATH #:mode mode))
+  (query-exec db "pragma foreign_keys=1;")
+  db)
+
+;; this allows the server to be capable of migrating itself
+(define (migrate-server-db [db (current-db)])
+  ;; same steps the north cli tool takes, done programmatically; runtime path for robustness
+  (define base (path->migration migrations-dir))
+  (define adapter (sqlite-adapter db))
+  (adapter-init adapter)
+  (define plan
+    (migration-plan base
+                    (adapter-current-revision adapter)
+                    (migration-revision (migration-most-recent base))))
+  (for ([step (in-list plan)])
+    (define rev (migration-revision step))
+    (displayln (format "applying migration: ~a" rev))
+    (adapter-apply! adapter rev (migration-up step)))
+  (void))
+
+(define-syntax-rule (define-stmt name what) ; binds name to (virtual-statement what)
+ (define name (virtual-statement what)))
+
+(define-stmt q-new-node "insert into node (name, arch, type, secret) values (?, ?, ?, ?)")
+(define-stmt q-add-node-res "insert or ignore into node_resource (nodeid, resource) values (?, ?)")
+(define-stmt q-del-node-res "delete from node_resource where nodeid=? and resource=?")
+(define-stmt q-get-nodes "select id, name, arch from node where type=?")
+(define-stmt q-get-all-resources
+ "select nodeid, resource from node_resource inner join node on node.id = node_resource.nodeid
+ where node.type = ?")
+(define-stmt q-get-node-resources "select resource from node_resource where nodeid=?")
+(define-stmt q-edit-node "update node set name=? where id=?")
+(define-stmt q-delete-node "delete from node where id=?")
+(define-stmt q-get-node-type "select type from node where id=?")
+(define-stmt q-get-node-info "select name, arch, type, secret from node where id=?")
+
+(define-stmt q-new-task "insert into task (name, manifest) values (?, ?)")
+(define-stmt q-get-task-id-commit "select id, committed from task") ; used by crash cleanup
+(define-stmt q-set-task-commit "update task set committed=1 where id=?") ; see server-commit-file
+(define-stmt q-delete-task "delete from task where id=?")
+(define-stmt q-get-tasks "select id, name, manifest, complete from task")
+
+(define-stmt q-get-task-log ; rows are read back by initialize-task!
+ "select worker, time_start, time_end, pattern from task_log where taskid=?")
+(define-stmt q-add-task-log
+ "insert into task_log (taskid, worker, time_start, time_end, pattern) values (?, ?, ?, ?, ?)")
+
+;; utils
+
+(define (query/insert-id db stmt . args) ; runs stmt, returns the 'insert-id entry of its info
+  (define result (apply query db stmt args))
+  (cdr (assoc 'insert-id (simple-result-info result))))
+
+;; cleanly recovers from potential crash situations
+(define (server-cleanup-unused-files)
+ (define existing-ids (mutable-set)) ; task ids (as strings) found in the db, matched to file names
+ (call-with-transaction (current-db) (lambda ()
+ (define (cleanup id exists? path) ; drops the task row and, if present, its file
+ (displayln (format "removing corrupted/incomplete task ~a" id))
+ (when exists? (delete-file path))
+ (query-exec (current-db) q-delete-task id))
+ (for ([(id committed) (in-query (current-db) q-get-task-id-commit)])
+ (set-add! existing-ids (number->string id)) ; recorded even when the task is cleaned up below
+ (define path (build-path SERVER-FILES-PATH (number->string id)))
+ (define exists? (file-exists? path))
+ (define committed? (= 1 committed))
+ (cond
+ ;; potentially crashed while file was fsync'd but the directory was not
+ [(and committed? (not exists?)) (cleanup id exists? path)]
+ ;; crashed between row insert and file fsync
+ [(not committed?) (cleanup id exists? path)]
+ [else (void)]))))
+ ;; delete any unaffiliated files
+ (for ([subpath (in-directory SERVER-FILES-PATH)]) ; NOTE(review): also yields dirs — confirm flat
+ (define name (path->string (file-name-from-path subpath)))
+ (unless (set-member? existing-ids name)
+ (delete-file subpath)))
+ (void))
+
+;; commits a file corresponding to the task
+(define (server-commit-file taskid data)
+  ;; write + fsync the file first, and only then flag the task row as committed
+  (define (write-and-sync out)
+    (write-bytes data out)
+    (port-fsync out))
+  (call-with-output-file (build-path SERVER-FILES-PATH (number->string taskid))
+    write-and-sync
+    #:mode 'binary #:exists 'truncate)
+  (query-exec (current-db) q-set-task-commit taskid))
+
+;; rpc calls
+
+(struct node-info [id name arch type resources online?] #:prefab) ; built by get-nodes
+;; manifest is the raw source format
+(struct project-info [id name manifest complete?] #:prefab) ; built by get-projects
+
+(define (get-nodes type) ; returns a list of node-info for every node of the given type (symbol)
+  (define type-str (symbol->string type))
+  (define resources (rows->dict #:key "nodeid" #:value "resource" #:value-mode '(list)
+                                (query (current-db) q-get-all-resources type-str)))
+  (for/list ([(id name arch) (in-query (current-db) q-get-nodes type-str)])
+    (define online? (and (current-comms) (comms-channel-available? (current-comms) id)))
+    (node-info id name arch type (hash-ref resources id '()) online?))) ; '(): node w/o resources
+
+(define (make-node name arch type resources) ; => (values id public-key), all in one transaction
+  (define (insert-node+resources)
+    (define secret (crypto-sign-make-key))
+    (define public (crypto-sign-public-key secret))
+    (define id (query/insert-id (current-db) q-new-node name arch (symbol->string type) secret))
+    (for-each (lambda (res) (query-exec (current-db) q-add-node-res id res)) resources)
+    (values id public))
+  (call-with-transaction (current-db) insert-node+resources))
+
+(define (configure-agent-binary agent-node agent-arch server-node)
+  ;; the unconfigured binary is SERVER-DATA-DIR/<AGENT-ARCH-PREFIX + agent-arch>/AGENT-BINARY
+  (define binary-path
+    (build-path SERVER-DATA-DIR (string-append AGENT-ARCH-PREFIX agent-arch) AGENT-BINARY))
+  (define binary (file->bytes binary-path))
+
+  (define (configure.linux-gnu)
+    (define trailing-data (s-exp->fasl (list agent-node server-node)))
+    ;; write 32 bit unsigned big endian trailer size (including size)
+    (define trailing-size (+ 4 (bytes-length trailing-data)))
+    (bytes-append binary trailing-data (integer->integer-bytes trailing-size 4 #f #t)))
+
+  (match (string-split agent-arch "-")
+    [(list _ ... "linux" "gnu") (configure.linux-gnu)]
+    [_ (error "XXX: don't know how to configure arch" agent-arch)]))
+
+;; manifest is the raw form
+(define (make-task name manifest tar) ; inserts the task row, commits tar to disk, returns the id
+  ;; the row is inserted first so that the new id can name the file
+  (define id (query/insert-id (current-db) q-new-task name (s-exp->fasl manifest)))
+  (server-commit-file id tar)
+  id)
+
+(define (enforce-subject type) ; raises "unauthorized" unless the calling node has this type
+  (define caller-type (node-type (current-from-node)))
+  (unless (symbol=? type caller-type) (error "unauthorized")))
+
+(define (enforce-object id type) ; raises unless node `id` exists in the db with the given type
+  (define actual (query-maybe-value (current-db) q-get-node-type id))
+  (cond [(not actual) (error "node doesn't exist" id)]
+        [(equal? actual (symbol->string type)) (void)]
+        [else (error "wrong node type" actual)]))
+
+;; client rpcs
+
+(define (get-agents) ; client rpc: node-info list for all nodes of type 'agent
+ (enforce-subject 'client)
+ (get-nodes 'agent))
+
+(define (new-agent name arch resources) ; client rpc: creates an agent node, returns its id
+  (enforce-subject 'client)
+  (define-values [id public] (make-node name arch 'agent resources))
+  ;; register the new node with comms (public key only; the remaining node fields are #f)
+  (comms-set-node-info (current-comms) (node id name 'agent public #f #f #f))
+  id)
+
+(define (edit-agent id name resources) ; client rpc: renames an agent, replaces its resource list
+ (enforce-subject 'client)
+ (call-with-transaction (current-db) (lambda ()
+ (enforce-object id 'agent)
+ (define existing-resource (list->set (query-list (current-db) q-get-node-resources id)))
+ (define new-resource (list->set resources))
+ (query-exec (current-db) q-edit-node name id)
+ (for ([res (in-set (set-subtract new-resource existing-resource))]) ; resources to add
+ (query-exec (current-db) q-add-node-res id res))
+ (for ([res (in-set (set-subtract existing-resource new-resource))]) ; resources to remove
+ (query-exec (current-db) q-del-node-res id res))))
+ (define comms-node (comms-get-node-info (current-comms) id)) ; runs after the db transaction
+ (comms-set-node-info (current-comms) (struct-copy node comms-node [name name])))
+
+(define (get-agent-deployment id) ; client rpc: returns the configured agent binary as bytes
+  (enforce-subject 'client)
+  (match (query-maybe-row (current-db) q-get-node-info id)
+    [(vector name arch "agent" secret)
+     (define agent-node (node id name 'agent (crypto-sign-public-key secret) secret #f #f))
+     (configure-agent-binary agent-node arch (current-server-public-node))]
+    [_ (error "invalid id or wrong node type")]))
+
+(define (delete-agent id) ; client rpc: removes an agent from the db, then from comms
+ (enforce-subject 'client)
+ (call-with-transaction (current-db) (lambda ()
+ (enforce-object id 'agent) ; raises (aborting the delete) if id is missing or not an agent
+ (query-exec (current-db) q-delete-node id)))
+ (comms-delete-node (current-comms) id))
+
+;; client rpcs :: projects
+
+ ;; rpc (clients only): creates a task from `name`, `manifest` and `tar` via make-task and
+ ;; returns the value make-task produces (the new id).
+ (define (new-project name manifest tar)
+   (enforce-subject 'client)
+   (make-task name manifest tar))
+
+ ;; rpc (clients only): returns one project-info per row of q-get-tasks, with the stored
+ ;; manifest decoded from fasl.
+ (define (get-projects)
+   (enforce-subject 'client)
+   (for/list ([(task-id task-name manifest-fasl done?) (in-query (current-db) q-get-tasks)])
+     (define decoded-manifest (fasl->s-exp manifest-fasl))
+     (project-info task-id task-name decoded-manifest done?)))
+
+
+;; agent handling
+;; distributing subtasks
+
+ ;; smallest batch of inputs that will be scheduled for a node; the intent is for the batch
+ ;; size to scale with how fast the node completes a batch
+ (define MIN-SUBTASK-SIZE 16)
+ ;; target time per batch, in seconds (5 minutes). the intent is to double the amount when a
+ ;; batch finishes faster than this and halve it when it takes longer
+ (define OPTIMAL-COMPLETION-SECS (* 5 60))
+ ;; a batch not completed within this many seconds is meant to be reassigned
+ (define TASK-TIMEOUT (* OPTIMAL-COMPLETION-SECS 3))
+
+ ;; Body of the agent handler thread (started by make-agent-handler). It keeps the table of
+ ;; current tasks, runs one worker thread per agent, and dispatches messages from its thread
+ ;; mailbox in an endless loop; it only ends if a message handler raises.
+ ;;
+ ;; Accepted messages (one-way, sent with thread-send):
+ ;;   (cons 'new-agent (cons id resources))   start a worker thread for agent `id`
+ ;;   (cons 'delete-agent id)                 ask the agent's worker to shut down
+ ;;   (cons 'new-task (cons id manifest))     register a task and notify every worker
+ ;;   (cons 'cancel-task task-id)             drop a task and notify every worker
+ ;;   (cons 'agent-report (cons agent-id (cons assignment-id state)))
+ ;;                                           forward a report to that agent's worker
+ ;; Any other message raises "unknown agent handler message".
+ (define (agent-handler)
+   ;; unlike comms, messages to agent-handler have no responses. just thread-send, it's one-way
+   (define cust (make-custodian))
+   (define this-thread (current-thread))
+   ;; cleanup thread: once this handler thread terminates, shut down everything created under
+   ;; cust (the agent worker threads are created with cust as the current custodian)
+   (thread (lambda () (thread-wait this-thread) (custodian-shutdown-all cust)))
+
+   ;; tasks
+   ;; sema: semaphore guarding mutation (sigh)
+   ;; manifest: task manifest (parsed version)
+   ;; work-pattern: the integer-set representing the work left to do
+   ;; agent-todo: hash of agent id to integer-set representing work the agent is working on
+   (struct task-state [id sema manifest [work-pattern #:mutable] agent-todo] #:transparent)
+
+   ;; builds the task-state for task `id`: starts from the full pattern range given by the
+   ;; manifest and removes everything the task log already records as done
+   (define (initialize-task! id mf)
+     (define sema (make-semaphore 1))
+     (define agent-todo (make-hash))
+     (define init-pattern-range (range->integer-set 0 (manifest-psize mf)))
+     ;; subtract the pattern ranges that were already logged as complete
+     (define pattern-range
+       (for/fold ([pattern-range init-pattern-range])
+                 ([(_1 _2 _3 pat-fasl) (in-query (current-db) q-get-task-log id)])
+         (define sub (make-integer-set (fasl->s-exp pat-fasl)))
+         (integer-set-subtract pattern-range sub)))
+     (task-state id sema mf pattern-range agent-todo))
+
+   ;; takes up to `requested-amount` of remaining work for `agent-id`. returns the assigned
+   ;; integer-set, or #f when nothing is left in the pool. raises if the amount is not positive
+   ;; or the agent already holds an assignment for this task.
+   ;; this doesn't update the database - that only gets updated when the work is complete
+   (define (task-assign! ts agent-id requested-amount)
+     (unless (positive? requested-amount) (error "requested amount must be positive"))
+     (call-with-semaphore (task-state-sema ts) (lambda ()
+       ;; fix: the accessor needs the task-state argument
+       (when (hash-has-key? (task-state-agent-todo ts) agent-id)
+         (error "agent already has work assigned"))
+       (define-values [assignment new-wp]
+         (pattern-range-take (task-state-work-pattern ts) requested-amount))
+       (cond
+         ;; done! (maybe)
+         ;; check other agents work
+         [(zero? (integer-set-count assignment)) #f]
+         ;; update tracking
+         [else
+          ;; fix: hash-set! takes the table, the key and the value as three arguments
+          (hash-set! (task-state-agent-todo ts) agent-id assignment)
+          (set-task-state-work-pattern! ts new-wp)
+          assignment]))))
+
+   ;; returns work from agent back to the regular work pool (no-op if the agent holds none)
+   (define (task-unassign! ts agent-id)
+     (call-with-semaphore (task-state-sema ts) (lambda ()
+       (match (hash-ref (task-state-agent-todo ts) agent-id #f)
+         [#f (void)]
+         [assignment
+          (define new-wp (integer-set-union assignment (task-state-work-pattern ts)))
+          (set-task-state-work-pattern! ts new-wp)
+          (hash-remove! (task-state-agent-todo ts) agent-id)]))))
+
+   ;; records the agent's current assignment as finished: writes a task log row and stops
+   ;; tracking it (no-op if the agent holds no assignment)
+   (define (task-complete! ts agent-id time-start time-end)
+     (call-with-semaphore (task-state-sema ts) (lambda ()
+       (match (hash-ref (task-state-agent-todo ts) agent-id #f)
+         [#f (void)]
+         [assignment
+          ;; add a new work log
+          (query-exec (current-db) q-add-task-log (task-state-id ts) agent-id time-start time-end
+                      (s-exp->fasl (integer-set-contents assignment)))
+          ;; remove tracking - this work is now done
+          (hash-remove! (task-state-agent-todo ts) agent-id)]))))
+
+   ;; body of the per-agent worker thread. most of the message handling is still TODO; the
+   ;; thread runs until it receives 'shutdown
+   (define (agent-thd id resources-in)
+     ;; helper to generate assignment ids (a kind of arbitrary number we pass to agents to track
+     ;; what they're currently working on)
+     (define make-assignment-id
+       (let ([counter (random 0 1000)])
+         (lambda () (begin0 counter (set! counter (add1 counter))))))
+     ;; set of resources
+     (define resources (list->set resources-in))
+     (set! resources-in #f)
+
+     ;; tracks the sizes of task pieces given to the agent
+     (define task-size (make-hash))
+     ;; current tasks, mirrors parent but only contains tasks we have resources for
+     (define current-tasks (make-hash))
+
+     ;; assignments
+     (struct assignment [id taskid assign-data start-time handler] #:transparent)
+     ;; active assignments, by assignment id
+     (define assigned-tasks (make-hash))
+
+     ;; keep running?
+     (define run-agent-thd? #t)
+
+     (define (send-assignment assignment)
+       (void "TODO"))
+
+     (define (cancel-assignment assignment)
+       (void "TODO"))
+
+     (define (handle-thd-msg)
+       (match (thread-receive)
+         ;; payload is the task-state built by the parent handler
+         [(cons 'new-task ts)
+          (void "TODO")]
+         ;; payload is the task id (the parent sends the id here, not the task-state)
+         [(cons 'cancel-task task-id) (void "TODO")]
+         ;; got completion report from agent
+         [(cons 'agent-report (cons assignment-id state))
+          (match state
+            ['complete (error "TODO")]
+            [success-input (error "TODO")])]
+         ['shutdown (set! run-agent-thd? #f)]))
+
+     ;; TODO : agent rpc to cancel current task, with error handling
+
+     (let loop ()
+       ;; handle events
+       (define thd-evt (thread-receive-evt))
+       ; (define task-timeout
+       ;   (if task-start-time
+       ;       (- TASK-TIMEOUT (- (current-seconds) task-start-time))
+       ;       #f))
+       (define task-timeout #f)
+       (match (sync/timeout task-timeout thd-evt)
+         [#f (void "TODO: task timeout")]
+         [(== thd-evt) (handle-thd-msg)])
+       (when run-agent-thd?
+         (loop)))
+
+     ;; TODO : cleanup, cancel current task, shut down running tasks
+     (void))
+
+   ;; id to task-state
+   (define current-tasks (make-hash))
+
+   ;; hash of agents to handler thread
+   (define agents (make-hash))
+
+   (define (handle-thd-msg)
+     (match (thread-receive)
+       [(cons 'new-agent (cons id resources))
+        (parameterize ([current-custodian cust])
+          (hash-set! agents id (thread (lambda () (agent-thd id resources)))))]
+       [(cons 'delete-agent id)
+        ;; an unknown (or already deleted) agent must not raise here: an exception would end
+        ;; the handler thread, and the cleanup thread would then shut down every agent worker
+        (define thd (hash-ref agents id #f))
+        (when thd
+          (thread-send thd 'shutdown #f)
+          (hash-remove! agents id))]
+       [(cons 'new-task (cons id manifest))
+        (define ts (initialize-task! id manifest))
+        (hash-set! current-tasks id ts)
+        ;; notify agents
+        (for ([(id thd) (in-hash agents)])
+          (thread-send thd (cons 'new-task ts) #f))]
+       [(cons 'agent-report (cons agent-id (cons assignment-id state)))
+        ;; a report can arrive for an agent that has no worker (e.g. it was just deleted);
+        ;; drop it instead of raising
+        (define thd (hash-ref agents agent-id #f))
+        (when thd
+          (thread-send thd (cons 'agent-report (cons assignment-id state)) #f))]
+       [(cons 'cancel-task task-id)
+        (hash-remove! current-tasks task-id)
+        ;; notify agents
+        (for ([(id thd) (in-hash agents)])
+          (thread-send thd (cons 'cancel-task task-id) #f))]
+       [_ (error "unknown agent handler message")]))
+
+   (let loop ()
+     (define thd-evt (thread-receive-evt))
+     (match (sync thd-evt)
+       [(== thd-evt) (handle-thd-msg)])
+     (loop)))
+
+ ;; Starts agent-handler in a new thread and returns that thread; messages for the handler are
+ ;; delivered to it with thread-send.
+ (define (make-agent-handler)
+   (thread (lambda () (agent-handler))))
+
+ ;; parameter read by agent-report-state as the target of thread-send, so it is expected to hold
+ ;; the agent handler thread while rpcs run (presumably the thread returned by
+ ;; make-agent-handler -- confirm where it is parameterized). defaults to #f.
+ (define current-agent-handler (make-parameter #f))
+
+;; agent rpcs
+
+ ;; rpc (agents only): forwards a report for `assignment-id` to the agent handler thread, tagged
+ ;; with the id of the calling agent. `state` is either 'complete or a list of integers
+ ;; representing a success result.
+ (define (agent-report-state assignment-id state)
+   (enforce-subject 'agent)
+   (define reporter-id (node-id (current-from-node)))
+   (define payload (cons assignment-id state))
+   (define message (cons 'agent-report (cons reporter-id payload)))
+   (thread-send (current-agent-handler) message))
+
+
+ ;; the server impl unit
+ ;; define-unit-from-context builds server-impl@ by taking every export of the server^ signature
+ ;; from the binding of the same name visible at this point (presumably the rpc procedures
+ ;; defined above -- confirm against server^, which is declared elsewhere)
+ (define-unit-from-context server-impl@ server^)
+
+;; command line usage
+; (module+ main
+; (require racket/cmdline)
+; (current-db (open-server-db 'create))
+; (migrate-server-db)
+;
+; (parameterize ([current-from-node (node 100 "meow" 'client #f #f #f #f)])
+; ((rpc-impl server get-projects)))
+;
+; ; (make-task "meow-task" '((meow . 10)) #"this is some extra data")
+; ; (define data (configure-agent-binary (node 10 "meow0" 'agent #f #f #f #f)
+; ; "aarch64-unknown-linux-gnu"
+; ; (node 0 "server" 'server #f #f "meow.systems" 1337)))
+; ; (with-output-to-file "/tmp/crossfire-agent.configured"
+; ; (lambda () (write-bytes data)))
+; ; (make-node "agent0" "x86_64" 'agent '("gpu" "hifive"))
+; ; (parameterize ([current-from-node (node 100 "meow" 'client #f #f #f #f)])
+; ; ((rpc-impl server edit-agent) 1 "meow0" '("cpu" "hifive")))
+; ; (get-nodes 'agent)
+; ; (get-nodes 'meow)
+; )