[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r23094 - Extractor/src/main
From: gnunet
Subject: [GNUnet-SVN] r23094 - Extractor/src/main
Date: Sat, 4 Aug 2012 19:32:18 +0200
Author: grothoff
Date: 2012-08-04 19:32:18 +0200 (Sat, 04 Aug 2012)
New Revision: 23094
Modified:
Extractor/src/main/extractor.c
Extractor/src/main/extractor_datasource.c
Extractor/src/main/extractor_datasource.h
Extractor/src/main/extractor_ipc.c
Extractor/src/main/extractor_ipc.h
Extractor/src/main/extractor_ipc_gnu.c
Extractor/src/main/extractor_plugin_main.c
Extractor/src/main/extractor_plugins.h
Extractor/src/main/test_extractor.c
Log:
gzip wip
Modified: Extractor/src/main/extractor.c
===================================================================
--- Extractor/src/main/extractor.c 2012-08-04 07:45:52 UTC (rev 23093)
+++ Extractor/src/main/extractor.c 2012-08-04 17:32:18 UTC (rev 23094)
@@ -82,7 +82,7 @@
um.reserved2 = 0;
um.shm_ready_bytes = (uint32_t) data_available;
um.shm_off = (uint64_t) shm_off;
- um.file_size = EXTRACTOR_datasource_get_size_ (ds);
+ um.file_size = EXTRACTOR_datasource_get_size_ (ds, 0);
if (sizeof (um) !=
EXTRACTOR_IPC_channel_send_ (plugin->channel,
&um,
@@ -300,7 +300,7 @@
{
struct InProcessContext *ctx = cls;
- return (uint64_t) EXTRACTOR_datasource_get_size_ (ctx->ds);
+ return (uint64_t) EXTRACTOR_datasource_get_size_ (ctx->ds, 0);
}
@@ -373,8 +373,8 @@
struct InProcessContext ctx;
struct EXTRACTOR_ExtractContext ec;
int64_t min_seek;
+ int64_t end;
ssize_t data_available;
- uint64_t last_position;
uint32_t ready;
int done;
@@ -386,8 +386,6 @@
else
ready = 0;
- last_position = UINT64_MAX;
-
prp.file_finished = 0;
prp.proc = proc;
prp.proc_cls = proc_cls;
@@ -397,7 +395,7 @@
start.reserved = 0;
start.reserved2 = 0;
start.shm_ready_bytes = ready;
- start.file_size = EXTRACTOR_datasource_get_size_ (ds);
+ start.file_size = EXTRACTOR_datasource_get_size_ (ds, 0);
for (pos = plugins; NULL != pos; pos = pos->next)
{
if ( (NULL != pos->channel) &&
@@ -453,18 +451,37 @@
if ( (1 == pos->round_finished) ||
(NULL == pos->channel) )
continue; /* inactive plugin */
- if (((last_position <= pos->seek_request) &&
- (last_position + data_available > pos->seek_request)) &&
- ((data_available > 0) ||
- (last_position == EXTRACTOR_datasource_get_size_ (ds))))
+ if (-1 == pos->seek_request)
{
- done = 0; /* possibly more meta data at current position! */
+ /* possibly more meta data at current position, at least
+ this plugin is still working on it... */
+ done = 0;
break;
}
- if ( (-1 == min_seek) ||
- (min_seek > pos->seek_request) )
+ if (-1 != pos->seek_request)
{
- min_seek = pos->seek_request;
+ if (SEEK_END == pos->seek_whence)
+ {
+ /* convert distance from end to absolute position */
+ pos->seek_whence = 0;
+ end = EXTRACTOR_datasource_get_size_ (ds, 1);
+ if (pos->seek_request > end)
+ {
+ LOG ("Cannot seek to before the beginning of the file!\n");
+ pos->seek_request = 0;
+ }
+ else
+ {
+ pos->seek_request = end - pos->seek_request;
+ }
+ }
+ if ( (-1 == min_seek) ||
+ (min_seek > pos->seek_request) )
+ {
+ LOG ("Updating min seek to %llu\n",
+ (unsigned long long) pos->seek_request);
+ min_seek = pos->seek_request;
+ }
}
}
if ( (1 == done) &&
@@ -482,6 +499,10 @@
abort_all_channels (plugins);
break;
}
+ LOG ("Seeking to %lld, got %d bytes ready there\n",
+ (long long) min_seek,
+ (int) data_available);
+
}
/* if 'prp.file_finished', send 'abort' to plugins;
if not, send 'seek' notification to plugins in range */
@@ -496,23 +517,29 @@
pos->round_finished = 1;
pos->seek_request = -1;
}
- if ( (-1 != pos->seek_request) && ((last_position > pos->seek_request) ||
- (last_position + data_available <= pos->seek_request)) &&
+ if ( (-1 != pos->seek_request) &&
(min_seek <= pos->seek_request) &&
- ((min_seek + data_available > pos->seek_request) ||
- (min_seek == EXTRACTOR_datasource_get_size_ (ds))) )
+ ( (min_seek + data_available > pos->seek_request) ||
+ (min_seek == EXTRACTOR_datasource_get_size_ (ds, 0))) )
{
+ LOG ("Notifying plugin about seek\n");
send_update_message (pos,
min_seek,
data_available,
ds);
pos->seek_request = -1;
}
+ else
+ {
+ if (-1 != pos->seek_request)
+ LOG ("Skipping plugin, seek %lld not in range %llu-%llu\n",
+ (long long) pos->seek_request,
+ min_seek,
+ min_seek + data_available);
+ }
if (0 == pos->round_finished)
done = 0; /* can't be done, plugin still active */
}
- if (min_seek >= 0)
- last_position = min_seek;
}
/* run in-process plugins */
Modified: Extractor/src/main/extractor_datasource.c
===================================================================
--- Extractor/src/main/extractor_datasource.c 2012-08-04 07:45:52 UTC (rev 23093)
+++ Extractor/src/main/extractor_datasource.c 2012-08-04 17:32:18 UTC (rev 23094)
@@ -59,7 +59,8 @@
* Data is read from the source and shoved into decompressor
* in chunks this big.
*/
-#define COM_CHUNK_SIZE (10 * 1024)
+#define COM_CHUNK_SIZE (12345)
+// #define COM_CHUNK_SIZE (10 * 1024)
/**
@@ -230,7 +231,10 @@
}
position = (int64_t) LSEEK (bfds->fd, pos, SEEK_SET);
if (position < 0)
- return -1;
+ {
+ LOG_STRERROR ("lseek");
+ return -1;
+ }
bfds->fpos = position;
bfds->buffer_pos = 0;
rd = read (bfds->fd, bfds->buffer, bfds->buffer_size);
@@ -374,18 +378,19 @@
return -1;
}
if ( (NULL == bfds->buffer) ||
- ( (bfds->buffer_pos <= pos) &&
- (bfds->buffer_pos + bfds->buffer_bytes > pos) ) )
+ ( (bfds->buffer_pos + bfds->fpos <= pos) &&
+ (bfds->buffer_pos + bfds->fpos + bfds->buffer_bytes > pos) ) )
{
- bfds->buffer_pos = pos;
- return bfds->buffer_pos;
+ bfds->buffer_pos = pos - bfds->fpos;
+ return pos;
}
if (0 != bfds_pick_next_buffer_at (bfds, pos))
{
LOG ("seek operation failed\n");
return -1;
}
- return bfds->fpos;
+ ASSERT (pos == bfds->fpos + bfds->buffer_pos);
+ return pos;
}
return -1;
}
@@ -453,13 +458,9 @@
{
if (cfs->gzip_header_length !=
bfds_seek (cfs->bfds, cfs->gzip_header_length, SEEK_SET))
- return 0;
- cfs->strm.next_in = NULL;
- cfs->strm.avail_in = 0;
- cfs->strm.total_in = 0;
- cfs->strm.zalloc = NULL;
- cfs->strm.zfree = NULL;
- cfs->strm.opaque = NULL;
+ return -1;
+ memset (&cfs->strm, 0, sizeof (z_stream));
+ cfs->strm.avail_out = COM_CHUNK_SIZE;
/*
* note: maybe plain inflateInit(&strm) is adequate,
@@ -624,7 +625,6 @@
/* zlib will take care of its header */
gzip_header_length = 0;
#endif
-
cfs->gzip_header_length = gzip_header_length;
return cfs_reset_stream_zlib (cfs);
}
@@ -797,12 +797,15 @@
char buf[COM_CHUNK_SIZE];
if (cfs->fpos == cfs->uncompressed_size)
- return 0;
+ {
+ LOG ("fpos at EOF, returning 0 from cfs_read_zlib\n");
+ return 0;
+ }
rc = 0;
- if (cfs->strm.avail_out > 0)
+ if (COM_CHUNK_SIZE > cfs->strm.avail_out + cfs->result_pos)
{
/* got left-over decompressed data from previous round! */
- in = cfs->strm.avail_out;
+ in = COM_CHUNK_SIZE - (cfs->strm.avail_out + cfs->result_pos);
if (in > size)
in = size;
memcpy (&dst[rc], &cfs->result[cfs->result_pos], in);
@@ -816,8 +819,11 @@
/* read block from original data source */
in = bfds_read (cfs->bfds,
buf, sizeof (buf));
- if (in <= 0)
- return -1; /* unexpected EOF */
+ if (in < 0)
+ {
+ LOG ("unexpected EOF\n");
+ return -1; /* unexpected EOF */
+ }
cfs->strm.next_in = (unsigned char *) buf;
cfs->strm.avail_in = (uInt) in;
cfs->strm.next_out = (unsigned char *) cfs->result;
@@ -825,12 +831,18 @@
cfs->result_pos = 0;
ret = inflate (&cfs->strm, Z_SYNC_FLUSH);
if ( (Z_OK != ret) && (Z_STREAM_END != ret) )
- return -1; /* unexpected error */
+ {
+ LOG ("unexpected gzip inflate error: %d\n", ret);
+ return -1; /* unexpected error */
+ }
/* go backwards by the number of bytes left in the buffer */
if (-1 == bfds_seek (cfs->bfds, - (int64_t) cfs->strm.avail_in,
SEEK_CUR))
- return -1;
+ {
+ LOG ("seek failed\n");
+ return -1;
+ }
/* copy decompressed bytes to target buffer */
- in = cfs->strm.total_out;
+ in = COM_CHUNK_SIZE - cfs->strm.avail_out;
if (in > size - rc)
in = size - rc;
memcpy (&dst[rc], &cfs->result[cfs->result_pos], in);
@@ -840,6 +852,8 @@
}
if (Z_STREAM_END == ret)
cfs->uncompressed_size = cfs->fpos;
+ LOG ("returning %d from cfs_read_zlib\n",
+ (int) rc);
return rc;
}
@@ -963,7 +977,6 @@
LOG ("Invalid seek operation\n");
return -1;
}
-
delta = nposition - cfs->fpos;
if (delta < 0)
{
@@ -1253,15 +1266,36 @@
* Determine the overall size of the data source (after compression).
*
* @param cls must be a 'struct EXTRACTOR_Datasource'
+ * @param force force computing the size if it is unavailable
* @return overall file size, UINT64_MAX on error or unknown
*/
int64_t
-EXTRACTOR_datasource_get_size_ (void *cls)
+EXTRACTOR_datasource_get_size_ (void *cls,
+ int force)
{
struct EXTRACTOR_Datasource *ds = cls;
+ char buf[32 * 1024];
+ uint64_t pos;
if (NULL != ds->cfs)
- return ds->cfs->uncompressed_size;
+ {
+ if ( (force) &&
+ (-1 == ds->cfs->uncompressed_size) )
+ {
+ pos = ds->cfs->fpos;
+ LOG ("seeking to end\n");
+ while ( (-1 == ds->cfs->uncompressed_size) &&
+ (-1 != cfs_read (ds->cfs, buf, sizeof (buf))) ) ;
+ if (-1 == cfs_seek (ds->cfs, pos, SEEK_SET))
+ {
+ LOG ("Serious problem, I moved the buffer to determine the file size but could not restore it...\n");
+ return -1;
+ }
+ LOG ("File size is %llu bytes decompressed\n",
+ (unsigned long long) ds->cfs->uncompressed_size);
+ }
+ return ds->cfs->uncompressed_size;
+ }
return ds->bfds->fsize;
}
Modified: Extractor/src/main/extractor_datasource.h
===================================================================
--- Extractor/src/main/extractor_datasource.h 2012-08-04 07:45:52 UTC (rev 23093)
+++ Extractor/src/main/extractor_datasource.h 2012-08-04 17:32:18 UTC (rev 23094)
@@ -105,10 +105,12 @@
* Determine the overall size of the data source (after compression).
*
* @param cls must be a 'struct EXTRACTOR_Datasource'
+ * @param force force computing the size if it is unavailable
* @return overall file size, -1 on error or unknown
*/
int64_t
-EXTRACTOR_datasource_get_size_ (void *cls);
+EXTRACTOR_datasource_get_size_ (void *cls,
+ int force);
#endif
Modified: Extractor/src/main/extractor_ipc.c
===================================================================
--- Extractor/src/main/extractor_ipc.c 2012-08-04 07:45:52 UTC (rev 23093)
+++ Extractor/src/main/extractor_ipc.c 2012-08-04 17:32:18 UTC (rev 23094)
@@ -68,7 +68,11 @@
return 0;
}
memcpy (&seek, cdata, sizeof (seek));
- plugin->seek_request = seek.file_offset;
+ plugin->seek_request = (int64_t) seek.file_offset;
+ plugin->seek_whence = seek.whence;
+ LOG ("Received %d-seek request to %llu\n",
+ (int) seek.whence,
+ (unsigned long long) seek.file_offset);
return sizeof (struct SeekRequestMessage);
case MESSAGE_META: /* Meta */
if (size < sizeof (struct MetaMessage))
Modified: Extractor/src/main/extractor_ipc.h
===================================================================
--- Extractor/src/main/extractor_ipc.h 2012-08-04 07:45:52 UTC (rev 23093)
+++ Extractor/src/main/extractor_ipc.h 2012-08-04 17:32:18 UTC (rev 23094)
@@ -231,9 +231,11 @@
unsigned char reserved;
/**
- * Always zero.
+ * 'whence' value for the seek operation;
+ * 0 = SEEK_SET, 1 = SEEK_CUR, 2 = SEEK_END.
+ * Note that 'SEEK_CUR' is never used here.
*/
- uint16_t reserved2;
+ uint16_t whence;
/**
* Number of bytes requested for SHM.
@@ -241,7 +243,10 @@
uint32_t requested_bytes;
/**
- * Requested offset.
+ * Requested offset; a positive value from the end of the
+ * file is used if 'whence' is SEEK_END; a positive value
+ * from the start is used if 'whence' is SEEK_SET.
+ * 'SEEK_CUR' is never used.
*/
uint64_t file_offset;
Modified: Extractor/src/main/extractor_ipc_gnu.c
===================================================================
--- Extractor/src/main/extractor_ipc_gnu.c 2012-08-04 07:45:52 UTC (rev 23093)
+++ Extractor/src/main/extractor_ipc_gnu.c 2012-08-04 17:32:18 UTC (rev 23094)
@@ -228,7 +228,10 @@
{
if (-1 ==
EXTRACTOR_datasource_seek_ (ds, off, SEEK_SET))
- return -1;
+ {
+ LOG ("Failed to set IPC memory due to seek error\n");
+ return -1;
+ }
if (size > shm->shm_size)
size = shm->shm_size;
return EXTRACTOR_datasource_read_ (ds,
Modified: Extractor/src/main/extractor_plugin_main.c
===================================================================
--- Extractor/src/main/extractor_plugin_main.c 2012-08-04 07:45:52 UTC (rev 23093)
+++ Extractor/src/main/extractor_plugin_main.c 2012-08-04 17:32:18 UTC (rev 23094)
@@ -117,6 +117,7 @@
struct UpdateMessage um;
uint64_t npos;
unsigned char reply;
+ uint16_t wval;
switch (whence)
{
@@ -133,6 +134,7 @@
return -1;
}
npos = (uint64_t) (pc->read_position + pos);
+ wval = 0;
break;
case SEEK_END:
if (pos > 0)
@@ -140,6 +142,12 @@
LOG ("Invalid seek operation\n");
return -1;
}
+ if (UINT64_MAX == pc->file_size)
+ {
+ wval = 2;
+ npos = (uint64_t) - pos;
+ break;
+ }
pos = (int64_t) (pc->file_size + pos);
/* fall-through! */
case SEEK_SET:
@@ -149,13 +157,15 @@
return -1;
}
npos = (uint64_t) pos;
+ wval = 0;
break;
default:
LOG ("Invalid seek operation\n");
return -1;
}
if ( (pc->shm_off <= npos) &&
- (pc->shm_off + pc->shm_ready_bytes > npos) )
+ (pc->shm_off + pc->shm_ready_bytes > npos) &&
+ (0 == wval) )
{
pc->read_position = npos;
return (int64_t) npos;
@@ -163,10 +173,17 @@
/* need to seek */
srm.opcode = MESSAGE_SEEK;
srm.reserved = 0;
- srm.reserved2 = 0;
+ srm.whence = wval;
srm.requested_bytes = pc->shm_map_size;
- if (srm.requested_bytes > pc->file_size - npos)
- srm.requested_bytes = pc->file_size - npos;
+ if (0 == wval)
+ {
+ if (srm.requested_bytes > pc->file_size - npos)
+ srm.requested_bytes = pc->file_size - npos;
+ }
+ else
+ {
+ srm.requested_bytes = npos;
+ }
srm.file_offset = npos;
if (-1 == EXTRACTOR_write_all_ (pc->out, &srm, sizeof (srm)))
{
@@ -181,7 +198,10 @@
return -1;
}
if (MESSAGE_UPDATED_SHM != reply)
- return -1; /* was likely a MESSAGE_DISCARD_STATE */
+ {
+ LOG ("Unexpected reply %d to seek\n", reply);
+ return -1; /* was likely a MESSAGE_DISCARD_STATE */
+ }
if (-1 == EXTRACTOR_read_all_ (pc->in, &um.reserved, sizeof (um) - 1))
{
LOG ("Failed to read MESSAGE_UPDATED_SHM\n");
@@ -190,6 +210,11 @@
pc->shm_off = um.shm_off;
pc->shm_ready_bytes = um.shm_ready_bytes;
pc->file_size = um.file_size;
+ if (2 == wval)
+ {
+ /* convert offset to be absolute from beginning of the file */
+ npos = pc->file_size - npos;
+ }
if ( (pc->shm_off <= npos) &&
((pc->shm_off + pc->shm_ready_bytes > npos) ||
(pc->file_size == pc->shm_off)) )
@@ -199,7 +224,11 @@
}
/* oops, serious missunderstanding, we asked to seek
and then were notified about a different position!? */
- LOG ("Got invalid MESSAGE_UPDATED_SHM in response to my seek\n");
+ LOG ("Got invalid MESSAGE_UPDATED_SHM in response to my %d-seek (%llu not in %llu-%llu)\n",
+ (int) wval,
+ (unsigned long long) npos,
+ (unsigned long long) pc->shm_off,
+ (unsigned long long) pc->shm_off + pc->shm_ready_bytes);
return -1;
}
Modified: Extractor/src/main/extractor_plugins.h
===================================================================
--- Extractor/src/main/extractor_plugins.h 2012-08-04 07:45:52 UTC (rev 23093)
+++ Extractor/src/main/extractor_plugins.h 2012-08-04 17:32:18 UTC (rev 23094)
@@ -89,7 +89,9 @@
/**
* A position this plugin wants us to seek to. -1 if it's finished.
- * Starts at 0.
+ * A positive value from the end of the file is used if 'whence' is
+ * SEEK_END; a positive value from the start is used if 'whence' is
+ * SEEK_SET. 'SEEK_CUR' is never used.
*/
int64_t seek_request;
@@ -104,6 +106,13 @@
*/
int round_finished;
+ /**
+ * 'whence' value for the seek operation;
+ * 0 = SEEK_SET, 1 = SEEK_CUR, 2 = SEEK_END.
+ * Note that 'SEEK_CUR' is never used here.
+ */
+ uint16_t seek_whence;
+
};
Modified: Extractor/src/main/test_extractor.c
===================================================================
--- Extractor/src/main/test_extractor.c 2012-08-04 07:45:52 UTC (rev 23093)
+++ Extractor/src/main/test_extractor.c 2012-08-04 17:32:18 UTC (rev 23094)
@@ -67,7 +67,6 @@
fprintf (stderr, "Unexpected file size returned (expected 150k)\n");
abort ();
}
-
if (1024 * 100 + 4 != ec->seek (ec->cls, 1024 * 100 + 4, SEEK_SET))
{
fprintf (stderr, "Failure to seek (SEEK_SET)\n");
@@ -83,7 +82,6 @@
fprintf (stderr, "Unexpected data at offset 100k + 4\n");
abort ();
}
-
if (((1024 * 100 + 4) + 1 - (1024 * 50 + 7)) !=
ec->seek (ec->cls, - (1024 * 50 + 7), SEEK_CUR))
{
@@ -97,10 +95,10 @@
}
if (((1024 * 100 + 4) + 1 - (1024 * 50 + 7)) % 256 != * (unsigned char *) dp)
{
- fprintf (stderr, "Unexpected data at offset 50k - 3\n");
+ fprintf (stderr, "Unexpected data at offset 100k - 3\n");
abort ();
}
-
+ fprintf (stderr, "Seeking to 150k\n");
if (1024 * 150 != ec->seek (ec->cls, 0, SEEK_END))
{
fprintf (stderr, "Failure to seek (SEEK_END)\n");
@@ -111,12 +109,12 @@
fprintf (stderr, "Failed to receive EOF at 150k\n");
abort ();
}
-
if (1024 * 150 - 2 != ec->seek (ec->cls, -2, SEEK_END))
{
fprintf (stderr, "Failure to seek (SEEK_END - 2)\n");
abort ();
}
+ fprintf (stderr, "Reading at 150k - 2\n");
if (1 != ec->read (ec->cls, &dp, 1))
{
fprintf (stderr, "Failure to read at 150k - 3\n");
@@ -127,7 +125,7 @@
fprintf (stderr, "Unexpected data at offset 150k - 3\n");
abort ();
}
-
+ fprintf (stderr, "Good at 150k\n");
if (0 != ec->proc (ec->cls, "test", EXTRACTOR_METATYPE_COMMENT,
EXTRACTOR_METAFORMAT_UTF8, "<no mime>", "Hello world!",
strlen ("Hello world!") + 1))
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r23094 - Extractor/src/main,
gnunet <=