implement transactional rpc in comms
This commit is contained in:
parent
b036c37cba
commit
c5c7f26fe9
|
@ -16,14 +16,15 @@
|
||||||
;; You should have received a copy of the GNU Affero General Public License
|
;; You should have received a copy of the GNU Affero General Public License
|
||||||
;; along with this program. If not, see <https://www.gnu.org/licenses/>.
|
;; along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
(require racket/async-channel racket/bool racket/engine racket/fasl racket/list racket/match
|
(require racket/async-channel racket/bool racket/engine racket/fasl racket/function racket/list
|
||||||
racket/tcp
|
racket/match racket/tcp
|
||||||
"not-crypto.rkt")
|
"not-crypto.rkt")
|
||||||
|
|
||||||
;; define message types (they must all be prefab for fasl)
|
;; define message types (they must all be prefab for fasl)
|
||||||
(struct msg [from-id] #:prefab)
|
(struct msg [from-id] #:prefab)
|
||||||
(struct msg:hello msg [type pubkey] #:prefab)
|
(struct msg:hello msg [type pubkey] #:prefab)
|
||||||
(struct msg:meow msg [meow] #:prefab)
|
(struct msg:meow msg [meow] #:prefab)
|
||||||
|
(struct msg:transaction msg [trans-id request? rpc-id data] #:prefab)
|
||||||
|
|
||||||
;; signed and encrypted messages (wrapped in another layer of fasl because i'm lazy)
|
;; signed and encrypted messages (wrapped in another layer of fasl because i'm lazy)
|
||||||
(struct signed [fasl signature] #:prefab)
|
(struct signed [fasl signature] #:prefab)
|
||||||
|
@ -33,6 +34,8 @@
|
||||||
;; once the handshake is complete, the connection can stay open indefinitely with an indefinite
|
;; once the handshake is complete, the connection can stay open indefinitely with an indefinite
|
||||||
;; time between messages
|
;; time between messages
|
||||||
(define HANDSHAKE-TIMEOUT 30)
|
(define HANDSHAKE-TIMEOUT 30)
|
||||||
|
(define TRANSACTION-TIMEOUT 30)
|
||||||
|
(define SHUTDOWN-TIMEOUT 5)
|
||||||
|
|
||||||
;; node info (not all fields will always be present)
|
;; node info (not all fields will always be present)
|
||||||
;; type: 'server 'agent 'client
|
;; type: 'server 'agent 'client
|
||||||
|
@ -55,10 +58,13 @@
|
||||||
;; information about other nodes can be added after startup
|
;; information about other nodes can be added after startup
|
||||||
;; uses thread mailboxes to process and respond to messages sequentially, which might include
|
;; uses thread mailboxes to process and respond to messages sequentially, which might include
|
||||||
;; mutating internal data (hence the serialization of operations from the mailbox, to avoid races)
|
;; mutating internal data (hence the serialization of operations from the mailbox, to avoid races)
|
||||||
(define (comms-event-loop my-node)
|
(define (comms-event-loop my-node startup-thd)
|
||||||
|
;; this thread
|
||||||
|
(define el-thread (current-thread))
|
||||||
|
|
||||||
;; performs a handshake with a new peer connection
|
;; performs a handshake with a new peer connection
|
||||||
;; raises exn:fail if any part of the handshake fails
|
;; raises exn:fail if any part of the handshake fails
|
||||||
(define (peer-handshake el-thread in out)
|
(define (peer-handshake in out)
|
||||||
;; generate ephemeral keys for key exchange
|
;; generate ephemeral keys for key exchange
|
||||||
(define eph-sk (crypto-key-exchange-make-key))
|
(define eph-sk (crypto-key-exchange-make-key))
|
||||||
(define eph-pk (crypto-key-exchange-public-key eph-sk))
|
(define eph-pk (crypto-key-exchange-public-key eph-sk))
|
||||||
|
@ -94,10 +100,12 @@
|
||||||
|
|
||||||
;; encapsulates a message queue for one peer connection
|
;; encapsulates a message queue for one peer connection
|
||||||
;; sends and recieves encrypted data using the session key
|
;; sends and recieves encrypted data using the session key
|
||||||
(define (peer-thread el-thread peer-data session-key in out)
|
(define (peer-thread peer-data session-key in out)
|
||||||
|
(define run-queue #t)
|
||||||
|
|
||||||
;; handles tcp data in
|
;; handles tcp data in
|
||||||
(define (handle-in-msg)
|
(define (handle-in-msg)
|
||||||
(match (fasl->s-exp in)
|
(match (with-handlers ([exn? (lambda (_) #f)]) (fasl->s-exp in))
|
||||||
[(locked fasl nonce mac)
|
[(locked fasl nonce mac)
|
||||||
(match (crypto-unlock session-key nonce mac fasl)
|
(match (crypto-unlock session-key nonce mac fasl)
|
||||||
[#f (error "corrupted message from peer" (node-id peer-data))]
|
[#f (error "corrupted message from peer" (node-id peer-data))]
|
||||||
|
@ -108,10 +116,11 @@
|
||||||
(error "mismatched node id" (msg-from-id m) (node-id peer-data)))
|
(error "mismatched node id" (msg-from-id m) (node-id peer-data)))
|
||||||
(thread-sendrecv el-thread 'dispatch-msg (cons (node-id my-node) m))]
|
(thread-sendrecv el-thread 'dispatch-msg (cons (node-id my-node) m))]
|
||||||
[_ (error "invalid msg data from peer" (node-id peer-data))])])]
|
[_ (error "invalid msg data from peer" (node-id peer-data))])])]
|
||||||
|
[#f ;; likely EOF. could also be invalid fasl data that could not be deserialized
|
||||||
|
;; in either case, close the connection. there's not much else we can do
|
||||||
|
(set! run-queue #f)]
|
||||||
[_ (error "invalid data recieved from peer" (node-id peer-data))]))
|
[_ (error "invalid data recieved from peer" (node-id peer-data))]))
|
||||||
|
|
||||||
(define run-queue #t)
|
|
||||||
|
|
||||||
;; handles a thread queue message to tcp out
|
;; handles a thread queue message to tcp out
|
||||||
(define (handle-out-msg)
|
(define (handle-out-msg)
|
||||||
(match (thread-receive)
|
(match (thread-receive)
|
||||||
|
@ -136,7 +145,7 @@
|
||||||
;; synchronously performs ports-proc to retrieve a new set of tcp ports, and a handshake
|
;; synchronously performs ports-proc to retrieve a new set of tcp ports, and a handshake
|
||||||
;; then starts a peer thread and a peer thread cleanup monitor
|
;; then starts a peer thread and a peer thread cleanup monitor
|
||||||
;; this cannot be run synchronously on the main loop because it does a thread-sendrecv
|
;; this cannot be run synchronously on the main loop because it does a thread-sendrecv
|
||||||
(define (make-peer-thread el-thread ports-proc [cust (make-custodian)])
|
(define (make-peer-thread ports-proc [cust (make-custodian)])
|
||||||
(define-values [new-thd peer-data session-key]
|
(define-values [new-thd peer-data session-key]
|
||||||
(parameterize ([current-custodian cust])
|
(parameterize ([current-custodian cust])
|
||||||
(define-values [in out] (ports-proc))
|
(define-values [in out] (ports-proc))
|
||||||
|
@ -145,28 +154,28 @@
|
||||||
(match-define (cons peer-data session-key)
|
(match-define (cons peer-data session-key)
|
||||||
;; engines are cool
|
;; engines are cool
|
||||||
;; we use an engine to limit runtime for the handshake
|
;; we use an engine to limit runtime for the handshake
|
||||||
(let ([eng (engine (lambda (_) (peer-handshake el-thread in out)))])
|
(let ([eng (engine (lambda (_) (peer-handshake in out)))])
|
||||||
(if (engine-run (* 1000 HANDSHAKE-TIMEOUT) eng)
|
(if (engine-run (* 1000 HANDSHAKE-TIMEOUT) eng)
|
||||||
(engine-result eng)
|
(engine-result eng)
|
||||||
(begin (engine-kill eng) (error "handshake timeout")))))
|
(begin (engine-kill eng) (error "handshake timeout")))))
|
||||||
|
|
||||||
(values
|
(values
|
||||||
(thread (lambda () (peer-thread el-thread peer-data session-key in out)))
|
(thread (lambda () (peer-thread peer-data session-key in out)))
|
||||||
peer-data
|
peer-data
|
||||||
session-key)))
|
session-key)))
|
||||||
|
|
||||||
(thread-sendrecv el-thread 'register-channel (cons (node-id peer-data) new-thd))
|
(thread-sendrecv el-thread 'register-channel (cons (node-id peer-data) new-thd))
|
||||||
|
(displayln (list "new node connection:" (node-id peer-data)))
|
||||||
|
|
||||||
;; monitor thread -- shuts down the custodian once the peer thread is done
|
;; monitor thread -- shuts down the custodian once the peer thread is done
|
||||||
(thread (lambda ()
|
(thread (lambda ()
|
||||||
(thread-wait new-thd)
|
(thread-wait new-thd)
|
||||||
(custodian-shutdown-all cust)
|
(custodian-shutdown-all cust)
|
||||||
(thread-sendrecv el-thread 'deregister-channel (node-id peer-data))
|
(crypto-wipe session-key)
|
||||||
(crypto-wipe session-key)))
|
(with-handlers ([exn? void])
|
||||||
|
(thread-sendrecv el-thread 'deregister-channel (node-id peer-data)))))
|
||||||
new-thd)
|
new-thd)
|
||||||
|
|
||||||
;; this thread
|
|
||||||
(define el-thread (current-thread))
|
|
||||||
;; async channel for messages sent to the local node
|
;; async channel for messages sent to the local node
|
||||||
(define local-msg-channel (make-async-channel))
|
(define local-msg-channel (make-async-channel))
|
||||||
;; map of peer channels that are currently active
|
;; map of peer channels that are currently active
|
||||||
|
@ -193,7 +202,7 @@
|
||||||
(define (cleanup) (set! listener-thd #f))
|
(define (cleanup) (set! listener-thd #f))
|
||||||
(with-handlers ([exn? (lambda (ex) (cleanup) (raise ex))])
|
(with-handlers ([exn? (lambda (ex) (cleanup) (raise ex))])
|
||||||
(let loop ()
|
(let loop ()
|
||||||
(make-peer-thread el-thread (lambda () (tcp-accept listener)))
|
(make-peer-thread (lambda () (tcp-accept listener)))
|
||||||
(loop))))))
|
(loop))))))
|
||||||
(thread-send from (void) #f))))
|
(thread-send from (void) #f))))
|
||||||
|
|
||||||
|
@ -206,9 +215,13 @@
|
||||||
(match (hash-ref node-registry id #f)
|
(match (hash-ref node-registry id #f)
|
||||||
[#f (thread-send from (make-error "no such node" id) #f)]
|
[#f (thread-send from (make-error "no such node" id) #f)]
|
||||||
[(node id name type pubkey seckey host port)
|
[(node id name type pubkey seckey host port)
|
||||||
(make-peer-thread el-thread (lambda () (tcp-connect host port)))
|
(make-peer-thread (lambda () (tcp-connect host port)))
|
||||||
(thread-send from (void) #f)])))))
|
(thread-send from (void) #f)])))))
|
||||||
|
|
||||||
|
;; notify that startup is done
|
||||||
|
(thread-send startup-thd 'done #f)
|
||||||
|
(set! startup-thd #f)
|
||||||
|
|
||||||
;; main comms loop -- responds to thread mailbox items and peforms serialized data manipulation
|
;; main comms loop -- responds to thread mailbox items and peforms serialized data manipulation
|
||||||
(let loop ()
|
(let loop ()
|
||||||
(match-define (cons from (cons type data)) (thread-receive))
|
(match-define (cons from (cons type data)) (thread-receive))
|
||||||
|
@ -232,13 +245,14 @@
|
||||||
(hash-set! peer-registry peer-id thd)
|
(hash-set! peer-registry peer-id thd)
|
||||||
(thread-send from (void) #f)]
|
(thread-send from (void) #f)]
|
||||||
;; remove channel, don't kill thread
|
;; remove channel, don't kill thread
|
||||||
['deregister-channel (hash-remove! peer-registry data) (thread-send from (void) #f)]
|
['deregister-channel
|
||||||
|
(hash-remove! peer-registry data) (thread-send from (void) #f)]
|
||||||
;; remove channel, kill thread
|
;; remove channel, kill thread
|
||||||
['destroy-channel
|
['destroy-channel
|
||||||
(match (hash-ref peer-registry data #f)
|
(match (hash-ref peer-registry data #f)
|
||||||
[#f (void)]
|
[#f (void)]
|
||||||
[thd (thread-send thd 'stop #f)
|
[thd (thread-send thd 'stop #f)
|
||||||
(sync/timeout 5 thd)
|
(sync/timeout SHUTDOWN-TIMEOUT thd)
|
||||||
(kill-thread thd)
|
(kill-thread thd)
|
||||||
(hash-remove! peer-registry data)])
|
(hash-remove! peer-registry data)])
|
||||||
(thread-send from (void) #f)]
|
(thread-send from (void) #f)]
|
||||||
|
@ -256,8 +270,10 @@
|
||||||
;; transferred, in which case you wouldn't be notified that it failed
|
;; transferred, in which case you wouldn't be notified that it failed
|
||||||
['dispatch-msg
|
['dispatch-msg
|
||||||
(match-define (cons peer-id msg) data)
|
(match-define (cons peer-id msg) data)
|
||||||
|
(displayln (list "dispatch msg" peer-id msg))
|
||||||
(if (= peer-id (node-id my-node))
|
(if (= peer-id (node-id my-node))
|
||||||
(async-channel-put local-msg-channel msg)
|
(begin (async-channel-put local-msg-channel msg)
|
||||||
|
(thread-send from (void) #f))
|
||||||
(match (hash-ref peer-registry peer-id #f)
|
(match (hash-ref peer-registry peer-id #f)
|
||||||
[#f (thread-send from (make-error "no such peer connection") #f)]
|
[#f (thread-send from (make-error "no such peer connection") #f)]
|
||||||
[thd
|
[thd
|
||||||
|
@ -271,7 +287,7 @@
|
||||||
['shutdown
|
['shutdown
|
||||||
(for ([(_ thd) (in-hash peer-registry)])
|
(for ([(_ thd) (in-hash peer-registry)])
|
||||||
(thread-send thd 'stop #f)
|
(thread-send thd 'stop #f)
|
||||||
(sync/timeout 5 thd)
|
(sync/timeout SHUTDOWN-TIMEOUT thd)
|
||||||
(kill-thread thd))
|
(kill-thread thd))
|
||||||
(when listener-thd
|
(when listener-thd
|
||||||
(kill-thread listener-thd)
|
(kill-thread listener-thd)
|
||||||
|
@ -284,7 +300,10 @@
|
||||||
|
|
||||||
;; creates the comms thread
|
;; creates the comms thread
|
||||||
(define (make-comms my-node)
|
(define (make-comms my-node)
|
||||||
(thread (lambda () (comms-event-loop my-node))))
|
(define this-thread (current-thread))
|
||||||
|
(define comms (thread (lambda () (comms-event-loop my-node this-thread))))
|
||||||
|
(thread-receive) ;; wait for startup
|
||||||
|
comms)
|
||||||
|
|
||||||
(define (comms-listen comms port)
|
(define (comms-listen comms port)
|
||||||
(thread-sendrecv comms 'listen port))
|
(thread-sendrecv comms 'listen port))
|
||||||
|
@ -323,6 +342,149 @@
|
||||||
(comms-dispatch-msg comms to-id msg)))
|
(comms-dispatch-msg comms to-id msg)))
|
||||||
|
|
||||||
|
|
||||||
|
(define (transaction-manager my-node comms startup-thd)
|
||||||
|
(define run-tm #t)
|
||||||
|
(define trans-id 0)
|
||||||
|
|
||||||
|
(define local-channel (comms-local-channel comms))
|
||||||
|
(define response-table (make-hash))
|
||||||
|
(define rpc-table (make-hash))
|
||||||
|
|
||||||
|
(define tm-thread (current-thread))
|
||||||
|
(define tm-cust (make-custodian))
|
||||||
|
|
||||||
|
(define (make-trans-id)
|
||||||
|
(set! trans-id (add1 trans-id))
|
||||||
|
trans-id)
|
||||||
|
|
||||||
|
(define (cleanup)
|
||||||
|
(for ([(_ thd) (in-hash response-table)])
|
||||||
|
(thread-send thd (trans-data-serialize (make-error "transaction manager shut down")) #f))
|
||||||
|
(custodian-shutdown-all tm-cust))
|
||||||
|
|
||||||
|
(define (trans-data-serialize data)
|
||||||
|
(match data
|
||||||
|
[(? exn?)
|
||||||
|
(define-values (ty _) (struct-info data))
|
||||||
|
(define-values (name _2 _3 _4 _5 _6 _7 _8) (struct-type-info name))
|
||||||
|
`('exn ,name ,(exn-message data))]
|
||||||
|
[x x]))
|
||||||
|
|
||||||
|
(define (trans-data-deserialize data)
|
||||||
|
(match data
|
||||||
|
;; try to pretty strictly make sure this is an exn: subtype because we use eval to resolve
|
||||||
|
;; the binding and call it -- we don't want this to be used to call arbitrary functions
|
||||||
|
[(list 'exn (and (? symbol? name) (app symbol->string (pregexp #px"^exn(:.*?[^\\?])?$")))
|
||||||
|
(? string? message))
|
||||||
|
;; for good measure, create a new empty namespace for the exn type lookup
|
||||||
|
(define constructor (parameterize ([current-namespace (make-base-namespace)])
|
||||||
|
(eval name)))
|
||||||
|
(constructor message (current-continuation-marks))]
|
||||||
|
[x x]))
|
||||||
|
|
||||||
|
(define (recv-transaction from key to-id transaction)
|
||||||
|
(define (cleanup)
|
||||||
|
(thread-sendrecv tm-thread 'deregister-response key))
|
||||||
|
(with-handlers ([exn? (lambda (ex)
|
||||||
|
(cleanup) (thread-send from ex #f))])
|
||||||
|
(thread-receive) ;; go token
|
||||||
|
(displayln "sending transaction")
|
||||||
|
(comms-dispatch-msg/retry comms to-id transaction)
|
||||||
|
(match (sync/timeout TRANSACTION-TIMEOUT (thread-receive-evt))
|
||||||
|
[#f
|
||||||
|
(cleanup)
|
||||||
|
(displayln "timeout!!!")
|
||||||
|
(thread-send from (make-error "transaction timeout") #f)]
|
||||||
|
[_ (define response (thread-receive))
|
||||||
|
(displayln "got response!")
|
||||||
|
(thread-send from (trans-data-deserialize response) #f)])))
|
||||||
|
|
||||||
|
(define (send-transaction from to-id rpc-id rpc-data)
|
||||||
|
(define trans-id (make-trans-id))
|
||||||
|
(define key (cons to-id trans-id))
|
||||||
|
(define transaction (msg:transaction (node-id my-node) trans-id #t rpc-id
|
||||||
|
(trans-data-serialize rpc-data)))
|
||||||
|
(define response-thread (thread (lambda () (recv-transaction from key to-id transaction))))
|
||||||
|
(hash-set! response-table key response-thread)
|
||||||
|
(thread-send response-thread 'go))
|
||||||
|
|
||||||
|
(define (handle-incoming-transaction func msg)
|
||||||
|
(match-define (msg:transaction from-id trans-id _ rpc-id rpc-data) msg)
|
||||||
|
(displayln "handling incoming transaction")
|
||||||
|
|
||||||
|
(define (respond data)
|
||||||
|
(define resp
|
||||||
|
(msg:transaction (node-id my-node) trans-id #f rpc-id (trans-data-serialize data)))
|
||||||
|
(with-handlers ([exn? (lambda (ex) (displayln "failed to dispatch transaction response"))])
|
||||||
|
(comms-dispatch-msg/retry comms from-id resp)))
|
||||||
|
|
||||||
|
(with-handlers ([exn? respond])
|
||||||
|
(define arg-data (trans-data-deserialize rpc-data))
|
||||||
|
(define result (apply func arg-data))
|
||||||
|
(displayln (list "result" result "sending back..."))
|
||||||
|
(respond result)))
|
||||||
|
|
||||||
|
(define (handle-thread-msg)
|
||||||
|
(match-define (cons from (cons type data)) (thread-receive))
|
||||||
|
(match type
|
||||||
|
['register-rpc (hash-set! rpc-table (car data) (cdr data))
|
||||||
|
(thread-send from (void) #f)]
|
||||||
|
['deregister-rpc (hash-remove! rpc-table data)
|
||||||
|
(thread-send from (void) #f)]
|
||||||
|
['deregister-response (hash-remove! response-table data) (thread-send from (void) #f)]
|
||||||
|
['transact
|
||||||
|
(match-define (cons to-id (cons rpc-id rpc-data)) data)
|
||||||
|
(send-transaction from to-id rpc-id rpc-data)]
|
||||||
|
['shutdown (set! run-tm #f) (cleanup) (thread-send from (void) #f)]
|
||||||
|
[_ (thread-send from (make-error "invalid transaction thread msg" #f))]))
|
||||||
|
|
||||||
|
(define (handle-incoming msg)
|
||||||
|
(displayln (list "incoming message!" msg))
|
||||||
|
(match msg
|
||||||
|
[(msg:transaction from-id trans-id #t rpc-id rpc-data)
|
||||||
|
(match (hash-ref rpc-table rpc-id #f)
|
||||||
|
[#f (displayln (list "got unknown rpc req" msg))]
|
||||||
|
[func (thread (lambda () (handle-incoming-transaction func msg)))])]
|
||||||
|
[(msg:transaction from-id trans-id #f rpc-id rpc-data)
|
||||||
|
(define key (cons from-id trans-id))
|
||||||
|
(match (hash-ref response-table key #f)
|
||||||
|
[#f (displayln (list "got spurious transaction response" msg))]
|
||||||
|
[thd (thread-send thd rpc-data #f) (hash-remove! response-table key)])]
|
||||||
|
[_ (displayln (list "got unknown msg" msg))]))
|
||||||
|
|
||||||
|
;; it's a thread cell and i'm too lazy to add a parameterize clause... it should work
|
||||||
|
(current-custodian tm-cust)
|
||||||
|
;; notify that startup is done
|
||||||
|
(thread-send startup-thd 'done #f)
|
||||||
|
(set! startup-thd #f)
|
||||||
|
|
||||||
|
(let loop ()
|
||||||
|
(define thd-evt (thread-receive-evt))
|
||||||
|
(match (sync thd-evt local-channel)
|
||||||
|
[(? (curry equal? thd-evt)) (handle-thread-msg)]
|
||||||
|
[x (handle-incoming x)])
|
||||||
|
(when run-tm (loop))))
|
||||||
|
|
||||||
|
(define (make-transaction-manager my-node comms)
|
||||||
|
(define this-thread (current-thread))
|
||||||
|
(define tm (thread (lambda () (transaction-manager my-node comms this-thread))))
|
||||||
|
;; wait for startup
|
||||||
|
(thread-receive)
|
||||||
|
tm)
|
||||||
|
|
||||||
|
(define (tm-register-rpc tm name func)
|
||||||
|
(thread-sendrecv tm 'register-rpc (cons name func)))
|
||||||
|
|
||||||
|
(define (tm-deregister-rpc tm name)
|
||||||
|
(thread-sendrecv tm 'deregister-rpc name))
|
||||||
|
|
||||||
|
(define (tm-transact tm to-id rpc-id rpc-data)
|
||||||
|
(thread-sendrecv tm 'transact (cons to-id (cons rpc-id rpc-data))))
|
||||||
|
|
||||||
|
(define (tm-shutdown tm)
|
||||||
|
(thread-sendrecv tm 'shutdown (void)))
|
||||||
|
|
||||||
|
|
||||||
;; demo code
|
;; demo code
|
||||||
(define server-sk #"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")
|
(define server-sk #"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")
|
||||||
(define server-pk (crypto-sign-public-key server-sk))
|
(define server-pk (crypto-sign-public-key server-sk))
|
||||||
|
@ -340,13 +502,24 @@
|
||||||
["server"
|
["server"
|
||||||
(define comms (make-comms server-node))
|
(define comms (make-comms server-node))
|
||||||
(comms-set-node-info comms client-node)
|
(comms-set-node-info comms client-node)
|
||||||
|
(define tm (make-transaction-manager server-node comms))
|
||||||
|
(tm-register-rpc tm 'add1 add1)
|
||||||
|
|
||||||
(comms-listen comms 1337)
|
(comms-listen comms 1337)
|
||||||
(displayln "awaiting messages")
|
|
||||||
(displayln (async-channel-get (comms-local-channel comms)))
|
(displayln "listening")
|
||||||
|
(sleep 9999)
|
||||||
|
|
||||||
|
(tm-shutdown tm)
|
||||||
(comms-shutdown comms)]
|
(comms-shutdown comms)]
|
||||||
["client"
|
["client"
|
||||||
(define comms (make-comms client-node))
|
(define comms (make-comms client-node))
|
||||||
(comms-set-node-info comms server-node)
|
(comms-set-node-info comms server-node)
|
||||||
(displayln "sending")
|
(define tm (make-transaction-manager client-node comms))
|
||||||
(comms-dispatch-msg/retry comms 0 (msg:meow 1 "hello world"))
|
|
||||||
|
(displayln "transacting...")
|
||||||
|
(displayln (tm-transact tm 0 'add1 (list 1)))
|
||||||
|
(displayln "done")
|
||||||
|
|
||||||
|
(tm-shutdown tm)
|
||||||
(comms-shutdown comms)])
|
(comms-shutdown comms)])
|
||||||
|
|
Loading…
Reference in New Issue