NewNet/nnreactor.cpp

00001 /*  NewNet - A networking framework in C++
00002     Copyright (C) 2006 Ingmar K. Steen (iksteen@gmail.com)
00003 
00004     This program is free software; you can redistribute it and/or modify
00005     it under the terms of the GNU General Public License as published by
00006     the Free Software Foundation; either version 2 of the License, or
00007     (at your option) any later version.
00008 
00009     This program is distributed in the hope that it will be useful,
00010     but WITHOUT ANY WARRANTY; without even the implied warranty of
00011     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00012     GNU General Public License for more details.
00013 
00014     You should have received a copy of the GNU General Public License
00015     along with this program; if not, write to the Free Software
00016     Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00017 
00018  */
00019 
00020 #include "nnreactor.h"
00021 #include "nnlog.h"
00022 #include "platform.h"
00023 #include "util.h"
00024 #include <algorithm>
00025 #include <iostream>
00026 #include <queue>
00027 #include <assert.h>
00028 
00029 /* Update timeout to 'ms' miliseconds after the current time if that's
00030    sooner than the current timeout, or if no timeout has been set yet. */
00031 inline void fixtime(struct timeval & timeout, long ms, bool & timeout_set)
00032 {
00033   struct timeval now;
00034   gettimeofday(&now, 0);
00035   if((! timeout_set) || (difftime(timeout, now) > ms))
00036   {
00037     timeout.tv_sec = now.tv_sec + (ms / 1000);
00038     timeout.tv_usec = now.tv_usec + (ms % 1000) * 1000;
00039     if(timeout.tv_usec >= 1000000)
00040     {
00041       timeout.tv_sec += 1;
00042       timeout.tv_usec -= 1000000;
00043     }
00044     timeout_set = true;
00045   }
00046 }
00047 
00048 #ifndef DOXYGEN_UNDOCUMENTED
00049 typedef std::pair<struct timeval, NewNet::RefPtr<NewNet::Reactor::Timeout::Callback> > TimeoutItem;
00050 struct NewNet::Reactor::Timeouts
00051 {
00052   std::vector<TimeoutItem> timeouts;
00053 };
00054 #endif
00055 
00056 /* Check if any timeouts have expired. If so, invoke them and remove them
00057    from the queue. Also, update the timeout if a timeout should be called
00058    before the currently set timeout. */
00059 static bool
00060 checkTimeouts(std::vector<TimeoutItem> & timeouts, struct timeval & timeout, bool & timeout_set)
00061 {
00062   bool retVal = false;
00063 
00064   struct timeval now;
00065   gettimeofday(&now, 0);
00066 
00067   std::vector<TimeoutItem>::iterator it, end = timeouts.end();
00068   std::queue<std::vector<TimeoutItem>::iterator> old;
00069 
00070   for(it = timeouts.begin(); it != end; ++it)
00071   {
00072     // Has the timeout expired?
00073     if(timercmp(&now, &(*it).first, >=))
00074     {
00075       // Calculate how long the timeout is overdue
00076       gettimeofday(&now, 0);
00077       unsigned long diff = difftime(now, (*it).first);
00078 
00079       // And emit it
00080       (*it).second->operator()(diff);
00081 
00082       // Push it on the list of to-be-deleted timeouts
00083       old.push(it);
00084 
00085       retVal = true;
00086     }
00087     else if((! timeout_set) || (timercmp(&(*it).first, &timeout, <)))
00088     {
00089       /* If the timeout expires before the next cycle timeout, adjust
00090          the cycle timeout. */
00091       timeout.tv_sec = (*it).first.tv_sec;
00092       timeout.tv_usec = (*it).first.tv_usec;
00093       timeout_set = true;
00094     }
00095   }
00096 
00097   // Purge old timeouts from the event queue.
00098   while(! old.empty())
00099   {
00100     timeouts.erase(old.front());
00101     old.pop();
00102   }
00103 
00104   return retVal;
00105 }
00106 
00107 NewNet::Reactor::Reactor()
00108 {
00109   m_Timeouts = new Timeouts;
00110 }
00111 
00112 #ifndef DOXYGEN_UNDOCUMENTED
00113 NewNet::Reactor::~Reactor()
00114 {
00115   delete m_Timeouts;
00116 }
00117 #endif // DOXYGEN_UNDOCUMENTED
00118 
00119 void NewNet::Reactor::add(Socket * socket)
00120 {
00121   if(socket->reactor())
00122   {
00123     if(socket->reactor() == this)
00124       return;
00125 
00126     /* Proceed with caution here, otherwise the socket might get prematurely
00127        deleted: first create our shared reference, then remove it from the
00128        other reactor. */
00129     m_Sockets.push_back(socket);
00130     socket->reactor()->remove(socket);
00131   }
00132   else
00133     m_Sockets.push_back(socket);
00134   socket->setReactor(this);
00135 }
00136 
00137 void NewNet::Reactor::remove(Socket * socket)
00138 {
00139   // Removing a socket from the wrong reactor is a programming error, trap it.
00140   assert(socket->reactor() == this);
00141 
00142   socket->setReactor(0);
00143   m_Sockets.erase(std::find(m_Sockets.begin(), m_Sockets.end(), socket));
00144 }
00145 
00146 void NewNet::Reactor::run()
00147 {
00148   int nfds = 0;
00149   fd_set readfds, writefds, exceptfds;
00150 
00151   m_StopReactor = false;
00152   while((! m_StopReactor) && (! (m_Timeouts->timeouts.empty() && m_Sockets.empty())))
00153   {
00154     /* No timeout set yet */
00155     bool timeout_set = false;
00156     struct timeval timeout;
00157 
00158     /* Zero the FD sets */
00159     nfds = 0;
00160     FD_ZERO(&readfds);
00161     FD_ZERO(&writefds);
00162     FD_ZERO(&exceptfds);
00163 
00164     /* Make a copy of our socket list, as they might disappear because of
00165        events that occur and then our iterators go berserk */
00166     std::vector<NewNet::RefPtr<NewNet::Socket> > sockets(m_Sockets);
00167 
00168     /* Check which events we want to hear about from which sockets */
00169     std::vector<NewNet::RefPtr<NewNet::Socket> >::iterator it, end = sockets.end();
00170     for(it = sockets.begin(); it != end; ++it)
00171     {
00172       /* Convenience... */
00173       NewNet::Socket * sock = *it;
00174       int fd = sock->descriptor();
00175 
00176       if(fd == -1)
00177         continue;
00178 
00179       long n; // miliseconds to next window of opportunity
00180 
00181       switch(sock->socketState())
00182       {
00183         /* The socket is dead, no events are interesting */
00184         case NewNet::Socket::SocketUninitialized:
00185         case NewNet::Socket::SocketDisconnecting:
00186         case NewNet::Socket::SocketDisconnected:
00187         case NewNet::Socket::SocketException:
00188           break;
00189 
00190         /* Listening socket, check for read-ready events */
00191         case NewNet::Socket::SocketListening:
00192           FD_SET(fd, &readfds);
00193           nfds = std::max(nfds, fd + 1);
00194           break;
00195 
00196         /* Connecting socket, check for write-ready events */
00197         case NewNet::Socket::SocketConnecting:
00198           FD_SET(fd, &writefds);
00199           nfds = std::max(nfds, fd + 1);
00200           break;
00201 
00202         /* Connected socket, if possible / allowed check for read, write
00203            and OOB events */
00204         case NewNet::Socket::SocketConnected:
00205           /* Check if we're allowed to receive, and if not, when we might be. */
00206           n = (! sock->downRateLimiter()) ? 0 : sock->downRateLimiter()->nextWindow();
00207           if(n == 0)
00208             FD_SET(fd, &readfds);
00209           else
00210           {
00211             NNLOG("newnet.net.debug", "Download limiter for socket %i recommends %li ms sleep.", fd, n);
00212             fixtime(timeout, n, timeout_set);
00213           }
00214 
00215           /* Check if we want to send, if we're allowed to send. And if we're
00216              not allowed to send, when we might be. */
00217           if(sock->dataWaiting())
00218           {
00219             n = (! sock->upRateLimiter()) ? 0 : sock->upRateLimiter()->nextWindow();
00220             if(n == 0)
00221               FD_SET(fd, &writefds);
00222             else
00223             {
00224               NNLOG("newnet.net.debug", "Upload rate limiter for socket %i reports next window in %li ms", fd, n);
00225               fixtime(timeout, n, timeout_set);
00226             }
00227           }
00228 
00229           /* OOB data is not rate limited and we're always interested */
00230           FD_SET(fd, &exceptfds);
00231 
00232           nfds = std::max(nfds, fd + 1);
00233           break;
00234         }
00235     }
00236 
00237     if(checkTimeouts(m_Timeouts->timeouts, timeout, timeout_set))
00238       continue;
00239 
00240     if(timeout_set)
00241     {
00242       /* We know when we need to wake up, but how many sec/usec from
00243          now is that? */
00244       struct timeval now;
00245       gettimeofday(&now, 0);
00246       timeout.tv_sec -= now.tv_sec;
00247       timeout.tv_usec -= now.tv_usec;
00248       if(timeout.tv_usec < 0)
00249       {
00250         timeout.tv_sec -= 1;
00251         timeout.tv_usec += 1000000;
00252       }
00253       if(timeout.tv_sec < 0)
00254       {
00255         timeout.tv_sec = 0;
00256         timeout.tv_usec = 0;
00257       }
00258     }
00259 
00260     /* If we have nothing to do, just sleep a bit */
00261     if(nfds == 0)
00262     {
00263       /* If no timeout is set and there are no active descriptors, the reactor
00264          is dead and it should exit. */
00265       if(! timeout_set)
00266         break;
00267 
00268       NNLOG("newnet.net.debug", "Sleeping %li ms until next timeout.", (timeout.tv_sec * 1000) + (timeout.tv_usec / 1000));
00269 
00270 #ifndef WIN32
00271       if(sleep(timeout.tv_sec) == 0)
00272         usleep(timeout.tv_usec);
00273 #else
00274       Sleep(timeout.tv_sec * 1000 + timeout.tv_usec / 1000);
00275 #endif // ! WIN32
00276       continue;
00277     }
00278 
00279     /* Wait for socket events */
00280     int r = 0;
00281     if(timeout_set)
00282     {
00283       NNLOG("newnet.net.debug", "Waiting at most %li ms until one of max. %i sockets wakes up.", (timeout.tv_sec * 1000) + (timeout.tv_usec / 1000), nfds);
00284       r = select(nfds, &readfds, &writefds, &exceptfds, &timeout);
00285     }
00286     else
00287     {
00288       NNLOG("newnet.net.debug", "Waiting indefinitely until one of max. %i sockets wakes up.", nfds);
00289       r = select(nfds, &readfds, &writefds, &exceptfds, 0);
00290     }
00291 
00292     if(r == -1) // An error occured
00293       continue; // Let's pretend nothing happened and just try again
00294 
00295     for(it = sockets.begin(); (it != end) && (r > 0); ++it)
00296     {
00297       NewNet::Socket * sock = *it;
00298       int fd = sock->descriptor();
00299       if(fd == -1)
00300         continue;
00301 
00302       // Update the socket's ready state
00303       int state = 0;
00304       if(FD_ISSET(fd, &readfds))
00305         state |= NewNet::Socket::StateReceive;
00306       if(FD_ISSET(fd, &writefds))
00307         state |= NewNet::Socket::StateSend;
00308       if(FD_ISSET(fd, &exceptfds))
00309         state |= NewNet::Socket::StateException;
00310       sock->setReadyState(state);
00311 
00312       // If we have something to report, make the socket process the events.
00313       if(state)
00314       {
00315         --r;
00316         sock->process();
00317       }
00318     }
00319   }
00320 }
00321 
00322 void
00323 NewNet::Reactor::stop()
00324 {
00325   m_StopReactor = true;
00326 }
00327 
00328 NewNet::Reactor::Timeout::Callback *
00329 NewNet::Reactor::addTimeout(long msec, Timeout::Callback * callback)
00330 {
00331   // Calculate when the event has to occur
00332   struct timeval tv;
00333   gettimeofday(&tv, 0);
00334   tv.tv_sec += (msec / 1000);
00335   tv.tv_usec += (msec % 1000) * 1000;
00336   if(tv.tv_usec >= 1000000)
00337   {
00338     tv.tv_sec += 1;
00339     tv.tv_usec -= 1000000;
00340   }
00341 
00342   // Push the timeout on our queue
00343   m_Timeouts->timeouts.push_back(TimeoutItem(tv, callback));
00344 
00345   // Return the callback, for convenience
00346   return callback;
00347 }

Generated on Sun Jan 7 14:00:01 2007 for NewNet by  doxygen 1.5.1