implement comms demo

This commit is contained in:
xenia 2020-11-08 22:21:58 -05:00
parent 404723946b
commit 563b5d691b
2 changed files with 60 additions and 18 deletions

View File

@ -16,11 +16,12 @@
;; You should have received a copy of the GNU Affero General Public License
;; along with this program. If not, see <https://www.gnu.org/licenses/>.
(require racket/bool racket/engine racket/fasl racket/match racket/tcp
(require data/queue racket/bool racket/engine racket/fasl racket/list racket/match racket/tcp
"not-crypto.rkt")
(struct msg [from-id] #:prefab)
(struct msg:hello msg [type pubkey] #:prefab)
(struct msg:meow msg [meow] #:prefab)
(struct signed [fasl signature] #:prefab)
(struct locked [fasl nonce mac] #:prefab)
@ -81,11 +82,13 @@
[(locked fasl nonce mac)
(match (crypto-unlock session-key nonce mac fasl)
[#f (error "corrupted message from peer" (node-id peer-data))]
[data
(match (fasl->s-exp data)
[(? msg? m)
(unless (= (msg-from-id m) (node-id peer-data))
(error "mismatched node id" (msg-from-id m) (node-id peer-data)))
(thread-sendrecv el-thread 'dispatch-msg (cons (node-id my-node) m))]
[_ (error "invalid data from peer" (node-id peer-data))])]
[_ (error "invalid msg data from peer" (node-id peer-data))])])]
[_ (error "invalid data recieved from peer" (node-id peer-data))]))
(define (handle-out-msg)
@ -94,7 +97,7 @@
(define fasl (s-exp->fasl m))
(define nonce (crypto-lock-make-nonce))
(define-values [ct mac] (crypto-lock session-key nonce fasl))
(fasl->s-exp (locked ct nonce mac) out)
(s-exp->fasl (locked ct nonce mac) out)
(flush-output out)]
[x (error "invalid thread msg" x)]))
@ -119,6 +122,9 @@
(define el-thread (current-thread))
(define local-msg-queue (make-queue))
(define local-msg-waiters '())
(define peer-waiters (make-hash))
(define peer-registry (make-hash))
(define node-registry (make-hash))
(hash-set! node-registry (node-id my-node) my-node)
@ -139,24 +145,55 @@
(match-define (cons host port) data)
(make-peer-thread el-thread (lambda () (tcp-connect host port))))
(define (handle-local-msg msg)
(if (empty? local-msg-waiters)
(enqueue! local-msg-queue msg)
(begin
(for ([waiter (in-list local-msg-waiters)])
(thread-send waiter msg #f))
(set! local-msg-waiters '()))))
(define (handle-fetch-local-msg from-thd)
(if (queue-empty? local-msg-queue)
(set! local-msg-waiters (cons from-thd local-msg-waiters))
(let ([next (dequeue! local-msg-queue)])
(when (false? (thread-send from-thd next #f))
(enqueue-front! local-msg-queue next)))))
(define (handle-node-wait id thd)
(if (hash-has-key? peer-registry id)
(thread-send thd (void) #f)
(hash-update! peer-waiters 0 (lambda (v) (cons thd v)) (lambda () '()))))
(let loop ()
(match-define (cons from (cons type data)) (thread-receive))
(match type
['listen (handle-listen data) (thread-send from (void))]
['connect (handle-connect data) (thread-send from (void))]
['get-node-info (thread-send from (hash-ref node-registry data (lambda () #f)))]
['set-node-info (hash-set! node-registry (node-id data) data) (thread-send from (void))]
['listen (handle-listen data) (thread-send from (void) #f)]
['connect (handle-connect data) (thread-send from (void) #f)]
['wait-for (handle-node-wait data from)]
['get-node-info (thread-send from (hash-ref node-registry data #f) #f)]
['set-node-info (hash-set! node-registry (node-id data) data) (thread-send from (void) #f)]
['register-channel
(when (hash-has-key? peer-registry data)
(kill-thread (hash-ref peer-registry data)))
(hash-set! peer-registry data from)
(thread-send from (void))]
(for ([thd (in-list (hash-ref peer-waiters data '()))])
(thread-send thd (void) #f))
(hash-remove! peer-waiters data)
(thread-send from (void) #f)]
['dispatch-msg
(match-define (cons peer-id msg) data)
(match (hash-ref peer-registry peer-id)
[#f (thread-send from (make-error "no such peer connection"))]
[thd (thread-send thd data) (thread-send from (void))])]
[_ (thread-send from (make-error "unknown thread message type"))])
(if (equal? peer-id (node-id my-node))
(handle-local-msg msg)
(match (hash-ref peer-registry peer-id #f)
[#f (thread-send from (make-error "no such peer connection") #f)]
[thd
(thread-send
from
(thread-send thd msg (lambda () (make-error "failed to dispatch to thread")))
#f)]))]
['fetch-msg (handle-fetch-local-msg from)]
[_ (thread-send from (make-error "unknown thread message type") #f)])
(loop)))
@ -179,10 +216,13 @@
["server"
(define comms (make-comms server-node))
(thread-sendrecv comms 'set-node-info client-node)
(thread-sendrecv comms 'listen 1337)]
(thread-sendrecv comms 'listen 1337)
(displayln (thread-sendrecv comms 'fetch-msg (void)))]
["client"
(define comms (make-comms client-node))
(thread-sendrecv comms 'set-node-info server-node)
(thread-sendrecv comms 'connect (cons "localhost" 1337))])
(thread-sendrecv comms 'connect (cons "localhost" 1337))
(thread-sendrecv comms 'wait-for 0)
(thread-sendrecv comms 'dispatch-msg (cons 0 (msg:meow 1 "hello world")))])
(sleep 9999)

View File

@ -23,6 +23,8 @@
builtin-isets
pattern-count pattern-start pattern-end)
;; TODO : replace with data/integer-set
;; pattern processing
;; Iset is a listof Interval