gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r23094 - Extractor/src/main


From: gnunet
Subject: [GNUnet-SVN] r23094 - Extractor/src/main
Date: Sat, 4 Aug 2012 19:32:18 +0200

Author: grothoff
Date: 2012-08-04 19:32:18 +0200 (Sat, 04 Aug 2012)
New Revision: 23094

Modified:
   Extractor/src/main/extractor.c
   Extractor/src/main/extractor_datasource.c
   Extractor/src/main/extractor_datasource.h
   Extractor/src/main/extractor_ipc.c
   Extractor/src/main/extractor_ipc.h
   Extractor/src/main/extractor_ipc_gnu.c
   Extractor/src/main/extractor_plugin_main.c
   Extractor/src/main/extractor_plugins.h
   Extractor/src/main/test_extractor.c
Log:
gzip wip

Modified: Extractor/src/main/extractor.c
===================================================================
--- Extractor/src/main/extractor.c      2012-08-04 07:45:52 UTC (rev 23093)
+++ Extractor/src/main/extractor.c      2012-08-04 17:32:18 UTC (rev 23094)
@@ -82,7 +82,7 @@
   um.reserved2 = 0;
   um.shm_ready_bytes = (uint32_t) data_available;
   um.shm_off = (uint64_t) shm_off;
-  um.file_size = EXTRACTOR_datasource_get_size_ (ds);
+  um.file_size = EXTRACTOR_datasource_get_size_ (ds, 0);
   if (sizeof (um) !=
       EXTRACTOR_IPC_channel_send_ (plugin->channel,
                                   &um,
@@ -300,7 +300,7 @@
 {
   struct InProcessContext *ctx = cls;
   
-  return (uint64_t) EXTRACTOR_datasource_get_size_ (ctx->ds);
+  return (uint64_t) EXTRACTOR_datasource_get_size_ (ctx->ds, 0);
 }
 
 
@@ -373,8 +373,8 @@
   struct InProcessContext ctx;
   struct EXTRACTOR_ExtractContext ec;
   int64_t min_seek;
+  int64_t end;
   ssize_t data_available;
-  uint64_t last_position;
   uint32_t ready;
   int done;
 
@@ -386,8 +386,6 @@
   else
     ready = 0;
 
-  last_position = UINT64_MAX;
-
   prp.file_finished = 0;
   prp.proc = proc;
   prp.proc_cls = proc_cls;
@@ -397,7 +395,7 @@
   start.reserved = 0;
   start.reserved2 = 0;
   start.shm_ready_bytes = ready;
-  start.file_size = EXTRACTOR_datasource_get_size_ (ds);  
+  start.file_size = EXTRACTOR_datasource_get_size_ (ds, 0);
   for (pos = plugins; NULL != pos; pos = pos->next)
     {
       if ( (NULL != pos->channel) &&
@@ -453,18 +451,37 @@
          if ( (1 == pos->round_finished) ||
               (NULL == pos->channel) )
            continue; /* inactive plugin */
-         if (((last_position <= pos->seek_request) &&
-              (last_position + data_available > pos->seek_request)) &&
-              ((data_available > 0) ||
-              (last_position == EXTRACTOR_datasource_get_size_ (ds))))
+         if (-1 == pos->seek_request)
            {
-             done = 0; /* possibly more meta data at current position! */
+             /* possibly more meta data at current position, at least 
+                this plugin is still working on it... */
+             done = 0; 
              break;
            }
-         if ( (-1 == min_seek) ||
-              (min_seek > pos->seek_request) )
+         if (-1 != pos->seek_request)
            {
-             min_seek = pos->seek_request;
+             if (SEEK_END == pos->seek_whence)
+               {
+                 /* convert distance from end to absolute position */
+                 pos->seek_whence = 0;
+                 end = EXTRACTOR_datasource_get_size_ (ds, 1);
+                 if (pos->seek_request > end)
+                   {
+                     LOG ("Cannot seek to before the beginning of the 
file!\n");
+                     pos->seek_request = 0;
+                   }
+                 else
+                   {                 
+                     pos->seek_request = end - pos->seek_request;
+                   }
+               }
+             if ( (-1 == min_seek) ||
+                  (min_seek > pos->seek_request) )
+               {
+                 LOG ("Updating min seek to %llu\n",
+                      (unsigned long long) pos->seek_request);
+                 min_seek = pos->seek_request;         
+               }
            }
        }
       if ( (1 == done) &&
@@ -482,6 +499,10 @@
              abort_all_channels (plugins);
              break;
            }
+         LOG ("Seeking to %lld, got %d bytes ready there\n",
+              (long long) min_seek,
+              (int) data_available);
+
        }
       /* if 'prp.file_finished', send 'abort' to plugins;
         if not, send 'seek' notification to plugins in range */
@@ -496,23 +517,29 @@
              pos->round_finished = 1;
              pos->seek_request = -1; 
            }         
-         if ( (-1 != pos->seek_request) && ((last_position > 
pos->seek_request) ||
-               (last_position + data_available <= pos->seek_request)) &&
+         if ( (-1 != pos->seek_request) && 
               (min_seek <= pos->seek_request) &&
-              ((min_seek + data_available > pos->seek_request) ||
-               (min_seek == EXTRACTOR_datasource_get_size_ (ds))) )
+              ( (min_seek + data_available > pos->seek_request) ||
+                (min_seek == EXTRACTOR_datasource_get_size_ (ds, 0))) )
            {
+             LOG ("Notifying plugin about seek\n");
              send_update_message (pos,
                                   min_seek,
                                   data_available,
                                   ds);
              pos->seek_request = -1; 
            }
+         else
+           {
+             if (-1 != pos->seek_request)
+               LOG ("Skipping plugin, seek %lld not in range %llu-%llu\n",
+                    (long long) pos->seek_request,
+                    min_seek,
+                    min_seek + data_available);
+           }
          if (0 == pos->round_finished)
            done = 0; /* can't be done, plugin still active */  
        }
-      if (min_seek >= 0)
-        last_position = min_seek;
     }
 
   /* run in-process plugins */

