--- a/src/utils/netcon.cpp
+++ b/src/utils/netcon.cpp
@@ -67,9 +67,9 @@
static const int one = 1;
static const int zero = 0;
-#define LOGSYSERR(who, call, spar) \
- LOGERR((who) << ": " << (call) << "(" << (spar) << ") errno " << \
- (errno) << " (" << (strerror(errno)) << ")\n")
+#define LOGSYSERR(who, call, spar) \
+ LOGERR(who << ": " << call << "(" << spar << ") errno " << \
+ errno << " (" << strerror(errno) << ")\n")
#ifndef MIN
#define MIN(a,b) ((a)<(b)?(a):(b))
@@ -186,7 +186,8 @@
it != m_polldata.end(); it++) {
NetconP& pll = it->second;
int fd = it->first;
- LOGDEB2("Selectloop: fd " << (fd) << " flags 0x" << (pll->m_wantedEvents) << "\n" );
+ LOGDEB2("Selectloop: fd " << fd << " flags 0x" <<
+ pll->m_wantedEvents << "\n");
if (pll->m_wantedEvents & Netcon::NETCONPOLL_READ) {
FD_SET(fd, &rd);
nfds = MAX(nfds, fd + 1);
@@ -210,7 +211,7 @@
return 0;
}
- LOGDEB2("Netcon::selectloop: selecting, nfds = " << (nfds) << "\n" );
+ LOGDEB2("Netcon::selectloop: selecting, nfds = " << nfds << "\n");
// Compute the next timeout according to what might need to be
// done apart from waiting for data
@@ -362,6 +363,7 @@
return 0;
}
+
// Set/reset non-blocking flag on fd
int Netcon::set_nonblock(int onoff)
{
@@ -379,23 +381,46 @@
/////////////////////////////////////////////////////////////////////
// Data socket (NetconData) methods
+NetconData::NetconData(bool cancellable)
+ : m_buf(0), m_bufbase(0), m_bufbytes(0), m_bufsize(0), m_wkfds{-1,-1}
+{
+ if (cancellable) {
+ if (pipe(m_wkfds) < 0) {
+ LOGSYSERR("NetconData::NetconData", "pipe", "");
+ m_wkfds[0] = m_wkfds[1] = -1;
+ }
+ LOGDEB2("NetconData:: m_wkfds[0] " << m_wkfds[0] << " m_wkfds[1] " <<
+ m_wkfds[1] << endl);
+ for (int i = 0; i < 2; i++) {
+ int flags = fcntl(m_wkfds[i], F_GETFL, 0);
+ fcntl(m_wkfds[i], F_SETFL, flags | O_NONBLOCK);
+ }
+ }
+}
+
NetconData::~NetconData()
{
freeZ(m_buf);
m_bufbase = 0;
m_bufbytes = m_bufsize = 0;
+ for (int i = 0; i < 2; i++) {
+ if (m_wkfds[i] >= 0) {
+ close(m_wkfds[i]);
+ }
+ }
}
int NetconData::send(const char *buf, int cnt, int expedited)
{
- LOGDEB2("NetconData::send: fd " << (m_fd) << " cnt " << (cnt) << " expe " << (expedited) << "\n" );
+ LOGDEB2("NetconData::send: fd " << m_fd << " cnt " << cnt <<
+ " expe " << expedited << "\n");
int flag = 0;
if (m_fd < 0) {
LOGERR("NetconData::send: connection not opened\n" );
return -1;
}
if (expedited) {
- LOGDEB2("NetconData::send: expedited data, count " << (cnt) << " bytes\n" );
+ LOGDEB2("NetconData::send: expedited data, count " <<cnt << " bytes\n");
flag = MSG_OOB;
}
int ret;
@@ -416,36 +441,25 @@
return ret;
}
-// Test for data available
-int NetconData::readready()
-{
- LOGDEB2("NetconData::readready\n" );
- if (m_fd < 0) {
- LOGERR("NetconData::readready: connection not opened\n" );
- return -1;
- }
- return select1(m_fd, 0);
-}
-
-// Test for writable
-int NetconData::writeready()
-{
- LOGDEB2("NetconData::writeready\n" );
- if (m_fd < 0) {
- LOGERR("NetconData::writeready: connection not opened\n" );
- return -1;
- }
- return select1(m_fd, 0, 1);
+void NetconData::cancelReceive()
+{
+ if (m_wkfds[1] >= 0) {
+ LOGDEB2("NetconData::cancelReceive: writing to " << m_wkfds[1] << endl);
+ ::write(m_wkfds[1], "!", 1);
+ }
}
// Receive at most cnt bytes (maybe less)
int NetconData::receive(char *buf, int cnt, int timeo)
{
- LOGDEB2("NetconData::receive: cnt " << (cnt) << " timeo " << (timeo) << " m_buf 0x" << (m_buf) << " m_bufbytes " << (m_bufbytes) << "\n" );
+ LOGDEB2("NetconData::receive: cnt " << cnt << " timeo " << timeo <<
+ " m_buf 0x" << m_buf << " m_bufbytes " << m_bufbytes << "\n");
+
if (m_fd < 0) {
LOGERR("NetconData::receive: connection not opened\n" );
return -1;
}
+
int fromibuf = 0;
// Get whatever might have been left in the buffer by a previous
// getline, except if we're called to fill the buffer of course
@@ -455,31 +469,54 @@
m_bufbytes -= fromibuf;
m_bufbase += fromibuf;
cnt -= fromibuf;
- LOGDEB2("NetconData::receive: transferred " << (fromibuf) << " from mbuf\n" );
+ LOGDEB2("NetconData::receive: got " << fromibuf << " from mbuf\n");
if (cnt <= 0) {
return fromibuf;
}
}
+
if (timeo > 0) {
- int ret = select1(m_fd, timeo);
- if (ret == 0) {
- LOGDEB2("NetconData::receive timed out\n" );
+ struct timeval tv;
+ tv.tv_sec = timeo;
+ tv.tv_usec = 0;
+ fd_set rd;
+ FD_ZERO(&rd);
+ FD_SET(m_fd, &rd);
+ bool cancellable = (m_wkfds[0] >= 0);
+ if (cancellable) {
+ LOGDEB2("NetconData::receive: cancel fd " << m_wkfds[0] << endl);
+ FD_SET(m_wkfds[0], &rd);
+ }
+ int nfds = MAX(m_fd, m_wkfds[0]) + 1;
+
+ int ret = select(nfds, &rd, 0, 0, &tv);
+ LOGDEB2("NetconData::receive: select returned " << ret << endl);
+
+ if (cancellable && FD_ISSET(m_wkfds[0], &rd)) {
+ char b[100];
+ read(m_wkfds[0], b, 100);
+ return Cancelled;
+ }
+
+ if (!FD_ISSET(m_fd, &rd)) {
m_didtimo = 1;
- return -1;
- }
+ return TimeoutOrError;
+ }
+
if (ret < 0) {
LOGSYSERR("NetconData::receive", "select", "");
- return -1;
- }
- }
+ m_didtimo = 0;
+ return TimeoutOrError;
+ }
+ }
+
m_didtimo = 0;
if ((cnt = read(m_fd, buf + fromibuf, cnt)) < 0) {
- char fdcbuf[20];
- sprintf(fdcbuf, "%d", m_fd);
- LOGSYSERR("NetconData::receive", "read", fdcbuf);
- return -1;
- }
- LOGDEB2("NetconData::receive: normal return, cnt " << (cnt) << "\n" );
+ LOGSYSERR("NetconData::receive", "read", m_fd);
+ return -1;
+ }
+ LOGDEB2("NetconData::receive: normal return, fromibuf " << fromibuf <<
+ " cnt " << cnt << "\n");
return fromibuf + cnt;
}
@@ -487,13 +524,13 @@
int NetconData::doreceive(char *buf, int cnt, int timeo)
{
int got, cur;
- LOGDEB2("Netcon::doreceive: cnt " << (cnt) << ", timeo " << (timeo) << "\n" );
+ LOGDEB2("Netcon::doreceive: cnt " << cnt << ", timeo " << timeo << "\n");
cur = 0;
while (cnt > cur) {
got = receive(buf, cnt - cur, timeo);
LOGDEB2("Netcon::doreceive: got " << (got) << "\n" );
if (got < 0) {
- return -1;
+ return got;
}
if (got == 0) {
return cur;