qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[PATCH v2 3/6] net/colo-compare.c: Fix deadlock in compare_chr_send


From: Lukas Straub
Subject: [PATCH v2 3/6] net/colo-compare.c: Fix deadlock in compare_chr_send
Date: Sun, 26 Apr 2020 21:25:46 +0200

The chr_out chardev is connected to a filter-redirector
running in the main loop. qemu_chr_fe_write_all might block
here in compare_chr_send if the (socket-)buffer is full.
If another filter-redirector in the main loop wants to
send data to chr_pri_in it might also block if the buffer
is full. This leads to a deadlock because both event loops
get blocked.

Fix this by converting compare_chr_send to a coroutine and
putting the packets in a send queue. Also create a new
function notify_chr_send, since that should be independent.

Signed-off-by: Lukas Straub <address@hidden>
---
 net/colo-compare.c | 173 ++++++++++++++++++++++++++++++++++-----------
 1 file changed, 130 insertions(+), 43 deletions(-)

diff --git a/net/colo-compare.c b/net/colo-compare.c
index 1de4220fe2..ff6a740284 100644
--- a/net/colo-compare.c
+++ b/net/colo-compare.c
@@ -32,6 +32,9 @@
 #include "migration/migration.h"
 #include "util.h"
 
+#include "block/aio-wait.h"
+#include "qemu/coroutine.h"
+
 #define TYPE_COLO_COMPARE "colo-compare"
 #define COLO_COMPARE(obj) \
     OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE)
@@ -77,6 +80,20 @@ static int event_unhandled_count;
  *                    |packet  |  |packet  +    |packet  | |packet  +
  *                    +--------+  +--------+    +--------+ +--------+
  */
+
+typedef struct SendCo {
+    Coroutine *co;      /* send coroutine; reset to NULL when it finishes */
+    GQueue send_list;   /* queued SendEntry: pushed at head, popped at tail */
+    bool done;          /* false from coroutine start until it finishes */
+    int ret;            /* last coroutine result: 0 or a negative error */
+} SendCo;
+
+typedef struct SendEntry {
+    uint32_t size;          /* number of payload bytes stored in buf[] */
+    uint32_t vnet_hdr_len;  /* sent after size only if s->vnet_hdr is set */
+    uint8_t buf[];          /* private copy of the packet data */
+} SendEntry;
+
 typedef struct CompareState {
     Object parent;
 
@@ -91,6 +108,7 @@ typedef struct CompareState {
     SocketReadState pri_rs;
     SocketReadState sec_rs;
     SocketReadState notify_rs;
+    SendCo sendco;
     bool vnet_hdr;
     uint32_t compare_timeout;
     uint32_t expired_scan_cycle;
@@ -126,8 +144,11 @@ enum {
 static int compare_chr_send(CompareState *s,
                             const uint8_t *buf,
                             uint32_t size,
-                            uint32_t vnet_hdr_len,
-                            bool notify_remote_frame);
+                            uint32_t vnet_hdr_len);
+
+static int notify_chr_send(CompareState *s,
+                           const uint8_t *buf,
+                           uint32_t size);
 
 static bool packet_matches_str(const char *str,
                                const uint8_t *buf,
@@ -145,7 +166,7 @@ static void notify_remote_frame(CompareState *s)
     char msg[] = "DO_CHECKPOINT";
     int ret = 0;
 
-    ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true);
+    ret = notify_chr_send(s, (uint8_t *)msg, strlen(msg));
     if (ret < 0) {
         error_report("Notify Xen COLO-frame failed");
     }
@@ -271,8 +292,7 @@ static void colo_release_primary_pkt(CompareState *s, Packet *pkt)
     ret = compare_chr_send(s,
                            pkt->data,
                            pkt->size,
-                           pkt->vnet_hdr_len,
-                           false);
+                           pkt->vnet_hdr_len);
     if (ret < 0) {
         error_report("colo send primary packet failed");
     }
@@ -699,63 +719,123 @@ static void colo_compare_connection(void *opaque, void *user_data)
     }
 }
 
-static int compare_chr_send(CompareState *s,
-                            const uint8_t *buf,
-                            uint32_t size,
-                            uint32_t vnet_hdr_len,
-                            bool notify_remote_frame)
+static void coroutine_fn _compare_chr_send(void *opaque)
 {
+    CompareState *s = opaque;
+    SendCo *sendco = &s->sendco;
     int ret = 0;
-    uint32_t len = htonl(size);
 
-    if (!size) {
-        return 0;
-    }
+    while (!g_queue_is_empty(&sendco->send_list)) {
+        SendEntry *entry = g_queue_pop_tail(&sendco->send_list);
+        uint32_t len = htonl(entry->size);
 
-    if (notify_remote_frame) {
-        ret = qemu_chr_fe_write_all(&s->chr_notify_dev,
-                                    (uint8_t *)&len,
-                                    sizeof(len));
-    } else {
         ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)&len, sizeof(len));
-    }
 
-    if (ret != sizeof(len)) {
-        goto err;
-    }
+        if (ret != sizeof(len)) {
+            g_free(entry);
+            goto err;
+        }
 
