message_bus.rst 170 lines (137 with data), 8.0 kB
Guide to the PyForge event system
Our event system is driven by RabbitMQ and a whole message bus framework, most of which you can ignore, because we've simplified it down to two kinds of event listeners, auditors and reactors.
Glossary
Before we get into the details perhaps a few definitions are in order:
- app -- plugin for pyforge such as the tracker, scm, or wiki apps
- message event -- any incoming message that's added to one of the amqp queues
- auditor -- callable defined in a app that gets called on messages BEFORE the changes are persisted to mongo
- reactor -- callable defined in an app that gets called on message events AFTER the message has been persisted to mongo
- queue -- durable round robin style centralized queue with one or more consumers
- app_exchange -- durable queue for messages to app_queues
- reactor_exchange -- durable queue for messages to reactor_queues
- reactor_listener -- should not put things back on the queue and particularly not on the same queue, because that can cause infinite loops.
- app_queue -- queue defined by an app plugin, has a single queue instance for the system.
- app_listeners -- consumers for app_queues, must route appropriately to project/subproject/app instance based on message contents, and call all appropriate auditors, then persisting to mongodb.
- consumer -- workers that listen to a specific queue and managing calling the appropriate auditors, persisting changes to the DB (if required), and calling the appropriate reactors.
System Overview
PyForge uses carrot as the Python library to connect to AMQP (and possibly STOMP for async browser notifications in the future). We will be using a single _durable,_topic_ exchange 'exchange'.
Carrot provides Publisher and Consumer classes. Each consumer defines a queue and a (single) routing key pattern. Messages are distributed to only _one_ consumer on each queue, even if multiple consumers are listening. When a publisher publishes a message, it specifies a routing key for that message. The exchange then delivers a copy of the message to every queue whose routing key pattern matches the routing key for the message.
Since it would not be feasible to register a queue for each project, we will be registering one or more queues per app. Each app defines a list of (routing pattern, callback function) pairs for the routing patterns it is interested in. Queues will be named according to the app that registers them (a wiki's queues might be wiki_0, wiki_1, etc.).
The PyForge system automatically creates new consumer processes for you, so you don't have to worry about any of that. The way it works is basically to :
- Iterate over each plugin P installed on the system and
- For each (pattern, callback) pair (p,c) in P:
- Start a worker process which will construct one consumer for a new queue and register for the routing pattern p and the callback function c
The callback function itself is responsible for inspecting the message to determine which project generated the message and whether any other actions need to be taken.
Routing Keys
Routing keys refer to the _topic_ of a message, not necessarily to its _destination_. In this way, we decouple the producers and consumers of messages. A routing key will generally have the form _source.topic_. For instance, a message may be generated by the scm plugin with the routing key scm.commit. The message body would identify the project and particular SCM instance that _generated_ the message. The tracker plugin would then have a listener on the scm.# routing pattern and would be invoked on each SCM commit.
- Note that in the case of email messages, the email address does _not_
- directly map to the routing key; the particular project/plugin that is the destination of the email message is encoded into the message body itself.
How-to
We will instantiate two AMQP exchanges, 'audit' and 'react'. Each plugin may register consumers on these exchanges.
Auditing
The audit exchange is typically used to request a particular plugin instance in a particular project to perform some action (change the state of a ticket, index an object in SOLR, add a comment to a wiki page or ticket, etc.) To register a consumer on the auditor exchange, decorate a method in the application class with the @audit decorator, specifying which routing keys you are interested in (these decorators can be stacked). For instance, to audit messages destined for the hello_forge plugin, you might write:
@audit('hello_forge.#') def auditor(self, routing_key, data): # do some stuff
When a message is received on the 'audit' exchange with the 'hello_forge.comment' routing key, the worker process will inspect the message for _project_id_ and _mount_point_ fields and use these to populate the pylons context object 'c' with the current project and plugin application instance. The auditor method is then called with the application instance and the routing key and payload from the message.
Reacting
The react exchange is typically used to notify all the interested plugins in a project of changes in a particular plugin. For instance, when a commit is made to the SCM plugin, a notification message would be sent to the 'react' exchange to allow, for instance, a ticket noted in the commit message to be linked to the commit object. To register a consumer for the reactor exchange, decorate a method in the application class with the @react decorator, specifying which routing keys you are interested in. (Like the @audit decorator, these decorators can be stacked). For instance, to react to notifications of wiki comments, you might write:
@react('wiki.#') def reactor(self, routing_key, data); # do some stuff
When a message is received on the react exchange with the wiki.comment routing key, the worker process will inspect the message for the _project_id_ field and use this to populate the pylons context object with the current project. It will then cycle through _all_instances_ of the plugin for the given project, setting the c.app context and invoking the reactor method with that instance, the routing key, and the payload from the message. This allows each instance to decide what action to take in response to the message.
Decorating Class Methods
In the above description, the consumer methods were always called in the context of a particular application instance. If you wish the consumer to be called as a class method (and to be called only once in the case of @react), simply use the @audit and @react decorators on class methods.
Configuring the Queues and Running the Reactor Workers
In order to configure the queues, we have written a paster command reactor_setup. This command will tear down any existing audit and react exchanges and re-create them. It then creates one queue for each consumer method defined in all installed plugins and binds these queues appropriately to their exchanges. To actually run the reactor workers, we have written a paster command reactor which creates a worker process for each queue.