"""Monitor the resource consumption of Thermos tasks.

This module contains classes used to monitor the resource consumption (e.g. CPU, RAM, disk) of
Thermos tasks. Resource monitoring of a Thermos task typically treats the task as an aggregate of
all the processes within it. Importantly, this excludes the process(es) of Thermos itself (i.e. the
TaskRunner and any other wrappers involved in launching a task).

The ResourceMonitorBase defines the interface for other components (for example, the Thermos
TaskObserver) to interact with and retrieve information about a task's resource consumption. The
canonical/reference implementation of a ResourceMonitor is the TaskResourceMonitor, a thread which
actively monitors resources for a particular task by periodically polling process information and
disk consumption and retaining a limited (FIFO) in-memory history of this data.
"""
import threading
import time
from abc import abstractmethod
from bisect import bisect_left
from collections import namedtuple
from operator import attrgetter
from twitter.common import log
from twitter.common.collections import RingBuffer
from twitter.common.concurrent import EventMuxer
from twitter.common.exceptions import ExceptionalThread
from twitter.common.lang import Interface
from twitter.common.quantity import Amount, Time
from .disk import DiskCollector
from .process import ProcessSample
from .process_collector_psutil import ProcessTreeCollector
class ResourceMonitorBase(Interface):
    """ Defines the interface for interacting with a ResourceMonitor """

    class Error(Exception):
        pass

    class AggregateResourceResult(namedtuple('AggregateResourceResult',
                                             'num_procs process_sample disk_usage')):
        """ Class representing task level stats:
            num_procs: total number of pids initiated by the task
            process_sample: a .process.ProcessSample object representing resources consumed by
                the task
            disk_usage: disk usage consumed in the task's sandbox
        """
        # An empty __slots__ stops the subclass from adding a per-instance __dict__, so instances
        # stay as lightweight as the underlying namedtuple (the documented idiom for subclassing).
        __slots__ = ()

    class FullResourceResult(namedtuple('FullResourceResult', 'proc_usage disk_usage')):
        """ Class representing detailed information on task level stats:
            proc_usage: a dictionary mapping ProcessStatus objects to ProcResourceResult objects.
                One entry per process in the task
            disk_usage: disk usage consumed in the task's sandbox
        """
        __slots__ = ()  # no per-instance __dict__ (see AggregateResourceResult)

    class ProcResourceResult(namedtuple('ProcResourceResult', 'process_sample num_procs')):
        """ Class representing process level stats:
            process_sample: a .process.ProcessSample object representing resources consumed by
                the process
            num_procs: total number of pids initiated by the process
        """
        __slots__ = ()  # no per-instance __dict__ (see AggregateResourceResult)

    @abstractmethod
    def sample(self):
        """ Return a sample of the resource consumption of the task right now

        Returns a tuple of (timestamp, AggregateResourceResult)
        """

    @abstractmethod
    def sample_at(self, time):
        """ Return a sample of the resource consumption as close as possible to the specified time

        Returns a tuple of (timestamp, AggregateResourceResult)
        """
        # NOTE: the parameter name `time` shadows the time module inside this method; it is kept
        # as-is so that keyword callers keep working.

    @abstractmethod
    def sample_by_process(self, process_name):
        """ Return a sample of the resource consumption of a specific process in the task right now

        Returns a ProcessSample
        """
