a/Allura/allura/command/taskd.py b/Allura/allura/command/taskd.py
1
import os
1
import os
2
import time
2
import time
3
import Queue
3
import Queue
4
from datetime import datetime, timedelta
4
from datetime import datetime, timedelta
5
import signal
6
import sys
5
7
6
import faulthandler
8
import faulthandler
7
import pylons
9
import pylons
8
from paste.deploy import loadapp
10
from paste.deploy import loadapp
9
from paste.deploy.converters import asint
11
from paste.deploy.converters import asint
...
...
22
    parser.add_option('--exclude', dest='exclude', type='string', default=None,
24
    parser.add_option('--exclude', dest='exclude', type='string', default=None,
23
                      help='never handle tasks of the given name(s) (can be comma-separated list)')
25
                      help='never handle tasks of the given name(s) (can be comma-separated list)')
24
26
25
    def command(self):
27
    def command(self):
26
        self.basic_setup()
28
        self.basic_setup()
27
        base.log.info('Starting single taskd process')
29
        self.keep_running = True
30
        self.restart_when_done = False
31
        base.log.info('Starting taskd, pid %s' % os.getpid())
32
        signal.signal(signal.SIGUSR1, self.graceful_restart)
33
        signal.signal(signal.SIGUSR2, self.graceful_stop)
28
        self.worker()
34
        self.worker()
35
36
    def graceful_restart(self, signum, frame):
37
        base.log.info('taskd pid %s recieved signal %s preparing to do a graceful restart' % (os.getpid(), signum))
38
        self.keep_running = False
39
        self.restart_when_done = True
40
41
    def graceful_stop(self, signum, frame):
42
        base.log.info('taskd pid %s recieved signal %s preparing to do a graceful stop' % (os.getpid(), signum))
43
        self.keep_running = False
29
44
30
    def worker(self):
45
    def worker(self):
31
        from allura import model as M
46
        from allura import model as M
32
        name = '%s pid %s' % (os.uname()[1], os.getpid())
47
        name = '%s pid %s' % (os.uname()[1], os.getpid())
33
        wsgi_app = loadapp('config:%s#task' % self.args[0],relative_to=os.getcwd())
48
        wsgi_app = loadapp('config:%s#task' % self.args[0],relative_to=os.getcwd())
...
...
36
        if only:
51
        if only:
37
            only = only.split(',')
52
            only = only.split(',')
38
        exclude = self.options.exclude
53
        exclude = self.options.exclude
39
        if exclude:
54
        if exclude:
40
            exclude = exclude.split(',')
55
            exclude = exclude.split(',')
56
41
        def start_response(status, headers, exc_info=None):
57
        def start_response(status, headers, exc_info=None):
42
            pass
58
            pass
59
43
        def waitfunc_amqp():
60
        def waitfunc_amqp():
44
            try:
61
            try:
45
                return pylons.g.amq_conn.queue.get(timeout=poll_interval)
62
                return pylons.g.amq_conn.queue.get(timeout=poll_interval)
46
            except Queue.Empty:
63
            except Queue.Empty:
47
                return None
64
                return None
65
48
        def waitfunc_noq():
66
        def waitfunc_noq():
49
            time.sleep(poll_interval)
67
            time.sleep(poll_interval)
68
69
        def check_running(func):
70
            if self.keep_running:
71
                return func()
72
            else:
73
                return None
74
50
        if pylons.g.amq_conn:
75
        if pylons.g.amq_conn:
51
            waitfunc = waitfunc_amqp
76
            waitfunc = waitfunc_amqp
52
        else:
77
        else:
53
            waitfunc = waitfunc_noq
78
            waitfunc = waitfunc_noq
54
        while True:
79
        waitfunc = check_running(waitfunc)
80
        while self.keep_running:
55
            if pylons.g.amq_conn:
81
            if pylons.g.amq_conn:
56
                pylons.g.amq_conn.reset()
82
                pylons.g.amq_conn.reset()
57
            try:
83
            try:
58
                while True:
84
                while self.keep_running:
59
                    task = M.MonQTask.get(
85
                    task = M.MonQTask.get(
60
                            process=name,
86
                            process=name,
61
                            waitfunc=waitfunc,
87
                            waitfunc=waitfunc,
62
                            only=only,
88
                            only=only,
63
                            exclude=exclude)
89
                            exclude=exclude)
90
                    if task:
64
                    # Build the (fake) request
91
                        # Build the (fake) request
65
                    r = Request.blank('/--%s--/' % task.task_name, dict(task=task))
92
                        r = Request.blank('/--%s--/' % task.task_name, dict(task=task))
66
                    list(wsgi_app(r.environ, start_response))
93
                        list(wsgi_app(r.environ, start_response))
67
            except Exception:
94
            except Exception:
68
                base.log.exception('Taskd, restart in 10s')
95
                base.log.exception('taskd error; pausing for 10s before taking more tasks')
69
                time.sleep(10)
96
                time.sleep(10)
97
        base.log.info('taskd pid %s stopping gracefully.' % os.getpid())
98
99
        if self.restart_when_done:
100
            base.log.info('taskd pid %s restarting itself' % os.getpid())
101
            os.execv(sys.argv[0], sys.argv)
70
102
71
103
72
class TaskCommand(base.Command):
104
class TaskCommand(base.Command):
73
    summary = 'Task command'
105
    summary = 'Task command'
74
    parser = base.Command.standard_parser(verbose=True)
106
    parser = base.Command.standard_parser(verbose=True)