Switch to unified view

a b/src/mediaserver/cdplugins/curlfetch.cpp
1
//#define LOGGER_LOCAL_LOGINC 2
2
3
#include "curlfetch.h"
4
5
#include <string.h>
6
#include <unistd.h>
7
8
#include <string>
9
#include <mutex>
10
11
#include <curl/curl.h>
12
13
#include "smallut.h"
14
#include "log.h"
15
16
using namespace std;
17
18
// Global libcurl initialization.
19
class CurlInit {
20
public:
21
    CurlInit() {
22
        int opts = CURL_GLOBAL_ALL;
23
#ifdef CURL_GLOBAL_ACK_EINTR
24
        opts |= CURL_GLOBAL_ACK_EINTR;
25
#endif
26
        curl_global_init(opts);
27
    }
28
};
29
static CurlInit curlglobalinit;
30
31
32
class CurlFetch::Internal {
33
public:
34
    Internal(const string& _url) : url(_url) {}
35
    ~Internal();
36
37
    void curlWorkerFunc();
38
    size_t curlHeaderCB(void *contents, size_t size, size_t nmemb);
39
    size_t curlWriteCB(void *contents, size_t size, size_t nmemb);
40
    int curlSockoptCB(curl_socket_t curlfd, curlsocktype purpose);
41
    size_t databufToQ(const void *contents, size_t size);
42
    
43
    string url;
44
    uint64_t startoffset;
45
    int timeoutsecs{0};
46
    CURL *curl{nullptr};
47
    curl_socket_t curlfd{-1};
48
    u_int64_t curl_data_count{0};
49
    std::thread curlworker;
50
    CURLcode  curl_code{CURLE_OK};
51
    long curl_http_code{200};
52
    
53
    // Is the curl operation thread running ?
54
    bool curlrunning{false};
55
    bool curldone{false};
56
    bool aborting{false}; // Any waiting loop must abort asap
57
    
58
    // Count of client threads waiting for headers (normally 0/1)
59
    int extWaitingThreads{0};
60
61
    // Header values if we get them
62
    bool headers_ok{false};
63
    map<string, string> headers;
64
65
    BufXChange<ABuffer*> *outqueue{nullptr};
66
67
    // We pre-buffer the beginning of the stream so that the first
68
    // block we actually release is big enough for header forensics.
69
    ABuffer headbuf{1024};
70
    
71
    // Synchronization
72
    condition_variable curlcv;
73
    mutex curlmutex;
74
75
    function<void(bool, u_int64_t)> eofcb;
76
    function<void(u_int64_t)> fbcb;
77
    function<bool(string&,void *,int)> buf1cb;
78
};
79
80
CurlFetch::CurlFetch(const std::string& url)
81
{
82
    m = std::unique_ptr<Internal>(new Internal(url));
83
}
84
85
CurlFetch::~CurlFetch()
86
{
87
}
88
89
CurlFetch::Internal::~Internal()
90
{
91
    LOGDEB1("CurlFetch::Internal::~Internal\n");
92
    unique_lock<mutex> lock(curlmutex);
93
    aborting = true;
94
    if (curlfd >= 0) {
95
        close(curlfd);
96
        curlfd = -1;
97
    }
98
    if (outqueue) {
99
        outqueue->setTerminate();
100
    }
101
    curlcv.notify_all();
102
    while (extWaitingThreads > 0) {
103
        LOGDEB1("CurlFetch::~CurlFetch: extWaitingThreads: " <<
104
                extWaitingThreads << endl);
105
        curlcv.notify_all();
106
        LOGDEB1("CurlFetch::~CurlFetch: waiting for ext thread wkup\n");
107
        curlcv.wait(lock);
108
    }
109
    while (!curlworker.joinable()) {
110
        curlcv.wait(lock);
111
    }
112
    curlworker.join();
113
    if (curl) {
114
        curl_easy_cleanup(curl);
115
        curl = nullptr;
116
    }
117
    LOGDEB1("CurlFetch::CurlFetch::~Internal: done\n");
118
}
119
120
bool CurlFetch::start(BufXChange<ABuffer*> *queue, uint64_t offset)
121
{
122
    LOGDEB0("CurlFetch::start\n");
123
    unique_lock<mutex> lock(m->curlmutex);
124
    if (m->curlrunning || m->aborting) {
125
        LOGERR("CurlFetch::start: called with transfer active or aborted\n");
126
        return false;
127
    }
128
    m->curldone = false;
129
    if (nullptr == queue) {
130
        LOGERR("CurlFetch::start: called with nullptr\n");
131
        return false;
132
    }
133
    // We return after the curl thread is actually running
134
    m->outqueue = queue;
135
    m->startoffset = offset;
136
    m->curlworker =
137
        std::thread(std::bind(&CurlFetch::Internal::curlWorkerFunc, m.get()));
138
    while (!m->curlrunning && !m->curldone && !m->aborting) {
139
        if (m->aborting) {
140
            return false;
141
        }
142
        m->curlcv.wait(lock);
143
    }
144
    return true;
145
}
146
147
bool CurlFetch::curlDone(int *curlcode, long *http_code)
148
{
149
    LOGDEB1("CurlTRans::curlDone: running: " << m->curlrunning << endl);
150
    unique_lock<mutex> lock(m->curlmutex);
151
    if (!m->curldone) {
152
        return false;
153
    }
154
    if (curlcode) {
155
        *curlcode = int(m->curl_code);
156
    }
157
    if (http_code) {
158
        *http_code = m->curl_http_code;
159
    }
160
    return true;
161
}
162
163
bool CurlFetch::waitForHeaders(int secs)
164
{
165
    LOGDEB1("CurlFetch::waitForHeaders\n");
166
    unique_lock<mutex> lock(m->curlmutex);
167
    m->extWaitingThreads++;
168
    // We actually wait for the 1st buffer write call, so that we also
169
    // get an initial status like 404 (we'll wake up on curlrunning==0 in
170
    // this case)
171
    while (m->curlrunning && !m->aborting &&
172
           m->curl_data_count + m->headbuf.bytes== 0) {
173
        LOGDEB1("CurlFetch::waitForHeaders: running " << m->curlrunning <<
174
               " aborting " << m->aborting << " datacount " <<
175
               m->curl_data_count + m->headbuf.bytes << "\n");
176
        if (m->aborting) {
177
            LOGDEB("CurlFetch::waitForHeaders: return: abort\n");
178
            m->extWaitingThreads--;
179
            return false;
180
        }
181
        if (secs) {
182
            if (m->curlcv.wait_for(lock, std::chrono::seconds(secs)) ==
183
                std::cv_status::timeout) {
184
                LOGERR("CurlFetch::waitForHeaders: timeout\n");
185
                break;
186
            }
187
        } else {
188
            m->curlcv.wait(lock);
189
        }
190
    }
191
    m->extWaitingThreads--;
192
    m->curlcv.notify_all();
193
    LOGDEB1("CurlFetch::waitForHeaders: returning: headers_ok " <<
194
            m->headers_ok << " curlrunning " << m->curlrunning <<
195
            " aborting " << m->aborting << " datacnt " <<
196
            m->curl_data_count+ m->headbuf.bytes << endl);
197
    return m->headers_ok;
198
}
199
200
bool CurlFetch::headerValue(const string& hname, string& val)
201
{
202
    unique_lock<mutex> lock(m->curlmutex);
203
    if (!m->headers_ok) {
204
        LOGERR("CurlFetch::headerValue: called with headers_ok == false\n");
205
        return false;
206
    }
207
    auto it = m->headers.find(hname);
208
    if (it != m->headers.end()) {
209
        val = it->second;
210
    } else {
211
        LOGERR("CurlFetch::headerValue: header " << hname << " not found\n");
212
        return false;
213
    }
214
    return true;
215
}
216
217
static size_t
218
curl_header_cb(void *contents, size_t size, size_t nmemb, void *userp)
219
{
220
    CurlFetch::Internal *me = (CurlFetch::Internal *)userp;
221
    return me ? me->curlHeaderCB(contents, size, nmemb) : -1;
222
}
223
224
size_t
225
CurlFetch::Internal::curlHeaderCB(void *contents, size_t size, size_t cnt)
226
{
227
    size_t bcnt = size * cnt;
228
    string header((char *)contents, bcnt);
229
    trimstring(header, " \t\r\n");
230
    LOGDEB1("CurlFetch::curlHeaderCB: header: [" << header << "]\n");
231
    unique_lock<mutex> lock(curlmutex);
232
    if (header.empty()) {
233
        // End of headers
234
        LOGDEB1("CurlFetch::curlHeaderCB: wake them up\n");
235
        headers_ok = true;
236
        curlcv.notify_all();
237
    } else {
238
        LOGDEB1("curlHeaderCB: got " << header << endl);
239
        string::size_type colon = header.find(":");
240
        if (string::npos != colon) {
241
            string hname = header.substr(0, colon);
242
            stringtolower(hname);
243
            string val = header.substr(colon+1);
244
            trimstring(val);
245
            headers[hname] = val;
246
        }
247
    }
248
    return bcnt;
249
}
250
251
static int
252
curl_sockopt_cb(void *userp, curl_socket_t curlfd, curlsocktype purpose)
253
{
254
    CurlFetch::Internal *me = (CurlFetch::Internal *)userp;
255
    return me ? me->curlSockoptCB(curlfd, purpose) : -1;
256
}
257
258
int CurlFetch::Internal::curlSockoptCB(curl_socket_t cfd, curlsocktype)
259
{
260
    unique_lock<mutex> lock(curlmutex);
261
    curlfd = cfd;
262
    return CURL_SOCKOPT_OK;
263
}
264
265
// This is always called with the lock held
266
size_t CurlFetch::Internal::databufToQ(const void *contents, size_t bcnt)
267
{
268
    LOGDEB1("CurlFetch::dataBufToQ. bcnt " << bcnt << endl);
269
    
270
    ABuffer *buf = nullptr;
271
    // Try to recover an empty buffer from the queue, else allocate one.
272
    if (outqueue && outqueue->take_recycled(&buf)) {
273
        if (buf->allocbytes < bcnt) {
274
            delete buf;
275
            buf = nullptr;
276
        }
277
    }
278
    if (buf == nullptr) {
279
        buf = new ABuffer(MAX(4096, bcnt));
280
    }
281
    if (buf == nullptr) {
282
        LOGERR("CurlFetch::dataBufToQ: can't get buffer for " << bcnt <<
283
               " bytes\n");
284
        return 0;
285
    }
286
    memcpy(buf->buf, contents, bcnt);
287
    buf->bytes = bcnt;
288
    buf->curoffs = 0;
289
290
    LOGDEB2("CurlFetch::calling put on " <<
291
            (outqueue ? outqueue->getname() : "null") << endl);
292
    
293
    if (!outqueue->put(buf)) {
294
        LOGDEB1("CurlFetch::dataBufToQ. queue put failed\n");
295
        delete buf;
296
        return -1;
297
    }
298
299
    bool first = (curl_data_count == 0);
300
    curl_data_count += bcnt;
301
    if (first) {
302
        curlcv.notify_all();
303
    }
304
    if (fbcb) {
305
        fbcb(curl_data_count);
306
    }
307
    LOGDEB1("CurlFetch::dataBufToQ. returning " << bcnt << endl);
308
    return bcnt;
309
}
310
311
static size_t
312
curl_write_cb(void *contents, size_t size, size_t nmemb, void *userp)
313
{
314
    CurlFetch::Internal *me = (CurlFetch::Internal *)userp;
315
    return me ? me->curlWriteCB(contents, size, nmemb) : -1;
316
}
317
318
#undef DUMP_CONTENTS
319
#ifdef DUMP_CONTENTS
320
#include "listmem.h"
321
#endif
322
323
size_t CurlFetch::Internal::curlWriteCB(void *contents, size_t size, size_t cnt)
324
{
325
    size_t bcnt = size * cnt;
326
327
#ifdef DUMP_CONTENTS
328
    LOGDEB("CurlWriteCB: bcnt " << bcnt << " headbuf.bytes " <<
329
           headbuf.bytes << endl);
330
    listmem(cerr, contents, MIN(bcnt, 128));
331
#endif
332
333
    unique_lock<mutex> lock(curlmutex);
334
    if (curl_data_count == 0 && headbuf.bytes < 1024) {
335
        if (!headbuf.append((const char *)contents, bcnt)) {
336
            LOGERR("CurlFetch::curlWriteCB: buf append failed\n");
337
            curlcv.notify_all();
338
            return -1;
339
        } else {
340
            curlcv.notify_all();
341
            return bcnt;
342
        }
343
    }
344
    
345
    if (curl_data_count == 0 && buf1cb) {
346
        string sbuf;
347
        if (!buf1cb(sbuf, headbuf.buf, headbuf.bytes)) {
348
            return -1;
349
        }
350
        if (sbuf.size()) {
351
            if (databufToQ(sbuf.c_str(), sbuf.size()) < 0) {
352
                return -1;
353
            }
354
        }
355
    }
356
    
357
    if (headbuf.bytes) {
358
        databufToQ(headbuf.buf, headbuf.bytes);
359
        headbuf.bytes = 0;
360
    }
361
    return databufToQ(contents, bcnt);
362
}
363
364
static int debug_callback(CURL *curl,
365
                          curl_infotype type,
366
                          char *data,
367
                          size_t size,
368
                          void *userptr)
