--- a/src/mediaserver/cdplugins/curlfetch.cpp
+++ b/src/mediaserver/cdplugins/curlfetch.cpp
@@ -33,6 +33,9 @@
public:
Internal(const string& _url) : url(_url) {}
~Internal();
+ bool curlrunning() {
+ return curlworker.joinable();
+ }
void curlWorkerFunc();
size_t curlHeaderCB(void *contents, size_t size, size_t nmemb);
@@ -44,16 +47,16 @@
uint64_t startoffset;
int timeoutsecs{0};
CURL *curl{nullptr};
+ // The socket is used to kill any waiting by curl when we want to abort
curl_socket_t curlfd{-1};
u_int64_t curl_data_count{0};
std::thread curlworker;
+ bool curldone{false};
CURLcode curl_code{CURLE_OK};
- long curl_http_code{200};
-
- // Is the curl operation thread running ?
- bool curlrunning{false};
- bool curldone{false};
- bool aborting{false}; // Any waiting loop must abort asap
+ int curl_http_code{200};
+
+ // In destructor: any waiting loop must abort asap
+ bool aborting{false};
// Count of client threads waiting for headers (normally 0/1)
int extWaitingThreads{0};
@@ -65,13 +68,15 @@
BufXChange<ABuffer*> *outqueue{nullptr};
// We pre-buffer the beginning of the stream so that the first
- // block we actually release is big enough for header forensics.
+ // block we actually release is always big enough for header
+ // forensics.
ABuffer headbuf{1024};
// Synchronization
condition_variable curlcv;
mutex curlmutex;
+ // User callbacks
function<void(bool, u_int64_t)> eofcb;
function<void(u_int64_t)> fbcb;
function<bool(string&,void *,int)> buf1cb;
@@ -84,6 +89,10 @@
CurlFetch::~CurlFetch()
{
+}
+const string& CurlFetch::url()
+{
+ return m->url;
}
CurlFetch::Internal::~Internal()
@@ -106,10 +115,9 @@
LOGDEB1("CurlFetch::~CurlFetch: waiting for ext thread wkup\n");
curlcv.wait(lock);
}
- while (!curlworker.joinable()) {
- curlcv.wait(lock);
- }
- curlworker.join();
+ if (curlworker.joinable()) {
+ curlworker.join();
+ }
if (curl) {
curl_easy_cleanup(curl);
curl = nullptr;
@@ -119,15 +127,16 @@
bool CurlFetch::start(BufXChange<ABuffer*> *queue, uint64_t offset)
{
- LOGDEB0("CurlFetch::start\n");
- unique_lock<mutex> lock(m->curlmutex);
- if (m->curlrunning || m->aborting) {
- LOGERR("CurlFetch::start: called with transfer active or aborted\n");
- return false;
- }
- m->curldone = false;
+ LOGDEB1("CurlFetch::start\n");
if (nullptr == queue) {
LOGERR("CurlFetch::start: called with nullptr\n");
+ return false;
+ }
+
+ unique_lock<mutex> lock(m->curlmutex);
+ LOGDEB1("CurlFetch::start: got lock\n");
+ if (m->curlrunning() || m->aborting) {
+ LOGERR("CurlFetch::start: called with transfer active or aborted\n");
return false;
}
// We return after the curl thread is actually running
@@ -135,28 +144,47 @@
m->startoffset = offset;
m->curlworker =
std::thread(std::bind(&CurlFetch::Internal::curlWorkerFunc, m.get()));
- while (!m->curlrunning && !m->curldone && !m->aborting) {
+ while (!(m->curlrunning() || m->curldone || m->aborting)) {
+ LOGDEB1("Start: waiting: running " << m->curlrunning() << " done " <<
+ m->curldone << " aborting " << m->aborting << endl);
if (m->aborting) {
return false;
}
m->curlcv.wait(lock);
}
+ LOGDEB1("CurlFetch::start: returning\n");
return true;
}
-bool CurlFetch::curlDone(int *curlcode, long *http_code)
-{
- LOGDEB1("CurlTRans::curlDone: running: " << m->curlrunning << endl);
+void CurlFetch::reset()
+{
+ if (m->curlworker.joinable()) {
+ m->curlworker.join();
+ }
+ m->curldone = false;
+ m->curl_code = CURLE_OK;
+ m->curl_http_code = 200;
+ m->curl_data_count = 0;
+ m->outqueue->reset();
+}
+
+bool CurlFetch::curlDone(int *curlcode, int *http_code)
+{
+ LOGDEB1("CurlFetch::curlDone: running: " << m->curlrunning() <<
+ " curldone " << m->curldone << endl);
unique_lock<mutex> lock(m->curlmutex);
if (!m->curldone) {
return false;
}
+ LOGDEB1("CurlFetch::curlDone: curlcode " << m->curl_code << " httpcode " <<
+ m->curl_http_code << endl);
if (curlcode) {
*curlcode = int(m->curl_code);
}
if (http_code) {
*http_code = m->curl_http_code;
}
+ LOGDEB1("CurlTRans::curlDone: done\n");
return true;
}
@@ -165,12 +193,11 @@
LOGDEB1("CurlFetch::waitForHeaders\n");
unique_lock<mutex> lock(m->curlmutex);
m->extWaitingThreads++;
- // We actually wait for the 1st buffer write call, so that we also
- // get an initial status like 404 (we'll wake up on curlrunning==0 in
- // this case)
- while (m->curlrunning && !m->aborting &&
- m->curl_data_count + m->headbuf.bytes== 0) {
- LOGDEB1("CurlFetch::waitForHeaders: running " << m->curlrunning <<
+ // We wait for the 1st buffer write call. If there is no data,
+ // we'll stop on curldone.
+ while (m->curlrunning() && !m->aborting && !m->curldone &&
+ m->curl_data_count + m->headbuf.bytes == 0) {
+ LOGDEB1("CurlFetch::waitForHeaders: running " << m->curlrunning() <<
" aborting " << m->aborting << " datacount " <<
m->curl_data_count + m->headbuf.bytes << "\n");
if (m->aborting) {
@@ -191,7 +218,7 @@
m->extWaitingThreads--;
m->curlcv.notify_all();
LOGDEB1("CurlFetch::waitForHeaders: returning: headers_ok " <<
- m->headers_ok << " curlrunning " << m->curlrunning <<
+ m->headers_ok << " curlrunning " << m->curlrunning() <<
" aborting " << m->aborting << " datacnt " <<
m->curl_data_count+ m->headbuf.bytes << endl);
return m->headers_ok;
@@ -287,7 +314,7 @@
buf->bytes = bcnt;
buf->curoffs = 0;
- LOGDEB2("CurlFetch::calling put on " <<
+ LOGDEB1("CurlFetch::calling put on " <<
(outqueue ? outqueue->getname() : "null") << endl);
if (!outqueue->put(buf)) {
@@ -391,17 +418,15 @@
(void)debug_callback;
{unique_lock<mutex> lock(curlmutex);
- curlrunning = true;
- }
- // Tell the world we're active (start is waiting for this).
- curlcv.notify_all();
+ // Tell the world we're active (start is waiting for this).
+ curlcv.notify_all();
+ }
if (!curl) {
curl = curl_easy_init();
if(!curl) {
LOGERR("CurlFetch::curlWorkerFunc: curl_easy_init failed" << endl);
{unique_lock<mutex> lock(curlmutex);
- curlrunning = false;
curldone = true;
}
if (outqueue) {
@@ -451,22 +476,7 @@
http_ok = curl_http_code >= 200 && curl_http_code < 300;
}
- {unique_lock<mutex> lock(curlmutex);
- if (aborting) {
- return;
- }
- if (headbuf.bytes) {
- LOGDEB1("CurlFetch::curlWorker: flushing headbuf: " <<
- headbuf.bytes << " bytes\n");
- databufToQ(headbuf.buf, headbuf.bytes);
- headbuf.bytes = 0;
- }
- curlfd = -1;
- curlrunning = false;
- curldone = true;
- curlcv.notify_all();
- }
-
+ // Log/Debug
if (curl_code != CURLE_OK || !http_ok) {
if (curl_code != CURLE_OK) {
LOGERR("CurlFetch::curlWorkerFunc: curl_easy_perform(): " <<
@@ -477,6 +487,24 @@
}
}
+ LOGDEB1("CurlFetch::curlWorker: locking\n");
+ {unique_lock<mutex> lock(curlmutex);
+ LOGDEB1("CurlFetch::curlWorker: locked\n");
+ if (aborting) {
+ return;
+ }
+ if (headbuf.bytes) {
+ LOGDEB1("CurlFetch::curlWorker: flushing headbuf: " <<
+ headbuf.bytes << " bytes\n");
+ databufToQ(headbuf.buf, headbuf.bytes);
+ headbuf.bytes = 0;
+ }
+ curlfd = -1;
+ curldone = true;
+ curlcv.notify_all();
+ }
+
+ // Normal eos
if (curl_code == CURLE_OK) {
// Wake up other side with empty buffer (eof)
LOGDEB1("CurlFetch::curlWorkerFunc: request done ok: q empty buffer\n");
@@ -485,9 +513,13 @@
delete buf;
}
if (outqueue) {
+ // Wait for our zero buffer to be acknowledged before
+ // killing the queue
+ LOGDEB1("CurlFetch::curlworker: waitidle\n");
outqueue->waitIdle();
}
}
+ outqueue->setTerminate();
if (eofcb) {
eofcb(curl_code == CURLE_OK, curl_data_count);
}