--- a/src/netcon.cpp
+++ b/src/netcon.cpp
@@ -29,6 +29,7 @@
#include <stdlib.h>
#include <string.h>
#include <errno.h>
+#include <stdint.h>
#ifdef _AIX
#include <strings.h>
@@ -36,13 +37,17 @@
#include <unistd.h>
#include <fcntl.h>
-#include <sys/time.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <netdb.h>
+#ifdef HAVE_KQUEUE
+#include <sys/types.h>
+#include <sys/event.h>
+#include <vector>
+#endif
#include <map>
@@ -55,7 +60,7 @@
#endif
// Size of path buffer in sockaddr_un (AF_UNIX socket
-// addr). Mysteriously it's 108 (explicit value) under linux, no
+// addr). Mysteriously it is 108 (explicit value) under linux, no
// define accessible. Let's take a little margin as it appears that
// some systems use 92. I believe we could also malloc a variable size
// struct but why bother.
@@ -81,8 +86,8 @@
#define freeZ(X) if (X) {free(X);X=0;}
#endif
-#define MILLIS(OLD, NEW) ( (long)(((NEW).tv_sec - (OLD).tv_sec) * 1000 + \
- ((NEW).tv_usec - (OLD).tv_usec) / 1000))
+#define MILLIS(OLD, NEW) ( (uint64_t((NEW).tv_sec) - (OLD).tv_sec) * 1000 + \
+ ((NEW).tv_usec - (OLD).tv_usec) / 1000 )
// Static method
// Simplified interface to 'select()'. Only use one fd, for either
@@ -109,24 +114,87 @@
return ret;
}
+
+///////////////////////////////////////////
+// SelectLoop
+
+class SelectLoop::Internal {
+public:
+ Internal() {
+#ifdef HAVE_KQUEUE
+ if ((kq = kqueue()) == -1) {
+ LOGSYSERR("Netcon::selectloop", "kqueue", "");
+ }
+#endif
+ }
+
+ ~Internal() {
+#ifdef HAVE_KQUEUE
+ if (kq >= 0)
+ close(kq);
+#endif
+ }
+
+ // Set by client callback to tell selectloop to return.
+ bool selectloopDoReturn{false};
+ int selectloopReturnValue{0};
+ int placetostart{0};
+
+ // Map of NetconP indexed by fd
+ map<int, NetconP> polldata;
+#ifdef HAVE_KQUEUE
+ int kq{-1};
+#endif
+ // The last time we did the periodic thing. Initialized by setperiodic()
+ struct timeval lasthdlcall;
+
+ // The call back function and its parameter
+ int (*periodichandler)(void *){0};
+ void *periodicparam{0};
+ // The periodic interval
+ int periodicmillis{0};
+
+ void periodictimeout(struct timeval *tv);
+ void periodictimeout(struct timespec *ts);
+ int maybecallperiodic();
+ int setselevents(int fd, int events);
+ int setselevents(NetconP& con, int events);
+};
+
+SelectLoop::SelectLoop()
+{
+ m = new Internal;
+}
+
+SelectLoop::~SelectLoop()
+{
+ delete m;
+}
+
+void SelectLoop::loopReturn(int value)
+{
+ m->selectloopDoReturn = true;
+ m->selectloopReturnValue = value;
+}
+
void SelectLoop::setperiodichandler(int (*handler)(void *), void *p, int ms)
{
- m_periodichandler = handler;
- m_periodicparam = p;
- m_periodicmillis = ms;
- if (m_periodicmillis > 0) {
- gettimeofday(&m_lasthdlcall, 0);
+ m->periodichandler = handler;
+ m->periodicparam = p;
+ m->periodicmillis = ms;
+ if (m->periodicmillis > 0) {
+ gettimeofday(&m->lasthdlcall, 0);
}
}
// Compute the appropriate timeout so that the select call returns in
// time to call the periodic routine.
-void SelectLoop::periodictimeout(struct timeval *tv)
+void SelectLoop::Internal::periodictimeout(struct timeval *tv)
{
// If periodic not set, the select call times out and we loop
// after a very long time (we'd need to pass NULL to select for an
// infinite wait, and I'm too lazy to handle it)
- if (m_periodicmillis <= 0) {
+ if (periodicmillis <= 0) {
tv->tv_sec = 10000;
tv->tv_usec = 0;
return;
@@ -134,7 +202,7 @@
struct timeval mtv;
gettimeofday(&mtv, 0);
- int millis = m_periodicmillis - MILLIS(m_lasthdlcall, mtv);
+ int millis = periodicmillis - MILLIS(lasthdlcall, mtv);
// millis <= 0 means we should have already done the thing. *dont* set the
// tv to 0, which means no timeout at all !
@@ -145,20 +213,31 @@
tv->tv_usec = (millis % 1000) * 1000;
}
+void SelectLoop::Internal::periodictimeout(struct timespec *ts)
+{
+ struct timeval tv;
+ periodictimeout(&tv);
+ ts->tv_sec = tv.tv_sec;
+ ts->tv_nsec = tv.tv_usec * 1000;
+}
+
+
// Check if it's time to call the handler. selectloop will return to
-// caller if it or we return 0
-int SelectLoop::maybecallperiodic()
-{
- if (m_periodicmillis <= 0) {
+// caller if either we or the handler return 0
+int SelectLoop::Internal::maybecallperiodic()
+{
+ if (periodicmillis <= 0) {
return 1;
}
+
struct timeval mtv;
gettimeofday(&mtv, 0);
- int millis = m_periodicmillis - MILLIS(m_lasthdlcall, mtv);
+ int millis = periodicmillis - MILLIS(lasthdlcall, mtv);
+
if (millis <= 0) {
- gettimeofday(&m_lasthdlcall, 0);
- if (m_periodichandler) {
- return m_periodichandler(m_periodicparam);
+ lasthdlcall = mtv;
+ if (periodichandler) {
+ return periodichandler(periodicparam);
} else {
return 0;
}
@@ -166,14 +245,17 @@
return 1;
}
+#ifndef HAVE_KQUEUE
+
int SelectLoop::doLoop()
{
for (;;) {
- if (m_selectloopDoReturn) {
- m_selectloopDoReturn = false;
+ if (m->selectloopDoReturn) {
+ m->selectloopDoReturn = false;
LOGDEB("Netcon::selectloop: returning on request\n");
- return m_selectloopReturnValue;
- }
+ return m->selectloopReturnValue;
+ }
+
int nfds;
fd_set rd, wd;
FD_ZERO(&rd);
@@ -182,10 +264,9 @@
// Walk the netcon map and set up the read and write fd_sets
// for select()
nfds = 0;
- for (map<int, NetconP>::iterator it = m_polldata.begin();
- it != m_polldata.end(); it++) {
- NetconP& pll = it->second;
- int fd = it->first;
+ for (auto& entry : m->polldata) {
+ NetconP& pll = entry.second;
+ int fd = entry.first;
LOGDEB2("Selectloop: fd " << fd << " flags 0x" <<
pll->m_wantedEvents << "\n");
if (pll->m_wantedEvents & Netcon::NETCONPOLL_READ) {
@@ -206,7 +287,7 @@
// Just in case there would still be open fds in there
// (with no r/w flags set). Should not be needed, but safer
- m_polldata.clear();
+ m->polldata.clear();
LOGDEB1("Netcon::selectloop: no fds\n");
return 0;
}
@@ -216,7 +297,7 @@
// Compute the next timeout according to what might need to be
// done apart from waiting for data
struct timeval tv;
- periodictimeout(&tv);
+ m->periodictimeout(&tv);
// Wait for something to happen
int ret = select(nfds, &rd, &wd, 0, &tv);
LOGDEB2("Netcon::selectloop: nfds " << nfds <<
@@ -225,7 +306,7 @@
LOGSYSERR("Netcon::selectloop", "select", "");
return -1;
}
- if (m_periodicmillis > 0 && maybecallperiodic() <= 0) {
+ if (m->periodicmillis > 0 && m->maybecallperiodic() <= 0) {
return 1;
}
@@ -242,12 +323,12 @@
// map may change between 2 sweeps, so that we'd have to be smart
// with the iterator. As the cost per unused fd is low (just 2 bit
// flag tests), we keep it like this for now
- if (m_placetostart >= nfds) {
- m_placetostart = 0;
+ if (m->placetostart >= nfds) {
+ m->placetostart = 0;
}
int i, fd;
int activefds = 0;
- for (i = 0, fd = m_placetostart; i < nfds; i++, fd++) {
+ for (i = 0, fd = m->placetostart; i < nfds; i++, fd++) {
if (fd >= nfds) {
fd = 0;
}
@@ -263,8 +344,8 @@
continue;
}
- map<int, NetconP>::iterator it = m_polldata.find(fd);
- if (it == m_polldata.end()) {
+ auto it = m->polldata.find(fd);
+ if (it == m->polldata.end()) {
// This should never happen, because we only set our
// own fds in the mask !
LOGERR("Netcon::selectloop: fd " << fd << " not found\n");
@@ -272,7 +353,7 @@
}
activefds++;
// Next start will be one beyond last serviced (modulo nfds)
- m_placetostart = fd + 1;
+ m->placetostart = fd + 1;
NetconP& pll = it->second;
if (canread && pll->cando(Netcon::NETCONPOLL_READ) <= 0) {
@@ -285,7 +366,7 @@
(Netcon::NETCONPOLL_WRITE | Netcon::NETCONPOLL_READ))) {
LOGDEB0("Netcon::selectloop: fd " << it->first << " has 0x"
<< it->second->m_wantedEvents << " mask, erasing\n");
- m_polldata.erase(it);
+ m->polldata.erase(it);
}
} // fd sweep
@@ -299,7 +380,155 @@
return -1;
}
-// Add a connection to the monitored set.
+#else // -> Using kqueue: use select()
+
+int SelectLoop::doLoop()
+{
+ for (;;) {
+ if (m->selectloopDoReturn) {
+ m->selectloopDoReturn = false;
+ LOGDEB("Netcon::selectloop: returning on request\n");
+ return m->selectloopReturnValue;
+ }
+
+ // Check that we do have something to wait for.
+ int nfds = 0;
+ for (auto& entry : m->polldata) {
+ NetconP& pll = entry.second;
+ if (pll->m_wantedEvents & Netcon::NETCONPOLL_READ) {
+ nfds++;
+ } else if (pll->m_wantedEvents & Netcon::NETCONPOLL_WRITE) {
+ nfds++;
+ }
+ }
+ if (nfds == 0) {
+ // This should never happen in a server as we should at least
+ // always monitor the main listening server socket. For a
+ // client, it's up to client code to avoid or process this
+ // condition.
+
+ // Just in case there would still be open fds in there
+ // (with no r/w flags set). Should not be needed, but safer
+ m->polldata.clear();
+ LOGDEB1("Netcon::selectloop: no fds\n");
+ return 0;
+ }
+
+ // Compute the next timeout according to what might need to be
+ // done apart from waiting for data
+ struct timespec ts;
+ m->periodictimeout(&ts);
+ // Wait for something to happen
+ vector<struct kevent> events;
+ events.resize(nfds);
+ LOGDEB("Netcon::selectloop: kevent(), nfds = " << nfds << "\n");
+ int ret = kevent(m->kq, 0, 0, &events[0], events.size(), &ts);
+ LOGDEB("Netcon::selectloop: nfds " << nfds <<
+ " kevent returns " << ret << "\n");
+ if (ret < 0) {
+ LOGSYSERR("Netcon::selectloop", "kevent", "");
+ return -1;
+ }
+ if (m->periodicmillis > 0 && m->maybecallperiodic() <= 0) {
+ return 1;
+ }
+ if (ret == 0) {
+ // Timeout, do it again.
+ continue;
+ }
+
+ for (int i = 0; i < ret; i++) {
+ struct kevent& ev = events[i];
+ if (ev.flags & EV_ERROR) {
+ LOGSYSERR("Netcon::selectLoop", "kevent", "");
+ LOGERR("Netcon::selectLoop: event error: " <<
+ strerror(ev.data));
+ return -1;
+ }
+ int canread = ev.filter == EVFILT_READ;
+ int canwrite = ev.filter == EVFILT_WRITE;
+ bool none = !canread && !canwrite;
+ LOGDEB("Netcon::selectloop: fd " << int(ev.ident) << " " <<
+ (none ? "blocked" : "can") << " " <<
+ (canread ? "read" : "") << " " <<
+ (canwrite ? "write" : "") << "\n");
+ if (none) {
+ LOGERR("Kevent returned unknown filter " << ev.filter <<endl);
+ continue;
+ }
+
+ auto it = m->polldata.find(int(ev.ident));
+ if (it == m->polldata.end()) {
+ LOGERR("Netcon::selectloop: fd " << int(ev.ident) <<
+ " not found\n");
+ continue;
+ }
+ NetconP& pll = it->second;
+ if (canread && pll->cando(Netcon::NETCONPOLL_READ) <= 0) {
+ pll->setselevents(pll->getselevents() &
+ ~Netcon::NETCONPOLL_READ);
+ }
+ if (canwrite && pll->cando(Netcon::NETCONPOLL_WRITE) <= 0) {
+ pll->setselevents(pll->getselevents() &
+ ~Netcon::NETCONPOLL_WRITE);
+ }
+ if (!(pll->getselevents() &
+ (Netcon::NETCONPOLL_WRITE | Netcon::NETCONPOLL_READ))) {
+ LOGDEB0("Netcon::selectloop: fd " << it->first << " has 0x"
+ << it->second->getselevents() << " mask, erasing\n");
+ m->polldata.erase(it);
+ }
+ } // fd sweep
+
+ } // forever loop
+ LOGERR("SelectLoop::doLoop: got out of loop !\n");
+ return -1;
+}
+
+#endif // kqueue version
+
+int SelectLoop::Internal::setselevents(int fd, int events)
+{
+#ifdef HAVE_KQUEUE
+ auto it = polldata.find(fd);
+ if (it == polldata.end()) {
+ return -1;
+ }
+ return setselevents(it->second, events);
+#endif
+ return 0;
+}
+
+int SelectLoop::Internal::setselevents(NetconP& con, int events)
+{
+#ifdef HAVE_KQUEUE
+ struct kevent event;
+ if (events & Netcon::NETCONPOLL_READ) {
+ EV_SET(&event, con->m_fd, EVFILT_READ, EV_ADD, 0, 0, 0);
+ if(kevent(kq, &event, 1, 0, 0, 0) < 0) {
+ LOGSYSERR("SelectLoop::addselcon", "kevent", "");
+ return -1;
+ }
+ } else {
+ EV_SET(&event, con->m_fd, EVFILT_READ, EV_DELETE, 0, 0, 0);
+ kevent(kq, &event, 1, 0, 0, 0);
+ }
+ if (events & Netcon::NETCONPOLL_WRITE) {
+ EV_SET(&event, con->m_fd, EVFILT_WRITE, EV_ADD, 0, 0, 0);
+ if(kevent(kq, &event, 1, 0, 0, 0) < 0) {
+ LOGSYSERR("SelectLoop::addselcon", "kevent", "");
+ return -1;
+ }
+ } else {
+ EV_SET(&event, con->m_fd, EVFILT_WRITE, EV_DELETE, 0, 0, 0);
+ kevent(kq, &event, 1, 0, 0, 0);
+ }
+#endif
+ return 0;
+}
+
+// Add a connection to the monitored set. This can be used to change
+// the event flags too (won't add duplicates)
int SelectLoop::addselcon(NetconP con, int events)
{
if (!con) {
@@ -307,10 +536,10 @@
}
LOGDEB1("Netcon::addselcon: fd " << con->m_fd << "\n");
con->set_nonblock(1);
- con->setselevents(events);
- m_polldata[con->m_fd] = con;
+ con->m_wantedEvents = events;
+ m->polldata[con->m_fd] = con;
con->setloop(this);
- return 0;
+ return m->setselevents(con, events);
}
// Remove a connection from the monitored set.
@@ -320,14 +549,15 @@
return -1;
}
LOGDEB1("Netcon::remselcon: fd " << con->m_fd << "\n");
- map<int, NetconP>::iterator it = m_polldata.find(con->m_fd);
- if (it == m_polldata.end()) {
+ m->setselevents(con, 0);
+ auto it = m->polldata.find(con->m_fd);
+ if (it == m->polldata.end()) {
LOGDEB1("Netcon::remselcon: con not found for fd " <<
con->m_fd << "\n");
return -1;
}
con->setloop(0);
- m_polldata.erase(it);
+ m->polldata.erase(it);
return 0;
}
@@ -394,6 +624,15 @@
return flags;
}
+int Netcon::setselevents(int events)
+{
+ m_wantedEvents = events;
+ if (m_loop) {
+ m_loop->m->setselevents(m_fd, events);
+ }
+ return m_wantedEvents;
+}
+
/////////////////////////////////////////////////////////////////////
// Data socket (NetconData) methods
@@ -651,7 +890,7 @@
return 0;
}
}
- clearselevents(NETCONPOLL_WRITE);
+ m_wantedEvents &= ~NETCONPOLL_WRITE;
return 1;
}