|
a/Allura/allura/lib/async.py |
|
b/Allura/allura/lib/async.py |
1 |
import re
|
|
|
2 |
import logging
|
1 |
import logging
|
3 |
from collections import defaultdict
|
2 |
from Queue import Queue
|
4 |
from contextlib import contextmanager
|
|
|
5 |
|
3 |
|
6 |
import mock
|
|
|
7 |
import kombu
|
4 |
import kombu
|
8 |
import pkg_resources
|
|
|
9 |
|
5 |
|
10 |
log = logging.getLogger(__name__)
|
6 |
log = logging.getLogger(__name__)
|
11 |
|
7 |
|
12 |
class Connection(object):
|
8 |
class Connection(object):
|
13 |
|
9 |
|
|
... |
|
... |
17 |
port=port,
|
13 |
port=port,
|
18 |
userid=userid,
|
14 |
userid=userid,
|
19 |
password=password,
|
15 |
password=password,
|
20 |
virtual_host=vhost)
|
16 |
virtual_host=vhost)
|
21 |
self._connection_pool = self._conn_proto.Pool(preload=1, limit=None)
|
17 |
self._connection_pool = self._conn_proto.Pool(preload=1, limit=None)
|
22 |
self._exchanges = dict(
|
|
|
23 |
audit=kombu.Exchange('audit'),
|
|
|
24 |
react=kombu.Exchange('react'))
|
|
|
25 |
self.reset()
|
18 |
self.reset()
|
26 |
|
19 |
|
27 |
def reset(self):
|
20 |
def reset(self):
|
28 |
self._conn = self._connection_pool.acquire()
|
21 |
self._conn = self._connection_pool.acquire()
|
29 |
self._channel_pool = self._conn.ChannelPool(preload=2, limit=10)
|
|
|
30 |
self.queue = self._conn.SimpleQueue('task')
|
22 |
self.queue = self._conn.SimpleQueue('task')
|
31 |
|
|
|
32 |
@property
|
|
|
33 |
def connection(self):
|
|
|
34 |
return self._conn
|
|
|
35 |
|
|
|
36 |
@contextmanager
|
|
|
37 |
def channel(self):
|
|
|
38 |
try:
|
|
|
39 |
ch = self._channel_pool.acquire()
|
|
|
40 |
except kombu.exceptions.ChannelLimitExceeded:
|
|
|
41 |
log.info('Channel pool exhausted, opening a new connection')
|
|
|
42 |
self.reset()
|
|
|
43 |
ch = self._channel_pool.acquire()
|
|
|
44 |
try:
|
|
|
45 |
yield ch
|
|
|
46 |
finally:
|
|
|
47 |
if ch.is_open: ch.release()
|
|
|
48 |
else: log.info('ch is not open, not returning to pool')
|
|
|
49 |
|
|
|
50 |
@contextmanager
|
|
|
51 |
def producer(self, xn):
|
|
|
52 |
with self.channel() as ch:
|
|
|
53 |
prod = kombu.Producer(ch, self._exchanges[xn], auto_declare=False)
|
|
|
54 |
yield prod
|
|
|
55 |
|
|
|
56 |
def clear_exchanges(self):
|
|
|
57 |
try:
|
|
|
58 |
with self.channel() as ch:
|
|
|
59 |
ch.exchange_delete('audit')
|
|
|
60 |
except:
|
|
|
61 |
pass
|
|
|
62 |
try:
|
|
|
63 |
with self.channel() as ch:
|
|
|
64 |
ch.exchange_delete('react')
|
|
|
65 |
except:
|
|
|
66 |
pass
|
|
|
67 |
|
|
|
68 |
def declare_exchanges(self):
|
|
|
69 |
self.clear_exchanges()
|
|
|
70 |
with self.channel() as ch:
|
|
|
71 |
ch.exchange_declare('audit', 'topic', durable=True, auto_delete=False)
|
|
|
72 |
with self.channel() as ch:
|
|
|
73 |
ch.exchange_declare('react', 'topic', durable=True, auto_delete=False)
|
|
|
74 |
|
|
|
75 |
def declare_queue(self, xn, qn, keys):
|
|
|
76 |
try:
|
|
|
77 |
with self.channel() as ch:
|
|
|
78 |
ch.queue_delete(qn)
|
|
|
79 |
except:
|
|
|
80 |
pass
|
|
|
81 |
with self.channel() as ch:
|
|
|
82 |
ch.queue_declare(qn, durable=True, auto_delete=False)
|
|
|
83 |
for k in keys:
|
|
|
84 |
with self.channel() as ch:
|
|
|
85 |
ch.queue_bind(qn, xn, k)
|
|
|
86 |
|
|
|
87 |
def publish(self, xn, key_msgs):
|
|
|
88 |
'''Publish a series of messages to an exchange using a single producer
|
|
|
89 |
|
|
|
90 |
xn: exchange name
|
|
|
91 |
key_msgs: sequence of (routing_key, message, kwargs) pairs to be published
|
|
|
92 |
'''
|
|
|
93 |
with self.producer(xn) as p:
|
|
|
94 |
for routing_key, body, kwargs in key_msgs:
|
|
|
95 |
kwargs.setdefault('serializer', 'pickle')
|
|
|
96 |
p.publish(body, routing_key=routing_key, **kwargs)
|
|
|
97 |
|
23 |
|
98 |
class MockAMQ(object):
|
24 |
class MockAMQ(object):
|
99 |
|
25 |
|
100 |
def __init__(self, globals):
|
26 |
def __init__(self, globals):
|
101 |
assert False
|
|
|
102 |
self.exchanges = defaultdict(list)
|
|
|
103 |
self.queue_bindings = defaultdict(list)
|
|
|
104 |
self.globals = globals
|
27 |
self.globals = globals
|
|
|
28 |
self.reset()
|
105 |
|
29 |
|
106 |
def clear(self):
|
30 |
def reset(self):
|
107 |
for k,v in self.exchanges.iteritems():
|
31 |
self.queue = Queue()
|
108 |
v[:] = []
|
|
|
109 |
|
|
|
110 |
def create_backend(self):
|
|
|
111 |
return mock.Mock()
|
|
|
112 |
|
|
|
113 |
def publish(self, xn, routing_key, message, **kw):
|
|
|
114 |
self.exchanges[xn].append(
|
|
|
115 |
dict(routing_key=routing_key, message=message, kw=kw))
|
|
|
116 |
|
|
|
117 |
def pop(self, xn):
|
|
|
118 |
return self.exchanges[xn].pop(0)
|
|
|
119 |
|
|
|
120 |
def declare_exchanges(self):
|
|
|
121 |
pass
|
|
|
122 |
|
|
|
123 |
def declare_queue(self, xn, qn, keys):
|
|
|
124 |
pass
|
|
|
125 |
|
|
|
126 |
def setup_handlers(self, paste_registry=None):
|
|
|
127 |
from allura.command.reactor import tool_consumers, ReactorCommand
|
|
|
128 |
from allura.command import base
|
|
|
129 |
from allura import model as M
|
|
|
130 |
self.queue_bindings = defaultdict(list)
|
|
|
131 |
base.log = logging.getLogger('allura.command')
|
|
|
132 |
base.M = M
|
|
|
133 |
self.tools = []
|
|
|
134 |
for ep in pkg_resources.iter_entry_points('allura'):
|
|
|
135 |
try:
|
|
|
136 |
self.tools.append((ep.name, ep.load()))
|
|
|
137 |
except ImportError:
|
|
|
138 |
base.log.warning('Canot load entry point %s', ep)
|
|
|
139 |
self.reactor = ReactorCommand('reactor_setup')
|
|
|
140 |
if paste_registry:
|
|
|
141 |
self.reactor.registry = paste_registry
|
|
|
142 |
self.reactor.globals = self.globals
|
|
|
143 |
self.reactor.parse_args([])
|
|
|
144 |
for name, tool in self.tools:
|
|
|
145 |
for method, xn, qn, keys in tool_consumers(name, tool):
|
|
|
146 |
for k in keys:
|
|
|
147 |
self.queue_bindings[xn].append(
|
|
|
148 |
dict(key=k, tool_name=name, method=method))
|
|
|
149 |
|
|
|
150 |
def handle(self, xn, msg=None):
|
|
|
151 |
if msg is None:
|
|
|
152 |
msg = self.pop(xn)
|
|
|
153 |
for handler in self.queue_bindings[xn]:
|
|
|
154 |
if self._route_matches(handler['key'], msg['routing_key']):
|
|
|
155 |
self._route(xn, msg, handler['tool_name'], handler['method'])
|
|
|
156 |
|
|
|
157 |
def handle_all(self):
|
|
|
158 |
for xn, messages in self.exchanges.items():
|
|
|
159 |
messages = list(messages)
|
|
|
160 |
self.exchanges[xn][:] = []
|
|
|
161 |
for msg in messages:
|
|
|
162 |
self.handle(xn, msg)
|
|
|
163 |
|
|
|
164 |
def _route(self, xn, msg, tool_name, method):
|
|
|
165 |
if xn == 'audit':
|
|
|
166 |
callback = self.reactor.route_audit(tool_name, method)
|
|
|
167 |
else:
|
|
|
168 |
callback = self.reactor.route_react(tool_name, method)
|
|
|
169 |
data = msg['message']
|
|
|
170 |
message = mock.Mock()
|
|
|
171 |
message.delivery_info = dict(
|
|
|
172 |
routing_key=msg['routing_key'])
|
|
|
173 |
message.ack = lambda:None
|
|
|
174 |
return callback(data, message)
|
|
|
175 |
|
|
|
176 |
def _route_matches(self, pattern, key):
|
|
|
177 |
re_pattern = (pattern
|
|
|
178 |
.replace('.', r'\.')
|
|
|
179 |
.replace('*', r'(?:\w+)')
|
|
|
180 |
.replace('#', r'(?:\w+)(?:\.\w+)*'))
|
|
|
181 |
return re.match(re_pattern+'$', key)
|
|
|