[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[PATCH v12 2/7] block/nbd.c: Add yank feature
From: |
Lukas Straub |
Subject: |
[PATCH v12 2/7] block/nbd.c: Add yank feature |
Date: |
Sun, 13 Dec 2020 12:48:17 +0100 |
Register a yank function which shuts down the socket and sets
s->state = NBD_CLIENT_QUIT. This is the same behaviour as if an
error occurred.
Signed-off-by: Lukas Straub <lukasstraub2@web.de>
Acked-by: Stefan Hajnoczi <stefanha@redhat.com>
Reviewed-by: Eric Blake <eblake@redhat.com>
---
block/nbd.c | 154 +++++++++++++++++++++++++++++++---------------------
1 file changed, 93 insertions(+), 61 deletions(-)
diff --git a/block/nbd.c b/block/nbd.c
index 42536702b6..994d1e7b33 100644
--- a/block/nbd.c
+++ b/block/nbd.c
@@ -35,6 +35,7 @@
#include "qemu/option.h"
#include "qemu/cutils.h"
#include "qemu/main-loop.h"
+#include "qemu/atomic.h"
#include "qapi/qapi-visit-sockets.h"
#include "qapi/qmp/qstring.h"
@@ -44,6 +45,8 @@
#include "block/nbd.h"
#include "block/block_int.h"
+#include "qemu/yank.h"
+
#define EN_OPTSTR ":exportname="
#define MAX_NBD_REQUESTS 16
@@ -141,14 +144,13 @@ typedef struct BDRVNBDState {
NBDConnectThread *connect_thread;
} BDRVNBDState;
-static QIOChannelSocket *nbd_establish_connection(SocketAddress *saddr,
- Error **errp);
-static QIOChannelSocket *nbd_co_establish_connection(BlockDriverState *bs,
- Error **errp);
+static int nbd_establish_connection(BlockDriverState *bs, SocketAddress *saddr,
+ Error **errp);
+static int nbd_co_establish_connection(BlockDriverState *bs, Error **errp);
static void nbd_co_establish_connection_cancel(BlockDriverState *bs,
bool detach);
-static int nbd_client_handshake(BlockDriverState *bs, QIOChannelSocket *sioc,
- Error **errp);
+static int nbd_client_handshake(BlockDriverState *bs, Error **errp);
+static void nbd_yank(void *opaque);
static void nbd_clear_bdrvstate(BDRVNBDState *s)
{
@@ -166,12 +168,12 @@ static void nbd_clear_bdrvstate(BDRVNBDState *s)
static void nbd_channel_error(BDRVNBDState *s, int ret)
{
if (ret == -EIO) {
- if (s->state == NBD_CLIENT_CONNECTED) {
+ if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTED) {
s->state = s->reconnect_delay ? NBD_CLIENT_CONNECTING_WAIT :
NBD_CLIENT_CONNECTING_NOWAIT;
}
} else {
- if (s->state == NBD_CLIENT_CONNECTED) {
+ if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTED) {
qio_channel_shutdown(s->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
}
s->state = NBD_CLIENT_QUIT;
@@ -204,7 +206,7 @@ static void reconnect_delay_timer_cb(void *opaque)
{
BDRVNBDState *s = opaque;
- if (s->state == NBD_CLIENT_CONNECTING_WAIT) {
+ if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTING_WAIT) {
s->state = NBD_CLIENT_CONNECTING_NOWAIT;
while (qemu_co_enter_next(&s->free_sema, NULL)) {
/* Resume all queued requests */
@@ -216,7 +218,7 @@ static void reconnect_delay_timer_cb(void *opaque)
static void reconnect_delay_timer_init(BDRVNBDState *s, uint64_t
expire_time_ns)
{
- if (s->state != NBD_CLIENT_CONNECTING_WAIT) {
+ if (qatomic_load_acquire(&s->state) != NBD_CLIENT_CONNECTING_WAIT) {
return;
}
@@ -261,7 +263,7 @@ static void nbd_client_attach_aio_context(BlockDriverState
*bs,
* s->connection_co is either yielded from nbd_receive_reply or from
* nbd_co_reconnect_loop()
*/
- if (s->state == NBD_CLIENT_CONNECTED) {
+ if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTED) {
qio_channel_attach_aio_context(QIO_CHANNEL(s->ioc), new_context);
}
@@ -287,7 +289,7 @@ static void coroutine_fn
nbd_client_co_drain_begin(BlockDriverState *bs)
reconnect_delay_timer_del(s);
- if (s->state == NBD_CLIENT_CONNECTING_WAIT) {
+ if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTING_WAIT) {
s->state = NBD_CLIENT_CONNECTING_NOWAIT;
qemu_co_queue_restart_all(&s->free_sema);
}
@@ -338,13 +340,14 @@ static void nbd_teardown_connection(BlockDriverState *bs)
static bool nbd_client_connecting(BDRVNBDState *s)
{
- return s->state == NBD_CLIENT_CONNECTING_WAIT ||
- s->state == NBD_CLIENT_CONNECTING_NOWAIT;
+ NBDClientState state = qatomic_load_acquire(&s->state);
+ return state == NBD_CLIENT_CONNECTING_WAIT ||
+ state == NBD_CLIENT_CONNECTING_NOWAIT;
}
static bool nbd_client_connecting_wait(BDRVNBDState *s)
{
- return s->state == NBD_CLIENT_CONNECTING_WAIT;
+ return qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTING_WAIT;
}
static void connect_bh(void *opaque)
@@ -424,12 +427,12 @@ static void *connect_thread_func(void *opaque)
return NULL;
}
-static QIOChannelSocket *coroutine_fn
+static int coroutine_fn
nbd_co_establish_connection(BlockDriverState *bs, Error **errp)
{
+ int ret;
QemuThread thread;
BDRVNBDState *s = bs->opaque;
- QIOChannelSocket *res;
NBDConnectThread *thr = s->connect_thread;
qemu_mutex_lock(&thr->mutex);
@@ -446,10 +449,12 @@ nbd_co_establish_connection(BlockDriverState *bs, Error
**errp)
case CONNECT_THREAD_SUCCESS:
/* Previous attempt finally succeeded in background */
thr->state = CONNECT_THREAD_NONE;
- res = thr->sioc;
+ s->sioc = thr->sioc;
thr->sioc = NULL;
+ yank_register_function(BLOCKDEV_YANK_INSTANCE(bs->node_name),
+ nbd_yank, bs);
qemu_mutex_unlock(&thr->mutex);
- return res;
+ return 0;
case CONNECT_THREAD_RUNNING:
/* Already running, will wait */
break;
@@ -481,8 +486,13 @@ nbd_co_establish_connection(BlockDriverState *bs, Error
**errp)
thr->state = CONNECT_THREAD_NONE;
error_propagate(errp, thr->err);
thr->err = NULL;
- res = thr->sioc;
+ s->sioc = thr->sioc;
thr->sioc = NULL;
+ if (s->sioc) {
+ yank_register_function(BLOCKDEV_YANK_INSTANCE(bs->node_name),
+ nbd_yank, bs);
+ }
+ ret = (s->sioc ? 0 : -1);
break;
case CONNECT_THREAD_RUNNING:
case CONNECT_THREAD_RUNNING_DETACHED:
@@ -491,7 +501,7 @@ nbd_co_establish_connection(BlockDriverState *bs, Error
**errp)
* failed. Still connect thread is executing in background, and its
* result may be used for next connection attempt.
*/
- res = NULL;
+ ret = -1;
error_setg(errp, "Connection attempt cancelled by other operation");
break;
@@ -508,7 +518,7 @@ nbd_co_establish_connection(BlockDriverState *bs, Error
**errp)
qemu_mutex_unlock(&thr->mutex);
- return res;
+ return ret;
}
/*
@@ -561,7 +571,6 @@ static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState
*s)
{
int ret;
Error *local_err = NULL;
- QIOChannelSocket *sioc;
if (!nbd_client_connecting(s)) {
return;
@@ -594,21 +603,22 @@ static coroutine_fn void
nbd_reconnect_attempt(BDRVNBDState *s)
/* Finalize previous connection if any */
if (s->ioc) {
qio_channel_detach_aio_context(QIO_CHANNEL(s->ioc));
+ yank_unregister_function(BLOCKDEV_YANK_INSTANCE(s->bs->node_name),
+ nbd_yank, s->bs);
object_unref(OBJECT(s->sioc));
s->sioc = NULL;
object_unref(OBJECT(s->ioc));
s->ioc = NULL;
}
- sioc = nbd_co_establish_connection(s->bs, &local_err);
- if (!sioc) {
+ if (nbd_co_establish_connection(s->bs, &local_err) < 0) {
ret = -ECONNREFUSED;
goto out;
}
bdrv_dec_in_flight(s->bs);
- ret = nbd_client_handshake(s->bs, sioc, &local_err);
+ ret = nbd_client_handshake(s->bs, &local_err);
if (s->drained) {
s->wait_drained_end = true;
@@ -640,7 +650,7 @@ static coroutine_fn void nbd_co_reconnect_loop(BDRVNBDState
*s)
uint64_t timeout = 1 * NANOSECONDS_PER_SECOND;
uint64_t max_timeout = 16 * NANOSECONDS_PER_SECOND;
- if (s->state == NBD_CLIENT_CONNECTING_WAIT) {
+ if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTING_WAIT) {
reconnect_delay_timer_init(s, qemu_clock_get_ns(QEMU_CLOCK_REALTIME) +
s->reconnect_delay *
NANOSECONDS_PER_SECOND);
}
@@ -683,7 +693,7 @@ static coroutine_fn void nbd_connection_entry(void *opaque)
int ret = 0;
Error *local_err = NULL;
- while (s->state != NBD_CLIENT_QUIT) {
+ while (qatomic_load_acquire(&s->state) != NBD_CLIENT_QUIT) {
/*
* The NBD client can only really be considered idle when it has
* yielded from qio_channel_readv_all_eof(), waiting for data. This is
@@ -698,7 +708,7 @@ static coroutine_fn void nbd_connection_entry(void *opaque)
nbd_co_reconnect_loop(s);
}
- if (s->state != NBD_CLIENT_CONNECTED) {
+ if (qatomic_load_acquire(&s->state) != NBD_CLIENT_CONNECTED) {
continue;
}
@@ -753,6 +763,8 @@ static coroutine_fn void nbd_connection_entry(void *opaque)
s->connection_co = NULL;
if (s->ioc) {
qio_channel_detach_aio_context(QIO_CHANNEL(s->ioc));
+ yank_unregister_function(BLOCKDEV_YANK_INSTANCE(s->bs->node_name),
+ nbd_yank, s->bs);
object_unref(OBJECT(s->sioc));
s->sioc = NULL;
object_unref(OBJECT(s->ioc));
@@ -777,7 +789,7 @@ static int nbd_co_send_request(BlockDriverState *bs,
qemu_co_queue_wait(&s->free_sema, &s->send_mutex);
}
- if (s->state != NBD_CLIENT_CONNECTED) {
+ if (qatomic_load_acquire(&s->state) != NBD_CLIENT_CONNECTED) {
rc = -EIO;
goto err;
}
@@ -804,7 +816,8 @@ static int nbd_co_send_request(BlockDriverState *bs,
if (qiov) {
qio_channel_set_cork(s->ioc, true);
rc = nbd_send_request(s->ioc, request);
- if (rc >= 0 && s->state == NBD_CLIENT_CONNECTED) {
+ if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTED &&
+ rc >= 0) {
if (qio_channel_writev_all(s->ioc, qiov->iov, qiov->niov,
NULL) < 0) {
rc = -EIO;
@@ -1129,7 +1142,7 @@ static coroutine_fn int nbd_co_do_receive_one_chunk(
s->requests[i].receiving = true;
qemu_coroutine_yield();
s->requests[i].receiving = false;
- if (s->state != NBD_CLIENT_CONNECTED) {
+ if (qatomic_load_acquire(&s->state) != NBD_CLIENT_CONNECTED) {
error_setg(errp, "Connection closed");
return -EIO;
}
@@ -1288,7 +1301,7 @@ static bool nbd_reply_chunk_iter_receive(BDRVNBDState *s,
NBDReply local_reply;
NBDStructuredReplyChunk *chunk;
Error *local_err = NULL;
- if (s->state != NBD_CLIENT_CONNECTED) {
+ if (qatomic_load_acquire(&s->state) != NBD_CLIENT_CONNECTED) {
error_setg(&local_err, "Connection closed");
nbd_iter_channel_error(iter, -EIO, &local_err);
goto break_loop;
@@ -1313,7 +1326,8 @@ static bool nbd_reply_chunk_iter_receive(BDRVNBDState *s,
}
/* Do not execute the body of NBD_FOREACH_REPLY_CHUNK for simple reply. */
- if (nbd_reply_is_simple(reply) || s->state != NBD_CLIENT_CONNECTED) {
+ if (nbd_reply_is_simple(reply) ||
+ qatomic_load_acquire(&s->state) != NBD_CLIENT_CONNECTED) {
goto break_loop;
}
@@ -1745,6 +1759,15 @@ static int nbd_client_reopen_prepare(BDRVReopenState
*state,
return 0;
}
+static void nbd_yank(void *opaque)
+{
+ BlockDriverState *bs = opaque;
+ BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
+
+ qatomic_store_release(&s->state, NBD_CLIENT_QUIT);
+ qio_channel_shutdown(QIO_CHANNEL(s->sioc), QIO_CHANNEL_SHUTDOWN_BOTH,
NULL);
+}
+
static void nbd_client_close(BlockDriverState *bs)
{
BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
@@ -1757,52 +1780,53 @@ static void nbd_client_close(BlockDriverState *bs)
nbd_teardown_connection(bs);
}
-static QIOChannelSocket *nbd_establish_connection(SocketAddress *saddr,
- Error **errp)
+static int nbd_establish_connection(BlockDriverState *bs,
+ SocketAddress *saddr,
+ Error **errp)
{
ERRP_GUARD();
- QIOChannelSocket *sioc;
+ BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
- sioc = qio_channel_socket_new();
- qio_channel_set_name(QIO_CHANNEL(sioc), "nbd-client");
+ s->sioc = qio_channel_socket_new();
+ qio_channel_set_name(QIO_CHANNEL(s->sioc), "nbd-client");
- qio_channel_socket_connect_sync(sioc, saddr, errp);
+ qio_channel_socket_connect_sync(s->sioc, saddr, errp);
if (*errp) {
- object_unref(OBJECT(sioc));
- return NULL;
+ object_unref(OBJECT(s->sioc));
+ s->sioc = NULL;
+ return -1;
}
- qio_channel_set_delay(QIO_CHANNEL(sioc), false);
+ yank_register_function(BLOCKDEV_YANK_INSTANCE(bs->node_name), nbd_yank,
bs);
+ qio_channel_set_delay(QIO_CHANNEL(s->sioc), false);
- return sioc;
+ return 0;
}
-/* nbd_client_handshake takes ownership on sioc. On failure it is unref'ed. */
-static int nbd_client_handshake(BlockDriverState *bs, QIOChannelSocket *sioc,
- Error **errp)
+/* nbd_client_handshake takes ownership on s->sioc. On failure it's unref'ed.
*/
+static int nbd_client_handshake(BlockDriverState *bs, Error **errp)
{
BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
AioContext *aio_context = bdrv_get_aio_context(bs);
int ret;
trace_nbd_client_handshake(s->export);
-
- s->sioc = sioc;
-
- qio_channel_set_blocking(QIO_CHANNEL(sioc), false, NULL);
- qio_channel_attach_aio_context(QIO_CHANNEL(sioc), aio_context);
+ qio_channel_set_blocking(QIO_CHANNEL(s->sioc), false, NULL);
+ qio_channel_attach_aio_context(QIO_CHANNEL(s->sioc), aio_context);
s->info.request_sizes = true;
s->info.structured_reply = true;
s->info.base_allocation = true;
s->info.x_dirty_bitmap = g_strdup(s->x_dirty_bitmap);
s->info.name = g_strdup(s->export ?: "");
- ret = nbd_receive_negotiate(aio_context, QIO_CHANNEL(sioc), s->tlscreds,
+ ret = nbd_receive_negotiate(aio_context, QIO_CHANNEL(s->sioc), s->tlscreds,
s->hostname, &s->ioc, &s->info, errp);
g_free(s->info.x_dirty_bitmap);
g_free(s->info.name);
if (ret < 0) {
- object_unref(OBJECT(sioc));
+ yank_unregister_function(BLOCKDEV_YANK_INSTANCE(bs->node_name),
+ nbd_yank, bs);
+ object_unref(OBJECT(s->sioc));
s->sioc = NULL;
return ret;
}
@@ -1835,7 +1859,7 @@ static int nbd_client_handshake(BlockDriverState *bs,
QIOChannelSocket *sioc,
}
if (!s->ioc) {
- s->ioc = QIO_CHANNEL(sioc);
+ s->ioc = QIO_CHANNEL(s->sioc);
object_ref(OBJECT(s->ioc));
}
@@ -1851,9 +1875,11 @@ static int nbd_client_handshake(BlockDriverState *bs,
QIOChannelSocket *sioc,
{
NBDRequest request = { .type = NBD_CMD_DISC };
- nbd_send_request(s->ioc ?: QIO_CHANNEL(sioc), &request);
+ nbd_send_request(s->ioc ?: QIO_CHANNEL(s->sioc), &request);
- object_unref(OBJECT(sioc));
+ yank_unregister_function(BLOCKDEV_YANK_INSTANCE(bs->node_name),
+ nbd_yank, bs);
+ object_unref(OBJECT(s->sioc));
s->sioc = NULL;
return ret;
@@ -2245,7 +2271,6 @@ static int nbd_open(BlockDriverState *bs, QDict *options,
int flags,
{
int ret;
BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
- QIOChannelSocket *sioc;
ret = nbd_process_options(bs, options, errp);
if (ret < 0) {
@@ -2256,17 +2281,23 @@ static int nbd_open(BlockDriverState *bs, QDict
*options, int flags,
qemu_co_mutex_init(&s->send_mutex);
qemu_co_queue_init(&s->free_sema);
+ yank_register_instance(BLOCKDEV_YANK_INSTANCE(bs->node_name), errp);
+ if (*errp) {
+ return -EEXIST;
+ }
+
/*
* establish TCP connection, return error if it fails
* TODO: Configurable retry-until-timeout behaviour.
*/
- sioc = nbd_establish_connection(s->saddr, errp);
- if (!sioc) {
+ if (nbd_establish_connection(bs, s->saddr, errp) < 0) {
+ yank_unregister_instance(BLOCKDEV_YANK_INSTANCE(bs->node_name));
return -ECONNREFUSED;
}
- ret = nbd_client_handshake(bs, sioc, errp);
+ ret = nbd_client_handshake(bs, errp);
if (ret < 0) {
+ yank_unregister_instance(BLOCKDEV_YANK_INSTANCE(bs->node_name));
nbd_clear_bdrvstate(s);
return ret;
}
@@ -2326,6 +2357,7 @@ static void nbd_close(BlockDriverState *bs)
BDRVNBDState *s = bs->opaque;
nbd_client_close(bs);
+ yank_unregister_instance(BLOCKDEV_YANK_INSTANCE(bs->node_name));
nbd_clear_bdrvstate(s);
}
--
2.20.1
pgpYjEyP8FwHo.pgp
Description: OpenPGP digital signature
- [PATCH v12 0/7] Introduce 'yank' oob qmp command to recover from hanging qemu, Lukas Straub, 2020/12/13
- [PATCH v12 1/7] Introduce yank feature, Lukas Straub, 2020/12/13
- [PATCH v12 4/7] migration: Add yank feature, Lukas Straub, 2020/12/13
- [PATCH v12 5/7] io/channel-tls.c: make qio_channel_tls_shutdown thread-safe, Lukas Straub, 2020/12/13
- [PATCH v12 6/7] io: Document qmp oob suitability of qio_channel_shutdown and io_shutdown, Lukas Straub, 2020/12/13
- [PATCH v12 2/7] block/nbd.c: Add yank feature,
Lukas Straub <=
- [PATCH v12 3/7] chardev/char-socket.c: Add yank feature, Lukas Straub, 2020/12/13
- [PATCH v12 7/7] tests/test-char.c: Wait for the chardev to connect in char_socket_client_dupid_test, Lukas Straub, 2020/12/13
- Re: [PATCH v12 0/7] Introduce 'yank' oob qmp command to recover from hanging qemu, Markus Armbruster, 2020/12/15