Switch Thermos to lazy log formatting

This is the first part of a small series of Thermos observer performance
improvements.

As a first iteration, this switches all logging to use the logger-embedded
formatting rather than doing it eager up front. This has the advantage that
we produce less garbage if debug logging is disabled.

Reviewed at https://reviews.apache.org/r/66136/
diff --git a/src/main/python/apache/thermos/common/ckpt.py b/src/main/python/apache/thermos/common/ckpt.py
index e79ec6a..cb254a4 100644
--- a/src/main/python/apache/thermos/common/ckpt.py
+++ b/src/main/python/apache/thermos/common/ckpt.py
@@ -186,7 +186,7 @@
         builder.dispatch(state, update, truncate=truncate)
       return state
     except cls.Error as e:
-      log.error('Failed to recover from %s: %s' % (filename, e))
+      log.error('Failed to recover from %s: %s', filename, e)
 
   def __init__(self):
     self._task_handlers = []
@@ -325,7 +325,7 @@
         raise self.ErrorRecoveringState(
           "Attempting to rebind task with different parameters!")
       else:
-        log.debug('Initializing TaskRunner header to %s' % runner_ckpt.runner_header)
+        log.debug('Initializing TaskRunner header to %s', runner_ckpt.runner_header)
         state.header = runner_ckpt.runner_header
         self._run_header_dispatch(runner_ckpt.runner_header)
       return
@@ -343,9 +343,9 @@
       else:
         state.statuses = [runner_ckpt.task_status]
       new_state = runner_ckpt.task_status.state
-      log.debug('Flipping task state from %s to %s' % (
+      log.debug('Flipping task state from %s to %s',
         TaskState._VALUES_TO_NAMES.get(old_state, '(undefined)'),
-        TaskState._VALUES_TO_NAMES.get(new_state, '(undefined)')))
+        TaskState._VALUES_TO_NAMES.get(new_state, '(undefined)'))
       self._run_task_dispatch(new_state, runner_ckpt.task_status)
       return
 
@@ -358,11 +358,11 @@
       current_run = state.processes[name][-1] if name in state.processes else None
       if current_run and process_update.seq != current_run.seq + 1:
         if recovery:
-          log.debug('Skipping replayed out-of-order update: %s' % process_update)
+          log.debug('Skipping replayed out-of-order update: %s', process_update)
           return
         else:
           raise self.InvalidSequenceNumber(
-            "Out of order sequence number! %s => %s" % (current_run, process_update))
+            "Out of order sequence number! %s => %s", current_run, process_update)
 
       # One special case for WAITING: Initialize a new target ProcessState.
       if process_update.state == ProcessState.WAITING:
@@ -376,7 +376,7 @@
             state.processes[name] = [ProcessStatus(seq=current_run.seq)]
 
       # Run the process state machine.
-      log.debug('Running state machine for process=%s/seq=%s' % (name, process_update.seq))
+      log.debug('Running state machine for process=%s/seq=%s', name, process_update.seq)
       if not state.processes or name not in state.processes:
         raise self.ErrorRecoveringState("Encountered potentially out of order "
           "process update.  Are you sure this is a full checkpoint stream?")
diff --git a/src/main/python/apache/thermos/core/helper.py b/src/main/python/apache/thermos/core/helper.py
index 0811e84..a402498 100644
--- a/src/main/python/apache/thermos/core/helper.py
+++ b/src/main/python/apache/thermos/core/helper.py
@@ -85,8 +85,8 @@
     process_create_time = process.create_time()
 
     if abs(start_time - process_create_time) >= cls.MAX_START_TIME_DRIFT.as_(Time.SECONDS):
-      log.info("Expected pid %s start time to be %s but it's %s" % (
-          process.pid, start_time, process_create_time))
+      log.info("Expected pid %s start time to be %s but it's %s",
+          process.pid, start_time, process_create_time)
       return False
 
     if uid is not None:
@@ -104,12 +104,12 @@
       elif uid == 0:
         # If the process was launched as root but is now not root, we should
         # kill this because it could have called `setuid` on itself.
-        log.info("pid %s appears to be have launched by root but it's uid is now %s" % (
-            process.pid, process_uid))
+        log.info("pid %s appears to be have launched by root but it's uid is now %s",
+            process.pid, process_uid)
         return True
       else:
-        log.info("Expected pid %s to be ours but the pid uid is %s and we're %s" % (
-            process.pid, process_uid, uid))
+        log.info("Expected pid %s to be ours but the pid uid is %s and we're %s",
+            process.pid, process_uid, uid)
         return False
 
     try:
@@ -121,8 +121,8 @@
       # If the uid was not provided, we must use user -- which is possibly flaky if the
       # user gets deleted from the system, so process_user will be None and we must
       # return False.
-      log.info("Expected pid %s to be ours but the pid user is %s and we're %s" % (
-          process.pid, process_user, user))
+      log.info("Expected pid %s to be ours but the pid user is %s and we're %s",
+          process.pid, process_user, user)
       return True
 
     return False
@@ -141,7 +141,7 @@
     coordinator_pid, pid, tree = None, None, set()
 
     if uid is None:
-      log.debug('Legacy thermos checkpoint stream detected, user = %s' % user)
+      log.debug('Legacy thermos checkpoint stream detected, user = %s', user)
 
     if process_run.coordinator_pid:
       try:
@@ -149,11 +149,11 @@
         if cls.this_is_really_our_pid(coordinator_process, uid, user, process_run.fork_time):
           coordinator_pid = process_run.coordinator_pid
       except psutil.NoSuchProcess:
-        log.info('  Coordinator %s [pid: %s] completed.' % (process_run.process,
-            process_run.coordinator_pid))
+        log.info('  Coordinator %s [pid: %s] completed.', process_run.process,
+            process_run.coordinator_pid)
       except psutil.Error as err:
-        log.warning('  Error gathering information on pid %s: %s' % (process_run.coordinator_pid,
-            err))
+        log.warning('  Error gathering information on pid %s: %s', process_run.coordinator_pid,
+            err)
 
     if process_run.pid:
       try:
