[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r30185 - gnunet-mqtt/src/mqtt
From: gnunet
Subject: [GNUnet-SVN] r30185 - gnunet-mqtt/src/mqtt
Date: Mon, 14 Oct 2013 15:22:11 +0200
Author: grothoff
Date: 2013-10-14 15:22:11 +0200 (Mon, 14 Oct 2013)
New Revision: 30185
Modified:
gnunet-mqtt/src/mqtt/gnunet-service-mqtt.c
gnunet-mqtt/src/mqtt/mqtt.conf.in
gnunet-mqtt/src/mqtt/mqtt.h
gnunet-mqtt/src/mqtt/mqtt_api.c
gnunet-mqtt/src/mqtt/template.conf
gnunet-mqtt/src/mqtt/template_single_peer.conf
gnunet-mqtt/src/mqtt/test_mqtt_multiple_peers.c
gnunet-mqtt/src/mqtt/test_mqtt_regex_hash.c
gnunet-mqtt/src/mqtt/test_mqtt_regex_plus.c
gnunet-mqtt/src/mqtt/test_mqtt_single_peer.c
gnunet-mqtt/src/mqtt/test_mqtt_unsubscribe.c
Log:
-fix testcase build, replace slists by DLLs, doxygen, indentation, minor
bugfixes
Modified: gnunet-mqtt/src/mqtt/gnunet-service-mqtt.c
===================================================================
--- gnunet-mqtt/src/mqtt/gnunet-service-mqtt.c 2013-10-14 13:21:42 UTC (rev
30184)
+++ gnunet-mqtt/src/mqtt/gnunet-service-mqtt.c 2013-10-14 13:22:11 UTC (rev
30185)
@@ -35,14 +35,24 @@
#include "mqtt.h"
#include <regex.h>
-#define FIXME_PORT 424
/**
* Struct representing the context for the regex search
*/
struct RegexSearchContext
{
+
/**
+ * Pointer to next item in the list
+ */
+ struct RegexSearchContext *next;
+
+ /**
+ * Pointer to previous item in the list
+ */
+ struct RegexSearchContext *prev;
+
+ /**
* Pointer to the publish message
*/
struct GNUNET_MQTT_ClientPublishMessage *publish_msg;
@@ -76,6 +86,7 @@
struct GNUNET_REGEX_search_handle *regex_search_handle;
};
+
/**
* Struct representing a message that needs to be sent to a client.
*/
@@ -103,6 +114,7 @@
struct RegexSearchContext *context;
};
+
/**
* Struct containing information about a client,
* handle to connect to it, and any pending messages
@@ -194,6 +206,16 @@
struct Subscription
{
/**
+ * Element in a DLL.
+ */
+ struct Subscription *prev;
+
+ /**
+ * Element in a DLL.
+ */
+ struct Subscription *next;
+
+ /**
* Handle used to cancel the annnouncement
*/
struct GNUNET_REGEX_announce_handle *regex_announce_handle;
@@ -214,6 +236,7 @@
regex_t automaton;
};
+
/**
* Our configuration.
*/
@@ -240,11 +263,16 @@
static struct GNUNET_PeerIdentity my_id;
/**
- * Singly linked list storing active subscriptions.
+ * Head of active subscriptions.
*/
-static struct GNUNET_CONTAINER_SList *subscriptions;
+static struct Subscription *subscription_head;
/**
+ * Head of active subscriptions.
+ */
+static struct Subscription *subscription_tail;
+
+/**
* List of active clients.
*/
static struct ClientInfo *client_head;
@@ -267,14 +295,19 @@
/**
* Path to the current directory (configuration directory)
*/
-static char *current_dir_name;
+static char *folder_name;
/**
- * Singly linked list storing active regex search contexts.
+ * Tail of doubly-Linked list storing active regex search contexts.
*/
-static struct GNUNET_CONTAINER_SList *search_contexts;
+static struct RegexSearchContext *sc_head;
/**
+ * Tail of doubly-Linked list storing active regex search contexts.
+ */
+static struct RegexSearchContext *sc_tail;
+
+/**
* The time the peer that received a publish message waits before it deletes
it after it was sent to a subscrier
*/
static struct GNUNET_TIME_Relative message_delete_time;
@@ -294,33 +327,33 @@
*/
static const char *hash_regex =
"(/(a|b|c|d|e|f|g|h|i|j|k|l|m|n|o|p|q|r|s|t|u|v|w|x|y|z|A|B|C|D|E|F|G|H|I|J|K|L|M|N|O|P|Q|R|S|T|U|V|W|X|Y|Z|0|1|2|3|4|5|6|7|8|9)+)*";
+
/**
* Adds the prefix to the toopic (App ID + Version + Padding)
*
* @param topic topic of subscription as provided by the subscriber
* @param regex_topic client identification of the client
- *
*/
static void
-add_prefix (const char *topic, char **prefixed_topic)
+add_prefix (const char *topic,
+ char **prefixed_topic)
{
- int n, i;
- *prefixed_topic = GNUNET_malloc(strlen(prefix) + strlen(topic)+1);
- n = 0;
-
- for (i = 0; prefix[i] != '\0'; i++)
- {
+ int n;
+ int i;
+
+ *prefixed_topic = GNUNET_malloc(strlen(prefix) + strlen(topic)+1);
+ n = 0;
+ for (i = 0; prefix[i] != '\0'; i++)
(*prefixed_topic)[i] = prefix[i];
- }
- n = i;
-
- for (i = 0; topic[i] != '\0'; i++)
+ n = i;
+
+ for (i = 0; topic[i] != '\0'; i++)
{
(*prefixed_topic)[n] = topic[i];
- n++;
- }
-
- (*prefixed_topic)[n] = '\0';
+ n++;
+ }
+
+ (*prefixed_topic)[n] = '\0';
}
@@ -329,33 +362,35 @@
*
* @param topic topic of subscription as provided by the subscriber
* @param regex_topic client identification of the client
- *
*/
static void
-get_regex (char *topic, char **regex_topic)
+get_regex (char *topic,
+ char **regex_topic)
{
char *plus;
char *hash;
char *prefixed_topic;
- int i, j, k;
+ int i;
+ int j;
+ int k;
int plus_counter = 0;
int hash_exists = 0;
plus = strchr(topic,'+');
- while (plus != NULL)
+ while (plus != NULL)
{
- plus_counter +=1;
- plus=strchr(plus+1,'+');
- }
+ plus_counter +=1;
+ plus=strchr(plus+1,'+');
+ }
hash = strchr(topic,'#');
if (hash != NULL)
{
- hash_exists = 1;
+ hash_exists = 1;
}
add_prefix(topic, &prefixed_topic);
- *regex_topic = GNUNET_malloc(strlen(prefixed_topic) - plus_counter -
hash_exists + plus_counter*strlen(plus_regex) +
hash_exists*strlen(hash_regex)+1);
+ *regex_topic = GNUNET_malloc (strlen(prefixed_topic) - plus_counter -
hash_exists + plus_counter*strlen(plus_regex) +
hash_exists*strlen(hash_regex)+1);
j = 0;
for (i = 0; prefixed_topic[i] != '\0'; i++)
{
@@ -375,15 +410,15 @@
(*regex_topic)[j] = hash_regex[k];
j++;
}
- }else
- {
+ }
+ else
+ {
(*regex_topic)[j] = prefixed_topic[i];
j++;
- }
}
- (*regex_topic)[j] = '\0';
}
-
+ (*regex_topic)[j] = '\0';
+}
/**
@@ -400,11 +435,11 @@
{
struct PendingMessage *pm;
- if (NULL != client_info->transmit_handle) {
+ if (NULL != client_info->transmit_handle)
+ {
GNUNET_SERVER_notify_transmit_ready_cancel (client_info->transmit_handle);
client_info->transmit_handle = NULL;
}
-
while (NULL != (pm = client_info->pending_head))
{
GNUNET_CONTAINER_DLL_remove (client_info->pending_head,
@@ -412,7 +447,6 @@
GNUNET_free (pm->msg);
GNUNET_free (pm);
}
-
GNUNET_CONTAINER_DLL_remove (client_head, client_tail, client_info);
GNUNET_free (client_info);
}
@@ -440,12 +474,10 @@
return pos;
pos = pos->next;
}
-
ret = GNUNET_new (struct ClientInfo);
ret->client_handle = client;
ret->transmit_handle = NULL;
GNUNET_CONTAINER_DLL_insert (client_head, client_tail, ret);
-
return ret;
}
@@ -488,7 +520,6 @@
GNUNET_MESH_notify_transmit_ready_cancel (subscriber->transmit_handle);
subscriber->transmit_handle = NULL;
}
-
while (NULL != (pm = subscriber->pending_head))
{
GNUNET_CONTAINER_DLL_remove (subscriber->pending_head,
@@ -496,7 +527,6 @@
GNUNET_free (pm->msg);
GNUNET_free (pm);
}
-
GNUNET_MESH_tunnel_destroy (subscriber->tunnel);
GNUNET_free (subscriber);
}
@@ -522,18 +552,11 @@
if (GNUNET_SCHEDULER_NO_TASK != context->free_task)
GNUNET_SCHEDULER_cancel (context->free_task);
- if (remove (filepath) == 0)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "file `%s` deleted successfully.\n", filepath);
- }
- else
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "unable to delete file `%s`\n", filepath);
- }
+ if (0 != UNLINK (filepath))
+ GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING,
+ "unlink",
+ filepath);
}
-
GNUNET_CONTAINER_multipeermap_destroy (context->subscribers);
GNUNET_REGEX_search_cancel (context->regex_search_handle);
GNUNET_free (context->publish_msg);
@@ -542,8 +565,6 @@
}
-
-
/**
* Function called when the timer expires for a delivered message,
* triggering its deletion.
@@ -555,21 +576,10 @@
const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct RegexSearchContext *context = cls;
- struct GNUNET_CONTAINER_SList_Iterator it;
- it = GNUNET_CONTAINER_slist_begin (search_contexts);
-
- while (GNUNET_CONTAINER_slist_end (&it) != GNUNET_YES)
- {
- if (GNUNET_CONTAINER_slist_get (&it, NULL) == context)
- {
- GNUNET_CONTAINER_slist_erase (&it);
- break;
- }
- else
- GNUNET_CONTAINER_slist_next (&it);
- }
-
+ GNUNET_CONTAINER_DLL_remove (sc_head,
+ sc_tail,
+ context);
context->free_task = GNUNET_SCHEDULER_NO_TASK;
regex_search_context_free (context);
}
@@ -609,11 +619,12 @@
* @param cls closure to this call
* @param size maximum number of bytes available to send
* @param buf where to copy the actual message to
- *
* @return the number of bytes actually copied, 0 indicates failure
*/
static size_t
-send_msg_to_subscriber (void *cls, size_t size, void *buf)
+send_msg_to_subscriber (void *cls,
+ size_t size,
+ void *buf)
{
struct RemoteSubscriberInfo *subscriber = cls;
char *cbuf = buf;
@@ -623,8 +634,7 @@
subscriber->transmit_handle = NULL;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Send message to subscriber.\n");
-
+ "Send message to subscriber.\n");
if (buf == NULL)
{
/* subscriber disconnected */
@@ -643,15 +653,13 @@
GNUNET_CONTAINER_DLL_remove (subscriber->pending_head,
subscriber->pending_tail, pm);
memcpy (&cbuf[off], pm->msg, msize);
-
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Transmitting %u bytes to subscriber %s\n", msize,
GNUNET_i2s (&subscriber->id));
off += msize;
- set_timer_for_deleting_message(pm);
-
- GNUNET_free (pm->msg);
- GNUNET_free (pm);
+ set_timer_for_deleting_message(pm);
+ GNUNET_free (pm->msg);
+ GNUNET_free (pm);
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -715,18 +723,19 @@
static void
-deliver_incoming_publish (struct GNUNET_MQTT_ClientPublishMessage *msg, struct
RegexSearchContext *context);
+deliver_incoming_publish (const struct GNUNET_MQTT_ClientPublishMessage *msg,
+ struct RegexSearchContext *context);
/**
* Search callback function called when a subscribed peer is found.
*
- * @param cls closure provided in GNUNET_REGEX_search
+ * @param cls closure provided in GNUNET_REGEX_search()
* @param id peer providing a regex that matches the string
* @param get_path path of the get request
- * @param get_path_length lenght of get_path
+ * @param get_path_length length of @a get_path
* @param put_path Path of the put request
- * @param put_path_length length of the put_path
+ * @param put_path_length length of the @a put_path
*/
static void
subscribed_peer_found (void *cls, const struct GNUNET_PeerIdentity *id,
@@ -742,7 +751,8 @@
size_t msg_len = ntohs (context->publish_msg->header.size);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "an active subscription found from %s\n", GNUNET_i2s (id));
+ "an active subscription found from %s\n",
+ GNUNET_i2s (id));
/*
* We may have delivered the message to the peer already if it has
@@ -784,7 +794,7 @@
subscriber->tunnel = GNUNET_MESH_tunnel_create (mesh_handle,
NULL,
id,
- FIXME_PORT,
+
GNUNET_APPLICATION_TYPE_MQTT,
GNUNET_NO, GNUNET_YES);
subscriber->peer_added = GNUNET_NO;
subscriber->peer_connecting = GNUNET_NO;
@@ -807,7 +817,6 @@
*
* @param topic of the message identification of the client
* @param publish_msg the publish message
-
*/
static void
search_for_subscribers (const char *topic,
@@ -825,10 +834,9 @@
context->regex_search_handle = GNUNET_REGEX_search (dht_handle, topic,
subscribed_peer_found,
context, NULL);
-
- GNUNET_CONTAINER_slist_add (search_contexts,
- GNUNET_CONTAINER_SLIST_DISPOSITION_STATIC,
- context, sizeof (struct RegexSearchContext));
+ GNUNET_CONTAINER_DLL_insert (sc_head,
+ sc_tail,
+ context);
}
@@ -838,8 +846,6 @@
* @param cls closure
* @param client identification of the client
* @param message the actual message
- * @return GNUNET_OK to keep the connection open,
- * GNUNET_SYSERR to close it (signal serious error)
*/
static void
handle_mqtt_publish (void *cls, struct GNUNET_SERVER_Client *client,
@@ -854,7 +860,12 @@
size_t msg_len = ntohs (msg->size);
struct GNUNET_MQTT_ClientPublishMessage *publish_msg;
-
+
+ if (NULL == folder_name)
+ {
+ GNUNET_break (0);
+ return;
+ }
/* Extract topic */
publish_msg = GNUNET_malloc (msg_len);
memcpy (publish_msg, msg, msg_len);
@@ -869,41 +880,34 @@
strncpy(message, ((char *) (publish_msg + 1)) + publish_msg->topic_len,
message_len);
message[message_len - 1] = '\0';
-
add_prefix (topic, &prefixed_topic);
-
- if (NULL != current_dir_name)
+ GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_WEAK,
+ &file_name_hash);
+ file_name = GNUNET_h2s_full(&file_name_hash);
+ GNUNET_asprintf (&file_path,
+ "%s%s%s",
+ folder_name,
+ DIR_SEPARATOR_STR,
+ file_name);
+
+ if (NULL != (persistence_file = fopen(file_path, "w+")))
{
- GNUNET_CRYPTO_hash_create_random(GNUNET_CRYPTO_QUALITY_WEAK,
&file_name_hash);
- file_name = GNUNET_h2s_full(&file_name_hash);
-
- GNUNET_asprintf (&file_path, "%s%s%s", current_dir_name,
DIR_SEPARATOR_STR, file_name );
-
- if (NULL != (persistence_file = fopen(file_path, "w+")))
- {
- fwrite(topic, 1, strlen(topic)+1, persistence_file);
- fwrite(message, 1, strlen(message), persistence_file);
- fclose(persistence_file);
-
- search_for_subscribers (prefixed_topic, publish_msg, file_path);
- }
- else
- {
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Not able to open file!");
- }
- }
- else
- {
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Not able to get current directory!");
- }
-
+ fwrite(topic, 1, strlen(topic)+1, persistence_file);
+ fwrite(message, 1, strlen(message), persistence_file);
+ fclose(persistence_file);
+ search_for_subscribers (prefixed_topic, publish_msg, file_path);
+ }
+ else
+ {
+ GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_ERROR,
+ "open",
+ file_path);
+ }
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"outgoing PUBLISH message received: %s [%d bytes] (%d
overall)\n",
- topic, publish_msg->topic_len, ntohs(publish_msg->header.size));
-
-
+ topic,
+ publish_msg->topic_len,
+ ntohs (publish_msg->header.size));
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
@@ -914,8 +918,6 @@
* @param cls closure
* @param client identification of the client
* @param message the actual message
- * @return GNUNET_OK to keep the connection open,
- * GNUNET_SYSERR to close it (signal serious error)
*/
static void
handle_mqtt_subscribe (void *cls, struct GNUNET_SERVER_Client *client,
@@ -932,11 +934,8 @@
strncpy(topic, (char *) (subscribe_msg + 1),
subscribe_msg->topic_len);
topic[subscribe_msg->topic_len - 1] = '\0';
-
subscription = GNUNET_new (struct Subscription);
-
get_regex (topic, &regex_topic);
-
if (0 != regcomp (&subscription->automaton,
regex_topic,
REG_NOSUB))
@@ -948,17 +947,17 @@
}
subscription->request_id = subscribe_msg->request_id;
subscription->client = find_active_client (client);
-
- GNUNET_CONTAINER_slist_add (subscriptions,
- GNUNET_CONTAINER_SLIST_DISPOSITION_STATIC,
- subscription, sizeof (struct Subscription));
+ GNUNET_CONTAINER_DLL_insert (subscription_head,
+ subscription_tail,
+ subscription);
subscription->regex_announce_handle =
GNUNET_REGEX_announce (dht_handle, &my_id,
regex_topic,1 , NULL);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "MQTT SUBSCRIBE message received: %s->%s\n", topic, regex_topic);
-
+ "MQTT SUBSCRIBE message received: %s->%s\n",
+ topic,
+ regex_topic);
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
@@ -989,7 +988,6 @@
size_t msize;
client->transmit_handle = NULL;
-
if (buf == NULL)
{
/* client disconnected */
@@ -998,31 +996,31 @@
client->client_handle);
return 0;
}
-
off = 0;
-
while ((NULL != (reply = client->pending_head)) &&
(size >= off + (msize = ntohs (reply->msg->size))))
{
GNUNET_CONTAINER_DLL_remove (client->pending_head, client->pending_tail,
reply);
memcpy (&cbuf[off], reply->msg, msize);
- if (NULL != reply->context)
- {
- set_timer_for_deleting_message(reply);
- }
+ if (NULL != reply->context)
+ {
+ set_timer_for_deleting_message(reply);
+ }
GNUNET_free (reply->msg);
GNUNET_free (reply);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Transmitting %u bytes to client %p\n", msize,
+ "Transmitting %u bytes to client %p\n",
+ msize,
client->client_handle);
off += msize;
}
-
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmitted %u/%u bytes to client
%p\n",
- (unsigned int) off, (unsigned int) size, client->client_handle);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Transmitted %u/%u bytes to client %p\n",
+ (unsigned int) off,
+ (unsigned int) size,
+ client->client_handle);
process_pending_client_messages (client);
-
return off;
}
@@ -1045,11 +1043,10 @@
NULL ? "no more messages" : "request already pending");
return;
}
-
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Asking for transmission of %u bytes to client %p\n",
- ntohs (client->pending_head->msg->size), client->client_handle);
-
+ ntohs (client->pending_head->msg->size),
+ client->client_handle);
client->transmit_handle =
GNUNET_SERVER_notify_transmit_ready (client->client_handle,
ntohs (client->pending_head->
@@ -1081,34 +1078,27 @@
* @param cls closure
* @param client identification of the client
* @param message the actual message
- * @return GNUNET_OK to keep the connection open,
- * GNUNET_SYSERR to close it (signal serious error)
*/
static void
handle_mqtt_unsubscribe (void *cls, struct GNUNET_SERVER_Client *client,
const struct GNUNET_MessageHeader *msg)
{
struct PendingMessage *pm;
- struct GNUNET_CONTAINER_SList_Iterator it;
const struct GNUNET_MQTT_ClientUnsubscribeMessage *unsubscribe_msg;
struct GNUNET_MQTT_ClientUnsubscribeAckMessage *unsub_ack_msg;
struct ClientInfo *client_info;
- struct Subscription *subscription = NULL;
+ struct Subscription *subscription;
unsubscribe_msg = (const struct GNUNET_MQTT_ClientUnsubscribeMessage *) msg;
-
- for (it = GNUNET_CONTAINER_slist_begin (subscriptions);
- GNUNET_CONTAINER_slist_end (&it) != GNUNET_YES;)
+ for (subscription = subscription_head; NULL != subscription; subscription =
subscription->next)
{
- subscription = GNUNET_CONTAINER_slist_get (&it, NULL);
-
if (subscription->request_id == unsubscribe_msg->request_id)
{
- GNUNET_CONTAINER_slist_erase (&it);
+ GNUNET_CONTAINER_DLL_remove (subscription_head,
+ subscription_tail,
+ subscription);;
break;
}
-
- GNUNET_CONTAINER_slist_next (&it);
}
if (NULL == subscription)
@@ -1116,10 +1106,10 @@
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Active subscription with ID %lu does not exist\n",
subscription->request_id);
- return;
+ return;
}
-
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Cancelling subscription with ID %lu\n",
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Cancelling subscription with ID %lu\n",
subscription->request_id);
client_info = find_active_client (client);
@@ -1134,10 +1124,8 @@
pm = GNUNET_new (struct PendingMessage);
pm->msg = (struct GNUNET_MessageHeader*) unsub_ack_msg;
-
add_pending_client_message (client_info, pm);
subscription_free (subscription);
-
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
@@ -1149,12 +1137,13 @@
* appropriate subscribing application.
*/
static void
-deliver_incoming_publish (struct GNUNET_MQTT_ClientPublishMessage *msg,
+deliver_incoming_publish (const struct GNUNET_MQTT_ClientPublishMessage *msg,
struct RegexSearchContext *context)
{
- char *topic, *prefixed_topic;
+ char *topic;
+ char *prefixed_topic;
struct GNUNET_MQTT_ClientPublishMessage *publish_msg;
- struct GNUNET_CONTAINER_SList_Iterator it;
+ struct Subscription *subscription;
size_t msg_len = ntohs (msg->header.size);
int free_publish_msg = GNUNET_YES;
@@ -1162,24 +1151,23 @@
publish_msg = GNUNET_malloc (msg_len);
memcpy (publish_msg, msg, msg_len);
topic = GNUNET_malloc (publish_msg->topic_len);
- strncpy(topic, (char *) (publish_msg + 1), publish_msg->topic_len);
+ strncpy (topic,
+ (const char *) &publish_msg[1],
+ publish_msg->topic_len);
topic[publish_msg->topic_len - 1] = '\0';
add_prefix(topic, &prefixed_topic);
- it = GNUNET_CONTAINER_slist_begin (subscriptions);
-
- while (GNUNET_CONTAINER_slist_end (&it) != GNUNET_YES)
+ for (subscription = subscription_head; NULL != subscription; subscription =
subscription->next)
{
- struct Subscription *subscription = GNUNET_CONTAINER_slist_get (&it, NULL);
-
if (0 == regexec (&subscription->automaton, prefixed_topic, 0, NULL, 0))
{
struct PendingMessage *pm;
struct GNUNET_MQTT_ClientPublishMessage *return_msg;
struct ClientInfo *client_info = subscription->client;
- if (GNUNET_YES == free_publish_msg) {
+ if (GNUNET_YES == free_publish_msg)
+ {
return_msg = publish_msg;
free_publish_msg = GNUNET_NO;
}
@@ -1188,25 +1176,17 @@
return_msg = GNUNET_malloc (msg_len);
memcpy (return_msg, msg, msg_len);
}
-
return_msg->request_id = subscription->request_id;
-
pm = GNUNET_new (struct PendingMessage);
pm->msg = (struct GNUNET_MessageHeader*) return_msg;
- pm->context = context;
-
+ pm->context = context;
add_pending_client_message (client_info, pm);
}
-
- GNUNET_CONTAINER_slist_next (&it);
}
-
GNUNET_free (topic);
GNUNET_free (prefixed_topic);
-
- if (GNUNET_YES == free_publish_msg) {
+ if (GNUNET_YES == free_publish_msg)
GNUNET_free (publish_msg);
- }
}
@@ -1221,7 +1201,7 @@
void **tunnel_ctx,
const struct GNUNET_MessageHeader *msg)
{
- deliver_incoming_publish ((struct GNUNET_MQTT_ClientPublishMessage*) msg,
+ deliver_incoming_publish ((const struct GNUNET_MQTT_ClientPublishMessage*)
msg,
NULL);
return GNUNET_OK;
@@ -1254,43 +1234,31 @@
* @param client identification of the client
*/
static void
-handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
+handle_client_disconnect (void *cls,
+ struct GNUNET_SERVER_Client *client)
{
- struct GNUNET_CONTAINER_SList_Iterator it;
+ struct Subscription *subscription;
+ struct Subscription *nxt;
struct ClientInfo *client_info = NULL;
if (NULL == client)
return;
-
- it = GNUNET_CONTAINER_slist_begin (subscriptions);
-
- while (GNUNET_CONTAINER_slist_end (&it) != GNUNET_YES)
+ for (subscription=subscription_head; NULL != subscription; subscription =
nxt)
{
- struct Subscription *subscription = GNUNET_CONTAINER_slist_get (&it, NULL);
-
+ nxt = subscription->next;
if (subscription->client->client_handle == client)
{
client_info = subscription->client;
- GNUNET_CONTAINER_slist_erase (&it);
+ GNUNET_CONTAINER_DLL_remove (subscription_head,
+ subscription_tail,
+ subscription);
subscription_free (subscription);
}
- else
- GNUNET_CONTAINER_slist_next (&it);
}
-
if (NULL == client_info)
- {
- client_info = client_head;
-
- while (client_info != NULL)
- {
+ for (client_info = client_head; NULL != client_info; client_info =
client_info->next)
if (client_info->client_handle == client)
break;
-
- client_info = client_info->next;
- }
- }
-
if (NULL != client_info)
client_info_free (client_info);
}
@@ -1303,43 +1271,32 @@
* @param tc unused
*/
static void
-shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+shutdown_task (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
{
- struct GNUNET_CONTAINER_SList_Iterator it;
+ struct Subscription *subscription;
+ struct RegexSearchContext *context;
- it = GNUNET_CONTAINER_slist_begin (subscriptions);
-
- while (GNUNET_CONTAINER_slist_end (&it) != GNUNET_YES)
+ while (NULL != (subscription = subscription_head))
{
- struct Subscription *subscription = GNUNET_CONTAINER_slist_get (&it, NULL);
-
- GNUNET_CONTAINER_slist_erase (&it);
+ GNUNET_CONTAINER_DLL_remove (subscription_head,
+ subscription_tail,
+ subscription);
subscription_free (subscription);
}
-
- it = GNUNET_CONTAINER_slist_begin (search_contexts);
-
- while (GNUNET_CONTAINER_slist_end (&it) != GNUNET_YES)
+ while (NULL != (context = sc_head))
{
- struct RegexSearchContext *context = GNUNET_CONTAINER_slist_get (&it,
- NULL);
-
- GNUNET_CONTAINER_slist_erase (&it);
+ GNUNET_CONTAINER_DLL_remove (sc_head,
+ sc_tail,
+ context);
regex_search_context_free (context);
}
-
GNUNET_CONTAINER_multipeermap_iterate (remote_subscribers,
free_remote_subscriber_iterator,
NULL);
-
GNUNET_DHT_disconnect (dht_handle);
GNUNET_MESH_disconnect (mesh_handle);
- GNUNET_CONTAINER_slist_destroy (subscriptions);
- GNUNET_CONTAINER_slist_destroy (search_contexts);
GNUNET_CONTAINER_multipeermap_destroy (remote_subscribers);
-
- GNUNET_SERVER_disconnect_notify_cancel (server_handle,
- handle_client_disconnect, NULL);
}
@@ -1361,95 +1318,104 @@
/**
- * Look for old messages and call try to deliver them again by calling regex
search
- *
+ * Look for old messages and call try to deliver them again by calling
+ * regex search
*/
static void
look_for_old_messages ()
{
- DIR *dir;
- FILE *file;
- char *current_dir;
- char *folder_name = "mqtt";
- struct dirent *ent;
- char *file_path;
- char *topic, *aux, *prefixed_topic;
- size_t struct_size;
- int ch;
- long long length;
- struct GNUNET_MQTT_ClientPublishMessage *old_publish_msg;
-
- uid_gen = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
- UINT64_MAX);
- current_dir_name = NULL;
- if (GNUNET_OK == GNUNET_CONFIGURATION_get_value_filename(cfg, "PATHS",
"SERVICEHOME", &current_dir))
+ DIR *dir;
+ FILE *file;
+ struct dirent *ent;
+ char *file_path;
+ char *topic;
+ char *aux;
+ char *prefixed_topic;
+ size_t struct_size;
+ size_t n;
+ int ch;
+ long long length;
+ struct GNUNET_MQTT_ClientPublishMessage *old_publish_msg;
+
+ uid_gen = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
+ UINT64_MAX);
+ folder_name = NULL;
+ if (GNUNET_OK !=
+ GNUNET_CONFIGURATION_get_value_filename (cfg,
+ "MQTT",
+ "MESSAGE_FOLDER",
+ &folder_name))
{
- GNUNET_asprintf (&current_dir_name, "%s%s", current_dir, folder_name);
-
- if ((dir = opendir (current_dir_name)) != NULL)
- {
- while ((ent = readdir (dir)) != NULL) {
-
- if (!strcmp(ent->d_name, ".") || !strcmp(ent->d_name,
".."))
- continue;
-
- GNUNET_asprintf (&file_path, "%s%s%s",
current_dir_name, DIR_SEPARATOR_STR, ent->d_name);
- file = fopen(file_path, "r");
- if (file != NULL)
- {
- size_t n = 0;
+ GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
+ "MQTT", "MESSAGE_FOLDER");
+ return;
+ }
+ if (NULL == (dir = opendir (folder_name)))
+ {
+ GNUNET_DISK_directory_create (folder_name);
+ return;
+ }
- fseeko( file, 0, SEEK_END ); // seek to end
- length = ftello( file ); // determine offset of
end
- rewind(file); // restore position
-
- struct_size = sizeof(struct
GNUNET_MQTT_ClientPublishMessage) + length + 1;
-
- old_publish_msg = GNUNET_malloc(struct_size);
- old_publish_msg->header.size = htons
(struct_size);
- old_publish_msg->header.type = htons
(GNUNET_MESSAGE_TYPE_MQTT_CLIENT_PUBLISH);
- old_publish_msg->request_id = ++uid_gen;
-
- aux = (char*)&old_publish_msg[1];
- while ((ch = fgetc(file)) != EOF && (ch !=
'\0') )
- {
- aux[n] = (char) ch;
- n++;
- }
-
- old_publish_msg->topic_len = n + 1;
- aux[n] = '\0';
- n++;
- while ((ch = fgetc(file)) != EOF )
- {
- aux[n] = (char) ch;
- n++;
- }
-
- aux[n] = '\0';
-
- topic = GNUNET_malloc
(old_publish_msg->topic_len);
- strncpy(topic, (char *) (old_publish_msg + 1),
old_publish_msg->topic_len);
- topic[old_publish_msg->topic_len - 1] = '\0';
-
- add_prefix (topic, &prefixed_topic);
-
- search_for_subscribers(prefixed_topic,
old_publish_msg, file_path);
-
- GNUNET_free(topic);
- }
- }
- closedir (dir);
- } else {
- mkdir(current_dir_name, S_IRWXU);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "A new folder was created for persisting messages: %s\n",
current_dir_name);
- }
- }else
- {
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Not able to get current directory!");
- }
+ while (NULL != (ent = readdir (dir)))
+ {
+ if (!strcmp(ent->d_name, ".") || !strcmp(ent->d_name, ".."))
+ continue;
+
+ GNUNET_asprintf (&file_path,
+ "%s%s%s",
+ folder_name,
+ DIR_SEPARATOR_STR,
+ ent->d_name);
+ file = fopen(file_path, "r");
+ if (NULL == file)
+ {
+ GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING,
+ "open",
+ file_path);
+ GNUNET_free (file_path);
+ continue;
+ }
+ n = 0;
+ fseeko (file, 0, SEEK_END); // seek to end
+ length = ftello (file); // determine offset of end
+ rewind (file); // restore position
+
+ struct_size = sizeof(struct GNUNET_MQTT_ClientPublishMessage) + length + 1;
+
+ old_publish_msg = GNUNET_malloc(struct_size);
+ old_publish_msg->header.size = htons (struct_size);
+ old_publish_msg->header.type = htons
(GNUNET_MESSAGE_TYPE_MQTT_CLIENT_PUBLISH);
+ old_publish_msg->request_id = ++uid_gen;
+
+ aux = (char*)&old_publish_msg[1];
+ while ((ch = fgetc(file)) != EOF && (ch != '\0') )
+ {
+ aux[n] = (char) ch;
+ n++;
+ }
+
+ old_publish_msg->topic_len = n + 1;
+ aux[n] = '\0';
+ n++;
+ while ((ch = fgetc(file)) != EOF )
+ {
+ aux[n] = (char) ch;
+ n++;
+ }
+
+ aux[n] = '\0';
+
+ topic = GNUNET_malloc (old_publish_msg->topic_len);
+ strncpy(topic, (char *) (old_publish_msg + 1), old_publish_msg->topic_len);
+ topic[old_publish_msg->topic_len - 1] = '\0';
+
+ add_prefix (topic, &prefixed_topic);
+
+ search_for_subscribers(prefixed_topic, old_publish_msg, file_path);
+ GNUNET_free (file_path);
+ GNUNET_free (topic);
+ }
+ closedir (dir);
}
@@ -1478,7 +1444,7 @@
{NULL, 0, 0}
};
static const uint32_t ports[] = {
- FIXME_PORT,
+ GNUNET_APPLICATION_TYPE_MQTT,
GNUNET_APPLICATION_TYPE_END
};
@@ -1495,11 +1461,7 @@
GNUNET_SCHEDULER_shutdown ();
return;
}
-
- subscriptions = GNUNET_CONTAINER_slist_create ();
- search_contexts = GNUNET_CONTAINER_slist_create ();
remote_subscribers = GNUNET_CONTAINER_multipeermap_create (8, GNUNET_NO);
-
server_handle = server;
GNUNET_assert (GNUNET_OK ==
GNUNET_CRYPTO_get_peer_identity (c,
@@ -1514,8 +1476,8 @@
cfg = c;
GNUNET_SERVER_add_handlers (server, handlers);
GNUNET_SERVER_disconnect_notify (server, &handle_client_disconnect, NULL);
-
- GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task,
+ GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
+ &shutdown_task,
NULL);
look_for_old_messages ();
}
Modified: gnunet-mqtt/src/mqtt/mqtt.conf.in
===================================================================
--- gnunet-mqtt/src/mqtt/mqtt.conf.in 2013-10-14 13:21:42 UTC (rev 30184)
+++ gnunet-mqtt/src/mqtt/mqtt.conf.in 2013-10-14 13:22:11 UTC (rev 30185)
@@ -1,8 +1,16 @@
[mqtt]
+AUTOSTART = YES
BINARY = gnunet-service-mqtt
-UNIXPATH = /tmp/gnunet-service-mqtt.sock
-HOME = $SERVICEHOME
+UNIXPATH = $GNUNET_RUNTIME_DIR/gnunet-service-mqtt.sock
# PORT = 2106
@UNIXONLY@ PORT = 35345
SUBSCRIPTION_REANNOUNCE_TIME = 5 m
MESSAGE_DELETE_TIME = 5 s
+
+UNIX_MATCH_UID = NO
+UNIX_MATCH_GID = YES
+
+
+
+# Where should MQTT cache its messages?
+MESSAGE_FOLDER = $GNUNET_CACHE_HOME/mqtt/
\ No newline at end of file
Modified: gnunet-mqtt/src/mqtt/mqtt.h
===================================================================
--- gnunet-mqtt/src/mqtt/mqtt.h 2013-10-14 13:21:42 UTC (rev 30184)
+++ gnunet-mqtt/src/mqtt/mqtt.h 2013-10-14 13:22:11 UTC (rev 30185)
@@ -38,7 +38,7 @@
struct GNUNET_MQTT_ClientPublishMessage
{
/**
- * Type: GNUNET_MESSAGE_TYPE_MQTT_CLIENT_PUBLISH
+ * Type: #GNUNET_MESSAGE_TYPE_MQTT_CLIENT_PUBLISH
*/
struct GNUNET_MessageHeader header;
@@ -64,7 +64,7 @@
struct GNUNET_MQTT_ClientSubscribeMessage
{
/**
- * Type: GNUNET_MESSAGE_TYPE_MQTT_CLIENT_SUBSCRIBE
+ * Type: #GNUNET_MESSAGE_TYPE_MQTT_CLIENT_SUBSCRIBE
*/
struct GNUNET_MessageHeader header;
@@ -90,7 +90,7 @@
struct GNUNET_MQTT_ClientUnsubscribeMessage
{
/**
- * Type: GNUNET_MESSAGE_TYPE_MQTT_CLIENT_SUBSCRIBE
+ * Type: #GNUNET_MESSAGE_TYPE_MQTT_CLIENT_SUBSCRIBE
*/
struct GNUNET_MessageHeader header;
@@ -107,7 +107,7 @@
struct GNUNET_MQTT_ClientUnsubscribeAckMessage
{
/**
- * Type: GNUNET_MESSAGE_TYPE_MQTT_CLIENT_SUBSCRIBE_ACK
+ * Type: #GNUNET_MESSAGE_TYPE_MQTT_CLIENT_SUBSCRIBE_ACK
*/
struct GNUNET_MessageHeader header;
Modified: gnunet-mqtt/src/mqtt/mqtt_api.c
===================================================================
--- gnunet-mqtt/src/mqtt/mqtt_api.c 2013-10-14 13:21:42 UTC (rev 30184)
+++ gnunet-mqtt/src/mqtt/mqtt_api.c 2013-10-14 13:22:11 UTC (rev 30185)
@@ -284,20 +284,23 @@
* @param tc scheduler context
*/
static void
-try_reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+try_reconnect (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct GNUNET_MQTT_Handle *handle = cls;
- LOG (GNUNET_ERROR_TYPE_DEBUG, "Reconnecting with MQTT %p\n", handle);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Reconnecting with MQTT %p\n",
+ handle);
handle->retry_time = GNUNET_TIME_STD_BACKOFF (handle->retry_time);
handle->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
if (GNUNET_YES != try_connect (handle))
{
- LOG (GNUNET_ERROR_TYPE_DEBUG, "MQTT reconnect failed\n");
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "MQTT reconnect failed\n");
return;
}
-
process_pending_messages (handle);
}
@@ -314,15 +317,16 @@
return;
GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == handle->reconnect_task);
if (NULL != handle->th)
+ {
GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th);
- handle->th = NULL;
+ handle->th = NULL;
+ }
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Disconnecting from MQTT service, will try to reconnect in %s\n",
GNUNET_STRINGS_relative_time_to_string (handle->retry_time,
GNUNET_YES));
GNUNET_CLIENT_disconnect (handle->client);
handle->client = NULL;
-
handle->reconnect_task =
GNUNET_SCHEDULER_add_delayed (handle->retry_time, &try_reconnect, handle);
}
@@ -343,7 +347,6 @@
handle->cfg = cfg;
handle->uid_gen = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
UINT64_MAX);
-
if (GNUNET_NO == try_connect (handle))
{
GNUNET_MQTT_disconnect (handle);
@@ -370,7 +373,6 @@
GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th);
handle->th = NULL;
}
-
while (NULL != (pm = handle->pending_head))
{
GNUNET_assert (GNUNET_YES == pm->in_pending_queue);
@@ -383,13 +385,11 @@
GNUNET_free (pm);
}
-
if (NULL != handle->client)
{
GNUNET_CLIENT_disconnect (handle->client);
handle->client = NULL;
}
-
GNUNET_free (handle);
}
@@ -401,7 +401,8 @@
* @param tc scheduler context
*/
static void
-publish_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+publish_timeout (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct GNUNET_MQTT_PublishHandle *ph = cls;
struct GNUNET_MQTT_Handle *handle = ph->mqtt_handle;
@@ -442,10 +443,8 @@
sh->pending->timeout_task = GNUNET_SCHEDULER_NO_TASK;
GNUNET_free (sh->pending);
}
-
if (NULL != sh->cont)
sh->cont (sh->cont_cls, GNUNET_NO);
-
GNUNET_free (sh);
}
@@ -461,18 +460,19 @@
*/
static int
process_incoming_publish_message (struct GNUNET_MQTT_Handle *handle,
- const struct GNUNET_MQTT_ClientPublishMessage
- *msg)
+ const struct GNUNET_MQTT_ClientPublishMessage *msg)
{
struct GNUNET_MQTT_SubscribeHandle *sh;
- char *topic, *message;
+ char *topic;
+ char *message;
size_t message_len;
for (sh = handle->subscribe_head; NULL != sh; sh = sh->next)
if (sh->request_id == msg->request_id)
break;
- if (NULL == sh) {
+ if (NULL == sh)
+ {
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Received PUBLISH message matching no subscriptions\n");
return GNUNET_SYSERR;
@@ -480,14 +480,14 @@
/* Extract topic */
topic = GNUNET_malloc (msg->topic_len);
- strncpy(topic, (char *) (msg + 1), msg->topic_len);
+ strncpy (topic, (const char *) (msg + 1), msg->topic_len);
topic[msg->topic_len - 1] = '\0';
/* Extract message */
message_len = ntohs (msg->header.size) -
sizeof (struct GNUNET_MQTT_ClientPublishMessage) - msg->topic_len;
message = GNUNET_malloc (message_len);
- strncpy(message, ((char *) (msg + 1)) + msg->topic_len,
+ strncpy(message, ((const char *) (msg + 1)) + msg->topic_len,
message_len);
message[message_len - 1] = '\0';
@@ -914,10 +914,9 @@
subscribe_msg->header.type = htons (GNUNET_MESSAGE_TYPE_MQTT_CLIENT_SUBSCRIBE);
subscribe_msg->request_id = sh->request_id;
subscribe_msg->topic_len = topic_len;
-
memcpy (&subscribe_msg[1], topic, topic_len);
-
- GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
+ GNUNET_CONTAINER_DLL_insert (handle->pending_head,
+ handle->pending_tail,
pending);
pending->in_pending_queue = GNUNET_YES;
pending->wait_for_response = GNUNET_YES;
Modified: gnunet-mqtt/src/mqtt/template.conf
===================================================================
--- gnunet-mqtt/src/mqtt/template.conf 2013-10-14 13:21:42 UTC (rev 30184)
+++ gnunet-mqtt/src/mqtt/template.conf 2013-10-14 13:22:11 UTC (rev 30185)
@@ -21,19 +21,10 @@
[arm]
DEFAULTSERVICES = dht core mqtt
-[TESTING]
-WEAKRANDOM = YES
-
[testbed]
OVERLAY_TOPOLOGY = FROM_FILE
OVERLAY_TOPOLOGY_FILE = test_mqtt_multipeer_topology.dat
-[gnunetd]
-HOSTKEY = $SERVICEHOME/.hostkey
-
-[PATHS]
-SERVICEHOME = /tmp/test-dht-multipeer/
-
[nat]
DISABLEV6 = YES
RETURN_LOCAL_ADDRESSES = YES
@@ -59,3 +50,6 @@
[mqtt]
SUBSCRIPTION_REANNOUNCE_TIME = 5 m
MESSAGE_DELETE_TIME = 5 s
+
+[PATHS]
+GNUNET_TEST_HOME = /tmp/gnunet-test-mqtt/
\ No newline at end of file
Modified: gnunet-mqtt/src/mqtt/template_single_peer.conf
===================================================================
--- gnunet-mqtt/src/mqtt/template_single_peer.conf 2013-10-14 13:21:42 UTC (rev 30184)
+++ gnunet-mqtt/src/mqtt/template_single_peer.conf 2013-10-14 13:22:11 UTC (rev 30185)
@@ -21,15 +21,6 @@
[arm]
DEFAULTSERVICES = dht core mqtt
-[TESTING]
-WEAKRANDOM = YES
-
-[gnunetd]
-HOSTKEY = $SERVICEHOME/.hostkey
-
-[PATHS]
-SERVICEHOME = /tmp/test-dht-multipeer/
-
[nat]
DISABLEV6 = YES
RETURN_LOCAL_ADDRESSES = YES
@@ -55,3 +46,6 @@
[mqtt]
SUBSCRIPTION_REANNOUNCE_TIME = 5 m
MESSAGE_DELETE_TIME = 5 s
+
+[PATHS]
+GNUNET_TEST_HOME = /tmp/gnunet-test-mqtt/
Modified: gnunet-mqtt/src/mqtt/test_mqtt_multiple_peers.c
===================================================================
--- gnunet-mqtt/src/mqtt/test_mqtt_multiple_peers.c 2013-10-14 13:21:42 UTC (rev 30184)
+++ gnunet-mqtt/src/mqtt/test_mqtt_multiple_peers.c 2013-10-14 13:22:11 UTC (rev 30185)
@@ -35,10 +35,12 @@
*/
static struct GNUNET_MQTT_Handle *mqtt_handle_publish, *mqtt_handle_subscribe;
-struct GNUNET_TESTBED_Operation *basic_mqtt_op_publish;
-struct GNUNET_TESTBED_Operation *basic_mqtt_op_subscribe;
-GNUNET_SCHEDULER_TaskIdentifier shutdown_tid;
+static struct GNUNET_TESTBED_Operation *basic_mqtt_op_publish;
+static struct GNUNET_TESTBED_Operation *basic_mqtt_op_subscribe;
+
+static GNUNET_SCHEDULER_TaskIdentifier shutdown_tid;
+
static int result;
/**
@@ -46,47 +48,48 @@
*/
static unsigned long long request_timeout = 5;
+
static void
shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
if (NULL != basic_mqtt_op_publish)
{
- GNUNET_TESTBED_operation_done (basic_mqtt_op_publish); /* calls the
gmqtt_da() for closing
- down the connection */
- basic_mqtt_op_publish = NULL;
+ GNUNET_TESTBED_operation_done (basic_mqtt_op_publish); /* calls the
gmqtt_da() for closing
+ down the
connection */
+ basic_mqtt_op_publish = NULL;
}
if (NULL != basic_mqtt_op_subscribe)
{
- GNUNET_TESTBED_operation_done (basic_mqtt_op_subscribe); /* calls the
gmqtt_da() for closing
- down the connection */
- basic_mqtt_op_subscribe = NULL;
+ GNUNET_TESTBED_operation_done (basic_mqtt_op_subscribe); /* calls the
gmqtt_da() for closing
+ down the
connection */
+ basic_mqtt_op_subscribe = NULL;
}
-
GNUNET_SCHEDULER_shutdown (); /* Also kills the testbed */
}
+
static void
in_time_shutdown_task ()
{
if (NULL != basic_mqtt_op_publish)
{
- GNUNET_TESTBED_operation_done (basic_mqtt_op_publish); /* calls the
gmqtt_da_publish() for closing
- down the connection */
- basic_mqtt_op_publish = NULL;
+ GNUNET_TESTBED_operation_done (basic_mqtt_op_publish); /* calls the
gmqtt_da_publish() for closing
+ down the
connection */
+ basic_mqtt_op_publish = NULL;
}
if (NULL != basic_mqtt_op_subscribe)
{
- GNUNET_TESTBED_operation_done (basic_mqtt_op_subscribe); /* calls the
gmqtt_da_subscribe() for closing
- down the connection */
- basic_mqtt_op_subscribe = NULL;
+ GNUNET_TESTBED_operation_done (basic_mqtt_op_subscribe); /* calls the
gmqtt_da_subscribe() for closing
+ down the
connection */
+ basic_mqtt_op_subscribe = NULL;
}
-
GNUNET_SCHEDULER_cancel(shutdown_tid);
GNUNET_SCHEDULER_shutdown (); /* Also kills the testbed */
}
+
static void
subscribe_result_callback (void *cls, uint8_t topic_len, char *topic,
size_t size, void *data)
@@ -94,36 +97,38 @@
result = GNUNET_OK;
FPRINTF (stdout, "%s: %s -> %s\n", _("Message received"), topic,
(char*) data);
-
GNUNET_free (topic);
- GNUNET_free (data);
-
+ GNUNET_free (data);
in_time_shutdown_task();
}
+
static void
-service_connect_comp_publish (void *cls, struct GNUNET_TESTBED_Operation *op,
- void *ca_result,
- const char *emsg)
+service_connect_comp_publish (void *cls,
+ struct GNUNET_TESTBED_Operation *op,
+ void *ca_result,
+ const char *emsg)
{
- struct GNUNET_TIME_Relative timeout;
- char *topic = "some/topic";
- char *message = "test message";
+ struct GNUNET_TIME_Relative timeout;
+ const char *topic = "some/topic";
+ const char *message = "test message";
- timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
+ timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
request_timeout);
- GNUNET_MQTT_publish (mqtt_handle_publish, strlen(topic) + 1, topic,
+ GNUNET_MQTT_publish (mqtt_handle_publish, strlen(topic) + 1, topic,
strlen(message) + 1, message, timeout, NULL,
NULL);
- }
+}
+
static void
-service_connect_comp_subscribe (void *cls, struct GNUNET_TESTBED_Operation *op,
- void *ca_result,
- const char *emsg)
+service_connect_comp_subscribe (void *cls,
+ struct GNUNET_TESTBED_Operation *op,
+ void *ca_result,
+ const char *emsg)
{
struct GNUNET_TIME_Relative timeout;
- char *topic = "some/topic";
+ const char *topic = "some/topic";
timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
request_timeout);
@@ -131,11 +136,12 @@
GNUNET_MQTT_subscribe (mqtt_handle_subscribe, strlen(topic) + 1, topic,
timeout,
NULL, NULL,
subscribe_result_callback, NULL);
- }
+}
static void *
-gmqtt_ca_publish (void *cls, const struct GNUNET_CONFIGURATION_Handle *cfg)
+gmqtt_ca_publish (void *cls,
+ const struct GNUNET_CONFIGURATION_Handle *cfg)
{
/* Use the provided configuration to connect to service */
mqtt_handle_publish = GNUNET_MQTT_connect(cfg);
@@ -145,34 +151,41 @@
static void *
-gmqtt_ca_subscribe (void *cls, const struct GNUNET_CONFIGURATION_Handle *cfg)
+gmqtt_ca_subscribe (void *cls,
+ const struct GNUNET_CONFIGURATION_Handle *cfg)
{
-
/* Use the provided configuration to connect to service */
mqtt_handle_subscribe = GNUNET_MQTT_connect(cfg);
return mqtt_handle_subscribe;
}
+
static void
gmqtt_da_publish (void *cls, void *op_result)
{
/* Disconnect from gnunet-service-mqtt service */
- GNUNET_MQTT_disconnect (mqtt_handle_publish);
- mqtt_handle_publish = NULL;
+ GNUNET_MQTT_disconnect (mqtt_handle_publish);
+ mqtt_handle_publish = NULL;
}
+
static void
gmqtt_da_subscribe (void *cls, void *op_result)
{
- /* Disconnect from gnunet-service-mqtt service */
- GNUNET_MQTT_disconnect (mqtt_handle_subscribe);
- mqtt_handle_subscribe = NULL;
+ /* Disconnect from gnunet-service-mqtt service */
+ GNUNET_MQTT_disconnect (mqtt_handle_subscribe);
+ mqtt_handle_subscribe = NULL;
}
+
static void
-test_master (void *cls, unsigned int num_peers,
- struct GNUNET_TESTBED_Peer **peers)
+test_master (void *cls,
+ struct GNUNET_TESTBED_RunHandle *h,
+ unsigned int num_peers,
+ struct GNUNET_TESTBED_Peer **peers,
+ unsigned int links_succeeded,
+ unsigned int links_failed)
{
basic_mqtt_op_publish = GNUNET_TESTBED_service_connect(NULL, /* Closure for
operation */
peers[0], /* The peer whose service
to connect to */
@@ -195,13 +208,13 @@
}
-int main (int argc, char **argv)
+
+int
+main (int argc, char **argv)
{
int ret;
result = GNUNET_SYSERR;
- FPRINTF (stdout, "\n Starting test for mqtt multiple peers communication.\n\n");
-
ret = GNUNET_TESTBED_test_run("test mqtt multiple peer comumunication", /*
test case name */
"template.conf", /* template configuration */
2, /* number of peers to start */
Modified: gnunet-mqtt/src/mqtt/test_mqtt_regex_hash.c
===================================================================
--- gnunet-mqtt/src/mqtt/test_mqtt_regex_hash.c 2013-10-14 13:21:42 UTC (rev 30184)
+++ gnunet-mqtt/src/mqtt/test_mqtt_regex_hash.c 2013-10-14 13:22:11 UTC (rev 30185)
@@ -35,10 +35,12 @@
*/
static struct GNUNET_MQTT_Handle *mqtt_handle_publish, *mqtt_handle_subscribe;
-struct GNUNET_TESTBED_Operation *basic_mqtt_op_publish;
-struct GNUNET_TESTBED_Operation *basic_mqtt_op_subscribe;
-GNUNET_SCHEDULER_TaskIdentifier shutdown_tid;
+static struct GNUNET_TESTBED_Operation *basic_mqtt_op_publish;
+static struct GNUNET_TESTBED_Operation *basic_mqtt_op_subscribe;
+
+static GNUNET_SCHEDULER_TaskIdentifier shutdown_tid;
+
static int result;
/**
@@ -46,8 +48,10 @@
*/
static unsigned long long request_timeout = 5;
+
static void
-shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+shutdown_task (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
{
if (NULL != basic_mqtt_op_publish)
{
@@ -55,75 +59,78 @@
down the connection */
basic_mqtt_op_publish = NULL;
}
-
if (NULL != basic_mqtt_op_subscribe)
{
GNUNET_TESTBED_operation_done (basic_mqtt_op_subscribe); /* calls the
gmqtt_da() for closing
down the connection */
basic_mqtt_op_subscribe = NULL;
}
-
GNUNET_SCHEDULER_shutdown (); /* Also kills the testbed */
}
+
static void
in_time_shutdown_task ()
{
if (NULL != basic_mqtt_op_publish)
{
- GNUNET_TESTBED_operation_done (basic_mqtt_op_publish); /* calls the
gmqtt_da_publish() for closing
- down the connection */
- basic_mqtt_op_publish = NULL;
+ GNUNET_TESTBED_operation_done (basic_mqtt_op_publish); /* calls the
gmqtt_da_publish() for closing
+ down the
connection */
+ basic_mqtt_op_publish = NULL;
}
-
if (NULL != basic_mqtt_op_subscribe)
{
- GNUNET_TESTBED_operation_done (basic_mqtt_op_subscribe); /* calls the
gmqtt_da_subscribe() for closing
- down the connection */
- basic_mqtt_op_subscribe = NULL;
+ GNUNET_TESTBED_operation_done (basic_mqtt_op_subscribe); /* calls the
gmqtt_da_subscribe() for closing
+ down the
connection */
+ basic_mqtt_op_subscribe = NULL;
}
-
GNUNET_SCHEDULER_cancel(shutdown_tid);
GNUNET_SCHEDULER_shutdown (); /* Also kills the testbed */
}
+
static void
subscribe_result_callback (void *cls, uint8_t topic_len, char *topic,
size_t size, void *data)
{
result = GNUNET_OK;
- FPRINTF (stdout, "%s: %s -> %s\n", _("Message received"), topic,
+ FPRINTF (stdout,
+ "%s: %s -> %s\n",
+ _("Message received"),
+ topic,
(char*) data);
-
GNUNET_free (topic);
GNUNET_free (data);
-
in_time_shutdown_task();
}
+
static void
-service_connect_comp_publish (void *cls, struct GNUNET_TESTBED_Operation *op,
- void *ca_result,
- const char *emsg)
+service_connect_comp_publish (void *cls,
+ struct GNUNET_TESTBED_Operation *op,
+ void *ca_result,
+ const char *emsg)
{
- struct GNUNET_TIME_Relative timeout;
- char *topic = "some/topic/level1/Level2";
- char *message = "test message";
-
- timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
+ struct GNUNET_TIME_Relative timeout;
+ const char *topic = "some/topic/level1/Level2";
+ const char *message = "test message";
+
+ timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
request_timeout);
- GNUNET_MQTT_publish (mqtt_handle_publish, strlen(topic) + 1, topic,
+ GNUNET_MQTT_publish (mqtt_handle_publish, strlen(topic) + 1, topic,
strlen(message) + 1, message, timeout, NULL,
NULL);
- }
+}
+
static void
-service_connect_comp_subscribe (void *cls, struct GNUNET_TESTBED_Operation *op,
- void *ca_result,
- const char *emsg)
+service_connect_comp_subscribe (void *cls,
+ struct GNUNET_TESTBED_Operation *op,
+ void *ca_result,
+ const char *emsg)
{
struct GNUNET_TIME_Relative timeout;
- char *topic = "some/topic/#";
+ const char *topic = "some/topic/#";
timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
request_timeout);
@@ -131,7 +138,7 @@
GNUNET_MQTT_subscribe (mqtt_handle_subscribe, strlen(topic) + 1, topic,
timeout,
NULL, NULL,
subscribe_result_callback, NULL);
- }
+}
static void *
@@ -146,70 +153,76 @@
static void *
gmqtt_ca_subscribe (void *cls, const struct GNUNET_CONFIGURATION_Handle *cfg)
-{
-
+{
/* Use the provided configuration to connect to service */
mqtt_handle_subscribe = GNUNET_MQTT_connect(cfg);
return mqtt_handle_subscribe;
}
+
static void
gmqtt_da_publish (void *cls, void *op_result)
{
/* Disconnect from gnunet-service-mqtt service */
- GNUNET_MQTT_disconnect (mqtt_handle_publish);
- mqtt_handle_publish = NULL;
+ GNUNET_MQTT_disconnect (mqtt_handle_publish);
+ mqtt_handle_publish = NULL;
}
+
static void
gmqtt_da_subscribe (void *cls, void *op_result)
{
- /* Disconnect from gnunet-service-mqtt service */
- GNUNET_MQTT_disconnect (mqtt_handle_subscribe);
- mqtt_handle_subscribe = NULL;
+ /* Disconnect from gnunet-service-mqtt service */
+ GNUNET_MQTT_disconnect (mqtt_handle_subscribe);
+ mqtt_handle_subscribe = NULL;
}
+
static void
-test_master (void *cls, unsigned int num_peers,
- struct GNUNET_TESTBED_Peer **peers)
+test_master (void *cls,
+ struct GNUNET_TESTBED_RunHandle *h,
+ unsigned int num_peers,
+ struct GNUNET_TESTBED_Peer **peers,
+ unsigned int links_succeeded,
+ unsigned int links_failed)
{
- basic_mqtt_op_publish = GNUNET_TESTBED_service_connect(NULL, /* Closure for
operation */
- peers[0], /* The peer whose service
to connect to */
- "gnunet-service-mqtt", /* The name of
the service */
- service_connect_comp_publish, /*
callback to call after a handle to service is opened */
- NULL, /* closure for the above
callback */
- gmqtt_ca_publish, /* callback to
call with peer's configuration; this should open the needed service connection
*/
- gmqtt_da_publish, /* callback to be
called when closing the opened service connection */
- NULL); /* closure for the above two
callbacks */
- basic_mqtt_op_subscribe = GNUNET_TESTBED_service_connect(NULL, /* Closure
for operation */
- peers[1], /* The peer whose service
to connect to */
- "gnunet-service-mqtt", /* The name
of the service */
- service_connect_comp_subscribe, /*
callback to call after a handle to service is opened */
- NULL, /* closure for the above
callback */
- gmqtt_ca_subscribe, /* callback to
call with peer's configuration; this should open the needed service connection
*/
- gmqtt_da_subscribe, /* callback to
be called when closing the opened service connection */
- NULL); /* closure for the above two
callbacks */
-
- shutdown_tid = GNUNET_SCHEDULER_add_delayed(GNUNET_TIME_relative_multiply
(GNUNET_TIME_UNIT_SECONDS, 10), &shutdown_task, NULL);
-
+ basic_mqtt_op_publish = GNUNET_TESTBED_service_connect (NULL, /* Closure for
operation */
+ peers[0], /* The peer
whose service to connect to */
+
"gnunet-service-mqtt", /* The name of the service */
+
service_connect_comp_publish, /* callback to call after a handle to service is
opened */
+ NULL, /* closure for
the above callback */
+ gmqtt_ca_publish, /*
callback to call with peer's configuration; this should open the needed service
connection */
+ gmqtt_da_publish, /*
callback to be called when closing the opened service connection */
+ NULL); /* closure for
the above two callbacks */
+ basic_mqtt_op_subscribe = GNUNET_TESTBED_service_connect (NULL, /* Closure
for operation */
+ peers[1], /* The
peer whose service to connect to */
+
"gnunet-service-mqtt", /* The name of the service */
+
service_connect_comp_subscribe, /* callback to call after a handle to service
is opened */
+ NULL, /* closure
for the above callback */
+ gmqtt_ca_subscribe,
/* callback to call with peer's configuration; this should open the needed
service connection */
+ gmqtt_da_subscribe,
/* callback to be called when closing the opened service connection */
+ NULL); /* closure
for the above two callbacks */
+
+ shutdown_tid = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
(GNUNET_TIME_UNIT_SECONDS, 10),
+ &shutdown_task, NULL);
}
-int main (int argc, char **argv)
+
+int
+main (int argc, char **argv)
{
int ret;
result = GNUNET_SYSERR;
- FPRINTF (stdout, "\n Starting test for regex expressions (topics that contain the # wildcard).\n\n");
-
ret = GNUNET_TESTBED_test_run("test mqtt regex", /* test case name */
- "template.conf", /* template configuration */
- 2, /* number of peers to start */
- 0LL, /* Event mask -set to 0 for no event
notifications */
- NULL, /* Controller event callback */
- NULL, /* Closure for controller event callback */
- &test_master, /* continuation callback to be
called when testbed setup is complete */
- NULL); /* Closure for the test_master callback */
+ "template.conf", /* template configuration */
+ 2, /* number of peers to start */
+ 0LL, /* Event mask -set to 0 for no event
notifications */
+ NULL, /* Controller event callback */
+ NULL, /* Closure for controller event callback
*/
+ &test_master, /* continuation callback to be
called when testbed setup is complete */
+ NULL); /* Closure for the test_master callback
*/
if ( (GNUNET_OK != ret) || (GNUNET_OK != result) )
return 1;
return 0;
Modified: gnunet-mqtt/src/mqtt/test_mqtt_regex_plus.c
===================================================================
--- gnunet-mqtt/src/mqtt/test_mqtt_regex_plus.c 2013-10-14 13:21:42 UTC (rev 30184)
+++ gnunet-mqtt/src/mqtt/test_mqtt_regex_plus.c 2013-10-14 13:22:11 UTC (rev 30185)
@@ -36,10 +36,12 @@
*/
static struct GNUNET_MQTT_Handle *mqtt_handle_publish, *mqtt_handle_subscribe;
-struct GNUNET_TESTBED_Operation *basic_mqtt_op_publish;
-struct GNUNET_TESTBED_Operation *basic_mqtt_op_subscribe;
-GNUNET_SCHEDULER_TaskIdentifier shutdown_tid;
+static struct GNUNET_TESTBED_Operation *basic_mqtt_op_publish;
+static struct GNUNET_TESTBED_Operation *basic_mqtt_op_subscribe;
+
+static GNUNET_SCHEDULER_TaskIdentifier shutdown_tid;
+
static int result;
/**
@@ -47,85 +49,89 @@
*/
static unsigned long long request_timeout = 5;
+
static void
-shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+shutdown_task (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
{
if (NULL != basic_mqtt_op_publish)
{
- GNUNET_TESTBED_operation_done (basic_mqtt_op_publish); /* calls the
gmqtt_da() for closing
- down the connection */
- basic_mqtt_op_publish = NULL;
+ GNUNET_TESTBED_operation_done (basic_mqtt_op_publish); /* calls the
gmqtt_da() for closing
+ down the
connection */
+ basic_mqtt_op_publish = NULL;
}
if (NULL != basic_mqtt_op_subscribe)
{
- GNUNET_TESTBED_operation_done (basic_mqtt_op_subscribe); /* calls the
gmqtt_da() for closing
- down the connection */
- basic_mqtt_op_subscribe = NULL;
+ GNUNET_TESTBED_operation_done (basic_mqtt_op_subscribe); /* calls the
gmqtt_da() for closing
+ down the
connection */
+ basic_mqtt_op_subscribe = NULL;
}
-
GNUNET_SCHEDULER_shutdown (); /* Also kills the testbed */
}
+
static void
in_time_shutdown_task ()
{
if (NULL != basic_mqtt_op_publish)
{
- GNUNET_TESTBED_operation_done (basic_mqtt_op_publish); /* calls the
gmqtt_da_publish() for closing
- down the connection */
- basic_mqtt_op_publish = NULL;
+ GNUNET_TESTBED_operation_done (basic_mqtt_op_publish); /* calls the
gmqtt_da_publish() for closing
+ down the
connection */
+ basic_mqtt_op_publish = NULL;
}
-
if (NULL != basic_mqtt_op_subscribe)
{
- GNUNET_TESTBED_operation_done (basic_mqtt_op_subscribe); /* calls the
gmqtt_da_subscribe() for closing
- down the connection */
- basic_mqtt_op_subscribe = NULL;
+ GNUNET_TESTBED_operation_done (basic_mqtt_op_subscribe); /* calls the
gmqtt_da_subscribe() for closing
+ down the
connection */
+ basic_mqtt_op_subscribe = NULL;
}
-
GNUNET_SCHEDULER_cancel(shutdown_tid);
GNUNET_SCHEDULER_shutdown (); /* Also kills the testbed */
}
+
static void
subscribe_result_callback (void *cls, uint8_t topic_len, char *topic,
size_t size, void *data)
{
result = GNUNET_OK;
- FPRINTF (stdout, "%s: %s -> %s\n", _("Message received"), topic,
+ FPRINTF (stdout,
+ "%s: %s -> %s\n",
+ _("Message received"),
+ topic,
(char*) data);
-
GNUNET_free (topic);
GNUNET_free (data);
-
in_time_shutdown_task();
}
+
static void
-service_connect_comp_publish (void *cls, struct GNUNET_TESTBED_Operation *op,
- void *ca_result,
- const char *emsg)
+service_connect_comp_publish (void *cls,
+ struct GNUNET_TESTBED_Operation *op,
+ void *ca_result,
+ const char *emsg)
{
- try_regexes ();
- struct GNUNET_TIME_Relative timeout;
- char *topic = "some/IntLevel1/topic";
- char *message = "test message";
+ struct GNUNET_TIME_Relative timeout;
+ const char *topic = "some/IntLevel1/topic";
+ const char *message = "test message";
- timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
+ timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
request_timeout);
- GNUNET_MQTT_publish (mqtt_handle_publish, strlen(topic) + 1, topic,
+ GNUNET_MQTT_publish (mqtt_handle_publish, strlen(topic) + 1, topic,
strlen(message) + 1, message, timeout, NULL,
NULL);
- }
+}
+
static void
service_connect_comp_subscribe (void *cls, struct GNUNET_TESTBED_Operation *op,
- void *ca_result,
- const char *emsg)
+ void *ca_result,
+ const char *emsg)
{
struct GNUNET_TIME_Relative timeout;
- char *topic = "some/+/topic";
+ const char *topic = "some/+/topic";
timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
request_timeout);
@@ -172,38 +178,42 @@
mqtt_handle_subscribe = NULL;
}
+
static void
-test_master (void *cls, unsigned int num_peers,
- struct GNUNET_TESTBED_Peer **peers)
+test_master (void *cls,
+ struct GNUNET_TESTBED_RunHandle *h,
+ unsigned int num_peers,
+ struct GNUNET_TESTBED_Peer **peers,
+ unsigned int links_succeeded,
+ unsigned int links_failed)
{
basic_mqtt_op_publish = GNUNET_TESTBED_service_connect(NULL, /* Closure for
operation */
- peers[0], /* The peer whose service
to connect to */
- "gnunet-service-mqtt", /* The name of
the service */
- service_connect_comp_publish, /*
callback to call after a handle to service is opened */
- NULL, /* closure for the above
callback */
- gmqtt_ca_publish, /* callback to
call with peer's configuration; this should open the needed service connection
*/
- gmqtt_da_publish, /* callback to be
called when closing the opened service connection */
- NULL); /* closure for the above two
callbacks */
+ peers[0], /* The peer
whose service to connect to */
+ "gnunet-service-mqtt",
/* The name of the service */
+
service_connect_comp_publish, /* callback to call after a handle to service is
opened */
+ NULL, /* closure for
the above callback */
+ gmqtt_ca_publish, /*
callback to call with peer's configuration; this should open the needed service
connection */
+ gmqtt_da_publish, /*
callback to be called when closing the opened service connection */
+ NULL); /* closure for
the above two callbacks */
basic_mqtt_op_subscribe = GNUNET_TESTBED_service_connect(NULL, /* Closure
for operation */
- peers[1], /* The peer whose service
to connect to */
- "gnunet-service-mqtt", /* The name
of the service */
- service_connect_comp_subscribe, /*
callback to call after a handle to service is opened */
- NULL, /* closure for the above
callback */
- gmqtt_ca_subscribe, /* callback to
call with peer's configuration; this should open the needed service connection
*/
- gmqtt_da_subscribe, /* callback to
be called when closing the opened service connection */
- NULL); /* closure for the above two
callbacks */
-
- shutdown_tid = GNUNET_SCHEDULER_add_delayed(GNUNET_TIME_relative_multiply
(GNUNET_TIME_UNIT_SECONDS, 10), &shutdown_task, NULL);
-
+ peers[1], /* The
peer whose service to connect to */
+
"gnunet-service-mqtt", /* The name of the service */
+
service_connect_comp_subscribe, /* callback to call after a handle to service
is opened */
+ NULL, /* closure for
the above callback */
+ gmqtt_ca_subscribe,
/* callback to call with peer's configuration; this should open the needed
service connection */
+ gmqtt_da_subscribe,
/* callback to be called when closing the opened service connection */
+ NULL); /* closure
for the above two callbacks */
+
+ shutdown_tid = GNUNET_SCHEDULER_add_delayed(GNUNET_TIME_relative_multiply
(GNUNET_TIME_UNIT_SECONDS, 10),
+ &shutdown_task, NULL);
}
+
int main (int argc, char **argv)
{
int ret;
result = GNUNET_SYSERR;
- FPRINTF (stdout, "\n Starting test for regex expressions (topics that contain the + wildcard).\n\n");
-
ret = GNUNET_TESTBED_test_run("test mqtt regex", /* test case name */
"template.conf", /* template configuration */
2, /* number of peers to start */
Modified: gnunet-mqtt/src/mqtt/test_mqtt_single_peer.c
===================================================================
--- gnunet-mqtt/src/mqtt/test_mqtt_single_peer.c 2013-10-14 13:21:42 UTC (rev 30184)
+++ gnunet-mqtt/src/mqtt/test_mqtt_single_peer.c 2013-10-14 13:22:11 UTC (rev 30185)
@@ -35,9 +35,10 @@
*/
static struct GNUNET_MQTT_Handle *mqtt_handle;
-struct GNUNET_TESTBED_Operation *basic_mqtt_op;
-GNUNET_SCHEDULER_TaskIdentifier shutdown_tid;
+static struct GNUNET_TESTBED_Operation *basic_mqtt_op;
+static GNUNET_SCHEDULER_TaskIdentifier shutdown_tid;
+
static int result;
/**
@@ -45,6 +46,7 @@
*/
static unsigned long long request_timeout = 5;
+
static void
shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
@@ -59,30 +61,34 @@
GNUNET_SCHEDULER_shutdown (); /* Also kills the testbed */
}
+
static void
in_time_shutdown_task ()
{
if (NULL != basic_mqtt_op)
{
- GNUNET_TESTBED_operation_done (basic_mqtt_op); /* calls the gmqtt_da()
for closing
- down the connection */
- basic_mqtt_op = NULL;
+ GNUNET_TESTBED_operation_done (basic_mqtt_op); /* calls the gmqtt_da() for
closing
+ down the connection */
+ basic_mqtt_op = NULL;
}
GNUNET_SCHEDULER_cancel(shutdown_tid);
GNUNET_SCHEDULER_shutdown (); /* Also kills the testbed */
}
+
static void
subscribe_result_callback (void *cls, uint8_t topic_len, char *topic,
size_t size, void *data)
{
result = GNUNET_OK;
- FPRINTF (stdout, "%s: %s -> %s\n", _("\nMessage received"), topic,
+ FPRINTF (stdout,
+ "%s: %s -> %s\n",
+ _("\nMessage received"),
+ topic,
(char*) data);
GNUNET_free (topic);
GNUNET_free (data);
-
in_time_shutdown_task();
}
@@ -92,21 +98,20 @@
void *ca_result,
const char *emsg)
{
- struct GNUNET_TIME_Relative timeout;
- char *topic = "some/topic";
- char *message = "test message";
-
- timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
+ struct GNUNET_TIME_Relative timeout;
+ const char *topic = "some/topic";
+ const char *message = "test message";
+
+ timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
request_timeout);
-
GNUNET_MQTT_subscribe (mqtt_handle, strlen(topic) + 1, topic, timeout,
NULL, NULL,
subscribe_result_callback, NULL);
GNUNET_MQTT_publish (mqtt_handle, strlen(topic) + 1, topic,
strlen(message) + 1, message, timeout, NULL,
NULL);
-
- shutdown_tid = GNUNET_SCHEDULER_add_delayed(GNUNET_TIME_relative_multiply
(GNUNET_TIME_UNIT_SECONDS, 10), &shutdown_task, NULL);
+ shutdown_tid = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
(GNUNET_TIME_UNIT_SECONDS, 10),
+ &shutdown_task, NULL);
}
@@ -119,6 +124,7 @@
return mqtt_handle;
}
+
static void
gmqtt_da (void *cls, void *op_result)
{
@@ -127,36 +133,39 @@
mqtt_handle = NULL;
}
+
static void
-test_master (void *cls, unsigned int num_peers,
- struct GNUNET_TESTBED_Peer **peers)
+test_master (void *cls,
+ struct GNUNET_TESTBED_RunHandle *h,
+ unsigned int num_peers,
+ struct GNUNET_TESTBED_Peer **peers,
+ unsigned int links_succeeded,
+ unsigned int links_failed)
{
- basic_mqtt_op = GNUNET_TESTBED_service_connect(NULL, /* Closure for
operation */
- peers[0], /* The peer whose service
to connect to */
- "gnunet-service-mqtt", /* The name of
the service */
- service_connect_comp, /* callback to
call after a handle to service is opened */
- NULL, /* closure for the above
callback */
- gmqtt_ca, /* callback to call with
peer's configuration; this should open the needed service connection */
- gmqtt_da, /* callback to be called
when closing the opened service connection */
- NULL); /* closure for the above two
callbacks */
-
+ basic_mqtt_op = GNUNET_TESTBED_service_connect(NULL, /* Closure for
operation */
+ peers[0], /* The peer whose
service to connect to */
+ "gnunet-service-mqtt", /* The
name of the service */
+ service_connect_comp, /*
callback to call after a handle to service is opened */
+ NULL, /* closure for the above
callback */
+ gmqtt_ca, /* callback to call
with peer's configuration; this should open the needed service connection */
+ gmqtt_da, /* callback to be
called when closing the opened service connection */
+ NULL); /* closure for the
above two callbacks */
}
+
int main (int argc, char **argv)
{
int ret;
+
result = GNUNET_SYSERR;
-
- FPRINTF (stdout, "\n Starting test for mqtt single peer communication.\n\n");
-
- ret = GNUNET_TESTBED_test_run("test mqtt single peer comumunication", /*
test case name */
- "template_single_peer.conf", /* template
configuration */
- 1, /* number of peers to start */
- 0LL, /* Event mask -set to 0 for no event
notifications */
- NULL, /* Controller event callback */
- NULL, /* Closure for controller event callback */
- &test_master, /* continuation callback to be
called when testbed setup is complete */
- NULL); /* Closure for the test_master callback */
+ ret = GNUNET_TESTBED_test_run ("test mqtt single peer comumunication", /*
test case name */
+ "template_single_peer.conf", /* template
configuration */
+ 1, /* number of peers to start */
+ 0LL, /* Event mask -set to 0 for no event
notifications */
+ NULL, /* Controller event callback */
+ NULL, /* Closure for controller event callback
*/
+ &test_master, /* continuation callback to be
called when testbed setup is complete */
+ NULL); /* Closure for the test_master callback
*/
if ( (GNUNET_OK != ret) || (GNUNET_OK != result) )
return 1;
return 0;
Modified: gnunet-mqtt/src/mqtt/test_mqtt_unsubscribe.c
===================================================================
--- gnunet-mqtt/src/mqtt/test_mqtt_unsubscribe.c 2013-10-14 13:21:42 UTC (rev 30184)
+++ gnunet-mqtt/src/mqtt/test_mqtt_unsubscribe.c 2013-10-14 13:22:11 UTC (rev 30185)
@@ -35,9 +35,10 @@
*/
static struct GNUNET_MQTT_Handle *mqtt_handle;
-struct GNUNET_TESTBED_Operation *basic_mqtt_op;
-GNUNET_SCHEDULER_TaskIdentifier shutdown_tid;
+static struct GNUNET_TESTBED_Operation *basic_mqtt_op;
+static GNUNET_SCHEDULER_TaskIdentifier shutdown_tid;
+
static int result;
/**
@@ -45,28 +46,32 @@
*/
static unsigned long long request_timeout = 5;
+
static void
shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
shutdown_tid = GNUNET_SCHEDULER_NO_TASK;
if (NULL != basic_mqtt_op)
{
- GNUNET_TESTBED_operation_done (basic_mqtt_op); /* calls the gmqtt_da() for closing
- down the connection */
- basic_mqtt_op = NULL;
+ GNUNET_TESTBED_operation_done (basic_mqtt_op); /* calls the gmqtt_da() for closing
+ down the connection */
+ basic_mqtt_op = NULL;
}
GNUNET_SCHEDULER_shutdown (); /* Also kills the testbed */
}
+
static void
subscribe_result_callback (void *cls, uint8_t topic_len, char *topic,
size_t size, void *data)
{
result = GNUNET_SYSERR;
- FPRINTF (stdout, "%s: %s -> %s\n", _("\nMessage received"), topic,
+ FPRINTF (stdout,
+ "%s: %s -> %s\n",
+ _("\nMessage received"),
+ topic,
(char*) data);
-
GNUNET_free (topic);
GNUNET_free (data);
}
@@ -77,14 +82,13 @@
void *ca_result,
const char *emsg)
{
- struct GNUNET_TIME_Relative timeout;
- struct GNUNET_MQTT_SubscribeHandle *sh;
- char *topic = "some/topic";
- char *message = "test message";
-
- timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
+ struct GNUNET_TIME_Relative timeout;
+ struct GNUNET_MQTT_SubscribeHandle *sh;
+ const char *topic = "some/topic";
+ const char *message = "test message";
+
+ timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
request_timeout);
-
sh = GNUNET_MQTT_subscribe (mqtt_handle, strlen(topic) + 1, topic, timeout,
NULL, NULL,
subscribe_result_callback, NULL);
@@ -106,6 +110,7 @@
return mqtt_handle;
}
+
static void
gmqtt_da (void *cls, void *op_result)
{
@@ -114,9 +119,14 @@
mqtt_handle = NULL;
}
+
static void
-test_master (void *cls, unsigned int num_peers,
- struct GNUNET_TESTBED_Peer **peers)
+test_master (void *cls,
+ struct GNUNET_TESTBED_RunHandle *h,
+ unsigned int num_peers,
+ struct GNUNET_TESTBED_Peer **peers,
+ unsigned int links_succeeded,
+ unsigned int links_failed)
{
basic_mqtt_op = GNUNET_TESTBED_service_connect(NULL, /* Closure for
operation */
peers[0], /* The peer whose service
to connect to */
@@ -128,13 +138,13 @@
NULL); /* closure for the above two
callbacks */
}
-int main (int argc, char **argv)
+
+int
+main (int argc, char **argv)
{
int ret;
result = GNUNET_SYSERR;
- FPRINTF (stdout, "\n Starting test for mqtt unsubscribe operation.\n\n");
-
ret = GNUNET_TESTBED_test_run("test mqtt unsubscribe operation", /* test
case name */
"template_single_peer.conf", /* template
configuration */
1, /* number of peers to start */
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r30185 - gnunet-mqtt/src/mqtt,
gnunet <=