/*
Copyright 2011-14, Linn Products Ltd. All rights reserved.
Unless otherwise stated, all code in this project is licensed under the 2-clause
(Simplified) BSD license. See BsdLicense.txt for details.
*/
/* Copyright (C) 2014 J.F.Dockes
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the
* Free Software Foundation, Inc.,
* 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
* Modified from ohSongcast/Receiver/Receiver.cpp
*/
#include "config.h"
#include <OpenHome/OhNetTypes.h>
#include <OpenHome/Private/Ascii.h>
#include <OpenHome/Private/Thread.h>
#include <OpenHome/Private/OptionParser.h>
#include <OpenHome/Private/Debug.h>
#include <OpenHome/Net/Core/OhNet.h>
#ifdef WITH_OHBUILD
#include "Debug.h"
#include "OhmReceiver.h"
#else
#include <OpenHome/OhmReceiver.h>
#include <OpenHome/OhSongcast/Debug.h>
#endif
#include "workqueue.h"
#include "rcvqueue.h"
#include "log.h"
#include "conftree.h"
#include "chrono.h"
#include "watcher.h"
#ifdef WITH_WAVSC2
#include "openaudio.h"
#include "audioreader.h"
#endif
#include <vector>
#include <stdio.h>
#include <iostream>
#include <signal.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/un.h>
#include <fcntl.h>
using namespace std;
// Program-wide queue of audio messages. OhmReceiverDriver::Process() (and the
// optional WAV test player) put AudioMessage objects on it; the queue is
// started with the selected AudioEater's worker function.
// NOTE(review): the second constructor argument (4) is presumably the maximum
// queue depth -- confirm against workqueue.h.
WorkQueue<AudioMessage*> audioqueue("audioqueue", 4);
#ifdef _WIN32
#pragma warning(disable:4355) // use of 'this' in ctor lists safe in this case
#define CDECL __cdecl
// Read a single key press from the console (Windows variant).
int mygetch()
{
    return (_getch());
}
#else
#define CDECL
#include <termios.h>
#include <unistd.h>
// Read a single character from stdin without waiting for a newline and
// without echoing it.
//
// Canonical mode and echo are switched off on the terminal for the duration of
// the read, then the previous settings are restored.
//
// Returns the character read, or EOF as returned by getchar().
int mygetch()
{
    struct termios oldt;
    if (tcgetattr(STDIN_FILENO, &oldt) != 0) {
        // stdin is not a terminal (pipe, file, closed...): there are no
        // settings to change or restore, so just read. This avoids handing an
        // uninitialised termios structure to tcsetattr().
        return getchar();
    }

    struct termios newt = oldt;
    newt.c_lflag &= ~(ICANON | ECHO);
    tcsetattr(STDIN_FILENO, TCSANOW, &newt);

    const int ch = getchar();

    // Always put the terminal back the way we found it.
    tcsetattr(STDIN_FILENO, TCSANOW, &oldt);
    return ch;
}
#endif
// Descriptor of the user-command socket, or -1 while none has been created.
// Read by the signal handler to unblock a pending recvfrom().
int g_sockfd = -1;
// Set by the signal handler to ask the main loops to terminate.
bool g_quitrequest = false;

// Signal handler (installed for SIGINT and SIGTERM): record the quit request
// and wake up the command loop.
void sigcatcher(int)
{
    g_quitrequest = true;
    // As recvfrom() is a blocking operation we need to shut the corresponding
    // socket down so that recvfrom() returns with 0.
    // Any non-negative value is a valid descriptor, including 0 (which
    // socket() can return when stdin was closed before startup).
    if (g_sockfd >= 0) {
        shutdown(g_sockfd, SHUT_RD);
    }
}
using namespace OpenHome;
using namespace OpenHome::Net;
using namespace OpenHome::TestFramework;
using namespace OpenHome::Av;
// Driver handed to OhmReceiver: it receives the receiver state callbacks and
// the incoming Ohm messages, and forwards the audio to the global audio queue.
class OhmReceiverDriver : public IOhmReceiverDriver, public IOhmMsgProcessor {
public:
    // eater: audio back end whose worker is started on the audio queue.
    // ctxt: context passed to that worker.
    OhmReceiverDriver(AudioEater* eater, AudioEater::Context *ctxt);

private:
    // IOhmReceiverDriver
    virtual void Add(OhmMsg& aMsg);
    virtual void Timestamp(OhmMsg& aMsg);
    virtual void Started();
    virtual void Connected();
    virtual void Playing();
    virtual void Disconnected();
    virtual void Stopped();

