wesnoth-cvs-commits
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Wesnoth-cvs-commits] wesnoth/src network_worker.cpp


From: Guillaume Melquiond
Subject: [Wesnoth-cvs-commits] wesnoth/src network_worker.cpp
Date: Sat, 30 Oct 2004 04:52:15 -0400

CVSROOT:        /cvsroot/wesnoth
Module name:    wesnoth
Branch:         
Changes by:     Guillaume Melquiond <address@hidden>    04/10/30 08:46:29

Modified files:
        src            : network_worker.cpp 

Log message:
        A few changes to the network threading code. Don't rely too much on 
iterator validity property of associative containers; in fact, remove them and 
use a vector of pointed buffers to increase performance. Correctly use 
conditions: a mutex should not be freed just after it has been acquired through 
a condition wait; and don't send a condition with the mutex being hold (it kind 
of defeats the purpose of a condition). These changes require testing!

CVSWeb URLs:
http://savannah.gnu.org/cgi-bin/viewcvs/wesnoth/wesnoth/src/network_worker.cpp.diff?tr1=1.10&tr2=1.11&r1=text&r2=text

Patches:
Index: wesnoth/src/network_worker.cpp
diff -u wesnoth/src/network_worker.cpp:1.10 wesnoth/src/network_worker.cpp:1.11
--- wesnoth/src/network_worker.cpp:1.10 Fri Oct 29 22:03:46 2004
+++ wesnoth/src/network_worker.cpp      Sat Oct 30 08:46:28 2004
@@ -7,7 +7,7 @@
 #include <cerrno>
 #include <iostream>
 #include <map>
-#include <set>
+#include <vector>
 
 #define LOG_NW lg::info(lg::network)
 
@@ -16,21 +16,15 @@
 unsigned int buf_id = 0;
 
 struct buffer {
-       explicit buffer(TCPsocket sock) : id(buf_id++), sock(sock), 
processing_started(false)
-       {}
+       explicit buffer(TCPsocket sock) : sock(sock) {}
 
-       unsigned int id;
        TCPsocket sock;
        mutable std::vector<char> buf;
-       mutable bool processing_started;
 };
 
-bool operator<(const buffer& a, const buffer& b) {
-       return a.id < b.id;
-}
-
 bool managed = false;
-std::multiset<buffer> bufs;
+typedef std::vector< buffer * > buffer_set;
+buffer_set bufs;
 
 enum SOCKET_STATE { SOCKET_READY, SOCKET_LOCKED, SOCKET_ERROR };
 typedef std::map<TCPsocket,SOCKET_STATE> socket_state_map;
