add initial framework for distributing subtasks

This commit is contained in:
xenia 2020-11-17 02:12:13 -05:00
parent 2d45fe666b
commit 737230c433
4 changed files with 98 additions and 15 deletions

View File

@ -240,6 +240,8 @@
from (or (= data (node-id my-node)) (hash-has-key? peer-registry data) #f))]
['get-node-info (thread-send from (hash-ref node-registry data #f) #f)]
['set-node-info (hash-set! node-registry (node-id data) data) (thread-send from (void) #f)]
;; also call destroy-channel
['raw-delete-node (hash-remove! node-registry data) (thread-send from (void) #f)]
['register-channel
(match-define (cons peer-id thd) data)
(when (hash-has-key? peer-registry peer-id)
@ -319,6 +321,10 @@
(define (comms-get-node-info comms id)
(thread-sendrecv comms 'get-node-info id))
(define (comms-delete-node comms id)
(thread-sendrecv comms 'destroy-channel id)
(thread-sendrecv comms 'raw-delete-node id))
(define (comms-local-channel comms)
(thread-sendrecv comms 'local-channel (void)))
@ -344,7 +350,7 @@
(comms-dispatch-msg comms to-id msg)))
(provide make-comms comms-listen comms-connect comms-get-node-info comms-set-node-info
comms-channel-available?)
comms-delete-node comms-channel-available?)
;; transactional messages support

View File

@ -14,7 +14,8 @@ create table node_resource(nodeid integer not null, resource text not null,
-- }
-- @up {
create table task(id integer primary key, name text not null, manifest blob not null,
committed boolean not null);
committed boolean not null default false,
complete boolean not null default false);
-- }
-- @up {
create table task_log(taskid integer not null, worker integer not null,

View File

@ -17,13 +17,17 @@
;; along with this program. If not, see <https://www.gnu.org/licenses/>.
(require (only-in data/integer-set make-integer-set integer-set-contents [union integer-set-union]
[count integer-set-count])
[count integer-set-count] [intersect integer-set-intersect]
[subtract integer-set-subtract])
racket/list racket/match racket/vector)
(provide pos->integer-set-pos
char->integer-set string->integer-set range->integer-set
(provide char->integer-set string->integer-set range->integer-set
;; re-export renamed integer-set accessors
make-integer-set integer-set-contents integer-set-count integer-set-union
integer-set-intersect integer-set-subtract
builtin-isets
pattern-count pos->pattern-pos resolve-pattern-pos)
pattern-count pos->pattern-pos resolve-pattern-pos
pattern-range-take)
;; pattern processing
;; NOTE: data/integer-set WFS intervals are INCLUSIVE
@ -101,3 +105,18 @@
(define (resolve-pattern-pos pattern pp)
(for/vector ([iset (in-vector pattern)] [pos (in-vector pp)])
(pos->integer-set-pos iset pos)))
;; utils for pattern ranges [0, pattern-count) mapped to patterns with the above functions
;; a pattern range is represented by an integer-set, the wfs item boundaries would get converted
;; into resolved pattern pos, etc
;; takes up to n elements from the pattern range
;; returns the subrange and pr minus subrange
(define (pattern-range-take pr n)
(cond
[(>= n (integer-set-count pr)) (values pr (make-integer-set '()))]
[else
(match-define (cons _ bound) (pos->integer-set-pos pr (sub1 n)))
(define subrange (integer-set-intersect (make-integer-set `((0 . ,bound))) pr))
(define new-pr (integer-set-subtract pr subrange))
(values subrange new-pr)]))

View File

@ -17,8 +17,8 @@
;; along with this program. If not, see <https://www.gnu.org/licenses/>.
(require db/base db/sqlite3
racket/bool racket/fasl racket/file racket/match racket/runtime-path racket/path
racket/set racket/string
data/queue racket/bool racket/fasl racket/file racket/match
racket/path racket/runtime-path racket/set racket/string
north/base north/adapter/base north/adapter/sqlite
"comms.rkt" "not-crypto.rkt"
;; port-fsync
@ -77,14 +77,15 @@
where node.type = ?")
(define-stmt q-get-node-resources "select resource from node_resource where nodeid=?")
(define-stmt q-edit-node "update node set name=? where id=?")
(define-stmt q-delete-node "delete from node where id=?")
(define-stmt q-get-node-type "select type from node where id=?")
(define-stmt q-get-node-info "select name, arch, type, secret from node where id=?")
(define-stmt q-new-task "insert into task (name, manifest, committed) values (?, ?, 0)")
(define-stmt q-new-task "insert into task (name, manifest) values (?, ?)")
(define-stmt q-get-task-id-commit "select id, committed from task")
(define-stmt q-set-task-commit "update task set committed=1 where id=?")
(define-stmt q-delete-task "delete from task where id=?")
(define-stmt q-get-tasks "select id, name, manifest from task")
(define-stmt q-get-tasks "select id, name, manifest, complete from task")
;; utils
@ -134,7 +135,7 @@
(define-rpc-type server)
(struct node-info [id name arch type resources online?] #:prefab)
(struct project-info [id name manifest] #:prefab)
(struct project-info [id name manifest complete?] #:prefab)
(define (get-nodes type)
(define type-str (symbol->string type))
@ -208,7 +209,9 @@
(for ([res (in-set (set-subtract new-resource existing-resource))])
(query-exec (current-db) q-add-node-res id res))
(for ([res (in-set (set-subtract existing-resource new-resource))])
(query-exec (current-db) q-del-node-res id res)))))
(query-exec (current-db) q-del-node-res id res))))
(define comms-node (comms-get-node-info (current-comms) id))
(comms-set-node-info (current-comms) (struct-copy node comms-node [name name])))
(define-rpc server (get-agent-deployment id)
(enforce-subject 'client)
@ -218,16 +221,25 @@
arch (current-server-public-node))]
[_ (error "invalid id or wrong node type")]))
(define-rpc server (delete-agent id)
(enforce-subject 'client)
(call-with-transaction (current-db) (lambda ()
(enforce-object id 'agent)
(query-exec (current-db) q-delete-node id)))
(comms-delete-node (current-comms) id))
;; client rpcs :: projects
(define-rpc server (new-project name manifest tar)
(enforce-subject 'client)
(make-task name manifest tar))
(define id (make-task name manifest tar))
(void id)
id)
(define-rpc server (get-projects)
(enforce-subject 'client)
(for/list ([(id name manifest) (in-query (current-db) q-get-tasks)])
(project-info id name (fasl->s-exp manifest))))
(for/list ([(id name manifest complete?) (in-query (current-db) q-get-tasks)])
(project-info id name (fasl->s-exp manifest) complete?)))
;; agent rpcs
@ -236,6 +248,51 @@
(error "TODO"))
;; agent handling
;; distributing subtasks
;; minimum batch of inputs that will be scheduled for a node
;; the batch size scales to how fast the node completes a batch
(define MIN-SUBTASK-SIZE 10)
;; aim to batch every 5 minutes
(define OPTIMAL-COMPLETION-SECS 300)
(define (agent-handler)
;; unlike comms, messages to agent-handler have no responses. just thread-send, it's one-way
(define cust (make-custodian))
(define this-thread (current-thread))
;; make an auto cleanup thread :P
(thread (lambda () (thread-wait this-thread) (custodian-shutdown-all cust)))
;; tasks
;; pending task queue
(define task-queue (make-queue))
;; id to manifest
(define current-tasks (make-hash))
;; agent state
(struct state [task-id pattern time-start errors] #:transparent #:mutable)
;; hash of agents and states
(define agents (make-hash))
(define (handle-thd-msg)
(match (thread-receive)
[(cons 'new-agent (cons id resources)) (error "TODO")]
[(cons 'delete-agent id) (error "TODO")]
[(cons 'new-task (cons id manifest)) (error "TODO")]
[(cons 'cancel-task id) (error "TODO")]
[_ (error "unknown agent handler message")]))
(let loop ()
(define thd-evt (thread-receive-evt))
(match (sync thd-evt)
[(== thd-evt) (handle-thd-msg)])
(loop)))
(define (make-agent-handler)
(thread agent-handler))
;; command line usage
(module+ main
(require racket/cmdline)