a b/upsend_src/streamer.cpp
1
/* Copyright (C) 2014 J.F.Dockes
2
 *     This program is free software; you can redistribute it and/or modify
3
 *     it under the terms of the GNU General Public License as published by
4
 *     the Free Software Foundation; either version 2 of the License, or
5
 *     (at your option) any later version.
6
 *
7
 *     This program is distributed in the hope that it will be useful,
8
 *     but WITHOUT ANY WARRANTY; without even the implied warranty of
9
 *     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10
 *     GNU General Public License for more details.
11
 *
12
 *     You should have received a copy of the GNU General Public License
13
 *     along with this program; if not, write to the
14
 *     Free Software Foundation, Inc.,
15
 *     59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
16
 */
17
#include "config.h"
18
19
#include <string.h>
20
#include <arpa/inet.h>
21
#include <sys/types.h>
22
#include <sys/select.h>
23
#include <sys/socket.h>
24
25
#include <iostream>
26
#include <queue>
27
#include <mutex>
28
#include <condition_variable>
29
30
#include <microhttpd.h>
31
32
#include "streamer.h"
33
#include "libupnpp/log.h"
34
35
using namespace std;
36
37
#ifndef MIN
38
#define MIN(A,B) ((A)<(B)?(A):(B))
39
#endif
40
41
// #define PRINT_KEYS
42
43
// Only accept HTTP connections from localhost: no
44
#define ACCEPT_LOCALONLY 0
45
46
// The queue for audio blocks coming our way
47
// Holds raw owning pointers: audioEater pushes messages, data_generator
// deletes them once fully consumed, and ContentReaderFreeCallback deletes
// whatever is left. A message with m_bytes == 0 is the end-of-stream marker.
static queue<AudioMessage*> dataqueue;
48
// Guards dataqueue; taken by data_generator, ContentReaderFreeCallback and
// audioEater.
static std::mutex dataqueueLock;
49
// Notified each time dataqueue gains or loses entries. data_generator waits
// on it for data to arrive, audioEater waits on it for the queue to drain.
static std::condition_variable dataqueueWaitCond;
50
51
#ifdef PRINT_KEYS
52
static const char *ValueKindToCp(enum MHD_ValueKind kind)
53
{
54
    switch (kind) {
55
    case MHD_RESPONSE_HEADER_KIND: return "Response header";
56
    case MHD_HEADER_KIND: return "HTTP header";
57
    case MHD_COOKIE_KIND: return "Cookies";
58
    case MHD_POSTDATA_KIND: return "POST data";
59
    case MHD_GET_ARGUMENT_KIND: return "GET (URI) arguments";
60
    case MHD_FOOTER_KIND: return "HTTP footer";
61
    default: return "Unknown";
62
    }
63
}
64
65
static int print_out_key (void *cls, enum MHD_ValueKind kind, 
66
                          const char *key, const char *value)
67
{
68
    LOGDEB(ValueKindToCp(kind) << ": " << key << " -> " << value << endl);
69
    return MHD_YES;
70
}
71
#endif /* PRINT_KEYS */
72
73
// Per-response state shared between data_generator and
// ContentReaderFreeCallback. Allocated in answer_to_connection and deleted
// by ContentReaderFreeCallback.
struct DataGenContext {
    // Set by data_generator once it has seen the end-of-stream marker, so
    // that the following call reports end of stream right away.
    bool eof{false};
};
79
80
// This gets called by microhttpd when it needs data.
81
static ssize_t
82
data_generator(void *cls, uint64_t pos, char *buf, size_t max)
83
{
84
    LOGDEB1("data_generator: " << " max " << max << endl);
85
    DataGenContext *dgc = (DataGenContext *)cls;
86
    std::unique_lock<std::mutex> lock(dataqueueLock);
87
    if (dgc->eof) {
88
        LOGDEB1("data_generator: already eof\n");
89
        return -1;
90
    }
91
    
92
    // Loop reading on the input queue until we have satistified the request
93
    size_t bytes = 0;
94
    while (bytes < max) {
95
        while (dataqueue.empty()) {
96
            LOGDEB1("data_generator: waiting for buffer" << endl);
97
            dataqueueWaitCond.wait(lock);
98
        }
99
100
        AudioMessage *m = dataqueue.front();
101
        if (m->m_bytes == 0) {
102
            // EOF
103
            LOGDEB1("data_generator: empty buffer\n");
104
            dgc->eof = true;
105
            // Do not notify or clear the queue: freeCallback will do it.
106
            break;
107
        }
108
        LOGDEB1("data_generator: data buffer\n");
109
110
        size_t newbytes = MIN(max - bytes, m->m_bytes - m->m_curoffs);
111
        memcpy(buf + bytes, m->m_buf + m->m_curoffs, newbytes);
112
        m->m_curoffs += newbytes;
113
        bytes += newbytes;
114
        if (m->m_curoffs == m->m_bytes) {
115
            delete dataqueue.front();
116
            dataqueue.pop();
117
            dataqueueWaitCond.notify_all();
118
        }
119
    }
120
121
    LOGDEB1("data_generator: returning " << bytes << " bytes" << endl);
122
    return bytes;
123
}
124
125
static void ContentReaderFreeCallback(void *cls)
126
{
127
    LOGDEB1("ContentReaderFreeCallback\n");
128
    DataGenContext *dgc = (DataGenContext*)cls;
129
    std::unique_lock<std::mutex> lock(dataqueueLock);
130
    while (!dataqueue.empty()) {
131
        delete dataqueue.front();
132
        dataqueue.pop();
133
    }
134
    delete dgc;
135
    dataqueueWaitCond.notify_all();
136
}
137
138
static int answer_to_connection(void *cls, struct MHD_Connection *connection, 
139
                                const char *url, 
140
                                const char *method, const char *version, 
141
                                const char *upload_data, 
142
                                size_t *upload_data_size, void **con_cls)