369
{
370
    string dt(data, size);
371
    string tt;
372
    switch (type) {
373
    case CURLINFO_TEXT: tt = "== Info"; break;
374
    default: tt = " ??? "; break;
375
    case CURLINFO_HEADER_OUT: tt = "=> Send header"; break;
376
    case CURLINFO_DATA_OUT: tt = "=> Send data"; break;
377
    case CURLINFO_SSL_DATA_OUT: tt = "=> Send SSL data"; break;
378
    case CURLINFO_HEADER_IN: tt = "<= Recv header"; break;
379
    case CURLINFO_DATA_IN:
380
        //LOGDEB("CURL: <= Recv data. cnt: " << size << endl);
381
        //listmem(cerr, data, 16);
382
        return 0;
383
    }
384
    LOGDEB("---CURL: " << tt << " " << dt);
385
    return 0; 
386
}
387
388
void CurlFetch::Internal::curlWorkerFunc()
389
{
390
    LOGDEB1("CurlFetch::curlWorkerFunc\n");
391
    (void)debug_callback;
392
    
393
    {unique_lock<mutex> lock(curlmutex);
394
        curlrunning = true;
395
    }
396
    // Tell the world we're active (start is waiting for this).
397
    curlcv.notify_all();
398
    
399
    if (!curl) {
400
        curl = curl_easy_init();
401
        if(!curl) {
402
            LOGERR("CurlFetch::curlWorkerFunc: curl_easy_init failed" << endl);
403
            {unique_lock<mutex> lock(curlmutex);
404
                curlrunning = false;
405
                curldone = true;
406
            }
407
            if (outqueue) {
408
                outqueue->setTerminate();
409
            }
410
            curlcv.notify_all();
411
            return;
412
        }
413
        curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1);
414
        curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1);
