--- a/src/mediaserver/cdplugins/streamproxy.cpp
+++ b/src/mediaserver/cdplugins/streamproxy.cpp
@@ -19,8 +19,11 @@
#include "curlfetch.h"
#include "log.h"
#include "smallut.h"
-
+#include "chrono.h"
+
+#include <fcntl.h>
#include <microhttpd.h>
+#include <curl/curl.h>
#include <mutex>
#include <condition_variable>
@@ -30,7 +33,7 @@
class ContentReader {
public:
- ContentReader(CurlFetch *fetcher);
+ ContentReader(CurlFetch *fetcher, int cfd);
~ContentReader() {
LOGDEB1("ContentReader::~ContentReader\n");
// This should not be neccessary but see comments in
@@ -42,10 +45,14 @@
std::unique_ptr<CurlFetch> fetcher;
BufXChange<ABuffer*> queue{"crqueue"};
bool normalEOS{false};
+ // Used for experimentations in killing the connection
+ int connfd{-1};
+ int killafterms{-1};
+ Chrono chron;
};
-ContentReader::ContentReader(CurlFetch *f)
- : fetcher(std::unique_ptr<CurlFetch>(f))
+ContentReader::ContentReader(CurlFetch *f, int cfd)
+ : fetcher(std::unique_ptr<CurlFetch>(f)), connfd(cfd)
{
}
@@ -60,6 +67,16 @@
ABuffer *abuf;
while (totcnt < max) {
if (!queue.take(&abuf)) {
+ int curlcode{0}, httpcode{0};
+ fetcher->curlDone(&curlcode, &httpcode);
+ LOGDEB("Reader: queue take failed curlcode " << curlcode <<
+ " httpcode " << httpcode << endl);
+ if (curlcode == CURLE_PARTIAL_FILE) {
+ LOGINF("Reader: retrying at " << pos+totcnt << endl);
+ fetcher->reset();
+ fetcher->start(&queue, pos+totcnt);
+ return 0;
+ }
LOGDEB("ContentReader::contentRead: return ERROR\n");
return MHD_CONTENT_READER_END_WITH_ERROR;
}
@@ -85,6 +102,17 @@
queue.untake(abuf);
}
}
+ if (killafterms > 0 && connfd >= 0) {
+ if (chron.millis() > killafterms) {
+ int fd = open("/dev/null", 0);
+ if (fd < 0) {
+ abort();
+ }
+ dup2(fd, connfd);
+ close(fd);
+ connfd = -1;
+ }
+ }
LOGDEB1("ContentReader::contentRead: return " << totcnt << endl);
return totcnt;
}
@@ -119,6 +147,7 @@
int listenport{-1};
UrlTransFunc urltrans;
struct MHD_Daemon *mhd{nullptr};
+ int killafterms{-1};
};
@@ -127,9 +156,15 @@
{
}
+void StreamProxy::setKillAfterMs(int ms)
+{
+ m->killafterms = ms;
+}
+
StreamProxy::~StreamProxy()
{
}
+
StreamProxy::Internal::~Internal()
{
LOGDEB("StreamProxy::Internal::~Internal()\n");
@@ -162,7 +197,7 @@
}
}
-#define PRINT_KEYS
+#undef PRINT_KEYS
#ifdef PRINT_KEYS
static vector<CharFlags> valueKind {
{MHD_RESPONSE_HEADER_KIND, "Response header"},
@@ -253,7 +288,8 @@
const char *upload_data, size_t *upload_data_size,
void **con_cls)
{
- int curlcode; long httpcode;
+ LOGDEB1("answerConn con_cls " << *con_cls << "\n");
+ int curlcode, httpcode;
if (nullptr == *con_cls) {
uint64_t offset = 0;
@@ -292,10 +328,23 @@
}
// Create/Start the curl transaction, and wait for the headers.
- LOGDEB("StreamProxy:answerConn: starting curl for " << url << endl);
- auto reader = new ContentReader(new CurlFetch(url));
+ LOGDEB0("StreamProxy::answerConn: starting curl for " << url << endl);
+ int cfd{-1};
+ const union MHD_ConnectionInfo *cinf =
+ MHD_get_connection_info(mhdconn, MHD_CONNECTION_INFO_CONNECTION_FD);
+ if (nullptr == cinf) {
+ LOGERR("StreamProxy::answerConn: can't get connection fd\n");
+ } else {
+ cfd = cinf->connect_fd;
+ }
+ auto reader = new ContentReader(new CurlFetch(url), cfd);
+ if (killafterms > 0) {
+ reader->killafterms = killafterms;
+ killafterms = -1;
+ }
reader->fetcher->start(&reader->queue, offset);
*con_cls = reader;
+ LOGDEB1("StreamProxy::answerConn: returning after 1st call\n");
return MHD_YES;
// End first call
}
@@ -317,6 +366,7 @@
code << endl);
return ret;
}
+ LOGDEB1("StreamProxy::answerConn: waitForHeaders done\n");
string cl;
uint64_t size = MHD_SIZE_UNKNOWN;
@@ -324,7 +374,6 @@
LOGDEB1("mhdAnswerConn: header content-length: " << cl << endl);
size = (uint64_t)atoll(cl.c_str());
}
-
// Build a data response.
// the block size seems to be flatly ignored by libmicrohttpd
// Any random value would probably work the same
@@ -347,6 +396,7 @@
}
int code = MHD_HTTP_OK;
+ LOGDEB1("StreamProxy::answerConn: calling curldone\n");
if (reader->fetcher->curlDone(&curlcode, &httpcode)) {
code = httpcode ? httpcode : MHD_HTTP_INTERNAL_SERVER_ERROR;
}