#lang racket

;; A small task queue persisted in a SQLite database.
;;
;; Producers call `taskq-queue` to insert a task and block until it has been
;; executed; consumers call `taskq-execute-next` to claim and run one task.
;; Each row carries a state (queued / executing / done) so that interrupted
;; work can be detected and re-queued when the queue is reopened.

(require db crypto racket/fasl)

(provide make-taskq taskq-close taskq-queue taskq-execute-next)

;; dbname  : path of the SQLite file (also watched for filesystem changes)
;; db      : open database connection
;; stmts   : immutable hash of symbol -> prepared statement (see `init-stmts`)
;; waiters : mutable hash of task id (bytes) -> semaphore posted on completion
;; lock    : semaphore used as a mutex around updates to `waiters`
(struct taskq [dbname db stmts waiters lock] #:transparent)

(define STATE-QUEUED 0)
(define STATE-EXECUTING 1)
(define STATE-DONE 2)

;; `if not exists` lets `make-taskq` be called with `initialize?` = #t on a
;; database file that already contains the table.
(define TASKQ-INIT
  "create table if not exists taskq (id blob(16) primary key, task blob, state integer)")

;; Prepares every statement used by the queue on connection `c` and returns
;; them in a hash keyed by symbol.
(define (init-stmts c)
  (hash 'delete-by-state
        (prepare c "delete from taskq where state=?")
        'update-state-by-state
        (prepare c "update taskq set state=? where state=?")
        'create
        (prepare c "insert into taskq (id, task, state) values (?, ?, ?)")
        ;; Parameters: new state, id, expected current state.  This is a single
        ;; statement; callers that need to know whether a row matched read the
        ;; affected-row count from the query result (see `try-claim!`).
        'update-state
        (prepare c "update taskq set state=? where id=? and state=?")
        'delete-by-id
        (prepare c "delete from taskq where id=?")
        'get-first-by-state
        (prepare c "select * from taskq where state=? limit 1")))

;; Opens the task queue stored at `path`.
;;   path        : location of the SQLite database file
;;   initialize? : when true, open in 'create mode and create the table
;; Returns a `taskq`.
(define (make-taskq path [initialize? #f])
  (define c
    (if initialize?
        (sqlite3-connect #:database path #:mode 'create)
        (sqlite3-connect #:database path)))
  (when initialize?
    (query-exec c TASKQ-INIT))
  (define stmts (init-stmts c))
  ;; Cleanup after a crash: there may be leftover done and executing tasks.
  ;; Done tasks are deleted -- their waiters no longer exist, and any code
  ;; that creates waiters should check for the result it wanted and re-queue
  ;; if needed.  Executing tasks must have been interrupted, so they are moved
  ;; back to queued and will run again.
  (query-exec c (hash-ref stmts 'delete-by-state) STATE-DONE)
  (query-exec c (hash-ref stmts 'update-state-by-state) STATE-QUEUED STATE-EXECUTING)
  (taskq path c stmts (make-hash) (make-semaphore 1)))

;; Disconnects the queue's database connection.
(define (taskq-close q)
  (disconnect (taskq-db q)))

;; Returns the waiter semaphore registered for `id`, creating one (with an
;; initial count of 0) if none exists.  The queue's lock serialises updates to
;; the waiters table because Racket threads are preemptive.
(define (update-waiters! q id)
  (call-with-semaphore
   (taskq-lock q)
   (lambda ()
     (hash-ref! (taskq-waiters q) id (lambda () (make-semaphore 0))))))

;; Removes and returns the waiter semaphore registered for `id`, or #f when
;; there is none (for example a task re-queued after a restart).
(define (take-waiter! q id)
  (call-with-semaphore
   (taskq-lock q)
   (lambda ()
     (define waiters (taskq-waiters q))
     (define sema (hash-ref waiters id #f))
     (when sema
       (hash-remove! waiters id))
     sema)))

;; Inserts `what` as a queued task and blocks until a consumer in this process
;; has executed it.
;;   what             : value serialisable with `s-exp->fasl`
;;   num-retries      : how many times to retry the insert after an SQL error
;;   retry-delay-secs : pause between insert attempts
;; Re-raises the last `exn:fail:sql` once the retries are exhausted.
(define (taskq-queue q what [num-retries 8] [retry-delay-secs 1/4])
  (define id (crypto-random-bytes 16))
  (define payload (s-exp->fasl what))
  ;; Register the waiter BEFORE the row becomes visible: otherwise a consumer
  ;; could finish the task before the semaphore exists, skip the post, and
  ;; leave this call blocked forever.
  (define sema (update-waiters! q id))
  ;; Retry only the insert, always with the same id, so that the semaphore we
  ;; wait on below belongs to the row that was actually written.
  (let retry ([retries-left num-retries])
    (define inserted?
      (with-handlers ([exn:fail:sql?
                       (lambda (e)
                         (when (zero? retries-left)
                           ;; Giving up: drop the waiter so it does not leak.
                           (take-waiter! q id)
                           (raise e))
                         #f)])
        (query-exec (taskq-db q)
                    (hash-ref (taskq-stmts q) 'create)
                    id payload STATE-QUEUED)
        #t))
    (unless inserted?
      (sleep retry-delay-secs)
      (retry (sub1 retries-left))))
  (semaphore-wait sema))

;; Extracts the number of rows changed by a statement from a `simple-result`.
;; Returns 0 when the result carries no 'affected-rows entry.
(define (affected-row-count result)
  (cond
    [(and (simple-result? result)
          (assq 'affected-rows (simple-result-info result)))
     => cdr]
    [else 0]))

;; Tries to move task `id` from `expected-state` to executing.  Returns #t only
;; when exactly this call changed the row; returns #f when another consumer
;; got there first or the update raised an SQL error.
(define (try-claim! q id expected-state)
  (with-handlers ([exn:fail:sql? (lambda (_) #f)])
    (positive?
     (affected-row-count
      (query (taskq-db q)
             (hash-ref (taskq-stmts q) 'update-state)
             STATE-EXECUTING id expected-state)))))

;; Runs `executor` on an already-claimed task, then marks it done, wakes the
;; waiter (if any) and deletes the row.  If the executor raises an exception
;; the task is put back in the queued state and the exception is re-raised.
(define (run-claimed-task q id task executor)
  (define db (taskq-db q))
  (define stmts (taskq-stmts q))
  (define update-state (hash-ref stmts 'update-state))
  (with-handlers ([exn? (lambda (e)
                          (query-exec db update-state STATE-QUEUED id STATE-EXECUTING)
                          (raise e))])
    (executor (fasl->s-exp task)))
  (query-exec db update-state STATE-DONE id STATE-EXECUTING)
  (define sema (take-waiter! q id))
  (when sema
    (semaphore-post sema))
  (query-exec db (hash-ref stmts 'delete-by-id) id))

;; Blocks until the next task is available, then calls `executor` with the
;; deserialised task value.
(define (taskq-execute-next q executor)
  ;; Writes to the database are detected with a filesystem change event on
  ;; the database file.  The event is created before the query so that a write
  ;; landing between the query and the `sync` is not missed.
  (define evt (filesystem-change-evt (taskq-dbname q)))
  (match (query-maybe-row (taskq-db q)
                          (hash-ref (taskq-stmts q) 'get-first-by-state)
                          STATE-QUEUED)
    [#f
     (sync evt)
     (taskq-execute-next q executor)]
    [(vector id task state)
     (filesystem-change-evt-cancel evt)
     ;; The claim matches the state we just read, so a concurrent claim makes
     ;; ours change zero rows; in that case look for another row instead of
     ;; running a task we do not own.
     (cond
       [(try-claim! q id state) (run-claimed-task q id task executor)]
       [else (taskq-execute-next q executor)])]))

;; Demo: one producer thread and one consumer loop sharing a queue.  Kept in a
;; `main` submodule so it runs when this file is executed directly but not when
;; the module is required as a library.
(module+ main
  (define q (make-taskq "/tmp/s.sqlite3" #t))

  (define (producer [n 0])
    (printf "producing ~a\n" n)
    (taskq-queue q n)
    (sleep 1)
    (producer (add1 n)))

  (define (consumer)
    (taskq-execute-next q
                        (lambda (val)
                          (printf "consuming ~a\n" val)
                          (sleep 2)
                          (printf "done consuming\n")))
    (consumer))

  (thread producer)
  (consumer))