[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[Qemu-devel] [PATCH v1 04/14] net: port socket to GSource
From: |
Liu Ping Fan |
Subject: |
[Qemu-devel] [PATCH v1 04/14] net: port socket to GSource |
Date: |
Tue, 7 May 2013 13:46:52 +0800 |
From: Liu Ping Fan <address@hidden>
Port NetSocketState onto NetClientSource. The only thing specail is that
owning to the socket's state machine changes, we need to change the handler.
We implement that by destroy the old NetClientSource and attach a new one
with NetSocketState.
Signed-off-by: Liu Ping Fan <address@hidden>
---
net/socket.c | 194 ++++++++++++++++++++++++++++++++++++++++++++++++----------
1 files changed, 161 insertions(+), 33 deletions(-)
diff --git a/net/socket.c b/net/socket.c
index 396dc8c..d52991d 100644
--- a/net/socket.c
+++ b/net/socket.c
@@ -31,6 +31,8 @@
#include "qemu/option.h"
#include "qemu/sockets.h"
#include "qemu/iov.h"
+#include "util/event_gsource.h"
+
typedef struct NetSocketState {
NetClientState nc;
@@ -42,13 +44,15 @@ typedef struct NetSocketState {
unsigned int send_index; /* number of bytes sent (only SOCK_STREAM) */
uint8_t buf[4096];
struct sockaddr_in dgram_dst; /* contains inet host and port destination
iff connectionless (SOCK_DGRAM) */
- IOHandler *send_fn; /* differs between SOCK_STREAM/SOCK_DGRAM */
bool read_poll; /* waiting to receive data? */
bool write_poll; /* waiting to transmit data? */
+ EventGSource *nsrc;
} NetSocketState;
-static void net_socket_accept(void *opaque);
static void net_socket_writable(void *opaque);
+static gboolean net_socket_listen_handler(gpointer data);
+static gboolean net_socket_establish_handler(gpointer data);
+
/* Only read packets from socket when peer can receive them */
static int net_socket_can_send(void *opaque)
@@ -58,25 +62,14 @@ static int net_socket_can_send(void *opaque)
return qemu_can_send_packet(&s->nc);
}
-static void net_socket_update_fd_handler(NetSocketState *s)
-{
- qemu_set_fd_handler2(s->fd,
- s->read_poll ? net_socket_can_send : NULL,
- s->read_poll ? s->send_fn : NULL,
- s->write_poll ? net_socket_writable : NULL,
- s);
-}
-
static void net_socket_read_poll(NetSocketState *s, bool enable)
{
s->read_poll = enable;
- net_socket_update_fd_handler(s);
}
static void net_socket_write_poll(NetSocketState *s, bool enable)
{
s->write_poll = enable;
- net_socket_update_fd_handler(s);
}
static void net_socket_writable(void *opaque)
@@ -141,6 +134,59 @@ static ssize_t net_socket_receive_dgram(NetClientState
*nc, const uint8_t *buf,
return ret;
}
+static gushort socket_connecting_writable(void *opaque)
+{
+ return G_IO_OUT | G_IO_ERR;
+}
+
+static gushort socket_listen_readable(void *opaque)
+{
+ return G_IO_IN | G_IO_HUP | G_IO_ERR;
+}
+
+static gushort socket_establish_readable(void *opaque)
+{
+ NetSocketState *s = opaque;
+
+ /* rely on net_socket_send to handle err */
+ if (s->read_poll && net_socket_can_send(s)) {
+ return G_IO_IN | G_IO_HUP | G_IO_ERR;
+ }
+ return 0;
+}
+
+static gushort socket_establish_writable(void *opaque)
+{
+ NetSocketState *s = opaque;
+
+ if (s->write_poll) {
+ return G_IO_OUT | G_IO_HUP | G_IO_ERR;
+ }
+ return 0;
+}
+
+static gushort socket_dgram_readable(void *opaque)
+{
+ NetSocketState *s = opaque;
+
+ /* rely on net_socket_send_dgram to handle err */
+ if (s->read_poll && net_socket_can_send(s)) {
+ return G_IO_IN | G_IO_ERR;
+ }
+ return 0;
+}
+
+static gushort socket_dgram_writable(void *opaque)
+{
+ NetSocketState *s = opaque;
+
+ if (s->write_poll) {
+ return G_IO_OUT | G_IO_ERR;
+ }
+ return 0;
+}
+
+/* common handler for accept-established or connecting case */
static void net_socket_send(void *opaque)
{
NetSocketState *s = opaque;
@@ -159,8 +205,13 @@ static void net_socket_send(void *opaque)
eoc:
net_socket_read_poll(s, false);
net_socket_write_poll(s, false);
+ /* rely on this to tell the accept-established or connecting case */
if (s->listen_fd != -1) {
- qemu_set_fd_handler(s->listen_fd, net_socket_accept, NULL, s);
+ event_source_release(s->nsrc);
+ s->nsrc = event_source_new(s->listen_fd, net_socket_listen_handler,
+ s);
+ s->nsrc->readable = socket_listen_readable;
+ s->nc.info->bind_ctx(&s->nc, NULL);
}
closesocket(s->fd);
@@ -231,6 +282,8 @@ static void net_socket_send_dgram(void *opaque)
/* end of connection */
net_socket_read_poll(s, false);
net_socket_write_poll(s, false);
+ /* for dgram err, removing it */
+ g_source_remove_poll(&s->nsrc->source, &s->nsrc->gfd);
return;
}
qemu_send_packet(&s->nc, s->buf, size);
@@ -331,6 +384,14 @@ static void net_socket_cleanup(NetClientState *nc)
closesocket(s->listen_fd);
s->listen_fd = -1;
}
+ event_source_release(s->nsrc);
+}
+
+static void net_socket_bind_ctx(NetClientState *nc, GMainContext *ctx)
+{
+ NetSocketState *s = DO_UPCAST(NetSocketState, nc, nc);
+
+ g_source_attach(&s->nsrc->source, ctx);
}
static NetClientInfo net_dgram_socket_info = {
@@ -338,8 +399,24 @@ static NetClientInfo net_dgram_socket_info = {
.size = sizeof(NetSocketState),
.receive = net_socket_receive_dgram,
.cleanup = net_socket_cleanup,
+ .bind_ctx = net_socket_bind_ctx,
};
+static gboolean net_socket_dgram_handler(gpointer data)
+{
+ EventGSource *nsrc = (EventGSource *)data;
+ NetSocketState *s = nsrc->opaque;
+
+ /* for err, unregister the handler */
+ if (nsrc->gfd.revents & (G_IO_IN | G_IO_ERR)) {
+ net_socket_send_dgram(s);
+ }
+ if (nsrc->gfd.revents & (G_IO_OUT | G_IO_ERR)) {
+ net_socket_writable(s);
+ }
+ return true;
+}
+
static NetSocketState *net_socket_fd_init_dgram(NetClientState *peer,
const char *model,
const char *name,
@@ -393,8 +470,12 @@ static NetSocketState
*net_socket_fd_init_dgram(NetClientState *peer,
s->fd = fd;
s->listen_fd = -1;
- s->send_fn = net_socket_send_dgram;
+ s->nsrc = event_source_new(fd, net_socket_dgram_handler, s);
+ s->nsrc->readable = socket_dgram_readable;
+ s->nsrc->writable = socket_dgram_writable;
+ nc->info->bind_ctx(nc, NULL);
net_socket_read_poll(s, true);
+ net_socket_write_poll(s, true);
/* mcast: save bound address as dst */
if (is_connected) {
@@ -408,20 +489,31 @@ err:
return NULL;
}
-static void net_socket_connect(void *opaque)
-{
- NetSocketState *s = opaque;
- s->send_fn = net_socket_send;
- net_socket_read_poll(s, true);
-}
-
static NetClientInfo net_socket_info = {
.type = NET_CLIENT_OPTIONS_KIND_SOCKET,
.size = sizeof(NetSocketState),
.receive = net_socket_receive,
.cleanup = net_socket_cleanup,
+ .bind_ctx = net_socket_bind_ctx,
};
+static gboolean net_socket_connect_handler(gpointer data)
+{
+ EventGSource *nsrc = data;
+ NetSocketState *s = nsrc->opaque;
+
+ event_source_release(s->nsrc);
+ /* For error handle, delay to net_socket_establish_handler */
+ s->nsrc = event_source_new(s->fd, net_socket_establish_handler, s);
+ s->nsrc->readable = socket_establish_readable;
+ s->nsrc->writable = socket_establish_writable;
+ s->nc.info->bind_ctx(&s->nc, NULL);
+ net_socket_read_poll(s, true);
+ net_socket_write_poll(s, true);
+
+ return true;
+}
+
static NetSocketState *net_socket_fd_init_stream(NetClientState *peer,
const char *model,
const char *name,
@@ -440,9 +532,20 @@ static NetSocketState
*net_socket_fd_init_stream(NetClientState *peer,
s->listen_fd = -1;
if (is_connected) {
- net_socket_connect(s);
+ assert(!s->nsrc);
+ s->nsrc = event_source_new(fd, net_socket_establish_handler, s);
+ s->nsrc->readable = socket_establish_readable;
+ s->nsrc->writable = socket_establish_writable;
+ nc->info->bind_ctx(nc, NULL);
+ net_socket_read_poll(s, true);
+ net_socket_write_poll(s, true);
} else {
- qemu_set_fd_handler(s->fd, NULL, net_socket_connect, s);
+ assert(!s->nsrc);
+ s->nsrc = event_source_new(fd, net_socket_connect_handler, s);
+ s->nsrc->writable = socket_connecting_writable;
+ nc->info->bind_ctx(nc, NULL);
+ net_socket_read_poll(s, false);
+ net_socket_write_poll(s, true);
}
return s;
}
@@ -473,30 +576,52 @@ static NetSocketState *net_socket_fd_init(NetClientState
*peer,
return NULL;
}
-static void net_socket_accept(void *opaque)
+static gboolean net_socket_establish_handler(gpointer data)
{
- NetSocketState *s = opaque;
+ EventGSource *nsrc = (EventGSource *)data;
+ NetSocketState *s = nsrc->opaque;
+
+ if (nsrc->gfd.revents & (G_IO_IN | G_IO_HUP | G_IO_ERR)) {
+ net_socket_send(s);
+ }
+ if ((nsrc->gfd.revents & (G_IO_OUT | G_IO_HUP | G_IO_ERR))) {
+ net_socket_writable(s);
+ }
+ return true;
+}
+
+static gboolean net_socket_listen_handler(gpointer data)
+{
+ EventGSource *nsrc = data;
+ NetSocketState *s = nsrc->opaque;
struct sockaddr_in saddr;
socklen_t len;
int fd;
- for(;;) {
- len = sizeof(saddr);
+ len = sizeof(saddr);
+ do {
fd = qemu_accept(s->listen_fd, (struct sockaddr *)&saddr, &len);
if (fd < 0 && errno != EINTR) {
- return;
+ return false;
} else if (fd >= 0) {
- qemu_set_fd_handler(s->listen_fd, NULL, NULL, NULL);
break;
}
- }
+ } while (true);
s->fd = fd;
s->nc.link_down = false;
- net_socket_connect(s);
+ /* prevent more than one connect req */
+ event_source_release(s->nsrc);
+ s->nsrc = event_source_new(fd, net_socket_establish_handler, s);
+ s->nsrc->readable = socket_establish_readable;
+ s->nsrc->writable = socket_establish_writable;
+ s->nc.info->bind_ctx(&s->nc, NULL);
+ net_socket_read_poll(s, true);
snprintf(s->nc.info_str, sizeof(s->nc.info_str),
"socket: connection from %s:%d",
inet_ntoa(saddr.sin_addr), ntohs(saddr.sin_port));
+
+ return true;
}
static int net_socket_listen_init(NetClientState *peer,
@@ -542,7 +667,10 @@ static int net_socket_listen_init(NetClientState *peer,
s->listen_fd = fd;
s->nc.link_down = true;
- qemu_set_fd_handler(s->listen_fd, net_socket_accept, NULL, s);
+ s->nsrc = event_source_new(fd, net_socket_listen_handler, s);
+ s->nsrc->readable = socket_listen_readable;
+ nc->info->bind_ctx(nc, NULL);
+
return 0;
}
--
1.7.4.4
- [Qemu-devel] [PATCH v1 00/14] port network layer onto glib, Liu Ping Fan, 2013/05/07
- [Qemu-devel] [PATCH v1 01/14] util: introduce gsource event abstraction, Liu Ping Fan, 2013/05/07
- [Qemu-devel] [PATCH v1 02/14] net: introduce bind_ctx to NetClientInfo, Liu Ping Fan, 2013/05/07
- [Qemu-devel] [PATCH v1 03/14] net: port vde onto GSource, Liu Ping Fan, 2013/05/07
- [Qemu-devel] [PATCH v1 04/14] net: port socket to GSource,
Liu Ping Fan <=
- [Qemu-devel] [PATCH v1 05/14] net: port tap onto GSource, Liu Ping Fan, 2013/05/07
- [Qemu-devel] [PATCH v1 06/14] net: port tap-win32 onto GSource, Liu Ping Fan, 2013/05/07
- [Qemu-devel] [PATCH v1 07/14] net: hub use lock to protect ports list, Liu Ping Fan, 2013/05/07
- [Qemu-devel] [PATCH v1 08/14] net: introduce lock to protect NetQueue, Liu Ping Fan, 2013/05/07
- [Qemu-devel] [PATCH v1 09/14] net: introduce lock to protect NetClientState's peer's access, Liu Ping Fan, 2013/05/07
- [Qemu-devel] [PATCH v1 10/14] net: make netclient re-entrant with refcnt, Liu Ping Fan, 2013/05/07
- [Qemu-devel] [PATCH v1 11/14] slirp: make timeout local, Liu Ping Fan, 2013/05/07