Compare commits

...

5 Commits

7 changed files with 113 additions and 44 deletions

View File

@ -25,6 +25,12 @@ let create () =
let is_empty seq = seq.next == seq
let reset seq =
begin
seq.next <- seq;
seq.prev <- seq;
end
let remove node =
if node.node_active then begin
node.node_active <- false;
@ -74,6 +80,15 @@ let fold_r f seq acc =
in
loop seq.prev acc
let iter_l f seq =
let rec loop curr =
if curr != seq then
let node = node_of_seq curr in
if node.node_active then f node.node_data;
loop node.node_next
in
loop seq.next
let find f seq =
let rec loop curr =
if curr == seq then

View File

@ -21,6 +21,9 @@ val create : unit -> 'a t
val is_empty : 'a t -> bool
(** Returns [true] iff the given sequence is empty *)
val reset : 'a t -> unit
(** [reset ()] is a lazy way to remove all the elements from the sequence *)
val add_l : 'a -> 'a t -> 'a node
(** [add_l x s] adds [x] to the left of the sequence [s] *)
@ -57,6 +60,10 @@ val fold_r : ('a -> 'b -> 'b) -> 'a t -> 'b -> 'b
where [e1], [e2], ..., [en] are the elements of [s]
*)
val iter_l : ('a -> unit) -> 'a t -> unit
(** [iter_l f s] applies [f] on all elements of [s] starting from
the left *)
val find : ('a -> bool) -> 'a t -> 'a
(** [find_node_l f s] returns the first element of [s] starting from the left
that satisfies [f] or raises [Not_found] if none exists. *)

View File

@ -25,6 +25,11 @@ let%expect_test _ =
Wheel.add wh 6 |> ignore;
(* t=3 *)
print_ints_nl (Wheel.tick wh); [%expect {| [] |}];
let every = ref [] in
Wheel.iter (fun x -> every := x :: !every) wh;
print_ints_nl (List.sort compare !every); [%expect {| [1;2;3;4;5;6] |}];
(* t=0 *)
print_ints_nl (Wheel.tick wh); [%expect {| [1;2;3] |}];
(* t=1 *)

View File

@ -22,3 +22,8 @@ let[@tail_mod_cons] rec empty t =
let tick t =
t.index <- (t.index + 1) mod Array.length t.entries;
empty t
let iter f t =
for i = 0 to Array.length t.entries - 1 do
Dllist.iter_l f t.entries.(i)
done

View File

@ -28,6 +28,16 @@ let find_chan t name =
let whowas t nick =
Cache.find_all t.whowas (string_ci nick)
let nuke t =
begin
Hashtbl.iter (fun _ u -> Dllist.reset u.membership) t.users;
Hashtbl.iter (fun _ c -> Dllist.reset c.members) t.channels;
Hashtbl.clear t.users;
Hashtbl.clear t.channels;
t.lusers <- 0;
t.luserchannels <- 0;
end
let relay ~(from : user) (msg : Msg.t) tgts =
let msg =
if msg.prefix = No_prefix then

View File

@ -14,25 +14,21 @@ type config = {
notify : [`ready | `stopping] -> unit;
}
type ping_wheel = Connection.t Wheel.t
let listener
let bind_server
~(port : int)
~(listen_backlog : int)
~(on_ready : unit -> unit)
: (fd * sockaddr) Lwt_stream.t =
let sock : fd Lwt.t =
: fd Lwt.t =
let fd = Lwt_unix.socket PF_INET SOCK_STREAM 0 in
Lwt_unix.setsockopt fd SO_KEEPALIVE false;
Lwt_unix.setsockopt fd SO_REUSEPORT false;
let srv_adr = Unix.ADDR_INET (Unix.inet_addr_any, port) in
let* () = Lwt_unix.bind fd srv_adr in
Lwt_unix.listen fd listen_backlog;
on_ready ();
info (fun m -> m "listening on %a" pp_sockaddr srv_adr);
Lwt.return fd
in
let accept () = sock >>= Lwt_unix.accept >|= Option.some in
let accepts (fd : fd) : (fd * sockaddr) Lwt_stream.t =
let accept () = Lwt_unix.accept fd >>= Lwt.return_some in
Lwt_stream.from accept
let reader (fd : fd) : Msg.t Lwt_stream.t =
@ -81,7 +77,7 @@ let handle_client
(conn_addr : sockaddr)
~(server_info : Server_info.t)
~(router : Router.t)
~(ping_wheel : ping_wheel)
~(ping_wheel : Connection.t Wheel.t)
=
info (fun m -> m "new connection %a" pp_sockaddr conn_addr);
let conn : Connection.t =
@ -107,6 +103,24 @@ let handle_client
(fun e -> error (fun m -> m "%a:@ %a" pp_sockaddr conn_addr Fmt.exn e));
end
let interval dt =
let tick () =
let* () = Lwt_unix.sleep dt in
Lwt.return_some ()
in
Lwt_stream.from tick
let interrupt () =
let signal, signal_waiter = Lwt.wait () in
let on_signal num =
trace (fun m -> m "caught signal %d" num);
try Lwt.wakeup signal_waiter () with
Invalid_argument _ -> failwith "unceremoniously exiting"
in
Lwt_unix.on_signal (2 (* SIGINT *)) on_signal |> ignore;
Lwt_unix.on_signal (15 (* SIGTERM *)) on_signal |> ignore;
signal
let run {
port;
listen_backlog;
@ -137,6 +151,13 @@ let run {
info (fun m -> m "version:@ %s" server_info.version);
info (fun m -> m "created:@ %s" server_info.created);
let* server : fd =
bind_server
~port
~listen_backlog
in
notify `ready;
let router : Router.t =
Router.make
~whowas_history_len
@ -147,40 +168,45 @@ let run {
ping_interval
in
let on_tick () =
(* trace (fun m -> m "tick"); *)
List.iter
(fun conn ->
let ping conn =
match Connection.on_ping conn with
| Ok () -> Wheel.add ping_wheel conn
| Error () -> Connection.close conn ~reason:"Connection timed out")
(Wheel.tick ping_wheel)
| Error _ -> Connection.close conn ~reason:"Connection timeout"
in
let pinger_promise =
Lwt_stream.iter
on_tick
(Lwt_stream.from @@ fun () ->
let* () = Lwt_unix.sleep 1.0 in
Lwt.return_some ())
in
let on_con (fd, adr) =
handle_client fd adr
~server_info
~router
~ping_wheel
(fun () ->
List.iter ping
(Wheel.tick ping_wheel))
(interval 1.0)
in
let listener_promise =
Lwt_stream.iter
on_con
(listener
~port
~listen_backlog
~on_ready:(fun () -> notify `ready))
(fun (fd, addr) ->
handle_client fd addr
~server_info
~router
~ping_wheel)
(accepts server)
in
(* TODO: graceful cleanup on ctrl-c *)
let* () =
Lwt.pick [
listener_promise <?> pinger_promise;
interrupt ()
]
in
notify `stopping;
listener_promise <&> pinger_promise
info (fun m -> m "shutting down");
let* () = Lwt_unix.close server in
Router.nuke router;
Wheel.iter (fun conn -> Connection.close conn ~reason:"Server shutting down")
(* ping wheel should contain every active connection *)
ping_wheel;
(* give some time for the messages to send *)
Lwt_unix.sleep 0.5

View File

@ -8,4 +8,5 @@ Environment=IRC_MOTD=/usr/local/share/talircd/motd
#Environment=LOG_LEVEL=DEBUG
Type=notify
KillSignal=SIGTERM
ExecStart=/usr/local/bin/talircd