Switch to side-by-side view

--- a/Allura/allura/command/taskd.py
+++ b/Allura/allura/command/taskd.py
@@ -2,6 +2,8 @@
 import time
 import Queue
 from datetime import datetime, timedelta
+import signal
+import sys
 
 import faulthandler
 import pylons
@@ -24,8 +26,21 @@
 
     def command(self):
         self.basic_setup()
-        base.log.info('Starting single taskd process')
+        self.keep_running = True  # the worker loop runs while this is true; the signal handlers clear it
+        self.restart_when_done = False  # set by graceful_restart(); checked after the worker loop ends
+        base.log.info('Starting taskd, pid %s' % os.getpid())
+        signal.signal(signal.SIGUSR1, self.graceful_restart)  # SIGUSR1 -> graceful restart
+        signal.signal(signal.SIGUSR2, self.graceful_stop)  # SIGUSR2 -> graceful stop
         self.worker()
+
+    def graceful_restart(self, signum, frame):  # handler installed for SIGUSR1 in command()
+        base.log.info('taskd pid %s recieved signal %s preparing to do a graceful restart' % (os.getpid(), signum))
+        self.keep_running = False  # the worker loops test this flag and stop once it is false
+        self.restart_when_done = True  # asks for a re-exec after the worker loop has exited
+
+    def graceful_stop(self, signum, frame):  # handler installed for SIGUSR2 in command()
+        base.log.info('taskd pid %s recieved signal %s preparing to do a graceful stop' % (os.getpid(), signum))
+        self.keep_running = False  # stops the worker loops; restart_when_done is left as it was
 
     def worker(self):
         from allura import model as M
@@ -38,35 +53,69 @@
         exclude = self.options.exclude
         if exclude:
             exclude = exclude.split(',')
+
         def start_response(status, headers, exc_info=None):
             pass
+
         def waitfunc_amqp():
             try:
                 return pylons.g.amq_conn.queue.get(timeout=poll_interval)
             except Queue.Empty:
                 return None
+
         def waitfunc_noq():
             time.sleep(poll_interval)
+
+        def check_running(func):
+            """Wrap ``func`` so it is only invoked while taskd should keep running.
+
+            Returns a zero-argument callable: it calls ``func()`` and returns
+            its result while ``self.keep_running`` is true, and returns None
+            without waiting once a graceful stop/restart has been requested.
+            """
+            # Must hand back a callable: the result is passed to
+            # M.MonQTask.get() as waitfunc=.  Calling func() right here would
+            # wait once up front and bind its return value instead.
+            # NOTE(review): confirm M.MonQTask.get() stops polling when the
+            # waitfunc returns None after a shutdown was requested.
+            def waitfunc_checks_running():
+                if self.keep_running:
+                    return func()
+                return None
+            return waitfunc_checks_running
+
         if pylons.g.amq_conn:
             waitfunc = waitfunc_amqp
         else:
             waitfunc = waitfunc_noq
-        while True:
+        waitfunc = check_running(waitfunc)
+        # keep_running is cleared by graceful_restart()/graceful_stop(); both
+        # loops re-test it between tasks, so a task in progress is not cut short.
+        while self.keep_running:
             if pylons.g.amq_conn:
                 pylons.g.amq_conn.reset()
             try:
-                while True:
+                while self.keep_running:
                     task = M.MonQTask.get(
                             process=name,
                             waitfunc=waitfunc,
                             only=only,
                             exclude=exclude)
-                    # Build the (fake) request
-                    r = Request.blank('/--%s--/' % task.task_name, dict(task=task))
-                    list(wsgi_app(r.environ, start_response))
+                    # get() can come back without a task; only dispatch when there is one
+                    if task:
+                        # Build the (fake) request
+                        r = Request.blank('/--%s--/' % task.task_name, dict(task=task))
+                        list(wsgi_app(r.environ, start_response))
             except Exception:
-                base.log.exception('Taskd, restart in 10s')
+                base.log.exception('taskd error; pausing for 10s before taking more tasks')
                 time.sleep(10)
+        base.log.info('taskd pid %s stopping gracefully.' % os.getpid())
+
+        if self.restart_when_done:
+            base.log.info('taskd pid %s restarting itself' % os.getpid())
+            # Replace this process with a fresh one started from the same
+            # command line; os.execv() does not return on success.
+            os.execv(sys.argv[0], sys.argv)
 
 
 class TaskCommand(base.Command):