comment comms.rkt
This commit is contained in:
parent
9069ae496b
commit
521e986937
|
@ -20,34 +20,58 @@
|
|||
racket/tcp
|
||||
"not-crypto.rkt")
|
||||
|
||||
;; define message types (they must all be prefab for fasl)
|
||||
(struct msg [from-id] #:prefab)
|
||||
(struct msg:hello msg [type pubkey] #:prefab)
|
||||
(struct msg:meow msg [meow] #:prefab)
|
||||
|
||||
;; signed and encrypted messages (wrapped in another layer of fasl because i'm lazy)
|
||||
(struct signed [fasl signature] #:prefab)
|
||||
(struct locked [fasl nonce mac] #:prefab)
|
||||
|
||||
;; 30 second timeout to execute the handshake, or the connection will be aborted
|
||||
;; once the handshake is complete, the connection can stay open indefinitely with an indefinite
|
||||
;; time between messages
|
||||
(define HANDSHAKE-TIMEOUT 30)
|
||||
|
||||
;; node info (not all fields will always be present)
|
||||
;; type: 'server 'agent 'client
|
||||
(struct node [id name type pubkey seckey host port] #:transparent)
|
||||
|
||||
;; creates an exn:fail to be passed by thread mailbox
|
||||
(define (make-error str)
|
||||
(exn:fail (format "error: ~a" str) (current-continuation-marks)))
|
||||
|
||||
;; sends a message to the specified thread, and retrieves a response
|
||||
;; if the response is an exn, raises it
|
||||
(define (thread-sendrecv to type data)
|
||||
(thread-send to (cons (current-thread) (cons type data)))
|
||||
(match (thread-receive)
|
||||
[(? exn? e) (raise e)]
|
||||
[x x]))
|
||||
|
||||
;; runs the communications platform for a node
|
||||
;; initially, a node object describing the local node is required
|
||||
;; information about other nodes can be added after startup
|
||||
;; uses thread mailboxes to process and respond to messages sequentially, which might include
|
||||
;; mutating internal data (hence the serialization of operations from the mailbox, to avoid races)
|
||||
(define (comms-event-loop my-node)
|
||||
;; performs a handshake with a new peer connection
|
||||
;; raises exn:fail if any part of the handshake fails
|
||||
(define (peer-handshake el-thread in out)
|
||||
;; generate ephemeral keys for key exchange
|
||||
(define eph-sk (crypto-key-exchange-make-key))
|
||||
(define eph-pk (crypto-key-exchange-public-key eph-sk))
|
||||
;; create a hello msg, containing our node meta and the ephemeral public key
|
||||
(define hello-msg (s-exp->fasl (msg:hello (node-id my-node) (node-type my-node) eph-pk)))
|
||||
;; sign the hello message using our node signing key
|
||||
;; then send it
|
||||
(s-exp->fasl (signed hello-msg
|
||||
(crypto-sign (node-seckey my-node) (node-pubkey my-node) hello-msg)) out)
|
||||
(flush-output out)
|
||||
;; recieve the hello message from the other side
|
||||
(match (fasl->s-exp in)
|
||||
;; verify that it's a signed message, with the expected node key signature
|
||||
[(signed msg-signed signature)
|
||||
(define msg (fasl->s-exp msg-signed))
|
||||
(match msg
|
||||
|
@ -57,15 +81,21 @@
|
|||
(error "nonexistent peer tried to connect" from-id type pubkey))
|
||||
(unless (equal? type (node-type peer-data))
|
||||
(error "peer type mismatch" from-id type (node-type peer-data)))
|
||||
;; check signature (the important part)
|
||||
(unless (crypto-check signature (node-pubkey peer-data) msg-signed)
|
||||
(error "invalid signature received during handshake"))
|
||||
;; derive session key
|
||||
(define session-key (crypto-key-exchange eph-sk pubkey))
|
||||
;; wipe ephemeral secret key -- no need for it anymore
|
||||
(crypto-wipe eph-sk)
|
||||
(cons peer-data session-key)]
|
||||
[_ (error "invalid message type recieved during handshake")])]
|
||||
[_ (error "invalid data recieved during handshake")]))
|
||||
|
||||
;; encapsulates a message queue for one peer connection
|
||||
;; sends and recieves encrypted data using the session key
|
||||
(define (peer-thread el-thread peer-data session-key in out)
|
||||
;; handles tcp data in
|
||||
(define (handle-in-msg)
|
||||
(match (fasl->s-exp in)
|
||||
[(locked fasl nonce mac)
|
||||
|
@ -80,21 +110,26 @@
|
|||
[_ (error "invalid msg data from peer" (node-id peer-data))])])]
|
||||
[_ (error "invalid data recieved from peer" (node-id peer-data))]))
|
||||
|
||||
;; handles a thread queue message to tcp out
|
||||
(define (handle-out-msg)
|
||||
(match (thread-receive)
|
||||
[(? msg? m)
|
||||
(define fasl (s-exp->fasl m))
|
||||
;; use a random nonce each time
|
||||
;; this doesn't need to be cryptographically secure RNG but we use it anyway, can't hurt
|
||||
(define nonce (crypto-lock-make-nonce))
|
||||
(define-values [ct mac] (crypto-lock session-key nonce fasl))
|
||||
(s-exp->fasl (locked ct nonce mac) out)
|
||||
(flush-output out)]
|
||||
[x (error "invalid thread msg" x)]))
|
||||
|
||||
;; cleanup helper -- wipe the session key and deregister ourselves from the main loop
|
||||
(define (cleanup)
|
||||
(crypto-wipe session-key)
|
||||
(thread-sendrecv el-thread 'deregister-channel (node-id peer-data))
|
||||
(void))
|
||||
|
||||
;; handle mailbox <-> tcp
|
||||
(with-handlers ([exn? (lambda (ex) (cleanup) (raise ex))])
|
||||
(let loop ()
|
||||
(match (sync (thread-receive-evt) in)
|
||||
|
@ -103,6 +138,9 @@
|
|||
(loop))
|
||||
(cleanup)))
|
||||
|
||||
;; synchronously performs ports-proc to retrieve a new set of tcp ports, and a handshake
|
||||
;; then starts a peer thread and a peer thread cleanup monitor
|
||||
;; this cannot be run synchronously on the main loop because it does a thread-sendrecv
|
||||
(define (make-peer-thread el-thread ports-proc [cust (make-custodian)])
|
||||
(define-values [new-thd peer-data]
|
||||
(parameterize ([current-custodian cust])
|
||||
|
@ -110,6 +148,8 @@
|
|||
|
||||
;; run handshake process with a timeout
|
||||
(match-define (cons peer-data session-key)
|
||||
;; engines are cool
|
||||
;; we use an engine to limit runtime for the handshake
|
||||
(let ([eng (engine (lambda (_) (peer-handshake el-thread in out)))])
|
||||
(if (engine-run (* 1000 HANDSHAKE-TIMEOUT) eng)
|
||||
(engine-result eng)
|
||||
|
@ -124,20 +164,27 @@
|
|||
|
||||
(thread-sendrecv el-thread 'register-channel (cons (node-id peer-data) new-thd))
|
||||
|
||||
;; monitor thread -- shuts down the custodian once the peer thread is done
|
||||
(thread (lambda ()
|
||||
(thread-wait new-thd)
|
||||
(custodian-shutdown-all cust)))
|
||||
new-thd)
|
||||
|
||||
|
||||
;; this thread
|
||||
(define el-thread (current-thread))
|
||||
;; async channel for messages sent to the local node
|
||||
(define local-msg-channel (make-async-channel))
|
||||
;; map of peer channels that are currently active
|
||||
(define peer-registry (make-hash))
|
||||
;; map of all known nodes
|
||||
(define node-registry (make-hash))
|
||||
;; insert ourselves, to start
|
||||
(hash-set! node-registry (node-id my-node) my-node)
|
||||
|
||||
;; tcp listener, if this is a server
|
||||
(define listener-thd #f)
|
||||
|
||||
;; starts the tcp listener
|
||||
(define (handle-listen from port)
|
||||
(with-handlers ([exn? (lambda (ex) (thread-send from ex #f))])
|
||||
(when (false? listener-thd)
|
||||
|
@ -152,6 +199,8 @@
|
|||
(loop))))))
|
||||
(thread-send from (void) #f))))
|
||||
|
||||
;; starts a connection and handshake process
|
||||
;; calls back to from once the handshake is complete and the queue thread is registered
|
||||
(define (handle-connect from id)
|
||||
(thread
|
||||
(lambda ()
|
||||
|
@ -162,10 +211,16 @@
|
|||
(make-peer-thread el-thread (lambda () (tcp-connect host port)))
|
||||
(thread-send from (void) #f)])))))
|
||||
|
||||
;; main comms loop -- responds to thread mailbox items and peforms serialized data manipulation
|
||||
(let loop ()
|
||||
(match-define (cons from (cons type data)) (thread-receive))
|
||||
(match type
|
||||
;; this call doesn't return until there's an active listening socket. listen errors are
|
||||
;; forwarded to the caller
|
||||
['listen (handle-listen from data)]
|
||||
;; likewise, once this call returns the peer handshake has completed and the queue thread is
|
||||
;; active, and can be destroyed with destroy-channel
|
||||
;; this call is guaranteed to return within max(HANDSHAKE-TIMEOUT, system tcp connect timeout)
|
||||
['connect (handle-connect from data)]
|
||||
['channel-available? (thread-send from (hash-has-key? peer-registry data) #f)]
|
||||
['get-node-info (thread-send from (hash-ref node-registry data #f) #f)]
|
||||
|
@ -184,6 +239,18 @@
|
|||
[#f (void)]
|
||||
[thd (kill-thread thd) (hash-remove! peer-registry data)])
|
||||
(thread-send from (void) #f)]
|
||||
;; if this call forwards an exn:fail, it means the queue thread for this peer is dead, so you
|
||||
;; need to either wait for the peer to reconnect if this is a server, or initiate a new
|
||||
;; connect call if this is a client
|
||||
;;
|
||||
;; why is this a potentially-erroring call instead of another async channel?
|
||||
;; if a sent message fails i wanted the sender to be notified as quickly as possible so the
|
||||
;; sender can implement whatever retry or drop behavior is necessary based on the priority of
|
||||
;; the message. i don't want the message to be sitting in an opaque retry queue forever
|
||||
;;
|
||||
;; note: for transactional behavior, implement a timeout. the remote node could fail to
|
||||
;; respond ever, or might decide to force close the connection right as the message is being
|
||||
;; transferred, in which case you wouldn't be notified that it failed
|
||||
['dispatch-msg
|
||||
(match-define (cons peer-id msg) data)
|
||||
(if (equal? peer-id (node-id my-node))
|
||||
|
@ -195,14 +262,17 @@
|
|||
[#f (kill-thread thd) (hash-remove! peer-registry peer-id)
|
||||
(thread-send from (make-error "failed to dispatch to thread") #f)]
|
||||
[_ (thread-send from (void) #f)])]))]
|
||||
;; retrieves an async channel with the local received message queue
|
||||
;; this can be used to fetch new messages without further interacting with the comms thread
|
||||
['local-channel (thread-send from local-msg-channel #f)]
|
||||
[_ (thread-send from (make-error "unknown thread message type") #f)])
|
||||
(loop)))
|
||||
|
||||
|
||||
;; creates the comms thread
|
||||
(define (make-comms my-node)
|
||||
(thread (lambda () (comms-event-loop my-node))))
|
||||
|
||||
;; demo code
|
||||
(define server-sk #"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")
|
||||
(define server-pk (crypto-sign-public-key server-sk))
|
||||
(define client-sk #"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")
|
||||
|
|
Loading…
Reference in New Issue