[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[Qemu-devel] [RFC][PATCH v5 03/21] virtagent: common code for managing c
From: |
Michael Roth |
Subject: |
[Qemu-devel] [RFC][PATCH v5 03/21] virtagent: common code for managing client/server rpc jobs |
Date: |
Fri, 3 Dec 2010 12:03:04 -0600 |
This implements a simple state machine to manage client/server rpc
jobs being multiplexed over a single channel.
A client job consists of sending an rpc request, reading an
rpc response, then making the appropriate callbacks. We allow one
client job to be processed at a time, which will make the following
state transitions:
VA_CLIENT_IDLE -> VA_CLIENT_SEND (job queued, send channel open)
VA_CLIENT_SEND -> VA_CLIENT_WAIT (request sent, awaiting response)
VA_CLIENT_WAIT -> VA_CLIENT_IDLE (response recieved, callbacks made)
A server job consists of recieving an rpc request, generating a
response, then sending the response. We expect to receive one server
request at a time due to the 1 at a time restriction for client jobs.
Server jobs make the following transitions:
VA_SERVER_IDLE -> VA_SERVER_WAIT (recieved/executed request, send
channel busy, response deferred)
VA_SERVER_IDLE -> VA_SERVER_SEND (recieved/executed request, send
channel open, sending response)
VA_SERVER_WAIT -> VA_SERVER_SEND (send channel now open, sending
response)
VA_SERVER_SEND -> VA_SERVER_IDLE (response sent)
Signed-off-by: Michael Roth <address@hidden>
---
virtagent-common.c | 427 ++++++++++++++++++++++++++++++++++++++++++++++++++++
virtagent-common.h | 66 ++++++++
2 files changed, 493 insertions(+), 0 deletions(-)
create mode 100644 virtagent-common.c
create mode 100644 virtagent-common.h
diff --git a/virtagent-common.c b/virtagent-common.c
new file mode 100644
index 0000000..45f9d9f
--- /dev/null
+++ b/virtagent-common.c
@@ -0,0 +1,427 @@
+/*
+ * virtagent - common host/guest RPC functions
+ *
+ * Copyright IBM Corp. 2010
+ *
+ * Authors:
+ * Adam Litke <address@hidden>
+ * Michael Roth <address@hidden>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ *
+ */
+
+#include "virtagent-common.h"
+
+typedef struct VAClientJob {
+ xmlrpc_mem_block *req_data;
+ char *resp_data;
+ size_t resp_data_len;
+ VAClientCallback *cb;
+ QTAILQ_ENTRY(VAClientJob) next;
+ /* for use by QMP functions */
+ MonitorCompletion *mon_cb;
+ void *mon_data;
+} VAClientJob;
+
+typedef struct VAServerJob {
+ xmlrpc_mem_block *resp_data;
+ char *req_data;
+ size_t req_data_len;
+ void *opaque;
+ QTAILQ_ENTRY(VAServerJob) next;
+} VAServerJob;
+
+enum va_http_status {
+ VA_HTTP_STATUS_NEW,
+ VA_HTTP_STATUS_OK,
+ VA_HTTP_STATUS_ERROR,
+};
+
+typedef void (VAHTSendCallback)(enum va_http_status http_status,
+ const char *content, size_t content_len);
+typedef void (VAHTReadCallback)(enum va_http_status http_status,
+ const char *content, size_t content_len,
+ bool is_request);
+typedef struct VAHTState {
+ enum {
+ VA_SEND_START,
+ VA_SEND_HDR,
+ VA_SEND_BODY,
+ VA_READ_START,
+ VA_READ_HDR,
+ VA_READ_BODY,
+ } state;
+ char hdr[1024];
+ size_t hdr_len;
+ size_t hdr_pos;
+ char *content;
+ size_t content_len;
+ size_t content_pos;
+ VAHTSendCallback *send_cb;
+ VAHTReadCallback *read_cb;
+ bool is_request;
+} VAHTState;
+
+typedef struct VAState {
+ int fd;
+ enum va_client_state {
+ VA_CLIENT_IDLE = 0,
+ VA_CLIENT_SEND, /* sending rpc request */
+ VA_CLIENT_WAIT, /* waiting for rpc response */
+ } client_state;
+ enum va_server_state {
+ VA_SERVER_IDLE = 0,
+ VA_SERVER_WAIT, /* waiting to send rpc response */
+ VA_SERVER_SEND, /* sending rpc response */
+ } server_state;
+ VAClientData client_data;
+ VAServerData server_data;
+ int client_job_count;
+ QTAILQ_HEAD(, VAClientJob) client_jobs;
+ int server_job_count;
+ QTAILQ_HEAD(, VAServerJob) server_jobs;
+ /* for use by async send/read handlers for fd */
+ VAHTState send_state;
+ VAHTState read_state;
+} VAState;
+
+static VAState *va_state;
+
+static VAClientJob *va_current_client_job(void)
+{
+ TRACE("called");
+ return QTAILQ_FIRST(&va_state->client_jobs);
+}
+
+/***********************************************************/
+/* functions for starting/managing client/server rpc jobs */
+
+static int va_send_server_response(VAServerJob *server_job)
+{
+ VAHTState http_state;
+ TRACE("called");
+ http_state.content = XMLRPC_MEMBLOCK_CONTENTS(char, server_job->resp_data);
+ TRACE("server response: %s", http_state.content);
+ http_state.content_len = XMLRPC_MEMBLOCK_SIZE(char,
+ server_job->resp_data);
+ http_state.content_pos = 0;
+ http_state.hdr_pos = 0;
+ http_state.state = VA_SEND_START;
+ http_state.send_cb = va_server_send_cb;
+ va_http_hdr_init(&http_state, VA_HTTP_RESPONSE);
+ va_state->send_state = http_state;
+ qemu_set_fd_handler(va_state->fd, va_http_read_handler,
+ va_http_send_handler, NULL);
+ return 0;
+}
+
+static int va_send_client_request(VAClientJob *client_job)
+{
+ VAHTState http_state;
+ TRACE("called");
+ http_state.content = XMLRPC_MEMBLOCK_CONTENTS(char, client_job->req_data);
+ TRACE("client request: %s", http_state.content);
+ http_state.content_len = XMLRPC_MEMBLOCK_SIZE(char,
+ client_job->req_data);
+ http_state.content_pos = 0;
+ http_state.hdr_pos = 0;
+ http_state.state = VA_SEND_START;
+ http_state.send_cb = va_client_send_cb;
+ va_http_hdr_init(&http_state, VA_HTTP_REQUEST);
+ va_state->send_state = http_state;
+ qemu_set_fd_handler(va_state->fd, va_http_send_handler,
+ va_http_send_handler, NULL);
+ return 0;
+}
+
+/* do some sanity checks before setting client state */
+static bool va_set_client_state(enum va_client_state client_state)
+{
+ TRACE("setting client state to %d", client_state);
+ switch (client_state) {
+ case VA_CLIENT_IDLE:
+ assert(va_state->client_state == VA_CLIENT_IDLE ||
+ va_state->client_state == VA_CLIENT_WAIT);
+ break;
+ case VA_CLIENT_SEND:
+ assert(va_state->client_state == VA_CLIENT_IDLE);
+ break;
+ case VA_CLIENT_WAIT:
+ assert(va_state->client_state == VA_CLIENT_SEND);
+ break;
+ default:
+ LOG("invalid client state");
+ return false;
+ }
+ va_state->client_state = client_state;
+ return true;
+}
+
+/* do some sanity checks before setting server state */
+static bool va_set_server_state(enum va_server_state server_state)
+{
+ TRACE("setting server state to %d", server_state);
+ switch (server_state) {
+ case VA_SERVER_IDLE:
+ assert(va_state->server_state == VA_SERVER_IDLE ||
+ va_state->server_state == VA_SERVER_SEND);
+ break;
+ case VA_SERVER_WAIT:
+ assert(va_state->server_state == VA_SERVER_IDLE);
+ break;
+ case VA_SERVER_SEND:
+ assert(va_state->server_state == VA_SERVER_IDLE ||
+ va_state->server_state == VA_SERVER_WAIT);
+ break;
+ default:
+ LOG("invalid server state");
+ return false;
+ }
+ va_state->server_state = server_state;
+ return true;
+}
+
+/* xmit the next client/server job. for the client this entails sending
+ * a request to the remote server. for the server this entails sending a
+ * response to the remote client
+ *
+ * currently we only do one client job or one server job at a time. for
+ * situations where we start a client job but recieve a server job (remote
+ * rpc request) we go ahead and handle the server job before returning to
+ * handling the client job. TODO: there is potential for pipelining
+ * requests/responses for more efficient use of the channel.
+ *
+ * in all cases, we can only kick off client requests or server responses
+ * when the send side of the channel is not being used
+ */
+static int va_kick(void)
+{
+ VAServerJob *server_job;
+ VAClientJob *client_job;
+ int ret;
+
+ TRACE("called");
+
+ /* handle server jobs first */
+ if (QTAILQ_EMPTY(&va_state->server_jobs)) {
+ assert(va_set_server_state(VA_SERVER_IDLE));
+ } else {
+ TRACE("handling server job queue");
+ if (va_state->client_state == VA_CLIENT_SEND) {
+ TRACE("send channel busy, deferring till available");
+ assert(va_set_server_state(VA_SERVER_WAIT));
+ goto out;
+ }
+ TRACE("send server response");
+ server_job = QTAILQ_FIRST(&va_state->server_jobs);
+
+ /* set up the send handler for the response */
+ ret = va_send_server_response(server_job);
+ if (ret != 0) {
+ LOG("error setting up send handler for server response");
+ goto out_bad;
+ }
+ assert(va_set_server_state(VA_SERVER_SEND));
+ goto out;
+ }
+
+ /* handle client jobs if nothing to do for server */
+ if (QTAILQ_EMPTY(&va_state->client_jobs)) {
+ assert(va_set_client_state(VA_CLIENT_IDLE));
+ } else {
+ TRACE("handling client job queue");
+ if (va_state->client_state != VA_CLIENT_IDLE) {
+ TRACE("client job in progress, returning");
+ goto out;
+ }
+
+ TRACE("sending new client request");
+ client_job = QTAILQ_FIRST(&va_state->client_jobs);
+ /* set up the send handler for the request, then put it on the
+ * wait queue till response is read
+ */
+ ret = va_send_client_request(client_job);
+ if (ret != 0) {
+ LOG("error setting up sendhandler for client request");
+ goto out_bad;
+ }
+ assert(va_set_client_state(VA_CLIENT_SEND));
+ }
+
+out:
+ return 0;
+out_bad:
+ return ret;
+}
+
+/* push new client job onto queue, */
+static int va_push_client_job(VAClientJob *client_job)
+{
+ TRACE("called");
+ assert(client_job != NULL);
+ if (va_state->client_job_count >= VA_CLIENT_JOBS_MAX) {
+ LOG("client job queue limit exceeded");
+ return -ENOBUFS;
+ }
+ QTAILQ_INSERT_TAIL(&va_state->client_jobs, client_job, next);
+ va_state->client_job_count++;
+
+ return va_kick();
+}
+
+/* pop client job off queue. this should only be done when we're done with
+ * both sending the request and recieving the response
+ */
+static VAClientJob *va_pop_client_job(void)
+{
+ VAClientJob *client_job = va_current_client_job();
+ TRACE("called");
+ if (client_job != NULL) {
+ QTAILQ_REMOVE(&va_state->client_jobs, client_job, next);
+ va_state->client_job_count--;
+ assert(va_set_client_state(VA_CLIENT_IDLE));
+ }
+ return client_job;
+}
+
+/* push new server job onto the queue */
+static int va_push_server_job(VAServerJob *server_job)
+{
+ TRACE("called");
+ if (va_state->server_job_count >= VA_SERVER_JOBS_MAX) {
+ LOG("server job queue limit exceeded");
+ return -ENOBUFS;
+ }
+ QTAILQ_INSERT_TAIL(&va_state->server_jobs, server_job, next);
+ va_state->server_job_count++;
+ return va_kick();
+}
+
+/* pop server job off queue. this should only be done when we're ready to
+ * send the rpc response back to the remote client
+ */
+static VAServerJob *va_pop_server_job(void) {
+ VAServerJob *server_job = QTAILQ_FIRST(&va_state->server_jobs);
+ TRACE("called");
+ if (server_job != NULL) {
+ QTAILQ_REMOVE(&va_state->server_jobs, server_job, next);
+ va_state->server_job_count--;
+ assert(va_set_server_state(VA_SERVER_IDLE));
+ }
+
+ return server_job;
+}
+
+static VAClientJob *va_client_job_new(xmlrpc_mem_block *req_data,
+ VAClientCallback *cb,
+ MonitorCompletion *mon_cb,
+ void *mon_data)
+{
+ VAClientJob *cj = qemu_mallocz(sizeof(VAClientJob));
+ TRACE("called");
+ cj->req_data = req_data;
+ cj->cb = cb;
+ cj->mon_cb = mon_cb;
+ cj->mon_data = mon_data;
+
+ return cj;
+}
+
+static VAServerJob *va_server_job_new(xmlrpc_mem_block *resp_data)
+{
+ VAServerJob *sj = qemu_mallocz(sizeof(VAServerJob));
+ TRACE("called");
+ sj->resp_data = resp_data;
+
+ return sj;
+}
+
+/* create new client job and then put it on the queue. this can be
+ * called externally from virtagent. Since there can only be one virtagent
+ * instance we access state via an object-scoped global rather than pass
+ * it around.
+ *
+ * if this is successful virtagent will handle cleanup of req_xml after
+ * making the appropriate callbacks, otherwise callee should handle it
+ */
+int va_client_job_add(xmlrpc_mem_block *req_xml, VAClientCallback *cb,
+ MonitorCompletion *mon_cb, void *mon_data)
+{
+ int ret;
+ VAClientJob *client_job;
+ TRACE("called");
+
+ client_job = va_client_job_new(req_xml, cb, mon_cb, mon_data);
+ if (client_job == NULL) {
+ return -EINVAL;
+ }
+
+ ret = va_push_client_job(client_job);
+ if (ret != 0) {
+ LOG("error adding client to queue: %s", strerror(ret));
+ qemu_free(client_job);
+ return ret;
+ }
+
+ return 0;
+}
+
+/* create new server job and then put it on the queue in wait state
+ * this should only be called from within our read handler callback
+ */
+static int va_server_job_add(xmlrpc_mem_block *resp_xml)
+{
+ VAServerJob *server_job;
+ TRACE("called");
+
+ server_job = va_server_job_new(resp_xml);
+ assert(server_job != NULL);
+ va_push_server_job(server_job);
+ return 0;
+}
+
+
+int va_init(enum va_ctx ctx, int fd)
+{
+ VAState *s;
+ int ret;
+ bool is_host = (ctx == VA_CTX_HOST) ? true : false;
+
+ TRACE("called");
+ if (va_state) {
+ LOG("virtagent already initialized");
+ return -EPERM;
+ }
+
+ s = qemu_mallocz(sizeof(VAState));
+
+ ret = va_server_init(&s->server_data, is_host);
+ if (ret) {
+ LOG("error initializing virtagent server");
+ goto out_bad;
+ }
+ ret = va_client_init(&s->client_data);
+ if (ret) {
+ LOG("error initializing virtagent client");
+ goto out_bad;
+ }
+
+ s->client_state = VA_CLIENT_IDLE;
+ s->server_state = VA_SERVER_IDLE;
+ QTAILQ_INIT(&s->client_jobs);
+ QTAILQ_INIT(&s->server_jobs);
+ s->read_state.state = VA_READ_START;
+ s->read_state.read_cb = va_http_read_cb;
+ s->fd = fd;
+ va_state = s;
+
+ /* start listening for requests/responses */
+ qemu_set_fd_handler(va_state->fd, va_http_read_handler, NULL, NULL);
+
+ return 0;
+out_bad:
+ qemu_free(s);
+ return ret;
+}
diff --git a/virtagent-common.h b/virtagent-common.h
new file mode 100644
index 0000000..c0ada60
--- /dev/null
+++ b/virtagent-common.h
@@ -0,0 +1,66 @@
+/*
+ * virt-agent - host/guest RPC client functions
+ *
+ * Copyright IBM Corp. 2010
+ *
+ * Authors:
+ * Adam Litke <address@hidden>
+ * Michael Roth <address@hidden>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ *
+ */
+#ifndef VIRTAGENT_COMMON_H
+#define VIRTAGENT_COMMON_H
+
+#include <xmlrpc-c/base.h>
+#include <xmlrpc-c/client.h>
+#include <xmlrpc-c/server.h>
+#include "qemu-common.h"
+#include "qemu_socket.h"
+#include "monitor.h"
+#include "virtagent-server.h"
+#include "virtagent.h"
+
+#define DEBUG_VA
+
+#ifdef DEBUG_VA
+#define TRACE(msg, ...) do { \
+ fprintf(stderr, "%s:%s():L%d: " msg "\n", \
+ __FILE__, __FUNCTION__, __LINE__, ## __VA_ARGS__); \
+} while(0)
+#else
+#define TRACE(msg, ...) \
+ do { } while (0)
+#endif
+
+#define LOG(msg, ...) do { \
+ fprintf(stderr, "%s:%s(): " msg "\n", \
+ __FILE__, __FUNCTION__, ## __VA_ARGS__); \
+} while(0)
+
+#define VERSION "1.0"
+#define EOL "\r\n"
+
+#define VA_HDR_LEN_MAX 4096 /* http header limit */
+#define VA_CONTENT_LEN_MAX 2*1024*1024 /* rpc/http send limit */
+#define VA_CLIENT_JOBS_MAX 5 /* max client rpcs we can queue */
+#define VA_SERVER_JOBS_MAX 1 /* max server rpcs we can queue */
+
+enum va_ctx {
+ VA_CTX_HOST,
+ VA_CTX_GUEST,
+};
+
+enum va_job_status {
+ VA_JOB_STATUS_PENDING = 0,
+ VA_JOB_STATUS_OK,
+ VA_JOB_STATUS_ERROR,
+ VA_JOB_STATUS_CANCELLED,
+};
+
+int va_init(enum va_ctx ctx, int fd);
+int va_client_job_add(xmlrpc_mem_block *req_xml, VAClientCallback *cb,
+ MonitorCompletion *mon_cb, void *mon_data);
+#endif /* VIRTAGENT_COMMON_H */
--
1.7.0.4