--- a/Allura/allura/command/taskd.py
+++ b/Allura/allura/command/taskd.py
@@ -2,6 +2,8 @@
import time
import Queue
from datetime import datetime, timedelta
+import signal
+import sys
import faulthandler
import pylons
@@ -24,8 +26,21 @@
def command(self):
self.basic_setup()
- base.log.info('Starting single taskd process')
+ self.keep_running = True
+ self.restart_when_done = False
+ base.log.info('Starting taskd, pid %s' % os.getpid())
+ signal.signal(signal.SIGUSR1, self.graceful_restart)
+ signal.signal(signal.SIGUSR2, self.graceful_stop)
self.worker()
+
+ def graceful_restart(self, signum, frame):
+ base.log.info('taskd pid %s recieved signal %s preparing to do a graceful restart' % (os.getpid(), signum))
+ self.keep_running = False
+ self.restart_when_done = True
+
+ def graceful_stop(self, signum, frame):
+ base.log.info('taskd pid %s recieved signal %s preparing to do a graceful stop' % (os.getpid(), signum))
+ self.keep_running = False
def worker(self):
from allura import model as M
@@ -38,35 +53,52 @@
exclude = self.options.exclude
if exclude:
exclude = exclude.split(',')
+
def start_response(status, headers, exc_info=None):
pass
+
def waitfunc_amqp():
try:
return pylons.g.amq_conn.queue.get(timeout=poll_interval)
except Queue.Empty:
return None
+
def waitfunc_noq():
time.sleep(poll_interval)
+
+ def check_running(func):
+ if self.keep_running:
+ return func()
+ else:
+ return None
+
if pylons.g.amq_conn:
waitfunc = waitfunc_amqp
else:
waitfunc = waitfunc_noq
- while True:
+ waitfunc = check_running(waitfunc)
+ while self.keep_running:
if pylons.g.amq_conn:
pylons.g.amq_conn.reset()
try:
- while True:
+ while self.keep_running:
task = M.MonQTask.get(
process=name,
waitfunc=waitfunc,
only=only,
exclude=exclude)
- # Build the (fake) request
- r = Request.blank('/--%s--/' % task.task_name, dict(task=task))
- list(wsgi_app(r.environ, start_response))
+ if task:
+ # Build the (fake) request
+ r = Request.blank('/--%s--/' % task.task_name, dict(task=task))
+ list(wsgi_app(r.environ, start_response))
except Exception:
- base.log.exception('Taskd, restart in 10s')
+ base.log.exception('taskd error; pausing for 10s before taking more tasks')
time.sleep(10)
+ base.log.info('taskd pid %s stopping gracefully.' % os.getpid())
+
+ if self.restart_when_done:
+ base.log.info('taskd pid %s restarting itself' % os.getpid())
+ os.execv(sys.argv[0], sys.argv)
class TaskCommand(base.Command):