open! Import
open Lwt.Syntax
open Lwt.Infix

include (val Logging.sublogs logger "Server")

(** [listener ~port ~backlog] is the stream of accepted connections, each a
    pair of the connection's descriptor and the peer address, on a TCP socket
    bound to [Unix.inet_addr_any] on [port] and listening with [backlog].

    The socket is created when [listener] is called. If configuring, binding
    or listening fails, the socket is closed and the failure is reported
    through the stream (every read of the stream is rejected with it). A
    failing [accept] also rejects the stream. *)
let listener ~(port : int) ~(backlog : int) : (fd * sockaddr) Lwt_stream.t =
  let sock : fd Lwt.t =
    let fd = Lwt_unix.socket PF_INET SOCK_STREAM 0 in
    let setup () =
      Lwt_unix.setsockopt fd SO_KEEPALIVE false;
      Lwt_unix.setsockopt fd SO_REUSEPORT true;
      let srv_adr = Unix.ADDR_INET (Unix.inet_addr_any, port) in
      let* () = Lwt_unix.bind fd srv_adr in
      Lwt_unix.listen fd backlog;
      info (fun m -> m "listening on %a" pp_sockaddr srv_adr);
      Lwt.return fd
    in
    (* Do not leak the descriptor when setup fails half-way. *)
    Lwt.catch setup (fun exn ->
        let* () = Lwt_unix.close fd in
        Lwt.fail exn)
  in
  (* [sock] is resolved once; every pull on the stream accepts on it. *)
  let accept () = sock >>= Lwt_unix.accept >|= Option.some in
  Lwt_stream.from accept

(** [reader fd] is the stream of messages parsed from the bytes read on [fd].

    Bytes are read in chunks of at most 512 and appended to a pending buffer;
    after each read the whole buffer is handed to [Msg.parse], the returned
    messages are emitted and the returned remainder becomes the new pending
    buffer. The stream ends when [read] returns 0 or fails with [ECONNRESET];
    any other exception rejects the stream. *)
let reader (fd : fd) : Msg.t Lwt_stream.t =
  let chunk = Buffer.create 512 in
  let rdbuf = Bytes.create 512 in
  let gets () : Msg.t list option Lwt.t =
    Lwt.catch
      (fun () ->
        Lwt_unix.read fd rdbuf 0 (Bytes.length rdbuf) >>= function
        | 0 -> Lwt.return_none
        | n ->
            Buffer.add_subbytes chunk rdbuf 0 n;
            (* NOTE: [chunk] has no upper bound; whatever [Msg.parse] hands
               back as [rest] accumulates across reads. *)
            (* if Buffer.length chunk > 200_000 then panic *)
            let msgs, rest = Msg.parse (Buffer.contents chunk) in
            Buffer.clear chunk;
            Buffer.add_string chunk rest;
            Lwt.return_some msgs)
      (function
        | Unix.Unix_error (ECONNRESET, _, _) -> Lwt.return_none
        | exn -> Lwt.fail exn)
  in
  (* Flatten the per-read batches into a stream of single messages. *)
  Lwt_stream.from gets |> Lwt_stream.map_list Fun.id

(** [writer fd obox] serialises every message of [obox] with [Msg.write] and
    writes it in full to [fd], one message at a time, in stream order.

    The promise resolves when [obox] ends, or when a write fails with
    [ECONNRESET] or [EPIPE] (the peer is gone, which is treated as a normal
    end of the connection). Any other exception rejects the promise. *)
let writer (fd : fd) (obox : Msg.t Lwt_stream.t) : unit Lwt.t =
  (* [Lwt_unix.write] may write fewer bytes than requested; loop until the
     whole of [bs] from offset [i] has been written. *)
  let rec writeall bs i =
    if i >= Bytes.length bs then Lwt.return_unit
    else
      let* n = Lwt_unix.write fd bs i (Bytes.length bs - i) in
      writeall bs (i + n)
  in
  let buf = Buffer.create 512 in
  let on_msg msg =
    Buffer.clear buf;
    Msg.write buf msg;
    writeall (Buffer.to_bytes buf) 0
  in
  Lwt.catch
    (fun () -> Lwt_stream.iter_s on_msg obox)
    (function
      | Unix.Unix_error ((ECONNRESET | EPIPE), _, _) -> Lwt.return_unit
      | exn -> Lwt.fail exn)

(** [handle_client conn_fd conn_addr ~router ~server_info] serves one accepted
    connection.

    It creates a [Connection.t], feeds it every message read from [conn_fd]
    and writes the connection's outbox stream back to [conn_fd].
    [Connection.close] is called as soon as either direction terminates. The
    returned promise follows the writing side: once it terminates, [conn_fd]
    is closed and the promise resolves or is rejected like the writer.

    Failures of the reading side are not part of the returned promise; they
    are logged here. *)
let handle_client (conn_fd : fd) (conn_addr : sockaddr) ~(router : Router.t)
    ~(server_info : Server_info.t) =
  info (fun m -> m "new connection %a" pp_sockaddr conn_addr);
  let conn : Connection.t =
    Connection.make ~router ~server_info ~addr:conn_addr
  in
  let reading = Lwt_stream.iter (Connection.on_msg conn) (reader conn_fd) in
  let writing = writer conn_fd (Outbox.stream (Connection.outbox conn)) in
  let close () = Connection.close conn in
  Lwt.on_termination reading close;
  Lwt.on_termination writing close;
  (* Nothing else waits on [reading], so without this a failure there would
     vanish silently. [EBADF] is skipped: presumably it is the pending read
     being aborted by our own [Lwt_unix.close] below when the writing side
     finishes first -- TODO confirm against the Lwt version in use. *)
  Lwt.on_failure reading (function
    | Unix.Unix_error (EBADF, _, _) -> ()
    | exn -> error (fun m -> m "%a: %a" pp_sockaddr conn_addr Fmt.exn exn));
  Lwt.finalize
    (fun () -> writing)
    (fun () ->
      Lwt_unix.close conn_fd >|= fun () ->
      info (fun m -> m "connection closed %a" pp_sockaddr conn_addr))

(** Server settings consumed by {!run}. *)
type config = {
  port : int;  (** TCP port the server listens on. *)
  tcp_listen_backlog : int;  (** Backlog passed to [listen]. *)
  hostname : string;  (** Passed to [Server_info.make]. *)
  (* TODO: motd *)
}

(** [run cfg] starts listening as described by [cfg] and serves every
    accepted connection with {!handle_client}, all sharing one [Router.t] and
    one [Server_info.t].

    Connections are served concurrently: the accept loop does not wait for a
    client to finish. A client whose handler fails is logged as an error and
    does not affect the others. The returned promise only terminates when the
    listener stream does. *)
let run (cfg : config) : unit Lwt.t =
  let server_info = Server_info.make ~hostname:cfg.hostname (* ~motd *) in
  let router : Router.t = Router.make () in
  let on_con (fd, adr) =
    Lwt.on_failure
      (handle_client fd adr ~router ~server_info)
      (fun exn -> error (fun m -> m "%a: %a" pp_sockaddr adr Fmt.exn exn))
  in
  Lwt_stream.iter on_con
    (listener ~port:cfg.port ~backlog:cfg.tcp_listen_backlog)