[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[Qemu-devel] [incomplete 1/1] Rsocket migration support [incomplete]
From: Dr. David Alan Gilbert (git)
Subject: [Qemu-devel] [incomplete 1/1] Rsocket migration support [incomplete]
Date: Mon, 19 Jan 2015 11:01:12 +0000
From: "Dr. David Alan Gilbert" <address@hidden>
Implement migration over RDMA using the 'rsocket' library, the code
appears to work for guests with < 4GB RAM, but is hitting what
appears to be internal library limitations above that.
(riowrite always returns EAGAIN as soon as I register more RAM).
Note also that the library doesn't provide zero copy on the send
side.
The code has a few other hacks and incomplete areas, but I thought
I'd release the source anyway for anyone else interested in
investigating.
Very lightly tested, but it did manage to migrate a 'stressapptest'
run on a 3.5GB guest successfully over 10Gb RoCE.
Note it needs the 'qemu_ram_foreach_block: pass up error value, and down
the ramblock name' patch from my postcopy world.
Signed-off-by: Dr. David Alan Gilbert <address@hidden>
---
arch_init.c | 3 +-
include/migration/migration.h | 4 +
include/qemu/iov.h | 18 +-
include/qemu/sockets.h | 4 +
migration/Makefile.objs | 2 +-
migration/migration.c | 4 +
migration/rsocket.c | 964 ++++++++++++++++++++++++++++++++++++++++++
qemu-coroutine-io.c | 3 +-
trace-events | 35 ++
util/iov.c | 14 +-
util/qemu-sockets.c | 8 +-
11 files changed, 1042 insertions(+), 17 deletions(-)
create mode 100644 migration/rsocket.c
diff --git a/arch_init.c b/arch_init.c
index 7680d28..5aaa51b 100644
--- a/arch_init.c
+++ b/arch_init.c
@@ -847,10 +847,10 @@ static int ram_save_setup(QEMUFile *f, void *opaque)
qemu_put_be64(f, block->length);
}
- qemu_mutex_unlock_ramlist();
ram_control_before_iterate(f, RAM_CONTROL_SETUP);
ram_control_after_iterate(f, RAM_CONTROL_SETUP);
+ qemu_mutex_unlock_ramlist();
qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
@@ -1103,6 +1103,7 @@ static int ram_load(QEMUFile *f, void *opaque, int
version_id)
total_ram_bytes -= length;
}
+ ram_control_before_iterate(f, RAM_CONTROL_SETUP);
break;
case RAM_SAVE_FLAG_COMPRESS:
host = host_from_stream_offset(f, addr, flags);
diff --git a/include/migration/migration.h b/include/migration/migration.h
index 3cb5ba8..a54dd99 100644
--- a/include/migration/migration.h
+++ b/include/migration/migration.h
@@ -94,6 +94,10 @@ void rdma_start_outgoing_migration(void *opaque, const char
*host_port, Error **
void rdma_start_incoming_migration(const char *host_port, Error **errp);
+void rsocket_start_outgoing_migration(MigrationState *s, const char
*host_port, Error **errp);
+
+void rsocket_start_incoming_migration(const char *host_port, Error **errp);
+
void migrate_fd_error(MigrationState *s);
void migrate_fd_connect(MigrationState *s);
diff --git a/include/qemu/iov.h b/include/qemu/iov.h
index 68d25f2..7bd9935 100644
--- a/include/qemu/iov.h
+++ b/include/qemu/iov.h
@@ -58,6 +58,17 @@ size_t iov_memset(const struct iovec *iov, const unsigned
int iov_cnt,
size_t offset, int fillc, size_t bytes);
/*
+ * Helper function for iov_send_recv for standard FDs
+ */
+ssize_t iov_send_recv_fd(int sockfd, struct iovec *iov, unsigned iov_cnt,
+ bool do_send);
+
+/*
+ * Type of iov_send_recv_fd and similar helper functions.
+ */
+typedef ssize_t (*iov_send_recv_func)(int, struct iovec *, unsigned, bool);
+
+/*
* Send/recv data from/to iovec buffers directly
*
* `offset' bytes in the beginning of iovec buffer are skipped and
@@ -76,11 +87,12 @@ size_t iov_memset(const struct iovec *iov, const unsigned
int iov_cnt,
* should be within the iovec, not only beginning of it.
*/
ssize_t iov_send_recv(int sockfd, struct iovec *iov, unsigned iov_cnt,
- size_t offset, size_t bytes, bool do_send);
+ size_t offset, size_t bytes, bool do_send,
+ iov_send_recv_func helper);
#define iov_recv(sockfd, iov, iov_cnt, offset, bytes) \
- iov_send_recv(sockfd, iov, iov_cnt, offset, bytes, false)
+ iov_send_recv(sockfd, iov, iov_cnt, offset, bytes, false, iov_send_recv_fd)
#define iov_send(sockfd, iov, iov_cnt, offset, bytes) \
- iov_send_recv(sockfd, iov, iov_cnt, offset, bytes, true)
+ iov_send_recv(sockfd, iov, iov_cnt, offset, bytes, true, iov_send_recv_fd)
/**
* Produce a text hexdump of iovec `iov' with `iov_cnt' number of elements
diff --git a/include/qemu/sockets.h b/include/qemu/sockets.h
index f47dae6..8611d8f 100644
--- a/include/qemu/sockets.h
+++ b/include/qemu/sockets.h
@@ -60,8 +60,11 @@ int inet_nonblocking_connect(const char *str,
NonBlockingConnectHandler *callback,
void *opaque, Error **errp);
+void inet_addr_to_opts(QemuOpts *opts, const InetSocketAddress *addr);
int inet_dgram_opts(QemuOpts *opts, Error **errp);
NetworkAddressFamily inet_netfamily(int family);
+int inet_getport(struct addrinfo *e);
+void inet_setport(struct addrinfo *e, int port);
int unix_listen_opts(QemuOpts *opts, Error **errp);
int unix_listen(const char *path, char *ostr, int olen, Error **errp);
@@ -71,6 +74,7 @@ int unix_connect(const char *path, Error **errp);
int unix_nonblocking_connect(const char *str,
NonBlockingConnectHandler *callback,
void *opaque, Error **errp);
+struct addrinfo *inet_parse_connect_opts(QemuOpts *opts, Error **errp);
SocketAddress *socket_parse(const char *str, Error **errp);
int socket_connect(SocketAddress *addr, Error **errp,
diff --git a/migration/Makefile.objs b/migration/Makefile.objs
index d929e96..02fe66f 100644
--- a/migration/Makefile.objs
+++ b/migration/Makefile.objs
@@ -3,7 +3,7 @@ common-obj-y += vmstate.o
common-obj-y += qemu-file.o qemu-file-buf.o qemu-file-unix.o qemu-file-stdio.o
common-obj-y += xbzrle.o
-common-obj-$(CONFIG_RDMA) += rdma.o
+common-obj-$(CONFIG_RDMA) += rdma.o rsocket.o
common-obj-$(CONFIG_POSIX) += exec.o unix.o fd.o
common-obj-y += block.o
diff --git a/migration/migration.c b/migration/migration.c
index c49a05a..d264400 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -74,6 +74,8 @@ void qemu_start_incoming_migration(const char *uri, Error
**errp)
#ifdef CONFIG_RDMA
else if (strstart(uri, "rdma:", &p))
rdma_start_incoming_migration(p, errp);
+ else if (strstart(uri, "rsocket:", &p))
+ rsocket_start_incoming_migration(p, errp);
#endif
#if !defined(WIN32)
else if (strstart(uri, "exec:", &p))
@@ -442,6 +444,8 @@ void qmp_migrate(const char *uri, bool has_blk, bool blk,
#ifdef CONFIG_RDMA
} else if (strstart(uri, "rdma:", &p)) {
rdma_start_outgoing_migration(s, p, &local_err);
+ } else if (strstart(uri, "rsocket:", &p)) {
+ rsocket_start_outgoing_migration(s, p, &local_err);
#endif
#if !defined(WIN32)
} else if (strstart(uri, "exec:", &p)) {
diff --git a/migration/rsocket.c b/migration/rsocket.c
new file mode 100644
index 0000000..59fca29
--- /dev/null
+++ b/migration/rsocket.c
@@ -0,0 +1,964 @@
+/*
+ * QEMU live migration
+ *
+ * Copyright 2015 Red Hat, Inc. and/or its affiliates
+ *
+ * Authors:
+ * David Gilbert <address@hidden>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ *
+ * (Based on migration/tcp.c)
+ */
+
+#include <string.h>
+
+#include "qemu-common.h"
+#include "qemu/error-report.h"
+#include "qemu/iov.h"
+#include "qemu/sockets.h"
+#include "qemu/thread.h"
+#include "migration/migration.h"
+#include "migration/qemu-file.h"
+#include "block/block.h"
+#include "qemu/main-loop.h"
+#include "trace.h"
+
+#include "rdma/rsocket.h"
+
+
+typedef struct Rsocket_handler_thread_data {
+ int rfd;
+ short events;
+ IOHandler *callback;
+ void *callback_data;
+
+ QemuThread thread;
+ int pipe_fds[2];
+} Rsocket_handler_thread_data;
+
+/*
+ * Called via qemu_set_fd_handler in main thread, after
+ * handler_thread has kicked it on receipt of the event.
+ */
+static void rsocket_handler_fdcallback(void *opaque)
+{
+ Rsocket_handler_thread_data *rhtd = opaque;
+ IOHandler *callback = rhtd->callback;
+ IOHandler *callback_data = rhtd->callback_data;
+
+ trace_rsocket_handler_fdcallback();
+ /* First do some cleanup */
+ qemu_thread_join(&rhtd->thread);
+ qemu_set_fd_handler(rhtd->pipe_fds[0], NULL, NULL, NULL);
+ close(rhtd->pipe_fds[0]);
+ close(rhtd->pipe_fds[1]);
+ g_free(rhtd);
+
+ callback(callback_data);
+}
+
+/* Created by rsocket_set_handler */
+static void *handler_thread(void *opaque)
+{
+ Rsocket_handler_thread_data *rhtd = opaque;
+
+ struct pollfd pollfd;
+ pollfd.fd = rhtd->rfd;
+ pollfd.events = rhtd->events;
+ pollfd.revents = 0;
+
+ trace_rsocket_handler_thread_top();
+ rpoll(&pollfd, 1, -1 /* Forever - hmm */);
+
+ trace_rsocket_handler_thread_after_poll();
+ /*
+ * Kick the real handler
+ * rsocket_handler_fdcallback will now be called and cleanup
+ */
+ return (void *)write(rhtd->pipe_fds[1], "K", 1);
+}
+
+/*
+ * The 'rfd' isn't a real fd, and so we can't give it to qemu_set_fd_handler,
+ * so spawn a dummy thread that waits in an rpoll, kicks a real fd which
+ * then ends up calling callback.
+ */
+static int rsocket_set_handler(int rfd, short events, const char *name,
+ IOHandler *callback, void *callback_data)
+{
+ Rsocket_handler_thread_data *rhtd = g_malloc0(sizeof(*rhtd));
+
+ trace_rsocket_set_handler();
+ rhtd->rfd = rfd;
+ rhtd->callback = callback;
+ rhtd->callback_data = callback_data;
+ rhtd->events = events;
+ if (qemu_pipe(rhtd->pipe_fds)) {
+ return -1;
+ }
+ qemu_set_fd_handler(rhtd->pipe_fds[0], rsocket_handler_fdcallback, NULL,
+ rhtd);
+ qemu_thread_create(&rhtd->thread, name, handler_thread, rhtd,
+ QEMU_THREAD_JOINABLE);
+
+ return 0;
+}
+
+/* - - - Replacements for util/qemu_socket.c functions - - - - - - - - - - */
+
+/* Struct to store connect state for non blocking connect */
+typedef struct RConnectState {
+ int fd;
+ struct addrinfo *addr_list;
+ struct addrinfo *current_addr;
+ NonBlockingConnectHandler *callback;
+ void *opaque;
+} RConnectState;
+
+static void rsocket_set_nonblock(int rfd)
+{
+ long f; /* rsocket uses va_arg(..,long) to read this! */
+ trace_rsocket_set_nonblock(rfd);
+ /* Take care, rsocket's rfcntl is very basic */
+ f = rfcntl(rfd, F_GETFL);
+ rfcntl(rfd, F_SETFL, f | O_NONBLOCK);
+}
+
+/*
+ * rsocket_inet_connect_addr/rsocket_wait_for_connect/rsocket_inet_connect_opts
+ * deal with multiple addresses and connections that take a while to connect
+ */
+static int rsocket_inet_connect_addr(struct addrinfo *addr, bool *in_progress,
+ RConnectState *connect_state, Error
**errp);
+static void rsocket_wait_for_connect(void *opaque)
+{
+ RConnectState *s = opaque;
+ int val = 0, rc = 0;
+ socklen_t valsize = sizeof(val);
+ bool in_progress;
+ Error *err = NULL;
+
+ trace_rsocket_wait_for_connect();
+ do {
+ rc = rgetsockopt(s->fd, SOL_SOCKET, SO_ERROR, &val, &valsize);
+ } while (rc == -1 && errno == EINTR);
+
+ /* update rc to contain error */
+ if (!rc && val) {
+ rc = -1;
+ errno = val;
+ }
+
+ /* connect error */
+ if (rc < 0) {
+ error_setg_errno(&err, errno, "Error connecting to rsocket");
+ rclose(s->fd);
+ s->fd = rc;
+ }
+
+ /* try to connect to the next address on the list */
+ if (s->current_addr) {
+ while (s->current_addr->ai_next != NULL && s->fd < 0) {
+ s->current_addr = s->current_addr->ai_next;
+ s->fd = rsocket_inet_connect_addr(s->current_addr, &in_progress, s,
+ NULL);
+ if (s->fd < 0) {
+ error_free(err);
+ err = NULL;
+ error_setg_errno(&err, errno,
+ "Unable to start rsocket connect");
+ }
+ /* connect in progress */
+ if (in_progress) {
+ goto out;
+ }
+ }
+
+ freeaddrinfo(s->addr_list);
+ }
+
+ if (s->callback) {
+ s->callback(s->fd, err, s->opaque);
+ }
+ g_free(s);
+out:
+ error_free(err);
+}
+
+static int rsocket_inet_connect_addr(struct addrinfo *addr, bool *in_progress,
+ RConnectState *connect_state, Error
**errp)
+{
+ int sock, rc, tmp;
+
+ trace_rsocket_inet_connect_addr();
+ *in_progress = false;
+
+ sock = rsocket(addr->ai_family, addr->ai_socktype, addr->ai_protocol);
+ if (sock < 0) {
+ error_setg_errno(errp, errno, "Failed to create socket");
+ return -1;
+ }
+ rsetsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
+ (const char *)&tmp, sizeof(tmp));
+ /* connect to peer */
+ do {
+ rc = 0;
+ if (rconnect(sock, addr->ai_addr, addr->ai_addrlen) < 0) {
+ rc = -errno;
+ }
+ } while (rc == -EINTR);
+
+ if (connect_state != NULL && rc == -EINPROGRESS) {
+ connect_state->fd = sock;
+ rsocket_set_handler(sock, POLLOUT | POLLERR , "rsocketconnect",
+ rsocket_wait_for_connect, connect_state);
+ *in_progress = true;
+ } else if (rc < 0) {
+ error_setg_errno(errp, errno, "Failed to connect socket");
+ rclose(sock);
+ return -1;
+ }
+ return sock;
+}
+
+static int rsocket_inet_connect_opts(QemuOpts *opts, Error **errp,
+ NonBlockingConnectHandler *callback, void *opaque)
+{
+ Error *local_err = NULL;
+ struct addrinfo *res, *e;
+ int sock = -1;
+ bool in_progress;
+ RConnectState *connect_state = NULL;
+
+ trace_rsocket_inet_connect_opts();
+ res = inet_parse_connect_opts(opts, errp);
+ if (!res) {
+ return -1;
+ }
+
+ if (callback != NULL) {
+ connect_state = g_malloc0(sizeof(*connect_state));
+ connect_state->addr_list = res;
+ connect_state->callback = callback;
+ connect_state->opaque = opaque;
+ }
+
+ for (e = res; e != NULL; e = e->ai_next) {
+ error_free(local_err);
+ local_err = NULL;
+ if (connect_state != NULL) {
+ connect_state->current_addr = e;
+ }
+ sock = rsocket_inet_connect_addr(e, &in_progress, connect_state,
+ &local_err);
+ if (sock >= 0) {
+ break;
+ }
+ }
+
+ if (sock < 0) {
+ error_propagate(errp, local_err);
+ } else if (in_progress) {
+ /* wait_for_connect() will do the rest */
+ return sock;
+ } else {
+ if (callback) {
+ callback(sock, NULL, opaque);
+ }
+ }
+ g_free(connect_state);
+ freeaddrinfo(res);
+ return sock;
+}
+
+static int rsocket_inet_listen_opts(QemuOpts *opts, int port_offset,
+ Error **errp)
+{
+ struct addrinfo ai,*res,*e;
+ const char *addr;
+ char port[33];
+ char uaddr[INET6_ADDRSTRLEN+1];
+ char uport[33];
+ int slisten;
+ int rc, to, port_min, port_max, p;
+
+ trace_rsocket_inet_listen_opts();
+ memset(&ai,0, sizeof(ai));
+ ai.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;
+ ai.ai_family = PF_UNSPEC;
+ ai.ai_socktype = SOCK_STREAM;
+
+ if ((qemu_opt_get(opts, "host") == NULL) ||
+ (qemu_opt_get(opts, "port") == NULL)) {
+ error_setg(errp, "host and/or port not specified");
+ return -1;
+ }
+ pstrcpy(port, sizeof(port), qemu_opt_get(opts, "port"));
+ addr = qemu_opt_get(opts, "host");
+
+ to = qemu_opt_get_number(opts, "to", 0);
+ if (qemu_opt_get_bool(opts, "ipv4", 0))
+ ai.ai_family = PF_INET;
+ if (qemu_opt_get_bool(opts, "ipv6", 0))
+ ai.ai_family = PF_INET6;
+
+ /* lookup */
+ if (port_offset) {
+ unsigned long long baseport;
+ if (parse_uint_full(port, &baseport, 10) < 0) {
+ error_setg(errp, "can't convert to a number: %s", port);
+ return -1;
+ }
+ if (baseport > 65535 ||
+ baseport + port_offset > 65535) {
+ error_setg(errp, "port %s out of range", port);
+ return -1;
+ }
+ snprintf(port, sizeof(port), "%d", (int)baseport + port_offset);
+ }
+ rc = getaddrinfo(strlen(addr) ? addr : NULL, port, &ai, &res);
+ if (rc != 0) {
+ error_setg(errp, "address resolution failed for %s:%s: %s", addr, port,
+ gai_strerror(rc));
+ return -1;
+ }
+
+ /* create socket + bind */
+ for (e = res; e != NULL; e = e->ai_next) {
+ int tmp;
+ getnameinfo((struct sockaddr*)e->ai_addr,e->ai_addrlen,
+ uaddr,INET6_ADDRSTRLEN,uport,32,
+ NI_NUMERICHOST | NI_NUMERICSERV);
+ slisten = rsocket(e->ai_family, e->ai_socktype, e->ai_protocol);
+ if (slisten < 0) {
+ if (!e->ai_next) {
+ error_setg_errno(errp, errno, "Failed to create socket");
+ }
+ continue;
+ }
+
+ tmp = 1;
+ rsetsockopt(slisten, SOL_SOCKET, SO_REUSEADDR,
+ (const char *)&tmp, sizeof(tmp));
+#ifdef IPV6_V6ONLY
+ if (e->ai_family == PF_INET6) {
+ /* listen on both ipv4 and ipv6 */
+ const int off = 0;
+
+ rsetsockopt(slisten, IPPROTO_IPV6, IPV6_V6ONLY, &off,
+ sizeof(off));
+ }
+#endif
+
+ port_min = inet_getport(e);
+ port_max = to ? to + port_offset : port_min;
+ for (p = port_min; p <= port_max; p++) {
+ inet_setport(e, p);
+ if (rbind(slisten, e->ai_addr, e->ai_addrlen) == 0) {
+ goto listen;
+ }
+ if (p == port_max) {
+ if (!e->ai_next) {
+ error_setg_errno(errp, errno, "Failed to bind socket");
+ }
+ }
+ }
+ rclose(slisten);
+ }
+ freeaddrinfo(res);
+ return -1;
+
+listen:
+ if (rlisten(slisten,1) != 0) {
+ error_setg_errno(errp, errno, "Failed to listen on socket");
+ rclose(slisten);
+ freeaddrinfo(res);
+ return -1;
+ }
+ snprintf(uport, sizeof(uport), "%d", inet_getport(e) - port_offset);
+ qemu_opt_set(opts, "host", uaddr);
+ qemu_opt_set(opts, "port", uport);
+ qemu_opt_set(opts, "ipv6", (e->ai_family == PF_INET6) ? "on" : "off");
+ qemu_opt_set(opts, "ipv4", (e->ai_family != PF_INET6) ? "on" : "off");
+ freeaddrinfo(res);
+ return slisten;
+}
+
+static int rsocket_inet_listen(const char *str,
+ int socktype, int port_offset, Error **errp)
+{
+ QemuOpts *opts;
+ int sock = -1;
+ InetSocketAddress *addr;
+
+ trace_rsocket_inet_listen();
+ addr = inet_parse(str, errp);
+ if (addr != NULL) {
+ opts = qemu_opts_create(&socket_optslist, NULL, 0, &error_abort);
+ inet_addr_to_opts(opts, addr);
+ qapi_free_InetSocketAddress(addr);
+ sock = rsocket_inet_listen_opts(opts, port_offset, errp);
+ qemu_opts_del(opts);
+ }
+ return sock;
+}
+
+/* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
+typedef struct RAMBlockMapEntry {
+ void *host_addr;
+ off_t rsocket_offset;
+ size_t len;
+} RAMBlockMapEntry;
+
+typedef struct QEMURsocket {
+ int rfd;
+ QEMUFile *file;
+ bool isSource;
+
+ /* Used by the 'yield_thread' that waits on an rsocket for data */
+ QemuThread thread;
+ QemuSemaphore sem;
+ int pipe_fds[2];
+ short poll_event;
+
+ /* A mapping from ram_addr_t (block_offset) to RAMBlockMapEntry */
+ GHashTable *RAMBlockMap;
+
+ /* Only used for get_fd */
+ int dummy_fd;
+} QEMURsocket;
+
+/* The yield thread is used once the rsocket is open to wait for data
+ * or the space to write data by using 'rpoll'; it then uses a pipe
+ * to wake a waiting coroutine.
+ * The thread waits for a semaphore and when it receives it waits on the
+ * channel.
+ */
+static void *yield_thread(void *opaque)
+{
+ QEMURsocket *rs = opaque;
+ struct pollfd pfd;
+
+ while (1) {
+ trace_rsocket_yield_thread_top();
+ /* Wait until something tries to yield */
+ qemu_sem_wait(&rs->sem);
+ /* Now wait for the rsocket */
+ pfd.fd = rs->rfd;
+ pfd.events = atomic_fetch_add(&rs->poll_event, 0);
+ if (!pfd.events) {
+ trace_rsocket_yield_thread_requested_exit();
+ /* Exit */
+ break;
+ }
+ pfd.events |= POLLERR | POLLHUP | POLLNVAL;
+ rpoll(&pfd, 1, -1 /* Hmm */);
+
+ /* Kick the waiting coroutine */
+ if (write(rs->pipe_fds[1], "K", 1) != 1) {
+ break;
+ }
+ }
+ trace_rsocket_yield_thread_exit();
+ return NULL;
+}
+
+static int start_yield_thread(QEMURsocket *rs, short poll_event)
+{
+ trace_rsocket_start_yield_thread();
+ if (qemu_pipe(rs->pipe_fds)) {
+ return -errno;
+ }
+ rs->poll_event = poll_event;
+ qemu_sem_init(&rs->sem, 0);
+
+ qemu_thread_create(&rs->thread, "rsocketyield", yield_thread, rs,
+ QEMU_THREAD_JOINABLE);
+
+ return 0;
+}
+
+static void stop_yield_thread(QEMURsocket *rs)
+{
+ trace_rsocket_stop_yield_thread();
+ /* Tell the thread to exit */
+ atomic_and(&rs->poll_event, 0);
+ qemu_sem_post(&rs->sem);
+
+ qemu_thread_join(&rs->thread);
+ close(rs->pipe_fds[0]);
+ close(rs->pipe_fds[1]);
+ qemu_sem_destroy(&rs->sem);
+}
+
+static int rsocket_yield(QEMURsocket *rs)
+{
+ char dummy;
+
+ trace_rsocket_yield();
+ /* Ask the yield_thread to wait on the rsocket */
+ qemu_sem_post(&rs->sem);
+ /* And then wait for it to kick us */
+ yield_until_fd_readable(rs->pipe_fds[0]);
+ /* Consume the dummy character on the pipe */
+ return read(rs->pipe_fds[0], &dummy, 1);
+}
+
+/* helper function for iov_send_recv as used by rsocket_writev_buffer */
+static ssize_t rsocket_send(int rsockfd, struct iovec *iov, unsigned iov_cnt,
+ bool do_send)
+{
+ ssize_t ret;
+ struct msghdr msg;
+
+ trace_rsocket_send();
+ assert(do_send);
+ memset(&msg, 0, sizeof(msg));
+ msg.msg_iov = iov;
+ msg.msg_iovlen = iov_cnt;
+ do {
+ ret = rsendmsg(rsockfd, &msg, 0);
+ } while (ret < 0 && errno == EINTR);
+ return ret;
+}
+
+static ssize_t rsocket_writev_buffer(void *opaque, struct iovec *iov, int
iovcnt,
+ int64_t pos)
+{
+ QEMURsocket *rs = opaque;
+ ssize_t len;
+ ssize_t size = iov_size(iov, iovcnt);
+
+ trace_rsocket_writev_buffer(size);
+ len = iov_send_recv(rs->rfd, iov, iovcnt, 0 /* offset */, size, true,
rsocket_send);
+ if (len < size) {
+ len = -socket_error();
+ }
+ trace_rsocket_writev_buffer_end(size, len);
+ return len;
+}
+
+static int rsocket_get_buffer(void *opaque, uint8_t *buf, int64_t pos, int
size)
+{
+ QEMURsocket *rs = opaque;
+ ssize_t len;
+
+ trace_rsocket_get_buffer(size);
+ for (;;) {
+ len = rrecv(rs->rfd, buf, size, 0);
+ if (len != -1) {
+ break;
+ }
+ if (errno == EAGAIN) {
+ rsocket_yield(rs);
+ } else if (errno != EINTR) {
+ break;
+ }
+ }
+
+ if (len == -1) {
+ len = -socket_error();
+ }
+ trace_rsocket_get_buffer_exit(size, len);
+ return len;
+}
+
+static gboolean rsocket_close_RAMBlock_func(gpointer key, gpointer value,
+ gpointer user_data)
+{
+ QEMURsocket *rs = user_data;
+ /* key is the host address of the start of the RAMBlock */
+ RAMBlockMapEntry *rbme = value;
+
+ trace_rsocket_close_RAMBlock_func(rbme->host_addr, rs->isSource);
+
+ if (rs->isSource) {
+ /* rsocket segs if I don't unmap before close */
+ riounmap(rs->rfd, rbme->host_addr, rbme->len);
+ }
+ g_free(rbme);
+
+ return TRUE; /* Delete from hash */
+}
+
+static int rsocket_close(void *opaque)
+{
+ QEMURsocket *rs = opaque;
+ trace_rsocket_close();
+ stop_yield_thread(rs);
+ g_hash_table_foreach_remove(rs->RAMBlockMap, rsocket_close_RAMBlock_func,
+ rs);
+ g_hash_table_destroy(rs->RAMBlockMap);
+ /* rclose(rs->rfd); - HACK! I'm getting a seg in the rsocket code here;
even with the close above */
+ close(rs->dummy_fd);
+ g_free(rs);
+
+ return 0;
+}
+
+static int rsocket_get_fd(void *opaque)
+{
+ QEMURsocket *rs = opaque;
+
+ /* Hack! get_fd is used for one thing in the general migration code, and
+ * that's for marking it as non-blocking; that needs fixing since we don't
+ * have a real fd we can return.
+ */
+ return rs->dummy_fd;
+}
+
+static int rsocket_dest_ramblock_reg(const char *block_name, void *host_addr,
+ ram_addr_t block_offset, ram_addr_t length, void *opaque)
+{
+ QEMURsocket *rs = opaque;
+ uint64_t rsocket_offset;
+ uint8_t block_name_len = strlen(block_name);
+ RAMBlockMapEntry *rbme = g_new0(RAMBlockMapEntry, 1);
+
+ /* Register every RAMBlock so that we can RDMA into it */
+ rsocket_offset = riomap(rs->rfd, host_addr, length, PROT_WRITE,
+ 0 /* flags? */, -1 /* Offset */);
+ if (rsocket_offset == -1) {
+ error_report("riomap for %s", block_name);
+ return -1;
+ }
+ rbme->rsocket_offset = rsocket_offset;
+ rbme->len = length;
+ rbme->host_addr = host_addr;
+ g_hash_table_insert(rs->RAMBlockMap, (gpointer)block_offset, rbme);
+
+ trace_rsocket_dest_ramblock_reg(block_name, block_offset, rsocket_offset);
+
+ /* Send the block name and the key to the other side */
+ if (rwrite(rs->rfd, &block_name_len, 1)!=1) {
+ error_report("%s: block_name_len write for %s\n", strerror(errno),
block_name);
+ return -1;
+ }
+ if (rwrite(rs->rfd, block_name, block_name_len)!=block_name_len) {
+ error_report("%s: block_name write for %s\n", strerror(errno),
block_name);
+ return -1;
+ }
+ if (rwrite(rs->rfd, &rsocket_offset, 8)!=8) {
+ error_report("%s: rsocket_offset write for %s\n", strerror(errno),
block_name);
+ return -1;
+ }
+ return 0;
+}
+
+/*
+ * Register all the RAMBlocks as places we might want to RDMA into.
+ */
+static int rsocket_dest_ramblock_setup(QEMURsocket *rs)
+{
+ char zero = 0;
+
+ /*
+ * Now register each RAMBlock and get an 'offset' that the src can pass
+ * to Riowrite.
+ */
+ if (qemu_ram_foreach_block(rsocket_dest_ramblock_reg, rs)) {
+ error_report("Failed to map rsocket buffers");
+ return -1;
+ }
+
+ if (rwrite(rs->rfd, &zero, 1)!=1) {
+ error_report("%s: terminator write RAMBlock list\n", strerror(errno));
+ return -1;
+ }
+ return 0;
+}
+
+/* Effectively this is just a RAMBlock but we don't have access to it */
+typedef struct RAMBlockSourceData {
+ void *host_addr;
+ ram_addr_t block_offset;
+ ram_addr_t length;
+} RAMBlockSourceData;
+
+static int source_ramblock_name_mapfunc(const char *block_name, void
*host_addr,
+ ram_addr_t block_offset, ram_addr_t length, void *opaque)
+{
+ GHashTable *map = opaque;
+ RAMBlockSourceData *rbsd = g_new(RAMBlockSourceData, 1);
+
+ rbsd->host_addr = host_addr;
+ rbsd->block_offset = block_offset;
+ rbsd->length = length;
+ g_hash_table_insert(map,
(gpointer)(intptr_t)g_quark_from_string(block_name), rbsd);
+ return 0;
+}
+
+/*
+ * Read the list of rsocket keys from the source.
+ */
+static int rsocket_source_ramblock_setup(QEMUFile *f, QEMURsocket *rs)
+{
+ char ram_block_name[256];
+ uint8_t block_name_len;
+ uint64_t rsocket_key;
+ GHashTable *block_name_map;
+ int ret = -1;
+
+ /* Ensure the previous data sent gets to the destination, because
+ * only then will it send this response.
+ */
+ qemu_fflush(f);
+
+ /* Build a mapping from RAMBlock name to ram_addr_t offset for block */
+ block_name_map = g_hash_table_new(NULL, NULL);
+ if (qemu_ram_foreach_block(source_ramblock_name_mapfunc, block_name_map)) {
+ error_report("Failed to make source RAMBlock map");
+ goto err;
+ }
+
+ /*
+ * We're sent a list of RAMBlocks of the form:
+ * byte - length of RAMBlock name
+ * byte[] - The RAMBlock name
+ * uint64_t - The rsocket 'offset' or key for the block
+ *
+ * If the length is 0 it's the end of the list.
+ */
+ do {
+ RAMBlockMapEntry *rbme;
+ if (rread(rs->rfd, &block_name_len, 1) != 1) {
+ error_report("%s: block_name_len read", strerror(errno));
+ goto err;
+ }
+ if (block_name_len) {
+ RAMBlockSourceData* rbsd;
+ if (rread(rs->rfd, ram_block_name, block_name_len) !=
+ block_name_len) {
+ error_report("%s: block_name read", strerror(errno));
+ goto err;
+ }
+ ram_block_name[block_name_len] = 0;
+ if (rread(rs->rfd, &rsocket_key, 8) != 8) {
+ error_report("%s: rsocket_key read", strerror(errno));
+ goto err;
+ }
+ rbsd = g_hash_table_lookup(block_name_map,
+
(gpointer)(intptr_t)g_quark_from_string(ram_block_name));
+ if (!rbsd) {
+ error_report("No matching RAMBlock for %s", ram_block_name);
+ goto err;
+ }
+ rbme = g_new0(RAMBlockMapEntry, 1);
+ rbme->rsocket_offset = rsocket_key;
+ rbme->len = rbsd->length;
+ rbme->host_addr = rbsd->host_addr;
+ g_hash_table_insert(rs->RAMBlockMap, (gpointer)rbsd->block_offset,
rbme);
+ trace_rsocket_source_ramblock_setup(ram_block_name,
rbsd->block_offset, rsocket_key);
+ }
+ } while (block_name_len);
+
+ ret = 0; /* Good */
+err:
+ /* TODO: Clean up contents */
+ g_hash_table_destroy(block_name_map);
+ return ret;
+}
+
+static int rsocket_dest_before_ram_iterate(QEMUFile *f, void *opaque,
+ uint64_t flags)
+{
+ QEMURsocket *rs = opaque;;
+
+ trace_rsocket_dest_before_ram_iterate(flags);
+ switch (flags) {
+ case RAM_CONTROL_SETUP:
+ /*
+ * Called after we've loaded the list of RAMBlocks from the source and
+ * checked them
+ */
+ return rsocket_dest_ramblock_setup(rs);
+ break;
+
+ }
+
+ return 0;
+}
+
+static int rsocket_source_before_ram_iterate(QEMUFile *f, void *opaque,
+ uint64_t flags)
+{
+ QEMURsocket *rs = opaque;
+
+ trace_rsocket_source_before_ram_iterate(flags);
+ switch (flags) {
+ case RAM_CONTROL_SETUP:
+ /*
+ * Called after we've sent the list of RAMBlocks
+ */
+ return rsocket_source_ramblock_setup(f, rs);
+ break;
+
+ }
+
+ return 0;
+}
+
+static size_t rsocket_save_page(QEMUFile *f, void *opaque,
+ ram_addr_t block_offset, ram_addr_t offset,
+ size_t size, int *bytes_sent)
+{
+ QEMURsocket *rs = opaque;
+ RAMBlockMapEntry *rbme;
+ size_t ret;
+
+ rbme = g_hash_table_lookup(rs->RAMBlockMap, (gpointer)block_offset);
+ if (!rbme) {
+ error_report("Unable to find matching RSocket key for block "
RAM_ADDR_FMT, block_offset);
+ return -1;
+ }
+ trace_rsocket_save_page(block_offset, offset, rbme->rsocket_offset);
+
+ do {
+ ret=riowrite(rs->rfd, rbme->host_addr+offset, size,
rbme->rsocket_offset+offset, 0);
+ } while (ret==0 && errno == EAGAIN);
+ if (ret != size) {
+ error_report("riowrite: %s (%zd/%zd)", strerror(errno), size, ret);
+ return -1;
+ }
+ *bytes_sent = size;
+
+ return RAM_SAVE_CONTROL_DELAYED;
+}
+
+
+static const QEMUFileOps rsocket_read_ops = {
+ .get_fd = rsocket_get_fd,
+ .get_buffer = rsocket_get_buffer,
+ .close = rsocket_close,
+ .before_ram_iterate = rsocket_dest_before_ram_iterate,
+};
+
+static const QEMUFileOps rsocket_write_ops = {
+ .writev_buffer = rsocket_writev_buffer,
+ .close = rsocket_close,
+ .before_ram_iterate = rsocket_source_before_ram_iterate,
+ .save_page = rsocket_save_page,
+};
+
+static QEMUFile *qemu_fopen_rsocket(int rfd, const char *mode)
+{
+ QEMURsocket *s;
+
+ trace_qemu_fopen_rsocket(rfd, mode);
+ if (qemu_file_mode_is_not_valid(mode)) {
+ return NULL;
+ }
+
+ s = g_malloc0(sizeof(QEMURsocket));
+ s->rfd = rfd;
+ s->dummy_fd = open("/dev/null", O_RDWR);
+ s->RAMBlockMap = g_hash_table_new(NULL, NULL);
+ if (s->dummy_fd == -1) {
+ error_report("Failed to open dummy_fd (%s)", strerror(errno));
+ goto err;
+ }
+ if (start_yield_thread(s, (mode[0] == 'w')?POLLOUT:POLLIN)) {
+ goto errfd;
+ }
+ if (mode[0] == 'w') {
+ s->file = qemu_fopen_ops(s, &rsocket_write_ops);
+ s->isSource = true;
+ } else {
+ s->file = qemu_fopen_ops(s, &rsocket_read_ops);
+ s->isSource = false;
+ rsocket_set_nonblock(rfd);
+ }
+
+ return s->file;
+
+errfd:
+ close(s->dummy_fd);
+err:
+ rclose(rfd);
+ g_free(s);
+ return NULL;
+}
+
+static void rsocket_have_connect(int rfd, Error *err, void *opaque)
+{
+ MigrationState *s = opaque;
+
+ trace_rsocket_have_connect(rfd);
+
+ if (rfd < 0) {
+ s->file = NULL;
+ migrate_fd_error(s);
+ } else {
+ s->file = qemu_fopen_rsocket(rfd, "wb");
+ migrate_fd_connect(s);
+ }
+}
+
+void rsocket_start_outgoing_migration(MigrationState *s, const char *host_port,
+ Error **errp)
+{
+ QemuOpts *opts;
+ InetSocketAddress *addr;
+
+ trace_rsocket_start_outgoing_migration();
+ addr = inet_parse(host_port, errp);
+ if (addr != NULL) {
+ opts = qemu_opts_create(&socket_optslist, NULL, 0, &error_abort);
+ inet_addr_to_opts(opts, addr);
+ qapi_free_InetSocketAddress(addr);
+ rsocket_inet_connect_opts(opts, errp, rsocket_have_connect, s);
+ qemu_opts_del(opts);
+ }
+}
+
+static void rsocket_accept_incoming_migration(void *opaque)
+{
+ struct sockaddr_in addr;
+ socklen_t addrlen = sizeof(addr);
+ int rfd_s = (intptr_t)opaque;
+ int rfd_c;
+ QEMUFile *f;
+ int err;
+
+ trace_rsocket_accept_incoming_migration();
+ do {
+ rfd_c = raccept(rfd_s, (struct sockaddr *)&addr, &addrlen);
+ err = errno;
+ } while (rfd_c < 0 && err == EINTR);
+ rclose(rfd_s);
+
+ if (rfd_c < 0) {
+ error_report("could not accept migration connection (%s)",
+ strerror(err));
+ return;
+ }
+
+ f = qemu_fopen_rsocket(rfd_c, "rb");
+ if (f == NULL) {
+ error_report("could not qemu_fopen rsocket");
+ goto out;
+ }
+
+ process_incoming_migration(f);
+ return;
+
+out:
+ rclose(rfd_c);
+}
+
+void rsocket_start_incoming_migration(const char *host_port, Error **errp)
+{
+ int rfd;
+
+ trace_rsocket_start_incoming_migration();
+ rfd = rsocket_inet_listen(host_port, SOCK_STREAM, 0, errp);
+ if (rfd < 0) {
+ return;
+ }
+
+ rsocket_set_handler(rfd,POLLIN | POLLERR , "rsocketlisten",
+ rsocket_accept_incoming_migration,
+ (void *)(intptr_t)rfd);
+}
diff --git a/qemu-coroutine-io.c b/qemu-coroutine-io.c
index d404926..9569d17 100644
--- a/qemu-coroutine-io.c
+++ b/qemu-coroutine-io.c
@@ -37,7 +37,8 @@ qemu_co_sendv_recvv(int sockfd, struct iovec *iov, unsigned
iov_cnt,
int err;
while (done < bytes) {
ret = iov_send_recv(sockfd, iov, iov_cnt,
- offset + done, bytes - done, do_send);
+ offset + done, bytes - done, do_send,
+ iov_send_recv_fd);
if (ret > 0) {
done += ret;
} else if (ret < 0) {
diff --git a/trace-events b/trace-events
index b5722ea..92a6dff 100644
--- a/trace-events
+++ b/trace-events
@@ -1149,6 +1149,41 @@ vmstate_load_field_error(const char *field, int ret) "field \"%s\" load failed,
# qemu-file.c
qemu_file_fclose(void) ""
+rsocket_handler_fdcallback(void) ""
+rsocket_handler_thread_top(void) ""
+rsocket_handler_thread_after_poll(void) ""
+rsocket_set_handler(void) ""
+rsocket_wait_for_connect(void) ""
+rsocket_inet_connect_addr(void) ""
+rsocket_inet_connect_opts(void) ""
+rsocket_inet_listen_opts(void) ""
+rsocket_inet_listen(void) ""
+rsocket_yield_thread_top(void) ""
+rsocket_yield_thread_requested_exit(void) ""
+rsocket_yield_thread_exit(void) ""
+rsocket_set_nonblock(int rfd) "%d"
+rsocket_start_yield_thread(void) ""
+rsocket_stop_yield_thread(void) ""
+rsocket_yield(void) ""
+rsocket_send(void) ""
+rsocket_writev_buffer(ssize_t size) "%zd"
+rsocket_writev_buffer_end(ssize_t size, ssize_t len) "size=%zd / return %zd"
+rsocket_get_buffer(int size) "size=%d"
+rsocket_get_buffer_exit(int size, ssize_t len) "size=%d return %zd"
+rsocket_close(void) ""
+qemu_fopen_rsocket(int rfd, const char *mode) "rfd %d mode %s"
+rsocket_have_connect(int rfd) "rfd %d"
+rsocket_start_outgoing_migration(void) ""
+rsocket_accept_incoming_migration(void) ""
+rsocket_start_incoming_migration(void) ""
+rsocket_dest_ramblock_reg(const char *block_name, uint64_t block_offset, uint64_t rsock_key) "%s %" PRIx64 "->%" PRIx64
+rsocket_source_ramblock_setup(const char *block_name, uint64_t block_offset, uint64_t rsock_key) "%s %" PRIx64 "->%" PRIx64
+rsocket_dest_before_ram_iterate(uint64_t flags) "%" PRIx64
+rsocket_save_page(uint64_t block_offset, uint64_t offset, uint64_t rsocket_key) "block=%" PRIx64 " offset=%" PRIx64 " rsocket_key=%" PRIx64
+rsocket_source_before_ram_iterate(uint64_t flags) "%" PRIx64
+rsocket_close_RAMBlock_func(void *host_addr, bool isSource) "%p %u"
+rsocket_save_page_riowrite_eagain(uint64_t offset) "%" PRIx64
+
# arch_init.c
migration_bitmap_sync_start(void) ""
migration_bitmap_sync_end(uint64_t dirty_pages) "dirty_pages %" PRIu64""
diff --git a/util/iov.c b/util/iov.c
index 2fb18e6..6dbf62e 100644
--- a/util/iov.c
+++ b/util/iov.c
@@ -88,9 +88,9 @@ size_t iov_size(const struct iovec *iov, const unsigned int iov_cnt)
return len;
}
-/* helper function for iov_send_recv() */
-static ssize_t
-do_send_recv(int sockfd, struct iovec *iov, unsigned iov_cnt, bool do_send)
+/* helper function for iov_send_recv() for normal FDs */
+ssize_t iov_send_recv_fd(int sockfd, struct iovec *iov, unsigned iov_cnt,
+ bool do_send)
{
#ifdef CONFIG_POSIX
ssize_t ret;
@@ -134,8 +134,8 @@ do_send_recv(int sockfd, struct iovec *iov, unsigned iov_cnt, bool do_send)
}
ssize_t iov_send_recv(int sockfd, struct iovec *iov, unsigned iov_cnt,
- size_t offset, size_t bytes,
- bool do_send)
+ size_t offset, size_t bytes, bool do_send,
+ iov_send_recv_func helper)
{
ssize_t total = 0;
ssize_t ret;
@@ -174,11 +174,11 @@ ssize_t iov_send_recv(int sockfd, struct iovec *iov, unsigned iov_cnt,
assert(iov[niov].iov_len > tail);
orig_len = iov[niov].iov_len;
iov[niov++].iov_len = tail;
- ret = do_send_recv(sockfd, iov, niov, do_send);
+ ret = helper(sockfd, iov, niov, do_send);
/* Undo the changes above before checking for errors */
iov[niov-1].iov_len = orig_len;
} else {
- ret = do_send_recv(sockfd, iov, niov, do_send);
+ ret = helper(sockfd, iov, niov, do_send);
}
if (offset) {
iov[0].iov_base -= offset;
diff --git a/util/qemu-sockets.c b/util/qemu-sockets.c
index a76bb3c..2d0fa60 100644
--- a/util/qemu-sockets.c
+++ b/util/qemu-sockets.c
@@ -58,7 +58,7 @@ QemuOptsList socket_optslist = {
},
};
-static int inet_getport(struct addrinfo *e)
+int inet_getport(struct addrinfo *e)
{
struct sockaddr_in *i4;
struct sockaddr_in6 *i6;
@@ -75,7 +75,7 @@ static int inet_getport(struct addrinfo *e)
}
}
-static void inet_setport(struct addrinfo *e, int port)
+void inet_setport(struct addrinfo *e, int port)
{
struct sockaddr_in *i4;
struct sockaddr_in6 *i6;
@@ -319,7 +319,7 @@ static int inet_connect_addr(struct addrinfo *addr, bool *in_progress,
return sock;
}
-static struct addrinfo *inet_parse_connect_opts(QemuOpts *opts, Error **errp)
+struct addrinfo *inet_parse_connect_opts(QemuOpts *opts, Error **errp)
{
struct addrinfo ai, *res;
int rc;
@@ -574,7 +574,7 @@ fail:
return NULL;
}
-static void inet_addr_to_opts(QemuOpts *opts, const InetSocketAddress *addr)
+void inet_addr_to_opts(QemuOpts *opts, const InetSocketAddress *addr)
{
bool ipv4 = addr->ipv4 || !addr->has_ipv4;
bool ipv6 = addr->ipv6 || !addr->has_ipv6;
--
2.1.0
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [Qemu-devel] [incomplete 1/1] Rsocket migration support [incomplete],
Dr. David Alan Gilbert (git) <=