100 lines
3.0 KiB
OCaml
100 lines
3.0 KiB
OCaml
open! Import
|
|
open Lwt.Syntax
|
|
open Lwt.Infix
|
|
|
|
let listener ~(port : int) ~(backlog : int) : (fd * sockaddr) Lwt_stream.t =
|
|
let sock : fd Lwt.t =
|
|
let fd = Lwt_unix.socket PF_INET SOCK_STREAM 0 in
|
|
Lwt_unix.setsockopt fd SO_KEEPALIVE false;
|
|
Lwt_unix.setsockopt fd SO_REUSEPORT true;
|
|
let srv_adr = Unix.ADDR_INET (Unix.inet_addr_any, port) in
|
|
let* () = Lwt_unix.bind fd srv_adr in
|
|
Lwt_unix.listen fd backlog;
|
|
Logs.info (fun m -> m "listening on %a" pp_sockaddr srv_adr);
|
|
Lwt.return fd
|
|
in
|
|
let accept () = sock >>= Lwt_unix.accept >|= Option.some in
|
|
Lwt_stream.from accept
|
|
|
|
let reader (fd : fd) : Irc.Msg.t Lwt_stream.t =
|
|
let chunk = Buffer.create 512 in
|
|
let rdbuf = Bytes.create 512 in
|
|
let gets () : Irc.Msg.t list option Lwt.t =
|
|
Lwt.catch
|
|
(fun () ->
|
|
Lwt_unix.read fd rdbuf 0 (Bytes.length rdbuf) >>= function
|
|
| 0 -> Lwt.return_none
|
|
| n ->
|
|
Buffer.add_subbytes chunk rdbuf 0 n;
|
|
(* if Buffer.length chunk > 200_000 then panic *)
|
|
let msgs, rest = Irc.Msg.parse (Buffer.contents chunk) in
|
|
Buffer.clear chunk;
|
|
Buffer.add_string chunk rest;
|
|
Lwt.return_some msgs)
|
|
(function
|
|
| Unix.Unix_error (ECONNRESET, _, _) -> Lwt.return_none
|
|
| exn -> Lwt.fail exn)
|
|
in
|
|
Lwt_stream.from gets |> Lwt_stream.map_list Fun.id
|
|
|
|
let writer (fd : fd) (obox : Irc.Msg.t Lwt_stream.t) : unit Lwt.t =
|
|
let rec writeall bs i =
|
|
if i >= Bytes.length bs then
|
|
Lwt.return_unit
|
|
else
|
|
let* n = Lwt_unix.write fd bs i (Bytes.length bs - i) in
|
|
writeall bs (i + n)
|
|
in
|
|
let buf = Buffer.create 512 in
|
|
let on_msg msg =
|
|
Buffer.clear buf;
|
|
Irc.Msg.write buf msg;
|
|
writeall (Buffer.to_bytes buf) 0
|
|
in
|
|
Lwt.catch
|
|
(fun () -> Lwt_stream.iter_s on_msg obox)
|
|
(function
|
|
| Unix.Unix_error (ECONNRESET, _, _) -> Lwt.return_unit
|
|
| exn -> Lwt.fail exn)
|
|
|
|
let handle_client (router : Router.t) (conn_fd : fd) (conn_addr : sockaddr) =
|
|
Logs.info (fun m -> m "new connection %a" pp_sockaddr conn_addr);
|
|
let conn : Connection.t =
|
|
Connection.make
|
|
~router
|
|
~addr:conn_addr
|
|
in
|
|
let reader = Lwt_stream.iter (Connection.on_msg conn) (reader conn_fd) in
|
|
let writer = writer conn_fd (Outbox.stream (Connection.outbox conn)) in
|
|
let shutdown () = Connection.shutdown conn in
|
|
Lwt.on_termination reader shutdown;
|
|
Lwt.on_termination writer shutdown;
|
|
Lwt.finalize
|
|
(fun () -> writer)
|
|
(fun () ->
|
|
Lwt_unix.close conn_fd >|= fun () ->
|
|
Logs.info (fun m -> m "connection closed %a" pp_sockaddr conn_addr))
|
|
|
|
type config = {
|
|
port : int;
|
|
tcp_listen_backlog : int;
|
|
}
|
|
|
|
let run (cfg : config) : unit Lwt.t =
|
|
let router : Router.t =
|
|
Router.make ()
|
|
in
|
|
|
|
let on_con (fd, adr) =
|
|
Lwt.on_failure
|
|
(handle_client router fd adr)
|
|
(fun exn ->
|
|
Logs.err (fun m -> m "%a: %a" pp_sockaddr adr Fmt.exn exn))
|
|
in
|
|
|
|
Lwt_stream.iter
|
|
on_con
|
|
(listener
|
|
~port:cfg.port
|
|
~backlog:cfg.tcp_listen_backlog)
|