fix concurrency bug in comms

This commit is contained in:
xenia 2020-12-12 22:49:14 -05:00
parent 630c1b96f0
commit 6607dcb78a
1 changed files with 10 additions and 7 deletions

View File

@ -107,7 +107,7 @@
;; encapsulates a message queue for one peer connection ;; encapsulates a message queue for one peer connection
;; sends and recieves encrypted data using the session key ;; sends and recieves encrypted data using the session key
(define (peer-thread peer-data session-key in out) (define (peer-thread peer-data session-key in out local-msg-channel)
(define run-queue #t) (define run-queue #t)
;; handles tcp data in ;; handles tcp data in
@ -121,7 +121,7 @@
[(? msg? m) [(? msg? m)
(unless (= (msg-from-id m) (node-id peer-data)) (unless (= (msg-from-id m) (node-id peer-data))
(error "mismatched node id" (msg-from-id m) (node-id peer-data))) (error "mismatched node id" (msg-from-id m) (node-id peer-data)))
(thread-sendrecv el-thread 'dispatch-msg (cons (node-id my-node) m))] (async-channel-put local-msg-channel m)]
[_ (error "invalid msg data from peer" (node-id peer-data))])])] [_ (error "invalid msg data from peer" (node-id peer-data))])])]
[#f ;; likely EOF. could also be invalid fasl data that could not be deserialized [#f ;; likely EOF. could also be invalid fasl data that could not be deserialized
;; in either case, close the connection. there's not much else we can do ;; in either case, close the connection. there's not much else we can do
@ -152,7 +152,7 @@
;; synchronously performs ports-proc to retrieve a new set of tcp ports, and a handshake ;; synchronously performs ports-proc to retrieve a new set of tcp ports, and a handshake
;; then starts a peer thread and a peer thread cleanup monitor ;; then starts a peer thread and a peer thread cleanup monitor
;; this cannot be run synchronously on the main loop because it does a thread-sendrecv ;; this cannot be run synchronously on the main loop because it does a thread-sendrecv
(define (make-peer-thread ports-proc [cust (make-custodian)]) (define (make-peer-thread ports-proc local-msg-channel [cust (make-custodian)])
(define-values [new-thd peer-data session-key] (define-values [new-thd peer-data session-key]
(parameterize ([current-custodian cust]) (parameterize ([current-custodian cust])
(define-values [in out] (ports-proc)) (define-values [in out] (ports-proc))
@ -167,7 +167,7 @@
(begin (engine-kill eng) (error "handshake timeout"))))) (begin (engine-kill eng) (error "handshake timeout")))))
(values (values
(thread (lambda () (peer-thread peer-data session-key in out))) (thread (lambda () (peer-thread peer-data session-key in out local-msg-channel)))
peer-data peer-data
session-key))) session-key)))
@ -210,7 +210,7 @@
(define (cleanup) (set! listener-thd #f)) (define (cleanup) (set! listener-thd #f))
(with-handlers ([exn? (lambda (ex) (cleanup) (raise ex))]) (with-handlers ([exn? (lambda (ex) (cleanup) (raise ex))])
(let loop () (let loop ()
(make-peer-thread (lambda () (tcp-accept listener))) (make-peer-thread (lambda () (tcp-accept listener)) local-msg-channel)
(loop)))))) (loop))))))
(thread-send from (void) #f)))) (thread-send from (void) #f))))
@ -224,7 +224,7 @@
[#f (thread-send from (make-error "no such node" id) #f)] [#f (thread-send from (make-error "no such node" id) #f)]
[(node id name type pubkey seckey host port) [(node id name type pubkey seckey host port)
(when (and host port) (when (and host port)
(make-peer-thread (lambda () (tcp-connect host port)))) (make-peer-thread (lambda () (tcp-connect host port)) local-msg-channel))
(thread-send from (void) #f)]))))) (thread-send from (void) #f)])))))
;; notify that startup is done ;; notify that startup is done
@ -539,7 +539,10 @@
(export #,sig-name) (export #,sig-name)
#,@(for/list ([mem (in-list members)]) #,@(for/list ([mem (in-list members)])
#`(define (#,mem . args) #`(define (#,mem . args)
(tm-transact (current-tm) (node-id (current-to-node)) (quote #,mem) args))))) ;; breaks off (there's a hard timeout of 30s, so the break will be delivered
;; eventually)
(parameterize-break #f
(tm-transact (current-tm) (node-id (current-to-node)) (quote #,mem) args))))))
unit-out) unit-out)
;; this creates a wrapper unit for the given signature that delegates to the transaction manager ;; this creates a wrapper unit for the given signature that delegates to the transaction manager