Parent: [b939c2] (diff)

Child: [ce8e28] (diff)

Download this file

reactor.py    283 lines (258 with data), 11.7 kB

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
import sys
import time
import json
from multiprocessing import Process
import ming
import pylons
from bson import ObjectId
from carrot.messaging import Consumer, ConsumerSet
from . import base
class RestartableProcess(object):
    """Wrap a ``multiprocessing.Process`` so that it can be rebuilt and restarted.

    The constructor arguments for the process are remembered so that
    ``reinit`` can build a fresh ``Process`` from them.  Any attribute not
    defined on this wrapper (``start``, ``is_alive``, ``pid``, ...) is
    looked up on the currently wrapped process.
    """

    def __init__(self, log, *args, **kwargs):
        """Store the logger and the ``Process`` arguments, then build the process.

        :param log: logger-like object; only its ``error`` method is used here.
        :param args: positional arguments forwarded to ``Process``.
        :param kwargs: keyword arguments forwarded to ``Process``.
        """
        self._log = log
        self._args, self._kwargs = args, kwargs
        self.reinit()

    def reinit(self):
        """Replace the wrapped process with a new, unstarted ``Process``."""
        self._process = Process(*self._args, **self._kwargs)

    def check(self):
        """Log, rebuild and start the wrapped process if it is not alive."""
        if not self.is_alive():
            self._log.error('Process %d has died, restarting', self.pid)
            self.reinit()
            self.start()

    def __getattr__(self, name):
        """Delegate attribute lookups that fail on the wrapper to the process.

        :raises AttributeError: if ``_process`` itself has not been set yet.
        """
        # __getattr__ only runs when normal lookup fails.  If '_process' is the
        # missing name (instance created without __init__, or Process() raised
        # in reinit), reading self._process below would re-enter this method
        # forever, so fail with a normal AttributeError instead.
        if name == '_process':
            raise AttributeError(name)
        return getattr(self._process, name)
class ReactorSetupCommand(base.Command):
    """Command that recreates the exchanges and declares/binds the tool queues."""

    summary = 'Configure the RabbitMQ queues and bindings for the given set of tools'
    parser = base.Command.standard_parser(verbose=True)

    def command(self):
        """Create a backend, reset the exchanges, then configure every tool."""
        self.basic_setup()
        self.backend = pylons.g.conn.create_backend()
        self.reset()
        for name, tool in self.tools:
            self.configure_tool(name, tool)

    def reset(self):
        """Delete the 'audit' and 'react' exchanges, then declare them again.

        A failed delete is logged as a warning and a fresh backend is created
        before continuing, so one failure does not abort the reset.
        """
        for exchange in ('audit', 'react'):
            channel = self.backend.channel
            try:
                channel.exchange_delete(exchange)
            except Exception:  # pragma no cover
                # Catch Exception rather than everything so that Ctrl-C and
                # SystemExit still stop the command.
                base.log.warning('Error deleting %s exchange', exchange)
                # NOTE(review): presumably the channel is unusable after a
                # failed delete, hence the new backend -- confirm.
                self.backend = pylons.g.conn.create_backend()
        backend = self.backend
        # NOTE(review): the positional flags are presumably
        # (durable, auto_delete) -- confirm against the backend API.
        backend.exchange_declare('audit', 'topic', True, False)
        backend.exchange_declare('react', 'topic', True, False)

    def configure_tool(self, name, tool):
        """Declare any missing queue for each consumer of ``tool`` and bind its keys.

        :param name: tool name, used for logging and passed to ``tool_consumers``.
        :param tool: tool object whose consumer methods are enumerated.
        """
        base.log.info('Configuring tool %s:%s', name, tool)
        backend = self.backend
        for method, xn, qn, keys in tool_consumers(name, tool):
            if not backend.queue_exists(qn):
                backend.queue_declare(qn, True, False, False, True)
            for key in keys:
                backend.queue_bind(exchange=xn, queue=qn, routing_key=key)
            base.log.info('... %s %s %r', xn, qn, keys)
