Parent: [d7e14f] (diff)

Child: [1ad23b] (diff)

Download this file

reactor.py    237 lines (214 with data), 9.6 kB

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
import sys
import time
import json
from multiprocessing import Process
import ming
import pylons
from pymongo.bson import ObjectId
from carrot.messaging import Consumer, ConsumerSet
from . import base
class ReactorSetupCommand(base.Command):
    """Rebuild the RabbitMQ exchanges, queues and bindings used by plugins.

    Running the command deletes and re-declares the ``audit`` and ``react``
    topic exchanges, then re-creates and binds one queue for every consumer
    that ``plugin_consumers`` reports for each entry of ``self.plugins``.
    """

    summary = 'Configure the RabbitMQ queues and bindings for the given set of plugins'
    parser = base.Command.standard_parser(verbose=True)

    def command(self):
        """Set up the environment, reset the exchanges and configure each plugin."""
        self.basic_setup()
        self.backend = pylons.g.conn.create_backend()
        self.reset()
        for name, plugin in self.plugins:
            self.configure_plugin(name, plugin)

    def reset(self):
        """Delete the ``audit`` and ``react`` exchanges, then declare them again.

        A failed delete is logged as a warning and is not fatal; afterwards
        ``self.backend`` is replaced by a freshly created backend.
        """
        for exchange in ('audit', 'react'):
            # Look the channel up outside the ``try`` so that only the delete
            # itself is treated as a best-effort operation.
            channel = self.backend.channel
            try:
                channel.exchange_delete(exchange)
            except Exception:
                # ``Exception`` rather than a bare ``except`` so that Ctrl-C and
                # SystemExit still abort the command.
                base.log.warning('Error deleting %s exchange', exchange)
                # NOTE(review): presumably a failed AMQP call leaves the channel
                # unusable, hence the new backend. The placement of this
                # reconnect inside the error branch is reconstructed -- confirm.
                self.backend = pylons.g.conn.create_backend()
        # Positional flags are presumably (durable=True, auto_delete=False)
        # -- confirm against the carrot backend signature.
        self.backend.exchange_declare('audit', 'topic', True, False)
        self.backend.exchange_declare('react', 'topic', True, False)

    def configure_plugin(self, name, plugin):
        """Re-create and bind the queue of every consumer found on *plugin*.

        An existing queue with the same name is deleted first, so each run
        starts from a newly declared queue.
        """
        base.log.info('Configuring plugin %s:%s', name, plugin)
        backend = self.backend
        for method, xn, qn, keys in plugin_consumers(name, plugin):
            if backend.queue_exists(qn):
                backend.channel.queue_delete(qn)
            # Positional flags are presumably (durable, exclusive, auto_delete,
            # warn_if_exists) -- confirm against the carrot backend signature.
            backend.queue_declare(qn, True, False, False, True)
            for key in keys:
                backend.queue_bind(exchange=xn, queue=qn, routing_key=key)
            base.log.info('... %s %s %r', xn, qn, keys)
