fix a few concurrency bugs with server and agents
This commit is contained in:
parent
790945b828
commit
8ec0861907
|
@ -138,6 +138,10 @@
|
|||
(custodian-shutdown-all cust))
|
||||
|
||||
(define (execute-assignment assignment extract-dir)
|
||||
;; TODO : on cancel-assignment, actually kill the process and stuff
|
||||
;; TODO : do local verification of resource usage. if the server starts an assignment that uses
|
||||
;; resource A and we're already using resource A, kill the old assignment
|
||||
|
||||
(define aid (assignment-id assignment))
|
||||
(log-agent-info "starting execution of ~a in ~a" aid extract-dir)
|
||||
|
||||
|
|
|
@ -435,8 +435,14 @@
|
|||
;; done! (maybe)
|
||||
[(zero? (integer-set-count assignment))
|
||||
(define at (task-state-agent-todo ts))
|
||||
(define real-in-progress
|
||||
(integer-set-subtract
|
||||
(for/fold ([wp (make-integer-set '())]) ([(_ v) (in-hash at)])
|
||||
(integer-set-union wp v))
|
||||
(task-state-completed-work ts)))
|
||||
(cond
|
||||
[(hash-empty? at) ;; actually done. cancel all in-progress assignments and celebrate uwu
|
||||
[(zero? (integer-set-count real-in-progress))
|
||||
;; actually done. cancel all in-progress assignments and celebrate uwu
|
||||
;; are we going to hold up literally everything because we're still holding this
|
||||
;;semaphore during a database write?
|
||||
;; probably
|
||||
|
@ -448,14 +454,15 @@
|
|||
;; this will massively overcommit the last few parts of a project and potentially
|
||||
;; prioritize doing useless duplicate work instead of moving on to the next project
|
||||
;; but it'll be fiiiiiine don't worry
|
||||
(define wp (for/fold ([iset (make-integer-set '())]) ([(_ v) (in-hash at)])
|
||||
(integer-set-union iset v)))
|
||||
(define-values [assignment _]
|
||||
(pattern-range-take (task-state-work-pattern ts) requested-amount))
|
||||
(define-values [assignment _] (pattern-range-take real-in-progress requested-amount))
|
||||
(log-server-info "assigning ~a (stolen assignment): ~a"
|
||||
agent-id (integer-set-contents assignment))
|
||||
(hash-set! at agent-id assignment)
|
||||
assignment])]
|
||||
;; update tracking
|
||||
[else
|
||||
(log-server-info "assigning ~a (normal assignment): ~a"
|
||||
agent-id (integer-set-contents assignment))
|
||||
(hash-set! (task-state-agent-todo ts) agent-id assignment)
|
||||
(set-task-state-work-pattern! ts new-wp)
|
||||
assignment]))))
|
||||
|
@ -491,6 +498,13 @@
|
|||
(task-set-complete! ts))]))))
|
||||
|
||||
(define (agent-thd id arch resources-in msg-chan)
|
||||
;; TODO : for better reliability, the RPCs to the agent should really be on a separate thread
|
||||
;; it should access a message queue that this handler thread adds and removes items for
|
||||
;; perhaps the entire comms subsystem should be restructured with some sort of message queue
|
||||
;; system
|
||||
;; TODO whenever comms gets actual real time notifications of nodes going online or offline it
|
||||
;; needs to be hooked into update-assignments! here
|
||||
|
||||
;; initialize to-node for rpcs
|
||||
(current-to-node (comms-get-node-info (current-comms) id))
|
||||
;; helper to generate assignment ids (a kind of arbitrary number we pass to agents to track
|
||||
|
@ -559,9 +573,20 @@
|
|||
(hash-set! assigned-tasks aid (assignment aid (task-state-id ts) start-time-utc
|
||||
start-time-monotonic))
|
||||
;; send agent rpc
|
||||
(invoke/retry-forever
|
||||
(lambda () (push-assignment aid (task-state-id ts) mf-raw file-hash
|
||||
(integer-set-contents assign-data))))
|
||||
(define (handle-failure ex)
|
||||
(log-server-error "failed to push ~a to agent ~a" aid id)
|
||||
((error-display-handler) (exn-message ex) ex)
|
||||
(task-unassign! ts id)
|
||||
;; attempt to cancel it
|
||||
(with-handlers ([exn:fail?
|
||||
(lambda (ex)
|
||||
(log-server-error "failed to cancel ~a after error on ~a" aid id)
|
||||
((error-display-handler) (exn-message ex) ex))])
|
||||
(cancel-assignment aid))
|
||||
#t)
|
||||
(with-handlers ([exn:fail? handle-failure])
|
||||
(push-assignment aid (task-state-id ts) mf-raw file-hash
|
||||
(integer-set-contents assign-data)))
|
||||
#t]))
|
||||
|
||||
(define (cancel-assignment! assignment)
|
||||
|
@ -572,7 +597,12 @@
|
|||
(task-unassign! ts id))
|
||||
(hash-remove! assigned-tasks (assignment-id assignment))
|
||||
;; send agent rpc
|
||||
(invoke/retry-forever (lambda () (cancel-assignment (assignment-id assignment)))))
|
||||
(with-handlers ([exn:fail?
|
||||
(lambda (ex)
|
||||
(log-server-error "failed to cancel ~a on ~a"
|
||||
(assignment-id assignment) id)
|
||||
((error-display-handler) (exn-message ex) ex))])
|
||||
(cancel-assignment (assignment-id assignment))))
|
||||
|
||||
;; records a match for a certain assignment
|
||||
(define (add-assignment-match! assignment success-input)
|
||||
|
|
Loading…
Reference in New Issue