from __future__ import print_function
from PyQt5.QtCore import QObject
from PyQt5.QtCore import QTimer
from PyQt5 import QtCore
import sys
import time
import random
import urllib
import upnpp
import abxutils
# Debug output switches: _verbose gates every _debug() message,
# _superverbose additionally enables the per-action traces.
_verbose = True
_superverbose = False


def _debug(x):
    '''Print x on stderr, but only when _verbose is set.'''
    if not _verbose:
        return
    print("%s" % x, file = sys.stderr)
from abxtestparams import ABXTestParams
# Seed the module-level random generator once at import time (no explicit
# seed value is given). ABXPlayer.reset() draws from it with
# random.getrandbits() to decide whether X is A or B.
random.seed()
def upnpSecsToDuration(secs):
    '''Format a count of seconds as an "HH:MM:SS" string.

    Each field is zero-padded to at least two digits; the hours field
    grows as needed for long durations.
    '''
    hours, remainder = divmod(secs, 3600)
    minutes, seconds = divmod(remainder, 60)
    return "%02d:%02d:%02d" % (hours, minutes, seconds)
def upnpDurationToSecs(duration):
    '''Convert a UPnP "H:MM:SS" duration string to whole seconds.

    UPnP durations may carry a fractional part after the seconds
    ("H+:MM:SS.F+" or "H+:MM:SS.F0/F1"). The fraction is dropped, so the
    result is truncated to an integer number of seconds.

    Raises ValueError if a field is not numeric and IndexError if fewer
    than three colon-separated fields are present.
    '''
    fields = duration.split(':')
    # Keep only the integer part of the seconds field: int() would choke
    # on "45.000" or "45.1/2".
    whole_seconds = fields[2].split('.', 1)[0]
    return int(fields[0]) * 3600 + int(fields[1]) * 60 + int(whole_seconds)
def _makeVS(l):
    '''Copy the items of l, in order, into a new upnpp.VectorString.'''
    strings = upnpp.VectorString()
    for item in l:
        strings.append(item)
    return strings
