diff --git a/private/taskq.rkt b/private/taskq.rkt new file mode 100644 index 0000000..e1beed0 --- /dev/null +++ b/private/taskq.rkt @@ -0,0 +1,110 @@ +#lang racket + +(require db crypto racket/fasl) + +(provide make-taskq taskq-close) + +(struct taskq [dbname db stmts waiters lock] #:transparent) + +(define STATE-QUEUED 0) +(define STATE-EXECUTING 1) +(define STATE-DONE 2) + +(define TASKQ-INIT "create table taskq (id blob(16) primary key, task blob, state integer)") +(define (init-stmts c) + (hash + 'delete-by-state (prepare c "delete from taskq where state=?") + 'update-state-by-state (prepare c "update taskq set state=? where state=?") + 'create (prepare c "insert into taskq (id, task, state) values (?, ?, ?)") + 'update-state (prepare c "update taskq set state=? where id=? and state=?; select changes()") + 'delete-by-id (prepare c "delete from taskq where id=?") + 'get-first-by-state (prepare c "select * from taskq where state=? limit 1"))) + +(define (make-taskq path [initialize? #f]) + (define c + (if initialize? + (sqlite3-connect #:database path #:mode 'create) + (sqlite3-connect #:database path))) + (when initialize? + (query-exec c TASKQ-INIT)) + (define stmts (init-stmts c)) + ;; cleanup: if we crash and come back up, there might be done tasks and executing tasks + ;; delete the done tasks, we don't have waiters for those anymore and any code that creates + ;; waiters should check for the result it wanted and re-queue if needed + ;; then, move executing tasks (which must have been interrupted by the crash) back to + ;; queued so we can run them again + (query-exec c (hash-ref stmts 'delete-by-state) STATE-DONE) + (query-exec c (hash-ref stmts 'update-state-by-state) STATE-QUEUED STATE-EXECUTING) + (taskq path c stmts (make-hash) (make-semaphore 1))) + +(define (taskq-close q) + (disconnect (taskq-db q))) + +(define (update-waiters! q id) + ;; ok basically fuck racket and its fucking "caveats concerning concurrent modification" + ;; preemptive green threads are why we can't have nice things + ;; it turns out in practice actually this probably won't be an issue ever but Just To Be Safe + ;; we use a semaphore as a mutex protecting writes on taskq-waiters + (call-with-semaphore + (taskq-lock q) + (lambda () + (hash-ref! (taskq-waiters q) id (make-semaphore))))) + +(define (taskq-queue q what [num-retries 8] [retry-delay-secs 1/4]) + (define id (crypto-random-bytes 16)) + (with-handlers ([exn:fail:sql? (lambda (e) + (cond + [(zero? num-retries) (raise e)] + [else (sleep retry-delay-secs) + (taskq-queue q what (sub1 num-retries) + retry-delay-secs)]))]) + (query-exec (taskq-db q) (hash-ref (taskq-stmts q) 'create) + id (s-exp->fasl what) STATE-QUEUED)) + (define sema (update-waiters! q id)) + (semaphore-wait sema)) + +;; blocks until the next task is available, then calls executor +(define (taskq-execute-next q executor) + ;; this is a bad hack + ;; use filesystem-change-evt to detect writes to the database + ;; initialize the event first before query to avoid deadlock + (define evt (filesystem-change-evt (taskq-dbname q))) + (match (query-maybe-row (taskq-db q) (hash-ref (taskq-stmts q) 'get-first-by-state) STATE-QUEUED) + [#f (sync evt) (taskq-execute-next q executor)] + [(vector id task state) + (filesystem-change-evt-cancel evt) + ;; update state, match expected existing state so that on concurrent modification, the query + ;; fails and we retry with the next available row + (with-handlers ([exn:fail:sql? (lambda (_) (taskq-execute-next q executor))]) + (when (zero? + (query-value (taskq-db q) (hash-ref (taskq-stmts q) 'update-state) + STATE-EXECUTING id state)) + (taskq-execute-next q executor))) + ;; now we have a unique lock to execute this row + (define (cleanup) + (query-exec (taskq-db q) (hash-ref (taskq-stmts q) 'update-state) + STATE-QUEUED id STATE-EXECUTING)) + (with-handlers ([exn? (lambda (e) (cleanup) (raise e))]) + (executor (fasl->s-exp task))) + (query-exec (taskq-db q) (hash-ref (taskq-stmts q) 'update-state) + STATE-DONE id STATE-EXECUTING) + (define sema (hash-ref (taskq-waiters q) id #f)) + (unless (false? sema) + (semaphore-post sema)) + (query-exec (taskq-db q) (hash-ref (taskq-stmts q) 'delete-by-id) id)])) + +(define q (make-taskq "/tmp/s.sqlite3" #t)) + +(define (producer [n 0]) + (printf "producing ~a\n" n) + (taskq-queue q n) + (sleep 1) + (producer (add1 n))) + +(define (consumer) + (taskq-execute-next q (lambda (val) (printf "consuming ~a\n" val) + (sleep 2) (printf "done consuming\n"))) + (consumer)) + +(thread producer) +(consumer)