Switch to unified view

a b/src/mediaserver/cdplugins/streamproxy.cpp
1
/* Copyright (C) 2017-2018 J.F.Dockes
2
 *   This program is free software; you can redistribute it and/or modify
3
 *   it under the terms of the GNU General Public License as published by
4
 *   the Free Software Foundation; either version 2 of the License, or
5
 *   (at your option) any later version.
6
 *
7
 *   This program is distributed in the hope that it will be useful,
8
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
9
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10
 *   GNU General Public License for more details.
11
 *
12
 *   You should have received a copy of the GNU General Public License
13
 *   along with this program; if not, write to the
14
 *   Free Software Foundation, Inc.,
15
 *   59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
16
 */
17
18
#include "streamproxy.h"
19
#include "curlfetch.h"
20
#include "log.h"
21
#include "smallut.h"
22
23
#include <microhttpd.h>
24
25
#include <mutex>
26
#include <condition_variable>
27
#include <unordered_map>
28
29
using namespace std;
30
31
class ContentReader {
32
public:
33
    ContentReader(CurlFetch *fetcher);
34
    ~ContentReader() {
35
        LOGDEB1("ContentReader::~ContentReader\n");
36
        // This should not be neccessary but see comments in
37
        // tstcurlfetch code
38
        fetcher = std::unique_ptr<CurlFetch>();
39
    }
40
    ssize_t contentRead(uint64_t pos, char *buf, size_t max);
41
42
    std::unique_ptr<CurlFetch> fetcher;
43
    BufXChange<ABuffer*> queue{"crqueue"};
44
    bool normalEOS{false};
45
};
46
47
ContentReader::ContentReader(CurlFetch *f)
48
    : fetcher(std::unique_ptr<CurlFetch>(f))
49
{
50
}
51
52
ssize_t ContentReader::contentRead(uint64_t pos, char *obuf, size_t max)
53
{
54
    LOGDEB1("ContentReader::contentRead: pos "<<pos << " max " << max << endl);
55
    if (normalEOS) {
56
        LOGDEB1("ContentReader::contentRead: return EOS\n");
57
        return MHD_CONTENT_READER_END_OF_STREAM;
58
    }
59
    size_t totcnt = 0;
60
    ABuffer *abuf;
61
    while (totcnt < max) {
62
        if (!queue.take(&abuf)) {
63
            LOGDEB("ContentReader::contentRead: return ERROR\n");
64
            return MHD_CONTENT_READER_END_WITH_ERROR;
65
        }
66
        LOGDEB1("ContentReader::contentRead: got buffer with " <<
67
                abuf->bytes << " bytes\n");
68
        if (abuf->bytes == 0) {
69
            normalEOS = true;
70
            if (totcnt == 0) {
71
                LOGDEB1("ContentReader::contentRead: return EOS\n");
72
                return MHD_CONTENT_READER_END_OF_STREAM;
73
            } else {
74
                // Copied data, we will return eostream on next call.
75
                break;
76
            }
77
        }
78
        size_t tocopy = MIN(max-totcnt, abuf->bytes - abuf->curoffs);
79
        memcpy(obuf + totcnt, abuf->buf + abuf->curoffs, tocopy);
80
        totcnt += tocopy;
81
        abuf->curoffs += tocopy;
82
        if (abuf->curoffs >= abuf->bytes) {
83
            queue.recycle(abuf);
84
        } else {
85
            queue.untake(abuf);
86
        }
87
    }
88
    LOGDEB1("ContentReader::contentRead: return " << totcnt << endl);
89
    return totcnt;
90
}
91
92
static ssize_t content_reader_cb(void *cls, uint64_t pos, char *buf, size_t max)
93
{
94
    ContentReader *reader = static_cast<ContentReader*>(cls);
95
    if (reader) {
96
        return reader->contentRead(pos, buf, max);
97
    } else {
98
        return -1;
99
    }
100
}
101
102
103
class StreamProxy::Internal {
104
public:
105
    Internal(int listenport, UrlTransFunc urlt);
106
    ~Internal();
107
    bool startMHD();
108
109
    int answerConn(
110
        struct MHD_Connection *connection, const char *url, 
111
        const char *method, const char *version, 
112
        const char *upload_data, size_t *upload_data_size,
113
        void **con_cls);
114
115
    void requestCompleted(
116
        struct MHD_Connection *conn,
117
        void **con_cls, enum MHD_RequestTerminationCode toe);
118
119
    int listenport{-1};
120
    UrlTransFunc urltrans;
121
    struct MHD_Daemon *mhd{nullptr};
122
};
123
124
125
StreamProxy::StreamProxy(int listenport,UrlTransFunc urltrans)
126
    : m(new Internal(listenport, urltrans))
