|
a |
|
b/src/mediaserver/cdplugins/streamproxy.cpp |
|
|
1 |
/* Copyright (C) 2017-2018 J.F.Dockes
|
|
|
2 |
* This program is free software; you can redistribute it and/or modify
|
|
|
3 |
* it under the terms of the GNU General Public License as published by
|
|
|
4 |
* the Free Software Foundation; either version 2 of the License, or
|
|
|
5 |
* (at your option) any later version.
|
|
|
6 |
*
|
|
|
7 |
* This program is distributed in the hope that it will be useful,
|
|
|
8 |
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
9 |
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
10 |
* GNU General Public License for more details.
|
|
|
11 |
*
|
|
|
12 |
* You should have received a copy of the GNU General Public License
|
|
|
13 |
* along with this program; if not, write to the
|
|
|
14 |
* Free Software Foundation, Inc.,
|
|
|
15 |
* 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
|
|
|
16 |
*/
|
|
|
17 |
|
|
|
18 |
#include "streamproxy.h"
|
|
|
19 |
#include "curlfetch.h"
|
|
|
20 |
#include "log.h"
|
|
|
21 |
#include "smallut.h"
|
|
|
22 |
|
|
|
23 |
#include <microhttpd.h>
|
|
|
24 |
|
|
|
25 |
#include <mutex>
|
|
|
26 |
#include <condition_variable>
|
|
|
27 |
#include <unordered_map>
|
|
|
28 |
|
|
|
29 |
using namespace std;
|
|
|
30 |
|
|
|
31 |
class ContentReader {
|
|
|
32 |
public:
|
|
|
33 |
ContentReader(CurlFetch *fetcher);
|
|
|
34 |
~ContentReader() {
|
|
|
35 |
LOGDEB1("ContentReader::~ContentReader\n");
|
|
|
36 |
// This should not be neccessary but see comments in
|
|
|
37 |
// tstcurlfetch code
|
|
|
38 |
fetcher = std::unique_ptr<CurlFetch>();
|
|
|
39 |
}
|
|
|
40 |
ssize_t contentRead(uint64_t pos, char *buf, size_t max);
|
|
|
41 |
|
|
|
42 |
std::unique_ptr<CurlFetch> fetcher;
|
|
|
43 |
BufXChange<ABuffer*> queue{"crqueue"};
|
|
|
44 |
bool normalEOS{false};
|
|
|
45 |
};
|
|
|
46 |
|
|
|
47 |
ContentReader::ContentReader(CurlFetch *f)
|
|
|
48 |
: fetcher(std::unique_ptr<CurlFetch>(f))
|
|
|
49 |
{
|
|
|
50 |
}
|
|
|
51 |
|
|
|
52 |
ssize_t ContentReader::contentRead(uint64_t pos, char *obuf, size_t max)
|
|
|
53 |
{
|
|
|
54 |
LOGDEB1("ContentReader::contentRead: pos "<<pos << " max " << max << endl);
|
|
|
55 |
if (normalEOS) {
|
|
|
56 |
LOGDEB1("ContentReader::contentRead: return EOS\n");
|
|
|
57 |
return MHD_CONTENT_READER_END_OF_STREAM;
|
|
|
58 |
}
|
|
|
59 |
size_t totcnt = 0;
|
|
|
60 |
ABuffer *abuf;
|
|
|
61 |
while (totcnt < max) {
|
|
|
62 |
if (!queue.take(&abuf)) {
|
|
|
63 |
LOGDEB("ContentReader::contentRead: return ERROR\n");
|
|
|
64 |
return MHD_CONTENT_READER_END_WITH_ERROR;
|
|
|
65 |
}
|
|
|
66 |
LOGDEB1("ContentReader::contentRead: got buffer with " <<
|
|
|
67 |
abuf->bytes << " bytes\n");
|
|
|
68 |
if (abuf->bytes == 0) {
|
|
|
69 |
normalEOS = true;
|
|
|
70 |
if (totcnt == 0) {
|
|
|
71 |
LOGDEB1("ContentReader::contentRead: return EOS\n");
|
|
|
72 |
return MHD_CONTENT_READER_END_OF_STREAM;
|
|
|
73 |
} else {
|
|
|
74 |
// Copied data, we will return eostream on next call.
|
|
|
75 |
break;
|
|
|
76 |
}
|
|
|
77 |
}
|
|
|
78 |
size_t tocopy = MIN(max-totcnt, abuf->bytes - abuf->curoffs);
|
|
|
79 |
memcpy(obuf + totcnt, abuf->buf + abuf->curoffs, tocopy);
|
|
|
80 |
totcnt += tocopy;
|
|
|
81 |
abuf->curoffs += tocopy;
|
|
|
82 |
if (abuf->curoffs >= abuf->bytes) {
|
|
|
83 |
queue.recycle(abuf);
|
|
|
84 |
} else {
|
|
|
85 |
queue.untake(abuf);
|
|
|
86 |
}
|
|
|
87 |
}
|
|
|
88 |
LOGDEB1("ContentReader::contentRead: return " << totcnt << endl);
|
|
|
89 |
return totcnt;
|
|
|
90 |
}
|
|
|
91 |
|
|
|
92 |
static ssize_t content_reader_cb(void *cls, uint64_t pos, char *buf, size_t max)
|
|
|
93 |
{
|
|
|
94 |
ContentReader *reader = static_cast<ContentReader*>(cls);
|
|
|
95 |
if (reader) {
|
|
|
96 |
return reader->contentRead(pos, buf, max);
|
|
|
97 |
} else {
|
|
|
98 |
return -1;
|
|
|
99 |
}
|
|
|
100 |
}
|
|
|
101 |
|
|
|
102 |
|
|
|
103 |
class StreamProxy::Internal {
|
|
|
104 |
public:
|
|
|
105 |
Internal(int listenport, UrlTransFunc urlt);
|
|
|
106 |
~Internal();
|
|
|
107 |
bool startMHD();
|
|
|
108 |
|
|
|
109 |
int answerConn(
|
|
|
110 |
struct MHD_Connection *connection, const char *url,
|
|
|
111 |
const char *method, const char *version,
|
|
|
112 |
const char *upload_data, size_t *upload_data_size,
|
|
|
113 |
void **con_cls);
|
|
|
114 |
|
|
|
115 |
void requestCompleted(
|
|
|
116 |
struct MHD_Connection *conn,
|
|
|
117 |
void **con_cls, enum MHD_RequestTerminationCode toe);
|
|
|
118 |
|
|
|
119 |
int listenport{-1};
|
|
|
120 |
UrlTransFunc urltrans;
|
|
|
121 |
struct MHD_Daemon *mhd{nullptr};
|
|
|
122 |
};
|
|
|
123 |
|
|
|
124 |
|
|
|
125 |
StreamProxy::StreamProxy(int listenport,UrlTransFunc urltrans)
|
|
|
126 |
: m(new Internal(listenport, urltrans))
|
|
|
127 |
{
|
|
|
128 |
}
|
|
|
129 |
|
|
|
130 |
StreamProxy::~StreamProxy()
|
|
|
131 |
{
|
|
|
132 |
}
|
|
|
133 |
StreamProxy::Internal::~Internal()
|
|
|
134 |
{
|
|
|
135 |
LOGDEB("StreamProxy::Internal::~Internal()\n");
|
|
|
136 |
if (mhd) {
|
|
|
137 |
MHD_stop_daemon(mhd);
|
|
|
138 |
mhd = nullptr;
|
|
|
139 |
}
|
|
|
140 |
}
|
|
|
141 |
|
|
|
142 |
StreamProxy::Internal::Internal(int _listenport, UrlTransFunc _urltrans)
|
|
|
143 |
: listenport(_listenport), urltrans(_urltrans)
|
|
|
144 |
{
|
|
|
145 |
startMHD();
|
|
|
146 |
}
|
|
|
147 |
|
|
|
148 |
|
|
|
149 |
static int answer_to_connection(
|
|
|
150 |
void *cls, struct MHD_Connection *conn,
|
|
|
151 |
const char *url, const char *method, const char *version,
|
|
|
152 |
const char *upload_data, size_t *upload_data_size,
|
|
|
153 |
void **con_cls)
|
|
|
154 |
{
|
|
|
155 |
StreamProxy::Internal *internal = static_cast<StreamProxy::Internal*>(cls);
|
|
|
156 |
|
|
|
157 |
if (internal) {
|
|
|
158 |
return internal->answerConn(
|
|
|
159 |
conn, url, method, version, upload_data, upload_data_size, con_cls);
|
|
|
160 |
} else {
|
|
|
161 |
return -1;
|
|
|
162 |
}
|
|
|
163 |
}
|
|
|
164 |
|
|
|
165 |
#define PRINT_KEYS
|
|
|
166 |
#ifdef PRINT_KEYS
|
|
|
167 |
static vector<CharFlags> valueKind {
|
|
|
168 |
{MHD_RESPONSE_HEADER_KIND, "Response header"},
|
|
|
169 |
{MHD_HEADER_KIND, "HTTP header"},
|
|
|
170 |
{MHD_COOKIE_KIND, "Cookies"},
|
|
|
171 |
{MHD_POSTDATA_KIND, "POST data"},
|
|
|
172 |
{MHD_GET_ARGUMENT_KIND, "GET (URI) arguments"},
|
|
|
173 |
{MHD_FOOTER_KIND, "HTTP footer"},
|
|
|
174 |
};
|
|
|
175 |
|
|
|
176 |
static int print_out_key (void *cls, enum MHD_ValueKind kind,
|
|
|
177 |
const char *key, const char *value)
|
|
|
178 |
{
|
|
|
179 |
LOGDEB(valToString(valueKind, kind) << ": " << key << " -> " <<
|
|
|
180 |
value << endl);
|
|
|
181 |
return MHD_YES;
|
|
|
182 |
}
|
|
|
183 |
#endif /* PRINT_KEYS */
|
|
|
184 |
|
|
|
185 |
static int mapvalues_cb(void *cls, enum MHD_ValueKind kind,
|
|
|
186 |
const char *key, const char *value)
|
|
|
187 |
{
|
|
|
188 |
unordered_map<string,string> *mp = (unordered_map<string,string> *)cls;
|
|
|
189 |
if (mp) {
|
|
|
190 |
(*mp)[key] = value;
|
|
|
191 |
}
|
|
|
192 |
return MHD_YES;
|
|
|
193 |
}
|
|
|
194 |
|
|
|
195 |
// Parse range header.
|
|
|
196 |
static bool parseRanges(
|
|
|
197 |
const string& ranges, vector<pair<int64_t, int64_t>>& oranges)
|
|
|
198 |
{
|
|
|
199 |
oranges.clear();
|
|
|
200 |
string::size_type pos = ranges.find("bytes=");
|
|
|
201 |
if (pos == string::npos) {
|
|
|
202 |
return false;
|
|
|
203 |
}
|
|
|
204 |
pos += 6;
|
|
|
205 |
bool done = false;
|
|
|
206 |
while(!done) {
|
|
|
207 |
string::size_type dash = ranges.find('-', pos);
|
|
|
208 |
if (dash == string::npos) {
|
|
|
209 |
return false;
|
|
|
210 |
}
|
|
|
211 |
string::size_type comma = ranges.find(',', pos);
|
|
|
212 |
string firstPart = ranges.substr(pos, dash-pos);
|
|
|
213 |
int64_t start = firstPart.empty() ? 0 : atoll(firstPart.c_str());
|
|
|
214 |
string secondPart = ranges.substr(dash+1, comma != string::npos ?
|
|
|
215 |
comma-dash-1 : string::npos);
|
|
|
216 |
int64_t fin = secondPart.empty() ? -1 : atoll(secondPart.c_str());
|
|
|
217 |
pair<int64_t, int64_t> nrange(start,fin);
|
|
|
218 |
oranges.push_back(nrange);
|
|
|
219 |
if (comma != string::npos) {
|
|
|
220 |
pos = comma + 1;
|
|
|
221 |
}
|
|
|
222 |
done = comma == string::npos;
|
|
|
223 |
}
|
|
|
224 |
return true;
|
|
|
225 |
}
|
|
|
226 |
|
|
|
227 |
static bool processRange(struct MHD_Connection *mhdconn, uint64_t& offset)
|
|
|
228 |
{
|
|
|
229 |
const char* rangeh =
|
|
|
230 |
MHD_lookup_connection_value(mhdconn, MHD_HEADER_KIND, "range");
|
|
|
231 |
vector<pair<int64_t, int64_t> > ranges;
|
|
|
232 |
if (rangeh && parseRanges(rangeh, ranges) && ranges.size()) {
|
|
|
233 |
if (ranges[0].second != -1 || ranges.size() > 1) {
|
|
|
234 |
LOGERR("AProxy::mhdAnswerConn: unsupported range: " <<
|
|
|
235 |
rangeh << "\n");
|
|
|
236 |
struct MHD_Response *response =
|
|
|
237 |
MHD_create_response_from_buffer(0,0,MHD_RESPMEM_PERSISTENT);
|
|
|
238 |
MHD_queue_response(mhdconn, 416, response);
|
|
|
239 |
MHD_destroy_response(response);
|
|
|
240 |
return false;
|
|
|
241 |
} else {
|
|
|
242 |
offset = (uint64_t)ranges[0].first;
|
|
|
243 |
LOGDEB("AProxy::mhdAnswerConn: accept xx- range, offset "
|
|
|
244 |
<< offset << endl);
|
|
|
245 |
}
|
|
|
246 |
}
|
|
|
247 |
return true;
|
|
|
248 |
}
|
|
|
249 |
|
|
|
250 |
int StreamProxy::Internal::answerConn(
|
|
|
251 |
struct MHD_Connection *mhdconn, const char *_url,
|
|
|
252 |
const char *method, const char *version,
|
|
|
253 |
const char *upload_data, size_t *upload_data_size,
|
|
|
254 |
void **con_cls)
|
|
|
255 |
{
|
|
|
256 |
int curlcode; long httpcode;
|
|
|
257 |
|
|
|
258 |
if (nullptr == *con_cls) {
|
|
|
259 |
uint64_t offset = 0;
|
|
|
260 |
// First call, look at headers, method etc.
|
|
|
261 |
#ifdef PRINT_KEYS
|
|
|
262 |
MHD_get_connection_values(mhdconn,MHD_HEADER_KIND,&print_out_key,0);
|
|
|
263 |
#endif
|
|
|
264 |
if (strcmp("GET", method) && strcmp("HEAD", method)) {
|
|
|
265 |
LOGERR("StreamProxy::answerConn: method is not GET or HEAD\n");
|
|
|
266 |
return MHD_NO;
|
|
|
267 |
}
|
|
|
268 |
if (!processRange(mhdconn, offset)) {
|
|
|
269 |
return MHD_NO;
|
|
|
270 |
}
|
|
|
271 |
|
|
|
272 |
// Compute destination url
|
|
|
273 |
unordered_map<string,string>querydata;
|
|
|
274 |
MHD_get_connection_values(mhdconn, MHD_GET_ARGUMENT_KIND,
|
|
|
275 |
&mapvalues_cb, &querydata);
|
|
|
276 |
|
|
|
277 |
string url(_url);
|
|
|
278 |
UrlTransReturn ret = urltrans(url, querydata);
|
|
|
279 |
if (ret == Error) {
|
|
|
280 |
return MHD_NO;
|
|
|
281 |
} else if (ret == Redirect) {
|
|
|
282 |
struct MHD_Response *response =
|
|
|
283 |
MHD_create_response_from_buffer(0, 0, MHD_RESPMEM_PERSISTENT);
|
|
|
284 |
if (nullptr == response ) {
|
|
|
285 |
LOGERR("StreamProxy::answerConn: can't create redirect\n");
|
|
|
286 |
return MHD_NO;
|
|
|
287 |
}
|
|
|
288 |
MHD_add_response_header (response, "Location", url.c_str());
|
|
|
289 |
int ret = MHD_queue_response(mhdconn, 302, response);
|
|
|
290 |
MHD_destroy_response(response);
|
|
|
291 |
return ret;
|
|
|
292 |
}
|
|
|
293 |
|
|
|
294 |
// Create/Start the curl transaction, and wait for the headers.
|
|
|
295 |
LOGDEB("StreamProxy:answerConn: starting curl for " << url << endl);
|
|
|
296 |
auto reader = new ContentReader(new CurlFetch(url));
|
|
|
297 |
reader->fetcher->start(&reader->queue, offset);
|
|
|
298 |
*con_cls = reader;
|
|
|
299 |
return MHD_YES;
|
|
|
300 |
// End first call
|
|
|
301 |
}
|
|
|
302 |
|
|
|
303 |
// Second call for this request. We know that the curl request has
|
|
|
304 |
// proceeded past the headers (else, we would have failed during
|
|
|
305 |
// the first call). Check for a curl error or http 404 or similar
|
|
|
306 |
ContentReader *reader = (ContentReader*)*con_cls;
|
|
|
307 |
|
|
|
308 |
if (!reader->fetcher->waitForHeaders()) {
|
|
|
309 |
LOGDEB("StreamProxy::answerConn: waitForHeaders error\n");
|
|
|
310 |
reader->fetcher->curlDone(&curlcode, &httpcode);
|
|
|
311 |
int code = httpcode ? httpcode : MHD_HTTP_INTERNAL_SERVER_ERROR;
|
|
|
312 |
struct MHD_Response *response =
|
|
|
313 |
MHD_create_response_from_buffer(0, 0, MHD_RESPMEM_PERSISTENT);
|
|
|
314 |
int ret = MHD_queue_response(mhdconn, code, response);
|
|
|
315 |
MHD_destroy_response(response);
|
|
|
316 |
LOGINF("StreamProxy::answerConn (1): return with http code: " <<
|
|
|
317 |
code << endl);
|
|
|
318 |
return ret;
|
|
|
319 |
}
|
|
|
320 |
|
|
|
321 |
string cl;
|
|
|
322 |
uint64_t size = MHD_SIZE_UNKNOWN;
|
|
|
323 |
if (reader->fetcher->headerValue("content-length", cl) && !cl.empty()) {
|
|
|
324 |
LOGDEB1("mhdAnswerConn: header content-length: " << cl << endl);
|
|
|
325 |
size = (uint64_t)atoll(cl.c_str());
|
|
|
326 |
}
|
|
|
327 |
|
|
|
328 |
// Build a data response.
|
|
|
329 |
// the block size seems to be flatly ignored by libmicrohttpd
|
|
|
330 |
// Any random value would probably work the same
|
|
|
331 |
struct MHD_Response *response =
|
|
|
332 |
MHD_create_response_from_callback(size, 4096,
|
|
|
333 |
content_reader_cb, reader, nullptr);
|
|
|
334 |
if (response == NULL) {
|
|
|
335 |
LOGERR("mhdAnswerConn: answer: could not create response" << endl);
|
|
|
336 |
return MHD_NO;
|
|
|
337 |
}
|
|
|
338 |
|
|
|
339 |
MHD_add_response_header (response, "Accept-Ranges", "bytes");
|
|
|
340 |
if (!cl.empty()) {
|
|
|
341 |
MHD_add_response_header(response, "Content-Length", cl.c_str());
|
|
|
342 |
}
|
|
|
343 |
string ct;
|
|
|
344 |
if (reader->fetcher->headerValue("content-type", ct) && !ct.empty()) {
|
|
|
345 |
LOGDEB1("mhdAnswerConn: header content-type: " << ct << endl);
|
|
|
346 |
MHD_add_response_header(response, "Content-Type", ct.c_str());
|
|
|
347 |
}
|
|
|
348 |
|
|
|
349 |
int code = MHD_HTTP_OK;
|
|
|
350 |
if (reader->fetcher->curlDone(&curlcode, &httpcode)) {
|
|
|
351 |
code = httpcode ? httpcode : MHD_HTTP_INTERNAL_SERVER_ERROR;
|
|
|
352 |
}
|
|
|
353 |
int ret = MHD_queue_response(mhdconn, code, response);
|
|
|
354 |
MHD_destroy_response(response);
|
|
|
355 |
return ret;
|
|
|
356 |
}
|
|
|
357 |
|
|
|
358 |
static void
|
|
|
359 |
request_completed_callback(
|
|
|
360 |
void *cls, struct MHD_Connection *conn,
|
|
|
361 |
void **con_cls, enum MHD_RequestTerminationCode toe)
|
|
|
362 |
{
|
|
|
363 |
// We get this even if the answer callback returned MHD_NO
|
|
|
364 |
// (e.g. for a second connection). check con_cls and do nothing if
|
|
|
365 |
// it's not set.
|
|
|
366 |
if (cls && *con_cls) {
|
|
|
367 |
StreamProxy::Internal *internal =
|
|
|
368 |
static_cast<StreamProxy::Internal*>(cls);
|
|
|
369 |
return internal->requestCompleted(conn, con_cls, toe);
|
|
|
370 |
}
|
|
|
371 |
}
|
|
|
372 |
|
|
|
373 |
static vector<CharFlags> completionStatus {
|
|
|
374 |
{MHD_REQUEST_TERMINATED_COMPLETED_OK,
|
|
|
375 |
"MHD_REQUEST_TERMINATED_COMPLETED_OK", ""},
|
|
|
376 |
{MHD_REQUEST_TERMINATED_WITH_ERROR,
|
|
|
377 |
"MHD_REQUEST_TERMINATED_WITH_ERROR", ""},
|
|
|
378 |
{MHD_REQUEST_TERMINATED_TIMEOUT_REACHED,
|
|
|
379 |
"MHD_REQUEST_TERMINATED_TIMEOUT_REACHED", ""},
|
|
|
380 |
{MHD_REQUEST_TERMINATED_DAEMON_SHUTDOWN,
|
|
|
381 |
"MHD_REQUEST_TERMINATED_DAEMON_SHUTDOWN", ""},
|
|
|
382 |
{MHD_REQUEST_TERMINATED_READ_ERROR,
|
|
|
383 |
"MHD_REQUEST_TERMINATED_READ_ERROR", ""},
|
|
|
384 |
{MHD_REQUEST_TERMINATED_CLIENT_ABORT,
|
|
|
385 |
"MHD_REQUEST_TERMINATED_CLIENT_ABORT", ""},
|
|
|
386 |
};
|
|
|
387 |
|
|
|
388 |
void StreamProxy::Internal::requestCompleted(
|
|
|
389 |
struct MHD_Connection *conn,
|
|
|
390 |
void **con_cls, enum MHD_RequestTerminationCode toe)
|
|
|
391 |
{
|
|
|
392 |
LOGDEB("StreamProxy::requestCompleted: status " <<
|
|
|
393 |
valToString(completionStatus, toe) << endl);
|
|
|
394 |
if (*con_cls) {
|
|
|
395 |
ContentReader *reader = static_cast<ContentReader*>(*con_cls);
|
|
|
396 |
delete reader;
|
|
|
397 |
}
|
|
|
398 |
}
|
|
|
399 |
|
|
|
400 |
bool StreamProxy::Internal::startMHD()
|
|
|
401 |
{
|
|
|
402 |
mhd = MHD_start_daemon(
|
|
|
403 |
MHD_USE_THREAD_PER_CONNECTION|MHD_USE_SELECT_INTERNALLY|MHD_USE_DEBUG,
|
|
|
404 |
listenport,
|
|
|
405 |
/* Accept policy callback and arg */
|
|
|
406 |
nullptr, nullptr,
|
|
|
407 |
/* handler and arg */
|
|
|
408 |
&answer_to_connection, this,
|
|
|
409 |
MHD_OPTION_NOTIFY_COMPLETED, request_completed_callback, this,
|
|
|
410 |
MHD_OPTION_END);
|
|
|
411 |
|
|
|
412 |
if (nullptr == mhd) {
|
|
|
413 |
LOGERR("Aproxy: MHD_start_daemon failed\n");
|
|
|
414 |
return false;
|
|
|
415 |
}
|
|
|
416 |
|
|
|
417 |
return true;
|
|
|
418 |
}
|