From 94e72e699dec6aeb6edb0bebbe9e6703d61f0bfa Mon Sep 17 00:00:00 2001
From: haskal
Date: Wed, 18 Nov 2020 01:47:18 -0500
Subject: [PATCH] wip implementation of task distribution

---
 crossfire/protocol.rkt | 182 ++++++++++++++++++++++++++++++++++++-----
 1 file changed, 164 insertions(+), 18 deletions(-)

diff --git a/crossfire/protocol.rkt b/crossfire/protocol.rkt
index a4ea94e..1c0efff 100644
--- a/crossfire/protocol.rkt
+++ b/crossfire/protocol.rkt
@@ -18,7 +18,7 @@
 
 
 (require db/base db/sqlite3 data/queue racket/bool racket/fasl racket/file racket/match
-         racket/path racket/runtime-path racket/set racket/string
+         racket/path racket/random racket/runtime-path racket/set racket/string
          north/base north/adapter/base north/adapter/sqlite
          "comms.rkt" "not-crypto.rkt"
          ;; port-fsync
@@ -87,6 +87,11 @@
 (define-stmt q-delete-task "delete from task where id=?")
 (define-stmt q-get-tasks "select id, name, manifest, complete from task")
 
+(define-stmt q-get-task-log
+  "select worker, time_start, time_end, pattern from task_log where taskid=?")
+(define-stmt q-add-task-log
+  "insert into task_log (taskid, worker, time_start, time_end, pattern) values (?, ?, ?, ?, ?)")
+
 ;; utils
 
 (define (query/insert-id db stmt . args)
@@ -241,21 +246,18 @@
   (for/list ([(id name manifest complete?) (in-query (current-db) q-get-tasks)])
     (project-info id name (fasl->s-exp manifest) complete?)))
 
-;; agent rpcs
-
-(define-rpc server (agent-report something)
-  (enforce-subject 'agent)
-  (error "TODO"))
-
 ;; agent handling
 
 ;; distributing subtasks
 
 ;; minimum batch of inputs that will be scheduled for a node
 ;; the batch size scales to how fast the node completes a batch
-(define MIN-SUBTASK-SIZE 10)
+(define MIN-SUBTASK-SIZE 16)
 ;; aim to batch every 5 minutes
+;; if a task takes shorter, the amount will be doubled, and if it takes longer it will be halved
 (define OPTIMAL-COMPLETION-SECS 300)
+;; tasks will be reassigned if not completed within this time
+(define TASK-TIMEOUT (* 3 OPTIMAL-COMPLETION-SECS))
 
 (define (agent-handler)
   ;; unlike comms, messages to agent-handler have no responses. just thread-send, it's one-way
@@ -265,22 +267,154 @@
   (thread (lambda () (thread-wait this-thread) (custodian-shutdown-all cust)))
 
   ;; tasks
-  ;; pending task queue
-  (define task-queue (make-queue))
-  ;; id to manifest
+  ;; id: task id; sema: semaphore guarding mutation (sigh)
+  ;; manifest: task manifest
+  ;; work-pattern: the integer-set representing the work left to do
+  ;; agent-todo: hash of agent id to integer-set representing work the agent is working on
+  (struct task-state [id sema manifest [work-pattern #:mutable] agent-todo] #:transparent)
+
+  ;; builds the task-state for a task, resuming from work already recorded in the task log
+  (define (initialize-task! id manifest)
+    (define sema (make-semaphore 1))
+    (define agent-todo (make-hash))
+    ;; TODO : manifest processing. 100 hardcoded here u___u
+    (define init-pattern-range (range->integer-set 0 100))
+    ;; subtract the pattern ranges that were already logged as complete
+    (define pattern-range
+      (for/fold ([pattern-range init-pattern-range])
+                ([(_worker _start _end pat-fasl) (in-query (current-db) q-get-task-log id)])
+        (define sub (make-integer-set (fasl->s-exp pat-fasl)))
+        (integer-set-subtract pattern-range sub)))
+    (task-state id sema manifest pattern-range agent-todo))
+
+  ;; this doesn't update the database - that only gets updated when the work is complete
+  (define (task-assign! ts agent-id requested-amount)
+    (unless (positive? requested-amount) (error "requested amount must be positive"))
+    (call-with-semaphore (task-state-sema ts) (lambda ()
+      (when (hash-has-key? (task-state-agent-todo ts) agent-id)
+        (error "agent already has work assigned"))
+      (define-values [assignment new-wp]
+        (pattern-range-take (task-state-work-pattern ts) requested-amount))
+      (cond
+        ;; done! (maybe)
+        ;; check other agents work
+        [(zero? (integer-set-count assignment)) #f]
+        ;; update tracking
+        [else
+         (hash-set! (task-state-agent-todo ts) agent-id assignment)
+         (set-task-state-work-pattern! ts new-wp)
+         assignment]))))
+
+  ;; returns work from agent back to the regular work pool
+  (define (task-unassign! ts agent-id)
+    (call-with-semaphore (task-state-sema ts) (lambda ()
+      (match (hash-ref (task-state-agent-todo ts) agent-id #f)
+        [#f (void)]
+        [assignment
+         (define new-wp (integer-set-union assignment (task-state-work-pattern ts)))
+         (set-task-state-work-pattern! ts new-wp)
+         (hash-remove! (task-state-agent-todo ts) agent-id)]))))
+
+  ;; records the agent's current assignment as finished in the task log and stops tracking it
+  (define (task-complete! ts agent-id time-start time-end)
+    (call-with-semaphore (task-state-sema ts) (lambda ()
+      (match (hash-ref (task-state-agent-todo ts) agent-id #f)
+        [#f (void)]
+        [assignment
+         ;; add a new work log
+         (query-exec (current-db) q-add-task-log (task-state-id ts) agent-id time-start time-end
+                     (s-exp->fasl (integer-set-contents assignment)))
+         ;; remove tracking - this work is now done
+         (hash-remove! (task-state-agent-todo ts) agent-id)]))))
+
+  ;; per-agent handler thread body: processes forwarded messages until it receives 'shutdown
+  (define (agent-thd id resources-in)
+    ;; helper to generate assignment ids (a kind of arbitrary number we pass to agents to track
+    ;; what they're currently working on)
+    (define make-assignment-id
+      (let ([counter (random 0 1000)])
+        (lambda () (begin0 counter (set! counter (add1 counter))))))
+    ;; set of resources
+    (define resources (list->set resources-in))
+    (set! resources-in #f)
+
+    ;; tracks the sizes of task pieces given to the agent
+    (define task-size (make-hash))
+    ;; current tasks, mirrors parent but only contains tasks we have resources for
+    (define current-tasks (make-hash))
+
+    ;; assignments
+    (struct assignment [id taskid assign-data start-time handler] #:transparent)
+    ;; active assignments, by assignment id
+    (define assigned-tasks (make-hash))
+
+    ;; keep running?
+    (define run-agent-thd? #t)
+
+    (define (send-assignment assignment)
+      (void "TODO"))
+
+    (define (cancel-assignment assignment)
+      (void "TODO"))
+
+    (define (handle-thd-msg)
+      (match (thread-receive)
+        [(cons 'new-task ts)
+         (void "TODO")]
+        [(cons 'cancel-task ts) (void "TODO")]
+        ;; got completion report from agent
+        [(cons 'agent-report (cons assignment-id state))
+         (match state
+           ['complete (error "TODO")]
+           [success-input (error "TODO")])]
+        ['shutdown (set! run-agent-thd? #f)]))
+
+    ;; TODO : agent rpc to cancel current task, with error handling
+
+    (let loop ()
+      ;; handle events
+      (define thd-evt (thread-receive-evt))
+      ; (define task-timeout
+      ;   (if task-start-time
+      ;     (- TASK-TIMEOUT (- (current-seconds) task-start-time))
+      ;     #f))
+      (define task-timeout #f)
+      (match (sync/timeout task-timeout thd-evt)
+        [#f (void "TODO: task timeout")]
+        [(== thd-evt) (handle-thd-msg)])
+      (when run-agent-thd?
+        (loop)))
+
+    ;; TODO : cleanup, cancel current task, shut down running tasks
+    (void))
+
+  ;; id to task-state
   (define current-tasks (make-hash))

-  ;; agent state
-  (struct state [task-id pattern time-start errors] #:transparent #:mutable)
-  ;; hash of agents and states
+  ;; hash of agents to handler thread
   (define agents (make-hash))
 
   (define (handle-thd-msg)
     (match (thread-receive)
-      [(cons 'new-agent (cons id resources)) (error "TODO")]
-      [(cons 'delete-agent id) (error "TODO")]
-      [(cons 'new-task (cons id manifest)) (error "TODO")]
-      [(cons 'cancel-task id) (error "TODO")]
+      [(cons 'new-agent (cons id resources))
+       (parameterize ([current-custodian cust])
+         (hash-set! agents id (thread (lambda () (agent-thd id resources)))))]
+      [(cons 'delete-agent id)
+       (thread-send (hash-ref agents id) 'shutdown #f)
+       (hash-remove! agents id)]
+      [(cons 'new-task (cons id manifest))
+       (define ts (initialize-task! id manifest))
+       (hash-set! current-tasks id ts)
+       ;; notify agents
+       (for ([(id thd) (in-hash agents)])
+         (thread-send thd (cons 'new-task ts) #f))]
+      [(cons 'agent-report (cons agent-id (cons assignment-id state)))
+       (thread-send (hash-ref agents agent-id) (cons 'agent-report (cons assignment-id state)) #f)]
+      [(cons 'cancel-task id)
+       (define ts (hash-ref current-tasks id #f))
+       (hash-remove! current-tasks id)
+       ;; notify agents (nothing to send if the task id was unknown)
+       (when ts (for ([thd (in-hash-values agents)]) (thread-send thd (cons 'cancel-task ts) #f)))]
       [_ (error "unknown agent handler message")]))
 
   (let loop ()
@@ -292,6 +426,18 @@
 
 (define (make-agent-handler)
   (thread agent-handler))
+;; parameter for the thread that agent rpcs forward reports to (presumably the agent-handler thread)
+(define current-agent-handler (make-parameter #f))
+
+;; agent rpcs
+
+;; report state 'complete or a list of integers representing a success result
+(define-rpc server (agent-report-state assignment-id state)
+  (enforce-subject 'agent)
+  (define agent-id (node-id (current-from-node)))
+  (thread-send (current-agent-handler)
+               (cons 'agent-report (cons agent-id (cons assignment-id state)))))
+
 ;; command line usage
 
 (module+ main