WIP implement agent basics

This commit is contained in:
xenia 2020-12-12 20:28:15 -05:00
parent 35122c660c
commit 93be02c535
6 changed files with 139 additions and 15 deletions

View File

@ -18,7 +18,7 @@
.PHONY: all clean .PHONY: all clean
APP_NAME=crossfire-agent APP_NAME=crossfire-agent
RKT_NAME=../crossfire/agent.rkt RKT_NAME=$(APP_NAME).rkt
MONOCYPHER_VERSION=3.1.1 MONOCYPHER_VERSION=3.1.1

View File

@ -0,0 +1,22 @@
#lang racket/base
;; this is necessary because our C embedding runtime doesn't flush the plumber by default
;; idk how to do it in C, therefore we do it in racket, just before exiting
;; we need to flush otherwise logs might get dropped, particularly those relating to the error
(define old-esc-handler (error-escape-handler))
(error-escape-handler (lambda ()
(plumber-flush-all (current-plumber))
(old-esc-handler)))
;; execute agent-main
; (require "../crossfire/agent.rkt")
; (agent-main)
(require "../crossfire/static-support.rkt")
(require ffi/unsafe)
(define _rktio (_cpointer 'rktio))
(define rktio-inst (get-ffi-obj/static "scheme_rktio" _rktio))
(define rktio-call (get-ffi-obj/static "rktio_processor_count" (_fun _rktio -> _int)))
(printf "num cpus: ~a\n" (rktio-call rktio-inst))
;; and if no error, also flush
(plumber-flush-all (current-plumber))

View File

@ -37,6 +37,10 @@ typedef struct {
#define STR(x) #x #define STR(x) #x
#define FFI_ENT(name) {STR(name), (uintptr_t) name} #define FFI_ENT(name) {STR(name), (uintptr_t) name}
// runtime hacks
void rktio_init_cpu(void* rktio);
int rktio_processor_count(void* rktio);
static const ffi_ent ffi_table[] = { static const ffi_ent ffi_table[] = {
FFI_ENT(crypto_sign_public_key), FFI_ENT(crypto_sign_public_key),
FFI_ENT(crypto_sign), FFI_ENT(crypto_sign),
@ -44,7 +48,13 @@ static const ffi_ent ffi_table[] = {
FFI_ENT(crypto_key_exchange), FFI_ENT(crypto_key_exchange),
FFI_ENT(crypto_lock), FFI_ENT(crypto_lock),
FFI_ENT(crypto_unlock), FFI_ENT(crypto_unlock),
FFI_ENT(crypto_wipe) FFI_ENT(crypto_wipe),
FFI_ENT(crypto_blake2b_init),
FFI_ENT(crypto_blake2b_update),
FFI_ENT(crypto_blake2b_final),
FFI_ENT(rktio_init_cpu),
FFI_ENT(rktio_processor_count)
}; };
static const size_t ffi_table_size = sizeof(ffi_table)/sizeof(ffi_ent); static const size_t ffi_table_size = sizeof(ffi_table)/sizeof(ffi_ent);

View File

@ -17,17 +17,21 @@
;; along with this program. If not, see <https://www.gnu.org/licenses/>. ;; along with this program. If not, see <https://www.gnu.org/licenses/>.
(require file/untgz (only-in file/sha1 bytes->hex-string) racket/async-channel racket/bool (require file/untgz (only-in file/sha1 bytes->hex-string) racket/async-channel racket/bool
racket/contract racket/fasl racket/file racket/function racket/match racket/path racket/cmdline racket/contract racket/fasl racket/file racket/function racket/match
racket/port racket/string racket/unit srfi/19 racket/path racket/port racket/string racket/unit srfi/19
"comms.rkt" "info.rkt" "logging.rkt" "not-crypto.rkt" "manifest.rkt" "protocol.rkt" "comms.rkt" "info.rkt" "logging.rkt" "not-crypto.rkt" "manifest.rkt" "protocol.rkt"
"static-support.rkt" "pattern.rkt" "static-support.rkt"
;; port-fsync ;; port-fsync
(submod "static-support.rkt" misc-calls)) (submod "static-support.rkt" misc-calls))
(provide agent-main)
(define-logger agent #:parent global-logger) (define-logger agent #:parent global-logger)
(define *max-cache-age* (* 3600 24 7)) (define *max-cache-age* (* 3600 24 7))
(define *ping-secs* 30) (define *ping-secs* 30)
(define *subproc-kill-delay* 10)
(define *report-retry-secs* 30)
;; global variables, yeet ;; global variables, yeet
@ -131,34 +135,91 @@
(hash-set! downloads tid dl)])]) (hash-set! downloads tid dl)])])
(when run-agent? (loop))) (when run-agent? (loop)))
;; TODO : report errors for all in-progress assignments or something
(custodian-shutdown-all cust)) (custodian-shutdown-all cust))
(define (execute-assignment assignment extract-dir) (define (execute-assignment assignment extract-dir)
(define aid (assignment-id assignment)) (define aid (assignment-id assignment))
(log-agent-info "starting execution of ~a" aid) (log-agent-info "starting execution of ~a" aid)
(define cust (make-custodian))
(current-subprocess-custodian-mode 'kill)
(define (cleanup) (define (cleanup)
(log-agent-info "execution of ~a complete" aid) (log-agent-info "execution of ~a complete" aid)
(custodian-shutdown-all cust)
(agent-report-state aid 'complete) (agent-report-state aid 'complete)
(async-channel-put (current-queue) (cons 'stop aid))) (async-channel-put (current-queue) (cons 'stop aid)))
(define (report-error ex) (define (report-error ex)
(log-agent-info "execution of ~a ran into error" aid) (log-agent-info "execution of ~a ran into error" aid)
(custodian-shutdown-all cust)
((error-display-handler) (exn-message ex) ex) ((error-display-handler) (exn-message ex) ex)
(agent-report-state aid 'error) (agent-report-state aid 'error)
(async-channel-put (current-queue) (cons 'stop aid))) (async-channel-put (current-queue) (cons 'stop aid)))
(with-handlers ([exn:fail? report-error]) (with-handlers ([exn:fail? report-error])
(define work-range (assignment-work-range assignment)) (define work-range (assignment-work-range assignment))
(define manifest (assignment-manifest assignment))
(log-agent-info "the work for assignment ~a is ~a" aid work-range) (log-agent-info "the work for assignment ~a is ~a" aid work-range)
(sleep 10)
(void "TODO")
(cleanup))) (define cmd (manifest-data-ref manifest 'command))
(define num-cpus (count-cpus))
;; TODO : handle smp
(for ([interval (in-list work-range)])
(define pp-start (resolve-pattern-pos (pos->pattern-pos (car interval))))
(define pp-end (resolve-pattern-pos (pos->pattern-pos (cdr interval))))
(define args (for/fold ([args '()]) ([pps (in-vector pp-start)] [ppe (in-vector pp-end)])
;; TODO : this isn't very efficient...
(append args (list (number->string (car pps) 16)
(number->string (cdr pps) 16)
(number->string (car ppe) 16)
(number->string (cdr ppe) 16)))))
(define-values [proc in out _]
(parameterize ([current-custodian cust])
(apply subprocess #f #f (current-error-port) 'new (append cmd args))))
(with-handlers ([exn:break (lambda (_) (subprocess-kill proc #f)
(sync/timeout *subproc-kill-delay* proc)
;; will handle killing for us :P
(custodian-shutdown-all cust)
;; exit without reporting status
(kill-thread (current-thread)))])
(define line-match (regexp-match-evt #px"^[^\n]*\n" out))
(let loop ()
(match (sync proc line-match)
[(== proc) (void "TODO")]
[(list line)
(define line-parts (map (lambda (x) (string->number x 16))
(string-split line " ")))
;; check format, if it looks correct-ish then report it
;; otherwise warn
(if (and ((listof integer?) line-parts)
(= (length line-parts) (vector-length (manifest-pattern manifest))))
(report-success/retry aid line-parts)
(log-agent-warning "assignment ~a input loop got unparseable line ~a" aid line))
(loop)]
[x (log-agent-warning "assignment ~a input loop got unexpected value ~a" aid x)
(loop)])))
(void "TODO")))
(cleanup)
(void))
;; utils ;; utils
;; reports success, forever, until it works
(define (report-success/retry aid value)
(log-agent-info "assignment ~a reporting succeeding input ~a" aid value)
(with-handlers ([exn:fail? (lambda (ex)
(log-agent-warning "assignment ~a failed to report success" aid)
(sleep *report-retry-secs*)
(report-success/retry aid value))])
(agent-report-state aid value)))
;; updates the file cache by deleting expired stuff according to the given cache-info ;; updates the file cache by deleting expired stuff according to the given cache-info
;; the file cache uses utc time, not monotonic time ;; the file cache uses utc time, not monotonic time
;; if that means stuff ends up in the future, delete those too ;; if that means stuff ends up in the future, delete those too
@ -244,11 +305,11 @@
(define server-wrapper@ (make-rpc-wrapper-unit server^)) (define server-wrapper@ (make-rpc-wrapper-unit server^))
(define-values/invoke-unit server-wrapper@ (import) (export server^)) (define-values/invoke-unit server-wrapper@ (import) (export server^))
(module+ main (define (agent-main)
(require racket/cmdline)
(install-logging!) (install-logging!)
(log-agent-info "starting crossfire-agent v~a" (#%info-lookup 'version)) (log-agent-info "starting crossfire-agent v~a" (#%info-lookup 'version))
(log-agent-info "ffi mode: ~a" (if (static-ffi-available?) "static" "regular"))
(log-agent-info "~a cpus available" (count-cpus))
(current-queue (make-async-channel)) (current-queue (make-async-channel))
;; XXX : platform-specific behavior ;; XXX : platform-specific behavior
@ -308,4 +369,8 @@
(log-agent-info "connected! ready to do stuff") (log-agent-info "connected! ready to do stuff")
(with-handlers ([exn:break? (lambda (_) (log-agent-info "stopping agent"))]) (with-handlers ([exn:break? (lambda (_) (log-agent-info "stopping agent"))])
(agent-loop workdir cache-info))) (agent-loop workdir cache-info))
(void))
(module+ main
(agent-main))

View File

@ -136,7 +136,8 @@ int main(int argc, char* argv[]) {
} }
@list{ @list{
void cf_report_success( @arg-vs ) { void cf_report_success( @arg-vs ) {
// TODO @;; currently, we output to stdout, similarly if we were running in a piped mode
printf( @(format "\"~a\\n\"" fmt) , @vs );
} }
}) })

View File

@ -88,7 +88,7 @@
(module+ misc-calls (module+ misc-calls
(require ffi/unsafe/port racket/match) (require ffi/unsafe/port racket/match)
(provide port-fsync current-seconds-monotonic) (provide port-fsync count-cpus current-seconds-monotonic)
;; XXX : platform-specific behavior ;; XXX : platform-specific behavior
@ -121,6 +121,32 @@
[x (error "don't know how to fsync on" x)])) [x (error "don't know how to fsync on" x)]))
;; this provides an actual count of the number of CPUs on the system, even without
;; --enable-futures by hooking into the underlying rktio call that gets skipped when the current
;; VM is configured without --enable-futures
;; XXX : i'm not entirely sure what the actual processor-count call looks like on chez, and it
;; probably doesn't suffer from the same issue. i'll get back to this when chez is the default vm
(define count-cpus/bc
(let ([num-cpus #f])
(lambda ()
(when (false? num-cpus)
(define self (if (static-ffi-available?) #f (ffi-lib #f)))
;; this should be big enough...
(define rktio-fake (malloc 512 'atomic))
(define rktio-init (get-ffi-obj/runtime "rktio_init_cpu" self
(_fun _pointer -> _void)))
(define rktio-call (get-ffi-obj/runtime "rktio_processor_count" self
(_fun _pointer -> _int)))
(rktio-init rktio-fake)
(set! num-cpus (rktio-call rktio-fake)))
num-cpus)))
(define (count-cpus)
(match (system-type 'vm)
['racket (count-cpus/bc)]
[x (error "don't know how to count-cpus on vm" x)]))
;; time helpers (because time is a bigge heck) ;; time helpers (because time is a bigge heck)
;; monotonic time can be different than wall clock time ;; monotonic time can be different than wall clock time
;; for our purposes, tasks have two measures of time associated with them: ;; for our purposes, tasks have two measures of time associated with them: