[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
Re: [Chicken-users] Wish: Parallel Map
From: |
F. Wittenberger |
Subject: |
Re: [Chicken-users] Wish: Parallel Map |
Date: |
Wed, 29 Dec 2010 11:56:45 +0100 |
Maybe this is better for you?
Use like this:
(process-call (lambda () 7)) => 7
/Jörg
(declare
(disable-interrupts)
(foreign-declare #<<EOF
#include <errno.h>
EOF
)
)
(define process-io-ports
(let ([make-input-port make-input-port]
[make-output-port make-output-port]
[make-string make-string]
[substring substring]
[file-close-fd! (foreign-lambda* int ((int fd)) "return(close(fd));")]
)
(lambda (pid fdr fdw)
(##sys#file-nonblocking! fdw)
(##sys#file-nonblocking! fdr) ; should not be required
(let* ([buf (make-string buffer-size)]
;; [data (vector #f #f #f)]
[buflen 0]
[bufindex 0]
[iclosed #f]
[oclosed #f]
[in
(make-input-port
(let-location
((again bool #f))
(lambda ()
(when (and (fx>= bufindex buflen) (not iclosed))
(let loop ()
(let ([n ((foreign-lambda*
int
((int fd) (pointer buf) (int s)
((c-pointer bool) again))
"int r = read(fd, buf, s);
*again=(r==-1)&&(errno == EAGAIN);
return(r);")
fdr buf buffer-size (location again))])
(cond
(again
(thread-wait-for-i/o! fdr #:input)
(loop))
((eq? -1 n)
(set! iclosed #t)
(file-close-fd! fdr))
(else
(set! buflen n)
(set! bufindex 0))) )) )
(if (fx>= bufindex buflen)
(end-of-file)
(let ([c (##core#inline "C_subchar" buf bufindex)])
(set! bufindex (fx+ bufindex 1))
c) ) ))
(lambda ()
(when iclosed
(##sys#signal-hook #:process-error "input port is
closed" fdr))
#t )
(lambda ()
(unless iclosed
(set! iclosed #t)
(when (eq? -1 (file-close-fd! fdr))
(##sys#update-errno)
(##sys#signal-hook #:process-error 'process-io-close
"can not close fd input port"
fdr) )
(if oclosed
(receive (p f s) (process-wait pid #f) s)
(receive (p f s) (process-wait pid #t)
(when (eqv? p pid)
(set! oclosed #t)
(when (eq? -1 (file-close-fd! fdw))
(##sys#update-errno)
(##sys#signal-hook #:process-error
'process-io-close
"can not close fd output
port" fdw) ) s)))
) ) ) ]
[out
(let-location
((again bool #f))
(make-output-port
(lambda (s)
(define start-time (current-milliseconds))
(let loop ([len (##sys#size s)]
[off 0])
(if oclosed
(##sys#signal-hook
#:process-error 'process-io-write "fd output port is
closed" fdw)
(let ([n ((foreign-lambda*
int
((int fd) (pointer buf) (int off) (int
count) ((c-pointer bool)
again))
"int r = write(fd, ((char*)buf)+off, count);
*again=(r==-1)&&(errno == EAGAIN); return(r);")
fdw s off len (location again))])
(cond [again
;; (thread-wait-for-i/o! fdw #f)
(thread-wait-for-i/o! fdw #:output)
(loop len off)]
[(eq? -1 n)
(##sys#update-errno)
(##sys#signal-hook
#:process-error 'process-io-write
"can not write to fd" fdw len strerror) ]
[(fx< n len)
(loop (fx- len n) (fx+ off n)) ] )
(process-io-update-bandwith! (##sys#size s)
start-time) ))))
(lambda ()
(unless oclosed
(set! oclosed #t)
(when (eq? -1 (file-close-fd! fdw))
(##sys#update-errno)
(##sys#signal-hook #:process-error
'process-io-close
"can not close fd output
port" fdw) )
(if iclosed
(receive (p f s) (process-wait pid #f) s)
(receive (p f s) (process-wait pid #t)
(when (eqv? p pid)
(set! iclosed #t)
(when (eq? -1 (file-close-fd! fdr))
(##sys#update-errno)
(##sys#signal-hook
#:process-error 'process-io-close
"can not
close fd input port" fdr) ) s)))) ))) ] )
(values in out) ) ) ) )
(define (process-call thunk)
(receive
(pid out in)
(receive (tr tw) (create-pipe)
(receive (fr fw) (create-pipe)
(gc #f)
(let ((pid (process-fork)))
(if (eqv? pid 0)
(guard
(ex (else (_exit 1)))
(receive
(in out)
(##process#io-ports (current-process-id) tr fw)
(write (thunk) out)
(close-output-port out)
(close-input-port in)
(_exit 0)))
(begin
(file-close tr) (file-close fw)
(receive (in out) (##process#io-ports pid fr tw)
(values pid out in)))))))
(let ((result (read in)))
(close-output-port out)
(close-input-port in)
result)))