class ReactorCommand(base.Command):
    """Run the periodic reactor and the queue-consuming worker processes.

    ``command`` starts one process running ``periodic_main`` plus ``--proc``
    processes running ``multi_worker_main``.  Each worker consumes every queue
    reported by ``plugin_consumers`` and dispatches incoming messages through
    the callbacks built by ``route_audit`` and ``route_react``.
    """

    summary = 'Start up all the auditors and reactors for registered plugins'
    parser = base.Command.standard_parser(verbose=True)
    parser.add_option('-p', '--proc', dest='proc', type='int', default=1,
                      help='number of worker processes to spawn')

    def command(self):
        """Spawn all worker processes, then idle in the parent forever."""
        self.basic_setup()
        processes = [Process(target=self.periodic_main, args=())]
        # One config dict per consumer; every multi-queue worker gets them all.
        configs = [
            dict(plugin_name=name,
                 method=method, xn=xn, qn=qn, keys=keys)
            for name, plugin in self.plugins
            for method, xn, qn, keys in plugin_consumers(name, plugin)]
        for proc_index in xrange(self.options.proc):
            processes.append(Process(target=self.multi_worker_main,
                                     args=(configs,)))
            # NOTE(review): the ``continue`` makes the one-process-per-queue
            # start-up below unreachable.  It looks deliberately disabled and
            # is kept as found -- confirm before deleting it (and, with it,
            # the only caller of ``worker_main``).
            continue
            processes += [
                Process(target=self.worker_main, args=((name,) + args))
                for name, plugin in self.plugins
                for args in plugin_consumers(name, plugin)]
        for process in processes:
            process.start()
        # The parent never joins its children; it only emits a heartbeat line
        # to the log every five minutes.
        while True:
            time.sleep(300)
            base.log.info('=== Mark ===')

    def _callback_for(self, xn, plugin_name, method):
        """Return the message callback matching exchange name *xn*.

        ``'audit'`` selects ``route_audit``; any other exchange name selects
        ``route_react``.
        """
        if xn == 'audit':
            return self.route_audit(plugin_name, method)
        return self.route_react(plugin_name, method)

    def multi_worker_main(self, configs):
        """Consume every queue listed in *configs* from a single process.

        *configs* is a list of dicts with the keys ``plugin_name``, ``method``,
        ``xn`` and ``qn`` (as built by ``command``).
        """
        base.log.info('Entering multiqueue worker process')
        cset = ConsumerSet(pylons.g.conn)
        for config in configs:
            consumer = Consumer(connection=pylons.g.conn, queue=config['qn'])
            consumer.register_callback(self._callback_for(
                config['xn'], config['plugin_name'], config['method']))
            cset.add_consumer(consumer)
        # All work happens in the registered callbacks; the loop only drives
        # the consumer set.
        for message in cset.iterconsume():
            pass

    def worker_main(self, plugin_name, method, xn, qn, keys):
        """Consume the single queue *qn* until the consumer stops waiting.

        *keys* is accepted for signature compatibility with the tuples yielded
        by ``plugin_consumers`` but is not used here.
        """
        base.log.info('Entering worker process: %s', qn)
        consumer = Consumer(connection=pylons.g.conn, queue=qn)
        consumer.register_callback(self._callback_for(xn, plugin_name, method))
        consumer.wait()
        base.log.info('Exiting worker process: %s', qn)

    def periodic_main(self):
        """Fire due scheduled messages, polling every five seconds, forever."""
        base.log.info('Entering periodic reactor')
        while True:
            base.M.ScheduledMessage.fire_when_ready()
            time.sleep(5)

    def route_audit(self, plugin_name, method):
        """Build the callback for a consumer of the ``audit`` exchange.

        Auditors only respond to their particular mount point: the callback
        loads the project, user and app named in the message into ``pylons.c``
        and then calls *method* once.
        """
        def callback(data, msg):
            # Acknowledged before processing, so presumably a failing handler
            # is only logged and its message is not redelivered.
            msg.ack()
            try:
                project_id = data.get('project_id')
                if project_id:
                    pylons.c.project = base.M.Project.query.get(_id=project_id)
                else:
                    pylons.c.project = None
                if pylons.c.project is None and project_id:
                    base.log.error('The project_id was %s but it was not found',
                                   project_id)
                if data.get('user_id'):
                    try:
                        pylons.c.user = base.M.User.query.get(
                            _id=ObjectId.url_decode(data['user_id']))
                    except Exception:
                        # A bad user id is logged; the message is still handled.
                        base.log.exception('Bad user_id: %s', data['user_id'])
                mount_point = data.get('mount_point')
                if mount_point is not None:
                    pylons.c.app = pylons.c.project.app_instance(mount_point)
                    base.log.debug('Setting app %s', pylons.c.app)
                # ``im_self`` is None only for a Python 2 unbound method; plain
                # functions lack the attribute (the ``()`` default is not None)
                # and bound/class methods carry a non-None ``im_self``.
                if getattr(method, 'im_self', ()) is None:
                    base.log.debug('im_self is None')
                    # Instancemethod - call, binding self
                    method(pylons.c.app, msg.delivery_info['routing_key'], data)
                else:
                    # Classmethod or function - don't bind self
                    method(msg.delivery_info['routing_key'], data)
            except Exception:
                # ``Exception`` rather than a bare ``except`` so that
                # KeyboardInterrupt / SystemExit can still stop the worker.
                base.log.exception('Exception audit handling %s: %s',
                                   plugin_name, method)
            else:
                ming.orm.ormsession.ThreadLocalORMSession.flush_all()
            finally:
                ming.orm.ormsession.ThreadLocalORMSession.close_all()
        return callback

    def route_react(self, plugin_name, method):
        """Build the callback for a consumer of the ``react`` exchange.

        All plugin instances respond to the react exchange: an unbound
        instance method is called once for every app of the project whose
        ``plugin_name`` matches; a function or class method is called once.
        """
        def callback(data, msg):
            # Acknowledged before processing, so presumably a failing handler
            # is only logged and its message is not redelivered.
            msg.ack()
            try:
                if data.get('user_id'):
                    pylons.c.user = base.M.User.query.get(
                        _id=ObjectId.url_decode(data['user_id']))
                pylons.c.project = base.M.Project.query.get(_id=data['project_id'])
                # See route_audit: None ``im_self`` marks an unbound method.
                if getattr(method, 'im_self', ()) is None:
                    # Instancemethod - call once for each app, binding self
                    for cfg in pylons.c.project.app_configs:
                        if cfg.plugin_name != plugin_name:
                            continue
                        pylons.c.app = pylons.c.project.app_instance(
                            cfg.options.mount_point)
                        method(pylons.c.app, msg.delivery_info['routing_key'], data)
                else:
                    # Classmethod or function -- just call once
                    method(msg.delivery_info['routing_key'], data)
            except Exception:
                # ``Exception`` rather than a bare ``except`` so that
                # KeyboardInterrupt / SystemExit can still stop the worker.
                base.log.exception('Exception react handling %s: %s', plugin_name, method)
            else:
                ming.orm.ormsession.ThreadLocalORMSession.flush_all()
            finally:
                ming.orm.ormsession.ThreadLocalORMSession.close_all()
        return callback
