Switch to unified view

a/src/mediaserver/cdplugins/curlfetch.cpp b/src/mediaserver/cdplugins/curlfetch.cpp
...
...
31
31
32
class CurlFetch::Internal {
32
class CurlFetch::Internal {
33
public:
33
public:
34
    Internal(const string& _url) : url(_url) {}
34
    Internal(const string& _url) : url(_url) {}
35
    ~Internal();
35
    ~Internal();
36
    bool curlrunning() {
37
        return curlworker.joinable();
38
    }
36
39
37
    void curlWorkerFunc();
40
    void curlWorkerFunc();
38
    size_t curlHeaderCB(void *contents, size_t size, size_t nmemb);
41
    size_t curlHeaderCB(void *contents, size_t size, size_t nmemb);
39
    size_t curlWriteCB(void *contents, size_t size, size_t nmemb);
42
    size_t curlWriteCB(void *contents, size_t size, size_t nmemb);
40
    int curlSockoptCB(curl_socket_t curlfd, curlsocktype purpose);
43
    int curlSockoptCB(curl_socket_t curlfd, curlsocktype purpose);
...
...
42
    
45
    
43
    string url;
46
    string url;
44
    uint64_t startoffset;
47
    uint64_t startoffset;
45
    int timeoutsecs{0};
48
    int timeoutsecs{0};
46
    CURL *curl{nullptr};
49
    CURL *curl{nullptr};
50
    // The socket is used to kill any waiting by curl when we want to abort
47
    curl_socket_t curlfd{-1};
51
    curl_socket_t curlfd{-1};
48
    u_int64_t curl_data_count{0};
52
    u_int64_t curl_data_count{0};
49
    std::thread curlworker;
53
    std::thread curlworker;
54
    bool curldone{false};
50
    CURLcode  curl_code{CURLE_OK};
55
    CURLcode  curl_code{CURLE_OK};
51
    long curl_http_code{200};
56
    int curl_http_code{200};
52
    
57
53
    // Is the curl operation thread running ?
58
    // In destructor: any waiting loop must abort asap
54
    bool curlrunning{false};
59
    bool aborting{false}; 
55
    bool curldone{false};
56
    bool aborting{false}; // Any waiting loop must abort asap
57
    
60
    
58
    // Count of client threads waiting for headers (normally 0/1)
61
    // Count of client threads waiting for headers (normally 0/1)
59
    int extWaitingThreads{0};
62
    int extWaitingThreads{0};
60
63
61
    // Header values if we get them
64
    // Header values if we get them
...
...
63
    map<string, string> headers;
66
    map<string, string> headers;
64
67
65
    BufXChange<ABuffer*> *outqueue{nullptr};
68
    BufXChange<ABuffer*> *outqueue{nullptr};
66
69
67
    // We pre-buffer the beginning of the stream so that the first
70
    // We pre-buffer the beginning of the stream so that the first
68
    // block we actually release is big enough for header forensics.
71
    // block we actually release is always big enough for header
72
    // forensics.
69
    ABuffer headbuf{1024};
73
    ABuffer headbuf{1024};
70
    
74
    
71
    // Synchronization
75
    // Synchronization
72
    condition_variable curlcv;
76
    condition_variable curlcv;
73
    mutex curlmutex;
77
    mutex curlmutex;
74
78
79
    // User callbacks
75
    function<void(bool, u_int64_t)> eofcb;
80
    function<void(bool, u_int64_t)> eofcb;
76
    function<void(u_int64_t)> fbcb;
81
    function<void(u_int64_t)> fbcb;
77
    function<bool(string&,void *,int)> buf1cb;
82
    function<bool(string&,void *,int)> buf1cb;
78
};
83
};
79
84
...
...
82
    m = std::unique_ptr<Internal>(new Internal(url));
87
    m = std::unique_ptr<Internal>(new Internal(url));
83
}
88
}
84
89
85
CurlFetch::~CurlFetch()
90
CurlFetch::~CurlFetch()
86
{
91
{
92
}
93
const string& CurlFetch::url()
94
{
95
    return m->url;
87
}
96
}
88
97
89
CurlFetch::Internal::~Internal()
98
CurlFetch::Internal::~Internal()
90
{
99
{
91
    LOGDEB1("CurlFetch::Internal::~Internal\n");
100
    LOGDEB1("CurlFetch::Internal::~Internal\n");
...
...
104
                extWaitingThreads << endl);
113
                extWaitingThreads << endl);
105
        curlcv.notify_all();
114
        curlcv.notify_all();
106
        LOGDEB1("CurlFetch::~CurlFetch: waiting for ext thread wkup\n");
115
        LOGDEB1("CurlFetch::~CurlFetch: waiting for ext thread wkup\n");
