implement reasonable task queue
This commit is contained in:
parent
89b3c817de
commit
658f581272
|
@ -2,24 +2,177 @@
|
||||||
|
|
||||||
(require db crypto racket/fasl)
|
(require db crypto racket/fasl)
|
||||||
|
|
||||||
(provide make-taskq taskq-close)
|
(provide make-taskq taskq-shutdown taskq-enqueue taskq-dequeue taskq-complete taskq-resched)
|
||||||
|
|
||||||
(struct taskq [dbname db stmts waiters lock] #:transparent)
|
|
||||||
|
|
||||||
|
;; task states
|
||||||
(define STATE-QUEUED 0)
|
(define STATE-QUEUED 0)
|
||||||
(define STATE-EXECUTING 1)
|
(define STATE-EXECUTING 1)
|
||||||
(define STATE-DONE 2)
|
(define STATE-DONE 2)
|
||||||
|
|
||||||
(define TASKQ-INIT "create table taskq (id blob(16) primary key, task blob, state integer)")
|
;; message struct for task queue
|
||||||
(define (init-stmts c)
|
(struct qmsg [from type data] #:transparent)
|
||||||
(hash
|
;; sql to initialize db
|
||||||
'delete-by-state (prepare c "delete from taskq where state=?")
|
(define TASKQ-INIT "create table taskq (id blob primary key, task blob, notbefore int, state integer, completed int)")
|
||||||
'update-state-by-state (prepare c "update taskq set state=? where state=?")
|
|
||||||
'create (prepare c "insert into taskq (id, task, state) values (?, ?, ?)")
|
|
||||||
'update-state (prepare c "update taskq set state=? where id=? and state=?; select changes()")
|
|
||||||
'delete-by-id (prepare c "delete from taskq where id=?")
|
|
||||||
'get-first-by-state (prepare c "select * from taskq where state=? limit 1")))
|
|
||||||
|
|
||||||
|
;; runs the task queue
|
||||||
|
(define (taskq-service c init-thread)
|
||||||
|
;; define some prepared statements
|
||||||
|
(define delete-all-done (bind-prepared-statement
|
||||||
|
(prepare c "delete from taskq where state=?")
|
||||||
|
(list STATE-DONE)))
|
||||||
|
(define delete-gc (prepare c "delete from taskq where state=? and completed<=?"))
|
||||||
|
(define reset-executing (bind-prepared-statement
|
||||||
|
(prepare c "update taskq set state=? where state=?")
|
||||||
|
(list STATE-QUEUED STATE-EXECUTING)))
|
||||||
|
(define create (prepare c "insert into taskq (id, task, notbefore, state, completed) values (?, ?, ?, ?, ?)"))
|
||||||
|
(define update-state (prepare c "update taskq set state=? where id=?"))
|
||||||
|
(define update-state-completed (prepare c "update taskq set state=?, completed=? where id=?"))
|
||||||
|
(define update-notbefore (prepare c "update taskq set notbefore=?, state=? where id=?"))
|
||||||
|
(define delete-by-id (prepare c "delete from taskq where id=?"))
|
||||||
|
(define get-by-id (prepare c "select * from taskq where id=?"))
|
||||||
|
(define get-next (prepare c "select * from taskq where state=? and notbefore<=? limit 1"))
|
||||||
|
(define earliest-wakeup
|
||||||
|
(bind-prepared-statement
|
||||||
|
(prepare c "select notbefore from taskq where state=? order by notbefore asc limit 1")
|
||||||
|
(list STATE-QUEUED)))
|
||||||
|
;; cleanup: if we crash and come back up, there might be done tasks and executing tasks
|
||||||
|
;; delete the done tasks, we don't have waiters for those anymore and any code that creates
|
||||||
|
;; waiters should check for the result it wanted and re-queue if needed
|
||||||
|
;; then, move executing tasks (which must have been interrupted by the crash) back to
|
||||||
|
;; queued so we can run them again
|
||||||
|
(query-exec c delete-all-done)
|
||||||
|
(query-exec c reset-executing)
|
||||||
|
|
||||||
|
;; done with init
|
||||||
|
(thread-send init-thread #t #f)
|
||||||
|
(set! init-thread #f)
|
||||||
|
|
||||||
|
;; waiters for a given task
|
||||||
|
(define task-waiters (make-hash))
|
||||||
|
;; workers that are waiting for work
|
||||||
|
(define dequeue-waiters (mutable-set))
|
||||||
|
|
||||||
|
;; adds a waiter for a task with given id
|
||||||
|
(define (add-task-waiter! id from)
|
||||||
|
(hash-update! task-waiters id (lambda (s) (set-add s from)) set))
|
||||||
|
|
||||||
|
;; invokes and removes all waiters for a given id
|
||||||
|
(define (invoke-task-waiters! id)
|
||||||
|
(for ([w (in-set (hash-ref task-waiters id set))])
|
||||||
|
(thread-send w id #f))
|
||||||
|
(hash-remove! task-waiters id))
|
||||||
|
|
||||||
|
;; enqueues data. wait? specifies whether the queuer should get a message back immediately with
|
||||||
|
;; the task id, or only upon completion
|
||||||
|
(define (enqueue from data wait?)
|
||||||
|
(match-define (list id-tmp task notbefore) data)
|
||||||
|
(define id (or id-tmp (crypto-random-bytes 16)))
|
||||||
|
(with-handlers ([exn:fail:sql? (lambda (e) (thread-send from e #f))])
|
||||||
|
(match (query-maybe-row c get-by-id id)
|
||||||
|
[#f
|
||||||
|
(query-exec c create id (s-exp->fasl task) notbefore STATE-QUEUED 0)
|
||||||
|
(if wait?
|
||||||
|
(add-task-waiter! id from)
|
||||||
|
(thread-send from id #f))]
|
||||||
|
[(vector id task notbefore state completed)
|
||||||
|
(if (and wait? (not (= state STATE-DONE)))
|
||||||
|
(add-task-waiter! id from)
|
||||||
|
(thread-send from id #f))]))
|
||||||
|
(try-dequeue))
|
||||||
|
|
||||||
|
;; adds from to the dequeue list, attempts immediate dequeue if possible
|
||||||
|
(define (dequeue from)
|
||||||
|
(set-add! dequeue-waiters from)
|
||||||
|
(try-dequeue))
|
||||||
|
|
||||||
|
;; errors all current waiters and shuts down the queue
|
||||||
|
(define (shutdown)
|
||||||
|
(for ([w (in-set dequeue-waiters)])
|
||||||
|
(thread-send w #f #f))
|
||||||
|
(for ([(k v) (in-hash task-waiters)])
|
||||||
|
(thread-send v (error "queue shutdown") #f)))
|
||||||
|
|
||||||
|
;; attempts to dequeue the next work
|
||||||
|
;; if there is work, set it to executing and invoke and remove one of the workers
|
||||||
|
;; returns #t if there might be more work available, #f if there is guaranteed no work right now
|
||||||
|
(define (try-dequeue)
|
||||||
|
(if (set-empty? dequeue-waiters)
|
||||||
|
#f
|
||||||
|
(let ([w (set-first dequeue-waiters)])
|
||||||
|
(with-handlers ([exn:fail:sql? (lambda (e) (displayln e) #f)])
|
||||||
|
(match (query-maybe-row c get-next STATE-QUEUED (current-seconds))
|
||||||
|
[#f #f]
|
||||||
|
[(vector id task notbefore state completed)
|
||||||
|
(query-exec c update-state STATE-EXECUTING id)
|
||||||
|
(set-remove! dequeue-waiters w)
|
||||||
|
(thread-send w (list id (fasl->s-exp task)) #f)
|
||||||
|
#t])))))
|
||||||
|
|
||||||
|
;; marks a task as completed. invokes and removes any waiters
|
||||||
|
(define (complete from id)
|
||||||
|
(with-handlers ([exn:fail:sql? (lambda (e) (thread-send from e #f))])
|
||||||
|
(query-exec c update-state-completed STATE-DONE (current-seconds) id)
|
||||||
|
(invoke-task-waiters! id)
|
||||||
|
(thread-send from #t #f)))
|
||||||
|
|
||||||
|
;; reschedules a task for a later time
|
||||||
|
(define (resched from data)
|
||||||
|
(match-define (list id notbefore) data)
|
||||||
|
(with-handlers ([exn:fail:sql? (lambda (e) (thread-send from e #f))])
|
||||||
|
(query-exec c update-notbefore notbefore STATE-QUEUED id)
|
||||||
|
(thread-send from #t #f)))
|
||||||
|
|
||||||
|
;; collects old done tasks
|
||||||
|
(define (collect-garbage now)
|
||||||
|
(with-handlers ([exn:fail:sql? (lambda (e) (displayln e))])
|
||||||
|
(query-exec c delete-gc STATE-DONE (- now 300))))
|
||||||
|
|
||||||
|
;; when was collect-garbage run last
|
||||||
|
(define last-garbage (box (current-seconds)))
|
||||||
|
|
||||||
|
;; main task queue loop
|
||||||
|
(let loop ()
|
||||||
|
(define now (current-seconds))
|
||||||
|
;; collect garbage if needed
|
||||||
|
(when (> now (+ (unbox last-garbage) 300))
|
||||||
|
(collect-garbage)
|
||||||
|
(set-box! last-garbage now))
|
||||||
|
;; get the next wakeup time
|
||||||
|
(define wakeup (with-handlers ([exn:fail:sql? (lambda (e) 0)])
|
||||||
|
(or (query-maybe-value c earliest-wakeup) +inf.0)))
|
||||||
|
;; get thread mail event
|
||||||
|
(define recv-evt (thread-receive-evt))
|
||||||
|
;; create a wakeup event if wakeup is needed sometime
|
||||||
|
(define wakeup-evt (if (>= wakeup now)
|
||||||
|
(alarm-evt (* 1000 wakeup))
|
||||||
|
always-evt))
|
||||||
|
;; wait for either a message or a wakeup
|
||||||
|
;; if both events are ready, one will be pseudorandomly chosen so it should be fair
|
||||||
|
(define sync-result (sync recv-evt wakeup-evt))
|
||||||
|
(cond
|
||||||
|
[(equal? sync-result wakeup-evt)
|
||||||
|
;; run dequeue
|
||||||
|
(let dequeue-loop ()
|
||||||
|
(when (try-dequeue)
|
||||||
|
(dequeue-loop)))
|
||||||
|
(loop)]
|
||||||
|
[else
|
||||||
|
;; handle message
|
||||||
|
(match-define (qmsg from type data) (thread-receive))
|
||||||
|
(match type
|
||||||
|
['shutdown (shutdown)]
|
||||||
|
['enqueue (enqueue from data #f) (loop)]
|
||||||
|
['enqueue-wait (enqueue from data #t) (loop)]
|
||||||
|
['dequeue (dequeue from) (loop)]
|
||||||
|
['complete (complete from data) (loop)]
|
||||||
|
['resched (resched from data) (loop)]
|
||||||
|
[_ (thread-send from (error "unknown message") #f) (loop)])]))
|
||||||
|
|
||||||
|
;; disconnect and exit
|
||||||
|
(disconnect c)
|
||||||
|
(void))
|
||||||
|
|
||||||
|
;; creates a new taskq with db at given path, and optionally initializes it
|
||||||
(define (make-taskq path [initialize? #f])
|
(define (make-taskq path [initialize? #f])
|
||||||
(define c
|
(define c
|
||||||
(if initialize?
|
(if initialize?
|
||||||
|
@ -27,84 +180,42 @@
|
||||||
(sqlite3-connect #:database path)))
|
(sqlite3-connect #:database path)))
|
||||||
(when initialize?
|
(when initialize?
|
||||||
(query-exec c TASKQ-INIT))
|
(query-exec c TASKQ-INIT))
|
||||||
(define stmts (init-stmts c))
|
(define t (current-thread))
|
||||||
;; cleanup: if we crash and come back up, there might be done tasks and executing tasks
|
(define q (thread (lambda () (taskq-service c t))))
|
||||||
;; delete the done tasks, we don't have waiters for those anymore and any code that creates
|
(thread-receive)
|
||||||
;; waiters should check for the result it wanted and re-queue if needed
|
q)
|
||||||
;; then, move executing tasks (which must have been interrupted by the crash) back to
|
|
||||||
;; queued so we can run them again
|
|
||||||
(query-exec c (hash-ref stmts 'delete-by-state) STATE-DONE)
|
|
||||||
(query-exec c (hash-ref stmts 'update-state-by-state) STATE-QUEUED STATE-EXECUTING)
|
|
||||||
(taskq path c stmts (make-hash) (make-semaphore 1)))
|
|
||||||
|
|
||||||
(define (taskq-close q)
|
;; shuts down the task queue
|
||||||
(disconnect (taskq-db q)))
|
(define (taskq-shutdown q)
|
||||||
|
(thread-send q (qmsg (current-thread) 'shutdown #f))
|
||||||
|
(thread-wait q))
|
||||||
|
|
||||||
(define (update-waiters! q id)
|
;; if the next mail is an exn, raise it, otherwise return the value
|
||||||
;; ok basically fuck racket and its fucking "caveats concerning concurrent modification"
|
(define (receive-check)
|
||||||
;; preemptive green threads are why we can't have nice things
|
(match (thread-receive)
|
||||||
;; it turns out in practice actually this probably won't be an issue ever but Just To Be Safe
|
[(? exn? e) (raise e)]
|
||||||
;; we use a semaphore as a mutex protecting writes on taskq-waiters
|
[result result]))
|
||||||
(call-with-semaphore
|
|
||||||
(taskq-lock q)
|
|
||||||
(lambda ()
|
|
||||||
(hash-ref! (taskq-waiters q) id (make-semaphore)))))
|
|
||||||
|
|
||||||
(define (taskq-queue q what [num-retries 8] [retry-delay-secs 1/4])
|
;; enqueues a task
|
||||||
(define id (crypto-random-bytes 16))
|
(define (taskq-enqueue q task [id #f] [notbefore 0] [wait? #t])
|
||||||
(with-handlers ([exn:fail:sql? (lambda (e)
|
(thread-send q (qmsg (current-thread) (if wait? 'enqueue-wait 'enqueue)
|
||||||
(cond
|
(list id task notbefore)))
|
||||||
[(zero? num-retries) (raise e)]
|
(receive-check))
|
||||||
[else (sleep retry-delay-secs)
|
|
||||||
(taskq-queue q what (sub1 num-retries)
|
|
||||||
retry-delay-secs)]))])
|
|
||||||
(query-exec (taskq-db q) (hash-ref (taskq-stmts q) 'create)
|
|
||||||
id (s-exp->fasl what) STATE-QUEUED))
|
|
||||||
(define sema (update-waiters! q id))
|
|
||||||
(semaphore-wait sema))
|
|
||||||
|
|
||||||
;; blocks until the next task is available, then calls executor
|
;; dequeues a task
|
||||||
(define (taskq-execute-next q executor)
|
;; returns the next task (now marked as executing) or #f
|
||||||
;; this is a bad hack
|
(define (taskq-dequeue q)
|
||||||
;; use filesystem-change-evt to detect writes to the database
|
(thread-send q (qmsg (current-thread) 'dequeue #f))
|
||||||
;; initialize the event first before query to avoid deadlock
|
(receive-check))
|
||||||
(define evt (filesystem-change-evt (taskq-dbname q)))
|
|
||||||
(match (query-maybe-row (taskq-db q) (hash-ref (taskq-stmts q) 'get-first-by-state) STATE-QUEUED)
|
|
||||||
[#f (sync evt) (taskq-execute-next q executor)]
|
|
||||||
[(vector id task state)
|
|
||||||
(filesystem-change-evt-cancel evt)
|
|
||||||
;; update state, match expected existing state so that on concurrent modification, the query
|
|
||||||
;; fails and we retry with the next available row
|
|
||||||
(with-handlers ([exn:fail:sql? (lambda (_) (taskq-execute-next q executor))])
|
|
||||||
(when (zero?
|
|
||||||
(query-value (taskq-db q) (hash-ref (taskq-stmts q) 'update-state)
|
|
||||||
STATE-EXECUTING id state))
|
|
||||||
(taskq-execute-next q executor)))
|
|
||||||
;; now we have a unique lock to execute this row
|
|
||||||
(define (cleanup)
|
|
||||||
(query-exec (taskq-db q) (hash-ref (taskq-stmts q) 'update-state)
|
|
||||||
STATE-QUEUED id STATE-EXECUTING))
|
|
||||||
(with-handlers ([exn? (lambda (e) (cleanup) (raise e))])
|
|
||||||
(executor (fasl->s-exp task)))
|
|
||||||
(query-exec (taskq-db q) (hash-ref (taskq-stmts q) 'update-state)
|
|
||||||
STATE-DONE id STATE-EXECUTING)
|
|
||||||
(define sema (hash-ref (taskq-waiters q) id #f))
|
|
||||||
(unless (false? sema)
|
|
||||||
(semaphore-post sema))
|
|
||||||
(query-exec (taskq-db q) (hash-ref (taskq-stmts q) 'delete-by-id) id)]))
|
|
||||||
|
|
||||||
(define q (make-taskq "/tmp/s.sqlite3" #t))
|
;; marks a task complete
|
||||||
|
(define (taskq-complete q id)
|
||||||
|
(thread-send q (qmsg (current-thread) 'complete id))
|
||||||
|
(receive-check))
|
||||||
|
|
||||||
(define (producer [n 0])
|
;; reschedules a task for later
|
||||||
(printf "producing ~a\n" n)
|
;; cannot be combined with taskq-complete: workers must call exactly one or the other, then call
|
||||||
(taskq-queue q n)
|
;; taskq-dequeue for the next assignment
|
||||||
(sleep 1)
|
(define (taskq-resched q id notbefore)
|
||||||
(producer (add1 n)))
|
(thread-send q (qmsg (current-thread) 'resched (list id notbefore)))
|
||||||
|
(receive-check))
|
||||||
(define (consumer)
|
|
||||||
(taskq-execute-next q (lambda (val) (printf "consuming ~a\n" val)
|
|
||||||
(sleep 2) (printf "done consuming\n")))
|
|
||||||
(consumer))
|
|
||||||
|
|
||||||
(thread producer)
|
|
||||||
(consumer)
|
|
||||||
|
|
Loading…
Reference in New Issue