Switch to unified view

a/src/mediaserver/cdplugins/curlfetch.cpp b/src/mediaserver/cdplugins/curlfetch.cpp
...
...
29
static CurlInit curlglobalinit;
29
static CurlInit curlglobalinit;
30
30
31
31
32
class CurlFetch::Internal {
32
class CurlFetch::Internal {
33
public:
33
public:
34
    Internal(const string& _url) : url(_url) {}
34
    Internal(CurlFetch* parent)
35
        : p(parent) {}
35
    ~Internal();
36
    ~Internal();
36
    bool curlrunning() {
37
    bool curlrunning() {
37
        return curlworker.joinable();
38
        return curlworker.joinable();
38
    }
39
    }
39
40
40
    void curlWorkerFunc();
41
    void curlWorkerFunc();
41
    size_t curlHeaderCB(void *contents, size_t size, size_t nmemb);
42
    size_t curlHeaderCB(void *contents, size_t size, size_t nmemb);
42
    size_t curlWriteCB(void *contents, size_t size, size_t nmemb);
43
    size_t curlWriteCB(void *contents, size_t size, size_t nmemb);
43
    int curlSockoptCB(curl_socket_t curlfd, curlsocktype purpose);
44
    int curlSockoptCB(curl_socket_t curlfd, curlsocktype purpose);
44
    size_t databufToQ(const void *contents, size_t size);
45
45
    
46
    CurlFetch *p{nullptr};
46
    string url;
47
    uint64_t startoffset;
48
    int timeoutsecs{0};
49
    CURL *curl{nullptr};
47
    CURL *curl{nullptr};
50
    // The socket is used to kill any waiting by curl when we want to abort
48
    // The socket is used to kill any waiting by curl when we want to abort
51
    curl_socket_t curlfd{-1};
49
    curl_socket_t curlfd{-1};
52
    u_int64_t curl_data_count{0};
53
    std::thread curlworker;
50
    std::thread curlworker;
54
    bool curldone{false};
51
    bool curldone{false};
55
    CURLcode  curl_code{CURLE_OK};
52
    CURLcode  curl_code{CURLE_OK};
56
    int curl_http_code{200};
53
    int curl_http_code{200};
57
54
...
...
63
60
64
    // Header values if we get them
61
    // Header values if we get them
65
    bool headers_ok{false};
62
    bool headers_ok{false};
66
    map<string, string> headers;
63
    map<string, string> headers;
67
64
68
    BufXChange<ABuffer*> *outqueue{nullptr};
69
70
    // We pre-buffer the beginning of the stream so that the first
65
    // We pre-buffer the beginning of the stream so that the first
71
    // block we actually release is always big enough for header
66
    // block we actually release is always big enough for header
72
    // forensics.
67
    // forensics.
73
    ABuffer headbuf{1024};
68
    ABuffer headbuf{1024};
74
    
69
    
75
    // Synchronization
70
    // Synchronization
76
    condition_variable curlcv;
71
    condition_variable curlcv;
77
    mutex curlmutex;
72
    mutex curlmutex;
78
79
    // User callbacks
80
    function<void(bool, u_int64_t)> eofcb;
81
    function<void(u_int64_t)> fbcb;
82
    function<bool(string&,void *,int)> buf1cb;
83
};
73
};
84
74
85
CurlFetch::CurlFetch(const std::string& url)
75
CurlFetch::CurlFetch(const std::string& url)
76
    : NetFetch(url)
