Switch to side-by-side view

--- a/Allura/allura/command/reactor.py
+++ b/Allura/allura/command/reactor.py
@@ -107,21 +107,26 @@
                 base.log.info('=== Mark ===')
 
     def multi_worker_main(self, configs):
-        base.log.info('Entering multiqueue worker process')
-        consumers = [ ]
-        cset = ConsumerSet(pylons.g.conn)
-        for config in configs:
-            c = Consumer(connection=pylons.g.conn, queue=config['qn'])
-            if config['xn'] == 'audit':
-                c.register_callback(self.route_audit(config['tool_name'], config['method']))
-            else:
-                c.register_callback(self.route_react(config['tool_name'], config['method']))
-            cset.add_consumer(c)
-        if self.options.dry_run: return
-        else: # pragma no cover
-            base.log.info('Ready to handle messages')
-            for x in cset.iterconsume():
-                pass
+        while True:
+            try:
+                base.log.info('Entering multiqueue worker process')
+                cset = ConsumerSet(pylons.g.conn)
+                for config in configs:
+                    c = Consumer(connection=pylons.g.conn, queue=config['qn'])
+                    if config['xn'] == 'audit':
+                        c.register_callback(self.route_audit(config['tool_name'], config['method']))
+                    else:
+                        c.register_callback(self.route_react(config['tool_name'], config['method']))
+                    cset.add_consumer(c)
+                if self.options.dry_run: return
+                else: # pragma no cover
+                    base.log.info('Ready to handle messages')
+                    for x in cset.iterconsume():
+                        pass
+            except Exception:
+                base.log.exception('AMQP error, restart in 10s')
+                pylons.g.amqp_reconnect()
+                time.sleep(10)
 
     def periodic_main(self):
         base.log.info('Entering periodic reactor')