blob: 007b6aaa3ccbe54e4ab52a4d964be42dcf7e5705 [file] [log] [blame]
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Monitor the resource consumption of Thermos tasks
This module contains classes used to monitor the resource consumption (e.g. CPU, RAM, disk) of
Thermos tasks. Resource monitoring of a Thermos task typically treats the task as an aggregate of
all the processes within it. Importantly, this excludes the process(es) of Thermos itself (i.e. the
TaskRunner and any other wrappers involved in launching a task).
The ResourceMonitorBase defines the interface for other components (for example, the Thermos
TaskObserver) to interact with and retrieve information about a Task's resource consumption. The
canonical/reference implementation of a ResourceMonitor is the TaskResourceMonitor, a thread which
actively monitors resources for a particular task by periodically polling process information and
disk consumption and retaining a limited (FIFO) in-memory history of this data.
"""
import threading
import time
from abc import abstractmethod
from bisect import bisect_left
from collections import namedtuple
from operator import attrgetter
from twitter.common import log
from twitter.common.collections import RingBuffer
from twitter.common.concurrent import EventMuxer
from twitter.common.exceptions import ExceptionalThread
from twitter.common.lang import Interface
from twitter.common.quantity import Amount, Time
from .disk import DiskCollectorSettings, DuDiskCollector, MesosDiskCollector
from .process import ProcessSample
from .process_collector_psutil import ProcessTreeCollector
class ResourceMonitorBase(Interface):
  """ Defines the interface for interacting with a ResourceMonitor """

  class Error(Exception): pass

  class AggregateResourceResult(namedtuple('AggregateResourceResult',
                                           'num_procs process_sample disk_usage')):
    """ Class representing task level stats:
    num_procs: total number of pids initiated by the task
    process_sample: a .process.ProcessSample object representing resources consumed by the task
    disk_usage: disk usage consumed in the task's sandbox
    """
    # Empty __slots__ keeps instances as lean as the underlying tuple (no per-instance __dict__);
    # result records may be created for every sample retained in a history.
    __slots__ = ()

  class FullResourceResult(namedtuple('FullResourceResult', 'proc_usage disk_usage')):
    """ Class representing detailed information on task level stats:
    proc_usage: a dictionary mapping ProcessStatus objects to ProcResourceResult objects. One
      entry per process in the task
    disk_usage: disk usage consumed in the task's sandbox
    """
    # See AggregateResourceResult: avoid a per-instance __dict__.
    __slots__ = ()

  class ProcResourceResult(namedtuple('ProcResourceResult', 'process_sample num_procs')):
    """ Class representing process level stats:
    process_sample: a .process.ProcessSample object representing resources consumed by
      the process
    num_procs: total number of pids initiated by the process
    """
    # See AggregateResourceResult: avoid a per-instance __dict__.
    __slots__ = ()

  @abstractmethod
  def sample(self):
    """ Return a sample of the resource consumption of the task right now
    Returns a tuple of (timestamp, AggregateResourceResult)
    """

  @abstractmethod
  def sample_at(self, time):
    """ Return a sample of the resource consumption as close as possible to the specified time
    Returns a tuple of (timestamp, AggregateResourceResult)
    """

  @abstractmethod
  def sample_by_process(self, process_name):
    """ Return a sample of the resource consumption of a specific process in the task right now
    Returns a ProcessSample
    """
class ResourceHistory(object):
  """ Simple class to contain a RingBuffer (fixed-length FIFO) history of resource samples, with the
  mapping:
    timestamp => ({process_status => (process_sample, number_of_procs)}, disk_usage_in_bytes)
  Entries are stored as (timestamp, value) tuples in non-decreasing timestamp order.
  """

  def __init__(self, maxlen, initialize=True):
    """
    maxlen: maximum number of (timestamp, value) entries retained by the RingBuffer
    initialize: when True, seed the history with an empty FullResourceResult stamped with the
      current time
    Raises ValueError if maxlen is less than 1.
    """
    if not maxlen >= 1:
      raise ValueError("maxlen must be greater than 0")
    self._maxlen = maxlen
    self._values = RingBuffer(maxlen, None)
    if initialize:
      self.add(time.time(), ResourceMonitorBase.FullResourceResult({}, 0))

  def add(self, timestamp, value):
    """Store a new resource sample corresponding to the given timestamp.

    Raises ValueError if timestamp is older than the most recently stored one, which keeps the
    history sorted so that get() can binary-search it.
    """
    if self._values and not timestamp >= self._values[-1][0]:
      raise ValueError("Refusing to add timestamp in the past!")
    self._values.append((timestamp, value))

  def get(self, timestamp):
    """Get the (timestamp, value) entry whose timestamp is nearest to the given timestamp.

    Ties between two equally distant entries resolve to the later one. Timestamps outside the
    stored range resolve to the first/last entry.
    Raises IndexError if the history is empty.
    """
    if not self._values:
      raise IndexError("ResourceHistory is empty")
    # Probe with a 1-tuple: it sorts just before any (timestamp, value) pair with the same
    # timestamp, so the stored values themselves are never compared (they may not be orderable,
    # e.g. against None on Python 3).
    index = bisect_left(self._values, (timestamp,))
    if index >= len(self._values):
      return self._values[-1]
    if index > 0:
      before = self._values[index - 1]
      after = self._values[index]
      if timestamp - before[0] < after[0] - timestamp:
        return before
    return self._values[index]

  def __iter__(self):
    return iter(self._values)

  def __len__(self):
    return len(self._values)

  def __repr__(self):
    return 'ResourceHistory(%s)' % ', '.join([str(r) for r in self._values])
class HistoryProvider(object):
  """Factory for ResourceHistory objects sized to span a requested time window."""

  # Upper bound on the number of entries a provided ResourceHistory may hold.
  MAX_HISTORY = 10000  # magic number

  def provides(self, history_time, min_collection_interval):
    """Return a ResourceHistory with one slot per collection interval in history_time.

    history_time: Amount of Time the history should cover
    min_collection_interval: shortest collection interval, in seconds
    Raises ValueError if the computed length exceeds MAX_HISTORY (and, via ResourceHistory, if
    it is less than 1).
    """
    window_seconds = history_time.as_(Time.SECONDS)
    length = int(window_seconds / min_collection_interval)
    if length <= self.MAX_HISTORY:
      log.debug("Initialising ResourceHistory of length %s", length)
      return ResourceHistory(length)
    raise ValueError("Requested history length too large")
class DiskCollectorProvider(object):
  """Factory that creates the disk collector used to measure a sandbox's disk usage."""

  DEFAULT_DISK_COLLECTOR_CLASS = DuDiskCollector

  def __init__(
      self,
      enable_mesos_disk_collector=False,
      settings=None):
    """
    enable_mesos_disk_collector: when True, create MesosDiskCollector instances instead of
      DEFAULT_DISK_COLLECTOR_CLASS
    settings: DiskCollectorSettings handed to every collector this provider creates. When
      omitted, a new DiskCollectorSettings() is built for this provider instead of one instance
      created at import time and shared by all providers.
    """
    self.settings = settings if settings is not None else DiskCollectorSettings()
    if enable_mesos_disk_collector:
      self.disk_collector_class = MesosDiskCollector
    else:
      self.disk_collector_class = self.DEFAULT_DISK_COLLECTOR_CLASS

  def provides(self, sandbox):
    """Return a new disk collector for `sandbox`, configured with this provider's settings."""
    return self.disk_collector_class(sandbox, settings=self.settings)
