a b/src/httpgate.cpp
1
/* Copyright (C) 2014 J.F.Dockes
2
 *     This program is free software; you can redistribute it and/or modify
3
 *     it under the terms of the GNU General Public License as published by
4
 *     the Free Software Foundation; either version 2 of the License, or
5
 *     (at your option) any later version.
6
 *
7
 *     This program is distributed in the hope that it will be useful,
8
 *     but WITHOUT ANY WARRANTY; without even the implied warranty of
9
 *     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10
 *     GNU General Public License for more details.
11
 *
12
 *     You should have received a copy of the GNU General Public License
13
 *     along with this program; if not, write to the
14
 *     Free Software Foundation, Inc.,
15
 *     59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
16
 */
17
#include "config.h"
18
19
#include <string.h>
20
#include <arpa/inet.h>
21
#include <sys/types.h>
22
#include <sys/select.h>
23
#include <sys/socket.h>
24
25
#include <iostream>
26
#include <queue>
27
28
#include <microhttpd.h>
29
30
#include "log.h"
31
#include "rcvqueue.h"
32
#include "wav.h"
33
34
using namespace std;
35
36
/**
   Implement a web server to export the audio stream. I had to jump
   through many hoops to get this to work with mpd (probably because
   of audiofile/sndfile restrictions). None of these are necessary
   with VLC, which is quite happy if we just serve any wav header
   directly followed by audio data.

   The trick to get this to work with libsndfile was apparently to
   send a Content-Length header. If we don't, the lib tries to seek
   inside the file (which I tried to emulate by accepting ranges), but
   fails to parse it anyway, which is quite probably a bug in
   libsndfile (it does not see files with unknown size often...).
*/
49
50
// Only accept HTTP connections from localhost
51
#define ACCEPT_LOCALONLY 1
52
53
static const int dataformat_wav = 1;
54
55
#ifndef MIN
56
#define MIN(A, B) ((A) < (B) ? (A) : (B))
57
#endif
58
59
// The queue for audio blocks out of SongCast
60
static queue<AudioMessage*> dataqueue;
61
static PTMutexInit dataqueueLock;
62
static pthread_cond_t dataqueueWaitCond = PTHREAD_COND_INITIALIZER;
63
64
// Bogus data size for our streams. Total size is databytes+44 (header)
65
const unsigned int databytes = 2 * 1000 * 1000 * 1000;
66
67
// Per-response state handed to data_generator() through the microhttpd
// callback argument. baseoffset is the start of the first requested byte
// range (0 when the request carries no usable Range header).
// Used this while trying to emulate ranges, did not do the trick.
struct ReadContext {
    // explicit: an integer must never silently convert to a context.
    explicit ReadContext(long long o = 0) : baseoffset(o) {}
    long long baseoffset;
};
72
73
static const char *ValueKindToCp(enum MHD_ValueKind kind)
74
{
75
    switch (kind) {
76
    case MHD_RESPONSE_HEADER_KIND: return "Response header";
77
    case MHD_HEADER_KIND: return "HTTP header";
78
    case MHD_COOKIE_KIND: return "Cookies";
79
    case MHD_POSTDATA_KIND: return "POST data";
80
    case MHD_GET_ARGUMENT_KIND: return "GET (URI) arguments";
81
    case MHD_FOOTER_KIND: return "HTTP footer";
82
    default: return "Unknown";
83
    }
84
}
85
86
static int print_out_key (void *cls, enum MHD_ValueKind kind, 
87
                          const char *key, const char *value)
