[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r22959 - Extractor/src/main
From: |
gnunet |
Subject: |
[GNUnet-SVN] r22959 - Extractor/src/main |
Date: |
Sun, 29 Jul 2012 23:49:48 +0200 |
Author: grothoff
Date: 2012-07-29 23:49:48 +0200 (Sun, 29 Jul 2012)
New Revision: 22959
Modified:
Extractor/src/main/extractor.c
Log:
-towards a theoretically working implementation...
Modified: Extractor/src/main/extractor.c
===================================================================
--- Extractor/src/main/extractor.c 2012-07-29 21:32:40 UTC (rev 22958)
+++ Extractor/src/main/extractor.c 2012-07-29 21:49:48 UTC (rev 22959)
@@ -102,10 +102,59 @@
*/
void *proc_cls;
+ /**
+ * Are we done with processing this file? 0 to continue, 1 to terminate.
+ */
+ int file_finished;
+
};
+
/**
+ * Send a 'discard state' message to the plugin and mark it as finished
+ * for this round.
+ *
+ * @param plugin plugin to notify
+ */
+static void
+send_discard_message (struct EXTRACTOR_PluginList *plugin)
+{
+ static unsigned char disc_msg = MESSAGE_DISCARD_STATE;
+
+ if (sizeof (disc_msg) !=
+ EXTRACTOR_IPC_channel_send_ (plugin->channel,
+ &disc_msg,
+ sizeof (disc_msg)) )
+ {
+ EXTRACTOR_IPC_channel_destroy_ (plugin->channel);
+ plugin->channel = NULL;
+ plugin->round_finished = 1;
+ }
+}
+
+
+/**
+ * We had some serious trouble. Abort all channels.
+ *
+ * @param plugins list of plugins with channels to abort
+ */
+static void
+abort_all_channels (struct EXTRACTOR_PluginList *plugins)
+{
+ struct EXTRACTOR_PluginList *pos;
+
+ for (pos = plugins; NULL != pos; pos = pos->next)
+ {
+ if (NULL == pos->channel)
+ continue;
+ EXTRACTOR_IPC_channel_destroy_ (pos->channel);
+ pos->channel = NULL;
+ }
+}
+
+
+/**
* Handler for a message from one of the plugins.
*
* @param cls closure with our 'struct PluginReplyProcessor'
@@ -125,9 +174,35 @@
const void *value,
const char *mime)
{
- // struct PluginReplyProcessor *prp = cls;
+ static unsigned char cont_msg = MESSAGE_CONTINUE_EXTRACTING;
+ struct PluginReplyProcessor *prp = cls;
- // FIXME...
+ if (0 != prp->file_finished)
+ {
+ /* client already aborted, ignore message, tell plugin about abort */
+ return;
+ }
+ if (0 != prp->proc (prp->proc_cls,
+ plugin->short_libname,
+ meta_type,
+ meta_format,
+ mime,
+ value,
+ value_len))
+ {
+ prp->file_finished = 1;
+ send_discard_message (plugin);
+ return;
+ }
+ if (sizeof (cont_msg) !=
+ EXTRACTOR_IPC_channel_send_ (plugin->channel,
+ &cont_msg,
+ sizeof (cont_msg)) )
+ {
+ EXTRACTOR_IPC_channel_destroy_ (plugin->channel);
+ plugin->channel = NULL;
+ plugin->round_finished = 1;
+ }
}
@@ -152,6 +227,8 @@
struct StartMessage start;
struct EXTRACTOR_Channel *channel;
struct PluginReplyProcessor prp;
+ int64_t min_seek;
+ ssize_t data_available;
uint32_t ready;
int done;
@@ -163,6 +240,7 @@
else
ready = 0;
+ prp.file_finished = 0;
prp.proc = proc;
prp.proc_cls = proc_cls;
@@ -171,51 +249,101 @@
start.reserved = 0;
start.reserved2 = 0;
start.shm_ready_bytes = ready;
- start.file_size = EXTRACTOR_datasource_get_size_ (ds);
- {
- struct EXTRACTOR_Channel *channels[plugin_count];
+ start.file_size = EXTRACTOR_datasource_get_size_ (ds);
+ for (pos = plugins; NULL != pos; pos = pos->next)
+ {
+ if ( (NULL != pos->channel) &&
+ (-1 == EXTRACTOR_IPC_channel_send_ (pos->channel,
+ &start,
+ sizeof (start)) ) )
+ {
+ EXTRACTOR_IPC_channel_destroy_ (pos->channel);
+ pos->channel = NULL;
+ }
+ }
+ done = 0;
+ while (! done)
+ {
+ struct EXTRACTOR_Channel *channels[plugin_count];
+
+ /* calculate current 'channels' array */
+ plugin_count = 0;
+ for (pos = plugins; NULL != pos; pos = pos->next)
+ {
+ channels[plugin_count] = pos->channel;
+ plugin_count++;
+ }
+
+ /* give plugins chance to send us meta data, seek or finished messages */
+ if (-1 ==
+ EXTRACTOR_IPC_channel_recv_ (channels,
+ plugin_count,
+ &process_plugin_reply,
+ &prp))
+ {
+ /* serious problem in IPC; reset *all* channels */
+ abort_all_channels (plugins);
+ break;
+ }
+ /* calculate minimum seek request (or set done=0 to continue here) */
+ done = 1;
+ min_seek = -1;
+ for (pos = plugins; NULL != pos; pos = pos->next)
+ {
+ if ( (1 == pos->round_finished) ||
+ (NULL == pos->channel) )
+ continue; /* inactive plugin */
+ if (-1 == pos->seek_request)
+ {
+ done = 0; /* possibly more meta data at current position! */
+ break;
+ }
+ if ( (-1 == min_seek) ||
+ (min_seek > pos->seek_request) )
+ {
+ min_seek = pos->seek_request;
+ }
+ }
+ if ( (1 == done) &&
+ (-1 != min_seek) )
+ {
+ /* current position done, but seek requested */
+ done = 0;
+ if (-1 ==
+ (data_available = EXTRACTOR_IPC_shared_memory_set_ (shm,
+ ds,
+ min_seek,
+
DEFAULT_SHM_SIZE)))
+ {
+ abort_all_channels (plugins);
+ break;
+ }
+ }
+ /* if 'prp.file_finished', send 'abort' to plugins;
+ if not, send 'seek' notification to plugins in range */
+ for (pos = plugins; NULL != pos; pos = pos->next)
+ {
+ if (NULL == (channel = channels[plugin_count]))
+ continue;
+ if ( (-1 != pos->seek_request) &&
+ (min_seek <= pos->seek_request) &&
+ (min_seek + data_available > pos->seek_request) )
+ {
- plugin_count = 0;
- for (pos = plugins; NULL != pos; pos = pos->next)
- {
- channels[plugin_count] = pos->channel;
- if ( (NULL != pos->channel) &&
- (-1 == EXTRACTOR_IPC_channel_send_ (pos->channel,
- &start,
- sizeof (start)) ) )
- {
- channels[plugin_count] = NULL;
- EXTRACTOR_IPC_channel_destroy_ (pos->channel);
- pos->channel = NULL;
- }
- plugin_count++;
- }
- done = 0;
- while (! done)
- {
- done = 1;
+ /* FIXME: notify plugin about seek! */
+ pos->seek_request = -1;
+ }
+ if ( (-1 != pos->seek_request) &&
+ (1 == prp.file_finished) )
+ {
+ send_discard_message (pos);
+ pos->round_finished = 1;
+ }
+ if (0 == pos->round_finished)
+ done = 0; /* can't be done, plugin still active */
+ }
+ }
- // FIXME: need to handle 'seek' messages from plugins somewhere
- if (-1 ==
- EXTRACTOR_IPC_channel_recv_ (channels,
- plugin_count,
- &process_plugin_reply,
- &prp))
- break;
- plugin_count = 0;
- for (pos = plugins; NULL != pos; pos = pos->next)
- {
- if (NULL != (channel = channels[plugin_count]))
- {
- // ... FIXME ...
- }
- plugin_count++;
- }
- // FIXME: need to terminate once all plugins are done...
- done = 0;
- }
- }
-
/* run in-process plugins */
for (pos = plugins; NULL != pos; pos = pos->next)
{
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r22959 - Extractor/src/main,
gnunet <=