fix agent bugs with task execution
This commit is contained in:
parent
8ec0861907
commit
b97bfcbc6f
4
Makefile
4
Makefile
|
@ -33,7 +33,9 @@ dev-rollback:
|
||||||
raco north rollback -p crossfire/migrations -f
|
raco north rollback -p crossfire/migrations -f
|
||||||
|
|
||||||
dev-make-agent:
|
dev-make-agent:
|
||||||
|
$(RM) agent-deployment/app.zo
|
||||||
cd agent-deployment && $(MAKE)
|
cd agent-deployment && $(MAKE)
|
||||||
[ -d lib ] || mkdir lib
|
[ -d lib ] || mkdir lib
|
||||||
cd agent-deployment && for dir in arch_*; do \
|
cd agent-deployment && for dir in arch_*; do \
|
||||||
mkdir ../lib/$$dir; cp $$dir/crossfire-agent ../lib/$$dir/; done
|
[ -d ../lib/$$dir ] || mkdir ../lib/$$dir; \
|
||||||
|
cp $$dir/crossfire-agent ../lib/$$dir/; done
|
||||||
|
|
|
@ -44,23 +44,32 @@
|
||||||
(parameterize ([current-custodian cust])
|
(parameterize ([current-custodian cust])
|
||||||
(thread (lambda ()
|
(thread (lambda ()
|
||||||
;; kinda pointless, other than helping keep the connection alive
|
;; kinda pointless, other than helping keep the connection alive
|
||||||
(let loop () (agent-report-state #f #f) (sleep *ping-secs*) (loop)))))
|
(let loop ()
|
||||||
|
(with-handlers ([exn:fail? (lambda (ex) ((error-display-handler) (exn-message ex) ex))])
|
||||||
|
(agent-report-state #f #f))
|
||||||
|
(sleep *ping-secs*) (loop)))))
|
||||||
|
|
||||||
(define last-cache-update (current-seconds-monotonic))
|
(define last-cache-update (current-seconds-monotonic))
|
||||||
(define run-agent? #t)
|
(define run-agent? #t)
|
||||||
(define assignments (make-hash))
|
(define assignments (make-hash))
|
||||||
|
|
||||||
(struct download [thd file-hash/hex extract-dir [waiters #:mutable]] #:transparent)
|
(struct download [thd file-hash/hex extract-dir [waiters #:mutable] success] #:transparent)
|
||||||
(define downloads (make-hash))
|
(define downloads (make-hash))
|
||||||
|
|
||||||
(define (download/extract tid tgz-file extract-dir)
|
(define (download/extract tid tgz-file extract-dir success-box)
|
||||||
(with-handlers ([exn:fail:filesystem? void]) (delete-directory/files tgz-file))
|
(define (cleanup)
|
||||||
(with-handlers ([exn:fail:filesystem? void]) (delete-directory/files extract-dir))
|
(with-handlers ([exn:fail:filesystem? void]) (delete-directory/files tgz-file))
|
||||||
|
(with-handlers ([exn:fail:filesystem? void]) (delete-directory/files extract-dir)))
|
||||||
(log-agent-info "downloading task data for ~a" tid)
|
(log-agent-info "downloading task data for ~a" tid)
|
||||||
;; TODO this should be updated with the streaming interface
|
(with-handlers ([exn:fail? (lambda (ex)
|
||||||
(call-with-output-file tgz-file (lambda (out) (write-bytes (get-project-file tid) out)))
|
((error-display-handler) (exn-message ex) ex)
|
||||||
(log-agent-info "extracting task data for ~a" tid)
|
(cleanup))])
|
||||||
(untgz tgz-file #:dest extract-dir))
|
;; TODO this should be updated with the streaming interface
|
||||||
|
(call-with-output-file tgz-file (lambda (out) (write-bytes (get-project-file tid) out))
|
||||||
|
#:exists 'truncate)
|
||||||
|
(log-agent-info "extracting task data for ~a" tid)
|
||||||
|
(untgz tgz-file #:dest extract-dir)
|
||||||
|
(set-box! success-box #t)))
|
||||||
|
|
||||||
(let loop ()
|
(let loop ()
|
||||||
(define cache-update-delta (max 0 (- (+ last-cache-update *max-cache-age*)
|
(define cache-update-delta (max 0 (- (+ last-cache-update *max-cache-age*)
|
||||||
|
@ -90,19 +99,29 @@
|
||||||
;; download completed
|
;; download completed
|
||||||
[(? thread? dl-thd)
|
[(? thread? dl-thd)
|
||||||
;; argh
|
;; argh
|
||||||
(match-define (download thd file-hash/hex extract-dir waiters)
|
(define the-dl
|
||||||
(for/first ([(tid dl) (in-hash downloads)] #:when (eq? dl-thd (download-thd dl)))
|
(for/first ([(tid dl) (in-hash downloads)] #:when (eq? dl-thd (download-thd dl)))
|
||||||
(hash-remove! downloads tid)
|
(hash-remove! downloads tid)
|
||||||
(log-agent-info "completed download for ~a" tid)
|
(log-agent-info "completed download for ~a" tid)
|
||||||
dl))
|
dl))
|
||||||
(hash-set! cache-info file-hash/hex (current-seconds))
|
(cond
|
||||||
(update-workdir-cache! workdir cache-info)
|
[(false? the-dl) (log-agent-error "download completed, but missing record of it")]
|
||||||
(set! last-cache-update (current-seconds-monotonic))
|
[(false? (unbox (download-success the-dl)))
|
||||||
;; start delayed assignments
|
;; download failed, report error
|
||||||
(for ([assignment (in-list waiters)])
|
(for ([assignment (in-list (download-waiters the-dl))])
|
||||||
(parameterize ([current-custodian cust])
|
(log-agent-error "download failed for ~a" (assignment-id assignment))
|
||||||
(hash-set! assignments (assignment-id assignment)
|
(agent-report-state (assignment-id assignment) 'error))]
|
||||||
(thread (lambda () (execute-assignment assignment extract-dir))))))]
|
[else
|
||||||
|
;; start it
|
||||||
|
(match-define (download thd file-hash/hex extract-dir waiters success-box) the-dl)
|
||||||
|
(hash-set! cache-info file-hash/hex (current-seconds))
|
||||||
|
(update-workdir-cache! workdir cache-info)
|
||||||
|
(set! last-cache-update (current-seconds-monotonic))
|
||||||
|
;; start delayed assignments
|
||||||
|
(for ([assignment (in-list waiters)])
|
||||||
|
(parameterize ([current-custodian cust])
|
||||||
|
(hash-set! assignments (assignment-id assignment)
|
||||||
|
(thread (lambda () (execute-assignment assignment extract-dir))))))])]
|
||||||
[(cons 'new assignment)
|
[(cons 'new assignment)
|
||||||
(define aid (assignment-id assignment))
|
(define aid (assignment-id assignment))
|
||||||
;; cancel old assignment with the same id, if exists
|
;; cancel old assignment with the same id, if exists
|
||||||
|
@ -129,16 +148,18 @@
|
||||||
(set-download-waiters! dl (cons assignment (download-waiters dl)))]
|
(set-download-waiters! dl (cons assignment (download-waiters dl)))]
|
||||||
[else
|
[else
|
||||||
(log-agent-info "starting download for ~a" tid)
|
(log-agent-info "starting download for ~a" tid)
|
||||||
(define dl (download (thread (thunk (download/extract tid tgz-file extract-dir)))
|
(define success-box (box #f))
|
||||||
file-hash/hex extract-dir (list assignment)))
|
(define dl
|
||||||
(hash-set! downloads tid dl)])])
|
(download (thread (thunk (download/extract tid tgz-file extract-dir success-box)))
|
||||||
|
file-hash/hex extract-dir (list assignment) success-box))
|
||||||
|
(hash-set! downloads tid dl)])]
|
||||||
|
[x (log-agent-error "unexpected message" x)])
|
||||||
|
|
||||||
(when run-agent? (loop)))
|
(when run-agent? (loop)))
|
||||||
;; TODO : report errors for all in-progress assignments or something
|
;; TODO : report errors for all in-progress assignments or something
|
||||||
(custodian-shutdown-all cust))
|
(custodian-shutdown-all cust))
|
||||||
|
|
||||||
(define (execute-assignment assignment extract-dir)
|
(define (execute-assignment assignment extract-dir)
|
||||||
;; TODO : on cancel-assignment, actually kill the process and stuff
|
|
||||||
;; TODO : do local verification of resource usage. if the server starts an assignment that uses
|
;; TODO : do local verification of resource usage. if the server starts an assignment that uses
|
||||||
;; resource A and we're already using resource A, kill the old assignment
|
;; resource A and we're already using resource A, kill the old assignment
|
||||||
|
|
||||||
|
@ -187,7 +208,10 @@
|
||||||
(parameterize ([current-custodian cust] [current-directory extract-dir])
|
(parameterize ([current-custodian cust] [current-directory extract-dir])
|
||||||
(apply subprocess #f #f (current-error-port) 'new (append cmd args))))
|
(apply subprocess #f #f (current-error-port) 'new (append cmd args))))
|
||||||
|
|
||||||
(with-handlers ([exn:break? (lambda (_) (subprocess-kill proc #f)
|
(with-handlers ([exn:break? (lambda (_)
|
||||||
|
(log-agent-info "killing process for ~a" aid)
|
||||||
|
;; nicely ask the process to stop
|
||||||
|
(subprocess-kill proc #f)
|
||||||
(sync/timeout *subproc-kill-delay* proc)
|
(sync/timeout *subproc-kill-delay* proc)
|
||||||
;; will handle killing for us :P
|
;; will handle killing for us :P
|
||||||
(custodian-shutdown-all cust)
|
(custodian-shutdown-all cust)
|
||||||
|
@ -196,7 +220,7 @@
|
||||||
(define line-match (regexp-match-evt #px"^[^\n]*\n" out))
|
(define line-match (regexp-match-evt #px"^[^\n]*\n" out))
|
||||||
(define eof-e (eof-evt out))
|
(define eof-e (eof-evt out))
|
||||||
(let loop ([reached-eof #f] [proc-done #f])
|
(let loop ([reached-eof #f] [proc-done #f])
|
||||||
(match (sync proc line-match eof-e)
|
(match (sync/enable-break proc line-match eof-e)
|
||||||
[(== proc)
|
[(== proc)
|
||||||
(unless reached-eof
|
(unless reached-eof
|
||||||
(loop reached-eof #t))]
|
(loop reached-eof #t))]
|
||||||
|
|
Loading…
Reference in New Issue