class ResourceHistory(object):
    """ Simple class to contain a RingBuffer (fixed-length FIFO) history of resource samples, with
    the mapping:
        timestamp => ({process_status => (process_sample, number_of_procs)}, disk_usage_in_bytes)

    add() refuses out-of-order timestamps, so the buffer stays sorted by timestamp and get() can
    binary-search it.
    """

    def __init__(self, maxlen, initialize=True):
        """
        maxlen: maximum number of samples retained (must be >= 1)
        initialize: if True, seed the history with an empty FullResourceResult stamped with the
            current time, so get() has something to return immediately

        Raises ValueError if maxlen < 1.
        """
        if not maxlen >= 1:
            raise ValueError("maxlen must be greater than 0")
        self._maxlen = maxlen
        self._values = RingBuffer(maxlen, None)
        if initialize:
            self.add(time.time(), ResourceMonitorBase.FullResourceResult({}, 0))

    def add(self, timestamp, value):
        """Store a new resource sample corresponding to the given timestamp.

        Raises ValueError if timestamp is earlier than the most recently stored one (equal
        timestamps are accepted).
        """
        if self._values and not timestamp >= self._values[-1][0]:
            raise ValueError("Refusing to add timestamp in the past!")
        self._values.append((timestamp, value))

    def get(self, timestamp):
        """Get the stored (timestamp, value) sample selected for the given timestamp.

        Returns the earliest sample whose timestamp is >= the requested timestamp, or the most
        recent sample if every stored timestamp is earlier.
        """
        # Binary search on the timestamps alone. Bisecting the (timestamp, value) tuples against
        # (timestamp, None) falls through to comparing `value` with None whenever timestamps tie,
        # which raises TypeError on Python 3. Comparing only timestamps yields the same index.
        lo, hi = 0, len(self._values)
        while lo < hi:
            mid = (lo + hi) // 2
            if self._values[mid][0] < timestamp:
                lo = mid + 1
            else:
                hi = mid
        # Past the newest sample: clamp to the last one.
        return self._values[min(lo, len(self._values) - 1)]

    def __iter__(self):
        return iter(self._values)

    def __len__(self):
        return len(self._values)

    def __repr__(self):
        return 'ResourceHistory(%s)' % ', '.join([str(r) for r in self._values])
class HistoryProvider(object):
    """Builds ResourceHistory buffers sized to cover a requested span of time."""

    MAX_HISTORY = 10000  # magic number

    def provides(self, history_time, min_collection_interval):
        """Return a new ResourceHistory able to hold `history_time` worth of samples.

        history_time: time quantity to cover; converted with .as_(Time.SECONDS)
        min_collection_interval: seconds between samples at the fastest collection cadence

        Raises ValueError if the implied number of samples exceeds MAX_HISTORY (ResourceHistory
        itself also raises ValueError if it would hold fewer than one sample).
        """
        span_seconds = history_time.as_(Time.SECONDS)
        sample_count = int(span_seconds / min_collection_interval)
        if sample_count <= self.MAX_HISTORY:
            log.debug("Initialising ResourceHistory of length %s", sample_count)
            return ResourceHistory(sample_count)
        raise ValueError("Requested history length too large")
