From 3a36cae04807d75267f7f459734d1c6ab52a0e71 Mon Sep 17 00:00:00 2001 From: haskal Date: Mon, 4 Jan 2021 00:04:13 -0500 Subject: [PATCH] implement file streaming in agent --- crossfire/agent.rkt | 7 ++++--- crossfire/client.rkt | 7 +++++-- crossfire/comms.rkt | 4 +++- crossfire/server.rkt | 9 +++++---- 4 files changed, 17 insertions(+), 10 deletions(-) diff --git a/crossfire/agent.rkt b/crossfire/agent.rkt index 09f7752..f7e96e4 100644 --- a/crossfire/agent.rkt +++ b/crossfire/agent.rkt @@ -64,9 +64,10 @@ (with-handlers ([exn:fail? (lambda (ex) (logging-report-error ex) (cleanup))]) - ;; TODO this should be updated with the streaming interface - (call-with-output-file tgz-file (lambda (out) (write-bytes (get-project-file tid) out)) - #:exists 'truncate) + (call-with-output-file + tgz-file + (lambda (out) (define ft (get-project-file tid)) (file-transfer-connect ft out)) + #:exists 'truncate) (log-agent-info "extracting task data for ~a" tid) (untgz tgz-file #:dest extract-dir) (set-box! success-box #t))) diff --git a/crossfire/client.rkt b/crossfire/client.rkt index 5bf4c86..cbaffc6 100644 --- a/crossfire/client.rkt +++ b/crossfire/client.rkt @@ -302,10 +302,11 @@ ;; TODO : xterm/vt100/etc-specific ;; this should use a library maybe (printf "\r\x1b[Ktransferred ~a% [~a/~a]" pct a b) - (when (= a b) - (printf "\n")) (flush-output)) + (define (finish-progress) + (printf "\n")) + (define (make-random-filename) (string-append (bytes->hex-string (crypto-random-bytes 8)) ".agent")) @@ -358,6 +359,7 @@ ;; do submit (report-status "submitting project...!") (cmd-submit (current-directory) print-progress) + (finish-progress) (report-status "project submitted!! time for crab")) (subcommand (delete "Delete an executed or completed project") @@ -422,6 +424,7 @@ (define out-name (make-random-filename)) (call-with-output-file out-name (lambda (o) (cmd-get-deployment aid o print-progress))) + (finish-progress) (do-final out-name)] [_ (error "you must provide -l, -c, -d, or -g for this command")])) diff --git a/crossfire/comms.rkt b/crossfire/comms.rkt index efeae68..261928e 100644 --- a/crossfire/comms.rkt +++ b/crossfire/comms.rkt @@ -560,7 +560,9 @@ (loop (+ offs n))])] [(== thread-evt) (match (thread-receive) - [(msg:file-request _ _ #f) (log-tm-info "file transfer ~a complete" trans-id)] + [(msg:file-request _ _ #f) + (close-input-port port) + (log-tm-info "file transfer ~a complete" trans-id)] [(msg:file-request _ _ new-offs) (loop new-offs)] [x (log-tm-warning "invalid data during file transfer ~a" x) (loop offs)])])) (void))) diff --git a/crossfire/server.rkt b/crossfire/server.rkt index dde5be0..debbd07 100644 --- a/crossfire/server.rkt +++ b/crossfire/server.rkt @@ -172,7 +172,7 @@ (define (server-get-file taskid) (define path (get-project-file-path taskid)) - (file->bytes path)) + (and (file-exists? path) (open-input-file path))) ;; rpc helpers @@ -348,10 +348,11 @@ (project-info id name mf progress matches (hash-ref agent-states id '())))) (define/contract (get-project-file taskid) - (-> integer? bytes?) + (-> integer? file-transfer?) ;; TODO : streaming interface - (with-handlers ([exn:fail? (lambda (ex) (error "unable to fetch the requested file"))]) - (server-get-file taskid))) + (match (server-get-file taskid) + [#f (error "no such project file")] + [port (make-file-transfer port)])) (define/contract (get-project-matches taskid) (-> integer? (listof (listof integer?)))