--- a
+++ b/src/mediaserver/cdplugins/streamproxy.cpp
@@ -0,0 +1,418 @@
+/* Copyright (C) 2017-2018 J.F.Dockes
+ *   This program is free software; you can redistribute it and/or modify
+ *   it under the terms of the GNU General Public License as published by
+ *   the Free Software Foundation; either version 2 of the License, or
+ *   (at your option) any later version.
+ *
+ *   This program is distributed in the hope that it will be useful,
+ *   but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   GNU General Public License for more details.
+ *
+ *   You should have received a copy of the GNU General Public License
+ *   along with this program; if not, write to the
+ *   Free Software Foundation, Inc.,
+ *   59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
+ */
+
+#include "streamproxy.h"
+#include "curlfetch.h"
+#include "log.h"
+#include "smallut.h"
+
+#include <microhttpd.h>
+
+#include <mutex>
+#include <condition_variable>
+#include <unordered_map>
+
+using namespace std;
+
+class ContentReader {
+public:
+    ContentReader(CurlFetch *fetcher);
+    ~ContentReader() {
+        LOGDEB1("ContentReader::~ContentReader\n");
+        // This should not be neccessary but see comments in
+        // tstcurlfetch code
+        fetcher = std::unique_ptr<CurlFetch>();
+    }
+    ssize_t contentRead(uint64_t pos, char *buf, size_t max);
+
+    std::unique_ptr<CurlFetch> fetcher;
+    BufXChange<ABuffer*> queue{"crqueue"};
+    bool normalEOS{false};
+};
+
+ContentReader::ContentReader(CurlFetch *f)
+    : fetcher(std::unique_ptr<CurlFetch>(f))
+{
+}
+
+ssize_t ContentReader::contentRead(uint64_t pos, char *obuf, size_t max)
+{
+    LOGDEB1("ContentReader::contentRead: pos "<<pos << " max " << max << endl);
+    if (normalEOS) {
+        LOGDEB1("ContentReader::contentRead: return EOS\n");
+        return MHD_CONTENT_READER_END_OF_STREAM;
+    }
+    size_t totcnt = 0;
+    ABuffer *abuf;
+    while (totcnt < max) {
+        if (!queue.take(&abuf)) {
+            LOGDEB("ContentReader::contentRead: return ERROR\n");
+            return MHD_CONTENT_READER_END_WITH_ERROR;
+        }
+        LOGDEB1("ContentReader::contentRead: got buffer with " <<
+                abuf->bytes << " bytes\n");
+        if (abuf->bytes == 0) {
+            normalEOS = true;
+            if (totcnt == 0) {
+                LOGDEB1("ContentReader::contentRead: return EOS\n");
+                return MHD_CONTENT_READER_END_OF_STREAM;
+            } else {
+                // Copied data, we will return eostream on next call.
+                break;
+            }
+        }
+        size_t tocopy = MIN(max-totcnt, abuf->bytes - abuf->curoffs);
+        memcpy(obuf + totcnt, abuf->buf + abuf->curoffs, tocopy);
+        totcnt += tocopy;
+        abuf->curoffs += tocopy;
+        if (abuf->curoffs >= abuf->bytes) {
+            queue.recycle(abuf);
+        } else {
+            queue.untake(abuf);
+        }
+    }
+    LOGDEB1("ContentReader::contentRead: return " << totcnt << endl);
+    return totcnt;
+}
+
+static ssize_t content_reader_cb(void *cls, uint64_t pos, char *buf, size_t max)
+{
+    ContentReader *reader = static_cast<ContentReader*>(cls);
+    if (reader) {
+        return reader->contentRead(pos, buf, max);
+    } else {
+        return -1;
+    }
+}
+
+
+class StreamProxy::Internal {
+public:
+    Internal(int listenport, UrlTransFunc urlt);
+    ~Internal();
+    bool startMHD();
+
+    int answerConn(
+        struct MHD_Connection *connection, const char *url, 
+        const char *method, const char *version, 
+        const char *upload_data, size_t *upload_data_size,
+        void **con_cls);
+
+    void requestCompleted(
+        struct MHD_Connection *conn,
+        void **con_cls, enum MHD_RequestTerminationCode toe);
+
+    int listenport{-1};
+    UrlTransFunc urltrans;
+    struct MHD_Daemon *mhd{nullptr};
+};
+
+
+StreamProxy::StreamProxy(int listenport,UrlTransFunc urltrans)
+    : m(new Internal(listenport, urltrans))
+{
+}
+
+StreamProxy::~StreamProxy()
+{
+}
+StreamProxy::Internal::~Internal()
+{
+    LOGDEB("StreamProxy::Internal::~Internal()\n");
+    if (mhd) {
+        MHD_stop_daemon(mhd);
+        mhd = nullptr;
+    }
+}
+
+StreamProxy::Internal::Internal(int _listenport, UrlTransFunc _urltrans)
+    : listenport(_listenport), urltrans(_urltrans)
+{
+    startMHD();
+}
+
+
+static int answer_to_connection(
+    void *cls, struct MHD_Connection *conn, 
+    const char *url, const char *method, const char *version, 
+    const char *upload_data, size_t *upload_data_size,
+    void **con_cls)
+{
+    StreamProxy::Internal *internal = static_cast<StreamProxy::Internal*>(cls);
+    
+    if (internal) {
+        return internal->answerConn(
+            conn, url, method, version, upload_data, upload_data_size, con_cls);
+    } else {
+        return -1;
+    }
+}
+
+#define PRINT_KEYS
+#ifdef PRINT_KEYS
+static vector<CharFlags> valueKind {
+    {MHD_RESPONSE_HEADER_KIND, "Response header"},
+    {MHD_HEADER_KIND, "HTTP header"},
+    {MHD_COOKIE_KIND, "Cookies"},
+    {MHD_POSTDATA_KIND, "POST data"},
+    {MHD_GET_ARGUMENT_KIND, "GET (URI) arguments"},
+    {MHD_FOOTER_KIND, "HTTP footer"},
+        };
+
+static int print_out_key (void *cls, enum MHD_ValueKind kind, 
+                          const char *key, const char *value)
+{
+    LOGDEB(valToString(valueKind, kind) << ": " << key << " -> " <<
+           value << endl);
+    return MHD_YES;
+}
+#endif /* PRINT_KEYS */
+
+static int mapvalues_cb(void *cls, enum MHD_ValueKind kind, 
+                        const char *key, const char *value)
+{
+    unordered_map<string,string> *mp = (unordered_map<string,string> *)cls;
+    if (mp) {
+        (*mp)[key] = value;
+    }
+    return MHD_YES;
+}
+
+// Parse range header. 
+static bool parseRanges(
+    const string& ranges, vector<pair<int64_t, int64_t>>& oranges)
+{
+    oranges.clear();
+    string::size_type pos = ranges.find("bytes=");
+    if (pos == string::npos) {
+        return false;
+    }
+    pos += 6;
+    bool done = false;
+    while(!done) {
+        string::size_type dash = ranges.find('-', pos);
+        if (dash == string::npos) {
+            return false;
+        }
+        string::size_type comma = ranges.find(',', pos);
+        string firstPart = ranges.substr(pos, dash-pos);
+        int64_t start = firstPart.empty() ? 0 : atoll(firstPart.c_str());
+        string secondPart = ranges.substr(dash+1, comma != string::npos ? 
+                                          comma-dash-1 : string::npos);
+        int64_t fin = secondPart.empty() ? -1 : atoll(secondPart.c_str());
+        pair<int64_t, int64_t> nrange(start,fin);
+        oranges.push_back(nrange);
+        if (comma != string::npos) {
+            pos = comma + 1;
+        }
+        done = comma == string::npos;
+    }
+    return true;
+}
+
+static bool processRange(struct MHD_Connection *mhdconn, uint64_t& offset)
+{
+    const char* rangeh =
+        MHD_lookup_connection_value(mhdconn, MHD_HEADER_KIND, "range");
+    vector<pair<int64_t, int64_t> > ranges;
+    if (rangeh && parseRanges(rangeh, ranges) && ranges.size()) {
+        if (ranges[0].second != -1 || ranges.size() > 1) {
+            LOGERR("AProxy::mhdAnswerConn: unsupported range: " <<
+                   rangeh << "\n");
+            struct MHD_Response *response = 
+                MHD_create_response_from_buffer(0,0,MHD_RESPMEM_PERSISTENT);
+            MHD_queue_response(mhdconn, 416, response);
+            MHD_destroy_response(response);
+            return false;
+        } else {
+            offset = (uint64_t)ranges[0].first;
+            LOGDEB("AProxy::mhdAnswerConn: accept xx- range, offset "
+                   << offset << endl);
+        }
+    }
+    return true;
+}
+
+int StreamProxy::Internal::answerConn(
+    struct MHD_Connection *mhdconn, const char *_url,
+    const char *method, const char *version, 
+    const char *upload_data, size_t *upload_data_size,
+    void **con_cls)
+{
+    int curlcode; long httpcode;
+
+    if (nullptr == *con_cls) {
+        uint64_t offset = 0;
+        // First call, look at headers, method etc.
+#ifdef PRINT_KEYS
+        MHD_get_connection_values(mhdconn,MHD_HEADER_KIND,&print_out_key,0);
+#endif
+        if (strcmp("GET", method) && strcmp("HEAD", method)) {
+            LOGERR("StreamProxy::answerConn: method is not GET or HEAD\n");
+            return MHD_NO;
+        }
+        if (!processRange(mhdconn, offset)) {
+            return MHD_NO;
+        }
+
+        // Compute destination url
+        unordered_map<string,string>querydata;
+        MHD_get_connection_values(mhdconn, MHD_GET_ARGUMENT_KIND,
+                                  &mapvalues_cb, &querydata);
+        
+        string url(_url);
+        UrlTransReturn ret = urltrans(url, querydata);
+        if (ret == Error) {
+            return MHD_NO;
+        } else if (ret == Redirect) {
+            struct MHD_Response *response =
+                MHD_create_response_from_buffer(0, 0, MHD_RESPMEM_PERSISTENT);
+            if (nullptr == response ) {
+                LOGERR("StreamProxy::answerConn: can't create redirect\n");
+                return MHD_NO;
+            }
+            MHD_add_response_header (response, "Location", url.c_str());
+            int ret = MHD_queue_response(mhdconn, 302, response);
+            MHD_destroy_response(response);
+            return ret;
+        }
+
+        // Create/Start the curl transaction, and wait for the headers.
+        LOGDEB("StreamProxy:answerConn: starting curl for " << url << endl);
+        auto reader = new ContentReader(new CurlFetch(url));
+        reader->fetcher->start(&reader->queue, offset);
+        *con_cls = reader;
+        return MHD_YES;
+        // End first call
+    }
+
+    // Second call for this request. We know that the curl request has
+    // proceeded past the headers (else, we would have failed during
+    // the first call). Check for a curl error or http 404 or similar
+    ContentReader *reader = (ContentReader*)*con_cls;
+    
+    if (!reader->fetcher->waitForHeaders()) {
+        LOGDEB("StreamProxy::answerConn: waitForHeaders error\n");
+        reader->fetcher->curlDone(&curlcode, &httpcode);
+        int code = httpcode ? httpcode : MHD_HTTP_INTERNAL_SERVER_ERROR;
+        struct MHD_Response *response =
+            MHD_create_response_from_buffer(0, 0, MHD_RESPMEM_PERSISTENT);
+        int ret = MHD_queue_response(mhdconn, code, response);
+        MHD_destroy_response(response);
+        LOGINF("StreamProxy::answerConn (1): return with http code: " <<
+               code << endl);
+        return ret;
+    }
+
+    string cl;
+    uint64_t size = MHD_SIZE_UNKNOWN;
+    if (reader->fetcher->headerValue("content-length", cl) && !cl.empty()) {
+        LOGDEB1("mhdAnswerConn: header content-length: " << cl << endl);
+        size  = (uint64_t)atoll(cl.c_str());
+    }
+
+    // Build a data response.
+    // the block size seems to be flatly ignored by libmicrohttpd
+    // Any random value would probably work the same
+    struct MHD_Response *response = 
+        MHD_create_response_from_callback(size, 4096,
+                                          content_reader_cb, reader, nullptr);
+    if (response == NULL) {
+        LOGERR("mhdAnswerConn: answer: could not create response" << endl);
+        return MHD_NO;
+    }
+
+    MHD_add_response_header (response, "Accept-Ranges", "bytes");
+    if (!cl.empty()) {
+        MHD_add_response_header(response, "Content-Length", cl.c_str());
+    }
+    string ct;
+    if (reader->fetcher->headerValue("content-type", ct) && !ct.empty()) {
+        LOGDEB1("mhdAnswerConn: header content-type: " << ct << endl);
+        MHD_add_response_header(response, "Content-Type", ct.c_str());
+    }
+
+    int code = MHD_HTTP_OK;
+    if (reader->fetcher->curlDone(&curlcode, &httpcode)) {
+        code = httpcode ? httpcode : MHD_HTTP_INTERNAL_SERVER_ERROR;
+    }
+    int ret = MHD_queue_response(mhdconn, code, response);
+    MHD_destroy_response(response);
+    return ret;
+}
+
+static void
+request_completed_callback(
+    void *cls, struct MHD_Connection *conn,
+    void **con_cls, enum MHD_RequestTerminationCode toe)
+{
+    // We get this even if the answer callback returned MHD_NO
+    // (e.g. for a second connection). check con_cls and do nothing if
+    // it's not set.
+    if (cls && *con_cls) {
+        StreamProxy::Internal *internal =
+            static_cast<StreamProxy::Internal*>(cls);
+        return internal->requestCompleted(conn, con_cls, toe);
+    }
+}
+
+static vector<CharFlags> completionStatus {
+    {MHD_REQUEST_TERMINATED_COMPLETED_OK,
+            "MHD_REQUEST_TERMINATED_COMPLETED_OK", ""},
+    {MHD_REQUEST_TERMINATED_WITH_ERROR,
+            "MHD_REQUEST_TERMINATED_WITH_ERROR", ""},
+    {MHD_REQUEST_TERMINATED_TIMEOUT_REACHED,
+            "MHD_REQUEST_TERMINATED_TIMEOUT_REACHED", ""},
+    {MHD_REQUEST_TERMINATED_DAEMON_SHUTDOWN,
+            "MHD_REQUEST_TERMINATED_DAEMON_SHUTDOWN", ""},
+    {MHD_REQUEST_TERMINATED_READ_ERROR,
+            "MHD_REQUEST_TERMINATED_READ_ERROR", ""},
+    {MHD_REQUEST_TERMINATED_CLIENT_ABORT,
+            "MHD_REQUEST_TERMINATED_CLIENT_ABORT", ""},
+        };
+
+void StreamProxy::Internal::requestCompleted(
+    struct MHD_Connection *conn,
+    void **con_cls, enum MHD_RequestTerminationCode toe)
+{
+    LOGDEB("StreamProxy::requestCompleted: status " <<
+           valToString(completionStatus, toe) << endl);
+    if (*con_cls) {
+        ContentReader *reader = static_cast<ContentReader*>(*con_cls);
+        delete reader;
+    }
+}
+
+bool StreamProxy::Internal::startMHD()
+{
+    mhd = MHD_start_daemon(
+        MHD_USE_THREAD_PER_CONNECTION|MHD_USE_SELECT_INTERNALLY|MHD_USE_DEBUG,
+        listenport, 
+        /* Accept policy callback and arg */
+        nullptr, nullptr, 
+        /* handler and arg */
+        &answer_to_connection, this,
+        MHD_OPTION_NOTIFY_COMPLETED, request_completed_callback, this,
+        MHD_OPTION_END);
+
+    if (nullptr == mhd) {
+        LOGERR("Aproxy: MHD_start_daemon failed\n");
+        return false;
+    }
+
+    return true;
+}