Switch to side-by-side view

--- a
+++ b/src/mediaserver/cdplugins/curlfetch.cpp
@@ -0,0 +1,515 @@
+//#define LOGGER_LOCAL_LOGINC 2
+
+#include "curlfetch.h"
+
+#include <string.h>
+#include <unistd.h>
+
+#include <string>
+#include <mutex>
+
+#include <curl/curl.h>
+
+#include "smallut.h"
+#include "log.h"
+
+using namespace std;
+
+// Global libcurl initialization.
+class CurlInit {
+public:
+    CurlInit() {
+        int opts = CURL_GLOBAL_ALL;
+#ifdef CURL_GLOBAL_ACK_EINTR
+        opts |= CURL_GLOBAL_ACK_EINTR;
+#endif
+        curl_global_init(opts);
+    }
+};
+static CurlInit curlglobalinit;
+
+
+class CurlFetch::Internal {
+public:
+    Internal(const string& _url) : url(_url) {}
+    ~Internal();
+
+    void curlWorkerFunc();
+    size_t curlHeaderCB(void *contents, size_t size, size_t nmemb);
+    size_t curlWriteCB(void *contents, size_t size, size_t nmemb);
+    int curlSockoptCB(curl_socket_t curlfd, curlsocktype purpose);
+    size_t databufToQ(const void *contents, size_t size);
+    
+    string url;
+    uint64_t startoffset;
+    int timeoutsecs{0};
+    CURL *curl{nullptr};
+    curl_socket_t curlfd{-1};
+    u_int64_t curl_data_count{0};
+    std::thread curlworker;
+    CURLcode  curl_code{CURLE_OK};
+    long curl_http_code{200};
+    
+    // Is the curl operation thread running ?
+    bool curlrunning{false};
+    bool curldone{false};
+    bool aborting{false}; // Any waiting loop must abort asap
+    
+    // Count of client threads waiting for headers (normally 0/1)
+    int extWaitingThreads{0};
+
+    // Header values if we get them
+    bool headers_ok{false};
+    map<string, string> headers;
+
+    BufXChange<ABuffer*> *outqueue{nullptr};
+
+    // We pre-buffer the beginning of the stream so that the first
+    // block we actually release is big enough for header forensics.
+    ABuffer headbuf{1024};
+    
+    // Synchronization
+    condition_variable curlcv;
+    mutex curlmutex;
+
+    function<void(bool, u_int64_t)> eofcb;
+    function<void(u_int64_t)> fbcb;
+    function<bool(string&,void *,int)> buf1cb;
+};
+
+CurlFetch::CurlFetch(const std::string& url)
+{
+    m = std::unique_ptr<Internal>(new Internal(url));
+}
+
+CurlFetch::~CurlFetch()
+{
+}
+
+CurlFetch::Internal::~Internal()
+{
+    LOGDEB1("CurlFetch::Internal::~Internal\n");
+    unique_lock<mutex> lock(curlmutex);
+    aborting = true;
+    if (curlfd >= 0) {
+        close(curlfd);
+        curlfd = -1;
+    }
+    if (outqueue) {
+        outqueue->setTerminate();
+    }
+    curlcv.notify_all();
+    while (extWaitingThreads > 0) {
+        LOGDEB1("CurlFetch::~CurlFetch: extWaitingThreads: " <<
+                extWaitingThreads << endl);
+        curlcv.notify_all();
+        LOGDEB1("CurlFetch::~CurlFetch: waiting for ext thread wkup\n");
+        curlcv.wait(lock);
+    }
+    while (!curlworker.joinable()) {
+        curlcv.wait(lock);
+    }
+    curlworker.join();
+    if (curl) {
+        curl_easy_cleanup(curl);
+        curl = nullptr;
+    }
+    LOGDEB1("CurlFetch::CurlFetch::~Internal: done\n");
+}
+
+bool CurlFetch::start(BufXChange<ABuffer*> *queue, uint64_t offset)
+{
+    LOGDEB0("CurlFetch::start\n");
+    unique_lock<mutex> lock(m->curlmutex);
+    if (m->curlrunning || m->aborting) {
+        LOGERR("CurlFetch::start: called with transfer active or aborted\n");
+        return false;
+    }
+    m->curldone = false;
+    if (nullptr == queue) {
+        LOGERR("CurlFetch::start: called with nullptr\n");
+        return false;
+    }
+    // We return after the curl thread is actually running
+    m->outqueue = queue;
+    m->startoffset = offset;
+    m->curlworker =
+        std::thread(std::bind(&CurlFetch::Internal::curlWorkerFunc, m.get()));
+    while (!m->curlrunning && !m->curldone && !m->aborting) {
+        if (m->aborting) {
+            return false;
+        }
+        m->curlcv.wait(lock);
+    }
+    return true;
+}
+
+bool CurlFetch::curlDone(int *curlcode, long *http_code)
+{
+    LOGDEB1("CurlTRans::curlDone: running: " << m->curlrunning << endl);
+    unique_lock<mutex> lock(m->curlmutex);
+    if (!m->curldone) {
+        return false;
+    }
+    if (curlcode) {
+        *curlcode = int(m->curl_code);
+    }
+    if (http_code) {
+        *http_code = m->curl_http_code;
+    }
+    return true;
+}
+
+bool CurlFetch::waitForHeaders(int secs)
+{
+    LOGDEB1("CurlFetch::waitForHeaders\n");
+    unique_lock<mutex> lock(m->curlmutex);
+    m->extWaitingThreads++;
+    // We actually wait for the 1st buffer write call, so that we also
+    // get an initial status like 404 (we'll wake up on curlrunning==0 in
+    // this case)
+    while (m->curlrunning && !m->aborting &&
+           m->curl_data_count + m->headbuf.bytes== 0) {
+        LOGDEB1("CurlFetch::waitForHeaders: running " << m->curlrunning <<
+               " aborting " << m->aborting << " datacount " <<
+               m->curl_data_count + m->headbuf.bytes << "\n");
+        if (m->aborting) {
+            LOGDEB("CurlFetch::waitForHeaders: return: abort\n");
+            m->extWaitingThreads--;
+            return false;
+        }
+        if (secs) {
+            if (m->curlcv.wait_for(lock, std::chrono::seconds(secs)) ==
+                std::cv_status::timeout) {
+                LOGERR("CurlFetch::waitForHeaders: timeout\n");
+                break;
+            }
+        } else {
+            m->curlcv.wait(lock);
+        }
+    }
+    m->extWaitingThreads--;
+    m->curlcv.notify_all();
+    LOGDEB1("CurlFetch::waitForHeaders: returning: headers_ok " <<
+            m->headers_ok << " curlrunning " << m->curlrunning <<
+            " aborting " << m->aborting << " datacnt " <<
+            m->curl_data_count+ m->headbuf.bytes << endl);
+    return m->headers_ok;
+}
+
+bool CurlFetch::headerValue(const string& hname, string& val)
+{
+    unique_lock<mutex> lock(m->curlmutex);
+    if (!m->headers_ok) {
+        LOGERR("CurlFetch::headerValue: called with headers_ok == false\n");
+        return false;
+    }
+    auto it = m->headers.find(hname);
+    if (it != m->headers.end()) {
+        val = it->second;
+    } else {
+        LOGERR("CurlFetch::headerValue: header " << hname << " not found\n");
+        return false;
+    }
+    return true;
+}
+
+static size_t
+curl_header_cb(void *contents, size_t size, size_t nmemb, void *userp)
+{
+    CurlFetch::Internal *me = (CurlFetch::Internal *)userp;
+    return me ? me->curlHeaderCB(contents, size, nmemb) : -1;
+}
+
+size_t
+CurlFetch::Internal::curlHeaderCB(void *contents, size_t size, size_t cnt)
+{
+    size_t bcnt = size * cnt;
+    string header((char *)contents, bcnt);
+    trimstring(header, " \t\r\n");
+    LOGDEB1("CurlFetch::curlHeaderCB: header: [" << header << "]\n");
+    unique_lock<mutex> lock(curlmutex);
+    if (header.empty()) {
+        // End of headers
+        LOGDEB1("CurlFetch::curlHeaderCB: wake them up\n");
+        headers_ok = true;
+        curlcv.notify_all();
+    } else {
+        LOGDEB1("curlHeaderCB: got " << header << endl);
+        string::size_type colon = header.find(":");
+        if (string::npos != colon) {
+            string hname = header.substr(0, colon);
+            stringtolower(hname);
+            string val = header.substr(colon+1);
+            trimstring(val);
+            headers[hname] = val;
+        }
+    }
+    return bcnt;
+}
+
+static int
+curl_sockopt_cb(void *userp, curl_socket_t curlfd, curlsocktype purpose)
+{
+    CurlFetch::Internal *me = (CurlFetch::Internal *)userp;
+    return me ? me->curlSockoptCB(curlfd, purpose) : -1;
+}
+
+int CurlFetch::Internal::curlSockoptCB(curl_socket_t cfd, curlsocktype)
+{
+    unique_lock<mutex> lock(curlmutex);
+    curlfd = cfd;
+    return CURL_SOCKOPT_OK;
+}
+
+// This is always called with the lock held
+size_t CurlFetch::Internal::databufToQ(const void *contents, size_t bcnt)
+{
+    LOGDEB1("CurlFetch::dataBufToQ. bcnt " << bcnt << endl);
+    
+    ABuffer *buf = nullptr;
+    // Try to recover an empty buffer from the queue, else allocate one.
+    if (outqueue && outqueue->take_recycled(&buf)) {
+        if (buf->allocbytes < bcnt) {
+            delete buf;
+            buf = nullptr;
+        }
+    }
+    if (buf == nullptr) {
+        buf = new ABuffer(MAX(4096, bcnt));
+    }
+    if (buf == nullptr) {
+        LOGERR("CurlFetch::dataBufToQ: can't get buffer for " << bcnt <<
+               " bytes\n");
+        return 0;
+    }
+    memcpy(buf->buf, contents, bcnt);
+    buf->bytes = bcnt;
+    buf->curoffs = 0;
+
+    LOGDEB2("CurlFetch::calling put on " <<
+            (outqueue ? outqueue->getname() : "null") << endl);
+    
+    if (!outqueue->put(buf)) {
+        LOGDEB1("CurlFetch::dataBufToQ. queue put failed\n");
+        delete buf;
+        return -1;
+    }
+
+    bool first = (curl_data_count == 0);
+    curl_data_count += bcnt;
+    if (first) {
+        curlcv.notify_all();
+    }
+    if (fbcb) {
+        fbcb(curl_data_count);
+    }
+    LOGDEB1("CurlFetch::dataBufToQ. returning " << bcnt << endl);
+    return bcnt;
+}
+
+static size_t
+curl_write_cb(void *contents, size_t size, size_t nmemb, void *userp)
+{
+    CurlFetch::Internal *me = (CurlFetch::Internal *)userp;
+    return me ? me->curlWriteCB(contents, size, nmemb) : -1;
+}
+
+#undef DUMP_CONTENTS
+#ifdef DUMP_CONTENTS
+#include "listmem.h"
+#endif
+
+size_t CurlFetch::Internal::curlWriteCB(void *contents, size_t size, size_t cnt)
+{
+    size_t bcnt = size * cnt;
+
+#ifdef DUMP_CONTENTS
+    LOGDEB("CurlWriteCB: bcnt " << bcnt << " headbuf.bytes " <<
+           headbuf.bytes << endl);
+    listmem(cerr, contents, MIN(bcnt, 128));
+#endif
+
+    unique_lock<mutex> lock(curlmutex);
+    if (curl_data_count == 0 && headbuf.bytes < 1024) {
+        if (!headbuf.append((const char *)contents, bcnt)) {
+            LOGERR("CurlFetch::curlWriteCB: buf append failed\n");
+            curlcv.notify_all();
+            return -1;
+        } else {
+            curlcv.notify_all();
+            return bcnt;
+        }
+    }
+    
+    if (curl_data_count == 0 && buf1cb) {
+        string sbuf;
+        if (!buf1cb(sbuf, headbuf.buf, headbuf.bytes)) {
+            return -1;
+        }
+        if (sbuf.size()) {
+            if (databufToQ(sbuf.c_str(), sbuf.size()) < 0) {
+                return -1;
+            }
+        }
+    }
+    
+    if (headbuf.bytes) {
+        databufToQ(headbuf.buf, headbuf.bytes);
+        headbuf.bytes = 0;
+    }
+    return databufToQ(contents, bcnt);
+}
+
+static int debug_callback(CURL *curl,
+                          curl_infotype type,
+                          char *data,
+                          size_t size,
+                          void *userptr)
+{
+    string dt(data, size);
+    string tt;
+    switch (type) {
+    case CURLINFO_TEXT: tt = "== Info"; break;
+    default: tt = " ??? "; break;
+    case CURLINFO_HEADER_OUT: tt = "=> Send header"; break;
+    case CURLINFO_DATA_OUT: tt = "=> Send data"; break;
+    case CURLINFO_SSL_DATA_OUT: tt = "=> Send SSL data"; break;
+    case CURLINFO_HEADER_IN: tt = "<= Recv header"; break;
+    case CURLINFO_DATA_IN:
+        //LOGDEB("CURL: <= Recv data. cnt: " << size << endl);
+        //listmem(cerr, data, 16);
+        return 0;
+    }
+    LOGDEB("---CURL: " << tt << " " << dt);
+    return 0; 
+}
+
+void CurlFetch::Internal::curlWorkerFunc()
+{
+    LOGDEB1("CurlFetch::curlWorkerFunc\n");
+    (void)debug_callback;
+    
+    {unique_lock<mutex> lock(curlmutex);
+        curlrunning = true;
+    }
+    // Tell the world we're active (start is waiting for this).
+    curlcv.notify_all();
+    
+    if (!curl) {
+        curl = curl_easy_init();
+        if(!curl) {
+            LOGERR("CurlFetch::curlWorkerFunc: curl_easy_init failed" << endl);
+            {unique_lock<mutex> lock(curlmutex);
+                curlrunning = false;
+                curldone = true;
+            }
+            if (outqueue) {
+                outqueue->setTerminate();
+            }
+            curlcv.notify_all();
+            return;
+        }
+        curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1);
+        curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1);
+        curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, curl_write_cb);
+        curl_easy_setopt(curl, CURLOPT_WRITEDATA, this);
+        curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, curl_header_cb);
+        curl_easy_setopt(curl, CURLOPT_HEADERDATA, this);
+        curl_easy_setopt(curl, CURLOPT_SOCKOPTFUNCTION, curl_sockopt_cb);
+        curl_easy_setopt(curl, CURLOPT_SOCKOPTDATA, this);
+
+        curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, 5);
+        // Speedlimit is in bytes/S. 32Kbits/S
+        curl_easy_setopt(curl, CURLOPT_LOW_SPEED_LIMIT, 4L);
+        curl_easy_setopt(curl, CURLOPT_LOW_SPEED_TIME, 60);
+
+        //curl_easy_setopt(curl, CURLOPT_VERBOSE, 1L);
+        //curl_easy_setopt(curl, CURLOPT_DEBUGFUNCTION, debug_callback);
+
+        // Chunk decoding: this is the default
+        //curl_easy_setopt(curl, CURLOPT_HTTP_TRANSFER_DECODING, 1L);
+    }
+    
+    LOGDEB1("CurlFetch::curlWorker: fetching " << url << endl);
+    curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
+    if (startoffset) {
+        char range[32];
+        sprintf(range, "%llu-", (unsigned long long)startoffset);
+        curl_easy_setopt(curl, CURLOPT_RANGE, range);
+    }
+    if (timeoutsecs) {
+        curl_easy_setopt(curl, CURLOPT_TIMEOUT, timeoutsecs);
+    }
+
+    curl_code = curl_easy_perform(curl);
+    LOGDEB1("CurlFetch::curlWorker: curl_easy_perform returned\n");
+
+    bool http_ok = false;
+    if (curl_code == CURLE_OK) {
+        curl_easy_getinfo (curl, CURLINFO_RESPONSE_CODE, &curl_http_code);
+        http_ok = curl_http_code >= 200 && curl_http_code < 300;
+    }
+
+    {unique_lock<mutex> lock(curlmutex);
+        if (aborting) {
+            return;
+        }
+        if (headbuf.bytes) {
+            LOGDEB1("CurlFetch::curlWorker: flushing headbuf: " <<
+                   headbuf.bytes << " bytes\n");
+            databufToQ(headbuf.buf, headbuf.bytes);
+            headbuf.bytes = 0;
+        }
+        curlfd = -1;
+        curlrunning = false;
+        curldone = true;
+        curlcv.notify_all();
+    }
+
+    if (curl_code != CURLE_OK || !http_ok) {
+        if (curl_code != CURLE_OK) {
+            LOGERR("CurlFetch::curlWorkerFunc: curl_easy_perform(): " <<
+                   curl_easy_strerror(curl_code) << endl);
+        } else {
+            LOGDEB("CurlFetch::curlWorkerFunc: curl_easy_perform(): http code: "
+                   << curl_http_code << endl);
+        }
+    }
+
+    if (curl_code == CURLE_OK) {
+        // Wake up other side with empty buffer (eof)
+        LOGDEB1("CurlFetch::curlWorkerFunc: request done ok: q empty buffer\n");
+        ABuffer *buf = new ABuffer(0);
+        if (!outqueue || !outqueue->put(buf)) {
+            delete buf;
+        }
+        if (outqueue) {
+            outqueue->waitIdle();
+        }
+    }
+    if (eofcb) {
+        eofcb(curl_code == CURLE_OK, curl_data_count);
+    }
+    LOGDEB1("CurlFetch::curlworker: done\n");
+    return;
+}
+
+void CurlFetch::setEOFetchCB(std::function<void(bool ok, u_int64_t count)> eofcb)
+{
+    m->eofcb = eofcb;
+}
+void CurlFetch::setFetchBytesCB(std::function<void(u_int64_t count)> fbcb)
+{
+    m->fbcb = fbcb;
+}
+void CurlFetch::setBuf1GenCB(std::function<bool(string&,void*,int)> func)
+{
+    m->buf1cb = func;
+}
+
+void CurlFetch::setTimeout(int secs)
+{
+    m->timeoutsecs = secs;
+}
+