Switch to unified view

a/src/mediaserver/cdplugins/streamproxy.cpp b/src/mediaserver/cdplugins/streamproxy.cpp
...
...
14
 *   Free Software Foundation, Inc.,
14
 *   Free Software Foundation, Inc.,
15
 *   59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
15
 *   59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
16
 */
16
 */
17
17
18
#include "streamproxy.h"
18
#include "streamproxy.h"
19
#include "curlfetch.h"
19
#include "netfetch.h"
20
#include "log.h"
20
#include "log.h"
21
#include "smallut.h"
21
#include "smallut.h"
22
#include "chrono.h"
22
#include "chrono.h"
23
23
24
#include <fcntl.h>
24
#include <fcntl.h>
25
#include <microhttpd.h>
25
#include <microhttpd.h>
26
#include <curl/curl.h>
27
26
28
#include <mutex>
27
#include <mutex>
29
#include <condition_variable>
28
#include <condition_variable>
30
#include <unordered_map>
29
#include <unordered_map>
31
30
32
using namespace std;
31
using namespace std;
33
32
34
class ContentReader {
33
class ContentReader {
35
public:
34
public:
36
    ContentReader(CurlFetch *fetcher, int cfd);
35
    ContentReader(std::unique_ptr<NetFetch> ftchr, int cfd)
36
        : fetcher(std::move(ftchr)), connfd(cfd) {
37
        
38
    }
39
37
    ~ContentReader() {
40
    ~ContentReader() {
38
        LOGDEB1("ContentReader::~ContentReader\n");
41
        LOGDEB1("ContentReader::~ContentReader\n");
39
        // This should not be neccessary but see comments in
42
        // This should not be necessary but see comments in
40
        // tstcurlfetch code
43
        // tstcurlfetch code
41
        fetcher = std::unique_ptr<CurlFetch>();
44
        fetcher = std::unique_ptr<NetFetch>();
42
    }
45
    }
43
    ssize_t contentRead(uint64_t pos, char *buf, size_t max);
46
    ssize_t contentRead(uint64_t pos, char *buf, size_t max);
44
47
45
    std::unique_ptr<CurlFetch> fetcher;
48
    std::unique_ptr<NetFetch> fetcher;
46
    BufXChange<ABuffer*> queue{"crqueue"};
49
    BufXChange<ABuffer*> queue{"crqueue"};
47
    bool normalEOS{false};
50
    bool normalEOS{false};
48
    // Used for experimentations in killing the connection
51
    // Used for experimentations in killing the connection
49
    int connfd{-1};
52
    int connfd{-1};
50
    int killafterms{-1};
53
    int killafterms{-1};
51
    Chrono chron;
54
    Chrono chron;
52
};
55
};
53
56
54
ContentReader::ContentReader(CurlFetch *f, int cfd)
55
    : fetcher(std::unique_ptr<CurlFetch>(f)), connfd(cfd)
