idk honestly
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

227 lines
8.8 KiB

#lang racket
;; taskq:
;; infrastructure for a persistent task queue that guarantees each entry will execute to completion
;; at least once. entries are responsible for implementing idempotency in the case of interrupted
;; operations (due to a crash, for example)
(require db crypto racket/fasl)
(provide make-taskq taskq-shutdown taskq-enqueue taskq-dequeue taskq-complete taskq-resched)
;; task states (stored in the `state` column of the taskq table)
;; STATE-QUEUED    : waiting to be handed to a worker
;; STATE-EXECUTING : handed to a worker, not yet completed or rescheduled
;; STATE-DONE      : completed; kept for a while so late waiters can observe it
(define STATE-QUEUED 0)
;; NOTE(review): the service references STATE-EXECUTING but no definition was present;
;; 1 is assumed because it is the unused value between QUEUED (0) and DONE (2) -- confirm
;; against any existing database before deploying.
(define STATE-EXECUTING 1)
(define STATE-DONE 2)
;; message struct for task queue
;; from : thread that should receive the reply
;; type : symbol naming the request ('enqueue, 'enqueue-wait, 'dequeue, 'complete,
;;        'resched, 'shutdown)
;; data : request payload, shape depends on type
(struct qmsg [from type data] #:transparent)
;; sql to initialize db
(define TASKQ-INIT "create table taskq (id blob primary key, task blob, notbefore int, state integer, completed int)")
;; runs the task queue
;; c           : open database connection containing the taskq table
;; init-thread : thread that is sent #t once startup cleanup has finished
;; the service then loops, handling qmsg requests from its thread mailbox and waking up when a
;; queued task's notbefore time arrives. it returns (after disconnecting c) when it receives a
;; 'shutdown message.
;; errors are reported to requesters by sending them an exn value instead of a result
(define (taskq-service c init-thread)
  ;; builds an exception *value* to send to a requester. calling (error ...) inline as an
  ;; argument to thread-send would raise inside the service thread and kill the queue
  (define (make-queue-error message)
    (make-exn:fail message (current-continuation-marks)))
  ;; define some prepared statements
  (define delete-all-done (bind-prepared-statement
                           (prepare c "delete from taskq where state=?")
                           (list STATE-DONE)))
  (define delete-gc (prepare c "delete from taskq where state=? and completed<=?"))
  (define reset-executing (bind-prepared-statement
                           (prepare c "update taskq set state=? where state=?")
                           (list STATE-QUEUED STATE-EXECUTING)))
  (define create (prepare c "insert into taskq (id, task, notbefore, state, completed) values (?, ?, ?, ?, ?)"))
  (define update-state (prepare c "update taskq set state=? where id=?"))
  (define update-state-completed (prepare c "update taskq set state=?, completed=? where id=?"))
  (define update-notbefore (prepare c "update taskq set notbefore=?, state=? where id=?"))
  (define delete-by-id (prepare c "delete from taskq where id=?"))
  (define get-by-id (prepare c "select * from taskq where id=?"))
  (define get-next (prepare c "select * from taskq where state=? and notbefore<=? limit 1"))
  ;; pre-bound to STATE-QUEUED because the main loop queries it without arguments
  (define earliest-wakeup
    (bind-prepared-statement
     (prepare c "select notbefore from taskq where state=? order by notbefore asc limit 1")
     (list STATE-QUEUED)))

  ;; cleanup: if we crash and come back up, there might be done tasks and executing tasks
  ;; delete the done tasks, we don't have waiters for those anymore and any code that creates
  ;; waiters should check for the result it wanted and re-queue if needed
  ;; then, move executing tasks (which must have been interrupted by the crash) back to
  ;; queued so we can run them again
  (query-exec c delete-all-done)
  (query-exec c reset-executing)

  ;; done with init
  (thread-send init-thread #t #f)
  (set! init-thread #f)

  ;; waiters for a given task: id -> set of threads
  (define task-waiters (make-hash))
  ;; workers that are waiting for work
  (define dequeue-waiters (mutable-set))

  ;; adds a waiter for a task with given id
  (define (add-task-waiter! id from)
    (hash-update! task-waiters id (lambda (s) (set-add s from)) set))

  ;; invokes and removes all waiters for a given id
  (define (invoke-task-waiters! id)
    (for ([w (in-set (hash-ref task-waiters id set))])
      (thread-send w id #f))
    (hash-remove! task-waiters id))

  ;; enqueues data. wait? specifies whether the queuer should get a message back immediately with
  ;; the task id, or only upon completion
  ;; data is (list id-or-#f task notbefore); a random 16-byte id is generated when none is given.
  ;; if a task with that id already exists it is not inserted again
  (define (enqueue from data wait?)
    (match-define (list id-tmp task notbefore) data)
    (define id (or id-tmp (crypto-random-bytes 16)))
    (with-handlers ([exn:fail:sql? (lambda (e) (thread-send from e #f))])
      (match (query-maybe-row c get-by-id id)
        [#f
         (query-exec c create id (s-exp->fasl task) notbefore STATE-QUEUED 0)
         (if wait?
             (add-task-waiter! id from)
             (thread-send from id #f))]
        [(vector id task notbefore state completed)
         ;; already known: only wait if it has not finished yet
         (if (and wait? (not (= state STATE-DONE)))
             (add-task-waiter! id from)
             (thread-send from id #f))]))
    ;; NOTE(review): assumed -- a task that is already due gets no wakeup alarm (see main loop),
    ;; so hand it to an idle worker right away
    (try-dequeue))

  ;; adds from to the dequeue list, attempts immediate dequeue if possible
  (define (dequeue from)
    (set-add! dequeue-waiters from)
    (try-dequeue))

  ;; errors all current waiters and shuts down the queue
  ;; idle workers receive #f, task waiters receive an exn value
  (define (shutdown)
    (for ([w (in-set dequeue-waiters)])
      (thread-send w #f #f))
    (define shutdown-error (make-queue-error "queue shutdown"))
    (for ([(k v) (in-hash task-waiters)])
      (for ([w (in-set v)])
        (thread-send w shutdown-error #f))))

  ;; attempts to dequeue the next work
  ;; if there is work, set it to executing and invoke and remove one of the workers
  ;; returns #t if there might be more work available, #f if there is guaranteed no work right now
  ;; (also #f when there is no idle worker or the database query fails)
  (define (try-dequeue)
    (if (set-empty? dequeue-waiters)
        #f
        (let ([w (set-first dequeue-waiters)])
          (with-handlers ([exn:fail:sql? (lambda (e) (displayln e) #f)])
            (match (query-maybe-row c get-next STATE-QUEUED (current-seconds))
              [#f #f]
              [(vector id task notbefore state completed)
               (query-exec c update-state STATE-EXECUTING id)
               (set-remove! dequeue-waiters w)
               (thread-send w (list id (fasl->s-exp task)) #f)
               #t])))))

  ;; marks a task as completed. invokes and removes any waiters
  (define (complete from id)
    (with-handlers ([exn:fail:sql? (lambda (e) (thread-send from e #f))])
      (query-exec c update-state-completed STATE-DONE (current-seconds) id)
      (invoke-task-waiters! id)
      (thread-send from #t #f)))

  ;; reschedules a task for a later time
  ;; data is (list id notbefore); the task goes back to the queued state
  (define (resched from data)
    (match-define (list id notbefore) data)
    (with-handlers ([exn:fail:sql? (lambda (e) (thread-send from e #f))])
      (query-exec c update-notbefore notbefore STATE-QUEUED id)
      (thread-send from #t #f)))

  ;; collects old done tasks (completed at least 300 seconds before now)
  (define (collect-garbage now)
    (with-handlers ([exn:fail:sql? (lambda (e) (displayln e))])
      (query-exec c delete-gc STATE-DONE (- now 300))))

  ;; when was collect-garbage run last
  (define last-garbage (box (current-seconds)))

  ;; main task queue loop
  (let loop ()
    (define now (current-seconds))
    ;; collect garbage if needed
    (when (> now (+ (unbox last-garbage) 300))
      (collect-garbage now)
      (set-box! last-garbage now))
    ;; get the next wakeup time (+inf.0 when nothing is queued, 0 if the query failed)
    (define wakeup (with-handlers ([exn:fail:sql? (lambda (e) 0)])
                     (or (query-maybe-value c earliest-wakeup) +inf.0)))
    ;; get thread mail event
    (define recv-evt (thread-receive-evt))
    ;; create a wakeup event if wakeup is needed sometime
    ;; NOTE(review): the fallback for an already-due wakeup is assumed to be never-evt: due tasks
    ;; are handed out by enqueue/dequeue directly, so no timer is needed for them -- confirm
    (define wakeup-evt (if (>= wakeup now)
                           (alarm-evt (* 1000 wakeup))
                           never-evt))
    ;; wait for either a message or a wakeup
    ;; if both events are ready, one will be pseudorandomly chosen so it should be fair
    (define sync-result (sync recv-evt wakeup-evt))
    (cond
      [(equal? sync-result wakeup-evt)
       ;; run dequeue until there is no more work or no more idle workers
       (let dequeue-loop ()
         (when (try-dequeue)
           (dequeue-loop)))
       (loop)]
      [else
       ;; handle message
       (match-define (qmsg from type data) (thread-receive))
       (match type
         ;; no (loop) here: fall through to disconnect
         ['shutdown (shutdown)]
         ['enqueue (enqueue from data #f) (loop)]
         ['enqueue-wait (enqueue from data #t) (loop)]
         ['dequeue (dequeue from) (loop)]
         ['complete (complete from data) (loop)]
         ['resched (resched from data) (loop)]
         [_ (thread-send from (make-queue-error "unknown message") #f) (loop)])]))

  ;; disconnect and exit
  (disconnect c))
;; creates a new taskq with db at given path, and optionally initializes it
;; path        : location of the sqlite database file
;; initialize? : when true, the database file is created if needed and the taskq table is created
;; returns the service thread, which is the handle passed to the other taskq-* functions
(define (make-taskq path [initialize? #f])
  (define c
    (if initialize?
        (sqlite3-connect #:database path #:mode 'create)
        (sqlite3-connect #:database path)))
  (when initialize?
    (query-exec c TASKQ-INIT))
  (define t (current-thread))
  (define q (thread (lambda () (taskq-service c t))))
  ;; the service sends one message to this thread when its startup cleanup is done.
  ;; consume it here so it cannot be mistaken for the reply to a later request
  (thread-receive)
  q)
;; asks the task queue q to shut down, then blocks until its service thread has exited
(define (taskq-shutdown q)
  (let ([request (qmsg (current-thread) 'shutdown #f)])
    (thread-send q request)
    (thread-wait q)))
;; blocks for the next message in the current thread's mailbox.
;; an exn value is re-raised in the caller; anything else is returned unchanged
(define (receive-check)
  (let ([reply (thread-receive)])
    (if (exn? reply)
        (raise reply)
        reply)))
;; enqueues a task
;; q         : task queue handle
;; task      : task payload (the service serializes it with s-exp->fasl)
;; id        : task id, or #f to let the service generate one
;; notbefore : time before which the task must not be handed to a worker; it is compared against
;;             (current-seconds) by the service
;; wait?     : when true the service replies only once the task has been completed, otherwise
;;             it replies right after queuing
;; returns the service's reply (the task id); raises if the reply is an exn
(define (taskq-enqueue q task [id #f] [notbefore 0] [wait? #t])
  (thread-send q (qmsg (current-thread) (if wait? 'enqueue-wait 'enqueue)
                       (list id task notbefore)))
  ;; collect the reply so it does not linger in the mailbox and get confused with a later one
  (receive-check))
;; dequeues a task
;; blocks until the service replies
;; returns the next task (now marked as executing) as (list id task), or #f if the queue was
;; shut down while waiting
(define (taskq-dequeue q)
  (thread-send q (qmsg (current-thread) 'dequeue #f))
  (receive-check))
;; marks a task complete
;; returns #t once the service has recorded the completion; raises if the service replies
;; with an exn (for example a database error)
(define (taskq-complete q id)
  (thread-send q (qmsg (current-thread) 'complete id))
  (receive-check))
;; reschedules a task for later
;; cannot be combined with taskq-complete: workers must call exactly one or the other, then call
;; taskq-dequeue for the next assignment
;; returns #t once the service has re-queued the task with the new notbefore time; raises if the
;; service replies with an exn (for example a database error)
(define (taskq-resched q id notbefore)
  (thread-send q (qmsg (current-thread) 'resched (list id notbefore)))
  (receive-check))