diff --git a/lib/server/connection.ml b/lib/server/connection.ml index f6cafe5..cc22ff4 100644 --- a/lib/server/connection.ml +++ b/lib/server/connection.ml @@ -685,7 +685,8 @@ let quit t me ~reason = let close ?(reason = "Client closed") t = Option.iter (quit t ~reason) t.user; - Outbox.close t.outbox + Outbox.close t.outbox; + t.user <- None let on_msg_quit t reason = let reason = match reason with diff --git a/lib/server/server.ml b/lib/server/server.ml index d5294c7..c15d8f3 100644 --- a/lib/server/server.ml +++ b/lib/server/server.ml @@ -75,17 +75,22 @@ let handle_client ~server_info ~addr:conn_addr in + Wheel.add ping_wheel conn; let reader = Lwt_stream.iter (Connection.on_msg conn) (reader conn_fd) in let writer = writer conn_fd (Outbox.stream (Connection.outbox conn)) in - let close () = Connection.close conn in - Lwt.on_termination reader close; - Lwt.on_termination writer close; - Wheel.add ping_wheel conn; - Lwt.finalize - (fun () -> writer) - (fun () -> - Lwt_unix.close conn_fd >|= fun () -> - info (fun m -> m "connection closed %a" pp_sockaddr conn_addr)) + let both = + Lwt.finalize + (fun () -> reader <&> writer) + (fun () -> Lwt_unix.close conn_fd) + in + begin + Lwt.on_termination reader (fun () -> Connection.close conn); + Lwt.on_termination writer (fun () -> Connection.close conn); + Lwt.on_termination both + (fun () -> info (fun m -> m "connection closed %a" pp_sockaddr conn_addr)); + Lwt.on_failure both + (fun e -> error (fun m -> m "%a:@ %a" pp_sockaddr conn_addr Fmt.exn e)); + end type config = { port : int; @@ -130,13 +135,10 @@ let run (cfg : config) : unit Lwt.t = in let on_con (fd, adr) = - Lwt.on_failure - (handle_client fd adr - ~server_info - ~router - ~ping_wheel) - (fun exn -> - error (fun m -> m "%a:@ %a" pp_sockaddr adr Fmt.exn exn)) + handle_client fd adr + ~server_info + ~router + ~ping_wheel in let listener_promise =