chicken-users
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: [Chicken-users] Wish: Parallel Map


From: F. Wittenberger
Subject: Re: [Chicken-users] Wish: Parallel Map
Date: Wed, 29 Dec 2010 11:56:45 +0100

Maybe this is better for you?

Use it like this:

(process-call (lambda () 7)) => 7


/Jörg

;; Compile-time declarations for this file.
;;  - (disable-interrupts): CHICKEN declaration that turns off interrupt
;;    checks in the compiled code of this unit.
;;    NOTE(review): presumably wanted so that the inline read(2)/write(2)
;;    calls below, which are handed Scheme strings as raw buffers, are not
;;    interleaved with a thread switch -- confirm.
;;  - (foreign-declare ...): pulls in <errno.h>, which the inline C code
;;    below needs for `errno' and `EAGAIN'.
(declare
 (disable-interrupts)
 (foreign-declare #<<EOF

#include <errno.h>

EOF
)
 )

;; process-io-ports: wrap a pair of file descriptors that talk to process
;; PID in Scheme ports.
;;
;;   pid  - process id handed to `process-wait' when a port is closed
;;   fdr  - file descriptor to read from
;;   fdw  - file descriptor to write to
;;
;; Returns two values: an input port reading from FDR and an output port
;; writing to FDW.  Both descriptors are switched to non-blocking mode; a
;; read/write that reports EAGAIN suspends only the current thread via
;; `thread-wait-for-i/o!' and is then retried.
;;
;; Closing one port closes its descriptor and then waits for PID:
;; blocking if the other port is already closed, otherwise polling once
;; and, if the process has already exited, closing the other descriptor
;; as well.
;;
;; Depends on `buffer-size' and `process-io-update-bandwith!', which are
;; defined outside this block.
(define process-io-ports
  (let ([make-input-port make-input-port]
        [make-output-port make-output-port]
        [make-string make-string]
        [file-close-fd! (foreign-lambda* int ((int fd)) "return(close(fd));")]
        ;; read(2) wrapper: stores #t in *again when the call failed with
        ;; EAGAIN, i.e. the caller should wait for input and retry.
        [fd-read!
         (foreign-lambda*
          int
          ((int fd) (pointer buf) (int s) ((c-pointer bool) again))
          "int r = read(fd, buf, s); *again=(r==-1)&&(errno == EAGAIN); return(r);")]
        ;; write(2) wrapper: writes COUNT bytes starting at BUF+OFF and
        ;; reports EAGAIN through *again in the same way.
        [fd-write!
         (foreign-lambda*
          int
          ((int fd) (pointer buf) (int off) (int count) ((c-pointer bool) again))
          "int r = write(fd, ((char*)buf)+off, count); *again=(r==-1)&&(errno == EAGAIN); return(r);")])
    (lambda (pid fdr fdw)
      (##sys#file-nonblocking! fdw)
      (##sys#file-nonblocking! fdr)     ; should not be required
      (let* ([buf (make-string buffer-size)] ; input buffer
             [buflen 0]                      ; number of valid bytes in buf
             [bufindex 0]                    ; next unread position in buf
             [iclosed #f]                    ; input side closed?
             [oclosed #f]                    ; output side closed?
             [in
              (make-input-port
               ;; read-char: refill the buffer when it is exhausted, then
               ;; hand out one character, or the eof object if no data
               ;; could be obtained.
               (let-location
                ((again bool #f))
                (lambda ()
                  (when (and (fx>= bufindex buflen) (not iclosed))
                    (let loop ()
                      (let ([n (fd-read! fdr buf buffer-size (location again))])
                        (cond
                         (again
                          (thread-wait-for-i/o! fdr #:input)
                          (loop))
                         ((eq? -1 n)
                          ;; Hard read error: treat as end of input.
                          (set! iclosed #t)
                          (file-close-fd! fdr))
                         (else
                          (set! buflen n)
                          (set! bufindex 0))))))
                  (if (fx>= bufindex buflen)
                      (end-of-file)
                      (let ([c (##core#inline "C_subchar" buf bufindex)])
                        (set! bufindex (fx+ bufindex 1))
                        c))))
               ;; char-ready?: always answers #t unless the port is closed.
               ;; The location symbol is passed like in every other
               ;; signal-hook call of this procedure.
               (lambda ()
                 (when iclosed
                   (##sys#signal-hook #:process-error 'process-io-ready
                                      "input port is closed" fdr))
                 #t)
               ;; close
               (lambda ()
                 (unless iclosed
                   (set! iclosed #t)
                   (when (eq? -1 (file-close-fd! fdr))
                     (##sys#update-errno)
                     (##sys#signal-hook #:process-error 'process-io-close
                                        "can not close fd input port" fdr))
                   (if oclosed
                       ;; Both sides closed now: wait for the process.
                       (receive (p f s) (process-wait pid #f) s)
                       ;; Output still open: only poll; if the process is
                       ;; already gone, close the output side too.
                       (receive (p f s) (process-wait pid #t)
                         (when (eqv? p pid)
                           (set! oclosed #t)
                           (when (eq? -1 (file-close-fd! fdw))
                             (##sys#update-errno)
                             (##sys#signal-hook #:process-error 'process-io-close
                                                "can not close fd output port" fdw))
                           s))))))]
             [out
              (let-location
               ((again bool #f))
               (make-output-port
                ;; write: push the whole string S to fdw, retrying on
                ;; EAGAIN and continuing after short writes.
                (lambda (s)
                  (define start-time (current-milliseconds))
                  (let loop ([len (##sys#size s)]
                             [off 0])
                    (if oclosed
                        (##sys#signal-hook
                         #:process-error 'process-io-write "fd output port is closed" fdw)
                        (let ([n (fd-write! fdw s off len (location again))])
                          (cond [again
                                 (thread-wait-for-i/o! fdw #:output)
                                 (loop len off)]
                                [(eq? -1 n)
                                 (##sys#update-errno)
                                 ;; NOTE(review): `strerror' is passed as a
                                 ;; bare value and is not defined in this
                                 ;; block -- confirm it is bound, or pass
                                 ;; the error text instead.
                                 (##sys#signal-hook
                                  #:process-error 'process-io-write
                                  "can not write to fd" fdw len strerror)]
                                [(fx< n len)
                                 (loop (fx- len n) (fx+ off n))]))))
                  ;; Account for the write exactly once, after all of S has
                  ;; been written; doing this inside the loop would count
                  ;; the full size again for every retry or short write.
                  (process-io-update-bandwith! (##sys#size s) start-time))
                ;; close: mirror image of the input port's close.
                (lambda ()
                  (unless oclosed
                    (set! oclosed #t)
                    (when (eq? -1 (file-close-fd! fdw))
                      (##sys#update-errno)
                      (##sys#signal-hook #:process-error 'process-io-close
                                         "can not close fd output port" fdw))
                    (if iclosed
                        (receive (p f s) (process-wait pid #f) s)
                        (receive (p f s) (process-wait pid #t)
                          (when (eqv? p pid)
                            (set! iclosed #t)
                            (when (eq? -1 (file-close-fd! fdr))
                              (##sys#update-errno)
                              (##sys#signal-hook #:process-error 'process-io-close
                                                 "can not close fd input port" fdr))
                            s)))))))])
        (values in out)))))

;; process-call: evaluate THUNK in a forked child process and return its
;; result in the calling process.
;;
;;   thunk - procedure of no arguments; its result is sent back with
;;           `write' and recovered with `read', so it has to have a
;;           readable external representation.
;;
;; Two pipes are created before the fork: (tr . tw) carries data to the
;; child, (fr . fw) carries data from the child.  Each side closes the
;; pipe ends that belong to the other side and wraps its own ends with
;; `process-io-ports'.
;;
;; If anything raises in the child, the child terminates with exit
;; status 1 without writing a result.
(define (process-call thunk)
  (receive
   (pid out in)
   (receive (tr tw) (create-pipe)
     (receive (fr fw) (create-pipe)
       (gc #f)                          ; minor GC right before forking
       (let ((pid (process-fork)))
         (if (eqv? pid 0)
             ;; Child: compute, send the result, terminate.
             (guard
              (ex (else (_exit 1)))
              ;; Drop the inherited copies of the parent's pipe ends so
              ;; the child does not keep them open.
              (file-close tw) (file-close fr)
              (receive
               (in out)
               (process-io-ports (current-process-id) tr fw)
               (write (thunk) out)
               ;; NOTE(review): closing these ports makes process-io-ports
               ;; call `process-wait' on the child's own pid; if that
               ;; raises, the guard above turns it into (_exit 1) after
               ;; the result has already been written -- confirm.
               (close-output-port out)
               (close-input-port in)
               (_exit 0)))
             ;; Parent: keep only the ends it reads from / writes to.
             (begin
               (file-close tr) (file-close fw)
               (receive (in out) (process-io-ports pid fr tw)
                 (values pid out in)))))))
   (let ((result (read in)))
     (close-output-port out)
     (close-input-port in)
     result)))





reply via email to

[Prev in Thread] Current Thread [Next in Thread]