class ReactorCommand(base.Command):
    """Command that runs the audit/react message consumers for registered tools."""

    summary = 'Start up all the auditors and reactors for registered tools'
    parser = base.Command.standard_parser(verbose=True)
    parser.add_option('-p', '--proc', dest='proc', type='int', default=1,
                      help='number of worker processes to spawn')
    parser.add_option('--dry_run', dest='dry_run', action='store_true', default=False,
                      help="get ready to run the reactor, but don't actually run it")

    def command(self):
        """Build the consumer configs and run the reactor.

        With ``--dry_run`` nothing is started and the list of consumer config
        dicts is returned.  With a single worker, the periodic process is
        started and the worker loop runs in the current process.  Otherwise
        all processes are started and this process supervises them forever.
        """
        self.basic_setup()
        # processes[0] is always the periodic (scheduled message) process.
        processes = [ RestartableProcess(target=self.periodic_main, log=base.log, args=()) ]
        configs = [
            dict(tool_name=name,
                 method=method, xn=xn, qn=qn, keys=keys)
            for name, tool in self.tools
            for method, xn, qn, keys in tool_consumers(name, tool) ]
        for worker_index in xrange(self.options.proc):
            processes.append(RestartableProcess(target=self.multi_worker_main,
                                                log=base.log,
                                                args=(configs,)))
        if self.options.dry_run:
            return configs
        elif self.options.proc == 1:
            base.log.info('Starting single reactor process')
            processes[0].start()
            self.multi_worker_main(configs)
        else: # pragma no cover
            for p in processes:
                p.start()
            while True:
                # Check the children every 5 seconds; log a mark every 60 checks.
                for tick in xrange(60):
                    time.sleep(5)
                    for p in processes:
                        p.check()
                base.log.info('=== Mark ===')

    def multi_worker_main(self, configs):
        """Consume from every configured queue, reconnecting after errors.

        :param configs: list of dicts with at least the keys ``qn``, ``xn``,
            ``tool_name`` and ``method`` (as built by ``command``).
        """
        while True:
            try:
                base.log.info('Entering multiqueue worker process')
                cset = ConsumerSet(pylons.g.conn)
                for config in configs:
                    c = Consumer(connection=pylons.g.conn, queue=config['qn'])
                    if config['xn'] == 'audit':
                        c.register_callback(self.route_audit(config['tool_name'], config['method']))
                    else:
                        c.register_callback(self.route_react(config['tool_name'], config['method']))
                    cset.add_consumer(c)
                if self.options.dry_run:
                    return
                else: # pragma no cover
                    base.log.info('Ready to handle messages')
                    # The registered callbacks do the work; just drain the iterator.
                    for x in cset.iterconsume():
                        pass
            except Exception:
                base.log.exception('AMQP error, restart in 10s')
                pylons.g.amqp_reconnect()
                time.sleep(10)

    def periodic_main(self):
        """Fire ready scheduled messages every 5 seconds (one pass in dry-run)."""
        base.log.info('Entering periodic reactor')
        while True:
            base.M.ScheduledMessage.fire_when_ready()
            time.sleep(5)
            if self.options.dry_run:
                return

    @staticmethod
    def _lookup_project(project_id):
        # Return the project for project_id; None if the id is empty or the
        # lookup raises (the failure is logged).
        try:
            if project_id:
                return base.M.Project.query.get(_id=ObjectId(str(project_id)))
            return None
        except Exception:
            base.log.exception('Error looking up project %r', project_id)
            return None

    @staticmethod
    def _set_user(data):
        # Set pylons.c.user from data['user_id'] when the key is present.
        # On a failed lookup the error is logged and pylons.c.user is left
        # as it was.
        if 'user_id' not in data:
            return
        user_id = data['user_id']
        try:
            pylons.c.user = base.M.User.query.get(
                _id=ObjectId(str(user_id)) if user_id else None)
        except Exception:
            base.log.exception('Bad user_id: %s', user_id)

    def route_audit(self, tool_name, method):
        """Build the message callback for an audit consumer.

        Auditors only respond to their particular mount point.

        :param tool_name: tool name, used in error logging.
        :param method: the consumer callable to invoke for each message.
        :returns: a ``callback(data, msg)`` function.
        """
        def callback(data, msg):
            # The message is acknowledged before it is handled.
            msg.ack()
            try:
                if 'project_id' in data:
                    pylons.c.project = self._lookup_project(data['project_id'])
                    if pylons.c.project is None:
                        base.log.error('The project_id was %s but it was not found',
                                       data['project_id'])
                else:
                    pylons.c.project = None
                self._set_user(data)
                mount_point = data.get('mount_point')
                if pylons.c.project and mount_point is not None:
                    pylons.c.app = pylons.c.project.app_instance(mount_point)
                    base.log.debug('Setting app %s', pylons.c.app)
                if getattr(method, 'im_self', ()) is None:
                    # Instancemethod - call, binding self
                    method.im_func(pylons.c.app, msg.delivery_info['routing_key'], data)
                else:
                    # Classmethod or function - don't bind self
                    method(msg.delivery_info['routing_key'], data)
            except Exception: # pragma no cover
                # Exception (not a bare except) so that KeyboardInterrupt and
                # SystemExit can still stop the worker, as in multi_worker_main.
                base.log.exception(
                    'Exception audit handling %s: %s',
                    tool_name, method)
                if self.options.dry_run:
                    raise
            else:
                ming.orm.ormsession.ThreadLocalORMSession.flush_all()
            finally:
                ming.orm.ormsession.ThreadLocalORMSession.close_all()
        return callback

    def route_react(self, tool_name, method):
        """Build the message callback for a react consumer.

        All tool instances respond to the react exchange.

        :param tool_name: only app configs with this tool name receive the
            message when ``method`` is an instance method.
        :param method: the consumer callable to invoke for each message.
        :returns: a ``callback(data, msg)`` function.
        """
        def callback(data, msg):
            # The message is acknowledged before it is handled.
            msg.ack()
            try:
                # log.info('React(%s): %s', msg.delivery_info['routing_key'], data)
                self._set_user(data)
                if 'project_id' in data:
                    pylons.c.project = self._lookup_project(data['project_id'])
                if getattr(method, 'im_self', ()) is None:
                    # Instancemethod - call once for each app, binding self
                    if not pylons.c.project:
                        # Can't route it, so drop.  Returning from inside the
                        # try skips the flush in `else` but still closes the
                        # sessions in `finally`.
                        return
                    for cfg in pylons.c.project.app_configs:
                        if cfg.tool_name != tool_name:
                            continue
                        pylons.c.app = pylons.c.project.app_instance(
                            cfg.options.mount_point)
                        method(pylons.c.app, msg.delivery_info['routing_key'], data)
                else:
                    # Classmethod or function -- just call once
                    method(msg.delivery_info['routing_key'], data)
            except Exception: # pragma no cover
                # Exception (not a bare except) so that KeyboardInterrupt and
                # SystemExit can still stop the worker, as in multi_worker_main.
                base.log.exception('Exception react handling %s: %s', tool_name, method)
                if self.options.dry_run:
                    raise
            else:
                ming.orm.ormsession.ThreadLocalORMSession.flush_all()
            finally:
                ming.orm.ormsession.ThreadLocalORMSession.close_all()
        return callback
