implement almost the rest of agent handling

This commit is contained in:
xenia 2020-11-23 01:51:26 -05:00
parent 71a3a6d2ef
commit 88f8ba6749
9 changed files with 331 additions and 58 deletions

View File

@ -47,6 +47,9 @@ takes the difficulty out of creating custom brute force jobs
- ✅ base definitions of input classes and how to divide them - ✅ base definitions of input classes and how to divide them
- dynamic slicing and scheduling based on agents' reported work rate - dynamic slicing and scheduling based on agents' reported work rate
- low priority: randomized input space distribution - low priority: randomized input space distribution
- low priority: store common configuration templates for clients
- low priority: track upload/download progress
- streaming interface for file transfers
- accept submitted projects (with client-compiled input generator) and distribute to agents - accept submitted projects (with client-compiled input generator) and distribute to agents
- ✅ low priority: support for multiple architectures - ✅ low priority: support for multiple architectures
- ✅ agent authentication - ✅ agent authentication
@ -59,6 +62,7 @@ takes the difficulty out of creating custom brute force jobs
- report work rate - report work rate
- report successes - report successes
- low priority: defer to external brute force program (eg, hashcat on GPU) - low priority: defer to external brute force program (eg, hashcat on GPU)
- low priority: support finding _all_ matching inputs for a project, rather than just the first one
# client: submit jobs and view progress # client: submit jobs and view progress
- ✅securely connect to server - ✅securely connect to server

View File

