Switch to side-by-side view

--- a/src/mediaserver/cdplugins/streamproxy.cpp
+++ b/src/mediaserver/cdplugins/streamproxy.cpp
@@ -19,8 +19,11 @@
 #include "curlfetch.h"
 #include "log.h"
 #include "smallut.h"
-
+#include "chrono.h"
+
+#include <fcntl.h>
 #include <microhttpd.h>
+#include <curl/curl.h>
 
 #include <mutex>
 #include <condition_variable>
@@ -30,7 +33,7 @@
 
 class ContentReader {
 public:
-    ContentReader(CurlFetch *fetcher);
+    ContentReader(CurlFetch *fetcher, int cfd);
     ~ContentReader() {
         LOGDEB1("ContentReader::~ContentReader\n");
         // This should not be neccessary but see comments in
@@ -42,10 +45,14 @@
     std::unique_ptr<CurlFetch> fetcher;
     BufXChange<ABuffer*> queue{"crqueue"};
     bool normalEOS{false};
+    // Used for experimentations in killing the connection
+    int connfd{-1};
+    int killafterms{-1};
+    Chrono chron;
 };
 
-ContentReader::ContentReader(CurlFetch *f)
-    : fetcher(std::unique_ptr<CurlFetch>(f))
+ContentReader::ContentReader(CurlFetch *f, int cfd)
+    : fetcher(std::unique_ptr<CurlFetch>(f)), connfd(cfd)
 {
 }
 
@@ -60,6 +67,16 @@
     ABuffer *abuf;
     while (totcnt < max) {
         if (!queue.take(&abuf)) {
+            int curlcode{0}, httpcode{0};
+            fetcher->curlDone(&curlcode, &httpcode);
+            LOGDEB("Reader: queue take failed curlcode " << curlcode <<
+                   " httpcode " << httpcode << endl);
+            if (curlcode == CURLE_PARTIAL_FILE) {
+                LOGINF("Reader: retrying at " << pos+totcnt << endl);
+                fetcher->reset();
+                fetcher->start(&queue, pos+totcnt);
+                return 0;
+            }
             LOGDEB("ContentReader::contentRead: return ERROR\n");
             return MHD_CONTENT_READER_END_WITH_ERROR;
         }
@@ -85,6 +102,17 @@
             queue.untake(abuf);
         }
     }
+    if (killafterms > 0 && connfd >= 0) {
+        if (chron.millis() > killafterms) {
+            int fd = open("/dev/null", 0);
+            if (fd < 0) {
+                abort();
+            }
+            dup2(fd, connfd);
+            close(fd);
+            connfd = -1;
+        }
+    }
     LOGDEB1("ContentReader::contentRead: return " << totcnt << endl);
     return totcnt;
 }
@@ -119,6 +147,7 @@
     int listenport{-1};
     UrlTransFunc urltrans;
     struct MHD_Daemon *mhd{nullptr};
+    int killafterms{-1};
 };
 
 
@@ -127,9 +156,15 @@
 {
 }
 
+void StreamProxy::setKillAfterMs(int ms)
+{
+    m->killafterms = ms;
+}
+
 StreamProxy::~StreamProxy()
 {
 }
+
 StreamProxy::Internal::~Internal()
 {
     LOGDEB("StreamProxy::Internal::~Internal()\n");
@@ -162,7 +197,7 @@
     }
 }
 
-#define PRINT_KEYS
+#undef PRINT_KEYS
 #ifdef PRINT_KEYS
 static vector<CharFlags> valueKind {
     {MHD_RESPONSE_HEADER_KIND, "Response header"},
@@ -253,7 +288,8 @@
     const char *upload_data, size_t *upload_data_size,
     void **con_cls)
 {
-    int curlcode; long httpcode;
+    LOGDEB1("answerConn con_cls " << *con_cls << "\n");
+    int curlcode, httpcode;
 
     if (nullptr == *con_cls) {
         uint64_t offset = 0;
@@ -292,10 +328,23 @@
         }
 
         // Create/Start the curl transaction, and wait for the headers.
-        LOGDEB("StreamProxy:answerConn: starting curl for " << url << endl);
-        auto reader = new ContentReader(new CurlFetch(url));
+        LOGDEB0("StreamProxy::answerConn: starting curl for " << url << endl);
+        int cfd{-1};
+        const union MHD_ConnectionInfo *cinf =
+            MHD_get_connection_info(mhdconn, MHD_CONNECTION_INFO_CONNECTION_FD);
+        if (nullptr == cinf) {
+            LOGERR("StreamProxy::answerConn: can't get connection fd\n");
+        } else {
+            cfd = cinf->connect_fd;
+        }
+        auto reader = new ContentReader(new CurlFetch(url), cfd);
+        if (killafterms > 0) {
+            reader->killafterms = killafterms;
+            killafterms = -1;
+        }
         reader->fetcher->start(&reader->queue, offset);
         *con_cls = reader;
+        LOGDEB1("StreamProxy::answerConn: returning after 1st call\n");
         return MHD_YES;
         // End first call
     }
@@ -317,6 +366,7 @@
                code << endl);
         return ret;
     }
+    LOGDEB1("StreamProxy::answerConn: waitForHeaders done\n");
 
     string cl;
     uint64_t size = MHD_SIZE_UNKNOWN;
@@ -324,7 +374,6 @@
         LOGDEB1("mhdAnswerConn: header content-length: " << cl << endl);
         size  = (uint64_t)atoll(cl.c_str());
     }
-
     // Build a data response.
     // the block size seems to be flatly ignored by libmicrohttpd
     // Any random value would probably work the same
@@ -347,6 +396,7 @@
     }
 
     int code = MHD_HTTP_OK;
+    LOGDEB1("StreamProxy::answerConn: calling curldone\n");
     if (reader->fetcher->curlDone(&curlcode, &httpcode)) {
         code = httpcode ? httpcode : MHD_HTTP_INTERNAL_SERVER_ERROR;
     }