class ABXPlayer(QObject):
    '''Player for an ABX test, driving UPnP AVTransport services.

    Source 'A' and source 'B' each have a renderer and an URL taken from
    the test parameters. 'X' is randomly chosen to be 'A' or 'B' by
    reset(), and check() compares a user answer against that choice.
    When the test parameters say so, relays are switched through HTTP
    requests to a switch host before playing.
    '''

    # Emitted with the label to show on the pause control: "Pause" while
    # playing or stopped, "Resume" while paused.
    pauseToggled = QtCore.pyqtSignal(str)
    # Emitted by initplay() with (configured seek position, media
    # duration), both in seconds.
    mediaSpan = QtCore.pyqtSignal(int, int)
    # Emitted with the user-selected (begin, end) positions as HH:MM:SS.
    userSpan = QtCore.pyqtSignal(str, str)
    # Emitted on every timer tick with the current position in seconds.
    posUpdate = QtCore.pyqtSignal(int)

    def __init__(self, testconf, parent = None):
        '''Locate and stop the renderers for A and B, read the switch
        configuration and create the position polling timer.

        testconf is handed to ABXTestParams. The process exits with
        status 1 if the AVTransport service of a renderer is not found.
        '''
        super(ABXPlayer, self).__init__(parent)
        self.params = ABXTestParams(testconf)
        # 'A' or 'B' once reset() has been called
        self.XIs = None
        # AVTransport service objects, indexed by 'A' and 'B'
        self.rdrs = {}
        # Service object of the renderer last started by initplay()
        self.actrdr = None
        self.paused = False

        rdrA = self.params.param('renderer', 'A')
        self.rdrs['A'] = self._findrenderer(rdrA)
        rdrB = self.params.param('renderer', 'B')
        if rdrB == rdrA:
            # Same device for both sources: share the service object
            self.rdrs['B'] = self.rdrs['A']
        else:
            self.rdrs['B'] = self._findrenderer(rdrB)

        if self.params.needswitch():
            self.swhost = abxutils.abxconfig.get('switchhost')
            self.swport = abxutils.abxconfig.get('switchport')
        else:
            self.swhost = None
            self.swport = None

        # User-selected span in seconds. 0 means "not set".
        self.begin = 0
        self.end = 0
        self.timer = QTimer()
        self.timer.timeout.connect(self.ontimer)

    def _findrenderer(self, name):
        '''Return the AVTransport service for renderer name, after
        stopping it. Exit the process if the service is not found.'''
        srv = upnpp.findTypedService(name, "avtransport", True)
        if not srv:
            _debug("findTypedService failed for " + name)
            sys.exit(1)
        self.stoprdr(srv)
        return srv

    def runaction(self, rdr, action, args):
        '''Run a UPnP action on service rdr.

        args is a sequence of argument strings. Returns a dict of the
        values returned by the action, or None if the action failed.
        '''
        retdata = upnpp.MapStringString()
        ret = rdr.runAction(action, _makeVS(args), retdata)
        if ret:
            _debug("%s failed with %d" % (action, ret))
            return None
        if _superverbose:
            _debug("%s succeeded" % action)
            if len(retdata) != 0:
                _debug("Got data:")
        # Single pass: trace (if requested) and copy to a plain dict
        retdict = {}
        for nm, val in retdata.iteritems():
            if _superverbose:
                _debug(" %s : %s" % (nm, val))
            retdict[nm] = val
        return retdict

    def reset(self):
        '''Called on choice confirm or exit: stop playing and possibly
        put relays to rest, then randomly choose what X will be.'''
        self.stop()
        self.resetSwitches()
        self.XIs = 'A' if random.getrandbits(1) else 'B'

    def check(self, X):
        '''Check user answer against actual state'''
        return self.XIs == X

    def _switchbaseurl(self):
        '''Return the "http://swhost:swport/" prefix of the relay URLs.'''
        return 'http://' + self.swhost + ':' + str(self.swport) + '/'

    def _fetchurls(self, urls):
        '''Open then immediately close each URL: the request itself is
        what drives the relays.'''
        try:
            # Python 3 location
            from urllib.request import urlopen
        except ImportError:
            # Python 2 location
            urlopen = urllib.urlopen
        for u in urls:
            urlopen(u).close()

    def setupSwitches(self, what):
        '''Set the line and speaker relays as configured for source what
        ('A' or 'B'). Only valid when a switch host is configured.'''
        baseurl = self._switchbaseurl()
        # http://swhost:swport/linea or lineb
        url1 = baseurl + 'line' + self.params.param('line', what).lower()
        # http://swhost:swport/spka or spkb
        url2 = baseurl + 'spk' + self.params.param('spk', what).lower()
        self._fetchurls([url1, url2])

    def resetSwitches(self):
        '''Deactivate the relays. Does nothing without a switch host.'''
        if not self.swhost:
            return
        baseurl = self._switchbaseurl()
        self._fetchurls([baseurl + 'linea', baseurl + 'spka'])

    def _mediaduration(self, rdr):
        '''Return the duration in seconds reported by GetMediaInfo on
        rdr, or 0 if the action fails or the value can't be parsed.'''
        data = self.runaction(rdr, "GetMediaInfo", ["0"])
        if not data:
            return 0
        _debug("URL: %s" % data.get("CurrentURI"))
        duration = data.get("MediaDuration")
        _debug("duration: %s" % duration)
        if not duration:
            return 0
        try:
            return upnpDurationToSecs(duration)
        except (ValueError, IndexError):
            # Not an H:MM:SS value (e.g. a "not implemented" marker)
            _debug("Can't parse duration: %s" % duration)
            return 0

    def initplay(self, what):
        '''Start playing source what ('A', 'B', or 'X' for the hidden
        choice) and start the position polling timer.

        Playing begins at the user-selected begin position if there is
        one, else at the configured seek position if it is not 0.
        '''
        if what == 'X':
            what = self.XIs
        self.stop()
        rdr = self.rdrs[what]
        self.stoprdr(rdr)
        if self.swhost:
            self.setupSwitches(what)
        url = self.params.param('url', what)
        # InstanceID, URI, metadata
        self.runaction(rdr, "SetAVTransportURI", ["0", url, ""])
        # InstanceID, speed
        self.runaction(rdr, "Play", ["0", "1"])
        self.actrdr = rdr
        # Need to let some renderers (e.g. mpd) buffer a little, else
        # they won't accept to seek.
        time.sleep(0.2)
        totsecs = self._mediaduration(rdr)
        seek = int(self.params.param('seek', what))
        self.mediaSpan.emit(seek, totsecs)
        if self.begin:
            seek = self.begin
            _debug("Seek to %d (user selected)"%seek)
        elif seek:
            _debug("Seek to %d (by configuration)"%seek)
        if seek:
            # InstanceID, unit, target
            self.runaction(rdr, "Seek", ["0", "REL_TIME",
                                         upnpSecsToDuration(seek)])
        # Poll the position once per second
        self.timer.start(1000)

    def stoprdr(self, rdr):
        '''Send Stop to rdr, clear the paused state and announce it.'''
        self.runaction(rdr, "Stop", ["0"])
        self.paused = False
        self.pauseToggled.emit("Pause")

    def pauserdr(self, rdr):
        '''Send Pause to rdr, set the paused state and announce it.'''
        self.runaction(rdr, "Pause", ["0"])
        self.paused = True
        self.pauseToggled.emit("Resume")

    def playrdr(self, rdr):
        '''Send Play to rdr, clear the paused state and announce it.'''
        # InstanceID, speed
        self.runaction(rdr, "Play", ["0", "1"])
        self.paused = False
        self.pauseToggled.emit("Pause")

    def getposinfo(self, rdr):
        '''Return the current position of rdr in seconds.

        Returns 0 if GetPositionInfo fails, returns no RelTime, or
        returns a RelTime which can't be parsed.
        '''
        data = self.runaction(rdr, "GetPositionInfo", ["0"])
        # runaction() returns None on failure
        if not data or "RelTime" not in data:
            return 0
        try:
            return upnpDurationToSecs(data["RelTime"])
        except (ValueError, IndexError):
            # Not an H:MM:SS value (e.g. a "not implemented" marker)
            return 0

    def stop(self):
        '''Stop the active renderer, if any, and the polling timer.'''
        if self.actrdr:
            self.stoprdr(self.actrdr)
        self.timer.stop()

    def pause(self):
        '''Toggle between paused and playing on the active renderer.'''
        if not self.actrdr:
            return
        if self.paused:
            self.playrdr(self.actrdr)
        else:
            self.pauserdr(self.actrdr)

    def seek(self, value):
        '''Seek the active renderer, if any, to value seconds.'''
        if self.actrdr:
            # InstanceID, unit, target
            self.runaction(self.actrdr, "Seek", ["0", "REL_TIME",
                                                 upnpSecsToDuration(value)])

    def _emituserspan(self):
        '''Announce the current user-selected span.'''
        self.userSpan.emit(upnpSecsToDuration(self.begin),
                           upnpSecsToDuration(self.end))

    def setbegin(self):
        '''Use the current position as the beginning of the user span.'''
        if not self.actrdr:
            return
        self.begin = self.getposinfo(self.actrdr)
        _debug("setbegin: current position %d" % self.begin)
        self._emituserspan()

    def setend(self):
        '''Use the current position as the end of the user span.'''
        if not self.actrdr:
            return
        self.end = self.getposinfo(self.actrdr)
        _debug("setend: current position %s" % self.end)
        self._emituserspan()

    def resetspan(self):
        '''Forget the user-selected span.'''
        self.begin = 0
        self.end = 0
        self._emituserspan()

    def ontimer(self):
        '''Timer tick: stop when the end of the user span is reached,
        and publish the current position.'''
        if not self.actrdr:
            return
        cursecs = self.getposinfo(self.actrdr)
        _debug("ontimer: secs: %d" % cursecs)
        if self.end and cursecs >= self.end:
            self.stop()
        self.posUpdate.emit(cursecs)