; scheduler.scm - Basic scheduler for multithreading ; ; Copyright (c) 2000-2007, Felix L. Winkelmann ; Copyright (c) 2008, The Chicken Team ; All rights reserved. ; ; Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following ; conditions are met: ; ; Redistributions of source code must retain the above copyright notice, this list of conditions and the following ; disclaimer. ; Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following ; disclaimer in the documentation and/or other materials provided with the distribution. ; Neither the name of the author nor the names of its contributors may be used to endorse or promote ; products derived from this software without specific prior written permission. ; ; THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS ; OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY ; AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR ; CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR ; CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR ; SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY ; THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR ; OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE ; POSSIBILITY OF SUCH DAMAGE. (declare (fixnum) (unit scheduler) (disable-interrupts) (usual-integrations) (disable-warning var) (hide ##sys#ready-queue-head ##sys#ready-queue-tail ##sys#waiting-queue-head ##sys#waiting-queue-tail ##sys#timeout-list ##sys#timeout-list-head ##sys#update-thread-state-buffer ##sys#restore-thread-state-buffer ##sys#remove-from-ready-queue ##sys#unblock-threads-for-i/o ##sys#force-primordial ##sys#fdset-input-set ##sys#fdset-output-set ##sys#fdset-clear ##sys#fdset-select-timeout ##sys#fdset-restore ##sys#clear-i/o-state-for-thread! make-int-priority-queue-entry int-priority-queue-entry? int-priority-queue-color int-priority-queue-color-set! int-priority-queue-parent int-priority-queue-parent-set! int-priority-queue-left int-priority-queue-left-set! int-priority-queue-right int-priority-queue-right-set! int-priority-queue-index int-priority-queue-index-set! int-priority-queue-value int-priority-queue-value-set! int-priority-queue-before? int-priority-queue-match? int-priority-queue-index-before? int-priority-queue-init! int-priority-queue->rbtree int-priority-queue-lookup int-priority-queue-node-fold int-priority-queue-node-for-each int-priority-queue-node-insert! int-priority-queue-remove! int-priority-queue-reposition! int-priority-queue-empty? int-priority-queue-singleton? int-priority-queue-delete-min! int-priority-queue-delete! ##sys#fd-list-add-thread! ) (foreign-declare #< # define C_signal_interrupted_p C_mk_bool(errno == EINTR) #else # define C_signal_interrupted_p C_SCHEME_FALSE #endif # include #ifdef _WIN32 # if _MSC_VER > 1300 # include # include # else # include # endif /* Beware: winsock2.h must come BEFORE windows.h */ # define C_msleep(n) (Sleep(C_unfix(n)), C_SCHEME_TRUE) #else # include # include # include # include static C_word C_msleep(C_word ms); C_word C_msleep(C_word ms) { #ifdef __CYGWIN__ if(usleep(C_unfix(ms) * 1000) == -1) return C_SCHEME_FALSE; #else struct timespec ts; unsigned long mss = C_unfix(ms); ts.tv_sec = mss / 1000; ts.tv_nsec = (mss % 1000) * 1000000; if(nanosleep(&ts, NULL) == -1) return C_SCHEME_FALSE; #endif return C_SCHEME_TRUE; } #endif static fd_set C_fdset_input, C_fdset_output, C_fdset_input_2, C_fdset_output_2; #define C_fd_test_input(fd) C_mk_bool(FD_ISSET(C_unfix(fd), &C_fdset_input)) #define C_fd_test_output(fd) C_mk_bool(FD_ISSET(C_unfix(fd), &C_fdset_output)) EOF ) ) (cond-expand [paranoia] [else (declare (unsafe)) ] ) (cond-expand (hygienic-macros (define-syntax dbg (syntax-rules () ((_ . _) #f))) #;(define-syntax dbg (syntax-rules () ((_ x ...) (begin (print x ...) (flush-output (current-output-port)))))) ) (else (define-macro (dbg . args) #f) (define-macro (dbg . args) `(print "DBG: " ,@args) ) ) ) (cond-expand (rbtree (include "rbtree.scm") ;; We shall replace that with a lolevel structure once. (define-record-type (make-int-priority-queue-entry color parent left right index value) int-priority-queue-entry? (color int-priority-queue-color int-priority-queue-color-set!) (parent int-priority-queue-parent int-priority-queue-parent-set!) (left int-priority-queue-left int-priority-queue-left-set!) (right int-priority-queue-right int-priority-queue-right-set!) (index int-priority-queue-index int-priority-queue-index-set!) (value int-priority-queue-value int-priority-queue-value-set!)) (define (int-priority-queue-before? node1 node2) ;; ordering function (fx< (int-priority-queue-index node1) (int-priority-queue-index node2))) (define (int-priority-queue-match? key node) (eqv? key (int-priority-queue-index node))) (define (int-priority-queue-index-before? key node) (fx< key (int-priority-queue-index node))) (define-rbtree int-priority-queue-init! ;; defined by define-rbtree int-priority-queue->rbtree ;; defined by define-rbtree int-priority-queue-lookup ;; defined by define-rbtree int-priority-queue-node-fold ;; defined by define-rbtree int-priority-queue-node-for-each ;; defined by define-rbtree int-priority-queue-node-insert! ;; defined by define-rbtree int-priority-queue-remove! ;; defined by define-rbtree int-priority-queue-reposition! ;; defined by define-rbtree int-priority-queue-empty? ;; defined by define-rbtree int-priority-queue-singleton? ;; defined by define-rbtree int-priority-queue-match? int-priority-queue-index-before? int-priority-queue-before? int-priority-queue-color int-priority-queue-color-set! int-priority-queue-parent int-priority-queue-parent-set! int-priority-queue-left int-priority-queue-left-set! int-priority-queue-right int-priority-queue-right-set! int-priority-queue-right int-priority-queue-right-set! #f #f) (define-inline (make-queue-entry k v) (make-int-priority-queue-entry #f #f #f #f k v)) (define-inline (make-int-priority-queue) (int-priority-queue-init! (make-int-priority-queue-entry #f #f #f #f #f #f))) (define ##sys#timeout-list (make-int-priority-queue)) (define-inline (##sys#timeout-list-empty?) (int-priority-queue-empty? ##sys#timeout-list)) (define-inline (timeout-queue-next) (int-priority-queue-right ##sys#timeout-list)) (define-inline (timeout-queue-unqueue!) (int-priority-queue-remove! (timeout-queue-next))) (define-inline (timeout-queue-insert-entry! entry) (int-priority-queue-node-insert! ##sys#timeout-list entry)) (define-inline (timeout-queue-remove-entry! entry) (int-priority-queue-remove! entry)) ) ;; rbtree (else ;; Sorry for that, I don't know any better yet. (define-syntax define-macro (syntax-rules () ((_ (name . llist) body ...) (define-syntax name (lambda (x r c) (apply (lambda llist body ...) (cdr x))))) ((_ name . body) (define-syntax name (lambda (x r c) (cdr x)))))) (define-macro (define-llrbtree-code features update update+ update! init-root-node! t-lookup t-min t-fold t-for-each t-insert t-delete t-delete-min t-empty? t-k-eq? t-k- (make-int-priority-queue-entry color left right index value) int-priority-queue-entry? (color int-priority-queue-color int-priority-queue-color-set!) (left int-priority-queue-left int-priority-queue-left-set!) (right int-priority-queue-right int-priority-queue-right-set!) (index int-priority-queue-index int-priority-queue-index-set!) (value int-priority-queue-value int-priority-queue-value-set!)) (define-inline (make-queue-entry k v) (make-int-priority-queue-entry #f #f #f k v)) (define-llrbtree-code () (args #f) (args #f) ((node . args) `(let ((node ,node)) . ,(let loop ((args args)) (if (null? args) '(node) (cons (case (car args) ((color:) `(int-priority-queue-color-set! node ,(cadr args))) ((left:) `(int-priority-queue-left-set! node ,(cadr args))) ((right:) `(int-priority-queue-right-set! node ,(cadr args))) (else (error (format "unbrauchbar ~a" args)))) (loop (cddr args))))))) int-priority-queue-init! ;; defined int-priority-queue-lookup ;; defined #f ;; no min defined int-priority-queue-node-fold ;; defined int-priority-queue-node-for-each ;; defined int-priority-queue-node-insert! ;; defined int-priority-queue-node-delete! ;; delete by node defined int-priority-queue-delete-min! ;; defined int-priority-queue-empty? ;; defined ((k n) `(fx= ,k (int-priority-queue-index ,n))) ((k n) `(fx<= ,k (int-priority-queue-index ,n))) ((n1 n2) `(fx<= (int-priority-queue-index ,n1) (int-priority-queue-index ,n2))) int-priority-queue-left int-priority-queue-left-set! int-priority-queue-right int-priority-queue-right-set! int-priority-queue-color int-priority-queue-color-set! #f) #;(define-llrbtree-code (ordered debug) (args #f) (args #f) ((node . args) `(let ((node ,node)) . ,(let loop ((args args)) (if (null? args) '(node) (cons (case (car args) ((color:) `(int-priority-queue-color-set! node ,(cadr args))) ((left:) `(int-priority-queue-parent-set! node ,(cadr args))) ((right:) `(int-priority-queue-right-set! node ,(cadr args))) (else (error (format "unbrauchbar ~a" args)))) (loop (cddr args))))))) #f ;; no init defined #f ;; no lookup defined #f ;; no min defined #f ;; no fold defined #f ;; no for-each defined #f ;; no insert defined int-priority-queue-delete! ;; delete by key defined #f ;; no delete-min defined #f ;; no empty? defined ((k n) `(fx= ,k (int-priority-queue-index ,n))) ((k n) `(fx< ,k (int-priority-queue-index ,n))) ((n1 n2) `(fx< (int-priority-queue-index ,n1) (int-priority-queue-index ,n2))) int-priority-queue-left int-priority-queue-left-set! int-priority-queue-right int-priority-queue-right-set! int-priority-queue-color int-priority-queue-color-set! #f) (define ##sys#timeout-list (int-priority-queue-init! (make-queue-entry #f #f))) (define ##sys#timeout-list-head #f) (define-inline (##sys#timeout-list-empty?) (not ##sys#timeout-list-head)) (define-inline (timeout-queue-next) ##sys#timeout-list-head) (define-inline (timeout-queue-unqueue!) (set! ##sys#timeout-list-head (int-priority-queue-delete-min! ##sys#timeout-list))) (define-inline (timeout-queue-remove-entry! entry) (if (eq? ##sys#timeout-list-head entry) (timeout-queue-unqueue!) (int-priority-queue-node-delete! ##sys#timeout-list entry))) (define-inline (timeout-queue-insert-entry! entry) (cond ((not ##sys#timeout-list-head) (set! ##sys#timeout-list-head entry)) ((fx< (int-priority-queue-index entry) (int-priority-queue-index ##sys#timeout-list-head)) (int-priority-queue-node-insert! ##sys#timeout-list ##sys#timeout-list-head) (set! ##sys#timeout-list-head entry)) (else (int-priority-queue-node-insert! ##sys#timeout-list entry)))) )) (define (make-int-priority-queue) (int-priority-queue-init! (make-queue-entry #f #f))) (define ##sys#fd-list (make-int-priority-queue)) (define-inline (##sys#fd-list-empty?) (int-priority-queue-empty? ##sys#fd-list)) (cond-expand (rbtree (define-inline (fd-list-remove-entry! entry) (int-priority-queue-remove! entry))) (else (define-inline (fd-list-remove-entry! entry) (int-priority-queue-node-delete! ##sys#fd-list entry)))) (cond-expand (ready-queue-linear (define ##sys#ready-queue-head '()) (define ##sys#ready-queue-tail '()) (define (##sys#ready-queue) ##sys#ready-queue-head) (define-inline (##sys#ready-queue-empty?) (eq? '() ##sys#ready-queue-head)) (define (##sys#add-to-ready-queue thread) (##sys#setslot thread 3 'ready) (let ((new-pair (cons thread '()))) (cond ((##sys#ready-queue-empty?) (set! ##sys#ready-queue-head new-pair)) (else (set-cdr! ##sys#ready-queue-tail new-pair)) ) (set! ##sys#ready-queue-tail new-pair) ) ) (define-inline (##sys#remove-from-ready-queue) (let ((first-pair ##sys#ready-queue-head)) (and (not (null? first-pair)) (let ((first-cdr (cdr first-pair))) (set! ##sys#ready-queue-head first-cdr) (when (eq? '() first-cdr) (set! ##sys#ready-queue-tail '())) (car first-pair) ) ) ) ) (define-inline (queue-fold cns init q) (fold cns init q)) (define ##sys#waiting-queue-head '()) (define ##sys#waiting-queue-tail '()) (define (##sys#waiting-queue) ##sys#waiting-queue-head) (define-inline (##sys#waiting-queue-empty?) (eq? '() ##sys#waiting-queue-head)) (define-inline (##sys#add-to-waiting-queue thread) (##sys#setslot thread 3 'ready) (let ((new-pair (cons thread '()))) (cond ((##sys#waiting-queue-empty?) (set! ##sys#waiting-queue-head new-pair)) (else (set-cdr! ##sys#waiting-queue-tail new-pair)) ) (set! ##sys#waiting-queue-tail new-pair) ) ) (define-inline (##sys#release-waiting-queue) (set! ##sys#ready-queue-head ##sys#waiting-queue-head) (set! ##sys#ready-queue-tail ##sys#waiting-queue-tail) (set! ##sys#waiting-queue-head '()) (set! ##sys#waiting-queue-tail '())) ) (else (define-inline (make-vector-queue) '#(0 0 #(#f #f #f #f))) (define-inline (vector-queue-empty? q) (eqv? (##sys#slot q 0) (##sys#slot q 1))) (define-inline (vector-queue-size q) (let ((len (fx- (##sys#slot q 1) (##sys#slot q 0)))) (if (fx< len 0) (fx+ len (##sys#size (##sys#slot q 2))) len))) (define-inline (queue-grow! q v) (let ((v2 (##sys#make-vector (fx* (##sys#size v) 2) #f)) (first (##sys#slot q 0)) (last (##sys#slot q 1)) ) (if (fx<= first last) (do ((i 0 (fx+ i 1)) (j first (fx+ j 1))) ((fx>= j last) (##sys#setslot q 1 i)) (##sys#setslot v2 i (##sys#slot v j)) ) (let* ( (max (##sys#size v)) ) (do ((i (do ((i 0 (fx+ i 1)) (j first (fx+ j 1))) ((fx>= j max) i) (##sys#setslot v2 i (##sys#slot v j)) ) (fx+ i 1)) (j 0 (fx+ j 1))) ((fx>= j last) (##sys#setslot q 1 i)) (##sys#setslot v2 i (##sys#slot v j)) ))) (##sys#setslot q 0 0) (##sys#setslot q 2 v2) )) (define (##sys#vector-queue-add! q d) (let ((ns (add1 (vector-queue-size q))) (v (##sys#slot q 2))) (if (fx>= ns (##sys#size v)) (begin (queue-grow! q v) (##sys#vector-queue-add! q d)) (let* ((pos (##sys#slot q 1)) (next (add1 pos))) (if (eqv? next (##sys#size v)) (set! next 0)) (##sys#setslot v pos d) (##sys#setslot q 1 next))))) (define-inline (vector-queue-remove! q) (let ((pos (##sys#slot q 0))) (if (eqv? (##sys#slot q 1) pos) #f (let* ((v (##sys#slot q 2)) (d (##sys#slot v pos)) (n (add1 pos))) (##sys#setslot v pos #f) (if (eqv? n (##sys#size v)) (set! n 0)) (##sys#setslot q 0 n) d)))) (define-inline (queue-fold cns init q) (let ((first (##sys#slot q 0)) (last (##sys#slot q 1)) (v (##sys#slot q 2))) (if (fx<= first last) (let loop ((i first)) (if (eqv? i last) init (cns (##sys#slot v i) (loop (add1 i))))) (let loop ((i first)) (if (eqv? i (##sys#size v)) (let loop ((i 0)) (if (eqv? i last) init (cns (##sys#slot v i) (loop (add1 i))))) (cns (##sys#slot v i) (loop (add1 i))))) ) ) ) (define ##sys#ready-queue-head (make-vector-queue)) (define (##sys#ready-queue) ##sys#ready-queue-head) (define-inline (##sys#ready-queue-empty?) (vector-queue-empty? ##sys#ready-queue-head)) (define (##sys#add-to-ready-queue thread) (##sys#setslot thread 3 'ready) (##sys#vector-queue-add! ##sys#ready-queue-head thread) ) (define-inline (##sys#remove-from-ready-queue) (vector-queue-remove! ##sys#ready-queue-head) ) (define ##sys#waiting-queue-head (make-vector-queue)) (define (##sys#waiting-queue) ##sys#waiting-queue-head) (define-inline (##sys#waiting-queue-empty?) (vector-queue-empty? ##sys#waiting-queue-head)) (define-inline (##sys#add-to-waiting-queue thread) (##sys#setslot thread 3 'ready) (##sys#vector-queue-add! ##sys#waiting-queue-head thread)) (define-inline (##sys#release-waiting-queue) (let ((rq ##sys#ready-queue-head)) (set! ##sys#ready-queue-head ##sys#waiting-queue-head) (set! ##sys#waiting-queue-head rq))) )) (define (##sys#schedule) (let* ([ct ##sys#current-thread] [cts (##sys#slot ct 3)] ) (dbg "scheduling, current: " ct ", ready: " ##sys#ready-queue-head " waiting: " ##sys#waiting-queue-head) (##sys#update-thread-state-buffer ct) ;; Put current thread on ready-queue: (when (or (eq? cts 'running) (eq? cts 'ready)) ; should ct really be 'ready? - normally not. (##sys#setislot ct 13 #f) ; clear timeout-unblock flag (##sys#add-to-waiting-queue ct) ) ;; Fetch and activate next ready thread: (let loop ([nt (##sys#remove-from-ready-queue)]) (cond [(not nt) (##sys#release-waiting-queue) ;; Unblock threads waiting for timeout: (unless (##sys#timeout-list-empty?) (if (##sys#unblock-threads-for-timeout!) (##sys#force-primordial))) ;; Unblock threads blocked by I/O: (unless (##sys#fd-list-empty?) (##sys#unblock-threads-for-i/o) ) (if (and (##sys#ready-queue-empty?) (##sys#timeout-list-empty?) (##sys#fd-list-empty?)) (##sys#signal-hook #:runtime-error "deadlock") (loop (##sys#remove-from-ready-queue))) ] [(eq? (##sys#slot nt 3) 'ready) (dbg "switching to " nt) (set! ##sys#current-thread nt) (##sys#setslot nt 3 'running) (##sys#restore-thread-state-buffer nt) (##core#inline "C_set_initial_timer_interrupt_period" (##sys#slot nt 9)) ((##sys#slot nt 1))] [else (loop (##sys#remove-from-ready-queue))] ) ) )) (define (##sys#force-primordial) (dbg "primordial thread forced due to interrupt") (##sys#thread-unblock! ##sys#primordial-thread) ) (define (##sys#update-thread-state-buffer thread) (let ([buf (##sys#slot thread 5)]) (##sys#setslot buf 0 ##sys#dynamic-winds) (##sys#setslot buf 1 ##sys#standard-input) (##sys#setslot buf 2 ##sys#standard-output) (##sys#setslot buf 3 ##sys#standard-error) (##sys#setslot buf 4 ##sys#current-exception-handler) (##sys#setslot buf 5 ##sys#current-parameter-vector) ) ) (define (##sys#restore-thread-state-buffer thread) (let ([buf (##sys#slot thread 5)]) (set! ##sys#dynamic-winds (##sys#slot buf 0)) (set! ##sys#standard-input (##sys#slot buf 1)) (set! ##sys#standard-output (##sys#slot buf 2)) (set! ##sys#standard-error (##sys#slot buf 3)) (set! ##sys#current-exception-handler (##sys#slot buf 4)) (set! ##sys#current-parameter-vector (##sys#slot buf 5)) ) ) (set! ##sys#interrupt-hook (let ([oldhook ##sys#interrupt-hook]) (lambda (reason state) (when (fx= reason 255) ; C_TIMER_INTERRUPT_NUMBER (let ([ct ##sys#current-thread]) (##sys#setslot ct 1 (lambda () (oldhook reason state))) (##sys#schedule) ) ) ; expected not to return! (oldhook reason state) ) ) ) (define (##sys#remove-from-timeout-list t) (let ((entry (##sys#slot t 4))) (when entry (timeout-queue-remove-entry! entry) (##sys#setislot t 4 #f)))) (define (##sys#thread-block-for-timeout! t tm) (dbg t " blocks for " tm) ;; It wouldn't hurt if the thread structure where prepared to be ;; moved between thread queues, however that too much of a change at ;; once. (let ((ton (make-queue-entry tm t))) (##sys#setslot t 4 ton) (timeout-queue-insert-entry! ton)) (##sys#setslot t 3 'blocked) (##sys#setislot t 13 #f) ) (define (##sys#unblock-threads-for-timeout!) (let ([now (##sys#fudge 16)]) (dbg "timeout (" now ") list: " (##sys#timeout-list-empty?)) (let loop () (unless (##sys#timeout-list-empty?) (let* ((entry (timeout-queue-next)) (tmo (int-priority-queue-index entry))) (dbg " " now " -> " tmo) (if (>= now tmo) (let ((tto (int-priority-queue-value entry))) (if (not (eq? (##sys#slot tto 4) entry)) (print "(not (eq? (##sys#slot " (##sys#slot tto 4) " 4) " entry ")) ")) (timeout-queue-unqueue!) (##sys#setislot tto 4 #f) (##sys#setislot tto 13 #t) ; mark as being unblocked by timeout (##sys#clear-i/o-state-for-thread! tto) ;;(pp `(CLEARED: ,tto ,@##sys#fd-list) ##sys#standard-error) ;*** (##sys#thread-basic-unblock! tto) (loop) ) )))) ;; If there are no threads blocking on a select call (fd-list) but ;; there are threads in the timeout list then sleep for the number ;; of milliseconds of next thread to wake up and return #t if ;; interupted. (and (##sys#ready-queue-empty?) (##sys#waiting-queue-empty?) (##sys#fd-list-empty?) (not (##sys#timeout-list-empty?)) (let ([tmo (int-priority-queue-index (timeout-queue-next))]) (and (not (##core#inline "C_msleep" (fxmax 0 (- tmo now)))) (foreign-value "C_signal_interrupted_p" bool) ) ) ))) (define (##sys#thread-block-for-termination! t t2) (dbg t " blocks for " t2) (let ([state (##sys#slot t2 3)]) (unless (or (eq? state 'dead) (eq? state 'terminated)) (##sys#setslot t2 12 (cons t (##sys#slot t2 12))) (##sys#setslot t 3 'blocked) (##sys#setislot t 13 #f) (##sys#setslot t 11 t2) ) ) ) (define (##sys#thread-kill! t s) (dbg "killing: " t " -> " s ", recipients: " (##sys#slot t 12)) (##sys#abandon-mutexes t) (##sys#setslot t 3 s) (##sys#setislot t 11 #f) (##sys#setislot t 8 '()) (##sys#remove-from-timeout-list t) (let ([rs (##sys#slot t 12)]) (unless (null? rs) (for-each (lambda (t2) (dbg " checking: " t2 " (" (##sys#slot t2 3) ") -> " (##sys#slot t2 11)) (when (eq? (##sys#slot t2 11) t) (##sys#thread-basic-unblock! t2) ) ) rs) ) ) (##sys#setislot t 12 '()) ) (define (##sys#thread-basic-unblock! t) (dbg "unblocking: " t) (##sys#setislot t 11 #f) (if (##sys#slot t 4) (begin (dbg "##sys#thread-basic-unblock! timout slot is still set!") (##sys#setislot t 4 #f))) (##sys#add-to-ready-queue t) ) (define ##sys#default-exception-handler (let ([print-error-message print-error-message] [display display] [print-call-chain print-call-chain] [open-output-string open-output-string] [get-output-string get-output-string] ) (lambda (arg) (let ([ct ##sys#current-thread]) (dbg "exception: " ct " -> " (if (##sys#structure? arg 'condition) (##sys#slot arg 2) arg)) (cond [(foreign-value "C_abort_on_thread_exceptions" bool) (let* ([pt ##sys#primordial-thread] [ptx (##sys#slot pt 1)] ) (##sys#setslot pt 1 (lambda () (##sys#signal arg) (ptx) ) ) (##sys#thread-unblock! pt) ) ] [##sys#warnings-enabled (let ([o (open-output-string)]) (display "Warning (" o) (display ct o) (display "): " o) (print-error-message arg ##sys#standard-error (get-output-string o)) (print-call-chain ##sys#standard-error 0 ct) ) ] ) (##sys#setslot ct 7 arg) (##sys#thread-kill! ct 'terminated) (##sys#schedule) ) ) ) ) ;;; `select()'-based blocking: (define (##sys#empty-fd-list!) (set! ##sys#fd-list (make-int-priority-queue))) (define (##sys#fd-list-add-thread! fd t) (let ((entry (int-priority-queue-lookup ##sys#fd-list fd))) (if entry (if (not (memq t (int-priority-queue-value entry))) (int-priority-queue-value-set! entry (cons t (int-priority-queue-value entry)))) (int-priority-queue-node-insert! ##sys#fd-list (make-queue-entry fd (list t)))))) (define ##sys#fdset-select-timeout (foreign-lambda* int ([bool to] [unsigned-long tm]) "struct timeval timeout;" "timeout.tv_sec = tm / 1000;" "timeout.tv_usec = (tm % 1000) * 1000;" "C_fdset_input_2 = C_fdset_input;" "C_fdset_output_2 = C_fdset_output;" "return(select(FD_SETSIZE, &C_fdset_input, &C_fdset_output, NULL, to ? &timeout : NULL));") ) (define ##sys#fdset-restore (foreign-lambda* void () "C_fdset_input = C_fdset_input_2;" "C_fdset_output = C_fdset_output_2;") ) ((foreign-lambda* void () "FD_ZERO(&C_fdset_input);" "FD_ZERO(&C_fdset_output);") ) (define ##sys#fdset-input-set (foreign-lambda* void ([int fd]) "FD_SET(fd, &C_fdset_input);" ) ) (define ##sys#fdset-output-set (foreign-lambda* void ([int fd]) "FD_SET(fd, &C_fdset_output);" ) ) (define ##sys#fdset-clear (foreign-lambda* void ([int fd]) "FD_CLR(fd, &C_fdset_input_2);" "FD_CLR(fd, &C_fdset_output_2);") ) (define (##sys#thread-block-for-i/o! t fd i/o) (dbg t " blocks for I/O " fd) (##sys#fd-list-add-thread! fd t) (case i/o ((#t #:input) (##sys#fdset-input-set fd)) ((#f #:output) (##sys#fdset-output-set fd)) ((#:all) (##sys#fdset-input-set fd) (##sys#fdset-output-set fd) ) ) (##sys#setslot t 3 'blocked) (##sys#setislot t 13 #f) (##sys#setslot t 11 (cons fd i/o)) ) (define-foreign-variable error-bad-file int "(errno == EBADF)") (define (##sys#unblock-threads-for-i/o) (dbg "fd-list: " (int-priority-queue-node-fold (lambda (n i) (cons (cons (int-priority-queue-index n) (int-priority-queue-value n)) i)) '() ##sys#fd-list)) (let* ([to? (not (##sys#timeout-list-empty?))] [rq? (not (and (##sys#ready-queue-empty?) (##sys#waiting-queue-empty?)))] (nix (dbg "to? " to? " rq? " rq? " re " ##sys#ready-queue-head " w " (##sys#waiting-queue))) [n (##sys#fdset-select-timeout ; we use FD_SETSIZE, but really should use max fd (or rq? to?) (if (and to? (not rq?)) ; no thread was unblocked by timeout, so wait (let* ([tmo (int-priority-queue-index (timeout-queue-next))] [now (##sys#fudge 16)]) (fxmax 0 (- tmo now))) 0) ) ] ) ; otherwise immediate timeout. (dbg n " fds ready") (cond [(eq? n 0) (##sys#fdset-restore)] [(eq? -1 n) (cond (error-bad-file (let ((node (##sys#call-with-current-continuation (lambda (exit) (int-priority-queue-node-for-each (lambda (node) (define fd (int-priority-queue-index node)) (dbg "check bad " fd) (if ((foreign-lambda* bool ((integer fd)) "struct stat buf;" "int i = ( (fstat(fd, &buf) == -1 && errno == EBADF) ? 1 : 0);" "return(i);") fd) (exit node))) ##sys#fd-list) (exit #f))))) (if node (let ((fd (int-priority-queue-index node)) (ts (int-priority-queue-value node))) (dbg "bad is " fd) (##sys#fdset-clear fd) (fd-list-remove-entry! node) (let ((node (int-priority-queue-lookup ##sys#fd-list fd))) (when node (print-tree ##sys#fd-list) (int-priority-queue-delete!/traced ##sys#fd-list fd) (print-tree ##sys#fd-list))) (for-each #;(lambda (thread) (thread-signal! thread (##sys#make-structure 'condition '(exn i/o) ;; better? '(exn i/o net) (list '(exn . message) "bad file descriptor" '(exn . arguments) (list fd) '(exn . location) thread) ))) (lambda (t) (let* ((p (##sys#slot t 11)) ) (when (and (pair? p) (eq? fd (car p)) (not (##sys#slot t 13) ) ) ; not unblocked by timeout (##sys#thread-basic-unblock! t) ) )) ts)))) (##sys#fdset-restore) (##sys#unblock-threads-for-i/o)) (else (##sys#force-primordial))) ] [(fx> n 0) (for-each (lambda (e) (fd-list-remove-entry! e)) (##sys#call-with-current-continuation (lambda (exit) (int-priority-queue-node-fold (lambda (node init) (define fd (int-priority-queue-index node)) (define threads (int-priority-queue-value node)) (if (zero? n) (exit init) (let* ([inf (##core#inline "C_fd_test_input" fd)] [outf (##core#inline "C_fd_test_output" fd)] ) (dbg "fd " fd " ready: input=" inf ", output=" outf) (if (or inf outf) (begin (for-each (lambda (t) (let* ((p (##sys#slot t 11)) ) (when (and (pair? p) (eq? fd (car p)) (not (##sys#slot t 13) ) ) ; not unblocked by timeout (##sys#thread-basic-unblock! t) ) )) threads) (##sys#fdset-clear fd) (set! n (sub1 n)) (cons node init)) init)))) '() ##sys#fd-list)))) (##sys#fdset-restore) ] ) ) ) ;;; Clear I/O state for unblocked thread (define (##sys#clear-i/o-state-for-thread! t) (when (pair? (##sys#slot t 11)) (let* ((fd (##sys#slot (##sys#slot t 11) 0)) (entry (int-priority-queue-lookup ##sys#fd-list fd))) (when entry (let ((ts (##sys#delq t (int-priority-queue-value entry)))) ; remove from fd-list entry (cond ((null? ts) ;;(pp `(CLEAR FD: ,fd ,t) ##sys#standard-error) (##sys#fdset-clear fd) ; no more threads waiting for this fd (##sys#fdset-restore) (fd-list-remove-entry! entry)) (else (int-priority-queue-value-set! entry ts)) ) ))))) ; fd-list entry is list with t removed ;;; Get list of all threads that are ready or waiting for timeout or waiting for I/O: ; ; (contributed by Joerg Wittenberger) (cond-expand (rbtree (define (##sys#all-threads #!optional (cns (lambda (queue arg val init) (cons val init))) (init '())) (int-priority-queue-node-fold (lambda (n i) (fold (lambda (t i) (cns 'i/o (int-priority-queue-index n) t i)) i (int-priority-queue-value n))) (int-priority-queue-node-fold (lambda (n i) (cns 'timeout (int-priority-queue-index n) (int-priority-queue-value n) i)) (queue-fold (lambda (t i) (cns 'ready #f t i)) (queue-fold (lambda (t i) (cns 'waiting #f t i)) init ##sys#waiting-queue-head) ##sys#ready-queue-head) ##sys#timeout-list) ##sys#fd-list))) (else (define (##sys#all-threads #!optional (cns (lambda (queue arg val init) (cons val init))) (init '())) (int-priority-queue-node-fold (lambda (n i) (fold (lambda (t i) (cns 'i/o (int-priority-queue-index n) t i)) i (int-priority-queue-value n))) (let ((r (int-priority-queue-node-fold (lambda (n i) (cns 'timeout (int-priority-queue-index n) (int-priority-queue-value n) i)) (queue-fold (lambda (t i) (cns 'ready #f t i)) (queue-fold (lambda (t i) (cns 'waiting #f t i)) init ##sys#waiting-queue-head) ##sys#ready-queue-head) ##sys#timeout-list))) (let ((n ##sys#timeout-list-head)) (if n (cns 'timeout (int-priority-queue-index n) (int-priority-queue-value n) r) r))) ##sys#fd-list)))) ;;; Remove all waiting threads from the relevant queues with the exception of the current thread: #| (define (##sys#fetch-and-clear-threads) (let ([all (vector ##sys#ready-queue-head ##sys#ready-queue-tail ##sys#fd-list ##sys#timeout-list)]) (set! ##sys#ready-queue-head '()) (set! ##sys#ready-queue-tail '()) (##sys#empty-fd-list!) (set! ##sys#timeout-list '()) all) ) ;;; Restore list of waiting threads: (define (##sys#restore-threads vec) (set! ##sys#ready-queue-head (##sys#slot vec 0)) (set! ##sys#ready-queue-tail (##sys#slot vec 1)) (set! ##sys#fd-list (##sys#slot vec 2)) (set! ##sys#timeout-list (##sys#slot vec 3)) ) |# ;;; Unblock thread cleanly: (define (##sys#thread-unblock! t) (when (eq? 'blocked (##sys#slot t 3)) (##sys#remove-from-timeout-list t) (##sys#clear-i/o-state-for-thread! t) (##sys#setislot t 12 '()) (##sys#thread-basic-unblock! t) ) ) ;;; Multithreaded breakpoints (define (##sys#break-entry name args) (when (or (not ##sys#break-in-thread) (eq? ##sys#break-in-thread ##sys#current-thread)) (##sys#call-with-current-continuation (lambda (k) (let* ((pk (if (eq? ##sys#current-thread ##sys#primordial-thread) '() (list '(exn . thread) ##sys#current-thread '(exn . primordial-continuation) (lambda _ ((##sys#slot ##sys#primordial-thread 1)))))) (exn (##sys#make-structure 'condition '(exn breakpoint) (append (list '(exn . message) "*** breakpoint ***" '(exn . arguments) (cons name args) '(exn . location) name '(exn . continuation) k) pk) ) ) ) (set! ##sys#last-breakpoint exn) (cond ((eq? ##sys#current-thread ##sys#primordial-thread) (##sys#signal exn) ) (else (##sys#setslot ##sys#current-thread 3 'suspended) (##sys#setslot ##sys#current-thread 1 (lambda () (k (##core#undefined)))) (let ([old (##sys#slot ##sys#primordial-thread 1)]) (##sys#setslot ##sys#primordial-thread 1 (lambda () (##sys#signal exn) (old) ) ) (##sys#thread-unblock! ##sys#primordial-thread) (##sys#schedule) ) ) ) ) ) ) ) ) (define (##sys#break-resume exn) ;; assumes current-thread is primordial (let* ((props (##sys#slot exn 2)) (a (member '(exn . continuation) props)) (t (member '(exn . thread) props)) (pk (or (member '(exn . primordial-continuation) props) a))) (when t (let ((t (cadr t))) (if a (##sys#setslot t 1 (lambda () ((cadr a) (##core#undefined)))) (##sys#signal-hook #:type-error "condition has no continuation" exn) ) (##sys#add-to-ready-queue t) ) ) (if pk ((cadr pk) (##core#undefined)) (##sys#signal-hook #:type-error "condition has no continuation" exn) ) ) )