--- a/Allura/allura/lib/async.py
+++ b/Allura/allura/lib/async.py
@@ -1,11 +1,7 @@
-import re
import logging
-from collections import defaultdict
-from contextlib import contextmanager
+from Queue import Queue
-import mock
import kombu
-import pkg_resources
log = logging.getLogger(__name__)
@@ -19,163 +15,17 @@
password=password,
virtual_host=vhost)
self._connection_pool = self._conn_proto.Pool(preload=1, limit=None)
- self._exchanges = dict(
- audit=kombu.Exchange('audit'),
- react=kombu.Exchange('react'))
self.reset()
def reset(self):
self._conn = self._connection_pool.acquire()
- self._channel_pool = self._conn.ChannelPool(preload=2, limit=10)
self.queue = self._conn.SimpleQueue('task')
-
- @property
- def connection(self):
- return self._conn
-
- @contextmanager
- def channel(self):
- try:
- ch = self._channel_pool.acquire()
- except kombu.exceptions.ChannelLimitExceeded:
- log.info('Channel pool exhausted, opening a new connection')
- self.reset()
- ch = self._channel_pool.acquire()
- try:
- yield ch
- finally:
- if ch.is_open: ch.release()
- else: log.info('ch is not open, not returning to pool')
-
- @contextmanager
- def producer(self, xn):
- with self.channel() as ch:
- prod = kombu.Producer(ch, self._exchanges[xn], auto_declare=False)
- yield prod
-
- def clear_exchanges(self):
- try:
- with self.channel() as ch:
- ch.exchange_delete('audit')
- except:
- pass
- try:
- with self.channel() as ch:
- ch.exchange_delete('react')
- except:
- pass
-
- def declare_exchanges(self):
- self.clear_exchanges()
- with self.channel() as ch:
- ch.exchange_declare('audit', 'topic', durable=True, auto_delete=False)
- with self.channel() as ch:
- ch.exchange_declare('react', 'topic', durable=True, auto_delete=False)
-
- def declare_queue(self, xn, qn, keys):
- try:
- with self.channel() as ch:
- ch.queue_delete(qn)
- except:
- pass
- with self.channel() as ch:
- ch.queue_declare(qn, durable=True, auto_delete=False)
- for k in keys:
- with self.channel() as ch:
- ch.queue_bind(qn, xn, k)
-
- def publish(self, xn, key_msgs):
- '''Publish a series of messages to an exchange using a single producer
-
- xn: exchange name
- key_msgs: sequence of (routing_key, message, kwargs) pairs to be published
- '''
- with self.producer(xn) as p:
- for routing_key, body, kwargs in key_msgs:
- kwargs.setdefault('serializer', 'pickle')
- p.publish(body, routing_key=routing_key, **kwargs)
class MockAMQ(object):
def __init__(self, globals):
- assert False
- self.exchanges = defaultdict(list)
- self.queue_bindings = defaultdict(list)
self.globals = globals
+ self.reset()
- def clear(self):
- for k,v in self.exchanges.iteritems():
- v[:] = []
-
- def create_backend(self):
- return mock.Mock()
-
- def publish(self, xn, routing_key, message, **kw):
- self.exchanges[xn].append(
- dict(routing_key=routing_key, message=message, kw=kw))
-
- def pop(self, xn):
- return self.exchanges[xn].pop(0)
-
- def declare_exchanges(self):
- pass
-
- def declare_queue(self, xn, qn, keys):
- pass
-
- def setup_handlers(self, paste_registry=None):
- from allura.command.reactor import tool_consumers, ReactorCommand
- from allura.command import base
- from allura import model as M
- self.queue_bindings = defaultdict(list)
- base.log = logging.getLogger('allura.command')
- base.M = M
- self.tools = []
- for ep in pkg_resources.iter_entry_points('allura'):
- try:
- self.tools.append((ep.name, ep.load()))
- except ImportError:
- base.log.warning('Canot load entry point %s', ep)
- self.reactor = ReactorCommand('reactor_setup')
- if paste_registry:
- self.reactor.registry = paste_registry
- self.reactor.globals = self.globals
- self.reactor.parse_args([])
- for name, tool in self.tools:
- for method, xn, qn, keys in tool_consumers(name, tool):
- for k in keys:
- self.queue_bindings[xn].append(
- dict(key=k, tool_name=name, method=method))
-
- def handle(self, xn, msg=None):
- if msg is None:
- msg = self.pop(xn)
- for handler in self.queue_bindings[xn]:
- if self._route_matches(handler['key'], msg['routing_key']):
- self._route(xn, msg, handler['tool_name'], handler['method'])
-
- def handle_all(self):
- for xn, messages in self.exchanges.items():
- messages = list(messages)
- self.exchanges[xn][:] = []
- for msg in messages:
- self.handle(xn, msg)
-
- def _route(self, xn, msg, tool_name, method):
- if xn == 'audit':
- callback = self.reactor.route_audit(tool_name, method)
- else:
- callback = self.reactor.route_react(tool_name, method)
- data = msg['message']
- message = mock.Mock()
- message.delivery_info = dict(
- routing_key=msg['routing_key'])
- message.ack = lambda:None
- return callback(data, message)
-
- def _route_matches(self, pattern, key):
- re_pattern = (pattern
- .replace('.', r'\.')
- .replace('*', r'(?:\w+)')
- .replace('#', r'(?:\w+)(?:\.\w+)*'))
- return re.match(re_pattern+'$', key)
+ def reset(self):
+ self.queue = Queue()