blob: cb90c8b01285e5be4aaa2b702bae49a0660750c6 [file] [log] [blame]
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import copy
import os
from abc import abstractproperty
from pystachio import Environment
from twitter.common import log
from twitter.common.lang import AbstractClass
from apache.thermos.common.ckpt import CheckpointDispatcher
from apache.thermos.common.path import TaskPath
from apache.thermos.config.loader import ThermosTaskWrapper
from apache.thermos.config.schema import ThermosContext
from gen.apache.thermos.ttypes import RunnerState
class ObservedTask(AbstractClass):
""" Represents a Task being observed """
@classmethod
def safe_mtime(cls, path):
try:
return os.path.getmtime(path)
except OSError:
return None
def __init__(self, root, task_id):
self._root = root
self._task_id = task_id
self._pathspec = TaskPath(root=self._root, task_id=self._task_id)
self._mtime = self._get_mtime()
@property
def root(self):
return self._root
@abstractproperty
def type(self):
"""Indicates the type of task (active or finished)"""
def _read_task(self, memoized={}):
"""Read the corresponding task from disk and return a ThermosTask. Memoizes already-read tasks.
"""
if self._task_id not in memoized:
path = self._pathspec.given(state=self.type).getpath('task_path')
if os.path.exists(path):
task = ThermosTaskWrapper.from_file(path)
if task is None:
log.error('Error reading ThermosTask from %s in observer.', path)
else:
context = self.context(self._task_id)
if not context:
log.warning('Task not yet available: %s', self._task_id)
task = task.task() % Environment(thermos=context)
memoized[self._task_id] = task
return memoized.get(self._task_id, None)
def _get_mtime(self):
"""Retrieve the mtime of the task's state directory"""
get_path = lambda state: self._pathspec.given(state=state).getpath('task_path')
mtime = self.safe_mtime(get_path('active'))
if mtime is None:
mtime = self.safe_mtime(get_path('finished'))
if mtime is None:
log.error("Couldn't get mtime for task %s!", self._task_id)
return mtime
def context(self, task_id):
state = self.state
if state.header is None:
return None
return ThermosContext(
ports=state.header.ports if state.header.ports else {},
task_id=state.header.task_id,
user=state.header.user,
)
@property
def task(self):
"""Return a ThermosTask representing this task"""
return self._read_task()
@property
def task_id(self):
"""Return the task's task_id"""
return self._task_id
@property
def mtime(self):
"""Return mtime of task file"""
return self._mtime
@abstractproperty
def state(self):
"""Return state of task (gen.apache.thermos.ttypes.RunnerState)"""
class ActiveObservedTask(ObservedTask):
"""An active Task known by the TaskObserver"""
def __init__(self, root, task_id, task_monitor, resource_monitor):
super(ActiveObservedTask, self).__init__(root, task_id)
self._task_monitor = task_monitor
self._resource_monitor = resource_monitor
@property
def type(self):
return 'active'
@property
def state(self):
"""Return a RunnerState representing the current state of task, retrieved from TaskMonitor"""
return self.task_monitor.get_state()
@property
def task_monitor(self):
"""Return a TaskMonitor monitoring this task"""
return self._task_monitor
@property
def resource_monitor(self):
"""Return a ResourceMonitor implementation monitoring this task's resources"""
return self._resource_monitor
class FinishedObservedTask(ObservedTask):
"""A finished Task known by the TaskObserver"""
def __init__(self, root, task_id):
super(FinishedObservedTask, self).__init__(root, task_id)
self._state = None
@property
def type(self):
return 'finished'
@property
def state(self):
"""Return final state of Task (RunnerState, read from disk and cached for future access)"""
if self._state is None:
path = self._pathspec.getpath('runner_checkpoint')
self._state = CheckpointDispatcher.from_file(path)
return copy.deepcopy(self._state) if self._state else RunnerState(processes={})