--- a/src/mediaserver/cdplugins/curlfetch.cpp
+++ b/src/mediaserver/cdplugins/curlfetch.cpp
@@ -31,7 +31,8 @@
class CurlFetch::Internal {
public:
- Internal(const string& _url) : url(_url) {}
+ Internal(CurlFetch* parent)
+ : p(parent) {}
~Internal();
bool curlrunning() {
return curlworker.joinable();
@@ -41,15 +42,11 @@
size_t curlHeaderCB(void *contents, size_t size, size_t nmemb);
size_t curlWriteCB(void *contents, size_t size, size_t nmemb);
int curlSockoptCB(curl_socket_t curlfd, curlsocktype purpose);
- size_t databufToQ(const void *contents, size_t size);
-
- string url;
- uint64_t startoffset;
- int timeoutsecs{0};
+
+ CurlFetch *p{nullptr};
CURL *curl{nullptr};
// The socket is used to kill any waiting by curl when we want to abort
curl_socket_t curlfd{-1};
- u_int64_t curl_data_count{0};
std::thread curlworker;
bool curldone{false};
CURLcode curl_code{CURLE_OK};
@@ -65,8 +62,6 @@
bool headers_ok{false};
map<string, string> headers;
- BufXChange<ABuffer*> *outqueue{nullptr};
-
// We pre-buffer the beginning of the stream so that the first
// block we actually release is always big enough for header
// forensics.
@@ -75,24 +70,16 @@
// Synchronization
condition_variable curlcv;
mutex curlmutex;
-
- // User callbacks
- function<void(bool, u_int64_t)> eofcb;
- function<void(u_int64_t)> fbcb;
- function<bool(string&,void *,int)> buf1cb;
};
CurlFetch::CurlFetch(const std::string& url)
-{
- m = std::unique_ptr<Internal>(new Internal(url));
+ : NetFetch(url)
+{
+ m = std::unique_ptr<Internal>(new Internal(this));
}
CurlFetch::~CurlFetch()
{
-}
-const string& CurlFetch::url()
-{
- return m->url;
}
CurlFetch::Internal::~Internal()
@@ -104,8 +91,8 @@
close(curlfd);
curlfd = -1;
}
- if (outqueue) {
- outqueue->setTerminate();
+ if (p->outqueue) {
+ p->outqueue->setTerminate();
}
curlcv.notify_all();
while (extWaitingThreads > 0) {
@@ -140,8 +127,8 @@
return false;
}
// We return after the curl thread is actually running
- m->outqueue = queue;
- m->startoffset = offset;
+ outqueue = queue;
+ startoffset = offset;
m->curlworker =
std::thread(std::bind(&CurlFetch::Internal::curlWorkerFunc, m.get()));
while (!(m->curlrunning() || m->curldone || m->aborting)) {
@@ -156,7 +143,7 @@
return true;
}
-void CurlFetch::reset()
+bool CurlFetch::reset()
{
if (m->curlworker.joinable()) {
m->curlworker.join();
@@ -164,8 +151,9 @@
m->curldone = false;
m->curl_code = CURLE_OK;
m->curl_http_code = 200;
- m->curl_data_count = 0;
- m->outqueue->reset();
+ fetch_data_count = 0;
+ outqueue->reset();
+ return true;
}
bool CurlFetch::fetchDone(FetchStatus *code, int *http_code)
@@ -207,10 +195,10 @@
// We wait for the 1st buffer write call. If there is no data,
// we'll stop on curldone.
while (m->curlrunning() && !m->aborting && !m->curldone &&
- m->curl_data_count + m->headbuf.bytes == 0) {
+ fetch_data_count + m->headbuf.bytes == 0) {
LOGDEB1("CurlFetch::waitForHeaders: running " << m->curlrunning() <<
" aborting " << m->aborting << " datacount " <<
- m->curl_data_count + m->headbuf.bytes << "\n");
+ fetch_data_count + m->headbuf.bytes << "\n");
if (m->aborting) {
LOGDEB("CurlFetch::waitForHeaders: return: abort\n");
m->extWaitingThreads--;
@@ -231,7 +219,7 @@
LOGDEB1("CurlFetch::waitForHeaders: returning: headers_ok " <<
m->headers_ok << " curlrunning " << m->curlrunning() <<
" aborting " << m->aborting << " datacnt " <<
- m->curl_data_count+ m->headbuf.bytes << endl);
+ fetch_data_count+ m->headbuf.bytes << endl);
return m->headers_ok;
}
@@ -300,52 +288,6 @@
return CURL_SOCKOPT_OK;
}
-// This is always called with the lock held
-size_t CurlFetch::Internal::databufToQ(const void *contents, size_t bcnt)
-{
- LOGDEB1("CurlFetch::dataBufToQ. bcnt " << bcnt << endl);
-
- ABuffer *buf = nullptr;
- // Try to recover an empty buffer from the queue, else allocate one.
- if (outqueue && outqueue->take_recycled(&buf)) {
- if (buf->allocbytes < bcnt) {
- delete buf;
- buf = nullptr;
- }
- }
- if (buf == nullptr) {
- buf = new ABuffer(MAX(4096, bcnt));
- }
- if (buf == nullptr) {
- LOGERR("CurlFetch::dataBufToQ: can't get buffer for " << bcnt <<
- " bytes\n");
- return 0;
- }
- memcpy(buf->buf, contents, bcnt);
- buf->bytes = bcnt;
- buf->curoffs = 0;
-
- LOGDEB1("CurlFetch::calling put on " <<
- (outqueue ? outqueue->getname() : "null") << endl);
-
- if (!outqueue->put(buf)) {
- LOGDEB1("CurlFetch::dataBufToQ. queue put failed\n");
- delete buf;
- return -1;
- }
-
- bool first = (curl_data_count == 0);
- curl_data_count += bcnt;
- if (first) {
- curlcv.notify_all();
- }
- if (fbcb) {
- fbcb(curl_data_count);
- }
- LOGDEB1("CurlFetch::dataBufToQ. returning " << bcnt << endl);
- return bcnt;
-}
-
static size_t
curl_write_cb(void *contents, size_t size, size_t nmemb, void *userp)
{
@@ -369,7 +311,7 @@
#endif
unique_lock<mutex> lock(curlmutex);
- if (curl_data_count == 0 && headbuf.bytes < 1024) {
+ if (p->datacount() == 0 && headbuf.bytes < 1024) {
if (!headbuf.append((const char *)contents, bcnt)) {
LOGERR("CurlFetch::curlWriteCB: buf append failed\n");
curlcv.notify_all();
@@ -380,23 +322,30 @@
}
}
- if (curl_data_count == 0 && buf1cb) {
+ if (p->datacount() == 0 && p->buf1cb) {
string sbuf;
- if (!buf1cb(sbuf, headbuf.buf, headbuf.bytes)) {
+ if (!p->buf1cb(sbuf, headbuf.buf, headbuf.bytes)) {
return -1;
}
if (sbuf.size()) {
- if (databufToQ(sbuf.c_str(), sbuf.size()) < 0) {
+ curlcv.notify_all();
+ if (p->databufToQ(sbuf.c_str(), sbuf.size()) < 0) {
return -1;
}
}
}
if (headbuf.bytes) {
- databufToQ(headbuf.buf, headbuf.bytes);
+ if (p->datacount() == 0) {
+ curlcv.notify_all();
+ }
+ p->databufToQ(headbuf.buf, headbuf.bytes);
headbuf.bytes = 0;
}
- return databufToQ(contents, bcnt);
+ if (p->datacount() == 0) {
+ curlcv.notify_all();
+ }
+ return p->databufToQ(contents, bcnt);
}
static int debug_callback(CURL *curl,
@@ -440,8 +389,8 @@
{unique_lock<mutex> lock(curlmutex);
curldone = true;
}
- if (outqueue) {
- outqueue->setTerminate();
+ if (p->outqueue) {
+ p->outqueue->setTerminate();
}
curlcv.notify_all();
return;
@@ -468,14 +417,14 @@
}
LOGDEB1("CurlFetch::curlWorker: fetching " << url << endl);
- curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
- if (startoffset) {
+ curl_easy_setopt(curl, CURLOPT_URL, p->_url.c_str());
+ if (p->startoffset) {
char range[32];
- sprintf(range, "%llu-", (unsigned long long)startoffset);
+ sprintf(range, "%llu-", (unsigned long long)p->startoffset);
curl_easy_setopt(curl, CURLOPT_RANGE, range);
}
- if (timeoutsecs) {
- curl_easy_setopt(curl, CURLOPT_TIMEOUT, timeoutsecs);
+ if (p->timeoutsecs) {
+ curl_easy_setopt(curl, CURLOPT_TIMEOUT, p->timeoutsecs);
}
curl_code = curl_easy_perform(curl);
@@ -507,7 +456,8 @@
if (headbuf.bytes) {
LOGDEB1("CurlFetch::curlWorker: flushing headbuf: " <<
headbuf.bytes << " bytes\n");
- databufToQ(headbuf.buf, headbuf.bytes);
+ curlcv.notify_all();
+ p->databufToQ(headbuf.buf, headbuf.bytes);
headbuf.bytes = 0;
}
curlfd = -1;
@@ -520,39 +470,20 @@
// Wake up other side with empty buffer (eof)
LOGDEB1("CurlFetch::curlWorkerFunc: request done ok: q empty buffer\n");
ABuffer *buf = new ABuffer(0);
- if (!outqueue || !outqueue->put(buf)) {
+ if (!p->outqueue || !p->outqueue->put(buf)) {
delete buf;
}
- if (outqueue) {
+ if (p->outqueue) {
// Wait for our zero buffer to be acknowledged before
// killing the queue
LOGDEB1("CurlFetch::curlworker: waitidle\n");
- outqueue->waitIdle();
- }
- }
- outqueue->setTerminate();
- if (eofcb) {
- eofcb(curl_code == CURLE_OK, curl_data_count);
+ p->outqueue->waitIdle();
+ }
+ }
+ p->outqueue->setTerminate();
+ if (p->eofcb) {
+ p->eofcb(curl_code == CURLE_OK, p->datacount());
}
LOGDEB1("CurlFetch::curlworker: done\n");
return;
}
-
-void CurlFetch::setEOFetchCB(std::function<void(bool ok, u_int64_t count)> eofcb)
-{
- m->eofcb = eofcb;
-}
-void CurlFetch::setFetchBytesCB(std::function<void(u_int64_t count)> fbcb)
-{
- m->fbcb = fbcb;
-}
-void CurlFetch::setBuf1GenCB(std::function<bool(string&,void*,int)> func)
-{
- m->buf1cb = func;
-}
-
-void CurlFetch::setTimeout(int secs)
-{
- m->timeoutsecs = secs;
-}
-