rework comms architecture for more robustness

This commit is contained in:
xenia 2020-11-08 23:59:48 -05:00
parent 563b5d691b
commit 9069ae496b
1 changed files with 85 additions and 79 deletions

View File

@ -16,7 +16,8 @@
;; You should have received a copy of the GNU Affero General Public License ;; You should have received a copy of the GNU Affero General Public License
;; along with this program. If not, see <https://www.gnu.org/licenses/>. ;; along with this program. If not, see <https://www.gnu.org/licenses/>.
(require data/queue racket/bool racket/engine racket/fasl racket/list racket/match racket/tcp (require racket/async-channel racket/bool racket/engine racket/fasl racket/list racket/match
racket/tcp
"not-crypto.rkt") "not-crypto.rkt")
(struct msg [from-id] #:prefab) (struct msg [from-id] #:prefab)
@ -26,7 +27,7 @@
(struct signed [fasl signature] #:prefab) (struct signed [fasl signature] #:prefab)
(struct locked [fasl nonce mac] #:prefab) (struct locked [fasl nonce mac] #:prefab)
(define TIMEOUT 30) (define HANDSHAKE-TIMEOUT 30)
(struct node [id name type pubkey seckey host port] #:transparent) (struct node [id name type pubkey seckey host port] #:transparent)
(define (make-error str) (define (make-error str)
@ -64,19 +65,7 @@
[_ (error "invalid message type recieved during handshake")])] [_ (error "invalid message type recieved during handshake")])]
[_ (error "invalid data recieved during handshake")])) [_ (error "invalid data recieved during handshake")]))
(define (peer-thread el-thread in out) (define (peer-thread el-thread peer-data session-key in out)
;; run handshake process with a timeout
(match-define (cons peer-data session-key)
(let ([eng (engine (lambda (_) (peer-handshake el-thread in out)))])
(if (engine-run (* 1000 TIMEOUT) eng)
(engine-result eng)
(begin (engine-kill eng)
(error "handshake timeout")))))
(displayln "handshake complete")
(displayln (format "negotiated ~s" session-key))
(thread-sendrecv el-thread 'register-channel (node-id peer-data))
(define (handle-in-msg) (define (handle-in-msg)
(match (fasl->s-exp in) (match (fasl->s-exp in)
[(locked fasl nonce mac) [(locked fasl nonce mac)
@ -101,20 +90,40 @@
(flush-output out)] (flush-output out)]
[x (error "invalid thread msg" x)])) [x (error "invalid thread msg" x)]))
(let loop () (define (cleanup)
(match (sync (thread-receive-evt) in) (crypto-wipe session-key)
[(? input-port?) (handle-in-msg)] (thread-sendrecv el-thread 'deregister-channel (node-id peer-data))
[_ (handle-out-msg)]) (void))
(loop))
(void)) (with-handlers ([exn? (lambda (ex) (cleanup) (raise ex))])
(let loop ()
(match (sync (thread-receive-evt) in)
[(? input-port?) (handle-in-msg)]
[_ (handle-out-msg)])
(loop))
(cleanup)))
(define (make-peer-thread el-thread ports-proc) (define (make-peer-thread el-thread ports-proc [cust (make-custodian)])
(define cust (make-custodian)) (define-values [new-thd peer-data]
(define new-thd
(parameterize ([current-custodian cust]) (parameterize ([current-custodian cust])
(define-values [in out] (ports-proc)) (define-values [in out] (ports-proc))
(thread (lambda () (peer-thread el-thread in out)))))
;; run handshake process with a timeout
(match-define (cons peer-data session-key)
(let ([eng (engine (lambda (_) (peer-handshake el-thread in out)))])
(if (engine-run (* 1000 HANDSHAKE-TIMEOUT) eng)
(engine-result eng)
(begin (engine-kill eng) (error "handshake timeout")))))
(displayln "handshake complete")
(displayln (format "negotiated ~s" session-key))
(values
(thread (lambda () (peer-thread el-thread peer-data session-key in out)))
peer-data)))
(thread-sendrecv el-thread 'register-channel (cons (node-id peer-data) new-thd))
(thread (lambda () (thread (lambda ()
(thread-wait new-thd) (thread-wait new-thd)
(custodian-shutdown-all cust))) (custodian-shutdown-all cust)))
@ -122,77 +131,71 @@
(define el-thread (current-thread)) (define el-thread (current-thread))
(define local-msg-queue (make-queue)) (define local-msg-channel (make-async-channel))
(define local-msg-waiters '())
(define peer-waiters (make-hash))
(define peer-registry (make-hash)) (define peer-registry (make-hash))
(define node-registry (make-hash)) (define node-registry (make-hash))
(hash-set! node-registry (node-id my-node) my-node) (hash-set! node-registry (node-id my-node) my-node)
(define listener-thread #f) (define listener-thd #f)
(define (handle-listen port) (define (handle-listen from port)
(when (false? listener-thread) (with-handlers ([exn? (lambda (ex) (thread-send from ex #f))])
(set! listener-thread (when (false? listener-thd)
(thread (define listener (tcp-listen port 4 #t))
(lambda () (set! listener-thd
(define listener (tcp-listen port 4 #t)) (thread
(let loop () (lambda ()
(make-peer-thread el-thread (lambda () (tcp-accept listener))) (define (cleanup) (set! listener-thd #f))
(loop))))))) (with-handlers ([exn? (lambda (ex) (cleanup) (raise ex))])
(let loop ()
(make-peer-thread el-thread (lambda () (tcp-accept listener)))
(loop))))))
(thread-send from (void) #f))))
(define (handle-connect data) (define (handle-connect from id)
(match-define (cons host port) data) (thread
(make-peer-thread el-thread (lambda () (tcp-connect host port)))) (lambda ()
(with-handlers ([exn? (lambda (ex) (thread-send from ex #f))])
(define (handle-local-msg msg) (match (hash-ref node-registry id #f)
(if (empty? local-msg-waiters) [#f (thread-send from (make-error "no such node" id) #f)]
(enqueue! local-msg-queue msg) [(node id name type pubkey seckey host port)
(begin (make-peer-thread el-thread (lambda () (tcp-connect host port)))
(for ([waiter (in-list local-msg-waiters)]) (thread-send from (void) #f)])))))
(thread-send waiter msg #f))
(set! local-msg-waiters '()))))
(define (handle-fetch-local-msg from-thd)
(if (queue-empty? local-msg-queue)
(set! local-msg-waiters (cons from-thd local-msg-waiters))
(let ([next (dequeue! local-msg-queue)])
(when (false? (thread-send from-thd next #f))
(enqueue-front! local-msg-queue next)))))
(define (handle-node-wait id thd)
(if (hash-has-key? peer-registry id)
(thread-send thd (void) #f)
(hash-update! peer-waiters 0 (lambda (v) (cons thd v)) (lambda () '()))))
(let loop () (let loop ()
(match-define (cons from (cons type data)) (thread-receive)) (match-define (cons from (cons type data)) (thread-receive))
(match type (match type
['listen (handle-listen data) (thread-send from (void) #f)] ['listen (handle-listen from data)]
['connect (handle-connect data) (thread-send from (void) #f)] ['connect (handle-connect from data)]
['wait-for (handle-node-wait data from)] ['channel-available? (thread-send from (hash-has-key? peer-registry data) #f)]
['get-node-info (thread-send from (hash-ref node-registry data #f) #f)] ['get-node-info (thread-send from (hash-ref node-registry data #f) #f)]
['set-node-info (hash-set! node-registry (node-id data) data) (thread-send from (void) #f)] ['set-node-info (hash-set! node-registry (node-id data) data) (thread-send from (void) #f)]
['register-channel ['register-channel
(when (hash-has-key? peer-registry data) (match-define (cons peer-id thd) data)
(kill-thread (hash-ref peer-registry data))) (when (hash-has-key? peer-registry peer-id)
(hash-set! peer-registry data from) (kill-thread (hash-ref peer-registry peer-id)))
(for ([thd (in-list (hash-ref peer-waiters data '()))]) (hash-set! peer-registry peer-id thd)
(thread-send thd (void) #f)) (thread-send from (void) #f)]
(hash-remove! peer-waiters data) ;; remove channel, don't kill thread
['deregister-channel (hash-remove! peer-registry data) (thread-send from (void) #f)]
;; remove channel, kill thread
['destroy-channel
(match (hash-ref peer-registry data #f)
[#f (void)]
[thd (kill-thread thd) (hash-remove! peer-registry data)])
(thread-send from (void) #f)] (thread-send from (void) #f)]
['dispatch-msg ['dispatch-msg
(match-define (cons peer-id msg) data) (match-define (cons peer-id msg) data)
(if (equal? peer-id (node-id my-node)) (if (equal? peer-id (node-id my-node))
(handle-local-msg msg) (async-channel-put local-msg-channel msg)
(match (hash-ref peer-registry peer-id #f) (match (hash-ref peer-registry peer-id #f)
[#f (thread-send from (make-error "no such peer connection") #f)] [#f (thread-send from (make-error "no such peer connection") #f)]
[thd [thd
(thread-send (match (thread-send thd msg #f)
from [#f (kill-thread thd) (hash-remove! peer-registry peer-id)
(thread-send thd msg (lambda () (make-error "failed to dispatch to thread"))) (thread-send from (make-error "failed to dispatch to thread") #f)]
#f)]))] [_ (thread-send from (void) #f)])]))]
['fetch-msg (handle-fetch-local-msg from)] ['local-channel (thread-send from local-msg-channel #f)]
[_ (thread-send from (make-error "unknown thread message type") #f)]) [_ (thread-send from (make-error "unknown thread message type") #f)])
(loop))) (loop)))
@ -217,12 +220,15 @@
(define comms (make-comms server-node)) (define comms (make-comms server-node))
(thread-sendrecv comms 'set-node-info client-node) (thread-sendrecv comms 'set-node-info client-node)
(thread-sendrecv comms 'listen 1337) (thread-sendrecv comms 'listen 1337)
(displayln (thread-sendrecv comms 'fetch-msg (void)))] (displayln "awaiting messages")
(displayln (async-channel-get (thread-sendrecv comms 'local-channel (void))))]
["client" ["client"
(define comms (make-comms client-node)) (define comms (make-comms client-node))
(thread-sendrecv comms 'set-node-info server-node) (thread-sendrecv comms 'set-node-info server-node)
(thread-sendrecv comms 'connect (cons "localhost" 1337)) (displayln "connecting")
(thread-sendrecv comms 'wait-for 0) (with-handlers ([exn? (lambda (ex) (displayln "failed!!!") (displayln ex))])
(thread-sendrecv comms 'connect 0))
(displayln "connected")
(thread-sendrecv comms 'dispatch-msg (cons 0 (msg:meow 1 "hello world")))]) (thread-sendrecv comms 'dispatch-msg (cons 0 (msg:meow 1 "hello world")))])
(sleep 9999) (sleep 9999)