--- a/Allura/allura/command/reactor.py
+++ b/Allura/allura/command/reactor.py
@@ -107,21 +107,26 @@
base.log.info('=== Mark ===')
def multi_worker_main(self, configs):
- base.log.info('Entering multiqueue worker process')
- consumers = [ ]
- cset = ConsumerSet(pylons.g.conn)
- for config in configs:
- c = Consumer(connection=pylons.g.conn, queue=config['qn'])
- if config['xn'] == 'audit':
- c.register_callback(self.route_audit(config['tool_name'], config['method']))
- else:
- c.register_callback(self.route_react(config['tool_name'], config['method']))
- cset.add_consumer(c)
- if self.options.dry_run: return
- else: # pragma no cover
- base.log.info('Ready to handle messages')
- for x in cset.iterconsume():
- pass
+ while True:
+ try:
+ base.log.info('Entering multiqueue worker process')
+ cset = ConsumerSet(pylons.g.conn)
+ for config in configs:
+ c = Consumer(connection=pylons.g.conn, queue=config['qn'])
+ if config['xn'] == 'audit':
+ c.register_callback(self.route_audit(config['tool_name'], config['method']))
+ else:
+ c.register_callback(self.route_react(config['tool_name'], config['method']))
+ cset.add_consumer(c)
+ if self.options.dry_run: return
+ else: # pragma no cover
+ base.log.info('Ready to handle messages')
+ for x in cset.iterconsume():
+ pass
+ except Exception:
+ base.log.exception('AMQP error, restart in 10s')
+ pylons.g.amqp_reconnect()
+ time.sleep(10)
def periodic_main(self):
base.log.info('Entering periodic reactor')