diff --git a/crossfire/server.rkt b/crossfire/server.rkt index 862c7f4..8df018c 100644 --- a/crossfire/server.rkt +++ b/crossfire/server.rkt @@ -30,6 +30,8 @@ ;; configuration +(define *server-node-id* 0) + (define *production?* #f) (define *config-root* (if *production?* "/etc/" "etc/")) @@ -100,6 +102,7 @@ (define-stmt q-set-task-commit "update task set committed=1 where id=?") (define-stmt q-delete-task "delete from task where id=?") (define-stmt q-get-tasks "select id, name, manifest, complete from task") +(define-stmt q-set-task-complete "update task set complete=1 where id=?") (define-stmt q-get-task-log "select worker, time_wall_start, duration, pattern from task_log where taskid=?") @@ -244,7 +247,9 @@ id) (define (enforce-subject type) - (unless (symbol=? type (node-type (current-from-node))) + ;; override if the from-node is us + (unless (or (symbol=? type (node-type (current-from-node))) + (= (node-id (current-from-node)) *server-node-id*)) (error "unauthorized"))) (define (enforce-object id type) @@ -365,7 +370,9 @@ ;; agent-todo: hash of agent id to integer-set representing work the agent is working on ;; file-hash: the hash to send to agents to identify the contents of the project file more ;; precisely than just the taskid, which allows them to cache the file locally - (struct task-state [id sema manifest [work-pattern #:mutable] agent-todo file-hash] #:transparent) + ;; completed-work: an integer set of completed work + (struct task-state [id sema manifest [work-pattern #:mutable] agent-todo file-hash + [completed-work #:mutable]] #:transparent) (define (initialize-task id mf) (define file-hash (server-hash-file id)) @@ -373,15 +380,23 @@ (define agent-todo (make-hash)) (define init-pattern-range (range->integer-set 0 (manifest-psize mf))) ;; subtract the pattern ranges that were already logged as complete - (define pattern-range - (for/fold ([pattern-range init-pattern-range]) + (define-values [pattern-range completed-work] + (for/fold 
([pattern-range init-pattern-range] + [completed-work (make-integer-set '())]) ([(_1 _2 _3 pat-fasl) (in-query (current-db) q-get-task-log id)]) (define sub (make-integer-set (fasl->s-exp pat-fasl))) - (integer-set-subtract pattern-range sub))) - (task-state id sema mf pattern-range agent-todo file-hash)) + (values (integer-set-subtract pattern-range sub) (integer-set-union completed-work sub)))) + (task-state id sema mf pattern-range agent-todo file-hash completed-work)) (define (task-has-work? ts) - (not (zero? (integer-set-count (task-state-work-pattern ts))))) + (not (and (zero? (integer-set-count (task-state-work-pattern ts))) + (hash-empty? (task-state-agent-todo ts))))) + + (define (task-set-complete! ts) + (log-server-info "fully completed task: ~a" (task-state-id ts)) + (query-exec (current-db) q-set-task-complete (task-state-id ts)) + ;; TODO : notification mechanism + (handle-stop-task (task-state-id ts))) ;; this doesn't update the database - that only gets updated when the work is complete (define (task-assign! ts agent-id requested-amount) @@ -393,12 +408,27 @@ (pattern-range-take (task-state-work-pattern ts) requested-amount)) (cond ;; done! (maybe) - ;; check other agents work - ;; TODO : update completeness - ;; then deregister task with handle-stop-task [(zero? (integer-set-count assignment)) - (log-server-info "fully completed task: ~a" (task-state-id ts)) - #f] + (define at (task-state-agent-todo ts)) + (cond + [(hash-empty? at) ;; actually done. cancel all in-progress assignments and celebrate uwu + ;; are we going to hold up literally everything because we're still holding this + ;;semaphore during a database write? + ;; probably + ;; does it actually matter? + ;; probably not + (task-set-complete! 
ts) + #f] + [else ;; steal work lol + ;; this will massively overcommit the last few parts of a project and potentially + ;; prioritize doing useless duplicate work instead of moving on to the next project + ;; but it'll be fiiiiiine don't worry + (define wp (for/fold ([iset (make-integer-set '())]) ([(_ v) (in-hash at)]) + (integer-set-union iset v))) + (define-values [assignment _] + (pattern-range-take wp requested-amount)) + (hash-set! at agent-id assignment) + assignment])] ;; update tracking [else (hash-set! (task-state-agent-todo ts) agent-id assignment) @@ -412,7 +442,8 @@ [#f (void)] [assignment (define new-wp (integer-set-union assignment (task-state-work-pattern ts))) - (set-task-state-work-pattern! ts new-wp) + (set-task-state-work-pattern! + ts (integer-set-subtract new-wp (task-state-completed-work ts))) (hash-remove! (task-state-agent-todo ts) agent-id)])))) ;; adds to task log, then updates work pool with task completion @@ -425,8 +456,14 @@ (query-exec (current-db) q-add-task-log (task-state-id ts) agent-id time-wall-start duration (s-exp->fasl (integer-set-contents assignment))) + (define new-completed (integer-set-union (task-state-completed-work ts) assignment)) + (set-task-state-completed-work! ts new-completed) ;; remove tracking - this work is now done - (hash-remove! (task-state-agent-todo ts) agent-id)])))) + (hash-remove! (task-state-agent-todo ts) agent-id) + ;; check if we're fully complete. if so, mark the task complete in the database and cancel + ;; all related assignments + (unless (task-has-work? ts) + (task-set-complete! ts))])))) (define (agent-thd id arch resources-in msg-chan) ;; initialize to-node for rpcs @@ -480,8 +517,6 @@ (define requested-amount (hash-ref! task-size (task-state-id ts) *min-subtask-size*)) ;; integer set of assignment data, or false (define assign-data (task-assign! 
ts id requested-amount)) - ;; TODO : handle false case better - ;; maybe steal work from other agents in progress or something (cond [(false? assign-data) #f] [else @@ -578,8 +613,6 @@ (define needed-arch (manifest-data-ref manifest 'arch '("any"))) (define right-arch? (or (member "any" needed-arch) (member arch needed-arch))) (if (and right-arch? - ;; TODO : if there's no work, check if the task is complete or work can be - ;; stolen from other agents (task-has-work? head) (subset? available-resources needed-resources)) (create-assignment! head) @@ -622,8 +655,10 @@ ['shutdown (set! run-agent-thd? #f)])) (define (handle-assignment-timeout) - ;; TODO : on timeout, work is returned to the assignment pool, but by this time other agent - ;; handlers that could have picked it up might be sleeping + ;; on timeout, work is returned to the assignment pool and other agents may not be actually + ;; notified of this. but because of work stealing they should have already attempted to + ;; steal the work so there shouldn't actually be a situation where an agent thread is asleep + ;; when work is returned to the pool (define time (current-seconds-monotonic)) (define overdue-assignments (filter (lambda (av) @@ -783,7 +818,8 @@ (module+ main (require racket/cmdline) - ;; initialize server + + ;; basic server initialization (install-logging!) (log-server-info "starting crossfire-server v~a" (#%info-lookup 'version)) @@ -813,8 +849,12 @@ (define seckey (file->bytes *server-seckey-path*)) (define pubkey (crypto-sign-public-key seckey)) - (define server (node 0 (config-get 'name string?) 'server pubkey seckey - (config-get 'listen-addr string?) (config-get 'listen-port integer?))) + (define listen-addr + (match (config-get 'listen-addr (or/c 'auto string?)) + ['auto "0.0.0.0"] + [addr addr])) + (define server (node *server-node-id* (config-get 'name string?) 
'server pubkey seckey + listen-addr (config-get 'listen-port integer?))) (define public-addr (match (config-get 'public-addr (or/c 'auto string?)) ['auto (error "TODO auto public-addr unimplemented")] @@ -826,6 +866,8 @@ (current-server-public-node (struct-copy node server [seckey #f] [host public-addr] [port public-port])) + + ;; read command line ;; TODO : read cmdline for admin commands ;; ideally allow the admin commands to coexist with an actual current running server @@ -846,6 +888,9 @@ (error "invalid subcommand" subcmd)] [argv (void)]) + + ;; start server and start doing stuff + (current-agent-handler (make-agent-handler)) (current-comms (make-comms server)) (current-tm (make-transaction-manager server (current-comms))) @@ -866,7 +911,7 @@ (define manifest (parse-manifest (fasl->s-exp manifest-in))) (agent-handler-new-task id manifest)) - ;; start listening + ;; now server is ready to start listening (comms-listen (current-comms) (node-port server)) (log-server-info "server running") @@ -876,6 +921,7 @@ (sync never-evt)) ;; shutdown + ;; a second break aborts clean shutdown. ideally, don't break again unless necessary (log-server-info "stopping server") (agent-handler-shutdown) (tm-shutdown (current-tm)) diff --git a/etc/crossfire.rktd b/etc/crossfire.rktd index ec788f6..a9e646a 100644 --- a/etc/crossfire.rktd +++ b/etc/crossfire.rktd @@ -1,6 +1,7 @@ (;; name of this server (name "a crossfire server") ;; the ip address and port to listen on + ;; auto is "0.0.0.0" (listen-addr "0.0.0.0") (listen-port 25446) ;; the "public" ip (or domain name) and port of this node