diff --git a/agent-deployment/Makefile b/agent-deployment/Makefile index 77289ee..d4f8f91 100644 --- a/agent-deployment/Makefile +++ b/agent-deployment/Makefile @@ -18,7 +18,7 @@ .PHONY: all clean APP_NAME=crossfire-agent -RKT_NAME=../crossfire/agent.rkt +RKT_NAME=$(APP_NAME).rkt MONOCYPHER_VERSION=3.1.1 diff --git a/agent-deployment/crossfire-agent.rkt b/agent-deployment/crossfire-agent.rkt new file mode 100644 index 0000000..fab1737 --- /dev/null +++ b/agent-deployment/crossfire-agent.rkt @@ -0,0 +1,22 @@ +#lang racket/base + +;; this is necessary because our C embedding runtime doesn't flush the plumber by default +;; idk how to do it in C, therefore we do it in racket, just before exiting +;; we need to flush otherwise logs might get dropped, particularly those relating to the error +(define old-esc-handler (error-escape-handler)) +(error-escape-handler (lambda () + (plumber-flush-all (current-plumber)) + (old-esc-handler))) +;; execute agent-main +; (require "../crossfire/agent.rkt") +; (agent-main) +(require "../crossfire/static-support.rkt") +(require ffi/unsafe) +(define _rktio (_cpointer 'rktio)) +(define rktio-inst (get-ffi-obj/static "scheme_rktio" _rktio)) +(define rktio-call (get-ffi-obj/static "rktio_processor_count" (_fun _rktio -> _int))) + +(printf "num cpus: ~a\n" (rktio-call rktio-inst)) + +;; and if no error, also flush +(plumber-flush-all (current-plumber)) diff --git a/agent-deployment/main_bc.c b/agent-deployment/main_bc.c index c4d6475..518487d 100644 --- a/agent-deployment/main_bc.c +++ b/agent-deployment/main_bc.c @@ -37,6 +37,10 @@ typedef struct { #define STR(x) #x #define FFI_ENT(name) {STR(name), (uintptr_t) name} +// runtime hacks +void rktio_init_cpu(void* rktio); +int rktio_processor_count(void* rktio); + static const ffi_ent ffi_table[] = { FFI_ENT(crypto_sign_public_key), FFI_ENT(crypto_sign), @@ -44,7 +48,13 @@ static const ffi_ent ffi_table[] = { FFI_ENT(crypto_key_exchange), FFI_ENT(crypto_lock), FFI_ENT(crypto_unlock), - FFI_ENT(crypto_wipe) + FFI_ENT(crypto_wipe), + FFI_ENT(crypto_blake2b_init), + FFI_ENT(crypto_blake2b_update), + FFI_ENT(crypto_blake2b_final), + + FFI_ENT(rktio_init_cpu), + FFI_ENT(rktio_processor_count) }; static const size_t ffi_table_size = sizeof(ffi_table)/sizeof(ffi_ent); diff --git a/crossfire/agent.rkt b/crossfire/agent.rkt index 935446b..bed5dcf 100644 --- a/crossfire/agent.rkt +++ b/crossfire/agent.rkt @@ -17,17 +17,21 @@ ;; along with this program. If not, see . (require file/untgz (only-in file/sha1 bytes->hex-string) racket/async-channel racket/bool - racket/contract racket/fasl racket/file racket/function racket/match racket/path - racket/port racket/string racket/unit srfi/19 + racket/cmdline racket/contract racket/fasl racket/file racket/function racket/match + racket/path racket/port racket/string racket/unit srfi/19 "comms.rkt" "info.rkt" "logging.rkt" "not-crypto.rkt" "manifest.rkt" "protocol.rkt" - "static-support.rkt" + "pattern.rkt" "static-support.rkt" ;; port-fsync (submod "static-support.rkt" misc-calls)) +(provide agent-main) + (define-logger agent #:parent global-logger) (define *max-cache-age* (* 3600 24 7)) (define *ping-secs* 30) +(define *subproc-kill-delay* 10) +(define *report-retry-secs* 30) ;; global variables, yeet @@ -131,34 +135,91 @@ (hash-set! downloads tid dl)])]) (when run-agent? (loop))) - + ;; TODO : report errors for all in-progress assignments or something (custodian-shutdown-all cust)) (define (execute-assignment assignment extract-dir) (define aid (assignment-id assignment)) (log-agent-info "starting execution of ~a" aid) + (define cust (make-custodian)) + (current-subprocess-custodian-mode 'kill) + (define (cleanup) (log-agent-info "execution of ~a complete" aid) + (custodian-shutdown-all cust) (agent-report-state aid 'complete) (async-channel-put (current-queue) (cons 'stop aid))) (define (report-error ex) (log-agent-info "execution of ~a ran into error" aid) + (custodian-shutdown-all cust) ((error-display-handler) (exn-message ex) ex) (agent-report-state aid 'error) (async-channel-put (current-queue) (cons 'stop aid))) (with-handlers ([exn:fail? report-error]) (define work-range (assignment-work-range assignment)) + (define manifest (assignment-manifest assignment)) (log-agent-info "the work for assignment ~a is ~a" aid work-range) - (sleep 10) - (void "TODO") - (cleanup))) + (define cmd (manifest-data-ref manifest 'command)) + (define num-cpus (count-cpus)) + ;; TODO : handle smp + + (for ([interval (in-list work-range)]) + (define pp-start (resolve-pattern-pos (pos->pattern-pos (car interval)))) + (define pp-end (resolve-pattern-pos (pos->pattern-pos (cdr interval)))) + (define args (for/fold ([args '()]) ([pps (in-vector pp-start)] [ppe (in-vector pp-end)]) + ;; TODO : this isn't very efficient... + (append args (list (number->string (car pps) 16) + (number->string (cdr pps) 16) + (number->string (car ppe) 16) + (number->string (cdr ppe) 16))))) + + (define-values [proc in out _] + (parameterize ([current-custodian cust]) + (apply subprocess #f #f (current-error-port) 'new (append cmd args)))) + + (with-handlers ([exn:break (lambda (_) (subprocess-kill proc #f) + (sync/timeout *subproc-kill-delay* proc) + ;; will handle killing for us :P + (custodian-shutdown-all cust) + ;; exit without reporting status + (kill-thread (current-thread)))]) + (define line-match (regexp-match-evt #px"^[^\n]*\n" out)) + (let loop () + (match (sync proc line-match) + [(== proc) (void "TODO")] + [(list line) + (define line-parts (map (lambda (x) (string->number x 16)) + (string-split line " "))) + ;; check format, if it looks correct-ish then report it + ;; otherwise warn + (if (and ((listof integer?) line-parts) + (= (length line-parts) (vector-length (manifest-pattern manifest)))) + (report-success/retry aid line-parts) + (log-agent-warning "assignment ~a input loop got unparseable line ~a" aid line)) + (loop)] + [x (log-agent-warning "assignment ~a input loop got unexpected value ~a" aid x) + (loop)]))) + + (void "TODO"))) + + (cleanup) + (void)) ;; utils +;; reports success, forever, until it works +(define (report-success/retry aid value) + (log-agent-info "assignment ~a reporting succeeding input ~a" aid value) + (with-handlers ([exn:fail? (lambda (ex) + (log-agent-warning "assignment ~a failed to report success" aid) + (sleep *report-retry-secs*) + (report-success/retry aid value))]) + (agent-report-state aid value))) + ;; updates the file cache by deleting expired stuff according to the given cache-info ;; the file cache uses utc time, not monotonic time ;; if that means stuff ends up in the future, delete those too @@ -244,11 +305,11 @@ (define server-wrapper@ (make-rpc-wrapper-unit server^)) (define-values/invoke-unit server-wrapper@ (import) (export server^)) -(module+ main - (require racket/cmdline) - +(define (agent-main) (install-logging!) (log-agent-info "starting crossfire-agent v~a" (#%info-lookup 'version)) + (log-agent-info "ffi mode: ~a" (if (static-ffi-available?) "static" "regular")) + (log-agent-info "~a cpus available" (count-cpus)) (current-queue (make-async-channel)) ;; XXX : platform-specific behavior @@ -308,4 +369,8 @@ (log-agent-info "connected! ready to do stuff") (with-handlers ([exn:break? (lambda (_) (log-agent-info "stopping agent"))]) - (agent-loop workdir cache-info))) + (agent-loop workdir cache-info)) + (void)) + +(module+ main + (agent-main)) diff --git a/crossfire/codegen.rktc b/crossfire/codegen.rktc index 4f66c2f..cfe35f5 100644 --- a/crossfire/codegen.rktc +++ b/crossfire/codegen.rktc @@ -136,7 +136,8 @@ int main(int argc, char* argv[]) { } @list{ void cf_report_success( @arg-vs ) { - // TODO + @;; currently, we output to stdout, similarly if we were running in a piped mode + printf( @(format "\"~a\\n\"" fmt) , @vs ); } }) diff --git a/crossfire/static-support.rkt b/crossfire/static-support.rkt index ce490c6..d32efb3 100644 --- a/crossfire/static-support.rkt +++ b/crossfire/static-support.rkt @@ -88,7 +88,7 @@ (module+ misc-calls (require ffi/unsafe/port racket/match) - (provide port-fsync current-seconds-monotonic) + (provide port-fsync count-cpus current-seconds-monotonic) ;; XXX : platform-specific behavior @@ -121,6 +121,32 @@ [x (error "don't know how to fsync on" x)])) + ;; this provides an actual count of the number of CPUs on the system, even without + ;; --enable-futures by hooking into the underlying rktio call that gets skipped when the current + ;; VM is configured without --enable-futures + ;; XXX : i'm not entirely sure what the actual processor-count call looks like on chez, and it + ;; probably doesn't suffer from the same issue. i'll get back to this when chez is the default vm + (define count-cpus/bc + (let ([num-cpus #f]) + (lambda () + (when (false? num-cpus) + (define self (if (static-ffi-available?) #f (ffi-lib #f))) + ;; this should be big enough... + (define rktio-fake (malloc 512 'atomic)) + (define rktio-init (get-ffi-obj/runtime "rktio_init_cpu" self + (_fun _pointer -> _void))) + (define rktio-call (get-ffi-obj/runtime "rktio_processor_count" self + (_fun _pointer -> _int))) + (rktio-init rktio-fake) + (set! num-cpus (rktio-call rktio-fake))) + num-cpus))) + + (define (count-cpus) + (match (system-type 'vm) + ['racket (count-cpus/bc)] + [x (error "don't know how to count-cpus on vm" x)])) + + ;; time helpers (because time is a bigge heck) ;; monotonic time can be different than wall clock time ;; for our purposes, tasks have two measures of time associated with them: