Switch to side-by-side view

--- a/Allura/allura/model/monq_model.py
+++ b/Allura/allura/model/monq_model.py
@@ -6,11 +6,13 @@
 
 import bson
 import pymongo
-from pylons import c, g
+from pylons import c
 
 import ming
 from ming import schema as S
 from ming.orm import session, MappedClass, FieldProperty
+
+from allura.lib import helpers as h
 
 from .session import main_orm_session
 
@@ -42,7 +44,8 @@
             project_id=S.ObjectId,
             app_config_id=S.ObjectId,
             user_id=S.ObjectId))
-    data = FieldProperty(None, if_missing=None)
+    args = FieldProperty([])
+    kwargs = FieldProperty({})
     result = FieldProperty(None, if_missing=None)
 
     def __repr__(self):
@@ -57,7 +60,8 @@
     @classmethod
     def post(cls,
              function,
-             data=None,
+             args=None,
+             kwargs=None,
              task_name=None,
              result_type='forget',
              priority=10):
@@ -81,12 +85,11 @@
             result_type=result_type,
             task_name=task_name,
             function=bson.Binary(dumps(function)),
-            data=data,
+            args=args,
+            kwargs=kwargs,
             process=None,
             result=None,
             context=context)
-        session(obj).flush(obj)
-        g.amq_conn.queue.put(dict(_id=obj._id), serializer='pickle')
         return obj
 
     @classmethod
@@ -124,9 +127,19 @@
             spec['timestamp'] = {'$lt':older_than}
         cls.query.remove(spec)
 
+    @classmethod
+    def run_ready(cls, worker=None):
+        '''Run all the tasks that are currently ready'''
+        for task in cls.query.find(dict(state='ready')).all():
+            task.process = worker
+            task()
+
     def __call__(self):
         from allura import model as M
         log.info('%r', self)
+        old_cproject = c.project
+        old_capp = c.app
+        old_cuser = c.user
         try:
             func = loads(self.function)
             if self.context.project_id:
@@ -136,13 +149,17 @@
                 c.app = c.project.app_instance(app_config)
             if self.context.user_id:
                 c.user = M.User.query.get(_id=self.context.user_id)
-            self.result = func(**self.data)
+            self.result = func(*self.args, **self.kwargs)
             self.state = 'complete'
             return self.result
         except Exception:
             log.exception('%r', self)
             self.state = 'error'
             self.result = traceback.format_exc()
+        finally:
+            c.project = old_cproject
+            c.app = old_capp
+            c.user = old_cuser
 
     def join(self, poll_interval=0.1):
         while self.state not in ('complete', 'error'):