415
        curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, curl_write_cb);
416
        curl_easy_setopt(curl, CURLOPT_WRITEDATA, this);
417
        curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, curl_header_cb);
418
        curl_easy_setopt(curl, CURLOPT_HEADERDATA, this);
419
        curl_easy_setopt(curl, CURLOPT_SOCKOPTFUNCTION, curl_sockopt_cb);
420
        curl_easy_setopt(curl, CURLOPT_SOCKOPTDATA, this);
421
422
        curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, 5);
423
        // Speedlimit is in bytes/S. 32Kbits/S
424
        curl_easy_setopt(curl, CURLOPT_LOW_SPEED_LIMIT, 4L);
425
        curl_easy_setopt(curl, CURLOPT_LOW_SPEED_TIME, 60);
426
427
        //curl_easy_setopt(curl, CURLOPT_VERBOSE, 1L);
428
        //curl_easy_setopt(curl, CURLOPT_DEBUGFUNCTION, debug_callback);
429
430
        // Chunk decoding: this is the default
431
        //curl_easy_setopt(curl, CURLOPT_HTTP_TRANSFER_DECODING, 1L);
432
    }
433
    
434
    LOGDEB1("CurlFetch::curlWorker: fetching " << url << endl);
435
    curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
436
    if (startoffset) {
437
        char range[32];
438
        sprintf(range, "%llu-", (unsigned long long)startoffset);
439
        curl_easy_setopt(curl, CURLOPT_RANGE, range);
440
    }