@@ -161,15 +161,15 @@
         if cls.this_is_really_our_pid(process, uid, user, process_run.start_time):
           pid = process.pid
       except psutil.NoSuchProcess:
-        log.info('  Process %s [pid: %s] completed.' % (process_run.process, process_run.pid))
+        log.info('  Process %s [pid: %s] completed.', process_run.process, process_run.pid)
       except psutil.Error as err:
-        log.warning('  Error gathering information on pid %s: %s' % (process_run.pid, err))
+        log.warning('  Error gathering information on pid %s: %s', process_run.pid, err)
       else:
         if pid:
           try:
             tree = set(child.pid for child in process.children(recursive=True))
           except psutil.Error:
-            log.warning('  Error gathering information on children of pid %s' % pid)
+            log.warning('  Error gathering information on children of pid %s', pid)
 
     return (coordinator_pid, pid, tree)
 
@@ -192,9 +192,9 @@
       os.kill(pid, sig)
     except OSError as e:
       if e.errno not in (errno.ESRCH, errno.EPERM):
-        log.error('Unexpected error in os.kill: %s' % e)
+        log.error('Unexpected error in os.kill: %s', e)
     except Exception as e:
-      log.error('Unexpected error in os.kill: %s' % e)
+      log.error('Unexpected error in os.kill: %s', e)
 
   @classmethod
   def terminate_pid(cls, pid):
@@ -240,16 +240,16 @@
 
   @classmethod
   def terminate_process(cls, state, process_name):
-    log.debug('TaskRunnerHelper.terminate_process(%s)' % process_name)
+    log.debug('TaskRunnerHelper.terminate_process(%s)', process_name)
     _, pid, _ = cls._get_process_tuple(state, process_name)
     if pid:
-      log.debug('   => SIGTERM pid %s' % pid)
+      log.debug('   => SIGTERM pid %s', pid)
       cls.terminate_pid(pid)
     return bool(pid)
 
   @classmethod
   def kill_process(cls, state, process_name):
-    log.debug('TaskRunnerHelper.kill_process(%s)' % process_name)
+    log.debug('TaskRunnerHelper.kill_process(%s)', process_name)
     coordinator_pgid = cls._get_coordinator_group(state, process_name)
     coordinator_pid, pid, tree = cls._get_process_tuple(state, process_name)
     # This is super dangerous.  TODO(wickman)  Add a heuristic that determines
@@ -257,16 +257,16 @@
     #  and 2) those processes have inherited the coordinator checkpoint filehandle
     # This way we validate that it is in fact the process group we expect.
     if coordinator_pgid:
-      log.debug('   => SIGKILL coordinator group %s' % coordinator_pgid)
+      log.debug('   => SIGKILL coordinator group %s', coordinator_pgid)
       cls.kill_group(coordinator_pgid)
     if coordinator_pid:
-      log.debug('   => SIGKILL coordinator %s' % coordinator_pid)
+      log.debug('   => SIGKILL coordinator %s', coordinator_pid)
       cls.kill_pid(coordinator_pid)
     if pid:
-      log.debug('   => SIGKILL pid %s' % pid)
+      log.debug('   => SIGKILL pid %s', pid)
       cls.kill_pid(pid)
     for child in tree:
-      log.debug('   => SIGKILL child %s' % child)
+      log.debug('   => SIGKILL child %s', child)
       cls.kill_pid(child)
     return bool(coordinator_pid or pid or tree)
 
@@ -380,11 +380,11 @@
         if pid == 0:
           break
         pids.add(pid)
-        log.debug('Detected terminated process: pid=%s, status=%s, rusage=%s' % (
-          pid, status, rusage))
+        log.debug('Detected terminated process: pid=%s, status=%s, rusage=%s',
+          pid, status, rusage)
       except OSError as e:
         if e.errno != errno.ECHILD:
-          log.warning('Unexpected error when calling waitpid: %s' % e)
+          log.warning('Unexpected error when calling waitpid: %s', e)
         break
 
     return pids
diff --git a/src/main/python/apache/thermos/core/muxer.py b/src/main/python/apache/thermos/core/muxer.py
index 47e77f7..b095d75 100644
--- a/src/main/python/apache/thermos/core/muxer.py
+++ b/src/main/python/apache/thermos/core/muxer.py
@@ -37,7 +37,7 @@
       fp.close()
 
   def register(self, process_name, watermark=0):
-    log.debug('registering %s' % process_name)
+    log.debug('registering %s', process_name)
     if process_name in self._processes:
       raise self.ProcessExists("Process %s is already registered" % process_name)
     self._processes[process_name] = None
@@ -47,7 +47,7 @@
     for process_name, fp in self._processes.items():
       if fp is None:
         process_ckpt = self._pathspec.given(process=process_name).getpath('process_checkpoint')
-        log.debug('ProcessMuxer binding %s => %s' % (process_name, process_ckpt))
+        log.debug('ProcessMuxer binding %s => %s', process_name, process_ckpt)
         try:
           self._processes[process_name] = open(process_ckpt, 'r')  # noqa
         except IOError as e:
@@ -55,14 +55,14 @@
             log.debug('  => bind failed, checkpoint not available yet.')
             continue
           else:
-            log.error("Unexpected inability to open %s! %s" % (process_ckpt, e))
+            log.error("Unexpected inability to open %s! %s", process_ckpt, e)
         except Exception as e:
-          log.error("Unexpected inability to open %s! %s" % (process_ckpt, e))
+          log.error("Unexpected inability to open %s! %s", process_ckpt, e)
         self._fast_forward_stream(process_name)
 
   def _fast_forward_stream(self, process_name):
-    log.debug('Fast forwarding %s stream to seq=%s' % (process_name,
-      self._watermarks[process_name]))
+    log.debug('Fast forwarding %s stream to seq=%s', process_name,
+      self._watermarks[process_name])
     assert self._processes.get(process_name) is not None
     fp = self._processes[process_name]
     rr = ThriftRecordReader(fp, RunnerCkpt)
