From e160156b78b2e58c68b25d26ccb0cdea9f37532e Mon Sep 17 00:00:00 2001 From: tali Date: Thu, 1 Feb 2024 15:17:23 -0500 Subject: [PATCH] quit a little more gracefully when interrupted (SIGINT, SIGTERM) --- lib/server/server.ml | 109 ++++++++++++++++++++++++++----------------- 1 file changed, 65 insertions(+), 44 deletions(-) diff --git a/lib/server/server.ml b/lib/server/server.ml index a8b781f..68102a8 100644 --- a/lib/server/server.ml +++ b/lib/server/server.ml @@ -14,25 +14,21 @@ type config = { notify : [`ready | `stopping] -> unit; } -type ping_wheel = Connection.t Wheel.t - -let listener +let bind_server ~(port : int) ~(listen_backlog : int) - ~(on_ready : unit -> unit) - : (fd * sockaddr) Lwt_stream.t = - let sock : fd Lwt.t = - let fd = Lwt_unix.socket PF_INET SOCK_STREAM 0 in - Lwt_unix.setsockopt fd SO_KEEPALIVE false; - Lwt_unix.setsockopt fd SO_REUSEPORT false; - let srv_adr = Unix.ADDR_INET (Unix.inet_addr_any, port) in - let* () = Lwt_unix.bind fd srv_adr in - Lwt_unix.listen fd listen_backlog; - on_ready (); - info (fun m -> m "listening on %a" pp_sockaddr srv_adr); - Lwt.return fd - in - let accept () = sock >>= Lwt_unix.accept >|= Option.some in + : fd Lwt.t = + let fd = Lwt_unix.socket PF_INET SOCK_STREAM 0 in + Lwt_unix.setsockopt fd SO_KEEPALIVE false; + Lwt_unix.setsockopt fd SO_REUSEPORT false; + let srv_adr = Unix.ADDR_INET (Unix.inet_addr_any, port) in + let* () = Lwt_unix.bind fd srv_adr in + Lwt_unix.listen fd listen_backlog; + info (fun m -> m "listening on %a" pp_sockaddr srv_adr); + Lwt.return fd + +let accepts (fd : fd) : (fd * sockaddr) Lwt_stream.t = + let accept () = Lwt_unix.accept fd >>= Lwt.return_some in Lwt_stream.from accept let reader (fd : fd) : Msg.t Lwt_stream.t = @@ -81,7 +77,7 @@ let handle_client (conn_addr : sockaddr) ~(server_info : Server_info.t) ~(router : Router.t) - ~(ping_wheel : ping_wheel) + ~(ping_wheel : Connection.t Wheel.t) = info (fun m -> m "new connection %a" pp_sockaddr conn_addr); let conn : Connection.t = @@ -107,6 +103,24 @@ let handle_client (fun e -> error (fun m -> m "%a:@ %a" pp_sockaddr conn_addr Fmt.exn e)); end +let interval dt = + let tick () = + let* () = Lwt_unix.sleep dt in + Lwt.return_some () + in + Lwt_stream.from tick + +let interrupt () = + let signal, signal_waiter = Lwt.wait () in + let on_signal num = + trace (fun m -> m "caught signal %d" num); + try Lwt.wakeup signal_waiter () with + Invalid_argument _ -> failwith "unceremoniously exiting" + in + Lwt_unix.on_signal (2 (* SIGINT *)) on_signal |> ignore; + Lwt_unix.on_signal (15 (* SIGTERM *)) on_signal |> ignore; + signal + let run { port; listen_backlog; @@ -137,6 +151,13 @@ let run { info (fun m -> m "version:@ %s" server_info.version); info (fun m -> m "created:@ %s" server_info.created); + let* server : fd = + bind_server + ~port + ~listen_backlog + in + notify `ready; + let router : Router.t = Router.make ~whowas_history_len @@ -147,40 +168,40 @@ let run { ping_interval in - let on_tick () = - (* trace (fun m -> m "tick"); *) - List.iter - (fun conn -> - match Connection.on_ping conn with - | Ok () -> Wheel.add ping_wheel conn - | Error () -> Connection.close conn ~reason:"Connection timed out") - (Wheel.tick ping_wheel) + let ping conn = + match Connection.on_ping conn with + | Ok () -> Wheel.add ping_wheel conn + | Error _ -> Connection.close conn ~reason:"Connection timeout" in let pinger_promise = Lwt_stream.iter - on_tick - (Lwt_stream.from @@ fun () -> - let* () = Lwt_unix.sleep 1.0 in - Lwt.return_some ()) - in - - let on_con (fd, adr) = - handle_client fd adr - ~server_info - ~router - ~ping_wheel + (fun () -> + List.iter ping + (Wheel.tick ping_wheel)) + (interval 1.0) in let listener_promise = Lwt_stream.iter - on_con - (listener - ~port - ~listen_backlog - ~on_ready:(fun () -> notify `ready)) + (fun (fd, addr) -> + handle_client fd addr + ~server_info + ~router + ~ping_wheel) + (accepts server) in - (* TODO: graceful cleanup on ctrl-c *) + let* () = + Lwt.pick [ + listener_promise pinger_promise; + interrupt () + ] + in + notify `stopping; + + (* TODO: send QUIT to all connections *) + + info (fun m -> m "shutting down"); + Lwt_unix.close server - listener_promise <&> pinger_promise