From a79937a53217b8d451b033a49ecbe6778cd5ecd8 Mon Sep 17 00:00:00 2001 From: haskal Date: Tue, 28 Jan 2020 22:05:17 -0500 Subject: [PATCH] Add modified unix socket files and radix tree prototype --- .gitignore | 1 + private/unix-socket-ffi.rkt | 232 ++++++++++++++++++++++++++ radix-tree.rkt | 89 ++++++++++ unix-socket.rkt | 316 ++++++++++++++++++++++++++++++++++++ 4 files changed, 638 insertions(+) create mode 100644 .gitignore create mode 100644 private/unix-socket-ffi.rkt create mode 100644 radix-tree.rkt create mode 100644 unix-socket.rkt diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..b2f4ba5 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +*.rkt~ diff --git a/private/unix-socket-ffi.rkt b/private/unix-socket-ffi.rkt new file mode 100644 index 0000000..9a7e2a4 --- /dev/null +++ b/private/unix-socket-ffi.rkt @@ -0,0 +1,232 @@ +;; https://github.com/racket/unix-socket +;; License: Apache 2.0/MIT +;; FFI functions, constants, and types for unix domain sockets (unsafe) +#lang racket/base +(require ffi/unsafe + ffi/unsafe/define + ffi/unsafe/port) +(provide (protect-out (all-defined-out))) + +;; platform : (U 'bsd 'linux #f) +;; Data structures and constants differ between platforms. +;; Mac OS X and the BSDs I tried seem to have consistent definitions. +(define platform + (case (system-type 'os) + [(macosx) 'bsd] + [(unix) + (define sys (path->string (system-library-subpath #f))) + (cond [(regexp-match? #rx"-linux$" sys) 'linux] + [(regexp-match? #rx"bsd$" sys) 'bsd] + [else #f])] + [else #f])) + +(define unix-socket-available? + (and platform #t)) + +;; ======================================== +;; Constants + +;; linux: bits/socket.h; bsd/macosx: sys/socket.h +(define AF-UNIX 1) +(define SOCK-STREAM 1) +(define SOCK-SEQPACKET 5) + +;; linux: sys/socket.h; bsd/macosx: sys/socket.h +(define SHUT_RD 0) +(define SHUT_WR 1) + +;; linux: asm-generic/{errno-base,errno}.h; bsd/macosx: sys/errno.h +(define EINTR 4) +(define EAGAIN (case platform [(linux) 11] [(bsd) 35])) +(define EWOULDBLOCK EAGAIN) +(define EINPROGRESS (case platform [(linux) 115] [(bsd) 36])) +(define ENOTCONN (case platform [(linux) 107] [(bsd) 57])) + +;; linux: asm-generic/fcntl.h; bsd/macosx: sys/fcntl.h +(define F_SETFL 4) +(define O_NONBLOCK (case platform [(linux) #o4000] [(bsd) 4])) + +;; linux: asm-generic/socket.h; bsd/macosx: sys/socket.h +(define SOL_SOCKET (case platform [(linux) 1] [(bsd) #xFFFF])) +(define SO_ERROR (case platform [(linux) 4] [(bsd) #x1007])) + +;; linux: sys/un.h; bsd/macosx: sys/un.h +(define UNIX-PATH-MAX (case platform [(linux) 108] [else 104])) + +;; linux: bits/sockaddr.h; bsd/macosx: sys/un.h +(define _sa_family (case platform [(linux) _ushort] [else _ubyte])) + +;; linux: bits/types.h; bsd/macosx: i386/_types.h +(define _socklen_t _uint32) + +(define-cstruct _linux_sockaddr_un + ([sun_family _sa_family] + [sun_path (make-array-type _byte UNIX-PATH-MAX)])) + +(define-cstruct _bsd_sockaddr_un + ([sun_len _ubyte] + [sun_family _sa_family] + [sun_path (make-array-type _byte UNIX-PATH-MAX)])) + +(define _sockaddr_un-pointer + (case platform + [(linux) _linux_sockaddr_un-pointer] + [(bsd) _bsd_sockaddr_un-pointer] + [else _pointer])) + +(define (make-sockaddr path-bytes) + (case platform + [(linux) + (make-linux_sockaddr_un AF-UNIX path-bytes)] + [(bsd) + (make-bsd_sockaddr_un (bytes-length path-bytes) AF-UNIX path-bytes)])) + +;; ======================================== +;; System functions + +(define-ffi-definer define-libc (ffi-lib #f) + #:default-make-fail make-not-available) + +(define-libc socket + (_fun #:save-errno 'posix + _int _int _int -> _int)) + +(define-libc connect + (_fun #:save-errno 'posix + _int _sockaddr_un-pointer _int -> _int)) + +(define-libc bind + (_fun #:save-errno 'posix + _int _sockaddr_un-pointer _int -> _int)) + +(define-libc listen + (_fun #:save-errno 'posix + _int _int -> _int)) + +(define-libc accept + (_fun #:save-errno 'posix + _int (_pointer = #f) (_pointer = #f) + -> _int)) + +(define-libc close + (_fun #:save-errno 'posix + _int -> _int)) + +(define-libc shutdown + (_fun #:save-errno 'posix + _int _int -> _int)) + +(define-libc fcntl + (_fun #:save-errno 'posix + _int _int _int -> _int)) + +(define-libc getsockopt + (_fun #:save-errno 'posix + _int _int _int (value : (_ptr io _int) = 0) (len : (_ptr io _uint32) = (ctype-sizeof _int)) + -> (result : _int) + -> (cond [(zero? result) + value] + [else + (error 'getsockopt "error~a" (errno-error-lines (saved-errno)))]))) + +(define strerror-name + (case platform + [(linux) "__xpg_strerror_r"] + [else "strerror_r"])) + +(define strerror_r + (get-ffi-obj strerror-name #f + (_fun (errno) :: + (errno : _int) + (buf : _bytes = (make-bytes 1000)) + (buf-len : _size = (bytes-length buf)) + -> _void + -> (cast buf _bytes _string/locale)) + (lambda () + (lambda (errno) #f)))) + +(define (errno-error-lines errno) + (define err (strerror_r errno)) + (format "\n errno: ~a~a" errno (if err (format "\n error: ~a" err) ""))) + + +;; ======================================== +;; Racket constants and functions + +;; indirection to support testing; see below +(define (fd->evt fd mode) + (unsafe-fd->evt fd mode #t)) + +;; ============================================================ +;; Testing + +;; The unix socket code is difficult to test completely, because there +;; are errors/conditions that the kernel may return that are +;; infeasible to deliberately provoke. So optionally replace certain +;; system calls here with mock versions just for testing. + +;; An alternative would be to use units; that would allow testing with +;; the mocked system calls without editing the source, but I don't +;; want the overhead of units :/ + +(when #f + ;; -- mock for connect returning EINPROGRESS + (let ([real-connect connect] + [real-fd->evt fd->evt]) + ;; connecting-fds : hash[nat => #t] + (define connecting-fds (make-hash)) + (set! connect + (lambda (s addr len) + (define r (real-connect s addr len)) + (cond [(zero? r) + (hash-set! connecting-fds s #t) + (saved-errno EINPROGRESS) + (eprintf "** mock connect: setting EINPROGRESS\n") + -1] + [else r]))) + (set! fd->evt + (lambda (fd kind) + (cond [(and (eq? kind 'write) + (hash-ref connecting-fds fd #f)) + (define sema (make-semaphore)) + (eprintf "** mock fd_to_sema: creating semaphore\n") + (thread (lambda () + (sleep 1) + (eprintf "** mock fd_to_sema: posting to semaphore\n") + (semaphore-post sema))) + (hash-remove! connecting-fds fd) + sema] + [else + (real-fd->evt fd kind)]))))) + +;; mock for accept returning EWOULDBLOCK/EAGAIN no longer works, +;; probably because doesn't intercept unsafe-poll-ctx-fd-wakeup +(when #f + ;; - mock for accept returning EWOULDBLOCK/EAGAIN + (let ([real-accept accept] + [real-fd->evt fd->evt]) + ;; accepting-fds : hash[nat => #t] + (define accepting-fds (make-hash)) + (set! accept + (lambda (s) + (cond [(hash-ref accepting-fds s #f) + (hash-remove! accepting-fds s) + (real-accept s)] + [else + (eprintf "** mock accept: setting EWOULDBLOCK\n") + (hash-set! accepting-fds s #t) + (saved-errno EWOULDBLOCK) + -1]))) + (set! fd->evt + (lambda (fd kind) + (cond [(and (eq? kind 'read) + (hash-ref accepting-fds fd #f)) + (define sema (make-semaphore)) + (eprintf "** mock fd_to_sema: creating semaphore\n") + (thread (lambda () + (sleep 1) + (eprintf "** mock fd_to_sema: posting to semaphore\n") + (semaphore-post sema))) + sema] + [else + (real-fd->evt fd kind)]))))) diff --git a/radix-tree.rkt b/radix-tree.rkt new file mode 100644 index 0000000..9f25af7 --- /dev/null +++ b/radix-tree.rkt @@ -0,0 +1,89 @@ +#lang racket + +(require data/bit-vector) + +(struct rt-node [edge0 edge1 data] #:transparent #:mutable) +(struct rt-edge [label target] #:transparent) + +(define (make-rt) + (rt-node #f #f #f)) + +;; Helper functions for rt-node struct access based on whether the edge is 1 or 0 +(define (rt-getter bit) (if bit rt-node-edge1 rt-node-edge0)) +(define (rt-setter bit) (if bit set-rt-node-edge1! set-rt-node-edge0!)) + +;; Bit vector functions but short +(define bv-ref bit-vector-ref) +(define bv-len bit-vector-length) +(define bv-copy bit-vector-copy) +(define bv bit-vector) + +(define (bv-common-len bv1 bv1-start bv2 bv2-start) + ;; How to avoid this bv copy?? Why is there no drop for sequences either + ;; Matthias pls + (for/sum ([b1 (in-bit-vector (bv-copy bv1 bv1-start))] + [b2 (in-bit-vector (bv-copy bv2 bv2-start))]) + #:break (not (equal? b1 b2)) + 1)) + +(define (rt-partial-iterate node key [start 0]) + (cond + [(>= start (bv-len key)) (list 'exact node)] + [else + (let* ([bit (bv-ref key start)] + [getter (rt-getter bit)] + [next-edge (getter node)] + [next-label (and next-edge (rt-edge-label next-edge))] + [next-target (and next-edge (rt-edge-target next-edge))] + [next-common-len (and next-label (bv-common-len next-label 0 key start))]) + (cond + [(and next-edge (= next-common-len (bv-len next-label))) + (rt-partial-iterate next-target key (+ start next-common-len))] + [next-edge (list 'partial node start next-edge next-common-len)] + [else (list 'no-match node start)]))])) + +(define (rt-insert! node key data) + (define (insert-node! node key start data) + (let* ([bit (bv-ref key start)] + [setter! (rt-setter bit)]) + (setter! node (rt-edge (bv-copy key start) (rt-node #f #f data))))) + (define (split-node! node key start orig-edge prefix-len data) + (let* ([bit (bv-ref key start)] + [setter! (rt-setter bit)] + [orig-label (rt-edge-label orig-edge)] + [orig-target (rt-edge-target orig-edge)] + [new-orig-edge (rt-edge (bv-copy orig-label prefix-len) orig-target)] + [new-insert-edge (rt-edge (bv-copy key (+ start prefix-len)) (rt-node #f #f data))] + [diff-bit (bv-ref key (+ start prefix-len))] + [common-node (rt-node (if diff-bit new-orig-edge new-insert-edge) + (if diff-bit new-insert-edge new-orig-edge) + #f)] + [common-edge (rt-edge (bv-copy key start (+ start prefix-len)) common-node)]) + (setter! node common-edge))) + (match (rt-partial-iterate node key) + [(list 'exact node) (set-rt-node-data! data)] + [(list 'partial node start orig-edge prefix-len) + (split-node! node key start orig-edge prefix-len data)] + [(list 'no-match node start) + (insert-node! node key start data)])) + +; (define test (make-rt)) +; (define (test-insert! x) +; (rt-insert! test (string->bit-vector x) x)) +; (test-insert! "0001") +; (test-insert! "1000") +; (test-insert! "1010") +; (test-insert! "0011") +; (test-insert! "0000") +; +; (define (rt-dump node [prefix ""]) +; (displayln (format "~a node ~a" prefix (rt-node-data node))) +; (define edge0 (rt-node-edge0 node)) +; (define edge1 (rt-node-edge1 node)) +; (when edge0 +; (displayln (format "~a edge0 ~a" prefix (bit-vector->string (rt-edge-label edge0)))) +; (rt-dump (rt-edge-target edge0) (string-append prefix " "))) +; (when edge1 +; (displayln (format "~a edge1 ~a" prefix (bit-vector->string (rt-edge-label edge1)))) +; (rt-dump (rt-edge-target edge1) (string-append prefix " ")))) +; (rt-dump test) diff --git a/unix-socket.rkt b/unix-socket.rkt new file mode 100644 index 0000000..1418e7e --- /dev/null +++ b/unix-socket.rkt @@ -0,0 +1,316 @@ +;; https://github.com/racket/unix-socket +;; License: Apache 2.0/MIT +;; Support for UNIX domain sockets. +#lang racket/base +(require racket/contract + racket/match + (rename-in ffi/unsafe (-> -->)) + ffi/unsafe/atomic + ffi/unsafe/custodian + ffi/unsafe/define + ffi/unsafe/schedule + ffi/unsafe/port + ffi/file + "private/unix-socket-ffi.rkt") +(provide unix-socket-available? + unix-socket-listener? + unix-socket-path? + unix-socket-connect + unix-socket-listen + (contract-out + [unix-socket-close-listener + (-> unix-socket-listener? any)] + [unix-socket-accept + (-> unix-socket-listener? (values input-port? output-port?))] + [unix-socket-accept-evt + (-> unix-socket-listener? evt?)])) + +(define (unix-socket-path? v) + (and (unix-socket-path->bytes v) #t)) + +(define (unix-socket-path->bytes path) + (if (path-string? path) + ;; On all platforms, normal path of up to UNIX-PATH-MAX bytes after + ;; conversion to absolute is considered valid and shall be accepted. + (let ([bstr (path->bytes (cleanse-path (path->complete-path path)))]) + (and (<= (bytes-length bstr) UNIX-PATH-MAX) bstr)) + + ;; On Linux, paths may be in so-called abstract namespace where they + ;; start with #\nul and do not have a corresponding socket file. + ;; We accept such paths only as byte strings because we don't know + ;; the correct encoding. + (and (eq? platform 'linux) + (bytes? path) + (> (bytes-length path) 0) + (<= (bytes-length path) UNIX-PATH-MAX) + (= (bytes-ref path 0) 0) + path))) + +(define (check-available who) + (unless unix-socket-available? + (error who "unix domain sockets are not supported on this platform"))) + +;; do-make-sockaddr : Symbol Path/String -> (values Sockaddr-Pointer Nat) +(define (do-make-sockaddr who path) + (when (path-string? path) + (security-guard-check-file who path '(read write))) + (define path-bytes (unix-socket-path->bytes path)) + (define sockaddr (make-sockaddr path-bytes)) + (define addrlen (+ (ctype-sizeof _ushort) (bytes-length path-bytes))) + (values sockaddr addrlen)) + +;; do-make-socket : Symbol Nat -> (values FD Cust-Reg) +;; Creates nonblocking socket, registers w/ custodian (returning registration). +;; Should be called in atomic mode. +(define (do-make-socket who mode) + (define socket-fd (socket AF-UNIX mode 0)) + (unless (positive? socket-fd) + (error who "failed to create socket~a" + (errno-error-lines (saved-errno)))) + (set-fd-nonblocking who socket-fd) + (values socket-fd (register-custodian-shutdown socket-fd close/unregister))) + +;; set-fd-nonblocking : Symbol Nat -> Void +(define (set-fd-nonblocking who fd) + (unless (zero? (fcntl fd F_SETFL O_NONBLOCK)) + (close fd) + (error who "failed to set non-blocking mode~a" + (errno-error-lines (saved-errno))))) + +;; close/unregister : Nat Cust-Reg/#f -> Void +(define (close/unregister fd [reg #f]) + (close fd) + (fd->evt fd 'remove) + (when reg (unregister-custodian-shutdown fd reg))) + +;; make-socket-ports : Symbol FD Cust-Reg/#f -> (values Input-Port Output-Port) +(define (make-socket-ports who socket-fd reg) + (with-handlers ([(lambda (e) #t) + (lambda (exn) + (close/unregister socket-fd reg) + (raise exn))]) + (define-values (in out) + (unsafe-file-descriptor->port socket-fd #"unix-socket-port" '(read write))) + ;; closing the ports closes socket-fd, so custodian no longer needs to manage directly + (when reg (unregister-custodian-shutdown socket-fd reg)) + (define fd+ports (list socket-fd in out)) + (values (wrap-input-port in fd+ports) (wrap-output-port out fd+ports)))) + +;; wrap-output-port : Output-Port (List FD Port Port) -> Output-Port +;; Wrap port, override close to shutdown write side of socket. +(define (wrap-output-port out fd+ports) + (define (close) + (when out (close-output-port out)) ;; may block, so avoid in custodian shutdown + (call-as-atomic + (lambda () + (when creg (unregister-custodian-shutdown out* creg) (set! creg #f)) + (when fd+ports (do-shutdown fd+ports #t) (set! fd+ports #f))))) + (define (get-write-evt buf start end) (write-bytes-avail-evt buf out start end)) + (define buffer-mode (make-buffer-mode-fun out)) + (define out* + (make-output-port 'unix-socket out out close #f get-write-evt #f #f void 1 buffer-mode)) + (define creg (register-custodian-shutdown out* (lambda (p) (set! out #f) (close-output-port p)))) + out*) + +;; wrap-input-port : Input-Port (List FD Port Port) -> Input-Port +(define (wrap-input-port in fd+ports) + (define (close) + (when in (close-input-port in)) + (call-as-atomic + (lambda () + (when creg (unregister-custodian-shutdown in* creg) (set! creg #f)) + (when fd+ports (do-shutdown fd+ports #f) (set! fd+ports #f))))) + (define (get-progress-evt) (port-progress-evt in)) + (define (commit k progress done) (port-commit-peeked k progress done in)) + (define buffer-mode (make-buffer-mode-fun in)) + (define in* + (make-input-port 'unix-socket in in close get-progress-evt commit #f void 1 buffer-mode)) + (define creg (register-custodian-shutdown in* (lambda (p) (set! in #f) (close-input-port p)))) + in*) + +(define (make-buffer-mode-fun port) + (case-lambda [() (file-stream-buffer-mode port)] + [(mode) (file-stream-buffer-mode port mode)])) + +;; do-shutdown : (List FD Port Port) Boolean -> Void +;; Requirements: +;; - want to shutdown RD/WR when corresponding port closed +;; - want to shutdown *after* port closed to avoid low-level errors +;; - must *not* call shutdown after *both* ports closed (fd is stale) +;; So: okay to call shutdown if either of the ports is still open. +(define (do-shutdown fd+ports output?) + (define socket-fd (car fd+ports)) + (define ports (cdr fd+ports)) + (unless (andmap port-closed? ports) + (unless (zero? (shutdown socket-fd (if output? SHUT_WR SHUT_RD))) + ;; ENOTCONN is okay; the other side may have disconnected. + (unless (= (saved-errno) ENOTCONN) + (error (if output? 'close-output-port/unix-socket 'close-input-port/unix-socket) + "error from shutdown~a" (errno-error-lines (saved-errno))))))) + +;; ============================================================ +;; Connect + +;; unix-socket-connect : Path/String [Symbol] -> (values Input-Port Output-Port) +(define (unix-socket-connect path [mode 'SOCK-STREAM]) + (check-available 'unix-socket-connect) + (define cust (current-custodian)) + (define-values (sockaddr addrlen) (do-make-sockaddr 'unix-socket-connect path)) + (define connect-k + ;; Non-blocking connect may succeed immediately or require waiting to see. + ;; - If succeeds immediately, make ports in same atomic block + ;; - If wait, must exit atomic mode to sync + ;; So we return a procedure to be applied in non-atomic mode that does + ;; whatever needs doing. + (call-as-atomic + (lambda () + (when (custodian-shut-down? cust) + (error 'unix-socket-connect "the custodian has been shut down")) + (define-values (socket-fd reg) (do-make-socket 'unix-socket-connect + (match mode + ['SOCK-STREAM SOCK-STREAM] + ['SOCK-SEQPACKET SOCK-SEQPACKET]))) + (define r (connect socket-fd sockaddr addrlen)) + (define errno (saved-errno)) + (cond [(= r 0) ;; connected + (define-values (in out) (make-socket-ports 'unix-socket-connect socket-fd reg)) + (lambda () (values in out))] + [(= errno EINPROGRESS) ;; wait and see + (define ready-evt (fd->evt socket-fd 'write)) + (lambda () ;; called in non-atomic mode! + (sync ready-evt) + (call-as-atomic + (lambda () + (when (custodian-shut-down? cust) ;; => socket-fd already closed by shutdown + (error 'unix-socket-connect "the custodian has been shut down")) + (define errno (getsockopt socket-fd SOL_SOCKET SO_ERROR)) + (cond [(= errno 0) + (make-socket-ports 'unix-socket-connect socket-fd reg)] + [else + (close/unregister socket-fd reg) + (error 'unix-socket-connect + "failed to connect socket (non-blocking)\n path: ~e~a" + path (errno-error-lines errno))]))))] + [else + (close/unregister socket-fd reg) + (error 'unix-socket-connect "failed to connect socket\n path: ~e~a" + path (errno-error-lines (saved-errno)))])))) + (connect-k)) + + +;; ============================================================ +;; Listen & Accept + +;; A Listener is (unix-socket-listener Nat/#f Cust-Reg/#f) +;; States: +;; OPEN: (unix-socket-listener Nat Cust-Reg) +;; CLOSED: (unix-socket-listener #f #f) +;; Only transition allowed is OPEN to CLOSED, must happen atomically. +(struct unix-socket-listener (fd reg) + #:mutable + #:property prop:evt + (lambda (self) + (call-as-atomic + (lambda () + (wrap-evt + ;; ready when fd is readable OR listener is closed + ;; If closed after evt creation, then fd-evt becomes ready + ;; when fd closed and fd-evt is unregistered. + (cond [(unix-socket-listener-fd self) + => (lambda (fd) (fd->evt fd 'read))] + [else always-evt]) + (lambda (r) self)))))) + +;; unix-socket-listen : Path/String [Nat] [Symbol] -> Unix-Socket-Listener +(define (unix-socket-listen path [backlog 4] [mode 'SOCK-STREAM]) + (check-available 'unix-socket-listen) + (define-values (sockaddr addrlen) (do-make-sockaddr 'unix-socket-listen path)) + (call-as-atomic + (lambda () + (define-values (socket-fd reg) (do-make-socket 'unix-socket-listen + (match mode + ['SOCK-STREAM SOCK-STREAM] + ['SOCK-SEQPACKET SOCK-SEQPACKET]))) + (unless (zero? (bind socket-fd sockaddr addrlen)) + (close/unregister socket-fd reg) + (error 'unix-socket-listen "failed to bind socket\n path: ~e~a" + path (errno-error-lines (saved-errno)))) + (unless (zero? (listen socket-fd backlog)) + (close/unregister socket-fd reg) + (error 'unix-socket-listen "failed to listen\n path: ~e~a" + path (errno-error-lines (saved-errno)))) + (define listener (unix-socket-listener socket-fd #f)) + (set-unix-socket-listener-reg! listener + (register-custodian-shutdown listener do-close-listener)) + (unregister-custodian-shutdown socket-fd reg) + listener))) + +;; unix-socket-close-listener : Listener -> Void +(define (unix-socket-close-listener l) + (call-as-atomic (lambda () (do-close-listener l #t)))) + +(define (do-close-listener l [unregister? #f]) + (define fd (unix-socket-listener-fd l)) + (define reg (unix-socket-listener-reg l)) + (when fd + (set-unix-socket-listener-fd! l #f) + (set-unix-socket-listener-reg! l #f) + (when unregister? (unregister-custodian-shutdown l reg)) + (close/unregister fd) + (void))) + +;; ---------------------------------------- + +(struct accept-evt (who listener cust) ;; <: (Evt-of (-> (list Input-Port Output-Port))) + #:property prop:evt + (unsafe-poller (lambda (self maybe-wakeups) (accept-poll self maybe-wakeups)))) + +;; unix-socket-accept : Unix-Socket-Listener -> (values Input-Port Output-Port) +(define (unix-socket-accept l) + (apply values ((sync (accept-evt 'unix-socket-accept l (current-custodian)))))) + +;; unix-socket-accept-evt : Unix-Socket-Listener -> (Evt-of (list Input-Port Output-Port)) +(define (unix-socket-accept-evt l) + (wrap-evt (accept-evt 'unix-socket-accept-evt l (current-custodian)) (lambda (r) (r)))) + +;; accept-poll : Accept-Evt (U #f Wakeups) -> (U (values List #f) (values #f Evt)) +(define (accept-poll accept-evt maybe-wakeups) + (define l (accept-evt-listener accept-evt)) + (define who (accept-evt-who accept-evt)) + (define lfd (unix-socket-listener-fd l)) + (cond [(eq? lfd #f) + (values (list (lambda () (error who "unix socket listener is closed"))) + #f)] + [(custodian-shut-down? (accept-evt-cust accept-evt)) + (values (list (lambda () (error '|unix-socket-accept-evt poll| "the custodian has been shut down"))) + #f)] + [lfd + (cond [maybe-wakeups (accept-poll/sleep who accept-evt maybe-wakeups lfd)] + [else (accept-poll/check who accept-evt lfd)])])) + +(define (accept-poll/sleep who accept-evt wakeups lfd) + ;; No need to register wakeup for custodian; custodian shutdown means a Racket thread + ;; did work, so accept-evt will get re-polled. + (unsafe-poll-ctx-fd-wakeup wakeups lfd 'read) + (values #f accept-evt)) + +(define (accept-poll/check who accept-evt lfd) + (define fd (accept lfd)) + (cond [(< fd 0) + (let ([errno (saved-errno)]) + (cond [(or (= errno EAGAIN) (= errno EWOULDBLOCK) (= errno EINTR)) + (values #f accept-evt)] + [else + (values (list (lambda () + (error who "failed to accept socket~a" + (errno-error-lines errno)))) + #f)]))] + [else + (define cust (accept-evt-cust accept-evt)) + (define r + (with-handlers ([(lambda (e) #t) + (lambda (e) (lambda () (raise e)))]) + (parameterize ((current-custodian cust)) + (define-values (in out) (make-socket-ports who fd #f)) + (lambda () (list in out))))) + (values (list r) #f)]))