|
a/src/mediaserver/cdplugins/streamproxy.cpp |
|
b/src/mediaserver/cdplugins/streamproxy.cpp |
|
... |
|
... |
14 |
* Free Software Foundation, Inc.,
|
14 |
* Free Software Foundation, Inc.,
|
15 |
* 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
|
15 |
* 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
|
16 |
*/
|
16 |
*/
|
17 |
|
17 |
|
18 |
#include "streamproxy.h"
|
18 |
#include "streamproxy.h"
|
|
|
19 |
#include "netfetch.h"
|
19 |
#include "curlfetch.h"
|
20 |
#include "curlfetch.h"
|
20 |
#include "log.h"
|
21 |
#include "log.h"
|
21 |
#include "smallut.h"
|
22 |
#include "smallut.h"
|
22 |
#include "chrono.h"
|
23 |
#include "chrono.h"
|
23 |
|
24 |
|
24 |
#include <fcntl.h>
|
25 |
#include <fcntl.h>
|
25 |
#include <microhttpd.h>
|
26 |
#include <microhttpd.h>
|
26 |
#include <curl/curl.h>
|
|
|
27 |
|
27 |
|
28 |
#include <mutex>
|
28 |
#include <mutex>
|
29 |
#include <condition_variable>
|
29 |
#include <condition_variable>
|
30 |
#include <unordered_map>
|
30 |
#include <unordered_map>
|
31 |
|
31 |
|
32 |
using namespace std;
|
32 |
using namespace std;
|
33 |
|
33 |
|
34 |
class ContentReader {
|
34 |
class ContentReader {
|
35 |
public:
|
35 |
public:
|
36 |
ContentReader(CurlFetch *fetcher, int cfd);
|
36 |
ContentReader(NetFetch *fetcher, int cfd);
|
37 |
~ContentReader() {
|
37 |
~ContentReader() {
|
38 |
LOGDEB1("ContentReader::~ContentReader\n");
|
38 |
LOGDEB1("ContentReader::~ContentReader\n");
|
39 |
// This should not be neccessary but see comments in
|
39 |
// This should not be necessary but see comments in
|
40 |
// tstcurlfetch code
|
40 |
// tstcurlfetch code
|
41 |
fetcher = std::unique_ptr<CurlFetch>();
|
41 |
fetcher = std::unique_ptr<NetFetch>();
|
42 |
}
|
42 |
}
|
43 |
ssize_t contentRead(uint64_t pos, char *buf, size_t max);
|
43 |
ssize_t contentRead(uint64_t pos, char *buf, size_t max);
|
44 |
|
44 |
|
45 |
std::unique_ptr<CurlFetch> fetcher;
|
45 |
std::unique_ptr<NetFetch> fetcher;
|
46 |
BufXChange<ABuffer*> queue{"crqueue"};
|
46 |
BufXChange<ABuffer*> queue{"crqueue"};
|
47 |
bool normalEOS{false};
|
47 |
bool normalEOS{false};
|
48 |
// Used for experimentations in killing the connection
|
48 |
// Used for experimentations in killing the connection
|
49 |
int connfd{-1};
|
49 |
int connfd{-1};
|
50 |
int killafterms{-1};
|
50 |
int killafterms{-1};
|
51 |
Chrono chron;
|
51 |
Chrono chron;
|
52 |
};
|
52 |
};
|
53 |
|
53 |
|
54 |
ContentReader::ContentReader(CurlFetch *f, int cfd)
|
54 |
ContentReader::ContentReader(NetFetch *f, int cfd)
|
55 |
: fetcher(std::unique_ptr<CurlFetch>(f)), connfd(cfd)
|
55 |
: fetcher(std::unique_ptr<NetFetch>(f)), connfd(cfd)
|
56 |
{
|
56 |
{
|
57 |
}
|
57 |
}
|
58 |
|
58 |
|
59 |
ssize_t ContentReader::contentRead(uint64_t pos, char *obuf, size_t max)
|
59 |
ssize_t ContentReader::contentRead(uint64_t pos, char *obuf, size_t max)
|
60 |
{
|
60 |
{
|
|
... |
|
... |
65 |
}
|
65 |
}
|
66 |
size_t totcnt = 0;
|
66 |
size_t totcnt = 0;
|
67 |
ABuffer *abuf;
|
67 |
ABuffer *abuf;
|
68 |
while (totcnt < max) {
|
68 |
while (totcnt < max) {
|
69 |
if (!queue.take(&abuf)) {
|
69 |
if (!queue.take(&abuf)) {
|
|
|
70 |
NetFetch::FetchStatus code;
|
70 |
int curlcode{0}, httpcode{0};
|
71 |
int httpcode;
|
71 |
fetcher->curlDone(&curlcode, &httpcode);
|
72 |
fetcher->fetchDone(&code, &httpcode);
|
72 |
LOGDEB("Reader: queue take failed curlcode " << curlcode <<
|
73 |
LOGDEB("Reader: queue take failed code " << code <<
|
73 |
" httpcode " << httpcode << endl);
|
74 |
" httpcode " << httpcode << endl);
|
74 |
if (curlcode == CURLE_PARTIAL_FILE) {
|
75 |
if (code == NetFetch::FETCH_RETRYABLE) {
|
75 |
LOGINF("Reader: retrying at " << pos+totcnt << endl);
|
76 |
LOGINF("Reader: retrying at " << pos+totcnt << endl);
|
76 |
fetcher->reset();
|
77 |
fetcher->reset();
|
77 |
fetcher->start(&queue, pos+totcnt);
|
78 |
fetcher->start(&queue, pos+totcnt);
|
78 |
return 0;
|
79 |
return 0;
|
79 |
}
|
80 |
}
|
|
... |
|
... |
287 |
const char *method, const char *version,
|
288 |
const char *method, const char *version,
|
288 |
const char *upload_data, size_t *upload_data_size,
|
289 |
const char *upload_data, size_t *upload_data_size,
|
289 |
void **con_cls)
|
290 |
void **con_cls)
|
290 |
{
|
291 |
{
|
291 |
LOGDEB1("answerConn con_cls " << *con_cls << "\n");
|
292 |
LOGDEB1("answerConn con_cls " << *con_cls << "\n");
|
|
|
293 |
NetFetch::FetchStatus fetchcode;
|
292 |
int curlcode, httpcode;
|
294 |
int httpcode;
|
293 |
|
295 |
|
294 |
if (nullptr == *con_cls) {
|
296 |
if (nullptr == *con_cls) {
|
295 |
uint64_t offset = 0;
|
297 |
uint64_t offset = 0;
|
296 |
// First call, look at headers, method etc.
|
298 |
// First call, look at headers, method etc.
|
297 |
#ifdef PRINT_KEYS
|
299 |
#ifdef PRINT_KEYS
|
|
... |
|
... |
325 |
int ret = MHD_queue_response(mhdconn, 302, response);
|
327 |
int ret = MHD_queue_response(mhdconn, 302, response);
|
326 |
MHD_destroy_response(response);
|
328 |
MHD_destroy_response(response);
|
327 |
return ret;
|
329 |
return ret;
|
328 |
}
|
330 |
}
|
329 |
|
331 |
|
330 |
// Create/Start the curl transaction, and wait for the headers.
|
332 |
// Create/Start the fetch transaction, and wait for the headers.
|
331 |
LOGDEB0("StreamProxy::answerConn: starting curl for " << url << endl);
|
333 |
LOGDEB0("StreamProxy::answerConn: starting fetch for " << url << endl);
|
332 |
int cfd{-1};
|
334 |
int cfd{-1};
|
333 |
const union MHD_ConnectionInfo *cinf =
|
335 |
const union MHD_ConnectionInfo *cinf =
|
334 |
MHD_get_connection_info(mhdconn, MHD_CONNECTION_INFO_CONNECTION_FD);
|
336 |
MHD_get_connection_info(mhdconn, MHD_CONNECTION_INFO_CONNECTION_FD);
|
335 |
if (nullptr == cinf) {
|
337 |
if (nullptr == cinf) {
|
336 |
LOGERR("StreamProxy::answerConn: can't get connection fd\n");
|
338 |
LOGERR("StreamProxy::answerConn: can't get connection fd\n");
|
|
... |
|
... |
347 |
LOGDEB1("StreamProxy::answerConn: returning after 1st call\n");
|
349 |
LOGDEB1("StreamProxy::answerConn: returning after 1st call\n");
|
348 |
return MHD_YES;
|
350 |
return MHD_YES;
|
349 |
// End first call
|
351 |
// End first call
|
350 |
}
|
352 |
}
|
351 |
|
353 |
|
352 |
// Second call for this request. We know that the curl request has
|
354 |
// Second call for this request. We know that the fetch request has
|
353 |
// proceeded past the headers (else, we would have failed during
|
355 |
// proceeded past the headers (else, we would have failed during
|
354 |
// the first call). Check for a curl error or http 404 or similar
|
356 |
// the first call). Check for an error or http 404 or similar
|
355 |
ContentReader *reader = (ContentReader*)*con_cls;
|
357 |
ContentReader *reader = (ContentReader*)*con_cls;
|
356 |
|
358 |
|
357 |
if (!reader->fetcher->waitForHeaders()) {
|
359 |
if (!reader->fetcher->waitForHeaders()) {
|
358 |
LOGDEB("StreamProxy::answerConn: waitForHeaders error\n");
|
360 |
LOGDEB("StreamProxy::answerConn: waitForHeaders error\n");
|
359 |
reader->fetcher->curlDone(&curlcode, &httpcode);
|
361 |
reader->fetcher->fetchDone(&fetchcode, &httpcode);
|
360 |
int code = httpcode ? httpcode : MHD_HTTP_INTERNAL_SERVER_ERROR;
|
362 |
int code = httpcode ? httpcode : MHD_HTTP_INTERNAL_SERVER_ERROR;
|
361 |
struct MHD_Response *response =
|
363 |
struct MHD_Response *response =
|
362 |
MHD_create_response_from_buffer(0, 0, MHD_RESPMEM_PERSISTENT);
|
364 |
MHD_create_response_from_buffer(0, 0, MHD_RESPMEM_PERSISTENT);
|
363 |
int ret = MHD_queue_response(mhdconn, code, response);
|
365 |
int ret = MHD_queue_response(mhdconn, code, response);
|
364 |
MHD_destroy_response(response);
|
366 |
MHD_destroy_response(response);
|
|
... |
|
... |
394 |
LOGDEB1("mhdAnswerConn: header content-type: " << ct << endl);
|
396 |
LOGDEB1("mhdAnswerConn: header content-type: " << ct << endl);
|
395 |
MHD_add_response_header(response, "Content-Type", ct.c_str());
|
397 |
MHD_add_response_header(response, "Content-Type", ct.c_str());
|
396 |
}
|
398 |
}
|
397 |
|
399 |
|
398 |
int code = MHD_HTTP_OK;
|
400 |
int code = MHD_HTTP_OK;
|
399 |
LOGDEB1("StreamProxy::answerConn: calling curldone\n");
|
401 |
LOGDEB1("StreamProxy::answerConn: calling fetchDone\n");
|
400 |
if (reader->fetcher->curlDone(&curlcode, &httpcode)) {
|
402 |
if (reader->fetcher->fetchDone(&fetchcode, &httpcode)) {
|
401 |
code = httpcode ? httpcode : MHD_HTTP_INTERNAL_SERVER_ERROR;
|
403 |
code = httpcode ? httpcode : MHD_HTTP_INTERNAL_SERVER_ERROR;
|
402 |
}
|
404 |
}
|
403 |
int ret = MHD_queue_response(mhdconn, code, response);
|
405 |
int ret = MHD_queue_response(mhdconn, code, response);
|
404 |
MHD_destroy_response(response);
|
406 |
MHD_destroy_response(response);
|
405 |
return ret;
|
407 |
return ret;
|