diff --git a/crossfire/comms.rkt b/crossfire/comms.rkt index 995943d..ee53527 100644 --- a/crossfire/comms.rkt +++ b/crossfire/comms.rkt @@ -240,6 +240,8 @@ from (or (= data (node-id my-node)) (hash-has-key? peer-registry data) #f))] ['get-node-info (thread-send from (hash-ref node-registry data #f) #f)] ['set-node-info (hash-set! node-registry (node-id data) data) (thread-send from (void) #f)] + ;; also call destroy-channel + ['raw-delete-node (hash-remove! node-registry data) (thread-send from (void) #f)] ['register-channel (match-define (cons peer-id thd) data) (when (hash-has-key? peer-registry peer-id) @@ -319,6 +321,10 @@ (define (comms-get-node-info comms id) (thread-sendrecv comms 'get-node-info id)) +(define (comms-delete-node comms id) + (thread-sendrecv comms 'destroy-channel id) + (thread-sendrecv comms 'raw-delete-node id)) + (define (comms-local-channel comms) (thread-sendrecv comms 'local-channel (void))) @@ -344,7 +350,7 @@ (comms-dispatch-msg comms to-id msg))) (provide make-comms comms-listen comms-connect comms-get-node-info comms-set-node-info - comms-channel-available?) + comms-delete-node comms-channel-available?) ;; transactional messages support diff --git a/crossfire/migrations/20201113-add-initial-tables.sql b/crossfire/migrations/20201113-add-initial-tables.sql index e66d79a..41db506 100644 --- a/crossfire/migrations/20201113-add-initial-tables.sql +++ b/crossfire/migrations/20201113-add-initial-tables.sql @@ -14,7 +14,8 @@ create table node_resource(nodeid integer not null, resource text not null, -- } -- @up { create table task(id integer primary key, name text not null, manifest blob not null, - committed boolean not null); + committed boolean not null default false, + complete boolean not null default false); -- } -- @up { create table task_log(taskid integer not null, worker integer not null, diff --git a/crossfire/pattern.rkt b/crossfire/pattern.rkt index 256aa9f..2e79a93 100644 --- a/crossfire/pattern.rkt +++ b/crossfire/pattern.rkt @@ -17,13 +17,17 @@ ;; along with this program. If not, see . (require (only-in data/integer-set make-integer-set integer-set-contents [union integer-set-union] - [count integer-set-count]) + [count integer-set-count] [intersect integer-set-intersect] + [subtract integer-set-subtract]) racket/list racket/match racket/vector) -(provide pos->integer-set-pos - char->integer-set string->integer-set range->integer-set +(provide char->integer-set string->integer-set range->integer-set + ;; re-export renamed integer-set accessors + make-integer-set integer-set-contents integer-set-count integer-set-union + integer-set-intersect integer-set-subtract builtin-isets - pattern-count pos->pattern-pos resolve-pattern-pos) + pattern-count pos->pattern-pos resolve-pattern-pos + pattern-range-take) ;; pattern processing ;; NOTE: data/integer-set WFS intervals are INCLUSIVE @@ -101,3 +105,18 @@ (define (resolve-pattern-pos pattern pp) (for/vector ([iset (in-vector pattern)] [pos (in-vector pp)]) (pos->integer-set-pos iset pos))) + +;; utils for pattern ranges [0, pattern-count) mapped to patterns with the above functions +;; a pattern range is represented by an integer-set, the wfs item boundaries would get converted +;; into resolved pattern pos, etc + +;; takes up to n elements from the pattern range +;; returns the subrange and pr minus subrange +(define (pattern-range-take pr n) + (cond + [(>= n (integer-set-count pr)) (values pr (make-integer-set '()))] + [else + (match-define (cons _ bound) (pos->integer-set-pos pr (sub1 n))) + (define subrange (integer-set-intersect (make-integer-set `((0 . ,bound))) pr)) + (define new-pr (integer-set-subtract pr subrange)) + (values subrange new-pr)])) diff --git a/crossfire/protocol.rkt b/crossfire/protocol.rkt index bb0a86b..a4ea94e 100644 --- a/crossfire/protocol.rkt +++ b/crossfire/protocol.rkt @@ -17,8 +17,8 @@ ;; along with this program. If not, see . (require db/base db/sqlite3 - racket/bool racket/fasl racket/file racket/match racket/runtime-path racket/path - racket/set racket/string + data/queue racket/bool racket/fasl racket/file racket/match + racket/path racket/runtime-path racket/set racket/string north/base north/adapter/base north/adapter/sqlite "comms.rkt" "not-crypto.rkt" ;; port-fsync @@ -77,14 +77,15 @@ where node.type = ?") (define-stmt q-get-node-resources "select resource from node_resource where nodeid=?") (define-stmt q-edit-node "update node set name=? where id=?") +(define-stmt q-delete-node "delete from node where id=?") (define-stmt q-get-node-type "select type from node where id=?") (define-stmt q-get-node-info "select name, arch, type, secret from node where id=?") -(define-stmt q-new-task "insert into task (name, manifest, committed) values (?, ?, 0)") +(define-stmt q-new-task "insert into task (name, manifest) values (?, ?)") (define-stmt q-get-task-id-commit "select id, committed from task") (define-stmt q-set-task-commit "update task set committed=1 where id=?") (define-stmt q-delete-task "delete from task where id=?") -(define-stmt q-get-tasks "select id, name, manifest from task") +(define-stmt q-get-tasks "select id, name, manifest, complete from task") ;; utils @@ -134,7 +135,7 @@ (define-rpc-type server) (struct node-info [id name arch type resources online?] #:prefab) -(struct project-info [id name manifest] #:prefab) +(struct project-info [id name manifest complete?] #:prefab) (define (get-nodes type) (define type-str (symbol->string type)) @@ -208,7 +209,9 @@ (for ([res (in-set (set-subtract new-resource existing-resource))]) (query-exec (current-db) q-add-node-res id res)) (for ([res (in-set (set-subtract existing-resource new-resource))]) - (query-exec (current-db) q-del-node-res id res))))) + (query-exec (current-db) q-del-node-res id res)))) + (define comms-node (comms-get-node-info (current-comms) id)) + (comms-set-node-info (current-comms) (struct-copy node comms-node [name name]))) (define-rpc server (get-agent-deployment id) (enforce-subject 'client) @@ -218,16 +221,25 @@ arch (current-server-public-node))] [_ (error "invalid id or wrong node type")])) +(define-rpc server (delete-agent id) + (enforce-subject 'client) + (call-with-transaction (current-db) (lambda () + (enforce-object id 'agent) + (query-exec (current-db) q-delete-node id))) + (comms-delete-node (current-comms) id)) + ;; client rpcs :: projects (define-rpc server (new-project name manifest tar) (enforce-subject 'client) - (make-task name manifest tar)) + (define id (make-task name manifest tar)) + (void id) + id) (define-rpc server (get-projects) (enforce-subject 'client) - (for/list ([(id name manifest) (in-query (current-db) q-get-tasks)]) - (project-info id name (fasl->s-exp manifest)))) + (for/list ([(id name manifest complete?) (in-query (current-db) q-get-tasks)]) + (project-info id name (fasl->s-exp manifest) complete?))) ;; agent rpcs @@ -236,6 +248,51 @@ (error "TODO")) +;; agent handling +;; distributing subtasks + +;; minimum batch of inputs that will be scheduled for a node +;; the batch size scales to how fast the node completes a batch +(define MIN-SUBTASK-SIZE 10) +;; aim to batch every 5 minutes +(define OPTIMAL-COMPLETION-SECS 300) + +(define (agent-handler) + ;; unlike comms, messages to agent-handler have no responses. just thread-send, it's one-way + (define cust (make-custodian)) + (define this-thread (current-thread)) + ;; make an auto cleanup thread :P + (thread (lambda () (thread-wait this-thread) (custodian-shutdown-all cust))) + + ;; tasks + ;; pending task queue + (define task-queue (make-queue)) + ;; id to manifest + (define current-tasks (make-hash)) + + ;; agent state + (struct state [task-id pattern time-start errors] #:transparent #:mutable) + ;; hash of agents and states + (define agents (make-hash)) + + (define (handle-thd-msg) + (match (thread-receive) + [(cons 'new-agent (cons id resources)) (error "TODO")] + [(cons 'delete-agent id) (error "TODO")] + [(cons 'new-task (cons id manifest)) (error "TODO")] + [(cons 'cancel-task id) (error "TODO")] + [_ (error "unknown agent handler message")])) + + (let loop () + (define thd-evt (thread-receive-evt)) + (match (sync thd-evt) + [(== thd-evt) (handle-thd-msg)]) + (loop))) + +(define (make-agent-handler) + (thread agent-handler)) + + ;; command line usage (module+ main (require racket/cmdline)