88
{
89
    LOGDEB(ValueKindToCp(kind) << ": " << key << " -> " << value << endl);
90
    return MHD_YES;
91
}
92
93
// This gets called by microhttpd when it needs data.
94
static ssize_t
95
data_generator(void *cls, uint64_t pos, char *buf, size_t max)
96
{
97
    ReadContext *rc = (ReadContext *)cls;
98
    //LOGDEB("data_generator: baseoffs " <<
99
    //        rc->baseoffset << " pos " << pos << " max " << max << endl);
100
    if (rc->baseoffset + pos >= databytes + 44) {
101
        LOGDEB("data_generator: returning EOS" << endl);
102
        return MHD_CONTENT_READER_END_OF_STREAM;
103
    }
104
105
    PTMutexLocker lock(dataqueueLock);
106
    size_t bytes = 0;
107
    while (bytes < max) {
108
        while (dataqueue.empty()) {
109
            //LOGDEB("data_generator: waiting for buffer" << endl);
110
            pthread_cond_wait(&dataqueueWaitCond, lock.getMutex());
111
            if (!dataqueue.empty() && dataqueue.front()->m_buf == 0) {
112
                // special buf, to be processed ?
113
                LOGINF("data_generator: deleting empty buf" << endl);
114
                delete dataqueue.front();
115
                dataqueue.pop();
116
            }
117
        }
118
119
        AudioMessage *m = dataqueue.front();
120
        if (dataformat_wav && rc->baseoffset == 0 && pos == 0 && bytes == 0) {
121
            LOGINF("data_generator: first buf" << endl);
122
            // Using buf+bytes in case we ever insert icy before the audio
123
            int sz = makewavheader(buf+bytes, max, 
124
                                   m->m_freq, m->m_bits, m->m_chans, databytes);
125
            bytes += sz;
126
        }
127
128
        size_t newbytes = MIN(max - bytes, m->m_bytes - m->m_curoffs);
129
        memcpy(buf + bytes, m->m_buf + m->m_curoffs, newbytes);
130
        m->m_curoffs += newbytes;
131
        bytes += newbytes;
132
        if (m->m_curoffs == m->m_bytes) {
133
            delete dataqueue.front();
134
            dataqueue.pop();
135
        }
136
    }
137
    //LOGDEB("data_generator: returning " << bytes << " bytes" << endl);
138
    return bytes;
139
}
140
141
// Parse the value of an HTTP Range header ("bytes=a-b,c-d,...").
//
// ranges   header value.
// oranges  output: cleared first, then filled with one (start, end) pair
//          per comma-separated spec found after "bytes=". A missing start
//          is stored as 0 and a missing end as -1; a spec without any dash
//          yields (0, -1). Left empty if "bytes=" is not found.
//
// Note: a suffix spec ("-N") comes out as (0, N); it is not interpreted as
// "the last N bytes". Numbers are converted with atoi(), so malformed
// digits are not reported.
static void parseRanges(const std::string& ranges,
                        std::vector<std::pair<int,int> >& oranges)
{
    oranges.clear();
    std::string::size_type pos = ranges.find("bytes=");
    if (pos == std::string::npos) {
        return;
    }
    pos += 6;
    for (;;) {
        // Isolate the current spec first, so that a dash which belongs to
        // a later spec can never be matched with this one.
        const std::string::size_type comma = ranges.find(',', pos);
        const std::string spec = ranges.substr(
            pos, comma == std::string::npos ? std::string::npos : comma - pos);

        int start = 0;
        int fin = -1;
        const std::string::size_type dash = spec.find('-');
        if (dash != std::string::npos) {
            const std::string firstPart = spec.substr(0, dash);
            const std::string secondPart = spec.substr(dash + 1);
            if (!firstPart.empty()) {
                start = atoi(firstPart.c_str());
            }
            if (!secondPart.empty()) {
                // The end of the range comes from the text after the dash.
                fin = atoi(secondPart.c_str());
            }
        }
        oranges.push_back(std::pair<int,int>(start, fin));

        if (comma == std::string::npos) {
            break;
        }
        pos = comma + 1;
    }
}
169
170
static void ContentReaderFreeCallback(void *cls)
171
{
172
    ReadContext *rc = (ReadContext*)cls;
173
    delete rc;
174
}
175
176
static int answer_to_connection(void *cls, struct MHD_Connection *connection, 
177
                                const char *url, 
178
                                const char *method, const char *version, 
179
                                const char *upload_data, 
180
                                size_t *upload_data_size, void **con_cls)
