implement file streaming in agent
This commit is contained in:
parent
68b9984900
commit
3a36cae048
|
@ -64,8 +64,9 @@
|
||||||
(with-handlers ([exn:fail? (lambda (ex)
|
(with-handlers ([exn:fail? (lambda (ex)
|
||||||
(logging-report-error ex)
|
(logging-report-error ex)
|
||||||
(cleanup))])
|
(cleanup))])
|
||||||
;; TODO this should be updated with the streaming interface
|
(call-with-output-file
|
||||||
(call-with-output-file tgz-file (lambda (out) (write-bytes (get-project-file tid) out))
|
tgz-file
|
||||||
|
(lambda (out) (define ft (get-project-file tid)) (file-transfer-connect ft out))
|
||||||
#:exists 'truncate)
|
#:exists 'truncate)
|
||||||
(log-agent-info "extracting task data for ~a" tid)
|
(log-agent-info "extracting task data for ~a" tid)
|
||||||
(untgz tgz-file #:dest extract-dir)
|
(untgz tgz-file #:dest extract-dir)
|
||||||
|
|
|
@ -302,10 +302,11 @@
|
||||||
;; TODO : xterm/vt100/etc-specific
|
;; TODO : xterm/vt100/etc-specific
|
||||||
;; this should use a library maybe
|
;; this should use a library maybe
|
||||||
(printf "\r\x1b[Ktransferred ~a% [~a/~a]" pct a b)
|
(printf "\r\x1b[Ktransferred ~a% [~a/~a]" pct a b)
|
||||||
(when (= a b)
|
|
||||||
(printf "\n"))
|
|
||||||
(flush-output))
|
(flush-output))
|
||||||
|
|
||||||
|
(define (finish-progress)
|
||||||
|
(printf "\n"))
|
||||||
|
|
||||||
(define (make-random-filename)
|
(define (make-random-filename)
|
||||||
(string-append (bytes->hex-string (crypto-random-bytes 8)) ".agent"))
|
(string-append (bytes->hex-string (crypto-random-bytes 8)) ".agent"))
|
||||||
|
|
||||||
|
@ -358,6 +359,7 @@
|
||||||
;; do submit
|
;; do submit
|
||||||
(report-status "submitting project...!")
|
(report-status "submitting project...!")
|
||||||
(cmd-submit (current-directory) print-progress)
|
(cmd-submit (current-directory) print-progress)
|
||||||
|
(finish-progress)
|
||||||
(report-status "project submitted!! time for crab"))
|
(report-status "project submitted!! time for crab"))
|
||||||
|
|
||||||
(subcommand (delete "Delete an executed or completed project")
|
(subcommand (delete "Delete an executed or completed project")
|
||||||
|
@ -422,6 +424,7 @@
|
||||||
(define out-name (make-random-filename))
|
(define out-name (make-random-filename))
|
||||||
(call-with-output-file out-name
|
(call-with-output-file out-name
|
||||||
(lambda (o) (cmd-get-deployment aid o print-progress)))
|
(lambda (o) (cmd-get-deployment aid o print-progress)))
|
||||||
|
(finish-progress)
|
||||||
(do-final out-name)]
|
(do-final out-name)]
|
||||||
[_ (error "you must provide -l, -c, -d, or -g for this command")]))
|
[_ (error "you must provide -l, -c, -d, or -g for this command")]))
|
||||||
|
|
||||||
|
|
|
@ -560,7 +560,9 @@
|
||||||
(loop (+ offs n))])]
|
(loop (+ offs n))])]
|
||||||
[(== thread-evt)
|
[(== thread-evt)
|
||||||
(match (thread-receive)
|
(match (thread-receive)
|
||||||
[(msg:file-request _ _ #f) (log-tm-info "file transfer ~a complete" trans-id)]
|
[(msg:file-request _ _ #f)
|
||||||
|
(close-input-port port)
|
||||||
|
(log-tm-info "file transfer ~a complete" trans-id)]
|
||||||
[(msg:file-request _ _ new-offs) (loop new-offs)]
|
[(msg:file-request _ _ new-offs) (loop new-offs)]
|
||||||
[x (log-tm-warning "invalid data during file transfer ~a" x) (loop offs)])]))
|
[x (log-tm-warning "invalid data during file transfer ~a" x) (loop offs)])]))
|
||||||
(void)))
|
(void)))
|
||||||
|
|
|
@ -172,7 +172,7 @@
|
||||||
|
|
||||||
(define (server-get-file taskid)
|
(define (server-get-file taskid)
|
||||||
(define path (get-project-file-path taskid))
|
(define path (get-project-file-path taskid))
|
||||||
(file->bytes path))
|
(and (file-exists? path) (open-input-file path)))
|
||||||
|
|
||||||
|
|
||||||
;; rpc helpers
|
;; rpc helpers
|
||||||
|
@ -348,10 +348,11 @@
|
||||||
(project-info id name mf progress matches (hash-ref agent-states id '()))))
|
(project-info id name mf progress matches (hash-ref agent-states id '()))))
|
||||||
|
|
||||||
(define/contract (get-project-file taskid)
|
(define/contract (get-project-file taskid)
|
||||||
(-> integer? bytes?)
|
(-> integer? file-transfer?)
|
||||||
;; TODO : streaming interface
|
;; TODO : streaming interface
|
||||||
(with-handlers ([exn:fail? (lambda (ex) (error "unable to fetch the requested file"))])
|
(match (server-get-file taskid)
|
||||||
(server-get-file taskid)))
|
[#f (error "no such project file")]
|
||||||
|
[port (make-file-transfer port)]))
|
||||||
|
|
||||||
(define/contract (get-project-matches taskid)
|
(define/contract (get-project-matches taskid)
|
||||||
(-> integer? (listof (listof integer?)))
|
(-> integer? (listof (listof integer?)))
|
||||||
|
|
Loading…
Reference in New Issue