implement periodic server->client pings, timeouts

This commit is contained in:
tali 2024-01-30 22:29:51 -05:00
parent 277af4202f
commit b5bec6c601
4 changed files with 116 additions and 28 deletions

View File

@ -5,6 +5,7 @@ Lwt_main.run
(Server.run { (Server.run {
port = 6667; port = 6667;
tcp_listen_backlog = 8; tcp_listen_backlog = 8;
ping_interval = 60;
hostname = "irc.tali.software"; hostname = "irc.tali.software";
(* TODO: motd *) (* TODO: motd *)
}) })

View File

@ -11,11 +11,22 @@ type t = {
server_info : Server_info.t; server_info : Server_info.t;
addr : sockaddr; addr : sockaddr;
outbox : Outbox.t; outbox : Outbox.t;
mutable activity : activity_state;
mutable user : User.t option; mutable user : User.t option;
mutable pending_nick : name option; mutable pending_nick : name option;
mutable pending_userinfo : userinfo option; mutable pending_userinfo : userinfo option;
} }
and activity_state =
(* enters this state whenever the client interacts with the server *)
| Active
(* enters this state after the ping interval has elapsed *)
| Inactive
(* enters this after the ping interval has elapsed again. the client
must respond with a PONG within the next interval or else the connection
will be closed *)
| Pinged of string
let make ~router ~server_info ~addr = { let make ~router ~server_info ~addr = {
router; router;
server_info; server_info;
@ -24,6 +35,7 @@ let make ~router ~server_info ~addr = {
user = None; user = None;
pending_nick = None; pending_nick = None;
pending_userinfo = None; pending_userinfo = None;
activity = Active;
} }
let outbox t = t.outbox let outbox t = t.outbox
@ -541,21 +553,6 @@ let on_msg_motd t =
motd t; motd t;
Ok () Ok ()
let on_msg_ping t token =
let* _me = require_registered t in
match token with
| None -> Ok ()
| Some token ->
let prefix = Server_info.prefix t.server_info in
Outbox.send t.outbox
(Msg.make ~prefix "PONG" [t.server_info.hostname; token]
~always_trailing:true);
Ok ()
let on_msg_pong t _token =
let* _me = require_registered t in
Ok ()
let welcome t me = let welcome t me =
let whoami = Msg.prefix_string (User.prefix me) in let whoami = Msg.prefix_string (User.prefix me) in
let s_hostname = t.server_info.hostname in let s_hostname = t.server_info.hostname in
@ -658,6 +655,59 @@ let on_msg_user t username realname =
attempt_to_register t attempt_to_register t
(* ping *)
let on_msg_ping t token =
let* _me = require_registered t in
match token with
| None -> Ok ()
| Some token ->
let prefix = Server_info.prefix t.server_info in
Outbox.send t.outbox
(Msg.make ~prefix "PONG" [t.server_info.hostname; token]
~always_trailing:true);
Ok ()
let on_msg_pong t token =
let* _me = require_registered t in
match t.activity with
| Active | Inactive ->
trace (fun m -> m "%a:@ ignored pong" pp_sockaddr t.addr);
Ok ()
| Pinged token' ->
if token <> Some token' then
debug (fun m -> m "%a:@ got weird PONG token" pp_sockaddr t.addr);
Ok ()
let on_ping t =
match t.activity with
| _ when Outbox.is_closed t.outbox ->
trace (fun m -> m "%a:@ connection stale" pp_sockaddr t.addr);
Error ()
| Active ->
trace (fun m -> m "%a:@ inactive" pp_sockaddr t.addr);
t.activity <- Inactive;
Ok ()
| Inactive ->
let token = "meow" in
trace (fun m -> m "%a:@ ping %S" pp_sockaddr t.addr token);
t.activity <- Pinged token;
if Option.is_some t.user then
begin
let prefix = Server_info.prefix t.server_info in
Outbox.send t.outbox
(Msg.make ~prefix "PING" [token]
~always_trailing:true);
end;
Ok ()
| Pinged _ ->
debug (fun m -> m "%a:@ timed out" pp_sockaddr t.addr);
Error ()
(* message parsing *) (* message parsing *)
let concat_args = function let concat_args = function
@ -721,11 +771,12 @@ let on_msg t (msg : Msg.t) : unit =
let results = let results =
List.map List.map
(fun cmd -> (fun cmd ->
trace (fun m -> m "@[%a:@ %a@]" pp_sockaddr t.addr pp_args cmd); trace (fun m -> m "%a@ ->@ %a" pp_sockaddr t.addr pp_args cmd);
dispatch t cmd) dispatch t cmd)
(split_command_params (split_command_params
msg.command msg.command
msg.params) msg.params)
in in
List.iter (reply t) List.iter (reply t)
(List.flat_map list_of_errors results) (List.flat_map list_of_errors results);
t.activity <- Active

View File

@ -15,6 +15,7 @@ let make () =
let stream t = t.stream let stream t = t.stream
let send t msg = try t.push (Some msg) with Lwt_stream.Closed -> () let send t msg = try t.push (Some msg) with Lwt_stream.Closed -> ()
let close t = try t.push None with Lwt_stream.Closed -> () let close t = try t.push None with Lwt_stream.Closed -> ()
let is_closed t = Lwt_stream.is_closed t.stream
module Bcc = struct module Bcc = struct
(** this module is used to send a message to a number of outboxes at once without sending (** this module is used to send a message to a number of outboxes at once without sending

View File

@ -4,6 +4,8 @@ open Lwt.Infix
include (val Logging.sublogs logger "Server") include (val Logging.sublogs logger "Server")
type ping_wheel = Connection.t Wheel.t
let listener ~(port : int) ~(backlog : int) : (fd * sockaddr) Lwt_stream.t = let listener ~(port : int) ~(backlog : int) : (fd * sockaddr) Lwt_stream.t =
let sock : fd Lwt.t = let sock : fd Lwt.t =
let fd = Lwt_unix.socket PF_INET SOCK_STREAM 0 in let fd = Lwt_unix.socket PF_INET SOCK_STREAM 0 in
@ -62,8 +64,9 @@ let writer (fd : fd) (obox : Msg.t Lwt_stream.t) : unit Lwt.t =
let handle_client let handle_client
(conn_fd : fd) (conn_fd : fd)
(conn_addr : sockaddr) (conn_addr : sockaddr)
~(router : Router.t)
~(server_info : Server_info.t) ~(server_info : Server_info.t)
~(router : Router.t)
~(ping_wheel : ping_wheel)
= =
info (fun m -> m "new connection %a" pp_sockaddr conn_addr); info (fun m -> m "new connection %a" pp_sockaddr conn_addr);
let conn : Connection.t = let conn : Connection.t =
@ -77,6 +80,7 @@ let handle_client
let close () = Connection.close conn in let close () = Connection.close conn in
Lwt.on_termination reader close; Lwt.on_termination reader close;
Lwt.on_termination writer close; Lwt.on_termination writer close;
Wheel.add ping_wheel conn;
Lwt.finalize Lwt.finalize
(fun () -> writer) (fun () -> writer)
(fun () -> (fun () ->
@ -86,6 +90,7 @@ let handle_client
type config = { type config = {
port : int; port : int;
tcp_listen_backlog : int; tcp_listen_backlog : int;
ping_interval : int;
hostname : string; hostname : string;
(* TODO: motd *) (* TODO: motd *)
} }
@ -101,15 +106,45 @@ let run (cfg : config) : unit Lwt.t =
Router.make () Router.make ()
in in
let on_con (fd, adr) = let ping_wheel : _ Wheel.t =
Lwt.on_failure Wheel.make
(handle_client fd adr ~router ~server_info) cfg.ping_interval
(fun exn ->
error (fun m -> m "%a: %a" pp_sockaddr adr Fmt.exn exn))
in in
Lwt_stream.iter let on_tick () =
on_con (* trace (fun m -> m "tick"); *)
(listener List.iter
~port:cfg.port (fun conn ->
~backlog:cfg.tcp_listen_backlog) match Connection.on_ping conn with
| Ok () -> Wheel.add ping_wheel conn
| Error () -> Connection.close conn ~reason:"Connection timed out")
(Wheel.tick ping_wheel)
in
let pinger_promise =
Lwt_stream.iter
on_tick
(Lwt_stream.from @@ fun () ->
let* () = Lwt_unix.sleep 1.0 in
Lwt.return_some ())
in
let on_con (fd, adr) =
Lwt.on_failure
(handle_client fd adr
~server_info
~router
~ping_wheel)
(fun exn ->
error (fun m -> m "%a:@ %a" pp_sockaddr adr Fmt.exn exn))
in
let listener_promise =
Lwt_stream.iter
on_con
(listener
~port:cfg.port
~backlog:cfg.tcp_listen_backlog)
in
listener_promise <&> pinger_promise