Switch to unified view

a/src/mediaserver/cdplugins/streamproxy.cpp b/src/mediaserver/cdplugins/streamproxy.cpp
...
...
17
17
18
#include "streamproxy.h"
18
#include "streamproxy.h"
19
#include "curlfetch.h"
19
#include "curlfetch.h"
20
#include "log.h"
20
#include "log.h"
21
#include "smallut.h"
21
#include "smallut.h"
22
#include "chrono.h"
22
23
24
#include <fcntl.h>
23
#include <microhttpd.h>
25
#include <microhttpd.h>
26
#include <curl/curl.h>
24
27
25
#include <mutex>
28
#include <mutex>
26
#include <condition_variable>
29
#include <condition_variable>
27
#include <unordered_map>
30
#include <unordered_map>
28
31
29
using namespace std;
32
using namespace std;
30
33
31
class ContentReader {
34
class ContentReader {
32
public:
35
public:
33
    ContentReader(CurlFetch *fetcher);
36
    ContentReader(CurlFetch *fetcher, int cfd);
34
    ~ContentReader() {
37
    ~ContentReader() {
35
        LOGDEB1("ContentReader::~ContentReader\n");
38
        LOGDEB1("ContentReader::~ContentReader\n");
36
        // This should not be neccessary but see comments in
39
        // This should not be neccessary but see comments in
37
        // tstcurlfetch code
40
        // tstcurlfetch code
38
        fetcher = std::unique_ptr<CurlFetch>();
41
        fetcher = std::unique_ptr<CurlFetch>();
...
...
40
    ssize_t contentRead(uint64_t pos, char *buf, size_t max);
43
    ssize_t contentRead(uint64_t pos, char *buf, size_t max);
41
44
42
    std::unique_ptr<CurlFetch> fetcher;
45
    std::unique_ptr<CurlFetch> fetcher;
43
    BufXChange<ABuffer*> queue{"crqueue"};
46
    BufXChange<ABuffer*> queue{"crqueue"};
44
    bool normalEOS{false};
47
    bool normalEOS{false};
48
    // Used for experimentations in killing the connection
49
    int connfd{-1};
50
    int killafterms{-1};
51
    Chrono chron;
45
};
52
};
46
53
47
ContentReader::ContentReader(CurlFetch *f)
54
ContentReader::ContentReader(CurlFetch *f, int cfd)
48
    : fetcher(std::unique_ptr<CurlFetch>(f))
55
    : fetcher(std::unique_ptr<CurlFetch>(f)), connfd(cfd)
49
{
56
{
50
}
57
}
51
58
52
ssize_t ContentReader::contentRead(uint64_t pos, char *obuf, size_t max)
59
ssize_t ContentReader::contentRead(uint64_t pos, char *obuf, size_t max)
53
{
60
{
...
...
58
    }
65
    }
59
    size_t totcnt = 0;
66
    size_t totcnt = 0;
60
    ABuffer *abuf;
67
    ABuffer *abuf;
61
    while (totcnt < max) {
68
    while (totcnt < max) {
62
        if (!queue.take(&abuf)) {
69
        if (!queue.take(&abuf)) {
70
            int curlcode{0}, httpcode{0};
71
            fetcher->curlDone(&curlcode, &httpcode);
72
            LOGDEB("Reader: queue take failed curlcode " << curlcode <<
73
                   " httpcode " << httpcode << endl);
74
            if (curlcode == CURLE_PARTIAL_FILE) {
75
                LOGINF("Reader: retrying at " << pos+totcnt << endl);
76
                fetcher->reset();
77
                fetcher->start(&queue, pos+totcnt);
78
                return 0;
79
            }
63
            LOGDEB("ContentReader::contentRead: return ERROR\n");
80
            LOGDEB("ContentReader::contentRead: return ERROR\n");
64
            return MHD_CONTENT_READER_END_WITH_ERROR;
81
            return MHD_CONTENT_READER_END_WITH_ERROR;
65
        }
82
        }
66
        LOGDEB1("ContentReader::contentRead: got buffer with " <<
83
        LOGDEB1("ContentReader::contentRead: got buffer with " <<
67
                abuf->bytes << " bytes\n");
84
                abuf->bytes << " bytes\n");
...
...
83
            queue.recycle(abuf);
100
            queue.recycle(abuf);
84
        } else {
101
        } else {
85
            queue.untake(abuf);
102
            queue.untake(abuf);
86
        }
103
        }
87
    }
104
    }
