diff --git a/crossfire/comms.rkt b/crossfire/comms.rkt index 8098509..a6cfdf0 100644 --- a/crossfire/comms.rkt +++ b/crossfire/comms.rkt @@ -16,14 +16,15 @@ ;; You should have received a copy of the GNU Affero General Public License ;; along with this program. If not, see . -(require racket/async-channel racket/bool racket/engine racket/fasl racket/list racket/match - racket/tcp +(require racket/async-channel racket/bool racket/engine racket/fasl racket/function racket/list + racket/match racket/tcp "not-crypto.rkt") ;; define message types (they must all be prefab for fasl) (struct msg [from-id] #:prefab) (struct msg:hello msg [type pubkey] #:prefab) (struct msg:meow msg [meow] #:prefab) +(struct msg:transaction msg [trans-id request? rpc-id data] #:prefab) ;; signed and encrypted messages (wrapped in another layer of fasl because i'm lazy) (struct signed [fasl signature] #:prefab) @@ -33,6 +34,8 @@ ;; once the handshake is complete, the connection can stay open indefinitely with an indefinite ;; time between messages (define HANDSHAKE-TIMEOUT 30) +(define TRANSACTION-TIMEOUT 30) +(define SHUTDOWN-TIMEOUT 5) ;; node info (not all fields will always be present) ;; type: 'server 'agent 'client @@ -55,10 +58,13 @@ ;; information about other nodes can be added after startup ;; uses thread mailboxes to process and respond to messages sequentially, which might include ;; mutating internal data (hence the serialization of operations from the mailbox, to avoid races) -(define (comms-event-loop my-node) +(define (comms-event-loop my-node startup-thd) + ;; this thread + (define el-thread (current-thread)) + ;; performs a handshake with a new peer connection ;; raises exn:fail if any part of the handshake fails - (define (peer-handshake el-thread in out) + (define (peer-handshake in out) ;; generate ephemeral keys for key exchange (define eph-sk (crypto-key-exchange-make-key)) (define eph-pk (crypto-key-exchange-public-key eph-sk)) @@ -94,10 +100,12 @@ ;; encapsulates a message queue for one peer connection ;; sends and recieves encrypted data using the session key - (define (peer-thread el-thread peer-data session-key in out) + (define (peer-thread peer-data session-key in out) + (define run-queue #t) + ;; handles tcp data in (define (handle-in-msg) - (match (fasl->s-exp in) + (match (with-handlers ([exn? (lambda (_) #f)]) (fasl->s-exp in)) [(locked fasl nonce mac) (match (crypto-unlock session-key nonce mac fasl) [#f (error "corrupted message from peer" (node-id peer-data))] @@ -108,10 +116,11 @@ (error "mismatched node id" (msg-from-id m) (node-id peer-data))) (thread-sendrecv el-thread 'dispatch-msg (cons (node-id my-node) m))] [_ (error "invalid msg data from peer" (node-id peer-data))])])] + [#f ;; likely EOF. could also be invalid fasl data that could not be deserialized + ;; in either case, close the connection. there's not much else we can do + (set! run-queue #f)] [_ (error "invalid data recieved from peer" (node-id peer-data))])) - (define run-queue #t) - ;; handles a thread queue message to tcp out (define (handle-out-msg) (match (thread-receive) @@ -136,7 +145,7 @@ ;; synchronously performs ports-proc to retrieve a new set of tcp ports, and a handshake ;; then starts a peer thread and a peer thread cleanup monitor ;; this cannot be run synchronously on the main loop because it does a thread-sendrecv - (define (make-peer-thread el-thread ports-proc [cust (make-custodian)]) + (define (make-peer-thread ports-proc [cust (make-custodian)]) (define-values [new-thd peer-data session-key] (parameterize ([current-custodian cust]) (define-values [in out] (ports-proc)) @@ -145,28 +154,28 @@ (match-define (cons peer-data session-key) ;; engines are cool ;; we use an engine to limit runtime for the handshake - (let ([eng (engine (lambda (_) (peer-handshake el-thread in out)))]) + (let ([eng (engine (lambda (_) (peer-handshake in out)))]) (if (engine-run (* 1000 HANDSHAKE-TIMEOUT) eng) (engine-result eng) (begin (engine-kill eng) (error "handshake timeout"))))) (values - (thread (lambda () (peer-thread el-thread peer-data session-key in out))) + (thread (lambda () (peer-thread peer-data session-key in out))) peer-data session-key))) (thread-sendrecv el-thread 'register-channel (cons (node-id peer-data) new-thd)) + (displayln (list "new node connection:" (node-id peer-data))) ;; monitor thread -- shuts down the custodian once the peer thread is done (thread (lambda () (thread-wait new-thd) (custodian-shutdown-all cust) - (thread-sendrecv el-thread 'deregister-channel (node-id peer-data)) - (crypto-wipe session-key))) + (crypto-wipe session-key) + (with-handlers ([exn? void]) + (thread-sendrecv el-thread 'deregister-channel (node-id peer-data))))) new-thd) - ;; this thread - (define el-thread (current-thread)) ;; async channel for messages sent to the local node (define local-msg-channel (make-async-channel)) ;; map of peer channels that are currently active @@ -193,7 +202,7 @@ (define (cleanup) (set! listener-thd #f)) (with-handlers ([exn? (lambda (ex) (cleanup) (raise ex))]) (let loop () - (make-peer-thread el-thread (lambda () (tcp-accept listener))) + (make-peer-thread (lambda () (tcp-accept listener))) (loop)))))) (thread-send from (void) #f)))) @@ -206,9 +215,13 @@ (match (hash-ref node-registry id #f) [#f (thread-send from (make-error "no such node" id) #f)] [(node id name type pubkey seckey host port) - (make-peer-thread el-thread (lambda () (tcp-connect host port))) + (make-peer-thread (lambda () (tcp-connect host port))) (thread-send from (void) #f)]))))) + ;; notify that startup is done + (thread-send startup-thd 'done #f) + (set! startup-thd #f) + ;; main comms loop -- responds to thread mailbox items and peforms serialized data manipulation (let loop () (match-define (cons from (cons type data)) (thread-receive)) @@ -232,13 +245,14 @@ (hash-set! peer-registry peer-id thd) (thread-send from (void) #f)] ;; remove channel, don't kill thread - ['deregister-channel (hash-remove! peer-registry data) (thread-send from (void) #f)] + ['deregister-channel + (hash-remove! peer-registry data) (thread-send from (void) #f)] ;; remove channel, kill thread ['destroy-channel (match (hash-ref peer-registry data #f) [#f (void)] [thd (thread-send thd 'stop #f) - (sync/timeout 5 thd) + (sync/timeout SHUTDOWN-TIMEOUT thd) (kill-thread thd) (hash-remove! peer-registry data)]) (thread-send from (void) #f)] @@ -256,8 +270,10 @@ ;; transferred, in which case you wouldn't be notified that it failed ['dispatch-msg (match-define (cons peer-id msg) data) + (displayln (list "dispatch msg" peer-id msg)) (if (= peer-id (node-id my-node)) - (async-channel-put local-msg-channel msg) + (begin (async-channel-put local-msg-channel msg) + (thread-send from (void) #f)) (match (hash-ref peer-registry peer-id #f) [#f (thread-send from (make-error "no such peer connection") #f)] [thd @@ -271,7 +287,7 @@ ['shutdown (for ([(_ thd) (in-hash peer-registry)]) (thread-send thd 'stop #f) - (sync/timeout 5 thd) + (sync/timeout SHUTDOWN-TIMEOUT thd) (kill-thread thd)) (when listener-thd (kill-thread listener-thd) @@ -284,7 +300,10 @@ ;; creates the comms thread (define (make-comms my-node) - (thread (lambda () (comms-event-loop my-node)))) + (define this-thread (current-thread)) + (define comms (thread (lambda () (comms-event-loop my-node this-thread)))) + (thread-receive) ;; wait for startup + comms) (define (comms-listen comms port) (thread-sendrecv comms 'listen port)) @@ -323,6 +342,149 @@ (comms-dispatch-msg comms to-id msg))) +(define (transaction-manager my-node comms startup-thd) + (define run-tm #t) + (define trans-id 0) + + (define local-channel (comms-local-channel comms)) + (define response-table (make-hash)) + (define rpc-table (make-hash)) + + (define tm-thread (current-thread)) + (define tm-cust (make-custodian)) + + (define (make-trans-id) + (set! trans-id (add1 trans-id)) + trans-id) + + (define (cleanup) + (for ([(_ thd) (in-hash response-table)]) + (thread-send thd (trans-data-serialize (make-error "transaction manager shut down")) #f)) + (custodian-shutdown-all tm-cust)) + + (define (trans-data-serialize data) + (match data + [(? exn?) + (define-values (ty _) (struct-info data)) + (define-values (name _2 _3 _4 _5 _6 _7 _8) (struct-type-info name)) + `('exn ,name ,(exn-message data))] + [x x])) + + (define (trans-data-deserialize data) + (match data + ;; try to pretty strictly make sure this is an exn: subtype because we use eval to resolve + ;; the binding and call it -- we don't want this to be used to call arbitrary functions + [(list 'exn (and (? symbol? name) (app symbol->string (pregexp #px"^exn(:.*?[^\\?])?$"))) + (? string? message)) + ;; for good measure, create a new empty namespace for the exn type lookup + (define constructor (parameterize ([current-namespace (make-base-namespace)]) + (eval name))) + (constructor message (current-continuation-marks))] + [x x])) + + (define (recv-transaction from key to-id transaction) + (define (cleanup) + (thread-sendrecv tm-thread 'deregister-response key)) + (with-handlers ([exn? (lambda (ex) + (cleanup) (thread-send from ex #f))]) + (thread-receive) ;; go token + (displayln "sending transaction") + (comms-dispatch-msg/retry comms to-id transaction) + (match (sync/timeout TRANSACTION-TIMEOUT (thread-receive-evt)) + [#f + (cleanup) + (displayln "timeout!!!") + (thread-send from (make-error "transaction timeout") #f)] + [_ (define response (thread-receive)) + (displayln "got response!") + (thread-send from (trans-data-deserialize response) #f)]))) + + (define (send-transaction from to-id rpc-id rpc-data) + (define trans-id (make-trans-id)) + (define key (cons to-id trans-id)) + (define transaction (msg:transaction (node-id my-node) trans-id #t rpc-id + (trans-data-serialize rpc-data))) + (define response-thread (thread (lambda () (recv-transaction from key to-id transaction)))) + (hash-set! response-table key response-thread) + (thread-send response-thread 'go)) + + (define (handle-incoming-transaction func msg) + (match-define (msg:transaction from-id trans-id _ rpc-id rpc-data) msg) + (displayln "handling incoming transaction") + + (define (respond data) + (define resp + (msg:transaction (node-id my-node) trans-id #f rpc-id (trans-data-serialize data))) + (with-handlers ([exn? (lambda (ex) (displayln "failed to dispatch transaction response"))]) + (comms-dispatch-msg/retry comms from-id resp))) + + (with-handlers ([exn? respond]) + (define arg-data (trans-data-deserialize rpc-data)) + (define result (apply func arg-data)) + (displayln (list "result" result "sending back...")) + (respond result))) + + (define (handle-thread-msg) + (match-define (cons from (cons type data)) (thread-receive)) + (match type + ['register-rpc (hash-set! rpc-table (car data) (cdr data)) + (thread-send from (void) #f)] + ['deregister-rpc (hash-remove! rpc-table data) + (thread-send from (void) #f)] + ['deregister-response (hash-remove! response-table data) (thread-send from (void) #f)] + ['transact + (match-define (cons to-id (cons rpc-id rpc-data)) data) + (send-transaction from to-id rpc-id rpc-data)] + ['shutdown (set! run-tm #f) (cleanup) (thread-send from (void) #f)] + [_ (thread-send from (make-error "invalid transaction thread msg" #f))])) + + (define (handle-incoming msg) + (displayln (list "incoming message!" msg)) + (match msg + [(msg:transaction from-id trans-id #t rpc-id rpc-data) + (match (hash-ref rpc-table rpc-id #f) + [#f (displayln (list "got unknown rpc req" msg))] + [func (thread (lambda () (handle-incoming-transaction func msg)))])] + [(msg:transaction from-id trans-id #f rpc-id rpc-data) + (define key (cons from-id trans-id)) + (match (hash-ref response-table key #f) + [#f (displayln (list "got spurious transaction response" msg))] + [thd (thread-send thd rpc-data #f) (hash-remove! response-table key)])] + [_ (displayln (list "got unknown msg" msg))])) + + ;; it's a thread cell and i'm too lazy to add a parameterize clause... it should work + (current-custodian tm-cust) + ;; notify that startup is done + (thread-send startup-thd 'done #f) + (set! startup-thd #f) + + (let loop () + (define thd-evt (thread-receive-evt)) + (match (sync thd-evt local-channel) + [(? (curry equal? thd-evt)) (handle-thread-msg)] + [x (handle-incoming x)]) + (when run-tm (loop)))) + +(define (make-transaction-manager my-node comms) + (define this-thread (current-thread)) + (define tm (thread (lambda () (transaction-manager my-node comms this-thread)))) + ;; wait for startup + (thread-receive) + tm) + +(define (tm-register-rpc tm name func) + (thread-sendrecv tm 'register-rpc (cons name func))) + +(define (tm-deregister-rpc tm name) + (thread-sendrecv tm 'deregister-rpc name)) + +(define (tm-transact tm to-id rpc-id rpc-data) + (thread-sendrecv tm 'transact (cons to-id (cons rpc-id rpc-data)))) + +(define (tm-shutdown tm) + (thread-sendrecv tm 'shutdown (void))) + + ;; demo code (define server-sk #"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa") (define server-pk (crypto-sign-public-key server-sk)) @@ -340,13 +502,24 @@ ["server" (define comms (make-comms server-node)) (comms-set-node-info comms client-node) + (define tm (make-transaction-manager server-node comms)) + (tm-register-rpc tm 'add1 add1) + (comms-listen comms 1337) - (displayln "awaiting messages") - (displayln (async-channel-get (comms-local-channel comms))) + + (displayln "listening") + (sleep 9999) + + (tm-shutdown tm) (comms-shutdown comms)] ["client" (define comms (make-comms client-node)) (comms-set-node-info comms server-node) - (displayln "sending") - (comms-dispatch-msg/retry comms 0 (msg:meow 1 "hello world")) + (define tm (make-transaction-manager client-node comms)) + + (displayln "transacting...") + (displayln (tm-transact tm 0 'add1 (list 1))) + (displayln "done") + + (tm-shutdown tm) (comms-shutdown comms)])