Child: [7f4ce4] (diff)

Download this file

taskd.py    136 lines (121 with data), 4.8 kB

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
import os
import time
import Queue
from datetime import datetime, timedelta
import faulthandler
import pylons
from paste.deploy import loadapp
from paste.deploy.converters import asint
from webob import Request
import base
# Install the faulthandler hooks at import time so that a fatal error in the
# interpreter (e.g. a segfault) dumps the Python traceback instead of dying silently.
faulthandler.enable()
class TaskdCommand(base.Command):
    '''Command that runs the task daemon, either in-process or as several
    supervised worker processes.'''
    summary = 'Task server'
    parser = base.Command.standard_parser(verbose=True)
    parser.add_option(
        '-p', '--proc', dest='proc', type='int', default=1,
        help='number of worker processes to spawn')
    parser.add_option(
        '--dry_run', dest='dry_run', action='store_true', default=False,
        help="get ready to run the task daemon, but don't actually run it")

    def command(self):
        '''Run one worker in this process, or start and supervise several.

        With ``--dry_run`` the RestartableProcess objects are still built,
        but nothing is started.  With ``--proc 1`` the worker loop runs
        directly in the current process.
        '''
        self.basic_setup()
        opts = self.options
        # One RestartableProcess per requested worker; built before the
        # dry-run check so construction is exercised even on a dry run.
        workers = [
            base.RestartableProcess(target=self.worker, log=base.log)
            for _ in xrange(opts.proc)]
        if opts.dry_run:
            return
        if opts.proc == 1:
            base.log.info('Starting single taskd process')
            self.worker()
        else:  # pragma no cover
            for proc in workers:
                proc.start()
            # Check on every child each 5 seconds; after 60 rounds
            # (roughly five minutes) write a marker line to the log.
            while True:
                for _ in xrange(60):
                    time.sleep(5)
                    for proc in workers:
                        proc.check()
                base.log.info('=== Mark ===')

    def worker(self):
        '''Fetch tasks forever and push each one through the task WSGI app.

        Returns immediately under ``--dry_run``.  Any exception escaping the
        fetch/dispatch loop is logged, and after a 10 second pause the loop
        is started again.
        '''
        from allura import model as M
        hostname = os.uname()[1]
        name = '%s pid %s' % (hostname, os.getpid())
        if self.options.dry_run:
            return
        config_uri = 'config:%s#task' % self.args[0]
        wsgi_app = loadapp(config_uri, relative_to=os.getcwd())
        poll_interval = asint(pylons.config.get('monq.poll_interval', 10))

        def start_response(status, headers, exc_info=None):
            # The WSGI status and headers are intentionally discarded.
            pass

        def wait_on_amqp():
            # Wait up to poll_interval seconds for something on the queue;
            # a timeout is reported as None rather than an exception.
            try:
                return pylons.g.amq_conn.queue.get(timeout=poll_interval)
            except Queue.Empty:
                return None

        def wait_by_sleeping():
            # No AMQP connection: just pause between polls.
            time.sleep(poll_interval)

        waitfunc = wait_on_amqp if pylons.g.amq_conn else wait_by_sleeping

        while True:
            # Reset the AMQP connection (when there is one) on every
            # (re)start of the processing loop.
            if pylons.g.amq_conn:
                pylons.g.amq_conn.reset()
            try:
                while True:
                    task = M.MonQTask.get(process=name, waitfunc=waitfunc)
                    # Build the (fake) request
                    path = '/--%s--/' % task.task_name
                    request = Request.blank(path, dict(task=task))
                    # Drain the response iterable so the app runs to completion.
                    list(wsgi_app(request.environ, start_response))
            except Exception:
                base.log.exception('Taskd, restart in 10s')
                time.sleep(10)
class TaskCommand(base.Command):
    '''Command-line helper for inspecting and manipulating queued tasks.

    Invoked as ``<ini file> <subcommand>``; the second positional argument
    selects one of the ``_list``, ``_retry``, ``_purge``, ``_timeout`` or
    ``_commit`` handlers.
    '''
    summary = 'Task command'
    parser = base.Command.standard_parser(verbose=True)
    parser.add_option('-s', '--state', dest='state', default='ready',
                      help='state of processes to examine')
    # optparse type given by name, the same spelling TaskdCommand uses for --proc.
    parser.add_option('-t', '--timeout', dest='timeout', type='int', default=60,
                      help='timeout (in seconds) for busy tasks')
    min_args = 2
    max_args = None
    usage = '<ini file> [list|retry|purge|timeout|commit]'

    def command(self):
        '''Run the handler named by the second positional argument.

        Raises:
            KeyError: if the subcommand is not a known handler name.  The
                offending name and the valid choices are logged first.
        '''
        self.basic_setup()
        cmd = self.args[1]
        tab = dict(
            list=self._list,
            retry=self._retry,
            purge=self._purge,
            timeout=self._timeout,
            commit=self._commit)
        try:
            handler = tab[cmd]
        except KeyError:
            # Explain the failure before propagating the same exception type,
            # rather than leaving the user with a bare KeyError traceback.
            base.log.error(
                'Unknown task command %r; expected one of: %s',
                cmd, ', '.join(sorted(tab)))
            raise
        handler()

    def _list(self):
        '''Print every task whose state matches ``--state``.

        A state of ``'*'`` uses an empty query instead of filtering by state.
        '''
        from allura import model as M
        base.log.info('Listing tasks of state %s', self.options.state)
        if self.options.state == '*':
            q = dict()
        else:
            q = dict(state=self.options.state)
        for t in M.MonQTask.query.find(q):
            # Parenthesised form prints identically under the Python 2
            # print statement.
            print(t)

    def _retry(self):
        '''Retry tasks in an error state by setting their state back to 'ready'.'''
        from allura import model as M
        base.log.info('Retry tasks in error state')
        # multi=True: update every matching task, not only the first.
        M.MonQTask.query.update(
            dict(state='error'),
            {'$set': dict(state='ready')},
            multi=True)

    def _purge(self):
        '''Purge completed tasks whose result_type is 'forget'.'''
        from allura import model as M
        base.log.info('Purge complete/forget tasks')
        M.MonQTask.query.remove(
            dict(state='complete', result_type='forget'))

    def _timeout(self):
        '''Reset tasks that have been busy too long to 'ready' state.

        The cutoff passed to ``MonQTask.timeout_tasks`` is the current UTC
        time minus ``--timeout`` seconds.
        '''
        from allura import model as M
        base.log.info('Reset tasks stuck for %ss or more', self.options.timeout)
        cutoff = datetime.utcnow() - timedelta(seconds=self.options.timeout)
        M.MonQTask.timeout_tasks(cutoff)

    def _commit(self):
        '''Schedule a SOLR commit by posting the index commit task.'''
        from allura.tasks import index_tasks
        base.log.info('Commit to solr')
        index_tasks.commit.post()