qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: [Qemu-devel] [PATCH V2 2/3] colo-compare: track connection and enque


From: Dr. David Alan Gilbert
Subject: Re: [Qemu-devel] [PATCH V2 2/3] colo-compare: track connection and enqueue packet
Date: Wed, 30 Mar 2016 11:36:41 +0100
User-agent: Mutt/1.5.24 (2015-08-30)

* Zhang Chen (address@hidden) wrote:
> In this patch we use kernel jhash table to track
> connection, and then enqueue net packet like this:
> 
> + CompareState ++
> |               |
> +---------------+   +---------------+         +---------------+
> |conn list      +--->conn           +--------->conn           |
> +---------------+   +---------------+         +---------------+
> |               |     |           |             |          |
> +---------------+ +---v----+  +---v----+    +---v----+ +---v----+
>                   |primary |  |secondary    |primary | |secondary
>                   |packet  |  |packet  +    |packet  | |packet  +
>                   +--------+  +--------+    +--------+ +--------+
>                       |           |             |          |
>                   +---v----+  +---v----+    +---v----+ +---v----+
>                   |primary |  |secondary    |primary | |secondary
>                   |packet  |  |packet  +    |packet  | |packet  +
>                   +--------+  +--------+    +--------+ +--------+
>                       |           |             |          |
>                   +---v----+  +---v----+    +---v----+ +---v----+
>                   |primary |  |secondary    |primary | |secondary
>                   |packet  |  |packet  +    |packet  | |packet  +
>                   +--------+  +--------+    +--------+ +--------+
> 
> Signed-off-by: Zhang Chen <address@hidden>
> Signed-off-by: Li Zhijian <address@hidden>
> Signed-off-by: Wen Congyang <address@hidden>
> ---
>  include/qemu/jhash.h |  59 ++++++++++
>  net/colo-compare.c   | 324 
> ++++++++++++++++++++++++++++++++++++++++++++++++++-
>  2 files changed, 380 insertions(+), 3 deletions(-)
>  create mode 100644 include/qemu/jhash.h
> 
> diff --git a/include/qemu/jhash.h b/include/qemu/jhash.h
> new file mode 100644
> index 0000000..8a8ff0f
> --- /dev/null
> +++ b/include/qemu/jhash.h
> @@ -0,0 +1,59 @@
> +/* jhash.h: Jenkins hash support.
> +  *
> +  * Copyright (C) 2006. Bob Jenkins (address@hidden)
> +  *
> +  * http://burtleburtle.net/bob/hash/
> +  *
> +  * These are the credits from Bob's sources:
> +  *
> +  * lookup3.c, by Bob Jenkins, May 2006, Public Domain.
> +  *
> +  * These are functions for producing 32-bit hashes for hash table lookup.
> +  * hashword(), hashlittle(), hashlittle2(), hashbig(), mix(), and final()
> +  * are externally useful functions.  Routines to test the hash are included
> +  * if SELF_TEST is defined.  You can use this free for any purpose.It's in
> +  * the public domain.  It has no warranty.
> +  *
> +  * Copyright (C) 2009-2010 Jozsef Kadlecsik (address@hidden)
> +  *
> +  * I've modified Bob's hash to be useful in the Linux kernel, and
> +  * any bugs present are my fault.
> +  * Jozsef
> +  */
> +
> +#ifndef QEMU_JHASH_H__
> +#define QEMU_JHASH_H__
> +
> +#include "qemu/bitops.h"
> +
> +/*
> + * hashtable related is copied from linux kernel jhash
> + */
> +
> +/* __jhash_mix -- mix 3 32-bit values reversibly. */
> +#define __jhash_mix(a, b, c)                \
> +{                                           \
> +    a -= c;  a ^= rol32(c, 4);  c += b;     \
> +    b -= a;  b ^= rol32(a, 6);  a += c;     \
> +    c -= b;  c ^= rol32(b, 8);  b += a;     \
> +    a -= c;  a ^= rol32(c, 16); c += b;     \
> +    b -= a;  b ^= rol32(a, 19); a += c;     \
> +    c -= b;  c ^= rol32(b, 4);  b += a;     \
> +}
> +
> +/* __jhash_final - final mixing of 3 32-bit values (a,b,c) into c */
> +#define __jhash_final(a, b, c)  \
> +{                               \
> +    c ^= b; c -= rol32(b, 14);  \
> +    a ^= c; a -= rol32(c, 11);  \
> +    b ^= a; b -= rol32(a, 25);  \
> +    c ^= b; c -= rol32(b, 16);  \
> +    a ^= c; a -= rol32(c, 4);   \
> +    b ^= a; b -= rol32(a, 14);  \
> +    c ^= b; c -= rol32(b, 24);  \
> +}
> +
> +/* An arbitrary initial parameter */
> +#define JHASH_INITVAL           0xdeadbeef
> +
> +#endif /* QEMU_JHASH_H__ */
> diff --git a/net/colo-compare.c b/net/colo-compare.c
> index 62c66df..0bb5a51 100644
> --- a/net/colo-compare.c
> +++ b/net/colo-compare.c
> @@ -20,15 +20,22 @@
>  #include "net/queue.h"
>  #include "sysemu/char.h"
>  #include "qemu/sockets.h"
> +#include <sys/sysinfo.h>
> +#include "slirp/slirp.h"
> +#include "qemu/jhash.h"
> +#include <sys/sysinfo.h>
>  
>  #define TYPE_COLO_COMPARE "colo-compare"
>  #define COLO_COMPARE(obj) \
>      OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE)
>  
>  #define COMPARE_READ_LEN_MAX NET_BUFSIZE
> +#define PAGE_SIZE 4096
> +#define ETH_HLEN 14

