help-smalltalk

[Help-smalltalk] [PATCH] More stream-to-stream protocol


From: Paolo Bonzini
Subject: [Help-smalltalk] [PATCH] More stream-to-stream protocol
Date: Tue, 05 Aug 2008 16:00:15 +0200
User-agent: Thunderbird 2.0.0.16 (Macintosh/20080707)

This patch adds #nextHunkPutAllOn:, which is simple to implement and is a first example of stream-to-stream communication. The Sockets microbenchmarks are up to 5x faster when they use it.
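To show how a caller drives the new method, here is a copy loop of the same shape as the new Stream>>#nextPutAllOn: in this patch. It is a minimal sketch that assumes an image with the patch applied. The file name '/etc/hosts' is only a placeholder, and the variable names are illustrative.

```smalltalk
"Copy a file into an in-memory String stream one hunk at a time.
 Assumes the patched kernel; '/etc/hosts' is a placeholder path."
| in out |
in := FileStream open: '/etc/hosts' mode: FileStream read.
out := WriteStream on: String new.
"Each call moves at most one buffer's worth of data into out;
 FileStream>>#nextHunkPutAllOn: copies straight from its own buffer
 with #next:putAll:startingAt:, so no intermediate String is built."
[[in atEnd] whileFalse: [in nextHunkPutAllOn: out]]
    ensure: [in close].
Transcript showCr: out contents size printString.
```

The loop tests #atEnd before each call, so the #pastEnd branch inside #nextHunkPutAllOn: is only a safety net.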

Next on the list are #next:putAllOn: and #nextAvailable:putAllOn:, which will speed up Swazoo too.

In fact, the file-serving loop in Swazoo looks like this:

[[[rs atEnd] whileFalse: [aStream nextPutAll: (rs nextAvailable: 2000)]]
     ensure: [rs close]]

In GNU Smalltalk it would be much faster to write just "aStream nextPutAll: rs"; after today's changes, that would not create a single object. :-P
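Concretely, the Swazoo loop quoted above would shrink to a single statement. This is a sketch under two assumptions: the patched kernel is loaded, and (as the message says) GNU Smalltalk's #nextPutAll: accepts a stream argument. Here rs and aStream are stand-ins for Swazoo's real objects, and the file name is a placeholder.

```smalltalk
"rs plays the file being served, aStream the response stream.
 Both are stand-ins; '/etc/hosts' is a placeholder path."
| rs aStream |
rs := FileStream open: '/etc/hosts' mode: FileStream read.
aStream := WriteStream on: String new.
"No explicit loop and no intermediate 2000-byte Strings.  Assuming
 #nextPutAll: hands a stream argument to its #nextPutAllOn:, the copy
 is driven by rs's own #nextHunkPutAllOn:."
[aStream nextPutAll: rs] ensure: [rs close].
```

Even where #nextPutAll: falls back to iterating the argument with #do:, the statement still produces the same contents, only more slowly.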

Janko, can you check whether it is portable and, if so, whether it is also fast in other dialects?

Paolo
implement #nextHunkPutAllOn:

2008-08-05  Paolo Bonzini  <address@hidden>

        * kernel/FileDescr.st: Move setting atEnd to true into #pastEnd.
        Add #nextHunkPutAllOn:.  Remove duplicate #copyFrom:to: in #nextHunk.
        * kernel/FileStream.st: Add #nextHunkPutAllOn:.
        * kernel/Stream.st: Extract pieces of #nextPutAllOn: into
        #nextHunkPutAllOn:.

2008-08-05  Paolo Bonzini  <address@hidden>

packages/sockets:
2008-08-05  Paolo Bonzini  <address@hidden>

        * Buffers.st: Add #nextHunk and #nextHunkPutAllOn:.
        * Sockets.st: Remove the lookahead instance variable.  Delegate
        more stuff to the readBuffer, including #nextHunk.  Implement
        #nextHunkPutAllOn:.
        * Tests.st: Modify test to use #nextHunkPutAllOn: for a 3-4x
        speed increase. :-P :-P :-P

2008-08-05  Paolo Bonzini  <address@hidden>

packages/zlib:
2008-08-05  Paolo Bonzini  <address@hidden>

        * ZLibReadStream.st: Add #nextHunkPutAllOn:.
        * zlibtests.st: Test it.

diff --git a/NEWS b/NEWS
index 6dfc10d..3bbcce2 100644
--- a/NEWS
+++ b/NEWS
@@ -32,12 +32,6 @@ o   FileDescriptor and FileStream raise an exception if #next: cannot
 
 o   FileDescriptor is now a subclass of Stream.
 
-o   ObjectMemory>>#snapshot and ObjectMemory>>#snapshot: return false in
-    the instance of GNU Smalltalk that produced the snapshot, and
-    true in the instance of GNU Smalltalk that was restored from the
-    snapshot.  Note that this does not apply to CallinProcesses, since
-    those are stopped in saved images (will this be true in 3.1?).
-
 o   If possible, the installation is made relocatable.  To this end,
     the following conditions should be satisfied: 1) the exec-prefix
     and prefix should be identical; 2) the installation should reside
@@ -74,9 +68,18 @@ o   It is possible to create C call-outs that are not attached
     override the #link method (the existing CFunctionDescriptor class
     is now implemented on top of this).
 
+o   A new method, #nextHunkPutAllOn:, allows copying from stream to stream
+    while minimizing the number of allocated objects.
+
 o   ObjectDumper now accepts normal String streams.  The class ByteStream
     has been removed.
 
+o   ObjectMemory>>#snapshot and ObjectMemory>>#snapshot: return false in
+    the instance of GNU Smalltalk that produced the snapshot, and
+    true in the instance of GNU Smalltalk that was restored from the
+    snapshot.  Note that this does not apply to CallinProcesses, since
+    those are stopped in saved images (will this be true in 3.1?).
+
 o   The VFS subsystem was rewritten.  Virtual filesystems are now
     accessible via special methods on File (such as File>>#zip,
     for example "(File name: 'abc.zip') zip") and not anymore with
diff --git a/kernel/FileDescr.st b/kernel/FileDescr.st
index b6576a3..340b6e6 100644
--- a/kernel/FileDescr.st
+++ b/kernel/FileDescr.st
@@ -346,7 +346,7 @@ do arbitrary processing on the files.'>
                data := data at: 1].
        ^result > 0 
            ifTrue: [data]
-           ifFalse: [atEnd := true.  self pastEnd]
+           ifFalse: [self pastEnd]
     ]
 
     peekFor: anObject [
@@ -371,9 +371,7 @@ do arbitrary processing on the files.'>
        result := self read: data from: 1 to: 1.
        ^result > 0 
            ifTrue: [peek := data at: 1]
-           ifFalse: 
-               [atEnd := true.
-               self pastEnd]
+           ifFalse: [self pastEnd]
     ]
 
     nextByte [
@@ -817,6 +815,16 @@ do arbitrary processing on the files.'>
        ^true
     ]
 
+    nextHunkPutAllOn: aStream [
+       "Copy the next buffer's worth of stuff from the receiver to aStream."
+
+       <category: 'low-level access'>
+       | count coll |
+       count := self read: (coll := self species new: 1024).
+       count = 0 ifTrue: [^self pastEnd].
+       aStream next: count putAll: coll startingAt: 1
+    ]
+
     nextHunk [
        "Answer the next buffers worth of stuff in the Stream represented
         by the receiver.  Do at most one actual input operation."
@@ -825,11 +833,16 @@ do arbitrary processing on the files.'>
        | count answer |
        count := self read: (answer := self species new: 1024).
        count < answer size ifTrue: [answer := answer copyFrom: 1 to: count].
-       count = 0 
-           ifTrue: 
-               [atEnd := true.
-               ^self pastEnd].
-       ^answer copyFrom: 1 to: count
+       count = 0 ifTrue: [^self pastEnd].
+       ^answer
+    ]
+
+    pastEnd [
+        "The end of the stream has been reached.  Signal a Notification."
+
+        <category: 'polymorphism'>
+       atEnd := true.
+       super pastEnd
     ]
 
     read: byteArray [
diff --git a/kernel/FileStream.st b/kernel/FileStream.st
index 82ac553..1156e16 100644
--- a/kernel/FileStream.st
+++ b/kernel/FileStream.st
@@ -611,6 +611,19 @@ file object, such as /dev/rmt0 on UNIX or MTA0: on VMS).'>
        self flush
     ]
 
+    nextHunkPutAllOn: aStream [
+       "Copy the next buffer's worth of stuff from the receiver to
+        aStream.  For n consecutive calls to this method, we do
+        n - 1 or n actual input operations."
+
+       <category: 'buffering'>
+       writePtr notNil ifTrue: [self flush].
+       (ptr > endPtr or: [endPtr < collection size]) ifTrue: [self fill].
+       ptr > endPtr ifTrue: [^self pastEnd].
+       aStream next: endPtr - ptr + 1 putAll: collection startingAt: ptr.
+       ptr := endPtr + 1.
+    ]
+
     nextHunk [
        "Answer the next buffers worth of stuff in the Stream represented
         by the receiver.  For n consecutive calls to this method, we do
diff --git a/kernel/Stream.st b/kernel/Stream.st
index 7658610..77facd7 100644
--- a/kernel/Stream.st
+++ b/kernel/Stream.st
@@ -232,13 +232,8 @@ provide for writing collections sequentially.'>
     nextPutAllOn: aStream [
         "Write all the objects in the receiver to aStream"
 
-       | coll |
        [self atEnd] whileFalse: 
-               [coll := self nextHunk.
-               aStream 
-                   next: coll size
-                   putAll: coll
-                   startingAt: 1].
+               [self nextHunkPutAllOn: aStream].
     ]
 
     next: anInteger put: anObject [
@@ -507,6 +502,23 @@ provide for writing collections sequentially.'>
                repeat
     ]
 
+    nextHunkPutAllOn: aStream [
+       "Copy to aStream a more-or-less arbitrary amount of data.  When used
+        on files, this does at most one I/O operation.  For other kinds of
+        stream, the definition may vary.  This method is used by the VM
+        when loading data from a Smalltalk stream, and by various kinds
+        of Stream decorators supplied with GNU Smalltalk (including
+        zlib streams).  Subclasses that implement nextHunk can implement
+        this method to avoid useless work."
+
+       | coll |
+       coll := self nextHunk.
+       aStream 
+           next: coll size
+           putAll: coll
+           startingAt: 1
+    ]
+
     nextHunk [
        "Answer a more-or-less arbitrary amount of data.  When used on files, this
         does at most one I/O operation.  For other kinds of stream, the definition
diff --git a/packages/sockets/Buffers.st b/packages/sockets/Buffers.st
index ebdd47b..b822b96 100644
--- a/packages/sockets/Buffers.st
+++ b/packages/sockets/Buffers.st
@@ -120,6 +120,28 @@ evaluates an user defined block to try to get some more data.'>
        ^contents
     ]
 
+    nextHunkPutAllOn: aStream [
+       "Copy a buffer's worth of data from the receiver to aStream, doing
+        at most one call to the fill block."
+
+       <category: 'buffer handling'>
+       self atEnd ifTrue: [^super pastEnd].
+       aStream next: endPtr - ptr + 1 putAll: self collection startingAt: ptr.
+       endPtr := ptr - 1.      "Empty the buffer"
+    ]
+
+    nextHunk [
+       "Answer a buffer's worth of data, doing at most one call
+        to the fill block."
+
+       <category: 'buffer handling'>
+       | contents |
+       self atEnd ifTrue: [^super pastEnd].
+       contents := self collection copyFrom: ptr to: endPtr.
+       endPtr := ptr - 1.      "Empty the buffer"
+       ^contents
+    ]
+
     availableBytes [
         "Answer how many bytes are available in the buffer."
 
diff --git a/packages/sockets/Sockets.st b/packages/sockets/Sockets.st
index 02cd87e..b2c11f2 100644
--- a/packages/sockets/Sockets.st
+++ b/packages/sockets/Sockets.st
@@ -1013,7 +1013,7 @@ this class simply redirect their calls to an implementation class.'>
 
 
 AbstractSocket subclass: StreamSocket [
-    | lookahead peerDead readBuffer outOfBand |
+    | peerDead readBuffer outOfBand |
     
     <category: 'Sockets-Streams'>
     <comment: '
@@ -1163,25 +1163,17 @@ This class adds a read buffer to the basic model of AbstractSocket.'>
         or from the operating system."
 
        <category: 'stream protocol'>
-       | lookaheadBytes |
-       lookaheadBytes := lookahead isNil ifTrue: [ 0 ] ifFalse: [ 1 ].
-       self canRead ifFalse: [ ^lookaheadBytes ].
+       self canRead ifFalse: [ ^0 ].
        self readBuffer isEmpty ifTrue: [ self readBuffer fill ].
-       ^lookaheadBytes + self readBuffer availableBytes
+       ^self readBuffer availableBytes
     ]
 
     bufferContents [
        "Answer the current contents of the read buffer"
 
        <category: 'stream protocol'>
-       | result |
        readBuffer isNil ifTrue: [^self pastEnd].
-       result := self readBuffer bufferContents.
-       lookahead isNil 
-           ifFalse: 
-               [result := lookahead asString , result.
-               lookahead := nil].
-       ^result
+       ^self readBuffer bufferContents
     ]
 
     close [
@@ -1212,11 +1204,8 @@ This class adds a read buffer to the basic model of AbstractSocket.'>
         Smalltalk Processes."
 
        <category: 'stream protocol'>
-       | result |
-       lookahead isNil ifTrue: [^self readBuffer next].
-       result := lookahead.
-       lookahead := nil.
-       ^result
+       readBuffer isNil ifTrue: [^self pastEnd].
+       ^self readBuffer next
     ]
        
     nextAvailable: anInteger [
@@ -1225,6 +1214,8 @@ This class adds a read buffer to the basic model of AbstractSocket.'>
 
         <category: 'accessing-reading'>
        | buffer available stream |
+       readBuffer isNil ifTrue: [ ^self pastEnd ].
+
        self ensureReadable.
        available := self availableBytes.
        available >= anInteger ifTrue: [ ^self next: anInteger ].
@@ -1238,11 +1229,22 @@ This class adds a read buffer to the basic model of AbstractSocket.'>
         needed."
        stream := WriteStream with: buffer.
        [ (available := self availableBytes min: anInteger - stream size) > 0 ]
-           whileTrue: [ stream nextPutAll: (self next: available) ].
+           whileTrue: [ stream nextPutAll: (self readBuffer next: available) ].
 
        ^stream contents
     ]
 
+    nextHunkPutAllOn: aStream [
+       "Copy the next buffer's worth of stuff from the receiver to aStream.
+        Do at most one actual input operation."
+
+       <category: 'stream protocol'>
+       "Ensure that the buffer is full"
+
+       readBuffer isNil ifTrue: [ ^self pastEnd ].
+       self readBuffer nextHunkPutAllOn: aStream
+    ]
+
     nextHunk [
        "Answer the next buffers worth of stuff in the Stream represented
         by the receiver.  Do at most one actual input operation."
@@ -1250,8 +1252,8 @@ This class adds a read buffer to the basic model of AbstractSocket.'>
        "Ensure that the buffer is full"
 
        <category: 'stream protocol'>
-       self peek.
-       ^self bufferContents
+       readBuffer isNil ifTrue: [ ^self pastEnd ].
+       ^self readBuffer nextHunk
     ]
 
     next: count [
@@ -1260,10 +1262,8 @@ This class adds a read buffer to the basic model of AbstractSocket.'>
 
        <category: 'stream protocol'>
        | result |
-       lookahead isNil ifTrue: [^self readBuffer next: count].
-       result := (String with: lookahead) , (self readBuffer next: count - 1).
-       lookahead := nil.
-       ^result
+       readBuffer isNil ifTrue: [ ^self pastEnd ].
+       ^self readBuffer next: count
     ]
 
     peek [
@@ -1272,12 +1272,9 @@ This class adds a read buffer to the basic model of AbstractSocket.'>
         Smalltalk Processes."
 
        <category: 'stream protocol'>
-       lookahead isNil 
-           ifTrue: 
-               [self readBuffer isNil ifTrue: [^nil].
-               self readBuffer atEnd ifTrue: [^nil].
-               lookahead := self readBuffer next].
-       ^lookahead
+       self readBuffer isNil ifTrue: [^nil].
+       self readBuffer atEnd ifTrue: [^nil].
+       ^self readBuffer peek
     ]
 
     peekFor: anObject [
@@ -1286,16 +1283,9 @@ This class adds a read buffer to the basic model of AbstractSocket.'>
         control to other Smalltalk Processes."
 
        <category: 'stream protocol'>
-       lookahead isNil 
-           ifTrue: 
-               [self readBuffer isNil ifTrue: [^false].
-               self readBuffer atEnd ifTrue: [^false].
-               lookahead := self readBuffer next].
-       ^lookahead = anObject 
-           ifTrue: 
-               [lookahead := nil.
-               true]
-           ifFalse: [false]
+       self readBuffer isNil ifTrue: [^false].
+       self readBuffer atEnd ifTrue: [^false].
+       ^self readBuffer peekFor: anObject
     ]
 
     readBufferSize: size [
diff --git a/packages/sockets/Tests.st b/packages/sockets/Tests.st
index 29101d1..d11f9c5 100644
--- a/packages/sockets/Tests.st
+++ b/packages/sockets/Tests.st
@@ -1,3 +1,14 @@
+Stream subclass: DummyStream [
+    <category: 'Sockets-Tests'>
+
+    | n |
+    DummyStream class >> new [ ^super new initialize ]
+    initialize [ n := 0 ]
+    nextPut: anObject [ n := n + 1 ]
+    next: anInteger putAll: aCollection startingAt: pos [ n := n + anInteger ]
+    size [ ^n ]
+]
+
 Socket class extend [
 
     microTest [
@@ -65,7 +76,8 @@ Socket class extend [
         output buffer sizes, and the address class (family) to use."
 
        <category: 'tests'>
-       | queue server client bytesToSend sendBuf bytesSent bytesReceived t extraBytes timeout process |
+       | queue server client bytesToSend sendBuf bytesSent bytesReceived
+         t extraBytes timeout process recvBuf |
        Transcript
            cr;
            show: 'starting loopback test';
@@ -94,13 +106,15 @@ Socket class extend [
            cr.
        bytesToSend := 5000000.
        sendBuf := String new: 4000 withAll: $x.
+       recvBuf := DummyStream new.
        bytesSent := bytesReceived := 0.
        t := Time millisecondsToRun: 
                        [
                        [server nextPutAll: sendBuf.
                        bytesSent := bytesSent + sendBuf size.
                        [client canRead] whileTrue: 
-                               [bytesReceived := bytesReceived + client nextHunk size].
+                               [client nextHunkPutAllOn: recvBuf.
+                               bytesReceived := recvBuf size].
                        bytesSent >= bytesToSend and: [bytesReceived = bytesSent]] 
                                whileFalse].
        Transcript
@@ -173,11 +187,14 @@ Socket class extend [
                                sema signal] 
                                        fork.
                        consumer := 
-                               [queueReady wait.
+                               [| recvBuf |
+                               recvBuf := DummyStream new.
+                               queueReady wait.
                                client := Socket remote: queue localAddress port: (self testPortFor: addressClass).
                                
                                [[client canRead] whileTrue: 
-                                       [bytesReceived := bytesReceived + client nextHunk size].
+                                       [client nextHunkPutAllOn: recvBuf.
+                                       bytesReceived := recvBuf size].
                                bytesSent >= bytesToSend and: [bytesReceived = bytesSent]] 
                                        whileFalse: [Processor yield].
                                sema signal] 
diff --git a/packages/zlib/ZLibReadStream.st b/packages/zlib/ZLibReadStream.st
index 5e03ff5..17fbea5 100644
--- a/packages/zlib/ZLibReadStream.st
+++ b/packages/zlib/ZLibReadStream.st
@@ -70,6 +70,19 @@ used for communication with zlib.'>
        ^result
     ]
 
+    nextHunkPutAllOn: aStream [
+       "Copy the next buffer's worth of stuff from the receiver to
+        aStream.  Do at most one actual compression/decompression
+        operation."
+
+       <category: 'streaming'>
+       | result |
+       self atEnd ifTrue: [^self pastEnd].
+       aStream next: endPtr - ptr putAll: outBytes startingAt: ptr + 1.
+       ptr := endPtr.
+       ^result
+    ]
+
     nextHunk [
        "Answer the next buffers worth of stuff in the Stream represented
         by the receiver.  Do at most one actual compression/decompression
diff --git a/packages/zlib/zlibtests.st b/packages/zlib/zlibtests.st
index a6bb592..5309368 100644
--- a/packages/zlib/zlibtests.st
+++ b/packages/zlib/zlibtests.st
@@ -166,6 +166,17 @@ TestCase subclass: ZlibStreamTest [
        self assertFooVector: data
     ]
 
+    testNextHunkPutAllOn [
+       "Test accessing data with nextHunkPutAllOn."
+
+       <category: 'testing'>
+       | stream data |
+       stream := InflateStream on: self doDeflate readStream.
+       data := String new writeStream.
+       [stream atEnd] whileFalse: [stream nextHunkPutAllOn: data].
+       self assertFooVector: data contents
+    ]
+
     testRandomAccess [
        "Test random access to deflated data."
 
