blob: 70016bc4e9a185dace8614775ccc0f318c5d86a3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.worker;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.beam.runners.core.metrics.ExecutionStateSampler;
import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
import org.apache.beam.runners.dataflow.worker.counters.Counter;
import org.apache.beam.runners.dataflow.worker.counters.CounterFactory;
import org.apache.beam.runners.dataflow.worker.counters.NameContext;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ElementExecutionTracker;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterators;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.PeekingIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Dataflow-specific implementation of {@link ElementExecutionTracker}.
*
* <p>Each processed element through each step in the fused stage gets tracked as a {@code
* ElementExecution}. Processing time is sampled using {@link ExecutionStateSampler} and distributed
* amongst all elements which are executed since the last sample period.
*
* <p>Elements are processed in potentially many fragments of execution as we move up and down the
* stage graph via outputting. Each fragment of execution is counted equally for attributing sampled
* processing time.
*
* <p>When an element is finished processing it is held in the {@code doneJournal} collection until
* the next sampling round in order to calculate the final processing time. Eventually the total
* element processing time is reported to the counter and the state is cleaned up.
*
* <p>This is not implemented in {@link ElementExecutionTracker} itself because it uses
* Beam/Dataflow-specific classes, such as {@link PipelineOptions} which cannot be shared with
* Flume.
*/
public class DataflowElementExecutionTracker extends ElementExecutionTracker {
/**
 * Name of the pipeline experiment that {@code create} looks for; when it is absent, {@code create}
 * returns the no-op tracker instead of a {@code DataflowElementExecutionTracker}.
 */
@VisibleForTesting
public static final String TIME_PER_ELEMENT_EXPERIMENT = "time_per_element_counter";
/** Logger used to announce that the per-element counter has been enabled. */
private static final Logger LOG = LoggerFactory.getLogger(DataflowElementExecutionTracker.class);
/**
 * Builds the tracker to use for the given pipeline options.
 *
 * <p>Returns {@link ElementExecutionTracker#NO_OP_INSTANCE} unless the {@link
 * #TIME_PER_ELEMENT_EXPERIMENT} experiment is present in {@code options}. When the experiment is
 * present, logs that the counter is enabled and returns a new tracker that reports through {@code
 * counterFactory}.
 */
public static ElementExecutionTracker create(
    CounterFactory counterFactory, PipelineOptions options) {
  // TODO: Remove the experiment gate once feature has launched.
  boolean trackingEnabled = hasExperiment(options, TIME_PER_ELEMENT_EXPERIMENT);
  if (trackingEnabled) {
    // TODO: Remove log statement when functionality is enabled by default.
    LOG.info("{} counter enabled.", ElementExecutionTracker.COUNTER_NAME);
    return new DataflowElementExecutionTracker(counterFactory);
  }
  return ElementExecutionTracker.NO_OP_INSTANCE;
}
/**
 * Returns whether {@code experiment} is listed in the experiments of the {@link
 * DataflowPipelineDebugOptions} view of {@code options}. A missing (null) experiment list is
 * treated as containing nothing.
 */
private static boolean hasExperiment(PipelineOptions options, String experiment) {
  List<String> enabledExperiments =
      options.as(DataflowPipelineDebugOptions.class).getExperiments();
  if (enabledExperiments == null) {
    return false;
  }
  return enabledExperiments.contains(experiment);
}
/**
 * Tracking object for the execution of a single input element in a step.
 *
 * <p>Every {@link ElementExecution} instance stands for one distinct element; the element is
 * identified purely by the identity of the instance. Neither {@code equals} nor {@code hashCode}
 * is overridden, so instances compare by reference rather than by value.
 */
private static class ElementExecution {
  /** Marker execution to represent when there is no element currently being processed. */
  static final ElementExecution IDLE = new ElementExecution();

  /** Step in which the element is processed. Only empty for {@link #IDLE}. */
  final Optional<NameContext> step;

  /** Creates the tracking object for an element processed in {@code step}. */
  ElementExecution(NameContext step) {
    this.step = Optional.of(step);
  }

  /** Only used for {@link #IDLE}. */
  private ElementExecution() {
    this.step = Optional.empty();
  }

