--- a/Allura/allura/model/monq_model.py
+++ b/Allura/allura/model/monq_model.py
@@ -6,11 +6,13 @@
import bson
import pymongo
-from pylons import c, g
+from pylons import c
import ming
from ming import schema as S
from ming.orm import session, MappedClass, FieldProperty
+
+from allura.lib import helpers as h
from .session import main_orm_session
@@ -42,7 +44,8 @@
project_id=S.ObjectId,
app_config_id=S.ObjectId,
user_id=S.ObjectId))
- data = FieldProperty(None, if_missing=None)
+ args = FieldProperty([])
+ kwargs = FieldProperty({})
result = FieldProperty(None, if_missing=None)
def __repr__(self):
@@ -57,7 +60,8 @@
@classmethod
def post(cls,
function,
- data=None,
+ args=None,
+ kwargs=None,
task_name=None,
result_type='forget',
priority=10):
@@ -81,12 +85,11 @@
result_type=result_type,
task_name=task_name,
function=bson.Binary(dumps(function)),
- data=data,
+ args=args,
+ kwargs=kwargs,
process=None,
result=None,
context=context)
- session(obj).flush(obj)
- g.amq_conn.queue.put(dict(_id=obj._id), serializer='pickle')
return obj
@classmethod
@@ -124,9 +127,19 @@
spec['timestamp'] = {'$lt':older_than}
cls.query.remove(spec)
+ @classmethod
+ def run_ready(cls, worker=None):
+ '''Run all the tasks that are currently ready'''
+ for task in cls.query.find(dict(state='ready')).all():
+ task.process = worker
+ task()
+
def __call__(self):
from allura import model as M
log.info('%r', self)
+ old_cproject = c.project
+ old_capp = c.app
+ old_cuser = c.user
try:
func = loads(self.function)
if self.context.project_id:
@@ -136,13 +149,17 @@
c.app = c.project.app_instance(app_config)
if self.context.user_id:
c.user = M.User.query.get(_id=self.context.user_id)
- self.result = func(**self.data)
+ self.result = func(*self.args, **self.kwargs)
self.state = 'complete'
return self.result
except Exception:
log.exception('%r', self)
self.state = 'error'
self.result = traceback.format_exc()
+ finally:
+ c.project = old_cproject
+ c.app = old_capp
+ c.user = old_cuser
def join(self, poll_interval=0.1):
while self.state not in ('complete', 'error'):