--- a
+++ b/upsend_src/streamer.cpp
@@ -0,0 +1,257 @@
+/* Copyright (C) 2014 J.F.Dockes
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the
+ * Free Software Foundation, Inc.,
+ * 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+ */
+#include "config.h"
+
+#include <string.h>
+#include <arpa/inet.h>
+#include <sys/types.h>
+#include <sys/select.h>
+#include <sys/socket.h>
+
+#include <iostream>
+#include <queue>
+#include <mutex>
+#include <condition_variable>
+
+#include <microhttpd.h>
+
+#include "streamer.h"
+#include "libupnpp/log.h"
+
+using namespace std;
+
+#ifndef MIN
+#define MIN(A,B) ((A)<(B)?(A):(B))
+#endif
+
+// #define PRINT_KEYS
+
+// Only accept HTTP connections from localhost: no
+#define ACCEPT_LOCALONLY 0
+
+// The queue for audio blocks coming our way
+static queue<AudioMessage*> dataqueue;
+static std::mutex dataqueueLock;
+static std::condition_variable dataqueueWaitCond;
+
+#ifdef PRINT_KEYS
+static const char *ValueKindToCp(enum MHD_ValueKind kind)
+{
+ switch (kind) {
+ case MHD_RESPONSE_HEADER_KIND: return "Response header";
+ case MHD_HEADER_KIND: return "HTTP header";
+ case MHD_COOKIE_KIND: return "Cookies";
+ case MHD_POSTDATA_KIND: return "POST data";
+ case MHD_GET_ARGUMENT_KIND: return "GET (URI) arguments";
+ case MHD_FOOTER_KIND: return "HTTP footer";
+ default: return "Unknown";
+ }
+}
+
+static int print_out_key (void *cls, enum MHD_ValueKind kind,
+ const char *key, const char *value)
+{
+ LOGDEB(ValueKindToCp(kind) << ": " << key << " -> " << value << endl);
+ return MHD_YES;
+}
+#endif /* PRINT_KEYS */
+
+struct DataGenContext {
+ DataGenContext() :
+ eof(false) {
+ }
+ bool eof;
+};
+
+// This gets called by microhttpd when it needs data.
+static ssize_t
+data_generator(void *cls, uint64_t pos, char *buf, size_t max)
+{
+ LOGDEB1("data_generator: " << " max " << max << endl);
+ DataGenContext *dgc = (DataGenContext *)cls;
+ std::unique_lock<std::mutex> lock(dataqueueLock);
+ if (dgc->eof) {
+ LOGDEB1("data_generator: already eof\n");
+ return -1;
+ }
+
+ // Loop reading on the input queue until we have satistified the request
+ size_t bytes = 0;
+ while (bytes < max) {
+ while (dataqueue.empty()) {
+ LOGDEB1("data_generator: waiting for buffer" << endl);
+ dataqueueWaitCond.wait(lock);
+ }
+
+ AudioMessage *m = dataqueue.front();
+ if (m->m_bytes == 0) {
+ // EOF
+ LOGDEB1("data_generator: empty buffer\n");
+ dgc->eof = true;
+ // Do not notify or clear the queue: freeCallback will do it.
+ break;
+ }
+ LOGDEB1("data_generator: data buffer\n");
+
+ size_t newbytes = MIN(max - bytes, m->m_bytes - m->m_curoffs);
+ memcpy(buf + bytes, m->m_buf + m->m_curoffs, newbytes);
+ m->m_curoffs += newbytes;
+ bytes += newbytes;
+ if (m->m_curoffs == m->m_bytes) {
+ delete dataqueue.front();
+ dataqueue.pop();
+ dataqueueWaitCond.notify_all();
+ }
+ }
+
+ LOGDEB1("data_generator: returning " << bytes << " bytes" << endl);
+ return bytes;
+}
+
+static void ContentReaderFreeCallback(void *cls)
+{
+ LOGDEB1("ContentReaderFreeCallback\n");
+ DataGenContext *dgc = (DataGenContext*)cls;
+ std::unique_lock<std::mutex> lock(dataqueueLock);
+ while (!dataqueue.empty()) {
+ delete dataqueue.front();
+ dataqueue.pop();
+ }
+ delete dgc;
+ dataqueueWaitCond.notify_all();
+}
+
+static int answer_to_connection(void *cls, struct MHD_Connection *connection,
+ const char *url,
+ const char *method, const char *version,
+ const char *upload_data,
+ size_t *upload_data_size, void **con_cls)
+{
+ AudioSink::Context *ctxt = (AudioSink::Context *)cls;
+
+#ifdef PRINT_KEYS
+ MHD_get_connection_values(connection, MHD_HEADER_KIND, &print_out_key, 0);
+#endif
+
+ static int aptr;
+ if (&aptr != *con_cls) {
+ /* do not respond on first call ?*/
+ *con_cls = &aptr;
+ return MHD_YES;
+ }
+
+ LOGDEB("answer_to_connection: url " << url << " method " << method <<
+ " version " << version << endl);
+
+ long long size = MHD_SIZE_UNKNOWN;
+ DataGenContext *dgc = new DataGenContext();
+
+ // the block size seems to be flatly ignored by libmicrohttpd
+ // Any random value would probably work the same
+ struct MHD_Response *response =
+ MHD_create_response_from_callback(size, 4096, &data_generator,
+ dgc, ContentReaderFreeCallback);
+ if (response == NULL) {
+ LOGERR("httpgate: answer: could not create response" << endl);
+ return MHD_NO;
+ }
+
+ MHD_add_response_header(response, "Content-Type",
+ ctxt->content_type.c_str());
+
+// #define FORCE_CHUNKED
+#if defined(FORCE_CHUNKED)
+#warning content-length is needed for mpd to play wav (else tries to seek).
+ MHD_add_response_header(response, "Transfer-Encoding", "chunked");
+#else
+ char cl[100];
+ sprintf(cl, "%lld", (long long)ctxt->filesize);
+ MHD_add_response_header(response, "Content-Length", cl);
+#endif
+
+ int ret = MHD_queue_response(connection, MHD_HTTP_OK, response);
+ MHD_destroy_response(response);
+ return ret;
+}
+
+static void *audioEater(AudioSink::Context *ctxt)
+{
+ LOGDEB1("audioEater\n");
+ string value;
+ int port = 8869;
+ auto it = ctxt->config.find("httpport");
+ if (it != ctxt->config.end()) {
+ port = atoi(it->second.c_str());
+ }
+
+ WorkQueue<AudioMessage*> *queue = ctxt->queue;
+
+ LOGDEB1("audioEater: queue " << ctxt->queue << " HTTP port " << port
+ << endl);
+
+ struct MHD_Daemon *daemon =
+ MHD_start_daemon(
+ MHD_USE_SELECT_INTERNALLY,
+ port,
+ /* Accept policy callback and arg */
+ NULL, NULL,
+ /* handler and arg */
+ &answer_to_connection, ctxt,
+ MHD_OPTION_END);
+
+ if (NULL == daemon) {
+ queue->workerExit();
+ delete ctxt;
+ return (void *)0;
+ }
+
+ bool eof = false;
+ while (true) {
+ AudioMessage *tsk = nullptr;
+ size_t qsz;
+ if (!queue->take(&tsk, &qsz)) {
+ tsk = nullptr;
+ eof = true;
+ }
+ std::unique_lock<std::mutex> lock(dataqueueLock);
+ if (eof) {
+ LOGDEB1("audioEater: pushing empty buffer\n");
+ dataqueue.push(new AudioMessage(nullptr, 0, 0));
+ dataqueueWaitCond.notify_all();
+ }
+
+ /* limit size of queuing / wait for drain. */
+ while (dataqueue.size() > (eof ? 0 : 2)) {
+ if (eof) {
+ LOGDEB1("audioEater: waiting for queue drain, sz " <<
+ dataqueue.size() << endl);
+ }
+ dataqueueWaitCond.wait(lock);
+ }
+ if (eof)
+ break;
+ dataqueue.push(tsk);
+ dataqueueWaitCond.notify_all();
+ }
+ LOGDEB0("audioEater: returning\n");
+ MHD_stop_daemon(daemon);
+ queue->workerExit();
+ delete ctxt;
+ return (void*)1;
+}
+
+AudioSink httpAudioSink(&audioEater);