56
{
57
}
58
57
59
ssize_t ContentReader::contentRead(uint64_t pos, char *obuf, size_t max)
58
ssize_t ContentReader::contentRead(uint64_t pos, char *obuf, size_t max)
60
{
59
{
61
    LOGDEB1("ContentReader::contentRead: pos "<<pos << " max " << max << endl);
60
    LOGDEB1("ContentReader::contentRead: pos "<<pos << " max " << max << endl);
62
    if (normalEOS) {
61
    if (normalEOS) {
...
...
65
    }
64
    }
66
    size_t totcnt = 0;
65
    size_t totcnt = 0;
67
    ABuffer *abuf;
66
    ABuffer *abuf;
68
    while (totcnt < max) {
67
    while (totcnt < max) {
69
        if (!queue.take(&abuf)) {
68
        if (!queue.take(&abuf)) {
69
            NetFetch::FetchStatus code;
70
            int curlcode{0}, httpcode{0};
70
            int httpcode;
71
            fetcher->curlDone(&curlcode, &httpcode);
71
            fetcher->fetchDone(&code, &httpcode);
72
            LOGDEB("Reader: queue take failed curlcode " << curlcode <<
72
            LOGDEB("Reader: queue take failed code " << code <<
73
                   " httpcode " << httpcode << endl);
73
                   " httpcode " << httpcode << endl);
74
            if (curlcode == CURLE_PARTIAL_FILE) {
74
            if (code == NetFetch::FETCH_RETRYABLE) {
75
                LOGINF("Reader: retrying at " << pos+totcnt << endl);
75
                LOGINF("Reader: retrying at " << pos+totcnt << endl);
76
                fetcher->reset();
76
                fetcher->reset();
77
                fetcher->start(&queue, pos+totcnt);
77
                fetcher->start(&queue, pos+totcnt);
78
                return 0;
78
                return 0;
79
            }
79
            }
...
...
287
    const char *method, const char *version, 
287
    const char *method, const char *version, 
288
    const char *upload_data, size_t *upload_data_size,
288
    const char *upload_data, size_t *upload_data_size,
289
    void **con_cls)
289
    void **con_cls)
290
{
290
{
291
    LOGDEB1("answerConn con_cls " << *con_cls << "\n");
291
    LOGDEB1("answerConn con_cls " << *con_cls << "\n");
292
    NetFetch::FetchStatus fetchcode;
292
    int curlcode, httpcode;
293
    int httpcode;
293
294
294
    if (nullptr == *con_cls) {
295
    if (nullptr == *con_cls) {
295
        uint64_t offset = 0;
296
        uint64_t offset = 0;
296
        // First call, look at headers, method etc.
297
        // First call, look at headers, method etc.
297
#ifdef PRINT_KEYS
298
#ifdef PRINT_KEYS
...
...
307
308
308
        // Compute destination url
309
        // Compute destination url
309
        unordered_map<string,string>querydata;
310
        unordered_map<string,string>querydata;
310
        MHD_get_connection_values(mhdconn, MHD_GET_ARGUMENT_KIND,
311
        MHD_get_connection_values(mhdconn, MHD_GET_ARGUMENT_KIND,
311
                                  &mapvalues_cb, &querydata);
312
                                  &mapvalues_cb, &querydata);
313
314
        // Request the method (redirect or proxy), and the fetcher if
315
        // we are proxying.
316
        string url(_url);
317
        std::unique_ptr<NetFetch> fetcher;
318
        UrlTransReturn ret = urltrans(url, querydata, fetcher);
312
        
319
        
313
        string url(_url);
314
        UrlTransReturn ret = urltrans(url, querydata);
315
        if (ret == Error) {
320
        if (ret == Error) {
316
            return MHD_NO;
321
            return MHD_NO;
317
        } else if (ret == Redirect) {
322
        } else if (ret == Redirect) {
318
            struct MHD_Response *response =
323
            struct MHD_Response *response =
319
                MHD_create_response_from_buffer(0, 0, MHD_RESPMEM_PERSISTENT);
324
                MHD_create_response_from_buffer(0, 0, MHD_RESPMEM_PERSISTENT);
...
...
325
            int ret = MHD_queue_response(mhdconn, 302, response);
330
            int ret = MHD_queue_response(mhdconn, 302, response);
326
            MHD_destroy_response(response);
331
            MHD_destroy_response(response);
327
            return ret;
332
            return ret;
328
        }
333
        }
329
334
330
        // Create/Start the curl transaction, and wait for the headers.
335
        // The connection fd thing is strictly for debug/diag: faking
331
        LOGDEB0("StreamProxy::answerConn: starting curl for " << url << endl);
336
        // a connection loss so that the client side can exercise the
337
        // retry code.
332
        int cfd{-1};
338
        int cfd{-1};
333
        const union MHD_ConnectionInfo *cinf =
339
        const union MHD_ConnectionInfo *cinf =
334
            MHD_get_connection_info(mhdconn, MHD_CONNECTION_INFO_CONNECTION_FD);
340
            MHD_get_connection_info(mhdconn, MHD_CONNECTION_INFO_CONNECTION_FD);
335
        if (nullptr == cinf) {
341
        if (nullptr == cinf) {
336
            LOGERR("StreamProxy::answerConn: can't get connection fd\n");
342
            LOGERR("StreamProxy::answerConn: can't get connection fd\n");
337
        } else {
343
        } else {
338
            cfd = cinf->connect_fd;
344
            cfd = cinf->connect_fd;
339
        }
345
        }
346
347
        // Create/Start the fetch transaction, and wait for the headers.
348
        LOGDEB0("StreamProxy::answerConn: starting fetch for " << url << endl);
340
        auto reader = new ContentReader(new CurlFetch(url), cfd);
349
        auto reader = new ContentReader(std::move(fetcher), cfd);
341
        if (killafterms > 0) {
350
        if (killafterms > 0) {
342
            reader->killafterms = killafterms;
351
            reader->killafterms = killafterms;
343
            killafterms = -1;
352
            killafterms = -1;
344
        }
353
        }
345
        reader->fetcher->start(&reader->queue, offset);
354
        reader->fetcher->start(&reader->queue, offset);
346
        *con_cls = reader;
355
        *con_cls = reader;
356
347
        LOGDEB1("StreamProxy::answerConn: returning after 1st call\n");
357
        LOGDEB1("StreamProxy::answerConn: returning after 1st call\n");
348
        return MHD_YES;
358
        return MHD_YES;
349
        // End first call
359
        // End first call
350
    }
360
    }
351
361
352
    // Second call for this request. We know that the curl request has
362
    // Second call for this request. We know that the fetch request has
353
    // proceeded past the headers (else, we would have failed during
363
    // proceeded past the headers (else, we would have failed during
354
    // the first call). Check for a curl error or http 404 or similar
364
    // the first call). Check for an error or http 404 or similar
355
    ContentReader *reader = (ContentReader*)*con_cls;
365
    ContentReader *reader = (ContentReader*)*con_cls;
356
    
366
    
357
    if (!reader->fetcher->waitForHeaders()) {
367
    if (!reader->fetcher->waitForHeaders()) {
358
        LOGDEB("StreamProxy::answerConn: waitForHeaders error\n");
368
        LOGDEB("StreamProxy::answerConn: waitForHeaders error\n");
359
        reader->fetcher->curlDone(&curlcode, &httpcode);
369
        reader->fetcher->fetchDone(&fetchcode, &httpcode);
360
        int code = httpcode ? httpcode : MHD_HTTP_INTERNAL_SERVER_ERROR;
370
        int code = httpcode ? httpcode : MHD_HTTP_INTERNAL_SERVER_ERROR;
361
        struct MHD_Response *response =
371
        struct MHD_Response *response =
362
            MHD_create_response_from_buffer(0, 0, MHD_RESPMEM_PERSISTENT);
372
            MHD_create_response_from_buffer(0, 0, MHD_RESPMEM_PERSISTENT);
363
        int ret = MHD_queue_response(mhdconn, code, response);
373
        int ret = MHD_queue_response(mhdconn, code, response);
364
        MHD_destroy_response(response);
374
        MHD_destroy_response(response);
...
...
394
        LOGDEB1("mhdAnswerConn: header content-type: " << ct << endl);
404
        LOGDEB1("mhdAnswerConn: header content-type: " << ct << endl);
395
        MHD_add_response_header(response, "Content-Type", ct.c_str());
405
        MHD_add_response_header(response, "Content-Type", ct.c_str());
396
    }
406
    }
397
407
398
    int code = MHD_HTTP_OK;
408
    int code = MHD_HTTP_OK;
399
    LOGDEB1("StreamProxy::answerConn: calling curldone\n");
409
    LOGDEB1("StreamProxy::answerConn: calling fetchDone\n");
400
    if (reader->fetcher->curlDone(&curlcode, &httpcode)) {
410
    if (reader->fetcher->fetchDone(&fetchcode, &httpcode)) {
401
        code = httpcode ? httpcode : MHD_HTTP_INTERNAL_SERVER_ERROR;
411
        code = httpcode ? httpcode : MHD_HTTP_INTERNAL_SERVER_ERROR;
402
    }
412
    }
403
    int ret = MHD_queue_response(mhdconn, code, response);
413
    int ret = MHD_queue_response(mhdconn, code, response);
404
    MHD_destroy_response(response);
414
    MHD_destroy_response(response);
405
    return ret;
415
    return ret;