@ -434,6 +434,8 @@
(define arg-data (trans-data-deserialize rpc-data)) (define arg-data (trans-data-deserialize rpc-data))
(define result (define result
(parameterize ([current-from-node (comms-get-node-info comms from-id)]) (parameterize ([current-from-node (comms-get-node-info comms from-id)])
;; TODO : apply timeout on the handler function?
;; we don't want this thread to potentially hang forever if there's some sort of deadlock
(apply func arg-data))) (apply func arg-data)))
(displayln (list "result" result "sending back...")) (displayln (list "result" result "sending back..."))
(respond result))) (respond result)))
@ -504,6 +506,7 @@
;; utility functions and macros for defining rpcs ;; utility functions and macros for defining rpcs
;; parameters for comms, tm, and targeted node ;; parameters for comms, tm, and targeted node
;; TODO : make these default parameters for all the wrapper functions?
(define current-comms (make-parameter #f)) (define current-comms (make-parameter #f))
(define current-tm (make-parameter #f)) (define current-tm (make-parameter #f))
(define current-to-node (make-parameter #f)) (define current-to-node (make-parameter #f))

View File

@ -21,7 +21,7 @@
(define pkg-authors '(haskal)) (define pkg-authors '(haskal))
(define collection "crossfire") (define collection "crossfire")
(define deps '("base" "db-lib" "scribble-text-lib" (define deps '("base" "db-lib" "scribble-text-lib" "srfi-lite-lib"
"north")) "north"))
(define build-deps '("scribble-lib" "racket-doc" "rackunit-lib")) (define build-deps '("scribble-lib" "racket-doc" "rackunit-lib"))
(define scribblings '(("scribblings/crossfire.scrbl" ()))) (define scribblings '(("scribblings/crossfire.scrbl" ())))

View File

@ -116,7 +116,7 @@
(manifest manifest-def pattern psize)) (manifest manifest-def pattern psize))
;; get data from the manifest ;; get data from the manifest
(define/contract (manifest-data-get mf key [fail-thunk #f]) (define/contract (manifest-data-ref mf key [fail-thunk #f])
(->* (manifest? symbol?) (any/c) any) (->* (manifest? symbol?) (any/c) any)
(or (rest (assoc key (manifest-data mf))) (or (rest (assoc key (manifest-data mf)))
(and (procedure? fail-thunk) (fail-thunk)) (and (procedure? fail-thunk) (fail-thunk))
@ -124,3 +124,5 @@
;; the struct contains the original data ;; the struct contains the original data
(define serialize-manifest manifest-data) (define serialize-manifest manifest-data)
(provide parse-manifest manifest-data-ref serialize-manifest)

View File

@ -19,15 +19,17 @@ create table task(id integer primary key, name text not null, manifest blob not
-- } -- }
-- @up { -- @up {
create table task_log(taskid integer not null, worker integer not null, create table task_log(taskid integer not null, worker integer not null,
time_start timestamp not null, time_wall_start timestamp not null,
time_end timestamp check(time_end >= time_start) not null, duration integer not null,
pattern blob not null, pattern blob not null,
foreign key (taskid) references tasks(id) on delete cascade, foreign key (taskid) references tasks(id) on delete cascade,
foreign key (worker) references node(id) on delete restrict); foreign key (worker) references node(id) on delete restrict);
-- } -- }
-- @up { -- @up {
create table task_match(taskid integer not null, time timestamp not null, match blob not null, create table task_match(taskid integer not null, worker integer not null,
foreign key (taskid) references tasks(id) on delete cascade); time_wall timestamp not null, match blob not null,
foreign key (taskid) references tasks(id) on delete cascade,
foreign key (worker) references node(id) on delete restrict);
-- } -- }
-- @down { -- @down {

View File

@ -106,6 +106,32 @@
(crypto-random-bytes 24)) (crypto-random-bytes 24))
(provide crypto-lock-make-key crypto-lock-make-nonce) (provide crypto-lock-make-key crypto-lock-make-nonce)
;; hashing
;; we use the size of crypto_blake2b_ctx on 64-bit platforms, as this should be large enough even on
;; 32-bit platforms
;; this should be more robust tbh but without pulling in the whole C header parsing library idk how
;; else we can know the size of this struct without just hardcoding it
(define *crypto-blake2b-ctx-size* 224)
;; default hash output size
(define *crypto-blake2b-hash-size* 64)
;; opaque datatype yeet
(struct crypto-blake2b-ctx [data])
(define/ffi crypto-blake2b-init
(_fun (ctx : (_u8vector o *crypto-blake2b-ctx-size*)) -> _void -> (crypto-blake2b-ctx ctx)))
(provide crypto-blake2b-init)
(define/ffi crypto-blake2b-update
(_fun (ctx-obj : _?) (ctx : _u8vector = (crypto-blake2b-ctx-data ctx-obj))
(msg : _u8vector) (msgsize : _size = (u8vector-length msg))
-> _void))
(provide/contract [crypto-blake2b-update (contract:-> crypto-blake2b-ctx? bytes? void?)])
(define/ffi crypto-blake2b-final
(_fun (ctx-obj : _?) (ctx : _u8vector = (crypto-blake2b-ctx-data ctx-obj))
(hash : (_u8vector o *crypto-blake2b-hash-size*)) -> _void -> hash))
(provide/contract [crypto-blake2b-final (contract:-> crypto-blake2b-ctx? bytes?)])
;; misc ;; misc
(define/ffi crypto-wipe (define/ffi crypto-wipe

View File

@ -29,10 +29,13 @@
delete-agent delete-agent
new-project new-project
get-projects get-projects
get-project-file
agent-report-state]) agent-report-state])
(define-signature agent^ (define-signature agent^
[todo]) [push-assignment
cancel-assignment
cancel-all-assignments])
(define-signature client^ (define-signature client^
[todo]) [todo])

View File

@ -17,8 +17,8 @@
;; along with this program. If not, see <https://www.gnu.org/licenses/>. ;; along with this program. If not, see <https://www.gnu.org/licenses/>.
(require db/base db/sqlite3 (require db/base db/sqlite3
data/queue racket/bool racket/fasl racket/file racket/match racket/path racket/random data/queue racket/bool racket/contract racket/fasl racket/file racket/list racket/match
racket/runtime-path racket/set racket/string racket/unit racket/path racket/random racket/runtime-path racket/set racket/string racket/unit srfi/19
north/base north/adapter/base north/adapter/sqlite north/base north/adapter/base north/adapter/sqlite
"comms.rkt" "manifest.rkt" "not-crypto.rkt" "pattern.rkt" "protocol.rkt" "comms.rkt" "manifest.rkt" "not-crypto.rkt" "pattern.rkt" "protocol.rkt"
;; port-fsync ;; port-fsync
@ -88,12 +88,28 @@
(define-stmt q-get-tasks "select id, name, manifest, complete from task") (define-stmt q-get-tasks "select id, name, manifest, complete from task")
(define-stmt q-get-task-log (define-stmt q-get-task-log
"select worker, time_start, time_end, pattern from task_log where taskid=?") "select worker, time_wall_start, duration, pattern from task_log where taskid=?")
(define-stmt q-add-task-log (define-stmt q-add-task-log
"insert into task_log (taskid, worker, time_start, time_end, pattern) values (?, ?, ?, ?, ?)") "insert into task_log (taskid, worker, time_wall_start, duration, pattern)
values (?, ?, ?, ?, ?)")
(define-stmt q-add-task-match
"insert into task_match (taskid, worker, time_wall, match) values (?, ?, ?, ?)")
;; utils ;; utils
;; time helpers (because time is a bigge heck)
;; monotonic time can be different than wall clock time
;; for our purposes, tasks have two measures of time associated with them:
;; - wall clock instant representing when the task was started
;; - duration of the task (as a difference of monotonic times)
;; it's important never to mix up wall clock and monotonic measurements, and never to take a
;; difference between two wall clock times
;; fortunately, racket provides srfi/19 as part of the default installation which is very convenient
(define (current-seconds-monotonic)
(time-second (current-time 'time-monotonic)))
(define (current-seconds-utc)
(time-second (current-time 'time-utc)))
(define (query/insert-id db stmt . args) (define (query/insert-id db stmt . args)
(define info (simple-result-info (apply query db stmt args))) (define info (simple-result-info (apply query db stmt args)))
(cdr (assoc 'insert-id info))) (cdr (assoc 'insert-id info)))
@ -135,7 +151,24 @@
#:exists 'truncate) #:exists 'truncate)
(query-exec (current-db) q-set-task-commit taskid)) (query-exec (current-db) q-set-task-commit taskid))
;; rpc calls ;; computes a hash of the file identifying its current contents for agents
;; (in case we reuse taskids)
(define (server-hash-file taskid)
(define path (build-path SERVER-FILES-PATH (number->string taskid)))
(define *read-size* 8192)
(call-with-input-file path (lambda (in)
(define ctx (crypto-blake2b-init))
(let loop ()
(match (read-bytes *read-size* in)
[(? eof-object?) (void)]
[data (crypto-blake2b-update ctx data)
(loop)]))
(crypto-blake2b-final ctx))))
(define (server-get-file taskid)
(define path (build-path SERVER-FILES-PATH (number->string taskid)))
(file->bytes path))
(struct node-info [id name arch type resources online?] #:prefab) (struct node-info [id name arch type resources online?] #:prefab)
;; manifest is the raw source format ;; manifest is the raw source format
@ -175,8 +208,9 @@
[_ (error "XXX: don't know how to configure arch" agent-arch)])) [_ (error "XXX: don't know how to configure arch" agent-arch)]))
;; manifest is the raw form ;; manifest is the raw form
(define (make-task name manifest tar) (define (make-task manifest tar)
(define manifest-data (s-exp->fasl manifest)) (define manifest-data (s-exp->fasl manifest))
(define name (second (assoc 'name manifest-data)))
(define id (query/insert-id (current-db) q-new-task name manifest-data)) (define id (query/insert-id (current-db) q-new-task name manifest-data))
(server-commit-file id tar) (server-commit-file id tar)
id) id)
@ -191,20 +225,24 @@
[(== (symbol->string type)) (void)] [(== (symbol->string type)) (void)]
[x (error "wrong node type" x)])) [x (error "wrong node type" x)]))
;; client rpcs ;; client rpcs
(define (get-agents) (define (get-agents)
(enforce-subject 'client) (enforce-subject 'client)
(get-nodes 'agent)) (get-nodes 'agent))
(define (new-agent name arch resources) (define/contract (new-agent name arch resources)
(-> string? string? (listof string?) integer?)
(enforce-subject 'client) (enforce-subject 'client)
(define-values [id public] (make-node name arch 'agent resources)) (define-values [id public] (make-node name arch 'agent resources))
(define comms-node (node id name 'agent public #f #f #f)) (define comms-node (node id name 'agent public #f #f #f))
(comms-set-node-info (current-comms) comms-node) (comms-set-node-info (current-comms) comms-node)
(agent-handler-new-agent id resources)
id) id)
(define (edit-agent id name resources) (define/contract (edit-agent id name resources)
(-> integer? string? (listof string?) void?)
(enforce-subject 'client) (enforce-subject 'client)
(call-with-transaction (current-db) (lambda () (call-with-transaction (current-db) (lambda ()
(enforce-object id 'agent) (enforce-object id 'agent)
@ -216,9 +254,14 @@
(for ([res (in-set (set-subtract existing-resource new-resource))]) (for ([res (in-set (set-subtract existing-resource new-resource))])
(query-exec (current-db) q-del-node-res id res)))) (query-exec (current-db) q-del-node-res id res))))
(define comms-node (comms-get-node-info (current-comms) id)) (define comms-node (comms-get-node-info (current-comms) id))
(comms-set-node-info (current-comms) (struct-copy node comms-node [name name]))) (comms-set-node-info (current-comms) (struct-copy node comms-node [name name]))
(agent-handler-delete-agent id)
(agent-handler-new-agent id resources)
(void))
(define (get-agent-deployment id) (define/contract (get-agent-deployment id)
(-> integer? bytes?)
;; TODO : streaming interface
(enforce-subject 'client) (enforce-subject 'client)
(match (query-maybe-row (current-db) q-get-node-info id) (match (query-maybe-row (current-db) q-get-node-info id)
[(vector name arch "agent" secret) [(vector name arch "agent" secret)
@ -226,19 +269,27 @@
arch (current-server-public-node))] arch (current-server-public-node))]
[_ (error "invalid id or wrong node type")])) [_ (error "invalid id or wrong node type")]))
(define (delete-agent id) (define/contract (delete-agent id)
(-> integer? void?)
(enforce-subject 'client) (enforce-subject 'client)
(call-with-transaction (current-db) (lambda () (call-with-transaction (current-db) (lambda ()
(enforce-object id 'agent) (enforce-object id 'agent)
(query-exec (current-db) q-delete-node id))) (query-exec (current-db) q-delete-node id)))
(comms-delete-node (current-comms) id)) (comms-delete-node (current-comms) id)
(agent-handler-delete-agent id)
(void))
;; client rpcs :: projects ;; client rpcs :: projects
(define (new-project name manifest tar) (define/contract (new-project manifest tar)
(-> list? bytes? integer?)
;; TODO : streaming interface
(enforce-subject 'client) (enforce-subject 'client)
(define id (make-task name manifest tar)) ;; check validity
(void id) (parse-manifest manifest)
(define id (make-task manifest tar))
;; notify agent handler
(agent-handler-new-task id manifest)
id) id)
(define (get-projects) (define (get-projects)
@ -246,18 +297,28 @@
(for/list ([(id name manifest complete?) (in-query (current-db) q-get-tasks)]) (for/list ([(id name manifest complete?) (in-query (current-db) q-get-tasks)])
(project-info id name (fasl->s-exp manifest) complete?))) (project-info id name (fasl->s-exp manifest) complete?)))
(define/contract (get-project-file taskid)
(-> integer? bytes?)
;; TODO : streaming interface
(with-handlers ([exn? (lambda (ex) (error "unable to fetch the requested file"))])
(server-get-file taskid)))
;; agent handling ;; agent handling
;; distributing subtasks
;; minimum batch of inputs that will be scheduled for a node ;; minimum batch of inputs that will be scheduled for a node
;; the batch size scales to how fast the node completes a batch ;; the batch size scales to how fast the node completes a batch
(define MIN-SUBTASK-SIZE 16) (define *min-subtask-size* 16)
;; aim to batch every 5 minutes ;; aim to batch every 5 minutes
;; if a task takes shorter, the amount will be doubled, and if it takes longer it will be halved ;; if a task takes shorter, the amount will be doubled, and if it takes longer it will be halved
(define OPTIMAL-COMPLETION-SECS 300) (define *optimal-completion-secs* 300)
;; tasks will be reassigned if not completed within this time ;; tasks will be reassigned if not completed within this time
(define TASK-TIMEOUT (* 3 OPTIMAL-COMPLETION-SECS)) (define *subtask-timeout* (* 3 *optimal-completion-secs*))
;; constatns for agent rpc retries
(define *min-retry-delay* 2)
(define *max-retry-delay* 120)
(define *retry-delay-ratio* 2)
(define (agent-handler) (define (agent-handler)
;; unlike comms, messages to agent-handler have no responses. just thread-send, it's one-way ;; unlike comms, messages to agent-handler have no responses. just thread-send, it's one-way
@ -265,15 +326,21 @@
(define this-thread (current-thread)) (define this-thread (current-thread))
;; make an auto cleanup thread :P ;; make an auto cleanup thread :P
(thread (lambda () (thread-wait this-thread) (custodian-shutdown-all cust))) (thread (lambda () (thread-wait this-thread) (custodian-shutdown-all cust)))
;; setup agent rpcs
(define agent-wrapper@ (make-rpc-wrapper-unit agent^))
(define-values/invoke-unit agent-wrapper@ (import) (export agent^))
;; tasks ;; tasks
;; semaphore guarding mutation (sigh) ;; semaphore guarding mutation (sigh)
;; manifest: task manifest (parsed version) ;; manifest: task manifest (parsed version)
;; work-pattern: the integer-set representing the work left to do ;; work-pattern: the integer-set representing the work left to do
;; agent-todo: hash of agent id to integer-set representing work the agent is working on ;; agent-todo: hash of agent id to integer-set representing work the agent is working on
(struct task-state [id sema manifest [work-pattern #:mutable] agent-todo] #:transparent) ;; file-hash: the hash to send to agents to identify the contents of the project file more
;; precisely than just the taskid, which allows them to cache the file locally
(struct task-state [id sema manifest [work-pattern #:mutable] agent-todo file-hash] #:transparent)
(define (initialize-task! id mf) (define (initialize-task id mf)
(define file-hash (server-hash-file id))
(define sema (make-semaphore 1)) (define sema (make-semaphore 1))
(define agent-todo (make-hash)) (define agent-todo (make-hash))
(define init-pattern-range (range->integer-set 0 (manifest-psize mf))) (define init-pattern-range (range->integer-set 0 (manifest-psize mf)))
@ -283,7 +350,7 @@
([(_1 _2 _3 pat-fasl) (in-query (current-db) q-get-task-log id)]) ([(_1 _2 _3 pat-fasl) (in-query (current-db) q-get-task-log id)])
(define sub (make-integer-set (fasl->s-exp pat-fasl))) (define sub (make-integer-set (fasl->s-exp pat-fasl)))
(integer-set-subtract pattern-range sub))) (integer-set-subtract pattern-range sub)))
(task-state id sema mf pattern-range agent-todo)) (task-state id sema mf pattern-range agent-todo file-hash))
;; this doesn't update the database - that only gets updated when the work is complete ;; this doesn't update the database - that only gets updated when the work is complete
(define (task-assign! ts agent-id requested-amount) (define (task-assign! ts agent-id requested-amount)
@ -313,18 +380,22 @@
(set-task-state-work-pattern! ts new-wp) (set-task-state-work-pattern! ts new-wp)
(hash-remove! (task-state-agent-todo ts) agent-id)])))) (hash-remove! (task-state-agent-todo ts) agent-id)]))))
(define (task-complete! ts agent-id time-start time-end) ;; adds to task log, then updates work pool with task completion
(define (task-complete! ts agent-id time-wall-start duration)
(call-with-semaphore (task-state-sema ts) (lambda () (call-with-semaphore (task-state-sema ts) (lambda ()
(match (hash-ref (task-state-agent-todo ts) agent-id #f) (match (hash-ref (task-state-agent-todo ts) agent-id #f)
[#f (void)] [#f (void)]
[assignment [assignment
;; add a new work log ;; add a new work log
(query-exec (current-db) q-add-task-log (task-state-id ts) agent-id time-start time-end (query-exec
(current-db) q-add-task-log (task-state-id ts) agent-id time-wall-start duration
(s-exp->fasl (integer-set-contents assignment))) (s-exp->fasl (integer-set-contents assignment)))
;; remove tracking - this work is now done ;; remove tracking - this work is now done
(hash-remove! (task-state-agent-todo ts) agent-id)])))) (hash-remove! (task-state-agent-todo ts) agent-id)]))))
(define (agent-thd id resources-in) (define (agent-thd id resources-in)
;; initialize to-node for rpcs
(current-to-node (comms-get-node-info (current-comms) id))
;; helper to generate assignment ids (a kind of arbitrary number we pass to agents to track ;; helper to generate assignment ids (a kind of arbitrary number we pass to agents to track
;; what they're currently working on) ;; what they're currently working on)
(define make-assignment-id (define make-assignment-id
@ -335,53 +406,194 @@
(set! resources-in #f) (set! resources-in #f)
;; tracks the sizes of task pieces given to the agent ;; tracks the sizes of task pieces given to the agent
;; task id to count of assignment size
(define task-size (make-hash)) (define task-size (make-hash))
;; current tasks, mirrors parent but only contains tasks we have resources for ;; current tasks, mirrors parent but only contains tasks we have resources for
(define current-tasks (make-hash)) (define current-tasks (make-hash))
;; assignments ;; assignments
(struct assignment [id taskid assign-data start-time handler] #:transparent) (struct assignment [id taskid assign-data start-time-utc start-time-monotonic] #:transparent)
;; active assignments, by assignment id ;; active assignments, by assignment id
(define assigned-tasks (make-hash)) (define assigned-tasks (make-hash))
;; keep running? ;; keep running?
(define run-agent-thd? #t) (define run-agent-thd? #t)
(define (send-assignment assignment) ;; helper to repeatedly invoke an agent rpc
(void "TODO")) (define (invoke/retry-forever proc)
(let init-loop ([retry-delay *min-retry-delay*])
(with-handlers ([exn? (lambda (ex)
(displayln (format "agent ~a encountered error ~a" id ex))
(sleep retry-delay)
(init-loop (min *max-retry-delay*
(* *retry-delay-ratio* retry-delay))))])
(proc))))
(define (cancel-assignment assignment) (define (create-assignment! ts)
(void "TODO")) (define requested-amount (hash-ref! task-size (task-state-id ts) *min-subtask-size*))
(define assign-data (task-assign! ts id requested-amount))
(define start-time-utc (current-seconds-utc))
(define start-time-monotonic (current-seconds-monotonic))
(define aid (make-assignment-id))
(define mf-parsed (task-state-manifest ts))
(define file-hash (task-state-file-hash ts))
(define mf-raw (serialize-manifest mf-parsed))
;; add to local tracking
(hash-set! assigned-tasks aid (assignment id (task-state-id ts) assign-data start-time-utc
start-time-monotonic))
;; send agent rpc
(invoke/retry-forever (lambda () (push-assignment aid mf-raw file-hash assign-data))))
(define (cancel-assignment! assignment)
;; tell the agent to cancel work, unassign the assignment
(define ts (hash-ref current-tasks (assignment-taskid assignment) #f))
;; update manager tracking
(unless (false? ts)
(task-unassign! ts id))
(hash-remove! assigned-tasks (assignment-id assignment))
;; send agent rpc
(invoke/retry-forever (lambda () (cancel-assignment (assignment-id assignment)))))
;; records a match for a certain assignment
(define (add-assignment-match! assignment success-input)
;; TODO : notify other things that a match occurred maybe?
(query-exec
(current-db) q-add-task-match
(assignment-taskid assignment) id (current-seconds-utc) (s-exp->fasl success-input)))
(define (complete-assignment! assignment)
(define end-time-monotonic (current-seconds-monotonic))
(define duration (- end-time-monotonic (assignment-start-time-monotonic assignment)))
(define ts (hash-ref current-tasks (assignment-taskid assignment) #f))
(hash-remove! assigned-tasks (assignment-id assignment))
(unless (false? ts)
;; mark complete
(task-complete! ts id (assignment-start-time-utc assignment) duration)
;; update next task size request
(cond
[(< duration *optimal-completion-secs*)
(hash-update! task-size (assignment-taskid assignment)
(lambda (v) (* 2 v)) *min-subtask-size*)]
[(> duration (* 2 *optimal-completion-secs*))
(hash-update! task-size (assignment-taskid assignment)
(lambda (v) (max *min-subtask-size* (/ 2 v))) *min-subtask-size*)]
[else (void)])))
(define (update-assignments!)
;; detect set of used resources
;; see set of current inactive tasks
;; if a task uses all free resources, assign it
(define used-resources
(for/fold ([used-resources (set)]) ([(aid av) (in-hash assigned-tasks)])
(define task (hash-ref current-tasks (assignment-taskid av)))
(define manifest (task-state-manifest task))
(set-union used-resources (list->set (manifest-data-ref manifest 'resources '())))))
(define available-resources (set-subtract resources used-resources))
(define assigned-taskids
(for/set ([(aid av) (in-hash assigned-tasks)]) (assignment-taskid av)))
;; the scheduling prioritizer is currently:
;; yolo just pick randomly
;; this should be good enough for all basic use cases
(define task-list
(shuffle
(for ([(tid ts) (in-hash current-tasks)]
#:unless (set-member? assigned-taskids tid))
ts)))
;; assigns the first task in the list that we are capable of doing
;; returns if we could maybe assign more
(define (select-task! task-list)
(match task-list
['() #f]
[(cons head tail)
(define manifest (task-state-manifest head))
(define needed-resources (list->set (manifest-data-ref manifest 'resources '())))
(if (subset? available-resources needed-resources)
(create-assignment! head)
(begin (select-task! tail) #t))]))
(when (select-task! task-list)
(update-assignments!)))
(define (handle-thd-msg) (define (handle-thd-msg)
(match (thread-receive) (match (thread-receive)
[(cons 'new-task ts) [(cons 'new-task ts)
(void "TODO")] (hash-set! current-tasks (task-state-id ts) ts)
[(cons 'cancel-task ts) (void "TODO")] (update-assignments!)]
[(cons 'cancel-task ts)
(define assignments
(for/list ([(_ v) (in-hash assigned-tasks)]
#:when (= (task-state-id ts) (assignment-taskid v)))
v))
(for ([av (in-list assignments)])
(cancel-assignment! av))
(hash-remove! current-tasks (task-state-id ts) ts)
(hash-remove! task-size (task-state-id ts) ts)]
;; got completion report from agent ;; got completion report from agent
[(cons 'agent-report (cons assignment-id state)) [(cons 'agent-report status)
(match state (match status
['complete (error "TODO")] ;; no current assignment
[success-input (error "TODO")])] [(cons #f _) (void)] ;; do nothing
;; current assignment incomplete
[(cons assignment-id 'incomplete) (void)] ;; also do nothing
;; current assignment complete, no success
[(cons assignment-id 'complete)
(define av (hash-ref assigned-tasks assignment-id #f))
(unless (false? av) (complete-assignment! av #f))]
;; complete, success
[(cons assignment-id success-input)
(define av (hash-ref assigned-tasks assignment-id #f))
(unless (false? av) (add-assignment-match! av success-input))])]
['shutdown (set! run-agent-thd? #f)])) ['shutdown (set! run-agent-thd? #f)]))
;; TODO : agent rpc to cancel current task, with error handling (define (handle-assignment-timeout)
(define time (current-seconds-monotonic))
(define overdue-assignments
(filter (lambda (av)
(define st (assignment-start-time-monotonic av))
(>= (- time st) *subtask-timeout*))
(hash-values assigned-tasks)))
(for ([overdue (in-list overdue-assignments)])
(define taskid (assignment-taskid overdue))
;; revert to the smallest possible task size, just in case
;; we don't want to spin forever assigning potentially gigantic tasks and timing out
;; although ideally this case wouldn't occur because the timeout is three times the target
;; subtask duration
(hash-set! task-size taskid *min-subtask-size*)
(displayln (format "agent ~a timed out on task ~a" id taskid))
(cancel-assignment! overdue)))
;; cancel whatever the agent is currently working on, in case the server crashed and came back
;; up or something
;; we want agent state synchronized with what we think it should be
(invoke/retry-forever (lambda () (cancel-all-assignments)))
(let loop () (let loop ()
;; handle events ;; handle events
(define time (current-seconds-monotonic))
(define thd-evt (thread-receive-evt)) (define thd-evt (thread-receive-evt))
; (define task-timeout (define nearest-timeout
; (if task-start-time (apply min (for/list ([(aid av) (in-hash assigned-tasks)])
; (- TASK-TIMEOUT (- (current-seconds) task-start-time)) (define st (assignment-start-time-monotonic av))
; #f)) (max 0 (- *subtask-timeout* (- time st))))))
(define task-timeout #f) (match (sync/timeout nearest-timeout thd-evt)
(match (sync/timeout task-timeout thd-evt) [#f (handle-assignment-timeout)]
[#f (void "TODO: task timeout")]
[(== thd-evt) (handle-thd-msg)]) [(== thd-evt) (handle-thd-msg)])
(when run-agent-thd? (when run-agent-thd?
(loop))) (loop)))
;; TODO : cleanup, cancel current task, shut down running tasks ;; unassign all tasks
(for ([(aid av) (in-hash assigned-tasks)])
(define taskid (assignment-taskid av))
(define ts (hash-ref current-tasks taskid #f))
(unless (false? ts)
(task-unassign! ts id)))
;; call agent rpc to cancel everything
(cancel-all-assignments)
(void)) (void))
;; id to task-state ;; id to task-state
@ -394,12 +606,17 @@
(match (thread-receive) (match (thread-receive)
[(cons 'new-agent (cons id resources)) [(cons 'new-agent (cons id resources))
(parameterize ([current-custodian cust]) (parameterize ([current-custodian cust])
;; TODO : add monitor thread that detects crashes and unassigns tasks
(hash-set! agents id (thread (lambda () (agent-thd id resources)))))] (hash-set! agents id (thread (lambda () (agent-thd id resources)))))]
[(cons 'delete-agent id) [(cons 'delete-agent id)
(thread-send (hash-ref agents id) 'shutdown #f) (thread-send (hash-ref agents id) 'shutdown #f)
;; TODO : wait for thread a bit, then kill it
;; TODO : cleanup assigned tasks after thread ended by unassigning all things assigned to
;; this agent
(error "TODO: this function is half unimplemented lol")
(hash-remove! agents id)] (hash-remove! agents id)]
[(cons 'new-task (cons id manifest)) [(cons 'new-task (cons id manifest))
(define ts (initialize-task! id manifest)) (define ts (initialize-task id manifest))
(hash-set! current-tasks id ts) (hash-set! current-tasks id ts)
;; notify agents ;; notify agents
(for ([(id thd) (in-hash agents)]) (for ([(id thd) (in-hash agents)])
@ -407,10 +624,10 @@
[(cons 'agent-report (cons agent-id (cons assignment-id state))) [(cons 'agent-report (cons agent-id (cons assignment-id state)))
(thread-send (hash-ref agents agent-id) (cons 'agent-report (cons assignment-id state)) #f)] (thread-send (hash-ref agents agent-id) (cons 'agent-report (cons assignment-id state)) #f)]
[(cons 'cancel-task task-id) [(cons 'cancel-task task-id)
(hash-remove! current-tasks task-id)
;; notify agents ;; notify agents
(for ([(id thd) (in-hash agents)]) (for ([(id thd) (in-hash agents)])
(thread-send thd (cons 'cancel-task task-id) #f))] (thread-send thd (cons 'cancel-task task-id) #f))
(hash-remove! current-tasks task-id)]
[_ (error "unknown agent handler message")])) [_ (error "unknown agent handler message")]))
(let loop () (let loop ()
@ -424,12 +641,28 @@
(define current-agent-handler (make-parameter #f)) (define current-agent-handler (make-parameter #f))
(define (agent-handler-new-agent id resources [ah (current-agent-handler)])
(thread-send ah (cons 'new-agent (cons id resources))))
(define (agent-handler-delete-agent id [ah (current-agent-handler)])
(thread-send ah (cons 'delete-agent id)))
(define (agent-handler-new-task id manifest [ah (current-agent-handler)])
(thread-send ah (cons 'new-task (cons id manifest))))
(define (agent-handler-cancel-task task-id [ah (current-agent-handler)])
(thread-send ah (cons 'cancel-task task-id)))
;; agent rpcs ;; agent rpcs
;; report state 'complete or a list of integer representing a success result ;; report state 'incomplete 'complete or a list of integer representing a success result
;; assignment-id is a numeric ID or #f if no currently assigned task
;; agents can call this as many times as they want -- it can serve as a sort of "ping"
(define (agent-report-state assignment-id state) (define (agent-report-state assignment-id state)
(-> (or/c false? integer?) (or/c 'incomplete 'complete (listof integer?)))
(enforce-subject 'agent) (enforce-subject 'agent)
(define agent-id (node-id (current-from-node))) (define agent-id (node-id (current-from-node)))
;; TODO : maybe wait for an actual completion here? idk
(thread-send (current-agent-handler) (thread-send (current-agent-handler)
(cons 'agent-report (cons agent-id (cons assignment-id state))))) (cons 'agent-report (cons agent-id (cons assignment-id state)))))

View File

@ -90,7 +90,7 @@
(provide port-fsync) (provide port-fsync)
(define port-fsync.unix (define port-fsync/unix
;; lazy time ;; lazy time
(let ([fsync-call #f]) (let ([fsync-call #f])
(lambda (port) (lambda (port)
@ -109,5 +109,5 @@
(define (port-fsync port) (define (port-fsync port)
(match (system-type 'os) (match (system-type 'os)
['unix (port-fsync.unix port)] ['unix (port-fsync/unix port)]
[x (error "don't know how to fsync on" x)]))) [x (error "don't know how to fsync on" x)])))