Switch to unified view

a/src/mediaserver/cdplugins/streamproxy.cpp b/src/mediaserver/cdplugins/streamproxy.cpp
...
...
14
 *   Free Software Foundation, Inc.,
14
 *   Free Software Foundation, Inc.,
15
 *   59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
15
 *   59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
16
 */
16
 */
17
17
18
#include "streamproxy.h"
18
#include "streamproxy.h"
19
#include "netfetch.h"
19
#include "curlfetch.h"
20
#include "curlfetch.h"
20
#include "log.h"
21
#include "log.h"
21
#include "smallut.h"
22
#include "smallut.h"
22
#include "chrono.h"
23
#include "chrono.h"
23
24
24
#include <fcntl.h>
25
#include <fcntl.h>
25
#include <microhttpd.h>
26
#include <microhttpd.h>
26
#include <curl/curl.h>
27
27
28
#include <mutex>
28
#include <mutex>
29
#include <condition_variable>
29
#include <condition_variable>
30
#include <unordered_map>
30
#include <unordered_map>
31
31
32
using namespace std;
32
using namespace std;
33
33
34
class ContentReader {
34
class ContentReader {
35
public:
35
public:
36
    ContentReader(CurlFetch *fetcher, int cfd);
36
    ContentReader(NetFetch *fetcher, int cfd);
37
    ~ContentReader() {
37
    ~ContentReader() {
38
        LOGDEB1("ContentReader::~ContentReader\n");
38
        LOGDEB1("ContentReader::~ContentReader\n");
39
        // This should not be neccessary but see comments in
39
        // This should not be necessary but see comments in
40
        // tstcurlfetch code
40
        // tstcurlfetch code
41
        fetcher = std::unique_ptr<CurlFetch>();
41
        fetcher = std::unique_ptr<NetFetch>();
42
    }
42
    }
43
    ssize_t contentRead(uint64_t pos, char *buf, size_t max);
43
    ssize_t contentRead(uint64_t pos, char *buf, size_t max);
44
44
45
    std::unique_ptr<CurlFetch> fetcher;
45
    std::unique_ptr<NetFetch> fetcher;
46
    BufXChange<ABuffer*> queue{"crqueue"};
46
    BufXChange<ABuffer*> queue{"crqueue"};
47
    bool normalEOS{false};
47
    bool normalEOS{false};
48
    // Used for experimentations in killing the connection
48
    // Used for experimentations in killing the connection
49
    int connfd{-1};
49
    int connfd{-1};
50
    int killafterms{-1};
50
    int killafterms{-1};
51
    Chrono chron;
51
    Chrono chron;
52
};
52
};
53
53
54
ContentReader::ContentReader(CurlFetch *f, int cfd)
54
ContentReader::ContentReader(NetFetch *f, int cfd)
55
    : fetcher(std::unique_ptr<CurlFetch>(f)), connfd(cfd)
55
    : fetcher(std::unique_ptr<NetFetch>(f)), connfd(cfd)
56
{
56
{
57
}
57
}
58
58
59
ssize_t ContentReader::contentRead(uint64_t pos, char *obuf, size_t max)
59
ssize_t ContentReader::contentRead(uint64_t pos, char *obuf, size_t max)
60
{
60
{
...
...
65
    }
65
    }
66
    size_t totcnt = 0;
66
    size_t totcnt = 0;
67
    ABuffer *abuf;
67
    ABuffer *abuf;