107
        curlcv.wait(lock);
116
        curlcv.wait(lock);
108
    }
117
    }
109
    while (!curlworker.joinable()) {
118
    if (curlworker.joinable()) {
110
        curlcv.wait(lock);
111
    }
112
    curlworker.join();
119
        curlworker.join();
120
    }
113
    if (curl) {
121
    if (curl) {
114
        curl_easy_cleanup(curl);
122
        curl_easy_cleanup(curl);
115
        curl = nullptr;
123
        curl = nullptr;
116
    }
124
    }
117
    LOGDEB1("CurlFetch::CurlFetch::~Internal: done\n");
125
    LOGDEB1("CurlFetch::CurlFetch::~Internal: done\n");
118
}
126
}
119
127
120
bool CurlFetch::start(BufXChange<ABuffer*> *queue, uint64_t offset)
128
bool CurlFetch::start(BufXChange<ABuffer*> *queue, uint64_t offset)
121
{
129
{
122
    LOGDEB0("CurlFetch::start\n");
130
    LOGDEB1("CurlFetch::start\n");
123
    unique_lock<mutex> lock(m->curlmutex);
124
    if (m->curlrunning || m->aborting) {
125
        LOGERR("CurlFetch::start: called with transfer active or aborted\n");
126
        return false;
127
    }
128
    m->curldone = false;
129
    if (nullptr == queue) {
131
    if (nullptr == queue) {
130
        LOGERR("CurlFetch::start: called with nullptr\n");
132
        LOGERR("CurlFetch::start: called with nullptr\n");
133
        return false;
134
    }
135
136
    unique_lock<mutex> lock(m->curlmutex);
137
    LOGDEB1("CurlFetch::start: got lock\n");
138
    if (m->curlrunning() || m->aborting) {
139
        LOGERR("CurlFetch::start: called with transfer active or aborted\n");
131
        return false;
140
        return false;
132
    }
141
    }
133
    // We return after the curl thread is actually running
142
    // We return after the curl thread is actually running
134
    m->outqueue = queue;
143
    m->outqueue = queue;
135
    m->startoffset = offset;
144
    m->startoffset = offset;
136
    m->curlworker =
145
    m->curlworker =
137
        std::thread(std::bind(&CurlFetch::Internal::curlWorkerFunc, m.get()));
146
        std::thread(std::bind(&CurlFetch::Internal::curlWorkerFunc, m.get()));
138
    while (!m->curlrunning && !m->curldone && !m->aborting) {
147
    while (!(m->curlrunning() || m->curldone || m->aborting)) {
148
        LOGDEB1("Start: waiting: running " << m->curlrunning() << " done " <<
149
               m->curldone << " aborting " << m->aborting << endl);
139
        if (m->aborting) {
150
        if (m->aborting) {
140
            return false;
151
            return false;
141
        }
152
        }
142
        m->curlcv.wait(lock);
153
        m->curlcv.wait(lock);
143
    }
154
    }
155
    LOGDEB1("CurlFetch::start: returning\n");
144
    return true;
156
    return true;
145
}
157
}
146
158
159
void  CurlFetch::reset()
160
{
161
    if (m->curlworker.joinable()) {
162
        m->curlworker.join();
163
    }
164
    m->curldone = false;
165
    m->curl_code = CURLE_OK;
166
    m->curl_http_code = 200;
167
    m->curl_data_count = 0;
168
    m->outqueue->reset();
169
}
170
147
bool CurlFetch::curlDone(int *curlcode, long *http_code)
171
bool CurlFetch::curlDone(int *curlcode, int *http_code)
148
{
172
{
149
    LOGDEB1("CurlTRans::curlDone: running: " << m->curlrunning << endl);
173
    LOGDEB1("CurlFetch::curlDone: running: " << m->curlrunning() <<
174
           " curldone " << m->curldone << endl);
150
    unique_lock<mutex> lock(m->curlmutex);
175
    unique_lock<mutex> lock(m->curlmutex);
151
    if (!m->curldone) {
176
    if (!m->curldone) {
152
        return false;
177
        return false;
153
    }
178
    }
179
    LOGDEB1("CurlFetch::curlDone: curlcode " << m->curl_code << " httpcode " <<
180
           m->curl_http_code << endl);
154
    if (curlcode) {
181
    if (curlcode) {
155
        *curlcode = int(m->curl_code);
182
        *curlcode = int(m->curl_code);
156
    }
183
    }
157
    if (http_code) {
184
    if (http_code) {
158
        *http_code = m->curl_http_code;
185
        *http_code = m->curl_http_code;
159
    }
186
    }
187
    LOGDEB1("CurlTRans::curlDone: done\n");
160
    return true;
188
    return true;
161
}
189
}
162
190
163
bool CurlFetch::waitForHeaders(int secs)
191
bool CurlFetch::waitForHeaders(int secs)
164
{
192
{
165
    LOGDEB1("CurlFetch::waitForHeaders\n");
193
    LOGDEB1("CurlFetch::waitForHeaders\n");
166
    unique_lock<mutex> lock(m->curlmutex);
194
    unique_lock<mutex> lock(m->curlmutex);
167
    m->extWaitingThreads++;
195
    m->extWaitingThreads++;
168
    // We actually wait for the 1st buffer write call, so that we also
196
    // We wait for the 1st buffer write call. If there is no data,
169
    // get an initial status like 404 (we'll wake up on curlrunning==0 in
197
    // we'll stop on curldone.
170
    // this case)
171
    while (m->curlrunning && !m->aborting &&
198
    while (m->curlrunning() && !m->aborting && !m->curldone && 
172
           m->curl_data_count + m->headbuf.bytes== 0) {
199
           m->curl_data_count + m->headbuf.bytes == 0) {
173
        LOGDEB1("CurlFetch::waitForHeaders: running " << m->curlrunning <<
200
        LOGDEB1("CurlFetch::waitForHeaders: running " << m->curlrunning() <<
174
               " aborting " << m->aborting << " datacount " <<
201
               " aborting " << m->aborting << " datacount " <<
175
               m->curl_data_count + m->headbuf.bytes << "\n");
202
               m->curl_data_count + m->headbuf.bytes << "\n");
176
        if (m->aborting) {
203
        if (m->aborting) {
177
            LOGDEB("CurlFetch::waitForHeaders: return: abort\n");
204
            LOGDEB("CurlFetch::waitForHeaders: return: abort\n");
178
            m->extWaitingThreads--;
205
            m->extWaitingThreads--;
...
...
189
        }
216
        }
190
    }
217
    }
191
    m->extWaitingThreads--;
218
    m->extWaitingThreads--;
192
    m->curlcv.notify_all();
219
    m->curlcv.notify_all();
193
    LOGDEB1("CurlFetch::waitForHeaders: returning: headers_ok " <<
220
    LOGDEB1("CurlFetch::waitForHeaders: returning: headers_ok " <<
194
            m->headers_ok << " curlrunning " << m->curlrunning <<
221
            m->headers_ok << " curlrunning " << m->curlrunning() <<
195
            " aborting " << m->aborting << " datacnt " <<
222
            " aborting " << m->aborting << " datacnt " <<
196
            m->curl_data_count+ m->headbuf.bytes << endl);
223
            m->curl_data_count+ m->headbuf.bytes << endl);
197
    return m->headers_ok;
224
    return m->headers_ok;
198
}
225
}
199
226
...
...
285
    }
