From 35122c660ce933bcb528864f80b376ea0487b12d Mon Sep 17 00:00:00 2001 From: haskal Date: Mon, 30 Nov 2020 03:05:40 -0500 Subject: [PATCH] implement task file download and dummy execution --- README.md | 5 +- crossfire/agent.rkt | 168 +++++++++++++++++++++++++++++++------------ crossfire/server.rkt | 23 +++--- 3 files changed, 139 insertions(+), 57 deletions(-) diff --git a/README.md b/README.md index 963f79b..76d4a97 100644 --- a/README.md +++ b/README.md @@ -59,10 +59,13 @@ takes the difficulty out of creating custom brute force jobs - ✅ securely connect to server - retrieve assigned tasks - report number of cores available (configurable limit) -- report work rate +- report completions +- report errors - report successes - low priority: defer to external brute force program (eg, hashcat on GPU) + - this could be implemented on top of the existing project format - low priority: support finding _all_ matching inputs for a project, rather than just the first one + - the architecture currently doesn't stop on the first match so it could be a thing # client: submit jobs and view progress - ✅securely connect to server diff --git a/crossfire/agent.rkt b/crossfire/agent.rkt index a7bc17e..935446b 100644 --- a/crossfire/agent.rkt +++ b/crossfire/agent.rkt @@ -27,6 +27,7 @@ (define-logger agent #:parent global-logger) (define *max-cache-age* (* 3600 24 7)) +(define *ping-secs* 30) ;; global variables, yeet @@ -37,55 +38,127 @@ (define (agent-loop workdir cache-info) (define cust (make-custodian)) + (parameterize ([current-custodian cust]) + (thread (lambda () + ;; kinda pointless, other than helping keep the connection alive + (let loop () (agent-report-state #f #f) (sleep *ping-secs*) (loop))))) + (define last-cache-update (current-seconds-monotonic)) + (define run-agent? #t) + (define assignments (make-hash)) + + (struct download [thd file-hash/hex extract-dir [waiters #:mutable]] #:transparent) + (define downloads (make-hash)) + + (define (download/extract tid tgz-file extract-dir) + (with-handlers ([exn:fail:filesystem? void]) (delete-directory/files tgz-file)) + (with-handlers ([exn:fail:filesystem? void]) (delete-directory/files extract-dir)) + (log-agent-info "downloading task data for ~a" tid) + ;; TODO this should be updated with the streaming interface + (call-with-output-file tgz-file (lambda (out) (write-bytes (get-project-file tid) out))) + (log-agent-info "extracting task data for ~a" tid) + (untgz tgz-file #:dest extract-dir)) (let loop () - (sleep 10) - (loop)) + (define cache-update-delta (max 0 (- (+ last-cache-update *max-cache-age*) + (current-seconds-monotonic)))) + (define download-thds (map download-thd (hash-values downloads))) + (match (apply sync/timeout cache-update-delta (current-queue) download-thds) + ;; timeout -- prune cache + [#f (update-workdir-cache! workdir cache-info) + (set! last-cache-update (current-seconds-monotonic))] + ;; sent queue value + ['cancel-all + (for ([(aid thd) (in-hash assignments)]) + (when thd + (break-thread thd))) + (hash-clear! assignments) + (for ([(_ dl) (in-hash downloads)]) + (set-download-waiters! dl '()))] + ;; cancel (if incomplete) or deregister (if complete) + [(cons 'stop aid) + (match (hash-ref assignments aid #f) + [#f (hash-remove! assignments aid)] + [thd (break-thread thd)]) + (hash-remove! assignments aid) + (for ([(_ dl) (in-hash downloads)]) + (set-download-waiters! dl (filter (lambda (as) (not (= aid (assignment-id as)))) + (download-waiters dl))))] + ;; download completed + [(? thread? dl-thd) + ;; argh + (match-define (download thd file-hash/hex extract-dir waiters) + (for/first ([(tid dl) (in-hash downloads)] #:when (eq? dl-thd (download-thd dl))) + (hash-remove! downloads tid) + (log-agent-info "completed download for ~a" tid) + dl)) + (hash-set! cache-info file-hash/hex (current-seconds)) + (update-workdir-cache! workdir cache-info) + (set! last-cache-update (current-seconds-monotonic)) + ;; start delayed assignments + (for ([assignment (in-list waiters)]) + (parameterize ([current-custodian cust]) + (hash-set! assignments (assignment-id assignment) + (thread (lambda () (execute-assignment assignment extract-dir))))))] + [(cons 'new assignment) + (define aid (assignment-id assignment)) + ;; cancel old assignment with the same id, if exists + ;; however, ideally don't send duplicate assignment IDs because of the potential + ;; confusion/desynchronization + (when (hash-has-key? assignments aid) + (break-thread (hash-ref assignments aid))) + + (define tid (assignment-task-id assignment)) + (define file-hash/hex (substring (bytes->hex-string (assignment-file-hash assignment)) + 0 32)) + (define tgz-file (build-path workdir (format "~a.tgz" file-hash/hex))) + (define extract-dir (build-path workdir file-hash/hex)) + ;; check if we need to start a download, otherwise immediately start the assignment + (cond + [(and (hash-has-key? cache-info file-hash/hex) + (file-exists? tgz-file) (directory-exists? extract-dir)) + (hash-set! cache-info file-hash/hex (current-seconds)) + (parameterize ([current-custodian cust]) + (hash-set! assignments aid + (thread (lambda () (execute-assignment assignment extract-dir)))))] + [(hash-has-key? downloads tid) + (define dl (hash-ref downloads tid)) + (set-download-waiters! dl (cons assignment (download-waiters dl)))] + [else + (log-agent-info "starting download for ~a" tid) + (define dl (download (thread (thunk (download/extract tid tgz-file extract-dir))) + file-hash/hex extract-dir (list assignment))) + (hash-set! downloads tid dl)])]) + + (when run-agent? (loop))) (custodian-shutdown-all cust)) - ; (thread (lambda () - ; (define data (get-project-file tid)) - ; (log-agent-info "assignment data: ~s" data) - ; (define *read-size* 8192) - ; (call-with-input-file path (lambda (in) - ; (define ctx (crypto-blake2b-init)) - ; (let loop () - ; (match (read-bytes *read-size* in) - ; [(? eof-object?) (void)] - ; [data (crypto-blake2b-update ctx data) - ; (loop)])) - ; (crypto-blake2b-final ctx)))) - ; (log-agent-info "simulating assignment ~a" aid) - ; (sleep 10) - ; (log-agent-info "sending completion ~a" aid) - ; (agent-report-state aid 'complete))) +(define (execute-assignment assignment extract-dir) + (define aid (assignment-id assignment)) + (log-agent-info "starting execution of ~a" aid) + + (define (cleanup) + (log-agent-info "execution of ~a complete" aid) + (agent-report-state aid 'complete) + (async-channel-put (current-queue) (cons 'stop aid))) + + (define (report-error ex) + (log-agent-info "execution of ~a ran into error" aid) + ((error-display-handler) (exn-message ex) ex) + (agent-report-state aid 'error) + (async-channel-put (current-queue) (cons 'stop aid))) + + (with-handlers ([exn:fail? report-error]) + (define work-range (assignment-work-range assignment)) + (log-agent-info "the work for assignment ~a is ~a" aid work-range) + (sleep 10) + (void "TODO") + + (cleanup))) ;; utils -;; returns the path to the extracted directory, which should be used as the cwd for the command -;; specified in the manifest -(define (download/extract workdir tid file-hash cache-info) - (define file-hash/hex (bytes->hex-string file-hash)) - (define tgz-file (build-path workdir (format "~a.tgz" file-hash/hex))) - (define extract-dir (build-path workdir file-hash/hex)) - (cond - [(and (hash-has-key? cache-info file-hash/hex) - (file-exists? tgz-file) (directory-exists? extract-dir)) - ;; do nothing, already exists. but update the cache-info - (hash-set! cache-info file-hash/hex (current-seconds)) - extract-dir] - [else - ;; just in case one existed but not the other - (delete-directory/files tgz-file) (delete-directory/files extract-dir) - (log-agent-info "downloading task data for ~a" tid) - ;; TODO this should be updated with the streaming interface - (call-with-output-file tgz-file (lambda (out) (write-bytes (get-project-file tid) out))) - (untgz tgz-file #:dest extract-dir) - (hash-set! cache-info file-hash/hex (current-seconds)) - extract-dir])) - ;; updates the file cache by deleting expired stuff according to the given cache-info ;; the file cache uses utc time, not monotonic time ;; if that means stuff ends up in the future, delete those too @@ -93,11 +166,11 @@ ;; mapping of the blake2b hash to a list of files corresponding to it (the .tgz with that hash ;; and the extracted directory) (define work-files (make-hash)) - (for ([file (in-directory workdir)]) + (for ([file (in-list (directory-list workdir #:build? #t))]) (define basename (path->string (file-name-from-path file))) (match basename ;; find all .tgz and / - [(pregexp #px"^([a-zA-Z0-9]+)(\\.tgz|)" (list _ base _)) + [(pregexp #px"^([a-fA-F0-9]+)(\\.tgz|)" (list _ base _)) (hash-update! work-files base (lambda (el) (cons file el)) '())] ;; delete everything else [_ (delete-directory/files file)])) @@ -113,10 +186,11 @@ (set! cache-info (if (and (hash? index-cache-info) ((listof string?) (hash-keys index-cache-info)) ((listof integer?) (hash-values index-cache-info))) - index-cache-info + (hash-copy index-cache-info) (make-hash)))) ;; prune cache + (define now (current-seconds)) (for ([file (in-list (hash-keys cache-info))]) (when (or (> now (+ (hash-ref cache-info file) *max-cache-age*)) ;; cache expired (> (hash-ref cache-info file) (+ now *max-cache-age*))) ;; it's too far in the future @@ -153,7 +227,7 @@ (-> integer? void?) (enforce-subject 'server) (log-agent-info "got cancel-assignment ~a" aid) - (async-channel-put (current-queue) (cons 'cancel aid)) + (async-channel-put (current-queue) (cons 'stop aid)) (void)) (define/contract (cancel-all-assignments) @@ -210,7 +284,8 @@ (define workdir (let ([tmpdir (find-system-path 'temp-dir)]) - (build-path tmpdir "crossfire-agent" (bytes->hex-string (node-pubkey agent-node))))) + (build-path tmpdir "crossfire-agent" (substring (bytes->hex-string (node-pubkey agent-node)) + 0 32)))) (make-directory* workdir) (log-agent-info "using workdir ~a" workdir) (define cache-info (update-workdir-cache! workdir)) @@ -232,4 +307,5 @@ (loop (min 120 (* sleep-time 2)))))) (log-agent-info "connected! ready to do stuff") - (agent-loop workdir cache-info)) + (with-handlers ([exn:break? (lambda (_) (log-agent-info "stopping agent"))]) + (agent-loop workdir cache-info))) diff --git a/crossfire/server.rkt b/crossfire/server.rkt index d6636c9..db07477 100644 --- a/crossfire/server.rkt +++ b/crossfire/server.rkt @@ -411,8 +411,8 @@ ;; this will massively overcommit the last few parts of a project and potentially ;; prioritize doing useless duplicate work instead of moving on to the next project ;; but it'll be fiiiiiine don't worry - (define wp (for/fold ([iset (make-integer-set '())]) ([(_ v) (in-hash at)])) - (integer-set-union iset v)) + (define wp (for/fold ([iset (make-integer-set '())]) ([(_ v) (in-hash at)]) + (integer-set-union iset v))) (define-values [assignment _] (pattern-range-take (task-state-work-pattern ts) requested-amount)) (hash-set! at agent-id assignment) @@ -445,7 +445,7 @@ (current-db) q-add-task-log (task-state-id ts) agent-id time-wall-start duration (s-exp->fasl (integer-set-contents assignment))) (define new-completed (integer-set-union (task-state-completed-work ts) assignment)) - (set-task-state-completed-work! new-completed) + (set-task-state-completed-work! ts new-completed) ;; remove tracking - this work is now done (hash-remove! (task-state-agent-todo ts) agent-id) ;; check if we're fully complete. if so, mark the task complete in the database and cancel @@ -626,15 +626,16 @@ (log-server-info "agent handler ~a got new task ~a" id (task-state-id ts)) (hash-set! current-tasks (task-state-id ts) ts) (update-assignments!)] - [(cons 'cancel-task ts) + [(cons 'cancel-task tid) + (log-server-info "agent handler ~a deregistering task ~a" id tid) (define assignments (for/list ([(_ v) (in-hash assigned-tasks)] - #:when (= (task-state-id ts) (assignment-taskid v))) + #:when (= tid (assignment-taskid v))) v)) (for ([av (in-list assignments)]) (cancel-assignment! av)) - (hash-remove! current-tasks (task-state-id ts) ts) - (hash-remove! task-size (task-state-id ts) ts) + (hash-remove! current-tasks tid) + (hash-remove! task-size tid) (update-assignments!)] ;; got completion report from agent [(cons 'agent-report status) @@ -810,7 +811,7 @@ ;; assignment-id is a numeric ID or #f if no currently assigned task ;; agents can call this as many times as they want -- it can serve as a sort of "ping" (define (agent-report-state assignment-id state) - (-> (or/c false? integer?) (or/c 'incomplete 'complete (listof integer?))) + (-> (or/c false? integer?) (or/c 'incomplete 'complete 'error (listof integer?))) (enforce-subject 'agent) (define agent-id (node-id (current-from-node))) ;; TODO : maybe wait for an actual completion here? idk @@ -886,9 +887,11 @@ (lambda (out) (write (list agent-node (current-server-public-node)) out))) (log-server-info "created dev agent ~a" name) (exit)] - [(vector "dev-new-project") + [(vector "dev-new-project" tgz) (define id (make-task '((name "test project") (arch "any") - (resources "cpu") (pattern "meow?d?d")) #"no file contents lol")) + (command "./test.sh") + (resources "cpu") (pattern "meow?d?d")) + (file->bytes tgz))) (log-server-info "created project") (exit)] [(vector subcmd _ ...)