user registration logic
This commit is contained in:
parent
883b37afe1
commit
150d25b1e0
2
bin/dune
2
bin/dune
|
@ -3,4 +3,4 @@
|
|||
(name main)
|
||||
(libraries
|
||||
lwt lwt.unix logs fmt
|
||||
irc_msg))
|
||||
irc_server))
|
||||
|
|
130
bin/main.ml
130
bin/main.ml
|
@ -1,126 +1,8 @@
|
|||
open Lwt.Syntax
|
||||
open Lwt.Infix
|
||||
Logs.set_level (Some Debug);
|
||||
Logs.set_reporter (Logs.format_reporter ());
|
||||
|
||||
type sockaddr = Unix.sockaddr
|
||||
type fd = Lwt_unix.file_descr
|
||||
|
||||
let pp_inet_addr = Fmt.of_to_string Unix.string_of_inet_addr
|
||||
|
||||
let pp_sock_addr ppf = function
|
||||
| Unix.ADDR_INET (adr, port) -> Fmt.pf ppf "%a:%d" pp_inet_addr adr port
|
||||
| Unix.ADDR_UNIX path -> Fmt.pf ppf "unix:%s" path
|
||||
|
||||
let listener ~(port : int) ~(backlog : int) : (fd * sockaddr) Lwt_stream.t =
|
||||
let sock : fd Lwt.t =
|
||||
let fd = Lwt_unix.socket PF_INET SOCK_STREAM 0 in
|
||||
Lwt_unix.setsockopt fd SO_KEEPALIVE false;
|
||||
Lwt_unix.setsockopt fd SO_REUSEPORT true;
|
||||
let srv_adr = Unix.ADDR_INET (Unix.inet_addr_any, port) in
|
||||
let* () = Lwt_unix.bind fd srv_adr in
|
||||
Lwt_unix.listen fd backlog;
|
||||
Logs.info (fun m -> m "listening on %a" pp_sock_addr srv_adr);
|
||||
Lwt.return fd
|
||||
in
|
||||
let accept () = sock >>= Lwt_unix.accept >|= Option.some in
|
||||
Lwt_stream.from accept
|
||||
|
||||
let reader (fd : fd) : Irc_msg.t Lwt_stream.t =
|
||||
let chunk = Buffer.create 512 in
|
||||
let rdbuf = Bytes.create 512 in
|
||||
let gets () : Irc_msg.t list option Lwt.t =
|
||||
Lwt.catch
|
||||
(fun () ->
|
||||
Lwt_unix.read fd rdbuf 0 (Bytes.length rdbuf) >>= function
|
||||
| 0 -> Lwt.return_none
|
||||
| n ->
|
||||
Buffer.add_subbytes chunk rdbuf 0 n;
|
||||
(* if Buffer.length chunk > 200_000 then panic *)
|
||||
let msgs, rest = Irc_msg.parse (Buffer.contents chunk) in
|
||||
Buffer.clear chunk;
|
||||
Buffer.add_string chunk rest;
|
||||
Lwt.return_some msgs)
|
||||
(function
|
||||
| Unix.Unix_error (ECONNRESET, _, _) -> Lwt.return_none
|
||||
| exn -> Lwt.fail exn)
|
||||
in
|
||||
Lwt_stream.from gets |> Lwt_stream.map_list Fun.id
|
||||
|
||||
let writer (fd : fd) (obox : Irc_msg.t Lwt_stream.t) : unit Lwt.t =
|
||||
let rec writeall bs i =
|
||||
if i >= Bytes.length bs then
|
||||
Lwt.return_unit
|
||||
else
|
||||
let* n = Lwt_unix.write fd bs i (Bytes.length bs - i) in
|
||||
writeall bs (i + n)
|
||||
in
|
||||
let buf = Buffer.create 512 in
|
||||
let on_msg msg =
|
||||
Buffer.clear buf;
|
||||
Irc_msg.write buf msg;
|
||||
writeall (Buffer.to_bytes buf) 0
|
||||
in
|
||||
Lwt.catch
|
||||
(fun () -> Lwt_stream.iter_s on_msg obox)
|
||||
(function
|
||||
| Unix.Unix_error (ECONNRESET, _, _) -> Lwt.return_unit
|
||||
| exn -> Lwt.fail exn)
|
||||
|
||||
let handle_client (con_fd : fd) (con_adr : sockaddr) =
|
||||
Logs.info (fun m -> m "new connection %a" pp_sock_addr con_adr);
|
||||
let ibox, push_evt = Lwt_stream.create () in
|
||||
let obox, push_msg = Lwt_stream.create () in
|
||||
let send_evt m = push_evt (Some m) in
|
||||
let send_msg m = push_msg (Some m) in
|
||||
|
||||
let on_msg (msg : Irc_msg.t) =
|
||||
Logs.debug (fun m -> m "%a: %a" pp_sock_addr con_adr Irc_msg.pp msg);
|
||||
match msg.command, msg.params with
|
||||
| "NICK", [n] -> send_evt (`nick n)
|
||||
| "QUIT", _ -> send_evt `quit
|
||||
| c, _ -> send_evt (`invalid_cmd c)
|
||||
in
|
||||
|
||||
let on_evt = function
|
||||
| `nick n ->
|
||||
send_msg (Irc_msg.make "001" [n; "Welcome to the IRC network"])
|
||||
| `invalid_cmd c ->
|
||||
send_msg (Irc_msg.make "421" [c; "Unknown command"])
|
||||
| `quit ->
|
||||
push_evt None
|
||||
in
|
||||
|
||||
let rd = Lwt_stream.iter on_msg (reader con_fd) in
|
||||
let wr = writer con_fd obox in
|
||||
let eh = Lwt_stream.iter on_evt ibox in
|
||||
|
||||
Lwt.finalize
|
||||
(fun () -> Lwt.choose [rd; wr; eh])
|
||||
(fun () ->
|
||||
Logs.info (fun m -> m "connection closed %a" pp_sock_addr con_adr);
|
||||
Lwt_unix.close con_fd)
|
||||
|
||||
type config = {
|
||||
port : int;
|
||||
tcp_listen_backlog : int;
|
||||
}
|
||||
|
||||
let run_server (cfg : config) =
|
||||
let on_con (fd, adr) =
|
||||
Lwt.on_failure
|
||||
(handle_client fd adr)
|
||||
(fun exn -> Logs.err (fun m -> m "%a: %a" pp_sock_addr adr Fmt.exn exn))
|
||||
in
|
||||
Lwt_stream.iter
|
||||
on_con
|
||||
(listener
|
||||
~port:cfg.port
|
||||
~backlog:cfg.tcp_listen_backlog)
|
||||
|
||||
let () =
|
||||
Logs.set_level (Some Info);
|
||||
Logs.set_reporter (Logs.format_reporter ());
|
||||
Lwt_main.run @@
|
||||
run_server {
|
||||
Lwt_main.run
|
||||
(Irc_server.run {
|
||||
port = 6667;
|
||||
tcp_listen_backlog = 8;
|
||||
}
|
||||
tcp_listen_backlog = 8
|
||||
})
|
||||
|
|
|
@ -0,0 +1,94 @@
|
|||
open! Import
|
||||
|
||||
type t = {
|
||||
addr : sockaddr;
|
||||
userbase : Userbase.t;
|
||||
user : Userbase.user;
|
||||
mutable regis : string option * (string * string) option;
|
||||
outbox : Irc_msg.t Lwt_stream.t;
|
||||
push_outbox : (Irc_msg.t option -> unit);
|
||||
quit : unit Lwt_condition.t;
|
||||
}
|
||||
|
||||
let make ~(userbase : Userbase.t) ~(addr : sockaddr) : t =
|
||||
let user = Userbase.make_user () in
|
||||
let regis = None, None in
|
||||
let outbox, push_outbox = Lwt_stream.create () in
|
||||
let quit = Lwt_condition.create () in
|
||||
{ addr; userbase; user; regis; outbox; push_outbox; quit }
|
||||
|
||||
let quitting t = Lwt_condition.wait t.quit
|
||||
let outbox t = t.outbox
|
||||
let send t msg = t.push_outbox (Some msg)
|
||||
|
||||
let cleanup t =
|
||||
Userbase.leave t.userbase t.user
|
||||
|
||||
(* message handlers *)
|
||||
|
||||
(* > user registration *)
|
||||
|
||||
let update_regis t nick username =
|
||||
t.regis <- (nick, username);
|
||||
match nick, username with
|
||||
| Some nick, Some _ ->
|
||||
begin match Userbase.register t.userbase ~nick ~user:t.user with
|
||||
| `inuse -> `nicknameinuse nick
|
||||
| `ok -> `ok
|
||||
end
|
||||
| _, _ ->
|
||||
(* wait for remaining credentials *)
|
||||
`ok
|
||||
|
||||
let on_nick_msg t new_nick =
|
||||
(* TODO: validate nickname string *)
|
||||
let _, username = t.regis in
|
||||
update_regis t (Some new_nick) username
|
||||
|
||||
let on_user_msg t new_username _mode =
|
||||
(* TODO: validate user string *)
|
||||
(* TODO: validate mode string *)
|
||||
match t.regis with
|
||||
| nick, None ->
|
||||
update_regis t nick (Some new_username)
|
||||
| _, Some _ ->
|
||||
`alreadyregistered
|
||||
|
||||
(* > misc *)
|
||||
|
||||
let on_quit_msg t why =
|
||||
Logs.debug (fun m -> m "%a: quit: %S" pp_sockaddr t.addr (String.concat " " why));
|
||||
Lwt_condition.broadcast t.quit ();
|
||||
`ok
|
||||
|
||||
(* message transmission *)
|
||||
|
||||
module Rpl = struct
|
||||
open Irc_msg
|
||||
let unknowncommand cmd = make "421" [cmd; "Unknown command"]
|
||||
let needmoreparams cmd = make "461" [cmd; "Not enough parameters"]
|
||||
let tryagain cmd = make "263" [cmd; "Please wait a while and try again."]
|
||||
let alreadyregistered () = make "462" ["Unauthorized command (already registered)"]
|
||||
let nicknameinuse nick = make "433" [nick; "Nickname is already in use"]
|
||||
end
|
||||
|
||||
let on_msg t (msg : Irc_msg.t) : unit =
|
||||
Logs.debug (fun m -> m "%a: %a" pp_sockaddr t.addr Irc_msg.pp msg);
|
||||
let result =
|
||||
match msg.command, msg.params with
|
||||
| "NICK", new_nick :: _ ->
|
||||
on_nick_msg t new_nick
|
||||
| "USER", uname :: modestr :: _host :: rname :: _ ->
|
||||
on_user_msg t (uname, rname) modestr
|
||||
| "QUIT", why ->
|
||||
on_quit_msg t why
|
||||
| "NICK", _ | "USER", _ -> `needmoreparams
|
||||
| _, _ -> `unknowncommand
|
||||
in
|
||||
match result with
|
||||
| `ok -> ()
|
||||
| `unknowncommand -> send t (Rpl.unknowncommand msg.command)
|
||||
| `needmoreparams -> send t (Rpl.needmoreparams msg.command)
|
||||
| `tryagain -> send t (Rpl.tryagain msg.command)
|
||||
| `alreadyregistered -> send t (Rpl.alreadyregistered ())
|
||||
| `nicknameinuse n -> send t (Rpl.nicknameinuse n)
|
|
@ -0,0 +1,8 @@
|
|||
(library
|
||||
(package talircd)
|
||||
(name irc_server)
|
||||
; (inline_tests)
|
||||
; (preprocess (pps ppx_expect ppx_deriving.show))
|
||||
(libraries
|
||||
lwt lwt.unix logs fmt
|
||||
irc_msg))
|
|
@ -0,0 +1,9 @@
|
|||
include Lwt.Syntax
|
||||
include Lwt.Infix
|
||||
|
||||
type sockaddr = Unix.sockaddr
|
||||
type fd = Lwt_unix.file_descr
|
||||
|
||||
let pp_sockaddr ppf = function
|
||||
| Unix.ADDR_INET (adr, port) -> Fmt.pf ppf "%s:%d" (Unix.string_of_inet_addr adr) port
|
||||
| Unix.ADDR_UNIX path -> Fmt.string ppf path
|
|
@ -0,0 +1,98 @@
|
|||
open! Import
|
||||
|
||||
let listener ~(port : int) ~(backlog : int) : (fd * sockaddr) Lwt_stream.t =
|
||||
let sock : fd Lwt.t =
|
||||
let fd = Lwt_unix.socket PF_INET SOCK_STREAM 0 in
|
||||
Lwt_unix.setsockopt fd SO_KEEPALIVE false;
|
||||
Lwt_unix.setsockopt fd SO_REUSEPORT true;
|
||||
let srv_adr = Unix.ADDR_INET (Unix.inet_addr_any, port) in
|
||||
let* () = Lwt_unix.bind fd srv_adr in
|
||||
Lwt_unix.listen fd backlog;
|
||||
Logs.info (fun m -> m "listening on %a" pp_sockaddr srv_adr);
|
||||
Lwt.return fd
|
||||
in
|
||||
let accept () = sock >>= Lwt_unix.accept >|= Option.some in
|
||||
Lwt_stream.from accept
|
||||
|
||||
let reader (fd : fd) : Irc_msg.t Lwt_stream.t =
|
||||
let chunk = Buffer.create 512 in
|
||||
let rdbuf = Bytes.create 512 in
|
||||
let gets () : Irc_msg.t list option Lwt.t =
|
||||
Lwt.catch
|
||||
(fun () ->
|
||||
Lwt_unix.read fd rdbuf 0 (Bytes.length rdbuf) >>= function
|
||||
| 0 -> Lwt.return_none
|
||||
| n ->
|
||||
Buffer.add_subbytes chunk rdbuf 0 n;
|
||||
(* if Buffer.length chunk > 200_000 then panic *)
|
||||
let msgs, rest = Irc_msg.parse (Buffer.contents chunk) in
|
||||
Buffer.clear chunk;
|
||||
Buffer.add_string chunk rest;
|
||||
Lwt.return_some msgs)
|
||||
(function
|
||||
| Unix.Unix_error (ECONNRESET, _, _) -> Lwt.return_none
|
||||
| exn -> Lwt.fail exn)
|
||||
in
|
||||
Lwt_stream.from gets |> Lwt_stream.map_list Fun.id
|
||||
|
||||
let writer (fd : fd) (obox : Irc_msg.t Lwt_stream.t) : unit Lwt.t =
|
||||
let rec writeall bs i =
|
||||
if i >= Bytes.length bs then
|
||||
Lwt.return_unit
|
||||
else
|
||||
let* n = Lwt_unix.write fd bs i (Bytes.length bs - i) in
|
||||
writeall bs (i + n)
|
||||
in
|
||||
let buf = Buffer.create 512 in
|
||||
let on_msg msg =
|
||||
Buffer.clear buf;
|
||||
Irc_msg.write buf msg;
|
||||
writeall (Buffer.to_bytes buf) 0
|
||||
in
|
||||
Lwt.catch
|
||||
(fun () -> Lwt_stream.iter_s on_msg obox)
|
||||
(function
|
||||
| Unix.Unix_error (ECONNRESET, _, _) -> Lwt.return_unit
|
||||
| exn -> Lwt.fail exn)
|
||||
|
||||
let handle_client cx (conn_fd : fd) (conn_addr : sockaddr) =
|
||||
let conn : Connection.t =
|
||||
Connection.make
|
||||
~addr:conn_addr
|
||||
~userbase:cx#get_userbase
|
||||
in
|
||||
let rd = Lwt_stream.iter (Connection.on_msg conn) (reader conn_fd) in
|
||||
let wr = writer conn_fd (Connection.outbox conn) in
|
||||
let qt = Connection.quitting conn in
|
||||
Lwt.finalize
|
||||
(fun () ->
|
||||
Logs.info (fun m -> m "new connection %a" pp_sockaddr conn_addr);
|
||||
Lwt.choose [rd; wr; qt])
|
||||
(fun () ->
|
||||
Logs.info (fun m -> m "connection closed %a" pp_sockaddr conn_addr);
|
||||
Connection.cleanup conn;
|
||||
Lwt_unix.close conn_fd)
|
||||
|
||||
type config = {
|
||||
port : int;
|
||||
tcp_listen_backlog : int;
|
||||
}
|
||||
|
||||
let run (cfg : config) : unit Lwt.t =
|
||||
let cx = object
|
||||
val userbase = Userbase.make ()
|
||||
method get_userbase = userbase
|
||||
end in
|
||||
|
||||
let on_con (fd, adr) =
|
||||
Lwt.on_failure
|
||||
(handle_client cx fd adr)
|
||||
(fun exn ->
|
||||
Logs.err (fun m -> m "%a: %a" pp_sockaddr adr Fmt.exn exn))
|
||||
in
|
||||
|
||||
Lwt_stream.iter
|
||||
on_con
|
||||
(listener
|
||||
~port:cfg.port
|
||||
~backlog:cfg.tcp_listen_backlog)
|
|
@ -0,0 +1,39 @@
|
|||
type privmsg = {
|
||||
pm_from : string;
|
||||
pm_text : string;
|
||||
}
|
||||
|
||||
type user = {
|
||||
mutable nick : string option;
|
||||
inbox : notif Lwt_stream.t;
|
||||
push_inbox : (notif option -> unit);
|
||||
}
|
||||
|
||||
and notif = privmsg
|
||||
|
||||
let make_user () =
|
||||
let inbox, push_inbox = Lwt_stream.create () in
|
||||
{ nick = None; inbox; push_inbox }
|
||||
|
||||
let notify u no = u.push_inbox (Some no)
|
||||
|
||||
type t = {
|
||||
users : (string, user) Hashtbl.t
|
||||
(* TODO: channels *)
|
||||
}
|
||||
|
||||
let make () =
|
||||
{ users = Hashtbl.create 4096 }
|
||||
|
||||
let register t ~nick ~user =
|
||||
if Hashtbl.mem t.users nick then
|
||||
`inuse
|
||||
else begin
|
||||
Option.iter (Hashtbl.remove t.users) user.nick;
|
||||
Hashtbl.add t.users nick user;
|
||||
user.nick <- Some nick;
|
||||
`ok
|
||||
end
|
||||
|
||||
let leave t user =
|
||||
Option.iter (Hashtbl.remove t.users) user.nick
|
Loading…
Reference in New Issue