Modified: Extractor/src/main/extractor_datasource.c
===================================================================
--- Extractor/src/main/extractor_datasource.c   2012-08-04 07:45:52 UTC (rev 23093)
+++ Extractor/src/main/extractor_datasource.c   2012-08-04 17:32:18 UTC (rev 23094)
@@ -59,7 +59,8 @@
  * Data is read from the source and shoved into decompressor
  * in chunks this big.
  */
-#define COM_CHUNK_SIZE (10 * 1024)
+#define COM_CHUNK_SIZE (12345)
+// #define COM_CHUNK_SIZE (10 * 1024)
 
 
 /**
@@ -230,7 +231,10 @@
     }
   position = (int64_t) LSEEK (bfds->fd, pos, SEEK_SET);
   if (position < 0)
-    return -1;
+    {
+      LOG_STRERROR ("lseek");
+      return -1;
+    }
   bfds->fpos = position;
   bfds->buffer_pos = 0;
   rd = read (bfds->fd, bfds->buffer, bfds->buffer_size);
@@ -374,18 +378,19 @@
          return -1;
        }
       if ( (NULL == bfds->buffer) ||
-          ( (bfds->buffer_pos <= pos) &&
-            (bfds->buffer_pos + bfds->buffer_bytes > pos) ) )
+          ( (bfds->buffer_pos + bfds->fpos <= pos) &&
+            (bfds->buffer_pos + bfds->fpos + bfds->buffer_bytes > pos) ) )
        {
-         bfds->buffer_pos = pos; 
-         return bfds->buffer_pos;
+         bfds->buffer_pos = pos - bfds->fpos; 
+         return pos;
        }
       if (0 != bfds_pick_next_buffer_at (bfds, pos))
        {
          LOG ("seek operation failed\n");
          return -1;
        }
-      return bfds->fpos;
+      ASSERT (pos == bfds->fpos + bfds->buffer_pos);
+      return pos;
     }
   return -1;
 }
@@ -453,13 +458,9 @@
 {
   if (cfs->gzip_header_length != 
       bfds_seek (cfs->bfds, cfs->gzip_header_length, SEEK_SET))
-    return 0;
-  cfs->strm.next_in = NULL;
-  cfs->strm.avail_in = 0;
-  cfs->strm.total_in = 0;
-  cfs->strm.zalloc = NULL;
-  cfs->strm.zfree = NULL;
-  cfs->strm.opaque = NULL;
+    return -1;
+  memset (&cfs->strm, 0, sizeof (z_stream));
+  cfs->strm.avail_out = COM_CHUNK_SIZE;
 
   /*
    * note: maybe plain inflateInit(&strm) is adequate,
@@ -624,7 +625,6 @@
   /* zlib will take care of its header */
   gzip_header_length = 0;
 #endif
