crossfire/crossfire/server.rkt

935 lines
38 KiB
Racket

#lang racket/base
;; crossfire: distributed brute force infrastructure
;;
;; Copyright (C) 2020 haskal
;;
;; This program is free software: you can redistribute it and/or modify
;; it under the terms of the GNU Affero General Public License as published by
;; the Free Software Foundation, either version 3 of the License, or
;; (at your option) any later version.
;;
;; This program is distributed in the hope that it will be useful,
;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
;; GNU Affero General Public License for more details.
;;
;; You should have received a copy of the GNU Affero General Public License
;; along with this program. If not, see <https://www.gnu.org/licenses/>.
(require db/base db/sqlite3
data/queue racket/async-channel racket/bool racket/contract racket/fasl racket/file
racket/function racket/list racket/logging racket/match racket/path racket/random
racket/runtime-path racket/set racket/string racket/unit srfi/19
north/base north/adapter/base north/adapter/sqlite
"comms.rkt" "info.rkt" "logging.rkt" "manifest.rkt" "not-crypto.rkt" "pattern.rkt" "protocol.rkt"
;; port-fsync
(submod "static-support.rkt" misc-calls))
;; logging
;; defines the `server` log topic as a child of global-logger; the log-server-info /
;; log-server-warning / log-server-error forms used throughout this file come from here
(define-logger server #:parent global-logger)
;; configuration
;; node id reserved for the server itself; enforce-subject lets this id bypass the type check
(define *server-node-id* 0)
;; hard-coded switch: #t selects system-wide paths, #f selects paths relative to the working
;; directory
(define *production?* #f)
(define *config-root* (if *production?* "/etc/" "etc/"))
;; NOTE: outside production the state root and the lib root are the same directory ("lib/")
(define *state-root* (if *production?* "/var/lib/crossfire/" "lib/"))
(define *lib-root* (if *production?* "/usr/lib/" "lib/"))
;; path of the agent executable for an architecture string:
;; <lib-root>/arch_<arch>/crossfire-agent
(define (get-binary-path-for-arch arch)
  (let ([arch-dir (string-append "arch_" arch)])
    (build-path *lib-root* arch-dir "crossfire-agent")))
;; server configuration file (read by the main submodule)
(define *server-config-path* (build-path *config-root* "crossfire.rktd"))
;; sqlite database file opened by open-server-db
(define *server-db-path* (build-path *state-root* "crossfire.sqlite"))
;; directory of per-task project files, one file per task id (see get-project-file-path)
(define *server-files-path* (build-path *state-root* "projects/"))
;; the server's signing secret key; created by the main submodule when missing
(define *server-seckey-path* (build-path *state-root* "server.key"))
;; location of the project file for a task: the decimal task id inside *server-files-path*
(define (get-project-file-path projid)
  (define file-name (number->string projid))
  (build-path *server-files-path* file-name))
;; comms node for server (without secret key)
;; set by the main submodule; passed to configure-agent-binary by get-agent-deployment
(define current-server-public-node (make-parameter #f))
;; north migrations
;; a runtime path, so it is resolved relative to this source file, not the working directory
(define-runtime-path migrations-dir "migrations/")
;; database
;; connection used by every query in this file; #f until the main submodule opens it
(define current-db (make-parameter #f))
;; opens the server sqlite database in the given mode (default 'read/write) and turns on
;; foreign key enforcement for the connection before returning it
(define (open-server-db [mode 'read/write])
  (define conn (sqlite3-connect #:database *server-db-path* #:mode mode))
  (query-exec conn "pragma foreign_keys=1;")
  conn)
;; lets the server migrate its own database on startup
;; mirrors what the north cli tool does: load the migration tree (from a runtime path, so it
;; does not depend on the working directory), initialize the adapter, plan from the current
;; revision to the most recent one, then apply each step's "up" script in order
(define (migrate-server-db [db (current-db)])
  (define base (path->migration migrations-dir))
  (define adapter (sqlite-adapter db))
  (adapter-init adapter)
  (define plan
    (migration-plan base
                    (adapter-current-revision adapter)
                    (migration-revision (migration-most-recent base))))
  (for ([step (in-list plan)])
    (let ([rev (migration-revision step)])
      (log-server-info "applying migration: ~a" rev)
      (adapter-apply! adapter rev (migration-up step))))
  (void))
;; (define-stmt name sql) binds name to a virtual-statement for the given SQL text
(define-syntax-rule (define-stmt name what)
(define name (virtual-statement what)))
;; -- node and node_resource tables
(define-stmt q-new-node "insert into node (name, arch, type, secret) values (?, ?, ?, ?)")
(define-stmt q-add-node-res "insert or ignore into node_resource (nodeid, resource) values (?, ?)")
(define-stmt q-del-node-res "delete from node_resource where nodeid=? and resource=?")
(define-stmt q-get-nodes "select id, name, arch from node where type=?")
;; all (nodeid, resource) pairs for nodes of one type; nodes without resources yield no rows
(define-stmt q-get-all-resources
"select nodeid, resource from node_resource inner join node on node.id = node_resource.nodeid
where node.type = ?")
(define-stmt q-get-node-resources "select resource from node_resource where nodeid=?")
(define-stmt q-edit-node "update node set name=? where id=?")
(define-stmt q-delete-node "delete from node where id=?")
(define-stmt q-get-node-type "select type from node where id=?")
(define-stmt q-get-node-info "select name, arch, type, secret from node where id=?")
;; -- task table
(define-stmt q-new-task "insert into task (name, manifest) values (?, ?)")
(define-stmt q-get-task-id-commit "select id, committed from task")
(define-stmt q-set-task-commit "update task set committed=1 where id=?")
(define-stmt q-delete-task "delete from task where id=?")
(define-stmt q-get-tasks "select id, name, manifest, complete from task")
(define-stmt q-set-task-complete "update task set complete=1 where id=?")
;; -- task_log (completed work ranges) and task_match (successful inputs)
(define-stmt q-get-task-log
"select worker, time_wall_start, duration, pattern from task_log where taskid=?")
(define-stmt q-add-task-log
"insert into task_log (taskid, worker, time_wall_start, duration, pattern)
values (?, ?, ?, ?, ?)")
(define-stmt q-add-task-match
"insert into task_match (taskid, worker, time_wall, match) values (?, ?, ?, ?)")
;; utils
;; time helpers (because time is a bigge heck)
;; monotonic time can be different than wall clock time
;; for our purposes, tasks have two measures of time associated with them:
;; - wall clock instant representing when the task was started
;; - duration of the task (as a difference of monotonic times)
;; it's important never to mix up wall clock and monotonic measurements, and never to take a
;; difference between two wall clock times
;; fortunately, racket provides srfi/19 as part of the default installation which is very convenient
;; whole seconds on the monotonic clock; only meaningful as a difference of two readings
(define (current-seconds-monotonic)
  (let ([now (current-time 'time-monotonic)])
    (time-second now)))
;; whole seconds of the current UTC wall clock time
(define (current-seconds-utc)
  (let ([now (current-time 'time-utc)])
    (time-second now)))
;; runs an inserting statement and returns the 'insert-id entry of the result info
(define (query/insert-id db stmt . args)
  (define result (apply query db stmt args))
  (define entry (assoc 'insert-id (simple-result-info result)))
  (cdr entry))
;; cleanly recovers from potential crash situations
;; a task row is kept only when it is marked committed AND its project file exists; any other
;; combination is treated as corrupted/incomplete:
;;  - committed but no file: potentially crashed while the file was fsync'd but the directory
;;    was not
;;  - not committed: crashed between row insert and file fsync
;; afterwards every file under the projects directory whose name is not a known task id is
;; deleted
(define (server-cleanup-unused-files)
  (define known-names (mutable-set))
  ;; drops the task row and, when present, its project file
  (define (purge! id exists? path)
    (log-server-warning "removing corrupted/incomplete task ~a" id)
    (when exists? (delete-file path))
    (query-exec (current-db) q-delete-task id))
  (call-with-transaction
   (current-db)
   (lambda ()
     (for ([(id committed) (in-query (current-db) q-get-task-id-commit)])
       (set-add! known-names (number->string id))
       (let* ([path (get-project-file-path id)]
              [exists? (file-exists? path)]
              [committed? (= 1 committed)])
         (unless (and committed? exists?)
           (purge! id exists? path))))))
  ;; delete any unaffiliated files
  (for ([subpath (in-directory *server-files-path*)])
    (define name (path->string (file-name-from-path subpath)))
    (unless (set-member? known-names name)
      (delete-file subpath)))
  (void))
;; commits a file corresponding to the task: writes the bytes to the task's project file
;; (truncating any existing file), fsyncs the port, and only then marks the task row committed
(define (server-commit-file taskid data)
  (define (write-and-sync out)
    (write-bytes data out)
    (port-fsync out))
  (call-with-output-file (get-project-file-path taskid)
    write-and-sync
    #:mode 'binary
    #:exists 'truncate)
  (query-exec (current-db) q-set-task-commit taskid))
;; computes a hash of the file identifying its current contents for agents
;; (in case we reuse taskids)
;; the file is fed to the blake2b context in chunks of at most 8192 bytes
(define (server-hash-file taskid)
  (define chunk-size 8192)
  (define (read-chunk port) (read-bytes chunk-size port))
  (call-with-input-file (get-project-file-path taskid)
    (lambda (in)
      (define ctx (crypto-blake2b-init))
      ;; in-port stops as soon as read-chunk produces eof
      (for ([chunk (in-port read-chunk in)])
        (crypto-blake2b-update ctx chunk))
      (crypto-blake2b-final ctx))))
;; reads the whole project file of a task into a byte string
(define (server-get-file taskid)
  (file->bytes (get-project-file-path taskid)))
;; description of a node as produced by get-nodes: resources is the list of resource names
;; attached to the node, online? is whether a comms channel to it was available at query time
(struct node-info [id name arch type resources online?] #:prefab)
;; manifest is the raw source format
;; complete? carries the value of the task.complete column as stored (see get-projects)
(struct project-info [id name manifest complete?] #:prefab)
;; lists all nodes of the given type (a symbol such as 'agent) as node-info structs
;; resources come from a single joined query grouped by node id; a node with no resource rows
;; does not appear in that grouping at all, so it must default to the empty list instead of
;; failing the lookup
;; online? is #f when there is no current comms instance
(define (get-nodes type)
  (define type-str (symbol->string type))
  (define resources
    (rows->dict #:key "nodeid" #:value "resource" #:value-mode '(list)
                (query (current-db) q-get-all-resources type-str)))
  (for/list ([(id name arch) (in-query (current-db) q-get-nodes type-str)])
    (define online? (and (current-comms) (comms-channel-available? (current-comms) id)))
    (node-info id name arch type (hash-ref resources id '()) online?)))
;; creates a node row with a freshly generated signing key plus its resource rows, all inside
;; one transaction
;; returns two values: the new node id and the node's public key
(define (make-node name arch type resources)
  (call-with-transaction
   (current-db)
   (lambda ()
     (let* ([secret (crypto-sign-make-key)]
            [public (crypto-sign-public-key secret)]
            [type-str (symbol->string type)]
            [id (query/insert-id (current-db) q-new-node name arch type-str secret)])
       (for ([res (in-list resources)])
         (query-exec (current-db) q-add-node-res id res))
       (values id public)))))
;; builds a comms node struct from the database row of the given node id
;; with-secret? - include the secret key in the node (otherwise that field is #f)
;; with-arch?   - additionally return the arch column as a second value
;; the last two node fields are always #f here
;; raises an error when no row exists for the id
(define (load-comms-node id [with-secret? #f] [with-arch? #f])
  (define row (query-maybe-row (current-db) q-get-node-info id))
  (match row
    [(vector name arch type secret)
     (let ([comms-node (node id name (string->symbol type) (crypto-sign-public-key secret)
                             (and with-secret? secret) #f #f)])
       (cond
         [with-arch? (values comms-node arch)]
         [else comms-node]))]
    [_ (error "invalid node id" id)]))
;; registers the stored (secret-less) node for this id with the current comms instance
(define (restore-comms-node id)
  (let ([comms-node (load-comms-node id)])
    (comms-set-node-info (current-comms) comms-node)))
;; produces a deployable agent executable: the stock binary for agent-arch with the agent's and
;; the server's node structs appended as a fasl trailer
;; only arch strings ending in "-linux-gnu" are supported; anything else is an error (raised
;; after the binary has been read)
(define (configure-agent-binary agent-node agent-arch server-node)
  (define binary (file->bytes (get-binary-path-for-arch agent-arch)))
  (define (configure.linux-gnu)
    (define payload (s-exp->fasl (list agent-node server-node)))
    ;; 32 bit unsigned big endian trailer size; the 4 size bytes count towards the size
    (define total-size (+ 4 (bytes-length payload)))
    (bytes-append binary payload (integer->integer-bytes total-size 4 #f #t)))
  (match (string-split agent-arch "-")
    [(list _ ... "linux" "gnu") (configure.linux-gnu)]
    [_ (error "XXX: don't know how to configure arch" agent-arch)]))
;; manifest is the raw form
;; stores the task row (name taken from the manifest's 'name entry, manifest as fasl bytes),
;; then commits the project file contents for the new id; returns the id
(define (make-task manifest tar)
  (let* ([manifest-data (s-exp->fasl manifest)]
         [name (second (assoc 'name manifest))]
         [id (query/insert-id (current-db) q-new-task name manifest-data)])
    (server-commit-file id tar)
    id))
;; authorization check on the caller: the current from-node must have the given type, unless
;; the from-node is the server itself, which is always allowed
(define (enforce-subject type)
  (define from (current-from-node))
  (define allowed?
    (or (symbol=? type (node-type from))
        (= (node-id from) *server-node-id*)))
  (unless allowed?
    (error "unauthorized")))
;; checks that the node with this id exists and has the given type (a symbol); raises otherwise
(define (enforce-object id type)
  (define actual (query-maybe-value (current-db) q-get-node-type id))
  (cond
    [(not actual) (error "node doesn't exist" id)]
    [(equal? actual (symbol->string type)) (void)]
    [else (error "wrong node type" actual)]))
;; client rpcs
;; lists every agent node as a node-info struct; callable by clients (or the server itself)
(define (get-agents)
(enforce-subject 'client)
(get-nodes 'agent))
;; creates a new agent node, registers its public node info with comms and starts an agent
;; handler thread for it; returns the new node id
(define/contract (new-agent name arch resources)
  (-> string? string? (listof string?) integer?)
  (enforce-subject 'client)
  (let-values ([(id public) (make-node name arch 'agent resources)])
    (comms-set-node-info (current-comms) (node id name 'agent public #f #f #f))
    (agent-handler-new-agent id arch resources)
    id))
;; renames an agent and replaces its resource list
;; the database change happens in one transaction: only the difference between the stored and
;; the requested resource sets is inserted/deleted
;; afterwards the comms node info is refreshed and the agent's handler is restarted (delete
;; followed by new) so it picks up the new resources
(define/contract (edit-agent id name resources)
  (-> integer? string? (listof string?) void?)
  (enforce-subject 'client)
  (call-with-transaction
   (current-db)
   (lambda ()
     (enforce-object id 'agent)
     (let ([stored (list->set (query-list (current-db) q-get-node-resources id))]
           [wanted (list->set resources)])
       (query-exec (current-db) q-edit-node name id)
       (for ([res (in-set (set-subtract wanted stored))])
         (query-exec (current-db) q-add-node-res id res))
       (for ([res (in-set (set-subtract stored wanted))])
         (query-exec (current-db) q-del-node-res id res)))))
  (let-values ([(comms-node arch) (load-comms-node id #f #t)])
    (comms-set-node-info (current-comms) comms-node)
    (agent-handler-delete-agent id)
    (agent-handler-new-agent id arch resources))
  (void))
;; returns the configured agent executable (binary plus embedded node info, including the
;; agent's secret key) for the given agent id
;; TODO : streaming interface
(define/contract (get-agent-deployment id)
  (-> integer? bytes?)
  (enforce-subject 'client)
  (define-values [agent-node arch] (load-comms-node id #t #t))
  (unless (eq? 'agent (node-type agent-node))
    (error "invalid node type"))
  (configure-agent-binary agent-node arch (current-server-public-node)))
;; removes an agent: deletes its node row (after checking it really is an agent), drops it
;; from comms, and stops its handler thread
(define/contract (delete-agent id)
  (-> integer? void?)
  (enforce-subject 'client)
  (define (remove-row!)
    (enforce-object id 'agent)
    (query-exec (current-db) q-delete-node id))
  (call-with-transaction (current-db) remove-row!)
  (comms-delete-node (current-comms) id)
  (agent-handler-delete-agent id)
  (void))
;; client rpcs :: projects
;; creates a project from a raw manifest and the project file contents; returns the task id
;; the manifest is parsed first so an invalid one is rejected before anything is stored
;; TODO : streaming interface
(define/contract (new-project manifest tar)
  (-> list? bytes? integer?)
  (enforce-subject 'client)
  ;; check validity
  (define mf-parsed (parse-manifest manifest))
  (let ([id (make-task manifest tar)])
    ;; notify agent handler
    (agent-handler-new-task id mf-parsed)
    id))
;; lists all tasks as project-info structs with the manifest decoded back to its raw form
(define (get-projects)
  (enforce-subject 'client)
  (for/list ([row (in-list (query-rows (current-db) q-get-tasks))])
    (match-define (vector id name manifest complete?) row)
    (project-info id name (fasl->s-exp manifest) complete?)))
;; returns the project file contents for a task
;; any failure while reading is replaced by a generic error so details are not exposed
;; TODO : streaming interface
(define/contract (get-project-file taskid)
  (-> integer? bytes?)
  (define (fail ex)
    (error "unable to fetch the requested file"))
  (with-handlers ([exn:fail? fail])
    (server-get-file taskid)))
;; agent handling
;; minimum batch of inputs that will be scheduled for a node
;; the batch size scales to how fast the node completes a batch
(define *min-subtask-size* 16)
;; aim to batch every 5 minutes
;; if a task takes shorter, the amount will be doubled, and if it takes longer it will be halved
(define *optimal-completion-secs* 300)
;; tasks will be reassigned if not completed within this time
(define *subtask-timeout* (* 3 *optimal-completion-secs*))
;; constants for agent rpc retries: the delay (in seconds) starts at the minimum, is
;; multiplied by the ratio after every failed attempt, and is capped at the maximum
(define *min-retry-delay* 2)
(define *max-retry-delay* 120)
(define *retry-delay-ratio* 2)
;; body of the agent handler thread (started by make-agent-handler)
;; owns the table of active tasks and one worker thread per agent; everything else talks to it
;; through thread-send messages (see the agent-handler-* procedures)
(define (agent-handler)
;; unlike comms, messages to agent-handler have no responses. just thread-send, it's one-way
;; custodian under which the per-agent threads are created
(define cust (make-custodian))
(define this-thread (current-thread))
;; set the parameter from within the handler thread itself
(current-agent-handler this-thread)
;; make an auto cleanup thread :P
;; once the handler thread ends, everything created under cust is shut down
(thread (lambda () (thread-wait this-thread) (custodian-shutdown-all cust)))
;; setup agent rpcs
;; brings the agent^ rpc procedures (e.g. push-assignment, cancel-assignment,
;; cancel-all-assignments) into scope
(define agent-wrapper@ (make-rpc-wrapper-unit agent^))
(define-values/invoke-unit agent-wrapper@ (import) (export agent^))
;; tasks
;; id: the task id
;; sema: semaphore guarding mutation (sigh)
;; manifest: task manifest (parsed version)
;; work-pattern: the integer-set representing the work left to do
;; agent-todo: hash of agent id to integer-set representing work the agent is working on
;; file-hash: the hash to send to agents to identify the contents of the project file more
;; precisely than just the taskid, which allows them to cache the file locally
;; completed-work: an integer set of completed work
(struct task-state [id sema manifest [work-pattern #:mutable] agent-todo file-hash
[completed-work #:mutable]] #:transparent)
;; builds the in-memory task-state for a task
;; completed work is the union of every pattern range in the task log; the remaining work is
;; the full range [0, manifest-psize) minus that union
(define (initialize-task id mf)
  (define file-hash (server-hash-file id))
  (define sema (make-semaphore 1))
  (define agent-todo (make-hash))
  (define full-range (range->integer-set 0 (manifest-psize mf)))
  (define completed-work
    (for/fold ([done (make-integer-set '())])
              ([(_worker _start _duration pat-fasl) (in-query (current-db) q-get-task-log id)])
      (integer-set-union done (make-integer-set (fasl->s-exp pat-fasl)))))
  ;; subtract the pattern ranges that were already logged as complete
  (define remaining (integer-set-subtract full-range completed-work))
  (task-state id sema mf remaining agent-todo file-hash completed-work))
;; a task still has work while unassigned work remains or any agent holds an assignment
(define (task-has-work? ts)
  (or (positive? (integer-set-count (task-state-work-pattern ts)))
      (not (hash-empty? (task-state-agent-todo ts)))))
;; marks the task complete in the database and stops it on all agents
(define (task-set-complete! ts)
  (let ([tid (task-state-id ts)])
    (log-server-info "fully completed task: ~a" tid)
    (query-exec (current-db) q-set-task-complete tid)
    ;; TODO : notification mechanism
    (handle-stop-task tid)))
;; hands out up to requested-amount units of work from the task to the agent
;; this doesn't update the database - that only gets updated when the work is complete
;; returns the integer-set assigned to the agent, or #f when the task turned out to be fully
;; complete (in which case it is marked complete)
;; raises when requested-amount is not positive or the agent already holds an assignment
(define (task-assign! ts agent-id requested-amount)
  (unless (positive? requested-amount) (error "requested amount must be positive"))
  (call-with-semaphore
   (task-state-sema ts)
   (lambda ()
     (define at (task-state-agent-todo ts))
     (when (hash-has-key? at agent-id)
       (error "agent already has work assigned"))
     (define-values [assignment new-wp]
       (pattern-range-take (task-state-work-pattern ts) requested-amount))
     (cond
       ;; regular case: there was unassigned work left; update tracking
       [(positive? (integer-set-count assignment))
        (hash-set! at agent-id assignment)
        (set-task-state-work-pattern! ts new-wp)
        assignment]
       ;; nothing unassigned and nothing in flight: actually done. cancel all in-progress
       ;; assignments and celebrate uwu
       ;; are we going to hold up literally everything because we're still holding this
       ;; semaphore during a database write? probably
       ;; does it actually matter? probably not
       [(hash-empty? at)
        (task-set-complete! ts)
        #f]
       ;; nothing unassigned, but other agents are still busy: steal work lol
       ;; this will massively overcommit the last few parts of a project and potentially
       ;; prioritize doing useless duplicate work instead of moving on to the next project
       ;; but it'll be fiiiiiine don't worry
       [else
        ;; everything currently being worked on by any agent
        (define in-flight
          (for/fold ([iset (make-integer-set '())]) ([(_ v) (in-hash at)])
            (integer-set-union iset v)))
        ;; take from the in-flight work; the unassigned pool is empty at this point, so
        ;; taking from it would always produce an empty assignment
        (define-values [stolen _rest] (pattern-range-take in-flight requested-amount))
        (hash-set! at agent-id stolen)
        stolen]))))
;; returns work from agent back to the regular work pool
;; anything that has been completed in the meantime is not put back; does nothing when the
;; agent holds no assignment for this task
(define (task-unassign! ts agent-id)
  (call-with-semaphore
   (task-state-sema ts)
   (lambda ()
     (define todo (task-state-agent-todo ts))
     (define assignment (hash-ref todo agent-id #f))
     (when assignment
       (let ([returned (integer-set-union assignment (task-state-work-pattern ts))])
         (set-task-state-work-pattern!
          ts (integer-set-subtract returned (task-state-completed-work ts)))
         (hash-remove! todo agent-id))))))
;; adds to task log, then updates work pool with task completion
;; time-wall-start is the wall clock start instant and duration the monotonic duration of the
;; agent's assignment; does nothing when the agent holds no assignment for this task
(define (task-complete! ts agent-id time-wall-start duration)
  (call-with-semaphore
   (task-state-sema ts)
   (lambda ()
     (match (hash-ref (task-state-agent-todo ts) agent-id #f)
       [#f (void)]
       [assignment
        ;; add a new work log
        (query-exec
         (current-db) q-add-task-log (task-state-id ts) agent-id time-wall-start duration
         (s-exp->fasl (integer-set-contents assignment)))
        ;; the struct mutator needs the task-state as its first argument
        (define new-completed (integer-set-union (task-state-completed-work ts) assignment))
        (set-task-state-completed-work! ts new-completed)
        ;; remove tracking - this work is now done
        (hash-remove! (task-state-agent-todo ts) agent-id)
        ;; check if we're fully complete. if so, mark the task complete in the database and
        ;; cancel all related assignments
        (unless (task-has-work? ts)
          (task-set-complete! ts))]))))
;; body of the per-agent worker thread
;; id: agent node id; arch: the agent's arch string; resources-in: list of resource names;
;; msg-chan: async channel on which the handler thread sends commands (see handle-command)
(define (agent-thd id arch resources-in msg-chan)
;; initialize to-node for rpcs
(current-to-node (comms-get-node-info (current-comms) id))
;; helper to generate assignment ids (a kind of arbitrary number we pass to agents to track
;; what they're currently working on)
;; starts at a random value in [0, 1000) and counts up by one per assignment
(define make-assignment-id
(let ([counter (random 0 1000)])
(lambda () (begin0 counter (set! counter (add1 counter))))))
;; set of resources
(define resources (list->set resources-in))
;; drop the reference to the original list; only the set is used from here on
(set! resources-in #f)
;; tracks the sizes of task pieces given to the agent
;; task id to count of assignment size
(define task-size (make-hash))
;; current tasks, mirrors parent but only contains tasks we have resources for
(define current-tasks (make-hash))
;; assignments
(struct assignment [id taskid start-time-utc start-time-monotonic] #:transparent)
;; active assignments, by assignment id
(define assigned-tasks (make-hash))
;; keep running?
(define run-agent-thd? #t)
;; returns all of this agent's work to the task pools, then tells the agent to drop everything
(define (handle-cleanup)
  ;; unassign all tasks
  (for ([av (in-hash-values assigned-tasks)])
    (define ts (hash-ref current-tasks (assignment-taskid av) #f))
    (when ts
      (task-unassign! ts id)))
  ;; call agent rpc to cancel everything
  (cancel-all-assignments))
;; helper to repeatedly invoke an agent rpc
;; calls proc until it finishes without raising exn:fail; after each failure the error is
;; logged and displayed, and the thread sleeps with a delay that grows geometrically up to
;; *max-retry-delay*
(define (invoke/retry-forever proc)
  (let attempt ([delay-secs *min-retry-delay*])
    (define failure
      (with-handlers ([exn:fail? (lambda (ex) ex)])
        (proc)
        #f))
    (when failure
      (log-server-error "agent ~a encountered error" id)
      ((error-display-handler) (exn-message failure) failure)
      (sleep delay-secs)
      (attempt (min *max-retry-delay*
                    (* *retry-delay-ratio* delay-secs))))))
;; requests a batch of the task for this agent and pushes it to the agent
;; #t if a new assignment was added, otherwise #f
(define (create-assignment! ts)
  (define tid (task-state-id ts))
  (define requested-amount (hash-ref! task-size tid *min-subtask-size*))
  ;; integer set of assignment data, or false
  (define assign-data (task-assign! ts id requested-amount))
  (and assign-data
       (let* ([start-time-utc (current-seconds-utc)]
              [start-time-monotonic (current-seconds-monotonic)]
              [aid (make-assignment-id)]
              [file-hash (task-state-file-hash ts)]
              [mf-raw (serialize-manifest (task-state-manifest ts))])
         ;; add to local tracking
         (hash-set! assigned-tasks aid
                    (assignment aid tid start-time-utc start-time-monotonic))
         ;; send agent rpc
         (invoke/retry-forever
          (lambda ()
            (push-assignment aid tid mf-raw file-hash (integer-set-contents assign-data))))
         #t)))
;; tell the agent to cancel work, unassign the assignment
(define (cancel-assignment! assignment)
  (define aid (assignment-id assignment))
  (define ts (hash-ref current-tasks (assignment-taskid assignment) #f))
  ;; update manager tracking
  (when ts
    (task-unassign! ts id))
  (hash-remove! assigned-tasks aid)
  ;; send agent rpc
  (invoke/retry-forever (lambda () (cancel-assignment aid))))
;; records a match for a certain assignment: logs it and stores it in task_match together
;; with the current wall clock time
;; TODO : notify other things that a match occurred maybe?
(define (add-assignment-match! assignment success-input)
  (define tid (assignment-taskid assignment))
  ;; set complete!!!
  (log-server-info "agent handler ~a: match for task ~a: ~a" id tid success-input)
  (query-exec (current-db) q-add-task-match
              tid id (current-seconds-utc) (s-exp->fasl success-input)))
;; handles an assignment the agent reported as complete: drops it from local tracking, logs
;; the work as done on the task, and adapts the size of the next batch for that task
;; - finished faster than *optimal-completion-secs*: the batch size is doubled
;; - took longer than twice *optimal-completion-secs*: the batch size is halved, but never
;;   below *min-subtask-size*
(define (complete-assignment! assignment)
  (define tid (assignment-taskid assignment))
  (log-server-info "agent handler ~a: completed assignment for task ~a" id tid)
  ;; durations are always differences of monotonic readings, never of wall clock times
  (define end-time-monotonic (current-seconds-monotonic))
  (define duration (- end-time-monotonic (assignment-start-time-monotonic assignment)))
  (define ts (hash-ref current-tasks tid #f))
  (hash-remove! assigned-tasks (assignment-id assignment))
  (unless (false? ts)
    ;; mark complete
    (task-complete! ts id (assignment-start-time-utc assignment) duration)
    ;; update next task size request
    (cond
      [(< duration *optimal-completion-secs*)
       (log-server-info "agent handler ~a: increasing task allocation" id)
       (hash-update! task-size tid
                     (lambda (v) (* 2 v))
                     *min-subtask-size*)]
      [(> duration (* 2 *optimal-completion-secs*))
       (log-server-info "agent handler ~a: decreasing task allocation" id)
       ;; halve the current size (integer division keeps the size an exact integer)
       (hash-update! task-size tid
                     (lambda (v) (max *min-subtask-size* (quotient v 2)))
                     *min-subtask-size*)]
      [else (void)])))
;; tries to give the agent more work
;; computes which of the agent's resources are not used by its active assignments, then walks
;; the unassigned tasks in random order and assigns the first one that matches the agent's
;; arch, still has work, and whose required resources are all currently free
;; repeats for as long as an assignment was actually created
(define (update-assignments!)
  (log-server-info "agent handler ~a: updating assignments" id)
  ;; resources tied up by the tasks this agent is already working on
  (define used-resources
    (for/fold ([used-resources (set)]) ([(aid av) (in-hash assigned-tasks)])
      (define task (hash-ref current-tasks (assignment-taskid av)))
      (define manifest (task-state-manifest task))
      (set-union used-resources (list->set (manifest-data-ref manifest 'resources '())))))
  (define available-resources (set-subtract resources used-resources))
  (define assigned-taskids
    (for/set ([(aid av) (in-hash assigned-tasks)]) (assignment-taskid av)))
  ;; the scheduling prioritizer is currently:
  ;; yolo just pick randomly
  ;; this should be good enough for all basic use cases
  (define task-list
    (shuffle
     (for/list ([(tid ts) (in-hash current-tasks)]
                #:unless (set-member? assigned-taskids tid))
       ts)))
  ;; assigns the first task in the list that we are capable of doing
  ;; returns if we could maybe assign more
  (define (select-task! task-list)
    (match task-list
      ['() #f]
      [(cons head tail)
       (define manifest (task-state-manifest head))
       (define needed-resources (list->set (manifest-data-ref manifest 'resources '())))
       (define needed-arch (manifest-data-ref manifest 'arch '("any")))
       (define right-arch? (or (member "any" needed-arch) (member arch needed-arch)))
       (if (and right-arch?
                (task-has-work? head)
                ;; (subset? a b) asks whether a is contained in b: everything the task
                ;; needs must be among the free resources, not the other way around
                (subset? needed-resources available-resources))
           (create-assignment! head)
           (select-task! tail))]))
  (when (select-task! task-list)
    (update-assignments!)))
;; dispatches one message received on this agent's command channel
;;  (cons 'new-task task-state)   - start tracking the task and try to assign work
;;  (cons 'cancel-task task-id)   - cancel this agent's assignments for the task and forget it
;;  (cons 'agent-report status)   - status is (cons assignment-id state), see below
;;  'shutdown                     - stop the agent thread loop
(define (handle-command data)
  (match data
    [(cons 'new-task ts)
     (log-server-info "agent handler ~a got new task ~a" id (task-state-id ts))
     (hash-set! current-tasks (task-state-id ts) ts)
     (update-assignments!)]
    [(cons 'cancel-task target)
     ;; handle-stop-task sends the bare task id; a task-state is tolerated as well
     (define tid (if (task-state? target) (task-state-id target) target))
     ;; collect first: cancel-assignment! mutates assigned-tasks
     (define assignments
       (for/list ([(_ v) (in-hash assigned-tasks)]
                  #:when (= tid (assignment-taskid v)))
         v))
     (for ([av (in-list assignments)])
       (cancel-assignment! av))
     (hash-remove! current-tasks tid)
     (hash-remove! task-size tid)]
    ;; got completion report from agent
    [(cons 'agent-report status)
     (match status
       ;; no current assignment
       [(cons #f _) (void)] ;; do nothing
       ;; current assignment incomplete
       [(cons assignment-id 'incomplete) (void)] ;; also do nothing
       ;; current assignment complete
       [(cons assignment-id 'complete)
        (define av (hash-ref assigned-tasks assignment-id #f))
        (unless (false? av) (complete-assignment! av))
        (update-assignments!)]
       ;; got succeeding input
       [(cons assignment-id success-input)
        (define av (hash-ref assigned-tasks assignment-id #f))
        (unless (false? av) (add-assignment-match! av success-input))])]
    ['shutdown (set! run-agent-thd? #f)]))
;; cancels every assignment that has been running for at least *subtask-timeout* seconds
;; on timeout, work is returned to the assignment pool and other agents may not actually be
;; notified of this. but because of work stealing they should already have attempted to steal
;; the work, so there shouldn't actually be a situation where an agent thread is asleep when
;; work is returned to the pool
(define (handle-assignment-timeout)
  (define now (current-seconds-monotonic))
  (define (overdue? av)
    (>= (- now (assignment-start-time-monotonic av)) *subtask-timeout*))
  (for ([overdue (in-list (filter overdue? (hash-values assigned-tasks)))])
    (define taskid (assignment-taskid overdue))
    ;; revert to the smallest possible task size, just in case
    ;; we don't want to spin forever assigning potentially gigantic tasks and timing out
    ;; although ideally this case wouldn't occur because the timeout is three times the target
    ;; subtask duration
    (hash-set! task-size taskid *min-subtask-size*)
    (log-server-warning "agent ~a timed out on task ~a" id taskid)
    (cancel-assignment! overdue)))
;; main body of the agent thread (the final parenthesis below also closes agent-thd)
;; any exception, breaks included since the predicate is exn?, runs handle-cleanup and is then
;; re-raised
(with-handlers ([exn? (lambda (ex) (handle-cleanup) (raise ex))])
;; wait for agent node to become present
;; TODO : comms should have some sort notification mechanism for this
(log-server-info "agent handler ~a waiting for agent" id)
;; poll every 10 seconds until a comms channel to the agent is available
(let loop ()
(unless (comms-channel-available? (current-comms) id)
(sleep 10)
(loop)))
(log-server-info "agent handler ~a: agent online" id)
;; cancel whatever the agent is currently working on, in case the server crashed and came back
;; up or something
;; we want agent state synchronized with what we think it should be
(invoke/retry-forever (lambda () (cancel-all-assignments)))
(let loop ()
;; handle events
(define time (current-seconds-monotonic))
;; seconds left until each active assignment hits *subtask-timeout* (never negative)
(define timeout-list
(for/list ([(aid av) (in-hash assigned-tasks)])
(define st (assignment-start-time-monotonic av))
(max 0 (- *subtask-timeout* (- time st)))))
;; #f means "wait without a timeout" when nothing is assigned
(define nearest-timeout (if (empty? timeout-list) #f (apply min timeout-list)))
;; sync/timeout returns #f when the timeout elapsed before a message arrived
(match (sync/timeout nearest-timeout msg-chan)
[#f (handle-assignment-timeout)]
[data (handle-command data)])
(when run-agent-thd?
(loop)))
;; regular shutdown path
(handle-cleanup)
(void)))
;; id to task-state
(define current-tasks (make-hash))
;; hash of agents to handler channel
;; each value is a pair (command channel . agent thread)
(define agents (make-hash))
;; run handler
(define run-handler? #t)
;; first two thread messages send the comms and tm
;; (sent by init-agent-handler, in this order)
(current-comms (thread-receive))
(current-tm (thread-receive))
;; tells every agent thread to cancel the task (the message carries the bare task id) and
;; forgets the task
(define (handle-stop-task task-id)
  (define msg (cons 'cancel-task task-id))
  ;; notify agents
  (for ([agent-data (in-hash-values agents)])
    (async-channel-put (car agent-data) msg))
  (hash-remove! current-tasks task-id))
;; receives and dispatches one thread message sent to the agent handler
;;  (cons 'new-agent (cons id (cons arch resources))) - start a thread for the agent
;;  (cons 'delete-agent id)                           - stop the agent's thread
;;  (cons 'new-task (cons id manifest))               - register a task with all agents
;;  (cons 'agent-report (cons agent-id (cons assignment-id state))) - forward to agent thread
;;  (cons 'cancel-task task-id)                       - stop a task on all agents
;;  'shutdown                                         - stop all agents and the handler loop
;; any other message is an error
(define (handle-thd-msg)
  (define (handle-delete-agent id)
    (match (hash-ref agents id #f)
      [(cons chan thd)
       (async-channel-put chan 'shutdown)
       ;; give it some time, then break the thread
       (sync/timeout 5 thd)
       (break-thread thd)
       (hash-remove! agents id)]
      ;; an unknown id must not take the whole handler thread down
      [_ (log-server-warning "ignoring delete for unknown agent ~a" id)]))
  (match (thread-receive)
    [(cons 'new-agent (cons id (cons arch resources)))
     (define cmd-channel (make-async-channel))
     (define thd
       (parameterize ([current-custodian cust])
         ;; TODO : add monitor thread that detects crashes and unassigns tasks
         (thread (lambda () (agent-thd id arch resources cmd-channel)))))
     (hash-set! agents id (cons cmd-channel thd))
     ;; notify the agent of the tasks that already exist; without this an agent created (or
     ;; recreated by edit-agent) after a task was registered would never receive it
     (for ([ts (in-list (hash-values current-tasks))])
       (async-channel-put cmd-channel (cons 'new-task ts)))]
    [(cons 'delete-agent id)
     (handle-delete-agent id)]
    [(cons 'new-task (cons id manifest))
     (define ts (initialize-task id manifest))
     (hash-set! current-tasks id ts)
     ;; notify agents
     (for ([(agent-id agent-data) (in-hash agents)])
       (log-server-info "registering task ~a with agent ~a" id agent-id)
       (async-channel-put (car agent-data) (cons 'new-task ts)))]
    [(cons 'agent-report (cons agent-id (cons assignment-id state)))
     (match (hash-ref agents agent-id #f)
       [(cons chan _)
        (async-channel-put chan (cons 'agent-report (cons assignment-id state)))]
       ;; the agent may have been deleted while its report was in flight
       [_ (log-server-warning "dropping report from unknown agent ~a" agent-id)])]
    [(cons 'cancel-task task-id)
     (handle-stop-task task-id)]
    ['shutdown
     ;; iterate over a snapshot of the keys: handle-delete-agent removes entries from agents
     (for ([id (in-list (hash-keys agents))])
       (handle-delete-agent id))
     (set! run-handler? #f)]
    [_ (error "unknown agent handler message")]))
;; handler main loop: wait for a thread message, dispatch it, repeat until 'shutdown clears
;; run-handler? (the final parenthesis below also closes agent-handler)
(let loop ()
(define thd-evt (thread-receive-evt))
(match (sync thd-evt)
[(== thd-evt) (handle-thd-msg)])
(when run-handler? (loop))))
;; starts the agent handler thread and returns it
(define (make-agent-handler)
;; TODO : monitor this thread because dying is probably fatal
(thread agent-handler))
;; the agent handler thread that the procedures below send to by default
(define current-agent-handler (make-parameter #f))
;; sends the two initial messages the handler waits for: comms first, then the transaction
;; manager
(define (init-agent-handler [ah (current-agent-handler)])
(thread-send ah (current-comms))
(thread-send ah (current-tm)))
;; one-way messages to the handler thread; none of these wait for a reply
(define (agent-handler-new-agent id arch resources [ah (current-agent-handler)])
(thread-send ah (cons 'new-agent (cons id (cons arch resources)))))
(define (agent-handler-delete-agent id [ah (current-agent-handler)])
(thread-send ah (cons 'delete-agent id)))
(define (agent-handler-new-task id manifest [ah (current-agent-handler)])
(thread-send ah (cons 'new-task (cons id manifest))))
(define (agent-handler-cancel-task task-id [ah (current-agent-handler)])
(thread-send ah (cons 'cancel-task task-id)))
;; asks the handler to shut down and blocks until its thread has ended
(define (agent-handler-shutdown [ah (current-agent-handler)])
(thread-send ah 'shutdown)
(thread-wait ah))
;; agent rpcs
;; report state 'incomplete 'complete or a list of integer representing a success result
;; assignment-id is a numeric ID or #f if no currently assigned task
;; agents can call this as many times as they want -- it can serve as a sort of "ping"
;; the contract is enforced (define/contract, like the client rpcs) instead of being built
;; and discarded inside the body; the report is forwarded one-way to the agent handler
(define/contract (agent-report-state assignment-id state)
  (-> (or/c false? integer?) (or/c 'incomplete 'complete (listof integer?)) void?)
  (enforce-subject 'agent)
  (define agent-id (node-id (current-from-node)))
  ;; TODO : maybe wait for an actual completion here? idk
  (thread-send (current-agent-handler)
               (cons 'agent-report (cons agent-id (cons assignment-id state)))))
;; the server impl unit
;; a unit exporting server^, with the exports taken from the definitions in this module
(define-unit-from-context server-impl@ server^)
;; command line usage
;; with no arguments this runs the server until a break is received; the dev-* subcommands
;; perform a one-off database change and exit
(module+ main
(require racket/cmdline)
;; basic server initialization
(install-logging!)
(log-server-info "starting crossfire-server v~a" (#%info-lookup 'version))
;; the config file is a single datum: a list of (key value) entries
(define server-config (call-with-input-file *server-config-path* read))
(unless (list? server-config)
(error "corrupted config file, expected a list of config entries"))
;; looks up key in the config and checks the value against pred (a predicate or contract);
;; raises a descriptive error when the entry is missing or does not satisfy pred
(define (config-get key pred)
(define (make-err-fmt actual)
(format "invalid config!\n expected: (~a <value satisfying ~a>)\n got: ~a" key
(if (contract? pred) (format "~v" pred) (object-name pred))
actual))
(match (assoc key server-config)
[#f (error (make-err-fmt "<nothing>"))]
[(list _ (? pred data)) data]
[x (error (make-err-fmt x))]))
;; open the database with mode 'create, then bring the schema up to date
(current-db (open-server-db 'create))
(migrate-server-db)
;; load or create secret key
(unless (file-exists? *server-seckey-path*)
(log-server-info "generating new secret key")
(call-with-output-file *server-seckey-path*
(lambda (out)
(write-bytes (crypto-sign-make-key) out)
(port-fsync out))))
(define seckey (file->bytes *server-seckey-path*))
(define pubkey (crypto-sign-public-key seckey))
;; 'auto listens on all interfaces
(define listen-addr
(match (config-get 'listen-addr (or/c 'auto string?))
['auto "0.0.0.0"]
[addr addr]))
;; the server's own node, including its secret key and listen address/port
(define server (node *server-node-id* (config-get 'name string?) 'server pubkey seckey
listen-addr (config-get 'listen-port integer?)))
(define public-addr
(match (config-get 'public-addr (or/c 'auto string?))
['auto (error "TODO auto public-addr unimplemented")]
[addr addr]))
;; 'auto reuses the listen port
(define public-port
(match (config-get 'public-port (or/c 'auto integer?))
['auto (node-port server)]
[port port]))
;; the node description handed out to agents: no secret key, public address and port
(current-server-public-node
(struct-copy node server [seckey #f] [host public-addr] [port public-port]))
;; read command line
;; TODO : read cmdline for admin commands
;; ideally allow the admin commands to coexist with an actual current running server
(match (current-command-line-arguments)
;; creates an agent with the single resource "cpu" and writes its node (with secret key)
;; plus the public server node to <state-root>/<name>.rktd
[(vector "dev-new-agent" name arch)
(define-values [id _] (make-node name arch 'agent '("cpu")))
(define agent-node (load-comms-node id #t))
(call-with-output-file (build-path *state-root* (string-append name ".rktd"))
(lambda (out) (write (list agent-node (current-server-public-node)) out)))
(log-server-info "created dev agent ~a" name)
(exit)]
;; creates a fixed test project with placeholder file contents
[(vector "dev-new-project")
(define id (make-task '((name "test project") (arch "any")
(resources "cpu") (pattern "meow?d?d")) #"no file contents lol"))
(log-server-info "created project")
(exit)]
[(vector subcmd _ ...)
(error "invalid subcommand" subcmd)]
;; no arguments: fall through and run the server
[argv (void)])
;; start server and start doing stuff
(current-agent-handler (make-agent-handler))
(current-comms (make-comms server))
(current-tm (make-transaction-manager server (current-comms)))
(rpc-register-all server^ server-impl@)
(init-agent-handler)
;; restore agents
(for ([agent (in-list (get-nodes 'agent))])
(log-server-info "restoring agent ~a" (node-info-id agent))
(restore-comms-node (node-info-id agent))
(agent-handler-new-agent (node-info-id agent) (node-info-arch agent)
(node-info-resources agent)))
;; restore active tasks
;; only tasks whose complete column is 0 are handed to the agent handler
(for ([(id name manifest-in complete?) (in-query (current-db) q-get-tasks)]
#:when (zero? complete?))
(log-server-info "restoring project ~a" id)
(define manifest (parse-manifest (fasl->s-exp manifest-in)))
(agent-handler-new-task id manifest))
;; now server is ready to start listening
(comms-listen (current-comms) (node-port server))
(log-server-info "server running")
;; wait for break
(with-handlers ([exn:break? void])
(sync never-evt))
;; shutdown
;; a second break aborts clean shutdown. ideally, don't break again unless necessary
(log-server-info "stopping server")
(agent-handler-shutdown)
(tm-shutdown (current-tm))
(comms-shutdown (current-comms))
(void))