implement streaming file transfers (kind of)

This commit is contained in:
xenia 2021-01-03 23:13:26 -05:00
parent f6cb069edb
commit 68b9984900
3 changed files with 99 additions and 29 deletions

View File

@ -145,7 +145,7 @@
messages)
(define (cmd-submit project-dir)
(define (cmd-submit project-dir [progress void])
(define mf (read-manifest project-dir))
;; create targz
@ -248,10 +248,10 @@
(with-server-connection
(delete-agent id)))
(define (cmd-get-deployment id out-port)
(define (cmd-get-deployment id out-port [progress-func void])
(with-server-connection
;; TODO : streaming
(write-bytes (get-agent-deployment id) out-port)
(define ft (get-agent-deployment id))
(file-transfer-connect ft out-port progress-func)
(void)))
@ -297,6 +297,15 @@
(write-string (~a item #:width (+ 2 width))))
(write-string "\n")))
(define (print-progress a b)
(define pct (quotient (* 100 a) b))
;; TODO : xterm/vt100/etc-specific
;; this should use a library maybe
(printf "\r\x1b[Ktransferred ~a% [~a/~a]" pct a b)
(when (= a b)
(printf "\n"))
(flush-output))
(define (make-random-filename)
(string-append (bytes->hex-string (crypto-random-bytes 8)) ".agent"))
@ -348,7 +357,7 @@
(interactive-check)
;; do submit
(report-status "submitting project...!")
(cmd-submit (current-directory))
(cmd-submit (current-directory) print-progress)
(report-status "project submitted!! time for crab"))
(subcommand (delete "Delete an executed or completed project")
@ -412,7 +421,7 @@
(error "invalid agent id provided"))
(define out-name (make-random-filename))
(call-with-output-file out-name
(lambda (o) (cmd-get-deployment aid o)))
(lambda (o) (cmd-get-deployment aid o print-progress)))
(do-final out-name)]
[_ (error "you must provide -l, -c, -d, or -g for this command")]))

View File

