diff --git a/crossfire/client.rkt b/crossfire/client.rkt index cbaffc6..dc3a270 100644 --- a/crossfire/client.rkt +++ b/crossfire/client.rkt @@ -161,7 +161,8 @@ [_ (error "not shonks...")])))) (with-server-connection - (new-project (serialize-manifest mf) (file->bytes tmp-targz)))) + (define ft (make-file-transfer (open-input-file tmp-targz) progress)) + (new-project ft))) (define (cmd-delete id) diff --git a/crossfire/comms.rkt b/crossfire/comms.rkt index 261928e..d231739 100644 --- a/crossfire/comms.rkt +++ b/crossfire/comms.rkt @@ -31,7 +31,7 @@ (struct msg:meow msg [meow] #:prefab) (struct msg:stream msg [trans-id] #:prefab) (struct msg:transaction msg:stream [request? rpc-id data] #:prefab) -(struct msg:file-token msg:stream [size] #:prefab) +(struct file-token [size] #:prefab) (struct msg:file-request msg:stream [offset] #:prefab) (struct msg:file msg:stream [offset data] #:prefab) @@ -43,8 +43,6 @@ (struct file-transfer:local file-transfer [port progress]) (struct file-transfer:remote file-transfer [chan size]) (define (make-file-transfer port [progress void]) - (unless (current-trans-id) - (error "not in a transaction!")) (file-transfer:local (current-trans-id) port progress)) (define (make-nonlocal-file-transfer trans-id size) (file-transfer:remote trans-id (make-async-channel) size)) @@ -326,6 +324,9 @@ (match (hash-ref peer-registry peer-id #f) [#f (thread-send from (make-error "no such peer connection") #f)] [thd + ;; TODO : this is kind of a problem t b h + ;; we need this to block the requester until the queues are flushed + ;; this could be solved by passing the originating thread to the peer handler (match (thread-send thd msg #f) [#f (kill-thread thd) (hash-remove! peer-registry peer-id) (thread-send from (make-error "failed to dispatch to thread") #f)] @@ -505,50 +506,65 @@ (async-channel-put ft-chan eof) (log-tm-info "remote file transfer ~a complete" trans-id))) - (define (recv-transaction from to-id trans-id rpc-id transaction) + (define (recv-transaction from to-id trans-id rpc-id rpc-data) (break-enabled #t) (with-handlers ([exn:fail? (lambda (ex) (thread-send from ex #f))] [exn:break? (lambda (_) (thread-send from (make-error "transaction manager shutdown") #f))]) - (comms-dispatch-msg/retry comms to-id transaction) - (match (sync/timeout *transaction-timeout* (thread-receive-evt)) - [#f - (log-tm-error "timeout sending transaction to ~a" to-id) - (thread-send from (make-error "transaction timeout") #f)] + (match rpc-data + [(list (? file-transfer? ft)) + (handle-local-ft to-id trans-id rpc-id (file-transfer:local-port ft) + (file-transfer:local-progress ft) from)] [_ - (match (thread-receive) - [(msg:transaction _ _ #f (== rpc-id) response) - (thread-send from (trans-data-deserialize response) #f)] - [(msg:file-token _ _ size) (handle-remote-ft from to-id trans-id size)] - [x (error "got invalid response data" x)])]))) + (comms-dispatch-msg/retry + comms to-id (msg:transaction (node-id my-node) trans-id #t rpc-id + (trans-data-serialize rpc-data))) + (match (sync/timeout *transaction-timeout* (thread-receive-evt)) + [#f + (log-tm-error "timeout sending transaction to ~a" to-id) + (thread-send from (make-error "transaction timeout") #f)] + [_ + (match (thread-receive) + [(msg:transaction _ _ #f (== rpc-id) (? file-token? tok)) + (handle-remote-ft from to-id trans-id (file-token-size tok))] + [(msg:transaction _ _ #f (== rpc-id) response) + (thread-send from (trans-data-deserialize response) #f)] + [x (error "got invalid response data" x)])])]))) (define (send-transaction from to-id rpc-id rpc-data) (define trans-id (make-trans-id)) - (define transaction (msg:transaction (node-id my-node) trans-id #t rpc-id - (trans-data-serialize rpc-data))) (define response-thread - (thread (lambda () (recv-transaction from to-id trans-id rpc-id transaction)))) + (thread (lambda () (recv-transaction from to-id trans-id rpc-id rpc-data)))) (dispatch-table-add! to-id trans-id response-thread)) - (define (handle-local-ft from-id trans-id port progress) + (define (handle-local-ft from-id trans-id rpc-id port progress [from-thd #f]) (break-enabled #t) (log-tm-info "starting file transfer ~a" trans-id) ;; TODO : on break, notify the remote endpoint that we're shutting down ;; it's not super important, remote will time out eventually - (with-handlers ([exn? (lambda (ex) (close-input-port port) (raise ex))]) + (with-handlers ([exn? (lambda (ex) + (and from-thd (thread-send from-thd (make-error "file transfer error") #f)) + (close-input-port port) + (raise ex))]) ;; only supported for file and string ports ;; but we do need to know the length beforehand, and the ability to seek (file-position port eof) (define port-len (file-position port)) - (comms-dispatch-msg/retry comms from-id (msg:file-token (node-id my-node) trans-id port-len)) + (comms-dispatch-msg/retry + comms from-id + (msg:transaction (node-id my-node) trans-id (if from-thd #t #f) rpc-id (file-token port-len))) (define bstr (make-bytes *file-transfer-chunk-size*)) (let loop ([offs 0]) - (progress offs port-len) + (if offs + (progress offs port-len) + (progress port-len port-len)) (define thread-evt (thread-receive-evt)) (define port-evt (if offs port never-evt)) (match (sync/timeout *file-transfer-idle-timeout* port-evt thread-evt) - [#f (log-tm-warning "file transfer ~a hit timeout" trans-id)] + [#f + (log-tm-warning "file transfer ~a hit timeout" trans-id) + (and from-thd (thread-send from-thd (make-error "file transfer timeout") #f))] [(== port) (file-position port offs) (match (read-bytes-avail! bstr port) @@ -560,9 +576,23 @@ (loop (+ offs n))])] [(== thread-evt) (match (thread-receive) + [(msg:transaction _ _ #f (== rpc-id) response) + (and from-thd (thread-send from-thd (trans-data-deserialize response) #f)) + (set! from-thd #f)] [(msg:file-request _ _ #f) (close-input-port port) - (log-tm-info "file transfer ~a complete" trans-id)] + (log-tm-info "file transfer ~a complete" trans-id) + ;; wait for RPC-level response message, if we're still waiting + (when from-thd + (match (sync/timeout *transaction-timeout* (thread-receive-evt)) + [#f + (log-tm-error "timeout sending transaction to ~a" from-id) + (thread-send from-thd (make-error "transaction timeout") #f)] + [_ + (match (thread-receive) + [(msg:transaction _ _ #f (== rpc-id) response) + (thread-send from-thd (trans-data-deserialize response) #f)] + [x (error "got invalid response data" x)])]))] [(msg:file-request _ _ new-offs) (loop new-offs)] [x (log-tm-warning "invalid data during file transfer ~a" x) (loop offs)])])) (void))) @@ -573,7 +603,7 @@ (define (respond data) (match data [(file-transfer:local id port progress) - (handle-local-ft from-id trans-id port progress)] + (handle-local-ft from-id trans-id rpc-id port progress)] [_ (define resp (msg:transaction (node-id my-node) trans-id #f rpc-id (trans-data-serialize data))) (with-handlers ([exn:fail? @@ -581,17 +611,29 @@ (logging-report-error ex "failed to send transaction response"))]) (comms-dispatch-msg/retry comms from-id resp))])) - (respond - (with-handlers ([exn:fail? identity]) - (define arg-data (trans-data-deserialize rpc-data)) - (define result - (parameterize ([current-from-node (comms-get-node-info comms from-id)] - [current-trans-id trans-id]) - ;; TODO : apply timeout on the handler function? - ;; we don't want this thread to potentially hang forever if there's some sort of - ;; deadlock - (apply func arg-data))) - result))) + (define (apply-func arg-data) + (respond + (with-handlers ([exn:fail? identity]) + (define result + (parameterize ([current-from-node (comms-get-node-info comms from-id)] + [current-trans-id trans-id]) + ;; TODO : apply timeout on the handler function? + ;; we don't want this thread to potentially hang forever if there's some sort of + ;; deadlock + (apply func arg-data))) + result))) + + (match rpc-data + [(file-token size) + (define apply-thd + (thread + (lambda () + (apply-func (list (thread-receive)))))) + (handle-remote-ft apply-thd from-id trans-id size)] + [_ + (define arg-data (trans-data-deserialize rpc-data)) + (apply-func arg-data)])) + (define (handle-thread-msg) (match-define (cons from (cons type data)) (thread-receive)) diff --git a/crossfire/server.rkt b/crossfire/server.rkt index debbd07..124af53 100644 --- a/crossfire/server.rkt +++ b/crossfire/server.rkt @@ -17,7 +17,7 @@ ;; along with this program. If not, see . (require db/base db/sqlite3 - data/queue racket/async-channel racket/bool racket/contract racket/fasl racket/file + data/queue file/untgz racket/async-channel racket/bool racket/contract racket/fasl racket/file racket/function racket/list racket/logging racket/match racket/path racket/random racket/runtime-path racket/set racket/string racket/unit north/base north/adapter/base north/adapter/sqlite @@ -154,17 +154,6 @@ (delete-file subpath))) (void)) -;; commits a file corresponding to the task -(define (server-commit-file taskid data) - (define path (get-project-file-path taskid)) - (call-with-output-file path - (lambda (out) - (write-bytes data out) - (port-fsync out)) - #:mode 'binary - #:exists 'truncate) - (query-exec (current-db) q-set-task-commit taskid)) - ;; computes a hash of the file identifying its current contents for agents ;; (in case we reuse taskids) (define (server-hash-file taskid) @@ -225,14 +214,6 @@ [(list _ ... "linux" "gnu") (configure.linux-gnu)] [_ (error "XXX: don't know how to configure arch" agent-arch)])) -;; manifest is the raw form -(define (make-task manifest tar) - (define manifest-data (s-exp->fasl manifest)) - (define name (second (assoc 'name manifest))) - (define id (query/insert-id (current-db) q-new-task name manifest-data)) - (server-commit-file id tar) - id) - (define (enforce-subject type) ;; override if the from-node is us (unless (or (symbol=? type (node-type (current-from-node))) @@ -307,16 +288,57 @@ ;; client rpcs :: projects -(define/contract (new-project manifest tar) - (-> list? bytes? integer?) - ;; TODO : streaming interface +(define/contract (new-project upload-ft) + (-> file-transfer? integer?) (enforce-subject 'client) - ;; check validity - (define mf-parsed (parse-manifest manifest)) - (define id (make-task manifest tar)) - ;; notify agent handler - (agent-handler-new-task id mf-parsed) - id) + (log-server-info "new project upload") + (define out-file (make-temporary-file)) + (define extract-dir (make-temporary-file "rkttmp~a" 'directory)) + (with-handlers ([exn? (lambda (ex) + (delete-directory/files out-file) + (delete-directory/files extract-dir) + (raise ex))]) + (define out-port (open-output-file out-file #:exists 'truncate)) + (file-transfer-connect upload-ft out-port) + (port-fsync out-port) + (close-output-port out-port) + (log-server-info "file uploaded, processing") + + ;; extract manifest + (define mf-file (build-path "manifest.rktd")) + (parameterize ([current-directory extract-dir]) + (untgz out-file #:filter (lambda (path dst type size target modtime perms) + (define exploded (explode-path (build-path path))) + (and (symbol=? type 'file) + ;; tar format is weird + (or (equal? (list 'same mf-file) exploded) + (equal? (list mf-file) exploded)))))) + (define mf-data (call-with-input-file (build-path extract-dir mf-file) read)) + (define mf-fasl (s-exp->fasl mf-data)) + (delete-directory/files extract-dir) + + ;; check validity of manifest + (define mf (parse-manifest mf-data)) + + (log-server-info "read manifest") + + ;; add to database, and get an id + (define name (first (manifest-data-ref mf 'name))) + (define id (query/insert-id (current-db) q-new-task name mf-fasl)) + + (log-server-info "finalizing upload") + + ;; move upload to id + (define path (get-project-file-path id)) + (rename-file-or-directory out-file path #t) + + ;; mark upload as committed + (query-exec (current-db) q-set-task-commit id) + + ;; notify agent handler + (agent-handler-new-task id mf) + (log-server-info "done") + id)) (define (get-projects) (enforce-subject 'client) @@ -989,13 +1011,6 @@ (with-output-to-file (build-path *state-root* "client0.rktd") (lambda () (write data))) (exit)] - [(vector "dev-new-project" tgz) - (define id (make-task '((name "test project") (arch "any") - (command "./test.sh") - (resources "cpu") (pattern "meow?d?d")) - (file->bytes tgz))) - (log-server-info "created project") - (exit)] [(vector subcmd _ ...) (error "invalid subcommand" subcmd)] [argv (void)])