From: Paolo Bonzini
Subject: [Help-smalltalk] [PATCH] More zlib...
Date: Fri, 18 May 2007 16:35:57 +0200
User-agent: Thunderbird (Macintosh/20070326)

One more patch for today, allowing the DeflateStream to decorate not only a ReadStream but also a WriteStream. To do this, I just added a generic class to turn a ReadStream decorator into a WriteStream decorator (the PipeStream).

More practically, this means that you can do:

   fs := FileStream open: 'foo.gz' mode: 'w'.
   gzipStream := GZipDeflateStream compressingTo: fs.
   Object fileOutOn: gzipStream.
   gzipStream close.
   fs close.
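
For an in-memory round trip, here is a minimal, untested sketch that uses only calls appearing in the patch below (#compressingTo:, #nextPutAll:, #contents, and InflateStream class>>#on:). The sample text and the variable names are illustrative assumptions, and it assumes the zlib bindings with this patch applied are loaded.

```smalltalk
"Sketch only: assumes the patched zlib bindings are loaded."
| dest compressed |
"Compress into an in-memory String instead of a file."
dest := DeflateStream compressingTo: String new writeStream.
dest nextPutAll: 'hello hello hello hello'.
"#contents closes the pipe and answers the compressed data."
compressed := dest contents.
"Decompress again with the existing ReadStream decorator."
(InflateStream on: compressed readStream) contents printNl.
```

This mirrors the new test in the patch, which compares the result of the WriteStream version against the ReadStream version.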

* looking for address@hidden/smalltalk--devo--2.2--patch-324 to compare with
* comparing to address@hidden/smalltalk--devo--2.2--patch-324
M  examples/
M  ChangeLog

* modified files

2007-05-18  Paolo Bonzini  <address@hidden>

        * kernel/ New.
        * kernel/ Use it.

--- orig/examples/
+++ mod/examples/
@@ -133,6 +133,13 @@ testError
     ^[ (InflateStream on: #[12 34 56] readStream) contents. false ]
        on: ZlibError do: [ :ex | ex return: true ]!
+    "Test the WriteStream version of DeflateStream."
+    | dest |
+    dest := DeflateStream compressingTo: String new writeStream.
+    dest nextPutAll: self testVector.
+    ^dest contents asByteArray = self doDeflate asByteArray!
     "Test connecting a DeflateStream back-to-back with an InflateStream."
     | deflate |
@@ -196,6 +203,7 @@ runTests
     self testDirect printNl.
     self testRaw printNl.
     self testGZip printNl.
+    self testWrite printNl.
     self bufferSize: oldBufSize! !
@@ -349,6 +357,16 @@ defaultCompressionLevel: anInteger
 !RawDeflateStream class methodsFor: 'instance creation'!
+compressingTo: aStream
+    "Answer a stream that receives data via #nextPut: and compresses it onto
+     aStream."
+    ^PipeStream connectedTo: aStream via: [ :r | self on: r ]!
+compressingTo: aStream level: level
+    "Answer a stream that receives data via #nextPut: and compresses it onto
+     aStream with the given compression level."
+    ^PipeStream connectedTo: aStream via: [ :r | self on: r level: level ]!
 on: aStream
     "Answer a stream that compresses the data in aStream with the default
      compression level."
@@ -405,7 +423,6 @@ position: anInteger
     "Reset the stream to the beginning of the compressed data."
-     input stream or skipping compressed data."
     source reset.
     self destroyZlibObject; initializeZlibObject.
     self resetBuffer!
@@ -416,7 +433,7 @@ copyFrom: start to: end
      unlike the one in Collection, because a Stream's #position method
      returns 0-based values.  Notice that this class can only provide
      the illusion of random access, by appropriately rewinding the input
-     stream or skipping compressed data.""
+     stream or skipping compressed data."
     | pos |
     pos := self position.
     ^[ self position: start; next: end - start ]

--- orig/
+++ mod/
@@ -288,10 +288,12 @@
+  <filein></filein>
+  <file></file>

--- /dev/null
+++ mod/examples/
@@ -0,0 +1,234 @@
+|   PipeStream class (part of the ZLib bindings)
+ ======================================================================"
+| Copyright 2007 Free Software Foundation, Inc.
+| Written by Paolo Bonzini
+| This file is part of GNU Smalltalk.
+| GNU Smalltalk is free software; you can redistribute it and/or modify it
+| under the terms of the GNU General Public License as published by the Free
+| Software Foundation; either version 2, or (at your option) any later version.
+| GNU Smalltalk is distributed in the hope that it will be useful, but WITHOUT
+| ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+| FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
+| details.
+| You should have received a copy of the GNU General Public License along with
+| GNU Smalltalk; see the file COPYING.  If not, write to the Free Software
+| Foundation, 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ ======================================================================"
+PositionableStream subclass: #PipeStream
+       instanceVariableNames: 'full data empty contents'
+       classVariableNames: 'BufferSize'
+       poolDictionaries: ''
+       category: 'Examples-Processes'!
+PipeStream comment:
+'Used internally by the zlib bindings, the PipeStream provides two
+pieces of functionality.  The first is to provide a dual-ended FIFO
+stream, which can be read and written by independent processes.  The
+second is to provide a WriteStream-to-ReadStream adaptor, where the
+data is written to the PipeStream (the writing side), fed to
+an object expecting a ReadStream (possibly as a decorator), and taken
+from there into the destination stream.  The effect is to turn a
+ReadStream decorator into a WriteStream decorator.'!
+!PipeStream class methodsFor: 'accessing'!
+    "Answer the default size of the buffer that #connectedTo:via: allocates
+     for each pipe."
+    BufferSize isNil ifTrue: [ BufferSize := 512 ].
+    ^BufferSize!
+bufferSize: anInteger
+    "Set the default size of the buffer that #connectedTo:via: allocates
+     for each pipe."
+    BufferSize := anInteger!
+!PipeStream class methodsFor: 'instance creation'!
+on: aCollection 
+    "Answer a new stream using aCollection as its buffer."
+    aCollection size = 0 ifTrue: [ self halt ].
+    ^self basicNew initCollection: aCollection!
+connectedTo: writeStream via: aBlock
+    "Create a PipeStream that acts as a WriteStream to ReadStream adaptor.
+     The pipe is passed to the 1-parameter block aBlock, which should use
+     the pipe as a ReadStream and return another ReadStream.  The data that
+     will be written to the pipe will go through the return value of aBlock,
+     and then be written to writeStream.
+     Example:
+       dest := PipeStream connectedTo: fileStream via: [ :r | DeflateStream on: r ].
+       dest next: 100 put: $A."
+    ^(self on: (writeStream species new: self bufferSize))
+       connectTo: writeStream via: aBlock;
+       yourself!
+on: aCollection via: aBlock
+    "Create a PipeStream that acts as a WriteStream to ReadStream adaptor.
+     The pipe is passed to the 1-parameter block aBlock, which should use
+     the pipe as a ReadStream and return another ReadStream.  The data that
+     will be written to the pipe will be placed into aCollection, and can
+     be retrieved using the #contents method of the PipeStream.
+     Example:
+       dest := PipeStream on: String new via: [ :r | DeflateStream on: r ].
+       dest next: 100 put: $A.
+        dest contents printNl"
+    ^self connectedTo: aCollection writeStream via: aBlock!
+!PipeStream methodsFor: 'instance creation'!
+    "Close the pipe, causing all blocked reads and writes to terminate
+     immediately."
+    | sema |
+    sema := full.
+    full := nil.
+    sema notifyAll.
+    sema := empty.
+    empty := nil.
+    sema notifyAll.
+    sema := data.
+    data := nil.
+    sema notifyAll!
+    "Answer whether the communication channel has been closed."
+    ^full isNil!
+    "Answer whether the communication channel is still open."
+    ^full notNil!
+    "Answer whether the communication channel is closed and there is no
+     data in the buffer."
+    ^super atEnd and: [ self notConnected ]!
+    "Answer whether there is no data in the buffer."
+    ^super atEnd!
+    "Answer whether there is no room left in the buffer."
+    ^endPtr = collection size!
+    "Retrieve the next byte of data from the pipe, blocking if there is none."
+    | result |
+    [ self isEmpty ] whileTrue: [
+       self isConnected ifFalse: [ ^self pastEnd ].
+       data wait ].
+    result := super next.
+    empty notifyAll.
+    ^result!
+    "Retrieve the next byte of data from the pipe without consuming it,
+     blocking if there is none."
+    [ self isEmpty ] whileTrue: [
+       self isConnected ifFalse: [ ^self pastEnd ].
+       data wait ].
+    ^super peek!
+nextPut: anObject
+    "Put anObject in the pipe, blocking if it is full."
+    [ self isFull ] whileTrue: [
+       self isConnected ifFalse: [ ^self pastEnd ].
+       empty wait ].
+    endPtr := endPtr + 1.
+    collection at: endPtr put: anObject.
+    data notifyAll.
+    self isFull ifTrue: [ full notifyAll ].
+    ^anObject!
+    "Return a buffer worth of data, blocking until it is full or the pipe
+     is closed."
+    [ self isEmpty and: [ self isConnected ] ] whileTrue: [ full wait ].
+    "Here, the buffer is full and all writers are locked, so there is no
+     contention between the writer and the reader."
+    ^self bufferContents!
+    "Close the channel and return the full contents of the stream.  For
+     pipes created with #on:, #contents closes the stream and returns the
+     leftover contents of buffer."
+    self close.
+    ^contents isNil
+       ifTrue: [ self bufferContents ]
+        ifFalse: [ contents value value ]!
+    "Close the channel and return a readStream on the full contents of
+     the stream.  For pipes created with #on:, the stream is created on the
+     leftover contents of buffer."
+    ^self contents readStream!
+    "Drop all data currently in the buffer.  This should not be used
+     concurrently with other next or nextPut: operations."
+    endPtr := 0.
+    ptr := 1.
+    empty notifyAll!
+!PipeStream methodsFor: 'private methods'!
+    "Return the current contents of the buffer and empty it.  This is private
+     because it requires a lock even in presence of a single reader and a 
+     writer."
+    | result |
+    result := collection copyFrom: ptr to: endPtr.
+    self reset.
+    ^result!
+connectTo: writeStream via: aBlock
+    "Establish a channel as explained in the class method #connectedTo:via:."
+    "Overwrite the block with a Promise object, so that we complete processing
+     and return the entire contents of the underlying stream."
+    contents := Promise new.
+    [
+       | readStream |
+       readStream := aBlock value: self.
+       [
+           "This blocks the reader process if there is no data in the buffer."
+           writeStream nextPutAll: readStream nextHunk.
+           self isConnected and: [ readStream atEnd not ] ] whileTrue.
+        writeStream nextPutAll: readStream contents.
+       "Don't evaluate unless requested."
+        contents value: [ writeStream contents ] ] fork!
+initCollection: aCollection
+    collection := aCollection.
+    ptr := 1.
+    endPtr := 0.
+    data := Semaphore new.
+    empty := Semaphore new.
+    full := Semaphore new.
+    contents := nil.
+! !

