[FLINK-31650][metrics][rest] Remove transient metrics for subtasks in terminal state
This closes #23447
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java
index ed53e20..be6657f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java
@@ -194,7 +194,8 @@
taskVertex.getCurrentExecutionAttempt().getAttemptNumber(),
taskVertex.getCurrentExecutions().stream()
.map(AccessExecution::getAttemptNumber)
- .collect(Collectors.toSet())));
+ .collect(Collectors.toSet()),
+ state.isTerminal()));
}
if (!vertexAttempts.isEmpty()) {
@@ -355,16 +356,21 @@
/**
* The CurrentAttempts holds the attempt number of the current representative execution attempt,
- * and the attempt numbers of all the running attempts.
+ * the attempt numbers of all the running attempts, and whether the current execution has
+ * reached terminal state.
*/
public static final class CurrentAttempts implements Serializable {
private final int representativeAttempt;
private final Set<Integer> currentAttempts;
- public CurrentAttempts(int representativeAttempt, Set<Integer> currentAttempts) {
+ private final boolean isTerminalState;
+
+ public CurrentAttempts(
+ int representativeAttempt, Set<Integer> currentAttempts, boolean isTerminalState) {
this.representativeAttempt = representativeAttempt;
this.currentAttempts = Collections.unmodifiableSet(currentAttempts);
+ this.isTerminalState = isTerminalState;
}
public int getRepresentativeAttempt() {
@@ -374,5 +380,9 @@
public Set<Integer> getCurrentAttempts() {
return currentAttempts;
}
+
+ public boolean isTerminalState() {
+ return isTerminalState;
+ }
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
index 448b3fe..8b1f156 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
@@ -21,6 +21,7 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.JobDetails.CurrentAttempts;
+import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.metrics.dump.MetricDump;
import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
import org.apache.flink.util.CollectionUtil;
@@ -30,8 +31,10 @@
import javax.annotation.concurrent.ThreadSafe;
+import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -57,6 +60,18 @@
public class MetricStore {
private static final Logger LOG = LoggerFactory.getLogger(MetricStore.class);
+ /**
+ * The set holds the names of the transient metrics which are no longer useful after a subtask
+ * reaches terminal state and shall be removed to avoid misleading users. Note that there may be
+ * other transient metrics, we currently only support cleaning these three.
+ */
+ private static final Set<String> TRANSIENT_METRIC_NAMES =
+ new HashSet<>(
+ Arrays.asList(
+ MetricNames.TASK_IDLE_TIME,
+ MetricNames.TASK_BACK_PRESSURED_TIME,
+ MetricNames.TASK_BUSY_TIME));
+
private final ComponentMetricStore jobManager = new ComponentMetricStore();
private final Map<String, TaskManagerMetricStore> taskManagers = new ConcurrentHashMap<>();
private final Map<String, JobMetricStore> jobs = new ConcurrentHashMap<>();
@@ -129,6 +144,13 @@
subtaskMetricStore ->
subtaskMetricStore.retainAttempts(
attempts.getCurrentAttempts()));
+ // Remove transient metrics for terminal subtasks
+ if (attempts.isTerminalState()) {
+ taskMetricStoreOptional.ifPresent(
+ taskMetricStore ->
+ taskMetricStore.removeTransientMetrics(
+ subtaskIndex));
+ }
});
});
}
@@ -435,6 +457,11 @@
}
}
+ private static boolean isTransientMetric(String fullMetricName) {
+ String metricName = fullMetricName.substring(fullMetricName.lastIndexOf('.') + 1);
+ return TRANSIENT_METRIC_NAMES.contains(metricName);
+ }
+
// -----------------------------------------------------------------------------------------------------------------
// sub MetricStore classes
// -----------------------------------------------------------------------------------------------------------------
@@ -551,6 +578,19 @@
subtasks.keySet().retainAll(activeSubtasks);
}
+ void removeTransientMetrics(int subtaskIndex) {
+ if (subtasks.containsKey(subtaskIndex)) {
+ // Remove in both places as task metrics are duplicated in task metric store and
+ // subtask metric store.
+ metrics.keySet()
+ .removeIf(
+ key ->
+ key.startsWith(subtaskIndex + ".")
+ && isTransientMetric(key));
+ subtasks.get(subtaskIndex).removeTransientMetrics();
+ }
+ }
+
public ComponentMetricStore getJobManagerOperatorMetricStores(String operatorName) {
return jmOperators.get(operatorName);
}
@@ -597,6 +637,16 @@
attempt < latestAttempt && !currentAttempts.contains(attempt));
}
+ void removeTransientMetrics() {
+ attempts.values()
+ .forEach(
+ attempt ->
+ attempt.metrics
+ .keySet()
+ .removeIf(MetricStore::isTransientMetric));
+ metrics.keySet().removeIf(MetricStore::isTransientMetric);
+ }
+
private static SubtaskMetricStore unmodifiable(SubtaskMetricStore source) {
if (source == null) {
return null;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java
index 72cd8d1..29f71fe 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java
@@ -22,6 +22,7 @@
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.JobDetails.CurrentAttempts;
+import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.metrics.dump.MetricDump;
import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
@@ -156,7 +157,7 @@
Collections.singletonMap(
"taskid",
Collections.singletonMap(
- 1, new CurrentAttempts(1, new HashSet<>()))));
+ 1, new CurrentAttempts(1, new HashSet<>(), false))));
assertThatCode(
() ->
metricStore.updateCurrentExecutionAttempts(
@@ -177,7 +178,8 @@
Map<String, Map<Integer, CurrentAttempts>> currentExecutionAttempts =
Collections.singletonMap(
"taskid",
- Collections.singletonMap(1, new CurrentAttempts(1, new HashSet<>())));
+ Collections.singletonMap(
+ 1, new CurrentAttempts(1, new HashSet<>(), false)));
JobDetails jobDetail =
new JobDetails(
JOB_ID,
@@ -199,6 +201,38 @@
.containsExactlyInAnyOrderElementsOf(Collections.singletonList(1));
}
+ @Test
+ void testRemoveTransientMetricsForTerminalSubtasks() {
+ MetricStore store = setupStore(new MetricStore());
+ MetricStore.TaskMetricStore taskMetricStore =
+ store.getTaskMetricStore(JOB_ID.toString(), "taskid");
+
+ Map<String, Map<Integer, CurrentAttempts>> currentExecutionAttempts =
+ Collections.singletonMap(
+ "taskid",
+ Collections.singletonMap(8, new CurrentAttempts(2, new HashSet<>(), true)));
+ JobDetails jobDetail =
+ new JobDetails(
+ JOB_ID,
+ "jobname",
+ 0,
+ 0,
+ 0,
+ JobStatus.RUNNING,
+ 0,
+ new int[10],
+ 8,
+ currentExecutionAttempts);
+
+ MetricStore.SubtaskMetricStore subtaskMetricStore =
+ taskMetricStore.getSubtaskMetricStore(8);
+ assertThat(taskMetricStore.getMetric("8.abc." + MetricNames.TASK_BUSY_TIME)).isNotNull();
+ assertThat(subtaskMetricStore.getMetric("abc." + MetricNames.TASK_BUSY_TIME)).isNotNull();
+ store.updateCurrentExecutionAttempts(Collections.singleton(jobDetail));
+ assertThat(taskMetricStore.getMetric("8.abc." + MetricNames.TASK_BUSY_TIME)).isNull();
+ assertThat(subtaskMetricStore.getMetric("abc." + MetricNames.TASK_BUSY_TIME)).isNull();
+ }
+
@Nonnull
private static Set<Integer> getTaskMetricStoreIndexes(
MetricStore.TaskMetricStore taskMetricStore) {
@@ -226,7 +260,8 @@
Map<String, Map<Integer, CurrentAttempts>> currentExecutionAttempts =
Collections.singletonMap(
"taskid",
- Collections.singletonMap(8, new CurrentAttempts(1, currentAttempts)));
+ Collections.singletonMap(
+ 8, new CurrentAttempts(1, currentAttempts, false)));
JobDetails jobDetail =
new JobDetails(
JOB_ID,
@@ -286,6 +321,8 @@
QueryScopeInfo.TaskQueryScopeInfo speculativeTask =
new QueryScopeInfo.TaskQueryScopeInfo(JOB_ID.toString(), "taskid", 8, 2, "abc");
MetricDump.CounterDump cd52 = new MetricDump.CounterDump(speculativeTask, "metric5", 14);
+ MetricDump.CounterDump cd52BusyTime =
+ new MetricDump.CounterDump(speculativeTask, MetricNames.TASK_BUSY_TIME, 400);
QueryScopeInfo.OperatorQueryScopeInfo operator =
new QueryScopeInfo.OperatorQueryScopeInfo(
@@ -346,6 +383,8 @@
store.add(jmCd8);
store.add(jmCd9);
+ store.add(cd52BusyTime);
+
return store;
}
}