181
{
182
    LOGDEB("answer_to_connection: url " << url << " method " << method << 
183
           " version " << version << endl);
184
185
    //MHD_get_connection_values(connection, MHD_HEADER_KIND, &print_out_key, 0);
186
187
    static int aptr;
188
    if (&aptr != *con_cls) {
189
        /* do not respond on first call */
190
        *con_cls = &aptr;
191
        return MHD_YES;
192
    }
193
194
    const char* rangeh = MHD_lookup_connection_value(connection, 
195
                                                    MHD_HEADER_KIND, "range");
196
    vector<pair<int,int> > ranges;
197
    if (rangeh) {
198
        parseRanges(rangeh, ranges);
199
    }
200
201
    long long size = MHD_SIZE_UNKNOWN;
202
    ReadContext *rc = new ReadContext();
203
    if (ranges.size()) {
204
        if (ranges[0].second != -1) {
205
            size = ranges[0].second - ranges[0].first + 1;
206
        }
207
        rc->baseoffset = ranges[0].first;
208
        //LOGDEB("httpgate: answer: got ranges: baseoffs " << rc->baseoffset <<
209
        //" sz " << size << endl);
210
    }
211
212
    struct MHD_Response *response = 
213
        MHD_create_response_from_callback(size, 1764, &data_generator, 
214
                                          rc, ContentReaderFreeCallback);
215
    if (response == NULL) {
216
        LOGERR("httpgate: answer: could not create response" << endl);
217
        return MHD_NO;
218
    }
219
    if (dataformat_wav) {
220
        MHD_add_response_header (response, "Content-Type", "audio/x-wav");
221
    } else {
222
        // Could not get this to work with mpd??
223
        MHD_add_response_header (response, "Content-Type", 
224
                                 "audio/x-mpd-cdda-pcm-reverse");
225
    }
226
    char cl[30];
227
    sprintf(cl, "%d", databytes+44);
228
    MHD_add_response_header (response, "Content-Length", cl);
229
    MHD_add_response_header (response, "Accept-Ranges", "bytes");
230
    //MHD_add_response_header (response, "icy-metaint", "32768");
231
232
    int ret = MHD_queue_response (connection, MHD_HTTP_OK, response);
233
    MHD_destroy_response (response);
234
    return ret;
235
}
236
237
static int accept_policy(void *, const struct sockaddr* sa,
238
                         socklen_t addrlen)
239
{
240
    static struct sockaddr_in localaddr;
241
    static bool init = false;
242
    if (!init) {
243
        inet_pton(AF_INET, "127.0.0.1", &localaddr.sin_addr);
244
        init = true;
245
    }
246
247
    if (!sa || sa->sa_family != AF_INET) {
248
        LOGERR("httpgate:accept_policy: not AF_INET" << endl);
249
        return MHD_NO;
250
    }
251
252
#if ACCEPT_LOCALONLY != 0
253
    const struct sockaddr_in *sain = (const struct sockaddr_in*)sa;
254
    if (sain->sin_addr.s_addr != localaddr.sin_addr.s_addr) {
255
        LOGERR("httpgate:accept_policy: not localhost" << endl);
256
        return MHD_NO;
257
    }
258
#endif
259
    return MHD_YES;
260
}
261
262
void *audioEater(void *cls)
263
{
264
    AudioEaterContext *ctxt = (AudioEaterContext*)cls;
265
    if (ctxt->port == 0) {
266
        ctxt->port = 8888;
267
    }
268
    LOGDEB("Using port " << ctxt->port << " for HTTP" << endl);
269
    struct MHD_Daemon *daemon = 
270
        MHD_start_daemon(
271
            MHD_USE_THREAD_PER_CONNECTION,
272
            //MHD_USE_SELECT_INTERNALLY, 
273
            ctxt->port, 
274
            /* Accept policy callback and arg */
275
            accept_policy, NULL, 
276
            /* handler and arg */
277
            &answer_to_connection, NULL, 
278
            MHD_OPTION_END);
279
    if (NULL == daemon) {
280
        audioqueue.workerExit();
281
        return (void *)0;
282
    }
283
    delete ctxt;
284
285
    for (int seq=0;;seq++) {
286
        AudioMessage *tsk = 0;
287
        size_t qsz;
288
        if (!audioqueue.take(&tsk, &qsz)) {
289
            MHD_stop_daemon (daemon);
290
            audioqueue.workerExit();
291
            return (void*)1;
292
        }
293
#if 0        
294
        if (seq % 200 == 0) {
295
            LOGDEB("audioEater: msg: " << tsk->m_bytes << " bytes " << endl);
296
        }
297
#endif
298
        PTMutexLocker lock(dataqueueLock);
299
        /* limit size of queuing. If there is a client but it is not
300
           eating blocks fast enough, there will be audio pops */
301
        while (dataqueue.size() > 2) {
302
            //LOGINF("audioEater: discarding buffer !" << endl);
303
            delete dataqueue.front();
304
            dataqueue.pop();
305
        }
306
        dataqueue.push(tsk);
307
        pthread_cond_broadcast(&dataqueueWaitCond);
308
    }
309
}