class TaskResourceMonitor(ResourceMonitorBase, ExceptionalThread):
    """ Lightweight thread to aggregate resource consumption for a task's constituent processes.
    Actual resource calculation is delegated to collectors; this class periodically polls the
    collectors and aggregates into a representation for the entire task. Also maintains a limited
    history of previous sample results.
    """
    # NOTE(review): this copy of the class appears to have lost source lines:
    #   - the __init__ signature is unterminated;
    #   - several calls have dangling arguments;
    #   - `except` clauses have no matching `try`;
    #   - some loop/if bodies are empty.
    # Each apparent gap is marked below with a NOTE(review). Restore the missing lines from
    # version control rather than guessing.

    # NOTE(review): presumably the default for __init__'s history_time (signature lines missing).
    HISTORY_TIME = Amount(1, Time.HOURS)

    def __init__(self,
        # NOTE(review): the remaining parameters and the closing "):" of this signature are
        # missing. The body below reads task_id, task_monitor, disk_collector,
        # process_collection_interval, disk_collection_interval, history_time and
        # history_provider.
        """
        task_id: identifier of the monitored task; used in log messages and the thread name
        task_monitor: TaskMonitor object specifying the task whose resources should be monitored;
            must provide get_active_processes() and get_sandbox()
        disk_collector: callable (e.g. a collector class) invoked with the task's sandbox to build
            the disk collector once a sandbox exists
        process_collection_interval: time quantity between process polls (stored in seconds)
        disk_collection_interval: time quantity between disk polls (stored in seconds)
        history_time: span of history to retain, passed to history_provider.provides()
        history_provider: object whose provides() builds the sample history buffer
        """
        self._task_monitor = task_monitor  # exposes PIDs, sandbox
        self._task_id = task_id
        log.debug('Initialising resource collection for task %s', self._task_id)
        self._process_collectors = dict()  # ProcessStatus => ProcessTreeCollector
        # Not instantiated here: run() builds the disk collector once the sandbox is known.
        self._disk_collector_class = disk_collector
        self._disk_collector = None
        # Intervals are kept as plain seconds for arithmetic against time.time().
        self._process_collection_interval = process_collection_interval.as_(Time.SECONDS)
        self._disk_collection_interval = disk_collection_interval.as_(Time.SECONDS)
        # Size the history for the fastest of the two collection cadences.
        min_collection_interval = min(self._process_collection_interval, self._disk_collection_interval)
        self._history = history_provider.provides(history_time, min_collection_interval)
        # Loop condition for run(), and one of the events run() sleeps on between collections.
        self._kill_signal = threading.Event()
        ExceptionalThread.__init__(self, name='%s[%s]' % (self.__class__.__name__, task_id))
        self.daemon = True

    def sample(self):
        """Return (timestamp, AggregateResourceResult) selected for the current time.

        Logs a warning when the monitor thread is not alive, since the history is then no longer
        being refreshed.
        """
        if not self.is_alive():
            log.warning("TaskResourceMonitor not running - sample may be inaccurate")
        return self.sample_at(time.time())

    def sample_at(self, timestamp):
        """Aggregate the history sample selected for `timestamp` into task-level totals.

        Returns (sample_timestamp, AggregateResourceResult). sample_timestamp is when the
        selected sample was recorded, which may differ from the requested timestamp.
        """
        _timestamp, full_resources = self._history.get(timestamp)
        # Sum process counts and per-process samples over every process in the task.
        aggregated_procs = sum(map(attrgetter('num_procs'), full_resources.proc_usage.values()))
        aggregated_sample = sum(map(attrgetter('process_sample'), full_resources.proc_usage.values()),
            # NOTE(review): the start-value argument and closing ")" of this sum() are missing.
            # ProcessSample is imported but otherwise unused in this module, so the missing line
            # presumably supplies an empty ProcessSample as the start value. Confirm.
        return _timestamp, self.AggregateResourceResult(
            aggregated_procs, aggregated_sample, full_resources.disk_usage)

    def sample_by_process(self, process_name):
        """Return the ProcessSample for the active process named `process_name`.

        Looks the process up in the history sample selected for the current time. If it is not
        there, falls back to the process's collector. Raises ValueError if no active process has
        that name.
        """
        # NOTE(review): a `try:` line appears to be missing before this statement; the
        # `except IndexError` below has no matching `try`.
        process = [process for process in self._get_active_processes()
            if process.process == process_name].pop()
        except IndexError:
            raise ValueError('No active process found with name "%s" in this task' % process_name)
        # Since this might be called out of band (before the main loop is aware of the process)
        if process not in self._process_collectors:
            self._process_collectors[process] = ProcessTreeCollector(
                # NOTE(review): the constructor argument(s) and closing ")" are missing here.
        # The sample obtained from history is a tuple of (timestamp, FullResourceResult), and the
        # per-process sample can be looked up from FullResourceResult
        _, full_resources = self._history.get(time.time())
        if process in full_resources.proc_usage:
            return full_resources.proc_usage[process].process_sample
        # NOTE(review): for a collector created just above, nothing visible here samples it
        # before .value is read. Verify against the original source.
        return self._process_collectors[process].value

    def _get_active_processes(self):
        """Get a list of ProcessStatus objects representing currently-running processes in the task"""
        # get_active_processes() yields pairs; only the first element (the process) is kept.
        return [process for process, _ in self._task_monitor.get_active_processes()]

    def kill(self):
        """Signal that the thread should cease collecting resources and terminate"""
        # NOTE(review): no statement follows the docstring, so as written this method does
        # nothing. run() loops until self._kill_signal is set, so a `self._kill_signal.set()` line
        # is presumably missing here.

    def run(self):
        """Thread entrypoint. Until self._kill_signal is set:

        - poll the process collectors every self._process_collection_interval seconds;
        - poll the disk collector every self._disk_collection_interval seconds;
        - record the combined sample in the history after every wake-up.
        """
        log.debug('Commencing resource monitoring for task "%s"', self._task_id)
        # Deadlines (epoch seconds) for the next poll of each collector type; 0 forces an
        # immediate first collection.
        next_process_collection = 0
        next_disk_collection = 0
        while not self._kill_signal.is_set():
            now = time.time()
            # Process collection: reconcile collectors with the currently active processes.
            if now > next_process_collection:
                next_process_collection = now + self._process_collection_interval
                actives = set(self._get_active_processes())
                current = set(self._process_collectors)
                # Processes that still have a collector but are no longer active.
                for process in current - actives:
                    # NOTE(review): loop body missing (presumably drops the collector for
                    # `process` from self._process_collectors).
                # Newly active processes that have no collector yet.
                for process in actives - current:
                    self._process_collectors[process] = ProcessTreeCollector(
                        # NOTE(review): constructor argument(s) and closing ")" missing here.
                for process, collector in self._process_collectors.items():
                    # NOTE(review): loop body missing (presumably samples `collector`).
            # Disk collection: the collector is created lazily once the sandbox is known.
            if now > next_disk_collection:
                next_disk_collection = now + self._disk_collection_interval
                if not self._disk_collector:
                    sandbox = self._task_monitor.get_sandbox()
                    if sandbox:
                        self._disk_collector = self._disk_collector_class(sandbox)
                if self._disk_collector:
                    # NOTE(review): the body of this branch and an `else:` line appear to be
                    # missing. The "No sandbox detected yet" message below only makes sense in
                    # the else branch.
                    log.debug('No sandbox detected yet for %s', self._task_id)
            # Record a combined sample in the history.
            # NOTE(review): a `try:` line appears to be missing here. The `except ValueError`
            # below has no matching `try`; ResourceHistory.add raises ValueError for timestamps
            # older than the last stored one.
            disk_usage = self._disk_collector.value if self._disk_collector else 0
            proc_usage_dict = dict()
            for process, collector in self._process_collectors.items():
                proc_usage_dict.update({process: self.ProcResourceResult(collector.value,
                    # NOTE(review): the num_procs argument and closing ")})" are missing here.
            self._history.add(now, self.FullResourceResult(proc_usage_dict, disk_usage))
            except ValueError as err:
                log.warning("Error recording resource sample: %s", err)
            log.debug("TaskResourceMonitor: finished collection of %s in %.2fs",
                self._task_id, (time.time() - now))
            # Sleep until any of the following conditions are met:
            # - it's time for the next disk collection
            # - it's time for the next process collection
            # - the result from the last disk collection is available via the DiskCollector
            # - the TaskResourceMonitor has been killed via self._kill_signal
            now = time.time()
            # Seconds until the earliest upcoming deadline (<= 0 means a deadline already passed).
            next_collection = min(next_process_collection - now, next_disk_collection - now)
            if self._disk_collector:
                waiter = EventMuxer(self._kill_signal, self._disk_collector.completed_event)
                # NOTE(review): an `else:` line appears to be missing before the next assignment.
                # Without it, the EventMuxer waiter is always overwritten.
                waiter = self._kill_signal
            if next_collection > 0:
                # NOTE(review): the wait on `waiter` (bounded by next_collection) and an `else:`
                # line appear to be missing. The backlog warning presumably belongs to the else
                # branch, where the next deadline has already passed.
                log.warning('Task resource collection is backlogged. Consider increasing '
                    'process_collection_interval and disk_collection_interval.')
        log.debug('Stopping resource monitoring for task "%s"', self._task_id)