Compare commits
5 Commits
e355bac41c
...
aed3089995
Author | SHA1 | Date |
---|---|---|
tali | aed3089995 | |
tali | 04478ffa0e | |
tali | 69f182d0c2 | |
tali | 10fbd898c1 | |
tali | e160156b78 |
|
@ -25,6 +25,12 @@ let create () =
|
|||
|
||||
let is_empty seq = seq.next == seq
|
||||
|
||||
let reset seq =
|
||||
begin
|
||||
seq.next <- seq;
|
||||
seq.prev <- seq;
|
||||
end
|
||||
|
||||
let remove node =
|
||||
if node.node_active then begin
|
||||
node.node_active <- false;
|
||||
|
@ -74,6 +80,15 @@ let fold_r f seq acc =
|
|||
in
|
||||
loop seq.prev acc
|
||||
|
||||
let iter_l f seq =
|
||||
let rec loop curr =
|
||||
if curr != seq then
|
||||
let node = node_of_seq curr in
|
||||
if node.node_active then f node.node_data;
|
||||
loop node.node_next
|
||||
in
|
||||
loop seq.next
|
||||
|
||||
let find f seq =
|
||||
let rec loop curr =
|
||||
if curr == seq then
|
||||
|
|
|
@ -21,6 +21,9 @@ val create : unit -> 'a t
|
|||
val is_empty : 'a t -> bool
|
||||
(** Returns [true] iff the given sequence is empty *)
|
||||
|
||||
val reset : 'a t -> unit
|
||||
(** [reset ()] is a lazy way to remove all the elements from the sequence *)
|
||||
|
||||
val add_l : 'a -> 'a t -> 'a node
|
||||
(** [add_l x s] adds [x] to the left of the sequence [s] *)
|
||||
|
||||
|
@ -57,6 +60,10 @@ val fold_r : ('a -> 'b -> 'b) -> 'a t -> 'b -> 'b
|
|||
where [e1], [e2], ..., [en] are the elements of [s]
|
||||
*)
|
||||
|
||||
val iter_l : ('a -> unit) -> 'a t -> unit
|
||||
(** [iter_l f s] applies [f] on all elements of [s] starting from
|
||||
the left *)
|
||||
|
||||
val find : ('a -> bool) -> 'a t -> 'a
|
||||
(** [find_node_l f s] returns the first element of [s] starting from the left
|
||||
that satisfies [f] or raises [Not_found] if none exists. *)
|
||||
|
|
|
@ -25,6 +25,11 @@ let%expect_test _ =
|
|||
Wheel.add wh 6 |> ignore;
|
||||
(* t=3 *)
|
||||
print_ints_nl (Wheel.tick wh); [%expect {| [] |}];
|
||||
|
||||
let every = ref [] in
|
||||
Wheel.iter (fun x -> every := x :: !every) wh;
|
||||
print_ints_nl (List.sort compare !every); [%expect {| [1;2;3;4;5;6] |}];
|
||||
|
||||
(* t=0 *)
|
||||
print_ints_nl (Wheel.tick wh); [%expect {| [1;2;3] |}];
|
||||
(* t=1 *)
|
||||
|
|
|
@ -22,3 +22,8 @@ let[@tail_mod_cons] rec empty t =
|
|||
let tick t =
|
||||
t.index <- (t.index + 1) mod Array.length t.entries;
|
||||
empty t
|
||||
|
||||
let iter f t =
|
||||
for i = 0 to Array.length t.entries - 1 do
|
||||
Dllist.iter_l f t.entries.(i)
|
||||
done
|
||||
|
|
|
@ -28,6 +28,16 @@ let find_chan t name =
|
|||
let whowas t nick =
|
||||
Cache.find_all t.whowas (string_ci nick)
|
||||
|
||||
let nuke t =
|
||||
begin
|
||||
Hashtbl.iter (fun _ u -> Dllist.reset u.membership) t.users;
|
||||
Hashtbl.iter (fun _ c -> Dllist.reset c.members) t.channels;
|
||||
Hashtbl.clear t.users;
|
||||
Hashtbl.clear t.channels;
|
||||
t.lusers <- 0;
|
||||
t.luserchannels <- 0;
|
||||
end
|
||||
|
||||
let relay ~(from : user) (msg : Msg.t) tgts =
|
||||
let msg =
|
||||
if msg.prefix = No_prefix then
|
||||
|
|
|
@ -14,25 +14,21 @@ type config = {
|
|||
notify : [`ready | `stopping] -> unit;
|
||||
}
|
||||
|
||||
type ping_wheel = Connection.t Wheel.t
|
||||
|
||||
let listener
|
||||
let bind_server
|
||||
~(port : int)
|
||||
~(listen_backlog : int)
|
||||
~(on_ready : unit -> unit)
|
||||
: (fd * sockaddr) Lwt_stream.t =
|
||||
let sock : fd Lwt.t =
|
||||
let fd = Lwt_unix.socket PF_INET SOCK_STREAM 0 in
|
||||
Lwt_unix.setsockopt fd SO_KEEPALIVE false;
|
||||
Lwt_unix.setsockopt fd SO_REUSEPORT false;
|
||||
let srv_adr = Unix.ADDR_INET (Unix.inet_addr_any, port) in
|
||||
let* () = Lwt_unix.bind fd srv_adr in
|
||||
Lwt_unix.listen fd listen_backlog;
|
||||
on_ready ();
|
||||
info (fun m -> m "listening on %a" pp_sockaddr srv_adr);
|
||||
Lwt.return fd
|
||||
in
|
||||
let accept () = sock >>= Lwt_unix.accept >|= Option.some in
|
||||
: fd Lwt.t =
|
||||
let fd = Lwt_unix.socket PF_INET SOCK_STREAM 0 in
|
||||
Lwt_unix.setsockopt fd SO_KEEPALIVE false;
|
||||
Lwt_unix.setsockopt fd SO_REUSEPORT false;
|
||||
let srv_adr = Unix.ADDR_INET (Unix.inet_addr_any, port) in
|
||||
let* () = Lwt_unix.bind fd srv_adr in
|
||||
Lwt_unix.listen fd listen_backlog;
|
||||
info (fun m -> m "listening on %a" pp_sockaddr srv_adr);
|
||||
Lwt.return fd
|
||||
|
||||
let accepts (fd : fd) : (fd * sockaddr) Lwt_stream.t =
|
||||
let accept () = Lwt_unix.accept fd >>= Lwt.return_some in
|
||||
Lwt_stream.from accept
|
||||
|
||||
let reader (fd : fd) : Msg.t Lwt_stream.t =
|
||||
|
@ -81,7 +77,7 @@ let handle_client
|
|||
(conn_addr : sockaddr)
|
||||
~(server_info : Server_info.t)
|
||||
~(router : Router.t)
|
||||
~(ping_wheel : ping_wheel)
|
||||
~(ping_wheel : Connection.t Wheel.t)
|
||||
=
|
||||
info (fun m -> m "new connection %a" pp_sockaddr conn_addr);
|
||||
let conn : Connection.t =
|
||||
|
@ -107,6 +103,24 @@ let handle_client
|
|||
(fun e -> error (fun m -> m "%a:@ %a" pp_sockaddr conn_addr Fmt.exn e));
|
||||
end
|
||||
|
||||
let interval dt =
|
||||
let tick () =
|
||||
let* () = Lwt_unix.sleep dt in
|
||||
Lwt.return_some ()
|
||||
in
|
||||
Lwt_stream.from tick
|
||||
|
||||
let interrupt () =
|
||||
let signal, signal_waiter = Lwt.wait () in
|
||||
let on_signal num =
|
||||
trace (fun m -> m "caught signal %d" num);
|
||||
try Lwt.wakeup signal_waiter () with
|
||||
Invalid_argument _ -> failwith "unceremoniously exiting"
|
||||
in
|
||||
Lwt_unix.on_signal (2 (* SIGINT *)) on_signal |> ignore;
|
||||
Lwt_unix.on_signal (15 (* SIGTERM *)) on_signal |> ignore;
|
||||
signal
|
||||
|
||||
let run {
|
||||
port;
|
||||
listen_backlog;
|
||||
|
@ -137,6 +151,13 @@ let run {
|
|||
info (fun m -> m "version:@ %s" server_info.version);
|
||||
info (fun m -> m "created:@ %s" server_info.created);
|
||||
|
||||
let* server : fd =
|
||||
bind_server
|
||||
~port
|
||||
~listen_backlog
|
||||
in
|
||||
notify `ready;
|
||||
|
||||
let router : Router.t =
|
||||
Router.make
|
||||
~whowas_history_len
|
||||
|
@ -147,40 +168,45 @@ let run {
|
|||
ping_interval
|
||||
in
|
||||
|
||||
let on_tick () =
|
||||
(* trace (fun m -> m "tick"); *)
|
||||
List.iter
|
||||
(fun conn ->
|
||||
match Connection.on_ping conn with
|
||||
| Ok () -> Wheel.add ping_wheel conn
|
||||
| Error () -> Connection.close conn ~reason:"Connection timed out")
|
||||
(Wheel.tick ping_wheel)
|
||||
let ping conn =
|
||||
match Connection.on_ping conn with
|
||||
| Ok () -> Wheel.add ping_wheel conn
|
||||
| Error _ -> Connection.close conn ~reason:"Connection timeout"
|
||||
in
|
||||
|
||||
let pinger_promise =
|
||||
Lwt_stream.iter
|
||||
on_tick
|
||||
(Lwt_stream.from @@ fun () ->
|
||||
let* () = Lwt_unix.sleep 1.0 in
|
||||
Lwt.return_some ())
|
||||
in
|
||||
|
||||
let on_con (fd, adr) =
|
||||
handle_client fd adr
|
||||
~server_info
|
||||
~router
|
||||
~ping_wheel
|
||||
(fun () ->
|
||||
List.iter ping
|
||||
(Wheel.tick ping_wheel))
|
||||
(interval 1.0)
|
||||
in
|
||||
|
||||
let listener_promise =
|
||||
Lwt_stream.iter
|
||||
on_con
|
||||
(listener
|
||||
~port
|
||||
~listen_backlog
|
||||
~on_ready:(fun () -> notify `ready))
|
||||
(fun (fd, addr) ->
|
||||
handle_client fd addr
|
||||
~server_info
|
||||
~router
|
||||
~ping_wheel)
|
||||
(accepts server)
|
||||
in
|
||||
|
||||
(* TODO: graceful cleanup on ctrl-c *)
|
||||
let* () =
|
||||
Lwt.pick [
|
||||
listener_promise <?> pinger_promise;
|
||||
interrupt ()
|
||||
]
|
||||
in
|
||||
notify `stopping;
|
||||
|
||||
listener_promise <&> pinger_promise
|
||||
info (fun m -> m "shutting down");
|
||||
let* () = Lwt_unix.close server in
|
||||
|
||||
Router.nuke router;
|
||||
Wheel.iter (fun conn -> Connection.close conn ~reason:"Server shutting down")
|
||||
(* ping wheel should contain every active connection *)
|
||||
ping_wheel;
|
||||
|
||||
(* give some time for the messages to send *)
|
||||
Lwt_unix.sleep 0.5
|
||||
|
|
|
@ -8,4 +8,5 @@ Environment=IRC_MOTD=/usr/local/share/talircd/motd
|
|||
#Environment=LOG_LEVEL=DEBUG
|
||||
|
||||
Type=notify
|
||||
KillSignal=SIGTERM
|
||||
ExecStart=/usr/local/bin/talircd
|
||||
|
|
Loading…
Reference in New Issue