quit a little more gracefully when interrupted (SIGINT, SIGTERM)

This commit is contained in:
tali 2024-02-01 15:17:23 -05:00
parent e355bac41c
commit e160156b78
1 changed files with 65 additions and 44 deletions

View File

@ -14,25 +14,21 @@ type config = {
notify : [`ready | `stopping] -> unit;
}
type ping_wheel = Connection.t Wheel.t
let listener
let bind_server
~(port : int)
~(listen_backlog : int)
~(on_ready : unit -> unit)
: (fd * sockaddr) Lwt_stream.t =
let sock : fd Lwt.t =
: fd Lwt.t =
let fd = Lwt_unix.socket PF_INET SOCK_STREAM 0 in
Lwt_unix.setsockopt fd SO_KEEPALIVE false;
Lwt_unix.setsockopt fd SO_REUSEPORT false;
let srv_adr = Unix.ADDR_INET (Unix.inet_addr_any, port) in
let* () = Lwt_unix.bind fd srv_adr in
Lwt_unix.listen fd listen_backlog;
on_ready ();
info (fun m -> m "listening on %a" pp_sockaddr srv_adr);
Lwt.return fd
in
let accept () = sock >>= Lwt_unix.accept >|= Option.some in
let accepts (fd : fd) : (fd * sockaddr) Lwt_stream.t =
let accept () = Lwt_unix.accept fd >>= Lwt.return_some in
Lwt_stream.from accept
let reader (fd : fd) : Msg.t Lwt_stream.t =
@ -81,7 +77,7 @@ let handle_client
(conn_addr : sockaddr)
~(server_info : Server_info.t)
~(router : Router.t)
~(ping_wheel : ping_wheel)
~(ping_wheel : Connection.t Wheel.t)
=
info (fun m -> m "new connection %a" pp_sockaddr conn_addr);
let conn : Connection.t =
@ -107,6 +103,24 @@ let handle_client
(fun e -> error (fun m -> m "%a:@ %a" pp_sockaddr conn_addr Fmt.exn e));
end
let interval dt =
let tick () =
let* () = Lwt_unix.sleep dt in
Lwt.return_some ()
in
Lwt_stream.from tick
let interrupt () =
let signal, signal_waiter = Lwt.wait () in
let on_signal num =
trace (fun m -> m "caught signal %d" num);
try Lwt.wakeup signal_waiter () with
Invalid_argument _ -> failwith "unceremoniously exiting"
in
Lwt_unix.on_signal (2 (* SIGINT *)) on_signal |> ignore;
Lwt_unix.on_signal (15 (* SIGTERM *)) on_signal |> ignore;
signal
let run {
port;
listen_backlog;
@ -137,6 +151,13 @@ let run {
info (fun m -> m "version:@ %s" server_info.version);
info (fun m -> m "created:@ %s" server_info.created);
let* server : fd =
bind_server
~port
~listen_backlog
in
notify `ready;
let router : Router.t =
Router.make
~whowas_history_len
@ -147,40 +168,40 @@ let run {
ping_interval
in
let on_tick () =
(* trace (fun m -> m "tick"); *)
List.iter
(fun conn ->
let ping conn =
match Connection.on_ping conn with
| Ok () -> Wheel.add ping_wheel conn
| Error () -> Connection.close conn ~reason:"Connection timed out")
(Wheel.tick ping_wheel)
| Error _ -> Connection.close conn ~reason:"Connection timeout"
in
let pinger_promise =
Lwt_stream.iter
on_tick
(Lwt_stream.from @@ fun () ->
let* () = Lwt_unix.sleep 1.0 in
Lwt.return_some ())
in
let on_con (fd, adr) =
handle_client fd adr
~server_info
~router
~ping_wheel
(fun () ->
List.iter ping
(Wheel.tick ping_wheel))
(interval 1.0)
in
let listener_promise =
Lwt_stream.iter
on_con
(listener
~port
~listen_backlog
~on_ready:(fun () -> notify `ready))
(fun (fd, addr) ->
handle_client fd addr
~server_info
~router
~ping_wheel)
(accepts server)
in
(* TODO: graceful cleanup on ctrl-c *)
let* () =
Lwt.pick [
listener_promise <?> pinger_promise;
interrupt ()
]
in
notify `stopping;
(* TODO: send QUIT to all connections *)
info (fun m -> m "shutting down");
Lwt_unix.close server
listener_promise <&> pinger_promise