fix issues with exn:break handling

This commit is contained in:
xenia 2020-11-27 06:01:20 -05:00
parent d4482794ab
commit 0ae728a278
4 changed files with 89 additions and 54 deletions

View File

@ -16,52 +16,60 @@
;; You should have received a copy of the GNU Affero General Public License ;; You should have received a copy of the GNU Affero General Public License
;; along with this program. If not, see <https://www.gnu.org/licenses/>. ;; along with this program. If not, see <https://www.gnu.org/licenses/>.
(require racket/contract racket/fasl racket/file racket/match racket/port racket/string racket/unit (require racket/async-channel racket/bool racket/contract racket/fasl racket/file racket/function
racket/match racket/port racket/string racket/unit
"comms.rkt" "info.rkt" "logging.rkt" "not-crypto.rkt" "manifest.rkt" "protocol.rkt" "comms.rkt" "info.rkt" "logging.rkt" "not-crypto.rkt" "manifest.rkt" "protocol.rkt"
"static-support.rkt") "static-support.rkt")
(define-logger agent #:parent global-logger) (define-logger agent #:parent global-logger)
(define (get-config.linux-gnu) ;; global variables, yeet
(call-with-input-file "/proc/self/exe"
(lambda (in) (struct assignment [id task-id manifest file-hash work-range] #:transparent)
(file-position in eof) (define incoming-queue (make-async-channel))
(define len (file-position in))
(file-position in (- len 4))
(define offset (integer-bytes->integer (port->bytes in) #f #t))
(file-position in (- len offset))
(fasl->s-exp in))))
;; main loop ;; main loop
(define (agent-loop) (define (agent-loop)
; (thread (lambda ()
; (log-agent-info "downloading assignment ~a" aid)
; (define data (get-project-file tid))
; (log-agent-info "assignment data: ~s" data)
; (log-agent-info "simulating assignment ~a" aid)
; (sleep 10)
; (log-agent-info "sending completion ~a" aid)
; (agent-report-state aid 'complete)))
(sleep 10) (sleep 10)
(agent-loop)) (agent-loop))
;; rpc impl ;; rpc impl
(define (enforce-subject type)
(unless (symbol=? type (node-type (current-from-node)))
(error "unauthorized")))
(define/contract (push-assignment aid tid mf-raw file-hash assign-data) (define/contract (push-assignment aid tid mf-raw file-hash assign-data)
(-> integer? integer? list? bytes? (listof pair?) void?) (-> integer? integer? list? bytes? (listof pair?) void?)
(enforce-subject 'server)
(log-agent-info "got push-assignment ~a ~a ~s ~a" aid mf-raw file-hash assign-data) (log-agent-info "got push-assignment ~a ~a ~s ~a" aid mf-raw file-hash assign-data)
(thread (lambda ()
(log-agent-info "downloading assignment ~a" aid) (async-channel-put
(define data (get-project-file tid)) incoming-queue (cons 'new (assignment aid tid (parse-manifest mf-raw) file-hash assign-data)))
(log-agent-info "assignment data: ~s" data)
(log-agent-info "simulating assignment ~a" aid)
(sleep 10)
(log-agent-info "sending completion ~a" aid)
(agent-report-state aid 'complete)))
(void)) (void))
(define/contract (cancel-assignment aid) (define/contract (cancel-assignment aid)
(-> integer? void?) (-> integer? void?)
(enforce-subject 'server)
(log-agent-info "got cancel-assignment ~a" aid) (log-agent-info "got cancel-assignment ~a" aid)
(async-channel-put incoming-queue (cons 'cancel aid))
(void)) (void))
(define/contract (cancel-all-assignments) (define/contract (cancel-all-assignments)
(-> void?) (-> void?)
(enforce-subject 'server)
(log-agent-info "got cancel-all-assignments") (log-agent-info "got cancel-all-assignments")
(async-channel-put incoming-queue 'cancel-all)
(void)) (void))
;; agent impl unit ;; agent impl unit
@ -77,6 +85,16 @@
(install-logging!) (install-logging!)
(log-agent-info "starting crossfire-agent v~a" (#%info-lookup 'version)) (log-agent-info "starting crossfire-agent v~a" (#%info-lookup 'version))
(define (get-config.linux-gnu)
(call-with-input-file "/proc/self/exe"
(lambda (in)
(file-position in eof)
(define len (file-position in))
(file-position in (- len 4))
(define offset (integer-bytes->integer (port->bytes in) #f #t))
(file-position in (- len offset))
(fasl->s-exp in))))
(match-define (list agent-node server-node) (match-define (list agent-node server-node)
(if (static-ffi-available?) (if (static-ffi-available?)
(match (string-split (static-ffi-arch) "-") (match (string-split (static-ffi-arch) "-")
@ -98,13 +116,20 @@
(comms-set-node-info (current-comms) server-node) (comms-set-node-info (current-comms) server-node)
(log-agent-info "connecting to server...") (log-agent-info "connecting to server...")
(let loop ([sleep-time 1]) (with-handlers ([exn:break? (lambda (_)
(with-handlers ([exn? (lambda (ex) (log-agent-info "connection cancelled")
(log-agent-error "error connecting to server: ~a" ex) (exit))])
(sleep sleep-time) (let loop ([sleep-time 1])
(loop (min 120 (* sleep-time 2))))]) (define maybe-exn
(comms-connect (current-comms) (node-id server-node)) (with-handlers ([exn:fail? identity])
(agent-report-state #f #f))) (comms-connect (current-comms) (node-id server-node))
(log-agent-info "connected! ready to do stuff") (agent-report-state #f #f)
#f))
(when maybe-exn
(log-agent-error "error connecting to server")
((error-display-handler) (exn-message maybe-exn) maybe-exn)
(sleep sleep-time)
(loop (min 120 (* sleep-time 2))))))
(log-agent-info "connected! ready to do stuff")
(agent-loop)) (agent-loop))

View File

@ -112,7 +112,7 @@
;; handles tcp data in ;; handles tcp data in
(define (handle-in-msg) (define (handle-in-msg)
(match (with-handlers ([exn? (lambda (_) #f)]) (fasl->s-exp in)) (match (with-handlers ([exn:fail? (lambda (_) #f)]) (fasl->s-exp in))
[(locked fasl nonce mac) [(locked fasl nonce mac)
(match (crypto-unlock session-key nonce mac fasl) (match (crypto-unlock session-key nonce mac fasl)
[#f (error "corrupted message from peer" (node-id peer-data))] [#f (error "corrupted message from peer" (node-id peer-data))]
@ -179,7 +179,7 @@
(thread-wait new-thd) (thread-wait new-thd)
(custodian-shutdown-all cust) (custodian-shutdown-all cust)
(crypto-wipe session-key) (crypto-wipe session-key)
(with-handlers ([exn? void]) (with-handlers ([exn:fail? void])
(thread-sendrecv el-thread 'deregister-channel (node-id peer-data))))) (thread-sendrecv el-thread 'deregister-channel (node-id peer-data)))))
new-thd) new-thd)
@ -200,7 +200,7 @@
;; starts the tcp listener ;; starts the tcp listener
(define (handle-listen from port) (define (handle-listen from port)
(with-handlers ([exn? (lambda (ex) (thread-send from ex #f))]) (with-handlers ([exn:fail? (lambda (ex) (thread-send from ex #f))])
(when (false? listener-thd) (when (false? listener-thd)
(set! listener (tcp-listen port 4 #t)) (set! listener (tcp-listen port 4 #t))
(log-comms-info "listening on port ~a" port) (log-comms-info "listening on port ~a" port)
@ -219,7 +219,7 @@
(define (handle-connect from id) (define (handle-connect from id)
(thread (thread
(lambda () (lambda ()
(with-handlers ([exn? (lambda (ex) (thread-send from ex #f))]) (with-handlers ([exn:fail? (lambda (ex) (thread-send from ex #f))])
(match (hash-ref node-registry id #f) (match (hash-ref node-registry id #f)
[#f (thread-send from (make-error "no such node" id) #f)] [#f (thread-send from (make-error "no such node" id) #f)]
[(node id name type pubkey seckey host port) [(node id name type pubkey seckey host port)
@ -350,10 +350,14 @@
(raise ex)) (raise ex))
(sleep retry-delay) (sleep retry-delay)
(comms-dispatch-msg/retry comms to-id msg (sub1 tries) retry-delay)) (comms-dispatch-msg/retry comms to-id msg (sub1 tries) retry-delay))
(with-handlers ([exn? do-retry]) (define maybe-exn
(unless (comms-channel-available? comms to-id) (with-handlers ([exn:fail? identity])
(comms-connect comms to-id)) (unless (comms-channel-available? comms to-id)
(comms-dispatch-msg comms to-id msg))) (comms-connect comms to-id))
(comms-dispatch-msg comms to-id msg)
#f))
(when maybe-exn
(do-retry maybe-exn)))
(provide make-comms comms-listen comms-connect comms-get-node-info comms-set-node-info (provide make-comms comms-listen comms-connect comms-get-node-info comms-set-node-info
comms-delete-node comms-channel-available? comms-shutdown) comms-delete-node comms-channel-available? comms-shutdown)
@ -404,7 +408,7 @@
(define (recv-transaction from key to-id transaction) (define (recv-transaction from key to-id transaction)
(define (cleanup) (define (cleanup)
(thread-sendrecv tm-thread 'deregister-response key)) (thread-sendrecv tm-thread 'deregister-response key))
(with-handlers ([exn? (lambda (ex) (with-handlers ([exn:fail? (lambda (ex)
(cleanup) (thread-send from ex #f))]) (cleanup) (thread-send from ex #f))])
(thread-receive) ;; go token (thread-receive) ;; go token
(comms-dispatch-msg/retry comms to-id transaction) (comms-dispatch-msg/retry comms to-id transaction)
@ -431,19 +435,21 @@
(define (respond data) (define (respond data)
(define resp (define resp
(msg:transaction (node-id my-node) trans-id #f rpc-id (trans-data-serialize data))) (msg:transaction (node-id my-node) trans-id #f rpc-id (trans-data-serialize data)))
(with-handlers ([exn? (with-handlers ([exn:fail?
(lambda (ex) (lambda (ex)
((error-display-handler) "failed to dispatch transaction response" ex))]) ((error-display-handler) "failed to dispatch transaction response" ex))])
(comms-dispatch-msg/retry comms from-id resp))) (comms-dispatch-msg/retry comms from-id resp)))
(with-handlers ([exn? respond]) (respond
(define arg-data (trans-data-deserialize rpc-data)) (with-handlers ([exn:fail? identity])
(define result (define arg-data (trans-data-deserialize rpc-data))
(parameterize ([current-from-node (comms-get-node-info comms from-id)]) (define result
;; TODO : apply timeout on the handler function? (parameterize ([current-from-node (comms-get-node-info comms from-id)])
;; we don't want this thread to potentially hang forever if there's some sort of deadlock ;; TODO : apply timeout on the handler function?
(apply func arg-data))) ;; we don't want this thread to potentially hang forever if there's some sort of
(respond result))) ;; deadlock
(apply func arg-data)))
result)))
(define (handle-thread-msg) (define (handle-thread-msg)
(match-define (cons from (cons type data)) (thread-receive)) (match-define (cons from (cons type data)) (thread-receive))

View File

@ -18,8 +18,8 @@
(require db/base db/sqlite3 (require db/base db/sqlite3
data/queue racket/async-channel racket/bool racket/contract racket/fasl racket/file data/queue racket/async-channel racket/bool racket/contract racket/fasl racket/file
racket/list racket/logging racket/match racket/path racket/random racket/runtime-path racket/function racket/list racket/logging racket/match racket/path racket/random
racket/set racket/string racket/unit srfi/19 racket/runtime-path racket/set racket/string racket/unit srfi/19
north/base north/adapter/base north/adapter/sqlite north/base north/adapter/base north/adapter/sqlite
"comms.rkt" "info.rkt" "logging.rkt" "manifest.rkt" "not-crypto.rkt" "pattern.rkt" "protocol.rkt" "comms.rkt" "info.rkt" "logging.rkt" "manifest.rkt" "not-crypto.rkt" "pattern.rkt" "protocol.rkt"
;; port-fsync ;; port-fsync
@ -332,7 +332,7 @@
(define/contract (get-project-file taskid) (define/contract (get-project-file taskid)
(-> integer? bytes?) (-> integer? bytes?)
;; TODO : streaming interface ;; TODO : streaming interface
(with-handlers ([exn? (lambda (ex) (error "unable to fetch the requested file"))]) (with-handlers ([exn:fail? (lambda (ex) (error "unable to fetch the requested file"))])
(server-get-file taskid))) (server-get-file taskid)))
@ -505,12 +505,16 @@
;; helper to repeatedly invoke an agent rpc ;; helper to repeatedly invoke an agent rpc
(define (invoke/retry-forever proc) (define (invoke/retry-forever proc)
(let init-loop ([retry-delay *min-retry-delay*]) (let init-loop ([retry-delay *min-retry-delay*])
(with-handlers ([exn? (lambda (ex) (define maybe-exn
(log-server-error "agent ~a encountered error ~a" id ex) (with-handlers ([exn:fail? identity])
(sleep retry-delay) (proc)
(init-loop (min *max-retry-delay* #f))
(* *retry-delay-ratio* retry-delay))))]) (when maybe-exn
(proc)))) (log-server-error "agent ~a encountered error" id)
((error-display-handler) (exn-message maybe-exn) maybe-exn)
(sleep retry-delay)
(init-loop (min *max-retry-delay*
(* *retry-delay-ratio* retry-delay))))))
;; #t if a new assignment was added, otherwise #f ;; #t if a new assignment was added, otherwise #f
(define (create-assignment! ts) (define (create-assignment! ts)

View File

@ -39,7 +39,7 @@
;; checks if the current runtime provides a static ffi table ;; checks if the current runtime provides a static ffi table
(define (static-ffi-available?) (define (static-ffi-available?)
(with-handlers ([exn? (lambda (ex) #f)]) (with-handlers ([exn:fail? (lambda (ex) #f)])
(dynamic-require ''#%static-ffi 'table) (dynamic-require ''#%static-ffi 'table)
#t)) #t))