implement smp mode, but right now it's slower u__u
This commit is contained in:
parent
d08892d6e9
commit
f9486817ed
|
@ -184,92 +184,136 @@
|
|||
(agent-report-state aid 'error)
|
||||
(async-channel-put (current-queue) (cons 'stop aid)))
|
||||
|
||||
(with-handlers ([exn:fail? report-error])
|
||||
(define work-range (assignment-work-range assignment))
|
||||
(with-handlers ([exn:fail? report-error]
|
||||
;; in the case of a break (sent from the main thread in response to a cancel),
|
||||
;; kill this thread without reporting status
|
||||
[exn:break? (lambda (ex) (kill-thread (current-thread)))])
|
||||
(define work-range (make-integer-set (assignment-work-range assignment)))
|
||||
(define manifest (assignment-manifest assignment))
|
||||
(log-agent-info "the work for assignment ~a is ~a" aid work-range)
|
||||
(log-agent-info "the work for assignment ~a is ~a" aid (integer-set-contents work-range))
|
||||
|
||||
(define cmd (manifest-data-ref manifest 'command))
|
||||
(define num-cpus (count-cpus))
|
||||
(define smp? (first (manifest-data-ref manifest 'smp)))
|
||||
;; TODO : handle smp
|
||||
;; in non-smp mode, we just pretend there's only one cpu
|
||||
(define num-cpus (if smp? (count-cpus) 1))
|
||||
(define mode (first (manifest-data-ref manifest 'mode)))
|
||||
|
||||
(define pattern (manifest-pattern manifest))
|
||||
|
||||
(for ([interval (in-list work-range)])
|
||||
(define pp-start (resolve-pattern-pos pattern (pos->pattern-pos pattern (car interval))))
|
||||
(define pp-end (resolve-pattern-pos pattern (pos->pattern-pos pattern (cdr interval))))
|
||||
(define args (for/fold ([args '()]) ([pps (in-vector pp-start)] [ppe (in-vector pp-end)])
|
||||
;; TODO : this isn't very efficient...
|
||||
(append args (list (number->string (car pps) 16)
|
||||
(number->string (cdr pps) 16)
|
||||
(number->string (car ppe) 16)
|
||||
(number->string (cdr ppe) 16)))))
|
||||
;; split up into chunks per CPU
|
||||
(define percpu-size (quotient (integer-set-count work-range) num-cpus))
|
||||
(define pattern-ranges
|
||||
(for/fold ([ranges '()] [pr work-range] #:result (cons pr ranges))
|
||||
([i (in-range (sub1 num-cpus))])
|
||||
(define-values [pr-this pr-rest] (pattern-range-take pr percpu-size))
|
||||
(values (cons pr-this ranges) pr-rest)))
|
||||
(for ([pr (in-list pattern-ranges)] [i (in-naturals)])
|
||||
(log-agent-info "assignment ~a cpu ~a: ~a" aid i (integer-set-contents pr)))
|
||||
|
||||
(define-values [proc input-proc out]
|
||||
(parameterize ([current-custodian cust] [current-directory extract-dir])
|
||||
(match mode
|
||||
['callback
|
||||
(define-values [proc out in err]
|
||||
(apply subprocess #f #f (current-error-port) 'new (append cmd args)))
|
||||
(values proc #f out)]
|
||||
['stdio
|
||||
(define-values [ig-proc ig-out ig-in ig-err]
|
||||
(apply subprocess #f #f (current-error-port) 'new *default-cg-cmd* args))
|
||||
(define-values [proc out in err]
|
||||
(apply subprocess #f ig-out (current-error-port) 'new cmd))
|
||||
(values proc ig-proc out)])))
|
||||
(define (execute-cpu work-range)
|
||||
(for ([interval (in-list (integer-set-contents work-range))])
|
||||
(define pp-start (resolve-pattern-pos pattern (pos->pattern-pos pattern (car interval))))
|
||||
(define pp-end (resolve-pattern-pos pattern (pos->pattern-pos pattern (cdr interval))))
|
||||
(define args (for/fold ([args '()]) ([pps (in-vector pp-start)] [ppe (in-vector pp-end)])
|
||||
;; TODO : this isn't very efficient...
|
||||
(append args (list (number->string (car pps) 16)
|
||||
(number->string (cdr pps) 16)
|
||||
(number->string (car ppe) 16)
|
||||
(number->string (cdr ppe) 16)))))
|
||||
|
||||
(with-handlers ([exn:break? (lambda (_)
|
||||
(log-agent-info "killing process for ~a" aid)
|
||||
;; nicely ask the process to stop
|
||||
(subprocess-kill proc #f)
|
||||
(when input-proc
|
||||
(subprocess-kill input-proc))
|
||||
(sync/timeout *subproc-kill-delay* proc)
|
||||
;; will handle killing for us :P
|
||||
(custodian-shutdown-all cust)
|
||||
;; exit without reporting status
|
||||
(kill-thread (current-thread)))])
|
||||
(define line-match (regexp-match-evt #px"^[^\n]*\n" out))
|
||||
(define eof-e (eof-evt out))
|
||||
(let loop ([reached-eof #f] [proc-done #f])
|
||||
(match (sync/enable-break proc line-match eof-e)
|
||||
[(== proc)
|
||||
(unless reached-eof
|
||||
(loop reached-eof #t))]
|
||||
[(? eof-object?)
|
||||
(unless proc-done
|
||||
(loop #t proc-done))]
|
||||
[(list line)
|
||||
(define line-str (bytes->string/utf-8 line #\?))
|
||||
(define line-parts
|
||||
(and line-str
|
||||
(map (lambda (x) (string->number x 16))
|
||||
(string-split (string-trim line-str) " "))))
|
||||
;; check format, if it looks correct-ish then report it
|
||||
;; otherwise warn
|
||||
;; it will be #f if the line failed to decode as utf-8
|
||||
;; theoretically since we're only dealing with 0-9a-f we could also just decode as
|
||||
;; ascii but i like utf-8 so whatever potential bugs i'm introducing with this be
|
||||
;; hecked tbh
|
||||
(if (and ((listof integer?) line-parts)
|
||||
(= (length line-parts) (vector-length (manifest-pattern manifest))))
|
||||
(report-success/retry aid line-parts)
|
||||
(log-agent-warning "assignment ~a input loop got unparseable line ~s" aid line))
|
||||
(loop reached-eof proc-done)]
|
||||
[x (log-agent-warning "assignment ~a input loop got unexpected value ~a" aid x)
|
||||
(loop reached-eof proc-done)])))
|
||||
(define-values [proc input-proc out]
|
||||
(parameterize ([current-custodian cust] [current-directory extract-dir])
|
||||
(match mode
|
||||
['callback
|
||||
(define-values [proc out in err]
|
||||
(apply subprocess #f #f (current-error-port) 'new (append cmd args)))
|
||||
(values proc #f out)]
|
||||
['stdio
|
||||
(define-values [ig-proc ig-out ig-in ig-err]
|
||||
(apply subprocess #f #f (current-error-port) 'new *default-cg-cmd* args))
|
||||
(define-values [proc out in err]
|
||||
(apply subprocess #f ig-out (current-error-port) 'new cmd))
|
||||
(values proc ig-proc out)])))
|
||||
|
||||
(define errcode (subprocess-status proc))
|
||||
(log-agent-info "assignment ~a process exited with code ~a" aid errcode)
|
||||
;; report error if it's a nonzero exit code
|
||||
(unless (zero? errcode)
|
||||
(error "process exited with nonzero code" errcode))))
|
||||
(with-handlers ([exn:break? (lambda (_)
|
||||
(log-agent-info "killing process for ~a" aid)
|
||||
;; nicely ask the process to stop
|
||||
(subprocess-kill proc #f)
|
||||
(when input-proc
|
||||
(subprocess-kill input-proc))
|
||||
(sync/timeout *subproc-kill-delay* proc)
|
||||
;; exit without reporting status
|
||||
(kill-thread (current-thread)))])
|
||||
(define line-match (regexp-match-evt #px"^[^\n]*\n" out))
|
||||
(define eof-e (eof-evt out))
|
||||
(let loop ([reached-eof #f] [proc-done #f])
|
||||
(match (sync/enable-break proc line-match eof-e)
|
||||
[(== proc)
|
||||
(unless reached-eof
|
||||
(loop reached-eof #t))]
|
||||
[(? eof-object?)
|
||||
(unless proc-done
|
||||
(loop #t proc-done))]
|
||||
[(list line)
|
||||
(define line-str (bytes->string/utf-8 line #\?))
|
||||
(define line-parts
|
||||
(and line-str
|
||||
(map (lambda (x) (string->number x 16))
|
||||
(string-split (string-trim line-str) " "))))
|
||||
;; check format, if it looks correct-ish then report it
|
||||
;; otherwise warn
|
||||
;; it will be #f if the line failed to decode as utf-8
|
||||
;; theoretically since we're only dealing with 0-9a-f we could also just decode as
|
||||
;; ascii but i like utf-8 so whatever potential bugs i'm introducing with this be
|
||||
;; hecked tbh
|
||||
(if (and ((listof integer?) line-parts)
|
||||
(= (length line-parts) (vector-length (manifest-pattern manifest))))
|
||||
(report-success/retry aid line-parts)
|
||||
(log-agent-warning "assignment ~a input loop got unparseable line ~s" aid line))
|
||||
(loop reached-eof proc-done)]
|
||||
[x (log-agent-warning "assignment ~a input loop got unexpected value ~a" aid x)
|
||||
(loop reached-eof proc-done)])))
|
||||
|
||||
(cleanup)
|
||||
(void))
|
||||
(define errcode (subprocess-status proc))
|
||||
(log-agent-info "assignment ~a process exited with code ~a" aid errcode)
|
||||
;; report error if it's a nonzero exit code
|
||||
(unless (zero? errcode)
|
||||
(error "process exited with nonzero code" errcode))))
|
||||
|
||||
(define (execute-cpu-wrap work-range result-box)
|
||||
(with-handlers ([exn? (lambda (ex) (set-box! result-box ex))])
|
||||
(execute-cpu work-range)))
|
||||
|
||||
;; SMP manager thread
|
||||
(break-enabled #f)
|
||||
;; create one child thread (managing a subprocess or pair of piped subprocesses) per
|
||||
;; cpu-task-list
|
||||
(define children
|
||||
(parameterize ([current-custodian cust])
|
||||
(for/list ([i (in-naturals)] [wr (in-list pattern-ranges)])
|
||||
(define b (box #f))
|
||||
(cons (thread (lambda () (execute-cpu-wrap wr b))) b))))
|
||||
(let loop ()
|
||||
(with-handlers ([exn? (lambda (ex)
|
||||
(log-agent-error "stopping assignment ~a due to error" aid)
|
||||
;; break all children
|
||||
(map (compose break-thread car) children)
|
||||
(apply sync/timeout (+ *subproc-kill-delay* 2)
|
||||
(map car children))
|
||||
;; kill all children and subprocesses that may be remaining
|
||||
(custodian-shutdown-all)
|
||||
;; reraise
|
||||
(raise ex))])
|
||||
(apply sync/enable-break (map car children))
|
||||
(define threads-running? (ormap (compose thread-running? car) children))
|
||||
(define any-exn (ormap (compose unbox cdr) children))
|
||||
(when any-exn
|
||||
(raise any-exn))
|
||||
(when threads-running?
|
||||
(loop))))
|
||||
(break-enabled #t))
|
||||
|
||||
(cleanup)
|
||||
(void))
|
||||
|
||||
;; utils
|
||||
|
||||
|
|
Loading…
Reference in New Issue