60 lines
1.4 KiB
Racket
60 lines
1.4 KiB
Racket
|
#lang racket/base
|
||
|
|
||
|
(require racket/async-channel racket/function racket/match racket/set)
|
||
|
|
||
|
;; multiqueue-thd : -> void?
;; Body of the broadcast-queue server thread.  It reads its thread mailbox
;; forever and interprets each message as follows:
;;   * a `channel?` value is a registration request: a fresh async channel
;;     is created, remembered as a subscriber, and sent back over that
;;     channel;
;;   * any other value is broadcast to every subscriber currently in the set.
;; The loop ends (and the thread with it) when a break is delivered.
(define (multiqueue-thd)
  ;; Breaks are disabled everywhere except inside `sync/enable-break` below,
  ;; so a break can only arrive while the server is idle, never in the
  ;; middle of a registration or a broadcast.
  (break-enabled #f)
  ;; Weakly-held subscribers: an async channel drops out of the set once
  ;; nothing else references it.
  (define clients (weak-seteq))
  (define (handle-break _)
    (void "do nothing, for now"))
  (with-handlers ([exn:break? handle-break])
    (let loop ()
      (define evt (thread-receive-evt))
      ;; Wait (breakably) until the mailbox is non-empty.
      (sync/enable-break evt)
      (match (thread-receive)
        [(? channel? ch)
         (define ach (make-async-channel))
         (set-add! clients ach)
         ;; Deliver the reply from a helper thread.  A plain `channel-put`
         ;; here is a rendezvous: if the registrant was killed before it
         ;; reached `channel-get`, the server would block forever with
         ;; breaks disabled, stalling every other subscriber and making the
         ;; queue impossible to destroy.
         (thread (lambda () (channel-put ch ach)))]
        [v (for ([ach (in-set clients)])
             (printf "sending message ~a\n" v)
             (async-channel-put ach v))])
      (loop))))
|
||
|
|
||
|
;; make-mq : -> thread?
;; Starts a new broadcast queue.  The queue *is* the returned thread, which
;; runs `multiqueue-thd`; pass it to `mq-register`, `mq-send` and
;; `mq-destroy`.
(define (make-mq)
  (thread (lambda () (multiqueue-thd))))
|
||
|
|
||
|
;; mq-destroy : thread? -> void?
;; Asks the queue thread `q` to shut down by sending it a break.
(define (mq-destroy q)
  (break-thread q))
|
||
|
|
||
|
;; mq-register : thread? -> any
;; Subscribes the caller to queue `q`.  A fresh channel is mailed to the
;; queue thread as the registration request, and the value the queue sends
;; back over that channel is returned.  Blocks until the reply arrives.
(define (mq-register q)
  (let ([reply-ch (make-channel)])
    (thread-send q reply-ch)
    (channel-get reply-ch)))
|
||
|
|
||
|
;; mq-send : thread? (not/c channel?) -> void?
;; Broadcasts `v` to every subscriber of queue `q` by mailing it to the
;; queue thread.
;;
;; Raises `exn:fail:contract` when `v` is a channel: the queue thread treats
;; any channel in its mailbox as a registration request, so such a value
;; would register a phantom subscriber instead of being broadcast.
(define (mq-send q v)
  (when (channel? v)
    (raise-argument-error 'mq-send "(not/c channel?)" v))
  (thread-send q v))
|
||
|
|
||
|
;; Manual demo: three listeners subscribe to one queue, two messages are
;; broadcast, then the first listener is killed and released before a third
;; message is broadcast.
(module+ main
  ;; test-thread : thread? any/c -> (does not return)
  ;; Subscribes to `q` and prints every message received, tagged with `x`.
  (define (test-thread q x)
    (define inbox (mq-register q))
    (let next ()
      (define msg (async-channel-get inbox))
      (printf "~a got ~a\n" x msg)
      (next)))

  (define q (make-mq))

  ;; Starts one listener thread labelled `id` on the demo queue.
  (define (spawn-listener id)
    (thread (lambda () (test-thread q id))))

  (define q1 (spawn-listener 1))
  (define q2 (spawn-listener 2))
  (define q3 (spawn-listener 3))

  ;; Give the listeners time to register before broadcasting.
  (sleep 1)
  (for-each (lambda (msg) (mq-send q msg))
            (list "meow" 'shonks))
  (sleep 1)

  ;; Kill the first listener, clear the variable holding its thread, and
  ;; force a collection before the next broadcast.
  (kill-thread q1)
  (thread-wait q1)
  (set! q1 #f)
  (collect-garbage)

  (mq-send q "meow 2")
  (sleep 1))
|