From 2cf2bea12c8639355bf3561dc772cad773746026 Mon Sep 17 00:00:00 2001
From: Jeff Darcy
Date: Tue, 31 May 2011 12:13:30 -0400
Subject: [PATCH] Use separate polling thread for each connection.

Good: 2x performance with SSL transport
Bad: doesn't work with portmapper, doesn't clean up properly on disconnect
Ugly: there's still a race with mgmt code calling submit_reply and then
      freeing our data out from under us if the message isn't sent synchronously
---
 glusterfsd/src/glusterfsd-mgmt.c      |    3 +-
 rpc/rpc-transport/socket/src/socket.c |  257 +++++++++++++++++++++------------
 rpc/rpc-transport/socket/src/socket.h |    2 +
 3 files changed, 167 insertions(+), 95 deletions(-)

diff --git a/glusterfsd/src/glusterfsd-mgmt.c b/glusterfsd/src/glusterfsd-mgmt.c
index 1f5f648..413790b 100644
--- a/glusterfsd/src/glusterfsd-mgmt.c
+++ b/glusterfsd/src/glusterfsd-mgmt.c
@@ -877,6 +877,8 @@ glusterfs_mgmt_init (glusterfs_ctx_t *ctx)
                 gf_log ("", GF_LOG_WARNING, "failed to create rpc clnt");
                 goto out;
         }
+        /* This is used from within mgmt_rpc_notify, so LET'S SET IT FIRST! */
+        ctx->mgmt = rpc;
 
         ret = rpc_clnt_register_notify (rpc, mgmt_rpc_notify, THIS);
         if (ret) {
@@ -894,7 +896,6 @@ glusterfs_mgmt_init (glusterfs_ctx_t *ctx)
         if (ret)
                 goto out;
 
-        ctx->mgmt = rpc;
 out:
         return ret;
 }
diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c
index dc84da7..31e8eac 100644
--- a/rpc/rpc-transport/socket/src/socket.c
+++ b/rpc/rpc-transport/socket/src/socket.c
@@ -1,4 +1,5 @@
 /*
+ * #endif
   Copyright (c) 2010 Gluster, Inc.
   This file is part of GlusterFS.
 
@@ -51,6 +52,10 @@
 #define SSL_PRIVATE_KEY_OPT "transport.socket.ssl-private-key"
 #define SSL_CA_LIST_OPT     "transport.socket.ssl-ca-list"
 
+#define POLL_MASK_INPUT  (POLLIN | POLLPRI)
+#define POLL_MASK_OUTPUT (POLLOUT)
+#define POLL_MASK_ERROR  (POLLERR | POLLHUP | POLLNVAL)
+
 #define __socket_proto_reset_pending(priv) do { \
                 memset (&priv->incoming.frag.vector, 0, \
                         sizeof (priv->incoming.frag.vector)); \
@@ -136,6 +141,17 @@
                 __socket_proto_update_priv_after_read (priv, ret, bytes_read); \
         }
 
+/*
+ * The RPC notification callbacks have traditionally only been called from the
+ * single polling thread, and might fail in hard-to-debug ways if called more
+ * than once concurrently.  Worse, that might appear to work most of the time
+ * and then fail only occasionally.  To guard against that, we use this lock
+ * to ensure that we still have only one concurrent up-call, even though it's
+ * coming from one of our own polling threads instead of the global one.  See
+ * the top of socket_poller for one interaction where this turned out to be
+ * necessary - and yes, it was hard to debug.
+ */
+pthread_mutex_t socket_global_lock = PTHREAD_MUTEX_INITIALIZER;
 int
 socket_init (rpc_transport_t *this);
 
@@ -755,9 +771,11 @@ out:
 
 int
-__socket_ioq_churn_entry (rpc_transport_t *this, struct ioq *entry)
+__socket_ioq_churn_entry (rpc_transport_t *this, struct ioq *entry, int direct)
 {
-        int               ret = -1;
+        int                ret    = -1;
+        socket_private_t  *priv   = NULL;
+        char               a_byte = 0;
 
         ret = __socket_writev (this, entry->pending_vector,
                                entry->pending_count,
@@ -768,6 +786,16 @@ __socket_ioq_churn_entry (rpc_transport_t *this, struct ioq *entry)
                 /* current entry was completely written */
                 GF_ASSERT (entry->pending_count == 0);
                 __socket_ioq_entry_free (entry);
+                priv = this->private;
+                /*
+                 * The pipe should only remain readable if there are more
+                 * entries after this, so drain the byte representing this
+                 * entry.
+                 */
+                if (!direct && read(priv->pipe[0],&a_byte,1) < 1) {
+                        gf_log(this->name,GF_LOG_WARNING,
+                               "read error on pipe");
+                }
         }
 
         return ret;
@@ -790,18 +818,11 @@ __socket_ioq_churn (rpc_transport_t *this)
                 /* pick next entry */
                 entry = priv->ioq_next;
 
-                ret = __socket_ioq_churn_entry (this, entry);
+                ret = __socket_ioq_churn_entry (this, entry, 0);
 
                 if (ret != 0)
                         break;
         }
-
-        if (list_empty (&priv->ioq)) {
-                /* all pending writes done, not interested in POLLOUT */
-                priv->idx = event_select_on (this->ctx->event_pool,
-                                             priv->sock, priv->idx, -1, 0);
-        }
-
 out:
         return ret;
 }
@@ -855,7 +876,9 @@ socket_event_poll_out (rpc_transport_t *this)
         }
         pthread_mutex_unlock (&priv->lock);
+        pthread_mutex_lock(&socket_global_lock);
         ret = rpc_transport_notify (this, RPC_TRANSPORT_MSG_SENT, NULL);
+        pthread_mutex_unlock(&socket_global_lock);
 
 out:
         return ret;
 }
@@ -1778,10 +1801,12 @@ socket_event_poll_in (rpc_transport_t *this)
         ret = socket_proto_state_machine (this, &pollin);
 
         if (pollin != NULL) {
+                pthread_mutex_lock(&socket_global_lock);
                 ret = rpc_transport_notify (this, RPC_TRANSPORT_MSG_RECEIVED,
                                             pollin);
 
                 rpc_transport_pollin_destroy (pollin);
+                pthread_mutex_unlock(&socket_global_lock);
         }
 
         return ret;
@@ -1860,55 +1885,90 @@ out:
 }
 
-/* reads rpc_requests during pollin */
-int
-socket_event_handler (int fd, int idx, void *data,
-                      int poll_in, int poll_out, int poll_err)
+void *
+socket_poller (void *ctx)
 {
-        rpc_transport_t      *this = NULL;
-        socket_private_t     *priv = NULL;
-        int                   ret = 0;
-
-        this = data;
-        GF_VALIDATE_OR_GOTO ("socket", this, out);
-        GF_VALIDATE_OR_GOTO ("socket", this->private, out);
-        GF_VALIDATE_OR_GOTO ("socket", this->xl, out);
-
-        THIS = this->xl;
-        priv = this->private;
-
-
-        pthread_mutex_lock (&priv->lock);
-        {
-                priv->idx = idx;
-        }
-        pthread_mutex_unlock (&priv->lock);
-
+        rpc_transport_t      *this = ctx;
+        socket_private_t     *priv = this->private;
+        struct pollfd         pfd[2] = {{0,},};
+        gf_boolean_t          to_write = _gf_false;
+        int                   ret = 0;
+
+        /*
+         * We can't actually start doing anything that might generate upcalls
+         * until the thread that created us releases the lock that they held
+         * while doing so.  Unfortunately, there's no reasonable way for us
+         * down here in socket-land to associate the creator with the created
+         * when the interactions are occurring through a glusterd callback, so
+         * instead of using the more obvious pthread_cond_t approach we just
+         * (ab)use the global lock as a kind of gate.  Once we have the lock
+         * we've passed the gate and don't need to do anything more, so we
+         * just release it right away.
+         */
+        pthread_mutex_lock(&socket_global_lock);
         if (!priv->connected) {
+                THIS = this->xl;
                 ret = socket_connect_finish (this);
         }
+        pthread_mutex_unlock(&socket_global_lock);
 
-        if (!ret && poll_out) {
-                ret = socket_event_poll_out (this);
-        }
-
-        if (!ret && poll_in) {
-                ret = socket_event_poll_in (this);
-        }
-
-        if ((ret < 0) || poll_err) {
-                /* Logging has happened already in earlier cases */
-                gf_log ("transport", ((ret >= 0) ? GF_LOG_INFO : GF_LOG_DEBUG),
-                        "disconnecting now");
-                socket_event_poll_err (this);
-                rpc_transport_unref (this);
-        }
-
-out:
-        return 0;
+        for (;;) {
+                pthread_mutex_lock(&priv->lock);
+                to_write = !list_empty(&priv->ioq);
+                pthread_mutex_unlock(&priv->lock);
+                pfd[0].fd = priv->pipe[0];
+                pfd[0].events = POLL_MASK_ERROR;
+                pfd[0].revents = 0;
+                pfd[1].fd = priv->sock;
+                pfd[1].events = POLL_MASK_INPUT | POLL_MASK_ERROR;
+                pfd[1].revents = 0;
+                if (to_write) {
+                        pfd[1].events |= POLL_MASK_OUTPUT;
+                }
+                else {
+                        pfd[0].events |= POLL_MASK_INPUT;
+                }
+                if (poll(pfd,2,-1) < 0) {
+                        gf_log(this->name,GF_LOG_ERROR,"poll failed");
+                        return NULL;
+                }
+                if (pfd[0].revents & POLL_MASK_ERROR) {
+                        gf_log(this->name,GF_LOG_ERROR,
+                               "poll error on pipe");
+                        return NULL;
+                }
+                if (pfd[1].revents & POLL_MASK_ERROR) {
+                        gf_log(this->name,GF_LOG_ERROR,
+                               "poll error on socket");
+                        return NULL;
+                }
+                /* Only glusterd actually seems to need this. */
+                THIS = this->xl;
+                if (pfd[1].revents & POLL_MASK_INPUT) {
+                        ret = socket_event_poll_in(this);
+                }
+                else if (pfd[1].revents & POLL_MASK_OUTPUT) {
+                        ret = socket_event_poll_out(this);
+                }
+                else {
+                        /*
+                         * This usually means that we left poll() because
+                         * somebody pushed a byte onto our pipe.  That wakeup
+                         * is why the pipe is there, but once awake we can do
+                         * all the checking we need on the next iteration.
+                         */
+                        ret = 0;
+                }
+                if (ret < 0) {
+                        gf_log(this->name,GF_LOG_ERROR,
+                               "unknown error in polling loop");
+                        return NULL;
+                }
+        }
 }
+
 int
 socket_server_event_handler (int fd, int idx, void *data,
                              int poll_in, int poll_out, int poll_err)
@@ -2035,14 +2095,16 @@ socket_server_event_handler (int fd, int idx, void *data,
                         new_priv->connected = 1;
                         rpc_transport_ref (new_trans);
 
-                        new_priv->idx =
-                                event_register (ctx->event_pool,
-                                                new_sock,
-                                                socket_event_handler,
-                                                new_trans, 1, 0);
+                        if (pipe(new_priv->pipe) < 0) {
+                                gf_log(this->name,GF_LOG_ERROR,
+                                       "could not create pipe");
+                        }
 
-                        if (new_priv->idx == -1)
-                                ret = -1;
+                        if (pthread_create(&new_priv->thread,NULL,
+                                           socket_poller,new_trans) != 0) {
+                                gf_log(this->name,GF_LOG_ERROR,
+                                       "could not create poll thread");
+                        }
                 }
                 pthread_mutex_unlock (&new_priv->lock);
                 if (ret == -1) {
@@ -2242,16 +2304,18 @@ socket_connect (rpc_transport_t *this, int port)
                 }
 
                 priv->connected = 0;
-                rpc_transport_ref (this);
 
-                priv->idx = event_register (ctx->event_pool, priv->sock,
-                                            socket_event_handler, this, 1, 1);
-                if (priv->idx == -1) {
-                        gf_log ("", GF_LOG_WARNING,
-                                "failed to register the event");
-                        ret = -1;
-                }
+                if (pipe(priv->pipe) < 0) {
+                        gf_log(this->name,GF_LOG_ERROR,
+                               "could not create pipe");
+                }
+
+                if (pthread_create(&priv->thread,NULL,
+                                   socket_poller, this) != 0) {
+                        gf_log(this->name,GF_LOG_ERROR,
+                               "could not create poll thread");
+                }
         }
 unlock:
         pthread_mutex_unlock (&priv->lock);
@@ -2413,10 +2477,10 @@ socket_submit_request (rpc_transport_t *this, rpc_transport_req_t *req)
 {
         socket_private_t *priv = NULL;
         int               ret = -1;
-        char              need_poll_out = 0;
         char              need_append = 1;
         struct ioq       *entry = NULL;
         glusterfs_ctx_t  *ctx = NULL;
+        char              a_byte = 'j';
 
         GF_VALIDATE_OR_GOTO ("socket", this, out);
         GF_VALIDATE_OR_GOTO ("socket", this->private, out);
@@ -2442,26 +2506,28 @@ socket_submit_request (rpc_transport_t *this, rpc_transport_req_t *req)
                         goto unlock;
 
                 if (list_empty (&priv->ioq)) {
-                        ret = __socket_ioq_churn_entry (this, entry);
+                        ret = __socket_ioq_churn_entry (this, entry, 1);
 
-                        if (ret == 0)
+                        if (ret == 0) {
+                                gf_log(this->name,GF_LOG_DEBUG,
+                                       "request sent in-line");
                                 need_append = 0;
-
-                        if (ret > 0)
-                                need_poll_out = 1;
+                        }
                 }
 
                 if (need_append) {
+                        gf_log(this->name,GF_LOG_DEBUG,"deferring request");
                         list_add_tail (&entry->list, &priv->ioq);
+                        /*
+                         * Make sure the polling thread wakes up, by writing a
+                         * byte to represent this entry.
+                         */
+                        if (write(priv->pipe[1],&a_byte,1) < 1) {
+                                gf_log(this->name,GF_LOG_WARNING,
+                                       "write error on pipe");
+                        }
                         ret = 0;
                 }
-
-                if (need_poll_out) {
-                        /* first entry to wait. continue writing on POLLOUT */
-                        priv->idx = event_select_on (ctx->event_pool,
-                                                     priv->sock,
-                                                     priv->idx, -1, 1);
-                }
         }
 unlock:
         pthread_mutex_unlock (&priv->lock);
@@ -2476,10 +2542,10 @@ socket_submit_reply (rpc_transport_t *this, rpc_transport_reply_t *reply)
 {
         socket_private_t *priv = NULL;
         int               ret = -1;
-        char              need_poll_out = 0;
         char              need_append = 1;
         struct ioq       *entry = NULL;
         glusterfs_ctx_t  *ctx = NULL;
+        char              a_byte = 'd';
 
         GF_VALIDATE_OR_GOTO ("socket", this, out);
         GF_VALIDATE_OR_GOTO ("socket", this->private, out);
@@ -2498,33 +2564,36 @@ socket_submit_reply (rpc_transport_t *this, rpc_transport_reply_t *reply)
                         }
                         goto unlock;
                 }
+                priv->submit_log = 0;
                 entry = __socket_ioq_new (this, &reply->msg);
                 if (!entry)
                         goto unlock;
+
                 if (list_empty (&priv->ioq)) {
-                        ret = __socket_ioq_churn_entry (this, entry);
+                        ret = __socket_ioq_churn_entry (this, entry, 1);
 
-                        if (ret == 0)
+                        if (ret == 0) {
+                                gf_log(this->name,GF_LOG_DEBUG,
+                                       "reply sent in-line");
                                 need_append = 0;
-
-                        if (ret > 0)
-                                need_poll_out = 1;
+                        }
                 }
 
                 if (need_append) {
+                        gf_log(this->name,GF_LOG_DEBUG,"deferring reply");
                         list_add_tail (&entry->list, &priv->ioq);
+                        /*
+                         * Make sure the polling thread wakes up, by writing a
+                         * byte to represent this entry.
+                         */
+                        if (write(priv->pipe[1],&a_byte,1) < 1) {
+                                gf_log(this->name,GF_LOG_WARNING,
+                                       "write error on pipe");
+                        }
                         ret = 0;
                 }
-
-                if (need_poll_out) {
-                        /* first entry to wait. continue writing on POLLOUT */
-                        priv->idx = event_select_on (ctx->event_pool,
-                                                     priv->sock,
-                                                     priv->idx, -1, 1);
-                }
         }
-
 unlock:
         pthread_mutex_unlock (&priv->lock);
@@ -2692,7 +2761,7 @@ socket_init (rpc_transport_t *this)
         priv->idx = -1;
         priv->connected = -1;
         priv->nodelay = 1;
-        priv->bio = 0;
+        priv->bio = 1;
         priv->windowsize = GF_DEFAULT_SOCKET_WINDOW_SIZE;
 
         INIT_LIST_HEAD (&priv->ioq);
diff --git a/rpc/rpc-transport/socket/src/socket.h b/rpc/rpc-transport/socket/src/socket.h
index eaa38a9..625306c 100644
--- a/rpc/rpc-transport/socket/src/socket.h
+++ b/rpc/rpc-transport/socket/src/socket.h
@@ -204,6 +204,8 @@ typedef struct {
         char                  *ssl_own_cert;
         char                  *ssl_private_key;
         char                  *ssl_ca_list;
+        pthread_t              thread;
+        int                    pipe[2];
 } socket_private_t;
 
--
1.7.3.4
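
Two illustrative notes follow, separate from the patch itself.

First, the deferred-write path in socket_submit_request/socket_submit_reply and
the loop in socket_poller cooperate through a small self-pipe protocol: every
queued ioq entry is announced by writing one byte to priv->pipe[1], the poller
wakes up because priv->pipe[0] becomes readable, and __socket_ioq_churn_entry
drains one byte for each entry it finishes, so the pipe stays readable exactly
while work is pending. The standalone sketch below shows just that handshake;
the names in it (wake_pipe, queue_entry, poller_main) are invented for the
example and do not exist in socket.c, and the real poller additionally polls
the socket fd and switches between pipe input and socket POLLOUT, which is
omitted here.

/*
 * Minimal sketch of the pipe-based wakeup, NOT part of the patch.
 * Build with: cc -pthread sketch.c
 */
#include <poll.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

static int              wake_pipe[2];
static pthread_mutex_t  queue_lock = PTHREAD_MUTEX_INITIALIZER;
static int              queued     = 0;   /* stand-in for the ioq list */

/* Submitter side: queue one entry, then wake the poller with one byte. */
static void
queue_entry (void)
{
        char a_byte = 'j';

        pthread_mutex_lock (&queue_lock);
        queued++;
        pthread_mutex_unlock (&queue_lock);
        if (write (wake_pipe[1], &a_byte, 1) < 1)
                perror ("write error on pipe");
}

/* Poller side: one thread per connection, woken by pipe readability. */
static void *
poller_main (void *arg)
{
        struct pollfd pfd = {0,};
        char          a_byte = 0;

        (void) arg;
        pfd.fd     = wake_pipe[0];
        pfd.events = POLLIN;

        for (;;) {
                if (poll (&pfd, 1, -1) < 0) {
                        perror ("poll failed");
                        return NULL;
                }
                if (pfd.revents & POLLIN) {
                        /* "Flush" one entry, then drain its wakeup byte. */
                        pthread_mutex_lock (&queue_lock);
                        queued--;
                        pthread_mutex_unlock (&queue_lock);
                        if (read (wake_pipe[0], &a_byte, 1) < 1)
                                perror ("read error on pipe");
                        printf ("flushed one entry\n");
                }
        }
        return NULL;
}

int
main (void)
{
        pthread_t tid;

        if (pipe (wake_pipe) < 0 ||
            pthread_create (&tid, NULL, poller_main, NULL) != 0) {
                perror ("setup failed");
                return EXIT_FAILURE;
        }
        queue_entry ();
        queue_entry ();
        sleep (1);        /* give the poller time to drain both bytes */
        return EXIT_SUCCESS;
}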
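
Second, the comment at the top of socket_poller describes (ab)using
socket_global_lock as a gate: the creator holds the mutex while it finishes
registering notify callbacks, and the new thread's first action is to take and
immediately release that same mutex, so it cannot deliver an up-call before
setup completes. A minimal sketch of the lock-as-gate idiom follows; the names
(gate_lock, worker, setup_done) are again invented for illustration.

/* Minimal sketch of the mutex-as-gate idiom, NOT part of the patch. */
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>

static pthread_mutex_t gate_lock  = PTHREAD_MUTEX_INITIALIZER;
static int             setup_done = 0;

static void *
worker (void *arg)
{
        (void) arg;
        /* Pass the gate: blocks until the creator drops the lock. */
        pthread_mutex_lock (&gate_lock);
        pthread_mutex_unlock (&gate_lock);

        printf ("worker sees setup_done=%d\n", setup_done);  /* always 1 */
        return NULL;
}

int
main (void)
{
        pthread_t tid;

        pthread_mutex_lock (&gate_lock);         /* close the gate */
        if (pthread_create (&tid, NULL, worker, NULL) != 0)
                return 1;

        sleep (1);                               /* pretend setup takes time */
        setup_done = 1;                          /* e.g. register callbacks */
        pthread_mutex_unlock (&gate_lock);       /* open the gate */

        pthread_join (tid, NULL);
        return 0;
}

Because the worker acquires the mutex only after the creator releases it, the
lock also gives the worker a happens-before edge on everything written before
the unlock, which is why the plain setup_done flag is safe in this sketch.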