diff --git a/private/taskq.rkt b/private/taskq.rkt index e1beed0..883e4bb 100644 --- a/private/taskq.rkt +++ b/private/taskq.rkt @@ -2,24 +2,177 @@ (require db crypto racket/fasl) -(provide make-taskq taskq-close) - -(struct taskq [dbname db stmts waiters lock] #:transparent) +(provide make-taskq taskq-shutdown taskq-enqueue taskq-dequeue taskq-complete taskq-resched) +;; task states (define STATE-QUEUED 0) (define STATE-EXECUTING 1) (define STATE-DONE 2) -(define TASKQ-INIT "create table taskq (id blob(16) primary key, task blob, state integer)") -(define (init-stmts c) - (hash - 'delete-by-state (prepare c "delete from taskq where state=?") - 'update-state-by-state (prepare c "update taskq set state=? where state=?") - 'create (prepare c "insert into taskq (id, task, state) values (?, ?, ?)") - 'update-state (prepare c "update taskq set state=? where id=? and state=?; select changes()") - 'delete-by-id (prepare c "delete from taskq where id=?") - 'get-first-by-state (prepare c "select * from taskq where state=? limit 1"))) +;; message struct for task queue +(struct qmsg [from type data] #:transparent) +;; sql to initialize db +(define TASKQ-INIT "create table taskq (id blob primary key, task blob, notbefore int, state integer, completed int)") +;; runs the task queue +(define (taskq-service c init-thread) + ;; define some prepared statements + (define delete-all-done (bind-prepared-statement + (prepare c "delete from taskq where state=?") + (list STATE-DONE))) + (define delete-gc (prepare c "delete from taskq where state=? and completed<=?")) + (define reset-executing (bind-prepared-statement + (prepare c "update taskq set state=? where state=?") + (list STATE-QUEUED STATE-EXECUTING))) + (define create (prepare c "insert into taskq (id, task, notbefore, state, completed) values (?, ?, ?, ?, ?)")) + (define update-state (prepare c "update taskq set state=? where id=?")) + (define update-state-completed (prepare c "update taskq set state=?, completed=? where id=?")) + (define update-notbefore (prepare c "update taskq set notbefore=?, state=? where id=?")) + (define delete-by-id (prepare c "delete from taskq where id=?")) + (define get-by-id (prepare c "select * from taskq where id=?")) + (define get-next (prepare c "select * from taskq where state=? and notbefore<=? limit 1")) + (define earliest-wakeup + (bind-prepared-statement + (prepare c "select notbefore from taskq where state=? order by notbefore asc limit 1") + (list STATE-QUEUED))) + ;; cleanup: if we crash and come back up, there might be done tasks and executing tasks + ;; delete the done tasks, we don't have waiters for those anymore and any code that creates + ;; waiters should check for the result it wanted and re-queue if needed + ;; then, move executing tasks (which must have been interrupted by the crash) back to + ;; queued so we can run them again + (query-exec c delete-all-done) + (query-exec c reset-executing) + + ;; done with init + (thread-send init-thread #t #f) + (set! init-thread #f) + + ;; waiters for a given task + (define task-waiters (make-hash)) + ;; workers that are waiting for work + (define dequeue-waiters (mutable-set)) + + ;; adds a waiter for a task with given id + (define (add-task-waiter! id from) + (hash-update! task-waiters id (lambda (s) (set-add s from)) set)) + + ;; invokes and removes all waiters for a given id + (define (invoke-task-waiters! id) + (for ([w (in-set (hash-ref task-waiters id set))]) + (thread-send w id #f)) + (hash-remove! task-waiters id)) + + ;; enqueues data. wait? specifies whether the queuer should get a message back immediately with + ;; the task id, or only upon completion + (define (enqueue from data wait?) + (match-define (list id-tmp task notbefore) data) + (define id (or id-tmp (crypto-random-bytes 16))) + (with-handlers ([exn:fail:sql? (lambda (e) (thread-send from e #f))]) + (match (query-maybe-row c get-by-id id) + [#f + (query-exec c create id (s-exp->fasl task) notbefore STATE-QUEUED 0) + (if wait? + (add-task-waiter! id from) + (thread-send from id #f))] + [(vector id task notbefore state completed) + (if (and wait? (not (= state STATE-DONE))) + (add-task-waiter! id from) + (thread-send from id #f))])) + (try-dequeue)) + + ;; adds from to the dequeue list, attempts immediate dequeue if possible + (define (dequeue from) + (set-add! dequeue-waiters from) + (try-dequeue)) + + ;; errors all current waiters and shuts down the queue + (define (shutdown) + (for ([w (in-set dequeue-waiters)]) + (thread-send w #f #f)) + (for ([(k v) (in-hash task-waiters)]) + (thread-send v (error "queue shutdown") #f))) + + ;; attempts to dequeue the next work + ;; if there is work, set it to executing and invoke and remove one of the workers + ;; returns #t if there might be more work available, #f if there is guaranteed no work right now + (define (try-dequeue) + (if (set-empty? dequeue-waiters) + #f + (let ([w (set-first dequeue-waiters)]) + (with-handlers ([exn:fail:sql? (lambda (e) (displayln e) #f)]) + (match (query-maybe-row c get-next STATE-QUEUED (current-seconds)) + [#f #f] + [(vector id task notbefore state completed) + (query-exec c update-state STATE-EXECUTING id) + (set-remove! dequeue-waiters w) + (thread-send w (list id (fasl->s-exp task)) #f) + #t]))))) + + ;; marks a task as completed. invokes and removes any waiters + (define (complete from id) + (with-handlers ([exn:fail:sql? (lambda (e) (thread-send from e #f))]) + (query-exec c update-state-completed STATE-DONE (current-seconds) id) + (invoke-task-waiters! id) + (thread-send from #t #f))) + + ;; reschedules a task for a later time + (define (resched from data) + (match-define (list id notbefore) data) + (with-handlers ([exn:fail:sql? (lambda (e) (thread-send from e #f))]) + (query-exec c update-notbefore notbefore STATE-QUEUED id) + (thread-send from #t #f))) + + ;; collects old done tasks + (define (collect-garbage now) + (with-handlers ([exn:fail:sql? (lambda (e) (displayln e))]) + (query-exec c delete-gc STATE-DONE (- now 300)))) + + ;; when was collect-garbage run last + (define last-garbage (box (current-seconds))) + + ;; main task queue loop + (let loop () + (define now (current-seconds)) + ;; collect garbage if needed + (when (> now (+ (unbox last-garbage) 300)) + (collect-garbage) + (set-box! last-garbage now)) + ;; get the next wakeup time + (define wakeup (with-handlers ([exn:fail:sql? (lambda (e) 0)]) + (or (query-maybe-value c earliest-wakeup) +inf.0))) + ;; get thread mail event + (define recv-evt (thread-receive-evt)) + ;; create a wakeup event if wakeup is needed sometime + (define wakeup-evt (if (>= wakeup now) + (alarm-evt (* 1000 wakeup)) + always-evt)) + ;; wait for either a message or a wakeup + ;; if both events are ready, one will be pseudorandomly chosen so it should be fair + (define sync-result (sync recv-evt wakeup-evt)) + (cond + [(equal? sync-result wakeup-evt) + ;; run dequeue + (let dequeue-loop () + (when (try-dequeue) + (dequeue-loop))) + (loop)] + [else + ;; handle message + (match-define (qmsg from type data) (thread-receive)) + (match type + ['shutdown (shutdown)] + ['enqueue (enqueue from data #f) (loop)] + ['enqueue-wait (enqueue from data #t) (loop)] + ['dequeue (dequeue from) (loop)] + ['complete (complete from data) (loop)] + ['resched (resched from data) (loop)] + [_ (thread-send from (error "unknown message") #f) (loop)])])) + + ;; disconnect and exit + (disconnect c) + (void)) + +;; creates a new taskq with db at given path, and optionally initializes it (define (make-taskq path [initialize? #f]) (define c (if initialize? @@ -27,84 +180,42 @@ (sqlite3-connect #:database path))) (when initialize? (query-exec c TASKQ-INIT)) - (define stmts (init-stmts c)) - ;; cleanup: if we crash and come back up, there might be done tasks and executing tasks - ;; delete the done tasks, we don't have waiters for those anymore and any code that creates - ;; waiters should check for the result it wanted and re-queue if needed - ;; then, move executing tasks (which must have been interrupted by the crash) back to - ;; queued so we can run them again - (query-exec c (hash-ref stmts 'delete-by-state) STATE-DONE) - (query-exec c (hash-ref stmts 'update-state-by-state) STATE-QUEUED STATE-EXECUTING) - (taskq path c stmts (make-hash) (make-semaphore 1))) + (define t (current-thread)) + (define q (thread (lambda () (taskq-service c t)))) + (thread-receive) + q) -(define (taskq-close q) - (disconnect (taskq-db q))) +;; shuts down the task queue +(define (taskq-shutdown q) + (thread-send q (qmsg (current-thread) 'shutdown #f)) + (thread-wait q)) -(define (update-waiters! q id) - ;; ok basically fuck racket and its fucking "caveats concerning concurrent modification" - ;; preemptive green threads are why we can't have nice things - ;; it turns out in practice actually this probably won't be an issue ever but Just To Be Safe - ;; we use a semaphore as a mutex protecting writes on taskq-waiters - (call-with-semaphore - (taskq-lock q) - (lambda () - (hash-ref! (taskq-waiters q) id (make-semaphore))))) +;; if the next mail is an exn, raise it, otherwise return the value +(define (receive-check) + (match (thread-receive) + [(? exn? e) (raise e)] + [result result])) -(define (taskq-queue q what [num-retries 8] [retry-delay-secs 1/4]) - (define id (crypto-random-bytes 16)) - (with-handlers ([exn:fail:sql? (lambda (e) - (cond - [(zero? num-retries) (raise e)] - [else (sleep retry-delay-secs) - (taskq-queue q what (sub1 num-retries) - retry-delay-secs)]))]) - (query-exec (taskq-db q) (hash-ref (taskq-stmts q) 'create) - id (s-exp->fasl what) STATE-QUEUED)) - (define sema (update-waiters! q id)) - (semaphore-wait sema)) +;; enqueues a task +(define (taskq-enqueue q task [id #f] [notbefore 0] [wait? #t]) + (thread-send q (qmsg (current-thread) (if wait? 'enqueue-wait 'enqueue) + (list id task notbefore))) + (receive-check)) -;; blocks until the next task is available, then calls executor -(define (taskq-execute-next q executor) - ;; this is a bad hack - ;; use filesystem-change-evt to detect writes to the database - ;; initialize the event first before query to avoid deadlock - (define evt (filesystem-change-evt (taskq-dbname q))) - (match (query-maybe-row (taskq-db q) (hash-ref (taskq-stmts q) 'get-first-by-state) STATE-QUEUED) - [#f (sync evt) (taskq-execute-next q executor)] - [(vector id task state) - (filesystem-change-evt-cancel evt) - ;; update state, match expected existing state so that on concurrent modification, the query - ;; fails and we retry with the next available row - (with-handlers ([exn:fail:sql? (lambda (_) (taskq-execute-next q executor))]) - (when (zero? - (query-value (taskq-db q) (hash-ref (taskq-stmts q) 'update-state) - STATE-EXECUTING id state)) - (taskq-execute-next q executor))) - ;; now we have a unique lock to execute this row - (define (cleanup) - (query-exec (taskq-db q) (hash-ref (taskq-stmts q) 'update-state) - STATE-QUEUED id STATE-EXECUTING)) - (with-handlers ([exn? (lambda (e) (cleanup) (raise e))]) - (executor (fasl->s-exp task))) - (query-exec (taskq-db q) (hash-ref (taskq-stmts q) 'update-state) - STATE-DONE id STATE-EXECUTING) - (define sema (hash-ref (taskq-waiters q) id #f)) - (unless (false? sema) - (semaphore-post sema)) - (query-exec (taskq-db q) (hash-ref (taskq-stmts q) 'delete-by-id) id)])) +;; dequeues a task +;; returns the next task (now marked as executing) or #f +(define (taskq-dequeue q) + (thread-send q (qmsg (current-thread) 'dequeue #f)) + (receive-check)) -(define q (make-taskq "/tmp/s.sqlite3" #t)) +;; marks a task complete +(define (taskq-complete q id) + (thread-send q (qmsg (current-thread) 'complete id)) + (receive-check)) -(define (producer [n 0]) - (printf "producing ~a\n" n) - (taskq-queue q n) - (sleep 1) - (producer (add1 n))) - -(define (consumer) - (taskq-execute-next q (lambda (val) (printf "consuming ~a\n" val) - (sleep 2) (printf "done consuming\n"))) - (consumer)) - -(thread producer) -(consumer) +;; reschedules a task for later +;; cannot be combined with taskq-complete: workers must call exactly one or the other, then call +;; taskq-dequeue for the next assignment +(define (taskq-resched q id notbefore) + (thread-send q (qmsg (current-thread) 'resched (list id notbefore))) + (receive-check))