talircd/lib/server/server.ml

153 lines
4.0 KiB
OCaml
Raw Normal View History

2024-01-07 20:54:39 +00:00
open! Import
2024-01-11 04:26:27 +00:00
open Lwt.Syntax
open Lwt.Infix
2024-01-07 20:54:39 +00:00
2024-01-12 02:49:48 +00:00
include (val Logging.sublogs logger "Server")
type ping_wheel = Connection.t Wheel.t
2024-01-07 20:54:39 +00:00
let listener ~(port : int) ~(backlog : int) : (fd * sockaddr) Lwt_stream.t =
let sock : fd Lwt.t =
let fd = Lwt_unix.socket PF_INET SOCK_STREAM 0 in
Lwt_unix.setsockopt fd SO_KEEPALIVE false;
Lwt_unix.setsockopt fd SO_REUSEPORT true;
let srv_adr = Unix.ADDR_INET (Unix.inet_addr_any, port) in
let* () = Lwt_unix.bind fd srv_adr in
Lwt_unix.listen fd backlog;
2024-01-12 02:49:48 +00:00
info (fun m -> m "listening on %a" pp_sockaddr srv_adr);
2024-01-07 20:54:39 +00:00
Lwt.return fd
in
let accept () = sock >>= Lwt_unix.accept >|= Option.some in
Lwt_stream.from accept
2024-01-23 19:23:45 +00:00
let reader (fd : fd) : Msg.t Lwt_stream.t =
2024-01-07 20:54:39 +00:00
let chunk = Buffer.create 512 in
let rdbuf = Bytes.create 512 in
2024-01-23 19:23:45 +00:00
let gets () : Msg.t list option Lwt.t =
2024-01-07 20:54:39 +00:00
Lwt.catch
(fun () ->
Lwt_unix.read fd rdbuf 0 (Bytes.length rdbuf) >>= function
| 0 -> Lwt.return_none
| n ->
Buffer.add_subbytes chunk rdbuf 0 n;
(* if Buffer.length chunk > 200_000 then panic *)
2024-01-23 19:23:45 +00:00
let msgs, rest = Msg.parse (Buffer.contents chunk) in
2024-01-07 20:54:39 +00:00
Buffer.clear chunk;
Buffer.add_string chunk rest;
Lwt.return_some msgs)
(function
| Unix.Unix_error (ECONNRESET, _, _) -> Lwt.return_none
| exn -> Lwt.fail exn)
in
Lwt_stream.from gets |> Lwt_stream.map_list Fun.id
2024-01-23 19:23:45 +00:00
let writer (fd : fd) (obox : Msg.t Lwt_stream.t) : unit Lwt.t =
2024-01-07 20:54:39 +00:00
let rec writeall bs i =
if i >= Bytes.length bs then
Lwt.return_unit
else
let* n = Lwt_unix.write fd bs i (Bytes.length bs - i) in
writeall bs (i + n)
in
let buf = Buffer.create 512 in
let on_msg msg =
Buffer.clear buf;
2024-01-23 19:23:45 +00:00
Msg.write buf msg;
2024-01-07 20:54:39 +00:00
writeall (Buffer.to_bytes buf) 0
in
Lwt.catch
(fun () -> Lwt_stream.iter_s on_msg obox)
(function
| Unix.Unix_error (ECONNRESET, _, _) -> Lwt.return_unit
| exn -> Lwt.fail exn)
let handle_client
(conn_fd : fd)
(conn_addr : sockaddr)
~(server_info : Server_info.t)
~(router : Router.t)
~(ping_wheel : ping_wheel)
=
2024-01-12 02:49:48 +00:00
info (fun m -> m "new connection %a" pp_sockaddr conn_addr);
2024-01-07 20:54:39 +00:00
let conn : Connection.t =
Connection.make
2024-01-08 03:28:31 +00:00
~router
~server_info
2024-01-07 20:54:39 +00:00
~addr:conn_addr
in
Wheel.add ping_wheel conn;
let reader = Lwt_stream.iter (Connection.on_msg conn) (reader conn_fd) in
let writer = writer conn_fd (Outbox.stream (Connection.outbox conn)) in
let both =
Lwt.finalize
(fun () -> reader <&> writer)
(fun () -> Lwt_unix.close conn_fd)
in
begin
Lwt.on_termination reader (fun () -> Connection.close conn);
Lwt.on_termination writer (fun () -> Connection.close conn);
Lwt.on_termination both
(fun () -> info (fun m -> m "connection closed %a" pp_sockaddr conn_addr));
Lwt.on_failure both
(fun e -> error (fun m -> m "%a:@ %a" pp_sockaddr conn_addr Fmt.exn e));
end
2024-01-07 20:54:39 +00:00
type config = {
port : int;
tcp_listen_backlog : int;
ping_interval : int;
hostname : string;
(* TODO: motd *)
2024-01-07 20:54:39 +00:00
}
let run (cfg : config) : unit Lwt.t =
let server_info =
Server_info.make
~hostname:cfg.hostname
(* ~motd *)
in
2024-01-08 03:28:31 +00:00
let router : Router.t =
Router.make ()
in
2024-01-07 20:54:39 +00:00
let ping_wheel : _ Wheel.t =
Wheel.make
cfg.ping_interval
in
let on_tick () =
(* trace (fun m -> m "tick"); *)
List.iter
(fun conn ->
match Connection.on_ping conn with
| Ok () -> Wheel.add ping_wheel conn
| Error () -> Connection.close conn ~reason:"Connection timed out")
(Wheel.tick ping_wheel)
in
let pinger_promise =
Lwt_stream.iter
on_tick
(Lwt_stream.from @@ fun () ->
let* () = Lwt_unix.sleep 1.0 in
Lwt.return_some ())
in
2024-01-07 20:54:39 +00:00
let on_con (fd, adr) =
handle_client fd adr
~server_info
~router
~ping_wheel
in
let listener_promise =
Lwt_stream.iter
on_con
(listener
~port:cfg.port
~backlog:cfg.tcp_listen_backlog)
2024-01-07 20:54:39 +00:00
in
listener_promise <&> pinger_promise