-    if (s->vnet_hdr) {
-        /*
-         * We send vnet header len make other module(like filter-redirector)
-         * know how to parse net packet correctly.
-         */
-        len = htonl(vnet_hdr_len);
+        if (s->vnet_hdr) {
+            /*
+             * We send vnet header len make other module(like filter-redirector)
+             * know how to parse net packet correctly.
+             */
+            len = htonl(entry->vnet_hdr_len);
 
-        if (!notify_remote_frame) {
             ret = qemu_chr_fe_write_all(&s->chr_out,
                                         (uint8_t *)&len,
                                         sizeof(len));
+
+            if (ret != sizeof(len)) {
+                g_free(entry);
+                goto err;
+            }
         }
 
-        if (ret != sizeof(len)) {
+        ret = qemu_chr_fe_write_all(&s->chr_out,
+                                    (uint8_t *)entry->buf,
+                                    entry->size);
+
+        if (ret != entry->size) {
+            g_free(entry);
             goto err;
         }
+
+        g_free(entry);
     }
 
-    if (notify_remote_frame) {
-        ret = qemu_chr_fe_write_all(&s->chr_notify_dev,
-                                    (uint8_t *)buf,
-                                    size);
-    } else {
-        ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)buf, size);
+    sendco->ret = 0;
+    goto out;
+
+err:
+    while (!g_queue_is_empty(&sendco->send_list)) {
+        SendEntry *entry = g_queue_pop_tail(&sendco->send_list);
+        g_free(entry);
     }
+    sendco->ret = ret < 0 ? ret : -EIO;
+out:
+    sendco->co = NULL;
+    sendco->done = true;
+    aio_wait_kick();
+}
+
+static int compare_chr_send(CompareState *s,
+                            const uint8_t *buf,
+                            uint32_t size,
+                            uint32_t vnet_hdr_len)
+{
+    SendCo *sendco = &s->sendco;
+    SendEntry *entry;
+    /* Queue a copy of buf; the _compare_chr_send coroutine writes it out. */
+    if (!size) { /* empty packet: nothing is queued, report success */
+        return 0;
+    }
+
+    entry = g_malloc(sizeof(SendEntry) + size); /* header + payload bytes */
+    entry->size = size;
+    entry->vnet_hdr_len = vnet_hdr_len;
+    memcpy(entry->buf, buf, size); /* the queue keeps its own copy of buf */
+    g_queue_push_head(&sendco->send_list, entry); /* popped from the tail */
+
+    if (sendco->done) { /* no send coroutine in flight: start one */
+        sendco->co = qemu_coroutine_create(_compare_chr_send, s);
+        sendco->done = false;
+        qemu_coroutine_enter(sendco->co);
+        if (sendco->done) { /* coroutine already ran to completion */
+            /* report early errors */
+            return sendco->ret;
+        }
+    }
+
+    /* assume success: the coroutine has not finished, so no result yet */
+    return 0;
+}
+
+static int notify_chr_send(CompareState *s,
+                           const uint8_t *buf,
+                           uint32_t size)
+{
+    int ret = 0;
+    uint32_t len = htonl(size);
+
+    ret = qemu_chr_fe_write_all(&s->chr_notify_dev,
+                            (uint8_t *)&len,
+                            sizeof(len));
+
+    if (ret != sizeof(len)) {
+        goto err;
+    }
+
+    ret = qemu_chr_fe_write_all(&s->chr_notify_dev,
+                                (uint8_t *)buf,
+                                size);
 
     if (ret != size) {
         goto err;
     }
 
     return 0;
-
 err:
     return ret < 0 ? ret : -EIO;
 }
@@ -1062,8 +1142,7 @@ static void compare_pri_rs_finalize(SocketReadState *pri_rs)
         compare_chr_send(s,
                          pri_rs->buf,
                          pri_rs->packet_len,
-                         pri_rs->vnet_hdr_len,
-                         false);
+                         pri_rs->vnet_hdr_len);
     } else {
         /* compare packet in the specified connection */
         colo_compare_connection(conn, s);
@@ -1093,7 +1172,7 @@ static void compare_notify_rs_finalize(SocketReadState *notify_rs)
     if (packet_matches_str("COLO_USERSPACE_PROXY_INIT",
                            notify_rs->buf,
                            notify_rs->packet_len)) {
-        ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true);
+        ret = notify_chr_send(s, (uint8_t *)msg, strlen(msg));
         if (ret < 0) {
             error_report("Notify Xen COLO-frame INIT failed");
         }
@@ -1199,6 +1278,9 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp)
 
     QTAILQ_INSERT_TAIL(&net_compares, s, next);
 
+    s->sendco.done = true;
+    g_queue_init(&s->sendco.send_list);
+
     g_queue_init(&s->conn_list);
 
     qemu_mutex_init(&event_mtx);
@@ -1224,8 +1306,7 @@ static void colo_flush_packets(void *opaque, void *user_data)
         compare_chr_send(s,
                          pkt->data,
                          pkt->size,
-                         pkt->vnet_hdr_len,
-                         false);
+                         pkt->vnet_hdr_len);
         packet_destroy(pkt, NULL);
     }
     while (!g_queue_is_empty(&conn->secondary_list)) {
@@ -1281,6 +1362,11 @@ static void colo_compare_finalize(Object *obj)
     CompareState *s = COLO_COMPARE(obj);
     CompareState *tmp = NULL;
 
+    AioContext *ctx = iothread_get_aio_context(s->iothread);
+    aio_context_acquire(ctx);
+    AIO_WAIT_WHILE(ctx, !s->sendco.done);
+    aio_context_release(ctx);
+
     qemu_chr_fe_deinit(&s->chr_pri_in, false);
     qemu_chr_fe_deinit(&s->chr_sec_in, false);
     qemu_chr_fe_deinit(&s->chr_out, false);
@@ -1305,6 +1391,7 @@ static void colo_compare_finalize(Object *obj)
     g_queue_foreach(&s->conn_list, colo_flush_packets, s);
 
     g_queue_clear(&s->conn_list);
+    g_queue_clear(&s->sendco.send_list);
 
     if (s->connection_track_table) {
         g_hash_table_destroy(s->connection_track_table);
-- 
2.20.1

Attachment: pgpIMtoUl6TZ4.pgp
Description: OpenPGP digital signature


reply via email to

[Prev in Thread] Current Thread [Next in Thread]