-
   cfs->gzip_header_length = gzip_header_length;
   return cfs_reset_stream_zlib (cfs);
 }
@@ -797,12 +797,15 @@
   char buf[COM_CHUNK_SIZE];
 
   if (cfs->fpos == cfs->uncompressed_size)
-    return 0;
+    {
+      LOG ("fpos at EOF, returning 0 from cfs_read_zlib\n");
+      return 0;
+    }
   rc = 0;
-  if (cfs->strm.avail_out > 0)
+  if (COM_CHUNK_SIZE > cfs->strm.avail_out + cfs->result_pos)
     {
       /* got left-over decompressed data from previous round! */
-      in = cfs->strm.avail_out;
+      in = COM_CHUNK_SIZE - (cfs->strm.avail_out + cfs->result_pos);
       if (in > size)
        in = size;
       memcpy (&dst[rc], &cfs->result[cfs->result_pos], in);
@@ -816,8 +819,11 @@
       /* read block from original data source */
       in = bfds_read (cfs->bfds,
                      buf, sizeof (buf));
-      if (in <= 0)
-       return -1; /* unexpected EOF */
+      if (in < 0)      
+       {
+         LOG ("unexpected EOF\n");
+         return -1; /* unexpected EOF */
+       }
       cfs->strm.next_in = (unsigned char *) buf;
       cfs->strm.avail_in = (uInt) in;
       cfs->strm.next_out = (unsigned char *) cfs->result;
@@ -825,12 +831,18 @@
       cfs->result_pos = 0;
       ret = inflate (&cfs->strm, Z_SYNC_FLUSH);
       if ( (Z_OK != ret) && (Z_STREAM_END != ret) )
-       return -1; /* unexpected error */
+       {
+         LOG ("unexpected gzip inflate error: %d\n", ret);
+         return -1; /* unexpected error */
+       }
       /* go backwards by the number of bytes left in the buffer */
       if (-1 == bfds_seek (cfs->bfds, - (int64_t) cfs->strm.avail_in, 
SEEK_CUR))
-       return -1;
+       {
+         LOG ("seek failed\n");
+         return -1;
+       }
       /* copy decompressed bytes to target buffer */
-      in = cfs->strm.total_out;
+      in = COM_CHUNK_SIZE - cfs->strm.avail_out;
       if (in > size - rc)
        in = size - rc;
       memcpy (&dst[rc], &cfs->result[cfs->result_pos], in);
@@ -840,6 +852,8 @@
     }
   if (Z_STREAM_END == ret)
     cfs->uncompressed_size = cfs->fpos;
+  LOG ("returning %d from cfs_read_zlib\n",
+       (int) rc);
   return rc;
 }
 
@@ -963,7 +977,6 @@
       LOG ("Invalid seek operation\n");
       return -1;
     }
