Switch to side-by-side view

--- a/src/mediaserver/cdplugins/streamproxy.cpp
+++ b/src/mediaserver/cdplugins/streamproxy.cpp
@@ -16,14 +16,13 @@
  */
 
 #include "streamproxy.h"
-#include "curlfetch.h"
+#include "netfetch.h"
 #include "log.h"
 #include "smallut.h"
 #include "chrono.h"
 
 #include <fcntl.h>
 #include <microhttpd.h>
-#include <curl/curl.h>
 
 #include <mutex>
 #include <condition_variable>
@@ -33,16 +32,20 @@
 
 class ContentReader {
 public:
-    ContentReader(CurlFetch *fetcher, int cfd);
+    ContentReader(std::unique_ptr<NetFetch> ftchr, int cfd)
+        : fetcher(std::move(ftchr)), connfd(cfd) {
+        
+    }
+
     ~ContentReader() {
         LOGDEB1("ContentReader::~ContentReader\n");
-        // This should not be neccessary but see comments in
+        // This should not be necessary but see comments in
         // tstcurlfetch code
-        fetcher = std::unique_ptr<CurlFetch>();
+        fetcher = std::unique_ptr<NetFetch>();
     }
     ssize_t contentRead(uint64_t pos, char *buf, size_t max);
 
-    std::unique_ptr<CurlFetch> fetcher;
+    std::unique_ptr<NetFetch> fetcher;
     BufXChange<ABuffer*> queue{"crqueue"};
     bool normalEOS{false};
     // Used for experimentations in killing the connection
@@ -51,10 +54,6 @@
     Chrono chron;
 };
 
-ContentReader::ContentReader(CurlFetch *f, int cfd)
-    : fetcher(std::unique_ptr<CurlFetch>(f)), connfd(cfd)
-{
-}
 
 ssize_t ContentReader::contentRead(uint64_t pos, char *obuf, size_t max)
 {
@@ -67,11 +66,12 @@
     ABuffer *abuf;
     while (totcnt < max) {
         if (!queue.take(&abuf)) {
-            int curlcode{0}, httpcode{0};
-            fetcher->curlDone(&curlcode, &httpcode);
-            LOGDEB("Reader: queue take failed curlcode " << curlcode <<
+            NetFetch::FetchStatus code;
+            int httpcode;
+            fetcher->fetchDone(&code, &httpcode);
+            LOGDEB("Reader: queue take failed code " << code <<
                    " httpcode " << httpcode << endl);
-            if (curlcode == CURLE_PARTIAL_FILE) {
+            if (code == NetFetch::FETCH_RETRYABLE) {
                 LOGINF("Reader: retrying at " << pos+totcnt << endl);
                 fetcher->reset();
                 fetcher->start(&queue, pos+totcnt);
@@ -289,7 +289,8 @@
     void **con_cls)
 {
     LOGDEB1("answerConn con_cls " << *con_cls << "\n");
-    int curlcode, httpcode;
+    NetFetch::FetchStatus fetchcode;
+    int httpcode;
 
     if (nullptr == *con_cls) {
         uint64_t offset = 0;
@@ -309,9 +310,13 @@
         unordered_map<string,string>querydata;
         MHD_get_connection_values(mhdconn, MHD_GET_ARGUMENT_KIND,
                                   &mapvalues_cb, &querydata);
+
+        // Request the method (redirect or proxy), and the fetcher if
+        // we are proxying.
+        string url(_url);
+        std::unique_ptr<NetFetch> fetcher;
+        UrlTransReturn ret = urltrans(url, querydata, fetcher);
         
-        string url(_url);
-        UrlTransReturn ret = urltrans(url, querydata);
         if (ret == Error) {
             return MHD_NO;
         } else if (ret == Redirect) {
@@ -327,8 +332,9 @@
             return ret;
         }
 
-        // Create/Start the curl transaction, and wait for the headers.
-        LOGDEB0("StreamProxy::answerConn: starting curl for " << url << endl);
+        // The connection fd thing is strictly for debug/diag: faking
+        // a connection loss so that the client side can exercise the
+        // retry code.
         int cfd{-1};
         const union MHD_ConnectionInfo *cinf =
             MHD_get_connection_info(mhdconn, MHD_CONNECTION_INFO_CONNECTION_FD);
@@ -337,26 +343,30 @@
         } else {
             cfd = cinf->connect_fd;
         }
-        auto reader = new ContentReader(new CurlFetch(url), cfd);
+
+        // Create/Start the fetch transaction, and wait for the headers.
+        LOGDEB0("StreamProxy::answerConn: starting fetch for " << url << endl);
+        auto reader = new ContentReader(std::move(fetcher), cfd);
         if (killafterms > 0) {
             reader->killafterms = killafterms;
             killafterms = -1;
         }
         reader->fetcher->start(&reader->queue, offset);
         *con_cls = reader;
+
         LOGDEB1("StreamProxy::answerConn: returning after 1st call\n");
         return MHD_YES;
         // End first call
     }
 
-    // Second call for this request. We know that the curl request has
+    // Second call for this request. We know that the fetch request has
     // proceeded past the headers (else, we would have failed during
-    // the first call). Check for a curl error or http 404 or similar
+    // the first call). Check for an error or http 404 or similar
     ContentReader *reader = (ContentReader*)*con_cls;
     
     if (!reader->fetcher->waitForHeaders()) {
         LOGDEB("StreamProxy::answerConn: waitForHeaders error\n");
-        reader->fetcher->curlDone(&curlcode, &httpcode);
+        reader->fetcher->fetchDone(&fetchcode, &httpcode);
         int code = httpcode ? httpcode : MHD_HTTP_INTERNAL_SERVER_ERROR;
         struct MHD_Response *response =
             MHD_create_response_from_buffer(0, 0, MHD_RESPMEM_PERSISTENT);
@@ -396,8 +406,8 @@
     }
 
     int code = MHD_HTTP_OK;
-    LOGDEB1("StreamProxy::answerConn: calling curldone\n");
-    if (reader->fetcher->curlDone(&curlcode, &httpcode)) {
+    LOGDEB1("StreamProxy::answerConn: calling fetchDone\n");
+    if (reader->fetcher->fetchDone(&fetchcode, &httpcode)) {
         code = httpcode ? httpcode : MHD_HTTP_INTERNAL_SERVER_ERROR;
     }
     int ret = MHD_queue_response(mhdconn, code, response);