Switch Thermos to lazy log formatting
This is the first part of a small series of Thermos observer performance
improvements.
As a first iteration, this switches all logging to use the logger-embedded
formatting rather than doing it eagerly up front. This has the advantage that
we produce less garbage if debug logging is disabled.
Reviewed at https://reviews.apache.org/r/66136/
diff --git a/src/main/python/apache/thermos/common/ckpt.py b/src/main/python/apache/thermos/common/ckpt.py
index e79ec6a..cb254a4 100644
--- a/src/main/python/apache/thermos/common/ckpt.py
+++ b/src/main/python/apache/thermos/common/ckpt.py
@@ -186,7 +186,7 @@
builder.dispatch(state, update, truncate=truncate)
return state
except cls.Error as e:
- log.error('Failed to recover from %s: %s' % (filename, e))
+ log.error('Failed to recover from %s: %s', filename, e)
def __init__(self):
self._task_handlers = []
@@ -325,7 +325,7 @@
raise self.ErrorRecoveringState(
"Attempting to rebind task with different parameters!")
else:
- log.debug('Initializing TaskRunner header to %s' % runner_ckpt.runner_header)
+ log.debug('Initializing TaskRunner header to %s', runner_ckpt.runner_header)
state.header = runner_ckpt.runner_header
self._run_header_dispatch(runner_ckpt.runner_header)
return
@@ -343,9 +343,9 @@
else:
state.statuses = [runner_ckpt.task_status]
new_state = runner_ckpt.task_status.state
- log.debug('Flipping task state from %s to %s' % (
+ log.debug('Flipping task state from %s to %s',
TaskState._VALUES_TO_NAMES.get(old_state, '(undefined)'),
- TaskState._VALUES_TO_NAMES.get(new_state, '(undefined)')))
+ TaskState._VALUES_TO_NAMES.get(new_state, '(undefined)'))
self._run_task_dispatch(new_state, runner_ckpt.task_status)
return
@@ -358,11 +358,11 @@
current_run = state.processes[name][-1] if name in state.processes else None
if current_run and process_update.seq != current_run.seq + 1:
if recovery:
- log.debug('Skipping replayed out-of-order update: %s' % process_update)
+ log.debug('Skipping replayed out-of-order update: %s', process_update)
return
else:
raise self.InvalidSequenceNumber(
- "Out of order sequence number! %s => %s" % (current_run, process_update))
+          "Out of order sequence number! %s => %s" % (current_run, process_update))
# One special case for WAITING: Initialize a new target ProcessState.
if process_update.state == ProcessState.WAITING:
@@ -376,7 +376,7 @@
state.processes[name] = [ProcessStatus(seq=current_run.seq)]
# Run the process state machine.
- log.debug('Running state machine for process=%s/seq=%s' % (name, process_update.seq))
+ log.debug('Running state machine for process=%s/seq=%s', name, process_update.seq)
if not state.processes or name not in state.processes:
raise self.ErrorRecoveringState("Encountered potentially out of order "
"process update. Are you sure this is a full checkpoint stream?")
diff --git a/src/main/python/apache/thermos/core/helper.py b/src/main/python/apache/thermos/core/helper.py
index 0811e84..a402498 100644
--- a/src/main/python/apache/thermos/core/helper.py
+++ b/src/main/python/apache/thermos/core/helper.py
@@ -85,8 +85,8 @@
process_create_time = process.create_time()
if abs(start_time - process_create_time) >= cls.MAX_START_TIME_DRIFT.as_(Time.SECONDS):
- log.info("Expected pid %s start time to be %s but it's %s" % (
- process.pid, start_time, process_create_time))
+ log.info("Expected pid %s start time to be %s but it's %s",
+ process.pid, start_time, process_create_time)
return False
if uid is not None:
@@ -104,12 +104,12 @@
elif uid == 0:
# If the process was launched as root but is now not root, we should
# kill this because it could have called `setuid` on itself.
- log.info("pid %s appears to be have launched by root but it's uid is now %s" % (
- process.pid, process_uid))
+ log.info("pid %s appears to be have launched by root but it's uid is now %s",
+ process.pid, process_uid)
return True
else:
- log.info("Expected pid %s to be ours but the pid uid is %s and we're %s" % (
- process.pid, process_uid, uid))
+ log.info("Expected pid %s to be ours but the pid uid is %s and we're %s",
+ process.pid, process_uid, uid)
return False
try:
@@ -121,8 +121,8 @@
# If the uid was not provided, we must use user -- which is possibly flaky if the
# user gets deleted from the system, so process_user will be None and we must
# return False.
- log.info("Expected pid %s to be ours but the pid user is %s and we're %s" % (
- process.pid, process_user, user))
+ log.info("Expected pid %s to be ours but the pid user is %s and we're %s",
+ process.pid, process_user, user)
return True
return False
@@ -141,7 +141,7 @@
coordinator_pid, pid, tree = None, None, set()
if uid is None:
- log.debug('Legacy thermos checkpoint stream detected, user = %s' % user)
+ log.debug('Legacy thermos checkpoint stream detected, user = %s', user)
if process_run.coordinator_pid:
try:
@@ -149,11 +149,11 @@
if cls.this_is_really_our_pid(coordinator_process, uid, user, process_run.fork_time):
coordinator_pid = process_run.coordinator_pid
except psutil.NoSuchProcess:
- log.info(' Coordinator %s [pid: %s] completed.' % (process_run.process,
- process_run.coordinator_pid))
+ log.info(' Coordinator %s [pid: %s] completed.', process_run.process,
+ process_run.coordinator_pid)
except psutil.Error as err:
- log.warning(' Error gathering information on pid %s: %s' % (process_run.coordinator_pid,
- err))
+ log.warning(' Error gathering information on pid %s: %s', process_run.coordinator_pid,
+ err)
if process_run.pid:
try:
@@ -161,15 +161,15 @@
if cls.this_is_really_our_pid(process, uid, user, process_run.start_time):
pid = process.pid
except psutil.NoSuchProcess:
- log.info(' Process %s [pid: %s] completed.' % (process_run.process, process_run.pid))
+ log.info(' Process %s [pid: %s] completed.', process_run.process, process_run.pid)
except psutil.Error as err:
- log.warning(' Error gathering information on pid %s: %s' % (process_run.pid, err))
+ log.warning(' Error gathering information on pid %s: %s', process_run.pid, err)
else:
if pid:
try:
tree = set(child.pid for child in process.children(recursive=True))
except psutil.Error:
- log.warning(' Error gathering information on children of pid %s' % pid)
+ log.warning(' Error gathering information on children of pid %s', pid)
return (coordinator_pid, pid, tree)
@@ -192,9 +192,9 @@
os.kill(pid, sig)
except OSError as e:
if e.errno not in (errno.ESRCH, errno.EPERM):
- log.error('Unexpected error in os.kill: %s' % e)
+ log.error('Unexpected error in os.kill: %s', e)
except Exception as e:
- log.error('Unexpected error in os.kill: %s' % e)
+ log.error('Unexpected error in os.kill: %s', e)
@classmethod
def terminate_pid(cls, pid):
@@ -240,16 +240,16 @@
@classmethod
def terminate_process(cls, state, process_name):
- log.debug('TaskRunnerHelper.terminate_process(%s)' % process_name)
+ log.debug('TaskRunnerHelper.terminate_process(%s)', process_name)
_, pid, _ = cls._get_process_tuple(state, process_name)
if pid:
- log.debug(' => SIGTERM pid %s' % pid)
+ log.debug(' => SIGTERM pid %s', pid)
cls.terminate_pid(pid)
return bool(pid)
@classmethod
def kill_process(cls, state, process_name):
- log.debug('TaskRunnerHelper.kill_process(%s)' % process_name)
+ log.debug('TaskRunnerHelper.kill_process(%s)', process_name)
coordinator_pgid = cls._get_coordinator_group(state, process_name)
coordinator_pid, pid, tree = cls._get_process_tuple(state, process_name)
# This is super dangerous. TODO(wickman) Add a heuristic that determines
@@ -257,16 +257,16 @@
# and 2) those processes have inherited the coordinator checkpoint filehandle
# This way we validate that it is in fact the process group we expect.
if coordinator_pgid:
- log.debug(' => SIGKILL coordinator group %s' % coordinator_pgid)
+ log.debug(' => SIGKILL coordinator group %s', coordinator_pgid)
cls.kill_group(coordinator_pgid)
if coordinator_pid:
- log.debug(' => SIGKILL coordinator %s' % coordinator_pid)
+ log.debug(' => SIGKILL coordinator %s', coordinator_pid)
cls.kill_pid(coordinator_pid)
if pid:
- log.debug(' => SIGKILL pid %s' % pid)
+ log.debug(' => SIGKILL pid %s', pid)
cls.kill_pid(pid)
for child in tree:
- log.debug(' => SIGKILL child %s' % child)
+ log.debug(' => SIGKILL child %s', child)
cls.kill_pid(child)
return bool(coordinator_pid or pid or tree)
@@ -380,11 +380,11 @@
if pid == 0:
break
pids.add(pid)
- log.debug('Detected terminated process: pid=%s, status=%s, rusage=%s' % (
- pid, status, rusage))
+ log.debug('Detected terminated process: pid=%s, status=%s, rusage=%s',
+ pid, status, rusage)
except OSError as e:
if e.errno != errno.ECHILD:
- log.warning('Unexpected error when calling waitpid: %s' % e)
+ log.warning('Unexpected error when calling waitpid: %s', e)
break
return pids
diff --git a/src/main/python/apache/thermos/core/muxer.py b/src/main/python/apache/thermos/core/muxer.py
index 47e77f7..b095d75 100644
--- a/src/main/python/apache/thermos/core/muxer.py
+++ b/src/main/python/apache/thermos/core/muxer.py
@@ -37,7 +37,7 @@
fp.close()
def register(self, process_name, watermark=0):
- log.debug('registering %s' % process_name)
+ log.debug('registering %s', process_name)
if process_name in self._processes:
raise self.ProcessExists("Process %s is already registered" % process_name)
self._processes[process_name] = None
@@ -47,7 +47,7 @@
for process_name, fp in self._processes.items():
if fp is None:
process_ckpt = self._pathspec.given(process=process_name).getpath('process_checkpoint')
- log.debug('ProcessMuxer binding %s => %s' % (process_name, process_ckpt))
+ log.debug('ProcessMuxer binding %s => %s', process_name, process_ckpt)
try:
self._processes[process_name] = open(process_ckpt, 'r') # noqa
except IOError as e:
@@ -55,14 +55,14 @@
log.debug(' => bind failed, checkpoint not available yet.')
continue
else:
- log.error("Unexpected inability to open %s! %s" % (process_ckpt, e))
+ log.error("Unexpected inability to open %s! %s", process_ckpt, e)
except Exception as e:
- log.error("Unexpected inability to open %s! %s" % (process_ckpt, e))
+ log.error("Unexpected inability to open %s! %s", process_ckpt, e)
self._fast_forward_stream(process_name)
def _fast_forward_stream(self, process_name):
- log.debug('Fast forwarding %s stream to seq=%s' % (process_name,
- self._watermarks[process_name]))
+ log.debug('Fast forwarding %s stream to seq=%s', process_name,
+ self._watermarks[process_name])
assert self._processes.get(process_name) is not None
fp = self._processes[process_name]
rr = ThriftRecordReader(fp, RunnerCkpt)
@@ -75,25 +75,25 @@
break
new_watermark = record.process_status.seq
if new_watermark > self._watermarks[process_name]:
- log.debug('Over-seeked %s [watermark = %s, high watermark = %s], rewinding.' % (
- process_name, new_watermark, self._watermarks[process_name]))
+ log.debug('Over-seeked %s [watermark = %s, high watermark = %s], rewinding.',
+ process_name, new_watermark, self._watermarks[process_name])
fp.seek(last_pos)
break
current_watermark = new_watermark
records += 1
if current_watermark < self._watermarks[process_name]:
- log.warning('Only able to fast forward to %s@sequence=%s, high watermark is %s' % (
- process_name, current_watermark, self._watermarks[process_name]))
+ log.warning('Only able to fast forward to %s@sequence=%s, high watermark is %s',
+ process_name, current_watermark, self._watermarks[process_name])
if records:
- log.debug('Fast forwarded %s %s record(s) to seq=%s.' % (process_name, records,
- current_watermark))
+ log.debug('Fast forwarded %s %s record(s) to seq=%s.', process_name, records,
+ current_watermark)
def unregister(self, process_name):
log.debug('unregistering %s' % process_name)
if process_name not in self._processes:
- raise self.ProcessNotFound("No trace of process: %s" % process_name)
+      raise self.ProcessNotFound("No trace of process: %s" % process_name)
else:
self._watermarks.pop(process_name)
fp = self._processes.pop(process_name)
@@ -114,7 +114,7 @@
try:
os.fstat(fp.fileno()).st_size
except OSError:
- log.debug('ProcessMuxer could not fstat for process %s' % process)
+ log.debug('ProcessMuxer could not fstat for process %s', process)
return False
update = rr.try_read()
if update:
@@ -139,10 +139,10 @@
try:
fstat = os.fstat(handle.fileno())
except OSError:
- log.error('Unable to fstat %s!' % handle.name)
+ log.error('Unable to fstat %s!', handle.name)
continue
if handle.tell() > fstat.st_size:
- log.error('Truncated checkpoint record detected on %s!' % handle.name)
+ log.error('Truncated checkpoint record detected on %s!', handle.name)
elif handle.tell() < fstat.st_size:
rr = ThriftRecordReader(handle, RunnerCkpt)
while True:
@@ -152,7 +152,7 @@
else:
break
if len(updates) > 0:
- log.debug('select() returning %s updates:' % len(updates))
+ log.debug('select() returning %s updates:', len(updates))
for update in updates:
- log.debug(' = %s' % update)
+ log.debug(' = %s', update)
return updates
diff --git a/src/main/python/apache/thermos/core/process.py b/src/main/python/apache/thermos/core/process.py
index 4a4678f..32631e6 100644
--- a/src/main/python/apache/thermos/core/process.py
+++ b/src/main/python/apache/thermos/core/process.py
@@ -151,7 +151,7 @@
raise ValueError('Log backups cannot be less than one.')
def _log(self, msg, exc_info=None):
- log.debug('[process:%5s=%s]: %s' % (self._pid, self.name(), msg),
+ log.debug('[process:%5s=%s]: %s', self._pid, self.name(), msg,
exc_info=exc_info)
def _getpwuid(self):
@@ -442,7 +442,7 @@
})
wrapped_cmdline = self.wrapped_cmdline(cwd)
- log.debug('Wrapped cmdline: %s' % wrapped_cmdline)
+ log.debug('Wrapped cmdline: %s', wrapped_cmdline)
real_thermos_profile_path = os.path.join(
os.environ['MESOS_DIRECTORY'],
@@ -452,7 +452,7 @@
if os.path.exists(real_thermos_profile_path):
env.update(BASH_ENV=thermos_profile)
- log.debug('ENV is: %s' % env)
+ log.debug('ENV is: %s', env)
subprocess_args = {
'args': wrapped_cmdline,
'close_fds': self.FD_CLOEXEC,
diff --git a/src/main/python/apache/thermos/core/runner.py b/src/main/python/apache/thermos/core/runner.py
index 1b63c08..79aa68a 100644
--- a/src/main/python/apache/thermos/core/runner.py
+++ b/src/main/python/apache/thermos/core/runner.py
@@ -100,20 +100,20 @@
self._runner = runner
def on_waiting(self, process_update):
- log.debug('Process on_waiting %s' % process_update)
+ log.debug('Process on_waiting %s', process_update)
self._runner._task_processes[process_update.process] = (
self._runner._task_process_from_process_name(
process_update.process, process_update.seq + 1))
self._runner._watcher.register(process_update.process, process_update.seq - 1)
def on_forked(self, process_update):
- log.debug('Process on_forked %s' % process_update)
+ log.debug('Process on_forked %s', process_update)
task_process = self._runner._task_processes[process_update.process]
task_process.rebind(process_update.coordinator_pid, process_update.fork_time)
self._runner._plan.set_running(process_update.process)
def on_running(self, process_update):
- log.debug('Process on_running %s' % process_update)
+ log.debug('Process on_running %s', process_update)
self._runner._plan.set_running(process_update.process)
def _cleanup(self, process_update):
@@ -121,39 +121,39 @@
TaskRunnerHelper.kill_process(self._runner.state, process_update.process)
def on_success(self, process_update):
- log.debug('Process on_success %s' % process_update)
- log.info('Process(%s) finished successfully [rc=%s]' % (
- process_update.process, process_update.return_code))
+ log.debug('Process on_success %s', process_update)
+ log.info('Process(%s) finished successfully [rc=%s]',
+ process_update.process, process_update.return_code)
self._cleanup(process_update)
self._runner._task_processes.pop(process_update.process)
self._runner._watcher.unregister(process_update.process)
self._runner._plan.add_success(process_update.process)
def _on_abnormal(self, process_update):
- log.info('Process %s had an abnormal termination' % process_update.process)
+ log.info('Process %s had an abnormal termination', process_update.process)
self._runner._task_processes.pop(process_update.process)
self._runner._watcher.unregister(process_update.process)
def on_failed(self, process_update):
- log.debug('Process on_failed %s' % process_update)
- log.info('Process(%s) failed [rc=%s]' % (process_update.process, process_update.return_code))
+ log.debug('Process on_failed %s', process_update)
+ log.info('Process(%s) failed [rc=%s]', process_update.process, process_update.return_code)
self._cleanup(process_update)
self._on_abnormal(process_update)
self._runner._plan.add_failure(process_update.process)
if process_update.process in self._runner._plan.failed:
- log.info('Process %s reached maximum failures, marking process run failed.' %
+ log.info('Process %s reached maximum failures, marking process run failed.',
process_update.process)
else:
- log.info('Process %s under maximum failure limit, restarting.' % process_update.process)
+ log.info('Process %s under maximum failure limit, restarting.', process_update.process)
def on_lost(self, process_update):
- log.debug('Process on_lost %s' % process_update)
+ log.debug('Process on_lost %s', process_update)
self._cleanup(process_update)
self._on_abnormal(process_update)
self._runner._plan.lost(process_update.process)
def on_killed(self, process_update):
- log.debug('Process on_killed %s' % process_update)
+ log.debug('Process on_killed %s', process_update)
self._cleanup(process_update)
self._runner._task_processes.pop(process_update.process)
self._runner._watcher.unregister(process_update.process)
@@ -177,7 +177,7 @@
self._pathspec = self._runner._pathspec
def on_active(self, task_update):
- log.debug('Task on_active(%s)' % task_update)
+ log.debug('Task on_active(%s)', task_update)
self._runner._plan = self._runner._regular_plan
if self._runner._recovery:
return
@@ -185,12 +185,12 @@
ThermosTaskWrapper(self._runner._task).to_json())
def on_cleaning(self, task_update):
- log.debug('Task on_cleaning(%s)' % task_update)
+ log.debug('Task on_cleaning(%s)', task_update)
self._runner._finalization_start = task_update.timestamp_ms / 1000.0
self._runner._terminate_plan(self._runner._regular_plan)
def on_finalizing(self, task_update):
- log.debug('Task on_finalizing(%s)' % task_update)
+ log.debug('Task on_finalizing(%s)', task_update)
if not self._runner._recovery:
self._runner._kill()
self._runner._plan = self._runner._finalizing_plan
@@ -198,20 +198,20 @@
self._runner._finalization_start = task_update.timestamp_ms / 1000.0
def on_killed(self, task_update):
- log.debug('Task on_killed(%s)' % task_update)
+ log.debug('Task on_killed(%s)', task_update)
self._cleanup()
def on_success(self, task_update):
- log.debug('Task on_success(%s)' % task_update)
+ log.debug('Task on_success(%s)', task_update)
self._cleanup()
log.info('Task succeeded.')
def on_failed(self, task_update):
- log.debug('Task on_failed(%s)' % task_update)
+ log.debug('Task on_failed(%s)', task_update)
self._cleanup()
def on_lost(self, task_update):
- log.debug('Task on_lost(%s)' % task_update)
+ log.debug('Task on_lost(%s)', task_update)
self._cleanup()
def _cleanup(self):
@@ -235,15 +235,15 @@
self._runner._ckpt_write(record)
def on_process_transition(self, state, process_update):
- log.debug('_on_process_transition: %s' % process_update)
+ log.debug('_on_process_transition: %s', process_update)
self._checkpoint(RunnerCkpt(process_status=process_update))
def on_task_transition(self, state, task_update):
- log.debug('_on_task_transition: %s' % task_update)
+ log.debug('_on_task_transition: %s', task_update)
self._checkpoint(RunnerCkpt(task_status=task_update))
def on_initialization(self, header):
- log.debug('_on_initialization: %s' % header)
+ log.debug('_on_initialization: %s', header)
ThermosTaskValidator.assert_valid_task(self._runner.task)
ThermosTaskValidator.assert_valid_ports(self._runner.task, header.ports)
self._checkpoint(RunnerCkpt(runner_header=header))
@@ -413,7 +413,7 @@
log_dir=checkpoint.header.log_dir, task_id=task_id,
portmap=checkpoint.header.ports, hostname=checkpoint.header.hostname)
except Exception as e:
- log.error('Failed to reconstitute checkpoint in TaskRunner.get: %s' % e, exc_info=True)
+ log.error('Failed to reconstitute checkpoint in TaskRunner.get: %s', e, exc_info=True)
return None
def __init__(self, task, checkpoint_root, sandbox, log_dir=None,
@@ -577,8 +577,8 @@
try:
yield
except Exception as e:
- log.error('Caught exception in self.control(): %s' % e)
- log.error(' %s' % traceback.format_exc())
+ log.error('Caught exception in self.control(): %s', e)
+ log.error(' %s', traceback.format_exc())
self._ckpt.close()
def _resume_task(self):
@@ -612,7 +612,7 @@
with open(ckpt_file, 'r') as fp:
ckpt_recover = ThriftRecordReader(fp, RunnerCkpt)
for record in ckpt_recover:
- log.debug('Replaying runner checkpoint record: %s' % record)
+ log.debug('Replaying runner checkpoint record: %s', record)
self._dispatcher.dispatch(self._state, record, recovery=True)
def _replay_process_ckpts(self):
@@ -640,7 +640,7 @@
except KeyError:
# This will cause failures downstream, but they will at least be correctly
# reflected in the process state.
- log.error('Unknown user %s.' % self._user)
+ log.error('Unknown user %s.', self._user)
uid = None
header = RunnerHeader(
@@ -677,8 +677,8 @@
def _set_process_status(self, process_name, process_state, **kw):
if 'sequence_number' in kw:
sequence_number = kw.pop('sequence_number')
- log.debug('_set_process_status(%s <= %s, seq=%s[force])' % (process_name,
- ProcessState._VALUES_TO_NAMES.get(process_state), sequence_number))
+ log.debug('_set_process_status(%s <= %s, seq=%s[force])', process_name,
+ ProcessState._VALUES_TO_NAMES.get(process_state), sequence_number)
else:
current_run = self._current_process_run(process_name)
if not current_run:
@@ -686,8 +686,8 @@
sequence_number = 0
else:
sequence_number = current_run.seq + 1
- log.debug('_set_process_status(%s <= %s, seq=%s[auto])' % (process_name,
- ProcessState._VALUES_TO_NAMES.get(process_state), sequence_number))
+ log.debug('_set_process_status(%s <= %s, seq=%s[auto])', process_name,
+ ProcessState._VALUES_TO_NAMES.get(process_state), sequence_number)
runner_ckpt = RunnerCkpt(process_status=ProcessStatus(
process=process_name, state=process_state, seq=sequence_number, **kw))
self._dispatcher.dispatch(self._state, runner_ckpt, self._recovery)
@@ -781,8 +781,8 @@
running = list(plan.running)
runnable = list(plan.runnable_at(now))
waiting = list(plan.waiting_at(now))
- log.debug('running:%d runnable:%d waiting:%d complete:%s' % (
- len(running), len(runnable), len(waiting), plan.is_complete()))
+ log.debug('running:%d runnable:%d waiting:%d complete:%s',
+ len(running), len(runnable), len(waiting), plan.is_complete())
return len(running + runnable + waiting) == 0 and not plan.is_complete()
def is_healthy(self):
@@ -791,9 +791,9 @@
max_failures = self._task.max_failures().get()
deadlocked = self.deadlocked()
under_failure_limit = max_failures == 0 or len(self._regular_plan.failed) < max_failures
- log.debug('max_failures:%d failed:%d under_failure_limit:%s deadlocked:%s ==> health:%s' % (
+ log.debug('max_failures:%d failed:%d under_failure_limit:%s deadlocked:%s ==> health:%s',
max_failures, len(self._regular_plan.failed), under_failure_limit, deadlocked,
- not deadlocked and under_failure_limit))
+ not deadlocked and under_failure_limit)
return not deadlocked and under_failure_limit
def _current_process_run(self, process_name):
@@ -822,9 +822,9 @@
return True
if forked_but_never_came_up() or running_but_coordinator_died():
- log.info('Detected a LOST task: %s' % current_run)
- log.debug(' forked_but_never_came_up: %s' % forked_but_never_came_up())
- log.debug(' running_but_coordinator_died: %s' % running_but_coordinator_died())
+ log.info('Detected a LOST task: %s', current_run)
+ log.debug(' forked_but_never_came_up: %s', forked_but_never_came_up())
+ log.debug(' running_but_coordinator_died: %s', running_but_coordinator_died())
return True
return False
@@ -833,8 +833,8 @@
log.debug('Schedule pass:')
running = list(plan.running)
- log.debug('running: %s' % ' '.join(plan.running))
- log.debug('finished: %s' % ' '.join(plan.finished))
+ log.debug('running: %s', ' '.join(plan.running))
+ log.debug('finished: %s', ' '.join(plan.finished))
launched = []
for process_name in plan.running:
@@ -844,8 +844,8 @@
now = self._clock.time()
runnable = list(plan.runnable_at(now))
waiting = list(plan.waiting_at(now))
- log.debug('runnable: %s' % ' '.join(runnable))
- log.debug('waiting: %s' % ' '.join(
+ log.debug('runnable: %s', ' '.join(runnable))
+ log.debug('waiting: %s', ' '.join(
'%s[T-%.1fs]' % (process, plan.get_wait(process)) for process in waiting))
def pick_processes(process_list):
@@ -862,12 +862,12 @@
else:
self._set_process_status(process_name, ProcessState.WAITING)
tp = self._task_processes[process_name]
- log.info('Forking Process(%s)' % process_name)
+ log.info('Forking Process(%s)', process_name)
try:
tp.start()
launched.append(tp)
except Process.Error as e:
- log.error('Failed to launch process: %s' % e)
+ log.error('Failed to launch process: %s', e)
self._set_process_status(process_name, ProcessState.FAILED)
return len(launched) > 0
@@ -948,15 +948,15 @@
TaskState._VALUES_TO_NAMES.get(self.task_state(), 'UNKNOWN'))
self._set_task_status(runner.transition_to())
continue
- log.debug('Run loop: Work to be done within %.1fs' % iteration_wait)
+ log.debug('Run loop: Work to be done within %.1fs', iteration_wait)
# step 2: check child process checkpoint streams for updates
if not self.collect_updates(iteration_wait):
# If we don't collect any updates, at least 'touch' the checkpoint stream
# so as to prevent garbage collection.
elapsed = self._clock.time() - start
if elapsed < iteration_wait:
- log.debug('Update collection only took %.1fs, idling %.1fs' % (
- elapsed, iteration_wait - elapsed))
+ log.debug('Update collection only took %.1fs, idling %.1fs',
+ elapsed, iteration_wait - elapsed)
self._clock.sleep(iteration_wait - elapsed)
log.debug('Run loop: No updates collected, touching checkpoint.')
os.utime(self._pathspec.getpath('runner_checkpoint'), None)
@@ -969,8 +969,8 @@
Kill all processes associated with this task and set task/process states as terminal_status
(defaults to KILLED)
"""
- log.debug('Runner issued kill: force:%s, preemption_wait:%s' % (
- force, preemption_wait))
+ log.debug('Runner issued kill: force:%s, preemption_wait:%s',
+ force, preemption_wait)
assert terminal_status in (TaskState.KILLED, TaskState.LOST)
self._preemption_deadline = self._clock.time() + preemption_wait.as_(Time.SECONDS)
with self.control(force):
@@ -995,17 +995,17 @@
coordinator_pid, pid, tree = pid_tuple
if TaskRunnerHelper.is_process_terminal(current_run.state):
if coordinator_pid or pid or tree:
- log.warning('Terminal process (%s) still has running pids:' % process)
- log.warning(' coordinator_pid: %s' % coordinator_pid)
- log.warning(' pid: %s' % pid)
- log.warning(' tree: %s' % tree)
+ log.warning('Terminal process (%s) still has running pids:', process)
+ log.warning(' coordinator_pid: %s', coordinator_pid)
+ log.warning(' pid: %s', pid)
+ log.warning(' tree: %s', tree)
TaskRunnerHelper.kill_process(self.state, process)
else:
if coordinator_pid or pid or tree:
- log.info('Transitioning %s to KILLED' % process)
+ log.info('Transitioning %s to KILLED', process)
self._set_process_status(process, ProcessState.KILLED,
stop_time=self._clock.time(), return_code=-1)
else:
- log.info('Transitioning %s to LOST' % process)
+ log.info('Transitioning %s to LOST', process)
if current_run.state != ProcessState.WAITING:
self._set_process_status(process, ProcessState.LOST)
diff --git a/src/main/python/apache/thermos/monitoring/disk.py b/src/main/python/apache/thermos/monitoring/disk.py
index 52c5d74..986d33a 100644
--- a/src/main/python/apache/thermos/monitoring/disk.py
+++ b/src/main/python/apache/thermos/monitoring/disk.py
@@ -40,8 +40,8 @@
def run(self):
start = time.time()
self.value = du(self.path)
- log.debug("DiskCollectorThread: finished collection of %s in %.1fms" % (
- self.path, 1000.0 * (time.time() - start)))
+ log.debug("DiskCollectorThread: finished collection of %s in %.1fms",
+ self.path, 1000.0 * (time.time() - start))
self.event.set()
def finished(self):
diff --git a/src/main/python/apache/thermos/monitoring/monitor.py b/src/main/python/apache/thermos/monitoring/monitor.py
index d77703e..3ab1e48 100644
--- a/src/main/python/apache/thermos/monitoring/monitor.py
+++ b/src/main/python/apache/thermos/monitoring/monitor.py
@@ -80,7 +80,7 @@
try:
self._dispatcher.dispatch(self._runnerstate, runner_update)
except CheckpointDispatcher.InvalidSequenceNumber as e:
- log.error('Checkpoint stream is corrupt: %s' % e)
+ log.error('Checkpoint stream is corrupt: %s', e)
break
new_ckpt_head = fp.tell()
updated = self._ckpt_head != new_ckpt_head
@@ -89,7 +89,7 @@
except OSError as e:
if e.errno == errno.ENOENT:
# The log doesn't yet exist, will retry later.
- log.warning('Could not read from checkpoint %s' % self._runner_ckpt)
+ log.warning('Could not read from checkpoint %s', self._runner_ckpt)
return False
else:
raise
diff --git a/src/main/python/apache/thermos/monitoring/process_collector_psutil.py b/src/main/python/apache/thermos/monitoring/process_collector_psutil.py
index 3000e95..e13d6dd 100644
--- a/src/main/python/apache/thermos/monitoring/process_collector_psutil.py
+++ b/src/main/python/apache/thermos/monitoring/process_collector_psutil.py
@@ -39,7 +39,7 @@
threads = process.num_threads()
return ProcessSample(rate, user, system, rss, vms, nice, status, threads)
except (AccessDenied, NoSuchProcess) as e:
- log.debug('Error during process sampling [pid=%s]: %s' % (process.pid, e))
+ log.debug('Error during process sampling [pid=%s]: %s', process.pid, e)
return ProcessSample.empty()
@@ -72,7 +72,7 @@
new_samples[self._pid] = parent_sample
except (IOError, PsutilError) as e:
- log.debug('Error during process sampling: %s' % e)
+ log.debug('Error during process sampling: %s', e)
self._sample = ProcessSample.empty()
self._rate = 0.0
@@ -90,7 +90,7 @@
new_user_sys = sum(map(attrgetter('user'), new)) + sum(map(attrgetter('system'), new))
old_user_sys = sum(map(attrgetter('user'), old)) + sum(map(attrgetter('system'), old))
self._rate = (new_user_sys - old_user_sys) / (self._stamp - last_stamp)
- log.debug("Calculated rate for pid=%s and children: %s" % (self._process.pid, self._rate))
+ log.debug("Calculated rate for pid=%s and children: %s", self._process.pid, self._rate)
self._sampled_tree = new_samples
@property
diff --git a/src/main/python/apache/thermos/monitoring/resource.py b/src/main/python/apache/thermos/monitoring/resource.py
index f5e3849..adcdc75 100644
--- a/src/main/python/apache/thermos/monitoring/resource.py
+++ b/src/main/python/apache/thermos/monitoring/resource.py
@@ -137,7 +137,7 @@
history_length = int(history_time.as_(Time.SECONDS) / min_collection_interval)
if history_length > self.MAX_HISTORY:
raise ValueError("Requested history length too large")
- log.debug("Initialising ResourceHistory of length %s" % history_length)
+ log.debug("Initialising ResourceHistory of length %s", history_length)
return ResourceHistory(history_length)
@@ -166,7 +166,7 @@
"""
self._task_monitor = task_monitor # exposes PIDs, sandbox
self._task_id = task_id
- log.debug('Initialising resource collection for task %s' % self._task_id)
+ log.debug('Initialising resource collection for task %s', self._task_id)
self._process_collectors = dict() # ProcessStatus => ProcessTreeCollector
self._disk_collector_class = disk_collector
self._disk_collector = None
@@ -225,7 +225,7 @@
"""Thread entrypoint. Loop indefinitely, polling collectors at self._collection_interval and
collating samples."""
- log.debug('Commencing resource monitoring for task "%s"' % self._task_id)
+ log.debug('Commencing resource monitoring for task "%s"', self._task_id)
next_process_collection = 0
next_disk_collection = 0
@@ -252,7 +252,7 @@
if self._disk_collector:
self._disk_collector.sample()
else:
- log.debug('No sandbox detected yet for %s' % self._task_id)
+ log.debug('No sandbox detected yet for %s', self._task_id)
try:
disk_usage = self._disk_collector.value if self._disk_collector else 0
@@ -264,10 +264,10 @@
self._history.add(now, self.FullResourceResult(proc_usage_dict, disk_usage))
except ValueError as err:
- log.warning("Error recording resource sample: %s" % err)
+ log.warning("Error recording resource sample: %s", err)
- log.debug("TaskResourceMonitor: finished collection of %s in %.2fs" % (
- self._task_id, (time.time() - now)))
+ log.debug("TaskResourceMonitor: finished collection of %s in %.2fs",
+ self._task_id, (time.time() - now))
# Sleep until any of the following conditions are met:
# - it's time for the next disk collection
@@ -288,4 +288,4 @@
log.warning('Task resource collection is backlogged. Consider increasing '
'process_collection_interval and disk_collection_interval.')
- log.debug('Stopping resource monitoring for task "%s"' % self._task_id)
+ log.debug('Stopping resource monitoring for task "%s"', self._task_id)
diff --git a/src/main/python/apache/thermos/observer/http/file_browser.py b/src/main/python/apache/thermos/observer/http/file_browser.py
index d099665..dc740fb 100644
--- a/src/main/python/apache/thermos/observer/http/file_browser.py
+++ b/src/main/python/apache/thermos/observer/http/file_browser.py
@@ -42,7 +42,7 @@
try:
fstat = os.stat(filename)
except Exception as e:
- log.error('Could not read from %s: %s' % (filename, e))
+ log.error('Could not read from %s: %s', filename, e)
return {}
if offset == -1:
@@ -56,7 +56,7 @@
try:
data = fp.read(length)
except IOError as e:
- log.error('Failed to read %s: %s' % (filename, e), exc_info=True)
+ log.error('Failed to read %s: %s', filename, e, exc_info=True)
return {}
if data:
diff --git a/src/main/python/apache/thermos/observer/http/http_observer.py b/src/main/python/apache/thermos/observer/http/http_observer.py
index 5bfc4f2..c81383c 100644
--- a/src/main/python/apache/thermos/observer/http/http_observer.py
+++ b/src/main/python/apache/thermos/observer/http/http_observer.py
@@ -134,5 +134,5 @@
}
template['process'].update(**all_processes[current_run_number].get('used', {}))
template['runs'] = all_processes
- log.debug('Rendering template is: %s' % template)
+ log.debug('Rendering template is: %s', template)
return template
diff --git a/src/main/python/apache/thermos/observer/http/static_assets.py b/src/main/python/apache/thermos/observer/http/static_assets.py
index 83adeb3..334c937 100644
--- a/src/main/python/apache/thermos/observer/http/static_assets.py
+++ b/src/main/python/apache/thermos/observer/http/static_assets.py
@@ -35,7 +35,7 @@
assets = pkg_resources.resource_listdir(__name__, 'assets')
cached_assets = {}
for asset in assets:
- log.info(' detected asset: %s' % asset)
+ log.info(' detected asset: %s', asset)
cached_assets[asset] = pkg_resources.resource_string(
__name__, os.path.join('assets', asset))
self._assets = cached_assets
diff --git a/src/main/python/apache/thermos/observer/observed_task.py b/src/main/python/apache/thermos/observer/observed_task.py
index 08540e1..cb90c8b 100644
--- a/src/main/python/apache/thermos/observer/observed_task.py
+++ b/src/main/python/apache/thermos/observer/observed_task.py
@@ -60,11 +60,11 @@
if os.path.exists(path):
task = ThermosTaskWrapper.from_file(path)
if task is None:
- log.error('Error reading ThermosTask from %s in observer.' % path)
+ log.error('Error reading ThermosTask from %s in observer.', path)
else:
context = self.context(self._task_id)
if not context:
- log.warning('Task not yet available: %s' % self._task_id)
+ log.warning('Task not yet available: %s', self._task_id)
task = task.task() % Environment(thermos=context)
memoized[self._task_id] = task
@@ -77,7 +77,7 @@
if mtime is None:
mtime = self.safe_mtime(get_path('finished'))
if mtime is None:
- log.error("Couldn't get mtime for task %s!" % self._task_id)
+ log.error("Couldn't get mtime for task %s!", self._task_id)
return mtime
def context(self, task_id):
diff --git a/src/main/python/apache/thermos/observer/task_observer.py b/src/main/python/apache/thermos/observer/task_observer.py
index 4bb5d23..a6870d4 100644
--- a/src/main/python/apache/thermos/observer/task_observer.py
+++ b/src/main/python/apache/thermos/observer/task_observer.py
@@ -96,9 +96,9 @@
ExceptionalThread.start(self)
def __on_active(self, root, task_id):
- log.debug('on_active(%r, %r)' % (root, task_id))
+ log.debug('on_active(%r, %r)', root, task_id)
if task_id in self.finished_tasks:
- log.error('Found an active task (%s) in finished tasks?' % task_id)
+ log.error('Found an active task (%s) in finished tasks?', task_id)
return
task_monitor = TaskMonitor(root, task_id)
resource_monitor = TaskResourceMonitor(
@@ -115,14 +115,14 @@
)
def __on_finished(self, root, task_id):
- log.debug('on_finished(%r, %r)' % (root, task_id))
+ log.debug('on_finished(%r, %r)', root, task_id)
active_task = self._active_tasks.pop(task_id, None)
if active_task:
active_task.resource_monitor.kill()
self._finished_tasks[task_id] = FinishedObservedTask(root, task_id)
def __on_removed(self, root, task_id):
- log.debug('on_removed(%r, %r)' % (root, task_id))
+ log.debug('on_removed(%r, %r)', root, task_id)
active_task = self._active_tasks.pop(task_id, None)
if active_task:
active_task.resource_monitor.kill()
@@ -139,7 +139,7 @@
with self.lock:
start = time.time()
self._detector.refresh()
- log.debug("TaskObserver: finished checkpoint refresh in %.2fs" % (time.time() - start))
+ log.debug("TaskObserver: finished checkpoint refresh in %.2fs", time.time() - start)
@Lockable.sync
def process_from_name(self, task_id, process_id):
@@ -178,7 +178,7 @@
}.get(type, None)
if tasks is None:
- log.error('Unknown task type %s' % type)
+ log.error('Unknown task type %s', type)
return {}
return tasks
@@ -313,7 +313,7 @@
resource_sample = self.active_tasks[task_id].resource_monitor.sample()[1]
sample = resource_sample.process_sample.to_dict()
sample['disk'] = resource_sample.disk_usage
- log.debug("Got sample for task %s: %s" % (task_id, sample))
+ log.debug("Got sample for task %s: %s", task_id, sample)
return sample
@Lockable.sync
@@ -390,7 +390,7 @@
task = self.all_tasks[task_id].task
if task is None:
# TODO(wickman) Can this happen?
- log.error('Could not find task: %s' % task_id)
+ log.error('Could not find task: %s', task_id)
return {}
state = self.raw_state(task_id)
@@ -425,7 +425,7 @@
if task_id not in self.active_tasks:
return ProcessSample.empty().to_dict()
sample = self.active_tasks[task_id].resource_monitor.sample_by_process(process_name).to_dict()
- log.debug('Resource consumption (%s, %s) => %s' % (task_id, process_name, sample))
+ log.debug('Resource consumption (%s, %s) => %s', task_id, process_name, sample)
return sample
@Lockable.sync
diff --git a/src/main/python/apache/thermos/runner/thermos_runner.py b/src/main/python/apache/thermos/runner/thermos_runner.py
index fa4f0fb..c434000 100644
--- a/src/main/python/apache/thermos/runner/thermos_runner.py
+++ b/src/main/python/apache/thermos/runner/thermos_runner.py
@@ -176,13 +176,13 @@
def runner_teardown(runner, sig=signal.SIGUSR1, frame=None):
"""Destroy runner on SIGUSR1 (kill) or SIGUSR2 (lose)"""
op = 'kill' if sig == signal.SIGUSR1 else 'lose'
- log.info('Thermos runner got signal %s, shutting down.' % sig)
+ log.info('Thermos runner got signal %s, shutting down.', sig)
log.info('Interrupted frame:')
if frame:
for line in ''.join(traceback.format_stack(frame)).splitlines():
log.info(line)
runner.close_ckpt()
- log.info('Calling runner.%s()' % op)
+ log.info('Calling runner.%s()', op)
getattr(runner, op)()
sys.exit(0)
@@ -201,7 +201,7 @@
missing_ports = set(thermos_task.ports()) - set(prebound_ports)
if missing_ports:
- log.error('ERROR! Unbound ports: %s' % ' '.join(port for port in missing_ports))
+ log.error('ERROR! Unbound ports: %s', ' '.join(port for port in missing_ports))
sys.exit(INTERNAL_ERROR)
if opts.setuid:
@@ -213,7 +213,7 @@
try:
pwd.getpwnam(user).pw_uid
except KeyError:
- log.error('Unknown user: %s' % user)
+ log.error('Unknown user: %s', user)
sys.exit(UNKNOWN_USER)
task_runner = TaskRunner(
@@ -240,22 +240,22 @@
try:
task_runner.run()
except TaskRunner.InternalError as err:
- log.error('Internal error: %s' % err)
+ log.error('Internal error: %s', err)
sys.exit(INTERNAL_ERROR)
except TaskRunner.InvalidTask as err:
- log.error('Invalid task: %s' % err)
+ log.error('Invalid task: %s', err)
sys.exit(INVALID_TASK)
except TaskRunner.StateError as err:
- log.error('Checkpoint error: %s' % err)
+ log.error('Checkpoint error: %s', err)
sys.exit(TERMINAL_TASK)
except Process.UnknownUserError as err:
- log.error('User ceased to exist: %s' % err)
+ log.error('User ceased to exist: %s', err)
sys.exit(UNKNOWN_USER)
except KeyboardInterrupt:
log.info('Caught ^C, tearing down runner.')
runner_teardown(task_runner)
except Exception as e:
- log.error('Unknown exception: %s' % e)
+ log.error('Unknown exception: %s', e)
for line in traceback.format_exc().splitlines():
log.error(line)
sys.exit(UNKNOWN_ERROR)