class TaskResourceMonitor(ResourceMonitorBase, ExceptionalThread):
  """ Lightweight thread to aggregate resource consumption for a task's constituent processes.
  Actual resource calculation is delegated to collectors; this class periodically polls the
  collectors and aggregates into a representation for the entire task. Also maintains a limited
  history of previous sample results.
  """

  PROCESS_COLLECTION_INTERVAL = Amount(20, Time.SECONDS)
  HISTORY_TIME = Amount(1, Time.HOURS)

  def __init__(
      self,
      task_id,
      task_monitor,
      disk_collector_provider=None,
      process_collection_interval=PROCESS_COLLECTION_INTERVAL,
      disk_collection_interval=DiskCollectorSettings.DISK_COLLECTION_INTERVAL,
      history_time=HISTORY_TIME,
      history_provider=None):
    """
    task_id: identifier of the monitored task; used in log messages and the thread name
    task_monitor: TaskMonitor object specifying the task whose resources should be monitored;
      queried for the task's active processes and for its sandbox directory
    disk_collector_provider: creates the disk collector once a sandbox is known; defaults to a
      new DiskCollectorProvider()
    process_collection_interval: Amount of Time between process collections
    disk_collection_interval: Amount of Time between disk collections
    history_time: span of history to retain; passed to history_provider together with the
      shorter of the two collection intervals (in seconds)
    history_provider: creates the ResourceHistory; defaults to a new HistoryProvider()
    """
    # Build default collaborators per instance rather than once at class-definition time.
    if disk_collector_provider is None:
      disk_collector_provider = DiskCollectorProvider()
    if history_provider is None:
      history_provider = HistoryProvider()
    self._task_monitor = task_monitor  # exposes PIDs, sandbox
    self._task_id = task_id
    log.debug('Initialising resource collection for task %s', self._task_id)
    self._process_collectors = dict()  # ProcessStatus => ProcessTreeCollector
    self._disk_collector_provider = disk_collector_provider
    self._disk_collector = None  # created lazily in run() once the sandbox exists
    self._process_collection_interval = process_collection_interval.as_(Time.SECONDS)
    self._disk_collection_interval = disk_collection_interval.as_(Time.SECONDS)
    min_collection_interval = min(self._process_collection_interval, self._disk_collection_interval)
    self._history = history_provider.provides(history_time, min_collection_interval)
    self._kill_signal = threading.Event()
    ExceptionalThread.__init__(self, name='%s[%s]' % (self.__class__.__name__, task_id))
    self.daemon = True

  def sample(self):
    """Return (timestamp, AggregateResourceResult) for the history entry closest to now.

    Logs a warning (but still answers from history) when the monitor thread is not running.
    """
    if not self.is_alive():
      log.warning("TaskResourceMonitor not running - sample may be inaccurate")
    return self.sample_at(time.time())

  def sample_at(self, timestamp):
    """Return (timestamp, AggregateResourceResult) for the history entry chosen for `timestamp`.

    The per-process results of that entry are summed into a single task-level result.
    """
    _timestamp, full_resources = self._history.get(timestamp)
    proc_results = list(full_resources.proc_usage.values())
    aggregated_procs = sum(map(attrgetter('num_procs'), proc_results))
    aggregated_sample = sum(map(attrgetter('process_sample'), proc_results),
                            ProcessSample.empty())
    return _timestamp, self.AggregateResourceResult(
        aggregated_procs, aggregated_sample, full_resources.disk_usage)

  def sample_by_process(self, process_name):
    """Return a ProcessSample for the active process named `process_name`.

    Prefers the value recorded in the latest history entry; otherwise samples the process's
    collector immediately.
    Raises ValueError if no active process in the task has that name.
    """
    matches = [process for process in self._get_active_processes()
               if process.process == process_name]
    if not matches:
      raise ValueError('No active process found with name "%s" in this task' % process_name)
    process = matches[-1]
    # This may be called out of band (e.g. before the main loop is aware of the process, from
    # another thread). Keep a local reference so that run() removing the dict entry in the
    # meantime cannot turn the lookups below into a KeyError.
    collector = self._process_collectors.get(process)
    if collector is None:
      collector = ProcessTreeCollector(process.pid)
      self._process_collectors[process] = collector
    # The history holds (timestamp, FullResourceResult); per-process samples are looked up in
    # its proc_usage mapping.
    _, full_resources = self._history.get(time.time())
    if process in full_resources.proc_usage:
      return full_resources.proc_usage[process].process_sample
    collector.sample()
    return collector.value

  def _get_active_processes(self):
    """Get a list of ProcessStatus objects representing currently-running processes in the task"""
    return [process for process, _ in self._task_monitor.get_active_processes()]

  def kill(self):
    """Signal that the thread should cease collecting resources and terminate"""
    self._kill_signal.set()

  def run(self):
    """Thread entrypoint. Loop until kill() is called: poll the process collectors every
    process_collection_interval seconds and the disk collector every disk_collection_interval
    seconds, recording a collated sample in the history on every iteration."""
    log.debug('Commencing resource monitoring for task "%s"', self._task_id)
    next_process_collection = 0
    next_disk_collection = 0

    while not self._kill_signal.is_set():
      now = time.time()

      if now > next_process_collection:
        next_process_collection = now + self._process_collection_interval
        self._collect_process_samples()

      if now > next_disk_collection:
        next_disk_collection = now + self._disk_collection_interval
        self._collect_disk_sample()

      self._record_history(now)

      log.debug("TaskResourceMonitor: finished collection of %s in %.2fs",
                self._task_id, (time.time() - now))

      # Sleep until any of the following conditions are met:
      # - it's time for the next disk collection
      # - it's time for the next process collection
      # - the result from the last disk collection is available via the DiskCollector
      # - the TaskResourceMonitor has been killed via self._kill_signal
      now = time.time()
      next_collection = min(next_process_collection - now, next_disk_collection - now)

      if self._disk_collector:
        waiter = EventMuxer(self._kill_signal, self._disk_collector.completed_event)
      else:
        waiter = self._kill_signal

      if next_collection > 0:
        waiter.wait(timeout=next_collection)
      else:
        log.warning('Task resource collection is backlogged. Consider increasing '
                    'process_collection_interval and disk_collection_interval.')

    log.debug('Stopping resource monitoring for task "%s"', self._task_id)

  def _collect_process_samples(self):
    """Sync the process collectors with the task's active processes, then sample each one."""
    actives = set(self._get_active_processes())
    current = set(self._process_collectors)
    for process in current - actives:
      self._process_collectors.pop(process, None)
    for process in actives - current:
      self._process_collectors[process] = ProcessTreeCollector(process.pid)
    # Iterate over a snapshot: sample_by_process() may insert collectors from another thread
    # while sample() runs, which would break iteration over a live dict view.
    for collector in list(self._process_collectors.values()):
      collector.sample()

  def _collect_disk_sample(self):
    """Create the disk collector once the sandbox is known, then trigger a disk sample."""
    if not self._disk_collector:
      sandbox = self._task_monitor.get_sandbox()
      if sandbox:
        self._disk_collector = self._disk_collector_provider.provides(sandbox)
    if self._disk_collector:
      self._disk_collector.sample()
    else:
      log.debug('No sandbox detected yet for %s', self._task_id)

  def _record_history(self, now):
    """Collate the collectors' current values into a FullResourceResult stamped `now`.

    A ValueError while recording (e.g. the history rejecting an out-of-order timestamp) is
    logged rather than propagated.
    """
    try:
      disk_usage = self._disk_collector.value if self._disk_collector else 0
      # Snapshot for the same reason as in _collect_process_samples().
      proc_usage_dict = {
          process: self.ProcResourceResult(collector.value, collector.procs)
          for process, collector in list(self._process_collectors.items())}
      self._history.add(now, self.FullResourceResult(proc_usage_dict, disk_usage))
    except ValueError as err:
      log.warning("Error recording resource sample: %s", err)
class NullTaskResourceMonitor(ResourceMonitorBase):
  """ ResourceMonitor that collects nothing: every query reports zero processes, an empty
  ProcessSample and zero disk usage. It is a cheap drop-in replacement for TaskResourceMonitor,
  useful where metrics cannot be gathered reliably (e.g. when using PID namespaces).
  """

  def sample(self):
    """Return an all-zero AggregateResourceResult stamped with the current time."""
    now = time.time()
    return self.sample_at(now)

  def sample_at(self, timestamp):
    """Return `timestamp` paired with an all-zero AggregateResourceResult."""
    zero_result = self.AggregateResourceResult(0, ProcessSample.empty(), 0)
    return timestamp, zero_result

  def sample_by_process(self, process_name):
    """Return an empty ProcessSample regardless of `process_name`."""
    return ProcessSample.empty()

  def start(self):
    """No-op; there is no collection thread to start."""

  def kill(self):
    """No-op; there is no collection thread to stop."""