blob: 270c0a9e63c2b834d833941f2c21b9f9c6f78c62 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.beam.runners.dataflow.worker;
import static;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.beam.runners.core.metrics.ExecutionStateSampler;
import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
import org.apache.beam.runners.dataflow.worker.counters.Counter;
import org.apache.beam.runners.dataflow.worker.counters.CounterFactory;
import org.apache.beam.runners.dataflow.worker.counters.NameContext;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ElementExecutionTracker;
import org.apache.beam.sdk.options.PipelineOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* Dataflow-specific implementation of {@link ElementExecutionTracker}.
* <p>Each element processed by each step in the fused stage is tracked as an {@code
* ElementExecution}. Processing time is sampled using {@link ExecutionStateSampler} and distributed
* among all elements executed since the last sampling period.
* <p>Elements are processed in potentially many fragments of execution as we move up and down the
* stage graph via outputting. Each fragment of execution is counted equally for attributing sampled
* processing time.
* <p>When an element is finished processing it is held in the {@code doneJournal} collection until
* the next sampling round in order to calculate the final processing time. Eventually the total
* element processing time is reported to the counter and the state is cleaned up.
* <p>This is not implemented in {@link ElementExecutionTracker} itself because it uses
* Beam/Dataflow-specific classes, such as {@link PipelineOptions}, that cannot be shared with
* Flume.
"nullness" // TODO(
public class DataflowElementExecutionTracker extends ElementExecutionTracker {
/**
 * Experiment name that enables this tracker. {@link #create} returns the no-op tracker unless this
 * experiment is present in the pipeline options.
 */
public static final String TIME_PER_ELEMENT_EXPERIMENT = "time_per_element_counter";
/** Class-level logger. */
private static final Logger LOG = LoggerFactory.getLogger(DataflowElementExecutionTracker.class);
/**
 * Creates the element execution tracker to use for a stage.
 *
 * @param counterFactory factory passed to the new tracker; unused when the experiment is off
 * @param options pipeline options checked for {@link #TIME_PER_ELEMENT_EXPERIMENT}
 * @return {@link ElementExecutionTracker#NO_OP_INSTANCE} when the experiment is not enabled,
 *     otherwise a new {@link DataflowElementExecutionTracker}
 */
public static ElementExecutionTracker create(
CounterFactory counterFactory, PipelineOptions options) {
// TODO: Remove once feature has launched.
if (!hasExperiment(options, TIME_PER_ELEMENT_EXPERIMENT)) {
return ElementExecutionTracker.NO_OP_INSTANCE;
// NOTE(review): in this copy the log call below has been merged into the TODO comment, so
// nothing is logged. It presumably should be a LOG call with "{} counter enabled." and
// ElementExecutionTracker.COUNTER_NAME. Confirm against the canonical source.
// TODO: Remove log statement when functionality is enabled by default."{} counter enabled.", ElementExecutionTracker.COUNTER_NAME);
return new DataflowElementExecutionTracker(counterFactory);
/**
 * Returns whether {@code experiment} is listed among the pipeline's experiments.
 *
 * @param options pipeline options to inspect
 * @param experiment experiment name to look for
 * @return {@code true} only if the experiments list is non-null and contains {@code experiment}
 */
private static boolean hasExperiment(PipelineOptions options, String experiment) {
// NOTE(review): the right-hand side of this assignment is missing from this copy. It presumably
// reads the experiments list from {@code options} (DataflowPipelineDebugOptions is imported but
// otherwise unused here). Confirm.
List<String> experiments =;
return experiments != null && experiments.contains(experiment);
* Tracking object for the execution of a single input element in a step.
* <p>Each {@link ElementExecution} instance represents a distinct element, implicitly represented
* by the instance identity. As such, {@link ElementExecution} instances compare using reference
* equality rather than value equality.
private static class ElementExecution {
/** Marker execution to represent when there is no element currently being processed. */
static final ElementExecution IDLE = new ElementExecution();
/** Step this execution belongs to; empty only for {@link #IDLE}. */
final Optional<NameContext> step;
/** Creates an execution for an element being processed by {@code step}. */
ElementExecution(NameContext step) {
this.step = Optional.of(step);
/** Creates the step-less execution; only used to build {@link #IDLE}. */
private ElementExecution() {
step = Optional.empty();
/**
 * Debug representation containing the step and {@link System#identityHashCode}. The identity hash
 * is included because executions are distinguished by identity, not by value.
 */
public String toString() {
if (this == ElementExecution.IDLE) {
// NOTE(review): this copy shows no separate return for IDLE, and the builder chain below has
// no terminating call that yields a String. Confirm against the canonical source.
return MoreObjects.toStringHelper(this)
.add("step", step)
.add("executionId", System.identityHashCode(this))
/** Journals element-processing transitions made on the processing thread. */
private final ExecutionJournalWriter executionWriter;

/** Consumes the journals on the sampling thread and attributes time to elements. */
private final ExecutionJournalReader executionReader;

/**
 * Wires one writer and one reader around a single shared {@link ReaderWriterState}. Instances are
 * obtained through {@link #create}.
 *
 * @param counterFactory factory handed to the reader for its per-step counters
 */
private DataflowElementExecutionTracker(CounterFactory counterFactory) {
  final ReaderWriterState shared = new ReaderWriterState();
  this.executionWriter = new ExecutionJournalWriter(shared);
  this.executionReader = new ExecutionJournalReader(counterFactory, shared);
}
/**
 * Hook for when processing of an element starts in {@code step}.
 *
 * <p>NOTE(review): the body is not visible in this copy. It presumably delegates to {@link
 * ExecutionJournalWriter#startProcessing}. Confirm.
 */
public void enter(NameContext step) {
/**
 * Hook for when processing of the current element finishes.
 *
 * <p>NOTE(review): the body is not visible in this copy. It presumably delegates to {@link
 * ExecutionJournalWriter#doneProcessing}. Confirm.
 */
public void exit() {
/**
 * Hook for attributing {@code millisSinceLastSample} of sampled time.
 *
 * <p>NOTE(review): the body is not visible in this copy. It presumably converts the value to a
 * {@link Duration} for {@code ExecutionJournalReader.takeSample(Duration)}. Confirm.
 */
public void takeSample(long millisSinceLastSample) {
/** State shared between the {@link ExecutionJournalWriter} and {@link ExecutionJournalReader}. */
private static class ReaderWriterState {
* Journal of fragments of execution per element to count for attributing processing time. Each
* time we transition up or down the stage fusion graph we add an execution fragment for the
* currently processing element with an incremented snapshot version. Each snapshot version must
* have a representative value in the {@code executionJournal}, or {@see IDLE_EXECUTION} to
* represent completion of processing.
private final Journal<ElementExecution> executionJournal;
* Elements which have completed processing but are still pending final timing calculation. The
* elements are moved here when they exit the {@code executionStack} and are held here until the
* counter value can be reported in the next sampling round.
private final Journal<ElementExecution> doneJournal;
* Monotonically increasing snapshot version number which tracks the latest execution state
* ready to be reported. Snapshot versions are associated with values in the {@code
* executionJournal} and {@code doneJournal}.
* <p>This value written by the single {@link ExecutionJournalWriter} thread, and read by the
* {@link ExecutionJournalReader} thread.
private volatile long latestSnapshot;
private ReaderWriterState() {
executionJournal = new Journal<>();
doneJournal = new Journal<>();
latestSnapshot = 0L;
* Writes journal entries on element processing state changes.
* <p>Write operations are executed by a single thread inside the stage operation hot loop and
* must be highly performant. {@link ExecutionJournalWriter} methods are thread-safe only for
* state shared with the {@link ExecutionJournalReader} thread.
private static class ExecutionJournalWriter {
* Execution stack of processing elements. Elements are pushed onto the stack when we enter the
* process() function and popped on process() return. This stack mirrors the actual Java runtime
* stack and contains the step + element context in order to attribute sampled execution time.
private final Deque<ElementExecution> executionStack;
private final ReaderWriterState sharedState;
public ExecutionJournalWriter(ReaderWriterState sharedState) {
this.sharedState = sharedState;
this.executionStack = new ArrayDeque<>();
/** Create and journal a new {@link ElementExecution} to track a processing element. */
public void startProcessing(NameContext step) {
ElementExecution execution = new ElementExecution(step);
private void addExecution(ElementExecution execution) {
long nextSnapshot = sharedState.latestSnapshot + 1;
sharedState.executionJournal.add(execution, nextSnapshot);
sharedState.latestSnapshot = nextSnapshot;
* Indicates that the execution thread has exited the process method for an element.
* <p>When an element is finished processing, it is popped from the execution stack, but will be
* tracked in the {@code doneJournal} collection until the next sampling round in order to
* account timing for the final fragment of execution.
public void doneProcessing() {
// The execution stack in initialized with the IDLE execution which should never be removed.
checkState(executionStack.size() > 1, "No processing elements currently tracked.");
ElementExecution execution = executionStack.removeLast();
long nextSnapshot = sharedState.latestSnapshot + 1;
sharedState.doneJournal.add(execution, nextSnapshot);
ElementExecution nextElement = executionStack.getLast();
sharedState.executionJournal.add(nextElement, nextSnapshot);
sharedState.latestSnapshot = nextSnapshot;
* Accounts sampled time to processed elements based on execution journal entries.
* <p>Read operations are executed by a single thread in charge of state sampling, and are
* generally called more infrequently than {@link ExecutionJournalWriter} operations. {@link
* ExecutionJournalReader} methods are thread-safe only for state shared with the {@link
* ExecutionJournalWriter} thread.
private static class ExecutionJournalReader {
* Accumulated execution time per element. Once an element has finished processing and execution
* time has been attributed, the total execution time is reported via the counter and removed
* from the collection.
private final Map<ElementExecution, Duration> executionTimes;
private final CounterFactory counterFactory;
/** Journals and snapshot counter shared with the {@link ExecutionJournalWriter}. */
private final ReaderWriterState sharedState;
/** Cache of per-step distribution counters. */
private final Map<NameContext, Counter<Long, ?>> counterCache;
/** Creates a reader over {@code sharedState} with empty time and counter caches. */
public ExecutionJournalReader(CounterFactory counterFactory, ReaderWriterState sharedState) {
this.sharedState = sharedState;
executionTimes = new HashMap<>();
this.counterFactory = counterFactory;
counterCache = new HashMap<>();
* Account the specified processing time duration to elements which have processed since the
* last sampling round, and report counters for completed elements.
public void takeSample(Duration sampleTime) {
// Read the snapshot once so that every step below works against the same journal prefix.
long snapshot = sharedState.latestSnapshot;
attributeProcessingTime(sampleTime, snapshot);
// NOTE(review): reportCounters(snapshot) and pruneJournals(snapshot) are defined below but are
// not called anywhere in this copy. They presumably belong here. Confirm.
// TODO: If possible, this should report tentative counter values before they are
// finalized. Currently, root steps in a stage fusion will not have counters reported until an
// element is completely processed through the stage subgraph.
* Attribute processing time to elements from {@code executionJournal} up to the specified
* snapshot.
private void attributeProcessingTime(Duration duration, long snapshot) {
// TODO: This algorithm is used to compute "per-element-processing-time" counter
// values, but a slightly different algorithm is used for msec counters. Values for both
// counters should be derived from the same algorithm to avoid unexpected discrepancies.
// Calculate total execution counts. IDLE entries are not attributed time, but they are
// skipped only when counting per-element fragments.
int totalExecutions = 0;
Map<ElementExecution, Integer> executionsPerElement = new HashMap<>();
for (ElementExecution execution : sharedState.executionJournal.readUntil(snapshot)) {
if (execution != ElementExecution.IDLE) {
executionsPerElement.compute(execution, (unused, count) -> count == null ? 1 : count + 1);
// Attribute processing time
// NOTE(review): nothing in this copy increments totalExecutions, so it is always 0 here.
// Confirm that the loop above counts every journal entry before this value is used as a divisor.
final int totalExecutionsFinal = totalExecutions;
// NOTE(review): the call that receives the lambda below is missing, as are the right-hand
// sides on the next lines. Per the executionTimes docs, it presumably accumulates each
// element's share of `duration` into executionTimes. Confirm.
for (Map.Entry<ElementExecution, Integer> executionCount : executionsPerElement.entrySet()) {
(unused, total) -> {
int numExecutions = executionCount.getValue();
Duration attributedSampleTime =
return total == null ? attributedSampleTime :;
/** Report counter values for done elements up to the given snapshot. */
private void reportCounters(long snapshot) {
// Report counter values for completed elements
for (ElementExecution execution : sharedState.doneJournal.readUntil(snapshot)) {
// NOTE(review): the precondition call this message belongs to is missing from this copy.
// It presumably checks execution.step.isPresent() before the get() below.
"Unexpected execution in doneJournal with empty step: %s",
Counter<Long, ?> counter = getCounter(execution.step.get());
// NOTE(review): the counter is not visibly updated in this copy. Per the executionTimes docs,
// the element's accumulated time should be reported here and then removed. Confirm.
/** Retrieve the per-element processing time counter for the specified step. */
private Counter<Long, ?> getCounter(NameContext step) {
// NOTE(review): the key argument and the lambda body are missing from this copy. The lambda
// presumably builds the counter with counterFactory. Confirm.
return counterCache.computeIfAbsent(
s ->
* Prune journal entries which have been accounted.
* @param snapshot Snapshot value which has been accounted.
private void pruneJournals(long snapshot) {
// Keep the currently executing element in the journal so its remaining execution time
// is counted in the next sampling round.
// NOTE(review): only executionJournal is pruned in this copy. Confirm whether doneJournal
// should also be pruned up to `snapshot`, since its entries have been reported.
sharedState.executionJournal.pruneUntil(snapshot - 1);
* Concurrent queue-based data structure for passing journaled events between a single journal
* reader and single journal writer.
* <p>Each event is journaled with an externally-managed snapshot version. Snapshot versions are
* unique and monotonically increasing.
private static class Journal<T> {
/** Journaled entries in insertion order, shared between the writer thread and the reader thread. */
private final ConcurrentLinkedQueue<SnapshottedItem<T>> queue = new ConcurrentLinkedQueue<>();

/**
 * Largest snapshot accepted by {@code add} so far. Only {@code add} and construction touch it, and
 * {@code add} is called solely from the writer side.
 */
private long maxSnapshot = Long.MIN_VALUE;

/** Creates an empty journal. */
public Journal() {}
/** Add a new event to the journal with the given snapshot. */
public void add(T item, long snapshot) {
if (snapshot <= maxSnapshot) {
throw new IllegalArgumentException(
"Timestamps must be monotonically increasing. "
+ "Specified snapshot '%d' is not greater than max snapshot '%d'",
snapshot, maxSnapshot));
queue.add(new SnapshottedItem<>(item, snapshot));
maxSnapshot = snapshot;
* Retrieve an iterable of events from the journal up to and including the specified snapshot
* version.
* <p>Returns an empty iterable if the journal is empty or there are no events less than the
* specified snapshot version.
* <p>The returned iterable returns a view of the queue at the time of construction, consistent
* with {@link ConcurrentLinkedQueue#iterator()}.
// Note: the bound is inclusive. hasNext() admits entries whose snapshot is <= the argument, so the
// result is empty only when no entry has a snapshot less than or equal to it.
public Iterable<T> readUntil(long snapshot) {
// Implement our own iterator in order to short-circuit early once we've seen a snapshot
// version greater than the value specified.
// Streams will have takeWhile() in JDK9, and Guava may some day have
// Iterables.limit(Predicate<T>).
// NOTE(review): this iterator is created once, outside the lambda, so every iterator() call on
// the returned Iterable shares it. The Iterable is effectively single-use. The callers visible
// in this file iterate it exactly once, in a for-each loop.
PeekingIterator<SnapshottedItem<T>> iter = Iterators.peekingIterator(queue.iterator());
return () ->
new Iterator<T>() {
/** True while another entry exists and its snapshot is within the requested bound. */
public boolean hasNext() {
return iter.hasNext() && iter.peek().snapshot <= snapshot;
// NOTE(review): the right-hand side below is missing from this copy. It presumably advances
// `iter` and passes the entry to checkValidSnapshot(), which is otherwise unused here. The
// remove() body is also not visible. Confirm.
public T next() {
SnapshottedItem<T> next =;
return next.item;
public void remove() {
/** Throws {@link NoSuchElementException} for an entry past the requested snapshot. */
private void checkValidSnapshot(SnapshottedItem<T> next) {
if (next.snapshot > snapshot) {
throw new NoSuchElementException();
/**
 * Prune events from the journal up to and including the specified snapshot.
 *
 * <p>Scans from the head of the queue. Because {@code add} only accepts strictly increasing
 * snapshots, entries are in snapshot order.
 *
 * @param snapshot highest snapshot version to discard
 */
public void pruneUntil(long snapshot) {
Iterator<SnapshottedItem<T>> iterator = queue.iterator();
while (iterator.hasNext()) {
// NOTE(review): the left operand of this comparison and both branch bodies are missing from
// this copy. Presumably the current entry's snapshot is compared, old entries are removed via
// the iterator, and the else branch stops the scan. Confirm.
if ( <= snapshot) {
} else {
/** Pairs a journaled item with the snapshot version it was added under; both fields are final. */
private static class SnapshottedItem<E> {
/** The journaled value. */
private final E item;
/** Snapshot version supplied when the item was added to the journal. */
private final long snapshot;
/** Creates an entry holding {@code item} at version {@code snapshot}. */
public SnapshottedItem(E item, long snapshot) {
this.item = item;
this.snapshot = snapshot;