Switch to unified view

a/src/mediaserver/cdplugins/curlfetch.cpp b/src/mediaserver/cdplugins/curlfetch.cpp
...
...
29
static CurlInit curlglobalinit;
29
static CurlInit curlglobalinit;
30
30
31
31
32
class CurlFetch::Internal {
32
class CurlFetch::Internal {
33
public:
33
public:
34
    Internal(const string& _url) : url(_url) {}
34
    Internal(CurlFetch* parent)
35
        : p(parent) {}
35
    ~Internal();
36
    ~Internal();
36
    bool curlrunning() {
37
    bool curlrunning() {
37
        return curlworker.joinable();
38
        return curlworker.joinable();
38
    }
39
    }
39
40
40
    void curlWorkerFunc();
41
    void curlWorkerFunc();
41
    size_t curlHeaderCB(void *contents, size_t size, size_t nmemb);
42
    size_t curlHeaderCB(void *contents, size_t size, size_t nmemb);
42
    size_t curlWriteCB(void *contents, size_t size, size_t nmemb);
43
    size_t curlWriteCB(void *contents, size_t size, size_t nmemb);
43
    int curlSockoptCB(curl_socket_t curlfd, curlsocktype purpose);
44
    int curlSockoptCB(curl_socket_t curlfd, curlsocktype purpose);
44
    size_t databufToQ(const void *contents, size_t size);
45
45
    
46
    CurlFetch *p{nullptr};
46
    string url;
47
    uint64_t startoffset;
48
    int timeoutsecs{0};
49
    CURL *curl{nullptr};
47
    CURL *curl{nullptr};
50
    // The socket is used to kill any waiting by curl when we want to abort
48
    // The socket is used to kill any waiting by curl when we want to abort
51
    curl_socket_t curlfd{-1};
49
    curl_socket_t curlfd{-1};
52
    u_int64_t curl_data_count{0};
53
    std::thread curlworker;
50
    std::thread curlworker;
54
    bool curldone{false};
51
    bool curldone{false};
55
    CURLcode  curl_code{CURLE_OK};
52
    CURLcode  curl_code{CURLE_OK};
56
    int curl_http_code{200};
53
    int curl_http_code{200};
57
54
...
...
63
60
64
    // Header values if we get them
61
    // Header values if we get them
65
    bool headers_ok{false};
62
    bool headers_ok{false};
66
    map<string, string> headers;
63
    map<string, string> headers;
67
64
68
    BufXChange<ABuffer*> *outqueue{nullptr};
69
70
    // We pre-buffer the beginning of the stream so that the first
65
    // We pre-buffer the beginning of the stream so that the first
71
    // block we actually release is always big enough for header
66
    // block we actually release is always big enough for header
72
    // forensics.
67
    // forensics.
73
    ABuffer headbuf{1024};
68
    ABuffer headbuf{1024};
74
    
69
    
75
    // Synchronization
70
    // Synchronization
76
    condition_variable curlcv;
71
    condition_variable curlcv;
77
    mutex curlmutex;
72
    mutex curlmutex;
78
79
    // User callbacks
80
    function<void(bool, u_int64_t)> eofcb;
81
    function<void(u_int64_t)> fbcb;
82
    function<bool(string&,void *,int)> buf1cb;
83
};
73
};
84
74
85
CurlFetch::CurlFetch(const std::string& url)
75
CurlFetch::CurlFetch(const std::string& url)
76
    : NetFetch(url)