68
    while (totcnt < max) {
68
    while (totcnt < max) {
69
        if (!queue.take(&abuf)) {
69
        if (!queue.take(&abuf)) {
70
            NetFetch::FetchStatus code;
70
            int curlcode{0}, httpcode{0};
71
            int httpcode;
71
            fetcher->curlDone(&curlcode, &httpcode);
72
            fetcher->fetchDone(&code, &httpcode);
72
            LOGDEB("Reader: queue take failed curlcode " << curlcode <<
73
            LOGDEB("Reader: queue take failed code " << code <<
73
                   " httpcode " << httpcode << endl);
74
                   " httpcode " << httpcode << endl);
74
            if (curlcode == CURLE_PARTIAL_FILE) {
75
            if (code == NetFetch::FETCH_RETRYABLE) {
75
                LOGINF("Reader: retrying at " << pos+totcnt << endl);
76
                LOGINF("Reader: retrying at " << pos+totcnt << endl);
76
                fetcher->reset();
77
                fetcher->reset();
77
                fetcher->start(&queue, pos+totcnt);
78
                fetcher->start(&queue, pos+totcnt);
78
                return 0;
79
                return 0;
79
            }
80
            }
...
...
287
    const char *method, const char *version, 
288
    const char *method, const char *version, 
288
    const char *upload_data, size_t *upload_data_size,
289
    const char *upload_data, size_t *upload_data_size,
289
    void **con_cls)
290
    void **con_cls)
290
{
291
{
291
    LOGDEB1("answerConn con_cls " << *con_cls << "\n");
292
    LOGDEB1("answerConn con_cls " << *con_cls << "\n");
293
    NetFetch::FetchStatus fetchcode;
292
    int curlcode, httpcode;
294
    int httpcode;
293
295
294
    if (nullptr == *con_cls) {
296
    if (nullptr == *con_cls) {
295
        uint64_t offset = 0;
297
        uint64_t offset = 0;
296
        // First call, look at headers, method etc.
298
        // First call, look at headers, method etc.
297
#ifdef PRINT_KEYS
299
#ifdef PRINT_KEYS
...
...
325
            int ret = MHD_queue_response(mhdconn, 302, response);
327
            int ret = MHD_queue_response(mhdconn, 302, response);
326
            MHD_destroy_response(response);
328
            MHD_destroy_response(response);
327
            return ret;
329
            return ret;
328
        }
330
        }
329
331
330
        // Create/Start the curl transaction, and wait for the headers.
332
        // Create/Start the fetch transaction, and wait for the headers.
331
        LOGDEB0("StreamProxy::answerConn: starting curl for " << url << endl);
333
        LOGDEB0("StreamProxy::answerConn: starting fetch for " << url << endl);
332
        int cfd{-1};
334
        int cfd{-1};
333
        const union MHD_ConnectionInfo *cinf =
335
        const union MHD_ConnectionInfo *cinf =
334
            MHD_get_connection_info(mhdconn, MHD_CONNECTION_INFO_CONNECTION_FD);
336
            MHD_get_connection_info(mhdconn, MHD_CONNECTION_INFO_CONNECTION_FD);
335
        if (nullptr == cinf) {
337
        if (nullptr == cinf) {
336
            LOGERR("StreamProxy::answerConn: can't get connection fd\n");
338
            LOGERR("StreamProxy::answerConn: can't get connection fd\n");
...
...
347
        LOGDEB1("StreamProxy::answerConn: returning after 1st call\n");
349
        LOGDEB1("StreamProxy::answerConn: returning after 1st call\n");
348
        return MHD_YES;
350
        return MHD_YES;
349
        // End first call
351
        // End first call
350
    }
352
    }
351
353
352
    // Second call for this request. We know that the curl request has
354
    // Second call for this request. We know that the fetch request has
353
    // proceeded past the headers (else, we would have failed during
355
    // proceeded past the headers (else, we would have failed during
354
    // the first call). Check for a curl error or http 404 or similar
356
    // the first call). Check for an error or http 404 or similar
355
    ContentReader *reader = (ContentReader*)*con_cls;
357
    ContentReader *reader = (ContentReader*)*con_cls;
356
    
358
    
357
    if (!reader->fetcher->waitForHeaders()) {
359
    if (!reader->fetcher->waitForHeaders()) {
358
        LOGDEB("StreamProxy::answerConn: waitForHeaders error\n");
360
        LOGDEB("StreamProxy::answerConn: waitForHeaders error\n");
359
        reader->fetcher->curlDone(&curlcode, &httpcode);
361
        reader->fetcher->fetchDone(&fetchcode, &httpcode);
360
        int code = httpcode ? httpcode : MHD_HTTP_INTERNAL_SERVER_ERROR;
362
        int code = httpcode ? httpcode : MHD_HTTP_INTERNAL_SERVER_ERROR;
361
        struct MHD_Response *response =
363
        struct MHD_Response *response =
362
            MHD_create_response_from_buffer(0, 0, MHD_RESPMEM_PERSISTENT);
364
            MHD_create_response_from_buffer(0, 0, MHD_RESPMEM_PERSISTENT);
363
        int ret = MHD_queue_response(mhdconn, code, response);
365
        int ret = MHD_queue_response(mhdconn, code, response);
364
        MHD_destroy_response(response);
366
        MHD_destroy_response(response);
...
...
394
        LOGDEB1("mhdAnswerConn: header content-type: " << ct << endl);
396
        LOGDEB1("mhdAnswerConn: header content-type: " << ct << endl);
395
        MHD_add_response_header(response, "Content-Type", ct.c_str());
397
        MHD_add_response_header(response, "Content-Type", ct.c_str());
396
    }
398
    }
397
399
398
    int code = MHD_HTTP_OK;
400
    int code = MHD_HTTP_OK;
399
    LOGDEB1("StreamProxy::answerConn: calling curldone\n");
401
    LOGDEB1("StreamProxy::answerConn: calling fetchDone\n");
400
    if (reader->fetcher->curlDone(&curlcode, &httpcode)) {
402
    if (reader->fetcher->fetchDone(&fetchcode, &httpcode)) {
401
        code = httpcode ? httpcode : MHD_HTTP_INTERNAL_SERVER_ERROR;
403
        code = httpcode ? httpcode : MHD_HTTP_INTERNAL_SERVER_ERROR;
402
    }
404
    }
403
    int ret = MHD_queue_response(mhdconn, code, response);
405
    int ret = MHD_queue_response(mhdconn, code, response);
404
    MHD_destroy_response(response);
406
    MHD_destroy_response(response);
405
    return ret;
407
    return ret;