127
{
128
}
129
130
StreamProxy::~StreamProxy()
131
{
132
}
133
StreamProxy::Internal::~Internal()
134
{
135
    LOGDEB("StreamProxy::Internal::~Internal()\n");
136
    if (mhd) {
137
        MHD_stop_daemon(mhd);
138
        mhd = nullptr;
139
    }
140
}
141
142
StreamProxy::Internal::Internal(int _listenport, UrlTransFunc _urltrans)
143
    : listenport(_listenport), urltrans(_urltrans)
144
{
145
    startMHD();
146
}
147
148
149
static int answer_to_connection(
150
    void *cls, struct MHD_Connection *conn, 
151
    const char *url, const char *method, const char *version, 
152
    const char *upload_data, size_t *upload_data_size,
153
    void **con_cls)
154
{
155
    StreamProxy::Internal *internal = static_cast<StreamProxy::Internal*>(cls);
156
    
157
    if (internal) {
158
        return internal->answerConn(
159
            conn, url, method, version, upload_data, upload_data_size, con_cls);
160
    } else {
161
        return -1;
162
    }
163
}
164
165
#define PRINT_KEYS
166
#ifdef PRINT_KEYS
167
static vector<CharFlags> valueKind {
168
    {MHD_RESPONSE_HEADER_KIND, "Response header"},
169
    {MHD_HEADER_KIND, "HTTP header"},
170
    {MHD_COOKIE_KIND, "Cookies"},
171
    {MHD_POSTDATA_KIND, "POST data"},
172
    {MHD_GET_ARGUMENT_KIND, "GET (URI) arguments"},
173
    {MHD_FOOTER_KIND, "HTTP footer"},
174
        };
175
176
static int print_out_key (void *cls, enum MHD_ValueKind kind, 
177
                          const char *key, const char *value)
178
{
179
    LOGDEB(valToString(valueKind, kind) << ": " << key << " -> " <<
180
           value << endl);
181
    return MHD_YES;
182
}
183
#endif /* PRINT_KEYS */
184
185
static int mapvalues_cb(void *cls, enum MHD_ValueKind kind, 
186
                        const char *key, const char *value)
187
{
188
    unordered_map<string,string> *mp = (unordered_map<string,string> *)cls;
189
    if (mp) {
190
        (*mp)[key] = value;
191
    }
192
    return MHD_YES;
193
}
194
195
// Parse range header. 
196
static bool parseRanges(
197
    const string& ranges, vector<pair<int64_t, int64_t>>& oranges)
198
{
199
    oranges.clear();
200
    string::size_type pos = ranges.find("bytes=");
201
    if (pos == string::npos) {
202
        return false;
203
    }
204
    pos += 6;
205
    bool done = false;
206
    while(!done) {
207
        string::size_type dash = ranges.find('-', pos);
208
        if (dash == string::npos) {
209
            return false;
210
        }
211
        string::size_type comma = ranges.find(',', pos);
212
        string firstPart = ranges.substr(pos, dash-pos);
213
        int64_t start = firstPart.empty() ? 0 : atoll(firstPart.c_str());
214
        string secondPart = ranges.substr(dash+1, comma != string::npos ? 
215
                                          comma-dash-1 : string::npos);
216
        int64_t fin = secondPart.empty() ? -1 : atoll(secondPart.c_str());
217
        pair<int64_t, int64_t> nrange(start,fin);
218
        oranges.push_back(nrange);
219
        if (comma != string::npos) {
220
            pos = comma + 1;
221
        }
222
        done = comma == string::npos;
223
    }
224
    return true;
225
}
226
227
static bool processRange(struct MHD_Connection *mhdconn, uint64_t& offset)
228
{
229
    const char* rangeh =
230
        MHD_lookup_connection_value(mhdconn, MHD_HEADER_KIND, "range");
231
    vector<pair<int64_t, int64_t> > ranges;
232
    if (rangeh && parseRanges(rangeh, ranges) && ranges.size()) {
233
        if (ranges[0].second != -1 || ranges.size() > 1) {
234
            LOGERR("AProxy::mhdAnswerConn: unsupported range: " <<
235
                   rangeh << "\n");
236
            struct MHD_Response *response = 
237
                MHD_create_response_from_buffer(0,0,MHD_RESPMEM_PERSISTENT);
238
            MHD_queue_response(mhdconn, 416, response);
239
            MHD_destroy_response(response);
240
            return false;
241
        } else {
242
            offset = (uint64_t)ranges[0].first;
243
            LOGDEB("AProxy::mhdAnswerConn: accept xx- range, offset "
244
                   << offset << endl);
245
        }
246
    }
247
    return true;
248
}
249
250
int StreamProxy::Internal::answerConn(
251
    struct MHD_Connection *mhdconn, const char *_url,
252
    const char *method, const char *version, 
253
    const char *upload_data, size_t *upload_data_size,
254
    void **con_cls)