  /**
   * Renders the idle marker as {@code IDLE_EXECUTION}; any other execution is rendered with its
   * step and its identity hash code.
   */
  @Override
  public String toString() {
    return this == IDLE
        ? "IDLE_EXECUTION"
        : MoreObjects.toStringHelper(this)
            .add("step", step)
            .add("executionId", System.identityHashCode(this))
            .toString();
  }
}
/** Journal writer invoked from {@link #enter(NameContext)} and {@link #exit()}. */
private final ExecutionJournalWriter executionWriter;
/** Journal reader invoked from {@link #takeSample(long)}. */
private final ExecutionJournalReader executionReader;

/**
 * Creates a tracker whose journal writer and journal reader operate on one shared {@link
 * ReaderWriterState}; counters are created through {@code counterFactory}.
 */
private DataflowElementExecutionTracker(CounterFactory counterFactory) {
  ReaderWriterState shared = new ReaderWriterState();
  this.executionWriter = new ExecutionJournalWriter(shared);
  this.executionReader = new ExecutionJournalReader(counterFactory, shared);
}
/** Records that processing of a new element has started in {@code step}. */
@Override
public void enter(NameContext step) {
  this.executionWriter.startProcessing(step);
}
/** Records that the most recently entered element has finished processing. */
@Override
public void exit() {
  this.executionWriter.doneProcessing();
}
/**
 * Hands the time elapsed since the previous sample, given in milliseconds, to the journal reader
 * for attribution to tracked elements.
 */
@Override
public void takeSample(long millisSinceLastSample) {
  Duration elapsed = Duration.ofMillis(millisSinceLastSample);
  executionReader.takeSample(elapsed);
}
/** State shared between the {@link ExecutionJournalWriter} and {@link ExecutionJournalReader}. */
private static class ReaderWriterState {
  /**
   * Journal of fragments of execution per element, used to attribute processing time. Every
   * transition up or down the stage fusion graph appends a fragment for the element that is
   * processing after the transition, tagged with an incremented snapshot version. Each snapshot
   * version must have a representative value in the {@code executionJournal}; {@link
   * ElementExecution#IDLE} is journaled when no element is being processed.
   */
  private final Journal<ElementExecution> executionJournal = new Journal<>();

  /**
   * Elements which have completed processing but are still pending final timing calculation.
   * Elements are added here when they leave the writer's execution stack and stay until their
   * counter value can be reported in the next sampling round.
   */
  private final Journal<ElementExecution> doneJournal = new Journal<>();

  /**
   * Monotonically increasing snapshot version number which tracks the latest execution state
   * ready to be reported. Snapshot versions are associated with values in the {@code
   * executionJournal} and {@code doneJournal}.
   *
   * <p>This value is written by the single {@link ExecutionJournalWriter} thread, and read by the
   * {@link ExecutionJournalReader} thread.
   */
  private volatile long latestSnapshot = 0L;

  private ReaderWriterState() {}
}
/**
 * Writes journal entries on element processing state changes.
 *
 * <p>Write operations are executed by a single thread inside the stage operation hot loop and
 * must be highly performant. {@link ExecutionJournalWriter} methods are thread-safe only for
 * state shared with the {@link ExecutionJournalReader} thread.
 */
private static class ExecutionJournalWriter {
  /**
   * Execution stack of processing elements. An element is pushed when we enter the process()
   * function and popped when process() returns, so the stack mirrors the actual Java runtime
   * stack and carries the step + element context needed to attribute sampled execution time.
   */
  private final Deque<ElementExecution> executionStack;

  private final ReaderWriterState sharedState;

  /** Creates a writer over {@code sharedState} and journals the initial idle execution. */
  public ExecutionJournalWriter(ReaderWriterState sharedState) {
    this.sharedState = sharedState;
    this.executionStack = new ArrayDeque<>();
    addExecution(ElementExecution.IDLE);
  }

  /** Create and journal a new {@link ElementExecution} to track a processing element. */
  public void startProcessing(NameContext step) {
    addExecution(new ElementExecution(step));
  }

  /** Pushes {@code execution} onto the stack and journals it under the next snapshot version. */
  private void addExecution(ElementExecution execution) {
    executionStack.addLast(execution);
    long version = sharedState.latestSnapshot + 1;
    sharedState.executionJournal.add(execution, version);
    // Publish the version last, after the journal entry for it has been written.
    sharedState.latestSnapshot = version;
  }