@@ -75,25 +75,25 @@
         break
       new_watermark = record.process_status.seq
       if new_watermark > self._watermarks[process_name]:
-        log.debug('Over-seeked %s [watermark = %s, high watermark = %s], rewinding.' % (
-          process_name, new_watermark, self._watermarks[process_name]))
+        log.debug('Over-seeked %s [watermark = %s, high watermark = %s], rewinding.',
+          process_name, new_watermark, self._watermarks[process_name])
         fp.seek(last_pos)
         break
       current_watermark = new_watermark
       records += 1
 
     if current_watermark < self._watermarks[process_name]:
-      log.warning('Only able to fast forward to %s@sequence=%s, high watermark is %s' % (
-         process_name, current_watermark, self._watermarks[process_name]))
+      log.warning('Only able to fast forward to %s@sequence=%s, high watermark is %s',
+         process_name, current_watermark, self._watermarks[process_name])
 
     if records:
-      log.debug('Fast forwarded %s %s record(s) to seq=%s.' % (process_name, records,
-        current_watermark))
+      log.debug('Fast forwarded %s %s record(s) to seq=%s.', process_name, records,
+        current_watermark)
 
   def unregister(self, process_name):
     log.debug('unregistering %s' % process_name)
     if process_name not in self._processes:
-      raise self.ProcessNotFound("No trace of process: %s" % process_name)
+      raise self.ProcessNotFound("No trace of process: %s", process_name)
     else:
       self._watermarks.pop(process_name)
       fp = self._processes.pop(process_name)
@@ -114,7 +114,7 @@
     try:
       os.fstat(fp.fileno()).st_size
     except OSError:
-      log.debug('ProcessMuxer could not fstat for process %s' % process)
+      log.debug('ProcessMuxer could not fstat for process %s', process)
       return False
     update = rr.try_read()
     if update:
@@ -139,10 +139,10 @@
       try:
         fstat = os.fstat(handle.fileno())
       except OSError:
-        log.error('Unable to fstat %s!' % handle.name)
+        log.error('Unable to fstat %s!', handle.name)
         continue
       if handle.tell() > fstat.st_size:
-        log.error('Truncated checkpoint record detected on %s!' % handle.name)
+        log.error('Truncated checkpoint record detected on %s!', handle.name)
       elif handle.tell() < fstat.st_size:
         rr = ThriftRecordReader(handle, RunnerCkpt)
         while True:
@@ -152,7 +152,7 @@
           else:
             break
     if len(updates) > 0:
-      log.debug('select() returning %s updates:' % len(updates))
+      log.debug('select() returning %s updates:', len(updates))
       for update in updates:
-        log.debug('  = %s' % update)
+        log.debug('  = %s', update)
     return updates
diff --git a/src/main/python/apache/thermos/core/process.py b/src/main/python/apache/thermos/core/process.py
index 4a4678f..32631e6 100644
--- a/src/main/python/apache/thermos/core/process.py
+++ b/src/main/python/apache/thermos/core/process.py
@@ -151,7 +151,7 @@
         raise ValueError('Log backups cannot be less than one.')
 
   def _log(self, msg, exc_info=None):
-    log.debug('[process:%5s=%s]: %s' % (self._pid, self.name(), msg),
+    log.debug('[process:%5s=%s]: %s', self._pid, self.name(), msg,
             exc_info=exc_info)
 
   def _getpwuid(self):
@@ -442,7 +442,7 @@
     })
 
     wrapped_cmdline = self.wrapped_cmdline(cwd)
