wip implementation of task distribution
@ -18,7 +18,7 @@
(require db/base db/sqlite3
data/queue racket/bool racket/fasl racket/file racket/match
racket/path racket/runtime-path racket/set racket/string
racket/path racket/random racket/runtime-path racket/set racket/string
north/base north/adapter/base north/adapter/sqlite
"comms.rkt" "not-crypto.rkt"
;; port-fsync
@ -87,6 +87,11 @@
(define-stmt q-delete-task "delete from task where id=?")
(define-stmt q-get-tasks "select id, name, manifest, complete from task")
(define-stmt q-get-task-log
"select worker, time_start, time_end, pattern from task_log where taskid=?")
(define-stmt q-add-task-log
"insert into task_log (taskid, worker, time_start, time_end, pattern) values (?, ?, ?, ?, ?)")
;; utils
(define (query/insert-id db stmt . args)
@ -241,21 +246,18 @@
(for/list ([(id name manifest complete?) (in-query (current-db) q-get-tasks)])
(project-info id name (fasl->s-exp manifest) complete?)))
;; agent rpcs
(define-rpc server (agent-report something)
(enforce-subject 'agent)
(error "TODO"))
;; agent handling
;; distributing subtasks
;; minimum batch of inputs that will be scheduled for a node
;; the batch size scales to how fast the node completes a batch
(define MIN-SUBTASK-SIZE 10)
(define MIN-SUBTASK-SIZE 16)
;; aim to batch every 5 minutes
;; if a task takes shorter, the amount will be doubled, and if it takes longer it will be halved
;; tasks will be reassigned if not completed within this time
(define (agent-handler)
;; unlike comms, messages to agent-handler have no responses. just thread-send, it's one-way
@ -265,22 +267,151 @@
(thread (lambda () (thread-wait this-thread) (custodian-shutdown-all cust)))
;; tasks
;; pending task queue
(define task-queue (make-queue))
;; id to manifest
;; semaphore guarding mutation (sigh)
;; manifest: task manifest
;; work-pattern: the integer-set representing the work left to do
;; agent-todo: hash of agent id to integer-set representing work the agent is working on
(struct task-state [id sema manifest [work-pattern #:mutable] agent-todo] #:transparent)
(define (initialize-task! id manifest)
(define sema (make-semaphore 1))
(define agent-todo (make-hash))
;; TODO : manifest processing. 100 hardcoded here u___u
(define init-pattern-range (range->integer-set 0 100))
;; subtract the pattern ranges that were already logged as complete
(define pattern-range
(for/fold ([pattern-range init-pattern-range])
([(_ _ _ pat-fasl) (in-query (current-db) q-get-task-log id)])
(define sub (make-integer-set (fasl->s-exp pat-fasl)))
(integer-set-subtract pattern-range sub)))
(task-state id sema manifest pattern-range agent-todo))
;; this doesn't update the database - that only gets updated when the work is complete
(define (task-assign! ts agent-id requested-amount)
(unless (positive? requested-amount) (error "requested amount must be positive"))
(call-with-semaphore (task-state-sema ts) (lambda ()
(when (hash-has-key? (task-state-agent-todo) agent-id)
(error "agent already has work assigned"))
(define-values [assignment new-wp]
(pattern-range-take (task-state-work-pattern ts) requested-amount))
;; done! (maybe)
;; check other agents work
[(zero? (integer-set-count assignment)) #f]
;; update tracking
(hash-set! (task-state-agent-todo agent-id assignment))
(set-task-state-work-pattern! ts new-wp)
;; returns work from agent back to the regular work pool
(define (task-unassign! ts agent-id)
(call-with-semaphore (task-state-sema ts) (lambda ()
(match (hash-ref (task-state-agent-todo ts) agent-id #f)
[#f (void)]
(define new-wp (integer-set-union assignment (task-state-work-pattern ts)))
(set-task-state-work-pattern! ts new-wp)
(hash-remove! (task-state-agent-todo ts) agent-id)]))))
(define (task-complete! ts agent-id time-start time-end)
(call-with-semaphore (task-state-sema ts) (lambda ()
(match (hash-ref (task-state-agent-todo ts) agent-id #f)
[#f (void)]
;; add a new work log
(query-exec (current-db) q-add-task-log (task-state-id ts) agent-id time-start time-end
(s-exp->fasl (integer-set-contents assignment)))
;; remove tracking - this work is now done
(hash-remove! (task-state-agent-todo ts) agent-id)]))))
(define (agent-thd id resources-in)
;; helper to generate assignment ids (a kind of arbitrary number we pass to agents to track
;; what they're currently working on)
(define make-assignment-id
(let ([counter (random 0 1000)])
(lambda () (begin0 counter (set! counter (add1 counter))))))
;; set of resources
(define resources (list->set resources-in))
(set! resources-in #f)
;; tracks the sizes of task pieces given to the agent
(define task-size (make-hash))
;; current tasks, mirrors parent but only contains tasks we have resources for
(define current-tasks (make-hash))
;; assignments
(struct assignment [id taskid assign-data start-time handler] #:transparent)
;; active assignments, by assignment id
(define assigned-tasks (make-hash))
;; keep running?
(define run-agent-thd? #t)
(define (send-assignment assignment)
(void "TODO"))
(define (cancel-assignment assignment)
(void "TODO"))
(define (handle-thd-msg)
(match (thread-recieve)
[(cons 'new-task ts)
(void "TODO")]
[(cons 'cancel-task ts) (void "TODO")]
;; got completion report from agent
[(cons 'agent-report (cons assignment-id state))
(match state
['complete (error "TODO")]
[success-input (error "TODO")])]
['shutdown (set! run-agent-thd? #f)]))
;; TODO : agent rpc to cancel current task, with error handling
(let loop ()
;; handle events
(define thd-evt (thread-receive-evt))
; (define task-timeout
; (if task-start-time
; (- TASK-TIMEOUT (- (current-seconds) task-start-time))
; #f))
(define task-timeout #f)
(match (sync/timeout task-timeout thd-evt)
[#f (void "TODO: task timeout")]
[(== thd-evt) (handle-thd-msg)])
(when run-agent-thd?
;; TODO : cleanup, cancel current task, shut down running tasks
;; id to task-state
(define current-tasks (make-hash))
;; agent state
(struct state [task-id pattern time-start errors] #:transparent #:mutable)
;; hash of agents and states
;; hash of agents to handler thread
(define agents (make-hash))
(define (handle-thd-msg)
(match (thread-receive)
[(cons 'new-agent (cons id resources)) (error "TODO")]
[(cons 'delete-agent id) (error "TODO")]
[(cons 'new-task (cons id manifest)) (error "TODO")]
[(cons 'cancel-task id) (error "TODO")]
[(cons 'new-agent (cons id resources))
(parameterize ([current-custodian cust])
(hash-set! agents id (thread (lambda () (agent-thd id resources)))))]
[(cons 'delete-agent id)
(thread-send (hash-ref agents id) 'shutdown #f)
(hash-remove! agents id)]
[(cons 'new-task (cons id manifest))
(define ts (initialize-task! id manifest))
(hash-set! current-tasks id ts)
;; notify agents
(for ([(id thd) (in-hash agents)])
(thread-send thd (cons 'new-task ts) #f))]
[(cons 'agent-report (cons agent-id (cons assignment-id state)))
(thread-send (hash-ref agents agent-id) (cons 'agent-report (cons assignment-id state)) #f)]
[(cons 'cancel-task id)
(hash-remove! current-tasks id)
;; notify agents
(for ([(id thd) (in-hash agents)])
(thread-send thd (cons 'cancel-task ts) #f))]
[_ (error "unknown agent handler message")]))
(let loop ()
@ -292,6 +423,17 @@
(define (make-agent-handler)
(thread agent-handler))
(define (current-agent-handler (make-parameter #f)))
;; agent rpcs
;; report state 'complete or a list of integer representing a success result
(define-rpc server (agent-report-state assignment-id state)
(enforce-subject 'agent)
(define agent-id (node-id (current-from-node)))
(thread-send (current-agent-handler)
(cons 'agent-report (cons agent-id (cons assignment-id state)))))
;; command line usage
(module+ main
