gist/racket/queue.rkt

60 lines
1.4 KiB
Racket

#lang racket/base
(require racket/async-channel racket/function racket/match racket/set)
(define (multiqueue-thd)
(break-enabled #f)
(define clients (weak-seteq))
(define (handle-break _)
(void "do nothing, for now"))
(with-handlers ([exn:break? handle-break])
(let loop ()
(define evt (thread-receive-evt))
(sync/enable-break evt)
(match (thread-receive)
[(? channel? ch)
(define ach (make-async-channel))
(set-add! clients ach)
(channel-put ch ach)]
[v (for ([ach (in-set clients)])
(printf "sending message ~a\n" v)
(async-channel-put ach v))])
(loop))))
(define (make-mq)
(thread multiqueue-thd))
(define (mq-destroy q)
(break-thread q))
(define (mq-register q)
(define ch (make-channel))
(thread-send q ch)
(channel-get ch))
(define (mq-send q v)
(thread-send q v))
(module+ main
(define (test-thread q x)
(define ach (mq-register q))
(let loop ()
(printf "~a got ~a\n" x (async-channel-get ach))
(loop)))
(define q (make-mq))
(define q1 (thread (thunk (test-thread q 1))))
(define q2 (thread (thunk (test-thread q 2))))
(define q3 (thread (thunk (test-thread q 3))))
(sleep 1)
(mq-send q "meow")
(mq-send q 'shonks)
(sleep 1)
(kill-thread q1)
(thread-wait q1)
(set! q1 #f)
(collect-garbage)
(mq-send q "meow 2")
(sleep 1))