class SendMessageCommand(base.Command):
    """Command that publishes one JSON message to an exchange."""

    min_args=3
    max_args=4
    usage = 'NAME <ini file> <exchange> <topic> [<json message>]'
    summary = 'Send a message to a RabbitMQ exchange'
    parser = base.Command.standard_parser(verbose=True)
    parser.add_option('-c', '--context', dest='context',
                      help=('The context of the message (path to the project'
                            ' and/or tool'))

    def command(self):
        """Publish the message given as the fourth argument, or read from stdin.

        If ``--context`` is given, the project (and the app, when the path has
        a remainder) is set up before the message is parsed and published.
        """
        from allura.lib.helpers import find_project
        self.basic_setup()
        exchange, topic = self.args[1], self.args[2]
        # Set the context of the message
        context_path = self.options.context
        if context_path:
            pylons.c.project, remainder = find_project(context_path)
            if remainder:
                pylons.g.set_app(remainder[0])
        # The JSON text is the optional last argument, otherwise stdin.
        if len(self.args) > 3:
            raw_json = self.args[3]
        else: # pragma no cover
            raw_json = sys.stdin.read()
        payload = json.loads(raw_json)
        base.log.info('Sending message to %s / %s:\n%s',
                      exchange, topic, payload)
        pylons.g.publish(exchange, topic, payload)
def tool_consumers(name, tool):
    """Yield ``(method, exchange, queue_name, keys)`` for each consumer on ``tool``.

    Every attribute of ``tool`` that carries a consumer decoration and has a
    ``__name__`` yields one tuple per non-empty key set ('audit' first, then
    'react').  Queue names are ``<module>.<method name>/<counter>/<exchange>``
    with a counter shared across the whole tool.

    :param name: accepted for the caller's convenience; not used to build the
        result.
    :param tool: object whose attributes are scanned.
    """
    from allura.lib.decorators import ConsumerDecoration
    counter = 0
    for attr_name in dir(tool):
        method = getattr(tool, attr_name)
        deco = ConsumerDecoration.get_decoration(method, False)
        if not (deco and hasattr(method, '__name__')):
            continue
        qualified = '%s.%s' % (method.__module__, method.__name__)
        if deco.audit_keys:
            audit_queue = '%s/%d/audit' % (qualified, counter)
            counter += 1
            yield method, 'audit', audit_queue, list(deco.audit_keys)
        if deco.react_keys:
            react_queue = '%s/%d/react' % (qualified, counter)
            counter += 1
            yield method, 'react', react_queue, list(deco.react_keys)
def debug(): # pragma no cover
    """Open an interactive IPython ``Pdb`` session on this function's frame."""
    from IPython.ipapi import make_session
    make_session()
    from IPython.Debugger import Pdb
    base.log.info('Entering debugger')
    debugger = Pdb(color_scheme='Linux')
    debugger.reset()
    current_frame = sys._getframe()
    debugger.setup(current_frame, None)
    debugger.cmdloop()
    debugger.forget()