[Help-smalltalk] [PATCH] More zlib...
From: Paolo Bonzini
Subject: [Help-smalltalk] [PATCH] More zlib...
Date: Fri, 18 May 2007 16:35:57 +0200
User-agent: Thunderbird 2.0.0.0 (Macintosh/20070326)
One more patch for today, allowing the DeflateStream to decorate not
only a ReadStream but also a WriteStream. To do this, I just added a
generic class to turn a ReadStream decorator into a WriteStream
decorator (the PipeStream).
More practically, this means that you can do:
fs := FileStream open: 'foo.gz' mode: 'w'.
gzipStream := GZipDeflateStream compressingTo: fs.
Object fileOutOn: gzipStream.
gzipStream close.
fs close.
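The same works in memory. As a minimal sketch (not part of the patch, and assuming that InflateStream accepts a ReadStream over the String answered by #contents), this compresses into a String with the #compressingTo:level: method added below and inflates the result again. The level 9 and the sample text are arbitrary choices for illustration:

```smalltalk
| dest compressed |
"Compress into an in-memory String instead of a file."
dest := DeflateStream compressingTo: String new writeStream level: 9.
dest nextPutAll: 'hello hello hello hello'.

"#contents closes the pipe and waits for the compressed data."
compressed := dest contents.

"Round-trip through InflateStream to check the result."
(InflateStream on: compressed readStream) contents printNl.
```

Note that #contents closes the pipe, so nothing more can be written to dest afterwards.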
Paolo
* looking for address@hidden/smalltalk--devo--2.2--patch-324 to compare with
* comparing to address@hidden/smalltalk--devo--2.2--patch-324
M examples/zlib.st
M ChangeLog
M packages.xml.in
* modified files
2007-05-18 Paolo Bonzini <address@hidden>
* examples/PipeStream.st: New.
* examples/zlib.st: Use it.
--- orig/examples/zlib.st
+++ mod/examples/zlib.st
@@ -133,6 +133,13 @@ testError
^[ (InflateStream on: #[12 34 56] readStream) contents. false ]
on: ZlibError do: [ :ex | ex return: true ]!
+testWrite
+ "Test the WriteStream version of DeflateStream."
+ | dest |
+ dest := DeflateStream compressingTo: String new writeStream.
+ dest nextPutAll: self testVector.
+ ^dest contents asByteArray = self doDeflate asByteArray!
+
testRaw
"Test connecting a DeflateStream back-to-back with an InflateStream."
| deflate |
@@ -196,6 +203,7 @@ runTests
self testDirect printNl.
self testRaw printNl.
self testGZip printNl.
+ self testWrite printNl.
self bufferSize: oldBufSize! !
@@ -349,6 +357,16 @@ defaultCompressionLevel: anInteger
!RawDeflateStream class methodsFor: 'instance creation'!
+compressingTo: aStream
+ "Answer a stream that receives data via #nextPut: and compresses it onto
+ aStream."
+ ^PipeStream connectedTo: aStream via: [ :r | self on: r ]!
+
+compressingTo: aStream level: level
+ "Answer a stream that receives data via #nextPut: and compresses it onto
+ aStream with the given compression level."
+ ^PipeStream connectedTo: aStream via: [ :r | self on: r level: level ]!
+
on: aStream
"Answer a stream that compresses the data in aStream with the default
compression level."
@@ -405,7 +423,6 @@ position: anInteger
reset
"Reset the stream to the beginning of the compressed data."
- input stream or skipping compressed data."
source reset.
self destroyZlibObject; initializeZlibObject.
self resetBuffer!
@@ -416,7 +433,7 @@ copyFrom: start to: end
unlike the one in Collection, because a Stream's #position method
returns 0-based values. Notice that this class can only provide
the illusion of random access, by appropriately rewinding the input
- stream or skipping compressed data.""
+ stream or skipping compressed data."
| pos |
pos := self position.
^[ self position: start; next: end - start ]
--- orig/packages.xml.in
+++ mod/packages.xml.in
@@ -288,10 +288,12 @@
<package>
<name>ZLib</name>
+ <filein>PipeStream.st</filein>
<filein>zlib.st</filein>
<module>zlib</module>
<directory>examples</directory>
+ <file>PipeStream.st</file>
<file>zlib.st</file>
</package>
--- /dev/null
+++ mod/examples/PipeStream.st
@@ -0,0 +1,234 @@
+"======================================================================
+|
+| PipeStream class (part of the ZLib bindings)
+|
+|
+ ======================================================================"
+
+
+"======================================================================
+|
+| Copyright 2007 Free Software Foundation, Inc.
+| Written by Paolo Bonzini
+|
+| This file is part of GNU Smalltalk.
+|
+| GNU Smalltalk is free software; you can redistribute it and/or modify it
+| under the terms of the GNU General Public License as published by the Free
+| Software Foundation; either version 2, or (at your option) any later version.
+|
+| GNU Smalltalk is distributed in the hope that it will be useful, but WITHOUT
+| ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+| FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
+| details.
+|
+| You should have received a copy of the GNU General Public License along with
+| GNU Smalltalk; see the file COPYING. If not, write to the Free Software
+| Foundation, 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+|
+ ======================================================================"
+
+
+PositionableStream subclass: #PipeStream
+ instanceVariableNames: 'full data empty contents'
+ classVariableNames: 'BufferSize'
+ poolDictionaries: ''
+ category: 'Examples-Processes'!
+
+PipeStream comment:
+'Used internally by the zlib bindings, the PipeStream provides two
+pieces of functionality. The first is to provide a dual-ended FIFO
+stream, which can be read and written by independent processes. The
+second is to provide a WriteStream-to-ReadStream adaptor, where the
+data is written to the PipeStream (the writing side), fed to
+an object expecting a ReadStream (possibly as a decorator), and taken
+from there into the destination stream. The effect is to turn a
+ReadStream decorator into a WriteStream decorator.'!
+
+!PipeStream class methodsFor: 'accessing'!
+
+bufferSize
+ "Answer the size of the output buffers that are passed to zlib. Each
+ zlib stream uses a buffer of this size."
+ BufferSize isNil ifTrue: [ BufferSize := 512 ].
+ ^BufferSize!
+
+bufferSize: anInteger
+ "Set the size of the output buffers that are passed to zlib. Each
+ zlib stream uses a buffer of this size."
+ BufferSize := anInteger!
+
+!PipeStream class methodsFor: 'instance creation'!
+
+on: aCollection
+ "Answer a new stream using aCollection as its buffer."
+ aCollection size = 0 ifTrue: [ self halt ].
+ ^self basicNew initCollection: aCollection!
+
+connectedTo: writeStream via: aBlock
+ "Create a PipeStream that acts as a WriteStream to ReadStream adaptor.
+ The pipe is passed to the 1-parameter block aBlock, which should use
+ the pipe as a ReadStream and return another ReadStream. The data that
+ will be written to the pipe will go through the return value of aBlock,
+ and then be written to writeStream.
+
+ Example:
+ dest := PipeStream connectedTo: fileStream via: [ :r | DeflateStream on: r ].
+ dest next: 100 put: $A."
+
+ ^(self on: (writeStream species new: self bufferSize))
+ connectTo: writeStream via: aBlock;
+ yourself!
+
+on: aCollection via: aBlock
+ "Create a PipeStream that acts as a WriteStream to ReadStream adaptor.
+ The pipe is passed to the 1-parameter block aBlock, which should use
+ the pipe as a ReadStream and return another ReadStream. The data that
+ will be written to the pipe will be placed into aCollection, and can
+ be retrieved using the #contents method of the PipeStream.
+
+ Example:
+ dest := PipeStream on: String new via: [ :r | DeflateStream on: r ].
+ dest next: 100 put: $A.
+ dest contents printNl"
+
+ ^self connectedTo: aCollection writeStream via: aBlock!
+
+!PipeStream methodsFor: 'instance creation'!
+
+close
+ "Close the pipe, causing all blocked reads and writes to terminate
+ immediately."
+ | sema |
+ sema := full.
+ full := nil.
+ sema notifyAll.
+
+ sema := empty.
+ empty := nil.
+ sema notifyAll.
+
+ sema := data.
+ data := nil.
+ sema notifyAll!
+
+notConnected
+ "Answer whether the communication channel has been closed."
+ ^full isNil!
+
+isConnected
+ "Answer whether the communication channel is still open."
+ ^full notNil!
+
+atEnd
+ "Answer whether the communication channel is closed and there is no
+ data in the buffer."
+ ^super atEnd and: [ self notConnected ]!
+
+isEmpty
+ "Answer whether there is no data in the buffer."
+ ^super atEnd!
+
+isFull
+ "Answer whether there is no room left in the buffer."
+ ^endPtr = collection size!
+
+next
+ "Retrieve the next byte of data from the pipe, blocking if there is none."
+ | result |
+ [ self isEmpty ] whileTrue: [
+ self isConnected ifFalse: [ ^self pastEnd ].
+ data wait ].
+ result := super next.
+ empty notifyAll.
+ ^result!
+
+peek
+ "Retrieve the next byte of data from the pipe, without gobbling it and
+ blocking if there is none."
+ [ self isEmpty ] whileTrue: [
+ self isConnected ifFalse: [ ^self pastEnd ].
+ data wait ].
+ ^super peek!
+
+nextPut: anObject
+ "Put anObject in the pipe, blocking if it is full."
+ [ self isFull ] whileTrue: [
+ self isConnected ifFalse: [ ^self pastEnd ].
+ empty wait ].
+ endPtr := endPtr + 1.
+ collection at: endPtr put: anObject.
+ data notifyAll.
+ self isFull ifTrue: [ full notifyAll ].
+ ^anObject!
+
+nextHunk
+ "Return a buffer worth of data, blocking until it is full or the pipe
+ is closed."
+ [ self isEmpty and: [ self isConnected ] ] whileTrue: [ full wait ].
+
+ "Here, the buffer is full and all writers are locked, so there is no
+ contention between the writer and the reader."
+ ^self bufferContents!
+
+contents
+ "Close the channel and return the full contents of the stream. For
+ pipes created with #on:, #contents closes the stream and returns the
+ leftover contents of the buffer."
+ self close.
+ ^contents isNil
+ ifTrue: [ self bufferContents ]
+ ifFalse: [ contents value value ]!
+
+readStream
+ "Close the channel and return a readStream on the full contents of
+ the stream. For pipes created with #on:, the stream is created on the
+ leftover contents of the buffer."
+ ^self contents readStream!
+
+reset
+ "Drop all data currently in the buffer. This should not be used
+ concurrently with other next or nextPut: operations."
+
+ endPtr := 0.
+ ptr := 1.
+ empty notifyAll!
+
+!PipeStream methodsFor: 'private methods'!
+
+bufferContents
+ "Return the current contents of the buffer and empty it. This is private
+ because it requires a lock even in presence of a single reader and a single
+ writer."
+ | result |
+ result := collection copyFrom: ptr to: endPtr.
+ self reset.
+ ^result!
+
+connectTo: writeStream via: aBlock
+ "Establish a channel as explained in the class method #connectedTo:via:."
+
+ "Overwrite the block with a Promise object, so that we complete processing
+ and return the entire contents of the underlying stream."
+ contents := Promise new.
+ [
+ | readStream |
+ readStream := aBlock value: self.
+ [
+ "This blocks the reader process if there is no data in the buffer."
+ writeStream nextPutAll: readStream nextHunk.
+ self isConnected and: [ readStream atEnd not ] ] whileTrue.
+ writeStream nextPutAll: readStream contents.
+
+ "Don't evaluate unless requested."
+ contents value: [ writeStream contents ] ] fork!
+
+initCollection: aCollection
+ collection := aCollection.
+ ptr := 1.
+ endPtr := 0.
+ data := Semaphore new.
+ empty := Semaphore new.
+ full := Semaphore new.
+ contents := nil.
+! !