-  
   delta = nposition - cfs->fpos;
   if (delta < 0)
     {
@@ -1253,15 +1266,36 @@
  * Determine the overall size of the data source (after compression).
  * 
  * @param cls must be a 'struct EXTRACTOR_Datasource'
+ * @param force force computing the size if it is unavailable
  * @return overall file size, UINT64_MAX on error or unknown
  */ 
 int64_t 
-EXTRACTOR_datasource_get_size_ (void *cls)
+EXTRACTOR_datasource_get_size_ (void *cls,
+                               int force)
 {
   struct EXTRACTOR_Datasource *ds = cls;
+  char buf[32 * 1024];
+  uint64_t pos;  
 
   if (NULL != ds->cfs)
-    return ds->cfs->uncompressed_size;
+    {
+      if ( (force) &&
+          (-1 == ds->cfs->uncompressed_size) )
+       {
+         pos = ds->cfs->fpos;
+         LOG ("seeking to end\n");
+         while ( (-1 == ds->cfs->uncompressed_size) &&
+                 (-1 != cfs_read (ds->cfs, buf, sizeof (buf))) ) ;
+         if (-1 == cfs_seek (ds->cfs, pos, SEEK_SET))
+           {
+             LOG ("Serious problem, I moved the buffer to determine the file 
size but could not restore it...\n");
+             return -1;
+           } 
+         LOG ("File size is %llu bytes decompressed\n",
+              (unsigned long long) ds->cfs->uncompressed_size);
+       }       
+      return ds->cfs->uncompressed_size;
+    }
   return ds->bfds->fsize;
 }
 

Modified: Extractor/src/main/extractor_datasource.h
===================================================================
--- Extractor/src/main/extractor_datasource.h   2012-08-04 07:45:52 UTC (rev 23093)
+++ Extractor/src/main/extractor_datasource.h   2012-08-04 17:32:18 UTC (rev 23094)
@@ -105,10 +105,12 @@
  * Determine the overall size of the data source (after compression).
  * 
  * @param cls must be a 'struct EXTRACTOR_Datasource'
+ * @param force force computing the size if it is unavailable
  * @return overall file size, -1 on error or unknown
  */ 
 int64_t 
-EXTRACTOR_datasource_get_size_ (void *cls);
+EXTRACTOR_datasource_get_size_ (void *cls,
+                               int force);
 
 
 #endif

Modified: Extractor/src/main/extractor_ipc.c
===================================================================
--- Extractor/src/main/extractor_ipc.c  2012-08-04 07:45:52 UTC (rev 23093)
+++ Extractor/src/main/extractor_ipc.c  2012-08-04 17:32:18 UTC (rev 23094)
@@ -68,7 +68,11 @@
              return 0;
            }
          memcpy (&seek, cdata, sizeof (seek));
-         plugin->seek_request = seek.file_offset;
+         plugin->seek_request = (int64_t) seek.file_offset;
+         plugin->seek_whence = seek.whence;
+         LOG ("Received %d-seek request to %llu\n",
+              (int) seek.whence,
+              (unsigned long long) seek.file_offset);
          return sizeof (struct SeekRequestMessage);
        case MESSAGE_META: /* Meta */
          if (size < sizeof (struct MetaMessage))

Modified: Extractor/src/main/extractor_ipc.h
===================================================================
--- Extractor/src/main/extractor_ipc.h  2012-08-04 07:45:52 UTC (rev 23093)
+++ Extractor/src/main/extractor_ipc.h  2012-08-04 17:32:18 UTC (rev 23094)
@@ -231,9 +231,11 @@
   unsigned char reserved;
 
   /**
-   * Always zero.
+   * 'whence' value for the seek operation;
+   * 0 = SEEK_SET, 1 = SEEK_CUR, 2 = SEEK_END.
+   * Note that 'SEEK_CUR' is never used here.
    */
-  uint16_t reserved2;
+  uint16_t whence;
 
   /**
    * Number of bytes requested for SHM.
@@ -241,7 +243,10 @@
   uint32_t requested_bytes;
 
   /**
-   * Requested offset.
+   * Requested offset; a positive value from the end of the
+   * file is used of 'whence' is SEEK_END; a postive value
+   * from the start is used of 'whence' is SEEK_SET.  
+   * 'SEEK_CUR' is never used.
    */
   uint64_t file_offset;
 

Modified: Extractor/src/main/extractor_ipc_gnu.c
===================================================================
--- Extractor/src/main/extractor_ipc_gnu.c      2012-08-04 07:45:52 UTC (rev 23093)
+++ Extractor/src/main/extractor_ipc_gnu.c      2012-08-04 17:32:18 UTC (rev 23094)
@@ -228,7 +228,10 @@
 {
   if (-1 ==
       EXTRACTOR_datasource_seek_ (ds, off, SEEK_SET))
-    return -1;
+    {
+      LOG ("Failed to set IPC memory due to seek error\n");
+      return -1;
+    }
   if (size > shm->shm_size)
     size = shm->shm_size;
   return EXTRACTOR_datasource_read_ (ds,

Modified: Extractor/src/main/extractor_plugin_main.c
===================================================================
--- Extractor/src/main/extractor_plugin_main.c  2012-08-04 07:45:52 UTC (rev 23093)
+++ Extractor/src/main/extractor_plugin_main.c  2012-08-04 17:32:18 UTC (rev 23094)
@@ -117,6 +117,7 @@
   struct UpdateMessage um;
   uint64_t npos;
   unsigned char reply;
+  uint16_t wval;
 
   switch (whence)
     {
@@ -133,6 +134,7 @@
          return -1;
        }
       npos = (uint64_t) (pc->read_position + pos);
+      wval = 0;
       break;
     case SEEK_END:
       if (pos > 0)
@@ -140,6 +142,12 @@
          LOG ("Invalid seek operation\n");
          return -1;
        }
+      if (UINT64_MAX == pc->file_size)
+       {
+         wval = 2;
+         npos = (uint64_t) - pos;
+         break;
+       }
       pos = (int64_t) (pc->file_size + pos);
       /* fall-through! */
     case SEEK_SET:
@@ -149,13 +157,15 @@
          return -1;
        }
       npos = (uint64_t) pos;
+      wval = 0;
       break;
     default:
       LOG ("Invalid seek operation\n");
       return -1;
     }
   if ( (pc->shm_off <= npos) &&
-       (pc->shm_off + pc->shm_ready_bytes > npos) )
+       (pc->shm_off + pc->shm_ready_bytes > npos) &&
+       (0 == wval) )
     {
       pc->read_position = npos;
       return (int64_t) npos;
@@ -163,10 +173,17 @@
   /* need to seek */
   srm.opcode = MESSAGE_SEEK;
   srm.reserved = 0;
-  srm.reserved2 = 0;
+  srm.whence = wval;
   srm.requested_bytes = pc->shm_map_size;
-  if (srm.requested_bytes > pc->file_size - npos)
-    srm.requested_bytes = pc->file_size - npos;
+  if (0 == wval)
+    {
+      if (srm.requested_bytes > pc->file_size - npos)
+       srm.requested_bytes = pc->file_size - npos;
+    }
+  else
+    {
+      srm.requested_bytes = npos;
+    }
   srm.file_offset = npos;
   if (-1 == EXTRACTOR_write_all_ (pc->out, &srm, sizeof (srm)))
     {
@@ -181,7 +198,10 @@
       return -1;
     }
   if (MESSAGE_UPDATED_SHM != reply)    
-    return -1; /* was likely a MESSAGE_DISCARD_STATE */
+    {
+      LOG ("Unexpected reply %d to seek\n", reply);
+      return -1; /* was likely a MESSAGE_DISCARD_STATE */
+    }
   if (-1 == EXTRACTOR_read_all_ (pc->in, &um.reserved, sizeof (um) - 1))
     {
       LOG ("Failed to read MESSAGE_UPDATED_SHM\n");
@@ -190,6 +210,11 @@
   pc->shm_off = um.shm_off;
   pc->shm_ready_bytes = um.shm_ready_bytes;
   pc->file_size = um.file_size;
+  if (2 == wval)
+    {
+      /* convert offset to be absolute from beginning of the file */
+      npos = pc->file_size - npos;
+    }
   if ( (pc->shm_off <= npos) &&
        ((pc->shm_off + pc->shm_ready_bytes > npos) ||
        (pc->file_size == pc->shm_off)) )
@@ -199,7 +224,11 @@
     }
   /* oops, serious missunderstanding, we asked to seek
      and then were notified about a different position!? */
-  LOG ("Got invalid MESSAGE_UPDATED_SHM in response to my seek\n");
+  LOG ("Got invalid MESSAGE_UPDATED_SHM in response to my %d-seek (%llu not in 
%llu-%llu)\n",
+       (int) wval,
+       (unsigned long long) npos,
+       (unsigned long long) pc->shm_off,
+       (unsigned long long) pc->shm_off + pc->shm_ready_bytes);
   return -1;
 }
 

