blob: 43ee799be03c2c91f779a5723730cf29445142c1 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import unicode_literals
from __future__ import absolute_import

import os
import re
import signal
import socket
import subprocess
import sys
import time

from ming.orm.ormsession import ThreadLocalORMSession
import six
from six.moves import range

from allura import model as M
from . import base
class TaskdCleanupCommand(base.Command):
    # Finds taskd processes on this host that do not answer a SIGUSR1 status
    # request, and 'busy' tasks on this host that no running taskd is actually
    # handling. Such tasks (and, with -k, tasks of killed taskd processes) are
    # marked as 'error'.
    summary = 'Tasks cleanup command. Determines which taskd processes are handling tasks, and what has been dropped or got hung.'
    parser = base.Command.standard_parser(verbose=True)
    parser.add_option('-k', '--kill-stuck-taskd',
                      dest='kill', action='store_true',
                      help='automatically kill stuck taskd processes. Be careful with this, a taskd process '
                      'may just be very busy on certain operations and not able to respond to our status request')
    parser.add_option('-n', '--num-retry-status-check',
                      dest='num_retry', type='int', default=5,
                      help='number of retries to read taskd status log after sending USR1 signal (5 by default)')
    usage = '<ini file> [-k] <taskd status log file>'
    min_args = 2
    max_args = 2

    def command(self):
        """Run the cleanup: find stuck taskd processes, then orphaned busy tasks.

        All task state changes are flushed to the database once, at the end,
        followed by a logged summary.
        """
        self.basic_setup()
        self.hostname = socket.gethostname()
        self.taskd_status_log = self.args[1]
        self.stuck_pids = []
        self.error_tasks = []
        self.suspicious_tasks = []
        taskd_pids = self._taskd_pids()
        base.log.info('Taskd processes on %s: %s' %
                      (self.hostname, taskd_pids))
        self._find_stuck_taskd(taskd_pids)
        self._find_forsaken_tasks(taskd_pids)
        # check suspicious tasks and mark incomplete ones as error
        base.log.info('Checking suspicious list for incomplete tasks')
        self._check_suspicious_tasks()
        ThreadLocalORMSession.flush_all()
        self.print_summary()

    def _find_stuck_taskd(self, taskd_pids):
        """Record taskd pids that don't report status; kill them when -k is set.

        :param taskd_pids: pids (strings) of the taskd processes on this host
        """
        base.log.info('Seeking for stuck taskd processes')
        for pid in taskd_pids:
            base.log.info('...sending USR1 to %s and watching status log' %
                          (pid))
            status = self._check_taskd_status(int(pid))
            if status == 'OK':
                base.log.info('...%s' % status)
                continue
            base.log.info('...taskd pid %s has stuck' % pid)
            self.stuck_pids.append(pid)
            if self.options.kill:
                base.log.info('...-k is set. Killing %s' % pid)
                self._kill_stuck_taskd(pid)

    def _find_forsaken_tasks(self, taskd_pids):
        """Verify that every busy task on this host is handled by a running taskd.

        Tasks whose taskd pid is not running are marked as 'error' at once.
        Tasks whose taskd does not report them as current are appended to
        ``self.suspicious_tasks`` for a later completeness check.

        :param taskd_pids: pids (strings) of the taskd processes on this host
        """
        base.log.info('Seeking for forsaken busy tasks')
        # Tasks already marked as 'error' (by _kill_stuck_taskd) are not
        # flushed yet, so the query may still return them as busy; skip them
        # by _id rather than relying on ORM object identity.
        seen_ids = {t._id for t in self.error_tasks}
        tasks = [t for t in self._busy_tasks() if t._id not in seen_ids]
        # A taskd we just killed is no longer running: never signal it again.
        killed = set(self.stuck_pids) if self.options.kill else set()
        live_pids = set(taskd_pids) - killed
        base.log.info('Found %s busy tasks on %s' %
                      (len(tasks), self.hostname))
        for task in tasks:
            base.log.info('Verifying task %s' % task)
            # the process field ends with the pid of the taskd that claimed it
            pid = task.process.split()[-1]
            if pid not in live_pids:
                # 'forsaken' task
                base.log.info('Task is forsaken '
                              '(can\'t find taskd with given pid). '
                              'Setting state to \'error\'')
                self._mark_error(task, 'Can\'t find taskd with given pid')
                continue
            # check if taskd with given pid is really processing this task now
            base.log.info(
                'Checking that taskd pid %s is really processing task %s' %
                (pid, task._id))
            if self._check_task(pid, task) == 'OK':
                base.log.info('...OK')
            else:
                # maybe the task moved on quickly and is now complete, so it
                # is re-checked later and only marked as 'error' if incomplete
                self.suspicious_tasks.append(task)
                base.log.info('...NO. Adding task to suspicious list')

    def print_summary(self):
        """Log stuck taskd pids and the tasks that were marked as 'error'."""
        base.log.info('-' * 80)
        if self.stuck_pids:
            base.log.info('Found stuck taskd: %s' % self.stuck_pids)
            if self.options.kill:
                base.log.info('...stuck taskd processes were killed')
            else:
                base.log.info(
                    '...to kill these processes run command with -k flag if you are sure they are really stuck')
        if self.error_tasks:
            base.log.info('Tasks marked as \'error\': %s' % self.error_tasks)

    def _busy_tasks(self, pid=None):
        """Return a query cursor over 'busy' tasks claimed by a process on this host.

        :param pid: when given, only tasks whose process is exactly
            ``"<hostname> pid <pid>"``
        """
        host = re.escape(self.hostname)
        if pid is None:
            regex = '^%s ' % host
        else:
            # Anchor the end so that pid 12 does not also match pid 123.
            regex = '^%s pid %s$' % (host, re.escape('%s' % pid))
        return M.MonQTask.query.find({
            'state': 'busy',
            'process': {'$regex': regex}
        })

    def _taskd_pids(self):
        """Return the pids (as strings) of taskd processes on this host, via pgrep."""
        # space or colon after "taskd" to ensure no match on taskd_cleanup (ourself)
        p = subprocess.Popen(['pgrep', '-f', '^taskd[ :]'],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        stdout, _ = p.communicate()
        if p.returncode != 0:
            # non-zero exit: no matching process (or pgrep failed)
            return []
        return six.ensure_text(stdout).split()

    def _taskd_status(self, pid, retry=False):
        """Return the last line of the taskd status log.

        Unless *retry* is set, SIGUSR1 is first sent to *pid* to request a
        status line. Exits the command with status 1 if the log can't be read.
        """
        if not retry:
            os.kill(int(pid), signal.SIGUSR1)
        p = subprocess.Popen(['tail', '-n1', self.taskd_status_log],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        stdout, _ = p.communicate()
        if p.returncode != 0:
            base.log.error('Can\'t read taskd status log %s' %
                           self.taskd_status_log)
            sys.exit(1)
        return six.ensure_text(stdout)

    def _poll_taskd_status(self, pid, is_ok):
        """Signal *pid* once, then poll the status log until *is_ok* accepts it.

        Makes ``num_retry`` attempts (at least one, so that ``-n 0`` can't
        report every taskd as stuck without asking), sleeping one second
        between attempts.

        :returns: True if ``is_ok(status_line)`` was truthy on some attempt
        """
        attempts = max(1, self.options.num_retry)
        for i in range(attempts):
            if is_ok(self._taskd_status(pid, retry=i > 0)):
                return True
            if i + 1 < attempts:
                base.log.info('retrying after one second')
                time.sleep(1)
        return False

    def _check_taskd_status(self, pid):
        """Return 'OK' if taskd *pid* writes a status line, otherwise 'STUCK'."""
        # word boundary so that pid 12 is not satisfied by a line about pid 123
        pattern = re.compile(r'taskd pid %s\b' % re.escape('%s' % pid))
        if self._poll_taskd_status(pid, pattern.search):
            return 'OK'
        return 'STUCK'

    def _check_task(self, taskd_pid, task):
        """Return 'OK' if taskd *taskd_pid* reports *task* as current, else 'FAIL'."""
        expected = 'taskd pid %s is currently handling task %s' % (
            taskd_pid, task)
        if self._poll_taskd_status(taskd_pid, lambda status: expected in status):
            return 'OK'
        return 'FAIL'

    def _mark_error(self, task, reason):
        """Set *task* to state 'error' with *reason*, and remember it for the summary."""
        task.state = 'error'
        task.result = reason
        self.error_tasks.append(task)

    def _kill_stuck_taskd(self, pid):
        """SIGKILL taskd *pid* and mark every busy task it had claimed as 'error'."""
        os.kill(int(pid), signal.SIGKILL)
        tasks = list(self._busy_tasks(pid=pid))
        base.log.info('...taskd pid %s has assigned tasks: %s. '
                      'setting state to \'error\' for all of them' % (pid, tasks))
        for task in tasks:
            self._mark_error(task, 'Taskd has stuck with this task')

    def _complete_suspicious_tasks(self):
        """Return the _ids of suspicious tasks whose state is now 'complete'."""
        complete_tasks = M.MonQTask.query.find({
            'state': 'complete',
            '_id': {'$in': [t._id for t in self.suspicious_tasks]}
        })
        return [t._id for t in complete_tasks]

    def _check_suspicious_tasks(self):
        """Mark suspicious tasks that still haven't completed as 'error'."""
        if not self.suspicious_tasks:
            return
        complete_ids = set(self._complete_suspicious_tasks())
        for task in self.suspicious_tasks:
            base.log.info('Verifying task %s' % task)
            if task._id in complete_ids:
                base.log.info('...complete')
            else:
                base.log.info('...incomplete. Setting status to \'error\'')
                self._mark_error(task, 'Forsaken task')