Switch to side-by-side view

--- a/Allura/allura/lib/async.py
+++ b/Allura/allura/lib/async.py
@@ -1,11 +1,7 @@
-import re
 import logging
-from collections import defaultdict
-from contextlib import contextmanager
+from Queue import Queue
 
-import mock
 import kombu
-import pkg_resources
 
 log = logging.getLogger(__name__)
 
@@ -19,163 +15,17 @@
             password=password,
             virtual_host=vhost)
         self._connection_pool = self._conn_proto.Pool(preload=1, limit=None)
-        self._exchanges = dict(
-            audit=kombu.Exchange('audit'),
-            react=kombu.Exchange('react'))
         self.reset()
 
     def reset(self):
         self._conn = self._connection_pool.acquire()
-        self._channel_pool = self._conn.ChannelPool(preload=2, limit=10)
         self.queue = self._conn.SimpleQueue('task')
-
-    @property
-    def connection(self):
-        return self._conn
-
-    @contextmanager
-    def channel(self):
-        try:
-            ch = self._channel_pool.acquire()
-        except kombu.exceptions.ChannelLimitExceeded:
-            log.info('Channel pool exhausted, opening a new connection')
-            self.reset()
-            ch = self._channel_pool.acquire()
-        try:
-            yield ch
-        finally:
-            if ch.is_open: ch.release()
-            else: log.info('ch is not open, not returning to pool')
-
-    @contextmanager
-    def producer(self, xn):
-        with self.channel() as ch:
-            prod = kombu.Producer(ch, self._exchanges[xn], auto_declare=False)
-            yield prod
-
-    def clear_exchanges(self):
-        try:
-            with self.channel() as ch:
-                ch.exchange_delete('audit')
-        except:
-            pass
-        try:
-            with self.channel() as ch:
-                ch.exchange_delete('react')
-        except:
-            pass
-
-    def declare_exchanges(self):
-        self.clear_exchanges()
-        with self.channel() as ch:
-            ch.exchange_declare('audit', 'topic', durable=True, auto_delete=False)
-        with self.channel() as ch:
-            ch.exchange_declare('react', 'topic', durable=True, auto_delete=False)
-
-    def declare_queue(self, xn, qn, keys):
-        try:
-            with self.channel() as ch:
-                ch.queue_delete(qn)
-        except:
-            pass
-        with self.channel() as ch:
-            ch.queue_declare(qn, durable=True, auto_delete=False)
-        for k in keys:
-            with self.channel() as ch:
-                ch.queue_bind(qn, xn, k)
-
-    def publish(self, xn, key_msgs):
-       '''Publish a series of messages to an exchange using a single producer
-
-       xn: exchange name
-       key_msgs: sequence of (routing_key, message, kwargs) pairs to be published
-       '''
-       with self.producer(xn) as p:
-           for routing_key, body, kwargs in key_msgs:
-               kwargs.setdefault('serializer', 'pickle')
-               p.publish(body, routing_key=routing_key, **kwargs)
 
 class MockAMQ(object):
 
     def __init__(self, globals):
-        assert False
-        self.exchanges = defaultdict(list)
-        self.queue_bindings = defaultdict(list)
         self.globals = globals
+        self.reset()
 
-    def clear(self):
-        for k,v in self.exchanges.iteritems():
-            v[:] = []
-
-    def create_backend(self):
-        return mock.Mock()
-
-    def publish(self, xn, routing_key, message, **kw):
-        self.exchanges[xn].append(
-            dict(routing_key=routing_key, message=message, kw=kw))
-
-    def pop(self, xn):
-        return self.exchanges[xn].pop(0)
-
-    def declare_exchanges(self):
-        pass
-
-    def declare_queue(self, xn, qn, keys):
-        pass
-
-    def setup_handlers(self, paste_registry=None):
-        from allura.command.reactor import tool_consumers, ReactorCommand
-        from allura.command import base
-        from allura import model as M
-        self.queue_bindings = defaultdict(list)
-        base.log = logging.getLogger('allura.command')
-        base.M = M
-        self.tools = []
-        for ep in pkg_resources.iter_entry_points('allura'):
-            try:
-                self.tools.append((ep.name, ep.load()))
-            except ImportError:
-                base.log.warning('Canot load entry point %s', ep)
-        self.reactor = ReactorCommand('reactor_setup')
-        if paste_registry:
-            self.reactor.registry = paste_registry
-        self.reactor.globals = self.globals
-        self.reactor.parse_args([])
-        for name, tool in self.tools:
-            for method, xn, qn, keys in tool_consumers(name, tool):
-                for k in keys:
-                    self.queue_bindings[xn].append(
-                        dict(key=k, tool_name=name, method=method))
-
-    def handle(self, xn, msg=None):
-        if msg is None:
-            msg = self.pop(xn)
-        for handler in self.queue_bindings[xn]:
-            if self._route_matches(handler['key'], msg['routing_key']):
-                self._route(xn, msg, handler['tool_name'], handler['method'])
-
-    def handle_all(self):
-        for xn, messages in self.exchanges.items():
-            messages = list(messages)
-            self.exchanges[xn][:] = []
-            for msg in messages:
-                self.handle(xn, msg)
-
-    def _route(self, xn, msg, tool_name, method):
-        if xn == 'audit':
-            callback = self.reactor.route_audit(tool_name, method)
-        else:
-            callback = self.reactor.route_react(tool_name, method)
-        data = msg['message']
-        message = mock.Mock()
-        message.delivery_info = dict(
-            routing_key=msg['routing_key'])
-        message.ack = lambda:None
-        return callback(data, message)
-
-    def _route_matches(self, pattern, key):
-        re_pattern = (pattern
-                      .replace('.', r'\.')
-                      .replace('*', r'(?:\w+)')
-                      .replace('#', r'(?:\w+)(?:\.\w+)*'))
-        return re.match(re_pattern+'$', key)
+    def reset(self):
+        self.queue = Queue()