255
{
256
    int curlcode; long httpcode;
257
258
    if (nullptr == *con_cls) {
259
        uint64_t offset = 0;
260
        // First call, look at headers, method etc.
261
#ifdef PRINT_KEYS
262
        MHD_get_connection_values(mhdconn,MHD_HEADER_KIND,&print_out_key,0);
263
#endif
264
        if (strcmp("GET", method) && strcmp("HEAD", method)) {
265
            LOGERR("StreamProxy::answerConn: method is not GET or HEAD\n");
266
            return MHD_NO;
267
        }
268
        if (!processRange(mhdconn, offset)) {
269
            return MHD_NO;
270
        }
271
272
        // Compute destination url
273
        unordered_map<string,string>querydata;
274
        MHD_get_connection_values(mhdconn, MHD_GET_ARGUMENT_KIND,
275
                                  &mapvalues_cb, &querydata);
276
        
277
        string url(_url);
278
        UrlTransReturn ret = urltrans(url, querydata);
279
        if (ret == Error) {
280
            return MHD_NO;
281
        } else if (ret == Redirect) {
282
            struct MHD_Response *response =
283
                MHD_create_response_from_buffer(0, 0, MHD_RESPMEM_PERSISTENT);
284
            if (nullptr == response ) {
285
                LOGERR("StreamProxy::answerConn: can't create redirect\n");
286
                return MHD_NO;
287
            }
288
            MHD_add_response_header (response, "Location", url.c_str());
289
            int ret = MHD_queue_response(mhdconn, 302, response);
290
            MHD_destroy_response(response);
291
            return ret;
292
        }
293
294
        // Create/Start the curl transaction, and wait for the headers.
295
        LOGDEB("StreamProxy:answerConn: starting curl for " << url << endl);
296
        auto reader = new ContentReader(new CurlFetch(url));
297
        reader->fetcher->start(&reader->queue, offset);
298
        *con_cls = reader;
299
        return MHD_YES;
300
        // End first call
301
    }
302
303
    // Second call for this request. We know that the curl request has
304
    // proceeded past the headers (else, we would have failed during
305
    // the first call). Check for a curl error or http 404 or similar
306
    ContentReader *reader = (ContentReader*)*con_cls;
307
    
308
    if (!reader->fetcher->waitForHeaders()) {
309
        LOGDEB("StreamProxy::answerConn: waitForHeaders error\n");
310
        reader->fetcher->curlDone(&curlcode, &httpcode);
311
        int code = httpcode ? httpcode : MHD_HTTP_INTERNAL_SERVER_ERROR;
312
        struct MHD_Response *response =
313
            MHD_create_response_from_buffer(0, 0, MHD_RESPMEM_PERSISTENT);
314
        int ret = MHD_queue_response(mhdconn, code, response);
315
        MHD_destroy_response(response);
316
        LOGINF("StreamProxy::answerConn (1): return with http code: " <<
317
               code << endl);
318
        return ret;
319
    }
320
321
    string cl;
322
    uint64_t size = MHD_SIZE_UNKNOWN;
323
    if (reader->fetcher->headerValue("content-length", cl) && !cl.empty()) {
324
        LOGDEB1("mhdAnswerConn: header content-length: " << cl << endl);
325
        size  = (uint64_t)atoll(cl.c_str());
326
    }
327
328
    // Build a data response.
329
    // the block size seems to be flatly ignored by libmicrohttpd
330
    // Any random value would probably work the same
331
    struct MHD_Response *response = 
332
        MHD_create_response_from_callback(size, 4096,
333
                                          content_reader_cb, reader, nullptr);
334
    if (response == NULL) {
335
        LOGERR("mhdAnswerConn: answer: could not create response" << endl);
336
        return MHD_NO;
337
    }
338
339
    MHD_add_response_header (response, "Accept-Ranges", "bytes");
340
    if (!cl.empty()) {
341
        MHD_add_response_header(response, "Content-Length", cl.c_str());
342
    }
343
    string ct;
344
    if (reader->fetcher->headerValue("content-type", ct) && !ct.empty()) {
345
        LOGDEB1("mhdAnswerConn: header content-type: " << ct << endl);
346
        MHD_add_response_header(response, "Content-Type", ct.c_str());
347
    }
348
349
    int code = MHD_HTTP_OK;
350
    if (reader->fetcher->curlDone(&curlcode, &httpcode)) {
351
        code = httpcode ? httpcode : MHD_HTTP_INTERNAL_SERVER_ERROR;
352
    }
353
    int ret = MHD_queue_response(mhdconn, code, response);
354
    MHD_destroy_response(response);
355
    return ret;
356
}
357
358
static void
359
request_completed_callback(
360
    void *cls, struct MHD_Connection *conn,
361
    void **con_cls, enum MHD_RequestTerminationCode toe)