Modified: Extractor/src/main/extractor_plugins.h
===================================================================
--- Extractor/src/main/extractor_plugins.h      2012-08-04 07:45:52 UTC (rev 23093)
+++ Extractor/src/main/extractor_plugins.h      2012-08-04 17:32:18 UTC (rev 23094)
@@ -89,7 +89,9 @@
 
   /**
    * A position this plugin wants us to seek to. -1 if it's finished.
-   * Starts at 0.
+   * A positive value from the end of the file is used of 'whence' is
+   * SEEK_END; a postive value from the start is used of 'whence' is
+   * SEEK_SET.  'SEEK_CUR' is never used.
    */
   int64_t seek_request;
 
@@ -104,6 +106,13 @@
    */
   int round_finished;
 
+  /**
+   * 'whence' value for the seek operation;
+   * 0 = SEEK_SET, 1 = SEEK_CUR, 2 = SEEK_END.
+   * Note that 'SEEK_CUR' is never used here.
+   */
+  uint16_t seek_whence;
+
 };
 
 

Modified: Extractor/src/main/test_extractor.c
===================================================================
--- Extractor/src/main/test_extractor.c 2012-08-04 07:45:52 UTC (rev 23093)
+++ Extractor/src/main/test_extractor.c 2012-08-04 17:32:18 UTC (rev 23094)
@@ -67,7 +67,6 @@
       fprintf (stderr, "Unexpected file size returned (expected 150k)\n");
       abort (); 
     }              
