implement basic simulated agent, fix issues

This commit is contained in:
xenia 2020-11-25 03:53:18 -05:00
parent beae13c7eb
commit 647b3fe443
6 changed files with 314 additions and 184 deletions

View File

@ -18,7 +18,7 @@
.PHONY: all clean
APP_NAME=crossfire-agent
RKT_NAME=$(APP_NAME).rkt
RKT_NAME=../crossfire/agent.rkt
MONOCYPHER_VERSION=3.1.1

View File

@ -1,47 +0,0 @@
#lang racket/base
;; crossfire: distributed brute force infrastructure
;;
;; Copyright (C) 2020 haskal
;;
;; This program is free software: you can redistribute it and/or modify
;; it under the terms of the GNU Affero General Public License as published by
;; the Free Software Foundation, either version 3 of the License, or
;; (at your option) any later version.
;;
;; This program is distributed in the hope that it will be useful,
;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
;; GNU Affero General Public License for more details.
;;
;; You should have received a copy of the GNU Affero General Public License
;; along with this program. If not, see <https://www.gnu.org/licenses/>.
(require racket/fasl racket/file racket/match racket/port racket/string
"../crossfire/static-support.rkt"
"../crossfire/not-crypto.rkt" "../crossfire/comms.rkt")
; (crypto-sign-public-key #"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")
;
; (define key (crypto-lock-make-key))
; (define nonce (crypto-lock-make-nonce))
; (define pt #"hello world")
;
; (define-values [ct mac] (crypto-lock key nonce pt))
;
; (displayln (crypto-unlock key nonce mac ct))
; (displayln (crypto-unlock key nonce mac (bytes-append ct #"abcd")))
(define (get-config.linux-gnu)
(call-with-input-file "/proc/self/exe"
(lambda (in)
(file-position in eof)
(define len (file-position in))
(file-position in (- len 4))
(define offset (integer-bytes->integer (port->bytes in) #f #t))
(file-position in (- len offset))
(fasl->s-exp in))))
(match (string-split (static-ffi-arch) "-")
[(list _ ... "linux" "gnu") (get-config.linux-gnu)]
[arch (error "XXX: don't know how to get config on arch" arch)])

110
crossfire/agent.rkt Normal file
View File

@ -0,0 +1,110 @@
#lang racket/base
;; crossfire: distributed brute force infrastructure
;;
;; Copyright (C) 2020 haskal
;;
;; This program is free software: you can redistribute it and/or modify
;; it under the terms of the GNU Affero General Public License as published by
;; the Free Software Foundation, either version 3 of the License, or
;; (at your option) any later version.
;;
;; This program is distributed in the hope that it will be useful,
;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
;; GNU Affero General Public License for more details.
;;
;; You should have received a copy of the GNU Affero General Public License
;; along with this program. If not, see <https://www.gnu.org/licenses/>.
(require racket/contract racket/fasl racket/file racket/match racket/port racket/string racket/unit
"comms.rkt" "info.rkt" "logging.rkt" "not-crypto.rkt" "manifest.rkt" "protocol.rkt"
"static-support.rkt")
(define-logger agent #:parent global-logger)
(define (get-config.linux-gnu)
(call-with-input-file "/proc/self/exe"
(lambda (in)
(file-position in eof)
(define len (file-position in))
(file-position in (- len 4))
(define offset (integer-bytes->integer (port->bytes in) #f #t))
(file-position in (- len offset))
(fasl->s-exp in))))
;; main loop
(define (agent-loop)
(sleep 10)
(agent-loop))
;; rpc impl
(define/contract (push-assignment aid tid mf-raw file-hash assign-data)
(-> integer? integer? list? bytes? (listof pair?) void?)
(log-agent-info "got push-assignment ~a ~a ~s ~a" aid mf-raw file-hash assign-data)
(thread (lambda ()
(log-agent-info "downloading assignment ~a" aid)
(define data (get-project-file tid))
(log-agent-info "assignment data: ~s" data)
(log-agent-info "simulating assignment ~a" aid)
(sleep 10)
(log-agent-info "sending completion ~a" aid)
(agent-report-state aid 'complete)))
(void))
(define/contract (cancel-assignment aid)
(-> integer? void?)
(log-agent-info "got cancel-assignment ~a" aid)
(void))
(define/contract (cancel-all-assignments)
(-> void?)
(log-agent-info "got cancel-all-assignments")
(void))
;; agent impl unit
(define-unit-from-context agent-impl@ agent^)
;; server wrapper unit
(define server-wrapper@ (make-rpc-wrapper-unit server^))
(define-values/invoke-unit server-wrapper@ (import) (export server^))
(module+ main
(require racket/cmdline)
(install-logging!)
(log-agent-info "starting crossfire-agent v~a" (#%info-lookup 'version))
(match-define (list agent-node server-node)
(if (static-ffi-available?)
(match (string-split (static-ffi-arch) "-")
[(list _ ... "linux" "gnu") (get-config.linux-gnu)]
[arch (error "XXX: don't know how to get config on arch" arch)])
(let ([config-location (make-parameter #f)])
(log-agent-info "loading development config using command line")
(command-line
#:once-each
[("-c" "--config") param "development plain config file" (config-location param)]
#:args ()
(call-with-input-file (config-location) read)))))
(current-to-node server-node)
(current-comms (make-comms agent-node))
(current-tm (make-transaction-manager agent-node (current-comms)))
(rpc-register-all agent^ agent-impl@)
(comms-set-node-info (current-comms) server-node)
(log-agent-info "connecting to server...")
(let loop ([sleep-time 1])
(with-handlers ([exn? (lambda (ex)
(log-agent-error "error connecting to server: ~a" ex)
(sleep sleep-time)
(loop (min 120 (* sleep-time 2))))])
(comms-connect (current-comms) (node-id server-node))
(agent-report-state #f #f)))
(log-agent-info "connected! ready to do stuff")
(agent-loop))

View File

@ -67,6 +67,7 @@
(define (comms-event-loop my-node startup-thd)
;; this thread
(define el-thread (current-thread))
(current-comms el-thread)
;; performs a handshake with a new peer connection
;; raises exn:fail if any part of the handshake fails
@ -222,7 +223,8 @@
(match (hash-ref node-registry id #f)
[#f (thread-send from (make-error "no such node" id) #f)]
[(node id name type pubkey seckey host port)
(make-peer-thread (lambda () (tcp-connect host port)))
(when (and host port)
(make-peer-thread (lambda () (tcp-connect host port))))
(thread-send from (void) #f)])))))
;; notify that startup is done
@ -359,6 +361,7 @@
;; transactional messages support
(define (transaction-manager my-node comms startup-thd)
(current-tm (current-thread))
(define run-tm #t)
(define trans-id 0)
@ -383,7 +386,7 @@
[(? exn?)
(define-values (ty _) (struct-info data))
(define-values (name _2 _3 _4 _5 _6 _7 _8) (struct-type-info ty))
`('exn ,name ,(exn-message data))]
`(exn ,name ,(exn-message data))]
[x x]))
(define (trans-data-deserialize data)
@ -560,42 +563,3 @@
(provide current-comms current-tm current-to-node current-from-node
make-rpc-wrapper-unit rpc-register-all)
; ;; demo code
; (define server-sk #"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")
; (define server-pk (crypto-sign-public-key server-sk))
; (define client-sk #"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")
; (define client-pk (crypto-sign-public-key client-sk))
;
; (define server-node (node 0 "server" 'server server-pk server-sk "localhost" 1337))
; (define client-node (node 1 "client" 'client client-pk client-sk #f #f))
;
; (require racket/cmdline)
;
; (define mode
; (command-line #:args (mode) mode))
; (match mode
; ["server"
; (define comms (make-comms server-node))
; (comms-set-node-info comms client-node)
; (define tm (make-transaction-manager server-node comms))
; (tm-register-rpc tm 'add1 add1)
;
; (comms-listen comms 1337)
;
; (log-info "listening")
; (sleep 9999)
;
; (tm-shutdown tm)
; (comms-shutdown comms)]
; ["client"
; (define comms (make-comms client-node))
; (comms-set-node-info comms server-node)
; (define tm (make-transaction-manager client-node comms))
;
; (log-info "transacting...")
; (log-info "transaction: ~a" (tm-transact tm 0 'add1 (list 1)))
; (log-info "done")
;
; (tm-shutdown tm)
; (comms-shutdown comms)])

View File

@ -22,13 +22,13 @@ create table task_log(taskid integer not null, worker integer not null,
time_wall_start timestamp not null,
duration integer not null,
pattern blob not null,
foreign key (taskid) references tasks(id) on delete cascade,
foreign key (taskid) references task(id) on delete cascade,
foreign key (worker) references node(id) on delete restrict);
-- }
-- @up {
create table task_match(taskid integer not null, worker integer not null,
time_wall timestamp not null, match blob not null,
foreign key (taskid) references tasks(id) on delete cascade,
foreign key (taskid) references task(id) on delete cascade,
foreign key (worker) references node(id) on delete restrict);
-- }

View File

@ -17,11 +17,11 @@
;; along with this program. If not, see <https://www.gnu.org/licenses/>.
(require db/base db/sqlite3
data/queue racket/bool racket/contract racket/fasl racket/file racket/list racket/logging
racket/match racket/path racket/random racket/runtime-path racket/set racket/string
racket/unit srfi/19
data/queue racket/async-channel racket/bool racket/contract racket/fasl racket/file
racket/list racket/logging racket/match racket/path racket/random racket/runtime-path
racket/set racket/string racket/unit srfi/19
north/base north/adapter/base north/adapter/sqlite
"comms.rkt" "logging.rkt" "manifest.rkt" "not-crypto.rkt" "pattern.rkt" "protocol.rkt"
"comms.rkt" "info.rkt" "logging.rkt" "manifest.rkt" "not-crypto.rkt" "pattern.rkt" "protocol.rkt"
;; port-fsync
(submod "static-support.rkt" misc-calls))
@ -205,6 +205,20 @@
(query-exec (current-db) q-add-node-res id res))
(values id public))))
(define (load-comms-node id [with-secret? #f] [with-arch? #f])
(match (query-maybe-row (current-db) q-get-node-info id)
[(vector name arch type secret)
(define node-val
(node id name (string->symbol type) (crypto-sign-public-key secret)
(and with-secret? secret) #f #f))
(if with-arch?
(values node-val arch)
node-val)]
[_ (error "invalid node id" id)]))
(define (restore-comms-node id)
(comms-set-node-info (current-comms) (load-comms-node id)))
(define (configure-agent-binary agent-node agent-arch server-node)
(define binary
(file->bytes
@ -224,7 +238,7 @@
;; manifest is the raw form
(define (make-task manifest tar)
(define manifest-data (s-exp->fasl manifest))
(define name (second (assoc 'name manifest-data)))
(define name (second (assoc 'name manifest)))
(define id (query/insert-id (current-db) q-new-task name manifest-data))
(server-commit-file id tar)
id)
@ -252,7 +266,7 @@
(define-values [id public] (make-node name arch 'agent resources))
(define comms-node (node id name 'agent public #f #f #f))
(comms-set-node-info (current-comms) comms-node)
(agent-handler-new-agent id resources)
(agent-handler-new-agent id arch resources)
id)
(define/contract (edit-agent id name resources)
@ -267,21 +281,20 @@
(query-exec (current-db) q-add-node-res id res))
(for ([res (in-set (set-subtract existing-resource new-resource))])
(query-exec (current-db) q-del-node-res id res))))
(define comms-node (comms-get-node-info (current-comms) id))
(comms-set-node-info (current-comms) (struct-copy node comms-node [name name]))
(define-values [comms-node arch] (load-comms-node id #f #t))
(comms-set-node-info (current-comms) comms-node)
(agent-handler-delete-agent id)
(agent-handler-new-agent id resources)
(agent-handler-new-agent id arch resources)
(void))
(define/contract (get-agent-deployment id)
(-> integer? bytes?)
;; TODO : streaming interface
(enforce-subject 'client)
(match (query-maybe-row (current-db) q-get-node-info id)
[(vector name arch "agent" secret)
(configure-agent-binary (node id name 'agent (crypto-sign-public-key secret) secret #f #f)
arch (current-server-public-node))]
[_ (error "invalid id or wrong node type")]))
(define-values [agent-node arch] (load-comms-node id #t #t))
(match (node-type agent-node)
['agent (configure-agent-binary agent-node arch (current-server-public-node))]
[_ (error "invalid node type")]))
(define/contract (delete-agent id)
(-> integer? void?)
@ -338,6 +351,7 @@
;; unlike comms, messages to agent-handler have no responses. just thread-send, it's one-way
(define cust (make-custodian))
(define this-thread (current-thread))
(current-agent-handler this-thread)
;; make an auto cleanup thread :P
(thread (lambda () (thread-wait this-thread) (custodian-shutdown-all cust)))
;; setup agent rpcs
@ -366,11 +380,14 @@
(integer-set-subtract pattern-range sub)))
(task-state id sema mf pattern-range agent-todo file-hash))
(define (task-has-work? ts)
(not (zero? (integer-set-count (task-state-work-pattern ts)))))
;; this doesn't update the database - that only gets updated when the work is complete
(define (task-assign! ts agent-id requested-amount)
(unless (positive? requested-amount) (error "requested amount must be positive"))
(call-with-semaphore (task-state-sema ts) (lambda ()
(when (hash-has-key? (task-state-agent-todo) agent-id)
(when (hash-has-key? (task-state-agent-todo ts) agent-id)
(error "agent already has work assigned"))
(define-values [assignment new-wp]
(pattern-range-take (task-state-work-pattern ts) requested-amount))
@ -378,10 +395,13 @@
;; done! (maybe)
;; check other agents work
;; TODO : update completeness
[(zero? (integer-set-count assignment)) #f]
;; then deregister task with handle-stop-task
[(zero? (integer-set-count assignment))
(log-server-info "fully completed task: ~a" (task-state-id ts))
#f]
;; update tracking
[else
(hash-set! (task-state-agent-todo agent-id assignment))
(hash-set! (task-state-agent-todo ts) agent-id assignment)
(set-task-state-work-pattern! ts new-wp)
assignment]))))
@ -408,7 +428,7 @@
;; remove tracking - this work is now done
(hash-remove! (task-state-agent-todo ts) agent-id)]))))
(define (agent-thd id resources-in)
(define (agent-thd id arch resources-in msg-chan)
;; initialize to-node for rpcs
(current-to-node (comms-get-node-info (current-comms) id))
;; helper to generate assignment ids (a kind of arbitrary number we pass to agents to track
@ -427,13 +447,24 @@
(define current-tasks (make-hash))
;; assignments
(struct assignment [id taskid assign-data start-time-utc start-time-monotonic] #:transparent)
(struct assignment [id taskid start-time-utc start-time-monotonic] #:transparent)
;; active assignments, by assignment id
(define assigned-tasks (make-hash))
;; keep running?
(define run-agent-thd? #t)
(define (handle-cleanup)
;; unassign all tasks
(for ([(aid av) (in-hash assigned-tasks)])
(define taskid (assignment-taskid av))
(define ts (hash-ref current-tasks taskid #f))
(unless (false? ts)
(task-unassign! ts id)))
;; call agent rpc to cancel everything
(cancel-all-assignments))
;; helper to repeatedly invoke an agent rpc
(define (invoke/retry-forever proc)
(let init-loop ([retry-delay *min-retry-delay*])
@ -444,20 +475,30 @@
(* *retry-delay-ratio* retry-delay))))])
(proc))))
;; #t if a new assignment was added, otherwise #f
(define (create-assignment! ts)
(define requested-amount (hash-ref! task-size (task-state-id ts) *min-subtask-size*))
;; integer set of assignment data, or false
(define assign-data (task-assign! ts id requested-amount))
(define start-time-utc (current-seconds-utc))
(define start-time-monotonic (current-seconds-monotonic))
(define aid (make-assignment-id))
(define mf-parsed (task-state-manifest ts))
(define file-hash (task-state-file-hash ts))
(define mf-raw (serialize-manifest mf-parsed))
;; add to local tracking
(hash-set! assigned-tasks aid (assignment id (task-state-id ts) assign-data start-time-utc
start-time-monotonic))
;; send agent rpc
(invoke/retry-forever (lambda () (push-assignment aid mf-raw file-hash assign-data))))
;; TODO : handle false case better
;; maybe steal work from other agents in progress or something
(cond
[(false? assign-data) #f]
[else
(define start-time-utc (current-seconds-utc))
(define start-time-monotonic (current-seconds-monotonic))
(define aid (make-assignment-id))
(define mf-parsed (task-state-manifest ts))
(define file-hash (task-state-file-hash ts))
(define mf-raw (serialize-manifest mf-parsed))
;; add to local tracking
(hash-set! assigned-tasks aid (assignment aid (task-state-id ts) start-time-utc
start-time-monotonic))
;; send agent rpc
(invoke/retry-forever
(lambda () (push-assignment aid (task-state-id ts) mf-raw file-hash
(integer-set-contents assign-data))))
#t]))
(define (cancel-assignment! assignment)
;; tell the agent to cancel work, unassign the assignment
@ -473,11 +514,15 @@
(define (add-assignment-match! assignment success-input)
;; TODO : notify other things that a match occurred maybe?
;; set complete!!!
(log-server-info "agent handler ~a: match for task ~a: ~a" id (assignment-taskid assignment)
success-input)
(query-exec
(current-db) q-add-task-match
(assignment-taskid assignment) id (current-seconds-utc) (s-exp->fasl success-input)))
(define (complete-assignment! assignment)
(log-server-info "agent handler ~a: completed assignment for task ~a" id
(assignment-taskid assignment))
(define end-time-monotonic (current-seconds-monotonic))
(define duration (- end-time-monotonic (assignment-start-time-monotonic assignment)))
(define ts (hash-ref current-tasks (assignment-taskid assignment) #f))
@ -488,14 +533,17 @@
;; update next task size request
(cond
[(< duration *optimal-completion-secs*)
(log-server-info "agent handler ~a: increasing task allocation" id)
(hash-update! task-size (assignment-taskid assignment)
(lambda (v) (* 2 v)) *min-subtask-size*)]
[(> duration (* 2 *optimal-completion-secs*))
(log-server-info "agent handler ~a: decreasing task allocation" id)
(hash-update! task-size (assignment-taskid assignment)
(lambda (v) (max *min-subtask-size* (/ 2 v))) *min-subtask-size*)]
[else (void)])))
(define (update-assignments!)
(log-server-info "agent handler ~a: updating assignments" id)
;; detect set of used resources
;; see set of current inactive tasks
;; if a task uses all free resources, assign it
@ -515,7 +563,7 @@
;; this should be good enough for all basic use cases
(define task-list
(shuffle
(for ([(tid ts) (in-hash current-tasks)]
(for/list ([(tid ts) (in-hash current-tasks)]
#:unless (set-member? assigned-taskids tid))
ts)))
@ -527,16 +575,23 @@
[(cons head tail)
(define manifest (task-state-manifest head))
(define needed-resources (list->set (manifest-data-ref manifest 'resources '())))
(if (subset? available-resources needed-resources)
(define needed-arch (manifest-data-ref manifest 'arch '("any")))
(define right-arch? (or (member "any" needed-arch) (member arch needed-arch)))
(if (and right-arch?
;; TODO : if there's no work, check if the task is complete or work can be
;; stolen from other agents
(task-has-work? head)
(subset? available-resources needed-resources))
(create-assignment! head)
(begin (select-task! tail) #t))]))
(select-task! tail))]))
(when (select-task! task-list)
(update-assignments!)))
(define (handle-thd-msg)
(match (thread-receive)
(define (handle-command data)
(match data
[(cons 'new-task ts)
(log-server-info "agent handler ~a got new task ~a" id (task-state-id ts))
(hash-set! current-tasks (task-state-id ts) ts)
(update-assignments!)]
[(cons 'cancel-task ts)
@ -555,17 +610,20 @@
[(cons #f _) (void)] ;; do nothing
;; current assignment incomplete
[(cons assignment-id 'incomplete) (void)] ;; also do nothing
;; current assignment complete, no success
;; current assignment complete
[(cons assignment-id 'complete)
(define av (hash-ref assigned-tasks assignment-id #f))
(unless (false? av) (complete-assignment! av #f))]
;; complete, success
(unless (false? av) (complete-assignment! av))
(update-assignments!)]
;; got succeeding input
[(cons assignment-id success-input)
(define av (hash-ref assigned-tasks assignment-id #f))
(unless (false? av) (add-assignment-match! av success-input))])]
['shutdown (set! run-agent-thd? #f)]))
(define (handle-assignment-timeout)
;; TODO : on timeout, work is returned to the assignment pool, but by this time other agent
;; handlers that could have picked it up might be sleeping
(define time (current-seconds-monotonic))
(define overdue-assignments
(filter (lambda (av)
@ -582,74 +640,90 @@
(log-server-warning "agent ~a timed out on task ~a" id taskid)
(cancel-assignment! overdue)))
;; cancel whatever the agent is currently working on, in case the server crashed and came back
;; up or something
;; we want agent state synchronized with what we think it should be
(invoke/retry-forever (lambda () (cancel-all-assignments)))
(with-handlers ([exn? (lambda (ex) (handle-cleanup) (raise ex))])
;; wait for agent node to become present
;; TODO : comms should have some sort notification mechanism for this
(let loop ()
;; handle events
(define time (current-seconds-monotonic))
(define thd-evt (thread-receive-evt))
(define nearest-timeout
(apply min (for/list ([(aid av) (in-hash assigned-tasks)])
(define st (assignment-start-time-monotonic av))
(max 0 (- *subtask-timeout* (- time st))))))
(match (sync/timeout nearest-timeout thd-evt)
[#f (handle-assignment-timeout)]
[(== thd-evt) (handle-thd-msg)])
(when run-agent-thd?
(loop)))
(log-server-info "agent handler ~a waiting for agent" id)
(let loop ()
(unless (comms-channel-available? (current-comms) id)
(sleep 10)
(loop)))
(log-server-info "agent handler ~a: agent online" id)
;; unassign all tasks
(for ([(aid av) (in-hash assigned-tasks)])
(define taskid (assignment-taskid av))
(define ts (hash-ref current-tasks taskid #f))
(unless (false? ts)
(task-unassign! ts id)))
;; cancel whatever the agent is currently working on, in case the server crashed and came back
;; up or something
;; we want agent state synchronized with what we think it should be
(invoke/retry-forever (lambda () (cancel-all-assignments)))
;; call agent rpc to cancel everything
(cancel-all-assignments)
(void))
(let loop ()
;; handle events
(define time (current-seconds-monotonic))
(define timeout-list
(for/list ([(aid av) (in-hash assigned-tasks)])
(define st (assignment-start-time-monotonic av))
(max 0 (- *subtask-timeout* (- time st)))))
(define nearest-timeout (if (empty? timeout-list) #f (apply min timeout-list)))
(match (sync/timeout nearest-timeout msg-chan)
[#f (handle-assignment-timeout)]
[data (handle-command data)])
(when run-agent-thd?
(loop)))
(handle-cleanup)
(void)))
;; id to task-state
(define current-tasks (make-hash))
;; hash of agents to handler thread
;; hash of agents to handler channel
(define agents (make-hash))
;; run handler
(define run-handler? #t)
;; first two thread messages send the comms and tm
(current-comms (thread-receive))
(current-tm (thread-receive))
(define (handle-stop-task task-id)
;; notify agents
(for ([(id agent-data) (in-hash agents)])
(async-channel-put (car agent-data) (cons 'cancel-task task-id)))
(hash-remove! current-tasks task-id))
(define (handle-thd-msg)
(define (handle-delete-agent id)
(thread-send (hash-ref agents id) 'shutdown #f)
;; TODO : wait for thread a bit, then kill it
;; TODO : cleanup assigned tasks after thread ended by unassigning all things assigned to
;; this agent
(error "TODO: this function is half unimplemented lol")
(match-define (cons chan thd) (hash-ref agents id))
(async-channel-put chan 'shutdown)
;; give it some time, then break the thread
(sync/timeout 5 thd)
(break-thread thd)
(hash-remove! agents id))
(match (thread-receive)
[(cons 'new-agent (cons id resources))
(parameterize ([current-custodian cust])
;; TODO : add monitor thread that detects crashes and unassigns tasks
(hash-set! agents id (thread (lambda () (agent-thd id resources)))))]
[(cons 'new-agent (cons id (cons arch resources)))
(define cmd-channel (make-async-channel))
(define thd
(parameterize ([current-custodian cust])
;; TODO : add monitor thread that detects crashes and unassigns tasks
(thread (lambda () (agent-thd id arch resources cmd-channel)))))
;; todo : notify agent of current tasks
(hash-set! agents id (cons cmd-channel thd))]
[(cons 'delete-agent id)
(handle-delete-agent id)]
[(cons 'new-task (cons id manifest))
(define ts (initialize-task id manifest))
(hash-set! current-tasks id ts)
;; notify agents
(for ([(id thd) (in-hash agents)])
(thread-send thd (cons 'new-task ts) #f))]
(for ([(agent-id agent-data) (in-hash agents)])
(log-server-info "registering task ~a with agent ~a" id agent-id)
(async-channel-put (car agent-data) (cons 'new-task ts)))]
[(cons 'agent-report (cons agent-id (cons assignment-id state)))
(thread-send (hash-ref agents agent-id) (cons 'agent-report (cons assignment-id state)) #f)]
(async-channel-put (car (hash-ref agents agent-id))
(cons 'agent-report (cons assignment-id state)))]
[(cons 'cancel-task task-id)
;; notify agents
(for ([(id thd) (in-hash agents)])
(thread-send thd (cons 'cancel-task task-id) #f))
(hash-remove! current-tasks task-id)]
(handle-stop-task task-id)]
['shutdown
(for ([(id _) (in-hash agents)])
(handle-delete-agent id))
@ -668,8 +742,12 @@
(define current-agent-handler (make-parameter #f))
(define (agent-handler-new-agent id resources [ah (current-agent-handler)])
(thread-send ah (cons 'new-agent (cons id resources))))
(define (init-agent-handler [ah (current-agent-handler)])
(thread-send ah (current-comms))
(thread-send ah (current-tm)))
(define (agent-handler-new-agent id arch resources [ah (current-agent-handler)])
(thread-send ah (cons 'new-agent (cons id (cons arch resources)))))
(define (agent-handler-delete-agent id [ah (current-agent-handler)])
(thread-send ah (cons 'delete-agent id)))
@ -681,7 +759,8 @@
(thread-send ah (cons 'cancel-task task-id)))
(define (agent-handler-shutdown [ah (current-agent-handler)])
(thread-send ah 'shutdown))
(thread-send ah 'shutdown)
(thread-wait ah))
;; agent rpcs
@ -704,7 +783,9 @@
(module+ main
(require racket/cmdline)
;; TODO : read cmdline
;; initialize server
(install-logging!)
(log-server-info "starting crossfire-server v~a" (#%info-lookup 'version))
(define server-config (call-with-input-file *server-config-path* read))
(unless (list? server-config)
@ -719,9 +800,6 @@
[(list _ (? pred data)) data]
[x (error (make-err-fmt x))]))
;; initialize server
(install-logging!)
(log-server-info "initializing server")
(current-db (open-server-db 'create))
(migrate-server-db)
@ -748,23 +826,48 @@
(current-server-public-node
(struct-copy node server [seckey #f] [host public-addr] [port public-port]))
;; TODO : read cmdline for admin commands
;; ideally allow the admin commands to coexist with an actual current running server
(match (current-command-line-arguments)
[(vector "dev-new-agent" name arch)
(define-values [id _] (make-node name arch 'agent '("cpu")))
(define agent-node (load-comms-node id #t))
(call-with-output-file (build-path *state-root* (string-append name ".rktd"))
(lambda (out) (write (list agent-node (current-server-public-node)) out)))
(log-server-info "created dev agent ~a" name)
(exit)]
[(vector "dev-new-project")
(define id (make-task '((name "test project") (arch "any")
(resources "cpu") (pattern "meow?d?d")) #"no file contents lol"))
(log-server-info "created project")
(exit)]
[(vector subcmd _ ...)
(error "invalid subcommand" subcmd)]
[argv (void)])
(current-agent-handler (make-agent-handler))
(current-comms (make-comms server))
(current-tm (make-transaction-manager server (current-comms)))
(current-agent-handler (make-agent-handler))
(rpc-register-all server^ server-impl@)
(init-agent-handler)
;; restore agents
(for ([agent (in-list (get-nodes 'agent))])
(agent-handler-new-agent (node-info-id agent) (node-info-resources agent)))
(log-server-info "restoring agent ~a" (node-info-id agent))
(restore-comms-node (node-info-id agent))
(agent-handler-new-agent (node-info-id agent) (node-info-arch agent)
(node-info-resources agent)))
;; restore active tasks
(for ([(id name manifest-in complete?) (in-query (current-db) q-get-tasks)]
#:unless complete?)
(define manifest (parse-manifest (fasl->s-exp manifest)))
#:when (zero? complete?))
(log-server-info "restoring project ~a" id)
(define manifest (parse-manifest (fasl->s-exp manifest-in)))
(agent-handler-new-task id manifest))
;; start listening
(comms-listen (current-comms) 1337)
(comms-listen (current-comms) (node-port server))
(log-server-info "server running")