refactor rpc transactions to prepare for streaming
This commit is contained in:
parent
809bd82a03
commit
f6cb069edb
|
@ -45,7 +45,7 @@
|
|||
(thread (lambda ()
|
||||
;; kinda pointless, other than helping keep the connection alive
|
||||
(let loop ()
|
||||
(with-handlers ([exn:fail? (lambda (ex) ((error-display-handler) (exn-message ex) ex))])
|
||||
(with-handlers ([exn:fail? (lambda (ex) (logging-report-error ex))])
|
||||
(agent-report-state #f #f))
|
||||
(sleep *ping-secs*) (loop)))))
|
||||
|
||||
|
@ -62,7 +62,7 @@
|
|||
(with-handlers ([exn:fail:filesystem? void]) (delete-directory/files extract-dir)))
|
||||
(log-agent-info "downloading task data for ~a" tid)
|
||||
(with-handlers ([exn:fail? (lambda (ex)
|
||||
((error-display-handler) (exn-message ex) ex)
|
||||
(logging-report-error ex)
|
||||
(cleanup))])
|
||||
;; TODO this should be updated with the streaming interface
|
||||
(call-with-output-file tgz-file (lambda (out) (write-bytes (get-project-file tid) out))
|
||||
|
@ -178,7 +178,7 @@
|
|||
(define (report-error ex)
|
||||
(log-agent-info "execution of ~a ran into error" aid)
|
||||
(custodian-shutdown-all cust)
|
||||
((error-display-handler) (exn-message ex) ex)
|
||||
(logging-report-error ex)
|
||||
(agent-report-state aid 'error)
|
||||
(async-channel-put (current-queue) (cons 'stop aid)))
|
||||
|
||||
|
@ -413,7 +413,7 @@
|
|||
#f))
|
||||
(when maybe-exn
|
||||
(log-agent-error "error connecting to server")
|
||||
((error-display-handler) (exn-message maybe-exn) maybe-exn)
|
||||
(logging-report-error maybe-exn)
|
||||
(sleep sleep-time)
|
||||
(loop (min 120 (* sleep-time 2))))))
|
||||
|
||||
|
|
|
@ -19,7 +19,7 @@
|
|||
(require racket/async-channel racket/bool racket/engine racket/fasl racket/function racket/list
|
||||
racket/logging racket/match racket/tcp racket/unit syntax/parse/define
|
||||
(for-syntax racket/base racket/list racket/syntax racket/unit racket/unit-exptime)
|
||||
"logging.rkt" "not-crypto.rkt")
|
||||
"logging.rkt" "not-crypto.rkt" (submod "static-support.rkt" misc-calls))
|
||||
|
||||
;; logging!
|
||||
(define-logger comms #:parent global-logger)
|
||||
|
@ -29,7 +29,37 @@
|
|||
(struct msg [from-id] #:prefab)
|
||||
(struct msg:hello msg [type pubkey] #:prefab)
|
||||
(struct msg:meow msg [meow] #:prefab)
|
||||
(struct msg:transaction msg [trans-id request? rpc-id data] #:prefab)
|
||||
(struct msg:stream msg [trans-id] #:prefab)
|
||||
(struct msg:transaction msg:stream [request? rpc-id data] #:prefab)
|
||||
(struct msg:file-token msg:stream [size] #:prefab)
|
||||
(struct msg:file-request msg:stream [offset] #:prefab)
|
||||
(struct msg:file msg:stream [offset data] #:prefab)
|
||||
|
||||
;; frontend file transfer types
|
||||
;; id: transaction id
|
||||
;; port: an input port if local
|
||||
;; chan: an async-channel if nonlocal
|
||||
(struct file-transfer [id])
|
||||
(struct file-transfer:local file-transfer [port])
|
||||
(struct file-transfer:remote file-transfer [chan size])
|
||||
(define (make-file-transfer port)
|
||||
(unless (current-trans-id)
|
||||
(error "not in a transaction!"))
|
||||
(file-transfer:local (current-trans-id) port))
|
||||
(define (make-nonlocal-file-transfer trans-id size)
|
||||
(file-transfer:remote trans-id (make-async-channel) size))
|
||||
;; connects a remote file transfer to an output port
|
||||
;; but also errors if there was an error
|
||||
(define (file-transfer-connect ft out-port)
|
||||
(let loop ()
|
||||
(match (async-channel-get (file-transfer:remote-chan ft))
|
||||
[(== eof) (void)]
|
||||
[(? bytes? b) (write-bytes b out-port)
|
||||
(loop)]
|
||||
[(? exn:fail? e) (raise e)]
|
||||
[x (error "unexpected file transfer data" x)])))
|
||||
(define file-transfer-size file-transfer:remote-size)
|
||||
(provide make-file-transfer file-transfer-connect file-transfer-size file-transfer?)
|
||||
|
||||
;; signed and encrypted messages (wrapped in another layer of fasl because i'm lazy)
|
||||
(struct signed [fasl signature] #:prefab)
|
||||
|
@ -38,9 +68,16 @@
|
|||
;; 30 second timeout to execute the handshake, or the connection will be aborted
|
||||
;; once the handshake is complete, the connection can stay open indefinitely with an indefinite
|
||||
;; time between messages
|
||||
(define HANDSHAKE-TIMEOUT 30)
|
||||
(define TRANSACTION-TIMEOUT 30)
|
||||
(define SHUTDOWN-TIMEOUT 5)
|
||||
(define *handshake-timeout* 30)
|
||||
(define *transaction-timeout* 30)
|
||||
(define *shutdown-timeout* 5)
|
||||
|
||||
;; interval to resend requests if no data
|
||||
(define *file-transfer-soft-timeout* 30)
|
||||
;; error out the file transfer if no data was transferred within this time
|
||||
;; fairly long because idk slow internet connections and such
|
||||
(define *file-transfer-idle-timeout* 600)
|
||||
(define *file-transfer-chunk-size* 131072)
|
||||
|
||||
;; node info (not all fields will always be present)
|
||||
;; type: 'server 'agent 'client
|
||||
|
@ -162,7 +199,7 @@
|
|||
;; engines are cool
|
||||
;; we use an engine to limit runtime for the handshake
|
||||
(let ([eng (engine (lambda (_) (peer-handshake in out)))])
|
||||
(if (engine-run (* 1000 HANDSHAKE-TIMEOUT) eng)
|
||||
(if (engine-run (* 1000 *handshake-timeout*) eng)
|
||||
(engine-result eng)
|
||||
(begin (engine-kill eng) (error "handshake timeout")))))
|
||||
|
||||
|
@ -263,7 +300,7 @@
|
|||
(match (hash-ref peer-registry data #f)
|
||||
[#f (void)]
|
||||
[thd (thread-send thd 'stop #f)
|
||||
(sync/timeout SHUTDOWN-TIMEOUT thd)
|
||||
(sync/timeout *shutdown-timeout* thd)
|
||||
(kill-thread thd)
|
||||
(hash-remove! peer-registry data)])
|
||||
(thread-send from (void) #f)]
|
||||
|
@ -297,7 +334,7 @@
|
|||
['shutdown
|
||||
(for ([(_ thd) (in-hash peer-registry)])
|
||||
(thread-send thd 'stop #f)
|
||||
(sync/timeout SHUTDOWN-TIMEOUT thd)
|
||||
(sync/timeout *shutdown-timeout* thd)
|
||||
(kill-thread thd))
|
||||
(when listener-thd
|
||||
(kill-thread listener-thd)
|
||||
|
@ -370,8 +407,24 @@
|
|||
(define trans-id 0)
|
||||
|
||||
(define local-channel (comms-local-channel comms))
|
||||
(define response-table (make-hash))
|
||||
(define rpc-table (make-hash))
|
||||
(define dispatch-table (make-weak-hash))
|
||||
|
||||
(define (dispatch-table-add! node-id stream-id [thd (current-thread)])
|
||||
(define id (cons node-id stream-id))
|
||||
(define eph (make-ephemeron thd (cons thd id)))
|
||||
(hash-set! dispatch-table thd eph))
|
||||
|
||||
(define (dispatch-table-dispatch node-id stream-id msg)
|
||||
(for ([v (in-list (hash-values dispatch-table))])
|
||||
(match (ephemeron-value v)
|
||||
[(cons thd (cons (== node-id) (== stream-id)))
|
||||
(thread-send thd msg #f)]
|
||||
;; GC or no match
|
||||
[_ (void)])))
|
||||
|
||||
(define (dispatch-table-all-threads)
|
||||
(map car (filter identity (map ephemeron-value (hash-values dispatch-table)))))
|
||||
|
||||
(define tm-thread (current-thread))
|
||||
(define tm-cust (make-custodian))
|
||||
|
@ -381,8 +434,9 @@
|
|||
trans-id)
|
||||
|
||||
(define (cleanup)
|
||||
(for ([(_ thd) (in-hash response-table)])
|
||||
(thread-send thd (trans-data-serialize (make-error "transaction manager shut down")) #f))
|
||||
(define thds (dispatch-table-all-threads))
|
||||
(map break-thread thds)
|
||||
(apply sync/timeout *transaction-timeout* thds)
|
||||
(custodian-shutdown-all tm-cust))
|
||||
|
||||
(define (trans-data-serialize data)
|
||||
|
@ -405,46 +459,74 @@
|
|||
(constructor message (current-continuation-marks))]
|
||||
[x x]))
|
||||
|
||||
(define (recv-transaction from key to-id transaction)
|
||||
(define (cleanup)
|
||||
(thread-sendrecv tm-thread 'deregister-response key))
|
||||
(with-handlers ([exn:fail? (lambda (ex)
|
||||
(cleanup) (thread-send from ex #f))])
|
||||
(thread-receive) ;; go token
|
||||
(define (recv-transaction from to-id rpc-id transaction)
|
||||
(break-enabled #t)
|
||||
(with-handlers ([exn:fail? (lambda (ex) (thread-send from ex #f))]
|
||||
[exn:break? (lambda (_)
|
||||
(thread-send
|
||||
from (make-error "transaction manager shutdown") #f))])
|
||||
(comms-dispatch-msg/retry comms to-id transaction)
|
||||
(match (sync/timeout TRANSACTION-TIMEOUT (thread-receive-evt))
|
||||
(match (sync/timeout *transaction-timeout* (thread-receive-evt))
|
||||
[#f
|
||||
(cleanup)
|
||||
(log-tm-error "timeout sending transaction to ~a" to-id)
|
||||
(thread-send from (make-error "transaction timeout") #f)]
|
||||
[_ (define response (thread-receive))
|
||||
(thread-send from (trans-data-deserialize response) #f)])))
|
||||
[_
|
||||
(match (thread-receive)
|
||||
[(msg:transaction _ _ #f (== rpc-id) response)
|
||||
(thread-send from (trans-data-deserialize response) #f)]
|
||||
[(msg:file-token _ _ size) (void "TODO: receive file")]
|
||||
[x (error "got invalid response data" x)])])))
|
||||
|
||||
(define (send-transaction from to-id rpc-id rpc-data)
|
||||
(define trans-id (make-trans-id))
|
||||
(define key (cons to-id trans-id))
|
||||
(define transaction (msg:transaction (node-id my-node) trans-id #t rpc-id
|
||||
(trans-data-serialize rpc-data)))
|
||||
(define response-thread (thread (lambda () (recv-transaction from key to-id transaction))))
|
||||
(hash-set! response-table key response-thread)
|
||||
(thread-send response-thread 'go))
|
||||
(define response-thread
|
||||
(thread (lambda () (recv-transaction from to-id rpc-id transaction))))
|
||||
(dispatch-table-add! to-id trans-id response-thread))
|
||||
|
||||
(define (handle-local-ft from-id trans-id port)
|
||||
(break-enabled #t)
|
||||
(with-handlers ([exn? (lambda (ex) (close-input-port port) (raise ex))])
|
||||
;; only supported for file and string ports
|
||||
;; but we do need to know the length beforehand, and the ability to seek
|
||||
(file-position port eof)
|
||||
(define port-len (file-position port))
|
||||
(file-position port 0)
|
||||
(comms-dispatch-msg/retry comms from-id (msg:file-token (node-id my-node) trans-id port-len))
|
||||
(define bstr (make-bytes *file-transfer-chunk-size*))
|
||||
(let loop ([offs 0])
|
||||
(define thread-evt (thread-receive-evt))
|
||||
(match (sync/timeout *file-transfer-idle-timeout* port thread-evt)
|
||||
[(== port)
|
||||
(match (read-bytes-avail! bstr)
|
||||
;; TODO
|
||||
[(== eof) (void)]
|
||||
[n (void)])]
|
||||
[(== thread-evt) (void "TODO")]))
|
||||
;; wait for completion req
|
||||
(void)))
|
||||
|
||||
(define (handle-incoming-transaction func msg)
|
||||
(match-define (msg:transaction from-id trans-id _ rpc-id rpc-data) msg)
|
||||
|
||||
(define (respond data)
|
||||
(define resp
|
||||
(msg:transaction (node-id my-node) trans-id #f rpc-id (trans-data-serialize data)))
|
||||
(with-handlers ([exn:fail?
|
||||
(lambda (ex)
|
||||
((error-display-handler) "failed to dispatch transaction response" ex))])
|
||||
(comms-dispatch-msg/retry comms from-id resp)))
|
||||
(match data
|
||||
[(file-transfer:local id port)
|
||||
(handle-local-ft from-id trans-id port)]
|
||||
[_ (define resp
|
||||
(msg:transaction (node-id my-node) trans-id #f rpc-id (trans-data-serialize data)))
|
||||
(with-handlers ([exn:fail?
|
||||
(lambda (ex)
|
||||
(logging-report-error ex "failed to send transaction response"))])
|
||||
(comms-dispatch-msg/retry comms from-id resp))]))
|
||||
|
||||
(respond
|
||||
(with-handlers ([exn:fail? identity])
|
||||
(define arg-data (trans-data-deserialize rpc-data))
|
||||
(define result
|
||||
(parameterize ([current-from-node (comms-get-node-info comms from-id)])
|
||||
(parameterize ([current-from-node (comms-get-node-info comms from-id)]
|
||||
[current-trans-id trans-id])
|
||||
;; TODO : apply timeout on the handler function?
|
||||
;; we don't want this thread to potentially hang forever if there's some sort of
|
||||
;; deadlock
|
||||
|
@ -458,7 +540,6 @@
|
|||
(thread-send from (void) #f)]
|
||||
['deregister-rpc (hash-remove! rpc-table data)
|
||||
(thread-send from (void) #f)]
|
||||
['deregister-response (hash-remove! response-table data) (thread-send from (void) #f)]
|
||||
['transact
|
||||
(match-define (cons to-id (cons rpc-id rpc-data)) data)
|
||||
(send-transaction from to-id rpc-id rpc-data)]
|
||||
|
@ -470,12 +551,11 @@
|
|||
[(msg:transaction from-id trans-id #t rpc-id rpc-data)
|
||||
(match (hash-ref rpc-table rpc-id #f)
|
||||
[#f (log-tm-warning "got unknown rpc req: ~a" msg)]
|
||||
[func (thread (lambda () (handle-incoming-transaction func msg)))])]
|
||||
[(msg:transaction from-id trans-id #f rpc-id rpc-data)
|
||||
(define key (cons from-id trans-id))
|
||||
(match (hash-ref response-table key #f)
|
||||
[#f (log-tm-warning "got spurious transaction response: ~a" msg)]
|
||||
[thd (thread-send thd rpc-data #f) (hash-remove! response-table key)])]
|
||||
[func
|
||||
(define thd (thread (lambda () (handle-incoming-transaction func msg))))
|
||||
(dispatch-table-add! from-id trans-id thd)])]
|
||||
[(msg:stream from-id trans-id)
|
||||
(dispatch-table-dispatch from-id trans-id msg)]
|
||||
[_ (log-tm-warning "got unknown msg: ~a" msg)]))
|
||||
|
||||
;; it's a thread cell and i'm too lazy to add a parameterize clause... it should work
|
||||
|
@ -521,6 +601,7 @@
|
|||
(define current-tm (make-parameter #f))
|
||||
(define current-to-node (make-parameter #f))
|
||||
(define current-from-node (make-parameter #f))
|
||||
(define current-trans-id (make-parameter #f))
|
||||
|
||||
|
||||
;; this is entirely a proc macro because idk how to do this in any fancier way
|
||||
|
|
|
@ -18,10 +18,13 @@
|
|||
|
||||
(require racket/bool racket/date racket/format racket/match racket/string)
|
||||
|
||||
(provide global-logger install-logging!)
|
||||
(provide global-logger install-logging! logging-report-error)
|
||||
|
||||
(define global-logger (make-logger))
|
||||
|
||||
(define (logging-report-error ex [msg #f])
|
||||
((error-display-handler) (or msg (exn-message ex)) ex))
|
||||
|
||||
(define (recv-thd receiver stop-chan)
|
||||
;; iso8601 gang
|
||||
(date-display-format 'iso-8601)
|
||||
|
|
|
@ -556,7 +556,7 @@
|
|||
#f))
|
||||
(when maybe-exn
|
||||
(log-server-error "agent ~a encountered error" id)
|
||||
((error-display-handler) (exn-message maybe-exn) maybe-exn)
|
||||
(logging-report-error maybe-exn)
|
||||
(sleep retry-delay)
|
||||
(init-loop (min *max-retry-delay*
|
||||
(* *retry-delay-ratio* retry-delay))))))
|
||||
|
@ -581,13 +581,13 @@
|
|||
;; send agent rpc
|
||||
(define (handle-failure ex)
|
||||
(log-server-error "failed to push ~a to agent ~a" aid id)
|
||||
((error-display-handler) (exn-message ex) ex)
|
||||
(logging-report-error ex)
|
||||
(task-unassign! ts id)
|
||||
;; attempt to cancel it
|
||||
(with-handlers ([exn:fail?
|
||||
(lambda (ex)
|
||||
(log-server-error "failed to cancel ~a after error on ~a" aid id)
|
||||
((error-display-handler) (exn-message ex) ex))])
|
||||
(logging-report-error ex))])
|
||||
(cancel-assignment aid))
|
||||
#t)
|
||||
(with-handlers ([exn:fail? handle-failure])
|
||||
|
@ -607,7 +607,7 @@
|
|||
(lambda (ex)
|
||||
(log-server-error "failed to cancel ~a on ~a"
|
||||
(assignment-id assignment) id)
|
||||
((error-display-handler) (exn-message ex) ex))])
|
||||
(logging-report-error ex))])
|
||||
(cancel-assignment (assignment-id assignment))))
|
||||
|
||||
;; records a match for a certain assignment
|
||||
|
|
Loading…
Reference in New Issue