    // IOhmMsgProcessor
    virtual void Process(OhmMsgAudio& aMsg);
    virtual void Process(OhmMsgTrack& aMsg);
    virtual void Process(OhmMsgMetatext& aMsg);

private:
    // Debug, stats, etc while we get to understand the Songcast streams
    class Observer {
    public:
        // True until the next audio message re-seeds the frame counter.
        TBool iReset = true;
        // Audio messages seen since the last periodic log.
        TUint iCount = 0;
        // Frame number of the last audio message seen.
        TUint iFrame = 0;
        // Optional raw dump file descriptor, -1 when dumping is disabled.
        int dumpfd = -1;
        Chrono chron;

        Observer() {
#if 0
            dumpfd =
                open("/y/av/tmp/sc2dump", O_WRONLY|O_CREAT|O_TRUNC, 0666);
            if (dumpfd < 0) {
                LOGERR("OhmReceiverDriver::Open dump file failed\n");
            }
#endif
        }

        // Request that frame tracking restarts from the next message.
        void reset() {
            iReset = true;
        }

        void process(OhmMsgAudio& aMsg);
    };

    Observer m_obs;
    AudioEater *m_eater;
};
// Remember the audio back end and start the global audio queue with one
// worker running the eater's worker function, which receives ctxt.
OhmReceiverDriver::OhmReceiverDriver(AudioEater *eater,
                                     AudioEater::Context *ctxt)
    : m_eater(eater)
{
    audioqueue.start(1, m_eater->worker, ctxt);
}
// IOhmReceiverDriver: a message arrived. Dispatch it to the matching
// Process() overload of this object, then drop our reference to it.
void OhmReceiverDriver::Add(OhmMsg& aMsg)
{
    aMsg.Process(*this);
    aMsg.RemoveRef();
}
// IOhmReceiverDriver: timestamping hook. Intentionally does nothing here.
void OhmReceiverDriver::Timestamp(OhmMsg& /*aMsg*/)
{
}
// IOhmReceiverDriver: state callback, only traced in the debug log.
void OhmReceiverDriver::Started()
{
    LOGDEB("=== STARTED ====\n");
}
// IOhmReceiverDriver: state callback. Restarts frame tracking in the stream
// observer and announces the connection on stdout (flushed immediately) as
// well as in the debug log.
void OhmReceiverDriver::Connected()
{
    m_obs.reset();

    printf("CONNECTED\n");
    fflush(stdout);

    LOGDEB("=== CONNECTED ====\n");
}
// IOhmReceiverDriver: state callback, only traced in the debug log.
void OhmReceiverDriver::Playing()
{
    LOGDEB("=== PLAYING ====\n");
}
// IOhmReceiverDriver: state callback, only traced in the debug log.
void OhmReceiverDriver::Disconnected()
{
    LOGDEB("=== DISCONNECTED ====\n");
}
// IOhmReceiverDriver: state callback, only traced in the debug log.
void OhmReceiverDriver::Stopped()
{
    LOGDEB("=== STOPPED ====\n");
}
// Debug and stats only, not needed for main function.
//
// Every 400th audio message, and every message carrying the halt flag, the
// stream format is logged; for non-halt messages the payload size is checked
// against the format, an all-zero payload is reported, and the payload is
// appended to the dump file when one is open. Independently of that, frame
// numbers are tracked to report gaps in the sequence.
void OhmReceiverDriver::Observer::process(OhmMsgAudio& aMsg)
{
    const bool periodic = (++iCount == 400);
    if (periodic || aMsg.Halt()) {
        LOGDEB("OhmRcvDrv::Process:audio: samplerate " << aMsg.SampleRate() <<
               " bitdepth " << aMsg.BitDepth() << " channels " <<
               aMsg.Channels() << " samples " << aMsg.Samples() <<
               " Halted ? " << aMsg.Halt() << endl);
#if 0
        static unsigned long long last_timestamp;
        unsigned long long timestamp = aMsg.MediaTimestamp();
        if (last_timestamp) {
            long long intervalus =
                ((timestamp - last_timestamp) * 1000000) / (256*48000);
            long long atsus =
                ((timestamp) * 1000000) / (256*48000);
            // Not too sure what this did. amicros not in chrono.cpp any more
            long long absus = chron.amicros() - 1430477861905884LL;
            LOGDEB("Computed-uS: " << intervalus <<
                   " Elapsed-uS: " << chron.urestart() <<
                   " Timestamp-uS: " << atsus <<
                   " Abs-uS: " << absus <<
                   " Diff-mS " << (absus - atsus) / 1000 <<
                   endl);
        }
        last_timestamp = timestamp;
#endif
        if (!aMsg.Halt()) {
            // Size implied by the announced format; the buffer's own size
            // wins if the two disagree.
            unsigned int nbytes =
                aMsg.Samples() * (aMsg.BitDepth() / 8) * aMsg.Channels();
            if (nbytes != aMsg.Audio().Bytes()) {
                LOGERR("OhmRcvDrv::Process:audio: computed bytes " << nbytes <<
                       " != buffer's " << aMsg.Audio().Bytes() << endl);
                nbytes = aMsg.Audio().Bytes();
            }

            const unsigned char *data =
                (const unsigned char *)aMsg.Audio().Ptr();

            // Skip leading zero bytes: reaching the end means pure silence.
            unsigned int pos = 0;
            while (pos < nbytes && data[pos] == 0) {
                ++pos;
            }
            if (pos == nbytes) {
                LOGDEB("OhmRcvDrv::Process:audio: silence buffer" << endl);
            }

            if (dumpfd >= 0) {
                // Best effort dump: a short or failed write is ignored.
                const ssize_t written = write(dumpfd, data, nbytes);
                (void)written;
            }
        }
        iCount = 0;
    }

    // Frame continuity check, skipped for the first message after a reset.
    if (!iReset && aMsg.Frame() != iFrame + 1) {
        LOGINF("Missed frames between " << iFrame << " and " <<
               aMsg.Frame() << endl);
    }
    iFrame = aMsg.Frame();
    iReset = false;
}
// Copy audio samples from src to dest while reversing the byte order of each
// sample.
//
// dest:  destination buffer, at least 'bytes' long. Must not overlap src.
// src:   source samples.
// bytes: size of the source data in bytes.
// bits:  sample width in bits. 16, 24 and 32 bit samples are byte-reversed.
//        Any other width (e.g. 8 bits, where there is nothing to swap) is
//        copied unchanged, so that dest never keeps uninitialised content.
//
// A trailing partial sample (bytes not a multiple of the sample size) is not
// copied for the 16/24/32 bit cases.
void copyswap(unsigned char *dest, const unsigned char *src,
              unsigned int bytes, unsigned int bits)
{
    switch (bits) {
    case 16:
        swab(src, dest, bytes);
        break;
    case 24:
        // 'i' never exceeds 'bytes', so the subtraction cannot wrap.
        for (unsigned int i = 0; bytes - i >= 3; i += 3) {
            dest[i] = src[i + 2];
            dest[i + 1] = src[i + 1];
            dest[i + 2] = src[i];
        }
        break;
    case 32:
        // Never seen this but whatever...
        for (unsigned int i = 0; bytes - i >= 4; i += 4) {
            dest[i] = src[i + 3];
            dest[i + 1] = src[i + 2];
            dest[i + 2] = src[i + 1];
            dest[i + 3] = src[i];
        }
        break;
    default:
        // Single-byte or unknown sample width: no byte order to change.
        memcpy(dest, src, bytes);
        break;
    }
}
// IOhmMsgProcessor: handle one audio message.
//
// The payload is copied (byte-swapped if the audio back end asks for it) into
// a freshly malloc'ed buffer, wrapped in an AudioMessage and put on the global
// audio queue. An empty message is dropped, unless it carries the halt flag,
// in which case an AudioMessage without data is queued. The process exits if
// the queue refuses the message.
void OhmReceiverDriver::Process(OhmMsgAudio& aMsg)
{
    const bool empty = (aMsg.Audio().Bytes() == 0);
    if (empty && !aMsg.Halt()) {
        LOGDEB("OhmReceiverDriver::Process: ignoring empty message\n");
        return;
    }

    char *buf = NULL;
    unsigned int allocbytes = 0;

    if (empty) {
        LOGDEB("OhmReceiverDriver::Process: empty message with halt flag "
               "set\n");
    } else {
        m_obs.process(aMsg);
        if (aMsg.Halt()) {
            LOGDEB("OhmReceiverDriver::Process: halt flag set in message\n");
        }

        const unsigned int bytes = aMsg.Audio().Bytes();
        // We allocate a bit more space to avoid reallocations in the resampler
        allocbytes = bytes + 100;
        buf = (char *)malloc(allocbytes);
        if (buf == 0) {
            LOGERR("OhmReceiverDriver::Process: can't allocate " <<
                   bytes << " bytes\n");
            return;
        }

        // Songcast data is always msb-first. Convert to desired order:
        // depends on what downstream wants, and just as well we do it
        // here because we copy the buf anyway.
        bool needswap = false;
        if (m_eater->input_border == AudioEater::BO_LSB) {
            needswap = true;
        } else if (m_eater->input_border == AudioEater::BO_HOST) {
#ifndef WORDS_BIGENDIAN
            needswap = true;
#endif
        }

        if (needswap) {
            copyswap((unsigned char *)buf, aMsg.Audio().Ptr(), bytes,
                     aMsg.BitDepth());
        } else {
            memcpy(buf, aMsg.Audio().Ptr(), bytes);
        }
    }

    AudioMessage *ap = new
        AudioMessage(aMsg.BitDepth(), aMsg.Channels(), aMsg.Samples(),
                     aMsg.SampleRate(), aMsg.Halt(), buf, allocbytes);

    // There is nothing special we can do if put fails: no way to
    // return status. Should we just exit ?
    if (!audioqueue.put(ap, false)) {
        LOGERR("sc2mpd: queue dead: exiting\n");
        exit(1);
    }
}
// IOhmMsgProcessor: track message. Only logged: sequence number, URI and
// metadata, the latter two copied into zero-terminated buffers for printing.
void OhmReceiverDriver::Process(OhmMsgTrack& aMsg)
{
    Brhz uri(aMsg.Uri());
    Brhz metadata(aMsg.Metadata());

    LOGDEB("OhmRcvDrv::Process:trk: TRACK SEQ " << aMsg.Sequence() <<
           " URI " << uri.CString() <<
           " METADATA " << metadata.CString() << endl);
}
// IOhmMsgProcessor: metatext message. Only logged: sequence number and text,
// the latter copied into a zero-terminated buffer for printing.
void OhmReceiverDriver::Process(OhmMsgMetatext& aMsg)
{
    Brhz metatext(aMsg.Metatext());

    LOGDEB("OhmRcvDrv::Process:meta: METATEXT SEQUENCE " << aMsg.Sequence() <<
           " METATEXT " << metatext.CString() << endl);
}
#ifdef WITH_WAVSC2
// Test helper: feed the audio queue from an audio file instead of a Songcast
// sender.
//
// wavfile: path of the file to play.
// eater:   audio back end; its worker is started on the audio queue and its
//          input_border decides whether samples are byte-swapped.
// ctxt:    context passed to the worker.
//
// Returns 1 in all cases where it returns: open failure, allocation failure,
// dead queue, or when the reader has no more data to deliver.
static int playWav(const string& wavfile, AudioEater *eater,
                   AudioEater::Context *ctxt)
{
    audioqueue.start(1, eater->worker, ctxt);

    // NOTE(review): 'audio' is never released here; presumably acceptable
    // because main() returns right after this function -- confirm ownership
    // rules of openAudio().
    AudioReader *audio = openAudio(wavfile, "", true);
    if (!audio || !audio->open() || audio->bytesPerSample() == 0 ||
        audio->numChannels() == 0) {
        cerr << "Audio file open failed" << endl;
        return 1;
    }
    LOGDEB("sample rate:        " << audio->sampleRate() << endl);
    LOGDEB("sample size:        " << audio->bytesPerSample() << endl);
    LOGDEB("channels:           " << audio->numChannels() << endl);

    const size_t packetBytes = 441 * 16 *2;
    // We allocate a bit more space to avoid reallocations in the resampler
    const unsigned int allocbytes = packetBytes + 100;

    // Songcast data is always msb-first. Convert to desired order:
    // depends on what downstream wants, and just as well we do it
    // here because we copy the buf anyway.
    // None of this changes from one packet to the next, so it is computed
    // once before the loop.
    bool needswap = false;
    switch (eater->input_border) {
    case AudioEater::BO_MSB:
        break;
    case AudioEater::BO_LSB:
        needswap = true;
        break;
    case AudioEater::BO_HOST:
#ifdef WORDS_BIGENDIAN
        needswap = false;
#else
        needswap = true;
#endif
        break;
    }
    const int bitDepth = 8 * audio->bytesPerSample();
    // The constructor wants the number of frames as input (frame
    // being all samples at given timepoint, typically 2 for
    // stereo)
    const int frames = packetBytes /
        (audio->bytesPerSample() * audio->numChannels());

    while (true) {
        const unsigned char *ibuf = audio->data(packetBytes);
        if (ibuf == 0) {
            return 1;
        }

        char *buf = (char *)malloc(allocbytes);
        if (buf == 0) {
            LOGERR("playWav: can't allocate " << allocbytes << " bytes\n");
            return 1;
        }

        if (needswap) {
            copyswap((unsigned char *)buf, ibuf, packetBytes, bitDepth);
        } else {
            memcpy(buf, ibuf, packetBytes);
        }

        // Same argument list as the AudioMessage built in
        // OhmReceiverDriver::Process(): the halt flag comes before the
        // buffer. It is passed explicitly (never halted here) so that the
        // buffer pointer is not taken for the flag.
        AudioMessage *ap = new
            AudioMessage(bitDepth, audio->numChannels(), frames,
                         audio->sampleRate(), false, buf, allocbytes);

        // There is nothing special we can do if put fails: no way to
        // return status. Should we just exit ?
        if (!audioqueue.put(ap, false)) {
            LOGERR("sc2mpd: queue dead: exiting\n");
            return 1;
        }
    }
}
#endif
// Path of the UNIX datagram socket on which user commands are received.
#define SOCK_PATH "/tmp/sc2mpd.sock"
// Size of the command receive buffer, in bytes.
#define BUF_SIZE 16

