--- a/Allura/allura/command/reactor.py
+++ b/Allura/allura/command/reactor.py
@@ -1,14 +1,16 @@
+import os
import sys
import time
import json
+import Queue
from pprint import pformat
-from multiprocessing import Process
import logging
import ming
import pylons
+import kombu
+from webob import Request
from bson import ObjectId
-import kombu
from weberror.errormiddleware import handle_exception
from allura.lib import utils
@@ -17,29 +19,12 @@
log = logging.getLogger(__name__)
-class RestartableProcess(object):
-
- def __init__(self, log, *args, **kwargs):
- self._log = log
- self._args, self._kwargs = args, kwargs
- self.reinit()
-
- def reinit(self):
- self._process = Process(*self._args, **self._kwargs)
-
- def check(self):
- if not self.is_alive():
- self._log.error('Process %d has died, restarting', self.pid)
- self.reinit()
- self.start()
-
- def __getattr__(self, name):
- return getattr(self._process, name)
-
class ReactorSetupCommand(base.Command):
summary = 'Configure the RabbitMQ queues and bindings for the given set of tools'
parser = base.Command.standard_parser(verbose=True)
+
+ def __init__(self): assert False
def command(self):
self.basic_setup()
@@ -62,16 +47,17 @@
parser.add_option('--dry_run', dest='dry_run', action='store_true', default=False,
help="get ready to run the reactor, but don't actually run it")
+ def __init__(self): assert False
def command(self):
self.basic_setup()
- processes = [ RestartableProcess(target=self.periodic_main, log=base.log, args=()) ]
+ processes = [ base.RestartableProcess(target=self.periodic_main, log=base.log, args=()) ]
configs = [
dict(tool_name=name,
method=method, xn=xn, qn=qn, keys=keys)
for name, tool in self.tools
for method, xn, qn, keys in tool_consumers(name, tool) ]
for x in xrange(self.options.proc):
- processes.append(RestartableProcess(target=self.multi_worker_main,
+ processes.append(base.RestartableProcess(target=self.multi_worker_main,
log=base.log,
args=(configs,)))
continue
@@ -101,6 +87,7 @@
for config in configs:
q = kombu.Queue(config['qn'], exchanges[config['xn']], channel=channel)
consumer = kombu.Consumer(channel,q, auto_declare=False)
+ import pdb; pdb.set_trace()
if config['xn'] == 'audit':
consumer.register_callback(
self.route_audit(config['tool_name'], config['method']))
@@ -144,7 +131,7 @@
log = logging.getLogger('allura.queue.audit')
def callback(data, msg):
msg.ack()
- log.info('received msg for %s', msg.delivery_info['routing_key'])
+ log.info('AUDSIT received msg for %s', msg.delivery_info['routing_key'])
try:
self.setup_globals()
__traceback_supplement__ = (
@@ -200,7 +187,8 @@
log = logging.getLogger('allura.queue.react')
def callback(data, msg):
msg.ack()
- log.info('received msg for %s', msg.delivery_info['routing_key'])
+ log.info('REACTE received msg for %s', msg.delivery_info['routing_key'])
+
try:
self.setup_globals()