|
a/Allura/allura/command/taskd.py |
|
b/Allura/allura/command/taskd.py |
1 |
import os
|
1 |
import os
|
2 |
import time
|
2 |
import time
|
3 |
import Queue
|
3 |
import Queue
|
4 |
from datetime import datetime, timedelta
|
4 |
from datetime import datetime, timedelta
|
|
|
5 |
import signal
|
|
|
6 |
import sys
|
5 |
|
7 |
|
6 |
import faulthandler
|
8 |
import faulthandler
|
7 |
import pylons
|
9 |
import pylons
|
8 |
from paste.deploy import loadapp
|
10 |
from paste.deploy import loadapp
|
9 |
from paste.deploy.converters import asint
|
11 |
from paste.deploy.converters import asint
|
|
... |
|
... |
22 |
parser.add_option('--exclude', dest='exclude', type='string', default=None,
|
24 |
parser.add_option('--exclude', dest='exclude', type='string', default=None,
|
23 |
help='never handle tasks of the given name(s) (can be comma-separated list)')
|
25 |
help='never handle tasks of the given name(s) (can be comma-separated list)')
|
24 |
|
26 |
|
25 |
def command(self):
|
27 |
def command(self):
|
26 |
self.basic_setup()
|
28 |
self.basic_setup()
|
27 |
base.log.info('Starting single taskd process')
|
29 |
self.keep_running = True
|
|
|
30 |
self.restart_when_done = False
|
|
|
31 |
base.log.info('Starting taskd, pid %s' % os.getpid())
|
|
|
32 |
signal.signal(signal.SIGUSR1, self.graceful_restart)
|
|
|
33 |
signal.signal(signal.SIGUSR2, self.graceful_stop)
|
28 |
self.worker()
|
34 |
self.worker()
|
|
|
35 |
|
|
|
36 |
def graceful_restart(self, signum, frame):
|
|
|
37 |
base.log.info('taskd pid %s recieved signal %s preparing to do a graceful restart' % (os.getpid(), signum))
|
|
|
38 |
self.keep_running = False
|
|
|
39 |
self.restart_when_done = True
|
|
|
40 |
|
|
|
41 |
def graceful_stop(self, signum, frame):
|
|
|
42 |
base.log.info('taskd pid %s recieved signal %s preparing to do a graceful stop' % (os.getpid(), signum))
|
|
|
43 |
self.keep_running = False
|
29 |
|
44 |
|
30 |
def worker(self):
|
45 |
def worker(self):
|
31 |
from allura import model as M
|
46 |
from allura import model as M
|
32 |
name = '%s pid %s' % (os.uname()[1], os.getpid())
|
47 |
name = '%s pid %s' % (os.uname()[1], os.getpid())
|
33 |
wsgi_app = loadapp('config:%s#task' % self.args[0],relative_to=os.getcwd())
|
48 |
wsgi_app = loadapp('config:%s#task' % self.args[0],relative_to=os.getcwd())
|
|
... |
|
... |
36 |
if only:
|
51 |
if only:
|
37 |
only = only.split(',')
|
52 |
only = only.split(',')
|
38 |
exclude = self.options.exclude
|
53 |
exclude = self.options.exclude
|
39 |
if exclude:
|
54 |
if exclude:
|
40 |
exclude = exclude.split(',')
|
55 |
exclude = exclude.split(',')
|
|
|
56 |
|
41 |
def start_response(status, headers, exc_info=None):
|
57 |
def start_response(status, headers, exc_info=None):
|
42 |
pass
|
58 |
pass
|
|
|
59 |
|
43 |
def waitfunc_amqp():
|
60 |
def waitfunc_amqp():
|
44 |
try:
|
61 |
try:
|
45 |
return pylons.g.amq_conn.queue.get(timeout=poll_interval)
|
62 |
return pylons.g.amq_conn.queue.get(timeout=poll_interval)
|
46 |
except Queue.Empty:
|
63 |
except Queue.Empty:
|
47 |
return None
|
64 |
return None
|
|
|
65 |
|
48 |
def waitfunc_noq():
|
66 |
def waitfunc_noq():
|
49 |
time.sleep(poll_interval)
|
67 |
time.sleep(poll_interval)
|
|
|
68 |
|
|
|
69 |
def check_running(func):
|
|
|
70 |
if self.keep_running:
|
|
|
71 |
return func()
|
|
|
72 |
else:
|
|
|
73 |
return None
|
|
|
74 |
|
50 |
if pylons.g.amq_conn:
|
75 |
if pylons.g.amq_conn:
|
51 |
waitfunc = waitfunc_amqp
|
76 |
waitfunc = waitfunc_amqp
|
52 |
else:
|
77 |
else:
|
53 |
waitfunc = waitfunc_noq
|
78 |
waitfunc = waitfunc_noq
|
54 |
while True:
|
79 |
waitfunc = check_running(waitfunc)
|
|
|
80 |
while self.keep_running:
|
55 |
if pylons.g.amq_conn:
|
81 |
if pylons.g.amq_conn:
|
56 |
pylons.g.amq_conn.reset()
|
82 |
pylons.g.amq_conn.reset()
|
57 |
try:
|
83 |
try:
|
58 |
while True:
|
84 |
while self.keep_running:
|
59 |
task = M.MonQTask.get(
|
85 |
task = M.MonQTask.get(
|
60 |
process=name,
|
86 |
process=name,
|
61 |
waitfunc=waitfunc,
|
87 |
waitfunc=waitfunc,
|
62 |
only=only,
|
88 |
only=only,
|
63 |
exclude=exclude)
|
89 |
exclude=exclude)
|
|
|
90 |
if task:
|
64 |
# Build the (fake) request
|
91 |
# Build the (fake) request
|
65 |
r = Request.blank('/--%s--/' % task.task_name, dict(task=task))
|
92 |
r = Request.blank('/--%s--/' % task.task_name, dict(task=task))
|
66 |
list(wsgi_app(r.environ, start_response))
|
93 |
list(wsgi_app(r.environ, start_response))
|
67 |
except Exception:
|
94 |
except Exception:
|
68 |
base.log.exception('Taskd, restart in 10s')
|
95 |
base.log.exception('taskd error; pausing for 10s before taking more tasks')
|
69 |
time.sleep(10)
|
96 |
time.sleep(10)
|
|
|
97 |
base.log.info('taskd pid %s stopping gracefully.' % os.getpid())
|
|
|
98 |
|
|
|
99 |
if self.restart_when_done:
|
|
|
100 |
base.log.info('taskd pid %s restarting itself' % os.getpid())
|
|
|
101 |
os.execv(sys.argv[0], sys.argv)
|
70 |
|
102 |
|
71 |
|
103 |
|
72 |
class TaskCommand(base.Command):
|
104 |
class TaskCommand(base.Command):
|
73 |
summary = 'Task command'
|
105 |
summary = 'Task command'
|
74 |
parser = base.Command.standard_parser(verbose=True)
|
106 |
parser = base.Command.standard_parser(verbose=True)
|