qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Qemu-devel] [PATCH v4 10/20] QmpSession: keep a queue of pending comman


From: Marc-André Lureau
Subject: [Qemu-devel] [PATCH v4 10/20] QmpSession: keep a queue of pending commands
Date: Tue, 9 Apr 2019 18:09:59 +0200

The following commit will introduce asynchronous commands. Let's keep
the session aware of the pending commands, so we can do interesting
things like order the replies, or cancel pending operations when the
client is gone.

The queue needs a lock, since QmpReturn may be called from any thread.

Signed-off-by: Marc-André Lureau <address@hidden>
---
 include/qapi/qmp/dispatch.h |  4 ++++
 qapi/qmp-dispatch.c         | 32 ++++++++++++++++++++++++++++++--
 2 files changed, 34 insertions(+), 2 deletions(-)

diff --git a/include/qapi/qmp/dispatch.h b/include/qapi/qmp/dispatch.h
index 6c0d21968e..7c9de9780d 100644
--- a/include/qapi/qmp/dispatch.h
+++ b/include/qapi/qmp/dispatch.h
@@ -16,6 +16,7 @@
 
 #include "qemu/queue.h"
 #include "qapi/qmp/json-parser.h"
+#include "qemu/thread.h"
 
 typedef struct QmpReturn QmpReturn;
 
@@ -47,11 +48,14 @@ struct QmpSession {
     const QmpCommandList *cmds;
     JSONMessageParser parser;
     QmpDispatchReturn *return_cb;
+    QemuMutex pending_lock;
+    QTAILQ_HEAD(, QmpReturn) pending;
 };
 
 struct QmpReturn {
     QmpSession *session;
     QDict *rsp;
+    QTAILQ_ENTRY(QmpReturn) entry;
 };
 
 /**
diff --git a/qapi/qmp-dispatch.c b/qapi/qmp-dispatch.c
index 5f75dc27bd..4699a6715b 100644
--- a/qapi/qmp-dispatch.c
+++ b/qapi/qmp-dispatch.c
@@ -32,11 +32,24 @@ QmpReturn *qmp_return_new(QmpSession *session, const 
QObject *request)
         qdict_put_obj(qret->rsp, "id", id);
     }
 
+    qemu_mutex_lock(&session->pending_lock);
+    QTAILQ_INSERT_TAIL(&session->pending, qret, entry);
+    qemu_mutex_unlock(&session->pending_lock);
+
     return qret;
 }
 
 void qmp_return_free(QmpReturn *qret)
 {
+    QmpSession *session = qret->session;
+
+    if (session) {
+        qemu_mutex_lock(&session->pending_lock);
+    }
+    QTAILQ_REMOVE(&session->pending, qret, entry);
+    if (session) {
+        qemu_mutex_unlock(&session->pending_lock);
+    }
     qobject_unref(qret->rsp);
     g_free(qret);
 }
@@ -44,7 +57,9 @@ void qmp_return_free(QmpReturn *qret)
 void qmp_return(QmpReturn *qret, QObject *rsp)
 {
     qdict_put_obj(qret->rsp, "return", rsp ?: QOBJECT(qdict_new()));
-    qret->session->return_cb(qret->session, qret->rsp);
+    if (qret->session) {
+        qret->session->return_cb(qret->session, qret->rsp);
+    }
     qmp_return_free(qret);
 }
 
@@ -55,7 +70,9 @@ void qmp_return_error(QmpReturn *qret, Error *err)
     qdict_put_str(qdict, "desc", error_get_pretty(err));
     qdict_put_obj(qret->rsp, "error", QOBJECT(qdict));
     error_free(err);
-    qret->session->return_cb(qret->session, qret->rsp);
+    if (qret->session) {
+        qret->session->return_cb(qret->session, qret->rsp);
+    }
     qmp_return_free(qret);
 }
 
@@ -219,17 +236,28 @@ void qmp_session_init(QmpSession *session,
                              session, NULL);
     session->cmds = cmds;
     session->return_cb = return_cb;
+    qemu_mutex_init(&session->pending_lock);
+    QTAILQ_INIT(&session->pending);
 }
 
 void qmp_session_destroy(QmpSession *session)
 {
+    QmpReturn *ret, *next;
+
     if (!session->return_cb) {
         return;
     }
 
+    qemu_mutex_lock(&session->pending_lock);
+    QTAILQ_FOREACH_SAFE(ret, &session->pending, entry, next) {
+        ret->session = NULL;
+        QTAILQ_REMOVE(&session->pending, ret, entry);
+    }
+    qemu_mutex_unlock(&session->pending_lock);
     session->cmds = NULL;
     session->return_cb = NULL;
     json_message_parser_destroy(&session->parser);
+    qemu_mutex_destroy(&session->pending_lock);
 }
 
 void qmp_dispatch(QmpSession *session, QObject *request, bool allow_oob)
-- 
2.21.0.196.g041f5ea1cf




reply via email to

[Prev in Thread] Current Thread [Next in Thread]