|
a/Allura/allura/lib/async.py |
|
b/Allura/allura/lib/async.py |
|
... |
|
... |
139 |
for method, xn, qn, keys in tool_consumers(name, tool):
|
139 |
for method, xn, qn, keys in tool_consumers(name, tool):
|
140 |
for k in keys:
|
140 |
for k in keys:
|
141 |
self.queue_bindings[xn].append(
|
141 |
self.queue_bindings[xn].append(
|
142 |
dict(key=k, tool_name=name, method=method))
|
142 |
dict(key=k, tool_name=name, method=method))
|
143 |
|
143 |
|
144 |
def handle(self, xn):
|
144 |
def handle(self, xn, msg=None):
|
|
|
145 |
if msg is None:
|
145 |
msg = self.pop(xn)
|
146 |
msg = self.pop(xn)
|
146 |
for handler in self.queue_bindings[xn]:
|
147 |
for handler in self.queue_bindings[xn]:
|
147 |
if self._route_matches(handler['key'], msg['routing_key']):
|
148 |
if self._route_matches(handler['key'], msg['routing_key']):
|
148 |
self._route(xn, msg, handler['tool_name'], handler['method'])
|
149 |
self._route(xn, msg, handler['tool_name'], handler['method'])
|
149 |
|
150 |
|
150 |
def handle_all(self):
|
151 |
def handle_all(self):
|
151 |
for xn, messages in self.exchanges.items():
|
152 |
for xn, messages in self.exchanges.items():
|
|
|
153 |
messages = list(messages)
|
|
|
154 |
self.exchanges[xn][:] = []
|
152 |
while messages:
|
155 |
for msg in messages:
|
153 |
self.handle(xn)
|
156 |
self.handle(xn, msg)
|
154 |
|
157 |
|
155 |
def _route(self, xn, msg, tool_name, method):
|
158 |
def _route(self, xn, msg, tool_name, method):
|
156 |
if xn == 'audit':
|
159 |
if xn == 'audit':
|
157 |
callback = self.reactor.route_audit(tool_name, method)
|
160 |
callback = self.reactor.route_audit(tool_name, method)
|
158 |
else:
|
161 |
else:
|