  /**
   * Indicates that the execution thread has exited the process method for an element.
   *
   * <p>The finished element is popped from the execution stack and recorded in the {@code
   * doneJournal}, where it stays until the next sampling round so that its final fragment of
   * execution can still be accounted. The element that is now on top of the stack is journaled as
   * the new current execution under the same snapshot version.
   *
   * @throws IllegalStateException if only the idle marker is left on the stack
   */
  public void doneProcessing() {
    // The execution stack is initialized with the IDLE execution which should never be removed.
    checkState(executionStack.size() > 1, "No processing elements currently tracked.");
    long version = sharedState.latestSnapshot + 1;
    ElementExecution finished = executionStack.removeLast();
    ElementExecution resumed = executionStack.getLast();
    sharedState.doneJournal.add(finished, version);
    sharedState.executionJournal.add(resumed, version);
    // Publish the version last, after both journal entries for it have been written.
    sharedState.latestSnapshot = version;
  }
}
/**
 * Accounts sampled time to processed elements based on execution journal entries.
 *
 * <p>Read operations are executed by a single thread in charge of state sampling, and are
 * generally called more infrequently than {@link ExecutionJournalWriter} operations. {@link
 * ExecutionJournalReader} methods are thread-safe only for state shared with the {@link
 * ExecutionJournalWriter} thread.
 */
private static class ExecutionJournalReader {
  /**
   * Accumulated execution time per element. Once an element has finished processing and execution
   * time has been attributed, the total execution time is reported via the counter and removed
   * from the collection.
   */
  private final Map<ElementExecution, Duration> executionTimes;

  private final CounterFactory counterFactory;
  private final ReaderWriterState sharedState;

  /** Cache of per-step distribution counters. */
  private final Map<NameContext, Counter<Long, ?>> counterCache;

  /** Creates a reader over {@code sharedState} that creates counters via {@code counterFactory}. */
  public ExecutionJournalReader(CounterFactory counterFactory, ReaderWriterState sharedState) {
    this.sharedState = sharedState;
    this.executionTimes = new HashMap<>();
    this.counterFactory = counterFactory;
    this.counterCache = new HashMap<>();
  }

  /**
   * Account the specified processing time duration to elements which have processed since the
   * last sampling round, and report counters for completed elements.
   *
   * @param sampleTime time elapsed since the previous sampling round
   */
  public void takeSample(Duration sampleTime) {
    // Read the published version once so that all three phases work on the same snapshot.
    long snapshot = sharedState.latestSnapshot;
    attributeProcessingTime(sampleTime, snapshot);
    // TODO: If possible, this should report tentative counter values before they are
    // finalized. Currently, root steps in a stage fusion will not have counters reported until an
    // element is completely processed through the stage subgraph.
    reportCounters(snapshot);
    pruneJournals(snapshot);
  }

  /**
   * Attribute processing time to elements from {@code executionJournal} up to the specified
   * snapshot.
   *
   * <p>Every journal entry up to {@code snapshot}, including idle entries, counts as one fragment
   * of execution. {@code duration} is split evenly across all fragments and each non-idle element
   * is credited with the share of its own fragments.
   */
  private void attributeProcessingTime(Duration duration, long snapshot) {
    // TODO: This algorithm is used to compute "per-element-processing-time" counter
    // values, but a slightly different algorithm is used for msec counters. Values for both
    // counters should be derived from the same algorithm to avoid unexpected discrepancies.

    // Calculate total execution counts. Idle fragments count towards the total but are not
    // credited to any element.
    int totalExecutions = 0;
    Map<ElementExecution, Integer> executionsPerElement = new HashMap<>();
    for (ElementExecution execution : sharedState.executionJournal.readUntil(snapshot)) {
      totalExecutions++;
      if (execution != ElementExecution.IDLE) {
        executionsPerElement.merge(execution, 1, Integer::sum);
      }
    }
    if (executionsPerElement.isEmpty()) {
      // Nothing to credit; this also avoids dividing by zero when the journal was empty.
      return;
    }

    // Attribute processing time. The per-fragment share is the same for every element.
    Duration timePerExecution = duration.dividedBy(totalExecutions);
    for (Map.Entry<ElementExecution, Integer> executionCount : executionsPerElement.entrySet()) {
      Duration attributedSampleTime = timePerExecution.multipliedBy(executionCount.getValue());
      executionTimes.merge(executionCount.getKey(), attributedSampleTime, Duration::plus);
    }
  }

  /**
   * Report counter values for done elements up to the given snapshot.
   *
   * <p>The accumulated time of each reported element is removed from {@code executionTimes}, so
   * the map only holds elements whose counter value has not been reported yet.
   *
   * @throws IllegalStateException if a done execution has no step or no accumulated time
   */
  private void reportCounters(long snapshot) {
    // Report counter values for completed elements
    for (ElementExecution execution : sharedState.doneJournal.readUntil(snapshot)) {
      checkState(
          execution.step.isPresent(),
          "Unexpected execution in doneJournal with empty step: %s",
          execution);
      // Remove rather than get: a done element is never journaled again, so keeping its entry
      // would only leak memory.
      Duration totalTime = executionTimes.remove(execution);
      checkState(
          totalTime != null, "No processing time attributed to done execution: %s", execution);
      Counter<Long, ?> counter = getCounter(execution.step.get());
      counter.addValue(totalTime.toMillis());
    }
  }

