diff --git a/crossfire/comms.rkt b/crossfire/comms.rkt index a716883..8098509 100644 --- a/crossfire/comms.rkt +++ b/crossfire/comms.rkt @@ -110,6 +110,8 @@ [_ (error "invalid msg data from peer" (node-id peer-data))])])] [_ (error "invalid data recieved from peer" (node-id peer-data))])) + (define run-queue #t) + ;; handles a thread queue message to tcp out (define (handle-out-msg) (match (thread-receive) @@ -121,28 +123,21 @@ (define-values [ct mac] (crypto-lock session-key nonce fasl)) (s-exp->fasl (locked ct nonce mac) out) (flush-output out)] + ['stop (set! run-queue #f)] [x (error "invalid thread msg" x)])) - ;; cleanup helper -- wipe the session key and deregister ourselves from the main loop - (define (cleanup) - (crypto-wipe session-key) - (thread-sendrecv el-thread 'deregister-channel (node-id peer-data)) - (void)) - ;; handle mailbox <-> tcp - (with-handlers ([exn? (lambda (ex) (cleanup) (raise ex))]) - (let loop () - (match (sync (thread-receive-evt) in) - [(? input-port?) (handle-in-msg)] - [_ (handle-out-msg)]) - (loop)) - (cleanup))) + (let loop () + (match (sync (thread-receive-evt) in) + [(? input-port?) (handle-in-msg)] + [_ (handle-out-msg)]) + (when run-queue (loop)))) ;; synchronously performs ports-proc to retrieve a new set of tcp ports, and a handshake ;; then starts a peer thread and a peer thread cleanup monitor ;; this cannot be run synchronously on the main loop because it does a thread-sendrecv (define (make-peer-thread el-thread ports-proc [cust (make-custodian)]) - (define-values [new-thd peer-data] + (define-values [new-thd peer-data session-key] (parameterize ([current-custodian cust]) (define-values [in out] (ports-proc)) @@ -155,19 +150,19 @@ (engine-result eng) (begin (engine-kill eng) (error "handshake timeout"))))) - (displayln "handshake complete") - (displayln (format "negotiated ~s" session-key)) - (values (thread (lambda () (peer-thread el-thread peer-data session-key in out))) - peer-data))) + peer-data + session-key))) (thread-sendrecv el-thread 'register-channel (cons (node-id peer-data) new-thd)) ;; monitor thread -- shuts down the custodian once the peer thread is done (thread (lambda () (thread-wait new-thd) - (custodian-shutdown-all cust))) + (custodian-shutdown-all cust) + (thread-sendrecv el-thread 'deregister-channel (node-id peer-data)) + (crypto-wipe session-key))) new-thd) ;; this thread @@ -182,13 +177,16 @@ (hash-set! node-registry (node-id my-node) my-node) ;; tcp listener, if this is a server + (define listener #f) (define listener-thd #f) + (define run-comms #t) + ;; starts the tcp listener (define (handle-listen from port) (with-handlers ([exn? (lambda (ex) (thread-send from ex #f))]) (when (false? listener-thd) - (define listener (tcp-listen port 4 #t)) + (set! listener (tcp-listen port 4 #t)) (set! listener-thd (thread (lambda () @@ -222,7 +220,9 @@ ;; active, and can be destroyed with destroy-channel ;; this call is guaranteed to return within max(HANDSHAKE-TIMEOUT, system tcp connect timeout) ['connect (handle-connect from data)] - ['channel-available? (thread-send from (hash-has-key? peer-registry data) #f)] + ['channel-available? + (thread-send + from (or (= data (node-id my-node)) (hash-has-key? peer-registry data) #f))] ['get-node-info (thread-send from (hash-ref node-registry data #f) #f)] ['set-node-info (hash-set! node-registry (node-id data) data) (thread-send from (void) #f)] ['register-channel @@ -237,7 +237,10 @@ ['destroy-channel (match (hash-ref peer-registry data #f) [#f (void)] - [thd (kill-thread thd) (hash-remove! peer-registry data)]) + [thd (thread-send thd 'stop #f) + (sync/timeout 5 thd) + (kill-thread thd) + (hash-remove! peer-registry data)]) (thread-send from (void) #f)] ;; if this call forwards an exn:fail, it means the queue thread for this peer is dead, so you ;; need to either wait for the peer to reconnect if this is a server, or initiate a new @@ -253,7 +256,7 @@ ;; transferred, in which case you wouldn't be notified that it failed ['dispatch-msg (match-define (cons peer-id msg) data) - (if (equal? peer-id (node-id my-node)) + (if (= peer-id (node-id my-node)) (async-channel-put local-msg-channel msg) (match (hash-ref peer-registry peer-id #f) [#f (thread-send from (make-error "no such peer connection") #f)] @@ -265,13 +268,61 @@ ;; retrieves an async channel with the local received message queue ;; this can be used to fetch new messages without further interacting with the comms thread ['local-channel (thread-send from local-msg-channel #f)] + ['shutdown + (for ([(_ thd) (in-hash peer-registry)]) + (thread-send thd 'stop #f) + (sync/timeout 5 thd) + (kill-thread thd)) + (when listener-thd + (kill-thread listener-thd) + (tcp-close listener)) + (set! run-comms #f) + (thread-send from (void) #f)] [_ (thread-send from (make-error "unknown thread message type") #f)]) - (loop))) + (when run-comms + (loop)))) ;; creates the comms thread (define (make-comms my-node) (thread (lambda () (comms-event-loop my-node)))) +(define (comms-listen comms port) + (thread-sendrecv comms 'listen port)) + +(define (comms-connect comms id) + (thread-sendrecv comms 'connect id)) + +(define (comms-set-node-info comms info) + (thread-sendrecv comms 'set-node-info info)) + +(define (comms-get-node-info comms id) + (thread-sendrecv comms 'get-node-info id)) + +(define (comms-local-channel comms) + (thread-sendrecv comms 'local-channel (void))) + +(define (comms-dispatch-msg comms to-id msg) + (thread-sendrecv comms 'dispatch-msg (cons to-id msg))) + +(define (comms-channel-available? comms ch-id) + (thread-sendrecv comms 'channel-available? ch-id)) + +(define (comms-shutdown comms) + (thread-sendrecv comms 'shutdown (void))) + + +(define (comms-dispatch-msg/retry comms to-id msg [tries 3] [retry-delay 2]) + (define (do-retry ex) + (when (<= tries 0) + (raise ex)) + (sleep retry-delay) + (comms-dispatch-msg/retry comms to-id msg (sub1 tries) retry-delay)) + (with-handlers ([exn? do-retry]) + (unless (comms-channel-available? comms to-id) + (comms-connect comms to-id)) + (comms-dispatch-msg comms to-id msg))) + + ;; demo code (define server-sk #"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa") (define server-pk (crypto-sign-public-key server-sk)) @@ -288,17 +339,14 @@ (match mode ["server" (define comms (make-comms server-node)) - (thread-sendrecv comms 'set-node-info client-node) - (thread-sendrecv comms 'listen 1337) + (comms-set-node-info comms client-node) + (comms-listen comms 1337) (displayln "awaiting messages") - (displayln (async-channel-get (thread-sendrecv comms 'local-channel (void))))] + (displayln (async-channel-get (comms-local-channel comms))) + (comms-shutdown comms)] ["client" (define comms (make-comms client-node)) - (thread-sendrecv comms 'set-node-info server-node) - (displayln "connecting") - (with-handlers ([exn? (lambda (ex) (displayln "failed!!!") (displayln ex))]) - (thread-sendrecv comms 'connect 0)) - (displayln "connected") - (thread-sendrecv comms 'dispatch-msg (cons 0 (msg:meow 1 "hello world")))]) - -(sleep 9999) + (comms-set-node-info comms server-node) + (displayln "sending") + (comms-dispatch-msg/retry comms 0 (msg:meow 1 "hello world")) + (comms-shutdown comms)]) diff --git a/crossfire/main.rkt b/crossfire/main.rkt index 739e74b..c500d49 100644 --- a/crossfire/main.rkt +++ b/crossfire/main.rkt @@ -42,8 +42,6 @@ (define (parse-manifest manifest-def) (struct manifest [name mode command isets pattern] #:transparent) - (define (eq/m x) (curry equal? x)) - (define-syntax (check-false stx) (syntax-case stx () [(_ what) @@ -56,14 +54,14 @@ (for/fold ([mf (manifest #f 'stdin #f builtin-isets #f)]) ([line (in-list manifest-def)]) (match line - [(list (? (eq/m 'name)) name) (struct-copy manifest mf [name name])] - [(list (? (eq/m 'mode)) mode) (struct-copy manifest mf [mode mode])] - [(list (? (eq/m 'command)) command) (struct-copy manifest mf [command command])] - [(list (? (eq/m 'iset)) name val) + [(list 'name name) (struct-copy manifest mf [name name])] + [(list 'mode mode) (struct-copy manifest mf [mode mode])] + [(list 'command command) (struct-copy manifest mf [command command])] + [(list 'iset name val) (struct-copy manifest mf [isets (hash-set (manifest-isets mf) (symbol->string name) (string->iset val))])] - [(list (? (eq/m 'pattern)) pattern) (struct-copy manifest mf [pattern pattern])]))) + [(list 'pattern pattern) (struct-copy manifest mf [pattern pattern])]))) (check-false name) (check-false command)