implement task file download and dummy execution

This commit is contained in:
xenia 2020-11-30 03:05:40 -05:00
parent 7183bcad33
commit 35122c660c
3 changed files with 139 additions and 57 deletions

View File

@ -59,10 +59,13 @@ takes the difficulty out of creating custom brute force jobs
- ✅ securely connect to server
- retrieve assigned tasks
- report number of cores available (configurable limit)
- report work rate
- report completions
- report errors
- report successes
- low priority: defer to external brute force program (eg, hashcat on GPU)
- this could be implemented on top of the existing project format
- low priority: support finding _all_ matching inputs for a project, rather than just the first one
- the architecture currently doesn't stop on the first match so it could be a thing
# client: submit jobs and view progress
- ✅securely connect to server

View File

@ -27,6 +27,7 @@
(define-logger agent #:parent global-logger)
(define *max-cache-age* (* 3600 24 7))
(define *ping-secs* 30)
;; global variables, yeet
@ -37,55 +38,127 @@
(define (agent-loop workdir cache-info)
(define cust (make-custodian))
(parameterize ([current-custodian cust])
(thread (lambda ()
;; kinda pointless, other than helping keep the connection alive
(let loop () (agent-report-state #f #f) (sleep *ping-secs*) (loop)))))
(define last-cache-update (current-seconds-monotonic))
(define run-agent? #t)
(define assignments (make-hash))
(struct download [thd file-hash/hex extract-dir [waiters #:mutable]] #:transparent)
(define downloads (make-hash))
(define (download/extract tid tgz-file extract-dir)
(with-handlers ([exn:fail:filesystem? void]) (delete-directory/files tgz-file))
(with-handlers ([exn:fail:filesystem? void]) (delete-directory/files extract-dir))
(log-agent-info "downloading task data for ~a" tid)
;; TODO this should be updated with the streaming interface
(call-with-output-file tgz-file (lambda (out) (write-bytes (get-project-file tid) out)))
(log-agent-info "extracting task data for ~a" tid)
(untgz tgz-file #:dest extract-dir))
(let loop ()
(sleep 10)
(loop))
(define cache-update-delta (max 0 (- (+ last-cache-update *max-cache-age*)
(current-seconds-monotonic))))
(define download-thds (map download-thd (hash-values downloads)))
(match (apply sync/timeout cache-update-delta (current-queue) download-thds)
;; timeout -- prune cache
[#f (update-workdir-cache! workdir cache-info)
(set! last-cache-update (current-seconds-monotonic))]
;; sent queue value
['cancel-all
(for ([(aid thd) (in-hash assignments)])
(when thd
(break-thread thd)))
(hash-clear! assignments)
(for ([(_ dl) (in-hash downloads)])
(set-download-waiters! dl '()))]
;; cancel (if incomplete) or deregister (if complete)
[(cons 'stop aid)
(match (hash-ref assignments aid #f)
[#f (hash-remove! assignments aid)]
[thd (break-thread thd)])
(hash-remove! assignments aid)
(for ([(_ dl) (in-hash downloads)])
(set-download-waiters! dl (filter (lambda (as) (not (= aid (assignment-id as))))
(download-waiters dl))))]
;; download completed
[(? thread? dl-thd)
;; argh
(match-define (download thd file-hash/hex extract-dir waiters)
(for/first ([(tid dl) (in-hash downloads)] #:when (eq? dl-thd (download-thd dl)))
(hash-remove! downloads tid)
(log-agent-info "completed download for ~a" tid)
dl))
(hash-set! cache-info file-hash/hex (current-seconds))
(update-workdir-cache! workdir cache-info)
(set! last-cache-update (current-seconds-monotonic))
;; start delayed assignments
(for ([assignment (in-list waiters)])
(parameterize ([current-custodian cust])
(hash-set! assignments (assignment-id assignment)
(thread (lambda () (execute-assignment assignment extract-dir))))))]
[(cons 'new assignment)
(define aid (assignment-id assignment))
;; cancel old assignment with the same id, if exists
;; however, ideally don't send duplicate assignment IDs because of the potential
;; confusion/desynchronization
(when (hash-has-key? assignments aid)
(break-thread (hash-ref assignments aid)))
(define tid (assignment-task-id assignment))
(define file-hash/hex (substring (bytes->hex-string (assignment-file-hash assignment))
0 32))
(define tgz-file (build-path workdir (format "~a.tgz" file-hash/hex)))
(define extract-dir (build-path workdir file-hash/hex))
;; check if we need to start a download, otherwise immediately start the assignment
(cond
[(and (hash-has-key? cache-info file-hash/hex)
(file-exists? tgz-file) (directory-exists? extract-dir))
(hash-set! cache-info file-hash/hex (current-seconds))
(parameterize ([current-custodian cust])
(hash-set! assignments aid
(thread (lambda () (execute-assignment assignment extract-dir)))))]
[(hash-has-key? downloads tid)
(define dl (hash-ref downloads tid))
(set-download-waiters! dl (cons assignment (download-waiters dl)))]
[else
(log-agent-info "starting download for ~a" tid)
(define dl (download (thread (thunk (download/extract tid tgz-file extract-dir)))
file-hash/hex extract-dir (list assignment)))
(hash-set! downloads tid dl)])])
(when run-agent? (loop)))
(custodian-shutdown-all cust))
; (thread (lambda ()
; (define data (get-project-file tid))
; (log-agent-info "assignment data: ~s" data)
; (define *read-size* 8192)
; (call-with-input-file path (lambda (in)
; (define ctx (crypto-blake2b-init))
; (let loop ()
; (match (read-bytes *read-size* in)
; [(? eof-object?) (void)]
; [data (crypto-blake2b-update ctx data)
; (loop)]))
; (crypto-blake2b-final ctx))))
; (log-agent-info "simulating assignment ~a" aid)
; (sleep 10)
; (log-agent-info "sending completion ~a" aid)
; (agent-report-state aid 'complete)))
(define (execute-assignment assignment extract-dir)
(define aid (assignment-id assignment))
(log-agent-info "starting execution of ~a" aid)
(define (cleanup)
(log-agent-info "execution of ~a complete" aid)
(agent-report-state aid 'complete)
(async-channel-put (current-queue) (cons 'stop aid)))
(define (report-error ex)
(log-agent-info "execution of ~a ran into error" aid)
((error-display-handler) (exn-message ex) ex)
(agent-report-state aid 'error)
(async-channel-put (current-queue) (cons 'stop aid)))
(with-handlers ([exn:fail? report-error])
(define work-range (assignment-work-range assignment))
(log-agent-info "the work for assignment ~a is ~a" aid work-range)
(sleep 10)
(void "TODO")
(cleanup)))
;; utils
;; returns the path to the extracted directory, which should be used as the cwd for the command
;; specified in the manifest
(define (download/extract workdir tid file-hash cache-info)
(define file-hash/hex (bytes->hex-string file-hash))
(define tgz-file (build-path workdir (format "~a.tgz" file-hash/hex)))
(define extract-dir (build-path workdir file-hash/hex))
(cond
[(and (hash-has-key? cache-info file-hash/hex)
(file-exists? tgz-file) (directory-exists? extract-dir))
;; do nothing, already exists. but update the cache-info
(hash-set! cache-info file-hash/hex (current-seconds))
extract-dir]
[else
;; just in case one existed but not the other
(delete-directory/files tgz-file) (delete-directory/files extract-dir)
(log-agent-info "downloading task data for ~a" tid)
;; TODO this should be updated with the streaming interface
(call-with-output-file tgz-file (lambda (out) (write-bytes (get-project-file tid) out)))
(untgz tgz-file #:dest extract-dir)
(hash-set! cache-info file-hash/hex (current-seconds))
extract-dir]))
;; updates the file cache by deleting expired stuff according to the given cache-info
;; the file cache uses utc time, not monotonic time
;; if that means stuff ends up in the future, delete those too
@ -93,11 +166,11 @@
;; mapping of the blake2b hash to a list of files corresponding to it (the .tgz with that hash
;; and the extracted directory)
(define work-files (make-hash))
(for ([file (in-directory workdir)])
(for ([file (in-list (directory-list workdir #:build? #t))])
(define basename (path->string (file-name-from-path file)))
(match basename
;; find all <hash>.tgz and <hash>/
[(pregexp #px"^([a-zA-Z0-9]+)(\\.tgz|)" (list _ base _))
[(pregexp #px"^([a-fA-F0-9]+)(\\.tgz|)" (list _ base _))
(hash-update! work-files base (lambda (el) (cons file el)) '())]
;; delete everything else
[_ (delete-directory/files file)]))
@ -113,10 +186,11 @@
(set! cache-info
(if (and (hash? index-cache-info) ((listof string?) (hash-keys index-cache-info))
((listof integer?) (hash-values index-cache-info)))
index-cache-info
(hash-copy index-cache-info)
(make-hash))))
;; prune cache
(define now (current-seconds))
(for ([file (in-list (hash-keys cache-info))])
(when (or (> now (+ (hash-ref cache-info file) *max-cache-age*)) ;; cache expired
(> (hash-ref cache-info file) (+ now *max-cache-age*))) ;; it's too far in the future
@ -153,7 +227,7 @@
(-> integer? void?)
(enforce-subject 'server)
(log-agent-info "got cancel-assignment ~a" aid)
(async-channel-put (current-queue) (cons 'cancel aid))
(async-channel-put (current-queue) (cons 'stop aid))
(void))
(define/contract (cancel-all-assignments)
@ -210,7 +284,8 @@
(define workdir
(let ([tmpdir (find-system-path 'temp-dir)])
(build-path tmpdir "crossfire-agent" (bytes->hex-string (node-pubkey agent-node)))))
(build-path tmpdir "crossfire-agent" (substring (bytes->hex-string (node-pubkey agent-node))
0 32))))
(make-directory* workdir)
(log-agent-info "using workdir ~a" workdir)
(define cache-info (update-workdir-cache! workdir))
@ -232,4 +307,5 @@
(loop (min 120 (* sleep-time 2))))))
(log-agent-info "connected! ready to do stuff")
(agent-loop workdir cache-info))
(with-handlers ([exn:break? (lambda (_) (log-agent-info "stopping agent"))])
(agent-loop workdir cache-info)))

View File

@ -411,8 +411,8 @@
;; this will massively overcommit the last few parts of a project and potentially
;; prioritize doing useless duplicate work instead of moving on to the next project
;; but it'll be fiiiiiine don't worry
(define wp (for/fold ([iset (make-integer-set '())]) ([(_ v) (in-hash at)]))
(integer-set-union iset v))
(define wp (for/fold ([iset (make-integer-set '())]) ([(_ v) (in-hash at)])
(integer-set-union iset v)))
(define-values [assignment _]
(pattern-range-take (task-state-work-pattern ts) requested-amount))
(hash-set! at agent-id assignment)
@ -445,7 +445,7 @@
(current-db) q-add-task-log (task-state-id ts) agent-id time-wall-start duration
(s-exp->fasl (integer-set-contents assignment)))
(define new-completed (integer-set-union (task-state-completed-work ts) assignment))
(set-task-state-completed-work! new-completed)
(set-task-state-completed-work! ts new-completed)
;; remove tracking - this work is now done
(hash-remove! (task-state-agent-todo ts) agent-id)
;; check if we're fully complete. if so, mark the task complete in the database and cancel
@ -626,15 +626,16 @@
(log-server-info "agent handler ~a got new task ~a" id (task-state-id ts))
(hash-set! current-tasks (task-state-id ts) ts)
(update-assignments!)]
[(cons 'cancel-task ts)
[(cons 'cancel-task tid)
(log-server-info "agent handler ~a deregistering task ~a" id tid)
(define assignments
(for/list ([(_ v) (in-hash assigned-tasks)]
#:when (= (task-state-id ts) (assignment-taskid v)))
#:when (= tid (assignment-taskid v)))
v))
(for ([av (in-list assignments)])
(cancel-assignment! av))
(hash-remove! current-tasks (task-state-id ts) ts)
(hash-remove! task-size (task-state-id ts) ts)
(hash-remove! current-tasks tid)
(hash-remove! task-size tid)
(update-assignments!)]
;; got completion report from agent
[(cons 'agent-report status)
@ -810,7 +811,7 @@
;; assignment-id is a numeric ID or #f if no currently assigned task
;; agents can call this as many times as they want -- it can serve as a sort of "ping"
(define (agent-report-state assignment-id state)
(-> (or/c false? integer?) (or/c 'incomplete 'complete (listof integer?)))
(-> (or/c false? integer?) (or/c 'incomplete 'complete 'error (listof integer?)))
(enforce-subject 'agent)
(define agent-id (node-id (current-from-node)))
;; TODO : maybe wait for an actual completion here? idk
@ -886,9 +887,11 @@
(lambda (out) (write (list agent-node (current-server-public-node)) out)))
(log-server-info "created dev agent ~a" name)
(exit)]
[(vector "dev-new-project")
[(vector "dev-new-project" tgz)
(define id (make-task '((name "test project") (arch "any")
(resources "cpu") (pattern "meow?d?d")) #"no file contents lol"))
(command "./test.sh")
(resources "cpu") (pattern "meow?d?d"))
(file->bytes tgz)))
(log-server-info "created project")
(exit)]
[(vector subcmd _ ...)