various TaskCommand improvements triggered by expansions to purge command
diff --git a/Allura/allura/command/taskd.py b/Allura/allura/command/taskd.py
index 4d7d624..5f9d747 100644
--- a/Allura/allura/command/taskd.py
+++ b/Allura/allura/command/taskd.py
@@ -167,12 +167,20 @@
class TaskCommand(base.Command):
+ cmd_default_states = {
+ 'list': 'ready',
+ 'count': 'ready',
+ 'purge': '*'
+ }
+
summary = 'Task command'
parser = base.Command.standard_parser(verbose=True)
- parser.add_option('-s', '--state', dest='state', default='ready',
- help='state of processes for "list" subcommand. * means all')
+ parser.add_option('-s', '--state', dest='state', default=None,
+ help='state of processes for "list", "count", or "purge" subcommands. * means all. '
+ '(Defaults per command: %s)' %
+ ", ".join(['%s="%s"' % (k, v) for k, v in cmd_default_states.items()]))
parser.add_option('-t', '--timeout', dest='timeout', type=int, default=60,
- help='timeout (in seconds) for busy tasks')
+ help='timeout (in seconds) for busy tasks (only applies to "timeout" command)')
parser.add_option('--filter-name-prefix', dest='filter_name_prefix', default=None,
help='limit to task names starting with this. Example allura.tasks.index_tasks.')
parser.add_option('--filter-result-regex', dest='filter_result_regex', default=None,
@@ -181,12 +189,13 @@
help='limit to tasks queued NUM days ago. Example "180"')
min_args = 2
max_args = None
- usage = '''<ini file> [list|retry|purge|timeout|commit]
+ usage = '''<ini file> [list|count|retry|purge|timeout|commit]
- list: list tasks with given --state value
- retry: re-run tasks with error state
- purge: remove all "complete" tasks with result_type "forget" (which is the default)
- timeout: retry all busy tasks older than --timeout seconds (does not stop existing task)
+ list: prints tasks matching --state (default: 'ready') and filters
+ count: counts tasks matching --state (default: 'ready') and filters
+ retry: re-run tasks with 'error' state. --state has no effect
+ purge: remove all tasks that match --state ( default: '*') with result_type "forget".
+ timeout: retry all tasks with state 'busy' and older than --timeout seconds (does not stop existing task). --state has no effect
commit: run a solr 'commit' as a background task
All subcommands except 'commit' can use the --filter-... options.
@@ -197,12 +206,27 @@
cmd = self.args[1]
tab = dict(
list=self._list,
+ count=self._count,
retry=self._retry,
purge=self._purge,
timeout=self._timeout,
commit=self._commit)
tab[cmd]()
+ def _get_state_query(self):
+ state = self.options.state
+ if not state:
+ cmd = self.args[1]
+ state = self.cmd_default_states.get(cmd, 'ready')
+
+ if state == '*':
+ from allura import model as M
+ # Providing all possible state values allows us to leverage the mongo index.
+ # omitting a state field might result in an entire COLLSCAN
+ state = {'$in': M.MonQTask.states}
+
+ return state
+
def _add_filters(self, q):
if self.options.filter_name_prefix:
q['task_name'] = {'$regex': r'^{}.*'.format(re.escape(self.options.filter_name_prefix))}
@@ -214,35 +238,52 @@
print(q)
return q
+ def _print_query(self, cmd, *args):
+ print('running mongod cmd: %s, %s' % (cmd, args))
+
def _list(self):
'''List tasks'''
from allura import model as M
- base.log.info('Listing tasks of state %s', self.options.state)
- if self.options.state == '*':
- q = dict()
- else:
- q = dict(state=self.options.state)
+ state = self._get_state_query()
+ base.log.info('Listing tasks of state %s', state)
+ q = dict(state=state)
q = self._add_filters(q)
+ self._print_query('find', q)
for t in M.MonQTask.query.find(q):
print(t)
+ def _count(self):
+ '''Count tasks'''
+ from allura import model as M
+ state = self._get_state_query()
+ base.log.info('Counting tasks of state %s', state)
+ q = dict(state=state)
+ q = self._add_filters(q)
+ self._print_query('find', q)
+ count = M.MonQTask.query.find(q).count()
+ print('Task Count %s' % count)
+
def _retry(self):
'''Retry tasks in an error state'''
from allura import model as M
base.log.info('Retry tasks in error state')
q = dict(state='error')
q = self._add_filters(q)
+ update = {'$set': dict(state='ready')}
+ self._print_query('update', q, update)
M.MonQTask.query.update(
q,
- {'$set': dict(state='ready')},
+ update,
multi=True)
def _purge(self):
'''Purge completed tasks'''
from allura import model as M
base.log.info('Purge complete/forget tasks')
- q = dict(state='complete', result_type='forget')
+ state = self._get_state_query()
+ q = dict(state=state, result_type='forget')
q = self._add_filters(q)
+ self._print_query('remove', q)
M.MonQTask.query.remove(q)
def _timeout(self):
@@ -256,9 +297,11 @@
time_start={'$lt': cutoff},
)
q = self._add_filters(q)
+ update = {'$set': dict(state='ready')}
+ self._print_query('update', q, update)
M.MonQTask.query.update(
q,
- {'$set': dict(state='ready')},
+ update,
multi=True)
def _commit(self):