blob: 8d2346aa1434849ed4dba03533c852051ab2a429 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# cython: profile=True
"""State sampler for tracking time spent in execution steps.
The state sampler profiles the time spent in each step of a pipeline.
Operations (defined in executor.py) which are executed as part of a MapTask are
instrumented with context managers provided by StateSampler.scoped_state().
These context managers change the internal state of the StateSampler during each
relevant Operation's .start(), .process() and .finish() methods. State is
sampled by a background thread which, while not holding the Python Global
Interpreter Lock, queries the StateSampler's internal state at a defined sampling
period (starting short and ramping up to the configured value). In a
common example, a ReadOperation during its .start() method reads an element and
calls a DoOperation's .process() method, which can call a WriteOperation's
.process() method. Each element processed causes the current state to
transition between these states of different Operations. Each time the sampling
thread queries the current state, the time spent since the previous sample is
attributed to that state and accumulated. Over time, this allows a granular
runtime profile to be produced.
"""
import threading
from apache_beam.utils.counters import CounterName
from apache_beam.metrics.execution cimport MetricsContainer
cimport cython
from cpython cimport pythread
from libc cimport math
from libc.stdint cimport int32_t, int64_t
cdef extern from "Python.h":
  # Raw list indexing used by the sampling thread while the GIL is released.
  # The return type is declared as void* so Cython performs no reference
  # counting on the (borrowed) item.
  # This typically requires the GIL, but we synchronize the list modifications
  # we use this on via our own lock.
  cdef void* PyList_GET_ITEM(list, Py_ssize_t index) nogil

cdef extern from "unistd.h" nogil:
  # POSIX sleep in microseconds; the sampling loop calls it between samples.
  void usleep(int)
cdef extern from "<time.h>" nogil:
  struct timespec:
    long tv_sec  # seconds
    long tv_nsec  # nanoseconds
  # The numeric value is platform-specific, so let the C compiler resolve it
  # from <time.h> instead of hard-coding a clock id.
  enum: CLOCK_MONOTONIC
  int clock_gettime(int clock_id, timespec *result)


cdef inline int64_t get_nsec_time() nogil:
  """Returns the current reading of the monotonic clock, in nanoseconds.

  The absolute value has no fixed meaning (it is not time since the Unix
  epoch); only the difference between two readings is meaningful. A monotonic
  clock is used so that wall-clock adjustments (e.g. NTP steps) cannot make
  elapsed times computed from consecutive readings negative or inflated.
  """
  cdef timespec current_time
  clock_gettime(CLOCK_MONOTONIC, &current_time)
  return (
      (<int64_t> current_time.tv_sec) * 1000000000 +  # seconds to nanoseconds
      current_time.tv_nsec)
