fixed unix sockets

This commit is contained in:
Milo 2020-01-30 18:56:17 -05:00
parent 83f2a89b25
commit 7505e181ea
3 changed files with 52 additions and 49 deletions

View File

@ -14,9 +14,12 @@
(case (system-type 'os)
[(macosx) 'bsd]
[(unix)
(define sys (path->string (system-library-subpath #f)))
(cond [(regexp-match? #rx"-linux$" sys) 'linux]
[(regexp-match? #rx"bsd$" sys) 'bsd]
(define machine
;; security guard may prevent executing uname
(with-handlers ([exn:fail? (lambda (e) "unknown")])
(system-type 'machine)))
(cond [(regexp-match? #rx"^Linux" machine) 'linux]
[(regexp-match? #rx"^[a-zA-Z]*BSD" machine) 'bsd]
[else #f])]
[else #f]))
@ -154,8 +157,8 @@
;; Racket constants and functions
;; indirection to support testing; see below
(define (fd->evt fd mode)
(unsafe-fd->evt fd mode #t))
(define (socket->semaphore fd mode)
(unsafe-socket->semaphore fd mode))
;; ============================================================
;; Testing
@ -172,7 +175,7 @@
(when #f
;; -- mock for connect returning EINPROGRESS
(let ([real-connect connect]
[real-fd->evt fd->evt])
[real-socket->semaphore socket->semaphore])
;; connecting-fds : hash[nat => #t]
(define connecting-fds (make-hash))
(set! connect
@ -184,7 +187,7 @@
(eprintf "** mock connect: setting EINPROGRESS\n")
-1]
[else r])))
(set! fd->evt
(set! socket->semaphore
(lambda (fd kind)
(cond [(and (eq? kind 'write)
(hash-ref connecting-fds fd #f))
@ -197,14 +200,14 @@
(hash-remove! connecting-fds fd)
sema]
[else
(real-fd->evt fd kind)])))))
(real-socket->semaphore fd kind)])))))
;; mock for accept returning EWOULDBLOCK/EAGAIN no longer works,
;; probably because doesn't intercept unsafe-poll-ctx-fd-wakeup
(when #f
;; - mock for accept returning EWOULDBLOCK/EAGAIN
(let ([real-accept accept]
[real-fd->evt fd->evt])
[real-socket->semaphore socket->semaphore])
;; accepting-fds : hash[nat => #t]
(define accepting-fds (make-hash))
(set! accept
@ -217,7 +220,7 @@
(hash-set! accepting-fds s #t)
(saved-errno EWOULDBLOCK)
-1])))
(set! fd->evt
(set! socket->semaphore
(lambda (fd kind)
(cond [(and (eq? kind 'read)
(hash-ref accepting-fds fd #f))
@ -229,4 +232,4 @@
(semaphore-post sema)))
sema]
[else
(real-fd->evt fd kind)])))))
(real-socket->semaphore fd kind)])))))

41
router
View File

@ -28,6 +28,7 @@
(define buf (make-bytes 65536))
(let loop ()
(define len (read-bytes-avail! buf sock-in))
(printf "got ~a bytes from ~a...\n" len peer)
(define jsexpr (bytes->jsexpr (subbytes buf 0 len)))
(channel-put mail (list peer jsexpr))
(loop))))))
@ -37,35 +38,37 @@
(printf "from ~a:\n~s\n" peer data)
(loop))
(define loop-thread
(thread loop))
(λ ()
(for-each break-thread
(cons loop-thread
peer-threads))))
(printf "waiting for messages...\n")
(loop))
;; Str [Listof Peer] ->
;; Router main
(define (run-router asn peers)
(displayln asn)
(map displayln peers)
(run-router/conns
(for/list ([peer (in-list peers)])
(define-values [sock-in sock-out]
(unix-socket-connect (ip->string (peer-ip peer))
'SOCK_SEQPACKET))
(peer-conn peer
sock-in
sock-out))))
(displayln "------------")
(with-handlers ([exn:break:terminate? (λ (e)
(printf "time to die.\n"))])
(run-router/conns
asn
(for/list ([peer (in-list peers)])
(define-values [sock-in sock-out]
(unix-socket-connect (ip->string (peer-ip peer))
'SOCK-SEQPACKET))
(peer-conn peer
sock-in
sock-out)))))
(module+ main
(command-line
#:program "router"
#:args
(asn . peers)
;; Run the router
(run-router asn (map string->peer peers))))
(with-output-to-file "log.txt"
#:exists 'replace
(λ ()
;; Run the router
(run-router asn (map string->peer peers))))))
(module+ test
@ -74,9 +77,9 @@
(define p1 (peer-conn (string->peer "1.2.3.4-cust") in1 out1))
(define p2 (peer-conn (string->peer "1.2.3.5-peer") in2 out2))
(define abort-router
; (define abort-router
(run-router/conns "123"
(list p1 p2)))
(list p1 p2))
(void
(write-string "{\"a\": 1, \"b\": [1,2,3]}" out1)))

View File

@ -80,7 +80,7 @@
;; close/unregister : Nat Cust-Reg/#f -> Void
(define (close/unregister fd [reg #f])
(close fd)
(fd->evt fd 'remove)
(socket->semaphore fd 'remove)
(when reg (unregister-custodian-shutdown fd reg)))
;; make-socket-ports : Symbol FD Cust-Reg/#f -> (values Input-Port Output-Port)
@ -154,35 +154,33 @@
;; unix-socket-connect : Path/String [Symbol] -> (values Input-Port Output-Port)
(define (unix-socket-connect path [mode 'SOCK-STREAM])
(check-available 'unix-socket-connect)
(define cust (current-custodian))
(define-values (sockaddr addrlen) (do-make-sockaddr 'unix-socket-connect path))
(define connect-k
;; Non-blocking connect may succeed immediately or require waiting to see.
;; - If succeeds immediately, make ports in same atomic block
;; - If wait, must exit atomic mode to sync
;; So we return a procedure to be applied in non-atomic mode that does
;; So we return a procedure to be applied in non-atomic mode that does
;; whatever needs doing.
(call-as-atomic
(lambda ()
(when (custodian-shut-down? cust)
(when (custodian-shut-down? (current-custodian))
(error 'unix-socket-connect "the custodian has been shut down"))
(define-values (socket-fd reg) (do-make-socket 'unix-socket-connect
(match mode
['SOCK-STREAM SOCK-STREAM]
['SOCK-SEQPACKET SOCK-SEQPACKET])))
['SOCK-STREAM SOCK-STREAM]
['SOCK-SEQPACKET SOCK-SEQPACKET])))
(define r (connect socket-fd sockaddr addrlen))
(define errno (saved-errno))
(cond [(= r 0) ;; connected
(define-values (in out) (make-socket-ports 'unix-socket-connect socket-fd reg))
(lambda () (values in out))]
[(= errno EINPROGRESS) ;; wait and see
(define ready-evt (fd->evt socket-fd 'write))
(define sema (socket->semaphore socket-fd 'write))
(lambda () ;; called in non-atomic mode!
(sync ready-evt)
(sync sema)
;; FIXME: check custodian hasn't been shut down?
(call-as-atomic
(lambda ()
(when (custodian-shut-down? cust) ;; => socket-fd already closed by shutdown
(error 'unix-socket-connect "the custodian has been shut down"))
(define errno (getsockopt socket-fd SOL_SOCKET SO_ERROR))
(cond [(= errno 0)
(make-socket-ports 'unix-socket-connect socket-fd reg)]
@ -214,10 +212,10 @@
(lambda ()
(wrap-evt
;; ready when fd is readable OR listener is closed
;; If closed after evt creation, then fd-evt becomes ready
;; when fd closed and fd-evt is unregistered.
;; If closed after evt creation, then fd-sema becomes ready
;; when fd closed and fd-sema unregistered.
(cond [(unix-socket-listener-fd self)
=> (lambda (fd) (fd->evt fd 'read))]
=> (lambda (fd) (socket->semaphore fd 'read))]
[else always-evt])
(lambda (r) self))))))
@ -229,8 +227,8 @@
(lambda ()
(define-values (socket-fd reg) (do-make-socket 'unix-socket-listen
(match mode
['SOCK-STREAM SOCK-STREAM]
['SOCK-SEQPACKET SOCK-SEQPACKET])))
['SOCK-STREAM SOCK-STREAM]
['SOCK-SEQPACKET SOCK-SEQPACKET])))
(unless (zero? (bind socket-fd sockaddr addrlen))
(close/unregister socket-fd reg)
(error 'unix-socket-listen "failed to bind socket\n path: ~e~a"
@ -282,15 +280,14 @@
(values (list (lambda () (error who "unix socket listener is closed")))
#f)]
[(custodian-shut-down? (accept-evt-cust accept-evt))
(values (list (lambda () (error '|unix-socket-accept-evt poll| "the custodian has been shut down")))
#f)]
(error '|unix-socket-accept-evt poll| "the custodian has been shut down")]
[lfd
(cond [maybe-wakeups (accept-poll/sleep who accept-evt maybe-wakeups lfd)]
[else (accept-poll/check who accept-evt lfd)])]))
(define (accept-poll/sleep who accept-evt wakeups lfd)
;; No need to register wakeup for custodian; custodian shutdown means a Racket thread
;; did work, so accept-evt will get re-polled.
;; No need to register wakeup for custodian; if custodian is shut down, then
;; lfd semaphore becomes ready when it is unregistered
(unsafe-poll-ctx-fd-wakeup wakeups lfd 'read)
(values #f accept-evt))