Switch to side-by-side view

--- a/src/netcon.cpp
+++ b/src/netcon.cpp
@@ -29,6 +29,7 @@
 #include <stdlib.h>
 #include <string.h>
 #include <errno.h>
+#include <stdint.h>
 
 #ifdef _AIX
 #include <strings.h>
@@ -36,13 +37,17 @@
 
 #include <unistd.h>
 #include <fcntl.h>
-#include <sys/time.h>
 #include <sys/socket.h>
 #include <sys/un.h>
 #include <netinet/in.h>
 #include <netinet/tcp.h>
 #include <arpa/inet.h>
 #include <netdb.h>
+#ifdef HAVE_KQUEUE
+#include <sys/types.h>
+#include <sys/event.h>
+#include <vector>
+#endif
 
 #include <map>
 
@@ -55,7 +60,7 @@
 #endif
 
 // Size of path buffer in sockaddr_un (AF_UNIX socket
-// addr). Mysteriously it's 108 (explicit value) under linux, no
+// addr). Mysteriously it is 108 (explicit value) under linux, no
 // define accessible. Let's take a little margin as it appears that
 // some systems use 92. I believe we could also malloc a variable size
 // struct but why bother.
@@ -81,8 +86,8 @@
 #define freeZ(X) if (X) {free(X);X=0;}
 #endif
 
-#define MILLIS(OLD, NEW) ( (long)(((NEW).tv_sec - (OLD).tv_sec) * 1000 + \
-                  ((NEW).tv_usec - (OLD).tv_usec) / 1000))
+#define MILLIS(OLD, NEW) ( (uint64_t((NEW).tv_sec) - (OLD).tv_sec) * 1000 + \
+                            ((NEW).tv_usec - (OLD).tv_usec) / 1000 )
 
 // Static method
 // Simplified interface to 'select()'. Only use one fd, for either
@@ -109,24 +114,87 @@
     return ret;
 }
 