cdef class StateSampler(object):
  """Tracks time spent in states during pipeline execution.

  A sampling thread (see ``run``) periodically adds the time elapsed since its
  previous sample to whichever ScopedState is current. ``self.lock`` guards
  the state shared with that thread: ``finished``, ``scoped_states_by_index``,
  the current state index, the transition counter and each state's nsecs.
  """

  def __init__(self,
               sampling_period_ms,
               sampling_period_ms_start=None,
               sampling_period_ratio=1.2):
    """Creates a sampler whose only state is the initial 'unknown' state.

    Args:
      sampling_period_ms: Steady-state interval between samples, in
        milliseconds.
      sampling_period_ms_start: Initial interval between samples, in
        milliseconds. Defaults to ``max(1, sampling_period_ms // 100)``.
      sampling_period_ratio: Factor by which the interval grows after each
        sample until it reaches ``sampling_period_ms``.

    Raises:
      MemoryError: If the internal lock cannot be allocated.
    """
    self._sampling_period_ms = sampling_period_ms
    # Slowly ramp up to avoid excessive waiting for short stages, as well
    # as more precise information in that case.
    self._sampling_period_ms_start = (
        sampling_period_ms_start or max(1, sampling_period_ms // 100))
    self._sampling_period_ratio = sampling_period_ratio
    self.started = False
    self.finished = False
    self.lock = pythread.PyThread_allocate_lock()
    if self.lock == NULL:
      raise MemoryError('Unable to allocate StateSampler lock.')
    self.current_state_index = 0
    self.time_since_transition = 0
    self.state_transition_count = 0
    unknown_state = ScopedState(self,
                                CounterName('unknown'),
                                None,
                                self.current_state_index,
                                None,
                                None)
    pythread.PyThread_acquire_lock(self.lock, pythread.WAIT_LOCK)
    self.scoped_states_by_index = [unknown_state]
    pythread.PyThread_release_lock(self.lock)
    # Assert that the compiler correctly aligned the current_state field. This
    # is necessary for reads and writes to this variable to be atomic across
    # threads without additional synchronization.
    # States are referenced via an index rather than, say, a pointer because
    # of better support for 32-bit atomic reads and writes.
    assert (<int64_t> &self.current_state_index) % sizeof(int32_t) == 0, (
        'Address of StateSampler.current_state_index is not word-aligned.')

  def __dealloc__(self):
    # The lock is allocated in __init__, which may not have run (or may have
    # raised before allocating it); cdef attributes start out NULL.
    if self.lock != NULL:
      pythread.PyThread_free_lock(self.lock)

  def run(self):
    """Sampling-thread body; loops until ``stop()`` sets ``finished``.

    Each iteration sleeps for the current sampling period. It then, while
    holding ``self.lock``, adds the nanoseconds elapsed since the previous
    sample to the current state and updates ``time_since_transition``. The
    period starts at ``_sampling_period_ms_start`` and is multiplied by
    ``_sampling_period_ratio`` each iteration, capped at
    ``_sampling_period_ms``. The whole loop runs without the GIL.
    """
    cdef int64_t last_nsecs = get_nsec_time()
    cdef int64_t elapsed_nsecs
    cdef int64_t latest_transition_count = self.state_transition_count
    cdef int64_t sampling_period_us = self._sampling_period_ms_start * 1000
    with nogil:
      while True:
        usleep(<int>sampling_period_us)
        sampling_period_us = <int64_t>math.fmin(
            sampling_period_us * self._sampling_period_ratio,
            self._sampling_period_ms * 1000)
        pythread.PyThread_acquire_lock(self.lock, pythread.WAIT_LOCK)
        try:
          if self.finished:
            break
          elapsed_nsecs = get_nsec_time() - last_nsecs
          # Take an address as we can't create a reference to the scope
          # without the GIL.
          nsecs_ptr = &(<ScopedState>PyList_GET_ITEM(
              self.scoped_states_by_index, self.current_state_index))._nsecs
          nsecs_ptr[0] += elapsed_nsecs
          # A state transition happened since the last sample: restart the
          # time-since-transition measurement.
          if latest_transition_count != self.state_transition_count:
            self.time_since_transition = 0
            latest_transition_count = self.state_transition_count
          self.time_since_transition += elapsed_nsecs
          last_nsecs += elapsed_nsecs
        finally:
          pythread.PyThread_release_lock(self.lock)

  def start(self):
    """Starts the sampling thread; may only be called once between resets."""
    assert not self.started
    # Record that we started so a second start() trips the assertion above
    # instead of silently spawning a second sampling thread.
    self.started = True
    self.sampling_thread = threading.Thread(target=self.run)
    self.sampling_thread.start()

  def stop(self):
    """Signals the sampling thread to finish and waits for it to exit."""
    assert not self.finished
    pythread.PyThread_acquire_lock(self.lock, pythread.WAIT_LOCK)
    self.finished = True
    pythread.PyThread_release_lock(self.lock)
    # May have to wait up to sampling_period_ms, but the platform-independent
    # pythread doesn't support conditions.
    self.sampling_thread.join()

  def reset(self):
    """Zeroes every state's sampled time and clears started/finished."""
    for state in self.scoped_states_by_index:
      (<ScopedState>state)._nsecs = 0
    self.started = self.finished = False

  cpdef ScopedState current_state(self):
    """Returns the ScopedState that is currently active."""
    return self.current_state_c()

  cdef inline ScopedState current_state_c(self):
    # Faster than cpdef due to self always being a Python subclass.
    return <ScopedState>self.scoped_states_by_index[self.current_state_index]

  cpdef _scoped_state(self, counter_name, name_context, output_counter,
                      metrics_container):
    """Returns a context manager managing transitions for a given state.

    Args:
      counter_name: A CounterName object with information about the execution
        state.
      name_context: The step name context stored on the returned state.
      output_counter: A Beam Counter to which msecs are committed for reporting.
      metrics_container: A MetricsContainer for the current step.

    Returns:
      A ScopedState for the set of step-state-io_target.
    """
    new_state_index = len(self.scoped_states_by_index)
    scoped_state = ScopedState(self,
                               counter_name,
                               name_context,
                               new_state_index,
                               output_counter,
                               metrics_container)
    # Both scoped_states_by_index and scoped_state.nsecs are accessed
    # by the sampling thread; initialize them under the lock.
    pythread.PyThread_acquire_lock(self.lock, pythread.WAIT_LOCK)
    self.scoped_states_by_index.append(scoped_state)
    scoped_state._nsecs = 0
    pythread.PyThread_release_lock(self.lock)
    return scoped_state

  def update_metric(self, typed_metric_name, value):
    """Updates ``typed_metric_name`` in the current state's metrics container.

    This is a no-op when the current state has no container. The initial
    'unknown' state, for example, is created with ``None``.
    """
    # Each of these is a cdef lookup.
    metrics_container = self.current_state_c().metrics_container
    if metrics_container is not None:
      metrics_container.get_metric_cell(typed_metric_name).update(value)
cdef class ScopedState(object):
  """Context manager that makes one sampler state current while entered.

  On entry, the owning sampler's current state index is switched to this
  state's index and the previous index is remembered. On exit, the previous
  index is restored. Each switch also increments the sampler's transition
  count, and both updates are made while holding the sampler's lock.
  """

  def __init__(self,
               sampler,
               name,
               step_name_context,
               state_index,
               counter,
               metrics_container):
    self.sampler = sampler
    self.name = name
    self.name_context = step_name_context
    self.state_index = state_index
    self.counter = counter
    self.metrics_container = metrics_container

  @property
  def nsecs(self):
    """Nanoseconds the sampling thread has attributed to this state."""
    return self._nsecs

  def sampled_seconds(self):
    """Returns the sampled time as (float) seconds."""
    return self.nsecs * 1e-9

  def sampled_msecs_int(self):
    """Returns the sampled time as whole milliseconds (truncated)."""
    return int(self.nsecs * 1e-6)

  def __repr__(self):
    return "ScopedState[%s, %s]" % (self.name, self.nsecs)

  cpdef __enter__(self):
    cdef StateSampler owner = self.sampler
    # Remember the state being interrupted so __exit__ can restore it.
    self.old_state_index = owner.current_state_index
    pythread.PyThread_acquire_lock(owner.lock, pythread.WAIT_LOCK)
    owner.current_state_index = self.state_index
    owner.state_transition_count += 1
    pythread.PyThread_release_lock(owner.lock)

  cpdef __exit__(self, unused_exc_type, unused_exc_value, unused_traceback):
    cdef StateSampler owner = self.sampler
    pythread.PyThread_acquire_lock(owner.lock, pythread.WAIT_LOCK)
    owner.current_state_index = self.old_state_index
    owner.state_transition_count += 1
    pythread.PyThread_release_lock(owner.lock)