86
{
77
{
87
    m = std::unique_ptr<Internal>(new Internal(url));
78
    m = std::unique_ptr<Internal>(new Internal(this));
88
}
79
}
89
80
90
CurlFetch::~CurlFetch()
81
CurlFetch::~CurlFetch()
91
{
82
{
92
}
93
const string& CurlFetch::url()
94
{
95
    return m->url;
96
}
83
}
97
84
98
CurlFetch::Internal::~Internal()
85
CurlFetch::Internal::~Internal()
99
{
86
{
100
    LOGDEB1("CurlFetch::Internal::~Internal\n");
87
    LOGDEB1("CurlFetch::Internal::~Internal\n");
...
...
102
    aborting = true;
89
    aborting = true;
103
    if (curlfd >= 0) {
90
    if (curlfd >= 0) {
104
        close(curlfd);
91
        close(curlfd);
105
        curlfd = -1;
92
        curlfd = -1;
106
    }
93
    }
107
    if (outqueue) {
94
    if (p->outqueue) {
108
        outqueue->setTerminate();
95
        p->outqueue->setTerminate();
109
    }
96
    }
110
    curlcv.notify_all();
97
    curlcv.notify_all();
111
    while (extWaitingThreads > 0) {
98
    while (extWaitingThreads > 0) {
112
        LOGDEB1("CurlFetch::~CurlFetch: extWaitingThreads: " <<
99
        LOGDEB1("CurlFetch::~CurlFetch: extWaitingThreads: " <<
113
                extWaitingThreads << endl);
100
                extWaitingThreads << endl);
...
...
138
    if (m->curlrunning() || m->aborting) {
125
    if (m->curlrunning() || m->aborting) {
139
        LOGERR("CurlFetch::start: called with transfer active or aborted\n");
126
        LOGERR("CurlFetch::start: called with transfer active or aborted\n");
140
        return false;
127
        return false;
141
    }
128
    }
142
    // We return after the curl thread is actually running
129
    // We return after the curl thread is actually running
143
    m->outqueue = queue;
130
    outqueue = queue;
144
    m->startoffset = offset;
131
    startoffset = offset;
145
    m->curlworker =
132
    m->curlworker =
146
        std::thread(std::bind(&CurlFetch::Internal::curlWorkerFunc, m.get()));
133
        std::thread(std::bind(&CurlFetch::Internal::curlWorkerFunc, m.get()));
147
    while (!(m->curlrunning() || m->curldone || m->aborting)) {
134
    while (!(m->curlrunning() || m->curldone || m->aborting)) {
148
        LOGDEB1("Start: waiting: running " << m->curlrunning() << " done " <<
135
        LOGDEB1("Start: waiting: running " << m->curlrunning() << " done " <<
149
               m->curldone << " aborting " << m->aborting << endl);
136
               m->curldone << " aborting " << m->aborting << endl);
...
...
154
    }
141
    }
155
    LOGDEB1("CurlFetch::start: returning\n");
142
    LOGDEB1("CurlFetch::start: returning\n");
156
    return true;
143
    return true;
157
}
144
}
158
145
159
void  CurlFetch::reset()
146
bool CurlFetch::reset()
160
{
147
{
161
    if (m->curlworker.joinable()) {
148
    if (m->curlworker.joinable()) {
162
        m->curlworker.join();
149
        m->curlworker.join();
163
    }
150
    }
164
    m->curldone = false;
151
    m->curldone = false;
165
    m->curl_code = CURLE_OK;
152
    m->curl_code = CURLE_OK;
166
    m->curl_http_code = 200;
153
    m->curl_http_code = 200;
167
    m->curl_data_count = 0;
154
    fetch_data_count = 0;
168
    m->outqueue->reset();
155
    outqueue->reset();
156
    return true;
169
}
157
}
170
158
171
bool CurlFetch::fetchDone(FetchStatus *code, int *http_code)
159
bool CurlFetch::fetchDone(FetchStatus *code, int *http_code)
172
{
160
{
173
    LOGDEB1("CurlFetch::fetchDone: running: " << m->curlrunning() <<
161
    LOGDEB1("CurlFetch::fetchDone: running: " << m->curlrunning() <<
...
...
205
    unique_lock<mutex> lock(m->curlmutex);
193
    unique_lock<mutex> lock(m->curlmutex);
206
    m->extWaitingThreads++;
194
    m->extWaitingThreads++;
207
    // We wait for the 1st buffer write call. If there is no data,
195
    // We wait for the 1st buffer write call. If there is no data,
208
    // we'll stop on curldone.
196
    // we'll stop on curldone.
209
    while (m->curlrunning() && !m->aborting && !m->curldone && 
197
    while (m->curlrunning() && !m->aborting && !m->curldone && 
210
           m->curl_data_count + m->headbuf.bytes == 0) {
198
           fetch_data_count + m->headbuf.bytes == 0) {
211
        LOGDEB1("CurlFetch::waitForHeaders: running " << m->curlrunning() <<
199
        LOGDEB1("CurlFetch::waitForHeaders: running " << m->curlrunning() <<
212
               " aborting " << m->aborting << " datacount " <<
200
               " aborting " << m->aborting << " datacount " <<
213
               m->curl_data_count + m->headbuf.bytes << "\n");
201
               fetch_data_count + m->headbuf.bytes << "\n");
214
        if (m->aborting) {
202
        if (m->aborting) {
215
            LOGDEB("CurlFetch::waitForHeaders: return: abort\n");
203
            LOGDEB("CurlFetch::waitForHeaders: return: abort\n");
216
            m->extWaitingThreads--;
204
            m->extWaitingThreads--;
217
            return false;
205
            return false;
218
        }
206
        }
...
...
229
    m->extWaitingThreads--;
217
    m->extWaitingThreads--;
230
    m->curlcv.notify_all();
218
    m->curlcv.notify_all();
231
    LOGDEB1("CurlFetch::waitForHeaders: returning: headers_ok " <<
219
    LOGDEB1("CurlFetch::waitForHeaders: returning: headers_ok " <<
232
            m->headers_ok << " curlrunning " << m->curlrunning() <<
220
            m->headers_ok << " curlrunning " << m->curlrunning() <<
233
            " aborting " << m->aborting << " datacnt " <<
221
            " aborting " << m->aborting << " datacnt " <<
234
            m->curl_data_count+ m->headbuf.bytes << endl);
222
            fetch_data_count+ m->headbuf.bytes << endl);
235
    return m->headers_ok;
223
    return m->headers_ok;
236
}
224
}
237
225
238
bool CurlFetch::headerValue(const string& hname, string& val)
226
bool CurlFetch::headerValue(const string& hname, string& val)
239
{
227
{
...
...
298
    unique_lock<mutex> lock(curlmutex);
286
    unique_lock<mutex> lock(curlmutex);
299
    curlfd = cfd;
287
    curlfd = cfd;
300
    return CURL_SOCKOPT_OK;
288
    return CURL_SOCKOPT_OK;
301
}
289
}
302
290
303
// This is always called with the lock held
304
size_t CurlFetch::Internal::databufToQ(const void *contents, size_t bcnt)
305
{
306
    LOGDEB1("CurlFetch::dataBufToQ. bcnt " << bcnt << endl);
307
    
308
    ABuffer *buf = nullptr;
309
    // Try to recover an empty buffer from the queue, else allocate one.
310
    if (outqueue && outqueue->take_recycled(&buf)) {
311
        if (buf->allocbytes < bcnt) {
312
            delete buf;
313
            buf = nullptr;
314
        }
315
    }
316
    if (buf == nullptr) {
317
        buf = new ABuffer(MAX(4096, bcnt));
318
    }
319
    if (buf == nullptr) {
320
        LOGERR("CurlFetch::dataBufToQ: can't get buffer for " << bcnt <<
321
               " bytes\n");
322
        return 0;
323
    }
324
    memcpy(buf->buf, contents, bcnt);
325
    buf->bytes = bcnt;
326
    buf->curoffs = 0;
327
328
    LOGDEB1("CurlFetch::calling put on " <<
329
            (outqueue ? outqueue->getname() : "null") << endl);
330
    
331
    if (!outqueue->put(buf)) {
332
        LOGDEB1("CurlFetch::dataBufToQ. queue put failed\n");
333
        delete buf;
334
        return -1;
335
    }
336
337
    bool first = (curl_data_count == 0);
338
    curl_data_count += bcnt;
339
    if (first) {
340
        curlcv.notify_all();
341
    }
342
    if (fbcb) {
343
        fbcb(curl_data_count);
344
    }
345
    LOGDEB1("CurlFetch::dataBufToQ. returning " << bcnt << endl);
346
    return bcnt;
347
}
348
349
static size_t
291
static size_t
350
curl_write_cb(void *contents, size_t size, size_t nmemb, void *userp)
292
curl_write_cb(void *contents, size_t size, size_t nmemb, void *userp)
351
{
293
{
352
    CurlFetch::Internal *me = (CurlFetch::Internal *)userp;
294
    CurlFetch::Internal *me = (CurlFetch::Internal *)userp;
353
    return me ? me->curlWriteCB(contents, size, nmemb) : -1;
295
    return me ? me->curlWriteCB(contents, size, nmemb) : -1;
...
...
367
           headbuf.bytes << endl);
309
           headbuf.bytes << endl);
368
    listmem(cerr, contents, MIN(bcnt, 128));
310
    listmem(cerr, contents, MIN(bcnt, 128));
369
#endif
311
#endif
370
312
371
    unique_lock<mutex> lock(curlmutex);
313
    unique_lock<mutex> lock(curlmutex);
372
    if (curl_data_count == 0 && headbuf.bytes < 1024) {
314
    if (p->datacount() == 0 && headbuf.bytes < 1024) {
373
        if (!headbuf.append((const char *)contents, bcnt)) {
315
        if (!headbuf.append((const char *)contents, bcnt)) {
374
            LOGERR("CurlFetch::curlWriteCB: buf append failed\n");
316
            LOGERR("CurlFetch::curlWriteCB: buf append failed\n");
375
            curlcv.notify_all();
317
            curlcv.notify_all();
376
            return -1;
318
            return -1;
377
        } else {
319
        } else {
378
            curlcv.notify_all();
320
            curlcv.notify_all();
379
            return bcnt;
321
            return bcnt;
380
        }
322
        }
381
    }
323
    }
382
    
324
    
383
    if (curl_data_count == 0 && buf1cb) {
325
    if (p->datacount() == 0 && p->buf1cb) {
384
        string sbuf;
326
        string sbuf;
385
        if (!buf1cb(sbuf, headbuf.buf, headbuf.bytes)) {
327
        if (!p->buf1cb(sbuf, headbuf.buf, headbuf.bytes)) {
386
            return -1;
328
            return -1;
387
        }
329
        }
388
        if (sbuf.size()) {
330
        if (sbuf.size()) {
331
            curlcv.notify_all();
389
            if (databufToQ(sbuf.c_str(), sbuf.size()) < 0) {
332
            if (p->databufToQ(sbuf.c_str(), sbuf.size()) < 0) {
390
                return -1;
333
                return -1;
391
            }
334
            }
392
        }
335
        }
393
    }
336
    }
394
    
337
    
395
    if (headbuf.bytes) {
338
    if (headbuf.bytes) {
339
        if (p->datacount() == 0) {
340
            curlcv.notify_all();
341
        }
396
        databufToQ(headbuf.buf, headbuf.bytes);
342
        p->databufToQ(headbuf.buf, headbuf.bytes);
397
        headbuf.bytes = 0;
343
        headbuf.bytes = 0;
398
    }
344
    }
345
    if (p->datacount() == 0) {
346
        curlcv.notify_all();
347
    }
399
    return databufToQ(contents, bcnt);
348
    return p->databufToQ(contents, bcnt);
400
}
349
}
401
350
402
static int debug_callback(CURL *curl,
351
static int debug_callback(CURL *curl,
403
                          curl_infotype type,
352
                          curl_infotype type,
404
                          char *data,
353
                          char *data,
...
...
438
        if(!curl) {
387
        if(!curl) {
439
            LOGERR("CurlFetch::curlWorkerFunc: curl_easy_init failed" << endl);
388
            LOGERR("CurlFetch::curlWorkerFunc: curl_easy_init failed" << endl);
440
            {unique_lock<mutex> lock(curlmutex);
389
            {unique_lock<mutex> lock(curlmutex);
441
                curldone = true;
390
                curldone = true;
442
            }
391
            }
443
            if (outqueue) {
392
            if (p->outqueue) {
444
                outqueue->setTerminate();
393
                p->outqueue->setTerminate();
445
            }
394
            }
446
            curlcv.notify_all();
395
            curlcv.notify_all();
447
            return;
396
            return;
448
        }
397
        }
449
        curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1);
398
        curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1);
...
...
466
        // Chunk decoding: this is the default
415
        // Chunk decoding: this is the default
467
        //curl_easy_setopt(curl, CURLOPT_HTTP_TRANSFER_DECODING, 1L);
416
        //curl_easy_setopt(curl, CURLOPT_HTTP_TRANSFER_DECODING, 1L);
468
    }
417
    }
469
    
418
    
470
    LOGDEB1("CurlFetch::curlWorker: fetching " << url << endl);
419
    LOGDEB1("CurlFetch::curlWorker: fetching " << url << endl);
471
    curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
420
    curl_easy_setopt(curl, CURLOPT_URL, p->_url.c_str());
472
    if (startoffset) {
421
    if (p->startoffset) {
473
        char range[32];
422
        char range[32];
474
        sprintf(range, "%llu-", (unsigned long long)startoffset);
423
        sprintf(range, "%llu-", (unsigned long long)p->startoffset);
475
        curl_easy_setopt(curl, CURLOPT_RANGE, range);
424
        curl_easy_setopt(curl, CURLOPT_RANGE, range);
476
    }
425
    }
477
    if (timeoutsecs) {
426
    if (p->timeoutsecs) {
478
        curl_easy_setopt(curl, CURLOPT_TIMEOUT, timeoutsecs);
427
        curl_easy_setopt(curl, CURLOPT_TIMEOUT, p->timeoutsecs);
479
    }
428
    }
480
429
481
    curl_code = curl_easy_perform(curl);
430
    curl_code = curl_easy_perform(curl);
482
    LOGDEB1("CurlFetch::curlWorker: curl_easy_perform returned\n");
431
    LOGDEB1("CurlFetch::curlWorker: curl_easy_perform returned\n");
483
432
...
...
505
            return;
454
            return;
506
        }
455
        }