105
    if (killafterms > 0 && connfd >= 0) {
106
        if (chron.millis() > killafterms) {
107
            int fd = open("/dev/null", 0);
108
            if (fd < 0) {
109
                abort();
110
            }
111
            dup2(fd, connfd);
112
            close(fd);
113
            connfd = -1;
114
        }
115
    }
88
    LOGDEB1("ContentReader::contentRead: return " << totcnt << endl);
116
    LOGDEB1("ContentReader::contentRead: return " << totcnt << endl);
89
    return totcnt;
117
    return totcnt;
90
}
118
}
91
119
92
static ssize_t content_reader_cb(void *cls, uint64_t pos, char *buf, size_t max)
120
static ssize_t content_reader_cb(void *cls, uint64_t pos, char *buf, size_t max)
...
...
117
        void **con_cls, enum MHD_RequestTerminationCode toe);
145
        void **con_cls, enum MHD_RequestTerminationCode toe);
118
146
119
    int listenport{-1};
147
    int listenport{-1};
120
    UrlTransFunc urltrans;
148
    UrlTransFunc urltrans;
121
    struct MHD_Daemon *mhd{nullptr};
149
    struct MHD_Daemon *mhd{nullptr};
150
    int killafterms{-1};
122
};
151
};
123
152
124
153
125
StreamProxy::StreamProxy(int listenport,UrlTransFunc urltrans)
154
StreamProxy::StreamProxy(int listenport,UrlTransFunc urltrans)
126
    : m(new Internal(listenport, urltrans))
155
    : m(new Internal(listenport, urltrans))
127
{
156
{
128
}
157
}
129
158
159
void StreamProxy::setKillAfterMs(int ms)
160
{
161
    m->killafterms = ms;
162
}
163
130
StreamProxy::~StreamProxy()
164
StreamProxy::~StreamProxy()
131
{
165
{
132
}
166
}
167
133
StreamProxy::Internal::~Internal()
168
StreamProxy::Internal::~Internal()
134
{
169
{
135
    LOGDEB("StreamProxy::Internal::~Internal()\n");
170
    LOGDEB("StreamProxy::Internal::~Internal()\n");
136
    if (mhd) {
171
    if (mhd) {
137
        MHD_stop_daemon(mhd);
172
        MHD_stop_daemon(mhd);
...
...
160
    } else {
195
    } else {
161
        return -1;
196
        return -1;
162
    }
197
    }
163
}
198
}
164
199
165
#define PRINT_KEYS
200
#undef PRINT_KEYS
166
#ifdef PRINT_KEYS
201
#ifdef PRINT_KEYS
167
static vector<CharFlags> valueKind {
202
static vector<CharFlags> valueKind {
168
    {MHD_RESPONSE_HEADER_KIND, "Response header"},
203
    {MHD_RESPONSE_HEADER_KIND, "Response header"},
169
    {MHD_HEADER_KIND, "HTTP header"},
204
    {MHD_HEADER_KIND, "HTTP header"},
170
    {MHD_COOKIE_KIND, "Cookies"},
205
    {MHD_COOKIE_KIND, "Cookies"},
...
...
251
    struct MHD_Connection *mhdconn, const char *_url,
286
    struct MHD_Connection *mhdconn, const char *_url,
252
    const char *method, const char *version, 
287
    const char *method, const char *version, 
253
    const char *upload_data, size_t *upload_data_size,
288
    const char *upload_data, size_t *upload_data_size,
254
    void **con_cls)
289
    void **con_cls)
