| import os |
| import time |
| import signal |
| import socket |
| import subprocess |
| from ming.orm.ormsession import ThreadLocalORMSession |
| |
| from allura import model as M |
| import base |
| |
| class TaskdCleanupCommand(base.Command): |
| summary = 'Tasks cleanup command' |
| parser = base.Command.standard_parser(verbose=True) |
| parser.add_option('-k', '--kill-stuck-taskd', |
| dest='kill', action='store_true', |
| help='automatically kill stuck taskd processes') |
| parser.add_option('-n', '--num-retry-status-check', |
| dest='num_retry', type='int', default=5, |
| help='number of retries to read taskd status log after sending USR1 signal (5 by default)') |
| usage = '<ini file> [-k] <taskd status log file>' |
| min_args = 2 |
| max_args = 2 |
| |
| def command(self): |
| self.basic_setup() |
| self.hostname = socket.gethostname() |
| self.taskd_status_log = self.args[1] |
| self.stuck_pids = [] |
| self.error_tasks = [] |
| self.suspicious_tasks = [] |
| |
| taskd_pids = self._taskd_pids() |
| base.log.info('Taskd processes on %s: %s' % (self.hostname, taskd_pids)) |
| |
| # find stuck taskd processes |
| base.log.info('Seeking for stuck taskd processes') |
| for pid in taskd_pids: |
| base.log.info('...sending USR1 to %s and watching status log' % (pid)) |
| status = self._check_taskd_status(int(pid)) |
| if status != 'OK': |
| base.log.info('...taskd pid %s has stuck' % pid) |
| self.stuck_pids.append(pid) |
| if self.options.kill: |
| base.log.info('...-k is set. Killing %s' % pid) |
| self._kill_stuck_taskd(pid) |
| else: |
| base.log.info('...%s' % status) |
| |
| # find 'forsaken' tasks |
| base.log.info('Seeking for forsaken busy tasks') |
| tasks = [t for t in self._busy_tasks() |
| if t not in self.error_tasks] # skip seen tasks |
| base.log.info('Found %s busy tasks on %s' % (len(tasks), self.hostname)) |
| for task in tasks: |
| base.log.info('Verifying task %s' % task) |
| pid = task.process.split()[-1] |
| if pid not in taskd_pids: |
| # 'forsaken' task |
| base.log.info('Task is forsaken ' |
| '(can\'t find taskd with given pid). ' |
| 'Setting state to \'error\'') |
| task.state = 'error' |
| task.result = 'Can\'t find taskd with given pid' |
| self.error_tasks.append(task) |
| else: |
| # check if taskd with given pid really processing this task now: |
| base.log.info('Checking that taskd pid %s is really processing task %s' % (pid, task._id)) |
| status = self._check_task(pid, task) |
| if status != 'OK': |
| # maybe task moved quickly and now is complete |
| # so we need to check such tasks later |
| # and mark incomplete ones as 'error' |
| self.suspicious_tasks.append(task) |
| base.log.info('...NO. Adding task to suspisious list') |
| else: |
| base.log.info('...OK') |
| |
| # check suspicious task and mark incomplete as error |
| base.log.info('Checking suspicious list for incomplete tasks') |
| self._check_suspicious_tasks() |
| ThreadLocalORMSession.flush_all() |
| self.print_summary() |
| |
| def print_summary(self): |
| base.log.info('-' * 80) |
| if self.stuck_pids: |
| base.log.info('Found stuck taskd: %s' % self.stuck_pids) |
| if self.options.kill: |
| base.log.info('...stuck taskd processes were killed') |
| else: |
| base.log.info('...to kill these processes run command with -k flag') |
| if self.error_tasks: |
| base.log.info('Tasks marked as \'error\': %s' % self.error_tasks) |
| |
| def _busy_tasks(self, pid=None): |
| regex = '^%s ' % self.hostname |
| if pid is not None: |
| regex = '^%s pid %s' % (self.hostname, pid) |
| return M.MonQTask.query.find({ |
| 'state': 'busy', |
| 'process': {'$regex': regex} |
| }) |
| |
| def _taskd_pids(self): |
| # space after "taskd" to ensure no match on taskd_cleanup (ourself) |
| p = subprocess.Popen(['pgrep', '-f', '/paster taskd '], |
| stdout=subprocess.PIPE, |
| stderr=subprocess.PIPE) |
| stdout, stderr = p.communicate() |
| tasks = [] |
| if p.returncode == 0: |
| tasks = [pid for pid in stdout.split('\n') if pid != ''] |
| return tasks |
| |
| def _taskd_status(self, pid, retry=False): |
| if not retry: |
| os.kill(int(pid), signal.SIGUSR1) |
| p = subprocess.Popen(['tail', '-n1', self.taskd_status_log], |
| stdout=subprocess.PIPE, |
| stderr=subprocess.PIPE) |
| stdout, stderr = p.communicate() |
| if p.returncode != 0: |
| base.log.error('Can\'t read taskd status log %s' % self.taskd_status_log) |
| exit(1) |
| return stdout |
| |
| def _check_taskd_status(self, pid): |
| for i in range(self.options.num_retry): |
| retry = False if i == 0 else True |
| status = self._taskd_status(pid, retry) |
| if ('taskd pid %s' % pid) in status: |
| return 'OK' |
| base.log.info('retrying after one second') |
| time.sleep(1) |
| return 'STUCK' |
| |
| def _check_task(self, taskd_pid, task): |
| for i in range(self.options.num_retry): |
| retry = False if i == 0 else True |
| status = self._taskd_status(taskd_pid, retry) |
| line = 'taskd pid %s is currently handling task %s' % (taskd_pid, task) |
| if line in status: |
| return 'OK' |
| base.log.info('retrying after one second') |
| time.sleep(1) |
| return 'FAIL' |
| |
| def _kill_stuck_taskd(self, pid): |
| os.kill(int(pid), signal.SIGKILL) |
| # find all 'busy' tasks for this pid and mark them as 'error' |
| tasks = list(self._busy_tasks(pid=pid)) |
| base.log.info('...taskd pid %s has assigned tasks: %s. ' |
| 'setting state to \'error\' for all of them' % (pid, tasks)) |
| for task in tasks: |
| task.state = 'error' |
| task.result = 'Taskd has stuck with this task' |
| self.error_tasks.append(task) |
| |
| def _complete_suspicious_tasks(self): |
| complete_tasks = M.MonQTask.query.find({ |
| 'state': 'complete', |
| '_id': {'$in': [t._id for t in self.suspicious_tasks]} |
| }); |
| return [t._id for t in complete_tasks] |
| |
| def _check_suspicious_tasks(self): |
| if not self.suspicious_tasks: |
| return |
| complete_tasks = self._complete_suspicious_tasks() |
| for task in self.suspicious_tasks: |
| base.log.info('Verifying task %s' % task) |
| if task._id not in complete_tasks: |
| base.log.info('...incomplete. Setting status to \'error\'') |
| task.state = 'error' |
| task.result = 'Forsaken task' |
| self.error_tasks.append(task) |
| else: |
| base.log.info('...complete') |