441
    if (timeoutsecs) {
442
        curl_easy_setopt(curl, CURLOPT_TIMEOUT, timeoutsecs);
443
    }
444
445
    curl_code = curl_easy_perform(curl);
446
    LOGDEB1("CurlFetch::curlWorker: curl_easy_perform returned\n");
447
448
    bool http_ok = false;
449
    if (curl_code == CURLE_OK) {
450
        curl_easy_getinfo (curl, CURLINFO_RESPONSE_CODE, &curl_http_code);
451
        http_ok = curl_http_code >= 200 && curl_http_code < 300;
452
    }
453
454
    {unique_lock<mutex> lock(curlmutex);
455
        if (aborting) {
456
            return;
457
        }
458
        if (headbuf.bytes) {
459
            LOGDEB1("CurlFetch::curlWorker: flushing headbuf: " <<
460
                   headbuf.bytes << " bytes\n");
461
            databufToQ(headbuf.buf, headbuf.bytes);
462
            headbuf.bytes = 0;
463
        }
464
        curlfd = -1;
465
        curlrunning = false;
466
        curldone = true;
467
        curlcv.notify_all();
468
    }
469
470
    if (curl_code != CURLE_OK || !http_ok) {
471
        if (curl_code != CURLE_OK) {
472
            LOGERR("CurlFetch::curlWorkerFunc: curl_easy_perform(): " <<
473
                   curl_easy_strerror(curl_code) << endl);
474
        } else {
475
            LOGDEB("CurlFetch::curlWorkerFunc: curl_easy_perform(): http code: "
476
                   << curl_http_code << endl);
477
        }
478
    }
