From 8ec0861907452c3cd3ebf27984616ddbc4258cf8 Mon Sep 17 00:00:00 2001 From: haskal Date: Thu, 31 Dec 2020 03:42:25 -0500 Subject: [PATCH] fix a few concurrency bugs with server and agents --- crossfire/agent.rkt | 4 ++++ crossfire/server.rkt | 48 +++++++++++++++++++++++++++++++++++--------- 2 files changed, 43 insertions(+), 9 deletions(-) diff --git a/crossfire/agent.rkt b/crossfire/agent.rkt index e8a97b8..f461391 100644 --- a/crossfire/agent.rkt +++ b/crossfire/agent.rkt @@ -138,6 +138,10 @@ (custodian-shutdown-all cust)) (define (execute-assignment assignment extract-dir) + ;; TODO : on cancel-assignment, actually kill the process and stuff + ;; TODO : do local verification of resource usage. if the server starts an assignment that uses + ;; resource A and we're already using resource A, kill the old assignment + (define aid (assignment-id assignment)) (log-agent-info "starting execution of ~a in ~a" aid extract-dir) diff --git a/crossfire/server.rkt b/crossfire/server.rkt index b215deb..b054fd4 100644 --- a/crossfire/server.rkt +++ b/crossfire/server.rkt @@ -435,8 +435,14 @@ ;; done! (maybe) [(zero? (integer-set-count assignment)) (define at (task-state-agent-todo ts)) + (define real-in-progress + (integer-set-subtract + (for/fold ([wp (make-integer-set '())]) ([(_ v) (in-hash at)]) + (integer-set-union wp v)) + (task-state-completed-work ts))) (cond - [(hash-empty? at) ;; actually done. cancel all in-progress assignments and celebrate uwu + [(zero? (integer-set-count real-in-progress)) + ;; actually done. cancel all in-progress assignments and celebrate uwu ;; are we going to hold up literally everything because we're still holding this ;;semaphore during a database write? ;; probably @@ -448,14 +454,15 @@ ;; this will massively overcommit the last few parts of a project and potentially ;; prioritize doing useless duplicate work instead of moving on to the next project ;; but it'll be fiiiiiine don't worry - (define wp (for/fold ([iset (make-integer-set '())]) ([(_ v) (in-hash at)]) - (integer-set-union iset v))) - (define-values [assignment _] - (pattern-range-take (task-state-work-pattern ts) requested-amount)) + (define-values [assignment _] (pattern-range-take real-in-progress requested-amount)) + (log-server-info "assigning ~a (stolen assignment): ~a" + agent-id (integer-set-contents assignment)) (hash-set! at agent-id assignment) assignment])] ;; update tracking [else + (log-server-info "assigning ~a (normal assignment): ~a" + agent-id (integer-set-contents assignment)) (hash-set! (task-state-agent-todo ts) agent-id assignment) (set-task-state-work-pattern! ts new-wp) assignment])))) @@ -491,6 +498,13 @@ (task-set-complete! ts))])))) (define (agent-thd id arch resources-in msg-chan) + ;; TODO : for better reliability, the RPCs to the agent should really be on a separate thread + ;; it should access a message queue that this handler thread adds and removes items for + ;; perhaps the entire comms subsystem should be restructured with some sort of message queue + ;; system + ;; TODO whenever comms gets actual real time notifications of nodes going online or offline it + ;; needs to be hooked into update-assignments! here + ;; initialize to-node for rpcs (current-to-node (comms-get-node-info (current-comms) id)) ;; helper to generate assignment ids (a kind of arbitrary number we pass to agents to track @@ -559,9 +573,20 @@ (hash-set! assigned-tasks aid (assignment aid (task-state-id ts) start-time-utc start-time-monotonic)) ;; send agent rpc - (invoke/retry-forever - (lambda () (push-assignment aid (task-state-id ts) mf-raw file-hash - (integer-set-contents assign-data)))) + (define (handle-failure ex) + (log-server-error "failed to push ~a to agent ~a" aid id) + ((error-display-handler) (exn-message ex) ex) + (task-unassign! ts id) + ;; attempt to cancel it + (with-handlers ([exn:fail? + (lambda (ex) + (log-server-error "failed to cancel ~a after error on ~a" aid id) + ((error-display-handler) (exn-message ex) ex))]) + (cancel-assignment aid)) + #t) + (with-handlers ([exn:fail? handle-failure]) + (push-assignment aid (task-state-id ts) mf-raw file-hash + (integer-set-contents assign-data))) #t])) (define (cancel-assignment! assignment) @@ -572,7 +597,12 @@ (task-unassign! ts id)) (hash-remove! assigned-tasks (assignment-id assignment)) ;; send agent rpc - (invoke/retry-forever (lambda () (cancel-assignment (assignment-id assignment))))) + (with-handlers ([exn:fail? + (lambda (ex) + (log-server-error "failed to cancel ~a on ~a" + (assignment-id assignment) id) + ((error-display-handler) (exn-message ex) ex))]) + (cancel-assignment (assignment-id assignment)))) ;; records a match for a certain assignment (define (add-assignment-match! assignment success-input)