// Serve user commands received as datagrams on the SOCK_PATH UNIX socket until
// a quit is requested through g_quitrequest.
//
// receiver: receiver controlled by the "play" and "stop" commands.
// audio:    audio back end queried by the "state" command (only if it
//           provides a state function).
// aUri:     URI passed to receiver->Play() for the "play" command.
//
// Commands must be spelled in full; anything after a NUL byte in the datagram
// is ignored, so clients may send the string terminator. The reply sent back
// to the client is "OK" except for "state", which is answered with
// "Stopped", "Playing" or "Unknown". Unrecognised commands are answered "OK"
// and otherwise ignored.
//
// Returns when a quit was requested, or immediately if the socket could not
// be set up. The descriptor is published in g_sockfd while the socket is open
// so that the signal handler can unblock recvfrom().
void HandleUserCmd(OhmReceiver* receiver, AudioEater* audio, const Brx& aUri)
{
    struct sockaddr_un server_addr, client_addr;
    socklen_t len;
    ssize_t num_bytes;
    char buf[BUF_SIZE];

    LOGDEB("OhmReceiver: Running user command handler\n");

    g_sockfd = socket(AF_UNIX, SOCK_DGRAM, 0);
    if (g_sockfd == -1) {
        LOGERR("Error: Cannot create socket: " << strerror(errno) << endl);
        goto _leave;
    }

    // A socket file left over from a previous run would make bind() fail.
    if (remove(SOCK_PATH) == -1 && errno != ENOENT) {
        LOGERR("Error: Cannot remove socket path: " << strerror(errno) << endl);
        goto _close_socket;
    }

    memset(&server_addr, 0, sizeof(struct sockaddr_un));
    server_addr.sun_family = AF_UNIX;
    strncpy(server_addr.sun_path, SOCK_PATH, sizeof(server_addr.sun_path) - 1);

    if (bind(g_sockfd, (struct sockaddr *) &server_addr,
             sizeof(struct sockaddr_un)) == -1) {
        LOGERR("Error: Cannot bind to socket: " << strerror(errno) << endl);
        goto _close_socket;
    }

    while (!g_quitrequest) {
        len = sizeof(struct sockaddr_un);

        // recvfrom() is a blocking call and will wait for messages to arrive.
        // No receive timeout is set on the socket: the call is unblocked on
        // termination by the signal handler, which shuts the socket down for
        // reading so that recvfrom() returns 0.
        num_bytes = recvfrom(g_sockfd, buf, BUF_SIZE, 0,
                             (struct sockaddr *) &client_addr, &len);
        if (num_bytes == -1) {
            LOGERR("Error: Failed to receive from socket: " <<
                   strerror(errno) << endl);
            continue; // Ignore failed request
        }

        if (num_bytes > 0) {
            string response("OK");

            LOGDEB("HandleUserCmd: Received " << num_bytes << " bytes: " <<
                   string(buf, num_bytes) << endl);

            // The datagram is not NUL-terminated. Keep what precedes the
            // first NUL byte (if any) and compare whole words: a length
            // limited strncmp() would also accept any prefix of a command,
            // e.g. "p" for "play".
            const string cmd(string(buf, num_bytes).c_str());

            if (cmd == "play") {
                receiver->Play(aUri);
            } else if (cmd == "stop") {
                receiver->Stop();
            } else if (cmd == "state") {
                if (audio->state) {
                    switch (audio->state()) {
                    case AudioEater::AudioState::STOPPED:
                        response = "Stopped"; break;
                    case AudioEater::AudioState::PLAYING:
                        response = "Playing"; break;
                    case AudioEater::AudioState::UNKNOWN:
                        response = "Unknown";
                        break;
                    }
                }
            }

            ssize_t bytes_sent = sendto(g_sockfd, (const void *)response.c_str(),
                                        response.length(), 0,
                                        (struct sockaddr *)&client_addr, len);
            if (bytes_sent == -1) {
                LOGERR("Error: Cannot write to socket: " <<
                       strerror(errno) << endl);
            }
        }
    }

_close_socket:
    {
        // Withdraw the descriptor from the signal handler before closing it,
        // so that a late signal cannot shut down a closed (or reused)
        // descriptor. Descriptor 0 is valid and must be closed too.
        const int fd = g_sockfd;
        g_sockfd = -1;
        if (fd >= 0) {
            close(fd);
        }
    }

_leave:
    LOGDEB("OhmReceiver: Leaving user command handler\n");
}
// Program entry point.
//
// Parses the command line, reads the upmpdcli configuration file (log file
// name and level), then either plays a WAV file for testing (WITH_WAVSC2
// builds, -w option) or starts the Songcast receiver on the selected network
// adapter.
//
// In interactive mode (-i) single keys drive the receiver: 'p' play, 's'
// stop, 'q' quit. Otherwise playing starts at once and commands are served
// by HandleUserCmd() until SIGINT or SIGTERM is received.
//
// Returns 0 on normal exit, 1 on bad options, unreadable configuration, log
// initialisation failure or invalid network adapter index.
int CDECL main(int aArgc, char* aArgv[])
{
    string logfilename;
    int loglevel(Logger::LLINF);

    OptionParser parser;

    OptionUint optionAdapter("-a", "--adapter", 0,
                             "[adapter] index of network adapter to use");
    parser.AddOption(&optionAdapter);

    OptionUint optionTtl("-t", "--ttl", 1, "[ttl] ttl");
    parser.AddOption(&optionTtl);

    OptionUint optionInteract("-i", "--interact", 0, "[interact] interactive");
    parser.AddOption(&optionInteract);

    OptionString optionUri("-u", "--uri", Brn("mpus://0.0.0.0:0"),
                           "[uri] uri of the sender");
    parser.AddOption(&optionUri);

    OptionString optionConfig("-c", "--config", Brn("/etc/upmpdcli.conf"),
                              "[config] upmpdcli configuration file path");
    parser.AddOption(&optionConfig);

    OptionBool optionDevice("-d", "--direct-alsa",
                            "[stream] Use alsa directly instead of producing "
                            "http stream");
    parser.AddOption(&optionDevice);

    OptionBool optionWatcher("-W", "--watcher",
                             "[watcher] Start a watcher thread"
                             "(needs --direct-alsa)");
    parser.AddOption(&optionWatcher);

#ifdef WITH_WAVSC2
    OptionString optionWav("-w", "--wav somefile.wav", Brn(""),
                           "Test audio with wav file instead of sender");
    parser.AddOption(&optionWav);
#endif

    if (!parser.Parse(aArgc, aArgv)) {
        cerr << "Bad options, exiting\n";
        return 1;
    }

    // NOTE(review): the option default is already "/etc/upmpdcli.conf", so
    // the path is only empty when an empty -c value is given and an
    // unreadable default file is treated as fatal -- confirm this is intended.
    string uconfigfile = (const char *)optionConfig.Value().Ptr();
    bool cfspecified = true;
    if (uconfigfile.empty()) {
        cfspecified = false;
        uconfigfile = "/etc/upmpdcli.conf";
    }
    ConfSimple config(uconfigfile.c_str(), 1, true);
    if (!config.ok()) {
        cerr << "Could not open config: " << uconfigfile << endl;
        if (cfspecified)
            return 1;
    }

    config.get("sclogfilename", logfilename);
    string value;
    if (config.get("scloglevel", value))
        loglevel = atoi(value.c_str());
    if (Logger::getTheLog(logfilename) == 0) {
        cerr << "Can't initialize log" << endl;
        return 1;
    }
    Logger::getTheLog("")->setLogLevel(Logger::LogLevel(loglevel));

    AudioEater::Context *ctxt = new AudioEater::Context(&audioqueue);
    ctxt->config = &config;

#ifdef WITH_WAVSC2
    string wavname(Brhz(optionWav.Value()).CString());
    if (!wavname.empty()) {
        string value;
        if (!config.get("sccvttype", value) ||
            value.compare("NONE")) {
            cerr << "Wav input play test. NEEDS sccvttype == NONE\n";
            return 1;
        }
        return playWav(wavname, &alsaAudioEater, ctxt);
    }
#endif

    InitialisationParams* initParams = InitialisationParams::Create();

    Library* lib = new Library(initParams);

    std::vector<NetworkAdapter*>* subnetList = lib->CreateSubnetList();

    // The adapter index comes straight from the command line: make sure it
    // designates an existing entry before using it to index the list.
    const TUint adapterIndex = optionAdapter.Value();
    if (subnetList == 0 || adapterIndex >= subnetList->size()) {
        LOGERR("scmpdcli: no network adapter with index " << adapterIndex <<
               endl);
        cerr << "No network adapter with index " << adapterIndex <<
            ", exiting\n";
        if (subnetList) {
            Library::DestroySubnetList(subnetList);
        }
        delete lib;
        return 1;
    }
    TIpAddress subnet = (*subnetList)[adapterIndex]->Subnet();
    TIpAddress adapter = (*subnetList)[adapterIndex]->Address();
    Library::DestroySubnetList(subnetList);

    TUint ttl = optionTtl.Value();
    Brhz uri(optionUri.Value());

    LOGINF("scmpdcli: using subnet " << (subnet & 0xff) << "." <<
           ((subnet >> 8) & 0xff) << "." << ((subnet >> 16) & 0xff) << "." <<
           ((subnet >> 24) & 0xff) << endl);

    AudioEater* eater = optionDevice.Value() ?
        &alsaAudioEater : &httpAudioEater;
    OhmReceiverDriver* driver = new OhmReceiverDriver(eater, ctxt);

    OhmReceiver* receiver = new OhmReceiver(lib->Env(), adapter, ttl, *driver);

    CpStack* cpStack = lib->StartCp(subnet);
    (void)cpStack; // avoid unused variable warning

    Debug::SetLevel(Debug::kMedia);

    signal(SIGINT, sigcatcher);
    signal(SIGTERM, sigcatcher);

    if (optionInteract.Value()) {
        printf("q = quit\n");
        for (;;) {
            int key = mygetch();

            // End of input is handled like 'q': once stdin is exhausted every
            // further read returns EOF at once and the loop would spin.
            if (key == 'q' || key == EOF) {
                printf("QUIT\n");
                break;
            } else if (key == 'p') {
                printf("PLAY %s\n", uri.CString());
                receiver->Play(uri);
            } else if (key == 's') {
                printf("STOP\n");
                receiver->Stop();
            }
        }
    } else {
        if (optionDevice.Value() && optionWatcher.Value()) {
            startWatcher(config);
        }

        receiver->Play(uri);
        for (;;) {
            // Only returns on quit request or when the command socket could
            // not be set up; in the latter case wait before trying again.
            HandleUserCmd(receiver, eater, uri);
            if (g_quitrequest) {
                LOGDEB("Quit requested\n");
                break;
            }
            sleep(1000);
        }
    }

    delete(receiver);

    delete lib;

    if (optionInteract.Value())
        printf("\n");

    LOGDEB("EXIT\n");

    return (0);
}