diff --git a/crossfire/comms.rkt b/crossfire/comms.rkt index 63aaff6..f51514e 100644 --- a/crossfire/comms.rkt +++ b/crossfire/comms.rkt @@ -350,7 +350,7 @@ (comms-dispatch-msg comms to-id msg))) (provide make-comms comms-listen comms-connect comms-get-node-info comms-set-node-info - comms-delete-node comms-channel-available?) + comms-delete-node comms-channel-available? comms-shutdown) ;; transactional messages support @@ -541,8 +541,24 @@ (lambda (stx) (rpc-wrapper-unit-helper (second (syntax-e stx))))) +;; same thing but for registering impls +(define-for-syntax (rpc-register-all-helper sig-name unit-def) + (define-values [parent members vars stxs] (signature-members sig-name sig-name)) + (define code-out + #`((lambda () + (define-values/invoke-unit #,unit-def (import) (export #,sig-name)) + #,@(for/list ([mem (in-list members)]) + #`(tm-register-rpc (current-tm) (quote #,mem) #,mem))))) + code-out) + +;; register everything from a signature and unit +(define-syntax rpc-register-all + (lambda (stx) + (define parts (syntax-e stx)) + (rpc-register-all-helper (second parts) (third parts)))) + (provide current-comms current-tm current-to-node current-from-node - make-rpc-wrapper-unit) + make-rpc-wrapper-unit rpc-register-all) ; ;; demo code ; (define server-sk #"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa") diff --git a/crossfire/server.rkt b/crossfire/server.rkt index 551cee7..93c023f 100644 --- a/crossfire/server.rkt +++ b/crossfire/server.rkt @@ -286,10 +286,10 @@ ;; TODO : streaming interface (enforce-subject 'client) ;; check validity - (parse-manifest manifest) + (define mf-parsed (parse-manifest manifest)) (define id (make-task manifest tar)) ;; notify agent handler - (agent-handler-new-task id manifest) + (agent-handler-new-task id mf-parsed) id) (define (get-projects) @@ -363,6 +363,7 @@ (cond ;; done! (maybe) ;; check other agents work + ;; TODO : update completeness [(zero? (integer-set-count assignment)) #f] ;; update tracking [else @@ -457,6 +458,7 @@ ;; records a match for a certain assignment (define (add-assignment-match! assignment success-input) ;; TODO : notify other things that a match occurred maybe? + ;; set complete!!! (query-exec (current-db) q-add-task-match (assignment-taskid assignment) id (current-seconds-utc) (s-exp->fasl success-input))) @@ -602,19 +604,25 @@ ;; hash of agents to handler thread (define agents (make-hash)) + ;; run handler + (define run-handler? #t) + (define (handle-thd-msg) + (define (handle-delete-agent id) + (thread-send (hash-ref agents id) 'shutdown #f) + ;; TODO : wait for thread a bit, then kill it + ;; TODO : cleanup assigned tasks after thread ended by unassigning all things assigned to + ;; this agent + (error "TODO: this function is half unimplemented lol") + (hash-remove! agents id)) + (match (thread-receive) [(cons 'new-agent (cons id resources)) (parameterize ([current-custodian cust]) ;; TODO : add monitor thread that detects crashes and unassigns tasks (hash-set! agents id (thread (lambda () (agent-thd id resources)))))] [(cons 'delete-agent id) - (thread-send (hash-ref agents id) 'shutdown #f) - ;; TODO : wait for thread a bit, then kill it - ;; TODO : cleanup assigned tasks after thread ended by unassigning all things assigned to - ;; this agent - (error "TODO: this function is half unimplemented lol") - (hash-remove! agents id)] + (handle-delete-agent id)] [(cons 'new-task (cons id manifest)) (define ts (initialize-task id manifest)) (hash-set! current-tasks id ts) @@ -628,15 +636,20 @@ (for ([(id thd) (in-hash agents)]) (thread-send thd (cons 'cancel-task task-id) #f)) (hash-remove! current-tasks task-id)] + ['shutdown + (for ([(id _) (in-hash agents)]) + (handle-delete-agent id)) + (set! run-handler? #f)] [_ (error "unknown agent handler message")])) (let loop () (define thd-evt (thread-receive-evt)) (match (sync thd-evt) [(== thd-evt) (handle-thd-msg)]) - (loop))) + (when run-handler? (loop)))) (define (make-agent-handler) + ;; TODO : monitor this thread because dying is probably fatal (thread agent-handler)) (define current-agent-handler (make-parameter #f)) @@ -653,6 +666,9 @@ (define (agent-handler-cancel-task task-id [ah (current-agent-handler)]) (thread-send ah (cons 'cancel-task task-id))) +(define (agent-handler-shutdown [ah (current-agent-handler)]) + (thread-send ah 'shutdown)) + ;; agent rpcs ;; report state 'incomplete 'complete or a list of integer representing a success result @@ -671,23 +687,46 @@ (define-unit-from-context server-impl@ server^) ;; command line usage -; (module+ main -; (require racket/cmdline) -; (current-db (open-server-db 'create)) -; (migrate-server-db) -; -; (parameterize ([current-from-node (node 100 "meow" 'client #f #f #f #f)]) -; ((rpc-impl server get-projects))) -; -; ; (make-task "meow-task" '((meow . 10)) #"this is some extra data") -; ; (define data (configure-agent-binary (node 10 "meow0" 'agent #f #f #f #f) -; ; "aarch64-unknown-linux-gnu" -; ; (node 0 "server" 'server #f #f "meow.systems" 1337))) -; ; (with-output-to-file "/tmp/crossfire-agent.configured" -; ; (lambda () (write-bytes data))) -; ; (make-node "agent0" "x86_64" 'agent '("gpu" "hifive")) -; ; (parameterize ([current-from-node (node 100 "meow" 'client #f #f #f #f)]) -; ; ((rpc-impl server edit-agent) 1 "meow0" '("cpu" "hifive"))) -; ; (get-nodes 'agent) -; ; (get-nodes 'meow) -; ) +(module+ main + (require racket/cmdline) + + ;; TODO : read cmdline and config file + ;; TODO : real logging, replace all displayln + + ;; initialize server + (current-db (open-server-db 'create)) + (migrate-server-db) + ;; temp key + (define seckey (crypto-sign-make-key)) + (define pubkey (crypto-sign-public-key seckey)) + (define server (node 0 "server" 'server pubkey seckey "0.0.0.0" 1337)) + (current-comms (make-comms server)) + (current-tm (make-transaction-manager server (current-comms))) + (current-agent-handler (make-agent-handler)) + (rpc-register-all server^ server-impl@) + + ;; restore agents + (for ([agent (in-list (get-nodes 'agent))]) + (agent-handler-new-agent (node-info-id agent) (node-info-resources agent))) + + ;; restore active tasks + (for ([(id name manifest-in complete?) (in-query (current-db) q-get-tasks)] + #:unless complete?) + (define manifest (parse-manifest (fasl->s-exp manifest))) + (agent-handler-new-task id manifest)) + + ;; start listening + (comms-listen (current-comms) 1337) + + (displayln "server running") + ;; wait for break + (with-handlers ([exn? (lambda (ex) (displayln (format "encountered exception: ~a" ex)))]) + (sync never-evt)) + + ;; shutdown + (displayln "stopping server") + (agent-handler-shutdown) + (tm-shutdown (current-tm)) + (comms-shutdown (current-comms)) + + (void))