basic tcp server with irc msg send/recv

This commit is contained in:
tali 2024-01-07 13:00:34 -05:00
parent 4e43ecd20b
commit 883b37afe1
2 changed files with 127 additions and 2 deletions

View File

@ -2,4 +2,5 @@
(public_name talircd)
(name main)
(libraries
))
lwt lwt.unix logs fmt
irc_msg))

View File

@ -1,2 +1,126 @@
open Lwt.Syntax
open Lwt.Infix
type sockaddr = Unix.sockaddr
type fd = Lwt_unix.file_descr
let pp_inet_addr = Fmt.of_to_string Unix.string_of_inet_addr
let pp_sock_addr ppf = function
| Unix.ADDR_INET (adr, port) -> Fmt.pf ppf "%a:%d" pp_inet_addr adr port
| Unix.ADDR_UNIX path -> Fmt.pf ppf "unix:%s" path
let listener ~(port : int) ~(backlog : int) : (fd * sockaddr) Lwt_stream.t =
let sock : fd Lwt.t =
let fd = Lwt_unix.socket PF_INET SOCK_STREAM 0 in
Lwt_unix.setsockopt fd SO_KEEPALIVE false;
Lwt_unix.setsockopt fd SO_REUSEPORT true;
let srv_adr = Unix.ADDR_INET (Unix.inet_addr_any, port) in
let* () = Lwt_unix.bind fd srv_adr in
Lwt_unix.listen fd backlog;
Logs.info (fun m -> m "listening on %a" pp_sock_addr srv_adr);
Lwt.return fd
in
let accept () = sock >>= Lwt_unix.accept >|= Option.some in
Lwt_stream.from accept
let reader (fd : fd) : Irc_msg.t Lwt_stream.t =
let chunk = Buffer.create 512 in
let rdbuf = Bytes.create 512 in
let gets () : Irc_msg.t list option Lwt.t =
Lwt.catch
(fun () ->
Lwt_unix.read fd rdbuf 0 (Bytes.length rdbuf) >>= function
| 0 -> Lwt.return_none
| n ->
Buffer.add_subbytes chunk rdbuf 0 n;
(* if Buffer.length chunk > 200_000 then panic *)
let msgs, rest = Irc_msg.parse (Buffer.contents chunk) in
Buffer.clear chunk;
Buffer.add_string chunk rest;
Lwt.return_some msgs)
(function
| Unix.Unix_error (ECONNRESET, _, _) -> Lwt.return_none
| exn -> Lwt.fail exn)
in
Lwt_stream.from gets |> Lwt_stream.map_list Fun.id
let writer (fd : fd) (obox : Irc_msg.t Lwt_stream.t) : unit Lwt.t =
let rec writeall bs i =
if i >= Bytes.length bs then
Lwt.return_unit
else
let* n = Lwt_unix.write fd bs i (Bytes.length bs - i) in
writeall bs (i + n)
in
let buf = Buffer.create 512 in
let on_msg msg =
Buffer.clear buf;
Irc_msg.write buf msg;
writeall (Buffer.to_bytes buf) 0
in
Lwt.catch
(fun () -> Lwt_stream.iter_s on_msg obox)
(function
| Unix.Unix_error (ECONNRESET, _, _) -> Lwt.return_unit
| exn -> Lwt.fail exn)
let handle_client (con_fd : fd) (con_adr : sockaddr) =
Logs.info (fun m -> m "new connection %a" pp_sock_addr con_adr);
let ibox, push_evt = Lwt_stream.create () in
let obox, push_msg = Lwt_stream.create () in
let send_evt m = push_evt (Some m) in
let send_msg m = push_msg (Some m) in
let on_msg (msg : Irc_msg.t) =
Logs.debug (fun m -> m "%a: %a" pp_sock_addr con_adr Irc_msg.pp msg);
match msg.command, msg.params with
| "NICK", [n] -> send_evt (`nick n)
| "QUIT", _ -> send_evt `quit
| c, _ -> send_evt (`invalid_cmd c)
in
let on_evt = function
| `nick n ->
send_msg (Irc_msg.make "001" [n; "Welcome to the IRC network"])
| `invalid_cmd c ->
send_msg (Irc_msg.make "421" [c; "Unknown command"])
| `quit ->
push_evt None
in
let rd = Lwt_stream.iter on_msg (reader con_fd) in
let wr = writer con_fd obox in
let eh = Lwt_stream.iter on_evt ibox in
Lwt.finalize
(fun () -> Lwt.choose [rd; wr; eh])
(fun () ->
Logs.info (fun m -> m "connection closed %a" pp_sock_addr con_adr);
Lwt_unix.close con_fd)
type config = {
port : int;
tcp_listen_backlog : int;
}
let run_server (cfg : config) =
let on_con (fd, adr) =
Lwt.on_failure
(handle_client fd adr)
(fun exn -> Logs.err (fun m -> m "%a: %a" pp_sock_addr adr Fmt.exn exn))
in
Lwt_stream.iter
on_con
(listener
~port:cfg.port
~backlog:cfg.tcp_listen_backlog)
let () =
print_endline "hello world"
Logs.set_level (Some Info);
Logs.set_reporter (Logs.format_reporter ());
Lwt_main.run @@
run_server {
port = 6667;
tcp_listen_backlog = 8;
}