crossfire/crossfire/agent.rkt

399 lines
17 KiB
Racket

#lang racket/base
;; crossfire: distributed brute force infrastructure
;;
;; Copyright (C) 2020 haskal
;;
;; This program is free software: you can redistribute it and/or modify
;; it under the terms of the GNU Affero General Public License as published by
;; the Free Software Foundation, either version 3 of the License, or
;; (at your option) any later version.
;;
;; This program is distributed in the hope that it will be useful,
;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
;; GNU Affero General Public License for more details.
;;
;; You should have received a copy of the GNU Affero General Public License
;; along with this program. If not, see <https://www.gnu.org/licenses/>.
(require file/untgz (only-in file/sha1 bytes->hex-string) racket/async-channel racket/bool
racket/cmdline racket/contract racket/fasl racket/file racket/function racket/match
racket/path racket/port racket/string racket/unit
"comms.rkt" "info.rkt" "logging.rkt" "not-crypto.rkt" "manifest.rkt" "protocol.rkt"
"pattern.rkt" "static-support.rkt"
(submod "static-support.rkt" misc-calls))
;; the agent entry point is the only export of this module
(provide agent-main)
;; defines log-agent-info, log-agent-warning, ... as a child of global-logger
(define-logger agent #:parent global-logger)
;; cache entries last used more than this many seconds ago (7 days) are pruned; it is also the
;; interval at which the main loop prunes the cache
(define *max-cache-age* (* 3600 24 7))
;; seconds between keepalive reports sent by the main loop's background thread
(define *ping-secs* 30)
;; seconds to wait for an interrupted subprocess to exit before its custodian is shut down
(define *subproc-kill-delay* 10)
;; seconds to sleep before retrying a failed success report
(define *report-retry-secs* 30)
;; global variables, yeet
;; one unit of work pushed by the server: assignment id, task id, parsed manifest, file hash
;; (bytes) of the task archive, and the work range (a list of pairs, one per interval)
(struct assignment [id task-id manifest file-hash work-range] #:transparent)
;; parameter holding the main loop's command queue; agent-main sets it to an async channel
(define current-queue (make-parameter #f))
;; main loop
;; Main agent event loop. Waits on the command queue (current-queue), on every in-flight
;; download thread, and on a timeout after which the on-disk cache is pruned.
;;
;; workdir    : directory holding <hash>.tgz archives and their extracted <hash> directories
;; cache-info : mutable hash of hex file hash -> last-used time (current-seconds)
;;
;; Queue messages handled: 'cancel-all, (cons 'stop aid), (cons 'new assignment).
;;
;; All threads started here (keepalive, downloads, assignments) live under one custodian,
;; which is shut down whenever the loop is left -- including when it is left by a break or
;; any other exception.
(define (agent-loop workdir cache-info)
  (define cust (make-custodian))
  (parameterize ([current-custodian cust])
    (thread
     (lambda ()
       ;; kinda pointless, other than helping keep the connection alive
       (let loop ()
         ;; a single failed report must not end the keepalive thread for good
         (with-handlers ([exn:fail? (lambda (ex)
                                      (log-agent-warning "keepalive report failed: ~a"
                                                         (exn-message ex)))])
           (agent-report-state #f #f))
         (sleep *ping-secs*)
         (loop)))))

  (define last-cache-update (current-seconds-monotonic))
  (define run-agent? #t)
  ;; assignment id -> thread executing it
  (define assignments (make-hash))
  (struct download [thd file-hash/hex extract-dir [waiters #:mutable]] #:transparent)
  ;; task id -> download record; waiters are the assignments to start once it finishes
  (define downloads (make-hash))

  ;; fetches the task archive into tgz-file and unpacks it into extract-dir, replacing
  ;; whatever was there before
  (define (download/extract tid tgz-file extract-dir)
    (with-handlers ([exn:fail:filesystem? void]) (delete-directory/files tgz-file))
    (with-handlers ([exn:fail:filesystem? void]) (delete-directory/files extract-dir))
    (log-agent-info "downloading task data for ~a" tid)
    ;; TODO this should be updated with the streaming interface
    (call-with-output-file tgz-file (lambda (out) (write-bytes (get-project-file tid) out)))
    (log-agent-info "extracting task data for ~a" tid)
    (untgz tgz-file #:dest extract-dir))

  ;; starts a worker thread for the assignment under the loop custodian and records it
  (define (start-assignment! as extract-dir)
    (parameterize ([current-custodian cust])
      (hash-set! assignments (assignment-id as)
                 (thread (lambda () (execute-assignment as extract-dir))))))

  (dynamic-wind
   void
   (lambda ()
     (let loop ()
       (define cache-update-delta (max 0 (- (+ last-cache-update *max-cache-age*)
                                            (current-seconds-monotonic))))
       (define download-thds (map download-thd (hash-values downloads)))
       (match (apply sync/timeout cache-update-delta (current-queue) download-thds)
         ;; timeout -- prune cache
         [#f
          (update-workdir-cache! workdir cache-info)
          (set! last-cache-update (current-seconds-monotonic))]
         ;; sent queue value
         ['cancel-all
          (for ([thd (in-hash-values assignments)])
            (when thd
              (break-thread thd)))
          (hash-clear! assignments)
          (for ([dl (in-hash-values downloads)])
            (set-download-waiters! dl '()))]
         ;; cancel (if incomplete) or deregister (if complete)
         [(cons 'stop aid)
          (define thd (hash-ref assignments aid #f))
          (when thd
            (break-thread thd))
          (hash-remove! assignments aid)
          (for ([dl (in-hash-values downloads)])
            (set-download-waiters! dl (filter (lambda (as) (not (= aid (assignment-id as))))
                                              (download-waiters dl))))]
         ;; download completed
         [(? thread? dl-thd)
          ;; find the finished download first and remove it afterwards, so the table is
          ;; never mutated while it is being iterated
          (define done-tid
            (for/first ([(tid dl) (in-hash downloads)]
                        #:when (eq? dl-thd (download-thd dl)))
              tid))
          (match-define (download _ file-hash/hex extract-dir waiters)
            (hash-ref downloads done-tid))
          (hash-remove! downloads done-tid)
          (log-agent-info "completed download for ~a" done-tid)
          ;; NOTE(review): the thread is also "done" when download/extract raised; the
          ;; waiters are started regardless -- confirm that is the intended handling
          (hash-set! cache-info file-hash/hex (current-seconds))
          (update-workdir-cache! workdir cache-info)
          (set! last-cache-update (current-seconds-monotonic))
          ;; start delayed assignments
          (for ([as (in-list waiters)])
            (start-assignment! as extract-dir))]
         [(cons 'new new-as)
          (define aid (assignment-id new-as))
          ;; cancel old assignment with the same id, if exists
          ;; however, ideally don't send duplicate assignment IDs because of the potential
          ;; confusion/desynchronization
          (when (hash-has-key? assignments aid)
            (break-thread (hash-ref assignments aid)))
          (define tid (assignment-task-id new-as))
          (define file-hash/hex (substring (bytes->hex-string (assignment-file-hash new-as))
                                           0 32))
          (define tgz-file (build-path workdir (format "~a.tgz" file-hash/hex)))
          (define extract-dir (build-path workdir file-hash/hex))
          ;; check if we need to start a download, otherwise immediately start the assignment
          (cond
            [(and (hash-has-key? cache-info file-hash/hex)
                  (file-exists? tgz-file) (directory-exists? extract-dir))
             (hash-set! cache-info file-hash/hex (current-seconds))
             (start-assignment! new-as extract-dir)]
            [(hash-has-key? downloads tid)
             (define dl (hash-ref downloads tid))
             (set-download-waiters! dl (cons new-as (download-waiters dl)))]
            [else
             (log-agent-info "starting download for ~a" tid)
             ;; downloads run under the loop custodian, like the assignment threads
             (define dl-thd
               (parameterize ([current-custodian cust])
                 (thread (thunk (download/extract tid tgz-file extract-dir)))))
             (hash-set! downloads tid
                        (download dl-thd file-hash/hex extract-dir (list new-as)))])])
       (when run-agent? (loop))))
   (lambda ()
     ;; TODO : report errors for all in-progress assignments or something
     (custodian-shutdown-all cust))))
;; Runs one assignment to completion in the calling thread.
;;
;; For every interval of the assignment's work range the manifest command is started as a
;; subprocess in extract-dir, with the resolved start/end pattern positions appended as
;; hexadecimal arguments. Each line the subprocess prints is parsed as space-separated
;; hexadecimal integers and, if it has one value per pattern element, reported as a success.
;;
;; Outcome reporting:
;;  - all subprocesses exit with code 0 -> state 'complete is reported
;;  - any exn:fail (including a nonzero exit code) -> state 'error is reported, and
;;    'complete is NOT reported afterwards
;;  - a break while a subprocess is running -> the subprocess is interrupted and the thread
;;    kills itself without reporting anything
;; In the first two cases (cons 'stop aid) is put on the queue so the main loop deregisters
;; the assignment.
(define (execute-assignment assignment extract-dir)
  (define aid (assignment-id assignment))
  (log-agent-info "starting execution of ~a in ~a" aid extract-dir)
  ;; owns the subprocesses and their pipes
  (define cust (make-custodian))
  ;; subprocesses are killed when their custodian is shut down
  (current-subprocess-custodian-mode 'kill)

  ;; success path
  (define (cleanup)
    (log-agent-info "execution of ~a complete" aid)
    (custodian-shutdown-all cust)
    (agent-report-state aid 'complete)
    (async-channel-put (current-queue) (cons 'stop aid)))

  ;; failure path
  (define (report-error ex)
    (log-agent-info "execution of ~a ran into error" aid)
    (custodian-shutdown-all cust)
    ((error-display-handler) (exn-message ex) ex)
    (agent-report-state aid 'error)
    (async-channel-put (current-queue) (cons 'stop aid)))

  ;; #t when every interval ran cleanly, #f when report-error already handled a failure
  (define succeeded?
    (with-handlers ([exn:fail? (lambda (ex) (report-error ex) #f)])
      (define work-range (assignment-work-range assignment))
      (define manifest (assignment-manifest assignment))
      (log-agent-info "the work for assignment ~a is ~a" aid work-range)
      (define cmd (manifest-data-ref manifest 'command))
      ;; TODO : handle smp
      (define pattern (manifest-pattern manifest))
      (for ([interval (in-list work-range)])
        (define pp-start (resolve-pattern-pos pattern (pos->pattern-pos pattern (car interval))))
        (define pp-end (resolve-pattern-pos pattern (pos->pattern-pos pattern (cdr interval))))
        ;; four hex strings per pattern element: start car/cdr, then end car/cdr
        (define args
          (apply append
                 (for/list ([pps (in-vector pp-start)] [ppe (in-vector pp-end)])
                   (list (number->string (car pps) 16)
                         (number->string (cdr pps) 16)
                         (number->string (car ppe) 16)
                         (number->string (cdr ppe) 16)))))
        ;; TODO : handle stdio mode lol
        (define-values [proc out in _]
          (parameterize ([current-custodian cust] [current-directory extract-dir])
            (apply subprocess #f #f (current-error-port) 'new (append cmd args))))
        (with-handlers ([exn:break? (lambda (_)
                                      (subprocess-kill proc #f)
                                      (sync/timeout *subproc-kill-delay* proc)
                                      ;; will handle killing for us :P
                                      (custodian-shutdown-all cust)
                                      ;; exit without reporting status
                                      (kill-thread (current-thread)))])
          (define line-match (regexp-match-evt #px"^[^\n]*\n" out))
          (define eof-e (eof-evt out))
          ;; the loop ends once both the process has exited and its output hit eof
          (let loop ([reached-eof #f] [proc-done #f])
            ;; an event that already fired stays ready forever; replacing it with never-evt
            ;; lets sync block on the remaining ones instead of spinning
            (match (sync (if proc-done never-evt proc)
                         (if reached-eof never-evt line-match)
                         (if reached-eof never-evt eof-e))
              [(== proc)
               (unless reached-eof
                 (loop reached-eof #t))]
              [(? eof-object?)
               (unless proc-done
                 (loop #t proc-done))]
              [(list line)
               ;; bytes that are not valid utf-8 decode to #\?, which is not a hex digit, so
               ;; such a line fails the format check below
               (define line-str (bytes->string/utf-8 line #\?))
               (define line-parts
                 (map (lambda (x) (string->number x 16))
                      (string-split (string-trim line-str) " ")))
               ;; check format, if it looks correct-ish then report it
               ;; otherwise warn
               (if (and ((listof integer?) line-parts)
                        (= (length line-parts) (vector-length pattern)))
                   (report-success/retry aid line-parts)
                   (log-agent-warning "assignment ~a input loop got unparseable line ~s" aid line))
               (loop reached-eof proc-done)]
              [x
               (log-agent-warning "assignment ~a input loop got unexpected value ~a" aid x)
               (loop reached-eof proc-done)])))
        (define errcode (subprocess-status proc))
        (log-agent-info "assignment ~a process exited with code ~a" aid errcode)
        ;; this interval's pipes are finished; release them now rather than at the very end
        (close-input-port out)
        (close-output-port in)
        ;; report error if it's a nonzero exit code
        (unless (zero? errcode)
          (error "process exited with nonzero code" errcode)))
      #t))
  (when succeeded?
    (cleanup))
  (void))
;; utils
;; reports success, forever, until it works
;; Reports `value` as a succeeding input for assignment `aid`, retrying every
;; *report-retry-secs* seconds for as long as the report raises exn:fail.
;; Returns whatever the successful agent-report-state call returns.
;;
;; The wait and the retry happen outside the exception handler, so retries do not nest
;; handler contexts and the sleep runs in the caller's normal context rather than the
;; handler's.
(define (report-success/retry aid value)
  (let retry ()
    (log-agent-info "assignment ~a reporting succeeding input ~a" aid value)
    (define-values (reported? result)
      (with-handlers ([exn:fail? (lambda (ex) (values #f ex))])
        (values #t (agent-report-state aid value))))
    (cond
      [reported? result]
      [else
       (log-agent-warning "assignment ~a failed to report success" aid)
       (sleep *report-retry-secs*)
       (retry)])))
;; updates the file cache by deleting expired stuff according to the given cache-info
;; the file cache uses utc time, not monotonic time
;; if that means stuff ends up in the future, delete those too
;; Synchronizes the work directory with the cache index and returns the (mutable) cache-info.
;;
;; workdir    : directory containing <hash>.tgz archives, extracted <hash> directories and
;;              the index file index.rktd
;; cache-info : mutable hash of hex hash string -> timestamp (current-seconds), or #f to load
;;              it from index.rktd (an unreadable or malformed index yields an empty cache)
;;
;; Steps: delete stray files, drop entries that are expired or too far in the future,
;; rewrite the index, then delete every archive/directory whose hash is no longer indexed.
(define (update-workdir-cache! workdir [cache-info #f])
  (define index-name "index.rktd")
  (define index-file (build-path workdir index-name))
  ;; mapping of the blake2b hash to a list of files corresponding to it (the .tgz with that hash
  ;; and the extracted directory)
  (define work-files (make-hash))
  (for ([file (in-list (directory-list workdir #:build? #t))])
    (define basename (path->string (file-name-from-path file)))
    (match basename
      ;; keep the index: it has to survive until it is loaded below
      [(== index-name) (void)]
      ;; find all <hash>.tgz and <hash>/ (the whole name must match)
      [(pregexp #px"^([a-fA-F0-9]+)(\\.tgz|)$" (list _ base _))
       (hash-update! work-files base (lambda (el) (cons file el)) '())]
      ;; delete everything else
      [_ (delete-directory/files file)]))
  ;; load the cache-info from file if possible
  ;; conservatively throw away everything if it seems corrupted
  (when (false? cache-info)
    (define index-cache-info
      ;; exn:fail? covers a missing/unreadable file as well as a syntactically broken one
      (with-handlers ([exn:fail? (lambda (ex) (make-hash))])
        (call-with-input-file index-file read)))
    (set! cache-info
          (if (and (hash? index-cache-info)
                   ((listof string?) (hash-keys index-cache-info))
                   ((listof integer?) (hash-values index-cache-info)))
              ;; read produces an immutable hash; the cache is mutated later
              (hash-copy index-cache-info)
              (make-hash))))
  ;; prune cache
  (define now (current-seconds))
  (for ([file (in-list (hash-keys cache-info))])
    (define stamp (hash-ref cache-info file))
    (when (or (> now (+ stamp *max-cache-age*))   ;; cache expired
              (> stamp (+ now *max-cache-age*)))  ;; it's too far in the future
      (hash-remove! cache-info file)))
  (call-with-output-file index-file (lambda (out) (write cache-info out))
    #:mode 'binary #:exists 'truncate)
  ;; delete everything that is not present in the cache-info now
  (for ([(base files) (in-hash work-files)])
    (unless (hash-has-key? cache-info base)
      (log-agent-info "deleting expired files for hash ~a" base)
      (for-each delete-directory/files files)))
  cache-info)
;; rpc impl
;; Raises an "unauthorized" error unless the node the current request came from
;; (current-from-node) has the node type `type`.
(define (enforce-subject type)
  (define sender-type (node-type (current-from-node)))
  (when (not (symbol=? type sender-type))
    (error "unauthorized")))
;; rpc: the server hands this agent a new assignment.
;; Parses the raw manifest, wraps everything in an `assignment` and queues it for the
;; main loop as (cons 'new assignment). Only nodes of type 'server may call this.
(define/contract (push-assignment aid tid mf-raw file-hash assign-data)
  (-> integer? integer? list? bytes? (listof pair?) void?)
  (enforce-subject 'server)
  (log-agent-info "got push-assignment ~a ~a ~s ~a" aid mf-raw file-hash assign-data)
  (let* ([manifest (parse-manifest mf-raw)]
         [pushed (assignment aid tid manifest file-hash assign-data)])
    (async-channel-put (current-queue) (cons 'new pushed)))
  (void))
;; rpc: the server cancels one assignment.
;; Queues (cons 'stop aid) for the main loop. Only nodes of type 'server may call this.
(define/contract (cancel-assignment aid)
  (-> integer? void?)
  (enforce-subject 'server)
  (log-agent-info "got cancel-assignment ~a" aid)
  (let ([stop-msg (cons 'stop aid)])
    (async-channel-put (current-queue) stop-msg))
  (void))
;; rpc: the server cancels every assignment on this agent.
;; Queues 'cancel-all for the main loop. Only nodes of type 'server may call this.
(define/contract (cancel-all-assignments)
  (-> void?)
  (enforce-subject 'server)
  (log-agent-info "got cancel-all-assignments")
  (let ([queue (current-queue)])
    (async-channel-put queue 'cancel-all))
  (void))
;; agent impl unit
;; builds a unit that implements the agent^ signature from the bindings of this module;
;; agent^ is declared elsewhere -- presumably it names the rpc procedures defined above
;; (TODO confirm). agent-main registers this unit with rpc-register-all.
(define-unit-from-context agent-impl@ agent^)
;; server wrapper unit
;; NOTE(review): make-rpc-wrapper-unit is defined elsewhere; it presumably generates stubs
;; that forward each server^ procedure to the server -- confirm
(define server-wrapper@ (make-rpc-wrapper-unit server^))
;; invoke the wrapper so that the names exported by server^ become definitions in this module
(define-values/invoke-unit server-wrapper@ (import) (export server^))
;; Entry point of the agent.
;;
;; 1. installs logging and logs some host information
;; 2. loads the (agent-node server-node) configuration: from the tail of the running
;;    executable when static ffi is available, otherwise from the file given with
;;    -c/--config on the command line
;; 3. sets up comms, the transaction manager and the rpc registration
;; 4. prepares the work directory and its cache
;; 5. connects to the server, retrying with exponential backoff (capped at 120 seconds)
;; 6. runs agent-loop until a break arrives
;;
;; Raises a user error when running in development mode without -c/--config.
(define (agent-main)
  (install-logging!)
  (log-agent-info "starting crossfire-agent v~a" (#%info-lookup 'version))
  (log-agent-info "ffi mode: ~a" (if (static-ffi-available?) "static" "regular"))
  (log-agent-info "hostname ~a" (get-hostname))
  (log-agent-info "arch ~a" (static-ffi-arch))
  (log-agent-info "~a cpus available" (count-cpus))
  (current-queue (make-async-channel))

  ;; XXX : platform-specific behavior
  ;; the last 4 bytes of the executable are an unsigned big-endian integer: the distance
  ;; from the end of the file to the start of the fasl-encoded config
  (define (get-config.linux-gnu)
    (call-with-input-file "/proc/self/exe"
      (lambda (in)
        (file-position in eof)
        (define len (file-position in))
        (file-position in (- len 4))
        (define offset (integer-bytes->integer (port->bytes in) #f #t))
        (file-position in (- len offset))
        (fasl->s-exp in))))

  (match-define (list agent-node server-node)
    (if (static-ffi-available?)
        (match (string-split (static-ffi-arch) "-")
          [(list _ ... "linux" "gnu") (get-config.linux-gnu)]
          [arch (error "XXX: don't know how to get config on arch" arch)])
        (let ([config-location (make-parameter #f)])
          (log-agent-info "loading development config using command line")
          (command-line
           #:once-each
           [("-c" "--config") param "development plain config file" (config-location param)]
           #:args ()
           ;; without this check a missing flag surfaces as a contract violation from
           ;; call-with-input-file being handed #f
           (unless (config-location)
             (raise-user-error 'crossfire-agent
                               "no config file given; pass -c/--config <file>"))
           (call-with-input-file (config-location) read)))))

  (current-to-node server-node)
  (current-comms (make-comms agent-node))
  (current-tm (make-transaction-manager agent-node (current-comms)))
  (rpc-register-all agent^ agent-impl@)
  (comms-set-node-info (current-comms) server-node)

  ;; <temp-dir>/crossfire-agent/<first 32 hex digits of the agent public key>
  (define workdir
    (let ([tmpdir (find-system-path 'temp-dir)])
      (build-path tmpdir "crossfire-agent" (substring (bytes->hex-string (node-pubkey agent-node))
                                                      0 32))))
  (make-directory* workdir)
  (log-agent-info "using workdir ~a" workdir)
  (define cache-info (update-workdir-cache! workdir))

  (log-agent-info "connecting to server...")
  (with-handlers ([exn:break? (lambda (_)
                                (log-agent-info "connection cancelled")
                                (exit))])
    ;; TODO : use net/dns dns lookup if it's a hostname because glibc crashes lol
    (let loop ([sleep-time 1])
      ;; #f on success, the exception on failure
      (define maybe-exn
        (with-handlers ([exn:fail? identity])
          (comms-connect (current-comms) (node-id server-node))
          (agent-report-state #f #f)
          #f))
      (when maybe-exn
        (log-agent-error "error connecting to server")
        ((error-display-handler) (exn-message maybe-exn) maybe-exn)
        (sleep sleep-time)
        (loop (min 120 (* sleep-time 2))))))
  (log-agent-info "connected! ready to do stuff")

  (with-handlers ([exn:break? (lambda (_) (log-agent-info "stopping agent"))])
    (agent-loop workdir cache-info))
  (void))
;; start the agent when this module is run as the main program; requiring the module from
;; elsewhere does not start it
(module+ main
(agent-main))