479
480
    if (curl_code == CURLE_OK) {
481
        // Wake up other side with empty buffer (eof)
482
        LOGDEB1("CurlFetch::curlWorkerFunc: request done ok: q empty buffer\n");
483
        ABuffer *buf = new ABuffer(0);
484
        if (!outqueue || !outqueue->put(buf)) {
485
            delete buf;
486
        }
487
        if (outqueue) {
488
            outqueue->waitIdle();
489
        }
490
    }
491
    if (eofcb) {
492
        eofcb(curl_code == CURLE_OK, curl_data_count);
493
    }
494
    LOGDEB1("CurlFetch::curlworker: done\n");
495
    return;
496
}
497
498
void CurlFetch::setEOFetchCB(std::function<void(bool ok, u_int64_t count)> eofcb)
499
{
500
    m->eofcb = eofcb;
501
}
502
void CurlFetch::setFetchBytesCB(std::function<void(u_int64_t count)> fbcb)
503
{
504
    m->fbcb = fbcb;
505
}
506
void CurlFetch::setBuf1GenCB(std::function<bool(string&,void*,int)> func)
507
{
508
    m->buf1cb = func;
509
}
510
511
void CurlFetch::setTimeout(int secs)
512
{
513
    m->timeoutsecs = secs;
514
}
515