blob: e14d59742d5c959ba7ac10e798d669c7ed70aad1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.core.metrics;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.Closeable;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
/** Tracks the current state of a single execution thread. */
@SuppressFBWarnings(value = "IS2_INCONSISTENT_SYNC", justification = "Intentional for performance.")
public class ExecutionStateTracker implements Comparable<ExecutionStateTracker> {
  /**
   * This allows determining which {@link ExecutionStateTracker} is managing a specific thread. We
   * don't use a ThreadLocal to allow testing the implementation of this class without having to run
   * from multiple threads.
   */
  private static final Map<Thread, ExecutionStateTracker> CURRENT_TRACKERS =
      new ConcurrentHashMap<>();

  /** Source of per-instance ids; used only to break identity-hash ties in {@link #compareTo}. */
  private static final AtomicLong NEXT_ID = new AtomicLong();

  /** Interval (five minutes) without a state transition after which a lull is reported. */
  private static final long LULL_REPORT_MS = TimeUnit.MINUTES.toMillis(5);

  public static final String START_STATE_NAME = "start";
  public static final String PROCESS_STATE_NAME = "process";
  public static final String PROCESS_TIMERS_STATE_NAME = "process-timers";
  public static final String FINISH_STATE_NAME = "finish";
  public static final String ABORT_STATE_NAME = "abort";

  /** An {@link ExecutionState} represents the current state of an execution thread. */
  public abstract static class ExecutionState {
    private final String stateName;

    /** Whether the current state represents the element processing state. */
    public final boolean isProcessElementState;

    public ExecutionState(String stateName) {
      this.stateName = stateName;
      this.isProcessElementState = Objects.equals(stateName, PROCESS_STATE_NAME);
    }

    /**
     * Called periodically by the {@link ExecutionStateSampler} to report time spent in this state.
     *
     * @param millisSinceLastSample the time since the last sample was reported. As an
     *     approximation, all of that time should be associated with this state.
     */
    public abstract void takeSample(long millisSinceLastSample);

    /** Returns the name of this state within the executing step. */
    public String getStateName() {
      return stateName;
    }

    /**
     * Called when this state becomes the active state (top of the stack). This method will be
     * called once when initially activated, and again any time a state it transitioned to is
     * deactivated, causing execution to return to this state. This may occur many times along the
     * hottest path, so it should be as efficient as possible.
     *
     * @param pushing {@code true} when this state is being entered via {@code enterState}; {@code
     *     false} when execution returns to it because a state entered on top of it was exited.
     */
    public void onActivate(boolean pushing) {}

    /**
     * Called when a lull has been detected in the given state. This indicates that more than the
     * lull-report interval (five minutes) has been spent in the same state without any
     * transitions. While the lull continues, this is called again each time another interval
     * elapses.
     *
     * @param trackedThread The execution thread that is in a lull.
     * @param millis The milliseconds since the state was most recently entered.
     */
    public abstract void reportLull(Thread trackedThread, long millis);

    @Override
    public String toString() {
      return MoreObjects.toStringHelper(getClass()).add("stateName", stateName).toString();
    }

    /** Returns a human-readable description of this state; by default its state name. */
    public String getDescription() {
      return stateName;
    }
  }

  private final ExecutionStateSampler sampler;

  /** Unique per-instance id; only used as a tie-breaker in {@link #compareTo}. */
  private final long id = NEXT_ID.getAndIncrement();

  /** The thread being managed by this {@link ExecutionStateTracker}. */
  @Nullable private Thread trackedThread = null;

  /**
   * The current state of the thread managed by this {@link ExecutionStateTracker}.
   *
   * <p>This variable is written by the Execution thread, and read by the sampling and progress
   * reporting threads, thus it being marked volatile.
   */
  @Nullable private volatile ExecutionState currentState;

  /**
   * The current number of times that this {@link ExecutionStateTracker} has transitioned state.
   *
   * <p>This variable is written by the Execution thread, and read by the sampling and progress
   * reporting threads, thus it being marked volatile.
   */
  private volatile long numTransitions = 0;

  /**
   * The number of milliseconds since the {@link ExecutionStateTracker} transitioned state.
   *
   * <p>This variable is updated by the Sampling thread, and read by the Progress Reporting thread,
   * thus it being marked volatile.
   */
  private volatile long millisSinceLastTransition = 0;

  /** Value of {@link #numTransitions} observed at the previous sample (sampler thread only). */
  private long transitionsAtLastSample = 0;

  /** Elapsed-time threshold at which the next lull will be reported (sampler thread only). */
  private long nextLullReportMs = LULL_REPORT_MS;

  public ExecutionStateTracker(ExecutionStateSampler sampler) {
    this.sampler = sampler;
  }

  @VisibleForTesting
  public static ExecutionStateTracker newForTest() {
    return new ExecutionStateTracker(ExecutionStateSampler.newForTest());
  }

  @Override
  public int hashCode() {
    return System.identityHashCode(this);
  }

  /**
   * Orders trackers by identity. Only the same instance compares as 0, so the ordering is
   * consistent with the identity-based {@code equals}/{@link #hashCode} and safe for sorted
   * collections.
   */
  // Findbugs warns because compareTo() is defined without overriding equals(). That is intended:
  // equality is object identity, and compareTo() returns 0 only for the very same instance.
  @SuppressFBWarnings("EQ_COMPARETO_USE_OBJECT_EQUALS")
  @Override
  public int compareTo(ExecutionStateTracker o) {
    if (this == o) {
      return 0;
    }
    // Integer.compare avoids the overflow that subtracting two arbitrary ints can produce.
    int byIdentityHash =
        Integer.compare(System.identityHashCode(this), System.identityHashCode(o));
    if (byIdentityHash != 0) {
      return byIdentityHash;
    }
    // identityHashCode is not guaranteed unique; break ties with the per-instance id so distinct
    // trackers never compare as 0.
    return Long.compare(id, o.id);
  }

