--- a/src/mediaserver/cdplugins/streamproxy.cpp
+++ b/src/mediaserver/cdplugins/streamproxy.cpp
@@ -16,14 +16,13 @@
*/
#include "streamproxy.h"
-#include "curlfetch.h"
+#include "netfetch.h"
#include "log.h"
#include "smallut.h"
#include "chrono.h"
#include <fcntl.h>
#include <microhttpd.h>
-#include <curl/curl.h>
#include <mutex>
#include <condition_variable>
@@ -33,16 +32,20 @@
class ContentReader {
public:
- ContentReader(CurlFetch *fetcher, int cfd);
+ ContentReader(std::unique_ptr<NetFetch> ftchr, int cfd)
+ : fetcher(std::move(ftchr)), connfd(cfd) {
+
+ }
+
~ContentReader() {
LOGDEB1("ContentReader::~ContentReader\n");
- // This should not be neccessary but see comments in
+ // This should not be necessary but see comments in
// tstcurlfetch code
- fetcher = std::unique_ptr<CurlFetch>();
+ fetcher = std::unique_ptr<NetFetch>();
}
ssize_t contentRead(uint64_t pos, char *buf, size_t max);
- std::unique_ptr<CurlFetch> fetcher;
+ std::unique_ptr<NetFetch> fetcher;
BufXChange<ABuffer*> queue{"crqueue"};
bool normalEOS{false};
// Used for experimentations in killing the connection
@@ -51,10 +54,6 @@
Chrono chron;
};
-ContentReader::ContentReader(CurlFetch *f, int cfd)
- : fetcher(std::unique_ptr<CurlFetch>(f)), connfd(cfd)
-{
-}
ssize_t ContentReader::contentRead(uint64_t pos, char *obuf, size_t max)
{
@@ -67,11 +66,12 @@
ABuffer *abuf;
while (totcnt < max) {
if (!queue.take(&abuf)) {
- int curlcode{0}, httpcode{0};
- fetcher->curlDone(&curlcode, &httpcode);
- LOGDEB("Reader: queue take failed curlcode " << curlcode <<
+ NetFetch::FetchStatus code;
+ int httpcode;
+ fetcher->fetchDone(&code, &httpcode);
+ LOGDEB("Reader: queue take failed code " << code <<
" httpcode " << httpcode << endl);
- if (curlcode == CURLE_PARTIAL_FILE) {
+ if (code == NetFetch::FETCH_RETRYABLE) {
LOGINF("Reader: retrying at " << pos+totcnt << endl);
fetcher->reset();
fetcher->start(&queue, pos+totcnt);
@@ -289,7 +289,8 @@
void **con_cls)
{
LOGDEB1("answerConn con_cls " << *con_cls << "\n");
- int curlcode, httpcode;
+ NetFetch::FetchStatus fetchcode;
+ int httpcode;
if (nullptr == *con_cls) {
uint64_t offset = 0;
@@ -309,9 +310,13 @@
unordered_map<string,string>querydata;
MHD_get_connection_values(mhdconn, MHD_GET_ARGUMENT_KIND,
&mapvalues_cb, &querydata);
+
+ // Request the method (redirect or proxy), and the fetcher if
+ // we are proxying.
+ string url(_url);
+ std::unique_ptr<NetFetch> fetcher;
+ UrlTransReturn ret = urltrans(url, querydata, fetcher);
- string url(_url);
- UrlTransReturn ret = urltrans(url, querydata);
if (ret == Error) {
return MHD_NO;
} else if (ret == Redirect) {
@@ -327,8 +332,9 @@
return ret;
}
- // Create/Start the curl transaction, and wait for the headers.
- LOGDEB0("StreamProxy::answerConn: starting curl for " << url << endl);
+ // The connection fd thing is strictly for debug/diag: faking
+ // a connection loss so that the client side can exercise the
+ // retry code.
int cfd{-1};
const union MHD_ConnectionInfo *cinf =
MHD_get_connection_info(mhdconn, MHD_CONNECTION_INFO_CONNECTION_FD);
@@ -337,26 +343,30 @@
} else {
cfd = cinf->connect_fd;
}
- auto reader = new ContentReader(new CurlFetch(url), cfd);
+
+ // Create/Start the fetch transaction, and wait for the headers.
+ LOGDEB0("StreamProxy::answerConn: starting fetch for " << url << endl);
+ auto reader = new ContentReader(std::move(fetcher), cfd);
if (killafterms > 0) {
reader->killafterms = killafterms;
killafterms = -1;
}
reader->fetcher->start(&reader->queue, offset);
*con_cls = reader;
+
LOGDEB1("StreamProxy::answerConn: returning after 1st call\n");
return MHD_YES;
// End first call
}
- // Second call for this request. We know that the curl request has
+ // Second call for this request. We know that the fetch request has
// proceeded past the headers (else, we would have failed during
- // the first call). Check for a curl error or http 404 or similar
+ // the first call). Check for an error or http 404 or similar
ContentReader *reader = (ContentReader*)*con_cls;
if (!reader->fetcher->waitForHeaders()) {
LOGDEB("StreamProxy::answerConn: waitForHeaders error\n");
- reader->fetcher->curlDone(&curlcode, &httpcode);
+ reader->fetcher->fetchDone(&fetchcode, &httpcode);
int code = httpcode ? httpcode : MHD_HTTP_INTERNAL_SERVER_ERROR;
struct MHD_Response *response =
MHD_create_response_from_buffer(0, 0, MHD_RESPMEM_PERSISTENT);
@@ -396,8 +406,8 @@
}
int code = MHD_HTTP_OK;
- LOGDEB1("StreamProxy::answerConn: calling curldone\n");
- if (reader->fetcher->curlDone(&curlcode, &httpcode)) {
+ LOGDEB1("StreamProxy::answerConn: calling fetchDone\n");
+ if (reader->fetcher->fetchDone(&fetchcode, &httpcode)) {
code = httpcode ? httpcode : MHD_HTTP_INTERNAL_SERVER_ERROR;
}
int ret = MHD_queue_response(mhdconn, code, response);