143
{
144
    AudioSink::Context *ctxt = (AudioSink::Context *)cls;
145
146
#ifdef PRINT_KEYS
147
    MHD_get_connection_values(connection, MHD_HEADER_KIND, &print_out_key, 0);
148
#endif
149
150
    static int aptr;
151
    if (&aptr != *con_cls) {
152
        /* do not respond on first call ?*/
153
        *con_cls = &aptr;
154
        return MHD_YES;
155
    }
156
157
    LOGDEB("answer_to_connection: url " << url << " method " << method << 
158
           " version " << version << endl);
159
160
    long long size = MHD_SIZE_UNKNOWN;
161
    DataGenContext *dgc = new DataGenContext();
162
163
    // the block size seems to be flatly ignored by libmicrohttpd
164
    // Any random value would probably work the same
165
    struct MHD_Response *response = 
166
        MHD_create_response_from_callback(size, 4096, &data_generator, 
167
                                          dgc, ContentReaderFreeCallback);
168
    if (response == NULL) {
169
        LOGERR("httpgate: answer: could not create response" << endl);
170
        return MHD_NO;
171
    }
172
173
    MHD_add_response_header(response, "Content-Type",
174
                            ctxt->content_type.c_str());
175
176
// #define FORCE_CHUNKED
177
#if defined(FORCE_CHUNKED)
178
#warning content-length is needed for mpd to play wav (else tries to seek).
179
        MHD_add_response_header(response, "Transfer-Encoding", "chunked");
180
#else
181
        char cl[100];
182
        sprintf(cl, "%lld", (long long)ctxt->filesize);
183
        MHD_add_response_header(response, "Content-Length", cl);
184
#endif
185
186
    int ret = MHD_queue_response(connection, MHD_HTTP_OK, response);
187
    MHD_destroy_response(response);
188
    return ret;
189
}
190
191
// Worker function for the HTTP sink: starts a microhttpd daemon, then
// moves audio messages from the context's work queue to the global data
// queue read by data_generator.
//
// The listening port is taken from the "httpport" entry of ctxt->config
// and defaults to 8869.
//
// At most 3 messages are kept in the data queue. When the work queue
// reports termination, an empty message is pushed as end-of-stream marker
// and the function waits until the data queue has been emptied before
// stopping the daemon.
//
// ctxt is deleted before returning in all cases. Returns (void*)0 if the
// daemon could not be started, (void*)1 otherwise.
static void *audioEater(AudioSink::Context *ctxt)
{
    LOGDEB1("audioEater\n");
    int port = 8869;
    auto it = ctxt->config.find("httpport");
    if (it != ctxt->config.end()) {
        port = atoi(it->second.c_str());
    }

    WorkQueue<AudioMessage*> *queue = ctxt->queue;

    LOGDEB1("audioEater: queue " << ctxt->queue << " HTTP port " << port 
           << endl);

    struct MHD_Daemon *daemon = 
        MHD_start_daemon(
            MHD_USE_SELECT_INTERNALLY, 
            port, 
            /* Accept policy callback and arg */
            NULL, NULL, 
            /* handler and arg */
            &answer_to_connection, ctxt,
            MHD_OPTION_END);

    if (NULL == daemon) {
        LOGERR("audioEater: could not start HTTP daemon on port " << port <<
               endl);
        queue->workerExit();
        delete ctxt;
        return (void *)0;
    }

    while (true) {
        AudioMessage *tsk = nullptr;
        size_t qsz;
        // A failed take() means that there will be no more input.
        const bool eof = !queue->take(&tsk, &qsz);

        std::unique_lock<std::mutex> lock(dataqueueLock);
        if (eof) {
            LOGDEB1("audioEater: pushing empty buffer\n");
            dataqueue.push(new AudioMessage(nullptr, 0, 0));
            dataqueueWaitCond.notify_all();
        }

        /* limit size of queuing / wait for drain. */
        const size_t maxpending = eof ? 0 : 2;
        while (dataqueue.size() > maxpending) {
            if (eof) {
                LOGDEB1("audioEater: waiting for queue drain, sz " <<
                        dataqueue.size() << endl);
            }
            dataqueueWaitCond.wait(lock);
        }
        if (eof)
            break;
        dataqueue.push(tsk);
        dataqueueWaitCond.notify_all();
    }
    LOGDEB0("audioEater: returning\n");
    MHD_stop_daemon(daemon);
    queue->workerExit();
    delete ctxt;
    return (void*)1;
}
256
257
// The HTTP audio sink object (external linkage), constructed with
// audioEater as its worker function.
AudioSink httpAudioSink(&audioEater);