@ -40,22 +40,24 @@
;; port: an input port if local
;; chan: an async-channel if nonlocal
(struct file-transfer [id])
(struct file-transfer:local file-transfer [port])
(struct file-transfer:local file-transfer [port progress])
(struct file-transfer:remote file-transfer [chan size])
(define (make-file-transfer port)
(define (make-file-transfer port [progress void])
(unless (current-trans-id)
(error "not in a transaction!"))
(file-transfer:local (current-trans-id) port))
(file-transfer:local (current-trans-id) port progress))
(define (make-nonlocal-file-transfer trans-id size)
(file-transfer:remote trans-id (make-async-channel) size))
;; connects a remote file transfer to an output port
;; but also errors if there was an error
(define (file-transfer-connect ft out-port)
(let loop ()
(define (file-transfer-connect ft out-port [progress void])
(define total (file-transfer:remote-size ft))
(let loop ([written 0])
(progress written total)
(match (async-channel-get (file-transfer:remote-chan ft))
[(== eof) (void)]
[(== eof) (progress total total)]
[(? bytes? b) (write-bytes b out-port)
(loop)]
(loop (+ written (bytes-length b)))]
[(? exn:fail? e) (raise e)]
[x (error "unexpected file transfer data" x)])))
(define file-transfer-size file-transfer:remote-size)
@ -459,7 +461,51 @@
(constructor message (current-continuation-marks))]
[x x]))
(define (recv-transaction from to-id rpc-id transaction)
(define (handle-remote-ft from-thd to-id trans-id size)
(log-tm-info "downloading file ~a" trans-id)
(define ft-obj (make-nonlocal-file-transfer trans-id size))
(define ft-chan (file-transfer:remote-chan ft-obj))
(thread-send from-thd ft-obj #f)
(with-handlers ([exn:fail? (lambda (ex) (async-channel-put ft-chan ex))]
[exn:break?
(lambda (_)
(async-channel-put ft-chan (make-error "transaction manager shutdown")))])
(let loop ([offs 0]
[last-hard-time (current-seconds-monotonic)]
[last-soft-time (current-seconds-monotonic)])
(define evt (thread-receive-evt))
(define now (current-seconds-monotonic))
(define hard-timeout (- (+ now *file-transfer-idle-timeout*) last-hard-time))
(define soft-timeout (- (+ now *file-transfer-soft-timeout*) last-soft-time))
(match (sync/timeout (min hard-timeout soft-timeout) evt)
[#f (cond
[(< hard-timeout soft-timeout)
(log-tm-warning "file transfer hard timeout ~a" trans-id)
(async-channel-put ft-chan (make-error "file transfer timeout"))
(kill-thread (current-thread))]
[else
(comms-dispatch-msg/retry comms to-id
(msg:file-request (node-id my-node) trans-id offs))
(loop offs last-hard-time (current-seconds-monotonic))])]
[(== evt)
(match (thread-receive)
[(msg:file _ _ (== offs) (? bytes? data))
(async-channel-put ft-chan data)
(define now (current-seconds-monotonic))
(define next-offs (+ offs (bytes-length data)))
(unless (= next-offs size)
(loop next-offs now now))]
[(msg:file _ _ _ _)
(comms-dispatch-msg/retry comms to-id
(msg:file-request (node-id my-node) trans-id offs))
(define now (current-seconds-monotonic))
(loop offs now now)]
[x (log-tm-warning "remote file transfer ~a send invalid msg ~a" trans-id x)])]))
(comms-dispatch-msg/retry comms to-id (msg:file-request (node-id my-node) trans-id #f))
(async-channel-put ft-chan eof)
(log-tm-info "remote file transfer ~a complete" trans-id)))
(define (recv-transaction from to-id trans-id rpc-id transaction)
(break-enabled #t)
(with-handlers ([exn:fail? (lambda (ex) (thread-send from ex #f))]
[exn:break? (lambda (_)
@ -474,7 +520,7 @@
(match (thread-receive)
[(msg:transaction _ _ #f (== rpc-id) response)
(thread-send from (trans-data-deserialize response) #f)]
[(msg:file-token _ _ size) (void "TODO: receive file")]
[(msg:file-token _ _ size) (handle-remote-ft from to-id trans-id size)]
[x (error "got invalid response data" x)])])))
(define (send-transaction from to-id rpc-id rpc-data)
@ -482,29 +528,41 @@
(define transaction (msg:transaction (node-id my-node) trans-id #t rpc-id
(trans-data-serialize rpc-data)))
(define response-thread
(thread (lambda () (recv-transaction from to-id rpc-id transaction))))
(thread (lambda () (recv-transaction from to-id trans-id rpc-id transaction))))
(dispatch-table-add! to-id trans-id response-thread))
(define (handle-local-ft from-id trans-id port)
(define (handle-local-ft from-id trans-id port progress)
(break-enabled #t)
(log-tm-info "starting file transfer ~a" trans-id)
;; TODO : on break, notify the remote endpoint that we're shutting down
;; it's not super important, remote will time out eventually
(with-handlers ([exn? (lambda (ex) (close-input-port port) (raise ex))])
;; only supported for file and string ports
;; but we do need to know the length beforehand, and the ability to seek
(file-position port eof)
(define port-len (file-position port))
(file-position port 0)
(comms-dispatch-msg/retry comms from-id (msg:file-token (node-id my-node) trans-id port-len))
(define bstr (make-bytes *file-transfer-chunk-size*))
(let loop ([offs 0])
(progress offs port-len)
(define thread-evt (thread-receive-evt))
(match (sync/timeout *file-transfer-idle-timeout* port thread-evt)
(define port-evt (if offs port never-evt))
(match (sync/timeout *file-transfer-idle-timeout* port-evt thread-evt)
[#f (log-tm-warning "file transfer ~a hit timeout" trans-id)]
[(== port)
(match (read-bytes-avail! bstr)
;; TODO
[(== eof) (void)]
[n (void)])]
[(== thread-evt) (void "TODO")]))
;; wait for completion req
(file-position port offs)
(match (read-bytes-avail! bstr port)
[(== eof)
(log-tm-info "file transfer ~a waiting for confirmation" trans-id)
(loop #f)]
[n (comms-dispatch-msg/retry
comms from-id (msg:file (node-id my-node) trans-id offs (subbytes bstr 0 n)))
(loop (+ offs n))])]
[(== thread-evt)
(match (thread-receive)
[(msg:file-request _ _ #f) (log-tm-info "file transfer ~a complete" trans-id)]
[(msg:file-request _ _ new-offs) (loop new-offs)]
[x (log-tm-warning "invalid data during file transfer ~a" x) (loop offs)])]))
(void)))
(define (handle-incoming-transaction func msg)
@ -512,8 +570,8 @@
(define (respond data)
(match data
[(file-transfer:local id port)
(handle-local-ft from-id trans-id port)]
[(file-transfer:local id port progress)
(handle-local-ft from-id trans-id port progress)]
[_ (define resp
(msg:transaction (node-id my-node) trans-id #f rpc-id (trans-data-serialize data)))
(with-handlers ([exn:fail?

View File

@ -280,12 +280,15 @@
(void))
(define/contract (get-agent-deployment id)
(-> integer? bytes?)
(-> integer? file-transfer?)
;; TODO : streaming interface
(enforce-subject 'client)
(define-values [agent-node arch] (load-comms-node id #t #t))
(match (node-type agent-node)
['agent (configure-agent-binary agent-node arch (current-server-public-node))]
['agent
(define binary (configure-agent-binary agent-node arch (current-server-public-node)))
(define port (open-input-bytes binary))
(make-file-transfer port)]
[_ (error "invalid node type")]))
(define/contract (delete-agent id)