507
        if (headbuf.bytes) {
456
        if (headbuf.bytes) {
508
            LOGDEB1("CurlFetch::curlWorker: flushing headbuf: " <<
457
            LOGDEB1("CurlFetch::curlWorker: flushing headbuf: " <<
509
                   headbuf.bytes << " bytes\n");
458
                   headbuf.bytes << " bytes\n");
459
            curlcv.notify_all();
510
            databufToQ(headbuf.buf, headbuf.bytes);
460
            p->databufToQ(headbuf.buf, headbuf.bytes);
511
            headbuf.bytes = 0;
461
            headbuf.bytes = 0;
512
        }
462
        }
513
        curlfd = -1;
463
        curlfd = -1;
514
        curldone = true;
464
        curldone = true;
515
        curlcv.notify_all();
465
        curlcv.notify_all();
...
...
518
    // Normal eos
468
    // Normal eos
519
    if (curl_code == CURLE_OK) {
469
    if (curl_code == CURLE_OK) {
520
        // Wake up other side with empty buffer (eof)
470
        // Wake up other side with empty buffer (eof)
521
        LOGDEB1("CurlFetch::curlWorkerFunc: request done ok: q empty buffer\n");
471
        LOGDEB1("CurlFetch::curlWorkerFunc: request done ok: q empty buffer\n");
522
        ABuffer *buf = new ABuffer(0);
472
        ABuffer *buf = new ABuffer(0);
523
        if (!outqueue || !outqueue->put(buf)) {
473
        if (!p->outqueue || !p->outqueue->put(buf)) {
524
            delete buf;
474
            delete buf;
525
        }
475
        }
526
        if (outqueue) {
476
        if (p->outqueue) {
527
            // Wait for our zero buffer to be acknowledged before
477
            // Wait for our zero buffer to be acknowledged before
528
            // killing the queue
478
            // killing the queue
529
            LOGDEB1("CurlFetch::curlworker: waitidle\n");
479
            LOGDEB1("CurlFetch::curlworker: waitidle\n");
530
            outqueue->waitIdle();
480
            p->outqueue->waitIdle();
531
        }
532
    }
481
        }
482
    }
533
    outqueue->setTerminate();
483
    p->outqueue->setTerminate();
534
    if (eofcb) {
484
    if (p->eofcb) {
535
        eofcb(curl_code == CURLE_OK, curl_data_count);
485
        p->eofcb(curl_code == CURLE_OK, p->datacount());
536
    }
486
    }
537
    LOGDEB1("CurlFetch::curlworker: done\n");
487
    LOGDEB1("CurlFetch::curlworker: done\n");
538
    return;
488
    return;
539
}
489
}
540
541
void CurlFetch::setEOFetchCB(std::function<void(bool ok, u_int64_t count)> eofcb)
542
{
543
    m->eofcb = eofcb;
544
}
545
void CurlFetch::setFetchBytesCB(std::function<void(u_int64_t count)> fbcb)
546
{
547
    m->fbcb = fbcb;
548
}
549
void CurlFetch::setBuf1GenCB(std::function<bool(string&,void*,int)> func)
550
{
551
    m->buf1cb = func;
552
}
553
554
void CurlFetch::setTimeout(int secs)
555
{
556
    m->timeoutsecs = secs;
557
}
558