[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[Qemu-devel] [PATCH 14/17] migration: Create thread infrastructure for multifd recv side
From: Juan Quintela
Subject: [Qemu-devel] [PATCH 14/17] migration: Create thread infrastructure for multifd recv side
Date: Mon, 23 Jan 2017 22:32:18 +0100
We make the locking and the hand-off of page information to the receive
threads explicit, even though the page data itself is still received
through the main thread.
Signed-off-by: Juan Quintela <address@hidden>
---
migration/ram.c | 77 +++++++++++++++++++++++++++++++++++++++++++++++++--------
1 file changed, 67 insertions(+), 10 deletions(-)
diff --git a/migration/ram.c b/migration/ram.c
index ca94704..4e530ea 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -523,7 +523,7 @@ void migrate_multifd_send_threads_create(void)
}
}
-static int multifd_send_page(uint8_t *address)
+static uint16_t multifd_send_page(uint8_t *address, bool last_page)
{
int i, j, thread_count;
bool found = false;
@@ -538,8 +538,10 @@ static int multifd_send_page(uint8_t *address)
pages.address[pages.num] = address;
pages.num++;
- if (pages.num < (pages.size - 1)) {
- return UINT16_MAX;
+ if (!last_page) {
+ if (pages.num < (pages.size - 1)) {
+ return UINT16_MAX;
+ }
}
thread_count = migrate_multifd_threads();
@@ -570,17 +572,25 @@ static int multifd_send_page(uint8_t *address)
}
struct MultiFDRecvParams {
+ /* not changed */
QemuThread thread;
QIOChannel *c;
QemuCond cond;
QemuMutex mutex;
+ /* protected by param mutex */
bool quit;
bool started;
+ multifd_pages_t pages;
+ /* protected by multifd mutex */
+ bool done;
};
typedef struct MultiFDRecvParams MultiFDRecvParams;
static MultiFDRecvParams *multifd_recv;
+QemuMutex multifd_recv_mutex;
+QemuCond multifd_recv_cond;
+
static void *multifd_recv_thread(void *opaque)
{
MultiFDRecvParams *params = opaque;
@@ -594,7 +604,17 @@ static void *multifd_recv_thread(void *opaque)
qemu_mutex_lock(&params->mutex);
while (!params->quit){
- qemu_cond_wait(&params->cond, &params->mutex);
+ if (params->pages.num) {
+ params->pages.num = 0;
+ qemu_mutex_unlock(&params->mutex);
+ qemu_mutex_lock(&multifd_recv_mutex);
+ params->done = true;
+ qemu_cond_signal(&multifd_recv_cond);
+ qemu_mutex_unlock(&multifd_recv_mutex);
+ qemu_mutex_lock(&params->mutex);
+ } else {
+ qemu_cond_wait(&params->cond, &params->mutex);
+ }
}
qemu_mutex_unlock(&params->mutex);
@@ -647,8 +667,9 @@ void migrate_multifd_recv_threads_create(void)
qemu_cond_init(&multifd_recv[i].cond);
multifd_recv[i].quit = false;
multifd_recv[i].started = false;
+ multifd_recv[i].done = true;
+ multifd_init_group(&multifd_recv[i].pages);
multifd_recv[i].c = socket_recv_channel_create();
-
if(!multifd_recv[i].c) {
error_report("Error creating a recv channel");
exit(0);
@@ -664,6 +685,45 @@ void migrate_multifd_recv_threads_create(void)
}
}
+static void multifd_recv_page(uint8_t *address, uint16_t fd_num)
+{
+ int i, thread_count;
+ MultiFDRecvParams *params;
+ static multifd_pages_t pages;
+ static bool once = false;
+
+ if (!once) {
+ multifd_init_group(&pages);
+ once = true;
+ }
+
+ pages.address[pages.num] = address;
+ pages.num++;
+
+ if (fd_num == UINT16_MAX) {
+ return;
+ }
+
+ thread_count = migrate_multifd_threads();
+ assert(fd_num < thread_count);
+ params = &multifd_recv[fd_num];
+
+ qemu_mutex_lock(&multifd_recv_mutex);
+ while (!params->done) {
+ qemu_cond_wait(&multifd_recv_cond, &multifd_recv_mutex);
+ }
+ params->done = false;
+ qemu_mutex_unlock(&multifd_recv_mutex);
+ qemu_mutex_lock(&params->mutex);
+ for(i = 0; i < pages.num; i++) {
+ params->pages.address[i] = pages.address[i];
+ }
+ params->pages.num = pages.num;
+ pages.num = 0;
+ qemu_cond_signal(&params->cond);
+ qemu_mutex_unlock(&params->mutex);
+}
+
/**
* save_page_header: Write page header to wire
*
@@ -1097,7 +1157,7 @@ static int ram_multifd_page(QEMUFile *f, PageSearchStatus *pss,
if (pages == -1) {
*bytes_transferred +=
save_page_header(f, block, offset | RAM_SAVE_FLAG_MULTIFD_PAGE);
- fd_num = multifd_send_page(p);
+ fd_num = multifd_send_page(p, migration_dirty_pages == 1);
qemu_put_be16(f, fd_num);
*bytes_transferred += 2; /* size of fd_num */
qemu_put_buffer(f, p, TARGET_PAGE_SIZE);
@@ -2920,10 +2980,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
case RAM_SAVE_FLAG_MULTIFD_PAGE:
fd_num = qemu_get_be16(f);
- if (fd_num != 0) {
- /* this is yet an unused variable, changed later */
- fd_num = fd_num;
- }
+ multifd_recv_page(host, fd_num);
qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
break;
--
2.9.3
- [Qemu-devel] [PATCH 06/17] migration: Create x-multifd-threads parameter, (continued)
- [Qemu-devel] [PATCH 06/17] migration: Create x-multifd-threads parameter, Juan Quintela, 2017/01/23
- [Qemu-devel] [PATCH 07/17] migration: Create x-multifd-group parameter, Juan Quintela, 2017/01/23
- [Qemu-devel] [PATCH 08/17] migration: create multifd migration threads, Juan Quintela, 2017/01/23
- [Qemu-devel] [PATCH 09/17] migration: Start of multiple fd work, Juan Quintela, 2017/01/23
- [Qemu-devel] [PATCH 10/17] migration: create ram_multifd_page, Juan Quintela, 2017/01/23
- [Qemu-devel] [PATCH 11/17] migration: Create thread infrastructure for multifd send side, Juan Quintela, 2017/01/23
- [Qemu-devel] [PATCH 12/17] migration: really use multiple pages at a time, Juan Quintela, 2017/01/23
- [Qemu-devel] [PATCH 13/17] migration: Send the fd number which we are going to use for this page, Juan Quintela, 2017/01/23
- [Qemu-devel] [PATCH 14/17] migration: Create thread infrastructure for multifd recv side,
Juan Quintela <=
- [Qemu-devel] [PATCH 15/17] migration: Test new fd infrastructure, Juan Quintela, 2017/01/23
- [Qemu-devel] [PATCH 16/17] migration: [HACK]Transfer pages over new channels, Juan Quintela, 2017/01/23
- [Qemu-devel] [PATCH 17/17] migration: flush receive queue, Juan Quintela, 2017/01/23
- Re: [Qemu-devel] [PATCH 00/17] multifd v3, no-reply, 2017/01/23