00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #include "nnreactor.h"
00021 #include "nnlog.h"
00022 #include "platform.h"
00023 #include "util.h"
00024 #include <algorithm>
00025 #include <iostream>
00026 #include <queue>
00027 #include <assert.h>
00028
00029
00030
00031 inline void fixtime(struct timeval & timeout, long ms, bool & timeout_set)
00032 {
00033 struct timeval now;
00034 gettimeofday(&now, 0);
00035 if((! timeout_set) || (difftime(timeout, now) > ms))
00036 {
00037 timeout.tv_sec = now.tv_sec + (ms / 1000);
00038 timeout.tv_usec = now.tv_usec + (ms % 1000) * 1000;
00039 if(timeout.tv_usec >= 1000000)
00040 {
00041 timeout.tv_sec += 1;
00042 timeout.tv_usec -= 1000000;
00043 }
00044 timeout_set = true;
00045 }
00046 }
00047
00048 #ifndef DOXYGEN_UNDOCUMENTED
00049 typedef std::pair<struct timeval, NewNet::RefPtr<NewNet::Reactor::Timeout::Callback> > TimeoutItem;
00050 struct NewNet::Reactor::Timeouts
00051 {
00052 std::vector<TimeoutItem> timeouts;
00053 };
00054 #endif
00055
00056
00057
00058
00059 static bool
00060 checkTimeouts(std::vector<TimeoutItem> & timeouts, struct timeval & timeout, bool & timeout_set)
00061 {
00062 bool retVal = false;
00063
00064 struct timeval now;
00065 gettimeofday(&now, 0);
00066
00067 std::vector<TimeoutItem>::iterator it, end = timeouts.end();
00068 std::queue<std::vector<TimeoutItem>::iterator> old;
00069
00070 for(it = timeouts.begin(); it != end; ++it)
00071 {
00072
00073 if(timercmp(&now, &(*it).first, >=))
00074 {
00075
00076 gettimeofday(&now, 0);
00077 unsigned long diff = difftime(now, (*it).first);
00078
00079
00080 (*it).second->operator()(diff);
00081
00082
00083 old.push(it);
00084
00085 retVal = true;
00086 }
00087 else if((! timeout_set) || (timercmp(&(*it).first, &timeout, <)))
00088 {
00089
00090
00091 timeout.tv_sec = (*it).first.tv_sec;
00092 timeout.tv_usec = (*it).first.tv_usec;
00093 timeout_set = true;
00094 }
00095 }
00096
00097
00098 while(! old.empty())
00099 {
00100 timeouts.erase(old.front());
00101 old.pop();
00102 }
00103
00104 return retVal;
00105 }
00106
00107 NewNet::Reactor::Reactor()
00108 {
00109 m_Timeouts = new Timeouts;
00110 }
00111
00112 #ifndef DOXYGEN_UNDOCUMENTED
00113 NewNet::Reactor::~Reactor()
00114 {
00115 delete m_Timeouts;
00116 }
00117 #endif // DOXYGEN_UNDOCUMENTED
00118
00119 void NewNet::Reactor::add(Socket * socket)
00120 {
00121 if(socket->reactor())
00122 {
00123 if(socket->reactor() == this)
00124 return;
00125
00126
00127
00128
00129 m_Sockets.push_back(socket);
00130 socket->reactor()->remove(socket);
00131 }
00132 else
00133 m_Sockets.push_back(socket);
00134 socket->setReactor(this);
00135 }
00136
00137 void NewNet::Reactor::remove(Socket * socket)
00138 {
00139
00140 assert(socket->reactor() == this);
00141
00142 socket->setReactor(0);
00143 m_Sockets.erase(std::find(m_Sockets.begin(), m_Sockets.end(), socket));
00144 }
00145
00146 void NewNet::Reactor::run()
00147 {
00148 int nfds = 0;
00149 fd_set readfds, writefds, exceptfds;
00150
00151 m_StopReactor = false;
00152 while((! m_StopReactor) && (! (m_Timeouts->timeouts.empty() && m_Sockets.empty())))
00153 {
00154
00155 bool timeout_set = false;
00156 struct timeval timeout;
00157
00158
00159 nfds = 0;
00160 FD_ZERO(&readfds);
00161 FD_ZERO(&writefds);
00162 FD_ZERO(&exceptfds);
00163
00164
00165
00166 std::vector<NewNet::RefPtr<NewNet::Socket> > sockets(m_Sockets);
00167
00168
00169 std::vector<NewNet::RefPtr<NewNet::Socket> >::iterator it, end = sockets.end();
00170 for(it = sockets.begin(); it != end; ++it)
00171 {
00172
00173 NewNet::Socket * sock = *it;
00174 int fd = sock->descriptor();
00175
00176 if(fd == -1)
00177 continue;
00178
00179 long n;
00180
00181 switch(sock->socketState())
00182 {
00183
00184 case NewNet::Socket::SocketUninitialized:
00185 case NewNet::Socket::SocketDisconnecting:
00186 case NewNet::Socket::SocketDisconnected:
00187 case NewNet::Socket::SocketException:
00188 break;
00189
00190
00191 case NewNet::Socket::SocketListening:
00192 FD_SET(fd, &readfds);
00193 nfds = std::max(nfds, fd + 1);
00194 break;
00195
00196
00197 case NewNet::Socket::SocketConnecting:
00198 FD_SET(fd, &writefds);
00199 nfds = std::max(nfds, fd + 1);
00200 break;
00201
00202
00203
00204 case NewNet::Socket::SocketConnected:
00205
00206 n = (! sock->downRateLimiter()) ? 0 : sock->downRateLimiter()->nextWindow();
00207 if(n == 0)
00208 FD_SET(fd, &readfds);
00209 else
00210 {
00211 NNLOG("newnet.net.debug", "Download limiter for socket %i recommends %li ms sleep.", fd, n);
00212 fixtime(timeout, n, timeout_set);
00213 }
00214
00215
00216
00217 if(sock->dataWaiting())
00218 {
00219 n = (! sock->upRateLimiter()) ? 0 : sock->upRateLimiter()->nextWindow();
00220 if(n == 0)
00221 FD_SET(fd, &writefds);
00222 else
00223 {
00224 NNLOG("newnet.net.debug", "Upload rate limiter for socket %i reports next window in %li ms", fd, n);
00225 fixtime(timeout, n, timeout_set);
00226 }
00227 }
00228
00229
00230 FD_SET(fd, &exceptfds);
00231
00232 nfds = std::max(nfds, fd + 1);
00233 break;
00234 }
00235 }
00236
00237 if(checkTimeouts(m_Timeouts->timeouts, timeout, timeout_set))
00238 continue;
00239
00240 if(timeout_set)
00241 {
00242
00243
00244 struct timeval now;
00245 gettimeofday(&now, 0);
00246 timeout.tv_sec -= now.tv_sec;
00247 timeout.tv_usec -= now.tv_usec;
00248 if(timeout.tv_usec < 0)
00249 {
00250 timeout.tv_sec -= 1;
00251 timeout.tv_usec += 1000000;
00252 }
00253 if(timeout.tv_sec < 0)
00254 {
00255 timeout.tv_sec = 0;
00256 timeout.tv_usec = 0;
00257 }
00258 }
00259
00260
00261 if(nfds == 0)
00262 {
00263
00264
00265 if(! timeout_set)
00266 break;
00267
00268 NNLOG("newnet.net.debug", "Sleeping %li ms until next timeout.", (timeout.tv_sec * 1000) + (timeout.tv_usec / 1000));
00269
00270 #ifndef WIN32
00271 if(sleep(timeout.tv_sec) == 0)
00272 usleep(timeout.tv_usec);
00273 #else
00274 Sleep(timeout.tv_sec * 1000 + timeout.tv_usec / 1000);
00275 #endif // ! WIN32
00276 continue;
00277 }
00278
00279
00280 int r = 0;
00281 if(timeout_set)
00282 {
00283 NNLOG("newnet.net.debug", "Waiting at most %li ms until one of max. %i sockets wakes up.", (timeout.tv_sec * 1000) + (timeout.tv_usec / 1000), nfds);
00284 r = select(nfds, &readfds, &writefds, &exceptfds, &timeout);
00285 }
00286 else
00287 {
00288 NNLOG("newnet.net.debug", "Waiting indefinitely until one of max. %i sockets wakes up.", nfds);
00289 r = select(nfds, &readfds, &writefds, &exceptfds, 0);
00290 }
00291
00292 if(r == -1)
00293 continue;
00294
00295 for(it = sockets.begin(); (it != end) && (r > 0); ++it)
00296 {
00297 NewNet::Socket * sock = *it;
00298 int fd = sock->descriptor();
00299 if(fd == -1)
00300 continue;
00301
00302
00303 int state = 0;
00304 if(FD_ISSET(fd, &readfds))
00305 state |= NewNet::Socket::StateReceive;
00306 if(FD_ISSET(fd, &writefds))
00307 state |= NewNet::Socket::StateSend;
00308 if(FD_ISSET(fd, &exceptfds))
00309 state |= NewNet::Socket::StateException;
00310 sock->setReadyState(state);
00311
00312
00313 if(state)
00314 {
00315 --r;
00316 sock->process();
00317 }
00318 }
00319 }
00320 }
00321
00322 void
00323 NewNet::Reactor::stop()
00324 {
00325 m_StopReactor = true;
00326 }
00327
00328 NewNet::Reactor::Timeout::Callback *
00329 NewNet::Reactor::addTimeout(long msec, Timeout::Callback * callback)
00330 {
00331
00332 struct timeval tv;
00333 gettimeofday(&tv, 0);
00334 tv.tv_sec += (msec / 1000);
00335 tv.tv_usec += (msec % 1000) * 1000;
00336 if(tv.tv_usec >= 1000000)
00337 {
00338 tv.tv_sec += 1;
00339 tv.tv_usec -= 1000000;
00340 }
00341
00342
00343 m_Timeouts->timeouts.push_back(TimeoutItem(tv, callback));
00344
00345
00346 return callback;
00347 }