Switch to unified view

a/Allura/allura/lib/async.py b/Allura/allura/lib/async.py
1
import re
2
import logging
1
import logging
3
from collections import defaultdict
2
from Queue import Queue
4
from contextlib import contextmanager
5
3
6
import mock
7
import kombu
4
import kombu
8
import pkg_resources
9
5
10
log = logging.getLogger(__name__)
6
log = logging.getLogger(__name__)
11
7
12
class Connection(object):
8
class Connection(object):
13
9
...
...
17
            port=port,
13
            port=port,
18
            userid=userid,
14
            userid=userid,
19
            password=password,
15
            password=password,
20
            virtual_host=vhost)
16
            virtual_host=vhost)
21
        self._connection_pool = self._conn_proto.Pool(preload=1, limit=None)
17
        self._connection_pool = self._conn_proto.Pool(preload=1, limit=None)
22
        self._exchanges = dict(
23
            audit=kombu.Exchange('audit'),
24
            react=kombu.Exchange('react'))
25
        self.reset()
18
        self.reset()
26
19
27
    def reset(self):
20
    def reset(self):
28
        self._conn = self._connection_pool.acquire()
21
        self._conn = self._connection_pool.acquire()
29
        self._channel_pool = self._conn.ChannelPool(preload=2, limit=10)
30
        self.queue = self._conn.SimpleQueue('task')
22
        self.queue = self._conn.SimpleQueue('task')
31
32
    @property
33
    def connection(self):
34
        return self._conn
35
36
    @contextmanager
37
    def channel(self):
38
        try:
39
            ch = self._channel_pool.acquire()
40
        except kombu.exceptions.ChannelLimitExceeded:
41
            log.info('Channel pool exhausted, opening a new connection')
42
            self.reset()
43
            ch = self._channel_pool.acquire()
44
        try:
45
            yield ch
46
        finally:
47
            if ch.is_open: ch.release()
48
            else: log.info('ch is not open, not returning to pool')
49
50
    @contextmanager
51
    def producer(self, xn):
52
        with self.channel() as ch:
53
            prod = kombu.Producer(ch, self._exchanges[xn], auto_declare=False)
54
            yield prod
55
56
    def clear_exchanges(self):
57
        try:
58
            with self.channel() as ch:
59
                ch.exchange_delete('audit')
60
        except:
61
            pass
62
        try:
63
            with self.channel() as ch:
64
                ch.exchange_delete('react')
65
        except:
66
            pass
67
68
    def declare_exchanges(self):
69
        self.clear_exchanges()
70
        with self.channel() as ch:
71
            ch.exchange_declare('audit', 'topic', durable=True, auto_delete=False)
72
        with self.channel() as ch:
73
            ch.exchange_declare('react', 'topic', durable=True, auto_delete=False)
74
75
    def declare_queue(self, xn, qn, keys):
76
        try:
77
            with self.channel() as ch:
78
                ch.queue_delete(qn)
79
        except:
80
            pass
81
        with self.channel() as ch:
82
            ch.queue_declare(qn, durable=True, auto_delete=False)
83
        for k in keys:
84
            with self.channel() as ch:
85
                ch.queue_bind(qn, xn, k)
86
87
    def publish(self, xn, key_msgs):
88
       '''Publish a series of messages to an exchange using a single producer
89
90
       xn: exchange name
91
       key_msgs: sequence of (routing_key, message, kwargs) pairs to be published
92
       '''
93
       with self.producer(xn) as p:
94
           for routing_key, body, kwargs in key_msgs:
95
               kwargs.setdefault('serializer', 'pickle')
96
               p.publish(body, routing_key=routing_key, **kwargs)
97
23
98
class MockAMQ(object):
24
class MockAMQ(object):
99
25
100
    def __init__(self, globals):
26
    def __init__(self, globals):
101
        assert False
102
        self.exchanges = defaultdict(list)
103
        self.queue_bindings = defaultdict(list)
104
        self.globals = globals
27
        self.globals = globals
28
        self.reset()
105
29
106
    def clear(self):
30
    def reset(self):
107
        for k,v in self.exchanges.iteritems():
31
        self.queue = Queue()
108
            v[:] = []
109
110
    def create_backend(self):
111
        return mock.Mock()
112
113
    def publish(self, xn, routing_key, message, **kw):
114
        self.exchanges[xn].append(
115
            dict(routing_key=routing_key, message=message, kw=kw))
116
117
    def pop(self, xn):
118
        return self.exchanges[xn].pop(0)
119
120
    def declare_exchanges(self):
121
        pass
122
123
    def declare_queue(self, xn, qn, keys):
124
        pass
125
126
    def setup_handlers(self, paste_registry=None):
127
        from allura.command.reactor import tool_consumers, ReactorCommand
128
        from allura.command import base
129
        from allura import model as M
130
        self.queue_bindings = defaultdict(list)
131
        base.log = logging.getLogger('allura.command')
132
        base.M = M
133
        self.tools = []
134
        for ep in pkg_resources.iter_entry_points('allura'):
135
            try:
136
                self.tools.append((ep.name, ep.load()))
137
            except ImportError:
138
                base.log.warning('Canot load entry point %s', ep)
139
        self.reactor = ReactorCommand('reactor_setup')
140
        if paste_registry:
141
            self.reactor.registry = paste_registry
142
        self.reactor.globals = self.globals
143
        self.reactor.parse_args([])
144
        for name, tool in self.tools:
145
            for method, xn, qn, keys in tool_consumers(name, tool):
146
                for k in keys:
147
                    self.queue_bindings[xn].append(
148
                        dict(key=k, tool_name=name, method=method))
149
150
    def handle(self, xn, msg=None):
151
        if msg is None:
152
            msg = self.pop(xn)
153
        for handler in self.queue_bindings[xn]:
154
            if self._route_matches(handler['key'], msg['routing_key']):
155
                self._route(xn, msg, handler['tool_name'], handler['method'])
156
157
    def handle_all(self):
158
        for xn, messages in self.exchanges.items():
159
            messages = list(messages)
160
            self.exchanges[xn][:] = []
161
            for msg in messages:
162
                self.handle(xn, msg)
163
164
    def _route(self, xn, msg, tool_name, method):
165
        if xn == 'audit':
166
            callback = self.reactor.route_audit(tool_name, method)
167
        else:
168
            callback = self.reactor.route_react(tool_name, method)
169
        data = msg['message']
170
        message = mock.Mock()
171
        message.delivery_info = dict(
172
            routing_key=msg['routing_key'])
173
        message.ack = lambda:None
174
        return callback(data, message)
175
176
    def _route_matches(self, pattern, key):
177
        re_pattern = (pattern
178
                      .replace('.', r'\.')
179
                      .replace('*', r'(?:\w+)')
180
                      .replace('#', r'(?:\w+)(?:\.\w+)*'))
181
        return re.match(re_pattern+'$', key)