[Help-smalltalk] [PATCH] More stream-to-stream protocol
From: Paolo Bonzini
Subject: [Help-smalltalk] [PATCH] More stream-to-stream protocol
Date: Tue, 05 Aug 2008 16:00:15 +0200
User-agent: Thunderbird 2.0.0.16 (Macintosh/20080707)
This patch adds #nextHunkPutAllOn:, which is simple to implement and
provides a first example of stream-to-stream communication. The Sockets
microbenchmarks are up to 5x faster when they use it.
Next on the list are #next:putAllOn: and #nextAvailable:putAllOn:. These
will speed up Swazoo too.
In fact, the file-serving loop in Swazoo currently looks like this:
[[[rs atEnd] whileFalse: [aStream nextPutAll: (rs nextAvailable: 2000)]]
ensure: [rs close]]
In GNU Smalltalk it would be much faster to just do "aStream nextPutAll:
rs". After today's changes, that wouldn't create a single object. :-P
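For illustration, here is a minimal sketch of the stream-to-stream copy,
assuming a GNU Smalltalk image that already includes this patch. The
variable names and the in-memory streams are made up for the example;
Swazoo would use its own file and socket streams instead:

```smalltalk
"Hypothetical example, not taken from Swazoo or from the patch itself.
 source and sink are illustrative names."
| source sink |
source := ReadStream on: 'some data to copy'.
sink := WriteStream on: String new.

"Copy hunk by hunk without an intermediate #nextAvailable: buffer.
 This is the same loop that Stream>>#nextPutAllOn: uses after the patch."
[source atEnd] whileFalse: [source nextHunkPutAllOn: sink].

"The one-line form discussed above should take the same path,
 assuming #nextPutAll: dispatches to #nextPutAllOn: for streams:
     sink nextPutAll: source"
Transcript showCr: sink contents.
```

The saving comes from the fact that streams with their own buffer (files,
sockets, zlib) can hand that buffer straight to the destination via
#next:putAll:startingAt: instead of copying it into a new collection first.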
Janko, can you check whether it is portable and, if so, whether it is also
fast in other dialects?
Paolo
implement #nextHunkPutAllOn:
2008-08-05 Paolo Bonzini <address@hidden>
* kernel/FileDescr.st: Move setting atEnd to true into #pastEnd.
Add #nextHunkPutAllOn:. Remove duplicate #copyFrom:to: in #nextHunk.
* kernel/FileStream.st: Add #nextHunkPutAllOn:.
* kernel/Stream.st: Extract pieces of #nextPutAllOn: into
#nextHunkPutAllOn:.
packages/sockets:
2008-08-05 Paolo Bonzini <address@hidden>
* Buffers.st: Add #nextHunk and #nextHunkPutAllOn:.
* Sockets.st: Remove the lookahead instance variable. Delegate
more stuff to the readBuffer, including #nextHunk. Implement
#nextHunkPutAllOn:.
* Tests.st: Modify test to use #nextHunkPutAllOn: for a 3-4x
speed increase. :-P :-P :-P
packages/zlib:
2008-08-05 Paolo Bonzini <address@hidden>
* ZLibReadStream.st: Add #nextHunkPutAllOn:.
* zlibtests.st: Test it.
diff --git a/NEWS b/NEWS
index 6dfc10d..3bbcce2 100644
--- a/NEWS
+++ b/NEWS
@@ -32,12 +32,6 @@ o FileDescriptor and FileStream raise an exception if #next: cannot
o FileDescriptor is now a subclass of Stream.
-o ObjectMemory>>#snapshot and ObjectMemory>>#snapshot: return false in
- the instance of GNU Smalltalk that produced the snapshot, and
- true in the instance of GNU Smalltalk that was restored from the
- snapshot. Note that this does not apply to CallinProcesses, since
- those are stopped in saved images (will this be true in 3.1?).
-
o If possible, the installation is made relocatable. To this end,
the following conditions should be satisfied: 1) the exec-prefix
and prefix should be identical; 2) the installation should reside
@@ -74,9 +68,18 @@ o It is possible to create C call-outs that are not attached
override the #link method (the existing CFunctionDescriptor class
is now implemented on top of this).
+o A new method #nextHunkPutAllOn: allows to copy from stream to stream
+ while minimizing the number of allocated objects.
+
o ObjectDumper now accepts normal String streams. The class ByteStream
has been removed.
+o ObjectMemory>>#snapshot and ObjectMemory>>#snapshot: return false in
+ the instance of GNU Smalltalk that produced the snapshot, and
+ true in the instance of GNU Smalltalk that was restored from the
+ snapshot. Note that this does not apply to CallinProcesses, since
+ those are stopped in saved images (will this be true in 3.1?).
+
o The VFS subsystem was rewritten. Virtual filesystems are now
accessible via special methods on File (such as File>>#zip,
for example "(File name: 'abc.zip') zip") and not anymore with
diff --git a/kernel/FileDescr.st b/kernel/FileDescr.st
index b6576a3..340b6e6 100644
--- a/kernel/FileDescr.st
+++ b/kernel/FileDescr.st
@@ -346,7 +346,7 @@ do arbitrary processing on the files.'>
data := data at: 1].
^result > 0
ifTrue: [data]
- ifFalse: [atEnd := true. self pastEnd]
+ ifFalse: [self pastEnd]
]
peekFor: anObject [
@@ -371,9 +371,7 @@ do arbitrary processing on the files.'>
result := self read: data from: 1 to: 1.
^result > 0
ifTrue: [peek := data at: 1]
- ifFalse:
- [atEnd := true.
- self pastEnd]
+ ifFalse: [self pastEnd]
]
nextByte [
@@ -817,6 +815,16 @@ do arbitrary processing on the files.'>
^true
]
+ nextHunkPutAllOn: aStream [
+ "Copy the next buffers worth of stuff from the receiver to aStream."
+
+ <category: 'low-level access'>
+ | count coll |
+ count := self read: (coll := self species new: 1024).
+ count = 0 ifTrue: [^self pastEnd].
+ aStream next: count putAll: coll startingAt: 1
+ ]
+
nextHunk [
"Answer the next buffers worth of stuff in the Stream represented
by the receiver. Do at most one actual input operation."
@@ -825,11 +833,16 @@ do arbitrary processing on the files.'>
| count answer |
count := self read: (answer := self species new: 1024).
count < answer size ifTrue: [answer := answer copyFrom: 1 to: count].
- count = 0
- ifTrue:
- [atEnd := true.
- ^self pastEnd].
- ^answer copyFrom: 1 to: count
+ count = 0 ifTrue: [^self pastEnd].
+ ^answer
+ ]
+
+ pastEnd [
+ "The end of the stream has been reached. Signal a Notification."
+
+ <category: 'polymorphism'>
+ atEnd := true.
+ super pastEnd
]
read: byteArray [
diff --git a/kernel/FileStream.st b/kernel/FileStream.st
index 82ac553..1156e16 100644
--- a/kernel/FileStream.st
+++ b/kernel/FileStream.st
@@ -611,6 +611,19 @@ file object, such as /dev/rmt0 on UNIX or MTA0: on VMS).'>
self flush
]
+ nextHunkPutAllOn: aStream [
+ "Copy the next buffers worth of stuff from the receiver to
+ aStream. For n consecutive calls to this method, we do
+ n - 1 or n actual input operation."
+
+ <category: 'buffering'>
+ writePtr notNil ifTrue: [self flush].
+ (ptr > endPtr or: [endPtr < collection size]) ifTrue: [self fill].
+ ptr > endPtr ifTrue: [^self pastEnd].
+ aStream next: endPtr - ptr + 1 putAll: collection startingAt: ptr.
+ ptr := endPtr + 1.
+ ]
+
nextHunk [
"Answer the next buffers worth of stuff in the Stream represented
by the receiver. For n consecutive calls to this method, we do
diff --git a/kernel/Stream.st b/kernel/Stream.st
index 7658610..77facd7 100644
--- a/kernel/Stream.st
+++ b/kernel/Stream.st
@@ -232,13 +232,8 @@ provide for writing collections sequentially.'>
nextPutAllOn: aStream [
"Write all the objects in the receiver to aStream"
- | coll |
[self atEnd] whileFalse:
- [coll := self nextHunk.
- aStream
- next: coll size
- putAll: coll
- startingAt: 1].
+ [self nextHunkPutAllOn: aStream].
]
next: anInteger put: anObject [
@@ -507,6 +502,23 @@ provide for writing collections sequentially.'>
repeat
]
+ nextHunkPutAllOn: aStream [
+ "Copy to aStream a more-or-less arbitrary amount of data. When used
+ on files, this does at most one I/O operation. For other kinds of
+ stream, the definition may vary. This method is used by the VM
+ when loading data from a Smalltalk stream, and by various kind
+ of Stream decorators supplied with GNU Smalltalk (including
+ zlib streams). Subclasses that implement nextHunk can implement
+ this method to avoid useless work."
+
+ | coll |
+ coll := self nextHunk.
+ aStream
+ next: coll size
+ putAll: coll
+ startingAt: 1
+ ]
+
nextHunk [
"Answer a more-or-less arbitrary amount of data. When used on files, this
does at most one I/O operation. For other kinds of stream, the definition
diff --git a/packages/sockets/Buffers.st b/packages/sockets/Buffers.st
index ebdd47b..b822b96 100644
--- a/packages/sockets/Buffers.st
+++ b/packages/sockets/Buffers.st
@@ -120,6 +120,28 @@ evaluates an user defined block to try to get some more data.'>
^contents
]
+ nextHunkPutAllOn: aStream [
+ "Copy a buffer's worth of data from the receiver to aStream, doing
+ at most one call to the fill block."
+
+ <category: 'buffer handling'>
+ self atEnd ifTrue: [^super pastEnd].
+ aStream next: endPtr - ptr + 1 putAll: self collection startingAt: ptr.
+ endPtr := ptr - 1. "Empty the buffer"
+ ]
+
+ nextHunk [
+ "Answer a buffer's worth of data, doing at most one call
+ to the fill block."
+
+ <category: 'buffer handling'>
+ | contents |
+ self atEnd ifTrue: [^super pastEnd].
+ contents := self collection copyFrom: ptr to: endPtr.
+ endPtr := ptr - 1. "Empty the buffer"
+ ^contents
+ ]
+
availableBytes [
"Answer how many bytes are available in the buffer."
diff --git a/packages/sockets/Sockets.st b/packages/sockets/Sockets.st
index 02cd87e..b2c11f2 100644
--- a/packages/sockets/Sockets.st
+++ b/packages/sockets/Sockets.st
@@ -1013,7 +1013,7 @@ this class simply redirect their calls to an implementation class.'>
AbstractSocket subclass: StreamSocket [
- | lookahead peerDead readBuffer outOfBand |
+ | peerDead readBuffer outOfBand |
<category: 'Sockets-Streams'>
<comment: '
@@ -1163,25 +1163,17 @@ This class adds a read buffer to the basic model of AbstractSocket.'>
or from the operating system."
<category: 'stream protocol'>
- | lookaheadBytes |
- lookaheadBytes := lookahead isNil ifTrue: [ 0 ] ifFalse: [ 1 ].
- self canRead ifFalse: [ ^lookaheadBytes ].
+ self canRead ifFalse: [ ^0 ].
self readBuffer isEmpty ifTrue: [ self readBuffer fill ].
- ^lookaheadBytes + self readBuffer availableBytes
+ ^self readBuffer availableBytes
]
bufferContents [
"Answer the current contents of the read buffer"
<category: 'stream protocol'>
- | result |
readBuffer isNil ifTrue: [^self pastEnd].
- result := self readBuffer bufferContents.
- lookahead isNil
- ifFalse:
- [result := lookahead asString , result.
- lookahead := nil].
- ^result
+ ^self readBuffer bufferContents
]
close [
@@ -1212,11 +1204,8 @@ This class adds a read buffer to the basic model of AbstractSocket.'>
Smalltalk Processes."
<category: 'stream protocol'>
- | result |
- lookahead isNil ifTrue: [^self readBuffer next].
- result := lookahead.
- lookahead := nil.
- ^result
+ readBuffer isNil ifTrue: [^self pastEnd].
+ ^self readBuffer next
]
nextAvailable: anInteger [
@@ -1225,6 +1214,8 @@ This class adds a read buffer to the basic model of AbstractSocket.'>
<category: 'accessing-reading'>
| buffer available stream |
+ readBuffer isNil ifTrue: [ ^self pastEnd ].
+
self ensureReadable.
available := self availableBytes.
available >= anInteger ifTrue: [ ^self next: anInteger ].
@@ -1238,11 +1229,22 @@ This class adds a read buffer to the basic model of AbstractSocket.'>
needed."
stream := WriteStream with: buffer.
[ (available := self availableBytes min: anInteger - stream size) > 0 ]
- whileTrue: [ stream nextPutAll: (self next: available) ].
+ whileTrue: [ stream nextPutAll: (self readBuffer next: available) ].
^stream contents
]
+ nextHunkPutAllOn: aStream [
+ "Copy the next buffers worth of stuff from the receiver to aStream.
+ Do at most one actual input operation."
+
+ <category: 'stream protocol'>
+ "Ensure that the buffer is full"
+
+ readBuffer isNil ifTrue: [ ^self pastEnd ].
+ self readBuffer nextHunkPutAllOn: aStream
+ ]
+
nextHunk [
"Answer the next buffers worth of stuff in the Stream represented
by the receiver. Do at most one actual input operation."
@@ -1250,8 +1252,8 @@ This class adds a read buffer to the basic model of AbstractSocket.'>
"Ensure that the buffer is full"
<category: 'stream protocol'>
- self peek.
- ^self bufferContents
+ readBuffer isNil ifTrue: [ ^self pastEnd ].
+ ^self readBuffer nextHunk
]
next: count [
@@ -1260,10 +1262,8 @@ This class adds a read buffer to the basic model of AbstractSocket.'>
<category: 'stream protocol'>
| result |
- lookahead isNil ifTrue: [^self readBuffer next: count].
- result := (String with: lookahead) , (self readBuffer next: count - 1).
- lookahead := nil.
- ^result
+ readBuffer isNil ifTrue: [ ^self pastEnd ].
+ ^self readBuffer next: count
]
peek [
@@ -1272,12 +1272,9 @@ This class adds a read buffer to the basic model of AbstractSocket.'>
Smalltalk Processes."
<category: 'stream protocol'>
- lookahead isNil
- ifTrue:
- [self readBuffer isNil ifTrue: [^nil].
- self readBuffer atEnd ifTrue: [^nil].
- lookahead := self readBuffer next].
- ^lookahead
+ self readBuffer isNil ifTrue: [^nil].
+ self readBuffer atEnd ifTrue: [^nil].
+ ^self readBuffer peek
]
peekFor: anObject [
@@ -1286,16 +1283,9 @@ This class adds a read buffer to the basic model of AbstractSocket.'>
control to other Smalltalk Processes."
<category: 'stream protocol'>
- lookahead isNil
- ifTrue:
- [self readBuffer isNil ifTrue: [^false].
- self readBuffer atEnd ifTrue: [^false].
- lookahead := self readBuffer next].
- ^lookahead = anObject
- ifTrue:
- [lookahead := nil.
- true]
- ifFalse: [false]
+ self readBuffer isNil ifTrue: [^false].
+ self readBuffer atEnd ifTrue: [^false].
+ ^self readBuffer peekFor: anObject
]
readBufferSize: size [
diff --git a/packages/sockets/Tests.st b/packages/sockets/Tests.st
index 29101d1..d11f9c5 100644
--- a/packages/sockets/Tests.st
+++ b/packages/sockets/Tests.st
@@ -1,3 +1,14 @@
+Stream subclass: DummyStream [
+ <category: 'Sockets-Tests'>
+
+ | n |
+ DummyStream class >> new [ ^super new initialize ]
+ initialize [ n := 0 ]
+ nextPut: anObject [ n := n + 1 ]
+ next: anInteger putAll: aCollection startingAt: pos [ n := n + anInteger ]
+ size [ ^n ]
+]
+
Socket class extend [
microTest [
@@ -65,7 +76,8 @@ Socket class extend [
output buffer sizes, and the address class (family) to use."
<category: 'tests'>
- | queue server client bytesToSend sendBuf bytesSent bytesReceived t extraBytes timeout process |
+ | queue server client bytesToSend sendBuf bytesSent bytesReceived
+ t extraBytes timeout process recvBuf |
Transcript
cr;
show: 'starting loopback test';
@@ -94,13 +106,15 @@ Socket class extend [
cr.
bytesToSend := 5000000.
sendBuf := String new: 4000 withAll: $x.
+ recvBuf := DummyStream new.
bytesSent := bytesReceived := 0.
t := Time millisecondsToRun:
[
[server nextPutAll: sendBuf.
bytesSent := bytesSent + sendBuf size.
[client canRead] whileTrue:
- [bytesReceived := bytesReceived + client nextHunk size].
+ [client nextHunkPutAllOn: recvBuf.
+ bytesReceived := recvBuf size].
bytesSent >= bytesToSend and: [bytesReceived = bytesSent]]
whileFalse].
Transcript
@@ -173,11 +187,14 @@ Socket class extend [
sema signal]
fork.
consumer :=
- [queueReady wait.
+ [| recvBuf |
+ recvBuf := DummyStream new.
+ queueReady wait.
client := Socket remote: queue localAddress
port: (self testPortFor: addressClass).
[[client canRead] whileTrue:
- [bytesReceived := bytesReceived + client nextHunk size].
+ [client nextHunkPutAllOn: recvBuf.
+ bytesReceived := recvBuf size].
bytesSent >= bytesToSend and: [bytesReceived = bytesSent]]
whileFalse: [Processor yield].
sema signal]
diff --git a/packages/zlib/ZLibReadStream.st b/packages/zlib/ZLibReadStream.st
index 5e03ff5..17fbea5 100644
--- a/packages/zlib/ZLibReadStream.st
+++ b/packages/zlib/ZLibReadStream.st
@@ -70,6 +70,19 @@ used for communication with zlib.'>
^result
]
+ nextHunkPutAllOn: aStream [
+ "Copy the next buffers worth of stuff from the receiver to
+ aStream. Do at most one actual compression/decompression
+ operation."
+
+ <category: 'streaming'>
+ | result |
+ self atEnd ifTrue: [^self pastEnd].
+ aStream next: endPtr - ptr putAll: outBytes startingAt: ptr + 1.
+ ptr := endPtr.
+ ^result
+ ]
+
nextHunk [
"Answer the next buffers worth of stuff in the Stream represented
by the receiver. Do at most one actual compression/decompression
diff --git a/packages/zlib/zlibtests.st b/packages/zlib/zlibtests.st
index a6bb592..5309368 100644
--- a/packages/zlib/zlibtests.st
+++ b/packages/zlib/zlibtests.st
@@ -166,6 +166,17 @@ TestCase subclass: ZlibStreamTest [
self assertFooVector: data
]
+ testNextHunkPutAllOn [
+ "Test accessing data with nextHunkPutAllOn."
+
+ <category: 'testing'>
+ | stream data |
+ stream := InflateStream on: self doDeflate readStream.
+ data := String new writeStream.
+ [stream atEnd] whileFalse: [stream nextHunkPutAllOn: data].
+ self assertFooVector: data contents
+ ]
+
testRandomAccess [
"Test random access to deflated data."