[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[Qemu-devel] [PULL 08/27] colo-compare: introduce packet comparison thread
From: |
Jason Wang |
Subject: |
[Qemu-devel] [PULL 08/27] colo-compare: introduce packet comparison thread |
Date: |
Mon, 26 Sep 2016 16:59:16 +0800 |
From: Zhang Chen <address@hidden>
If the primary packet is the same as the secondary packet,
we send the primary packet and drop the secondary
packet; otherwise we notify the COLO frame to do a checkpoint.
If the primary packet arrives but the secondary packet does not,
then after REGULAR_PACKET_CHECK_MS milliseconds we treat
the primary packet as an old_packet, then do a checkpoint.
Signed-off-by: Zhang Chen <address@hidden>
Signed-off-by: Li Zhijian <address@hidden>
Signed-off-by: Wen Congyang <address@hidden>
Signed-off-by: Jason Wang <address@hidden>
---
net/colo-compare.c | 233 +++++++++++++++++++++++++++++++++++++++++++++++++++++
net/colo.c | 1 +
net/colo.h | 3 +
trace-events | 2 +
4 files changed, 239 insertions(+)
diff --git a/net/colo-compare.c b/net/colo-compare.c
index 231654c..645126e 100644
--- a/net/colo-compare.c
+++ b/net/colo-compare.c
@@ -33,8 +33,12 @@
#define COLO_COMPARE(obj) \
OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE)
+#define COMPARE_READ_LEN_MAX NET_BUFSIZE
#define MAX_QUEUE_SIZE 1024
+/* TODO: Should be configurable */
+#define REGULAR_PACKET_CHECK_MS 3000
+
/*
+ CompareState ++
| |
@@ -76,6 +80,11 @@ typedef struct CompareState {
GQueue conn_list;
/* hashtable to save connection */
GHashTable *connection_track_table;
+ /* compare thread, a thread for each NIC */
+ QemuThread thread;
+ /* Timer used on the primary to find packets that are never matched */
+ QEMUTimer *timer;
+ QemuMutex timer_check_lock;
} CompareState;
typedef struct CompareClass {
@@ -148,6 +157,118 @@ static int packet_enqueue(CompareState *s, int mode)
return 0;
}
+/*
+ * The IP packets sent by primary and secondary
+ * will be compared in here
+ * TODO support ip fragment, Out-Of-Order
+ * return: 0 means packet same
+ * > 0 || < 0 means packet different
+ */
+static int colo_packet_compare(Packet *ppkt, Packet *spkt)
+{
+ trace_colo_compare_ip_info(ppkt->size, inet_ntoa(ppkt->ip->ip_src),
+ inet_ntoa(ppkt->ip->ip_dst), spkt->size,
+ inet_ntoa(spkt->ip->ip_src),
+ inet_ntoa(spkt->ip->ip_dst));
+
+ if (ppkt->size == spkt->size) {
+ return memcmp(ppkt->data, spkt->data, spkt->size);
+ } else {
+ return -1;
+ }
+}
+
+static int colo_packet_compare_all(Packet *spkt, Packet *ppkt)
+{
+ trace_colo_compare_main("compare all");
+ return colo_packet_compare(ppkt, spkt);
+}
+
+static int colo_old_packet_check_one(Packet *pkt, int64_t *check_time)
+{
+ int64_t now = qemu_clock_get_ms(QEMU_CLOCK_HOST);
+
+ if ((now - pkt->creation_ms) > (*check_time)) {
+ trace_colo_old_packet_check_found(pkt->creation_ms);
+ return 0;
+ } else {
+ return 1;
+ }
+}
+
+static void colo_old_packet_check_one_conn(void *opaque,
+ void *user_data)
+{
+ Connection *conn = opaque;
+ GList *result = NULL;
+ int64_t check_time = REGULAR_PACKET_CHECK_MS;
+
+ result = g_queue_find_custom(&conn->primary_list,
+ &check_time,
+ (GCompareFunc)colo_old_packet_check_one);
+
+ if (result) {
+ /* do checkpoint will flush old packet */
+ /* TODO: colo_notify_checkpoint();*/
+ }
+}
+
+/*
+ * Look for old packets that the secondary hasn't matched,
+ * if we have some then we have to checkpoint to wake
+ * the secondary up.
+ */
+static void colo_old_packet_check(void *opaque)
+{
+ CompareState *s = opaque;
+
+ g_queue_foreach(&s->conn_list, colo_old_packet_check_one_conn, NULL);
+}
+
+/*
+ * Called from the compare thread on the primary
+ * for compare connection
+ */
+static void colo_compare_connection(void *opaque, void *user_data)
+{
+ CompareState *s = user_data;
+ Connection *conn = opaque;
+ Packet *pkt = NULL;
+ GList *result = NULL;
+ int ret;
+
+ while (!g_queue_is_empty(&conn->primary_list) &&
+ !g_queue_is_empty(&conn->secondary_list)) {
+ qemu_mutex_lock(&s->timer_check_lock);
+ pkt = g_queue_pop_tail(&conn->primary_list);
+ qemu_mutex_unlock(&s->timer_check_lock);
+ result = g_queue_find_custom(&conn->secondary_list,
+ pkt, (GCompareFunc)colo_packet_compare_all);
+
+ if (result) {
+ ret = compare_chr_send(s->chr_out, pkt->data, pkt->size);
+ if (ret < 0) {
+ error_report("colo_send_primary_packet failed");
+ }
+ trace_colo_compare_main("packet same and release packet");
+ g_queue_remove(&conn->secondary_list, result->data);
+ packet_destroy(pkt, NULL);
+ } else {
+ /*
+ * If one packet arrive late, the secondary_list or
+ * primary_list will be empty, so we can't compare it
+ * until next comparison.
+ */
+ trace_colo_compare_main("packet different");
+ qemu_mutex_lock(&s->timer_check_lock);
+ g_queue_push_tail(&conn->primary_list, pkt);
+ qemu_mutex_unlock(&s->timer_check_lock);
+ /* TODO: colo_notify_checkpoint();*/
+ break;
+ }
+ }
+}
+
static int compare_chr_send(CharDriverState *out,
const uint8_t *buf,
uint32_t size)
@@ -175,6 +296,65 @@ err:
return ret < 0 ? ret : -EIO;
}
+static int compare_chr_can_read(void *opaque)
+{
+ return COMPARE_READ_LEN_MAX;
+}
+
+/*
+ * Called from the main thread on the primary for packets
+ * arriving over the socket from the primary.
+ */
+static void compare_pri_chr_in(void *opaque, const uint8_t *buf, int size)
+{
+ CompareState *s = COLO_COMPARE(opaque);
+ int ret;
+
+ ret = net_fill_rstate(&s->pri_rs, buf, size);
+ if (ret == -1) {
+ qemu_chr_add_handlers(s->chr_pri_in, NULL, NULL, NULL, NULL);
+ error_report("colo-compare primary_in error");
+ }
+}
+
+/*
+ * Called from the main thread on the primary for packets
+ * arriving over the socket from the secondary.
+ */
+static void compare_sec_chr_in(void *opaque, const uint8_t *buf, int size)
+{
+ CompareState *s = COLO_COMPARE(opaque);
+ int ret;
+
+ ret = net_fill_rstate(&s->sec_rs, buf, size);
+ if (ret == -1) {
+ qemu_chr_add_handlers(s->chr_sec_in, NULL, NULL, NULL, NULL);
+ error_report("colo-compare secondary_in error");
+ }
+}
+
+static void *colo_compare_thread(void *opaque)
+{
+ GMainContext *worker_context;
+ GMainLoop *compare_loop;
+ CompareState *s = opaque;
+
+ worker_context = g_main_context_new();
+
+ qemu_chr_add_handlers_full(s->chr_pri_in, compare_chr_can_read,
+ compare_pri_chr_in, NULL, s, worker_context);
+ qemu_chr_add_handlers_full(s->chr_sec_in, compare_chr_can_read,
+ compare_sec_chr_in, NULL, s, worker_context);
+
+ compare_loop = g_main_loop_new(worker_context, FALSE);
+
+ g_main_loop_run(compare_loop);
+
+ g_main_loop_unref(compare_loop);
+ g_main_context_unref(worker_context);
+ return NULL;
+}
+
static char *compare_get_pri_indev(Object *obj, Error **errp)
{
CompareState *s = COLO_COMPARE(obj);
@@ -227,6 +407,9 @@ static void compare_pri_rs_finalize(SocketReadState *pri_rs)
if (packet_enqueue(s, PRIMARY_IN)) {
trace_colo_compare_main("primary: unsupported packet in");
compare_chr_send(s->chr_out, pri_rs->buf, pri_rs->packet_len);
+ } else {
+ /* compare connection */
+ g_queue_foreach(&s->conn_list, colo_compare_connection, s);
}
}
@@ -236,6 +419,9 @@ static void compare_sec_rs_finalize(SocketReadState *sec_rs)
if (packet_enqueue(s, SECONDARY_IN)) {
trace_colo_compare_main("secondary: unsupported packet in");
+ } else {
+ /* compare connection */
+ g_queue_foreach(&s->conn_list, colo_compare_connection, s);
}
}
@@ -294,12 +480,34 @@ static int find_and_check_chardev(CharDriverState **chr,
}
/*
+ * Check old packet regularly so it can watch for any packets
+ * that the secondary hasn't produced equivalents of.
+ */
+static void check_old_packet_regular(void *opaque)
+{
+ CompareState *s = opaque;
+
+ timer_mod(s->timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) +
+ REGULAR_PACKET_CHECK_MS);
+ /* if have old packet we will notify checkpoint */
+ /*
+ * TODO: Make timer handler run in compare thread
+ * like qemu_chr_add_handlers_full.
+ */
+ qemu_mutex_lock(&s->timer_check_lock);
+ colo_old_packet_check(s);
+ qemu_mutex_unlock(&s->timer_check_lock);
+}
+
+/*
* Called from the main thread on the primary
* to setup colo-compare.
*/
static void colo_compare_complete(UserCreatable *uc, Error **errp)
{
CompareState *s = COLO_COMPARE(uc);
+ char thread_name[64];
+ static int compare_id;
if (!s->pri_indev || !s->sec_indev || !s->outdev) {
error_setg(errp, "colo compare needs 'primary_in' ,"
@@ -335,12 +543,25 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp)
net_socket_rs_init(&s->sec_rs, compare_sec_rs_finalize);
g_queue_init(&s->conn_list);
+ qemu_mutex_init(&s->timer_check_lock);
s->connection_track_table = g_hash_table_new_full(connection_key_hash,
connection_key_equal,
g_free,
connection_destroy);
+ sprintf(thread_name, "colo-compare %d", compare_id);
+ qemu_thread_create(&s->thread, thread_name,
+ colo_compare_thread, s,
+ QEMU_THREAD_JOINABLE);
+ compare_id++;
+
+ /* A regular timer to kick any packets that the secondary doesn't match */
+ s->timer = timer_new_ms(QEMU_CLOCK_VIRTUAL, /* Only when guest runs */
+ check_old_packet_regular, s);
+ timer_mod(s->timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) +
+ REGULAR_PACKET_CHECK_MS);
+
return;
}
@@ -382,6 +603,18 @@ static void colo_compare_finalize(Object *obj)
g_queue_free(&s->conn_list);
+ if (qemu_thread_is_self(&s->thread)) {
+ /* compare connection */
+ g_queue_foreach(&s->conn_list, colo_compare_connection, s);
+ qemu_thread_join(&s->thread);
+ }
+
+ if (s->timer) {
+ timer_del(s->timer);
+ }
+
+ qemu_mutex_destroy(&s->timer_check_lock);
+
g_free(s->pri_indev);
g_free(s->sec_indev);
g_free(s->outdev);
diff --git a/net/colo.c b/net/colo.c
index 13a8090..e517521 100644
--- a/net/colo.c
+++ b/net/colo.c
@@ -144,6 +144,7 @@ Packet *packet_new(const void *data, int size)
pkt->data = g_memdup(data, size);
pkt->size = size;
+ pkt->creation_ms = qemu_clock_get_ms(QEMU_CLOCK_HOST);
return pkt;
}
diff --git a/net/colo.h b/net/colo.h
index c511bcd..9a7d5e0 100644
--- a/net/colo.h
+++ b/net/colo.h
@@ -17,6 +17,7 @@
#include "slirp/slirp.h"
#include "qemu/jhash.h"
+#include "qemu/timer.h"
#define HASHTABLE_MAX_SIZE 16384
@@ -40,6 +41,8 @@ typedef struct Packet {
};
uint8_t *transport_header;
int size;
+ /* Time of packet creation, in wall clock ms */
+ int64_t creation_ms;
} Packet;
typedef struct ConnectionKey {
diff --git a/trace-events b/trace-events
index 9b4186f..81de82c 100644
--- a/trace-events
+++ b/trace-events
@@ -144,6 +144,8 @@ colo_proxy_main(const char *chr) ": %s"
# net/colo-compare.c
colo_compare_main(const char *chr) ": %s"
+colo_compare_ip_info(int psize, const char *sta, const char *stb, int ssize, const char *stc, const char *std) "ppkt size = %d, ip_src = %s, ip_dst = %s, spkt size = %d, ip_src = %s, ip_dst = %s"
+colo_old_packet_check_found(int64_t old_time) "%" PRId64
### Guest events, keep at bottom
--
2.7.4
- [Qemu-devel] [PULL 00/27] Net patches, Jason Wang, 2016/09/26
- [Qemu-devel] [PULL 01/27] virtio-net: allow increasing rx queue size, Jason Wang, 2016/09/26
- [Qemu-devel] [PULL 02/27] net: hmp_host_net_remove: Del the -net option of the removed host_net, Jason Wang, 2016/09/26
- [Qemu-devel] [PULL 03/27] qemu-char: Add qemu_chr_add_handlers_full() for GMaincontext, Jason Wang, 2016/09/26
- [Qemu-devel] [PULL 05/27] net/colo.c: add colo.c to define and handle packet, Jason Wang, 2016/09/26
- [Qemu-devel] [PULL 04/27] colo-compare: introduce colo compare initialization, Jason Wang, 2016/09/26
- [Qemu-devel] [PULL 06/27] Jhash: add linux kernel jhashtable in qemu, Jason Wang, 2016/09/26
- [Qemu-devel] [PULL 07/27] colo-compare: track connection and enqueue packet, Jason Wang, 2016/09/26
- [Qemu-devel] [PULL 08/27] colo-compare: introduce packet comparison thread,
Jason Wang <=
- [Qemu-devel] [PULL 09/27] colo-compare: add TCP, UDP, ICMP packet comparison, Jason Wang, 2016/09/26
- [Qemu-devel] [PULL 10/27] filter-rewriter: introduce filter-rewriter initialization, Jason Wang, 2016/09/26
- [Qemu-devel] [PULL 11/27] filter-rewriter: track connection and parse packet, Jason Wang, 2016/09/26
- [Qemu-devel] [PULL 12/27] filter-rewriter: rewrite tcp packet to keep secondary connection, Jason Wang, 2016/09/26
- [Qemu-devel] [PULL 13/27] MAINTAINERS: add maintainer for COLO-proxy, Jason Wang, 2016/09/26
- [Qemu-devel] [PULL 14/27] docs: Add documentation for COLO-proxy, Jason Wang, 2016/09/26
- [Qemu-devel] [PULL 15/27] e1000: fix buliding complaint, Jason Wang, 2016/09/26
- [Qemu-devel] [PULL 16/27] tap: Allow specifying a bridge, Jason Wang, 2016/09/26
- [Qemu-devel] [PULL 18/27] e1000e: Flush all receive queues on receive enable, Jason Wang, 2016/09/26
- [Qemu-devel] [PULL 17/27] net: limit allocation in nc_sendv_compat, Jason Wang, 2016/09/26