--- a/Allura/allura/lib/async.py
+++ b/Allura/allura/lib/async.py
@@ -1,6 +1,11 @@
+import re
+import logging
+from collections import defaultdict
 from contextlib import contextmanager
 
+import mock
 import kombu
+import pkg_resources
 
 class Connection(object):
 
@@ -66,3 +71,84 @@
            for routing_key, body, kwargs in key_msgs:
                kwargs.setdefault('serializer', 'pickle')
                p.publish(body, routing_key=routing_key, **kwargs)
+
+class MockAMQ(object):
+
+    def __init__(self, globals):
+        self.exchanges = defaultdict(list)
+        self.queue_bindings = defaultdict(list)
+        self.globals = globals
+
+    def clear(self):
+        for k in self.exchanges.keys():
+            self.exchanges[k][:] = []
+
+    def create_backend(self):
+        return mock.Mock()
+
+    def publish(self, xn, routing_key, message, **kw):
+        self.exchanges[xn].append(
+            dict(routing_key=routing_key, message=message, kw=kw))
+
+    def pop(self, xn):
+        return self.exchanges[xn].pop(0)
+
+    def declare_exchanges(self):
+        pass
+
+    def declare_queue(self, xn, qn, keys):
+        pass
+
+    def setup_handlers(self, paste_registry=None):
+        from allura.command.reactor import tool_consumers, ReactorCommand
+        from allura.command import base
+        from allura import model as M
+        self.queue_bindings = defaultdict(list)
+        base.log = logging.getLogger('allura.command')
+        base.M = M
+        self.tools = []
+        for ep in pkg_resources.iter_entry_points('allura'):
+            try:
+                self.tools.append((ep.name, ep.load()))
+            except ImportError:
+                base.log.warning('Canot load entry point %s', ep)
+        self.reactor = ReactorCommand('reactor_setup')
+        if paste_registry:
+            self.reactor.registry = paste_registry
+        self.reactor.globals = self.globals
+        self.reactor.parse_args([])
+        for name, tool in self.tools:
+            for method, xn, qn, keys in tool_consumers(name, tool):
+                for k in keys:
+                    self.queue_bindings[xn].append(
+                        dict(key=k, tool_name=name, method=method))
+
+    def handle(self, xn):
+        msg = self.pop(xn)
+        for handler in self.queue_bindings[xn]:
+            if self._route_matches(handler['key'], msg['routing_key']):
+                self._route(xn, msg, handler['tool_name'], handler['method'])
+
+    def handle_all(self):
+        for xn, messages in self.exchanges.items():
+            while messages:
+                self.handle(xn)
+
+    def _route(self, xn, msg, tool_name, method):
+        if xn == 'audit':
+            callback = self.reactor.route_audit(tool_name, method)
+        else:
+            callback = self.reactor.route_react(tool_name, method)
+        data = msg['message']
+        message = mock.Mock()
+        message.delivery_info = dict(
+            routing_key=msg['routing_key'])
+        message.ack = lambda:None
+        return callback(data, message)
+
+    def _route_matches(self, pattern, key):
+        re_pattern = (pattern
+                      .replace('.', r'\.')
+                      .replace('*', r'(?:\w+)')
+                      .replace('#', r'(?:\w+)(?:\.\w+)*'))
+        return re.match(re_pattern+'$', key)