86
{
77
{
87
    m = std::unique_ptr<Internal>(new Internal(url));
78
    m = std::unique_ptr<Internal>(new Internal(this));
88
}
79
}
89
80
90
CurlFetch::~CurlFetch()
81
CurlFetch::~CurlFetch()
91
{
82
{
92
}
93
const string& CurlFetch::url()
94
{
95
    return m->url;
96
}
83
}
97
84
98
CurlFetch::Internal::~Internal()
85
CurlFetch::Internal::~Internal()
99
{
86
{
100
    LOGDEB1("CurlFetch::Internal::~Internal\n");
87
    LOGDEB1("CurlFetch::Internal::~Internal\n");
...
...
102
    aborting = true;
89
    aborting = true;
103
    if (curlfd >= 0) {
90
    if (curlfd >= 0) {
104
        close(curlfd);
91
        close(curlfd);
105
        curlfd = -1;
92
        curlfd = -1;
106
    }
93
    }
107
    if (outqueue) {
94
    if (p->outqueue) {
108
        outqueue->setTerminate();
95
        p->outqueue->setTerminate();
109
    }
96
    }
110
    curlcv.notify_all();
97
    curlcv.notify_all();
111
    while (extWaitingThreads > 0) {
98
    while (extWaitingThreads > 0) {
112
        LOGDEB1("CurlFetch::~CurlFetch: extWaitingThreads: " <<
99
        LOGDEB1("CurlFetch::~CurlFetch: extWaitingThreads: " <<
113
                extWaitingThreads << endl);
100
                extWaitingThreads << endl);
...
...
138
    if (m->curlrunning() || m->aborting) {
125
    if (m->curlrunning() || m->aborting) {
139
        LOGERR("CurlFetch::start: called with transfer active or aborted\n");
126
        LOGERR("CurlFetch::start: called with transfer active or aborted\n");
140
        return false;
127
        return false;
141
    }
128
    }
142
    // We return after the curl thread is actually running
129
    // We return after the curl thread is actually running
143
    m->outqueue = queue;
130
    outqueue = queue;
144
    m->startoffset = offset;
131
    startoffset = offset;
145
    m->curlworker =
132
    m->curlworker =
146
        std::thread(std::bind(&CurlFetch::Internal::curlWorkerFunc, m.get()));
133
        std::thread(std::bind(&CurlFetch::Internal::curlWorkerFunc, m.get()));
147
    while (!(m->curlrunning() || m->curldone || m->aborting)) {
134
    while (!(m->curlrunning() || m->curldone || m->aborting)) {
148
        LOGDEB1("Start: waiting: running " << m->curlrunning() << " done " <<
135
        LOGDEB1("Start: waiting: running " << m->curlrunning() << " done " <<
149
               m->curldone << " aborting " << m->aborting << endl);
136
               m->curldone << " aborting " << m->aborting << endl);
...
...
154
    }
141
    }
155
    LOGDEB1("CurlFetch::start: returning\n");
142
    LOGDEB1("CurlFetch::start: returning\n");
156
    return true;
143
    return true;
157
}
144
}
158
145
159
void  CurlFetch::reset()
146
bool CurlFetch::reset()
160
{
147
{
161
    if (m->curlworker.joinable()) {
148
    if (m->curlworker.joinable()) {
162
        m->curlworker.join();
149
        m->curlworker.join();
163
    }
150
    }
164
    m->curldone = false;
151
    m->curldone = false;
165
    m->curl_code = CURLE_OK;
152
    m->curl_code = CURLE_OK;
166
    m->curl_http_code = 200;
153
    m->curl_http_code = 200;
167
    m->curl_data_count = 0;
154
    fetch_data_count = 0;
168
    m->outqueue->reset();
155
    outqueue->reset();
156
    return true;
169
}
157
}
170
158
171
bool CurlFetch::curlDone(int *curlcode, int *http_code)
159
bool CurlFetch::fetchDone(FetchStatus *code, int *http_code)
172
{
160
{
173
    LOGDEB1("CurlFetch::curlDone: running: " << m->curlrunning() <<
161
    LOGDEB1("CurlFetch::fetchDone: running: " << m->curlrunning() <<
174
           " curldone " << m->curldone << endl);
162
           " curldone " << m->curldone << endl);
175
    unique_lock<mutex> lock(m->curlmutex);
163
    unique_lock<mutex> lock(m->curlmutex);
176
    if (!m->curldone) {
164
    if (!m->curldone) {
177
        return false;
165
        return false;
178
    }
166
    }
179
    LOGDEB1("CurlFetch::curlDone: curlcode " << m->curl_code << " httpcode " <<
167
    LOGDEB1("CurlFetch::fetchDone: curlcode " << m->curl_code << " httpcode " <<
180
           m->curl_http_code << endl);
168
           m->curl_http_code << endl);
181
    if (curlcode) {
169
    if (code) {
182
        *curlcode = int(m->curl_code);
170
        switch (m->curl_code) {
171
        case CURLE_PARTIAL_FILE:
172
        case CURLE_RECV_ERROR:
173
            *code = NetFetch::FETCH_RETRYABLE;
174
            break;
175
        case CURLE_OK:
176
            *code = NetFetch::FETCH_OK;
177
            break;
178
        default:
179
            *code = NetFetch::FETCH_FATAL;
180
            break;
181
        }
183
    }
182
    }
184
    if (http_code) {
183
    if (http_code) {
185
        *http_code = m->curl_http_code;
184
        *http_code = m->curl_http_code;
186
    }
185
    }
187
    LOGDEB1("CurlTRans::curlDone: done\n");
186
    LOGDEB1("CurlTRans::fetchDone: done\n");
188
    return true;
187
    return true;
189
}
188
}
190
189
191
bool CurlFetch::waitForHeaders(int secs)
190
bool CurlFetch::waitForHeaders(int secs)
192
{
191
{
...
...
194
    unique_lock<mutex> lock(m->curlmutex);
193
    unique_lock<mutex> lock(m->curlmutex);
195
    m->extWaitingThreads++;
194
    m->extWaitingThreads++;
196
    // We wait for the 1st buffer write call. If there is no data,
195
    // We wait for the 1st buffer write call. If there is no data,
197
    // we'll stop on curldone.
196
    // we'll stop on curldone.
198
    while (m->curlrunning() && !m->aborting && !m->curldone && 
197
    while (m->curlrunning() && !m->aborting && !m->curldone && 
199
           m->curl_data_count + m->headbuf.bytes == 0) {
198
           fetch_data_count + m->headbuf.bytes == 0) {
200
        LOGDEB1("CurlFetch::waitForHeaders: running " << m->curlrunning() <<
199
        LOGDEB1("CurlFetch::waitForHeaders: running " << m->curlrunning() <<
201
               " aborting " << m->aborting << " datacount " <<
200
               " aborting " << m->aborting << " datacount " <<
202
               m->curl_data_count + m->headbuf.bytes << "\n");
201
               fetch_data_count + m->headbuf.bytes << "\n");
203
        if (m->aborting) {
202
        if (m->aborting) {
204
            LOGDEB("CurlFetch::waitForHeaders: return: abort\n");
203
            LOGDEB("CurlFetch::waitForHeaders: return: abort\n");
205
            m->extWaitingThreads--;
204
            m->extWaitingThreads--;
206
            return false;
205
            return false;
207
        }
206
        }
...
...
218
    m->extWaitingThreads--;
217
    m->extWaitingThreads--;
219
    m->curlcv.notify_all();
218
    m->curlcv.notify_all();
220
    LOGDEB1("CurlFetch::waitForHeaders: returning: headers_ok " <<
219
    LOGDEB1("CurlFetch::waitForHeaders: returning: headers_ok " <<
221
            m->headers_ok << " curlrunning " << m->curlrunning() <<
220
            m->headers_ok << " curlrunning " << m->curlrunning() <<
222
            " aborting " << m->aborting << " datacnt " <<
221
            " aborting " << m->aborting << " datacnt " <<
223
            m->curl_data_count+ m->headbuf.bytes << endl);
222
            fetch_data_count+ m->headbuf.bytes << endl);
224
    return m->headers_ok;
223
    return m->headers_ok;
225
}
224
}
226
225
227
bool CurlFetch::headerValue(const string& hname, string& val)
226
bool CurlFetch::headerValue(const string& hname, string& val)
228
{
227
{
...
...
287
    unique_lock<mutex> lock(curlmutex);
286
    unique_lock<mutex> lock(curlmutex);
288
    curlfd = cfd;
287
    curlfd = cfd;
289
    return CURL_SOCKOPT_OK;
288
    return CURL_SOCKOPT_OK;
290
}
289
}
291
290
292
// This is always called with the lock held
293
size_t CurlFetch::Internal::databufToQ(const void *contents, size_t bcnt)
294
{
295
    LOGDEB1("CurlFetch::dataBufToQ. bcnt " << bcnt << endl);
296
    
297
    ABuffer *buf = nullptr;
298
    // Try to recover an empty buffer from the queue, else allocate one.
299
    if (outqueue && outqueue->take_recycled(&buf)) {
300
        if (buf->allocbytes < bcnt) {
301
            delete buf;
302
            buf = nullptr;
303
        }
304
    }
305
    if (buf == nullptr) {
306
        buf = new ABuffer(MAX(4096, bcnt));
307
    }
308
    if (buf == nullptr) {
309
        LOGERR("CurlFetch::dataBufToQ: can't get buffer for " << bcnt <<
310
               " bytes\n");
311
        return 0;
312
    }
313
    memcpy(buf->buf, contents, bcnt);
314
    buf->bytes = bcnt;
315
    buf->curoffs = 0;
316
317
    LOGDEB1("CurlFetch::calling put on " <<
318
            (outqueue ? outqueue->getname() : "null") << endl);
319
    
320
    if (!outqueue->put(buf)) {
321
        LOGDEB1("CurlFetch::dataBufToQ. queue put failed\n");
322
        delete buf;
323
        return -1;
324
    }
325
326
    bool first = (curl_data_count == 0);
327
    curl_data_count += bcnt;
328
    if (first) {
329
        curlcv.notify_all();
330
    }
331
    if (fbcb) {
332
        fbcb(curl_data_count);
333
    }
334
    LOGDEB1("CurlFetch::dataBufToQ. returning " << bcnt << endl);
335
    return bcnt;
336
}
337
338
static size_t
291
static size_t
339
curl_write_cb(void *contents, size_t size, size_t nmemb, void *userp)
292
curl_write_cb(void *contents, size_t size, size_t nmemb, void *userp)
340
{
293
{
341
    CurlFetch::Internal *me = (CurlFetch::Internal *)userp;
294
    CurlFetch::Internal *me = (CurlFetch::Internal *)userp;
342
    return me ? me->curlWriteCB(contents, size, nmemb) : -1;
295
    return me ? me->curlWriteCB(contents, size, nmemb) : -1;
...
...
356
           headbuf.bytes << endl);
309
           headbuf.bytes << endl);
357
    listmem(cerr, contents, MIN(bcnt, 128));
310
    listmem(cerr, contents, MIN(bcnt, 128));
358
#endif
311
#endif
359
312
360
    unique_lock<mutex> lock(curlmutex);
313
    unique_lock<mutex> lock(curlmutex);
361
    if (curl_data_count == 0 && headbuf.bytes < 1024) {
314
    if (p->datacount() == 0 && headbuf.bytes < 1024) {
362
        if (!headbuf.append((const char *)contents, bcnt)) {
315
        if (!headbuf.append((const char *)contents, bcnt)) {
363
            LOGERR("CurlFetch::curlWriteCB: buf append failed\n");
316
            LOGERR("CurlFetch::curlWriteCB: buf append failed\n");
364
            curlcv.notify_all();
317
            curlcv.notify_all();
365
            return -1;
318
            return -1;
366
        } else {
319
        } else {
367
            curlcv.notify_all();
320
            curlcv.notify_all();
368
            return bcnt;
321
            return bcnt;
369
        }
322
        }
370
    }
323
    }
371
    
324
    
372
    if (curl_data_count == 0 && buf1cb) {
325
    if (p->datacount() == 0 && p->buf1cb) {
373
        string sbuf;
326
        string sbuf;
374
        if (!buf1cb(sbuf, headbuf.buf, headbuf.bytes)) {
327
        if (!p->buf1cb(sbuf, headbuf.buf, headbuf.bytes)) {
375
            return -1;
328
            return -1;
376
        }
329
        }
377
        if (sbuf.size()) {
330
        if (sbuf.size()) {
331
            curlcv.notify_all();
378
            if (databufToQ(sbuf.c_str(), sbuf.size()) < 0) {
332
            if (p->databufToQ(sbuf.c_str(), sbuf.size()) < 0) {
379
                return -1;
333
                return -1;
380
            }
334
            }
381
        }
335
        }
382
    }
336
    }
383
    
337
    
384
    if (headbuf.bytes) {
338
    if (headbuf.bytes) {
339
        if (p->datacount() == 0) {
340
            curlcv.notify_all();
341
        }
385
        databufToQ(headbuf.buf, headbuf.bytes);
342
        p->databufToQ(headbuf.buf, headbuf.bytes);
386
        headbuf.bytes = 0;
343
        headbuf.bytes = 0;
387
    }
344
    }
345
    if (p->datacount() == 0) {
346
        curlcv.notify_all();
347
    }
388
    return databufToQ(contents, bcnt);
348
    return p->databufToQ(contents, bcnt);
389
}
349
}
390
350
391
static int debug_callback(CURL *curl,
351
static int debug_callback(CURL *curl,
392
                          curl_infotype type,
352
                          curl_infotype type,
393
                          char *data,
353
                          char *data,
...
...
427
        if(!curl) {
387
        if(!curl) {
428
            LOGERR("CurlFetch::curlWorkerFunc: curl_easy_init failed" << endl);
388
            LOGERR("CurlFetch::curlWorkerFunc: curl_easy_init failed" << endl);
429
            {unique_lock<mutex> lock(curlmutex);
389
            {unique_lock<mutex> lock(curlmutex);
430
                curldone = true;
390
                curldone = true;
431
            }
391
            }
432
            if (outqueue) {
392
            if (p->outqueue) {
433
                outqueue->setTerminate();
393
                p->outqueue->setTerminate();
434
            }
394
            }
435
            curlcv.notify_all();
395
            curlcv.notify_all();
436
            return;
396
            return;
437
        }
397
        }
438
        curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1);
398
        curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1);
...
...
455
        // Chunk decoding: this is the default
415
        // Chunk decoding: this is the default
456
        //curl_easy_setopt(curl, CURLOPT_HTTP_TRANSFER_DECODING, 1L);
416
        //curl_easy_setopt(curl, CURLOPT_HTTP_TRANSFER_DECODING, 1L);
457
    }
417
    }