+
+///////////////////////////////////////////
+// SelectLoop
+
+class SelectLoop::Internal {
+public:
+    Internal() {
+#ifdef HAVE_KQUEUE
+        if ((kq = kqueue()) == -1) {
+            LOGSYSERR("Netcon::selectloop", "kqueue", "");
+        }
+#endif
+    }
+
+    ~Internal() {
+#ifdef HAVE_KQUEUE
+        if (kq >= 0)
+            close(kq);
+#endif
+    }
+    
+    // Set by client callback to tell selectloop to return.
+    bool selectloopDoReturn{false};
+    int  selectloopReturnValue{0};
+    int  placetostart{0};
+
+    // Map of NetconP indexed by fd
+    map<int, NetconP> polldata;
+#ifdef HAVE_KQUEUE
+    int kq{-1};
+#endif
+    // The last time we did the periodic thing. Initialized by setperiodic()
+    struct timeval lasthdlcall;
+
+    // The call back function and its parameter
+    int (*periodichandler)(void *){0};
+    void *periodicparam{0};
+    // The periodic interval
+    int periodicmillis{0};
+
+    void periodictimeout(struct timeval *tv);
+    void periodictimeout(struct timespec *ts);
+    int maybecallperiodic();
+    int setselevents(int fd, int events);
+    int setselevents(NetconP& con, int events);
+};
+
+SelectLoop::SelectLoop()
+{
+    m = new Internal;
+}
+
+SelectLoop::~SelectLoop()
+{
+    delete m;
+}
+
+void SelectLoop::loopReturn(int value)
+{
+    m->selectloopDoReturn = true;
+    m->selectloopReturnValue = value;
+}
+        
 void SelectLoop::setperiodichandler(int (*handler)(void *), void *p, int ms)
 {
-    m_periodichandler = handler;
-    m_periodicparam = p;
-    m_periodicmillis = ms;
-    if (m_periodicmillis > 0) {
-        gettimeofday(&m_lasthdlcall, 0);
+    m->periodichandler = handler;
+    m->periodicparam = p;
+    m->periodicmillis = ms;
+    if (m->periodicmillis > 0) {
+        gettimeofday(&m->lasthdlcall, 0);
     }
 }
 
 // Compute the appropriate timeout so that the select call returns in
 // time to call the periodic routine.
-void SelectLoop::periodictimeout(struct timeval *tv)
+void SelectLoop::Internal::periodictimeout(struct timeval *tv)
 {
     // If periodic not set, the select call times out and we loop
     // after a very long time (we'd need to pass NULL to select for an
     // infinite wait, and I'm too lazy to handle it)
-    if (m_periodicmillis <= 0) {
+    if (periodicmillis <= 0) {
         tv->tv_sec = 10000;
         tv->tv_usec = 0;
         return;
@@ -134,7 +202,7 @@
 
     struct timeval mtv;
     gettimeofday(&mtv, 0);
-    int millis = m_periodicmillis - MILLIS(m_lasthdlcall, mtv);
+    int millis = periodicmillis - MILLIS(lasthdlcall, mtv);
     
     // millis <= 0 means we should have already done the thing. *dont* set the
     // tv to 0, which means no timeout at all !
@@ -145,20 +213,31 @@
     tv->tv_usec = (millis % 1000) * 1000;
 }
 
+void SelectLoop::Internal::periodictimeout(struct timespec *ts)
+{
+    struct timeval tv;
+    periodictimeout(&tv);
+    ts->tv_sec = tv.tv_sec;
+    ts->tv_nsec = tv.tv_usec * 1000;
+}
+
+
 // Check if it's time to call the handler. selectloop will return to
-// caller if it or we return 0
-int SelectLoop::maybecallperiodic()
-{
-    if (m_periodicmillis <= 0) {
+// caller if either we or the handler return 0
+int SelectLoop::Internal::maybecallperiodic()
+{
+    if (periodicmillis <= 0) {
         return 1;
     }
+
     struct timeval mtv;
     gettimeofday(&mtv, 0);
-    int millis = m_periodicmillis - MILLIS(m_lasthdlcall, mtv);
+    int millis = periodicmillis - MILLIS(lasthdlcall, mtv);
+
     if (millis <= 0) {
-        gettimeofday(&m_lasthdlcall, 0);
-        if (m_periodichandler) {
-            return m_periodichandler(m_periodicparam);
+        lasthdlcall = mtv;
+        if (periodichandler) {
+            return periodichandler(periodicparam);
         } else {
             return 0;
         }
@@ -166,14 +245,17 @@
     return 1;
 }
 
+#ifndef HAVE_KQUEUE
+
 int SelectLoop::doLoop()
 {
     for (;;) {
-        if (m_selectloopDoReturn) {
-            m_selectloopDoReturn = false;
+        if (m->selectloopDoReturn) {
+            m->selectloopDoReturn = false;
             LOGDEB("Netcon::selectloop: returning on request\n");
-            return m_selectloopReturnValue;
-        }
+            return m->selectloopReturnValue;
+        }
+
         int nfds;
         fd_set rd, wd;
         FD_ZERO(&rd);
@@ -182,10 +264,9 @@
         // Walk the netcon map and set up the read and write fd_sets
         // for select()
         nfds = 0;
-        for (map<int, NetconP>::iterator it = m_polldata.begin();
-                it != m_polldata.end(); it++) {
-            NetconP& pll = it->second;
-            int fd  = it->first;
+        for (auto& entry : m->polldata) {
+            NetconP& pll = entry.second;
+            int fd  = entry.first;
             LOGDEB2("Selectloop: fd " << fd << " flags 0x"  <<
                     pll->m_wantedEvents << "\n");
             if (pll->m_wantedEvents & Netcon::NETCONPOLL_READ) {
@@ -206,7 +287,7 @@
 
             // Just in case there would still be open fds in there
             // (with no r/w flags set). Should not be needed, but safer
-            m_polldata.clear();
+            m->polldata.clear();
             LOGDEB1("Netcon::selectloop: no fds\n");
             return 0;
         }
@@ -216,7 +297,7 @@
         // Compute the next timeout according to what might need to be
         // done apart from waiting for data
         struct timeval tv;
-        periodictimeout(&tv);
+        m->periodictimeout(&tv);
         // Wait for something to happen
         int ret = select(nfds, &rd, &wd, 0, &tv);
         LOGDEB2("Netcon::selectloop: nfds " << nfds <<
@@ -225,7 +306,7 @@
             LOGSYSERR("Netcon::selectloop", "select", "");
             return -1;
         }
-        if (m_periodicmillis > 0 && maybecallperiodic() <= 0) {
+        if (m->periodicmillis > 0 && m->maybecallperiodic() <= 0) {
             return 1;
         }
 
@@ -242,12 +323,12 @@
         // map may change between 2 sweeps, so that we'd have to be smart
         // with the iterator. As the cost per unused fd is low (just 2 bit
         // flag tests), we keep it like this for now
-        if (m_placetostart >= nfds) {
-            m_placetostart = 0;
+        if (m->placetostart >= nfds) {
+            m->placetostart = 0;
         }
         int i, fd;
         int activefds = 0;
-        for (i = 0, fd = m_placetostart; i < nfds; i++, fd++) {
+        for (i = 0, fd = m->placetostart; i < nfds; i++, fd++) {
             if (fd >= nfds) {
                 fd = 0;
             }
@@ -263,8 +344,8 @@
                 continue;
             }
 
-            map<int, NetconP>::iterator it = m_polldata.find(fd);
-            if (it == m_polldata.end()) {
+            auto it = m->polldata.find(fd);
+            if (it == m->polldata.end()) {
                 // This should never happen, because we only set our
                 // own fds in the mask !
                 LOGERR("Netcon::selectloop: fd "  << fd << " not found\n");
@@ -272,7 +353,7 @@
             }
             activefds++;
             // Next start will be one beyond last serviced (modulo nfds)
-            m_placetostart = fd + 1;
+            m->placetostart = fd + 1;
 
             NetconP& pll = it->second;
             if (canread && pll->cando(Netcon::NETCONPOLL_READ) <= 0) {
@@ -285,7 +366,7 @@
                   (Netcon::NETCONPOLL_WRITE | Netcon::NETCONPOLL_READ))) {
                 LOGDEB0("Netcon::selectloop: fd " << it->first << " has 0x"
                         << it->second->m_wantedEvents << " mask, erasing\n");
-                m_polldata.erase(it);
+                m->polldata.erase(it);
             }
         } // fd sweep
 
@@ -299,7 +380,155 @@
     return -1;
 }
 
-// Add a connection to the monitored set.
+#else // -> Using kqueue: use select()
+
+int SelectLoop::doLoop()
+{
+    for (;;) {
+        if (m->selectloopDoReturn) {
+            m->selectloopDoReturn = false;
+            LOGDEB("Netcon::selectloop: returning on request\n");
+            return m->selectloopReturnValue;
+        }
+
+        // Check that we do have something to wait for.
+        int nfds = 0;
+        for (auto& entry : m->polldata) {
+            NetconP& pll = entry.second;
+            if (pll->m_wantedEvents & Netcon::NETCONPOLL_READ) {
+                nfds++;
+            } else if (pll->m_wantedEvents & Netcon::NETCONPOLL_WRITE) {
+                nfds++;
+            }
+        }
+        if (nfds == 0) {
+            // This should never happen in a server as we should at least
+            // always monitor the main listening server socket. For a
+            // client, it's up to client code to avoid or process this
+            // condition.
+
+            // Just in case there would still be open fds in there
+            // (with no r/w flags set). Should not be needed, but safer
+            m->polldata.clear();
+            LOGDEB1("Netcon::selectloop: no fds\n");
+            return 0;
+        }
+
+        // Compute the next timeout according to what might need to be
+        // done apart from waiting for data
+        struct timespec ts;
+        m->periodictimeout(&ts);
+        // Wait for something to happen
+        vector<struct kevent> events;
+        events.resize(nfds);
+        LOGDEB("Netcon::selectloop: kevent(), nfds = " << nfds << "\n");
+        int ret = kevent(m->kq, 0, 0, &events[0], events.size(), &ts);
+        LOGDEB("Netcon::selectloop: nfds " << nfds <<
+                " kevent returns " << ret << "\n");
+        if (ret < 0) {
+            LOGSYSERR("Netcon::selectloop", "kevent", "");
+            return -1;
+        }
+        if (m->periodicmillis > 0 && m->maybecallperiodic() <= 0) {
+            return 1;
+        }
+        if (ret == 0) {
+            // Timeout, do it again.
+            continue;
+        }
+ 
+        for (int i = 0; i < ret; i++) {
+            struct kevent& ev = events[i];
+            if (ev.flags & EV_ERROR) {
+                LOGSYSERR("Netcon::selectLoop", "kevent", "");
+                LOGERR("Netcon::selectLoop: event error: " <<
+                       strerror(ev.data));
+                return -1;
+            }
+            int canread = ev.filter == EVFILT_READ;
+            int canwrite = ev.filter == EVFILT_WRITE; 
+            bool none = !canread && !canwrite;
+            LOGDEB("Netcon::selectloop: fd " << int(ev.ident) << " "  << 
+                    (none ? "blocked" : "can") << " "  << 
+                    (canread ? "read" : "") << " "  << 
+                    (canwrite ? "write" : "") << "\n");
+            if (none) {
+                LOGERR("Kevent returned unknown filter " << ev.filter <<endl);
+                continue;
+            }
+
+            auto it = m->polldata.find(int(ev.ident));
+            if (it == m->polldata.end()) {
+                LOGERR("Netcon::selectloop: fd " << int(ev.ident) <<
+                       " not found\n");
+                continue;
+            }
+            NetconP& pll = it->second;
+            if (canread && pll->cando(Netcon::NETCONPOLL_READ) <= 0) {
+                pll->setselevents(pll->getselevents() &
+                                  ~Netcon::NETCONPOLL_READ);
+            }
+            if (canwrite && pll->cando(Netcon::NETCONPOLL_WRITE) <= 0) {
+                pll->setselevents(pll->getselevents() &
+                                  ~Netcon::NETCONPOLL_WRITE);
+            }
+            if (!(pll->getselevents() &
+                  (Netcon::NETCONPOLL_WRITE | Netcon::NETCONPOLL_READ))) {
+                LOGDEB0("Netcon::selectloop: fd " << it->first << " has 0x"
+                        << it->second->getselevents() << " mask, erasing\n");
+                m->polldata.erase(it);
+            }
+        } // fd sweep
+
+    } // forever loop
+    LOGERR("SelectLoop::doLoop: got out of loop !\n");
+    return -1;
+}
+
+#endif // kqueue version
+
+int SelectLoop::Internal::setselevents(int fd, int events)
+{
+#ifdef HAVE_KQUEUE
+    auto it = polldata.find(fd);
+    if (it == polldata.end()) {
+        return -1;
+    }
+    return setselevents(it->second, events);
+#endif
+    return 0;
+}
+
+int SelectLoop::Internal::setselevents(NetconP& con, int events)
+{
+#ifdef HAVE_KQUEUE
+    struct kevent event;
+    if (events & Netcon::NETCONPOLL_READ) {
+        EV_SET(&event, con->m_fd, EVFILT_READ, EV_ADD, 0, 0, 0);
+        if(kevent(kq, &event, 1, 0, 0, 0) < 0) {
+            LOGSYSERR("SelectLoop::addselcon", "kevent", "");
+            return -1;
+        }
+    } else {
+        EV_SET(&event, con->m_fd, EVFILT_READ, EV_DELETE, 0, 0, 0);
+        kevent(kq, &event, 1, 0, 0, 0);
+    }
+    if (events & Netcon::NETCONPOLL_WRITE) {
+        EV_SET(&event, con->m_fd, EVFILT_WRITE, EV_ADD, 0, 0, 0);
+        if(kevent(kq, &event, 1, 0, 0, 0) < 0) {
+            LOGSYSERR("SelectLoop::addselcon", "kevent", "");
+            return -1;
+        }
+    } else {
+        EV_SET(&event, con->m_fd, EVFILT_WRITE, EV_DELETE, 0, 0, 0);
+        kevent(kq, &event, 1, 0, 0, 0);
+    }
+#endif
+    return 0;
+}
+
+// Add a connection to the monitored set. This can be used to change
+// the event flags too (won't add duplicates)
 int SelectLoop::addselcon(NetconP con, int events)
 {
     if (!con) {
@@ -307,10 +536,10 @@
     }
     LOGDEB1("Netcon::addselcon: fd " << con->m_fd << "\n");
     con->set_nonblock(1);
-    con->setselevents(events);
-    m_polldata[con->m_fd] = con;
+    con->m_wantedEvents = events;
+    m->polldata[con->m_fd] = con;
     con->setloop(this);
-    return 0;
+    return m->setselevents(con, events);
 }
 
 // Remove a connection from the monitored set.
@@ -320,14 +549,15 @@
         return -1;
     }
     LOGDEB1("Netcon::remselcon: fd " << con->m_fd << "\n");
-    map<int, NetconP>::iterator it = m_polldata.find(con->m_fd);
-    if (it == m_polldata.end()) {
+    m->setselevents(con, 0);
+    auto it = m->polldata.find(con->m_fd);
+    if (it == m->polldata.end()) {
         LOGDEB1("Netcon::remselcon: con not found for fd " << 
                 con->m_fd << "\n");
         return -1;
     }
     con->setloop(0);
-    m_polldata.erase(it);
+    m->polldata.erase(it);
     return 0;
 }
 
@@ -394,6 +624,15 @@
     return flags;
 }
 
+int Netcon::setselevents(int events)
+{
+    m_wantedEvents = events;
+    if (m_loop) {
+        m_loop->m->setselevents(m_fd, events);
+    }
+    return m_wantedEvents;
+}
+        
 /////////////////////////////////////////////////////////////////////
 // Data socket (NetconData) methods
 
@@ -651,7 +890,7 @@
             return 0;
         }
     }
-    clearselevents(NETCONPOLL_WRITE);
+    m_wantedEvents &= ~NETCONPOLL_WRITE;
     return 1;
 }