add individual topics for logging

This commit is contained in:
xenia 2020-11-23 03:12:44 -05:00
parent 3ede25222e
commit 13e6eba161
3 changed files with 79 additions and 61 deletions

View File

@ -17,9 +17,13 @@
;; along with this program. If not, see <https://www.gnu.org/licenses/>.
(require racket/async-channel racket/bool racket/engine racket/fasl racket/function racket/list
racket/match racket/tcp racket/unit syntax/parse/define
racket/logging racket/match racket/tcp racket/unit syntax/parse/define
(for-syntax racket/base racket/list racket/syntax racket/unit racket/unit-exptime)
"not-crypto.rkt")
"logging.rkt" "not-crypto.rkt")
;; logging!
(define-logger comms #:parent global-logger)
(define-logger tm #:parent global-logger)
;; define message types (they must all be prefab for fasl)
(struct msg [from-id] #:prefab)
@ -167,7 +171,7 @@
session-key)))
(thread-sendrecv el-thread 'register-channel (cons (node-id peer-data) new-thd))
(displayln (list "new node connection:" (node-id peer-data)))
(log-comms-info "new node connection: ~a" (node-id peer-data))
;; monitor thread -- shuts down the custodian once the peer thread is done
(thread (lambda ()
@ -198,6 +202,7 @@
(with-handlers ([exn? (lambda (ex) (thread-send from ex #f))])
(when (false? listener-thd)
(set! listener (tcp-listen port 4 #t))
(log-comms-info "listening on port ~a" port)
(set! listener-thd
(thread
(lambda ()
@ -274,7 +279,6 @@
;; transferred, in which case you wouldn't be notified that it failed
['dispatch-msg
(match-define (cons peer-id msg) data)
(displayln (list "dispatch msg" peer-id msg))
(if (= peer-id (node-id my-node))
(begin (async-channel-put local-msg-channel msg)
(thread-send from (void) #f))
@ -400,15 +404,13 @@
(with-handlers ([exn? (lambda (ex)
(cleanup) (thread-send from ex #f))])
(thread-receive) ;; go token
(displayln "sending transaction")
(comms-dispatch-msg/retry comms to-id transaction)
(match (sync/timeout TRANSACTION-TIMEOUT (thread-receive-evt))
[#f
(cleanup)
(displayln "timeout!!!")
(log-tm-error "timeout sending transaction to ~a" to-id)
(thread-send from (make-error "transaction timeout") #f)]
[_ (define response (thread-receive))
(displayln "got response!")
(thread-send from (trans-data-deserialize response) #f)])))
(define (send-transaction from to-id rpc-id rpc-data)
@ -422,12 +424,13 @@
(define (handle-incoming-transaction func msg)
(match-define (msg:transaction from-id trans-id _ rpc-id rpc-data) msg)
(displayln "handling incoming transaction")
(define (respond data)
(define resp
(msg:transaction (node-id my-node) trans-id #f rpc-id (trans-data-serialize data)))
(with-handlers ([exn? (lambda (ex) (displayln "failed to dispatch transaction response"))])
(with-handlers ([exn?
(lambda (ex)
((error-display-handler) "failed to dispatch transaction response" ex))])
(comms-dispatch-msg/retry comms from-id resp)))
(with-handlers ([exn? respond])
@ -437,7 +440,6 @@
;; TODO : apply timeout on the handler function?
;; we don't want this thread to potentially hang forever if there's some sort of deadlock
(apply func arg-data)))
(displayln (list "result" result "sending back..."))
(respond result)))
(define (handle-thread-msg)
@ -455,18 +457,17 @@
[_ (thread-send from (make-error "invalid transaction thread msg" #f))]))
(define (handle-incoming msg)
(displayln (list "incoming message!" msg))
(match msg
[(msg:transaction from-id trans-id #t rpc-id rpc-data)
(match (hash-ref rpc-table rpc-id #f)
[#f (displayln (list "got unknown rpc req" msg))]
[#f (log-tm-warning "got unknown rpc req: ~a" msg)]
[func (thread (lambda () (handle-incoming-transaction func msg)))])]
[(msg:transaction from-id trans-id #f rpc-id rpc-data)
(define key (cons from-id trans-id))
(match (hash-ref response-table key #f)
[#f (displayln (list "got spurious transaction response" msg))]
[#f (log-tm-warning "got spurious transaction response: ~a" msg)]
[thd (thread-send thd rpc-data #f) (hash-remove! response-table key)])]
[_ (displayln (list "got unknown msg" msg))]))
[_ (log-tm-warning "got unknown msg: ~a" msg)]))
;; it's a thread cell and i'm too lazy to add a parameterize clause... it should work
(current-custodian tm-cust)
@ -582,7 +583,7 @@
;
; (comms-listen comms 1337)
;
; (displayln "listening")
; (log-info "listening")
; (sleep 9999)
;
; (tm-shutdown tm)
@ -592,9 +593,9 @@
; (comms-set-node-info comms server-node)
; (define tm (make-transaction-manager client-node comms))
;
; (displayln "transacting...")
; (displayln (tm-transact tm 0 'add1 (list 1)))
; (displayln "done")
; (log-info "transacting...")
; (log-info "transaction: ~a" (tm-transact tm 0 'add1 (list 1)))
; (log-info "done")
;
; (tm-shutdown tm)
; (comms-shutdown comms)])

View File

@ -18,7 +18,17 @@
(require racket/bool racket/date racket/match racket/string)
(provide install-logging!)
(provide global-logger install-logging!)
(define global-logger (make-logger))
(define (default-log-filter level topic)
(match topic
['optimizer #f]
['collapsible-contract-bailout #f]
['collapsible-value-bailout #f]
['racket/contract #f]
[else #t]))
(define (recv-thd receiver stop-chan)
;; iso8601 gang
@ -27,6 +37,7 @@
;; formats one log entry to stdout
(define (log-one entry)
(match-define (vector level msg arg topic) entry)
(when (default-log-filter level topic)
(define level-str
(match level
['fatal "FATAL"]
@ -50,15 +61,16 @@
;; prefix each line of the log entry with the metadata
(define msg-lines (string-split msg "\n"))
(for ([line (in-list msg-lines)])
(printf "~a [~aZ] [~a] ~a\n" prefix-str time-str level-str line)))
(printf "~a[~aZ] [~a] ~a\n" prefix-str time-str level-str line))))
;; process log entries until told to stop
;; this uses the same technique as with-intercepted-logging
(let loop ()
(define next (sync receiver stop-chan))
(unless (symbol=? next 'stop)
(match (sync receiver stop-chan)
['stop (void)]
[next
(log-one next)
(loop)))
(loop)]))
;; flush any remaining log entries
(let cleanup ()
@ -70,12 +82,11 @@
;; install the logging system
;; call as early as possible in the application
(define (install-logging! [level 'debug])
(define logger (make-logger))
(define recv (make-log-receiver logger level))
(define recv (make-log-receiver global-logger level))
(define stop-chan (make-channel))
(define logger-thd (thread (lambda () (recv-thd recv stop-chan))))
(current-logger logger)
(current-logger global-logger)
;; make uncaught exceptions go through the log
;; this uses the same (tbh, disgusting) trick as xrepl
@ -88,7 +99,7 @@
(define os (open-output-string))
(parameterize ([current-error-port os])
(original-handler msg ex))
(log-message logger 'fatal
(log-message global-logger 'fatal
(format "uncaught exception: ~a" (get-output-string os))
(current-continuation-marks))))

View File

@ -17,13 +17,17 @@
;; along with this program. If not, see <https://www.gnu.org/licenses/>.
(require db/base db/sqlite3
data/queue racket/bool racket/contract racket/fasl racket/file racket/list racket/match
racket/path racket/random racket/runtime-path racket/set racket/string racket/unit srfi/19
data/queue racket/bool racket/contract racket/fasl racket/file racket/list racket/logging
racket/match racket/path racket/random racket/runtime-path racket/set racket/string
racket/unit srfi/19
north/base north/adapter/base north/adapter/sqlite
"comms.rkt" "manifest.rkt" "not-crypto.rkt" "pattern.rkt" "protocol.rkt"
"comms.rkt" "logging.rkt" "manifest.rkt" "not-crypto.rkt" "pattern.rkt" "protocol.rkt"
;; port-fsync
(submod "static-support.rkt" misc-calls))
;; logging
(define-logger server #:parent global-logger)
;; configuration
(define PRODUCTION? #f)
@ -61,7 +65,7 @@
(define target-revision (migration-revision (migration-most-recent base)))
(define plan (migration-plan base current-revision target-revision))
(for ([migration (in-list plan)])
(displayln (format "applying migration: ~a" (migration-revision migration)))
(log-server-info "applying migration: ~a" (migration-revision migration))
(adapter-apply! adapter (migration-revision migration) (migration-up migration)))
(void))
@ -119,7 +123,7 @@
(define existing-ids (mutable-set))
(call-with-transaction (current-db) (lambda ()
(define (cleanup id exists? path)
(displayln (format "removing corrupted/incomplete task ~a" id))
(log-server-warning "removing corrupted/incomplete task ~a" id)
(when exists? (delete-file path))
(query-exec (current-db) q-delete-task id))
(for ([(id committed) (in-query (current-db) q-get-task-id-commit)])
@ -424,7 +428,7 @@
(define (invoke/retry-forever proc)
(let init-loop ([retry-delay *min-retry-delay*])
(with-handlers ([exn? (lambda (ex)
(displayln (format "agent ~a encountered error ~a" id ex))
(log-server-error "agent ~a encountered error ~a" id ex)
(sleep retry-delay)
(init-loop (min *max-retry-delay*
(* *retry-delay-ratio* retry-delay))))])
@ -565,7 +569,7 @@
;; although ideally this case wouldn't occur because the timeout is three times the target
;; subtask duration
(hash-set! task-size taskid *min-subtask-size*)
(displayln (format "agent ~a timed out on task ~a" id taskid))
(log-server-warning "agent ~a timed out on task ~a" id taskid)
(cancel-assignment! overdue)))
;; cancel whatever the agent is currently working on, in case the server crashed and came back
@ -691,9 +695,10 @@
(require racket/cmdline)
;; TODO : read cmdline and config file
;; TODO : real logging, replace all displayln
;; initialize server
(install-logging!)
(log-server-info "initializing server")
(current-db (open-server-db 'create))
(migrate-server-db)
;; temp key
@ -718,13 +723,14 @@
;; start listening
(comms-listen (current-comms) 1337)
(displayln "server running")
(log-server-info "server running")
;; wait for break
(with-handlers ([exn? (lambda (ex) (displayln (format "encountered exception: ~a" ex)))])
(with-handlers ([exn:break? void])
(sync never-evt))
;; shutdown
(displayln "stopping server")
(log-server-info "stopping server")
(agent-handler-shutdown)
(tm-shutdown (current-tm))
(comms-shutdown (current-comms))