implement basic server main

This commit is contained in:
xenia 2020-11-23 02:34:02 -05:00
parent 88f8ba6749
commit 3ede25222e
2 changed files with 86 additions and 31 deletions

View File

@ -350,7 +350,7 @@
(comms-dispatch-msg comms to-id msg))) (comms-dispatch-msg comms to-id msg)))
(provide make-comms comms-listen comms-connect comms-get-node-info comms-set-node-info (provide make-comms comms-listen comms-connect comms-get-node-info comms-set-node-info
comms-delete-node comms-channel-available?) comms-delete-node comms-channel-available? comms-shutdown)
;; transactional messages support ;; transactional messages support
@ -541,8 +541,24 @@
(lambda (stx) (lambda (stx)
(rpc-wrapper-unit-helper (second (syntax-e stx))))) (rpc-wrapper-unit-helper (second (syntax-e stx)))))
;; same thing but for registering impls
(define-for-syntax (rpc-register-all-helper sig-name unit-def)
(define-values [parent members vars stxs] (signature-members sig-name sig-name))
(define code-out
#`((lambda ()
(define-values/invoke-unit #,unit-def (import) (export #,sig-name))
#,@(for/list ([mem (in-list members)])
#`(tm-register-rpc (current-tm) (quote #,mem) #,mem)))))
code-out)
;; register everything from a signature and unit
(define-syntax rpc-register-all
(lambda (stx)
(define parts (syntax-e stx))
(rpc-register-all-helper (second parts) (third parts))))
(provide current-comms current-tm current-to-node current-from-node (provide current-comms current-tm current-to-node current-from-node
make-rpc-wrapper-unit) make-rpc-wrapper-unit rpc-register-all)
; ;; demo code ; ;; demo code
; (define server-sk #"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa") ; (define server-sk #"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")

View File

@ -286,10 +286,10 @@
;; TODO : streaming interface ;; TODO : streaming interface
(enforce-subject 'client) (enforce-subject 'client)
;; check validity ;; check validity
(parse-manifest manifest) (define mf-parsed (parse-manifest manifest))
(define id (make-task manifest tar)) (define id (make-task manifest tar))
;; notify agent handler ;; notify agent handler
(agent-handler-new-task id manifest) (agent-handler-new-task id mf-parsed)
id) id)
(define (get-projects) (define (get-projects)
@ -363,6 +363,7 @@
(cond (cond
;; done! (maybe) ;; done! (maybe)
;; check other agents work ;; check other agents work
;; TODO : update completeness
[(zero? (integer-set-count assignment)) #f] [(zero? (integer-set-count assignment)) #f]
;; update tracking ;; update tracking
[else [else
@ -457,6 +458,7 @@
;; records a match for a certain assignment ;; records a match for a certain assignment
(define (add-assignment-match! assignment success-input) (define (add-assignment-match! assignment success-input)
;; TODO : notify other things that a match occurred maybe? ;; TODO : notify other things that a match occurred maybe?
;; set complete!!!
(query-exec (query-exec
(current-db) q-add-task-match (current-db) q-add-task-match
(assignment-taskid assignment) id (current-seconds-utc) (s-exp->fasl success-input))) (assignment-taskid assignment) id (current-seconds-utc) (s-exp->fasl success-input)))
@ -602,19 +604,25 @@
;; hash of agents to handler thread ;; hash of agents to handler thread
(define agents (make-hash)) (define agents (make-hash))
;; run handler
(define run-handler? #t)
(define (handle-thd-msg) (define (handle-thd-msg)
(define (handle-delete-agent id)
(thread-send (hash-ref agents id) 'shutdown #f)
;; TODO : wait for thread a bit, then kill it
;; TODO : cleanup assigned tasks after thread ended by unassigning all things assigned to
;; this agent
(error "TODO: this function is half unimplemented lol")
(hash-remove! agents id))
(match (thread-receive) (match (thread-receive)
[(cons 'new-agent (cons id resources)) [(cons 'new-agent (cons id resources))
(parameterize ([current-custodian cust]) (parameterize ([current-custodian cust])
;; TODO : add monitor thread that detects crashes and unassigns tasks ;; TODO : add monitor thread that detects crashes and unassigns tasks
(hash-set! agents id (thread (lambda () (agent-thd id resources)))))] (hash-set! agents id (thread (lambda () (agent-thd id resources)))))]
[(cons 'delete-agent id) [(cons 'delete-agent id)
(thread-send (hash-ref agents id) 'shutdown #f) (handle-delete-agent id)]
;; TODO : wait for thread a bit, then kill it
;; TODO : cleanup assigned tasks after thread ended by unassigning all things assigned to
;; this agent
(error "TODO: this function is half unimplemented lol")
(hash-remove! agents id)]
[(cons 'new-task (cons id manifest)) [(cons 'new-task (cons id manifest))
(define ts (initialize-task id manifest)) (define ts (initialize-task id manifest))
(hash-set! current-tasks id ts) (hash-set! current-tasks id ts)
@ -628,15 +636,20 @@
(for ([(id thd) (in-hash agents)]) (for ([(id thd) (in-hash agents)])
(thread-send thd (cons 'cancel-task task-id) #f)) (thread-send thd (cons 'cancel-task task-id) #f))
(hash-remove! current-tasks task-id)] (hash-remove! current-tasks task-id)]
['shutdown
(for ([(id _) (in-hash agents)])
(handle-delete-agent id))
(set! run-handler? #f)]
[_ (error "unknown agent handler message")])) [_ (error "unknown agent handler message")]))
(let loop () (let loop ()
(define thd-evt (thread-receive-evt)) (define thd-evt (thread-receive-evt))
(match (sync thd-evt) (match (sync thd-evt)
[(== thd-evt) (handle-thd-msg)]) [(== thd-evt) (handle-thd-msg)])
(loop))) (when run-handler? (loop))))
(define (make-agent-handler) (define (make-agent-handler)
;; TODO : monitor this thread because dying is probably fatal
(thread agent-handler)) (thread agent-handler))
(define current-agent-handler (make-parameter #f)) (define current-agent-handler (make-parameter #f))
@ -653,6 +666,9 @@
(define (agent-handler-cancel-task task-id [ah (current-agent-handler)]) (define (agent-handler-cancel-task task-id [ah (current-agent-handler)])
(thread-send ah (cons 'cancel-task task-id))) (thread-send ah (cons 'cancel-task task-id)))
(define (agent-handler-shutdown [ah (current-agent-handler)])
(thread-send ah 'shutdown))
;; agent rpcs ;; agent rpcs
;; report state 'incomplete 'complete or a list of integer representing a success result ;; report state 'incomplete 'complete or a list of integer representing a success result
@ -671,23 +687,46 @@
(define-unit-from-context server-impl@ server^) (define-unit-from-context server-impl@ server^)
;; command line usage ;; command line usage
; (module+ main (module+ main
; (require racket/cmdline) (require racket/cmdline)
; (current-db (open-server-db 'create))
; (migrate-server-db) ;; TODO : read cmdline and config file
; ;; TODO : real logging, replace all displayln
; (parameterize ([current-from-node (node 100 "meow" 'client #f #f #f #f)])
; ((rpc-impl server get-projects))) ;; initialize server
; (current-db (open-server-db 'create))
; ; (make-task "meow-task" '((meow . 10)) #"this is some extra data") (migrate-server-db)
; ; (define data (configure-agent-binary (node 10 "meow0" 'agent #f #f #f #f) ;; temp key
; ; "aarch64-unknown-linux-gnu" (define seckey (crypto-sign-make-key))
; ; (node 0 "server" 'server #f #f "meow.systems" 1337))) (define pubkey (crypto-sign-public-key seckey))
; ; (with-output-to-file "/tmp/crossfire-agent.configured" (define server (node 0 "server" 'server pubkey seckey "0.0.0.0" 1337))
; ; (lambda () (write-bytes data))) (current-comms (make-comms server))
; ; (make-node "agent0" "x86_64" 'agent '("gpu" "hifive")) (current-tm (make-transaction-manager server (current-comms)))
; ; (parameterize ([current-from-node (node 100 "meow" 'client #f #f #f #f)]) (current-agent-handler (make-agent-handler))
; ; ((rpc-impl server edit-agent) 1 "meow0" '("cpu" "hifive"))) (rpc-register-all server^ server-impl@)
; ; (get-nodes 'agent)
; ; (get-nodes 'meow) ;; restore agents
; ) (for ([agent (in-list (get-nodes 'agent))])
(agent-handler-new-agent (node-info-id agent) (node-info-resources agent)))
;; restore active tasks
(for ([(id name manifest-in complete?) (in-query (current-db) q-get-tasks)]
#:unless complete?)
(define manifest (parse-manifest (fasl->s-exp manifest)))
(agent-handler-new-task id manifest))
;; start listening
(comms-listen (current-comms) 1337)
(displayln "server running")
;; wait for break
(with-handlers ([exn? (lambda (ex) (displayln (format "encountered exception: ~a" ex)))])
(sync never-evt))
;; shutdown
(displayln "stopping server")
(agent-handler-shutdown)
(tm-shutdown (current-tm))
(comms-shutdown (current-comms))
(void))