362
{
363
    // We get this even if the answer callback returned MHD_NO
364
    // (e.g. for a second connection). check con_cls and do nothing if
365
    // it's not set.
366
    if (cls && *con_cls) {
367
        StreamProxy::Internal *internal =
368
            static_cast<StreamProxy::Internal*>(cls);
369
        return internal->requestCompleted(conn, con_cls, toe);
370
    }
371
}
372
373
static vector<CharFlags> completionStatus {
374
    {MHD_REQUEST_TERMINATED_COMPLETED_OK,
375
            "MHD_REQUEST_TERMINATED_COMPLETED_OK", ""},
376
    {MHD_REQUEST_TERMINATED_WITH_ERROR,
377
            "MHD_REQUEST_TERMINATED_WITH_ERROR", ""},
378
    {MHD_REQUEST_TERMINATED_TIMEOUT_REACHED,
379
            "MHD_REQUEST_TERMINATED_TIMEOUT_REACHED", ""},
380
    {MHD_REQUEST_TERMINATED_DAEMON_SHUTDOWN,
381
            "MHD_REQUEST_TERMINATED_DAEMON_SHUTDOWN", ""},
382
    {MHD_REQUEST_TERMINATED_READ_ERROR,
383
            "MHD_REQUEST_TERMINATED_READ_ERROR", ""},
384
    {MHD_REQUEST_TERMINATED_CLIENT_ABORT,
385
            "MHD_REQUEST_TERMINATED_CLIENT_ABORT", ""},
386
        };
387
388
void StreamProxy::Internal::requestCompleted(
389
    struct MHD_Connection *conn,
390
    void **con_cls, enum MHD_RequestTerminationCode toe)
391
{
392
    LOGDEB("StreamProxy::requestCompleted: status " <<
393
           valToString(completionStatus, toe) << endl);
394
    if (*con_cls) {
395
        ContentReader *reader = static_cast<ContentReader*>(*con_cls);
396
        delete reader;
397
    }
398
}
399
400
bool StreamProxy::Internal::startMHD()
401
{
402
    mhd = MHD_start_daemon(
403
        MHD_USE_THREAD_PER_CONNECTION|MHD_USE_SELECT_INTERNALLY|MHD_USE_DEBUG,
404
        listenport, 
405
        /* Accept policy callback and arg */
406
        nullptr, nullptr, 
407
        /* handler and arg */
408
        &answer_to_connection, this,
409
        MHD_OPTION_NOTIFY_COMPLETED, request_completed_callback, this,
410
        MHD_OPTION_END);
411
412
    if (nullptr == mhd) {
413
        LOGERR("Aproxy: MHD_start_daemon failed\n");
414
        return false;
415
    }
416
417
    return true;
418
}