From 2cf2bea12c8639355bf3561dc772cad773746026 Mon Sep 17 00:00:00 2001
From: Jeff Darcy
Date: Tue, 31 May 2011 12:13:30 -0400
Subject: [PATCH] Use separate polling thread for each connection.
Good: 2x performance with SSL transport
Bad: doesn't work with portmapper, doesn't clean up properly on disconnect
Ugly: there's still a race where the mgmt code calls submit_reply and then
frees our data out from under us if the message isn't sent synchronously
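
The core of the change, in sketch form (a simplified standalone
illustration, not the real transport code; wakeup_pipe and poller here are
placeholders for the new socket_private_t pipe/thread fields): each
connection gets its own polling thread, and a submitter that has to defer
a request wakes that thread by writing one byte to a pipe per queued
entry; the poller drains one byte per entry it flushes, so the pipe stays
readable exactly as long as the queue is non-empty.

    #include <poll.h>
    #include <pthread.h>
    #include <stdio.h>
    #include <unistd.h>

    static int wakeup_pipe[2];

    static void *poller (void *arg)
    {
            struct pollfd pfd = { .fd = -1, .events = POLLIN, .revents = 0 };
            char byte;
            int i;

            (void)arg;
            pfd.fd = wakeup_pipe[0];
            for (i = 0; i < 3; ++i) {
                    /* Sleep until a submitter pushes a byte onto the pipe. */
                    if (poll (&pfd, 1, -1) < 0)
                            break;
                    /* One byte per queued entry; drain it once "sent". */
                    if (read (wakeup_pipe[0], &byte, 1) == 1)
                            printf ("poller: woke up, flushed one entry\n");
            }
            return NULL;
    }

    int main (void)
    {
            pthread_t tid;
            char byte = 'x';
            int i;

            if (pipe (wakeup_pipe) < 0 ||
                pthread_create (&tid, NULL, poller, NULL) != 0)
                    return 1;
            for (i = 0; i < 3; ++i) {
                    /* Submitter side: one byte per deferred entry. */
                    if (write (wakeup_pipe[1], &byte, 1) != 1)
                            break;
                    sleep (1);
            }
            pthread_join (tid, NULL);
            return 0;
    }

The real code does the same dance with priv->pipe in socket_submit_request,
socket_submit_reply, __socket_ioq_churn_entry, and socket_poller below.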
---
glusterfsd/src/glusterfsd-mgmt.c | 3 +-
rpc/rpc-transport/socket/src/socket.c | 257 +++++++++++++++++++++------------
rpc/rpc-transport/socket/src/socket.h | 2 +
3 files changed, 167 insertions(+), 95 deletions(-)
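
A note on locking, since it comes up twice in the diff: socket_global_lock
serves both to keep rpc_transport_notify upcalls serialized (as they
effectively were with the single global polling thread) and as a start-up
gate, so a freshly created poller thread cannot deliver an upcall before
its creator has finished its own setup (e.g. setting ctx->mgmt before
registering the notify callback, as in the glusterfsd-mgmt.c hunk below).
A minimal standalone sketch of that pattern follows; deliver_notify() just
stands in for rpc_transport_notify and is not a real API:

    #include <pthread.h>
    #include <stdio.h>

    static pthread_mutex_t global_lock = PTHREAD_MUTEX_INITIALIZER;

    static void deliver_notify (const char *event)
    {
            /* Only one upcall at a time, whichever poller it came from. */
            pthread_mutex_lock (&global_lock);
            printf ("notify: %s\n", event);
            pthread_mutex_unlock (&global_lock);
    }

    static void *poller (void *arg)
    {
            (void)arg;
            /* Gate: wait until the creator drops the lock, then let go. */
            pthread_mutex_lock (&global_lock);
            pthread_mutex_unlock (&global_lock);

            deliver_notify ("MSG_RECEIVED");
            return NULL;
    }

    int main (void)
    {
            pthread_t tid;

            pthread_mutex_lock (&global_lock);
            if (pthread_create (&tid, NULL, poller, NULL) != 0)
                    return 1;
            /* ... finish setting up; no upcall can fire until we unlock ... */
            pthread_mutex_unlock (&global_lock);

            pthread_join (tid, NULL);
            return 0;
    }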
diff --git a/glusterfsd/src/glusterfsd-mgmt.c b/glusterfsd/src/glusterfsd-mgmt.c
index 1f5f648..413790b 100644
--- a/glusterfsd/src/glusterfsd-mgmt.c
+++ b/glusterfsd/src/glusterfsd-mgmt.c
@@ -877,6 +877,8 @@ glusterfs_mgmt_init (glusterfs_ctx_t *ctx)
gf_log ("", GF_LOG_WARNING, "failed to create rpc clnt");
goto out;
}
+ /* This is used from within mgmt_rpc_notify, so it must be set first. */
+ ctx->mgmt = rpc;
ret = rpc_clnt_register_notify (rpc, mgmt_rpc_notify, THIS);
if (ret) {
@@ -894,7 +896,6 @@ glusterfs_mgmt_init (glusterfs_ctx_t *ctx)
if (ret)
goto out;
- ctx->mgmt = rpc;
out:
return ret;
}
diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c
index dc84da7..31e8eac 100644
--- a/rpc/rpc-transport/socket/src/socket.c
+++ b/rpc/rpc-transport/socket/src/socket.c
@@ -51,6 +52,10 @@
#define SSL_PRIVATE_KEY_OPT "transport.socket.ssl-private-key"
#define SSL_CA_LIST_OPT "transport.socket.ssl-ca-list"
+#define POLL_MASK_INPUT (POLLIN | POLLPRI)
+#define POLL_MASK_OUTPUT (POLLOUT)
+#define POLL_MASK_ERROR (POLLERR | POLLHUP | POLLNVAL)
+
#define __socket_proto_reset_pending(priv) do { \
memset (&priv->incoming.frag.vector, 0, \
sizeof (priv->incoming.frag.vector)); \
@@ -136,6 +141,17 @@
__socket_proto_update_priv_after_read (priv, ret, bytes_read); \
}
+/*
+ * The RPC notification callbacks have traditionally only been called from the
+ * single polling thread, and might fail in hard-to-debug ways if called more
+ * than once concurrently. Worse, that might appear to work most of the time
+ * and then fail only occasionally. To guard against that, we use this lock
+ * to ensure that we still have only one concurrent up-call, even though it's
+ * coming from one of our own polling threads instead of the global one. See
+ * the top of socket_poller for one interaction where this turned out to be
+ * necessary - and yes, it was hard to debug.
+ */
+pthread_mutex_t socket_global_lock = PTHREAD_MUTEX_INITIALIZER;
int socket_init (rpc_transport_t *this);
@@ -755,9 +771,11 @@ out:
int
-__socket_ioq_churn_entry (rpc_transport_t *this, struct ioq *entry)
+__socket_ioq_churn_entry (rpc_transport_t *this, struct ioq *entry, int direct)
{
- int ret = -1;
+ int ret = -1;
+ socket_private_t *priv = NULL;
+ char a_byte = 0;
ret = __socket_writev (this, entry->pending_vector,
entry->pending_count,
@@ -768,6 +786,16 @@ __socket_ioq_churn_entry (rpc_transport_t *this, struct ioq *entry)
/* current entry was completely written */
GF_ASSERT (entry->pending_count == 0);
__socket_ioq_entry_free (entry);
+ priv = this->private;
+ /*
+ * The pipe should only remain readable if there are more
+ * entries after this, so drain the byte representing this
+ * entry.
+ */
+ if (!direct && read(priv->pipe[0],&a_byte,1) < 1) {
+ gf_log(this->name,GF_LOG_WARNING,
+ "read error on pipe");
+ }
}
return ret;
@@ -790,18 +818,11 @@ __socket_ioq_churn (rpc_transport_t *this)
/* pick next entry */
entry = priv->ioq_next;
- ret = __socket_ioq_churn_entry (this, entry);
+ ret = __socket_ioq_churn_entry (this, entry, 0);
if (ret != 0)
break;
}
-
- if (list_empty (&priv->ioq)) {
- /* all pending writes done, not interested in POLLOUT */
- priv->idx = event_select_on (this->ctx->event_pool,
- priv->sock, priv->idx, -1, 0);
- }
-
out:
return ret;
}
@@ -855,7 +876,9 @@ socket_event_poll_out (rpc_transport_t *this)
}
pthread_mutex_unlock (&priv->lock);
+ pthread_mutex_lock(&socket_global_lock);
ret = rpc_transport_notify (this, RPC_TRANSPORT_MSG_SENT, NULL);
+ pthread_mutex_unlock(&socket_global_lock);
out:
return ret;
@@ -1778,10 +1801,12 @@ socket_event_poll_in (rpc_transport_t *this)
ret = socket_proto_state_machine (this, &pollin);
if (pollin != NULL) {
+ pthread_mutex_lock(&socket_global_lock);
ret = rpc_transport_notify (this, RPC_TRANSPORT_MSG_RECEIVED,
pollin);
rpc_transport_pollin_destroy (pollin);
+ pthread_mutex_unlock(&socket_global_lock);
}
return ret;
@@ -1860,55 +1885,90 @@ out:
}
-/* reads rpc_requests during pollin */
-int
-socket_event_handler (int fd, int idx, void *data,
- int poll_in, int poll_out, int poll_err)
+void *
+socket_poller (void *ctx)
{
- rpc_transport_t *this = NULL;
- socket_private_t *priv = NULL;
- int ret = 0;
-
- this = data;
- GF_VALIDATE_OR_GOTO ("socket", this, out);
- GF_VALIDATE_OR_GOTO ("socket", this->private, out);
- GF_VALIDATE_OR_GOTO ("socket", this->xl, out);
-
- THIS = this->xl;
- priv = this->private;
-
-
- pthread_mutex_lock (&priv->lock);
- {
- priv->idx = idx;
- }
- pthread_mutex_unlock (&priv->lock);
-
+ rpc_transport_t *this = ctx;
+ socket_private_t *priv = this->private;
+ struct pollfd pfd[2] = {{0,},};
+ gf_boolean_t to_write = _gf_false;
+ int ret = 0;
+
+ /*
+ * We can't actually start doing anything that might generate upcalls
+ * until the thread that created us releases the lock that it held
+ * while doing so. Unfortunately, there's no reasonable way for us
+ * down here in socket-land to associate the creator with the created
+ * when the interactions are occurring through a glusterd callback, so
+ * instead of using the more obvious pthread_cond_t approach we just
+ * (ab)use the global lock as a kind of gate. Once we have the lock
+ * we've passed the gate and don't need to do anything more, so we
+ * just release it right away.
+ */
+ pthread_mutex_lock(&socket_global_lock);
if (!priv->connected) {
+ THIS = this->xl;
ret = socket_connect_finish (this);
}
+ pthread_mutex_unlock(&socket_global_lock);
- if (!ret && poll_out) {
- ret = socket_event_poll_out (this);
- }
-
- if (!ret && poll_in) {
- ret = socket_event_poll_in (this);
- }
-
- if ((ret < 0) || poll_err) {
- /* Logging has happened already in earlier cases */
- gf_log ("transport", ((ret >= 0) ? GF_LOG_INFO : GF_LOG_DEBUG),
- "disconnecting now");
- socket_event_poll_err (this);
- rpc_transport_unref (this);
- }
-
-out:
- return 0;
+ for (;;) {
+ pthread_mutex_lock(&priv->lock);
+ to_write = !list_empty(&priv->ioq);
+ pthread_mutex_unlock(&priv->lock);
+ pfd[0].fd = priv->pipe[0];
+ pfd[0].events = POLL_MASK_ERROR;
+ pfd[0].revents = 0;
+ pfd[1].fd = priv->sock;
+ pfd[1].events = POLL_MASK_INPUT | POLL_MASK_ERROR;
+ pfd[1].revents = 0;
+ if (to_write) {
+ pfd[1].events |= POLL_MASK_OUTPUT;
+ }
+ else {
+ pfd[0].events |= POLL_MASK_INPUT;
+ }
+ if (poll(pfd,2,-1) < 0) {
+ gf_log(this->name,GF_LOG_ERROR,"poll failed");
+ return NULL;
+ }
+ if (pfd[0].revents & POLL_MASK_ERROR) {
+ gf_log(this->name,GF_LOG_ERROR,
+ "poll error on pipe");
+ return NULL;
+ }
+ if (pfd[1].revents & POLL_MASK_ERROR) {
+ gf_log(this->name,GF_LOG_ERROR,
+ "poll error on socket");
+ return NULL;
+ }
+ /* Only glusterd actually seems to need this. */
+ THIS = this->xl;
+ if (pfd[1].revents & POLL_MASK_INPUT) {
+ ret = socket_event_poll_in(this);
+ }
+ else if (pfd[1].revents & POLL_MASK_OUTPUT) {
+ ret = socket_event_poll_out(this);
+ }
+ else {
+ /*
+ * This usually means that we left poll() because
+ * somebody pushed a byte onto our pipe. That wakeup
+ * is why the pipe is there, but once awake we can do
+ * all the checking we need on the next iteration.
+ */
+ ret = 0;
+ }
+ if (ret < 0) {
+ gf_log(this->name,GF_LOG_ERROR,
+ "unknown error in polling loop");
+ return NULL;
+ }
+ }
}
+
int
socket_server_event_handler (int fd, int idx, void *data,
int poll_in, int poll_out, int poll_err)
@@ -2035,14 +2095,16 @@ socket_server_event_handler (int fd, int idx, void *data,
new_priv->connected = 1;
rpc_transport_ref (new_trans);
- new_priv->idx =
- event_register (ctx->event_pool,
- new_sock,
- socket_event_handler,
- new_trans, 1, 0);
+ if (pipe(new_priv->pipe) < 0) {
+ gf_log(this->name,GF_LOG_ERROR,
+ "could not create pipe");
+ }
- if (new_priv->idx == -1)
- ret = -1;
+ if (pthread_create(&new_priv->thread,NULL,
+ socket_poller,new_trans) != 0) {
+ gf_log(this->name,GF_LOG_ERROR,
+ "could not create poll thread");
+ }
}
pthread_mutex_unlock (&new_priv->lock);
if (ret == -1) {
@@ -2242,16 +2304,18 @@ socket_connect (rpc_transport_t *this, int port)
}
priv->connected = 0;
-
rpc_transport_ref (this);
- priv->idx = event_register (ctx->event_pool, priv->sock,
- socket_event_handler, this, 1, 1);
- if (priv->idx == -1) {
- gf_log ("", GF_LOG_WARNING,
- "failed to register the event");
- ret = -1;
- }
+ if (pipe(priv->pipe) < 0) {
+ gf_log(this->name,GF_LOG_ERROR,
+ "could not create pipe");
+ }
+
+ if (pthread_create(&priv->thread,NULL,
+ socket_poller, this) != 0) {
+ gf_log(this->name,GF_LOG_ERROR,
+ "could not create poll thread");
+ }
}
unlock:
pthread_mutex_unlock (&priv->lock);
@@ -2413,10 +2477,10 @@ socket_submit_request (rpc_transport_t *this, rpc_transport_req_t *req)
{
socket_private_t *priv = NULL;
int ret = -1;
- char need_poll_out = 0;
char need_append = 1;
struct ioq *entry = NULL;
glusterfs_ctx_t *ctx = NULL;
+ char a_byte = 'j';
GF_VALIDATE_OR_GOTO ("socket", this, out);
GF_VALIDATE_OR_GOTO ("socket", this->private, out);
@@ -2442,26 +2506,28 @@ socket_submit_request (rpc_transport_t *this, rpc_transport_req_t *req)
goto unlock;
if (list_empty (&priv->ioq)) {
- ret = __socket_ioq_churn_entry (this, entry);
+ ret = __socket_ioq_churn_entry (this, entry, 1);
- if (ret == 0)
+ if (ret == 0) {
+ gf_log(this->name,GF_LOG_DEBUG,
+ "request sent in-line");
need_append = 0;
-
- if (ret > 0)
- need_poll_out = 1;
+ }
}
if (need_append) {
+ gf_log(this->name,GF_LOG_DEBUG,"deferring request");
list_add_tail (&entry->list, &priv->ioq);
+ /*
+ * Make sure the polling thread wakes up, by writing a
+ * byte to represent this entry.
+ */
+ if (write(priv->pipe[1],&a_byte,1) < 1) {
+ gf_log(this->name,GF_LOG_WARNING,
+ "write error on pipe");
+ }
ret = 0;
}
-
- if (need_poll_out) {
- /* first entry to wait. continue writing on POLLOUT */
- priv->idx = event_select_on (ctx->event_pool,
- priv->sock,
- priv->idx, -1, 1);
- }
}
unlock:
pthread_mutex_unlock (&priv->lock);
@@ -2476,10 +2542,10 @@ socket_submit_reply (rpc_transport_t *this, rpc_transport_reply_t *reply)
{
socket_private_t *priv = NULL;
int ret = -1;
- char need_poll_out = 0;
char need_append = 1;
struct ioq *entry = NULL;
glusterfs_ctx_t *ctx = NULL;
+ char a_byte = 'd';
GF_VALIDATE_OR_GOTO ("socket", this, out);
GF_VALIDATE_OR_GOTO ("socket", this->private, out);
@@ -2498,33 +2564,36 @@ socket_submit_reply (rpc_transport_t *this, rpc_transport_reply_t *reply)
}
goto unlock;
}
+
priv->submit_log = 0;
entry = __socket_ioq_new (this, &reply->msg);
if (!entry)
goto unlock;
+
if (list_empty (&priv->ioq)) {
- ret = __socket_ioq_churn_entry (this, entry);
+ ret = __socket_ioq_churn_entry (this, entry, 1);
- if (ret == 0)
+ if (ret == 0) {
+ gf_log(this->name,GF_LOG_DEBUG,
+ "reply sent in-line");
need_append = 0;
-
- if (ret > 0)
- need_poll_out = 1;
+ }
}
if (need_append) {
+ gf_log(this->name,GF_LOG_DEBUG,"deferring reply");
list_add_tail (&entry->list, &priv->ioq);
+ /*
+ * Make sure the polling thread wakes up, by writing a
+ * byte to represent this entry.
+ */
+ if (write(priv->pipe[1],&a_byte,1) < 1) {
+ gf_log(this->name,GF_LOG_WARNING,
+ "write error on pipe");
+ }
ret = 0;
}
-
- if (need_poll_out) {
- /* first entry to wait. continue writing on POLLOUT */
- priv->idx = event_select_on (ctx->event_pool,
- priv->sock,
- priv->idx, -1, 1);
- }
}
-
unlock:
pthread_mutex_unlock (&priv->lock);
@@ -2692,7 +2761,7 @@ socket_init (rpc_transport_t *this)
priv->idx = -1;
priv->connected = -1;
priv->nodelay = 1;
- priv->bio = 0;
+ priv->bio = 1;
priv->windowsize = GF_DEFAULT_SOCKET_WINDOW_SIZE;
INIT_LIST_HEAD (&priv->ioq);
diff --git a/rpc/rpc-transport/socket/src/socket.h b/rpc/rpc-transport/socket/src/socket.h
index eaa38a9..625306c 100644
--- a/rpc/rpc-transport/socket/src/socket.h
+++ b/rpc/rpc-transport/socket/src/socket.h
@@ -204,6 +204,8 @@ typedef struct {
char *ssl_own_cert;
char *ssl_private_key;
char *ssl_ca_list;
+ pthread_t thread;
+ int pipe[2];
} socket_private_t;
--
1.7.3.4