/* ** Copyright (C) 2012 by Carnegie Mellon University. ** ** @OPENSOURCE_HEADER_START@ ** ** Use of the SILK system and related source code is subject to the terms ** of the following licenses: ** ** GNU Public License (GPL) Rights pursuant to Version 2, June 1991 ** Government Purpose License Rights (GPLR) pursuant to DFARS 252.227.7013 ** ** NO WARRANTY ** ** ANY INFORMATION, MATERIALS, SERVICES, INTELLECTUAL PROPERTY OR OTHER ** PROPERTY OR RIGHTS GRANTED OR PROVIDED BY CARNEGIE MELLON UNIVERSITY ** PURSUANT TO THIS LICENSE (HEREINAFTER THE "DELIVERABLES") ARE ON AN ** "AS-IS" BASIS. CARNEGIE MELLON UNIVERSITY MAKES NO WARRANTIES OF ANY ** KIND, EITHER EXPRESS OR IMPLIED AS TO ANY MATTER INCLUDING, BUT NOT ** LIMITED TO, WARRANTY OF FITNESS FOR A PARTICULAR PURPOSE, ** MERCHANTABILITY, INFORMATIONAL CONTENT, NONINFRINGEMENT, OR ERROR-FREE ** OPERATION. CARNEGIE MELLON UNIVERSITY SHALL NOT BE LIABLE FOR INDIRECT, ** SPECIAL OR CONSEQUENTIAL DAMAGES, SUCH AS LOSS OF PROFITS OR INABILITY ** TO USE SAID INTELLECTUAL PROPERTY, UNDER THIS LICENSE, REGARDLESS OF ** WHETHER SUCH PARTY WAS AWARE OF THE POSSIBILITY OF SUCH DAMAGES. ** LICENSEE AGREES THAT IT WILL NOT MAKE ANY WARRANTY ON BEHALF OF ** CARNEGIE MELLON UNIVERSITY, EXPRESS OR IMPLIED, TO ANY PERSON ** CONCERNING THE APPLICATION OF OR THE RESULTS TO BE OBTAINED WITH THE ** DELIVERABLES UNDER THIS LICENSE. ** ** Licensee hereby agrees to defend, indemnify, and hold harmless Carnegie ** Mellon University, its trustees, officers, employees, and agents from ** all claims or demands made against them (and any related losses, ** expenses, or attorney's fees) arising out of, or relating to Licensee's ** and/or its sub licensees' negligent use or willful misuse of or ** negligent conduct or willful misconduct regarding the Software, ** facilities, or other rights or assistance granted by Carnegie Mellon ** University under this License, including, but not limited to, any ** claims of product liability, personal injury, death, damage to ** property, or violation of any laws or regulations. ** ** Carnegie Mellon University Software Engineering Institute authored ** documents are sponsored by the U.S. Department of Defense under ** Contract FA8721-05-C-0003. Carnegie Mellon University retains ** copyrights in all material produced under this contract. The U.S. ** Government retains a non-exclusive, royalty-free license to publish or ** reproduce these documents, or allow others to do so, for U.S. ** Government purposes only pursuant to the copyright license under the ** contract clause at 252.227.7013. ** ** @OPENSOURCE_HEADER_END@ */ /* ** mod_fan_in.lzz ** ** Implementation of the "Fan-In" module. ** ** The Fan-In module copies blocks of data from a user-specified ** number of inputs to one output. ** */ #hdr #include RCSIDENTVAR(rcsID_MOD_FAN_OUT_H, "$Id$"); #include "module.hh" #end #src RCSIDENT("$Id$"); #include "modsys.hh" #define MOD_FAN_IN_CLASS_NAME "Fan-In" #define DEFAULT_READ_BLOCK_SIZE 10000 #end namespace silk { class ModFanIn : public Module { friend int mod_fan_in_so_loader( uint16_t major_version, uint16_t minor_version, void *so_loader_data); private: static const Module::port_t ports_out[] = { {"out", "Record output"}, SK_MODULE_PORT_SENTINEL }; std::vector ports_in; std::vector port_names; enum params_id_t { OPT_INPUT_COUNT }; /** The definition of parameters for the ModFanOut module. */ struct class_params_t { /** name of the parameter */ const char *key; /** an integer identifier for it */ int val; /** min number of values it supports */ long lower; /** max number of values it supports */ long upper; /** description of parameter */ const char *description; }; static ModFanIn::class_params_t class_params[] = { {"input_count", OPT_INPUT_COUNT, 1, -1, "Number of input connectors to create"}, {0, 0, 0, 0, 0} /* sentinel entry */ }; ModFanIn() { conns_out.push_back(NULL); } ~ModFanIn() {} public: const char *get_type(void) const { return MOD_FAN_IN_CLASS_NAME; } private: static Module *_create() { return new ModFanIn(); } void _init() { unsigned int i; if (conns_out[0] == NULL) { std::ostringstream os; os << "The " << get_type() << " module's " << ports_out[0].name << " connector is not connected"; throw sk_error_module_bad_param_t(os); } for (i = 0; i < conns_in.size(); i++) { if (conns_in[i]) { continue; } std::ostringstream os; os << "The " << get_type() << " module's " << ports_in[i].name << " connector is not connected"; throw sk_error_module_bad_param_t(os); } } void _run() { std::vector ready; std::vector::iterator conn; Connector *out = conns_out[0]; void *in_block; void *out_block = NULL; size_t in_size; size_t out_size; size_t active_inputs; int rv; /* create and initialize the connector poll object */ PollConn poller(PollConn::READ); for (conn = conns_in.begin(); conn != conns_in.end(); ++conn) { (*conn)->add_to_poll(&poller); } active_inputs = conns_in.size(); /* run until no one is giving us data */ while (active_inputs > 0) { /* Which of the upstream connectors have data? If none have * data, wait forever for data to become available. */ poller.poll(1, ready); if (stopped || ready.empty()) { goto END; } in_size = DEFAULT_READ_BLOCK_SIZE; /* Process all the connectors that have data */ for (conn = ready.begin(); conn != ready.end(); ++conn) { rv = (*conn)->get_read_block(&in_block, &in_size); if (rv) { --active_inputs; (*conn)->remove_from_poll(&poller); continue; } /* Copy the block to the downstream connector */ out_size = in_size; rv = out->get_write_block(&out_block, &out_size); if (rv || stopped) { (*conn)->return_read_block(in_block); goto END; } if (stopped) { (*conn)->return_read_block(in_block); goto END; } assert(out_size >= in_size); memcpy(out_block, in_block, in_size); out->return_write_block(out_block, in_size); out_block = NULL; (*conn)->return_read_block(in_block); if (stopped) { goto END; } } } END: if (out_block) { out->return_read_block(out_block); } out->stop_no_more_data(); for (conn = conns_in.begin(); conn != conns_in.end(); ++conn) { (*conn)->stop(); try { (*conn)->remove_from_poll(&poller); } catch (sk_error_conn_poll_add_remove_t &e) { // ignore this exception } } } void _get_ports( std::vector &inputs, std::vector &outputs) { const Module::port_t *port; inputs = ports_in; outputs.clear(); for (port = ports_out; port->name; ++port) { outputs.push_back(*port); } } static Parameterized::param_registry_t *create_param_registry() { param_registry_t *registry = new param_registry_t(); class_params_t *param = class_params; while (param->key) { registry->insert( Definition(param->key, param->description, ¶m->val, param->lower, param->upper)); ++param; } return registry; } static std::auto_ptr param_registry = std::auto_ptr( ModFanIn::create_param_registry()); const Parameterized::param_registry_t& get_param_registry() const { return *param_registry; } void _set_param( const Definition& defn, const std::vector& values) throw (Error) { int rv; uint32_t u32; unsigned int i; switch (*(params_id_t *)(defn.get_context())) { case OPT_INPUT_COUNT: if (!conns_in.empty()) { std::ostringstream os; os << "Invalid " << defn.get_name() << " '" << values[0] << "': Cannot change value once connectors are created"; throw sk_error_module_bad_param_t(os); } rv = skStringParseUint32(&u32, values[0].c_str(), 1, 0); if (rv) { goto PARSE_ERROR; } for (i = 0; i < u32; i++) { Module::port_t port = {NULL, NULL}; char name[32]; snprintf(name, sizeof(name), "in.%u", i); port_names.push_back(name); port.name = port_names.back().c_str(); ports_in.push_back(port); conns_in.push_back(NULL); } break; } return; PARSE_ERROR: std::ostringstream os; os << "Invalid " << defn.get_name() << " '" << values[0] << "': " << skStringParseStrerror(rv); throw sk_error_conn_param_value_t(os.str()); } }; int mod_fan_in_so_loader( uint16_t major_version, uint16_t minor_version, void *so_loader_data) { ModuleSystem::loader_data_t *so_loader = static_cast(so_loader_data); ModuleSystem *module_system = so_loader->modsys; // ADD VERSION CHECK try { module_system->register_module( MOD_FAN_IN_CLASS_NAME, &ModFanIn::_create); } catch (Error &e) { skAppPrintErr("Failed to load " MOD_FAN_IN_CLASS_NAME ": %s", e.get_string().c_str()); return -1; } catch (...) { skAppPrintErr("Failed to load " MOD_FAN_IN_CLASS_NAME ": %s", "Unknown error"); return -1; } return 0; } } /* ** Local Variables: ** mode:c++ ** indent-tabs-mode:nil ** c-basic-offset:4 ** End: */