Route error reporting through logging-report-error (logging.rkt) and begin
streaming file transfers in the comms transaction manager: a msg:stream message
family plus a per-thread dispatch table that replaces the old response table.

diff --git a/crossfire/agent.rkt b/crossfire/agent.rkt
index b529927..09f7752 100644
--- a/crossfire/agent.rkt
+++ b/crossfire/agent.rkt
@@ -45,7 +45,7 @@
   (thread (lambda ()
             ;; kinda pointless, other than helping keep the connection alive
             (let loop ()
-              (with-handlers ([exn:fail? (lambda (ex) ((error-display-handler) (exn-message ex) ex))])
+              (with-handlers ([exn:fail? (lambda (ex) (logging-report-error ex))])
                 (agent-report-state #f #f))
               (sleep *ping-secs*)
               (loop)))))
@@ -62,7 +62,7 @@
     (with-handlers ([exn:fail:filesystem? void]) (delete-directory/files extract-dir)))
   (log-agent-info "downloading task data for ~a" tid)
   (with-handlers ([exn:fail? (lambda (ex)
-                               ((error-display-handler) (exn-message ex) ex)
+                               (logging-report-error ex)
                                (cleanup))])
     ;; TODO this should be updated with the streaming interface
     (call-with-output-file tgz-file (lambda (out) (write-bytes (get-project-file tid) out))
@@ -178,7 +178,7 @@
   (define (report-error ex)
     (log-agent-info "execution of ~a ran into error" aid)
     (custodian-shutdown-all cust)
-    ((error-display-handler) (exn-message ex) ex)
+    (logging-report-error ex)
     (agent-report-state aid 'error)
     (async-channel-put (current-queue) (cons 'stop aid)))
 
@@ -413,7 +413,7 @@
         #f))
     (when maybe-exn
       (log-agent-error "error connecting to server")
-      ((error-display-handler) (exn-message maybe-exn) maybe-exn)
+      (logging-report-error maybe-exn)
       (sleep sleep-time)
       (loop (min 120 (* sleep-time 2))))))
 
diff --git a/crossfire/comms.rkt b/crossfire/comms.rkt
index 16d9bed..e76c0bd 100644
--- a/crossfire/comms.rkt
+++ b/crossfire/comms.rkt
@@ -19,7 +19,7 @@
 (require racket/async-channel racket/bool racket/engine racket/fasl racket/function racket/list
          racket/logging racket/match racket/tcp racket/unit syntax/parse/define
          (for-syntax racket/base racket/list racket/syntax racket/unit racket/unit-exptime)
-         "logging.rkt" "not-crypto.rkt")
+         "logging.rkt" "not-crypto.rkt" (submod "static-support.rkt" misc-calls))
 
 ;; logging!
 (define-logger comms #:parent global-logger)
@@ -29,7 +29,37 @@
 (struct msg [from-id] #:prefab)
 (struct msg:hello msg [type pubkey] #:prefab)
 (struct msg:meow msg [meow] #:prefab)
-(struct msg:transaction msg [trans-id request? rpc-id data] #:prefab)
+(struct msg:stream msg [trans-id] #:prefab)  ; parent of every message tied to a transaction id
+(struct msg:transaction msg:stream [request? rpc-id data] #:prefab)
+(struct msg:file-token msg:stream [size] #:prefab)
+(struct msg:file-request msg:stream [offset] #:prefab)
+(struct msg:file msg:stream [offset data] #:prefab)
+
+;; frontend file transfer types
+;; id: transaction id the transfer belongs to
+;; port (local only): the input port handed to make-file-transfer
+;; chan, size (remote only): chan is the async-channel drained by file-transfer-connect
+(struct file-transfer [id])
+(struct file-transfer:local file-transfer [port])
+(struct file-transfer:remote file-transfer [chan size])
+(define (make-file-transfer port)  ; raises unless current-trans-id is set
+  (unless (current-trans-id)
+    (error "not in a transaction!"))
+  (file-transfer:local (current-trans-id) port))
+(define (make-nonlocal-file-transfer trans-id size)  ; starts with a fresh, empty channel
+  (file-transfer:remote trans-id (make-async-channel) size))
+;; drains a remote transfer's channel into out-port: byte strings are written in arrival order,
+;; eof ends the transfer, and an exn:fail received on the channel is re-raised in the caller
+(define (file-transfer-connect ft out-port)
+  (let loop ()
+    (match (async-channel-get (file-transfer:remote-chan ft))
+      [(== eof) (void)]
+      [(? bytes? b) (write-bytes b out-port)
+                    (loop)]
+      [(? exn:fail? e) (raise e)]
+      [x (error "unexpected file transfer data" x)])))
+(define file-transfer-size file-transfer:remote-size)  ; accessor: remote transfers only
+(provide make-file-transfer file-transfer-connect file-transfer-size file-transfer?)
 
 ;; signed and encrypted messages (wrapped in another layer of fasl because i'm lazy)
 (struct signed [fasl signature] #:prefab)
@@ -38,9 +68,16 @@
 ;; 30 second timeout to execute the handshake, or the connection will be aborted
 ;; once the handshake is complete, the connection can stay open indefinitely with an indefinite
 ;; time between messages
-(define HANDSHAKE-TIMEOUT 30)
-(define TRANSACTION-TIMEOUT 30)
-(define SHUTDOWN-TIMEOUT 5)
+(define *handshake-timeout* 30)  ; seconds; scaled to ms for engine-run
+(define *transaction-timeout* 30)  ; seconds; passed to sync/timeout
+(define *shutdown-timeout* 5)  ; seconds; passed to sync/timeout
+
+;; interval to resend requests if no data
+(define *file-transfer-soft-timeout* 30)  ; presumably seconds like the others -- TODO confirm
+;; error out the file transfer if no data was transferred within this time
+;; fairly long because idk slow internet connections and such
+(define *file-transfer-idle-timeout* 600)  ; seconds; passed to sync/timeout in handle-local-ft
+(define *file-transfer-chunk-size* 131072)  ; 128 KiB; size of the read buffer in handle-local-ft
 
 ;; node info (not all fields will always be present)
 ;; type: 'server 'agent 'client
@@ -162,7 +199,7 @@
   ;; engines are cool
   ;; we use an engine to limit runtime for the handshake
   (let ([eng (engine (lambda (_) (peer-handshake in out)))])
-    (if (engine-run (* 1000 HANDSHAKE-TIMEOUT) eng)
+    (if (engine-run (* 1000 *handshake-timeout*) eng)
         (engine-result eng)
         (begin (engine-kill eng)
                (error "handshake timeout")))))
@@ -263,7 +300,7 @@
        (match (hash-ref peer-registry data #f)
          [#f (void)]
          [thd (thread-send thd 'stop #f)
-              (sync/timeout SHUTDOWN-TIMEOUT thd)
+              (sync/timeout *shutdown-timeout* thd)
               (kill-thread thd)
               (hash-remove! peer-registry data)])
        (thread-send from (void) #f)]
@@ -297,7 +334,7 @@
       ['shutdown
        (for ([(_ thd) (in-hash peer-registry)])
          (thread-send thd 'stop #f)
-         (sync/timeout SHUTDOWN-TIMEOUT thd)
+         (sync/timeout *shutdown-timeout* thd)
          (kill-thread thd))
        (when listener-thd
          (kill-thread listener-thd)
@@ -370,8 +407,24 @@
   (define trans-id 0)
   (define local-channel (comms-local-channel comms))
 
-  (define response-table (make-hash))
   (define rpc-table (make-hash))
+  (define dispatch-table (make-weak-hash))  ; thread -> ephemeron holding (thread node-id . stream-id)
+
+  (define (dispatch-table-add! node-id stream-id [thd (current-thread)])
+    (define id (cons node-id stream-id))
+    (define eph (make-ephemeron thd (cons thd id)))  ; keyed on thd so the entry can be collected
+    (hash-set! dispatch-table thd eph))  ; one entry per thread; a second add! replaces the first
+
+  (define (dispatch-table-dispatch node-id stream-id msg)  ; forward msg to every matching thread
+    (for ([v (in-list (hash-values dispatch-table))])
+      (match (ephemeron-value v)
+        [(cons thd (cons (== node-id) (== stream-id)))
+         (thread-send thd msg #f)]
+        ;; GC or no match
+        [_ (void)])))
+
+  (define (dispatch-table-all-threads)  ; threads whose ephemerons still hold a value
+    (map car (filter identity (map ephemeron-value (hash-values dispatch-table)))))
 
   (define tm-thread (current-thread))
   (define tm-cust (make-custodian))
@@ -381,8 +434,11 @@
     trans-id)
 
   (define (cleanup)
-    (for ([(_ thd) (in-hash response-table)])
-      (thread-send thd (trans-data-serialize (make-error "transaction manager shut down")) #f))
+    (define thds (dispatch-table-all-threads))
+    (for-each break-thread thds)
+    ;; wait for every thread (not just the first to finish) under one shared deadline
+    (define deadline (alarm-evt (+ (current-inexact-milliseconds) (* 1000 *transaction-timeout*))))
+    (for ([thd (in-list thds)]) (sync thd deadline))
     (custodian-shutdown-all tm-cust))
 
   (define (trans-data-serialize data)
@@ -405,46 +461,74 @@
        (constructor message (current-continuation-marks))]
       [x x]))
 
-  (define (recv-transaction from key to-id transaction)
-    (define (cleanup)
-      (thread-sendrecv tm-thread 'deregister-response key))
-    (with-handlers ([exn:fail? (lambda (ex)
-                                 (cleanup) (thread-send from ex #f))])
-      (thread-receive) ;; go token
+  (define (recv-transaction from to-id rpc-id transaction)
+    (break-enabled #t)  ; cleanup interrupts this thread with break-thread
+    (with-handlers ([exn:fail? (lambda (ex) (thread-send from ex #f))]
+                    [exn:break? (lambda (_)
+                                  (thread-send
+                                   from (make-error "transaction manager shutdown") #f))])
       (comms-dispatch-msg/retry comms to-id transaction)
-      (match (sync/timeout TRANSACTION-TIMEOUT (thread-receive-evt))
+      (match (sync/timeout *transaction-timeout* (thread-receive-evt))
         [#f
-         (cleanup)
          (log-tm-error "timeout sending transaction to ~a" to-id)
          (thread-send from (make-error "transaction timeout") #f)]
-        [_ (define response (thread-receive))
-           (thread-send from (trans-data-deserialize response) #f)])))
+        [_
+         (match (thread-receive)
+           [(msg:transaction _ _ #f (== rpc-id) response)
+            (thread-send from (trans-data-deserialize response) #f)]
+           [(msg:file-token _ _ size) (void "TODO: receive file")]  ; NOTE: `from` gets no reply here yet
+           [x (error "got invalid response data" x)])])))
 
   (define (send-transaction from to-id rpc-id rpc-data)
     (define trans-id (make-trans-id))
-    (define key (cons to-id trans-id))
     (define transaction
       (msg:transaction (node-id my-node) trans-id #t rpc-id (trans-data-serialize rpc-data)))
-    (define response-thread (thread (lambda () (recv-transaction from key to-id transaction))))
-    (hash-set! response-table key response-thread)
-    (thread-send response-thread 'go))
+    (define response-thread
+      (thread (lambda () (recv-transaction from to-id rpc-id transaction))))
+    (dispatch-table-add! to-id trans-id response-thread))  ; replies for (to-id . trans-id) go here
+
+  (define (handle-local-ft from-id trans-id port)  ; announce a local port's length; streaming is TODO
+    (break-enabled #t)
+    (with-handlers ([exn? (lambda (ex) (close-input-port port) (raise ex))])
+      ;; only supported for file and string ports
+      ;; but we do need to know the length beforehand, and the ability to seek
+      (file-position port eof)
+      (define port-len (file-position port))
+      (file-position port 0)
+      (comms-dispatch-msg/retry comms from-id (msg:file-token (node-id my-node) trans-id port-len))
+      (define bstr (make-bytes *file-transfer-chunk-size*))
+      (let loop ([offs 0])
+        (define thread-evt (thread-receive-evt))
+        (match (sync/timeout *file-transfer-idle-timeout* port thread-evt)
+          [(== port)
+           (match (read-bytes-avail! bstr port)  ; read the transfer port, not (current-input-port)
+             ;; TODO
+             [(== eof) (void)]
+             [n (void)])]
+          [(== thread-evt) (void "TODO")]))
+      ;; wait for completion req
+      (void)))
 
   (define (handle-incoming-transaction func msg)
     (match-define (msg:transaction from-id trans-id _ rpc-id rpc-data) msg)
 
     (define (respond data)
-      (define resp
-        (msg:transaction (node-id my-node) trans-id #f rpc-id (trans-data-serialize data)))
-      (with-handlers ([exn:fail?
-                       (lambda (ex)
-                         ((error-display-handler) "failed to dispatch transaction response" ex))])
-        (comms-dispatch-msg/retry comms from-id resp)))
+      (match data
+        [(file-transfer:local id port)
+         (handle-local-ft from-id trans-id port)]
+        [_ (define resp
+             (msg:transaction (node-id my-node) trans-id #f rpc-id (trans-data-serialize data)))
+           (with-handlers ([exn:fail?
+                            (lambda (ex)
+                              (logging-report-error ex "failed to send transaction response"))])
+             (comms-dispatch-msg/retry comms from-id resp))]))
 
     (respond
      (with-handlers ([exn:fail? identity])
        (define arg-data (trans-data-deserialize rpc-data))
        (define result
-         (parameterize ([current-from-node (comms-get-node-info comms from-id)])
+         (parameterize ([current-from-node (comms-get-node-info comms from-id)]
+                        [current-trans-id trans-id])  ; lets the handler call make-file-transfer
            ;; TODO : apply timeout on the handler function?
            ;; we don't want this thread to potentially hang forever if there's some sort of
            ;; deadlock
@@ -458,7 +542,6 @@
        (thread-send from (void) #f)]
       ['deregister-rpc (hash-remove! rpc-table data)
        (thread-send from (void) #f)]
-      ['deregister-response (hash-remove! response-table data) (thread-send from (void) #f)]
       ['transact
        (match-define (cons to-id (cons rpc-id rpc-data)) data)
        (send-transaction from to-id rpc-id rpc-data)]
@@ -470,12 +553,11 @@
       [(msg:transaction from-id trans-id #t rpc-id rpc-data)
        (match (hash-ref rpc-table rpc-id #f)
          [#f (log-tm-warning "got unknown rpc req: ~a" msg)]
-         [func (thread (lambda () (handle-incoming-transaction func msg)))])]
-      [(msg:transaction from-id trans-id #f rpc-id rpc-data)
-       (define key (cons from-id trans-id))
-       (match (hash-ref response-table key #f)
-         [#f (log-tm-warning "got spurious transaction response: ~a" msg)]
-         [thd (thread-send thd rpc-data #f) (hash-remove! response-table key)])]
+         [func
+          (define thd (thread (lambda () (handle-incoming-transaction func msg))))
+          (dispatch-table-add! from-id trans-id thd)])]
+      [(msg:stream from-id trans-id)  ; any other stream message, transaction responses included
+       (dispatch-table-dispatch from-id trans-id msg)]
       [_ (log-tm-warning "got unknown msg: ~a" msg)]))
 
   ;; it's a thread cell and i'm too lazy to add a parameterize clause... it should work
@@ -521,6 +603,7 @@
 (define current-tm (make-parameter #f))
 (define current-to-node (make-parameter #f))
 (define current-from-node (make-parameter #f))
+(define current-trans-id (make-parameter #f))  ; #f outside handle-incoming-transaction's handler call
 
 
 ;; this is entirely a proc macro because idk how to do this in any fancier way
diff --git a/crossfire/logging.rkt b/crossfire/logging.rkt
index 701a5de..0ecf6af 100644
--- a/crossfire/logging.rkt
+++ b/crossfire/logging.rkt
@@ -18,10 +18,13 @@
 
 (require racket/bool racket/date racket/format racket/match racket/string)
 
-(provide global-logger install-logging!)
+(provide global-logger install-logging! logging-report-error)
 
 (define global-logger (make-logger))
 
+(define (logging-report-error ex [msg #f])  ; msg, when given, replaces (exn-message ex)
+  ((error-display-handler) (or msg (exn-message ex)) ex))
+
 (define (recv-thd receiver stop-chan)
   ;; iso8601 gang
   (date-display-format 'iso-8601)
diff --git a/crossfire/server.rkt b/crossfire/server.rkt
index 8e1b447..9e6ac95 100644
--- a/crossfire/server.rkt
+++ b/crossfire/server.rkt
@@ -556,7 +556,7 @@
         #f))
     (when maybe-exn
       (log-server-error "agent ~a encountered error" id)
-      ((error-display-handler) (exn-message maybe-exn) maybe-exn)
+      (logging-report-error maybe-exn)
       (sleep retry-delay)
       (init-loop (min *max-retry-delay* (* *retry-delay-ratio* retry-delay))))))
 
@@ -581,13 +581,13 @@
     ;; send agent rpc
     (define (handle-failure ex)
       (log-server-error "failed to push ~a to agent ~a" aid id)
-      ((error-display-handler) (exn-message ex) ex)
+      (logging-report-error ex)
       (task-unassign! ts id)
       ;; attempt to cancel it
       (with-handlers ([exn:fail?
                        (lambda (ex)
                          (log-server-error "failed to cancel ~a after error on ~a" aid id)
-                         ((error-display-handler) (exn-message ex) ex))])
+                         (logging-report-error ex))])
         (cancel-assignment aid))
       #t)
     (with-handlers ([exn:fail? handle-failure])
@@ -607,7 +607,7 @@
                      (lambda (ex)
                        (log-server-error "failed to cancel ~a on ~a"
                                          (assignment-id assignment) id)
-                       ((error-display-handler) (exn-message ex) ex))])
+                       (logging-report-error ex))])
       (cancel-assignment (assignment-id assignment))))
 
 ;; records a match for a certain assignment