implement agent file cache
This commit is contained in:
parent
039174dbe7
commit
88b671d1d4
|
@ -16,31 +16,126 @@
|
|||
;; You should have received a copy of the GNU Affero General Public License
|
||||
;; along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
(require racket/async-channel racket/bool racket/contract racket/fasl racket/file racket/function
|
||||
racket/match racket/port racket/string racket/unit
|
||||
(require file/untgz (only-in file/sha1 bytes->hex-string) racket/async-channel racket/bool
|
||||
racket/contract racket/fasl racket/file racket/function racket/match racket/path
|
||||
racket/port racket/string racket/unit srfi/19
|
||||
"comms.rkt" "info.rkt" "logging.rkt" "not-crypto.rkt" "manifest.rkt" "protocol.rkt"
|
||||
"static-support.rkt")
|
||||
"static-support.rkt"
|
||||
;; port-fsync
|
||||
(submod "static-support.rkt" misc-calls))
|
||||
|
||||
(define-logger agent #:parent global-logger)
|
||||
|
||||
;; maximum age of a file cache entry, in seconds (3600 * 24 * 7, i.e. one week)
;; compared against utc second timestamps when pruning the workdir cache
(define *max-cache-age* (* 3600 24 7))
|
||||
;; reads the srfi/19 'time-utc clock and returns its whole-seconds component
(define (current-seconds-utc)
  (let ([now (current-time 'time-utc)])
    (time-second now)))
|
||||
;; reads the srfi/19 'time-monotonic clock and returns its whole-seconds component
(define (current-seconds-monotonic)
  (let ([now (current-time 'time-monotonic)])
    (time-second now)))
|
||||
|
||||
;; global variables, yeet
|
||||
|
||||
;; an assignment received by this agent; instances are built in the push-assignment rpc
;; - id        : the assignment id
;; - task-id   : id of the task the assignment belongs to
;; - manifest  : the result of parse-manifest applied to the raw manifest
;; - file-hash : hash identifying the task's project file (used to name cache entries)
;; - work-range: the assignment data passed along with the rpc
;;               NOTE(review): exact shape is not visible in this file section -- confirm
(struct assignment [id task-id manifest file-hash work-range] #:transparent)
|
||||
(define incoming-queue (make-async-channel))
|
||||
|
||||
;; parameter holding the async channel onto which the rpc handlers put incoming messages
;; ('new / 'cancel pairs and 'cancel-all); it is #f until a channel is installed at agent startup
(define current-queue (make-parameter #f))
|
||||
|
||||
;; main loop
|
||||
(define (agent-loop)
|
||||
(define (agent-loop workdir cache-info)
|
||||
(define cust (make-custodian))
|
||||
|
||||
(define last-cache-update (current-seconds-monotonic))
|
||||
|
||||
(let loop ()
|
||||
(sleep 10)
|
||||
(loop))
|
||||
|
||||
(custodian-shutdown-all cust))
|
||||
|
||||
; (thread (lambda ()
|
||||
; (log-agent-info "downloading assignment ~a" aid)
|
||||
; (define data (get-project-file tid))
|
||||
; (log-agent-info "assignment data: ~s" data)
|
||||
; (define *read-size* 8192)
|
||||
; (call-with-input-file path (lambda (in)
|
||||
; (define ctx (crypto-blake2b-init))
|
||||
; (let loop ()
|
||||
; (match (read-bytes *read-size* in)
|
||||
; [(? eof-object?) (void)]
|
||||
; [data (crypto-blake2b-update ctx data)
|
||||
; (loop)]))
|
||||
; (crypto-blake2b-final ctx))))
|
||||
; (log-agent-info "simulating assignment ~a" aid)
|
||||
; (sleep 10)
|
||||
; (log-agent-info "sending completion ~a" aid)
|
||||
; (agent-report-state aid 'complete)))
|
||||
(sleep 10)
|
||||
(agent-loop))
|
||||
|
||||
;; utils
|
||||
|
||||
;; makes sure the project file for task `tid` is present and extracted inside `workdir`, downloading
;; it only when the cached copy is missing or incomplete
;; returns the path to the extracted directory, which should be used as the cwd for the command
;; specified in the manifest
;; - workdir    : directory holding the cached <hash>.tgz files and <hash>/ extracted directories
;; - tid        : task id, passed to get-project-file to fetch the data
;; - file-hash  : bytes; its hex encoding names the cache entries
;; - cache-info : mutable hash of hex hash -> last-use time (utc seconds), updated in place
(define (download/extract workdir tid file-hash cache-info)
  (define file-hash/hex (bytes->hex-string file-hash))
  (define tgz-file (build-path workdir (format "~a.tgz" file-hash/hex)))
  (define extract-dir (build-path workdir file-hash/hex))
  (define cached?
    (and (hash-has-key? cache-info file-hash/hex)
         (file-exists? tgz-file)
         (directory-exists? extract-dir)))
  (unless cached?
    ;; just in case one existed but not the other
    ;; #:must-exist? #f is required: on a plain cache miss neither path exists, and the default
    ;; behavior of delete-directory/files is to raise for a missing path
    (delete-directory/files tgz-file #:must-exist? #f)
    (delete-directory/files extract-dir #:must-exist? #f)
    (log-agent-info "downloading task data for ~a" tid)
    ;; TODO this should be updated with the streaming interface
    (call-with-output-file tgz-file (lambda (out) (write-bytes (get-project-file tid) out)))
    (untgz tgz-file #:dest extract-dir))
  ;; both a cache hit and a fresh download count as a use of the entry
  (hash-set! cache-info file-hash/hex (current-seconds-utc))
  extract-dir)
|
||||
|
||||
;; updates the file cache by deleting expired stuff according to the given cache-info
;; the file cache uses utc time, not monotonic time
;; if that means stuff ends up in the future, delete those too
;; - workdir    : the cache directory; holds <hash>.tgz, <hash>/ and the index.rktd index file
;; - cache-info : mutable hash of hex hash -> last-use time (utc seconds), or #f to load it from
;;                the index file (an unreadable or malformed index yields an empty cache)
;; returns the pruned cache-info (always a mutable hash); the same data is written to the index
(define (update-workdir-cache! workdir [cache-info #f])
  (define now (current-seconds-utc))
  (define index-name "index.rktd")
  (define index-file (build-path workdir index-name))

  ;; mapping of the blake2b hash to a list of files corresponding to it (the .tgz with that hash
  ;; and the extracted directory)
  (define work-files (make-hash))
  ;; only the top level of workdir is examined: recursing (as in-directory does) would walk into
  ;; the extracted directories and treat their contents as cache entries
  (for ([file (in-list (directory-list workdir #:build? #t))])
    (define basename (path->string (file-name-from-path file)))
    (match basename
      ;; the index itself is neither a cache entry nor garbage
      [(== index-name) (void)]
      ;; find all <hash>.tgz and <hash>/
      ;; the pattern is anchored at both ends so that only exact names are accepted
      [(pregexp #px"^([a-zA-Z0-9]+)(\\.tgz|)$" (list _ base _))
       (hash-update! work-files base (lambda (el) (cons file el)) '())]
      ;; delete everything else
      [_ (delete-directory/files file #:must-exist? #f)]))

  ;; load the cache-info from file if possible
  ;; conservatively throw away everything if it seems corrupted
  (when (false? cache-info)
    (define index-cache-info
      ;; a missing file raises a filesystem error, a garbled one a read error; treat both as
      ;; "no usable index"
      (with-handlers ([exn:fail? (lambda (ex) #f)])
        (call-with-input-file index-file read)))
    (set! cache-info
          (if (and (hash? index-cache-info)
                   (andmap string? (hash-keys index-cache-info))
                   (andmap integer? (hash-values index-cache-info)))
              ;; `read` produces an immutable hash; rebuild it as a mutable equal?-based table so
              ;; that hash-set! / hash-remove! work on it
              (make-hash (hash->list index-cache-info))
              (make-hash))))

  ;; prune cache
  (for ([file (in-list (hash-keys cache-info))])
    (define last-used (hash-ref cache-info file))
    (when (or (> now (+ last-used *max-cache-age*))  ;; cache expired
              (> last-used (+ now *max-cache-age*))) ;; it's too far in the future
      (hash-remove! cache-info file)))

  (call-with-output-file index-file (lambda (out) (write cache-info out))
    #:mode 'binary #:exists 'truncate)

  ;; delete everything that is not present in the cache-info now
  (for ([(base files) (in-hash work-files)])
    (unless (hash-has-key? cache-info base)
      (log-agent-info "deleting expired files for hash ~a" base)
      (for ([f (in-list files)])
        (delete-directory/files f #:must-exist? #f))))

  cache-info)
|
||||
|
||||
|
||||
;; rpc impl
|
||||
|
@ -55,21 +150,21 @@
|
|||
(log-agent-info "got push-assignment ~a ~a ~s ~a" aid mf-raw file-hash assign-data)
|
||||
|
||||
(async-channel-put
|
||||
incoming-queue (cons 'new (assignment aid tid (parse-manifest mf-raw) file-hash assign-data)))
|
||||
(current-queue) (cons 'new (assignment aid tid (parse-manifest mf-raw) file-hash assign-data)))
|
||||
(void))
|
||||
|
||||
;; rpc: requests cancellation of the assignment with id `aid`
;; the request is only queued here, as (cons 'cancel aid) on the channel held by current-queue
;; enforce-subject is called with 'server first -- presumably this rejects callers other than the
;; server; confirm against its definition
;; only the current-queue channel is used; the put onto the old global incoming-queue was the
;; pre-change line and would have queued the request a second time on a channel nobody reads
(define/contract (cancel-assignment aid)
  (-> integer? void?)
  (enforce-subject 'server)
  (log-agent-info "got cancel-assignment ~a" aid)
  (async-channel-put (current-queue) (cons 'cancel aid))
  (void))
|
||||
|
||||
;; rpc: requests cancellation of every assignment
;; the request is only queued here, as the symbol 'cancel-all on the channel held by current-queue
;; enforce-subject is called with 'server first -- presumably this rejects callers other than the
;; server; confirm against its definition
;; only the current-queue channel is used; the put onto the old global incoming-queue was the
;; pre-change line and would have queued the request a second time on a channel nobody reads
(define/contract (cancel-all-assignments)
  (-> void?)
  (enforce-subject 'server)
  (log-agent-info "got cancel-all-assignments")
  (async-channel-put (current-queue) 'cancel-all)
  (void))
|
||||
|
||||
;; agent impl unit
|
||||
|
@ -84,6 +179,7 @@
|
|||
|
||||
(install-logging!)
|
||||
(log-agent-info "starting crossfire-agent v~a" (#%info-lookup 'version))
|
||||
(current-queue (make-async-channel))
|
||||
|
||||
(define (get-config.linux-gnu)
|
||||
(call-with-input-file "/proc/self/exe"
|
||||
|
@ -115,6 +211,13 @@
|
|||
|
||||
(comms-set-node-info (current-comms) server-node)
|
||||
|
||||
(define workdir
|
||||
(let ([tmpdir (find-system-path 'temp-dir)])
|
||||
(build-path tmpdir "crossfire-agent" (bytes->hex-string (node-pubkey agent-node)))))
|
||||
(make-directory* workdir)
|
||||
(log-agent-info "using workdir ~a" workdir)
|
||||
(define cache-info (update-workdir-cache! workdir))
|
||||
|
||||
(log-agent-info "connecting to server...")
|
||||
(with-handlers ([exn:break? (lambda (_)
|
||||
(log-agent-info "connection cancelled")
|
||||
|
@ -132,4 +235,4 @@
|
|||
(loop (min 120 (* sleep-time 2))))))
|
||||
|
||||
(log-agent-info "connected! ready to do stuff")
|
||||
(agent-loop))
|
||||
(agent-loop workdir cache-info))
|
||||
|
|
|
@ -581,6 +581,13 @@
|
|||
(lambda (v) (max *min-subtask-size* (/ 2 v))) *min-subtask-size*)]
|
||||
[else (void)])))
|
||||
|
||||
;; examine the current tasks and immediately assign anything that has work to do and that we
|
||||
;; have resources for. this is idempotent, it can be called whenever there is any sort of state
|
||||
;; change regarding assignments and it will take appropriate action
|
||||
;; this is also the main point to introduce a potentially smarter scheduling algorithm. the
|
||||
;; algorithm is currently "yolo it because we don't really expect many competing tasks at
|
||||
;; the same time anyway" -- this means performance with nontrivial sets of resources and
|
||||
;; a large number of concurrent tasks could be suboptimal
|
||||
(define (update-assignments!)
|
||||
(log-server-info "agent handler ~a: updating assignments" id)
|
||||
;; detect set of used resources
|
||||
|
@ -639,7 +646,8 @@
|
|||
(for ([av (in-list assignments)])
|
||||
(cancel-assignment! av))
|
||||
(hash-remove! current-tasks (task-state-id ts) ts)
|
||||
(hash-remove! task-size (task-state-id ts) ts)]
|
||||
(hash-remove! task-size (task-state-id ts) ts)
|
||||
(update-assignments!)]
|
||||
;; got completion report from agent
|
||||
[(cons 'agent-report status)
|
||||
(match status
|
||||
|
@ -652,6 +660,13 @@
|
|||
(define av (hash-ref assigned-tasks assignment-id #f))
|
||||
(unless (false? av) (complete-assignment! av))
|
||||
(update-assignments!)]
|
||||
;; execution failed, unassign task
|
||||
;; TODO : maybe blacklist this task from this agent if there are too many errors
|
||||
;; TODO : notify connected clients that an error occurred
|
||||
[(cons assignment-id 'error)
|
||||
(define av (hash-ref assigned-tasks assignment-id #f))
|
||||
(unless (false? av) (cancel-assignment! av))
|
||||
(update-assignments!)]
|
||||
;; got succeeding input
|
||||
[(cons assignment-id success-input)
|
||||
(define av (hash-ref assigned-tasks assignment-id #f))
|
||||
|
@ -803,7 +818,7 @@
|
|||
|
||||
;; agent rpcs
|
||||
|
||||
;; report state 'incomplete 'complete or a list of integer representing a success result
|
||||
;; report state: 'incomplete, 'complete, 'error, or a list of integers representing a success result
|
||||
;; assignment-id is a numeric ID or #f if no currently assigned task
|
||||
;; agents can call this as many times as they want -- it can serve as a sort of "ping"
|
||||
(define (agent-report-state assignment-id state)
|
||||
|
|
Loading…
Reference in New Issue