-    log.debug('Wrapped cmdline: %s' % wrapped_cmdline)
+    log.debug('Wrapped cmdline: %s', wrapped_cmdline)
 
     real_thermos_profile_path = os.path.join(
         os.environ['MESOS_DIRECTORY'],
@@ -452,7 +452,7 @@
     if os.path.exists(real_thermos_profile_path):
       env.update(BASH_ENV=thermos_profile)
 
-    log.debug('ENV is: %s' % env)
+    log.debug('ENV is: %s', env)
     subprocess_args = {
       'args': wrapped_cmdline,
       'close_fds': self.FD_CLOEXEC,
diff --git a/src/main/python/apache/thermos/core/runner.py b/src/main/python/apache/thermos/core/runner.py
index 1b63c08..79aa68a 100644
--- a/src/main/python/apache/thermos/core/runner.py
+++ b/src/main/python/apache/thermos/core/runner.py
@@ -100,20 +100,20 @@
     self._runner = runner
 
   def on_waiting(self, process_update):
-    log.debug('Process on_waiting %s' % process_update)
+    log.debug('Process on_waiting %s', process_update)
     self._runner._task_processes[process_update.process] = (
       self._runner._task_process_from_process_name(
         process_update.process, process_update.seq + 1))
     self._runner._watcher.register(process_update.process, process_update.seq - 1)
 
   def on_forked(self, process_update):
-    log.debug('Process on_forked %s' % process_update)
+    log.debug('Process on_forked %s', process_update)
     task_process = self._runner._task_processes[process_update.process]
     task_process.rebind(process_update.coordinator_pid, process_update.fork_time)
     self._runner._plan.set_running(process_update.process)
 
   def on_running(self, process_update):
-    log.debug('Process on_running %s' % process_update)
+    log.debug('Process on_running %s', process_update)
     self._runner._plan.set_running(process_update.process)
 
   def _cleanup(self, process_update):
@@ -121,39 +121,39 @@
       TaskRunnerHelper.kill_process(self._runner.state, process_update.process)
 
   def on_success(self, process_update):
-    log.debug('Process on_success %s' % process_update)
-    log.info('Process(%s) finished successfully [rc=%s]' % (
-      process_update.process, process_update.return_code))
+    log.debug('Process on_success %s', process_update)
+    log.info('Process(%s) finished successfully [rc=%s]',
+      process_update.process, process_update.return_code)
     self._cleanup(process_update)
     self._runner._task_processes.pop(process_update.process)
     self._runner._watcher.unregister(process_update.process)
     self._runner._plan.add_success(process_update.process)
 
   def _on_abnormal(self, process_update):
-    log.info('Process %s had an abnormal termination' % process_update.process)
+    log.info('Process %s had an abnormal termination', process_update.process)
     self._runner._task_processes.pop(process_update.process)
     self._runner._watcher.unregister(process_update.process)
 
   def on_failed(self, process_update):
-    log.debug('Process on_failed %s' % process_update)
-    log.info('Process(%s) failed [rc=%s]' % (process_update.process, process_update.return_code))
+    log.debug('Process on_failed %s', process_update)
+    log.info('Process(%s) failed [rc=%s]', process_update.process, process_update.return_code)
     self._cleanup(process_update)
     self._on_abnormal(process_update)
     self._runner._plan.add_failure(process_update.process)
     if process_update.process in self._runner._plan.failed:
-      log.info('Process %s reached maximum failures, marking process run failed.' %
+      log.info('Process %s reached maximum failures, marking process run failed.',
           process_update.process)
     else:
-      log.info('Process %s under maximum failure limit, restarting.' % process_update.process)
+      log.info('Process %s under maximum failure limit, restarting.', process_update.process)
 
   def on_lost(self, process_update):
-    log.debug('Process on_lost %s' % process_update)
+    log.debug('Process on_lost %s', process_update)
     self._cleanup(process_update)
     self._on_abnormal(process_update)
     self._runner._plan.lost(process_update.process)
 
   def on_killed(self, process_update):
-    log.debug('Process on_killed %s' % process_update)
+    log.debug('Process on_killed %s', process_update)
     self._cleanup(process_update)
     self._runner._task_processes.pop(process_update.process)
     self._runner._watcher.unregister(process_update.process)
@@ -177,7 +177,7 @@
     self._pathspec = self._runner._pathspec
 
   def on_active(self, task_update):
-    log.debug('Task on_active(%s)' % task_update)
+    log.debug('Task on_active(%s)', task_update)
     self._runner._plan = self._runner._regular_plan
     if self._runner._recovery:
       return
@@ -185,12 +185,12 @@
         ThermosTaskWrapper(self._runner._task).to_json())
 
   def on_cleaning(self, task_update):
-    log.debug('Task on_cleaning(%s)' % task_update)
+    log.debug('Task on_cleaning(%s)', task_update)
     self._runner._finalization_start = task_update.timestamp_ms / 1000.0
     self._runner._terminate_plan(self._runner._regular_plan)
 
   def on_finalizing(self, task_update):
-    log.debug('Task on_finalizing(%s)' % task_update)
+    log.debug('Task on_finalizing(%s)', task_update)
     if not self._runner._recovery:
       self._runner._kill()
     self._runner._plan = self._runner._finalizing_plan
@@ -198,20 +198,20 @@
       self._runner._finalization_start = task_update.timestamp_ms / 1000.0
 
   def on_killed(self, task_update):
-    log.debug('Task on_killed(%s)' % task_update)
+    log.debug('Task on_killed(%s)', task_update)
     self._cleanup()
 
   def on_success(self, task_update):
-    log.debug('Task on_success(%s)' % task_update)
+    log.debug('Task on_success(%s)', task_update)
     self._cleanup()
     log.info('Task succeeded.')
 
   def on_failed(self, task_update):
-    log.debug('Task on_failed(%s)' % task_update)
+    log.debug('Task on_failed(%s)', task_update)
     self._cleanup()
 
   def on_lost(self, task_update):
-    log.debug('Task on_lost(%s)' % task_update)
+    log.debug('Task on_lost(%s)', task_update)
     self._cleanup()
 
   def _cleanup(self):
@@ -235,15 +235,15 @@
     self._runner._ckpt_write(record)
 
   def on_process_transition(self, state, process_update):
-    log.debug('_on_process_transition: %s' % process_update)
+    log.debug('_on_process_transition: %s', process_update)
     self._checkpoint(RunnerCkpt(process_status=process_update))
 
   def on_task_transition(self, state, task_update):
-    log.debug('_on_task_transition: %s' % task_update)
+    log.debug('_on_task_transition: %s', task_update)
     self._checkpoint(RunnerCkpt(task_status=task_update))
 
   def on_initialization(self, header):
-    log.debug('_on_initialization: %s' % header)
+    log.debug('_on_initialization: %s', header)
     ThermosTaskValidator.assert_valid_task(self._runner.task)
     ThermosTaskValidator.assert_valid_ports(self._runner.task, header.ports)
     self._checkpoint(RunnerCkpt(runner_header=header))
@@ -413,7 +413,7 @@
                  log_dir=checkpoint.header.log_dir, task_id=task_id,
                  portmap=checkpoint.header.ports, hostname=checkpoint.header.hostname)
     except Exception as e:
-      log.error('Failed to reconstitute checkpoint in TaskRunner.get: %s' % e, exc_info=True)
+      log.error('Failed to reconstitute checkpoint in TaskRunner.get: %s', e, exc_info=True)
       return None
 
   def __init__(self, task, checkpoint_root, sandbox, log_dir=None,
@@ -577,8 +577,8 @@
     try:
       yield
     except Exception as e:
-      log.error('Caught exception in self.control(): %s' % e)
-      log.error('  %s' % traceback.format_exc())
+      log.error('Caught exception in self.control(): %s', e)
+      log.error('  %s', traceback.format_exc())
     self._ckpt.close()
 
   def _resume_task(self):
@@ -612,7 +612,7 @@
       with open(ckpt_file, 'r') as fp:
         ckpt_recover = ThriftRecordReader(fp, RunnerCkpt)
         for record in ckpt_recover:
-          log.debug('Replaying runner checkpoint record: %s' % record)
+          log.debug('Replaying runner checkpoint record: %s', record)
           self._dispatcher.dispatch(self._state, record, recovery=True)
 
   def _replay_process_ckpts(self):
@@ -640,7 +640,7 @@
       except KeyError:
         # This will cause failures downstream, but they will at least be correctly
         # reflected in the process state.
-        log.error('Unknown user %s.' % self._user)
+        log.error('Unknown user %s.', self._user)
         uid = None
 
       header = RunnerHeader(
@@ -677,8 +677,8 @@
   def _set_process_status(self, process_name, process_state, **kw):
     if 'sequence_number' in kw:
       sequence_number = kw.pop('sequence_number')
-      log.debug('_set_process_status(%s <= %s, seq=%s[force])' % (process_name,
-        ProcessState._VALUES_TO_NAMES.get(process_state), sequence_number))
+      log.debug('_set_process_status(%s <= %s, seq=%s[force])', process_name,
+        ProcessState._VALUES_TO_NAMES.get(process_state), sequence_number)
     else:
       current_run = self._current_process_run(process_name)
       if not current_run:
@@ -686,8 +686,8 @@
         sequence_number = 0
       else:
         sequence_number = current_run.seq + 1
-      log.debug('_set_process_status(%s <= %s, seq=%s[auto])' % (process_name,
-        ProcessState._VALUES_TO_NAMES.get(process_state), sequence_number))
+      log.debug('_set_process_status(%s <= %s, seq=%s[auto])', process_name,
+        ProcessState._VALUES_TO_NAMES.get(process_state), sequence_number)
     runner_ckpt = RunnerCkpt(process_status=ProcessStatus(
         process=process_name, state=process_state, seq=sequence_number, **kw))
     self._dispatcher.dispatch(self._state, runner_ckpt, self._recovery)
@@ -781,8 +781,8 @@
     running = list(plan.running)
     runnable = list(plan.runnable_at(now))
     waiting = list(plan.waiting_at(now))
-    log.debug('running:%d runnable:%d waiting:%d complete:%s' % (
-      len(running), len(runnable), len(waiting), plan.is_complete()))
+    log.debug('running:%d runnable:%d waiting:%d complete:%s',
+      len(running), len(runnable), len(waiting), plan.is_complete())
     return len(running + runnable + waiting) == 0 and not plan.is_complete()
 
   def is_healthy(self):
@@ -791,9 +791,9 @@
     max_failures = self._task.max_failures().get()
     deadlocked = self.deadlocked()
     under_failure_limit = max_failures == 0 or len(self._regular_plan.failed) < max_failures
-    log.debug('max_failures:%d failed:%d under_failure_limit:%s deadlocked:%s ==> health:%s' % (
+    log.debug('max_failures:%d failed:%d under_failure_limit:%s deadlocked:%s ==> health:%s',
       max_failures, len(self._regular_plan.failed), under_failure_limit, deadlocked,
-      not deadlocked and under_failure_limit))
+      not deadlocked and under_failure_limit)
     return not deadlocked and under_failure_limit
 
   def _current_process_run(self, process_name):
@@ -822,9 +822,9 @@
       return True
 
     if forked_but_never_came_up() or running_but_coordinator_died():
-      log.info('Detected a LOST task: %s' % current_run)
-      log.debug('  forked_but_never_came_up: %s' % forked_but_never_came_up())
-      log.debug('  running_but_coordinator_died: %s' % running_but_coordinator_died())
+      log.info('Detected a LOST task: %s', current_run)
+      log.debug('  forked_but_never_came_up: %s', forked_but_never_came_up())
+      log.debug('  running_but_coordinator_died: %s', running_but_coordinator_died())
       return True
 
     return False
@@ -833,8 +833,8 @@
     log.debug('Schedule pass:')
 
     running = list(plan.running)
-    log.debug('running: %s' % ' '.join(plan.running))
-    log.debug('finished: %s' % ' '.join(plan.finished))
+    log.debug('running: %s', ' '.join(plan.running))
+    log.debug('finished: %s', ' '.join(plan.finished))
 
     launched = []
     for process_name in plan.running:
@@ -844,8 +844,8 @@
     now = self._clock.time()
     runnable = list(plan.runnable_at(now))
     waiting = list(plan.waiting_at(now))
-    log.debug('runnable: %s' % ' '.join(runnable))
-    log.debug('waiting: %s' % ' '.join(
+    log.debug('runnable: %s', ' '.join(runnable))
+    log.debug('waiting: %s', ' '.join(
         '%s[T-%.1fs]' % (process, plan.get_wait(process)) for process in waiting))
 
     def pick_processes(process_list):
@@ -862,12 +862,12 @@
       else:
         self._set_process_status(process_name, ProcessState.WAITING)
         tp = self._task_processes[process_name]
-      log.info('Forking Process(%s)' % process_name)
+      log.info('Forking Process(%s)', process_name)
       try:
         tp.start()
         launched.append(tp)
       except Process.Error as e:
-        log.error('Failed to launch process: %s' % e)
+        log.error('Failed to launch process: %s', e)
         self._set_process_status(process_name, ProcessState.FAILED)
 
     return len(launched) > 0
@@ -948,15 +948,15 @@
             TaskState._VALUES_TO_NAMES.get(self.task_state(), 'UNKNOWN'))
         self._set_task_status(runner.transition_to())
         continue
-      log.debug('Run loop: Work to be done within %.1fs' % iteration_wait)
+      log.debug('Run loop: Work to be done within %.1fs', iteration_wait)
       # step 2: check child process checkpoint streams for updates
       if not self.collect_updates(iteration_wait):
         # If we don't collect any updates, at least 'touch' the checkpoint stream
         # so as to prevent garbage collection.
         elapsed = self._clock.time() - start
         if elapsed < iteration_wait:
-          log.debug('Update collection only took %.1fs, idling %.1fs' % (
-              elapsed, iteration_wait - elapsed))
+          log.debug('Update collection only took %.1fs, idling %.1fs',
+              elapsed, iteration_wait - elapsed)
           self._clock.sleep(iteration_wait - elapsed)
         log.debug('Run loop: No updates collected, touching checkpoint.')
         os.utime(self._pathspec.getpath('runner_checkpoint'), None)
@@ -969,8 +969,8 @@
       Kill all processes associated with this task and set task/process states as terminal_status
       (defaults to KILLED)
     """
-    log.debug('Runner issued kill: force:%s, preemption_wait:%s' % (
-      force, preemption_wait))
+    log.debug('Runner issued kill: force:%s, preemption_wait:%s',
+      force, preemption_wait)
     assert terminal_status in (TaskState.KILLED, TaskState.LOST)
     self._preemption_deadline = self._clock.time() + preemption_wait.as_(Time.SECONDS)
     with self.control(force):
@@ -995,17 +995,17 @@
       coordinator_pid, pid, tree = pid_tuple
       if TaskRunnerHelper.is_process_terminal(current_run.state):
         if coordinator_pid or pid or tree:
-          log.warning('Terminal process (%s) still has running pids:' % process)
-          log.warning('  coordinator_pid: %s' % coordinator_pid)
-          log.warning('              pid: %s' % pid)
-          log.warning('             tree: %s' % tree)
+          log.warning('Terminal process (%s) still has running pids:', process)
+          log.warning('  coordinator_pid: %s', coordinator_pid)
+          log.warning('              pid: %s', pid)
+          log.warning('             tree: %s', tree)
         TaskRunnerHelper.kill_process(self.state, process)
       else:
         if coordinator_pid or pid or tree:
-          log.info('Transitioning %s to KILLED' % process)
+          log.info('Transitioning %s to KILLED', process)
           self._set_process_status(process, ProcessState.KILLED,
             stop_time=self._clock.time(), return_code=-1)
         else:
-          log.info('Transitioning %s to LOST' % process)
+          log.info('Transitioning %s to LOST', process)
           if current_run.state != ProcessState.WAITING:
             self._set_process_status(process, ProcessState.LOST)
diff --git a/src/main/python/apache/thermos/monitoring/disk.py b/src/main/python/apache/thermos/monitoring/disk.py
index 52c5d74..986d33a 100644
--- a/src/main/python/apache/thermos/monitoring/disk.py
+++ b/src/main/python/apache/thermos/monitoring/disk.py
@@ -40,8 +40,8 @@
   def run(self):
     start = time.time()
     self.value = du(self.path)
-    log.debug("DiskCollectorThread: finished collection of %s in %.1fms" % (
-        self.path, 1000.0 * (time.time() - start)))
+    log.debug("DiskCollectorThread: finished collection of %s in %.1fms",
+        self.path, 1000.0 * (time.time() - start))
     self.event.set()
 
   def finished(self):
diff --git a/src/main/python/apache/thermos/monitoring/monitor.py b/src/main/python/apache/thermos/monitoring/monitor.py
index d77703e..3ab1e48 100644
--- a/src/main/python/apache/thermos/monitoring/monitor.py
+++ b/src/main/python/apache/thermos/monitoring/monitor.py
@@ -80,7 +80,7 @@
             try:
               self._dispatcher.dispatch(self._runnerstate, runner_update)
             except CheckpointDispatcher.InvalidSequenceNumber as e:
-              log.error('Checkpoint stream is corrupt: %s' % e)
+              log.error('Checkpoint stream is corrupt: %s', e)
               break
           new_ckpt_head = fp.tell()
           updated = self._ckpt_head != new_ckpt_head
@@ -89,7 +89,7 @@
     except OSError as e:
       if e.errno == errno.ENOENT:
         # The log doesn't yet exist, will retry later.
-        log.warning('Could not read from checkpoint %s' % self._runner_ckpt)
+        log.warning('Could not read from checkpoint %s', self._runner_ckpt)
         return False
       else:
         raise
diff --git a/src/main/python/apache/thermos/monitoring/process_collector_psutil.py b/src/main/python/apache/thermos/monitoring/process_collector_psutil.py
index 3000e95..e13d6dd 100644
--- a/src/main/python/apache/thermos/monitoring/process_collector_psutil.py
+++ b/src/main/python/apache/thermos/monitoring/process_collector_psutil.py
@@ -39,7 +39,7 @@
     threads = process.num_threads()
     return ProcessSample(rate, user, system, rss, vms, nice, status, threads)
   except (AccessDenied, NoSuchProcess) as e:
-    log.debug('Error during process sampling [pid=%s]: %s' % (process.pid, e))
+    log.debug('Error during process sampling [pid=%s]: %s', process.pid, e)
     return ProcessSample.empty()
 
 
@@ -72,7 +72,7 @@
       new_samples[self._pid] = parent_sample
 
     except (IOError, PsutilError) as e:
-      log.debug('Error during process sampling: %s' % e)
+      log.debug('Error during process sampling: %s', e)
       self._sample = ProcessSample.empty()
       self._rate = 0.0
 
@@ -90,7 +90,7 @@
         new_user_sys = sum(map(attrgetter('user'), new)) + sum(map(attrgetter('system'), new))
         old_user_sys = sum(map(attrgetter('user'), old)) + sum(map(attrgetter('system'), old))
         self._rate = (new_user_sys - old_user_sys) / (self._stamp - last_stamp)
-        log.debug("Calculated rate for pid=%s and children: %s" % (self._process.pid, self._rate))
+        log.debug("Calculated rate for pid=%s and children: %s", self._process.pid, self._rate)
       self._sampled_tree = new_samples
 
   @property
diff --git a/src/main/python/apache/thermos/monitoring/resource.py b/src/main/python/apache/thermos/monitoring/resource.py
index f5e3849..adcdc75 100644
--- a/src/main/python/apache/thermos/monitoring/resource.py
+++ b/src/main/python/apache/thermos/monitoring/resource.py
@@ -137,7 +137,7 @@
     history_length = int(history_time.as_(Time.SECONDS) / min_collection_interval)
     if history_length > self.MAX_HISTORY:
       raise ValueError("Requested history length too large")
-    log.debug("Initialising ResourceHistory of length %s" % history_length)
+    log.debug("Initialising ResourceHistory of length %s", history_length)
     return ResourceHistory(history_length)
 
 
@@ -166,7 +166,7 @@
     """
     self._task_monitor = task_monitor  # exposes PIDs, sandbox
     self._task_id = task_id
-    log.debug('Initialising resource collection for task %s' % self._task_id)
+    log.debug('Initialising resource collection for task %s', self._task_id)
     self._process_collectors = dict()  # ProcessStatus => ProcessTreeCollector
     self._disk_collector_class = disk_collector
     self._disk_collector = None
@@ -225,7 +225,7 @@
     """Thread entrypoint. Loop indefinitely, polling collectors at self._collection_interval and
     collating samples."""
 
-    log.debug('Commencing resource monitoring for task "%s"' % self._task_id)
+    log.debug('Commencing resource monitoring for task "%s"', self._task_id)
     next_process_collection = 0
     next_disk_collection = 0
 
@@ -252,7 +252,7 @@
         if self._disk_collector:
           self._disk_collector.sample()
         else:
-          log.debug('No sandbox detected yet for %s' % self._task_id)
+          log.debug('No sandbox detected yet for %s', self._task_id)
 
       try:
         disk_usage = self._disk_collector.value if self._disk_collector else 0
@@ -264,10 +264,10 @@
 
         self._history.add(now, self.FullResourceResult(proc_usage_dict, disk_usage))
       except ValueError as err:
-        log.warning("Error recording resource sample: %s" % err)
+        log.warning("Error recording resource sample: %s", err)
 
-      log.debug("TaskResourceMonitor: finished collection of %s in %.2fs" % (
-          self._task_id, (time.time() - now)))
+      log.debug("TaskResourceMonitor: finished collection of %s in %.2fs",
+          self._task_id, (time.time() - now))
 
       # Sleep until any of the following conditions are met:
       # - it's time for the next disk collection
@@ -288,4 +288,4 @@
         log.warning('Task resource collection is backlogged. Consider increasing '
                     'process_collection_interval and disk_collection_interval.')
 
-    log.debug('Stopping resource monitoring for task "%s"' % self._task_id)
+    log.debug('Stopping resource monitoring for task "%s"', self._task_id)
diff --git a/src/main/python/apache/thermos/observer/http/file_browser.py b/src/main/python/apache/thermos/observer/http/file_browser.py
index d099665..dc740fb 100644
--- a/src/main/python/apache/thermos/observer/http/file_browser.py
+++ b/src/main/python/apache/thermos/observer/http/file_browser.py
@@ -42,7 +42,7 @@
   try:
     fstat = os.stat(filename)
   except Exception as e:
-    log.error('Could not read from %s: %s' % (filename, e))
+    log.error('Could not read from %s: %s', filename, e)
     return {}
 
   if offset == -1:
@@ -56,7 +56,7 @@
     try:
       data = fp.read(length)
     except IOError as e:
-      log.error('Failed to read %s: %s' % (filename, e), exc_info=True)
+      log.error('Failed to read %s: %s', filename, e, exc_info=True)
       return {}
 
   if data:
diff --git a/src/main/python/apache/thermos/observer/http/http_observer.py b/src/main/python/apache/thermos/observer/http/http_observer.py
index 5bfc4f2..c81383c 100644
--- a/src/main/python/apache/thermos/observer/http/http_observer.py
+++ b/src/main/python/apache/thermos/observer/http/http_observer.py
@@ -134,5 +134,5 @@
     }
     template['process'].update(**all_processes[current_run_number].get('used', {}))
     template['runs'] = all_processes
-    log.debug('Rendering template is: %s' % template)
+    log.debug('Rendering template is: %s', template)
     return template
diff --git a/src/main/python/apache/thermos/observer/http/static_assets.py b/src/main/python/apache/thermos/observer/http/static_assets.py
index 83adeb3..334c937 100644
--- a/src/main/python/apache/thermos/observer/http/static_assets.py
+++ b/src/main/python/apache/thermos/observer/http/static_assets.py
@@ -35,7 +35,7 @@
     assets = pkg_resources.resource_listdir(__name__, 'assets')
     cached_assets = {}
     for asset in assets:
-      log.info('  detected asset: %s' % asset)
+      log.info('  detected asset: %s', asset)
       cached_assets[asset] = pkg_resources.resource_string(
         __name__, os.path.join('assets', asset))
     self._assets = cached_assets
diff --git a/src/main/python/apache/thermos/observer/observed_task.py b/src/main/python/apache/thermos/observer/observed_task.py
index 08540e1..cb90c8b 100644
--- a/src/main/python/apache/thermos/observer/observed_task.py
+++ b/src/main/python/apache/thermos/observer/observed_task.py
@@ -60,11 +60,11 @@
       if os.path.exists(path):
         task = ThermosTaskWrapper.from_file(path)
         if task is None:
-          log.error('Error reading ThermosTask from %s in observer.' % path)
+          log.error('Error reading ThermosTask from %s in observer.', path)
         else:
           context = self.context(self._task_id)
           if not context:
-            log.warning('Task not yet available: %s' % self._task_id)
+            log.warning('Task not yet available: %s', self._task_id)
           task = task.task() % Environment(thermos=context)
           memoized[self._task_id] = task
 
@@ -77,7 +77,7 @@
     if mtime is None:
       mtime = self.safe_mtime(get_path('finished'))
     if mtime is None:
-      log.error("Couldn't get mtime for task %s!" % self._task_id)
+      log.error("Couldn't get mtime for task %s!", self._task_id)
     return mtime
 
   def context(self, task_id):
diff --git a/src/main/python/apache/thermos/observer/task_observer.py b/src/main/python/apache/thermos/observer/task_observer.py
index 4bb5d23..a6870d4 100644
--- a/src/main/python/apache/thermos/observer/task_observer.py
+++ b/src/main/python/apache/thermos/observer/task_observer.py
@@ -96,9 +96,9 @@
     ExceptionalThread.start(self)
 
   def __on_active(self, root, task_id):
-    log.debug('on_active(%r, %r)' % (root, task_id))
+    log.debug('on_active(%r, %r)', root, task_id)
     if task_id in self.finished_tasks:
-      log.error('Found an active task (%s) in finished tasks?' % task_id)
+      log.error('Found an active task (%s) in finished tasks?', task_id)
       return
     task_monitor = TaskMonitor(root, task_id)
     resource_monitor = TaskResourceMonitor(
@@ -115,14 +115,14 @@
     )
 
   def __on_finished(self, root, task_id):
-    log.debug('on_finished(%r, %r)' % (root, task_id))
+    log.debug('on_finished(%r, %r)', root, task_id)
     active_task = self._active_tasks.pop(task_id, None)
     if active_task:
       active_task.resource_monitor.kill()
     self._finished_tasks[task_id] = FinishedObservedTask(root, task_id)
 
   def __on_removed(self, root, task_id):
-    log.debug('on_removed(%r, %r)' % (root, task_id))
+    log.debug('on_removed(%r, %r)', root, task_id)
     active_task = self._active_tasks.pop(task_id, None)
     if active_task:
       active_task.resource_monitor.kill()
@@ -139,7 +139,7 @@
       with self.lock:
         start = time.time()
         self._detector.refresh()
-        log.debug("TaskObserver: finished checkpoint refresh in %.2fs" % (time.time() - start))
+        log.debug("TaskObserver: finished checkpoint refresh in %.2fs", time.time() - start)
 
   @Lockable.sync
   def process_from_name(self, task_id, process_id):
@@ -178,7 +178,7 @@
     }.get(type, None)
 
     if tasks is None:
-      log.error('Unknown task type %s' % type)
+      log.error('Unknown task type %s', type)
       return {}
 
     return tasks
@@ -313,7 +313,7 @@
       resource_sample = self.active_tasks[task_id].resource_monitor.sample()[1]
       sample = resource_sample.process_sample.to_dict()
       sample['disk'] = resource_sample.disk_usage
-      log.debug("Got sample for task %s: %s" % (task_id, sample))
+      log.debug("Got sample for task %s: %s", task_id, sample)
     return sample
 
   @Lockable.sync
@@ -390,7 +390,7 @@
     task = self.all_tasks[task_id].task
     if task is None:
       # TODO(wickman)  Can this happen?
-      log.error('Could not find task: %s' % task_id)
+      log.error('Could not find task: %s', task_id)
       return {}
 
     state = self.raw_state(task_id)
@@ -425,7 +425,7 @@
     if task_id not in self.active_tasks:
       return ProcessSample.empty().to_dict()
     sample = self.active_tasks[task_id].resource_monitor.sample_by_process(process_name).to_dict()
-    log.debug('Resource consumption (%s, %s) => %s' % (task_id, process_name, sample))
+    log.debug('Resource consumption (%s, %s) => %s', task_id, process_name, sample)
     return sample
 
   @Lockable.sync
diff --git a/src/main/python/apache/thermos/runner/thermos_runner.py b/src/main/python/apache/thermos/runner/thermos_runner.py
index fa4f0fb..c434000 100644
--- a/src/main/python/apache/thermos/runner/thermos_runner.py
+++ b/src/main/python/apache/thermos/runner/thermos_runner.py
@@ -176,13 +176,13 @@
 def runner_teardown(runner, sig=signal.SIGUSR1, frame=None):
   """Destroy runner on SIGUSR1 (kill) or SIGUSR2 (lose)"""
   op = 'kill' if sig == signal.SIGUSR1 else 'lose'
-  log.info('Thermos runner got signal %s, shutting down.' % sig)
+  log.info('Thermos runner got signal %s, shutting down.', sig)
   log.info('Interrupted frame:')
   if frame:
     for line in ''.join(traceback.format_stack(frame)).splitlines():
       log.info(line)
   runner.close_ckpt()
-  log.info('Calling runner.%s()' % op)
+  log.info('Calling runner.%s()', op)
   getattr(runner, op)()
   sys.exit(0)
 
@@ -201,7 +201,7 @@
   missing_ports = set(thermos_task.ports()) - set(prebound_ports)
 
   if missing_ports:
-    log.error('ERROR!  Unbound ports: %s' % ' '.join(port for port in missing_ports))
+    log.error('ERROR!  Unbound ports: %s', ' '.join(port for port in missing_ports))
     sys.exit(INTERNAL_ERROR)
 
   if opts.setuid:
@@ -213,7 +213,7 @@
   try:
     pwd.getpwnam(user).pw_uid
   except KeyError:
-    log.error('Unknown user: %s' % user)
+    log.error('Unknown user: %s', user)
     sys.exit(UNKNOWN_USER)
 
   task_runner = TaskRunner(
@@ -240,22 +240,22 @@
   try:
     task_runner.run()
   except TaskRunner.InternalError as err:
-    log.error('Internal error: %s' % err)
+    log.error('Internal error: %s', err)
     sys.exit(INTERNAL_ERROR)
   except TaskRunner.InvalidTask as err:
-    log.error('Invalid task: %s' % err)
+    log.error('Invalid task: %s', err)
     sys.exit(INVALID_TASK)
   except TaskRunner.StateError as err:
-    log.error('Checkpoint error: %s' % err)
+    log.error('Checkpoint error: %s', err)
     sys.exit(TERMINAL_TASK)
   except Process.UnknownUserError as err:
-    log.error('User ceased to exist: %s' % err)
+    log.error('User ceased to exist: %s', err)
     sys.exit(UNKNOWN_USER)
   except KeyboardInterrupt:
     log.info('Caught ^C, tearing down runner.')
     runner_teardown(task_runner)
   except Exception as e:
-    log.error('Unknown exception: %s' % e)
+    log.error('Unknown exception: %s', e)
     for line in traceback.format_exc().splitlines():
       log.error(line)
     sys.exit(UNKNOWN_ERROR)