#lang racket

;; taskq:
;; infrastructure for a persistent task queue that guarantees each entry will execute to
;; completion at least once. Entries are responsible for implementing idempotency, because an
;; operation that was interrupted (due to a crash, for example) will be run again.

(require db crypto racket/fasl)

(provide make-taskq taskq-shutdown taskq-enqueue taskq-dequeue taskq-complete taskq-resched)

;; task states, stored in the `state` column
(define STATE-QUEUED 0)
(define STATE-EXECUTING 1)
(define STATE-DONE 2)

;; done tasks are kept for DONE-RETENTION seconds after completion, so re-enqueueing the same id
;; within that window reports completion instead of running the task again. The cleanup that
;; deletes older done tasks runs at most once every GC-INTERVAL seconds (checked whenever the
;; service loop wakes up).
(define DONE-RETENTION 300)
(define GC-INTERVAL 300)

;; request message sent to the service thread: `from` is the requesting thread (replies go back
;; to it via thread-send), `type` is a symbol naming the operation, `data` is its payload
(struct qmsg [from type data] #:transparent)

;; sql to initialize db
(define TASKQ-INIT
  "create table taskq (id blob primary key, task blob, notbefore int, state integer, completed int)")

;; runs the task queue on connection `c`; this is the body of the queue's service thread.
;; posts the semaphore `ready` once startup recovery has finished. If startup fails (e.g. the
;; taskq table does not exist) the exception ends the thread without posting, and make-taskq
;; detects the dead thread instead of waiting forever.
(define (taskq-service c ready)
  ;; prepared statements
  (define delete-all-done
    (bind-prepared-statement (prepare c "delete from taskq where state=?") (list STATE-DONE)))
  (define delete-gc (prepare c "delete from taskq where state=? and completed<=?"))
  (define reset-executing
    (bind-prepared-statement (prepare c "update taskq set state=? where state=?")
                             (list STATE-QUEUED STATE-EXECUTING)))
  (define create
    (prepare c "insert into taskq (id, task, notbefore, state, completed) values (?, ?, ?, ?, ?)"))
  (define update-state (prepare c "update taskq set state=? where id=?"))
  (define update-state-completed (prepare c "update taskq set state=?, completed=? where id=?"))
  (define update-notbefore (prepare c "update taskq set notbefore=?, state=? where id=?"))
  (define get-by-id (prepare c "select * from taskq where id=?"))
  ;; earliest-due first, so due tasks are handed out in notbefore order rather than row order
  (define get-next
    (prepare c "select * from taskq where state=? and notbefore<=? order by notbefore asc limit 1"))
  ;; ORDER BY/LIMIT rather than min(): min() over no rows yields one NULL row, not zero rows
  (define earliest-wakeup
    (bind-prepared-statement
     (prepare c "select notbefore from taskq where state=? order by notbefore asc limit 1")
     (list STATE-QUEUED)))

  ;; crash recovery: if we crash and come back up, there might be done tasks and executing tasks.
  ;; delete the done tasks: nobody is waiting on them anymore, and any code that creates waiters
  ;; should check for the result it wanted and re-queue if needed.
  ;; then move executing tasks (which must have been interrupted by the crash) back to queued
  ;; so they run again.
  (query-exec c delete-all-done)
  (query-exec c reset-executing)

  ;; done with init
  (semaphore-post ready)

  ;; task id -> immutable set of threads waiting for that task to complete
  (define task-waiters (make-hash))
  ;; worker threads waiting for work
  (define dequeue-waiters (mutable-set))
  ;; when done tasks were last garbage collected (seconds)
  (define last-gc (current-seconds))

  ;; create an exn:fail but don't raise it
  (define (make-error str)
    (exn:fail (format "error: ~a" str) (current-continuation-marks)))

  ;; adds a waiter for a task with given id
  (define (add-task-waiter! id from)
    (hash-update! task-waiters id (lambda (s) (set-add s from)) set))

  ;; sends the task id to every waiter of `id`, then forgets them
  (define (invoke-task-waiters! id)
    (for ([w (in-set (hash-ref task-waiters id set))])
      (thread-send w id #f))
    (hash-remove! task-waiters id))

  ;; enqueues a task. `data` is (list id-or-#f task notbefore); a #f id gets a random 16-byte id.
  ;; if wait? is #f the requester is sent the task id right away, otherwise it is sent the id
  ;; when the task completes. if a task with this id already exists no new task is created,
  ;; and when that task is already done the id is sent immediately.
  (define (enqueue from data wait?)
    ;; exn:fail? rather than just exn:fail:sql?: s-exp->fasl raises exn:fail:contract for a task
    ;; it cannot serialize, and that must go back to the requester instead of killing this thread
    (with-handlers ([exn:fail? (lambda (e) (thread-send from e #f))])
      (match-define (list id-tmp task notbefore) data)
      (define id (or id-tmp (crypto-random-bytes 16)))
      (match (query-maybe-row c get-by-id id)
        [#f
         (query-exec c create id (s-exp->fasl task) notbefore STATE-QUEUED 0)
         (if wait?
             (add-task-waiter! id from)
             (thread-send from id #f))]
        [(vector _ _ _ state _)
         (if (and wait? (not (= state STATE-DONE)))
             (add-task-waiter! id from)
             (thread-send from id #f))]))
    (try-dequeue))

  ;; adds from to the dequeue list, attempts immediate dequeue if possible
  (define (dequeue from)
    (set-add! dequeue-waiters from)
    (try-dequeue))

  ;; attempts to dispatch the next due task to one waiting worker: marks it executing, removes
  ;; the worker from dequeue-waiters and sends it (list id task).
  ;; returns #t if there might be more work available, #f if there is guaranteed no work (or no
  ;; worker) right now
  (define (try-dequeue)
    (cond
      [(set-empty? dequeue-waiters) #f]
      [else
       (define w (set-first dequeue-waiters))
       (with-handlers ([exn:fail:sql? (lambda (e) (displayln e) #f)])
         (match (query-maybe-row c get-next STATE-QUEUED (current-seconds))
           [#f #f]
           [(vector id task _ _ _)
            (query-exec c update-state STATE-EXECUTING id)
            (set-remove! dequeue-waiters w)
            (thread-send w (list id (fasl->s-exp task)) #f)
            #t]))]))

  ;; marks a task as done (recording the completion time), notifies its waiters and replies #t
  (define (complete from id)
    (with-handlers ([exn:fail:sql? (lambda (e) (thread-send from e #f))])
      (query-exec c update-state-completed STATE-DONE (current-seconds) id)
      (invoke-task-waiters! id)
      (thread-send from #t #f)))

  ;; puts a task back in the queued state with a new notbefore time, replies #t
  (define (resched from data)
    (match-define (list id notbefore) data)
    (with-handlers ([exn:fail:sql? (lambda (e) (thread-send from e #f))])
      (query-exec c update-notbefore notbefore STATE-QUEUED id)
      (thread-send from #t #f)))

  ;; deletes done tasks that completed more than DONE-RETENTION seconds before `now`
  ;; (named so it does not shadow racket's collect-garbage)
  (define (collect-done-tasks now)
    (with-handlers ([exn:fail:sql? (lambda (e) (displayln e))])
      (query-exec c delete-gc STATE-DONE (- now DONE-RETENTION))))

  ;; answers every pending waiter and every request still in the mailbox, so no caller is left
  ;; blocked: dequeuers get #f, everyone else gets a "queue shutdown" error
  (define (shutdown)
    (for ([w (in-set dequeue-waiters)])
      (thread-send w #f #f))
    (for* ([ws (in-hash-values task-waiters)]
           [w (in-set ws)])
      (thread-send w (make-error "queue shutdown") #f))
    (let drain ()
      (match (thread-try-receive)
        [#f (void)]
        ;; other shutdown callers only thread-wait on us; don't leave a stray reply in their mail
        [(qmsg _ 'shutdown _) (drain)]
        [(qmsg from 'dequeue _) (thread-send from #f #f) (drain)]
        [(qmsg from _ _) (thread-send from (make-error "queue shutdown") #f) (drain)]
        [_ (drain)])))

  ;; event that becomes ready when the earliest queued task is due. A wakeup is only useful while
  ;; some worker is waiting (otherwise try-dequeue has nobody to hand work to), and arming one
  ;; for an already-due task with no workers would make the loop spin, so use never-evt then.
  (define (next-wakeup-evt now)
    (if (set-empty? dequeue-waiters)
        never-evt
        (let ([wakeup (with-handlers ([exn:fail:sql? (lambda (e) (displayln e) (add1 now))])
                        (query-maybe-value c earliest-wakeup))])
          ;; alarm-evt takes milliseconds; an alarm time in the past is ready immediately
          (if wakeup
              (alarm-evt (* 1000 wakeup))
              never-evt))))

  ;; handles one request; returns #f when the service should stop
  (define (handle-message msg)
    (match msg
      [(qmsg _ 'shutdown _) (shutdown) #f]
      [(qmsg from 'enqueue data) (enqueue from data #f) #t]
      [(qmsg from 'enqueue-wait data) (enqueue from data #t) #t]
      [(qmsg from 'dequeue _) (dequeue from) #t]
      [(qmsg from 'complete data) (complete from data) #t]
      [(qmsg from 'resched data) (resched from data) #t]
      [(qmsg from _ _) (thread-send from (make-error "unknown message") #f) #t]
      ;; not a qmsg: there is nobody to reply to, so drop it rather than crash
      [_ #t]))

  ;; main task queue loop
  (let loop ()
    (define now (current-seconds))
    ;; collect garbage if needed
    (when (> now (+ last-gc GC-INTERVAL))
      (collect-done-tasks now)
      (set! last-gc now))
    ;; wait for either a message or a wakeup
    ;; if both events are ready, one is chosen pseudorandomly so it should be fair
    (define event
      (sync (handle-evt (thread-receive-evt) (lambda (_) 'message))
            (handle-evt (next-wakeup-evt now) (lambda (_) 'wakeup))))
    (case event
      [(wakeup)
       (let dequeue-loop ()
         (when (try-dequeue)
           (dequeue-loop)))
       (loop)]
      [(message)
       (when (handle-message (thread-receive))
         (loop))]))

  ;; disconnect and exit
  (disconnect c)
  (void))

;; creates a new taskq backed by the sqlite database at `path`. When initialize? is true the
;; database file is created if needed and the taskq table is created in it.
;; returns the queue's service thread, which is the handle passed to the other taskq-* functions.
;; raises if the table cannot be created or if the service thread dies during startup (for
;; example because the taskq table is missing); the connection is closed in both cases.
(define (make-taskq path [initialize? #f])
  (define c
    (if initialize?
        (sqlite3-connect #:database path #:mode 'create)
        (sqlite3-connect #:database path)))
  ;; don't leak the connection if creating the table fails (e.g. it already exists)
  (with-handlers ([exn:fail? (lambda (e) (disconnect c) (raise e))])
    (when initialize?
      (query-exec c TASKQ-INIT)))
  ;; a dedicated semaphore instead of thread-receive: thread-receive could consume an unrelated
  ;; message already in the caller's mailbox, and it would block forever if startup failed
  (define ready (make-semaphore 0))
  (define q (thread (lambda () (taskq-service c ready))))
  (sync (handle-evt ready (lambda (_) q))
        (handle-evt (thread-dead-evt q)
                    (lambda (_)
                      (disconnect c)
                      (error 'make-taskq "task queue service failed to start")))))

;; shuts down the task queue and waits for its thread to exit
(define (taskq-shutdown q)
  (thread-send q (qmsg (current-thread) 'shutdown #f))
  (thread-wait q))

;; if the next mail is an exn, raise it, otherwise return the value
(define (receive-check)
  (match (thread-receive)
    [(? exn? e) (raise e)]
    [result result]))

;; enqueues a task (which must be serializable with s-exp->fasl) and returns its id.
;; id: #f generates a random 16-byte id; if a task with the given id already exists, no new
;;     task is created.
;; notbefore: the task is not handed out before this time (seconds, as in current-seconds).
;; wait?: when #t, blocks until the task has completed (returns at once if it is already done).
;; raises on database or serialization errors, or if the queue shuts down while waiting.
(define (taskq-enqueue q task [id #f] [notbefore 0] [wait? #t])
  (thread-send q (qmsg (current-thread) (if wait? 'enqueue-wait 'enqueue) (list id task notbefore)))
  (receive-check))

;; dequeues a task, blocking until one is due.
;; returns (list id task) with the task now marked as executing, or #f if the queue shuts down
(define (taskq-dequeue q)
  (thread-send q (qmsg (current-thread) 'dequeue #f))
  (receive-check))

;; marks a task complete and wakes anyone waiting on it; returns #t
(define (taskq-complete q id)
  (thread-send q (qmsg (current-thread) 'complete id))
  (receive-check))

;; reschedules a task to run no earlier than notbefore (seconds, as in current-seconds);
;; returns #t.
;; cannot be combined with taskq-complete: workers must call exactly one or the other, then call
;; taskq-dequeue for the next assignment
(define (taskq-resched q id notbefore)
  (thread-send q (qmsg (current-thread) 'resched (list id notbefore)))
  (receive-check))