improve robustness of task distribution
This commit is contained in:
parent
647b3fe443
commit
d4482794ab
|
@ -30,6 +30,8 @@
|
|||
|
||||
;; configuration
|
||||
|
||||
(define *server-node-id* 0)
|
||||
|
||||
(define *production?* #f)
|
||||
|
||||
(define *config-root* (if *production?* "/etc/" "etc/"))
|
||||
|
@ -100,6 +102,7 @@
|
|||
(define-stmt q-set-task-commit "update task set committed=1 where id=?")
|
||||
(define-stmt q-delete-task "delete from task where id=?")
|
||||
(define-stmt q-get-tasks "select id, name, manifest, complete from task")
|
||||
(define-stmt q-set-task-complete "update task set complete=1 where id=?")
|
||||
|
||||
(define-stmt q-get-task-log
|
||||
"select worker, time_wall_start, duration, pattern from task_log where taskid=?")
|
||||
|
@ -244,7 +247,9 @@
|
|||
id)
|
||||
|
||||
(define (enforce-subject type)
|
||||
(unless (symbol=? type (node-type (current-from-node)))
|
||||
;; override if the from-node is us
|
||||
(unless (or (symbol=? type (node-type (current-from-node)))
|
||||
(= (node-id (current-from-node)) *server-node-id*))
|
||||
(error "unauthorized")))
|
||||
|
||||
(define (enforce-object id type)
|
||||
|
@ -365,7 +370,9 @@
|
|||
;; agent-todo: hash of agent id to integer-set representing work the agent is working on
|
||||
;; file-hash: the hash to send to agents to identify the contents of the project file more
|
||||
;; precisely than just the taskid, which allows them to cache the file locally
|
||||
(struct task-state [id sema manifest [work-pattern #:mutable] agent-todo file-hash] #:transparent)
|
||||
;; completed-work: an integer set of completed work
|
||||
(struct task-state [id sema manifest [work-pattern #:mutable] agent-todo file-hash
|
||||
[completed-work #:mutable]] #:transparent)
|
||||
|
||||
(define (initialize-task id mf)
|
||||
(define file-hash (server-hash-file id))
|
||||
|
@ -373,15 +380,23 @@
|
|||
(define agent-todo (make-hash))
|
||||
(define init-pattern-range (range->integer-set 0 (manifest-psize mf)))
|
||||
;; subtract the pattern ranges that were already logged as complete
|
||||
(define pattern-range
|
||||
(for/fold ([pattern-range init-pattern-range])
|
||||
(define-values [pattern-range completed-work]
|
||||
(for/fold ([pattern-range init-pattern-range]
|
||||
[completed-work (make-integer-set '())])
|
||||
([(_1 _2 _3 pat-fasl) (in-query (current-db) q-get-task-log id)])
|
||||
(define sub (make-integer-set (fasl->s-exp pat-fasl)))
|
||||
(integer-set-subtract pattern-range sub)))
|
||||
(task-state id sema mf pattern-range agent-todo file-hash))
|
||||
(values (integer-set-subtract pattern-range sub) (integer-set-union completed-work sub))))
|
||||
(task-state id sema mf pattern-range agent-todo file-hash completed-work))
|
||||
|
||||
(define (task-has-work? ts)
|
||||
(not (zero? (integer-set-count (task-state-work-pattern ts)))))
|
||||
(not (and (zero? (integer-set-count (task-state-work-pattern ts)))
|
||||
(hash-empty? (task-state-agent-todo ts)))))
|
||||
|
||||
(define (task-set-complete! ts)
|
||||
(log-server-info "fully completed task: ~a" (task-state-id ts))
|
||||
(query-exec (current-db) q-set-task-complete (task-state-id ts))
|
||||
;; TODO : notification mechanism
|
||||
(handle-stop-task (task-state-id ts)))
|
||||
|
||||
;; this doesn't update the database - that only gets updated when the work is complete
|
||||
(define (task-assign! ts agent-id requested-amount)
|
||||
|
@ -393,12 +408,27 @@
|
|||
(pattern-range-take (task-state-work-pattern ts) requested-amount))
|
||||
(cond
|
||||
;; done! (maybe)
|
||||
;; check other agents work
|
||||
;; TODO : update completeness
|
||||
;; then deregister task with handle-stop-task
|
||||
[(zero? (integer-set-count assignment))
|
||||
(log-server-info "fully completed task: ~a" (task-state-id ts))
|
||||
#f]
|
||||
(define at (task-state-agent-todo ts))
|
||||
(cond
|
||||
[(hash-empty? at) ;; actually done. cancel all in-progress assignments and celebrate uwu
|
||||
;; are we going to hold up literally everything because we're still holding this
|
||||
;;semaphore during a database write?
|
||||
;; probably
|
||||
;; does it actually matter?
|
||||
;; probably not
|
||||
(task-set-complete! ts)
|
||||
#f]
|
||||
[else ;; steal work lol
|
||||
;; this will massively overcommit the last few parts of a project and potentially
|
||||
;; prioritize doing useless duplicate work instead of moving on to the next project
|
||||
;; but it'll be fiiiiiine don't worry
|
||||
(define wp (for/fold ([iset (make-integer-set '())]) ([(_ v) (in-hash at)]))
|
||||
(integer-set-union iset v))
|
||||
(define-values [assignment _]
|
||||
(pattern-range-take (task-state-work-pattern ts) requested-amount))
|
||||
(hash-set! at agent-id assignment)
|
||||
assignment])]
|
||||
;; update tracking
|
||||
[else
|
||||
(hash-set! (task-state-agent-todo ts) agent-id assignment)
|
||||
|
@ -412,7 +442,8 @@
|
|||
[#f (void)]
|
||||
[assignment
|
||||
(define new-wp (integer-set-union assignment (task-state-work-pattern ts)))
|
||||
(set-task-state-work-pattern! ts new-wp)
|
||||
(set-task-state-work-pattern!
|
||||
ts (integer-set-subtract new-wp (task-state-completed-work ts)))
|
||||
(hash-remove! (task-state-agent-todo ts) agent-id)]))))
|
||||
|
||||
;; adds to task log, then updates work pool with task completion
|
||||
|
@ -425,8 +456,14 @@
|
|||
(query-exec
|
||||
(current-db) q-add-task-log (task-state-id ts) agent-id time-wall-start duration
|
||||
(s-exp->fasl (integer-set-contents assignment)))
|
||||
(define new-completed (integer-set-union (task-state-completed-work ts) assignment))
|
||||
(set-task-state-completed-work! new-completed)
|
||||
;; remove tracking - this work is now done
|
||||
(hash-remove! (task-state-agent-todo ts) agent-id)]))))
|
||||
(hash-remove! (task-state-agent-todo ts) agent-id)
|
||||
;; check if we're fully complete. if so, mark the task complete in the database and cancel
|
||||
;; all related assignments
|
||||
(unless (task-has-work? ts)
|
||||
(task-set-complete! ts))]))))
|
||||
|
||||
(define (agent-thd id arch resources-in msg-chan)
|
||||
;; initialize to-node for rpcs
|
||||
|
@ -480,8 +517,6 @@
|
|||
(define requested-amount (hash-ref! task-size (task-state-id ts) *min-subtask-size*))
|
||||
;; integer set of assignment data, or false
|
||||
(define assign-data (task-assign! ts id requested-amount))
|
||||
;; TODO : handle false case better
|
||||
;; maybe steal work from other agents in progress or something
|
||||
(cond
|
||||
[(false? assign-data) #f]
|
||||
[else
|
||||
|
@ -578,8 +613,6 @@
|
|||
(define needed-arch (manifest-data-ref manifest 'arch '("any")))
|
||||
(define right-arch? (or (member "any" needed-arch) (member arch needed-arch)))
|
||||
(if (and right-arch?
|
||||
;; TODO : if there's no work, check if the task is complete or work can be
|
||||
;; stolen from other agents
|
||||
(task-has-work? head)
|
||||
(subset? available-resources needed-resources))
|
||||
(create-assignment! head)
|
||||
|
@ -622,8 +655,10 @@
|
|||
['shutdown (set! run-agent-thd? #f)]))
|
||||
|
||||
(define (handle-assignment-timeout)
|
||||
;; TODO : on timeout, work is returned to the assignment pool, but by this time other agent
|
||||
;; handlers that could have picked it up might be sleeping
|
||||
;; on timeout, work is returned to the assignment pool and other agents may not be actually
|
||||
;; notified of this. but because of work stealing they should have already attempting to
|
||||
;; steal the work so there shouldn't actually be a situation where an agent thread is asleep
|
||||
;; when work is returned to the pool
|
||||
(define time (current-seconds-monotonic))
|
||||
(define overdue-assignments
|
||||
(filter (lambda (av)
|
||||
|
@ -783,7 +818,8 @@
|
|||
(module+ main
|
||||
(require racket/cmdline)
|
||||
|
||||
;; initialize server
|
||||
|
||||
;; basic server initialization
|
||||
(install-logging!)
|
||||
(log-server-info "starting crossfire-server v~a" (#%info-lookup 'version))
|
||||
|
||||
|
@ -813,8 +849,12 @@
|
|||
|
||||
(define seckey (file->bytes *server-seckey-path*))
|
||||
(define pubkey (crypto-sign-public-key seckey))
|
||||
(define server (node 0 (config-get 'name string?) 'server pubkey seckey
|
||||
(config-get 'listen-addr string?) (config-get 'listen-port integer?)))
|
||||
(define listen-addr
|
||||
(match (config-get 'listen-addr (or/c 'auto string?))
|
||||
['auto "0.0.0.0"]
|
||||
[addr addr]))
|
||||
(define server (node *server-node-id* (config-get 'name string?) 'server pubkey seckey
|
||||
listen-addr (config-get 'listen-port integer?)))
|
||||
(define public-addr
|
||||
(match (config-get 'public-addr (or/c 'auto string?))
|
||||
['auto (error "TODO auto public-addr unimplemented")]
|
||||
|
@ -826,6 +866,8 @@
|
|||
(current-server-public-node
|
||||
(struct-copy node server [seckey #f] [host public-addr] [port public-port]))
|
||||
|
||||
|
||||
;; read command line
|
||||
;; TODO : read cmdline for admin commands
|
||||
;; ideally allow the admin commands to coexist with an actual current running server
|
||||
|
||||
|
@ -846,6 +888,9 @@
|
|||
(error "invalid subcommand" subcmd)]
|
||||
[argv (void)])
|
||||
|
||||
|
||||
;; start server and start doing stuff
|
||||
|
||||
(current-agent-handler (make-agent-handler))
|
||||
(current-comms (make-comms server))
|
||||
(current-tm (make-transaction-manager server (current-comms)))
|
||||
|
@ -866,7 +911,7 @@
|
|||
(define manifest (parse-manifest (fasl->s-exp manifest-in)))
|
||||
(agent-handler-new-task id manifest))
|
||||
|
||||
;; start listening
|
||||
;; now server is ready to start listening
|
||||
(comms-listen (current-comms) (node-port server))
|
||||
|
||||
(log-server-info "server running")
|
||||
|
@ -876,6 +921,7 @@
|
|||
(sync never-evt))
|
||||
|
||||
;; shutdown
|
||||
;; a second break aborts clean shutdown. ideally, don't break again unless necessary
|
||||
(log-server-info "stopping server")
|
||||
(agent-handler-shutdown)
|
||||
(tm-shutdown (current-tm))
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
(;; name of this server
|
||||
(name "a crossfire server")
|
||||
;; the ip address and port to listen on
|
||||
;; auto is "0.0.0.0"
|
||||
(listen-addr "0.0.0.0")
|
||||
(listen-port 25446)
|
||||
;; the "public" ip (or domain name) and port of this node
|
||||
|
|
Loading…
Reference in New Issue