from __future__ import print_function
from PyQt5.QtCore import QObject
import sys
import time
import random
import upnpp
import abxutils
_verbose = True
_superverbose = False
def _debug(x):
if _verbose:
print("%s" % x, file = sys.stderr)
from abxtestparams import ABXTestParams
random.seed()
def upnpSecsToDuration(secs):
hours = secs // 3600
secs -= hours * 3600
minutes = secs // 60
secs -= minutes * 60
return "%02d:%02d:%02d" % (hours, minutes, secs)
def _makeVS(l):
ret = upnpp.VectorString()
for v in l:
ret.append(v)
return ret
class ABXPlayer(QObject):
def __init__(self, testconf, parent = None):
super(ABXPlayer, self).__init__(parent)
self.params = ABXTestParams(testconf)
self.XIs = None
self.rdrs = {}
self.actrdr = None
rdrA = self.params.param('renderer', 'A')
srv = upnpp.findTypedService(rdrA, "avtransport", True)
if not srv:
_debug("findTypedService failed for " + rdrA)
sys.exit(1)
self.rdrs['A'] = srv
self.stoprdr(srv)
rdrB = self.params.param('renderer', 'B')
if rdrB == rdrA:
self.rdrs['B'] = srv
else:
srv = upnpp.findTypedService(rdrB, "avtransport", True)
if not srv:
_debug("findTypedService failed for " + rdrB)
sys.exit(1)
self.rdrs['B'] = srv
self.stoprdr(srv)
if self.params.needswitch():
self.swhost = abxutils.abxconfig.get('switchhost')
self.swport = abxutils.abxconfig.get('switchport')
else:
self.swhost = None
self.swport = None
def reset(self):
self.stopit()
if random.getrandbits(1):
self.XIs = 'A'
else:
self.XIs = 'B'
def check(self, X):
if self.XIs == X:
return True
else:
return False
def runaction(self, rdr, action, args):
args = _makeVS(args)
retdata = upnpp.MapStringString()
ret = rdr.runAction(action, args, retdata)
if ret:
_debug("%s failed with %d" % (action, ret))
return None
else:
if _superverbose:
_debug("%s succeeded" % action)
if len(retdata) != 0 and _superverbose:
_debug("Got data:")
for nm, val in retdata.iteritems():
_debug(" %s : %s" % (nm, val))
retdict = {}
for nm, val in retdata.iteritems():
retdict[nm] = val
return retdict
def setupSwitches(self, what):
line = self.params.param('line')
spk = self.params.param('spk')
def initplay(self, what):
if what == 'X':
what = self.XIs
if self.swhost:
self.setupSwitches(what)
rdr = self.rdrs[what]
self.stoprdr(rdr)
self.runaction(rdr, "SetAVTransportURI",
["0", self.params.param('url', what), ""])
# Instanceid speed
self.runaction(rdr, "Play", ["0", "1"])
self.actrdr = rdr
seek = int(self.params.param('seek', what))
if seek != 0:
# InstanceId, Unit, target
self.runaction(rdr, "Seek",
["0", "REL_TIME", upnpSecsToDuration(seek)])
if _verbose:
data = self.runaction(rdr, "GetMediaInfo", ["0"])
_debug("URL: %s" % data["CurrentURI"])
def stoprdr(self, rdr):
self.runaction(rdr, "Stop", ["0"])
def stopit(self):
if self.actrdr:
self.stoprdr(self.actrdr)