Add modified unix socket files and radix tree prototype
This commit is contained in:
commit
a79937a532
|
@ -0,0 +1 @@
|
||||||
|
*.rkt~
|
|
@ -0,0 +1,232 @@
|
||||||
|
;; https://github.com/racket/unix-socket
|
||||||
|
;; License: Apache 2.0/MIT
|
||||||
|
;; FFI functions, constants, and types for unix domain sockets (unsafe)
|
||||||
|
#lang racket/base
|
||||||
|
(require ffi/unsafe
|
||||||
|
ffi/unsafe/define
|
||||||
|
ffi/unsafe/port)
|
||||||
|
(provide (protect-out (all-defined-out)))
|
||||||
|
|
||||||
|
;; platform : (U 'bsd 'linux #f)
|
||||||
|
;; Data structures and constants differ between platforms.
|
||||||
|
;; Mac OS X and the BSDs I tried seem to have consistent definitions.
|
||||||
|
(define platform
|
||||||
|
(case (system-type 'os)
|
||||||
|
[(macosx) 'bsd]
|
||||||
|
[(unix)
|
||||||
|
(define sys (path->string (system-library-subpath #f)))
|
||||||
|
(cond [(regexp-match? #rx"-linux$" sys) 'linux]
|
||||||
|
[(regexp-match? #rx"bsd$" sys) 'bsd]
|
||||||
|
[else #f])]
|
||||||
|
[else #f]))
|
||||||
|
|
||||||
|
(define unix-socket-available?
|
||||||
|
(and platform #t))
|
||||||
|
|
||||||
|
;; ========================================
|
||||||
|
;; Constants
|
||||||
|
|
||||||
|
;; linux: bits/socket.h; bsd/macosx: sys/socket.h
|
||||||
|
(define AF-UNIX 1)
|
||||||
|
(define SOCK-STREAM 1)
|
||||||
|
(define SOCK-SEQPACKET 5)
|
||||||
|
|
||||||
|
;; linux: sys/socket.h; bsd/macosx: sys/socket.h
|
||||||
|
(define SHUT_RD 0)
|
||||||
|
(define SHUT_WR 1)
|
||||||
|
|
||||||
|
;; linux: asm-generic/{errno-base,errno}.h; bsd/macosx: sys/errno.h
|
||||||
|
(define EINTR 4)
|
||||||
|
(define EAGAIN (case platform [(linux) 11] [(bsd) 35]))
|
||||||
|
(define EWOULDBLOCK EAGAIN)
|
||||||
|
(define EINPROGRESS (case platform [(linux) 115] [(bsd) 36]))
|
||||||
|
(define ENOTCONN (case platform [(linux) 107] [(bsd) 57]))
|
||||||
|
|
||||||
|
;; linux: asm-generic/fcntl.h; bsd/macosx: sys/fcntl.h
|
||||||
|
(define F_SETFL 4)
|
||||||
|
(define O_NONBLOCK (case platform [(linux) #o4000] [(bsd) 4]))
|
||||||
|
|
||||||
|
;; linux: asm-generic/socket.h; bsd/macosx: sys/socket.h
|
||||||
|
(define SOL_SOCKET (case platform [(linux) 1] [(bsd) #xFFFF]))
|
||||||
|
(define SO_ERROR (case platform [(linux) 4] [(bsd) #x1007]))
|
||||||
|
|
||||||
|
;; linux: sys/un.h; bsd/macosx: sys/un.h
|
||||||
|
(define UNIX-PATH-MAX (case platform [(linux) 108] [else 104]))
|
||||||
|
|
||||||
|
;; linux: bits/sockaddr.h; bsd/macosx: sys/un.h
|
||||||
|
(define _sa_family (case platform [(linux) _ushort] [else _ubyte]))
|
||||||
|
|
||||||
|
;; linux: bits/types.h; bsd/macosx: i386/_types.h
|
||||||
|
(define _socklen_t _uint32)
|
||||||
|
|
||||||
|
(define-cstruct _linux_sockaddr_un
|
||||||
|
([sun_family _sa_family]
|
||||||
|
[sun_path (make-array-type _byte UNIX-PATH-MAX)]))
|
||||||
|
|
||||||
|
(define-cstruct _bsd_sockaddr_un
|
||||||
|
([sun_len _ubyte]
|
||||||
|
[sun_family _sa_family]
|
||||||
|
[sun_path (make-array-type _byte UNIX-PATH-MAX)]))
|
||||||
|
|
||||||
|
(define _sockaddr_un-pointer
|
||||||
|
(case platform
|
||||||
|
[(linux) _linux_sockaddr_un-pointer]
|
||||||
|
[(bsd) _bsd_sockaddr_un-pointer]
|
||||||
|
[else _pointer]))
|
||||||
|
|
||||||
|
(define (make-sockaddr path-bytes)
|
||||||
|
(case platform
|
||||||
|
[(linux)
|
||||||
|
(make-linux_sockaddr_un AF-UNIX path-bytes)]
|
||||||
|
[(bsd)
|
||||||
|
(make-bsd_sockaddr_un (bytes-length path-bytes) AF-UNIX path-bytes)]))
|
||||||
|
|
||||||
|
;; ========================================
|
||||||
|
;; System functions
|
||||||
|
|
||||||
|
(define-ffi-definer define-libc (ffi-lib #f)
|
||||||
|
#:default-make-fail make-not-available)
|
||||||
|
|
||||||
|
(define-libc socket
|
||||||
|
(_fun #:save-errno 'posix
|
||||||
|
_int _int _int -> _int))
|
||||||
|
|
||||||
|
(define-libc connect
|
||||||
|
(_fun #:save-errno 'posix
|
||||||
|
_int _sockaddr_un-pointer _int -> _int))
|
||||||
|
|
||||||
|
(define-libc bind
|
||||||
|
(_fun #:save-errno 'posix
|
||||||
|
_int _sockaddr_un-pointer _int -> _int))
|
||||||
|
|
||||||
|
(define-libc listen
|
||||||
|
(_fun #:save-errno 'posix
|
||||||
|
_int _int -> _int))
|
||||||
|
|
||||||
|
(define-libc accept
|
||||||
|
(_fun #:save-errno 'posix
|
||||||
|
_int (_pointer = #f) (_pointer = #f)
|
||||||
|
-> _int))
|
||||||
|
|
||||||
|
(define-libc close
|
||||||
|
(_fun #:save-errno 'posix
|
||||||
|
_int -> _int))
|
||||||
|
|
||||||
|
(define-libc shutdown
|
||||||
|
(_fun #:save-errno 'posix
|
||||||
|
_int _int -> _int))
|
||||||
|
|
||||||
|
(define-libc fcntl
|
||||||
|
(_fun #:save-errno 'posix
|
||||||
|
_int _int _int -> _int))
|
||||||
|
|
||||||
|
(define-libc getsockopt
|
||||||
|
(_fun #:save-errno 'posix
|
||||||
|
_int _int _int (value : (_ptr io _int) = 0) (len : (_ptr io _uint32) = (ctype-sizeof _int))
|
||||||
|
-> (result : _int)
|
||||||
|
-> (cond [(zero? result)
|
||||||
|
value]
|
||||||
|
[else
|
||||||
|
(error 'getsockopt "error~a" (errno-error-lines (saved-errno)))])))
|
||||||
|
|
||||||
|
(define strerror-name
|
||||||
|
(case platform
|
||||||
|
[(linux) "__xpg_strerror_r"]
|
||||||
|
[else "strerror_r"]))
|
||||||
|
|
||||||
|
(define strerror_r
|
||||||
|
(get-ffi-obj strerror-name #f
|
||||||
|
(_fun (errno) ::
|
||||||
|
(errno : _int)
|
||||||
|
(buf : _bytes = (make-bytes 1000))
|
||||||
|
(buf-len : _size = (bytes-length buf))
|
||||||
|
-> _void
|
||||||
|
-> (cast buf _bytes _string/locale))
|
||||||
|
(lambda ()
|
||||||
|
(lambda (errno) #f))))
|
||||||
|
|
||||||
|
(define (errno-error-lines errno)
|
||||||
|
(define err (strerror_r errno))
|
||||||
|
(format "\n errno: ~a~a" errno (if err (format "\n error: ~a" err) "")))
|
||||||
|
|
||||||
|
|
||||||
|
;; ========================================
|
||||||
|
;; Racket constants and functions
|
||||||
|
|
||||||
|
;; indirection to support testing; see below
|
||||||
|
(define (fd->evt fd mode)
|
||||||
|
(unsafe-fd->evt fd mode #t))
|
||||||
|
|
||||||
|
;; ============================================================
|
||||||
|
;; Testing
|
||||||
|
|
||||||
|
;; The unix socket code is difficult to test completely, because there
|
||||||
|
;; are errors/conditions that the kernel may return that are
|
||||||
|
;; infeasible to deliberately provoke. So optionally replace certain
|
||||||
|
;; system calls here with mock versions just for testing.
|
||||||
|
|
||||||
|
;; An alternative would be to use units; that would allow testing with
|
||||||
|
;; the mocked system calls without editing the source, but I don't
|
||||||
|
;; want the overhead of units :/
|
||||||
|
|
||||||
|
(when #f
|
||||||
|
;; -- mock for connect returning EINPROGRESS
|
||||||
|
(let ([real-connect connect]
|
||||||
|
[real-fd->evt fd->evt])
|
||||||
|
;; connecting-fds : hash[nat => #t]
|
||||||
|
(define connecting-fds (make-hash))
|
||||||
|
(set! connect
|
||||||
|
(lambda (s addr len)
|
||||||
|
(define r (real-connect s addr len))
|
||||||
|
(cond [(zero? r)
|
||||||
|
(hash-set! connecting-fds s #t)
|
||||||
|
(saved-errno EINPROGRESS)
|
||||||
|
(eprintf "** mock connect: setting EINPROGRESS\n")
|
||||||
|
-1]
|
||||||
|
[else r])))
|
||||||
|
(set! fd->evt
|
||||||
|
(lambda (fd kind)
|
||||||
|
(cond [(and (eq? kind 'write)
|
||||||
|
(hash-ref connecting-fds fd #f))
|
||||||
|
(define sema (make-semaphore))
|
||||||
|
(eprintf "** mock fd_to_sema: creating semaphore\n")
|
||||||
|
(thread (lambda ()
|
||||||
|
(sleep 1)
|
||||||
|
(eprintf "** mock fd_to_sema: posting to semaphore\n")
|
||||||
|
(semaphore-post sema)))
|
||||||
|
(hash-remove! connecting-fds fd)
|
||||||
|
sema]
|
||||||
|
[else
|
||||||
|
(real-fd->evt fd kind)])))))
|
||||||
|
|
||||||
|
;; mock for accept returning EWOULDBLOCK/EAGAIN no longer works,
|
||||||
|
;; probably because doesn't intercept unsafe-poll-ctx-fd-wakeup
|
||||||
|
(when #f
|
||||||
|
;; - mock for accept returning EWOULDBLOCK/EAGAIN
|
||||||
|
(let ([real-accept accept]
|
||||||
|
[real-fd->evt fd->evt])
|
||||||
|
;; accepting-fds : hash[nat => #t]
|
||||||
|
(define accepting-fds (make-hash))
|
||||||
|
(set! accept
|
||||||
|
(lambda (s)
|
||||||
|
(cond [(hash-ref accepting-fds s #f)
|
||||||
|
(hash-remove! accepting-fds s)
|
||||||
|
(real-accept s)]
|
||||||
|
[else
|
||||||
|
(eprintf "** mock accept: setting EWOULDBLOCK\n")
|
||||||
|
(hash-set! accepting-fds s #t)
|
||||||
|
(saved-errno EWOULDBLOCK)
|
||||||
|
-1])))
|
||||||
|
(set! fd->evt
|
||||||
|
(lambda (fd kind)
|
||||||
|
(cond [(and (eq? kind 'read)
|
||||||
|
(hash-ref accepting-fds fd #f))
|
||||||
|
(define sema (make-semaphore))
|
||||||
|
(eprintf "** mock fd_to_sema: creating semaphore\n")
|
||||||
|
(thread (lambda ()
|
||||||
|
(sleep 1)
|
||||||
|
(eprintf "** mock fd_to_sema: posting to semaphore\n")
|
||||||
|
(semaphore-post sema)))
|
||||||
|
sema]
|
||||||
|
[else
|
||||||
|
(real-fd->evt fd kind)])))))
|
|
@ -0,0 +1,89 @@
|
||||||
|
#lang racket
|
||||||
|
|
||||||
|
(require data/bit-vector)
|
||||||
|
|
||||||
|
(struct rt-node [edge0 edge1 data] #:transparent #:mutable)
|
||||||
|
(struct rt-edge [label target] #:transparent)
|
||||||
|
|
||||||
|
(define (make-rt)
|
||||||
|
(rt-node #f #f #f))
|
||||||
|
|
||||||
|
;; Helper functions for rt-node struct access based on whether the edge is 1 or 0
|
||||||
|
(define (rt-getter bit) (if bit rt-node-edge1 rt-node-edge0))
|
||||||
|
(define (rt-setter bit) (if bit set-rt-node-edge1! set-rt-node-edge0!))
|
||||||
|
|
||||||
|
;; Bit vector functions but short
|
||||||
|
(define bv-ref bit-vector-ref)
|
||||||
|
(define bv-len bit-vector-length)
|
||||||
|
(define bv-copy bit-vector-copy)
|
||||||
|
(define bv bit-vector)
|
||||||
|
|
||||||
|
(define (bv-common-len bv1 bv1-start bv2 bv2-start)
|
||||||
|
;; How to avoid this bv copy?? Why is there no drop for sequences either
|
||||||
|
;; Matthias pls
|
||||||
|
(for/sum ([b1 (in-bit-vector (bv-copy bv1 bv1-start))]
|
||||||
|
[b2 (in-bit-vector (bv-copy bv2 bv2-start))])
|
||||||
|
#:break (not (equal? b1 b2))
|
||||||
|
1))
|
||||||
|
|
||||||
|
(define (rt-partial-iterate node key [start 0])
|
||||||
|
(cond
|
||||||
|
[(>= start (bv-len key)) (list 'exact node)]
|
||||||
|
[else
|
||||||
|
(let* ([bit (bv-ref key start)]
|
||||||
|
[getter (rt-getter bit)]
|
||||||
|
[next-edge (getter node)]
|
||||||
|
[next-label (and next-edge (rt-edge-label next-edge))]
|
||||||
|
[next-target (and next-edge (rt-edge-target next-edge))]
|
||||||
|
[next-common-len (and next-label (bv-common-len next-label 0 key start))])
|
||||||
|
(cond
|
||||||
|
[(and next-edge (= next-common-len (bv-len next-label)))
|
||||||
|
(rt-partial-iterate next-target key (+ start next-common-len))]
|
||||||
|
[next-edge (list 'partial node start next-edge next-common-len)]
|
||||||
|
[else (list 'no-match node start)]))]))
|
||||||
|
|
||||||
|
(define (rt-insert! node key data)
|
||||||
|
(define (insert-node! node key start data)
|
||||||
|
(let* ([bit (bv-ref key start)]
|
||||||
|
[setter! (rt-setter bit)])
|
||||||
|
(setter! node (rt-edge (bv-copy key start) (rt-node #f #f data)))))
|
||||||
|
(define (split-node! node key start orig-edge prefix-len data)
|
||||||
|
(let* ([bit (bv-ref key start)]
|
||||||
|
[setter! (rt-setter bit)]
|
||||||
|
[orig-label (rt-edge-label orig-edge)]
|
||||||
|
[orig-target (rt-edge-target orig-edge)]
|
||||||
|
[new-orig-edge (rt-edge (bv-copy orig-label prefix-len) orig-target)]
|
||||||
|
[new-insert-edge (rt-edge (bv-copy key (+ start prefix-len)) (rt-node #f #f data))]
|
||||||
|
[diff-bit (bv-ref key (+ start prefix-len))]
|
||||||
|
[common-node (rt-node (if diff-bit new-orig-edge new-insert-edge)
|
||||||
|
(if diff-bit new-insert-edge new-orig-edge)
|
||||||
|
#f)]
|
||||||
|
[common-edge (rt-edge (bv-copy key start (+ start prefix-len)) common-node)])
|
||||||
|
(setter! node common-edge)))
|
||||||
|
(match (rt-partial-iterate node key)
|
||||||
|
[(list 'exact node) (set-rt-node-data! data)]
|
||||||
|
[(list 'partial node start orig-edge prefix-len)
|
||||||
|
(split-node! node key start orig-edge prefix-len data)]
|
||||||
|
[(list 'no-match node start)
|
||||||
|
(insert-node! node key start data)]))
|
||||||
|
|
||||||
|
; (define test (make-rt))
|
||||||
|
; (define (test-insert! x)
|
||||||
|
; (rt-insert! test (string->bit-vector x) x))
|
||||||
|
; (test-insert! "0001")
|
||||||
|
; (test-insert! "1000")
|
||||||
|
; (test-insert! "1010")
|
||||||
|
; (test-insert! "0011")
|
||||||
|
; (test-insert! "0000")
|
||||||
|
;
|
||||||
|
; (define (rt-dump node [prefix ""])
|
||||||
|
; (displayln (format "~a node ~a" prefix (rt-node-data node)))
|
||||||
|
; (define edge0 (rt-node-edge0 node))
|
||||||
|
; (define edge1 (rt-node-edge1 node))
|
||||||
|
; (when edge0
|
||||||
|
; (displayln (format "~a edge0 ~a" prefix (bit-vector->string (rt-edge-label edge0))))
|
||||||
|
; (rt-dump (rt-edge-target edge0) (string-append prefix " ")))
|
||||||
|
; (when edge1
|
||||||
|
; (displayln (format "~a edge1 ~a" prefix (bit-vector->string (rt-edge-label edge1))))
|
||||||
|
; (rt-dump (rt-edge-target edge1) (string-append prefix " "))))
|
||||||
|
; (rt-dump test)
|
|
@ -0,0 +1,316 @@
|
||||||
|
;; https://github.com/racket/unix-socket
|
||||||
|
;; License: Apache 2.0/MIT
|
||||||
|
;; Support for UNIX domain sockets.
|
||||||
|
#lang racket/base
|
||||||
|
(require racket/contract
|
||||||
|
racket/match
|
||||||
|
(rename-in ffi/unsafe (-> -->))
|
||||||
|
ffi/unsafe/atomic
|
||||||
|
ffi/unsafe/custodian
|
||||||
|
ffi/unsafe/define
|
||||||
|
ffi/unsafe/schedule
|
||||||
|
ffi/unsafe/port
|
||||||
|
ffi/file
|
||||||
|
"private/unix-socket-ffi.rkt")
|
||||||
|
(provide unix-socket-available?
|
||||||
|
unix-socket-listener?
|
||||||
|
unix-socket-path?
|
||||||
|
unix-socket-connect
|
||||||
|
unix-socket-listen
|
||||||
|
(contract-out
|
||||||
|
[unix-socket-close-listener
|
||||||
|
(-> unix-socket-listener? any)]
|
||||||
|
[unix-socket-accept
|
||||||
|
(-> unix-socket-listener? (values input-port? output-port?))]
|
||||||
|
[unix-socket-accept-evt
|
||||||
|
(-> unix-socket-listener? evt?)]))
|
||||||
|
|
||||||
|
(define (unix-socket-path? v)
|
||||||
|
(and (unix-socket-path->bytes v) #t))
|
||||||
|
|
||||||
|
(define (unix-socket-path->bytes path)
|
||||||
|
(if (path-string? path)
|
||||||
|
;; On all platforms, normal path of up to UNIX-PATH-MAX bytes after
|
||||||
|
;; conversion to absolute is considered valid and shall be accepted.
|
||||||
|
(let ([bstr (path->bytes (cleanse-path (path->complete-path path)))])
|
||||||
|
(and (<= (bytes-length bstr) UNIX-PATH-MAX) bstr))
|
||||||
|
|
||||||
|
;; On Linux, paths may be in so-called abstract namespace where they
|
||||||
|
;; start with #\nul and do not have a corresponding socket file.
|
||||||
|
;; We accept such paths only as byte strings because we don't know
|
||||||
|
;; the correct encoding.
|
||||||
|
(and (eq? platform 'linux)
|
||||||
|
(bytes? path)
|
||||||
|
(> (bytes-length path) 0)
|
||||||
|
(<= (bytes-length path) UNIX-PATH-MAX)
|
||||||
|
(= (bytes-ref path 0) 0)
|
||||||
|
path)))
|
||||||
|
|
||||||
|
(define (check-available who)
|
||||||
|
(unless unix-socket-available?
|
||||||
|
(error who "unix domain sockets are not supported on this platform")))
|
||||||
|
|
||||||
|
;; do-make-sockaddr : Symbol Path/String -> (values Sockaddr-Pointer Nat)
|
||||||
|
(define (do-make-sockaddr who path)
|
||||||
|
(when (path-string? path)
|
||||||
|
(security-guard-check-file who path '(read write)))
|
||||||
|
(define path-bytes (unix-socket-path->bytes path))
|
||||||
|
(define sockaddr (make-sockaddr path-bytes))
|
||||||
|
(define addrlen (+ (ctype-sizeof _ushort) (bytes-length path-bytes)))
|
||||||
|
(values sockaddr addrlen))
|
||||||
|
|
||||||
|
;; do-make-socket : Symbol Nat -> (values FD Cust-Reg)
|
||||||
|
;; Creates nonblocking socket, registers w/ custodian (returning registration).
|
||||||
|
;; Should be called in atomic mode.
|
||||||
|
(define (do-make-socket who mode)
|
||||||
|
(define socket-fd (socket AF-UNIX mode 0))
|
||||||
|
(unless (positive? socket-fd)
|
||||||
|
(error who "failed to create socket~a"
|
||||||
|
(errno-error-lines (saved-errno))))
|
||||||
|
(set-fd-nonblocking who socket-fd)
|
||||||
|
(values socket-fd (register-custodian-shutdown socket-fd close/unregister)))
|
||||||
|
|
||||||
|
;; set-fd-nonblocking : Symbol Nat -> Void
|
||||||
|
(define (set-fd-nonblocking who fd)
|
||||||
|
(unless (zero? (fcntl fd F_SETFL O_NONBLOCK))
|
||||||
|
(close fd)
|
||||||
|
(error who "failed to set non-blocking mode~a"
|
||||||
|
(errno-error-lines (saved-errno)))))
|
||||||
|
|
||||||
|
;; close/unregister : Nat Cust-Reg/#f -> Void
|
||||||
|
(define (close/unregister fd [reg #f])
|
||||||
|
(close fd)
|
||||||
|
(fd->evt fd 'remove)
|
||||||
|
(when reg (unregister-custodian-shutdown fd reg)))
|
||||||
|
|
||||||
|
;; make-socket-ports : Symbol FD Cust-Reg/#f -> (values Input-Port Output-Port)
|
||||||
|
(define (make-socket-ports who socket-fd reg)
|
||||||
|
(with-handlers ([(lambda (e) #t)
|
||||||
|
(lambda (exn)
|
||||||
|
(close/unregister socket-fd reg)
|
||||||
|
(raise exn))])
|
||||||
|
(define-values (in out)
|
||||||
|
(unsafe-file-descriptor->port socket-fd #"unix-socket-port" '(read write)))
|
||||||
|
;; closing the ports closes socket-fd, so custodian no longer needs to manage directly
|
||||||
|
(when reg (unregister-custodian-shutdown socket-fd reg))
|
||||||
|
(define fd+ports (list socket-fd in out))
|
||||||
|
(values (wrap-input-port in fd+ports) (wrap-output-port out fd+ports))))
|
||||||
|
|
||||||
|
;; wrap-output-port : Output-Port (List FD Port Port) -> Output-Port
|
||||||
|
;; Wrap port, override close to shutdown write side of socket.
|
||||||
|
(define (wrap-output-port out fd+ports)
|
||||||
|
(define (close)
|
||||||
|
(when out (close-output-port out)) ;; may block, so avoid in custodian shutdown
|
||||||
|
(call-as-atomic
|
||||||
|
(lambda ()
|
||||||
|
(when creg (unregister-custodian-shutdown out* creg) (set! creg #f))
|
||||||
|
(when fd+ports (do-shutdown fd+ports #t) (set! fd+ports #f)))))
|
||||||
|
(define (get-write-evt buf start end) (write-bytes-avail-evt buf out start end))
|
||||||
|
(define buffer-mode (make-buffer-mode-fun out))
|
||||||
|
(define out*
|
||||||
|
(make-output-port 'unix-socket out out close #f get-write-evt #f #f void 1 buffer-mode))
|
||||||
|
(define creg (register-custodian-shutdown out* (lambda (p) (set! out #f) (close-output-port p))))
|
||||||
|
out*)
|
||||||
|
|
||||||
|
;; wrap-input-port : Input-Port (List FD Port Port) -> Input-Port
|
||||||
|
(define (wrap-input-port in fd+ports)
|
||||||
|
(define (close)
|
||||||
|
(when in (close-input-port in))
|
||||||
|
(call-as-atomic
|
||||||
|
(lambda ()
|
||||||
|
(when creg (unregister-custodian-shutdown in* creg) (set! creg #f))
|
||||||
|
(when fd+ports (do-shutdown fd+ports #f) (set! fd+ports #f)))))
|
||||||
|
(define (get-progress-evt) (port-progress-evt in))
|
||||||
|
(define (commit k progress done) (port-commit-peeked k progress done in))
|
||||||
|
(define buffer-mode (make-buffer-mode-fun in))
|
||||||
|
(define in*
|
||||||
|
(make-input-port 'unix-socket in in close get-progress-evt commit #f void 1 buffer-mode))
|
||||||
|
(define creg (register-custodian-shutdown in* (lambda (p) (set! in #f) (close-input-port p))))
|
||||||
|
in*)
|
||||||
|
|
||||||
|
(define (make-buffer-mode-fun port)
|
||||||
|
(case-lambda [() (file-stream-buffer-mode port)]
|
||||||
|
[(mode) (file-stream-buffer-mode port mode)]))
|
||||||
|
|
||||||
|
;; do-shutdown : (List FD Port Port) Boolean -> Void
|
||||||
|
;; Requirements:
|
||||||
|
;; - want to shutdown RD/WR when corresponding port closed
|
||||||
|
;; - want to shutdown *after* port closed to avoid low-level errors
|
||||||
|
;; - must *not* call shutdown after *both* ports closed (fd is stale)
|
||||||
|
;; So: okay to call shutdown if either of the ports is still open.
|
||||||
|
(define (do-shutdown fd+ports output?)
|
||||||
|
(define socket-fd (car fd+ports))
|
||||||
|
(define ports (cdr fd+ports))
|
||||||
|
(unless (andmap port-closed? ports)
|
||||||
|
(unless (zero? (shutdown socket-fd (if output? SHUT_WR SHUT_RD)))
|
||||||
|
;; ENOTCONN is okay; the other side may have disconnected.
|
||||||
|
(unless (= (saved-errno) ENOTCONN)
|
||||||
|
(error (if output? 'close-output-port/unix-socket 'close-input-port/unix-socket)
|
||||||
|
"error from shutdown~a" (errno-error-lines (saved-errno)))))))
|
||||||
|
|
||||||
|
;; ============================================================
|
||||||
|
;; Connect
|
||||||
|
|
||||||
|
;; unix-socket-connect : Path/String [Symbol] -> (values Input-Port Output-Port)
|
||||||
|
(define (unix-socket-connect path [mode 'SOCK-STREAM])
|
||||||
|
(check-available 'unix-socket-connect)
|
||||||
|
(define cust (current-custodian))
|
||||||
|
(define-values (sockaddr addrlen) (do-make-sockaddr 'unix-socket-connect path))
|
||||||
|
(define connect-k
|
||||||
|
;; Non-blocking connect may succeed immediately or require waiting to see.
|
||||||
|
;; - If succeeds immediately, make ports in same atomic block
|
||||||
|
;; - If wait, must exit atomic mode to sync
|
||||||
|
;; So we return a procedure to be applied in non-atomic mode that does
|
||||||
|
;; whatever needs doing.
|
||||||
|
(call-as-atomic
|
||||||
|
(lambda ()
|
||||||
|
(when (custodian-shut-down? cust)
|
||||||
|
(error 'unix-socket-connect "the custodian has been shut down"))
|
||||||
|
(define-values (socket-fd reg) (do-make-socket 'unix-socket-connect
|
||||||
|
(match mode
|
||||||
|
['SOCK-STREAM SOCK-STREAM]
|
||||||
|
['SOCK-SEQPACKET SOCK-SEQPACKET])))
|
||||||
|
(define r (connect socket-fd sockaddr addrlen))
|
||||||
|
(define errno (saved-errno))
|
||||||
|
(cond [(= r 0) ;; connected
|
||||||
|
(define-values (in out) (make-socket-ports 'unix-socket-connect socket-fd reg))
|
||||||
|
(lambda () (values in out))]
|
||||||
|
[(= errno EINPROGRESS) ;; wait and see
|
||||||
|
(define ready-evt (fd->evt socket-fd 'write))
|
||||||
|
(lambda () ;; called in non-atomic mode!
|
||||||
|
(sync ready-evt)
|
||||||
|
(call-as-atomic
|
||||||
|
(lambda ()
|
||||||
|
(when (custodian-shut-down? cust) ;; => socket-fd already closed by shutdown
|
||||||
|
(error 'unix-socket-connect "the custodian has been shut down"))
|
||||||
|
(define errno (getsockopt socket-fd SOL_SOCKET SO_ERROR))
|
||||||
|
(cond [(= errno 0)
|
||||||
|
(make-socket-ports 'unix-socket-connect socket-fd reg)]
|
||||||
|
[else
|
||||||
|
(close/unregister socket-fd reg)
|
||||||
|
(error 'unix-socket-connect
|
||||||
|
"failed to connect socket (non-blocking)\n path: ~e~a"
|
||||||
|
path (errno-error-lines errno))]))))]
|
||||||
|
[else
|
||||||
|
(close/unregister socket-fd reg)
|
||||||
|
(error 'unix-socket-connect "failed to connect socket\n path: ~e~a"
|
||||||
|
path (errno-error-lines (saved-errno)))]))))
|
||||||
|
(connect-k))
|
||||||
|
|
||||||
|
|
||||||
|
;; ============================================================
|
||||||
|
;; Listen & Accept
|
||||||
|
|
||||||
|
;; A Listener is (unix-socket-listener Nat/#f Cust-Reg/#f)
|
||||||
|
;; States:
|
||||||
|
;; OPEN: (unix-socket-listener Nat Cust-Reg)
|
||||||
|
;; CLOSED: (unix-socket-listener #f #f)
|
||||||
|
;; Only transition allowed is OPEN to CLOSED, must happen atomically.
|
||||||
|
(struct unix-socket-listener (fd reg)
|
||||||
|
#:mutable
|
||||||
|
#:property prop:evt
|
||||||
|
(lambda (self)
|
||||||
|
(call-as-atomic
|
||||||
|
(lambda ()
|
||||||
|
(wrap-evt
|
||||||
|
;; ready when fd is readable OR listener is closed
|
||||||
|
;; If closed after evt creation, then fd-evt becomes ready
|
||||||
|
;; when fd closed and fd-evt is unregistered.
|
||||||
|
(cond [(unix-socket-listener-fd self)
|
||||||
|
=> (lambda (fd) (fd->evt fd 'read))]
|
||||||
|
[else always-evt])
|
||||||
|
(lambda (r) self))))))
|
||||||
|
|
||||||
|
;; unix-socket-listen : Path/String [Nat] [Symbol] -> Unix-Socket-Listener
|
||||||
|
(define (unix-socket-listen path [backlog 4] [mode 'SOCK-STREAM])
|
||||||
|
(check-available 'unix-socket-listen)
|
||||||
|
(define-values (sockaddr addrlen) (do-make-sockaddr 'unix-socket-listen path))
|
||||||
|
(call-as-atomic
|
||||||
|
(lambda ()
|
||||||
|
(define-values (socket-fd reg) (do-make-socket 'unix-socket-listen
|
||||||
|
(match mode
|
||||||
|
['SOCK-STREAM SOCK-STREAM]
|
||||||
|
['SOCK-SEQPACKET SOCK-SEQPACKET])))
|
||||||
|
(unless (zero? (bind socket-fd sockaddr addrlen))
|
||||||
|
(close/unregister socket-fd reg)
|
||||||
|
(error 'unix-socket-listen "failed to bind socket\n path: ~e~a"
|
||||||
|
path (errno-error-lines (saved-errno))))
|
||||||
|
(unless (zero? (listen socket-fd backlog))
|
||||||
|
(close/unregister socket-fd reg)
|
||||||
|
(error 'unix-socket-listen "failed to listen\n path: ~e~a"
|
||||||
|
path (errno-error-lines (saved-errno))))
|
||||||
|
(define listener (unix-socket-listener socket-fd #f))
|
||||||
|
(set-unix-socket-listener-reg! listener
|
||||||
|
(register-custodian-shutdown listener do-close-listener))
|
||||||
|
(unregister-custodian-shutdown socket-fd reg)
|
||||||
|
listener)))
|
||||||
|
|
||||||
|
;; unix-socket-close-listener : Listener -> Void
|
||||||
|
(define (unix-socket-close-listener l)
|
||||||
|
(call-as-atomic (lambda () (do-close-listener l #t))))
|
||||||
|
|
||||||
|
(define (do-close-listener l [unregister? #f])
|
||||||
|
(define fd (unix-socket-listener-fd l))
|
||||||
|
(define reg (unix-socket-listener-reg l))
|
||||||
|
(when fd
|
||||||
|
(set-unix-socket-listener-fd! l #f)
|
||||||
|
(set-unix-socket-listener-reg! l #f)
|
||||||
|
(when unregister? (unregister-custodian-shutdown l reg))
|
||||||
|
(close/unregister fd)
|
||||||
|
(void)))
|
||||||
|
|
||||||
|
;; ----------------------------------------
|
||||||
|
|
||||||
|
(struct accept-evt (who listener cust) ;; <: (Evt-of (-> (list Input-Port Output-Port)))
|
||||||
|
#:property prop:evt
|
||||||
|
(unsafe-poller (lambda (self maybe-wakeups) (accept-poll self maybe-wakeups))))
|
||||||
|
|
||||||
|
;; unix-socket-accept : Unix-Socket-Listener -> (values Input-Port Output-Port)
|
||||||
|
(define (unix-socket-accept l)
|
||||||
|
(apply values ((sync (accept-evt 'unix-socket-accept l (current-custodian))))))
|
||||||
|
|
||||||
|
;; unix-socket-accept-evt : Unix-Socket-Listener -> (Evt-of (list Input-Port Output-Port))
|
||||||
|
(define (unix-socket-accept-evt l)
|
||||||
|
(wrap-evt (accept-evt 'unix-socket-accept-evt l (current-custodian)) (lambda (r) (r))))
|
||||||
|
|
||||||
|
;; accept-poll : Accept-Evt (U #f Wakeups) -> (U (values List #f) (values #f Evt))
|
||||||
|
(define (accept-poll accept-evt maybe-wakeups)
|
||||||
|
(define l (accept-evt-listener accept-evt))
|
||||||
|
(define who (accept-evt-who accept-evt))
|
||||||
|
(define lfd (unix-socket-listener-fd l))
|
||||||
|
(cond [(eq? lfd #f)
|
||||||
|
(values (list (lambda () (error who "unix socket listener is closed")))
|
||||||
|
#f)]
|
||||||
|
[(custodian-shut-down? (accept-evt-cust accept-evt))
|
||||||
|
(values (list (lambda () (error '|unix-socket-accept-evt poll| "the custodian has been shut down")))
|
||||||
|
#f)]
|
||||||
|
[lfd
|
||||||
|
(cond [maybe-wakeups (accept-poll/sleep who accept-evt maybe-wakeups lfd)]
|
||||||
|
[else (accept-poll/check who accept-evt lfd)])]))
|
||||||
|
|
||||||
|
(define (accept-poll/sleep who accept-evt wakeups lfd)
|
||||||
|
;; No need to register wakeup for custodian; custodian shutdown means a Racket thread
|
||||||
|
;; did work, so accept-evt will get re-polled.
|
||||||
|
(unsafe-poll-ctx-fd-wakeup wakeups lfd 'read)
|
||||||
|
(values #f accept-evt))
|
||||||
|
|
||||||
|
(define (accept-poll/check who accept-evt lfd)
|
||||||
|
(define fd (accept lfd))
|
||||||
|
(cond [(< fd 0)
|
||||||
|
(let ([errno (saved-errno)])
|
||||||
|
(cond [(or (= errno EAGAIN) (= errno EWOULDBLOCK) (= errno EINTR))
|
||||||
|
(values #f accept-evt)]
|
||||||
|
[else
|
||||||
|
(values (list (lambda ()
|
||||||
|
(error who "failed to accept socket~a"
|
||||||
|
(errno-error-lines errno))))
|
||||||
|
#f)]))]
|
||||||
|
[else
|
||||||
|
(define cust (accept-evt-cust accept-evt))
|
||||||
|
(define r
|
||||||
|
(with-handlers ([(lambda (e) #t)
|
||||||
|
(lambda (e) (lambda () (raise e)))])
|
||||||
|
(parameterize ((current-custodian cust))
|
||||||
|
(define-values (in out) (make-socket-ports who fd #f))
|
||||||
|
(lambda () (list in out)))))
|
||||||
|
(values (list r) #f)]))
|
Loading…
Reference in New Issue