open! Import

(* include (val Logging.sublogs logger "Outbox") *)

(** An outbox: a message stream paired with the function that feeds it. *)
type t = {
  stream : Msg.t Lwt_stream.t;  (** Messages that have been sent so far. *)
  push : Msg.t option -> unit;
      (** Push function obtained together with [stream] from
          [Lwt_stream.create]. *)
  mutable bcc_incl : bool;
      (** [true] while this outbox is enqueued in the [Bcc] recipient list. *)
}

(** [make ()] creates a fresh outbox that is not enqueued as a [Bcc]
    recipient. *)
let make () =
  let msg_stream, msg_push = Lwt_stream.create () in
  { stream = msg_stream; push = msg_push; bcc_incl = false }

(** [stream t] is the message stream backing [t]. *)
let stream { stream; _ } = stream

(** [send t msg] pushes [msg] onto the stream of [t]. The call is a no-op
    when the push raises [Lwt_stream.Closed]. *)
let send t msg =
  match t.push (Some msg) with
  | () -> ()
  | exception Lwt_stream.Closed -> ()

(** [close t] pushes the end-of-stream marker ([None]) onto the stream of
    [t]. The call is a no-op when the push raises [Lwt_stream.Closed]. *)
let close t =
  match t.push None with
  | () -> ()
  | exception Lwt_stream.Closed -> ()

(** [is_closed t] is [Lwt_stream.is_closed] applied to the stream of [t]. *)
let is_closed { stream; _ } = Lwt_stream.is_closed stream

(** Sends one message to a number of outboxes at once without sending to the
    same destination twice.

    Recipients are collected in a global list; the intrusive [bcc_incl] flag
    carried by every outbox prevents enqueuing the same destination twice.

    {[
      Bcc.add a;
      Bcc.add b;
      Bcc.send_all msg (* sends [msg] to both [a] and [b] *)
    ]} *)
module Bcc = struct
  (* Global queue of the outboxes awaiting the next [send_all]. *)
  let _recipients = Dllist.create ()

  (** [add obx] enqueues [obx] as a recipient of the next [send_all], unless
      its [bcc_incl] flag says it is already enqueued. *)
  let add obx =
    if obx.bcc_incl then ()
    else
      let _node = Dllist.add_r obx _recipients in
      obx.bcc_incl <- true

  (** [send_all msg] takes every enqueued recipient off the list, clears its
      [bcc_incl] flag and sends it [msg], leaving the list empty. *)
  let send_all msg =
    (* NOTE(review): termination relies on [Dllist.take_l] raising
       [Not_found] once the list is empty -- confirm against the [Dllist]
       module in scope. The handler covers only the [take_l] call, so an
       exception escaping [send] is not swallowed here. *)
    let rec drain () =
      match Dllist.take_l _recipients with
      | exception Not_found -> ()
      | recipient ->
          recipient.bcc_incl <- false;
          send recipient msg;
          drain ()
    in
    drain ()
end