  /**
   * Return the current {@link ExecutionState} of the current thread, or {@code null} if there
   * either is no current state or if the current thread is not currently tracking the state.
   */
  @Nullable
  public static ExecutionState getCurrentExecutionState() {
    ExecutionStateTracker tracker = CURRENT_TRACKERS.get(Thread.currentThread());
    return tracker == null ? null : tracker.currentState;
  }

  /**
   * Activates state sampling using this {@link ExecutionStateTracker} to track the current thread.
   *
   * @return A {@link Closeable} to deactivate state sampling.
   */
  public Closeable activate() {
    return activate(Thread.currentThread());
  }

  /**
   * Activates state sampling using this {@link ExecutionStateTracker} to track the given thread.
   *
   * @return A {@link Closeable} to deactivate state sampling.
   * @throws IllegalStateException if this tracker is already active, or if {@code thread} is
   *     already tracked by another tracker. In either case no registration is changed.
   */
  @VisibleForTesting
  public synchronized Closeable activate(Thread thread) {
    checkState(
        trackedThread == null, "Cannot activate an ExecutionStateTracker that is already in use.");
    // putIfAbsent (rather than put) leaves an existing registration untouched when the check
    // below fails, so a rejected activation cannot clobber another tracker's entry.
    ExecutionStateTracker other = CURRENT_TRACKERS.putIfAbsent(thread, this);
    checkState(
        other == null,
        "Execution state of thread %s was already being tracked by %s",
        thread.getId(),
        other);
    this.trackedThread = thread;
    sampler.addTracker(this);
    return this::deactivate;
  }

  /**
   * Return the execution thread that is being tracked by this {@link ExecutionStateTracker}, or
   * {@code null} if the tracker is not currently active.
   */
  @Nullable
  public Thread getTrackedThread() {
    return trackedThread;
  }

  /**
   * Undoes {@link #activate(Thread)}: unregisters from the sampler and releases the thread.
   * Calling this again after the first call has no effect, as {@link Closeable#close()} requires.
   */
  private synchronized void deactivate() {
    Thread thread = this.trackedThread;
    if (thread == null) {
      return;
    }
    sampler.removeTracker(this);
    // Conditional removal: only drop the entry if it still refers to this tracker.
    CURRENT_TRACKERS.remove(thread, this);
    this.trackedThread = null;
  }

  /** Returns the state currently on top of this tracker's stack, or {@code null} if none. */
  @Nullable
  public ExecutionState getCurrentState() {
    return currentState;
  }

  /**
   * Indicates that the execution thread has entered the {@code newState}. Returns a {@link
   * Closeable} that should be called when that state is completed; closing it restores the state
   * that was current before this call.
   *
   * <p>This must be the only place where incTransitions is called, and always called from the
   * execution thread.
   */
  public Closeable enterState(ExecutionState newState) {
    // WARNING: This method is called in the hottest path, and must be kept as efficient as
    // possible. Avoid blocking, synchronizing, etc.
    final ExecutionState previous = currentState;
    currentState = newState;
    newState.onActivate(true);
    incTransitions();
    return () -> {
      currentState = previous;
      incTransitions();
      if (previous != null) {
        previous.onActivate(false);
      }
    };
  }

  @SuppressWarnings("NonAtomicVolatileUpdate")
  // Helper method necessary due to https://github.com/spotbugs/spotbugs/issues/724
  @SuppressFBWarnings(
      value = "VO_VOLATILE_INCREMENT",
      justification = "Intentional for performance.")
  private void incTransitions() {
    // Only the execution thread writes numTransitions, so the non-atomic increment is safe.
    numTransitions++;
  }

  /** Return the number of transitions that have been observed by this state tracker. */
  public long getNumTransitions() {
    return numTransitions;
  }

  /** Return the time since the last transition. */
  public long getMillisSinceLastTransition() {
    return millisSinceLastTransition;
  }

  /**
   * Records a sample: attributes {@code millisSinceLastSample} to the current state and resets the
   * lull bookkeeping if any transition happened since the previous sample.
   */
  protected void takeSample(long millisSinceLastSample) {
    // currentState and numTransitions are written by the Execution thread and only read here on
    // the Sampler thread. Because there is no read/modify/write cycle on them in this thread,
    // making them volatile provides enough thread safety - but be very careful when changing this.
    ExecutionState state = currentState;
    long transitionsAtThisSample = numTransitions;
    if (transitionsAtThisSample != transitionsAtLastSample) {
      millisSinceLastTransition = 0;
      nextLullReportMs = LULL_REPORT_MS;
      transitionsAtLastSample = transitionsAtThisSample;
    }
    updateMillisSinceLastTransition(millisSinceLastSample, state);
  }

  @SuppressWarnings("NonAtomicVolatileUpdate")
  private void updateMillisSinceLastTransition(long millisSinceLastSample, ExecutionState state) {
    // This variable is written by the Sampler thread, and read by the Progress Reporting thread.
    // Because only one thread modifies it, volatile provides enough synchronization.
    millisSinceLastTransition += millisSinceLastSample;
    if (state != null) {
      if (millisSinceLastTransition > nextLullReportMs) {
        state.reportLull(trackedThread, millisSinceLastTransition);
        // Schedule the next report one more interval later while the lull continues.
        nextLullReportMs += LULL_REPORT_MS;
      }
      state.takeSample(millisSinceLastSample);
    }
  }
}