import os
import sys
import time
import json
import Queue
from pprint import pformat
import logging
import ming
import pylons
import kombu
from webob import Request
from bson import ObjectId
from weberror.errormiddleware import handle_exception
from allura.lib import utils
from . import base
log = logging.getLogger(__name__)
class ReactorSetupCommand(base.Command):
summary = 'Configure the RabbitMQ queues and bindings for the given set of tools'
parser = base.Command.standard_parser(verbose=True)
def __init__(self): assert False
def command(self):
self.basic_setup()
pylons.g.amq_conn.declare_exchanges()
for name, tool in self.tools:
self.configure_tool(name, tool)
def configure_tool(self, name, tool):
base.log.info('Configuring tool %s:%s', name, tool)
for method, xn, qn, keys in tool_consumers(name, tool):
base.log.info('... %s %s %r', xn, qn, keys)
pylons.g.amq_conn.declare_queue(xn, qn, keys)
class ReactorCommand(base.Command):
summary = 'Start up all the auditors and reactors for registered tools'
parser = base.Command.standard_parser(verbose=True)
parser.add_option('-p', '--proc', dest='proc', type='int', default=1,
help='number of worker processes to spawn')
parser.add_option('--dry_run', dest='dry_run', action='store_true', default=False,
help="get ready to run the reactor, but don't actually run it")
def __init__(self): assert False
def command(self):
self.basic_setup()
processes = [ base.RestartableProcess(target=self.periodic_main, log=base.log, args=()) ]
configs = [
dict(tool_name=name,
method=method, xn=xn, qn=qn, keys=keys)
for name, tool in self.tools
for method, xn, qn, keys in tool_consumers(name, tool) ]
for x in xrange(self.options.proc):
processes.append(base.RestartableProcess(target=self.multi_worker_main,
log=base.log,
args=(configs,)))
continue
if self.options.dry_run: return configs
elif self.options.proc == 1:
base.log.info('Starting single reactor process')
processes[0].start()
self.multi_worker_main(configs)
else: # pragma no cover
for p in processes:
p.start()
while True:
for x in xrange(60):
time.sleep(5)
for p in processes: p.check()
base.log.info('=== Mark ===')
def multi_worker_main(self, configs):
if self.options.dry_run: return
while True:
pylons.g.amq_conn.reset()
with pylons.g.amq_conn.channel() as channel:
try:
exchanges = dict(
audit=kombu.Exchange('audit', type='topic', channel=channel),
react=kombu.Exchange('react', type='topic', channel=channel))
for config in configs:
q = kombu.Queue(config['qn'], exchanges[config['xn']], channel=channel)
consumer = kombu.Consumer(channel,q, auto_declare=False)
import pdb; pdb.set_trace()
if config['xn'] == 'audit':
consumer.register_callback(
self.route_audit(config['tool_name'], config['method']))
elif config['xn'] == 'react':
consumer.register_callback(
self.route_react(config['tool_name'], config['method']))
consumer.consume()
while True:
channel.connection.drain_events()
except:
base.log.exception('AMQP error, restart in 10s')
time.sleep(10)
def periodic_main(self):
base.log.info('Entering periodic reactor')
while True:
base.M.ScheduledMessage.fire_when_ready()
time.sleep(5)
if self.options.dry_run: return
def send_error_report(self, exc_info):
C = pylons.config['pylons.errorware']
handle_exception(
exc_info, sys.stderr,
html=True,
debug_mode=C.get('debug', False),
error_email=C.get('error_email'),
error_log=C.get('error_log'),
show_exceptions_in_wsgi_errors=False,
error_email_from=C.get('from_address'),
smtp_server=C.get('smtp_server'),
smtp_username=C.get('smtp_username'),
smtp_password=C.get('smtp_password'),
smtp_use_tls=C.get('smtp_use_tls'),
error_subject_prefix=C.get('error_subject_prefix'),
error_message=C.get('error_message'),
simple_html_error=C.get('simple_html_error'))
def route_audit(self, tool_name, method):
'Auditors only respond to their particluar mount point'
log = logging.getLogger('allura.queue.audit')
def callback(data, msg):
msg.ack()
log.info('AUDSIT received msg for %s', msg.delivery_info['routing_key'])
try:
self.setup_globals()
__traceback_supplement__ = (
self.Supplement, pylons.c, data, msg, 'audit')
if 'project_id' in data:
try:
if data['project_id']:
pylons.c.project = base.M.Project.query.get(_id=ObjectId(str(data['project_id'])))
else:
pylons.c.project = None
except:
pylons.c.project = None
base.log.exception('Error looking up project %r', data['project_id'])
if pylons.c.project is None:
base.log.error('The project_id was %s but it was not found',
data['project_id'])
else:
pylons.c.project = None
if 'user_id' in data:
try:
pylons.c.user = base.M.User.query.get(
_id=data['user_id'] and ObjectId(str(data['user_id'])) or None)
except:
base.log.exception('Bad user_id: %s', data['user_id'])
mount_point = data.get('mount_point')
if pylons.c.project and mount_point is not None:
pylons.c.app = pylons.c.project.app_instance(mount_point)
base.log.debug('Setting app %s', pylons.c.app)
if getattr(method, 'im_self', ()) is None:
# Instancemethod - call, binding self
method.im_func(pylons.c.app, msg.delivery_info['routing_key'], data)
else:
# Classmethod or function - don't bind self
method(msg.delivery_info['routing_key'], data)
except: # pragma no cover
base.log.exception(
'Exception audit handling %s: %s',
tool_name, method)
for k,v in self.Supplement(pylons.c, data, msg, 'audit').extraData().items():
base.log.error(' %s: %s', k, v)
if self.options.dry_run: raise
self.send_error_report(sys.exc_info())
else:
ming.orm.ormsession.ThreadLocalORMSession.flush_all()
finally:
self.teardown_globals()
ming.orm.ormsession.ThreadLocalORMSession.close_all()
return callback
def route_react(self, tool_name, method):
'All tool instances respond to the react exchange'
log = logging.getLogger('allura.queue.react')
def callback(data, msg):
msg.ack()
log.info('REACTE received msg for %s', msg.delivery_info['routing_key'])
try:
self.setup_globals()
__traceback_supplement__ = (
self.Supplement, pylons.c, data, msg, 'react')
# log.info('React(%s): %s', msg.delivery_info['routing_key'], data)
if 'user_id' in data:
try:
pylons.c.user = base.M.User.query.get(_id=data['user_id'] and ObjectId(str(data['user_id'])) or None)
except:
base.log.exception('Bad user_id: %s', data['user_id'])
if 'project_id' in data:
try:
if data['project_id']:
pylons.c.project = base.M.Project.query.get(_id=ObjectId(str(data['project_id'])))
else:
pylons.c.project = None
except:
pylons.c.project = None
base.log.exception('Error looking up project %r', data['project_id'])
if getattr(method, 'im_self', ()) is None:
# Instancemethod - call once for each app, binding self
if not pylons.c.project:
# Can't route it, so drop
return
for cfg in pylons.c.project.app_configs:
if cfg.tool_name != tool_name: continue
pylons.c.app = pylons.c.project.app_instance(
cfg.options.mount_point)
method(pylons.c.app, msg.delivery_info['routing_key'], data)
else:
# Classmethod or function -- just call once
method(msg.delivery_info['routing_key'], data)
except: # pragma no cover
base.log.exception('Exception react handling %s: %s', tool_name, method)
for k,v in self.Supplement(pylons.c, data, msg, 'audit').extraData().items():
base.log.error(' %s: %s', k, v)
if self.options.dry_run: raise
self.send_error_report(sys.exc_info())
else:
ming.orm.ormsession.ThreadLocalORMSession.flush_all()
finally:
ming.orm.ormsession.ThreadLocalORMSession.close_all()
self.teardown_globals()
return callback
class Supplement(object):
def __init__(self, c, data, msg, exchange):
self.c = c
self.data = data
self.msg = msg
self.exchange = exchange
def extraData(self):
project = getattr(self.c, 'project', None)
app = getattr(self.c, 'app', None)
user = getattr(self.c, 'user', None)
return dict(
project=project.shortname if project else None,
app=app.config.options.mount_point if app else None,
user=user.username if user else None,
key=self.msg.delivery_info['routing_key'],
exchange=self.exchange,
message=pformat(self.data))
class SendMessageCommand(base.Command):
min_args=3
max_args=4
usage = '<ini file> <exchange> <topic> [<json message>]'
summary = 'Send a message to a RabbitMQ exchange'
parser = base.Command.standard_parser(verbose=True)
parser.add_option('-c', '--context', dest='context',
help=('The context of the message (path to the project'
' and/or tool'))
def command(self):
from allura.lib.helpers import find_project
self.basic_setup()
exchange = self.args[1]
topic = self.args[2]
# Set the context of the message
if self.options.context:
project, rest = find_project(self.options.context)
pylons.c.project = project
if rest:
pylons.g.set_app(rest[0])
if len(self.args) > 3:
base_message = json.loads(self.args[3])
else: # pragma no cover
base_message = json.loads(sys.stdin.read())
base.log.info('Sending message to %s / %s:\n%s',
exchange, topic, base_message)
pylons.g.publish(exchange, topic, base_message)
def tool_consumers(name, tool):
from allura.lib.decorators import ConsumerDecoration
i = 0
for name in dir(tool):
method = getattr(tool, name)
deco = ConsumerDecoration.get_decoration(method, False)
if not deco: continue
if not hasattr(method, '__name__'): continue
name = '%s.%s' % (method.__module__, method.__name__)
if deco.audit_keys:
qn = '%s/%d/audit' % (name, i)
i += 1
yield method, 'audit', qn, list(deco.audit_keys)
if deco.react_keys:
qn = '%s/%d/react' % (name, i)
i += 1
yield method, 'react', qn, list(deco.react_keys)
def debug(*a,**kw): # pragma no cover
from IPython.ipapi import make_session; make_session()
from IPython.Debugger import Pdb
base.log.info('Entering debugger')
p = Pdb(color_scheme='Linux')
p.reset()
p.setup(sys._getframe(), None)
p.cmdloop()
p.forget()
# sys.excepthook =debug