implement file streaming for project uploads

This commit is contained in:
xenia 2021-01-04 02:58:59 -05:00
parent 3a36cae048
commit ce3866f236
3 changed files with 130 additions and 72 deletions

View File

@ -161,7 +161,8 @@
[_ (error "not shonks...")]))))
(with-server-connection
(new-project (serialize-manifest mf) (file->bytes tmp-targz))))
(define ft (make-file-transfer (open-input-file tmp-targz) progress))
(new-project ft)))
(define (cmd-delete id)

View File

@ -31,7 +31,7 @@
(struct msg:meow msg [meow] #:prefab)
(struct msg:stream msg [trans-id] #:prefab)
(struct msg:transaction msg:stream [request? rpc-id data] #:prefab)
(struct msg:file-token msg:stream [size] #:prefab)
(struct file-token [size] #:prefab)
(struct msg:file-request msg:stream [offset] #:prefab)
(struct msg:file msg:stream [offset data] #:prefab)
@ -43,8 +43,6 @@
(struct file-transfer:local file-transfer [port progress])
(struct file-transfer:remote file-transfer [chan size])
(define (make-file-transfer port [progress void])
(unless (current-trans-id)
(error "not in a transaction!"))
(file-transfer:local (current-trans-id) port progress))
(define (make-nonlocal-file-transfer trans-id size)
(file-transfer:remote trans-id (make-async-channel) size))
@ -326,6 +324,9 @@
(match (hash-ref peer-registry peer-id #f)
[#f (thread-send from (make-error "no such peer connection") #f)]
[thd
;; TODO : this is kind of a problem t b h
;; we need this to block the requester until the queues are flushed
;; this could be solved by passing the originating thread to the peer handler
(match (thread-send thd msg #f)
[#f (kill-thread thd) (hash-remove! peer-registry peer-id)
(thread-send from (make-error "failed to dispatch to thread") #f)]
@ -505,50 +506,65 @@
(async-channel-put ft-chan eof)
(log-tm-info "remote file transfer ~a complete" trans-id)))
(define (recv-transaction from to-id trans-id rpc-id transaction)
(define (recv-transaction from to-id trans-id rpc-id rpc-data)
(break-enabled #t)
(with-handlers ([exn:fail? (lambda (ex) (thread-send from ex #f))]
[exn:break? (lambda (_)
(thread-send
from (make-error "transaction manager shutdown") #f))])
(comms-dispatch-msg/retry comms to-id transaction)
(match (sync/timeout *transaction-timeout* (thread-receive-evt))
[#f
(log-tm-error "timeout sending transaction to ~a" to-id)
(thread-send from (make-error "transaction timeout") #f)]
(match rpc-data
[(list (? file-transfer? ft))
(handle-local-ft to-id trans-id rpc-id (file-transfer:local-port ft)
(file-transfer:local-progress ft) from)]
[_
(match (thread-receive)
[(msg:transaction _ _ #f (== rpc-id) response)
(thread-send from (trans-data-deserialize response) #f)]
[(msg:file-token _ _ size) (handle-remote-ft from to-id trans-id size)]
[x (error "got invalid response data" x)])])))
(comms-dispatch-msg/retry
comms to-id (msg:transaction (node-id my-node) trans-id #t rpc-id
(trans-data-serialize rpc-data)))
(match (sync/timeout *transaction-timeout* (thread-receive-evt))
[#f
(log-tm-error "timeout sending transaction to ~a" to-id)
(thread-send from (make-error "transaction timeout") #f)]
[_
(match (thread-receive)
[(msg:transaction _ _ #f (== rpc-id) (? file-token? tok))
(handle-remote-ft from to-id trans-id (file-token-size tok))]
[(msg:transaction _ _ #f (== rpc-id) response)
(thread-send from (trans-data-deserialize response) #f)]
[x (error "got invalid response data" x)])])])))
(define (send-transaction from to-id rpc-id rpc-data)
(define trans-id (make-trans-id))
(define transaction (msg:transaction (node-id my-node) trans-id #t rpc-id
(trans-data-serialize rpc-data)))
(define response-thread
(thread (lambda () (recv-transaction from to-id trans-id rpc-id transaction))))
(thread (lambda () (recv-transaction from to-id trans-id rpc-id rpc-data))))
(dispatch-table-add! to-id trans-id response-thread))
(define (handle-local-ft from-id trans-id port progress)
(define (handle-local-ft from-id trans-id rpc-id port progress [from-thd #f])
(break-enabled #t)
(log-tm-info "starting file transfer ~a" trans-id)
;; TODO : on break, notify the remote endpoint that we're shutting down
;; it's not super important, remote will time out eventually
(with-handlers ([exn? (lambda (ex) (close-input-port port) (raise ex))])
(with-handlers ([exn? (lambda (ex)
(and from-thd (thread-send from-thd (make-error "file transfer error") #f))
(close-input-port port)
(raise ex))])
;; only supported for file and string ports
;; but we do need to know the length beforehand, and the ability to seek
(file-position port eof)
(define port-len (file-position port))
(comms-dispatch-msg/retry comms from-id (msg:file-token (node-id my-node) trans-id port-len))
(comms-dispatch-msg/retry
comms from-id
(msg:transaction (node-id my-node) trans-id (if from-thd #t #f) rpc-id (file-token port-len)))
(define bstr (make-bytes *file-transfer-chunk-size*))
(let loop ([offs 0])
(progress offs port-len)
(if offs
(progress offs port-len)
(progress port-len port-len))
(define thread-evt (thread-receive-evt))
(define port-evt (if offs port never-evt))
(match (sync/timeout *file-transfer-idle-timeout* port-evt thread-evt)
[#f (log-tm-warning "file transfer ~a hit timeout" trans-id)]
[#f
(log-tm-warning "file transfer ~a hit timeout" trans-id)
(and from-thd (thread-send from-thd (make-error "file transfer timeout") #f))]
[(== port)
(file-position port offs)
(match (read-bytes-avail! bstr port)
@ -560,9 +576,23 @@
(loop (+ offs n))])]
[(== thread-evt)
(match (thread-receive)
[(msg:transaction _ _ #f (== rpc-id) response)
(and from-thd (thread-send from-thd (trans-data-deserialize response) #f))
(set! from-thd #f)]
[(msg:file-request _ _ #f)
(close-input-port port)
(log-tm-info "file transfer ~a complete" trans-id)]
(log-tm-info "file transfer ~a complete" trans-id)
;; wait for RPC-level response message, if we're still waiting
(when from-thd
(match (sync/timeout *transaction-timeout* (thread-receive-evt))
[#f
(log-tm-error "timeout sending transaction to ~a" from-id)
(thread-send from-thd (make-error "transaction timeout") #f)]
[_
(match (thread-receive)
[(msg:transaction _ _ #f (== rpc-id) response)
(thread-send from-thd (trans-data-deserialize response) #f)]
[x (error "got invalid response data" x)])]))]
[(msg:file-request _ _ new-offs) (loop new-offs)]
[x (log-tm-warning "invalid data during file transfer ~a" x) (loop offs)])]))
(void)))
@ -573,7 +603,7 @@
(define (respond data)
(match data
[(file-transfer:local id port progress)
(handle-local-ft from-id trans-id port progress)]
(handle-local-ft from-id trans-id rpc-id port progress)]
[_ (define resp
(msg:transaction (node-id my-node) trans-id #f rpc-id (trans-data-serialize data)))
(with-handlers ([exn:fail?
@ -581,17 +611,29 @@
(logging-report-error ex "failed to send transaction response"))])
(comms-dispatch-msg/retry comms from-id resp))]))
(respond
(with-handlers ([exn:fail? identity])
(define arg-data (trans-data-deserialize rpc-data))
(define result
(parameterize ([current-from-node (comms-get-node-info comms from-id)]
[current-trans-id trans-id])
;; TODO : apply timeout on the handler function?
;; we don't want this thread to potentially hang forever if there's some sort of
;; deadlock
(apply func arg-data)))
result)))
(define (apply-func arg-data)
(respond
(with-handlers ([exn:fail? identity])
(define result
(parameterize ([current-from-node (comms-get-node-info comms from-id)]
[current-trans-id trans-id])
;; TODO : apply timeout on the handler function?
;; we don't want this thread to potentially hang forever if there's some sort of
;; deadlock
(apply func arg-data)))
result)))
(match rpc-data
[(file-token size)
(define apply-thd
(thread
(lambda ()
(apply-func (list (thread-receive))))))
(handle-remote-ft apply-thd from-id trans-id size)]
[_
(define arg-data (trans-data-deserialize rpc-data))
(apply-func arg-data)]))
(define (handle-thread-msg)
(match-define (cons from (cons type data)) (thread-receive))

View File

@ -17,7 +17,7 @@
;; along with this program. If not, see <https://www.gnu.org/licenses/>.
(require db/base db/sqlite3
data/queue racket/async-channel racket/bool racket/contract racket/fasl racket/file
data/queue file/untgz racket/async-channel racket/bool racket/contract racket/fasl racket/file
racket/function racket/list racket/logging racket/match racket/path racket/random
racket/runtime-path racket/set racket/string racket/unit
north/base north/adapter/base north/adapter/sqlite
@ -154,17 +154,6 @@
(delete-file subpath)))
(void))
;; commits a file corresponding to the task
(define (server-commit-file taskid data)
(define path (get-project-file-path taskid))
(call-with-output-file path
(lambda (out)
(write-bytes data out)
(port-fsync out))
#:mode 'binary
#:exists 'truncate)
(query-exec (current-db) q-set-task-commit taskid))
;; computes a hash of the file identifying its current contents for agents
;; (in case we reuse taskids)
(define (server-hash-file taskid)
@ -225,14 +214,6 @@
[(list _ ... "linux" "gnu") (configure.linux-gnu)]
[_ (error "XXX: don't know how to configure arch" agent-arch)]))
;; manifest is the raw form
(define (make-task manifest tar)
(define manifest-data (s-exp->fasl manifest))
(define name (second (assoc 'name manifest)))
(define id (query/insert-id (current-db) q-new-task name manifest-data))
(server-commit-file id tar)
id)
(define (enforce-subject type)
;; override if the from-node is us
(unless (or (symbol=? type (node-type (current-from-node)))
@ -307,16 +288,57 @@
;; client rpcs :: projects
(define/contract (new-project manifest tar)
(-> list? bytes? integer?)
;; TODO : streaming interface
(define/contract (new-project upload-ft)
(-> file-transfer? integer?)
(enforce-subject 'client)
;; check validity
(define mf-parsed (parse-manifest manifest))
(define id (make-task manifest tar))
;; notify agent handler
(agent-handler-new-task id mf-parsed)
id)
(log-server-info "new project upload")
(define out-file (make-temporary-file))
(define extract-dir (make-temporary-file "rkttmp~a" 'directory))
(with-handlers ([exn? (lambda (ex)
(delete-directory/files out-file)
(delete-directory/files extract-dir)
(raise ex))])
(define out-port (open-output-file out-file #:exists 'truncate))
(file-transfer-connect upload-ft out-port)
(port-fsync out-port)
(close-output-port out-port)
(log-server-info "file uploaded, processing")
;; extract manifest
(define mf-file (build-path "manifest.rktd"))
(parameterize ([current-directory extract-dir])
(untgz out-file #:filter (lambda (path dst type size target modtime perms)
(define exploded (explode-path (build-path path)))
(and (symbol=? type 'file)
;; tar format is weird
(or (equal? (list 'same mf-file) exploded)
(equal? (list mf-file) exploded))))))
(define mf-data (call-with-input-file (build-path extract-dir mf-file) read))
(define mf-fasl (s-exp->fasl mf-data))
(delete-directory/files extract-dir)
;; check validity of manifest
(define mf (parse-manifest mf-data))
(log-server-info "read manifest")
;; add to database, and get an id
(define name (first (manifest-data-ref mf 'name)))
(define id (query/insert-id (current-db) q-new-task name mf-fasl))
(log-server-info "finalizing upload")
;; move upload to id
(define path (get-project-file-path id))
(rename-file-or-directory out-file path #t)
;; mark upload as committed
(query-exec (current-db) q-set-task-commit id)
;; notify agent handler
(agent-handler-new-task id mf)
(log-server-info "done")
id))
(define (get-projects)
(enforce-subject 'client)
@ -989,13 +1011,6 @@
(with-output-to-file (build-path *state-root* "client0.rktd")
(lambda () (write data)))
(exit)]
[(vector "dev-new-project" tgz)
(define id (make-task '((name "test project") (arch "any")
(command "./test.sh")
(resources "cpu") (pattern "meow?d?d"))
(file->bytes tgz)))
(log-server-info "created project")
(exit)]
[(vector subcmd _ ...)
(error "invalid subcommand" subcmd)]
[argv (void)])