40 lines
1.1 KiB
OCaml
40 lines
1.1 KiB
OCaml
open! Import
|
|
|
|
(* include (val Logging.sublogs logger "Outbox") *)
|
|
|
|
(** An outbox: a queue of outgoing messages backed by an Lwt push-stream. *)
type t = {
  stream : Msg.t Lwt_stream.t;
      (** Consumer side: the messages that have been sent to this outbox. *)
  push : Msg.t option -> unit;
      (** Producer side: called with [Some msg] by {!send} and with [None] by
          {!close}. *)
  mutable bcc_incl : bool;
      (** Intrusive membership flag: [true] while this outbox is enqueued in
          the [Bcc] recipient list, so that it is not enqueued twice. *)
}
|
|
|
|
(** [make ()] creates a fresh outbox from a new [Lwt_stream.create ()] pair.
    The new outbox is not part of any pending [Bcc] broadcast
    ([bcc_incl = false]). *)
let make () =
  let messages, enqueue = Lwt_stream.create () in
  { stream = messages; push = enqueue; bcc_incl = false }
|
|
|
|
(** [stream t] is the stream of messages sent to outbox [t]. *)
let stream { stream; _ } = stream
|
|
(** [send t msg] pushes [msg] onto the outbox [t]. If the push raises
    [Lwt_stream.Closed], the message is silently dropped. *)
let send t msg =
  match t.push (Some msg) with
  | () -> ()
  | exception Lwt_stream.Closed -> ()
|
|
(** [close t] pushes [None] onto the outbox [t] (the end-of-stream marker for
    Lwt push-streams). A [Lwt_stream.Closed] raised by the push is ignored,
    so calling [close] again does not raise that exception. *)
let close t =
  match t.push None with
  | () -> ()
  | exception Lwt_stream.Closed -> ()
|
|
|
|
module Bcc = struct
  (** Send one message to several outboxes at once without delivering it to
      the same destination twice. Pending recipients live in a single global
      doubly-linked list; the intrusive flag [outbox.bcc_incl] records whether
      an outbox is already in that list, which prevents duplicates.

      [Bcc.add a; Bcc.add b; Bcc.send_all msg;
       (* sends [msg] to [a] and [b]. *)]
  *)

  (* Global list of outboxes waiting for the next [send_all]. *)
  let _recipients = Dllist.create ()

  (** [add obx] registers [obx] as a recipient of the next [send_all].
      Does nothing if [obx] is already registered. *)
  let add obx =
    if obx.bcc_incl then ()
    else begin
      (* The value returned by [add_r] is not needed: recipients are only
         ever removed from the left end by [send_all]. *)
      ignore (Dllist.add_r obx _recipients);
      obx.bcc_incl <- true
    end

  (** [send_all msg] sends [msg] to every registered outbox, in registration
      order, and leaves the recipient list empty. *)
  let send_all msg =
    let rec drain () =
      match Dllist.take_l _recipients with
      | exception Dllist.Empty -> ()
      | recipient ->
        (* Clear the flag before sending so the outbox can be re-added. *)
        recipient.bcc_incl <- false;
        send recipient msg;
        drain ()
    in
    drain ()
end
|