@@ -46,48 +40,47 @@
        LOG_NW << "thread started...\n";
        for(;;) {
 
-               std::multiset<buffer>::iterator itor;
-               socket_state_map::iterator lock_it;
+               buffer *sent_buf = NULL;
 
                {
                        const threading::lock lock(*global_mutex);
 
-                       for(itor = bufs.begin(); itor != bufs.end(); ++itor) {
-                               if(itor->processing_started) {
-                                       continue;
-                               }
-
-                               LOG_NW << "thread found a buffer...\n";
+                       for(;;) {
 
-                               lock_it = sockets_locked.find(itor->sock);
-                               assert(lock_it != sockets_locked.end());
-                               if(lock_it->second == SOCKET_READY) {
-                                       //some implementations don't allow 
modification of items
-                                       //in a set, so we fix this with a 
const_cast
-                                       itor->processing_started = true;
-                                       lock_it->second = SOCKET_LOCKED;
-                                       break;
+                               buffer_set::iterator itor = bufs.begin(), 
itor_end = bufs.end();
+                               for(; itor != itor_end; ++itor) {
+                                       socket_state_map::iterator lock_it = 
sockets_locked.find((*itor)->sock);
+                                       assert(lock_it != sockets_locked.end());
+                                       if(lock_it->second == SOCKET_READY) {
+                                               lock_it->second = SOCKET_LOCKED;
+                                               break;
+                                       }
                                }
-                       }
 
-                       if(itor == bufs.end()) {
-                               if(managed == false) {
-                                       LOG_NW << "worker thread exiting...\n";
-                                       return 0;
-                               }
+                               if(itor == itor_end) {
+                                       if(managed == false) {
+                                               LOG_NW << "worker thread 
exiting...\n";
+                                               return 0;
+                                       }
 
-                               cond->wait(*global_mutex);
-                               LOG_NW << "thread couldn't find a buffer...\n";
-                               continue;
+                                       cond->wait(*global_mutex); // 
temporarily release the mutex and wait for a buffer
+                                       continue;
+                               } else {
+                                       sent_buf = *itor;
+                                       bufs.erase(itor);
+                                       break; // a buffer has been found
+                               }
                        }
                }
 
+               LOG_NW << "thread found a buffer...\n";
+
                SOCKET_STATE result = SOCKET_READY;
 
-               std::vector<char>& v = itor->buf;
-               for(size_t upto = 0; result != SOCKET_ERROR && upto < v.size(); 
) {
-                       const int bytes_to_send = int(v.size() - upto);
-                       const int res = 
SDLNet_TCP_Send(itor->sock,&v[upto],bytes_to_send);
+               std::vector<char> &v = sent_buf->buf;
+               for(size_t upto = 0, size = v.size(); result != SOCKET_ERROR && 
upto < size; ) {
+                       const int bytes_to_send = int(size - upto);
+                       const int res = SDLNet_TCP_Send(sent_buf->sock, 
&v[upto], bytes_to_send);
                        if(res < 0 || res != bytes_to_send && errno != EAGAIN) {
                                result = SOCKET_ERROR;
                        } else {
@@ -99,15 +92,16 @@
 
                {
                        const threading::lock lock(*global_mutex);
-                       bufs.erase(itor);
+                       socket_state_map::iterator lock_it = 
sockets_locked.find(sent_buf->sock);
+                       assert(lock_it != sockets_locked.end());
                        lock_it->second = result;
                        if(result == SOCKET_ERROR) {
                                ++socket_errors;
                        }
                }
+               delete sent_buf;
        }
-
-       return 0;
+       // unreachable
 }
 
 }
@@ -136,8 +130,8 @@
                        managed = false;
                        sockets_locked.clear();
                        socket_errors = 0;
-                       cond->notify_all();
                }
+               cond->notify_all();
 
                for(std::vector<threading::thread*>::const_iterator i = 
threads.begin(); i != threads.end(); ++i) {
                        LOG_NW << "waiting for thread " << int(i - 
threads.begin()) << " to exit...\n";
@@ -158,14 +152,17 @@
 
 void queue_data(TCPsocket sock, std::vector<char>& buf)
 {
-       const threading::lock lock(*global_mutex);
-
        LOG_NW << "queuing " << buf.size() << " bytes of data...\n";
 
-       const std::multiset<buffer>::iterator i = bufs.insert(buffer(sock));
-       i->buf.swap(buf);
+       {
+               const threading::lock lock(*global_mutex);
+
+               buffer *queued_buf = new buffer(sock);
+               queued_buf->buf.swap(buf);
+               bufs.push_back(queued_buf);
 
-       
sockets_locked.insert(std::pair<TCPsocket,SOCKET_STATE>(sock,SOCKET_READY));
+               
sockets_locked.insert(std::pair<TCPsocket,SOCKET_STATE>(sock,SOCKET_READY));
+       }
 
        cond->notify_one();
 }
@@ -185,15 +182,17 @@
                        if(lock_it != sockets_locked.end()) {
                                sockets_locked.erase(lock_it);
                        }
-
-                       std::multiset<buffer>::iterator i = bufs.begin();
-                       while(i != bufs.end()) {
-                               if(i->sock == sock) {
-                                       bufs.erase(i++);
-                               } else {
-                                       ++i;
-                               }
-                       }
+
+                       size_t size = bufs.size();
+                       buffer_set new_bufs;
+                       new_bufs.reserve(size);
+                       for(buffer_set::iterator i = bufs.begin(), i_end = 
bufs.end(); i != i_end; ++i) {
+                               if ((*i)->sock == sock)
+                                       delete *i;
+                               else
+                                       new_bufs.push_back(*i);
+                       }
+                       bufs.swap(new_bufs);
 
                        break;
                }




reply via email to

[Prev in Thread] Current Thread [Next in Thread]