Switch to side-by-side view

--- a/src/mediaserver/cdplugins/curlfetch.cpp
+++ b/src/mediaserver/cdplugins/curlfetch.cpp
@@ -31,7 +31,8 @@
 
 class CurlFetch::Internal {
 public:
-    Internal(const string& _url) : url(_url) {}
+    Internal(CurlFetch* parent)
+        : p(parent) {}
     ~Internal();
     bool curlrunning() {
         return curlworker.joinable();
@@ -41,15 +42,11 @@
     size_t curlHeaderCB(void *contents, size_t size, size_t nmemb);
     size_t curlWriteCB(void *contents, size_t size, size_t nmemb);
     int curlSockoptCB(curl_socket_t curlfd, curlsocktype purpose);
-    size_t databufToQ(const void *contents, size_t size);
-    
-    string url;
-    uint64_t startoffset;
-    int timeoutsecs{0};
+
+    CurlFetch *p{nullptr};
     CURL *curl{nullptr};
     // The socket is used to kill any waiting by curl when we want to abort
     curl_socket_t curlfd{-1};
-    u_int64_t curl_data_count{0};
     std::thread curlworker;
     bool curldone{false};
     CURLcode  curl_code{CURLE_OK};
@@ -65,8 +62,6 @@
     bool headers_ok{false};
     map<string, string> headers;
 
-    BufXChange<ABuffer*> *outqueue{nullptr};
-
     // We pre-buffer the beginning of the stream so that the first
     // block we actually release is always big enough for header
     // forensics.
@@ -75,24 +70,16 @@
     // Synchronization
     condition_variable curlcv;
     mutex curlmutex;
-
-    // User callbacks
-    function<void(bool, u_int64_t)> eofcb;
-    function<void(u_int64_t)> fbcb;
-    function<bool(string&,void *,int)> buf1cb;
 };
 
 CurlFetch::CurlFetch(const std::string& url)
-{
-    m = std::unique_ptr<Internal>(new Internal(url));
+    : NetFetch(url)
+{
+    m = std::unique_ptr<Internal>(new Internal(this));
 }
 
 CurlFetch::~CurlFetch()
 {
-}
-const string& CurlFetch::url()
-{
-    return m->url;
 }
 
 CurlFetch::Internal::~Internal()
@@ -104,8 +91,8 @@
         close(curlfd);
         curlfd = -1;
     }
-    if (outqueue) {
-        outqueue->setTerminate();
+    if (p->outqueue) {
+        p->outqueue->setTerminate();
     }
     curlcv.notify_all();
     while (extWaitingThreads > 0) {
@@ -140,8 +127,8 @@
         return false;
     }
     // We return after the curl thread is actually running
-    m->outqueue = queue;
-    m->startoffset = offset;
+    outqueue = queue;
+    startoffset = offset;
     m->curlworker =
         std::thread(std::bind(&CurlFetch::Internal::curlWorkerFunc, m.get()));
     while (!(m->curlrunning() || m->curldone || m->aborting)) {
@@ -156,7 +143,7 @@
     return true;
 }
 
-void  CurlFetch::reset()
+bool CurlFetch::reset()
 {
     if (m->curlworker.joinable()) {
         m->curlworker.join();
@@ -164,27 +151,39 @@
     m->curldone = false;
     m->curl_code = CURLE_OK;
     m->curl_http_code = 200;
-    m->curl_data_count = 0;
-    m->outqueue->reset();
-}
-
-bool CurlFetch::curlDone(int *curlcode, int *http_code)
-{
-    LOGDEB1("CurlFetch::curlDone: running: " << m->curlrunning() <<
+    fetch_data_count = 0;
+    outqueue->reset();
+    return true;
+}
+
+bool CurlFetch::fetchDone(FetchStatus *code, int *http_code)
+{
+    LOGDEB1("CurlFetch::fetchDone: running: " << m->curlrunning() <<
            " curldone " << m->curldone << endl);
     unique_lock<mutex> lock(m->curlmutex);
     if (!m->curldone) {
         return false;
     }
-    LOGDEB1("CurlFetch::curlDone: curlcode " << m->curl_code << " httpcode " <<
+    LOGDEB1("CurlFetch::fetchDone: curlcode " << m->curl_code << " httpcode " <<
            m->curl_http_code << endl);
-    if (curlcode) {
-        *curlcode = int(m->curl_code);
+    if (code) {
+        switch (m->curl_code) {
+        case CURLE_PARTIAL_FILE:
+        case CURLE_RECV_ERROR:
+            *code = NetFetch::FETCH_RETRYABLE;
+            break;
+        case CURLE_OK:
+            *code = NetFetch::FETCH_OK;
+            break;
+        default:
+            *code = NetFetch::FETCH_FATAL;
+            break;
+        }
     }
     if (http_code) {
         *http_code = m->curl_http_code;
     }
-    LOGDEB1("CurlTRans::curlDone: done\n");
+    LOGDEB1("CurlTRans::fetchDone: done\n");
     return true;
 }
 
@@ -196,10 +195,10 @@
     // We wait for the 1st buffer write call. If there is no data,
     // we'll stop on curldone.
     while (m->curlrunning() && !m->aborting && !m->curldone && 
-           m->curl_data_count + m->headbuf.bytes == 0) {
+           fetch_data_count + m->headbuf.bytes == 0) {
         LOGDEB1("CurlFetch::waitForHeaders: running " << m->curlrunning() <<
                " aborting " << m->aborting << " datacount " <<
-               m->curl_data_count + m->headbuf.bytes << "\n");
+               fetch_data_count + m->headbuf.bytes << "\n");
         if (m->aborting) {
             LOGDEB("CurlFetch::waitForHeaders: return: abort\n");
             m->extWaitingThreads--;
@@ -220,7 +219,7 @@
     LOGDEB1("CurlFetch::waitForHeaders: returning: headers_ok " <<
             m->headers_ok << " curlrunning " << m->curlrunning() <<
             " aborting " << m->aborting << " datacnt " <<
-            m->curl_data_count+ m->headbuf.bytes << endl);
+            fetch_data_count+ m->headbuf.bytes << endl);
     return m->headers_ok;
 }
 
@@ -289,52 +288,6 @@
     return CURL_SOCKOPT_OK;
 }
 
-// This is always called with the lock held
-size_t CurlFetch::Internal::databufToQ(const void *contents, size_t bcnt)
-{
-    LOGDEB1("CurlFetch::dataBufToQ. bcnt " << bcnt << endl);
-    
-    ABuffer *buf = nullptr;
-    // Try to recover an empty buffer from the queue, else allocate one.
-    if (outqueue && outqueue->take_recycled(&buf)) {
-        if (buf->allocbytes < bcnt) {
-            delete buf;
-            buf = nullptr;
-        }
-    }
-    if (buf == nullptr) {
-        buf = new ABuffer(MAX(4096, bcnt));
-    }
-    if (buf == nullptr) {
-        LOGERR("CurlFetch::dataBufToQ: can't get buffer for " << bcnt <<
-               " bytes\n");
-        return 0;
-    }
-    memcpy(buf->buf, contents, bcnt);
-    buf->bytes = bcnt;
-    buf->curoffs = 0;
-
-    LOGDEB1("CurlFetch::calling put on " <<
-            (outqueue ? outqueue->getname() : "null") << endl);
-    
-    if (!outqueue->put(buf)) {
-        LOGDEB1("CurlFetch::dataBufToQ. queue put failed\n");
-        delete buf;
-        return -1;
-    }
-
-    bool first = (curl_data_count == 0);
-    curl_data_count += bcnt;
-    if (first) {
-        curlcv.notify_all();
-    }
-    if (fbcb) {
-        fbcb(curl_data_count);
-    }
-    LOGDEB1("CurlFetch::dataBufToQ. returning " << bcnt << endl);
-    return bcnt;
-}
-
 static size_t
 curl_write_cb(void *contents, size_t size, size_t nmemb, void *userp)
 {
@@ -358,7 +311,7 @@
 #endif
 
     unique_lock<mutex> lock(curlmutex);
-    if (curl_data_count == 0 && headbuf.bytes < 1024) {
+    if (p->datacount() == 0 && headbuf.bytes < 1024) {
         if (!headbuf.append((const char *)contents, bcnt)) {
             LOGERR("CurlFetch::curlWriteCB: buf append failed\n");
             curlcv.notify_all();
@@ -369,23 +322,30 @@
         }
     }
     
-    if (curl_data_count == 0 && buf1cb) {
+    if (p->datacount() == 0 && p->buf1cb) {
         string sbuf;
-        if (!buf1cb(sbuf, headbuf.buf, headbuf.bytes)) {
+        if (!p->buf1cb(sbuf, headbuf.buf, headbuf.bytes)) {
             return -1;
         }
         if (sbuf.size()) {
-            if (databufToQ(sbuf.c_str(), sbuf.size()) < 0) {
+            curlcv.notify_all();
+            if (p->databufToQ(sbuf.c_str(), sbuf.size()) < 0) {
                 return -1;
             }
         }
     }
     
     if (headbuf.bytes) {
-        databufToQ(headbuf.buf, headbuf.bytes);
+        if (p->datacount() == 0) {
+            curlcv.notify_all();
+        }
+        p->databufToQ(headbuf.buf, headbuf.bytes);
         headbuf.bytes = 0;
     }
-    return databufToQ(contents, bcnt);
+    if (p->datacount() == 0) {
+        curlcv.notify_all();
+    }
+    return p->databufToQ(contents, bcnt);
 }
 
 static int debug_callback(CURL *curl,
@@ -429,8 +389,8 @@
             {unique_lock<mutex> lock(curlmutex);
                 curldone = true;
             }
-            if (outqueue) {
-                outqueue->setTerminate();
+            if (p->outqueue) {
+                p->outqueue->setTerminate();
             }
             curlcv.notify_all();
             return;
@@ -457,14 +417,14 @@
     }
     
     LOGDEB1("CurlFetch::curlWorker: fetching " << url << endl);
-    curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
-    if (startoffset) {
+    curl_easy_setopt(curl, CURLOPT_URL, p->_url.c_str());
+    if (p->startoffset) {
         char range[32];
-        sprintf(range, "%llu-", (unsigned long long)startoffset);
+        sprintf(range, "%llu-", (unsigned long long)p->startoffset);
         curl_easy_setopt(curl, CURLOPT_RANGE, range);
     }
-    if (timeoutsecs) {
-        curl_easy_setopt(curl, CURLOPT_TIMEOUT, timeoutsecs);
+    if (p->timeoutsecs) {
+        curl_easy_setopt(curl, CURLOPT_TIMEOUT, p->timeoutsecs);
     }
 
     curl_code = curl_easy_perform(curl);
@@ -496,7 +456,8 @@
         if (headbuf.bytes) {
             LOGDEB1("CurlFetch::curlWorker: flushing headbuf: " <<
                    headbuf.bytes << " bytes\n");
-            databufToQ(headbuf.buf, headbuf.bytes);
+            curlcv.notify_all();
+            p->databufToQ(headbuf.buf, headbuf.bytes);
             headbuf.bytes = 0;
         }
         curlfd = -1;
@@ -509,39 +470,20 @@
         // Wake up other side with empty buffer (eof)
         LOGDEB1("CurlFetch::curlWorkerFunc: request done ok: q empty buffer\n");
         ABuffer *buf = new ABuffer(0);
-        if (!outqueue || !outqueue->put(buf)) {
+        if (!p->outqueue || !p->outqueue->put(buf)) {
             delete buf;
         }
-        if (outqueue) {
+        if (p->outqueue) {
             // Wait for our zero buffer to be acknowledged before
             // killing the queue
             LOGDEB1("CurlFetch::curlworker: waitidle\n");
-            outqueue->waitIdle();
-        }
-    }
-    outqueue->setTerminate();
-    if (eofcb) {
-        eofcb(curl_code == CURLE_OK, curl_data_count);
+            p->outqueue->waitIdle();
+        }
+    }
+    p->outqueue->setTerminate();
+    if (p->eofcb) {
+        p->eofcb(curl_code == CURLE_OK, p->datacount());
     }
     LOGDEB1("CurlFetch::curlworker: done\n");
     return;
 }
-
-void CurlFetch::setEOFetchCB(std::function<void(bool ok, u_int64_t count)> eofcb)
-{
-    m->eofcb = eofcb;
-}
-void CurlFetch::setFetchBytesCB(std::function<void(u_int64_t count)> fbcb)
-{
-    m->fbcb = fbcb;
-}
-void CurlFetch::setBuf1GenCB(std::function<bool(string&,void*,int)> func)
-{
-    m->buf1cb = func;
-}
-
-void CurlFetch::setTimeout(int secs)
-{
-    m->timeoutsecs = secs;
-}
-