From 563b5d691be318e965e8c9d769a7bab04d0d2a31 Mon Sep 17 00:00:00 2001 From: haskal Date: Sun, 8 Nov 2020 22:21:58 -0500 Subject: [PATCH] implement comms demo --- crossfire/comms.rkt | 76 +++++++++++++++++++++++++++++++++---------- crossfire/pattern.rkt | 2 ++ 2 files changed, 60 insertions(+), 18 deletions(-) diff --git a/crossfire/comms.rkt b/crossfire/comms.rkt index 278971d..f8d91c4 100644 --- a/crossfire/comms.rkt +++ b/crossfire/comms.rkt @@ -16,11 +16,12 @@ ;; You should have received a copy of the GNU Affero General Public License ;; along with this program. If not, see . -(require racket/bool racket/engine racket/fasl racket/match racket/tcp +(require data/queue racket/bool racket/engine racket/fasl racket/list racket/match racket/tcp "not-crypto.rkt") (struct msg [from-id] #:prefab) (struct msg:hello msg [type pubkey] #:prefab) +(struct msg:meow msg [meow] #:prefab) (struct signed [fasl signature] #:prefab) (struct locked [fasl nonce mac] #:prefab) @@ -81,11 +82,13 @@ [(locked fasl nonce mac) (match (crypto-unlock session-key nonce mac fasl) [#f (error "corrupted message from peer" (node-id peer-data))] - [(? msg? m) - (unless (= (msg-from-id m) (node-id peer-data)) - (error "mismatched node id" (msg-from-id m) (node-id peer-data))) - (thread-sendrecv el-thread 'dispatch-msg (cons (node-id my-node) m))] - [_ (error "invalid data from peer" (node-id peer-data))])] + [data + (match (fasl->s-exp data) + [(? msg? m) + (unless (= (msg-from-id m) (node-id peer-data)) + (error "mismatched node id" (msg-from-id m) (node-id peer-data))) + (thread-sendrecv el-thread 'dispatch-msg (cons (node-id my-node) m))] + [_ (error "invalid msg data from peer" (node-id peer-data))])])] [_ (error "invalid data recieved from peer" (node-id peer-data))])) (define (handle-out-msg) @@ -94,7 +97,7 @@ (define fasl (s-exp->fasl m)) (define nonce (crypto-lock-make-nonce)) (define-values [ct mac] (crypto-lock session-key nonce fasl)) - (fasl->s-exp (locked ct nonce mac) out) + (s-exp->fasl (locked ct nonce mac) out) (flush-output out)] [x (error "invalid thread msg" x)])) @@ -119,6 +122,9 @@ (define el-thread (current-thread)) + (define local-msg-queue (make-queue)) + (define local-msg-waiters '()) + (define peer-waiters (make-hash)) (define peer-registry (make-hash)) (define node-registry (make-hash)) (hash-set! node-registry (node-id my-node) my-node) @@ -139,24 +145,55 @@ (match-define (cons host port) data) (make-peer-thread el-thread (lambda () (tcp-connect host port)))) + (define (handle-local-msg msg) + (if (empty? local-msg-waiters) + (enqueue! local-msg-queue msg) + (begin + (for ([waiter (in-list local-msg-waiters)]) + (thread-send waiter msg #f)) + (set! local-msg-waiters '())))) + + (define (handle-fetch-local-msg from-thd) + (if (queue-empty? local-msg-queue) + (set! local-msg-waiters (cons from-thd local-msg-waiters)) + (let ([next (dequeue! local-msg-queue)]) + (when (false? (thread-send from-thd next #f)) + (enqueue-front! local-msg-queue next))))) + + (define (handle-node-wait id thd) + (if (hash-has-key? peer-registry id) + (thread-send thd (void) #f) + (hash-update! peer-waiters 0 (lambda (v) (cons thd v)) (lambda () '())))) + (let loop () (match-define (cons from (cons type data)) (thread-receive)) (match type - ['listen (handle-listen data) (thread-send from (void))] - ['connect (handle-connect data) (thread-send from (void))] - ['get-node-info (thread-send from (hash-ref node-registry data (lambda () #f)))] - ['set-node-info (hash-set! node-registry (node-id data) data) (thread-send from (void))] + ['listen (handle-listen data) (thread-send from (void) #f)] + ['connect (handle-connect data) (thread-send from (void) #f)] + ['wait-for (handle-node-wait data from)] + ['get-node-info (thread-send from (hash-ref node-registry data #f) #f)] + ['set-node-info (hash-set! node-registry (node-id data) data) (thread-send from (void) #f)] ['register-channel (when (hash-has-key? peer-registry data) (kill-thread (hash-ref peer-registry data))) (hash-set! peer-registry data from) - (thread-send from (void))] + (for ([thd (in-list (hash-ref peer-waiters data '()))]) + (thread-send thd (void) #f)) + (hash-remove! peer-waiters data) + (thread-send from (void) #f)] ['dispatch-msg (match-define (cons peer-id msg) data) - (match (hash-ref peer-registry peer-id) - [#f (thread-send from (make-error "no such peer connection"))] - [thd (thread-send thd data) (thread-send from (void))])] - [_ (thread-send from (make-error "unknown thread message type"))]) + (if (equal? peer-id (node-id my-node)) + (handle-local-msg msg) + (match (hash-ref peer-registry peer-id #f) + [#f (thread-send from (make-error "no such peer connection") #f)] + [thd + (thread-send + from + (thread-send thd msg (lambda () (make-error "failed to dispatch to thread"))) + #f)]))] + ['fetch-msg (handle-fetch-local-msg from)] + [_ (thread-send from (make-error "unknown thread message type") #f)]) (loop))) @@ -179,10 +216,13 @@ ["server" (define comms (make-comms server-node)) (thread-sendrecv comms 'set-node-info client-node) - (thread-sendrecv comms 'listen 1337)] + (thread-sendrecv comms 'listen 1337) + (displayln (thread-sendrecv comms 'fetch-msg (void)))] ["client" (define comms (make-comms client-node)) (thread-sendrecv comms 'set-node-info server-node) - (thread-sendrecv comms 'connect (cons "localhost" 1337))]) + (thread-sendrecv comms 'connect (cons "localhost" 1337)) + (thread-sendrecv comms 'wait-for 0) + (thread-sendrecv comms 'dispatch-msg (cons 0 (msg:meow 1 "hello world")))]) (sleep 9999) diff --git a/crossfire/pattern.rkt b/crossfire/pattern.rkt index 927ae53..97af0c8 100644 --- a/crossfire/pattern.rkt +++ b/crossfire/pattern.rkt @@ -23,6 +23,8 @@ builtin-isets pattern-count pattern-start pattern-end) +;; TODO : replace with data/integer-set + ;; pattern processing ;; Iset is a listof Interval