-
   if (1024 * 100 + 4 != ec->seek (ec->cls, 1024 * 100 + 4, SEEK_SET))
     {
       fprintf (stderr, "Failure to seek (SEEK_SET)\n");
@@ -83,7 +82,6 @@
       fprintf (stderr, "Unexpected data at offset 100k + 4\n");
       abort ();
     }
-
   if (((1024 * 100 + 4) + 1 - (1024 * 50 + 7)) !=
       ec->seek (ec->cls, - (1024 * 50 + 7), SEEK_CUR))
     {
@@ -97,10 +95,10 @@
     }
   if (((1024 * 100 + 4) + 1 - (1024 * 50 + 7)) % 256 != * (unsigned char *) dp)
     {
-      fprintf (stderr, "Unexpected data at offset 50k - 3\n");
+      fprintf (stderr, "Unexpected data at offset 100k - 3\n");
       abort ();
     }
-
+  fprintf (stderr, "Seeking to 150k\n");
   if (1024 * 150 != ec->seek (ec->cls, 0, SEEK_END))
     {
       fprintf (stderr, "Failure to seek (SEEK_END)\n");
@@ -111,12 +109,12 @@
       fprintf (stderr, "Failed to receive EOF at 150k\n");
       abort ();
     }
-
   if (1024 * 150 - 2 != ec->seek (ec->cls, -2, SEEK_END))
     {
       fprintf (stderr, "Failure to seek (SEEK_END - 2)\n");
       abort ();
     }
+  fprintf (stderr, "Reading at 150k - 2\n");
   if (1 != ec->read (ec->cls, &dp, 1))
     {
       fprintf (stderr, "Failure to read at 150k - 3\n");
@@ -127,7 +125,7 @@
       fprintf (stderr, "Unexpected data at offset 150k - 3\n");
       abort ();
     }
-
+  fprintf (stderr, "Good at 150k\n");
   if (0 != ec->proc (ec->cls, "test", EXTRACTOR_METATYPE_COMMENT,
       EXTRACTOR_METAFORMAT_UTF8, "<no mime>", "Hello world!",
       strlen ("Hello world!") + 1))




reply via email to

[Prev in Thread] Current Thread [Next in Thread]