255
{
290
{
291
    LOGDEB1("answerConn con_cls " << *con_cls << "\n");
256
    int curlcode; long httpcode;
292
    int curlcode, httpcode;
257
293
258
    if (nullptr == *con_cls) {
294
    if (nullptr == *con_cls) {
259
        uint64_t offset = 0;
295
        uint64_t offset = 0;
260
        // First call, look at headers, method etc.
296
        // First call, look at headers, method etc.
261
#ifdef PRINT_KEYS
297
#ifdef PRINT_KEYS
...
...
290
            MHD_destroy_response(response);
326
            MHD_destroy_response(response);
291
            return ret;
327
            return ret;
292
        }
328
        }
293
329
294
        // Create/Start the curl transaction, and wait for the headers.
330
        // Create/Start the curl transaction, and wait for the headers.
295
        LOGDEB("StreamProxy:answerConn: starting curl for " << url << endl);
331
        LOGDEB0("StreamProxy::answerConn: starting curl for " << url << endl);
332
        int cfd{-1};
333
        const union MHD_ConnectionInfo *cinf =
334
            MHD_get_connection_info(mhdconn, MHD_CONNECTION_INFO_CONNECTION_FD);
335
        if (nullptr == cinf) {
336
            LOGERR("StreamProxy::answerConn: can't get connection fd\n");
337
        } else {
338
            cfd = cinf->connect_fd;
339
        }
296
        auto reader = new ContentReader(new CurlFetch(url));
340
        auto reader = new ContentReader(new CurlFetch(url), cfd);
341
        if (killafterms > 0) {
342
            reader->killafterms = killafterms;
343
            killafterms = -1;
344
        }
297
        reader->fetcher->start(&reader->queue, offset);
345
        reader->fetcher->start(&reader->queue, offset);
298
        *con_cls = reader;
346
        *con_cls = reader;
347
        LOGDEB1("StreamProxy::answerConn: returning after 1st call\n");
299
        return MHD_YES;
348
        return MHD_YES;
300
        // End first call
349
        // End first call
301
    }
350
    }
302
351
303
    // Second call for this request. We know that the curl request has
352
    // Second call for this request. We know that the curl request has
...
...
315
        MHD_destroy_response(response);
364
        MHD_destroy_response(response);
316
        LOGINF("StreamProxy::answerConn (1): return with http code: " <<
365
        LOGINF("StreamProxy::answerConn (1): return with http code: " <<
317
               code << endl);
366
               code << endl);
318
        return ret;
367
        return ret;
319
    }
368
    }
369
    LOGDEB1("StreamProxy::answerConn: waitForHeaders done\n");
320
370
321
    string cl;
371
    string cl;
322
    uint64_t size = MHD_SIZE_UNKNOWN;
372
    uint64_t size = MHD_SIZE_UNKNOWN;
323
    if (reader->fetcher->headerValue("content-length", cl) && !cl.empty()) {
373
    if (reader->fetcher->headerValue("content-length", cl) && !cl.empty()) {
324
        LOGDEB1("mhdAnswerConn: header content-length: " << cl << endl);
374
        LOGDEB1("mhdAnswerConn: header content-length: " << cl << endl);
325
        size  = (uint64_t)atoll(cl.c_str());
375
        size  = (uint64_t)atoll(cl.c_str());
326
    }
376
    }
327
328
    // Build a data response.
377
    // Build a data response.
329
    // the block size seems to be flatly ignored by libmicrohttpd
378
    // the block size seems to be flatly ignored by libmicrohttpd
330
    // Any random value would probably work the same
379
    // Any random value would probably work the same
331
    struct MHD_Response *response = 
380
    struct MHD_Response *response = 
332
        MHD_create_response_from_callback(size, 4096,
381
        MHD_create_response_from_callback(size, 4096,
...
...
345
        LOGDEB1("mhdAnswerConn: header content-type: " << ct << endl);
394
        LOGDEB1("mhdAnswerConn: header content-type: " << ct << endl);
346
        MHD_add_response_header(response, "Content-Type", ct.c_str());
395
        MHD_add_response_header(response, "Content-Type", ct.c_str());
347
    }
396
    }
348
397
349
    int code = MHD_HTTP_OK;
398
    int code = MHD_HTTP_OK;
399
    LOGDEB1("StreamProxy::answerConn: calling curldone\n");
350
    if (reader->fetcher->curlDone(&curlcode, &httpcode)) {
400
    if (reader->fetcher->curlDone(&curlcode, &httpcode)) {
351
        code = httpcode ? httpcode : MHD_HTTP_INTERNAL_SERVER_ERROR;
401
        code = httpcode ? httpcode : MHD_HTTP_INTERNAL_SERVER_ERROR;
352
    }
402
    }
353
    int ret = MHD_queue_response(mhdconn, code, response);
403
    int ret = MHD_queue_response(mhdconn, code, response);
354
    MHD_destroy_response(response);
404
    MHD_destroy_response(response);