expand functionality and reliability of comms

This commit is contained in:
xenia 2020-11-12 01:59:56 -05:00
parent 150bc8eccf
commit b036c37cba
2 changed files with 88 additions and 42 deletions

View File

@ -110,6 +110,8 @@
[_ (error "invalid msg data from peer" (node-id peer-data))])])] [_ (error "invalid msg data from peer" (node-id peer-data))])])]
[_ (error "invalid data recieved from peer" (node-id peer-data))])) [_ (error "invalid data recieved from peer" (node-id peer-data))]))
(define run-queue #t)
;; handles a thread queue message to tcp out ;; handles a thread queue message to tcp out
(define (handle-out-msg) (define (handle-out-msg)
(match (thread-receive) (match (thread-receive)
@ -121,28 +123,21 @@
(define-values [ct mac] (crypto-lock session-key nonce fasl)) (define-values [ct mac] (crypto-lock session-key nonce fasl))
(s-exp->fasl (locked ct nonce mac) out) (s-exp->fasl (locked ct nonce mac) out)
(flush-output out)] (flush-output out)]
['stop (set! run-queue #f)]
[x (error "invalid thread msg" x)])) [x (error "invalid thread msg" x)]))
;; cleanup helper -- wipe the session key and deregister ourselves from the main loop
(define (cleanup)
(crypto-wipe session-key)
(thread-sendrecv el-thread 'deregister-channel (node-id peer-data))
(void))
;; handle mailbox <-> tcp ;; handle mailbox <-> tcp
(with-handlers ([exn? (lambda (ex) (cleanup) (raise ex))])
(let loop () (let loop ()
(match (sync (thread-receive-evt) in) (match (sync (thread-receive-evt) in)
[(? input-port?) (handle-in-msg)] [(? input-port?) (handle-in-msg)]
[_ (handle-out-msg)]) [_ (handle-out-msg)])
(loop)) (when run-queue (loop))))
(cleanup)))
;; synchronously performs ports-proc to retrieve a new set of tcp ports, and a handshake ;; synchronously performs ports-proc to retrieve a new set of tcp ports, and a handshake
;; then starts a peer thread and a peer thread cleanup monitor ;; then starts a peer thread and a peer thread cleanup monitor
;; this cannot be run synchronously on the main loop because it does a thread-sendrecv ;; this cannot be run synchronously on the main loop because it does a thread-sendrecv
(define (make-peer-thread el-thread ports-proc [cust (make-custodian)]) (define (make-peer-thread el-thread ports-proc [cust (make-custodian)])
(define-values [new-thd peer-data] (define-values [new-thd peer-data session-key]
(parameterize ([current-custodian cust]) (parameterize ([current-custodian cust])
(define-values [in out] (ports-proc)) (define-values [in out] (ports-proc))
@ -155,19 +150,19 @@
(engine-result eng) (engine-result eng)
(begin (engine-kill eng) (error "handshake timeout"))))) (begin (engine-kill eng) (error "handshake timeout")))))
(displayln "handshake complete")
(displayln (format "negotiated ~s" session-key))
(values (values
(thread (lambda () (peer-thread el-thread peer-data session-key in out))) (thread (lambda () (peer-thread el-thread peer-data session-key in out)))
peer-data))) peer-data
session-key)))
(thread-sendrecv el-thread 'register-channel (cons (node-id peer-data) new-thd)) (thread-sendrecv el-thread 'register-channel (cons (node-id peer-data) new-thd))
;; monitor thread -- shuts down the custodian once the peer thread is done ;; monitor thread -- shuts down the custodian once the peer thread is done
(thread (lambda () (thread (lambda ()
(thread-wait new-thd) (thread-wait new-thd)
(custodian-shutdown-all cust))) (custodian-shutdown-all cust)
(thread-sendrecv el-thread 'deregister-channel (node-id peer-data))
(crypto-wipe session-key)))
new-thd) new-thd)
;; this thread ;; this thread
@ -182,13 +177,16 @@
(hash-set! node-registry (node-id my-node) my-node) (hash-set! node-registry (node-id my-node) my-node)
;; tcp listener, if this is a server ;; tcp listener, if this is a server
(define listener #f)
(define listener-thd #f) (define listener-thd #f)
(define run-comms #t)
;; starts the tcp listener ;; starts the tcp listener
(define (handle-listen from port) (define (handle-listen from port)
(with-handlers ([exn? (lambda (ex) (thread-send from ex #f))]) (with-handlers ([exn? (lambda (ex) (thread-send from ex #f))])
(when (false? listener-thd) (when (false? listener-thd)
(define listener (tcp-listen port 4 #t)) (set! listener (tcp-listen port 4 #t))
(set! listener-thd (set! listener-thd
(thread (thread
(lambda () (lambda ()
@ -222,7 +220,9 @@
;; active, and can be destroyed with destroy-channel ;; active, and can be destroyed with destroy-channel
;; this call is guaranteed to return within max(HANDSHAKE-TIMEOUT, system tcp connect timeout) ;; this call is guaranteed to return within max(HANDSHAKE-TIMEOUT, system tcp connect timeout)
['connect (handle-connect from data)] ['connect (handle-connect from data)]
['channel-available? (thread-send from (hash-has-key? peer-registry data) #f)] ['channel-available?
(thread-send
from (or (= data (node-id my-node)) (hash-has-key? peer-registry data) #f))]
['get-node-info (thread-send from (hash-ref node-registry data #f) #f)] ['get-node-info (thread-send from (hash-ref node-registry data #f) #f)]
['set-node-info (hash-set! node-registry (node-id data) data) (thread-send from (void) #f)] ['set-node-info (hash-set! node-registry (node-id data) data) (thread-send from (void) #f)]
['register-channel ['register-channel
@ -237,7 +237,10 @@
['destroy-channel ['destroy-channel
(match (hash-ref peer-registry data #f) (match (hash-ref peer-registry data #f)
[#f (void)] [#f (void)]
[thd (kill-thread thd) (hash-remove! peer-registry data)]) [thd (thread-send thd 'stop #f)
(sync/timeout 5 thd)
(kill-thread thd)
(hash-remove! peer-registry data)])
(thread-send from (void) #f)] (thread-send from (void) #f)]
;; if this call forwards an exn:fail, it means the queue thread for this peer is dead, so you ;; if this call forwards an exn:fail, it means the queue thread for this peer is dead, so you
;; need to either wait for the peer to reconnect if this is a server, or initiate a new ;; need to either wait for the peer to reconnect if this is a server, or initiate a new
@ -253,7 +256,7 @@
;; transferred, in which case you wouldn't be notified that it failed ;; transferred, in which case you wouldn't be notified that it failed
['dispatch-msg ['dispatch-msg
(match-define (cons peer-id msg) data) (match-define (cons peer-id msg) data)
(if (equal? peer-id (node-id my-node)) (if (= peer-id (node-id my-node))
(async-channel-put local-msg-channel msg) (async-channel-put local-msg-channel msg)
(match (hash-ref peer-registry peer-id #f) (match (hash-ref peer-registry peer-id #f)
[#f (thread-send from (make-error "no such peer connection") #f)] [#f (thread-send from (make-error "no such peer connection") #f)]
@ -265,13 +268,61 @@
;; retrieves an async channel with the local received message queue ;; retrieves an async channel with the local received message queue
;; this can be used to fetch new messages without further interacting with the comms thread ;; this can be used to fetch new messages without further interacting with the comms thread
['local-channel (thread-send from local-msg-channel #f)] ['local-channel (thread-send from local-msg-channel #f)]
['shutdown
(for ([(_ thd) (in-hash peer-registry)])
(thread-send thd 'stop #f)
(sync/timeout 5 thd)
(kill-thread thd))
(when listener-thd
(kill-thread listener-thd)
(tcp-close listener))
(set! run-comms #f)
(thread-send from (void) #f)]
[_ (thread-send from (make-error "unknown thread message type") #f)]) [_ (thread-send from (make-error "unknown thread message type") #f)])
(loop))) (when run-comms
(loop))))
;; creates the comms thread ;; creates the comms thread
(define (make-comms my-node) (define (make-comms my-node)
(thread (lambda () (comms-event-loop my-node)))) (thread (lambda () (comms-event-loop my-node))))
(define (comms-listen comms port)
(thread-sendrecv comms 'listen port))
(define (comms-connect comms id)
(thread-sendrecv comms 'connect id))
(define (comms-set-node-info comms info)
(thread-sendrecv comms 'set-node-info info))
(define (comms-get-node-info comms id)
(thread-sendrecv comms 'get-node-info id))
(define (comms-local-channel comms)
(thread-sendrecv comms 'local-channel (void)))
(define (comms-dispatch-msg comms to-id msg)
(thread-sendrecv comms 'dispatch-msg (cons to-id msg)))
(define (comms-channel-available? comms ch-id)
(thread-sendrecv comms 'channel-available? ch-id))
(define (comms-shutdown comms)
(thread-sendrecv comms 'shutdown (void)))
(define (comms-dispatch-msg/retry comms to-id msg [tries 3] [retry-delay 2])
(define (do-retry ex)
(when (<= tries 0)
(raise ex))
(sleep retry-delay)
(comms-dispatch-msg/retry comms to-id msg (sub1 tries) retry-delay))
(with-handlers ([exn? do-retry])
(unless (comms-channel-available? comms to-id)
(comms-connect comms to-id))
(comms-dispatch-msg comms to-id msg)))
;; demo code ;; demo code
(define server-sk #"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa") (define server-sk #"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")
(define server-pk (crypto-sign-public-key server-sk)) (define server-pk (crypto-sign-public-key server-sk))
@ -288,17 +339,14 @@
(match mode (match mode
["server" ["server"
(define comms (make-comms server-node)) (define comms (make-comms server-node))
(thread-sendrecv comms 'set-node-info client-node) (comms-set-node-info comms client-node)
(thread-sendrecv comms 'listen 1337) (comms-listen comms 1337)
(displayln "awaiting messages") (displayln "awaiting messages")
(displayln (async-channel-get (thread-sendrecv comms 'local-channel (void))))] (displayln (async-channel-get (comms-local-channel comms)))
(comms-shutdown comms)]
["client" ["client"
(define comms (make-comms client-node)) (define comms (make-comms client-node))
(thread-sendrecv comms 'set-node-info server-node) (comms-set-node-info comms server-node)
(displayln "connecting") (displayln "sending")
(with-handlers ([exn? (lambda (ex) (displayln "failed!!!") (displayln ex))]) (comms-dispatch-msg/retry comms 0 (msg:meow 1 "hello world"))
(thread-sendrecv comms 'connect 0)) (comms-shutdown comms)])
(displayln "connected")
(thread-sendrecv comms 'dispatch-msg (cons 0 (msg:meow 1 "hello world")))])
(sleep 9999)

View File

@ -42,8 +42,6 @@
(define (parse-manifest manifest-def) (define (parse-manifest manifest-def)
(struct manifest [name mode command isets pattern] #:transparent) (struct manifest [name mode command isets pattern] #:transparent)
(define (eq/m x) (curry equal? x))
(define-syntax (check-false stx) (define-syntax (check-false stx)
(syntax-case stx () (syntax-case stx ()
[(_ what) [(_ what)
@ -56,14 +54,14 @@
(for/fold ([mf (manifest #f 'stdin #f builtin-isets #f)]) (for/fold ([mf (manifest #f 'stdin #f builtin-isets #f)])
([line (in-list manifest-def)]) ([line (in-list manifest-def)])
(match line (match line
[(list (? (eq/m 'name)) name) (struct-copy manifest mf [name name])] [(list 'name name) (struct-copy manifest mf [name name])]
[(list (? (eq/m 'mode)) mode) (struct-copy manifest mf [mode mode])] [(list 'mode mode) (struct-copy manifest mf [mode mode])]
[(list (? (eq/m 'command)) command) (struct-copy manifest mf [command command])] [(list 'command command) (struct-copy manifest mf [command command])]
[(list (? (eq/m 'iset)) name val) [(list 'iset name val)
(struct-copy manifest mf (struct-copy manifest mf
[isets (hash-set (manifest-isets mf) [isets (hash-set (manifest-isets mf)
(symbol->string name) (string->iset val))])] (symbol->string name) (string->iset val))])]
[(list (? (eq/m 'pattern)) pattern) (struct-copy manifest mf [pattern pattern])]))) [(list 'pattern pattern) (struct-copy manifest mf [pattern pattern])])))
(check-false name) (check-false name)
(check-false command) (check-false command)