[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r22879 - Extractor/src/main
From: |
gnunet |
Subject: |
[GNUnet-SVN] r22879 - Extractor/src/main |
Date: |
Tue, 24 Jul 2012 23:45:25 +0200 |
Author: grothoff
Date: 2012-07-24 23:45:25 +0200 (Tue, 24 Jul 2012)
New Revision: 22879
Modified:
Extractor/src/main/extractor.c
Extractor/src/main/extractor_plugins.c
Extractor/src/main/extractor_plugins.h
Log:
-towards compilation
Modified: Extractor/src/main/extractor.c
===================================================================
--- Extractor/src/main/extractor.c 2012-07-24 19:48:28 UTC (rev 22878)
+++ Extractor/src/main/extractor.c 2012-07-24 21:45:25 UTC (rev 22879)
@@ -31,6 +31,12 @@
#include "extractor_plugins.h"
+/**
+ * Size used for the shared memory segment.
+ */
+#define DEFAULT_SHM_SIZE (16 * 1024)
+
+
#if 0
/**
* Open a file
@@ -489,190 +495,132 @@
#endif
+/**
+ * Closure for 'process_plugin_reply'
+ */
+struct PluginReplyProcessor
+{
+ /**
+ * Function to call if we receive meta data from the plugin.
+ */
+ EXTRACTOR_MetaDataProcessor proc;
+ /**
+ * Closure for 'proc'.
+ */
+ void *proc_cls;
+};
+
+
/**
+ * Handler for a message from one of the plugins.
+ *
+ * @param cls closure with our 'struct PluginReplyProcessor'
+ * @param plugin plugin of the channel sending the message
+ * @param meta_type type of the meta data
+ * @param meta_format format of the meta data
+ * @param value_len number of bytes in 'value'
+ * @param value 'data' send from the plugin
+ * @param mime mime string send from the plugin
+ */
+static void
+process_plugin_reply (void *cls,
+ struct EXTRACTOR_PluginList *plugin,
+ enum EXTRACTOR_MetaType meta_type,
+ enum EXTRACTOR_MetaFormat meta_format,
+ size_t value_len,
+ const void *value,
+ const char *mime)
+{
+ struct PluginReplyProcessor *prp = cls;
+
+ // FIXME...
+}
+
+
+/**
* Extract keywords using the given set of plugins.
*
* @param plugins the list of plugins to use
- * @param data data to process, or NULL if fds is not -1
- * @param fd file to read data from, or -1 if data is not NULL
- * @param filename name of the file to which fd belongs
- * @param cfs compressed file source for compressed stream (may be NULL)
- * @param fsize size of the file or data buffer
+ * @param shm shared memory object used by the plugins (NULL if
+ * all plugins are in-process)
+ * @param ds data to process
* @param proc function to call for each meta data item found
* @param proc_cls cls argument to proc
*/
static void
do_extract (struct EXTRACTOR_PluginList *plugins,
- const char *data,
- int fd,
- const char *filename,
- struct CompressedFileSource *cfs,
- int64_t fsize,
+ struct EXTRACTOR_SharedMemory *shm,
+ struct EXTRACTOR_Datasource *ds,
EXTRACTOR_MetaDataProcessor proc, void *proc_cls)
{
- int operation_mode;
- int plugin_count = 0;
- int shm_result;
- unsigned char *shm_ptr;
-#if !WINDOWS
- int shm_id;
-#else
- HANDLE map_handle;
-#endif
- char shm_name[MAX_SHM_NAME + 1];
+ unsigned int plugin_count;
+ struct EXTRACTOR_PluginList *pos;
+ struct StartMessage start;
+ struct EXTRACTOR_Channel *channel;
+ struct PluginReplyProcessor prp;
+ uint32_t ready;
+ int done;
- struct EXTRACTOR_PluginList *ppos;
-
- int64_t position = 0;
- int64_t preserve = 0;
- size_t map_size;
- ssize_t read_result;
- int kill_plugins = 0;
-
- if (cfs != NULL)
- operation_mode = OPMODE_DECOMPRESS;
- else if (data != NULL)
- operation_mode = OPMODE_MEMORY;
- else if (fd != -1)
- operation_mode = OPMODE_FILE;
+ plugin_count = 0;
+ for (pos = plugins; NULL != pos; pos = pos->next)
+ plugin_count++;
+ if (NULL != shm)
+ ready = EXTRACTOR_IPC_shared_memory_set_ (shm, ds, 0, DEFAULT_SHM_SIZE);
else
- return;
+ ready = 0;
- map_size = (fd == -1) ? fsize : MAX_READ;
+ prp.proc = proc;
+ prp.proc_cls = proc_cls;
- /* Make a shared memory object. Even if we're running in-process. Simpler
that way.
- * This is only for reading-from-memory case. For reading-from-file we will
use
- * the file itself; for uncompressing-on-the-fly the decompressor will make
its own
- * shared memory object and uncompress into it directly.
- */
- if (operation_mode == OPMODE_MEMORY)
+ /* send 'start' message */
+ start.opcode = MESSAGE_EXTRACT_START;
+ start.reserved = 0;
+ start.reserved2 = 0;
+ start.shm_ready_bytes = ready;
+ start.file_size = EXTRACTOR_datasource_get_size_ (ds);
{
- operation_mode = OPMODE_MEMORY;
-#if !WINDOWS
- shm_result = make_shm_posix ((void **) &shm_ptr, &shm_id, shm_name,
MAX_SHM_NAME,
- fsize);
-#else
- shm_result = make_shm_w32 ((void **) &shm_ptr, &map_handle, shm_name,
MAX_SHM_NAME,
- fsize);
-#endif
- if (shm_result != 0)
- return;
- memcpy (shm_ptr, data, fsize);
- }
- else if (operation_mode == OPMODE_FILE)
- {
-#if WINDOWS
- shm_result = make_file_backed_shm_w32 (&map_handle, (HANDLE)
_get_osfhandle (fd), shm_name, MAX_SHM_NAME);
- if (shm_result != 0)
- return;
-#endif
- }
+ struct EXTRACTOR_Channel *channels[plugin_count];
- /* This four-loops-instead-of-one construction is intended to increase
parallelism */
- for (ppos = plugins; NULL != ppos; ppos = ppos->next)
- {
- start_process (ppos);
- plugin_count += 1;
- }
-
- for (ppos = plugins; NULL != ppos; ppos = ppos->next)
- load_in_process_plugin (ppos);
-
- for (ppos = plugins; NULL != ppos; ppos = ppos->next)
- write_plugin_data (ppos);
-
- if (operation_mode == OPMODE_DECOMPRESS)
- {
- for (ppos = plugins; NULL != ppos; ppos = ppos->next)
- init_plugin_state (ppos, operation_mode, cfs->shm_name, -1);
- }
- else if (operation_mode == OPMODE_FILE)
- {
- for (ppos = plugins; NULL != ppos; ppos = ppos->next)
-#if !WINDOWS
- init_plugin_state (ppos, operation_mode, filename, fsize);
-#else
- init_plugin_state (ppos, operation_mode, shm_name, fsize);
-#endif
- }
- else
- {
- for (ppos = plugins; NULL != ppos; ppos = ppos->next)
- init_plugin_state (ppos, operation_mode, shm_name, fsize);
- }
-
- if (operation_mode == OPMODE_FILE || operation_mode == OPMODE_MEMORY)
- {
- int plugins_not_ready = 0;
- for (ppos = plugins; NULL != ppos; ppos = ppos->next)
- plugins_not_ready += give_shm_to_plugin (ppos, position, map_size,
fsize, operation_mode);
- for (ppos = plugins; NULL != ppos; ppos = ppos->next)
- ask_in_process_plugin (ppos, shm_ptr, proc, proc_cls);
- while (plugins_not_ready > 0 && !kill_plugins)
- {
- int ready = wait_for_reply (plugins, proc, proc_cls);
- if (ready <= 0)
- kill_plugins = 1;
- plugins_not_ready -= ready;
- }
- }
- else
- {
- read_result = cfs_read (cfs, preserve);
- if (read_result > 0)
- while (1)
- {
- int plugins_not_ready = 0;
-
- map_size = cfs->shm_buf_size;
- for (ppos = plugins; NULL != ppos; ppos = ppos->next)
- plugins_not_ready += give_shm_to_plugin (ppos, position, map_size,
cfs->uncompressed_size, operation_mode);
- /* Can't block in in-process plugins, unless we ONLY have one plugin */
- if (plugin_count == 1)
- for (ppos = plugins; NULL != ppos; ppos = ppos->next)
- {
- /* Pass this way. we'll need it to call cfs functions later on */
- /* This is a special case */
- ppos->pass_cfs = cfs;
- ask_in_process_plugin (ppos, cfs->shm_ptr, proc, proc_cls);
- }
- while (plugins_not_ready > 0 && !kill_plugins)
+ plugin_count = 0;
+ for (pos = plugins; NULL != pos; pos = pos->next)
{
- int ready = wait_for_reply (plugins, proc, proc_cls);
- if (ready <= 0)
- kill_plugins = 1;
- plugins_not_ready -= ready;
+ channels[plugin_count] = pos->channel;
+ if ( (NULL != pos->channel) &&
+ (-1 == EXTRACTOR_IPC_channel_send_ (pos->channel,
+ &start,
+ sizeof (start)) ) )
+ {
+ channels[plugin_count] = NULL;
+ EXTRACTOR_IPC_channel_destroy_ (pos->channel);
+ pos->channel = NULL;
+ }
+ plugin_count++;
}
- if (kill_plugins)
- break;
- position = seek_to_new_position (plugins, cfs, position, map_size);
- if (position < 0 || position == cfs->uncompressed_size)
- break;
- }
- }
+ done = 0;
+ while (! done)
+ {
+ done = 1;
- if (kill_plugins)
- for (ppos = plugins; NULL != ppos; ppos = ppos->next)
- stop_process (ppos);
- for (ppos = plugins; NULL != ppos; ppos = ppos->next)
- discard_plugin_state (ppos);
-
- if (operation_mode == OPMODE_MEMORY)
- {
-#if WINDOWS
- destroy_shm_w32 (shm_ptr, map_handle);
-#else
- destroy_shm_posix (shm_ptr, shm_id, (fd == -1) ? fsize : MAX_READ,
shm_name);
-#endif
+ // FIXME: need to handle 'seek' messages from plugins somewhere
+ if (-1 ==
+ EXTRACTOR_IPC_channel_recv_ (channels,
+ plugin_count,
+ &process_plugin_reply,
+ &prp))
+ break;
+ plugin_count = 0;
+ for (pos = plugins; NULL != pos; pos = pos->next)
+ {
+ channel = channels[plugin_count];
+ // ... FIXME ...
+ plugin_count++;
+ }
+ // FIXME: need to terminate once all plugins are done...
+ done = 0;
+ }
}
- else if (operation_mode == OPMODE_FILE)
- {
-#if WINDOWS
- destroy_file_backed_shm_w32 (map_handle);
-#endif
- }
}
@@ -699,14 +647,34 @@
void *proc_cls)
{
struct EXTRACTOR_Datasource *datasource;
+ struct EXTRACTOR_SharedMemory *shm;
+ struct EXTRACTOR_PluginList *pos;
+ if (NULL == plugins)
+ return;
if (NULL == filename)
- datasource = EXTRACTOR_datasource_create_from_buffer_ (data, size);
+ datasource = EXTRACTOR_datasource_create_from_buffer_ (data, size,
+ proc, proc_cls);
else
- datasource = EXTRACTOR_datasource_create_from_file_ (filename);
+ datasource = EXTRACTOR_datasource_create_from_file_ (filename,
+ proc, proc_cls);
if (NULL == datasource)
- return;
- do_extract (plugins, datasource, proc, proc_cls);
+ return;
+ shm = NULL;
+ for (pos = plugins; NULL != pos; pos = pos->next)
+ if (NULL != (shm = pos->shm))
+ break;
+ if (NULL == shm)
+ shm = EXTRACTOR_IPC_shared_memory_create_ (DEFAULT_SHM_SIZE);
+ for (pos = plugins; NULL != pos; pos = pos->next)
+ if ( (NULL == pos->shm) &&
+ (0 == (pos->flags & EXTRACTOR_OPTION_IN_PROCESS)) )
+ {
+ pos->shm = shm;
+ pos->channel = EXTRACTOR_IPC_channel_create_ (pos,
+ shm);
+ }
+ do_extract (plugins, shm, datasource, proc, proc_cls);
EXTRACTOR_datasource_destroy_ (datasource);
}
@@ -721,7 +689,6 @@
#if ENABLE_NLS
BINDTEXTDOMAIN (PACKAGE, LOCALEDIR);
- BINDTEXTDOMAIN ("iso-639", ISOLOCALEDIR); /* used by wordextractor */
#endif
err = lt_dlinit ();
if (err > 0)
Modified: Extractor/src/main/extractor_plugins.c
===================================================================
--- Extractor/src/main/extractor_plugins.c 2012-07-24 19:48:28 UTC (rev
22878)
+++ Extractor/src/main/extractor_plugins.c 2012-07-24 21:45:25 UTC (rev
22879)
@@ -380,6 +380,8 @@
prev->next = pos->next;
if (NULL != pos->channel)
EXTRACTOR_IPC_channel_destroy_ (pos->channel);
+ // FIXME: need to also destroy pos->shm if this is
+ // the last user; need to add some RC to the SHM!
free (pos->short_libname);
free (pos->libname);
free (pos->plugin_options);
Modified: Extractor/src/main/extractor_plugins.h
===================================================================
--- Extractor/src/main/extractor_plugins.h 2012-07-24 19:48:28 UTC (rev
22878)
+++ Extractor/src/main/extractor_plugins.h 2012-07-24 21:45:25 UTC (rev
22879)
@@ -78,11 +78,16 @@
const char *specials;
/**
- * Channel to communicate with out-of-process plugin.
+ * Channel to communicate with out-of-process plugin, NULL if not setup.
*/
struct EXTRACTOR_Channel *channel;
/**
+ * Memory segment shared with the channel of this plugin, NULL for none.
+ */
+ struct EXTRACTOR_SharedMemory *shm;
+
+ /**
* A position this plugin wants us to seek to. -1 if it's finished.
* Starts at 0.
*/
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r22879 - Extractor/src/main,
gnunet <=