# blob: 3ab1e48dccd5c7d5477383b3004ee2346e7005a3 [file] [log] [blame]
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Monitor the state of Thermos tasks on a system.

This module contains the TaskMonitor, used to reconstruct the state of active or finished Thermos
tasks based on their checkpoint streams. It exposes two key pieces of information about a Task, both
as their corresponding Thrift structs:
  - a RunnerState, representing the latest state of the Task
  - a list of ProcessStatus objects, representing the processes currently running within the Task
"""
import copy
import errno
import os
import threading
from twitter.common import log
from twitter.common.recordio import ThriftRecordReader
from apache.thermos.common.ckpt import CheckpointDispatcher
from apache.thermos.common.path import TaskPath
from gen.apache.thermos.ttypes import ProcessState, RunnerCkpt, RunnerState, TaskState
class TaskMonitor(object):
    """Reconstruct and monitor the state of an individual Thermos task.

    The state is rebuilt by replaying the task's runner checkpoint stream
    through a CheckpointDispatcher into a RunnerState. The monitor also exports
    information on the processes of the task that are currently running.
    """

    def __init__(self, root, task_id):
        """Construct a TaskMonitor.

        :param root: The checkpoint root of the task.
        :param task_id: The task id of the task.
        """
        pathspec = TaskPath(root=root, task_id=task_id)
        self._dispatcher = CheckpointDispatcher()
        self._runnerstate = RunnerState(processes={})
        self._runner_ckpt = pathspec.getpath('runner_checkpoint')
        self._active_file = pathspec.given(state='active').getpath('task_path')
        self._finished_file = pathspec.given(state='finished').getpath('task_path')
        # Byte offset in the checkpoint file up to which records were consumed.
        self._ckpt_head = 0
        # Create the lock before the initial replay so that every attribute
        # exists by the time any state-applying code runs.
        self._lock = threading.Lock()
        self._apply_states()

    def _apply_states(self):
        """Replay checkpoint records appended since the previous call.

        os.stat() the runner checkpoint of this task and, if the file has grown
        past the current high watermark, read records starting at that offset
        and dispatch each one into the runner state. The high watermark is then
        moved to the file position reached by the reader.

        Apart from the initial replay in __init__, the methods of this class
        call this while holding self._lock.

        :return: True if the high watermark moved, False otherwise (including
            when the checkpoint file does not exist yet).
        :raises OSError: (or IOError) for any filesystem error other than ENOENT.
        """
        try:
            ckpt_size = os.stat(self._runner_ckpt).st_size
            if self._ckpt_head >= ckpt_size:
                return False

            # The checkpoint holds binary Thrift records, so read it as bytes.
            with open(self._runner_ckpt, 'rb') as fp:
                fp.seek(self._ckpt_head)
                reader = ThriftRecordReader(fp, RunnerCkpt)
                while True:
                    runner_update = reader.try_read()
                    if not runner_update:
                        break
                    try:
                        self._dispatcher.dispatch(self._runnerstate, runner_update)
                    except CheckpointDispatcher.InvalidSequenceNumber as e:
                        log.error('Checkpoint stream is corrupt: %s', e)
                        break
                new_ckpt_head = fp.tell()

            updated = self._ckpt_head != new_ckpt_head
            self._ckpt_head = new_ckpt_head
            return updated
        except (IOError, OSError) as e:
            # os.stat() raises OSError while open() raises IOError on Python 2;
            # both must be handled because the file can disappear between the
            # two calls.
            if e.errno == errno.ENOENT:
                # The log doesn't yet exist, will retry later.
                log.warning('Could not read from checkpoint %s', self._runner_ckpt)
                return False
            raise

    def refresh(self):
        """Check for new checkpoint updates and apply them.

        :return: True if updates were applied, False otherwise.
        """
        with self._lock:
            return self._apply_states()

    def get_sandbox(self):
        """Get the sandbox of this task, or None if it has not yet been discovered."""
        state = self.get_state()
        if state.header:
            return state.header.sandbox
        return None

    def get_state(self):
        """Get the latest state of this Task.

        Pending checkpoint records are applied first. The result is a deep copy,
        so later updates to the monitor do not alter it.
        """
        with self._lock:
            self._apply_states()
            return copy.deepcopy(self._runnerstate)

    def task_state(self):
        """Return the state of the most recent status entry of the task.

        Falls back to TaskState.ACTIVE when no statuses have been recorded.
        """
        state = self.get_state()
        return state.statuses[-1].state if state.statuses else TaskState.ACTIVE

    @property
    def active(self):
        """True if the task's 'active' task path exists on disk."""
        return os.path.exists(self._active_file)

    @property
    def finished(self):
        """True if the task's 'finished' task path exists on disk."""
        return os.path.exists(self._finished_file)

    def get_active_processes(self):
        """Get active processes.

        Pending checkpoint records are applied first. A process counts as
        active when its most recent run is in ProcessState.RUNNING.

        :return: A list of tuples of the form
            (ProcessStatus object of the running process, its run number),
            where the run number is the zero-based index of that run. The
            ProcessStatus objects are the monitor's own instances, not copies.
        """
        active_processes = []
        # Scan while holding the lock so that a concurrent refresh() cannot
        # mutate the process table in the middle of the iteration.
        with self._lock:
            self._apply_states()
            for runs in self._runnerstate.processes.values():
                if not runs:
                    continue
                last_run = runs[-1]
                if last_run.state == ProcessState.RUNNING:
                    active_processes.append((last_run, len(runs) - 1))
        return active_processes