458
    
418
    
459
    LOGDEB1("CurlFetch::curlWorker: fetching " << url << endl);
419
    LOGDEB1("CurlFetch::curlWorker: fetching " << url << endl);
460
    curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
420
    curl_easy_setopt(curl, CURLOPT_URL, p->_url.c_str());
461
    if (startoffset) {
421
    if (p->startoffset) {
462
        char range[32];
422
        char range[32];
463
        sprintf(range, "%llu-", (unsigned long long)startoffset);
423
        sprintf(range, "%llu-", (unsigned long long)p->startoffset);
464
        curl_easy_setopt(curl, CURLOPT_RANGE, range);
424
        curl_easy_setopt(curl, CURLOPT_RANGE, range);
465
    }
425
    }
466
    if (timeoutsecs) {
426
    if (p->timeoutsecs) {
467
        curl_easy_setopt(curl, CURLOPT_TIMEOUT, timeoutsecs);
427
        curl_easy_setopt(curl, CURLOPT_TIMEOUT, p->timeoutsecs);
468
    }
428
    }
469
429
470
    curl_code = curl_easy_perform(curl);
430
    curl_code = curl_easy_perform(curl);
471
    LOGDEB1("CurlFetch::curlWorker: curl_easy_perform returned\n");
431
    LOGDEB1("CurlFetch::curlWorker: curl_easy_perform returned\n");
