|
a/src/mediaserver/cdplugins/streamproxy.cpp |
|
b/src/mediaserver/cdplugins/streamproxy.cpp |
|
... |
|
... |
17 |
|
17 |
|
18 |
#include "streamproxy.h"
|
18 |
#include "streamproxy.h"
|
19 |
#include "curlfetch.h"
|
19 |
#include "curlfetch.h"
|
20 |
#include "log.h"
|
20 |
#include "log.h"
|
21 |
#include "smallut.h"
|
21 |
#include "smallut.h"
|
|
|
22 |
#include "chrono.h"
|
22 |
|
23 |
|
|
|
24 |
#include <fcntl.h>
|
23 |
#include <microhttpd.h>
|
25 |
#include <microhttpd.h>
|
|
|
26 |
#include <curl/curl.h>
|
24 |
|
27 |
|
25 |
#include <mutex>
|
28 |
#include <mutex>
|
26 |
#include <condition_variable>
|
29 |
#include <condition_variable>
|
27 |
#include <unordered_map>
|
30 |
#include <unordered_map>
|
28 |
|
31 |
|
29 |
using namespace std;
|
32 |
using namespace std;
|
30 |
|
33 |
|
31 |
class ContentReader {
|
34 |
class ContentReader {
|
32 |
public:
|
35 |
public:
|
33 |
ContentReader(CurlFetch *fetcher);
|
36 |
ContentReader(CurlFetch *fetcher, int cfd);
|
34 |
~ContentReader() {
|
37 |
~ContentReader() {
|
35 |
LOGDEB1("ContentReader::~ContentReader\n");
|
38 |
LOGDEB1("ContentReader::~ContentReader\n");
|
36 |
// This should not be neccessary but see comments in
|
39 |
// This should not be neccessary but see comments in
|
37 |
// tstcurlfetch code
|
40 |
// tstcurlfetch code
|
38 |
fetcher = std::unique_ptr<CurlFetch>();
|
41 |
fetcher = std::unique_ptr<CurlFetch>();
|
|
... |
|
... |
40 |
ssize_t contentRead(uint64_t pos, char *buf, size_t max);
|
43 |
ssize_t contentRead(uint64_t pos, char *buf, size_t max);
|
41 |
|
44 |
|
42 |
std::unique_ptr<CurlFetch> fetcher;
|
45 |
std::unique_ptr<CurlFetch> fetcher;
|
43 |
BufXChange<ABuffer*> queue{"crqueue"};
|
46 |
BufXChange<ABuffer*> queue{"crqueue"};
|
44 |
bool normalEOS{false};
|
47 |
bool normalEOS{false};
|
|
|
48 |
// Used for experimentations in killing the connection
|
|
|
49 |
int connfd{-1};
|
|
|
50 |
int killafterms{-1};
|
|
|
51 |
Chrono chron;
|
45 |
};
|
52 |
};
|
46 |
|
53 |
|
47 |
ContentReader::ContentReader(CurlFetch *f)
|
54 |
ContentReader::ContentReader(CurlFetch *f, int cfd)
|
48 |
: fetcher(std::unique_ptr<CurlFetch>(f))
|
55 |
: fetcher(std::unique_ptr<CurlFetch>(f)), connfd(cfd)
|
49 |
{
|
56 |
{
|
50 |
}
|
57 |
}
|
51 |
|
58 |
|
52 |
ssize_t ContentReader::contentRead(uint64_t pos, char *obuf, size_t max)
|
59 |
ssize_t ContentReader::contentRead(uint64_t pos, char *obuf, size_t max)
|
53 |
{
|
60 |
{
|
|
... |
|
... |
58 |
}
|
65 |
}
|
59 |
size_t totcnt = 0;
|
66 |
size_t totcnt = 0;
|
60 |
ABuffer *abuf;
|
67 |
ABuffer *abuf;
|
61 |
while (totcnt < max) {
|
68 |
while (totcnt < max) {
|
62 |
if (!queue.take(&abuf)) {
|
69 |
if (!queue.take(&abuf)) {
|
|
|
70 |
int curlcode{0}, httpcode{0};
|
|
|
71 |
fetcher->curlDone(&curlcode, &httpcode);
|
|
|
72 |
LOGDEB("Reader: queue take failed curlcode " << curlcode <<
|
|
|
73 |
" httpcode " << httpcode << endl);
|
|
|
74 |
if (curlcode == CURLE_PARTIAL_FILE) {
|
|
|
75 |
LOGINF("Reader: retrying at " << pos+totcnt << endl);
|
|
|
76 |
fetcher->reset();
|
|
|
77 |
fetcher->start(&queue, pos+totcnt);
|
|
|
78 |
return 0;
|
|
|
79 |
}
|
63 |
LOGDEB("ContentReader::contentRead: return ERROR\n");
|
80 |
LOGDEB("ContentReader::contentRead: return ERROR\n");
|
64 |
return MHD_CONTENT_READER_END_WITH_ERROR;
|
81 |
return MHD_CONTENT_READER_END_WITH_ERROR;
|
65 |
}
|
82 |
}
|
66 |
LOGDEB1("ContentReader::contentRead: got buffer with " <<
|
83 |
LOGDEB1("ContentReader::contentRead: got buffer with " <<
|
67 |
abuf->bytes << " bytes\n");
|
84 |
abuf->bytes << " bytes\n");
|
|
... |
|
... |
83 |
queue.recycle(abuf);
|
100 |
queue.recycle(abuf);
|
84 |
} else {
|
101 |
} else {
|
85 |
queue.untake(abuf);
|
102 |
queue.untake(abuf);
|
86 |
}
|
103 |
}
|
87 |
}
|
104 |
}
|
|
|
105 |
if (killafterms > 0 && connfd >= 0) {
|
|
|
106 |
if (chron.millis() > killafterms) {
|
|
|
107 |
int fd = open("/dev/null", 0);
|
|
|
108 |
if (fd < 0) {
|
|
|
109 |
abort();
|
|
|
110 |
}
|
|
|
111 |
dup2(fd, connfd);
|
|
|
112 |
close(fd);
|
|
|
113 |
connfd = -1;
|
|
|
114 |
}
|
|
|
115 |
}
|
88 |
LOGDEB1("ContentReader::contentRead: return " << totcnt << endl);
|
116 |
LOGDEB1("ContentReader::contentRead: return " << totcnt << endl);
|
89 |
return totcnt;
|
117 |
return totcnt;
|
90 |
}
|
118 |
}
|
91 |
|
119 |
|
92 |
static ssize_t content_reader_cb(void *cls, uint64_t pos, char *buf, size_t max)
|
120 |
static ssize_t content_reader_cb(void *cls, uint64_t pos, char *buf, size_t max)
|
|
... |
|
... |
117 |
void **con_cls, enum MHD_RequestTerminationCode toe);
|
145 |
void **con_cls, enum MHD_RequestTerminationCode toe);
|
118 |
|
146 |
|
119 |
int listenport{-1};
|
147 |
int listenport{-1};
|
120 |
UrlTransFunc urltrans;
|
148 |
UrlTransFunc urltrans;
|
121 |
struct MHD_Daemon *mhd{nullptr};
|
149 |
struct MHD_Daemon *mhd{nullptr};
|
|
|
150 |
int killafterms{-1};
|
122 |
};
|
151 |
};
|
123 |
|
152 |
|
124 |
|
153 |
|
125 |
StreamProxy::StreamProxy(int listenport,UrlTransFunc urltrans)
|
154 |
StreamProxy::StreamProxy(int listenport,UrlTransFunc urltrans)
|
126 |
: m(new Internal(listenport, urltrans))
|
155 |
: m(new Internal(listenport, urltrans))
|
127 |
{
|
156 |
{
|
128 |
}
|
157 |
}
|
129 |
|
158 |
|
|
|
159 |
void StreamProxy::setKillAfterMs(int ms)
|
|
|
160 |
{
|
|
|
161 |
m->killafterms = ms;
|
|
|
162 |
}
|
|
|
163 |
|
130 |
StreamProxy::~StreamProxy()
|
164 |
StreamProxy::~StreamProxy()
|
131 |
{
|
165 |
{
|
132 |
}
|
166 |
}
|
|
|
167 |
|
133 |
StreamProxy::Internal::~Internal()
|
168 |
StreamProxy::Internal::~Internal()
|
134 |
{
|
169 |
{
|
135 |
LOGDEB("StreamProxy::Internal::~Internal()\n");
|
170 |
LOGDEB("StreamProxy::Internal::~Internal()\n");
|
136 |
if (mhd) {
|
171 |
if (mhd) {
|
137 |
MHD_stop_daemon(mhd);
|
172 |
MHD_stop_daemon(mhd);
|
|
... |
|
... |
160 |
} else {
|
195 |
} else {
|
161 |
return -1;
|
196 |
return -1;
|
162 |
}
|
197 |
}
|
163 |
}
|
198 |
}
|
164 |
|
199 |
|
165 |
#define PRINT_KEYS
|
200 |
#undef PRINT_KEYS
|
166 |
#ifdef PRINT_KEYS
|
201 |
#ifdef PRINT_KEYS
|
167 |
static vector<CharFlags> valueKind {
|
202 |
static vector<CharFlags> valueKind {
|
168 |
{MHD_RESPONSE_HEADER_KIND, "Response header"},
|
203 |
{MHD_RESPONSE_HEADER_KIND, "Response header"},
|
169 |
{MHD_HEADER_KIND, "HTTP header"},
|
204 |
{MHD_HEADER_KIND, "HTTP header"},
|
170 |
{MHD_COOKIE_KIND, "Cookies"},
|
205 |
{MHD_COOKIE_KIND, "Cookies"},
|
|
... |
|
... |
251 |
struct MHD_Connection *mhdconn, const char *_url,
|
286 |
struct MHD_Connection *mhdconn, const char *_url,
|
252 |
const char *method, const char *version,
|
287 |
const char *method, const char *version,
|
253 |
const char *upload_data, size_t *upload_data_size,
|
288 |
const char *upload_data, size_t *upload_data_size,
|
254 |
void **con_cls)
|
289 |
void **con_cls)
|
255 |
{
|
290 |
{
|
|
|
291 |
LOGDEB1("answerConn con_cls " << *con_cls << "\n");
|
256 |
int curlcode; long httpcode;
|
292 |
int curlcode, httpcode;
|
257 |
|
293 |
|
258 |
if (nullptr == *con_cls) {
|
294 |
if (nullptr == *con_cls) {
|
259 |
uint64_t offset = 0;
|
295 |
uint64_t offset = 0;
|
260 |
// First call, look at headers, method etc.
|
296 |
// First call, look at headers, method etc.
|
261 |
#ifdef PRINT_KEYS
|
297 |
#ifdef PRINT_KEYS
|
|
... |
|
... |
290 |
MHD_destroy_response(response);
|
326 |
MHD_destroy_response(response);
|
291 |
return ret;
|
327 |
return ret;
|
292 |
}
|
328 |
}
|
293 |
|
329 |
|
294 |
// Create/Start the curl transaction, and wait for the headers.
|
330 |
// Create/Start the curl transaction, and wait for the headers.
|
295 |
LOGDEB("StreamProxy:answerConn: starting curl for " << url << endl);
|
331 |
LOGDEB0("StreamProxy::answerConn: starting curl for " << url << endl);
|
|
|
332 |
int cfd{-1};
|
|
|
333 |
const union MHD_ConnectionInfo *cinf =
|
|
|
334 |
MHD_get_connection_info(mhdconn, MHD_CONNECTION_INFO_CONNECTION_FD);
|
|
|
335 |
if (nullptr == cinf) {
|
|
|
336 |
LOGERR("StreamProxy::answerConn: can't get connection fd\n");
|
|
|
337 |
} else {
|
|
|
338 |
cfd = cinf->connect_fd;
|
|
|
339 |
}
|
296 |
auto reader = new ContentReader(new CurlFetch(url));
|
340 |
auto reader = new ContentReader(new CurlFetch(url), cfd);
|
|
|
341 |
if (killafterms > 0) {
|
|
|
342 |
reader->killafterms = killafterms;
|
|
|
343 |
killafterms = -1;
|
|
|
344 |
}
|
297 |
reader->fetcher->start(&reader->queue, offset);
|
345 |
reader->fetcher->start(&reader->queue, offset);
|
298 |
*con_cls = reader;
|
346 |
*con_cls = reader;
|
|
|
347 |
LOGDEB1("StreamProxy::answerConn: returning after 1st call\n");
|
299 |
return MHD_YES;
|
348 |
return MHD_YES;
|
300 |
// End first call
|
349 |
// End first call
|
301 |
}
|
350 |
}
|
302 |
|
351 |
|
303 |
// Second call for this request. We know that the curl request has
|
352 |
// Second call for this request. We know that the curl request has
|
|
... |
|
... |
315 |
MHD_destroy_response(response);
|
364 |
MHD_destroy_response(response);
|
316 |
LOGINF("StreamProxy::answerConn (1): return with http code: " <<
|
365 |
LOGINF("StreamProxy::answerConn (1): return with http code: " <<
|
317 |
code << endl);
|
366 |
code << endl);
|
318 |
return ret;
|
367 |
return ret;
|
319 |
}
|
368 |
}
|
|
|
369 |
LOGDEB1("StreamProxy::answerConn: waitForHeaders done\n");
|
320 |
|
370 |
|
321 |
string cl;
|
371 |
string cl;
|
322 |
uint64_t size = MHD_SIZE_UNKNOWN;
|
372 |
uint64_t size = MHD_SIZE_UNKNOWN;
|
323 |
if (reader->fetcher->headerValue("content-length", cl) && !cl.empty()) {
|
373 |
if (reader->fetcher->headerValue("content-length", cl) && !cl.empty()) {
|
324 |
LOGDEB1("mhdAnswerConn: header content-length: " << cl << endl);
|
374 |
LOGDEB1("mhdAnswerConn: header content-length: " << cl << endl);
|
325 |
size = (uint64_t)atoll(cl.c_str());
|
375 |
size = (uint64_t)atoll(cl.c_str());
|
326 |
}
|
376 |
}
|
327 |
|
|
|
328 |
// Build a data response.
|
377 |
// Build a data response.
|
329 |
// the block size seems to be flatly ignored by libmicrohttpd
|
378 |
// the block size seems to be flatly ignored by libmicrohttpd
|
330 |
// Any random value would probably work the same
|
379 |
// Any random value would probably work the same
|
331 |
struct MHD_Response *response =
|
380 |
struct MHD_Response *response =
|
332 |
MHD_create_response_from_callback(size, 4096,
|
381 |
MHD_create_response_from_callback(size, 4096,
|
|
... |
|
... |
345 |
LOGDEB1("mhdAnswerConn: header content-type: " << ct << endl);
|
394 |
LOGDEB1("mhdAnswerConn: header content-type: " << ct << endl);
|
346 |
MHD_add_response_header(response, "Content-Type", ct.c_str());
|
395 |
MHD_add_response_header(response, "Content-Type", ct.c_str());
|
347 |
}
|
396 |
}
|
348 |
|
397 |
|
349 |
int code = MHD_HTTP_OK;
|
398 |
int code = MHD_HTTP_OK;
|
|
|
399 |
LOGDEB1("StreamProxy::answerConn: calling curldone\n");
|
350 |
if (reader->fetcher->curlDone(&curlcode, &httpcode)) {
|
400 |
if (reader->fetcher->curlDone(&curlcode, &httpcode)) {
|
351 |
code = httpcode ? httpcode : MHD_HTTP_INTERNAL_SERVER_ERROR;
|
401 |
code = httpcode ? httpcode : MHD_HTTP_INTERNAL_SERVER_ERROR;
|
352 |
}
|
402 |
}
|
353 |
int ret = MHD_queue_response(mhdconn, code, response);
|
403 |
int ret = MHD_queue_response(mhdconn, code, response);
|
354 |
MHD_destroy_response(response);
|
404 |
MHD_destroy_response(response);
|