qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[RFC PATCH 2/4] migration/multifd/zero-copy: Merge header & pages send i


From: Leonardo Bras
Subject: [RFC PATCH 2/4] migration/multifd/zero-copy: Merge header & pages send in a single write
Date: Tue, 25 Oct 2022 01:47:29 -0300

When zero-copy-send is enabled, each loop iteration of the
multifd_send_thread will calls for qio_channel_write_*() twice: The first
one for sending the header without zero-copy flag and the second one for
sending the memory pages, with zero-copy flag enabled.

This ends up calling two syscalls per loop iteration, where one should be
enough.

Also, since the behavior for non-zero-copy write is synchronous, and the
behavior for zero-copy write is asynchronous, it ends up interleaving
synchronous and asynchronous writes, hurting performance that could
otherwise be improved.

The choice of sending the header without the zero-copy flag in a separated
write happened because the header memory area would be reused in the next
write, so it was almost certain to have changed before the kernel could
send the packet.

To send the packet with zero-copy, create an array of header area instead
of a single one, and use a different header area after each write. Also,
flush the sending queue after all the headers have been used.

To avoid adding a zero-copy conditional in multifd_send_fill_packet(),
add a packet parameter (the packet that should be filled). This way it's
simpler to pick which element of the array will be used as a header.

Suggested-by: Juan Quintela <quintela@redhat.com>
Signed-off-by: Leonardo Bras <leobras@redhat.com>
---
 migration/multifd.h |  5 ++++-
 migration/multifd.c | 52 ++++++++++++++++++++++++---------------------
 2 files changed, 32 insertions(+), 25 deletions(-)

diff --git a/migration/multifd.h b/migration/multifd.h
index 519f498643..7f200c0286 100644
--- a/migration/multifd.h
+++ b/migration/multifd.h
@@ -90,6 +90,7 @@ typedef struct {
 
     /* this mutex protects the following parameters */
     QemuMutex mutex;
+
     /* is this channel thread running */
     bool running;
     /* should this thread finish */
@@ -109,8 +110,10 @@ typedef struct {
 
     /* thread local variables. No locking required */
 
-    /* pointer to the packet */
+    /* pointer to the packet array */
     MultiFDPacket_t *packet;
+    /* index of the packet array */
+    uint32_t packet_idx;
     /* size of the next packet that contains pages */
     uint32_t next_packet_size;
     /* packets sent through this channel */
diff --git a/migration/multifd.c b/migration/multifd.c
index 509bbbe3bf..aa18c47141 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -34,6 +34,8 @@
 #define MULTIFD_MAGIC 0x11223344U
 #define MULTIFD_VERSION 1
 
+#define HEADER_ARR_SZ 1024
+
 typedef struct {
     uint32_t magic;
     uint32_t version;
@@ -255,9 +257,9 @@ static void multifd_pages_clear(MultiFDPages_t *pages)
     g_free(pages);
 }
 
-static void multifd_send_fill_packet(MultiFDSendParams *p)
+static void multifd_send_fill_packet(MultiFDSendParams *p,
+                                     MultiFDPacket_t *packet)
 {
-    MultiFDPacket_t *packet = p->packet;
     int i;
 
     packet->flags = cpu_to_be32(p->flags);
@@ -547,6 +549,7 @@ void multifd_save_cleanup(void)
         p->packet_len = 0;
         g_free(p->packet);
         p->packet = NULL;
+        p->packet_idx = 0;
         g_free(p->iov);
         p->iov = NULL;
         g_free(p->normal);
@@ -651,6 +654,7 @@ int multifd_send_sync_main(QEMUFile *f)
 static void *multifd_send_thread(void *opaque)
 {
     MultiFDSendParams *p = opaque;
+    MultiFDPacket_t *header = p->packet;
     Error *local_err = NULL;
     int ret = 0;
     bool use_zero_copy_send = migrate_use_zero_copy_send();
@@ -676,13 +680,9 @@ static void *multifd_send_thread(void *opaque)
         if (p->pending_job) {
             uint64_t packet_num = p->packet_num;
             uint32_t flags = p->flags;
-            p->normal_num = 0;
 
-            if (use_zero_copy_send) {
-                p->iovs_num = 0;
-            } else {
-                p->iovs_num = 1;
-            }
+            p->normal_num = 0;
+            p->iovs_num = 1;
 
             for (int i = 0; i < p->pages->num; i++) {
                 p->normal[p->normal_num] = p->pages->offset[i];
@@ -696,7 +696,8 @@ static void *multifd_send_thread(void *opaque)
                     break;
                 }
             }
-            multifd_send_fill_packet(p);
+
+            multifd_send_fill_packet(p, header);
             p->flags = 0;
             p->num_packets++;
             p->total_normal_pages += p->normal_num;
@@ -707,18 +708,8 @@ static void *multifd_send_thread(void *opaque)
             trace_multifd_send(p->id, packet_num, p->normal_num, flags,
                                p->next_packet_size);
 
-            if (use_zero_copy_send) {
-                /* Send header first, without zerocopy */
-                ret = qio_channel_write_all(p->c, (void *)p->packet,
-                                            p->packet_len, &local_err);
-                if (ret != 0) {
-                    break;
-                }
-            } else {
-                /* Send header using the same writev call */
-                p->iov[0].iov_len = p->packet_len;
-                p->iov[0].iov_base = p->packet;
-            }
+            p->iov[0].iov_len = p->packet_len;
+            p->iov[0].iov_base = header;
 
             ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num, NULL,
                                               0, p->write_flags, &local_err);
@@ -726,6 +717,14 @@ static void *multifd_send_thread(void *opaque)
                 break;
             }
 
+            if (use_zero_copy_send) {
+                p->packet_idx = (p->packet_idx + 1) % HEADER_ARR_SZ;
+
+                if (!p->packet_idx && (multifd_zero_copy_flush(p->c) < 0)) {
+                    break;
+                }
+                header = (void *)p->packet + p->packet_idx * p->packet_len;
+            }
             qemu_mutex_lock(&p->mutex);
             p->pending_job--;
             qemu_mutex_unlock(&p->mutex);
@@ -930,6 +929,7 @@ int multifd_save_setup(Error **errp)
 
     for (i = 0; i < thread_count; i++) {
         MultiFDSendParams *p = &multifd_send_state->params[i];
+        int j;
 
         qemu_mutex_init(&p->mutex);
         qemu_sem_init(&p->sem, 0);
@@ -940,9 +940,13 @@ int multifd_save_setup(Error **errp)
         p->pages = multifd_pages_init(page_count);
         p->packet_len = sizeof(MultiFDPacket_t)
                       + sizeof(uint64_t) * page_count;
-        p->packet = g_malloc0(p->packet_len);
-        p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
-        p->packet->version = cpu_to_be32(MULTIFD_VERSION);
+        p->packet = g_malloc0_n(HEADER_ARR_SZ, p->packet_len);
+        for (j = 0; j < HEADER_ARR_SZ ; j++) {
+            MultiFDPacket_t *packet = (void *)p->packet + j * p->packet_len;
+            packet->magic = cpu_to_be32(MULTIFD_MAGIC);
+            packet->version = cpu_to_be32(MULTIFD_VERSION);
+        }
+        p->packet_idx = 0;
         p->name = g_strdup_printf("multifdsend_%d", i);
         /* We need one extra place for the packet header */
         p->iov = g_new0(struct iovec, page_count + 1);
-- 
2.38.0




reply via email to

[Prev in Thread] Current Thread [Next in Thread]