472
432
...
...
494
            return;
454
            return;
495
        }
455
        }
496
        if (headbuf.bytes) {
456
        if (headbuf.bytes) {
497
            LOGDEB1("CurlFetch::curlWorker: flushing headbuf: " <<
457
            LOGDEB1("CurlFetch::curlWorker: flushing headbuf: " <<
498
                   headbuf.bytes << " bytes\n");
458
                   headbuf.bytes << " bytes\n");
459
            curlcv.notify_all();
499
            databufToQ(headbuf.buf, headbuf.bytes);
460
            p->databufToQ(headbuf.buf, headbuf.bytes);
500
            headbuf.bytes = 0;
461
            headbuf.bytes = 0;
501
        }
462
        }
502
        curlfd = -1;
463
        curlfd = -1;
503
        curldone = true;
464
        curldone = true;
504
        curlcv.notify_all();
465
        curlcv.notify_all();
...
...
507
    // Normal eos
468
    // Normal eos
508
    if (curl_code == CURLE_OK) {
469
    if (curl_code == CURLE_OK) {
509
        // Wake up other side with empty buffer (eof)
470
        // Wake up other side with empty buffer (eof)
510
        LOGDEB1("CurlFetch::curlWorkerFunc: request done ok: q empty buffer\n");
471
        LOGDEB1("CurlFetch::curlWorkerFunc: request done ok: q empty buffer\n");
511
        ABuffer *buf = new ABuffer(0);
472
        ABuffer *buf = new ABuffer(0);
512
        if (!outqueue || !outqueue->put(buf)) {
473
        if (!p->outqueue || !p->outqueue->put(buf)) {
513
            delete buf;
474
            delete buf;
514
        }
475
        }
515
        if (outqueue) {
476
        if (p->outqueue) {
516
            // Wait for our zero buffer to be acknowledged before
477
            // Wait for our zero buffer to be acknowledged before
517
            // killing the queue
478
            // killing the queue
518
            LOGDEB1("CurlFetch::curlworker: waitidle\n");
479
            LOGDEB1("CurlFetch::curlworker: waitidle\n");
519
            outqueue->waitIdle();
480
            p->outqueue->waitIdle();
520
        }
521
    }
481
        }
482
    }
522
    outqueue->setTerminate();
483
    p->outqueue->setTerminate();
523
    if (eofcb) {
484
    if (p->eofcb) {
524
        eofcb(curl_code == CURLE_OK, curl_data_count);
485
        p->eofcb(curl_code == CURLE_OK, p->datacount());
525
    }
486
    }
526
    LOGDEB1("CurlFetch::curlworker: done\n");
487
    LOGDEB1("CurlFetch::curlworker: done\n");
527
    return;
488
    return;
528
}
489
}
529
530
void CurlFetch::setEOFetchCB(std::function<void(bool ok, u_int64_t count)> eofcb)
531
{
532
    m->eofcb = eofcb;
533
}
534
void CurlFetch::setFetchBytesCB(std::function<void(u_int64_t count)> fbcb)
535
{
536
    m->fbcb = fbcb;
537
}
538
void CurlFetch::setBuf1GenCB(std::function<bool(string&,void*,int)> func)
539
{
540
    m->buf1cb = func;
541
}
542
543
void CurlFetch::setTimeout(int secs)
544
{
545
    m->timeoutsecs = secs;
546
}
547