PAGE_SIZE is not just 4k; use one of the system headers.
Also, don't define ETH_HLEN - include net/eth.h

>  static QTAILQ_HEAD(, CompareState) net_compares =
>         QTAILQ_HEAD_INITIALIZER(net_compares);
> +static ssize_t hashtable_max_size;
>  
>  typedef struct ReadState {
>      int state; /* 0 = getting length, 1 = getting data */
> @@ -37,6 +44,28 @@ typedef struct ReadState {
>      uint8_t buf[COMPARE_READ_LEN_MAX];
>  } ReadState;
>  
> +/*
> +  + CompareState ++
> +  |               |
> +  +---------------+   +---------------+         +---------------+
> +  |conn list      +--->conn           +--------->conn           |
> +  +---------------+   +---------------+         +---------------+
> +  |               |     |           |             |          |
> +  +---------------+ +---v----+  +---v----+    +---v----+ +---v----+
> +                    |primary |  |secondary    |primary | |secondary
> +                    |packet  |  |packet  +    |packet  | |packet  +
> +                    +--------+  +--------+    +--------+ +--------+
> +                        |           |             |          |
> +                    +---v----+  +---v----+    +---v----+ +---v----+
> +                    |primary |  |secondary    |primary | |secondary
> +                    |packet  |  |packet  +    |packet  | |packet  +
> +                    +--------+  +--------+    +--------+ +--------+
> +                        |           |             |          |
> +                    +---v----+  +---v----+    +---v----+ +---v----+
> +                    |primary |  |secondary    |primary | |secondary
> +                    |packet  |  |packet  +    |packet  | |packet  +
> +                    +--------+  +--------+    +--------+ +--------+
> +*/
>  typedef struct CompareState {
>      Object parent;
>  
> @@ -49,8 +78,268 @@ typedef struct CompareState {
>      QTAILQ_ENTRY(CompareState) next;
>      ReadState pri_rs;
>      ReadState sec_rs;
> +
> +    /* connection list: the connections belonged to this NIC could be found
> +     * in this list.
> +     * element type: Connection
> +     */
> +    GQueue conn_list;
> +    QemuMutex conn_list_lock; /* to protect conn_list */
> +    /* hashtable to save connection */
> +    GHashTable *connection_track_table;
> +    /* to save unprocessed_connections */
> +    GQueue unprocessed_connections;
> +    /* proxy current hash size */
> +    ssize_t hashtable_size;
>  } CompareState;
>  
> +typedef struct Packet {
> +    void *data;
> +    union {
> +        uint8_t *network_layer;
> +        struct ip *ip;
> +    };
> +    uint8_t *transport_layer;
> +    int size;
> +    CompareState *s;
> +} Packet;
> +
> +typedef struct ConnectionKey {
> +    /* (src, dst) must be grouped, in the same way than in IP header */
> +    struct in_addr src;
> +    struct in_addr dst;
> +    uint16_t src_port;
> +    uint16_t dst_port;
> +    uint8_t ip_proto;
> +} QEMU_PACKED ConnectionKey;

Someone will want IPv6 at some point, so think about that, but not
too worried for now.

> +typedef struct Connection {
> +    QemuMutex list_lock;
> +    /* connection primary send queue: element type: Packet */
> +    GQueue primary_list;
> +    /* connection secondary send queue: element type: Packet */
> +    GQueue secondary_list;
> +    /* flag to enqueue unprocessed_connections */
> +    bool processing;
> +    int ip_proto;

in ConnectionKey you use uint8_t for ip_proto  - should
be consistent?

> +} Connection;
> +
> +enum {
> +    PRIMARY_IN = 0,
> +    SECONDARY_IN,
> +};
> +
> +static void packet_destroy(void *opaque, void *user_data);
> +static int compare_chr_send(CharDriverState *out, const uint8_t *buf, int 
> size);
> +
> +static uint32_t connection_key_hash(const void *opaque)
> +{
> +    const ConnectionKey *key = opaque;
> +    uint32_t a, b, c;
> +
> +    /* Jenkins hash */
> +    a = b = c = JHASH_INITVAL + sizeof(*key);
> +    a += key->src.s_addr;
> +    b += key->dst.s_addr;
> +    c += (key->src_port | key->dst_port << 16);
> +    __jhash_mix(a, b, c);
> +
> +    a += key->ip_proto;
> +    __jhash_final(a, b, c);
> +
> +    return c;
> +}
> +
> +static int connection_key_equal(const void *opaque1, const void *opaque2)
> +{
> +    return memcmp(opaque1, opaque2, sizeof(ConnectionKey)) == 0;
> +}
> +
> +/*
> + *  initialize connecon_key for packet
                        ^ti

> + *  Return 0 on success, if return 1 the pkt will be sent later
> + */
> +static int connection_key_init(Packet *pkt, ConnectionKey *key)
> +{
> +    int network_length;
> +    uint8_t *data = pkt->data;
> +    uint16_t l3_proto;
> +    uint32_t tmp_ports;
> +    ssize_t l2hdr_len = eth_get_l2_hdr_length(data);
> +
> +    pkt->network_layer = data + ETH_HLEN;
> +    l3_proto = eth_get_l3_proto(data, l2hdr_len);
> +    if (l3_proto != ETH_P_IP) {
> +        return 1;
> +    }
> +
> +    network_length = pkt->ip->ip_hl * 4;
> +    pkt->transport_layer = pkt->network_layer + network_length;

Have we checked that this is valid - this is guest/external network
data, so is that 'network_length' actually pointing to valid data
or off the end of the packet?

> +    key->ip_proto = pkt->ip->ip_p;
> +    key->src = pkt->ip->ip_src;
> +    key->dst = pkt->ip->ip_dst;
> +
> +    switch (key->ip_proto) {
> +    case IPPROTO_TCP:
> +    case IPPROTO_UDP:
> +    case IPPROTO_DCCP:
> +    case IPPROTO_ESP:
> +    case IPPROTO_SCTP:
> +    case IPPROTO_UDPLITE:
> +        tmp_ports = *(uint32_t *)(pkt->transport_layer);
> +        key->src_port = tmp_ports & 0xffff;
> +        key->dst_port = tmp_ports >> 16;

Do these need ntohs - or do you want to keep them in network
order?  In my world on your older code I added ntohs's because
it made debugging make a lot more sense when you print out src_port/dst_port.

> +        break;
> +    case IPPROTO_AH:
> +        tmp_ports = *(uint32_t *)(pkt->transport_layer + 4);
> +        key->src_port = tmp_ports & 0xffff;
> +        key->dst_port = tmp_ports >> 16;
> +        break;
> +    default:

   Do you need to set src_port/dst_port here (to 0 ?? ) ?

> +        break;
> +    }
> +
> +    return 0;
> +}
> +
> +static Connection *connection_new(ConnectionKey *key)
> +{
> +    Connection *conn = g_slice_new(Connection);
> +
> +    qemu_mutex_init(&conn->list_lock);
> +    conn->ip_proto = key->ip_proto;
> +    conn->processing = false;
> +    g_queue_init(&conn->primary_list);
> +    g_queue_init(&conn->secondary_list);
> +
> +    return conn;
> +}
> +
> +/*
> + * Clear hashtable, stop this hash growing really huge
> + */
> +static void connection_hashtable_reset(CompareState *s)
> +{
> +    s->hashtable_size = 0;
> +    g_hash_table_remove_all(s->connection_track_table);
> +}
> +
> +/* if not found, creata a new connection and add to hash table */

 Typo                    ^

> +static Connection *connection_get(CompareState *s, ConnectionKey *key)
> +{
> +    /* FIXME: protect connection_track_table */
> +    Connection *conn = g_hash_table_lookup(s->connection_track_table, key);
> +
> +    if (conn == NULL) {
> +        ConnectionKey *new_key = g_memdup(key, sizeof(*key));
> +
> +        conn = connection_new(key);
> +
> +        s->hashtable_size++;
> +        if (s->hashtable_size > hashtable_max_size) {
> +            error_report("colo proxy connection hashtable full, clear it");
> +            connection_hashtable_reset(s);
> +            /* TODO:clear conn_list */

> +        } else {

This feels wrong; should this actually be in an else? If you've just cleared
the hash table, then you probably want to add this new connection to the empty
table? (And for example at the moment the 'new_key' is not used if we go down
this if).

> +            g_hash_table_insert(s->connection_track_table, new_key, conn);
> +        }
> +    }
> +
> +     return conn;
> +}
> +
> +static void connection_destroy(void *opaque)
> +{
> +    Connection *conn = opaque;
> +
> +    qemu_mutex_lock(&conn->list_lock);
> +    g_queue_foreach(&conn->primary_list, packet_destroy, NULL);
> +    g_queue_free(&conn->primary_list);
> +    g_queue_foreach(&conn->secondary_list, packet_destroy, NULL);
> +    g_queue_free(&conn->secondary_list);
> +    qemu_mutex_unlock(&conn->list_lock);
> +    qemu_mutex_destroy(&conn->list_lock);
> +    g_slice_free(Connection, conn);
> +}
> +
> +static Packet *packet_new(CompareState *s, const void *data,
> +                              int size, ConnectionKey *key)
> +{
> +    Packet *pkt = g_slice_new(Packet);
> +
> +    pkt->data = g_memdup(data, size);
> +    pkt->size = size;
> +    pkt->s = s;
> +
> +    if (connection_key_init(pkt, key)) {
> +        packet_destroy(pkt, NULL);
> +        pkt = NULL;
> +    }
> +
> +    return pkt;
> +}
> +
> +static int packet_enqueue(CompareState *s, int mode)
> +{
> +    ConnectionKey key = {{ 0 } };
> +    Packet *pkt = NULL;
> +    Connection *conn;
> +
> +    /* arp packet will be sent */

Can you add some more detail about that - what do the return
values of packet_enqueue mean; what happens to things like IPv6 or ARP packets?

> +    if (mode == PRIMARY_IN) {
> +        pkt = packet_new(s, s->pri_rs.buf, s->pri_rs.packet_len, &key);
> +    } else {
> +        pkt = packet_new(s, s->sec_rs.buf, s->sec_rs.packet_len, &key);
> +    }
> +    if (!pkt) {
> +        return -1;
> +    }
> +
> +    conn = connection_get(s, &key);
> +    if (!conn->processing) {
> +        qemu_mutex_lock(&s->conn_list_lock);
> +        g_queue_push_tail(&s->conn_list, conn);
> +        qemu_mutex_unlock(&s->conn_list_lock);
> +        conn->processing = true;
> +    }
> +
> +    qemu_mutex_lock(&conn->list_lock);
> +    if (mode == PRIMARY_IN) {
> +        g_queue_push_tail(&conn->primary_list, pkt);
> +    } else {
> +        g_queue_push_tail(&conn->secondary_list, pkt);
> +    }
> +    qemu_mutex_unlock(&conn->list_lock);
> +
> +    return 0;
> +}
> +
> +static void packet_destroy(void *opaque, void *user_data)
> +{
> +    Packet *pkt = opaque;
> +
> +    g_free(pkt->data);
> +    g_slice_free(Packet, pkt);
> +}
> +
> +static inline void colo_flush_connection(void *opaque, void *user_data)
> +{

Is this used?

> +    Connection *conn = opaque;
> +    Packet *pkt = NULL;
> +
> +    qemu_mutex_lock(&conn->list_lock);
> +    while (!g_queue_is_empty(&conn->primary_list)) {
> +        pkt = g_queue_pop_head(&conn->primary_list);
> +        compare_chr_send(pkt->s->chr_out, pkt->data, pkt->size);
> +        /* FIXME: destroy pkt ?*/
> +    }
> +    while (!g_queue_is_empty(&conn->secondary_list)) {
> +        pkt = g_queue_pop_head(&conn->secondary_list);
> +        packet_destroy(pkt, NULL);
> +    }
> +    qemu_mutex_unlock(&conn->list_lock);
> +}
> +
>  static int compare_chr_send(CharDriverState *out, const uint8_t *buf, int 
> size)
>  {
>      int ret = 0;
> @@ -142,8 +431,10 @@ static void compare_pri_chr_in(void *opaque, const 
> uint8_t *buf, int size)
>  
>      ret = compare_chr_fill_rstate(&s->pri_rs, buf, size);
>      if (ret == 1) {
> -        /* FIXME: enqueue to primary packet list */
> -        compare_chr_send(s->chr_out, buf, size);
> +        if (packet_enqueue(s, PRIMARY_IN)) {
> +            error_report("primary: unsupported packet in");

Is this for non-IP packets?  If so you don't want an error_report - because 
non-IP are
quite common; a trace would be useful giving the packet type etc

> +            compare_chr_send(s->chr_out, buf, size);
> +        }
>      } else if (ret == -1) {
>          qemu_chr_add_handlers(s->chr_pri_in, NULL, NULL, NULL, NULL);
>      }
> @@ -156,7 +447,9 @@ static void compare_sec_chr_in(void *opaque, const 
> uint8_t *buf, int size)
>  
>      ret = compare_chr_fill_rstate(&s->sec_rs, buf, size);
>      if (ret == 1) {
> -        /* TODO: enqueue to secondary packet list*/
> +        if (packet_enqueue(s, SECONDARY_IN)) {
> +            error_report("secondary: unsupported packet in");
> +        }
>      } else if (ret == -1) {
>          qemu_chr_add_handlers(s->chr_sec_in, NULL, NULL, NULL, NULL);
>      }
> @@ -210,6 +503,7 @@ static void compare_set_outdev(Object *obj, const char 
> *value, Error **errp)
>  static void colo_compare_complete(UserCreatable *uc, Error **errp)
>  {
>      CompareState *s = COLO_COMPARE(uc);
> +    struct sysinfo si;
>  
>      if (!s->pri_indev || !s->sec_indev || !s->outdev) {
>          error_setg(errp, "colo compare needs 'primary_in' ,"
> @@ -255,6 +549,29 @@ static void colo_compare_complete(UserCreatable *uc, 
> Error **errp)
>  
>      QTAILQ_INSERT_TAIL(&net_compares, s, next);
>  
> +    g_queue_init(&s->conn_list);
> +    qemu_mutex_init(&s->conn_list_lock);
> +
> +    s->hashtable_size = 0;
> +    /*
> +     * Idea from kernel tcp.c: use 1/16384 of memory.  On i386: 32MB
> +     * machine has 512 buckets. >= 1GB machines have 16384 buckets.
> +     */
> +    sysinfo(&si);
> +    hashtable_max_size = si.totalram / 16384;
> +    if (si.totalram > (1024 * 1024 * 1024 / PAGE_SIZE)) {
> +        hashtable_max_size = 16384;
> +    }
> +    if (hashtable_max_size < 32) {
> +        hashtable_max_size = 32;
> +    }
> +    hashtable_max_size = hashtable_max_size * 8; /* default factor = 8 */

Make this a lot simpler; just pick a size and if it's a problem then we'll worry
about it later, or make it an option on the filter if you want it changeable.

> +    s->connection_track_table = g_hash_table_new_full(connection_key_hash,
> +                                                      connection_key_equal,
> +                                                      g_free,
> +                                                      connection_destroy);
> +
>      return;
>  
>  out:
> @@ -297,6 +614,7 @@ static void colo_compare_class_finalize(ObjectClass *oc, 
> void *data)
>      if (!QTAILQ_EMPTY(&net_compares)) {
>          QTAILQ_REMOVE(&net_compares, s, next);
>      }
> +    qemu_mutex_destroy(&s->conn_list_lock);
>  }
>  
>  static void colo_compare_init(Object *obj)
> -- 
> 1.9.1
> 
> 
> 
--
Dr. David Alan Gilbert / address@hidden / Manchester, UK



reply via email to

[Prev in Thread] Current Thread [Next in Thread]