class SendMessageCommand(base.Command):
    """Publish one JSON message to a RabbitMQ exchange from the command line.

    Per the usage string the arguments are the ini file, the exchange, the
    topic and an optional JSON payload; when the payload argument is missing
    the JSON text is read from standard input instead.
    """

    min_args = 3
    max_args = 4
    usage = 'NAME <ini file> <exchange> <topic> [<json message>]'
    summary = 'Send a message to a RabbitMQ exchange'
    parser = base.Command.standard_parser(verbose=True)
    parser.add_option('-c', '--context', dest='context',
                      help=('The context of the message (path to the project'
                            ' and/or plugin'))

    def command(self):
        """Resolve the optional context, parse the payload and publish it."""
        from pyforge.lib.helpers import find_project
        self.basic_setup()
        exchange = self.args[1]
        topic = self.args[2]
        # Set the context of the message
        context_path = self.options.context
        if context_path:
            project, remainder = find_project(context_path.split('/'))
            pylons.c.project = project
            if remainder:
                # The first path segment left after the project selects the app.
                pylons.g.set_app(remainder[0])
        # The payload comes from the fourth argument when given, else stdin.
        raw_payload = self.args[3] if len(self.args) > 3 else sys.stdin.read()
        base_message = json.loads(raw_payload)
        base.log.info('Sending message to %s / %s:\n%s',
                      exchange, topic, base_message)
        pylons.g.publish(exchange, topic, base_message)
def plugin_consumers(name, plugin):
    """Yield a consumer description for each decorated attribute of *plugin*.

    Every attribute listed by ``dir(plugin)`` is looked up with
    ``ConsumerDecoration.get_decoration``.  For each decorated attribute a
    ``(method, exchange, queue_name, keys)`` tuple is yielded for the
    ``audit`` exchange when the decoration has audit keys, and another for
    the ``react`` exchange when it has react keys.

    Queue names have the form ``<module>.<function name>/<n>/<exchange>``,
    where ``n`` is a counter shared by all queues yielded for this plugin.
    The *name* argument does not take part in the queue names.
    """
    from pyforge.lib.decorators import ConsumerDecoration
    queue_index = 0
    for attr_name in dir(plugin):
        method = getattr(plugin, attr_name)
        deco = ConsumerDecoration.get_decoration(method, False)
        if deco:
            qualified_name = '%s.%s' % (method.__module__, method.__name__)
            if deco.audit_keys:
                audit_queue = '%s/%d/audit' % (qualified_name, queue_index)
                queue_index += 1
                yield method, 'audit', audit_queue, list(deco.audit_keys)
            if deco.react_keys:
                react_queue = '%s/%d/react' % (qualified_name, queue_index)
                queue_index += 1
                yield method, 'react', react_queue, list(deco.react_keys)
def debug():
    """Open an interactive IPython ``Pdb`` prompt.

    The debugger is set up on the frame returned by ``sys._getframe()``,
    i.e. the frame of this function, and its state is discarded once the
    command loop returns.
    """
    # An IPython session is created before the debugger module is imported.
    from IPython.ipapi import make_session
    make_session()
    from IPython.Debugger import Pdb
    base.log.info('Entering debugger')
    debugger = Pdb(color_scheme='Linux')
    debugger.reset()
    current_frame = sys._getframe()
    debugger.setup(current_frame, None)
    debugger.cmdloop()
    debugger.forget()