[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[Qemu-devel] 答复: Re: 答复: Re: [PATCHv2 02/04] colo-compare: Process pactk
From: |
wang.yong155 |
Subject: |
[Qemu-devel] 答复: Re: 答复: Re: [PATCHv2 02/04] colo-compare: Process pactkets in the IOThread ofthe primary |
Date: |
Tue, 13 Jun 2017 08:48:25 +0800 (CST) |
>> >> From: Wang Yong address@hidden
>>
>> >>
>>
>> >> Process pactkets in the IOThread which arrived over the socket.
>>
>> >> we use qio_channel_set_aio_fd_handler to set the handlers on the
>>
>> >> IOThread AioContext.then the packets from the primary and the
>> secondary
>>
>> >> are processed in the IOThread.
>>
>> >> Finally remove the colo-compare thread using the IOThread instead.
>>
>> >>
>>
>> >> Signed-off-by: Wang address@hidden
>>
>> >> Signed-off-by: Wang address@hidden
>>
>> >> ---
>>
>> >> net/colo-compare.c | 133
>> ++++++++++++++++++++++++++++++++++++-----------------
>>
>> >> net/colo.h | 1 +
>>
>> >> 2 files changed, 91 insertions(+), 43 deletions(-)
>>
>> >>
>>
>> >> diff --git a/net/colo-compare.c b/net/colo-compare.c
>>
>> >> index b0942a4..e3af791 100644
>>
>> >> --- a/net/colo-compare.c
>>
>> >> +++ b/net/colo-compare.c
>>
>> >> @@ -29,6 +29,7 @@
>>
>> >> #include "qemu/sockets.h"
>>
>> >> #include "qapi-visit.h"
>>
>> >> #include "net/colo.h"
>>
>> >> +#include "io/channel.h"
>>
>> >> #include "sysemu/iothread.h"
>>
>> >>
>>
>> >> #define TYPE_COLO_COMPARE "colo-compare"
>>
>> >> @@ -82,11 +83,6 @@ typedef struct CompareState {
>>
>> >> GQueue conn_list
>>
>> >> /* hashtable to save connection */
>>
>> >> GHashTable *connection_track_table
>>
>> >> - /* compare thread, a thread for each NIC */
>>
>> >> - QemuThread thread
>>
>> >> -
>>
>> >> - GMainContext *worker_context
>>
>> >> - GMainLoop *compare_loop
>>
>> >>
>>
>> >> /*compare iothread*/
>>
>> >> IOThread *iothread
>>
>> >> @@ -95,6 +91,14 @@ typedef struct CompareState {
>>
>> >> QEMUTimer *packet_check_timer
>>
>> >> } CompareState
>>
>> >>
>>
>> >> +typedef struct {
>>
>> >> + Chardev parent
>>
>> >> + QIOChannel *ioc /*I/O channel */
>>
>>
>> >We probably don't want to manipulate char backend's internal io
>> channel.
>>
>> >All we need here is to access the frontend API (char-fe.c), I believe, and
>>
>> >hide the internal implementation.
>>
>> char-fd.c ?
>>
>char-fe.c for sure; "fe" stands for the frontend of chardev.
>> These APIs can only watch events in the qemu main thread, not in the
>> IOThread.
>>
>> I had to use the qio_channel_socket_set_aio_fd_handler function to
>>
>> monitor the char event in the IOThread, so the io channel is used here.
>>
>The point is to avoid touching the internal structure of chardev, such as ioc;
>instead, extend one of its helpers, e.g. qemu_chr_fe_set_handlers(), and let it
>set the aio handlers.
Currently character devices are tied to the GSource API. However, I'll try to
submit a patch first.
Thanks
>> ->qio_channel_socket_set_aio_fd_handler
>>
>> ->aio_set_fd_handler
>>
>>
>> Thanks
>>
>>
>> >> +} CompareChardev
>>
>> >> +
>>
>> >> +#define COMPARE_CHARDEV(obj) \
>>
>> >> + OBJECT_CHECK(CompareChardev, (obj), TYPE_CHARDEV_SOCKET)
>>
>> >> +
>>
>> >> typedef struct CompareClass {
>>
>> >> ObjectClass parent_class
>>
>> >> } CompareClass
>>
>> >> @@ -107,6 +111,12 @@ enum {
>>
>> >> static int compare_chr_send(CharBackend *out,
>>
>> >> const uint8_t *buf,
>>
>> >> uint32_t size)
>>
>> >> +static void compare_chr_set_aio_fd_handlers(CharBackend *b,
>>
>> >> + AioContext *ctx,
>>
>> >> + IOCanReadHandler *fd_can_read,
>>
>> >> + IOReadHandler *fd_read,
>>
>> >> + IOEventHandler *fd_event,
>>
>> >> + void *opaque)
>>
>> >>
>>
>> >> static gint seq_sorter(Packet *a, Packet *b, gpointer data)
>>
>> >> {
>>
>> >> @@ -534,6 +544,30 @@ err:
>>
>> >> return ret < 0 ? ret : -EIO
>>
>> >> }
>>
>> >>
>>
>> >> +static void compare_chr_read(void *opaque)
>>
>> >> +{
>>
>> >> + Chardev *chr = opaque
>>
>> >> + uint8_t buf[CHR_READ_BUF_LEN]
>>
>> >> + int len, size
>>
>> >> + int max_size
>>
>> >> +
>>
>> >> + max_size = qemu_chr_be_can_write(chr)
>>
>> >> + if (max_size <= 0) {
>>
>> >> + return
>>
>> >> + }
>>
>> >> +
>>
>> >> + len = sizeof(buf)
>>
>> >> + if (len > max_size) {
>>
>> >> + len = max_size
>>
>> >> + }
>>
>> >> + size = CHARDEV_GET_CLASS(chr)->chr_sync_read(chr, (void *)buf,
>> len)
>>
>> >> + if (size == 0) {
>>
>> >> + return
>>
>> >> + } else if (size > 0) {
>>
>> >> + qemu_chr_be_write(chr, buf, size)
>>
>> >> + }
>>
>> >> +}
>>
>> >> +
>>
>> >> static int compare_chr_can_read(void *opaque)
>>
>> >> {
>>
>> >> return COMPARE_READ_LEN_MAX
>>
>> >> @@ -550,8 +584,8 @@ static void compare_pri_chr_in(void *opaque,
>> const uint8_t *buf, int size)
>>
>> >>
>>
>> >> ret = net_fill_rstate(&s->pri_rs, buf, size)
>>
>> >> if (ret == -1) {
>>
>> >> - qemu_chr_fe_set_handlers(&s->chr_pri_in, NULL, NULL, NULL,
>>
>> >> - NULL, NULL, true)
>>
>> >> + compare_chr_set_aio_fd_handlers(&s->chr_pri_in, s->ctx,
>>
>> >> + NULL, NULL, NULL, NULL)
>>
>> >> error_report("colo-compare primary_in error")
>>
>> >> }
>>
>> >> }
>>
>> >> @@ -567,8 +601,8 @@ static void compare_sec_chr_in(void *opaque,
>> const uint8_t *buf, int size)
>>
>> >>
>>
>> >> ret = net_fill_rstate(&s->sec_rs, buf, size)
>>
>> >> if (ret == -1) {
>>
>> >> - qemu_chr_fe_set_handlers(&s->chr_sec_in, NULL, NULL, NULL,
>>
>> >> - NULL, NULL, true)
>>
>> >> + compare_chr_set_aio_fd_handlers(&s->chr_sec_in, s->ctx,
>>
>> >> + NULL, NULL, NULL, NULL)
>>
>> >> error_report("colo-compare secondary_in error")
>>
>> >> }
>>
>> >> }
>>
>> >> @@ -605,34 +639,57 @@ static void
>> colo_compare_timer_del(CompareState *s)
>>
>> >> }
>>
>> >> }
>>
>> >>
>>
>> >> -static void *colo_compare_thread(void *opaque)
>>
>> >> -{
>>
>> >> - CompareState *s = opaque
>>
>> >> -
>>
>> >> - s->worker_context = g_main_context_new()
>>
>> >> -
>>
>> >> - qemu_chr_fe_set_handlers(&s->chr_pri_in, compare_chr_can_read,
>>
>> >> - compare_pri_chr_in, NULL, s,
>> s->worker_context, true)
>>
>> >> - qemu_chr_fe_set_handlers(&s->chr_sec_in, compare_chr_can_read,
>>
>> >> - compare_sec_chr_in, NULL, s,
>> s->worker_context, true)
>>
>> >> -
>>
>> >> - s->compare_loop = g_main_loop_new(s->worker_context, FALSE)
>>
>> >> -
>>
>> >> - g_main_loop_run(s->compare_loop)
>>
>> >> -
>>
>> >> - g_main_loop_unref(s->compare_loop)
>>
>> >> - g_main_context_unref(s->worker_context)
>>
>> >> - return NULL
>>
>> >> -}
>>
>> >>
>>
>> >> static void colo_compare_iothread(CompareState *s)
>>
>> >> {
>>
>> >> object_ref(OBJECT(s->iothread))
>>
>> >> s->ctx = iothread_get_aio_context(s->iothread)
>>
>> >>
>>
>> >> + compare_chr_set_aio_fd_handlers(&s->chr_pri_in, s->ctx,
>>
>> >> + compare_chr_can_read,
>>
>> >> + compare_pri_chr_in,
>>
>> >> + NULL,
>>
>> >> + s)
>>
>> >> + compare_chr_set_aio_fd_handlers(&s->chr_sec_in, s->ctx,
>>
>> >> + compare_chr_can_read,
>>
>> >> + compare_sec_chr_in,
>>
>> >> + NULL,
>>
>> >> + s)
>>
>> >> +
>>
>> >> colo_compare_timer_init(s)
>>
>> >> }
>>
>> >>
>>
>> >> +static void compare_chr_set_aio_fd_handlers(CharBackend *b,
>>
>> >> + AioContext *ctx,
>>
>> >> + IOCanReadHandler *fd_can_read,
>>
>> >> + IOReadHandler *fd_read,
>>
>> >> + IOEventHandler *fd_event,
>>
>> >> + void *opaque)
>>
>> >> +{
>>
>> >> + CompareChardev *s
>>
>> >> +
>>
>> >> + if (!b->chr) {
>>
>> >> + return
>>
>> >> + }
>>
>> >> + s = COMPARE_CHARDEV(b->chr)
>>
>> >> + if (!s->ioc) {
>>
>> >> + return
>>
>> >> + }
>>
>>
>> >So this is hacky; you can refer to how vhost-user validates the udp socket char
>>
>> >backend.
>>
>> I will investigate.
>>
>>
>> Thanks
>>
>>
>> >> +
>>
>> >> + b->chr_can_read = fd_can_read
>>
>> >> + b->chr_read = fd_read
>>
>> >> + b->chr_event = fd_event
>>
>> >> + b->opaque = opaque
>>
>> >> + remove_fd_in_watch(b->chr)
>>
>> >> +
>>
>> >> + if (b->chr_read) {
>>
>> >> + qio_channel_set_aio_fd_handler(s->ioc, ctx,
>>
>> >> + compare_chr_read, NULL, b->chr)
>>
>> >> + } else {
>>
>> >> + qio_channel_set_aio_fd_handler(s->ioc, ctx, NULL, NULL,
>> NULL)
>>
>>
>> >So instead of doing such a hack, how about passing an AioContext * instead
>>
>> >of GMainContext * to qemu_chr_fe_set_handlers?
>>
>> IOThread AioContext ->GSource -> GMainContext is NULL
>>
>> if we still use qemu_chr_fe_set_handlers, it will use the qemu
>> main thread's GMainContext,
>>
>> then io will still be processed in the qemu main thread.
>>
>> so I encapsulate a function(compare_chr_set_aio_fd_handlers) to
>> monitor char fd in the IOThread.
>>
>>>
>As above, we should do this inside char-fe.c, not here.
>
>Thanks
>
>> Thanks
>>
>>
>> >Thanks
>>
>>
>> >> + }
>>
>> >> +}
>>
>> >> +
>>
>> >> static char *compare_get_pri_indev(Object *obj, Error **errp)
>>
>> >> {
>>
>> >> CompareState *s = COLO_COMPARE(obj)
>>
>> >> @@ -736,8 +793,6 @@ static void colo_compare_complete(UserCreatable
>> *uc, Error **errp)
>>
>> >> {
>>
>> >> CompareState *s = COLO_COMPARE(uc)
>>
>> >> Chardev *chr
>>
>> >> - char thread_name[64]
>>
>> >> - static int compare_id
>>
>> >>
>>
>> >> if (!s->pri_indev || !s->sec_indev || !s->outdev ||
>> !s->iothread) {
>>
>> >> error_setg(errp, "colo compare needs 'primary_in' ,"
>>
>> >> @@ -776,12 +831,6 @@ static void
>> colo_compare_complete(UserCreatable *uc, Error **errp)
>>
>> >> g_free,
>>
>> >> connection_destroy)
>>
>> >>
>>
>> >> - sprintf(thread_name, "colo-compare %d", compare_id)
>>
>> >> - qemu_thread_create(&s->thread, thread_name,
>>
>> >> - colo_compare_thread, s,
>>
>> >> - QEMU_THREAD_JOINABLE)
>>
>> >> - compare_id++
>>
>> >> -
>>
>> >> colo_compare_iothread(s)
>>
>> >>
>>
>> >> return
>>
>> >> @@ -834,16 +883,14 @@ static void colo_compare_finalize(Object *obj)
>>
>> >> {
>>
>> >> CompareState *s = COLO_COMPARE(obj)
>>
>> >>
>>
>> >> - qemu_chr_fe_set_handlers(&s->chr_pri_in, NULL, NULL, NULL,
>> NULL,
>>
>> >> - s->worker_context, true)
>>
>> >> - qemu_chr_fe_set_handlers(&s->chr_sec_in, NULL, NULL, NULL,
>> NULL,
>>
>> >> - s->worker_context, true)
>>
>> >> + compare_chr_set_aio_fd_handlers(&s->chr_pri_in, s->ctx,
>>
>> >> + NULL, NULL, NULL, NULL)
>>
>> >> + compare_chr_set_aio_fd_handlers(&s->chr_sec_in, s->ctx,
>>
>> >> + NULL, NULL, NULL, NULL)
>>
>> >> +
>>
>> >> qemu_chr_fe_deinit(&s->chr_out)
>>
>> >> colo_compare_timer_del(s)
>>
>> >>
>>
>> >> - g_main_loop_quit(s->compare_loop)
>>
>> >> - qemu_thread_join(&s->thread)
>>
>> >> -
>>
>> >> /* Release all unhandled packets after compare thead exited */
>>>
>> >> g_queue_foreach(&s->conn_list, colo_flush_packets, s)
>>
>> >>
>>
>> >> diff --git a/net/colo.h b/net/colo.h
>>
>> >> index 7c524f3..936dea1 100644
>>
>> >> --- a/net/colo.h
>>
>> >> +++ b/net/colo.h
>>
>> >> @@ -84,5 +84,6 @@ Connection *connection_get(GHashTable
>> *connection_track_table,
>>
>> >> void connection_hashtable_reset(GHashTable
>> *connection_track_table)
>>
>> >> Packet *packet_new(const void *data, int size)
>>
>> >> void packet_destroy(void *opaque, void *user_data)
>>
>> >> +void remove_fd_in_watch(Chardev *chr)
>>
>> >>
>>
>> >> #endif /* QEMU_COLO_PROXY_H */
原始邮件
发件人: <address@hidden>
收件人:王勇10170530 <address@hidden>
抄送人: <address@hidden> <address@hidden> <address@hidden>王广10165992
日 期 :2017年06月09日 12:20
主 题 :Re: 答复: Re: [PATCHv2 02/04] colo-compare: Process pactkets in the IOThread
ofthe primary
On 2017年06月08日 17:16, address@hidden wrote:
>
> >> From: Wang Yong address@hidden
>
> >>
>
> >> Process pactkets in the IOThread which arrived over the socket.
>
> >> we use qio_channel_set_aio_fd_handler to set the handlers on the
>
> >> IOThread AioContext.then the packets from the primary and the
> secondary
>
> >> are processed in the IOThread.
>
> >> Finally remove the colo-compare thread using the IOThread instead.
>
> >>
>
> >> Signed-off-by: Wang address@hidden
>
> >> Signed-off-by: Wang address@hidden
>
> >> ---
>
> >> net/colo-compare.c | 133
> ++++++++++++++++++++++++++++++++++++-----------------
>
> >> net/colo.h | 1 +
>
> >> 2 files changed, 91 insertions(+), 43 deletions(-)
>
> >>
>
> >> diff --git a/net/colo-compare.c b/net/colo-compare.c
>
> >> index b0942a4..e3af791 100644
>
> >> --- a/net/colo-compare.c
>
> >> +++ b/net/colo-compare.c
>
> >> @@ -29,6 +29,7 @@
>
> >> #include "qemu/sockets.h"
>
> >> #include "qapi-visit.h"
>
> >> #include "net/colo.h"
>
> >> +#include "io/channel.h"
>
> >> #include "sysemu/iothread.h"
>
> >>
>
> >> #define TYPE_COLO_COMPARE "colo-compare"
>
> >> @@ -82,11 +83,6 @@ typedef struct CompareState {
>
> >> GQueue conn_list
>
> >> /* hashtable to save connection */
>
> >> GHashTable *connection_track_table
>
> >> - /* compare thread, a thread for each NIC */
>
> >> - QemuThread thread
>
> >> -
>
> >> - GMainContext *worker_context
>
> >> - GMainLoop *compare_loop
>
> >>
>
> >> /*compare iothread*/
>
> >> IOThread *iothread
>
> >> @@ -95,6 +91,14 @@ typedef struct CompareState {
>
> >> QEMUTimer *packet_check_timer
>
> >> } CompareState
>
> >>
>
> >> +typedef struct {
>
> >> + Chardev parent
>
> >> + QIOChannel *ioc /*I/O channel */
>
>
> >We probably don't want to manipulate char backend's internal io
> channel.
>
> >All need here is to access the frontend API (char-fe.c) I believe, and
>
> >hide the internal implementation.
>
> char-fd.c ?
>
char-fe.c for sure; "fe" stands for the frontend of chardev.
> These API can only watch events in the qemu main thread, not in the
> IOThread.
>
> I had to use the qio_channel_socket_set_aio_fd_handler function to
>
> monitor the char event in the IOThread, so the io channel is used here.
>
The point is to avoid touching the internal structure of chardev, such as ioc;
instead, extend one of its helpers, e.g. qemu_chr_fe_set_handlers(), and let it
set the aio handlers.
> ->qio_channel_socket_set_aio_fd_handler
>
> ->aio_set_fd_handler
>
>
> Thanks
>
>
> >> +} CompareChardev
>
> >> +
>
> >> +#define COMPARE_CHARDEV(obj) \
>
> >> + OBJECT_CHECK(CompareChardev, (obj), TYPE_CHARDEV_SOCKET)
>
> >> +
>
> >> typedef struct CompareClass {
>
> >> ObjectClass parent_class
>
> >> } CompareClass
>
> >> @@ -107,6 +111,12 @@ enum {
>
> >> static int compare_chr_send(CharBackend *out,
>
> >> const uint8_t *buf,
>
> >> uint32_t size)
>
> >> +static void compare_chr_set_aio_fd_handlers(CharBackend *b,
>
> >> + AioContext *ctx,
>
> >> + IOCanReadHandler *fd_can_read,
>
> >> + IOReadHandler *fd_read,
>
> >> + IOEventHandler *fd_event,
>
> >> + void *opaque)
>
> >>
>
> >> static gint seq_sorter(Packet *a, Packet *b, gpointer data)
>
> >> {
>
> >> @@ -534,6 +544,30 @@ err:
>
> >> return ret < 0 ? ret : -EIO
>
> >> }
>
> >>
>
> >> +static void compare_chr_read(void *opaque)
>
> >> +{
>
> >> + Chardev *chr = opaque
>
> >> + uint8_t buf[CHR_READ_BUF_LEN]
>
> >> + int len, size
>
> >> + int max_size
>
> >> +
>
> >> + max_size = qemu_chr_be_can_write(chr)
>
> >> + if (max_size <= 0) {
>
> >> + return
>
> >> + }
>
> >> +
>
> >> + len = sizeof(buf)
>
> >> + if (len > max_size) {
>
> >> + len = max_size
>
> >> + }
>
> >> + size = CHARDEV_GET_CLASS(chr)->chr_sync_read(chr, (void *)buf,
> len)
>
> >> + if (size == 0) {
>
> >> + return
>
> >> + } else if (size > 0) {
>
> >> + qemu_chr_be_write(chr, buf, size)
>
> >> + }
>
> >> +}
>
> >> +
>
> >> static int compare_chr_can_read(void *opaque)
>
> >> {
>
> >> return COMPARE_READ_LEN_MAX
>
> >> @@ -550,8 +584,8 @@ static void compare_pri_chr_in(void *opaque,
> const uint8_t *buf, int size)
>
> >>
>
> >> ret = net_fill_rstate(&s->pri_rs, buf, size)
>
> >> if (ret == -1) {
>
> >> - qemu_chr_fe_set_handlers(&s->chr_pri_in, NULL, NULL, NULL,
>
> >> - NULL, NULL, true)
>
> >> + compare_chr_set_aio_fd_handlers(&s->chr_pri_in, s->ctx,
>
> >> + NULL, NULL, NULL, NULL)
>
> >> error_report("colo-compare primary_in error")
>
> >> }
>
> >> }
>
> >> @@ -567,8 +601,8 @@ static void compare_sec_chr_in(void *opaque,
> const uint8_t *buf, int size)
>
> >>
>
> >> ret = net_fill_rstate(&s->sec_rs, buf, size)
>
> >> if (ret == -1) {
>
> >> - qemu_chr_fe_set_handlers(&s->chr_sec_in, NULL, NULL, NULL,
>
> >> - NULL, NULL, true)
>
> >> + compare_chr_set_aio_fd_handlers(&s->chr_sec_in, s->ctx,
>
> >> + NULL, NULL, NULL, NULL)
>
> >> error_report("colo-compare secondary_in error")
>
> >> }
>
> >> }
>
> >> @@ -605,34 +639,57 @@ static void
> colo_compare_timer_del(CompareState *s)
>
> >> }
>
> >> }
>
> >>
>
> >> -static void *colo_compare_thread(void *opaque)
>
> >> -{
>
> >> - CompareState *s = opaque
>
> >> -
>
> >> - s->worker_context = g_main_context_new()
>
> >> -
>
> >> - qemu_chr_fe_set_handlers(&s->chr_pri_in, compare_chr_can_read,
>
> >> - compare_pri_chr_in, NULL, s,
> s->worker_context, true)
>
> >> - qemu_chr_fe_set_handlers(&s->chr_sec_in, compare_chr_can_read,
>
> >> - compare_sec_chr_in, NULL, s,
> s->worker_context, true)
>
> >> -
>
> >> - s->compare_loop = g_main_loop_new(s->worker_context, FALSE)
>
> >> -
>
> >> - g_main_loop_run(s->compare_loop)
>
> >> -
>
> >> - g_main_loop_unref(s->compare_loop)
>
> >> - g_main_context_unref(s->worker_context)
>
> >> - return NULL
>
> >> -}
>
> >>
>
> >> static void colo_compare_iothread(CompareState *s)
>
> >> {
>
> >> object_ref(OBJECT(s->iothread))
>
> >> s->ctx = iothread_get_aio_context(s->iothread)
>
> >>
>
> >> + compare_chr_set_aio_fd_handlers(&s->chr_pri_in, s->ctx,
>
> >> + compare_chr_can_read,
>
> >> + compare_pri_chr_in,
>
> >> + NULL,
>
> >> + s)
>
> >> + compare_chr_set_aio_fd_handlers(&s->chr_sec_in, s->ctx,
>
> >> + compare_chr_can_read,
>
> >> + compare_sec_chr_in,
>
> >> + NULL,
>
> >> + s)
>
> >> +
>
> >> colo_compare_timer_init(s)
>
> >> }
>
> >>
>
> >> +static void compare_chr_set_aio_fd_handlers(CharBackend *b,
>
> >> + AioContext *ctx,
>
> >> + IOCanReadHandler *fd_can_read,
>
> >> + IOReadHandler *fd_read,
>
> >> + IOEventHandler *fd_event,
>
> >> + void *opaque)
>
> >> +{
>
> >> + CompareChardev *s
>
> >> +
>
> >> + if (!b->chr) {
>
> >> + return
>
> >> + }
>
> >> + s = COMPARE_CHARDEV(b->chr)
>
> >> + if (!s->ioc) {
>
> >> + return
>
> >> + }
>
>
> >So this is hacky, you can refer how vhost-user validate udp socket char
>
> >backend.
>
> I will investigate.
>
>
> Thanks
>
>
> >> +
>
> >> + b->chr_can_read = fd_can_read
>
> >> + b->chr_read = fd_read
>
> >> + b->chr_event = fd_event
>
> >> + b->opaque = opaque
>
> >> + remove_fd_in_watch(b->chr)
>
> >> +
>
> >> + if (b->chr_read) {
>
> >> + qio_channel_set_aio_fd_handler(s->ioc, ctx,
>
> >> + compare_chr_read, NULL, b->chr)
>
> >> + } else {
>
> >> + qio_channel_set_aio_fd_handler(s->ioc, ctx, NULL, NULL,
> NULL)
>
>
> >So instead of doing such hack, how about passing a AioContext * instead
>
> >of GMainContext * to qemu_chr_fe_set_handlers?
>
> IOThread AioContext ->GSource -> GMainContext is NULL
>
> if we still use the qemu_chr_fe_set_handlers, it will use the qemu
> main thread' GMainContext,
>
> then io will still be processed in the qemu main thread.
>
> so I encapsulate a function(compare_chr_set_aio_fd_handlers) to
> monitor char fd in the IOThread.
>
>
As above, we should do this inside char-fe.c, not here.
Thanks
> Thanks
>
>
> >Thanks
>
>
> >> + }
>
> >> +}
>
> >> +
>
> >> static char *compare_get_pri_indev(Object *obj, Error **errp)
>
> >> {
>
> >> CompareState *s = COLO_COMPARE(obj)
>
> >> @@ -736,8 +793,6 @@ static void colo_compare_complete(UserCreatable
> *uc, Error **errp)
>
> >> {
>
> >> CompareState *s = COLO_COMPARE(uc)
>
> >> Chardev *chr
>
> >> - char thread_name[64]
>
> >> - static int compare_id
>
> >>
>
> >> if (!s->pri_indev || !s->sec_indev || !s->outdev ||
> !s->iothread) {
>
> >> error_setg(errp, "colo compare needs 'primary_in' ,"
>
> >> @@ -776,12 +831,6 @@ static void
> colo_compare_complete(UserCreatable *uc, Error **errp)
>
> >> g_free,
>
> >> connection_destroy)
>
> >>
>
> >> - sprintf(thread_name, "colo-compare %d", compare_id)
>
> >> - qemu_thread_create(&s->thread, thread_name,
>
> >> - colo_compare_thread, s,
>
> >> - QEMU_THREAD_JOINABLE)
>
> >> - compare_id++
>
> >> -
>
> >> colo_compare_iothread(s)
>
> >>
>
> >> return
>
> >> @@ -834,16 +883,14 @@ static void colo_compare_finalize(Object *obj)
>
> >> {
>
> >> CompareState *s = COLO_COMPARE(obj)
>
> >>
>
> >> - qemu_chr_fe_set_handlers(&s->chr_pri_in, NULL, NULL, NULL,
> NULL,
>
> >> - s->worker_context, true)
>
> >> - qemu_chr_fe_set_handlers(&s->chr_sec_in, NULL, NULL, NULL,
> NULL,
>
> >> - s->worker_context, true)
>
> >> + compare_chr_set_aio_fd_handlers(&s->chr_pri_in, s->ctx,
>
> >> + NULL, NULL, NULL, NULL)
>
> >> + compare_chr_set_aio_fd_handlers(&s->chr_sec_in, s->ctx,
>
> >> + NULL, NULL, NULL, NULL)
>
> >> +
>
> >> qemu_chr_fe_deinit(&s->chr_out)
>
> >> colo_compare_timer_del(s)
>
> >>
>
> >> - g_main_loop_quit(s->compare_loop)
>
> >> - qemu_thread_join(&s->thread)
>
> >> -
>
> >> /* Release all unhandled packets after compare thead exited */
>
> >> g_queue_foreach(&s->conn_list, colo_flush_packets, s)
>
> >>
>
> >> diff --git a/net/colo.h b/net/colo.h
>
> >> index 7c524f3..936dea1 100644
>
> >> --- a/net/colo.h
>
> >> +++ b/net/colo.h
>
> >> @@ -84,5 +84,6 @@ Connection *connection_get(GHashTable
> *connection_track_table,
>
> >> void connection_hashtable_reset(GHashTable
> *connection_track_table)
>
> >> Packet *packet_new(const void *data, int size)
>
> >> void packet_destroy(void *opaque, void *user_data)
>
> >> +void remove_fd_in_watch(Chardev *chr)
>
> >>
>
> >> #endif /* QEMU_COLO_PROXY_H */
>
>
>
>
>
> 原始邮件
> address@hidden
> address@hidden@cn.fujitsu.com>
> address@hidden@nongnu.org>王广10165992
> *日 期 :*2017年06月07日 16:35
> *主 题 :**Re: [PATCHv2 02/04] colo-compare: Process pactkets in the
> IOThread ofthe primary*
>
>
>
>
> On 2017年06月05日 18:44, Yong Wang wrote:
> > From: Wang Yong address@hidden
> >
> > Process pactkets in the IOThread which arrived over the socket.
> > we use qio_channel_set_aio_fd_handler to set the handlers on the
> > IOThread AioContext.then the packets from the primary and the secondary
> > are processed in the IOThread.
> > Finally remove the colo-compare thread using the IOThread instead.
> >
> > Signed-off-by: Wang address@hidden
> > Signed-off-by: Wang address@hidden
> > ---
> > net/colo-compare.c | 133
> ++++++++++++++++++++++++++++++++++++-----------------
> > net/colo.h | 1 +
> > 2 files changed, 91 insertions(+), 43 deletions(-)
> >
> > diff --git a/net/colo-compare.c b/net/colo-compare.c
> > index b0942a4..e3af791 100644
> > --- a/net/colo-compare.c
> > +++ b/net/colo-compare.c
> > @@ -29,6 +29,7 @@
> > #include "qemu/sockets.h"
> > #include "qapi-visit.h"
> > #include "net/colo.h"
> > +#include "io/channel.h"
> > #include "sysemu/iothread.h"
> >
> > #define TYPE_COLO_COMPARE "colo-compare"
> > @@ -82,11 +83,6 @@ typedef struct CompareState {
> > GQueue conn_list
> > /* hashtable to save connection */
> > GHashTable *connection_track_table
> > - /* compare thread, a thread for each NIC */
> > - QemuThread thread
> > -
> > - GMainContext *worker_context
> > - GMainLoop *compare_loop
> >
> > /*compare iothread*/
> > IOThread *iothread
> > @@ -95,6 +91,14 @@ typedef struct CompareState {
> > QEMUTimer *packet_check_timer
> > } CompareState
> >
> > +typedef struct {
> > + Chardev parent
> > + QIOChannel *ioc /*I/O channel */
>
> We probably don't want to manipulate char backend's internal io channel.
> All need here is to access the frontend API (char-fe.c) I believe, and
> hide the internal implementation.
>
> > +} CompareChardev
> > +
> > +#define COMPARE_CHARDEV(obj) \
> > + OBJECT_CHECK(CompareChardev, (obj), TYPE_CHARDEV_SOCKET)
> > +
> > typedef struct CompareClass {
> > ObjectClass parent_class
> > } CompareClass
> > @@ -107,6 +111,12 @@ enum {
> > static int compare_chr_send(CharBackend *out,
> > const uint8_t *buf,
> > uint32_t size)
> > +static void compare_chr_set_aio_fd_handlers(CharBackend *b,
> > + AioContext *ctx,
> > + IOCanReadHandler *fd_can_read,
> > + IOReadHandler *fd_read,
> > + IOEventHandler *fd_event,
> > + void *opaque)
> >
> > static gint seq_sorter(Packet *a, Packet *b, gpointer data)
> > {
> > @@ -534,6 +544,30 @@ err:
> > return ret < 0 ? ret : -EIO
> > }
> >
> > +static void compare_chr_read(void *opaque)
> > +{
> > + Chardev *chr = opaque
> > + uint8_t buf[CHR_READ_BUF_LEN]
> > + int len, size
> > + int max_size
> > +
> > + max_size = qemu_chr_be_can_write(chr)
> > + if (max_size <= 0) {
> > + return
> > + }
> > +
> > + len = sizeof(buf)
> > + if (len > max_size) {
> > + len = max_size
> > + }
> > + size = CHARDEV_GET_CLASS(chr)->chr_sync_read(chr, (void *)buf, len)
> > + if (size == 0) {
> > + return
> > + } else if (size > 0) {
> > + qemu_chr_be_write(chr, buf, size)
> > + }
> > +}
> > +
> > static int compare_chr_can_read(void *opaque)
> > {
> > return COMPARE_READ_LEN_MAX
> > @@ -550,8 +584,8 @@ static void compare_pri_chr_in(void *opaque, const
> uint8_t *buf, int size)
> >
> > ret = net_fill_rstate(&s->pri_rs, buf, size)
> > if (ret == -1) {
> > - qemu_chr_fe_set_handlers(&s->chr_pri_in, NULL, NULL, NULL,
> > - NULL, NULL, true)
> > + compare_chr_set_aio_fd_handlers(&s->chr_pri_in, s->ctx,
> > + NULL, NULL, NULL, NULL)
> > error_report("colo-compare primary_in error")
> > }
> > }
> > @@ -567,8 +601,8 @@ static void compare_sec_chr_in(void *opaque, const
> uint8_t *buf, int size)
> >
> > ret = net_fill_rstate(&s->sec_rs, buf, size)
> > if (ret == -1) {
> > - qemu_chr_fe_set_handlers(&s->chr_sec_in, NULL, NULL, NULL,
> > - NULL, NULL, true)
> > + compare_chr_set_aio_fd_handlers(&s->chr_sec_in, s->ctx,
> > + NULL, NULL, NULL, NULL)
> > error_report("colo-compare secondary_in error")
> > }
> > }
> > @@ -605,34 +639,57 @@ static void colo_compare_timer_del(CompareState *s)
> > }
> > }
> >
> > -static void *colo_compare_thread(void *opaque)
> > -{
> > - CompareState *s = opaque
> > -
> > - s->worker_context = g_main_context_new()
> > -
> > - qemu_chr_fe_set_handlers(&s->chr_pri_in, compare_chr_can_read,
> > - compare_pri_chr_in, NULL, s, s->worker_context,
> true)
> > - qemu_chr_fe_set_handlers(&s->chr_sec_in, compare_chr_can_read,
> > - compare_sec_chr_in, NULL, s, s->worker_context,
> true)
> > -
> > - s->compare_loop = g_main_loop_new(s->worker_context, FALSE)
> > -
> > - g_main_loop_run(s->compare_loop)
> > -
> > - g_main_loop_unref(s->compare_loop)
> > - g_main_context_unref(s->worker_context)
> > - return NULL
> > -}
> >
> > static void colo_compare_iothread(CompareState *s)
> > {
> > object_ref(OBJECT(s->iothread))
> > s->ctx = iothread_get_aio_context(s->iothread)
> >
> > + compare_chr_set_aio_fd_handlers(&s->chr_pri_in, s->ctx,
> > + compare_chr_can_read,
> > + compare_pri_chr_in,
> > + NULL,
> > + s)
> > + compare_chr_set_aio_fd_handlers(&s->chr_sec_in, s->ctx,
> > + compare_chr_can_read,
> > + compare_sec_chr_in,
> > + NULL,
> > + s)
> > +
> > colo_compare_timer_init(s)
> > }
> >
> > +static void compare_chr_set_aio_fd_handlers(CharBackend *b,
> > + AioContext *ctx,
> > + IOCanReadHandler *fd_can_read,
> > + IOReadHandler *fd_read,
> > + IOEventHandler *fd_event,
> > + void *opaque)
> > +{
> > + CompareChardev *s
> > +
> > + if (!b->chr) {
> > + return
> > + }
> > + s = COMPARE_CHARDEV(b->chr)
> > + if (!s->ioc) {
> > + return
> > + }
>
> So this is hacky, you can refer how vhost-user validate udp socket char
> backend.
>
> > +
> > + b->chr_can_read = fd_can_read
> > + b->chr_read = fd_read
> > + b->chr_event = fd_event
> > + b->opaque = opaque
> > + remove_fd_in_watch(b->chr)
> > +
> > + if (b->chr_read) {
> > + qio_channel_set_aio_fd_handler(s->ioc, ctx,
> > + compare_chr_read, NULL, b->chr)
> > + } else {
> > + qio_channel_set_aio_fd_handler(s->ioc, ctx, NULL, NULL, NULL)
>
> So instead of doing such hack, how about passing a AioContext * instead
> of GMainContext * to qemu_chr_fe_set_handlers?
>
> Thanks
>
> > + }
> > +}
> > +
> > static char *compare_get_pri_indev(Object *obj, Error **errp)
> > {
> > CompareState *s = COLO_COMPARE(obj)
> > @@ -736,8 +793,6 @@ static void colo_compare_complete(UserCreatable *uc,
> Error **errp)
> > {
> > CompareState *s = COLO_COMPARE(uc)
> > Chardev *chr
> > - char thread_name[64]
> > - static int compare_id
> >
> > if (!s->pri_indev || !s->sec_indev || !s->outdev || !s->iothread) {
> > error_setg(errp, "colo compare needs 'primary_in' ,"
> > @@ -776,12 +831,6 @@ static void colo_compare_complete(UserCreatable *uc,
> Error **errp)
> > g_free,
> > connection_destroy)
> >
> > - sprintf(thread_name, "colo-compare %d", compare_id)
> > - qemu_thread_create(&s->thread, thread_name,
> > - colo_compare_thread, s,
> > - QEMU_THREAD_JOINABLE)
> > - compare_id++
> > -
> > colo_compare_iothread(s)
> >
> > return
> > @@ -834,16 +883,14 @@ static void colo_compare_finalize(Object *obj)
> > {
> > CompareState *s = COLO_COMPARE(obj)
> >
> > - qemu_chr_fe_set_handlers(&s->chr_pri_in, NULL, NULL, NULL, NULL,
> > - s->worker_context, true)
> > - qemu_chr_fe_set_handlers(&s->chr_sec_in, NULL, NULL, NULL, NULL,
> > - s->worker_context, true)
> > + compare_chr_set_aio_fd_handlers(&s->chr_pri_in, s->ctx,
> > + NULL, NULL, NULL, NULL)
> > + compare_chr_set_aio_fd_handlers(&s->chr_sec_in, s->ctx,
> > + NULL, NULL, NULL, NULL)
> > +
> > qemu_chr_fe_deinit(&s->chr_out)
> > colo_compare_timer_del(s)
> >
> > - g_main_loop_quit(s->compare_loop)
> > - qemu_thread_join(&s->thread)
> > -
> > /* Release all unhandled packets after compare thead exited */
> > g_queue_foreach(&s->conn_list, colo_flush_packets, s)
> >
> > diff --git a/net/colo.h b/net/colo.h
> > index 7c524f3..936dea1 100644
> > --- a/net/colo.h
> > +++ b/net/colo.h
> > @@ -84,5 +84,6 @@ Connection *connection_get(GHashTable
> *connection_track_table,
> > void connection_hashtable_reset(GHashTable *connection_track_table)
> > Packet *packet_new(const void *data, int size)
> > void packet_destroy(void *opaque, void *user_data)
> > +void remove_fd_in_watch(Chardev *chr)
> >
> > #endif /* QEMU_COLO_PROXY_H */
>
>
>
- [Qemu-devel] 答复: Re: 答复: Re: [PATCHv2 02/04] colo-compare: Process pactkets in the IOThread ofthe primary,
wang.yong155 <=