--- a/Allura/allura/model/monq_model.py
+++ b/Allura/allura/model/monq_model.py
@@ -17,6 +17,25 @@
log = logging.getLogger(__name__)
class MonQTask(MappedClass):
+ '''Task to be executed by the taskd daemon.
+
+ Properties
+
+ - _id - bson.ObjectId() for this task
+ - state - 'ready', 'busy', 'error', or 'complete' task status
+ - priority - integer priority, higher is more priority
+ - result_type - either 'keep' or 'forget', what to do with the task when
+ it's done
+ - time_queue - time the task was queued
+ - time_start - time taskd began working on the task
+ - time_stop - time taskd stopped working on the task
+ - task_name - full dotted name of the task function to run
+ - process - identifier for which taskd process is working on the task
+ - context - values used to set c.project, c.app, c.user for the task
+ - args - *args to be sent to the task function
+ - kwargs - **kwargs to be sent to the task function
+ - result - if the task is complete, the return value. If in error, the traceback.
+ '''
states = ('ready', 'busy', 'error', 'complete')
result_types = ('keep', 'forget')
class __mongometa__:
@@ -58,6 +77,7 @@
@LazyProperty
def function(self):
+ '''The function that is called by this task'''
smod, sfunc = self.task_name.rsplit('.', 1)
cur = __import__(smod, fromlist=[sfunc])
return getattr(cur, sfunc)
@@ -69,6 +89,7 @@
kwargs=None,
result_type='forget',
priority=10):
+ '''Create a new task object based on the current context.'''
if args is None: args = ()
if kwargs is None: kwargs = {}
task_name = '%s.%s' % (
@@ -101,6 +122,11 @@
@classmethod
def get(cls, process='worker', state='ready', waitfunc=None):
+ '''Get the highest-priority, oldest, ready task and lock it to the
+ current process. If no task is available and waitfunc is supplied, call
+ the waitfunc before trying to get the task again. If waitfunc is None
+ and no tasks are available, return None.
+ '''
sort = [
('priority', ming.DESCENDING),
('time_queue', ming.ASCENDING)]
@@ -125,15 +151,16 @@
@classmethod
def timeout_tasks(cls, older_than):
+ '''Mark all busy tasks older than a certain datetime as 'ready' again.
+ Used to retry 'stuck' tasks.'''
spec = dict(state='busy')
spec['time_start'] = {'$lt':older_than}
cls.query.update(spec, {'$set': dict(state='ready')}, multi=True)
@classmethod
- def clear_complete(cls, older_than=None):
- spec = dict(state='busy')
- if older_than:
- spec['timestamp'] = {'$lt':older_than}
+ def clear_complete(cls):
+ '''Delete the task objects for complete tasks'''
+ spec = dict(state='complete')
cls.query.remove(spec)
@classmethod
@@ -146,6 +173,10 @@
return i
def __call__(self, restore_context=True):
+ '''Call the task function with its context. If restore_context is True,
+ c.project/app/user will be restored to the values they had before this
+ function was called.
+ '''
from allura import model as M
self.time_start = datetime.utcnow()
session(self).flush(self)
@@ -183,6 +214,7 @@
c.user = old_cuser
def join(self, poll_interval=0.1):
+ '''Wait until this task is either complete or errors out, then return the result.'''
while self.state not in ('complete', 'error'):
time.sleep(poll_interval)
self.query.find(dict(_id=self._id), refresh=True).first()
@@ -190,5 +222,6 @@
@classmethod
def list(cls, state='ready'):
+ '''Print all tasks of a certain status to sys.stdout. Used for debugging.'''
for t in cls.query.find(dict(state=state)):
sys.stdout.write('%r\n' % t)