312
    }
286
    memcpy(buf->buf, contents, bcnt);
313
    memcpy(buf->buf, contents, bcnt);
287
    buf->bytes = bcnt;
314
    buf->bytes = bcnt;
288
    buf->curoffs = 0;
315
    buf->curoffs = 0;
289
316
290
    LOGDEB2("CurlFetch::calling put on " <<
317
    LOGDEB1("CurlFetch::calling put on " <<
291
            (outqueue ? outqueue->getname() : "null") << endl);
318
            (outqueue ? outqueue->getname() : "null") << endl);
292
    
319
    
293
    if (!outqueue->put(buf)) {
320
    if (!outqueue->put(buf)) {
294
        LOGDEB1("CurlFetch::dataBufToQ. queue put failed\n");
321
        LOGDEB1("CurlFetch::dataBufToQ. queue put failed\n");
295
        delete buf;
322
        delete buf;
...
...
389
{
416
{
390
    LOGDEB1("CurlFetch::curlWorkerFunc\n");
417
    LOGDEB1("CurlFetch::curlWorkerFunc\n");
391
    (void)debug_callback;
418
    (void)debug_callback;
392
    
419
    
393
    {unique_lock<mutex> lock(curlmutex);
420
    {unique_lock<mutex> lock(curlmutex);
394
        curlrunning = true;
395
    }
396
    // Tell the world we're active (start is waiting for this).
421
        // Tell the world we're active (start is waiting for this).
397
    curlcv.notify_all();
422
        curlcv.notify_all();
423
    }
398
    
424
    
399
    if (!curl) {
425
    if (!curl) {
400
        curl = curl_easy_init();
426
        curl = curl_easy_init();
401
        if(!curl) {
427
        if(!curl) {
402
            LOGERR("CurlFetch::curlWorkerFunc: curl_easy_init failed" << endl);
428
            LOGERR("CurlFetch::curlWorkerFunc: curl_easy_init failed" << endl);
403
            {unique_lock<mutex> lock(curlmutex);
429
            {unique_lock<mutex> lock(curlmutex);
404
                curlrunning = false;
405
                curldone = true;
430
                curldone = true;
406
            }
431
            }
407
            if (outqueue) {
432
            if (outqueue) {
408
                outqueue->setTerminate();
433
                outqueue->setTerminate();
409
            }
434
            }
...
...
449
    if (curl_code == CURLE_OK) {
474
    if (curl_code == CURLE_OK) {
450
        curl_easy_getinfo (curl, CURLINFO_RESPONSE_CODE, &curl_http_code);
475
        curl_easy_getinfo (curl, CURLINFO_RESPONSE_CODE, &curl_http_code);
451
        http_ok = curl_http_code >= 200 && curl_http_code < 300;
476
        http_ok = curl_http_code >= 200 && curl_http_code < 300;
452
    }
477
    }
453
478
454
    {unique_lock<mutex> lock(curlmutex);
479
    // Log/Debug
455
        if (aborting) {
456
            return;
457
        }
458
        if (headbuf.bytes) {
459
            LOGDEB1("CurlFetch::curlWorker: flushing headbuf: " <<
460
                   headbuf.bytes << " bytes\n");
461
            databufToQ(headbuf.buf, headbuf.bytes);
462
            headbuf.bytes = 0;
463
        }
464
        curlfd = -1;
465
        curlrunning = false;
466
        curldone = true;
467
        curlcv.notify_all();
468
    }
469
470
    if (curl_code != CURLE_OK || !http_ok) {
480
    if (curl_code != CURLE_OK || !http_ok) {
471
        if (curl_code != CURLE_OK) {
481
        if (curl_code != CURLE_OK) {
472
            LOGERR("CurlFetch::curlWorkerFunc: curl_easy_perform(): " <<
482
            LOGERR("CurlFetch::curlWorkerFunc: curl_easy_perform(): " <<
473
                   curl_easy_strerror(curl_code) << endl);
483
                   curl_easy_strerror(curl_code) << endl);
474
        } else {
484
        } else {
475
            LOGDEB("CurlFetch::curlWorkerFunc: curl_easy_perform(): http code: "
485
            LOGDEB("CurlFetch::curlWorkerFunc: curl_easy_perform(): http code: "
476
                   << curl_http_code << endl);
486
                   << curl_http_code << endl);
477
        }
487
        }
478
    }
488
    }
479
489
490
    LOGDEB1("CurlFetch::curlWorker: locking\n");
491
    {unique_lock<mutex> lock(curlmutex);
492
        LOGDEB1("CurlFetch::curlWorker: locked\n");
493
        if (aborting) {
494
            return;
495
        }
496
        if (headbuf.bytes) {
497
            LOGDEB1("CurlFetch::curlWorker: flushing headbuf: " <<
498
                   headbuf.bytes << " bytes\n");
499
            databufToQ(headbuf.buf, headbuf.bytes);
500
            headbuf.bytes = 0;
501
        }
502
        curlfd = -1;
503
        curldone = true;
504
        curlcv.notify_all();
505
    }
506
507
    // Normal eos
480
    if (curl_code == CURLE_OK) {
508
    if (curl_code == CURLE_OK) {
481
        // Wake up other side with empty buffer (eof)
509
        // Wake up other side with empty buffer (eof)
482
        LOGDEB1("CurlFetch::curlWorkerFunc: request done ok: q empty buffer\n");
510
        LOGDEB1("CurlFetch::curlWorkerFunc: request done ok: q empty buffer\n");
483
        ABuffer *buf = new ABuffer(0);
511
        ABuffer *buf = new ABuffer(0);
484
        if (!outqueue || !outqueue->put(buf)) {
512
        if (!outqueue || !outqueue->put(buf)) {
485
            delete buf;
513
            delete buf;
486
        }
514
        }
487
        if (outqueue) {
515
        if (outqueue) {
516
            // Wait for our zero buffer to be acknowledged before
517
            // killing the queue
518
            LOGDEB1("CurlFetch::curlworker: waitidle\n");
488
            outqueue->waitIdle();
519
            outqueue->waitIdle();
489
        }
520
        }
490
    }
521
    }
522
    outqueue->setTerminate();
491
    if (eofcb) {
523
    if (eofcb) {
492
        eofcb(curl_code == CURLE_OK, curl_data_count);
524
        eofcb(curl_code == CURLE_OK, curl_data_count);
493
    }
525
    }
494
    LOGDEB1("CurlFetch::curlworker: done\n");
526
    LOGDEB1("CurlFetch::curlworker: done\n");
495
    return;
527
    return;