--- a
+++ b/Allura/allura/command/taskd_cleanup.py
@@ -0,0 +1,166 @@
+import os
+import signal
+import socket
+import subprocess
+from ming.orm.ormsession import ThreadLocalORMSession
+
+from allura import model as M
+import base
+
+class TaskdCleanupCommand(base.Command):
+ summary = 'Tasks cleanup command'
+ parser = base.Command.standard_parser(verbose=True)
+ parser.add_option('-k', '--kill-stuck-taskd',
+ dest='kill', action='store_true',
+ help='automatically kill stuck taskd processes')
+ usage = '<ini file> [-k] <taskd status log file>'
+ min_args = 2
+ max_args = 2
+
+ def command(self):
+ self.basic_setup()
+ self.hostname = socket.gethostname()
+ self.taskd_status_log = self.args[1]
+ self.stuck_pids = []
+ self.error_tasks = []
+ self.suspicious_tasks = []
+
+ taskd_pids = self._taskd_pids()
+ base.log.info('Taskd processes on %s: %s' % (self.hostname, taskd_pids))
+
+ # find stuck taskd processes
+ base.log.info('Seeking for stuck taskd processes')
+ for pid in taskd_pids:
+ base.log.info('...sending USR1 to %s and watching status log' % (pid))
+ status = self._check_taskd_status(int(pid))
+ if status != 'OK':
+ base.log.info('...taskd pid %s has stuck' % pid)
+ self.stuck_pids.append(pid)
+ if self.options.kill:
+ base.log.info('...-k is set. Killing %s' % pid)
+ self._kill_stuck_taskd(pid)
+ else:
+ base.log.info('...%s' % status)
+
+ # find 'forsaken' tasks
+ base.log.info('Seeking for forsaken busy tasks')
+ tasks = [t for t in self._busy_tasks()
+ if t not in self.error_tasks] # skip seen tasks
+ base.log.info('Found %s busy tasks on %s' % (len(tasks), self.hostname))
+ for task in tasks:
+ base.log.info('Verifying task %s' % task)
+ pid = task.process.split()[-1]
+ if pid not in taskd_pids:
+ # 'forsaken' task
+ base.log.info('Task is forsaken '
+ '(can\'t find taskd with given pid). '
+ 'Setting state to \'error\'')
+ task.state = 'error'
+ task.result = 'Can\'t find taskd with given pid'
+ self.error_tasks.append(task)
+ else:
+ # check if taskd with given pid really processing this task now:
+ base.log.info('Checking that taskd pid %s is really processing task %s' % (pid, task._id))
+ status = self._check_task(pid, task)
+ if status != 'OK':
+ # maybe task moved quickly and now is complete
+ # so we need to check such tasks later
+ # and mark incomplete ones as 'error'
+ self.suspicious_tasks.append(task)
+ base.log.info('...NO. Adding task to suspisious list')
+ else:
+ base.log.info('...OK')
+
+ # check suspicious task and mark incomplete as error
+ base.log.info('Checking suspicious list for incomplete tasks')
+ self._check_suspicious_tasks()
+ ThreadLocalORMSession.flush_all()
+ self.print_summary()
+
+ def print_summary(self):
+ base.log.info('-' * 80)
+ if self.stuck_pids:
+ base.log.info('Found stuck taskd: %s' % self.stuck_pids)
+ if self.options.kill:
+ base.log.info('...stuck taskd processes were killed')
+ else:
+ base.log.info('...to kill these processes run command with -k flag')
+ if self.error_tasks:
+ base.log.info('Tasks marked as \'error\': %s' % self.error_tasks)
+
+ def _busy_tasks(self, pid=None):
+ regex = '^%s ' % self.hostname
+ if pid is not None:
+ regex = '^%s pid %s' % (self.hostname, pid)
+ return M.MonQTask.query.find({
+ 'state': 'busy',
+ 'process': {'$regex': regex}
+ })
+
+ def _taskd_pids(self):
+ p = subprocess.Popen(['pgrep', '-f', '/paster taskd'],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ stdout, stderr = p.communicate()
+ tasks = []
+ if p.returncode == 0:
+ # p.communicate() returns self-process too,
+ # so we need to skip last pid
+ tasks = [pid for pid in stdout.split('\n') if pid != ''][:-1]
+ return tasks
+
+ def _taskd_status(self, pid):
+ os.kill(int(pid), signal.SIGUSR1)
+ p = subprocess.Popen(['tail', '-n1', self.taskd_status_log],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ stdout, stderr = p.communicate()
+ if p.returncode != 0:
+ base.log.error('Can\'t read taskd status log %s' % self.taskd_status_log)
+ exit(1)
+ return stdout
+
+ def _check_taskd_status(self, pid):
+ status = self._taskd_status(pid)
+ if ('taskd pid %s' % pid) not in status:
+ return 'STUCK'
+ return 'OK'
+
+ def _check_task(self, taskd_pid, task):
+ status = self._taskd_status(taskd_pid)
+ line = 'taskd pid %s is currently handling task %s' % (taskd_pid, task)
+ if line not in status:
+ return 'FAIL'
+ return 'OK'
+
+ def _kill_stuck_taskd(self, pid):
+ os.kill(int(pid), signal.SIGKILL)
+ # find all 'busy' tasks for this pid and mark them as 'error'
+ tasks = list(self._busy_tasks(pid=pid))
+ base.log.info('...taskd pid %s has assigned tasks: %s. '
+ 'setting state to \'error\' for all of them' % (pid, tasks))
+ for task in tasks:
+ task.state = 'error'
+ task.result = 'Taskd has stuck with this task'
+ self.error_tasks.append(task)
+
+ def _complete_suspicious_tasks(self):
+ complete_tasks = M.MonQTask.query.find({
+ 'state': 'complete',
+ '_id': {'$in': [t._id for t in self.suspicious_tasks]}
+ });
+ return [t._id for t in complete_tasks]
+
+ def _check_suspicious_tasks(self):
+ if not self.suspicious_tasks:
+ return
+ complete_tasks = self._complete_suspicious_tasks()
+ for task in self.suspicious_tasks:
+ base.log.info('Verifying task %s' % task)
+ if task._id not in complete_tasks:
+ base.log.info('...incomplete. Setting status to \'error\'')
+ task.state = 'error'
+ task.result = 'Forsaken task'
+ self.error_tasks.append(task)
+ else:
+ base.log.info('...complete')