[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r23101 - Extractor/src/main
From: gnunet
Subject: [GNUnet-SVN] r23101 - Extractor/src/main
Date: Sat, 4 Aug 2012 21:44:41 +0200
Author: grothoff
Date: 2012-08-04 21:44:40 +0200 (Sat, 04 Aug 2012)
New Revision: 23101
Modified:
Extractor/src/main/TODO
Extractor/src/main/extractor.c
Extractor/src/main/extractor_datasource.c
Log:
implementing bz2 support
Modified: Extractor/src/main/TODO
===================================================================
--- Extractor/src/main/TODO 2012-08-04 19:26:28 UTC (rev 23100)
+++ Extractor/src/main/TODO 2012-08-04 19:44:40 UTC (rev 23101)
@@ -1,5 +1,2 @@
-* bz2 decompression (not implemented)
-* extract-from-bz2-file test fails!
-
* MAX_META_DATA buffer of 32 MB is a bit big as a non-growing default size;
also, valgrind reports it is leaked even though printf-debugging shows it is
not (!?)
Modified: Extractor/src/main/extractor.c
===================================================================
--- Extractor/src/main/extractor.c 2012-08-04 19:26:28 UTC (rev 23100)
+++ Extractor/src/main/extractor.c 2012-08-04 19:44:40 UTC (rev 23101)
@@ -377,6 +377,7 @@
ssize_t data_available;
ssize_t ready;
int done;
+ int have_in_memory;
plugin_count = 0;
for (pos = plugins; NULL != pos; pos = pos->next)
@@ -385,7 +386,7 @@
ready = EXTRACTOR_IPC_shared_memory_set_ (shm, ds, 0, DEFAULT_SHM_SIZE);
else
ready = 0;
-
+ have_in_memory = 0;
prp.file_finished = 0;
prp.proc = proc;
prp.proc_cls = proc_cls;
@@ -398,6 +399,8 @@
start.file_size = EXTRACTOR_datasource_get_size_ (ds, 0);
for (pos = plugins; NULL != pos; pos = pos->next)
{
+ if (EXTRACTOR_OPTION_IN_PROCESS == pos->flags)
+ have_in_memory = 1;
if ( (NULL != pos->channel) &&
(-1 == EXTRACTOR_IPC_channel_send_ (pos->channel,
&start,
@@ -537,6 +540,8 @@
}
}
+ if (0 == have_in_memory)
+ return;
/* run in-process plugins */
ctx.finished = 0;
ctx.ds = ds;
@@ -548,7 +553,10 @@
ec.get_size = &in_process_get_size;
ec.proc = &in_process_proc;
if (-1 == EXTRACTOR_datasource_seek_ (ds, 0, SEEK_SET))
- return;
+ {
+ LOG ("Failed to seek to 0 for in-memory plugins\n");
+ return;
+ }
for (pos = plugins; NULL != pos; pos = pos->next)
{
Modified: Extractor/src/main/extractor_datasource.c
===================================================================
--- Extractor/src/main/extractor_datasource.c 2012-08-04 19:26:28 UTC (rev 23100)
+++ Extractor/src/main/extractor_datasource.c 2012-08-04 19:44:40 UTC (rev 23101)
@@ -449,97 +449,6 @@
#if HAVE_ZLIB
/**
- * Reset gz-compressed data stream to the beginning.
- *
- * @return 1 on success, 0 to terminate extraction,
- * -1 on decompressor initialization failure
- */
-static int
-cfs_reset_stream_zlib (struct CompressedFileSource *cfs)
-{
- if (cfs->gzip_header_length !=
- bfds_seek (cfs->bfds, cfs->gzip_header_length, SEEK_SET))
- return -1;
- memset (&cfs->strm, 0, sizeof (z_stream));
- cfs->strm.avail_out = COM_CHUNK_SIZE;
-
- /*
- * note: maybe plain inflateInit(&strm) is adequate,
- * it looks more backward-compatible also ;
- *
- * ZLIB_VERNUM isn't defined by zlib version 1.1.4 ;
- * there might be a better check.
- */
- if (Z_OK != inflateInit2 (&cfs->strm,
-#ifdef ZLIB_VERNUM
- 15 + 32
-#else
- - MAX_WBITS
-#endif
- ))
- {
- LOG ("Failed to initialize zlib decompression\n");
- return -1;
- }
- cfs->fpos = 0;
- return 1;
-}
-#endif
-
-
-#if HAVE_LIBBZ2
-/**
- * Reset bz2-compressed data stream to the beginning.
- *
- * @return 1 on success, 0 to terminate extraction,
- * -1 on decompressor initialization failure
- */
-static int
-cfs_reset_stream_bz2 (struct CompressedFileSource *cfs)
-{
- BZ2_bzDecompressEnd (&cfs->bstrm);
- if (BZ_OK !=
- BZ2_bzDecompressInit (&cfs->bstrm, 0, 0))
- {
- LOG ("Failed to reinitialize BZ2 decompressor\n");
- return -1;
- }
- return 1;
-}
-#endif
-
-
-/**
- * Resets the compression stream to begin uncompressing
- * from the beginning. Used at initialization time, and when
- * seeking backward.
- *
- * @param cfs cfs to reset
- * @return 1 on success, 0 to terminate extraction,
- * -1 on error
- */
-static int
-cfs_reset_stream (struct CompressedFileSource *cfs)
-{
- switch (cfs->compression_type)
- {
-#if HAVE_ZLIB
- case COMP_TYPE_ZLIB:
- return cfs_reset_stream_zlib (cfs);
-#endif
-#if HAVE_LIBBZ2
- case COMP_TYPE_BZ2:
- return cfs_reset_stream_bz2 (cfs);
-#endif
- default:
- LOG ("invalid compression type selected\n");
- return -1;
- }
-}
-
-
-#if HAVE_ZLIB
-/**
* Initializes gz-decompression object. Might report metadata about
* compresse stream, if available. Resets the stream to the beginning.
*
@@ -591,10 +500,11 @@
return -1;
}
len = cptr - fname;
- if (0 != proc (proc_cls, "<zlib>", EXTRACTOR_METATYPE_FILENAME,
- EXTRACTOR_METAFORMAT_C_STRING, "text/plain",
- fname,
- len))
+ if ( (NULL != proc) &&
+ (0 != proc (proc_cls, "<zlib>", EXTRACTOR_METATYPE_FILENAME,
+ EXTRACTOR_METAFORMAT_C_STRING, "text/plain",
+ fname,
+ len)) )
return 0; /* done */
gzip_header_length += len + 1;
}
@@ -624,10 +534,11 @@
return -1;
}
len = cptr - fcomment;
- if (0 != proc (proc_cls, "<zlib>", EXTRACTOR_METATYPE_COMMENT,
- EXTRACTOR_METAFORMAT_C_STRING, "text/plain",
- (const char *) fcomment,
- len))
+ if ( (NULL != proc) &&
+ (0 != proc (proc_cls, "<zlib>", EXTRACTOR_METATYPE_COMMENT,
+ EXTRACTOR_METAFORMAT_C_STRING, "text/plain",
+ (const char *) fcomment,
+ len)) )
return 0; /* done */
gzip_header_length += len + 1;
}
@@ -640,7 +551,33 @@
gzip_header_length = 0;
#endif
cfs->gzip_header_length = gzip_header_length;
- return cfs_reset_stream_zlib (cfs);
+
+ if (cfs->gzip_header_length !=
+ bfds_seek (cfs->bfds, cfs->gzip_header_length, SEEK_SET))
+ {
+ LOG ("Failed to seek to start to initialize gzip decompressor\n");
+ return -1;
+ }
+ cfs->strm.avail_out = COM_CHUNK_SIZE;
+ /*
+ * note: maybe plain inflateInit(&strm) is adequate,
+ * it looks more backward-compatible also ;
+ *
+ * ZLIB_VERNUM isn't defined by zlib version 1.1.4 ;
+ * there might be a better check.
+ */
+ if (Z_OK != inflateInit2 (&cfs->strm,
+#ifdef ZLIB_VERNUM
+ 15 + 32
+#else
+ - MAX_WBITS
+#endif
+ ))
+ {
+ LOG ("Failed to initialize zlib decompression\n");
+ return -1;
+ }
+ return 1;
}
#endif
@@ -659,9 +596,20 @@
cfs_init_decompressor_bz2 (struct CompressedFileSource *cfs,
EXTRACTOR_MetaDataProcessor proc, void *proc_cls)
{
+ if (0 !=
+ bfds_seek (cfs->bfds, 0, SEEK_SET))
+ {
+ LOG ("Failed to seek to start to initialize BZ2 decompressor\n");
+ return -1;
+ }
+ memset (&cfs->bstrm, 0, sizeof (bz_stream));
if (BZ_OK !=
BZ2_bzDecompressInit (&cfs->bstrm, 0, 0))
- return -1;
+ {
+ LOG ("Failed to initialize BZ2 decompressor\n");
+ return -1;
+ }
+ cfs->bstrm.avail_out = COM_CHUNK_SIZE;
return 1;
}
#endif
@@ -680,6 +628,8 @@
cfs_init_decompressor (struct CompressedFileSource *cfs,
EXTRACTOR_MetaDataProcessor proc, void *proc_cls)
{
+ cfs->result_pos = 0;
+ cfs->fpos = 0;
switch (cfs->compression_type)
{
#if HAVE_ZLIB
@@ -756,6 +706,24 @@
/**
+ * Resets the compression stream to begin uncompressing
+ * from the beginning. Used at initialization time, and when
+ * seeking backward.
+ *
+ * @param cfs cfs to reset
+ * @return 1 on success, 0 to terminate extraction,
+ * -1 on error
+ */
+static int
+cfs_reset_stream (struct CompressedFileSource *cfs)
+{
+ if (-1 == cfs_deinit_decompressor (cfs))
+ return -1;
+ return cfs_init_decompressor (cfs, NULL, NULL);
+}
+
+
+/**
* Destroy compressed file source.
*
* @param cfs source to destroy
@@ -826,7 +794,7 @@
int ret;
size_t rc;
ssize_t in;
- char buf[COM_CHUNK_SIZE];
+ unsigned char buf[COM_CHUNK_SIZE];
if (cfs->fpos == cfs->uncompressed_size)
{
@@ -856,7 +824,12 @@
LOG ("unexpected EOF\n");
return -1; /* unexpected EOF */
}
- cfs->strm.next_in = (unsigned char *) buf;
+ if (0 == in)
+ {
+ cfs->uncompressed_size = cfs->fpos;
+ return rc;
+ }
+ cfs->strm.next_in = buf;
cfs->strm.avail_in = (uInt) in;
cfs->strm.next_out = (unsigned char *) cfs->result;
cfs->strm.avail_out = COM_CHUNK_SIZE;
@@ -905,8 +878,74 @@
void *data,
size_t size)
{
- LOG ("bz2 decompression not implemented\n");
- return -1;
+ char *dst = data;
+ int ret;
+ size_t rc;
+ ssize_t in;
+ char buf[COM_CHUNK_SIZE];
+
+ if (cfs->fpos == cfs->uncompressed_size)
+ {
+ /* end of file */
+ return 0;
+ }
+ rc = 0;
+ if (COM_CHUNK_SIZE > cfs->bstrm.avail_out + cfs->result_pos)
+ {
+ /* got left-over decompressed data from previous round! */
+ in = COM_CHUNK_SIZE - (cfs->bstrm.avail_out + cfs->result_pos);
+ if (in > size)
+ in = size;
+ memcpy (&dst[rc], &cfs->result[cfs->result_pos], in);
+ cfs->fpos += in;
+ cfs->result_pos += in;
+ rc += in;
+ }
+ ret = BZ_OK;
+ while ( (rc < size) && (BZ_STREAM_END != ret) )
+ {
+ /* read block from original data source */
+ in = bfds_read (cfs->bfds,
+ buf, sizeof (buf));
+ if (in < 0)
+ {
+ LOG ("unexpected EOF\n");
+ return -1; /* unexpected EOF */
+ }
+ if (0 == in)
+ {
+ cfs->uncompressed_size = cfs->fpos;
+ return rc;
+ }
+ cfs->bstrm.next_in = buf;
+ cfs->bstrm.avail_in = (uInt) in;
+ cfs->bstrm.next_out = cfs->result;
+ cfs->bstrm.avail_out = COM_CHUNK_SIZE;
+ cfs->result_pos = 0;
+ ret = BZ2_bzDecompress (&cfs->bstrm);
+ if ( (BZ_OK != ret) && (BZ_STREAM_END != ret) )
+ {
+ LOG ("unexpected bzip2 decompress error: %d\n", ret);
+ return -1; /* unexpected error */
+ }
+ /* go backwards by the number of bytes left in the buffer */
+ if (-1 == bfds_seek (cfs->bfds, - (int64_t) cfs->bstrm.avail_in, SEEK_CUR))
+ {
+ LOG ("seek failed\n");
+ return -1;
+ }
+ /* copy decompressed bytes to target buffer */
+ in = COM_CHUNK_SIZE - cfs->bstrm.avail_out;
+ if (in > size - rc)
+ in = size - rc;
+ memcpy (&dst[rc], &cfs->result[cfs->result_pos], in);
+ cfs->fpos += in;
+ cfs->result_pos += in;
+ rc += in;
+ }
+ if (BZ_STREAM_END == ret)
+ cfs->uncompressed_size = cfs->fpos;
+ return rc;
}
#endif
@@ -1041,6 +1080,12 @@
LOG ("Failed to read decompressed stream for seek operation\n");
return -1;
}
+ if (0 == ret)
+ {
+ LOG ("Reached unexpected end of stream during seek operation\n");
+ return -1;
+ }
+ ASSERT (ret <= delta);
delta -= ret;
}
return cfs->fpos;
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r23101 - Extractor/src/main,
gnunet <=