|
a |
|
b/upsend_src/streamer.cpp |
|
|
1 |
/* Copyright (C) 2014 J.F.Dockes
|
|
|
2 |
* This program is free software; you can redistribute it and/or modify
|
|
|
3 |
* it under the terms of the GNU General Public License as published by
|
|
|
4 |
* the Free Software Foundation; either version 2 of the License, or
|
|
|
5 |
* (at your option) any later version.
|
|
|
6 |
*
|
|
|
7 |
* This program is distributed in the hope that it will be useful,
|
|
|
8 |
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
9 |
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
10 |
* GNU General Public License for more details.
|
|
|
11 |
*
|
|
|
12 |
* You should have received a copy of the GNU General Public License
|
|
|
13 |
* along with this program; if not, write to the
|
|
|
14 |
* Free Software Foundation, Inc.,
|
|
|
15 |
* 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
|
|
|
16 |
*/
|
|
|
17 |
#include "config.h"
|
|
|
18 |
|
|
|
19 |
#include <string.h>
|
|
|
20 |
#include <arpa/inet.h>
|
|
|
21 |
#include <sys/types.h>
|
|
|
22 |
#include <sys/select.h>
|
|
|
23 |
#include <sys/socket.h>
|
|
|
24 |
|
|
|
25 |
#include <iostream>
|
|
|
26 |
#include <queue>
|
|
|
27 |
#include <mutex>
|
|
|
28 |
#include <condition_variable>
|
|
|
29 |
|
|
|
30 |
#include <microhttpd.h>
|
|
|
31 |
|
|
|
32 |
#include "streamer.h"
|
|
|
33 |
#include "libupnpp/log.h"
|
|
|
34 |
|
|
|
35 |
using namespace std;
|
|
|
36 |
|
|
|
37 |
#ifndef MIN
|
|
|
38 |
#define MIN(A,B) ((A)<(B)?(A):(B))
|
|
|
39 |
#endif
|
|
|
40 |
|
|
|
41 |
// #define PRINT_KEYS
|
|
|
42 |
|
|
|
43 |
// Only accept HTTP connections from localhost: no
|
|
|
44 |
#define ACCEPT_LOCALONLY 0
|
|
|
45 |
|
|
|
46 |
// The queue for audio blocks coming our way
|
|
|
47 |
static queue<AudioMessage*> dataqueue;
|
|
|
48 |
static std::mutex dataqueueLock;
|
|
|
49 |
static std::condition_variable dataqueueWaitCond;
|
|
|
50 |
|
|
|
51 |
#ifdef PRINT_KEYS
|
|
|
52 |
static const char *ValueKindToCp(enum MHD_ValueKind kind)
|
|
|
53 |
{
|
|
|
54 |
switch (kind) {
|
|
|
55 |
case MHD_RESPONSE_HEADER_KIND: return "Response header";
|
|
|
56 |
case MHD_HEADER_KIND: return "HTTP header";
|
|
|
57 |
case MHD_COOKIE_KIND: return "Cookies";
|
|
|
58 |
case MHD_POSTDATA_KIND: return "POST data";
|
|
|
59 |
case MHD_GET_ARGUMENT_KIND: return "GET (URI) arguments";
|
|
|
60 |
case MHD_FOOTER_KIND: return "HTTP footer";
|
|
|
61 |
default: return "Unknown";
|
|
|
62 |
}
|
|
|
63 |
}
|
|
|
64 |
|
|
|
65 |
static int print_out_key (void *cls, enum MHD_ValueKind kind,
|
|
|
66 |
const char *key, const char *value)
|
|
|
67 |
{
|
|
|
68 |
LOGDEB(ValueKindToCp(kind) << ": " << key << " -> " << value << endl);
|
|
|
69 |
return MHD_YES;
|
|
|
70 |
}
|
|
|
71 |
#endif /* PRINT_KEYS */
|
|
|
72 |
|
|
|
73 |
// Per-response state passed to data_generator() through its cls argument.
struct DataGenContext {
    // Becomes true once data_generator() has met the zero-length end marker;
    // starts out false.
    bool eof = false;
};
|
|
|
79 |
|
|
|
80 |
// This gets called by microhttpd when it needs data.
|
|
|
81 |
static ssize_t
|
|
|
82 |
data_generator(void *cls, uint64_t pos, char *buf, size_t max)
|
|
|
83 |
{
|
|
|
84 |
LOGDEB1("data_generator: " << " max " << max << endl);
|
|
|
85 |
DataGenContext *dgc = (DataGenContext *)cls;
|
|
|
86 |
std::unique_lock<std::mutex> lock(dataqueueLock);
|
|
|
87 |
if (dgc->eof) {
|
|
|
88 |
LOGDEB1("data_generator: already eof\n");
|
|
|
89 |
return -1;
|
|
|
90 |
}
|
|
|
91 |
|
|
|
92 |
// Loop reading on the input queue until we have satistified the request
|
|
|
93 |
size_t bytes = 0;
|
|
|
94 |
while (bytes < max) {
|
|
|
95 |
while (dataqueue.empty()) {
|
|
|
96 |
LOGDEB1("data_generator: waiting for buffer" << endl);
|
|
|
97 |
dataqueueWaitCond.wait(lock);
|
|
|
98 |
}
|
|
|
99 |
|
|
|
100 |
AudioMessage *m = dataqueue.front();
|
|
|
101 |
if (m->m_bytes == 0) {
|
|
|
102 |
// EOF
|
|
|
103 |
LOGDEB1("data_generator: empty buffer\n");
|
|
|
104 |
dgc->eof = true;
|
|
|
105 |
// Do not notify or clear the queue: freeCallback will do it.
|
|
|
106 |
break;
|
|
|
107 |
}
|
|
|
108 |
LOGDEB1("data_generator: data buffer\n");
|
|
|
109 |
|
|
|
110 |
size_t newbytes = MIN(max - bytes, m->m_bytes - m->m_curoffs);
|
|
|
111 |
memcpy(buf + bytes, m->m_buf + m->m_curoffs, newbytes);
|
|
|
112 |
m->m_curoffs += newbytes;
|
|
|
113 |
bytes += newbytes;
|
|
|
114 |
if (m->m_curoffs == m->m_bytes) {
|
|
|
115 |
delete dataqueue.front();
|
|
|
116 |
dataqueue.pop();
|
|
|
117 |
dataqueueWaitCond.notify_all();
|
|
|
118 |
}
|
|
|
119 |
}
|
|
|
120 |
|
|
|
121 |
LOGDEB1("data_generator: returning " << bytes << " bytes" << endl);
|
|
|
122 |
return bytes;
|
|
|
123 |
}
|
|
|
124 |
|
|
|
125 |
static void ContentReaderFreeCallback(void *cls)
|
|
|
126 |
{
|
|
|
127 |
LOGDEB1("ContentReaderFreeCallback\n");
|
|
|
128 |
DataGenContext *dgc = (DataGenContext*)cls;
|
|
|
129 |
std::unique_lock<std::mutex> lock(dataqueueLock);
|
|
|
130 |
while (!dataqueue.empty()) {
|
|
|
131 |
delete dataqueue.front();
|
|
|
132 |
dataqueue.pop();
|
|
|
133 |
}
|
|
|
134 |
delete dgc;
|
|
|
135 |
dataqueueWaitCond.notify_all();
|
|
|
136 |
}
|
|
|
137 |
|
|
|
138 |
static int answer_to_connection(void *cls, struct MHD_Connection *connection,
|
|
|
139 |
const char *url,
|
|
|
140 |
const char *method, const char *version,
|
|
|
141 |
const char *upload_data,
|
|
|
142 |
size_t *upload_data_size, void **con_cls)
|
|
|
143 |
{
|
|
|
144 |
AudioSink::Context *ctxt = (AudioSink::Context *)cls;
|
|
|
145 |
|
|
|
146 |
#ifdef PRINT_KEYS
|
|
|
147 |
MHD_get_connection_values(connection, MHD_HEADER_KIND, &print_out_key, 0);
|
|
|
148 |
#endif
|
|
|
149 |
|
|
|
150 |
static int aptr;
|
|
|
151 |
if (&aptr != *con_cls) {
|
|
|
152 |
/* do not respond on first call ?*/
|
|
|
153 |
*con_cls = &aptr;
|
|
|
154 |
return MHD_YES;
|
|
|
155 |
}
|
|
|
156 |
|
|
|
157 |
LOGDEB("answer_to_connection: url " << url << " method " << method <<
|
|
|
158 |
" version " << version << endl);
|
|
|
159 |
|
|
|
160 |
long long size = MHD_SIZE_UNKNOWN;
|
|
|
161 |
DataGenContext *dgc = new DataGenContext();
|
|
|
162 |
|
|
|
163 |
// the block size seems to be flatly ignored by libmicrohttpd
|
|
|
164 |
// Any random value would probably work the same
|
|
|
165 |
struct MHD_Response *response =
|
|
|
166 |
MHD_create_response_from_callback(size, 4096, &data_generator,
|
|
|
167 |
dgc, ContentReaderFreeCallback);
|
|
|
168 |
if (response == NULL) {
|
|
|
169 |
LOGERR("httpgate: answer: could not create response" << endl);
|
|
|
170 |
return MHD_NO;
|
|
|
171 |
}
|
|
|
172 |
|
|
|
173 |
MHD_add_response_header(response, "Content-Type",
|
|
|
174 |
ctxt->content_type.c_str());
|
|
|
175 |
|
|
|
176 |
// #define FORCE_CHUNKED
|
|
|
177 |
#if defined(FORCE_CHUNKED)
|
|
|
178 |
#warning content-length is needed for mpd to play wav (else tries to seek).
|
|
|
179 |
MHD_add_response_header(response, "Transfer-Encoding", "chunked");
|
|
|
180 |
#else
|
|
|
181 |
char cl[100];
|
|
|
182 |
sprintf(cl, "%lld", (long long)ctxt->filesize);
|
|
|
183 |
MHD_add_response_header(response, "Content-Length", cl);
|
|
|
184 |
#endif
|
|
|
185 |
|
|
|
186 |
int ret = MHD_queue_response(connection, MHD_HTTP_OK, response);
|
|
|
187 |
MHD_destroy_response(response);
|
|
|
188 |
return ret;
|
|
|
189 |
}
|
|
|
190 |
|
|
|
191 |
// Worker routine for the HTTP audio sink.
//
// Starts a libmicrohttpd daemon on the port given by the "httpport"
// configuration entry (8869 if the entry is absent), then moves audio
// messages from the context work queue to the shared data queue read by
// data_generator(). At most 3 messages are kept in the data queue. When the
// work queue take() fails, a zero-length end marker is pushed and the
// routine waits for the data queue to be emptied before stopping the daemon.
//
// The context is deleted before returning. Returns (void*)0 if the daemon
// could not be started, (void*)1 otherwise.
static void *audioEater(AudioSink::Context *ctxt)
{
    LOGDEB1("audioEater\n");
    int port = 8869;
    auto it = ctxt->config.find("httpport");
    if (it != ctxt->config.end()) {
        port = atoi(it->second.c_str());
    }

    WorkQueue<AudioMessage*> *queue = ctxt->queue;

    LOGDEB1("audioEater: queue " << ctxt->queue << " HTTP port " << port
            << endl);

    struct MHD_Daemon *daemon =
        MHD_start_daemon(
            MHD_USE_SELECT_INTERNALLY,
            port,
            /* Accept policy callback and arg */
            NULL, NULL,
            /* handler and arg */
            &answer_to_connection, ctxt,
            MHD_OPTION_END);

    if (NULL == daemon) {
        // Do not fail silently: without the daemon nothing can be streamed.
        LOGERR("audioEater: could not start HTTP daemon on port " << port
               << endl);
        queue->workerExit();
        delete ctxt;
        return (void *)0;
    }

    bool eof = false;
    while (true) {
        AudioMessage *tsk = nullptr;
        size_t qsz;
        if (!queue->take(&tsk, &qsz)) {
            tsk = nullptr;
            eof = true;
        }
        std::unique_lock<std::mutex> lock(dataqueueLock);
        if (eof) {
            // A zero-length message is the end marker for data_generator().
            LOGDEB1("audioEater: pushing empty buffer\n");
            dataqueue.push(new AudioMessage(nullptr, 0, 0));
            dataqueueWaitCond.notify_all();
        }

        /* limit size of queuing / wait for drain. */
        while (dataqueue.size() > (eof ? 0 : 2)) {
            if (eof) {
                LOGDEB1("audioEater: waiting for queue drain, sz " <<
                        dataqueue.size() << endl);
            }
            dataqueueWaitCond.wait(lock);
        }
        if (eof)
            break;
        dataqueue.push(tsk);
        dataqueueWaitCond.notify_all();
    }
    LOGDEB0("audioEater: returning\n");
    MHD_stop_daemon(daemon);
    queue->workerExit();
    delete ctxt;
    return (void*)1;
}
|
|
|
256 |
|
|
|
257 |
// Global HTTP audio sink object, constructed with audioEater() as its
// worker routine argument.
AudioSink httpAudioSink(&audioEater);
|