  /** Retrieve the per-element processing time counter for the specified step. */
  private Counter<Long, ?> getCounter(NameContext step) {
    return counterCache.computeIfAbsent(
        step,
        s ->
            counterFactory.distribution(
                ElementExecutionTracker.COUNTER_NAME.withOriginalName(s)));
  }

  /**
   * Prune journal entries which have been accounted.
   *
   * @param snapshot Snapshot value which has been accounted.
   */
  private void pruneJournals(long snapshot) {
    sharedState.doneJournal.pruneUntil(snapshot);
    // Keep the currently executing element in the journal so its remaining execution time
    // is counted in the next sampling round.
    sharedState.executionJournal.pruneUntil(snapshot - 1);
  }
}
/**
 * Concurrent queue-based data structure for passing journaled events between a single journal
 * reader and single journal writer.
 *
 * <p>Each event is journaled with an externally-managed snapshot version. Snapshot versions are
 * unique and monotonically increasing.
 */
private static class Journal<T> {
  private final ConcurrentLinkedQueue<SnapshottedItem<T>> queue;

  /** Largest snapshot version added so far; only read and written by {@link #add}. */
  private long maxSnapshot;

  public Journal() {
    queue = new ConcurrentLinkedQueue<>();
    maxSnapshot = Long.MIN_VALUE;
  }

  /**
   * Add a new event to the journal with the given snapshot.
   *
   * @throws IllegalArgumentException if {@code snapshot} is not greater than every snapshot
   *     previously added to this journal
   */
  public void add(T item, long snapshot) {
    if (snapshot <= maxSnapshot) {
      throw new IllegalArgumentException(
          String.format(
              "Timestamps must be monotonically increasing. "
                  + "Specified snapshot '%d' is not greater than max snapshot '%d'",
              snapshot, maxSnapshot));
    }
    queue.add(new SnapshottedItem<>(item, snapshot));
    maxSnapshot = snapshot;
  }

  /**
   * Retrieve an iterable of events from the journal up to and including the specified snapshot
   * version.
   *
   * <p>Returns an empty iterable if the journal is empty or there are no events at or below the
   * specified snapshot version.
   *
   * <p>Every call to {@link Iterable#iterator()} on the result starts a fresh traversal from the
   * head of the queue, so the iterable can be traversed more than once. Each traversal is backed
   * by {@link ConcurrentLinkedQueue#iterator()} and shares its consistency guarantees.
   *
   * <p>{@link Iterator#remove()} on a returned iterator is delegated to a {@link
   * PeekingIterator} and therefore only works directly after {@code next()}, before {@code
   * hasNext()} is called again.
   */
  public Iterable<T> readUntil(long snapshot) {
    // Implement our own iterator in order to short-circuit early once we've seen a snapshot
    // version greater than the value specified.
    // Streams will have takeWhile() in JDK9, and Guava may some day have
    // Iterables.limit(Predicate<T>): https://github.com/google/guava/issues/477
    return () -> {
      // Created per traversal so that repeated iterator() calls do not share one cursor.
      PeekingIterator<SnapshottedItem<T>> iter = Iterators.peekingIterator(queue.iterator());
      return new Iterator<T>() {
        @Override
        public boolean hasNext() {
          return iter.hasNext() && iter.peek().snapshot <= snapshot;
        }

        @Override
        public T next() {
          // Check the bound before advancing so an out-of-range event is never consumed.
          if (!hasNext()) {
            throw new NoSuchElementException();
          }
          return iter.next().item;
        }

        @Override
        public void remove() {
          iter.remove();
        }
      };
    };
  }

  /** Prune events from the journal up to and including the specified snapshot. */
  public void pruneUntil(long snapshot) {
    Iterator<SnapshottedItem<T>> iterator = queue.iterator();
    // Events are queued in increasing snapshot order, so stop at the first one past the bound.
    while (iterator.hasNext() && iterator.next().snapshot <= snapshot) {
      iterator.remove();
    }
  }

  /** An event paired with the snapshot version it was journaled under. */
  private static class SnapshottedItem<E> {
    private final E item;
    private final long snapshot;

    public SnapshottedItem(E item, long snapshot) {
      this.item = item;
      this.snapshot = snapshot;
    }
  }
}
}