[FLINK-31650][metrics][rest] Remove transient metrics for subtasks in terminal state

This closes #23447
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java
index ed53e20..be6657f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java
@@ -194,7 +194,8 @@
                                 taskVertex.getCurrentExecutionAttempt().getAttemptNumber(),
                                 taskVertex.getCurrentExecutions().stream()
                                         .map(AccessExecution::getAttemptNumber)
-                                        .collect(Collectors.toSet())));
+                                        .collect(Collectors.toSet()),
+                                state.isTerminal()));
             }
 
             if (!vertexAttempts.isEmpty()) {
@@ -355,16 +356,21 @@
 
     /**
      * The CurrentAttempts holds the attempt number of the current representative execution attempt,
-     * and the attempt numbers of all the running attempts.
+     * the attempt numbers of all the running attempts, and whether the current execution has
+     * reached terminal state.
      */
     public static final class CurrentAttempts implements Serializable {
         private final int representativeAttempt;
 
         private final Set<Integer> currentAttempts;
 
-        public CurrentAttempts(int representativeAttempt, Set<Integer> currentAttempts) {
+        private final boolean isTerminalState;
+
+        public CurrentAttempts(
+                int representativeAttempt, Set<Integer> currentAttempts, boolean isTerminalState) {
             this.representativeAttempt = representativeAttempt;
             this.currentAttempts = Collections.unmodifiableSet(currentAttempts);
+            this.isTerminalState = isTerminalState;
         }
 
         public int getRepresentativeAttempt() {
@@ -374,5 +380,9 @@
         public Set<Integer> getCurrentAttempts() {
             return currentAttempts;
         }
+
+        public boolean isTerminalState() {
+            return isTerminalState;
+        }
     }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
index 448b3fe..8b1f156 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
@@ -21,6 +21,7 @@
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails.CurrentAttempts;
+import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.metrics.dump.MetricDump;
 import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 import org.apache.flink.util.CollectionUtil;
@@ -30,8 +31,10 @@
 
 import javax.annotation.concurrent.ThreadSafe;
 
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -57,6 +60,18 @@
 public class MetricStore {
     private static final Logger LOG = LoggerFactory.getLogger(MetricStore.class);
 
+    /**
+     * The set holds the names of the transient metrics which are no longer useful after a subtask
+     * reaches terminal state and shall be removed to avoid misleading users. Note that there may be
+     * other transient metrics, we currently only support cleaning these three.
+     */
+    private static final Set<String> TRANSIENT_METRIC_NAMES =
+            new HashSet<>(
+                    Arrays.asList(
+                            MetricNames.TASK_IDLE_TIME,
+                            MetricNames.TASK_BACK_PRESSURED_TIME,
+                            MetricNames.TASK_BUSY_TIME));
+
     private final ComponentMetricStore jobManager = new ComponentMetricStore();
     private final Map<String, TaskManagerMetricStore> taskManagers = new ConcurrentHashMap<>();
     private final Map<String, JobMetricStore> jobs = new ConcurrentHashMap<>();
@@ -129,6 +144,13 @@
                                                     subtaskMetricStore ->
                                                             subtaskMetricStore.retainAttempts(
                                                                     attempts.getCurrentAttempts()));
+                                    // Remove transient metrics for terminal subtasks
+                                    if (attempts.isTerminalState()) {
+                                        taskMetricStoreOptional.ifPresent(
+                                                taskMetricStore ->
+                                                        taskMetricStore.removeTransientMetrics(
+                                                                subtaskIndex));
+                                    }
                                 });
                     });
         }
@@ -435,6 +457,11 @@
         }
     }
 
+    private static boolean isTransientMetric(String fullMetricName) {
+        String metricName = fullMetricName.substring(fullMetricName.lastIndexOf('.') + 1);
+        return TRANSIENT_METRIC_NAMES.contains(metricName);
+    }
+
     // -----------------------------------------------------------------------------------------------------------------
     // sub MetricStore classes
     // -----------------------------------------------------------------------------------------------------------------
@@ -551,6 +578,19 @@
             subtasks.keySet().retainAll(activeSubtasks);
         }
 
+        void removeTransientMetrics(int subtaskIndex) {
+            if (subtasks.containsKey(subtaskIndex)) {
+                // Remove in both places as task metrics are duplicated in task metric store and
+                // subtask metric store.
+                metrics.keySet()
+                        .removeIf(
+                                key ->
+                                        key.startsWith(subtaskIndex + ".")
+                                                && isTransientMetric(key));
+                subtasks.get(subtaskIndex).removeTransientMetrics();
+            }
+        }
+
         public ComponentMetricStore getJobManagerOperatorMetricStores(String operatorName) {
             return jmOperators.get(operatorName);
         }
@@ -597,6 +637,16 @@
                                     attempt < latestAttempt && !currentAttempts.contains(attempt));
         }
 
+        void removeTransientMetrics() {
+            attempts.values()
+                    .forEach(
+                            attempt ->
+                                    attempt.metrics
+                                            .keySet()
+                                            .removeIf(MetricStore::isTransientMetric));
+            metrics.keySet().removeIf(MetricStore::isTransientMetric);
+        }
+
         private static SubtaskMetricStore unmodifiable(SubtaskMetricStore source) {
             if (source == null) {
                 return null;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java
index 72cd8d1..29f71fe 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java
@@ -22,6 +22,7 @@
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails.CurrentAttempts;
+import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.metrics.dump.MetricDump;
 import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 
@@ -156,7 +157,7 @@
                         Collections.singletonMap(
                                 "taskid",
                                 Collections.singletonMap(
-                                        1, new CurrentAttempts(1, new HashSet<>()))));
+                                        1, new CurrentAttempts(1, new HashSet<>(), false))));
         assertThatCode(
                         () ->
                                 metricStore.updateCurrentExecutionAttempts(
@@ -177,7 +178,8 @@
         Map<String, Map<Integer, CurrentAttempts>> currentExecutionAttempts =
                 Collections.singletonMap(
                         "taskid",
-                        Collections.singletonMap(1, new CurrentAttempts(1, new HashSet<>())));
+                        Collections.singletonMap(
+                                1, new CurrentAttempts(1, new HashSet<>(), false)));
         JobDetails jobDetail =
                 new JobDetails(
                         JOB_ID,
@@ -199,6 +201,38 @@
                 .containsExactlyInAnyOrderElementsOf(Collections.singletonList(1));
     }
 
+    @Test
+    void testRemoveTransientMetricsForTerminalSubtasks() {
+        MetricStore store = setupStore(new MetricStore());
+        MetricStore.TaskMetricStore taskMetricStore =
+                store.getTaskMetricStore(JOB_ID.toString(), "taskid");
+
+        Map<String, Map<Integer, CurrentAttempts>> currentExecutionAttempts =
+                Collections.singletonMap(
+                        "taskid",
+                        Collections.singletonMap(8, new CurrentAttempts(2, new HashSet<>(), true)));
+        JobDetails jobDetail =
+                new JobDetails(
+                        JOB_ID,
+                        "jobname",
+                        0,
+                        0,
+                        0,
+                        JobStatus.RUNNING,
+                        0,
+                        new int[10],
+                        8,
+                        currentExecutionAttempts);
+
+        MetricStore.SubtaskMetricStore subtaskMetricStore =
+                taskMetricStore.getSubtaskMetricStore(8);
+        assertThat(taskMetricStore.getMetric("8.abc." + MetricNames.TASK_BUSY_TIME)).isNotNull();
+        assertThat(subtaskMetricStore.getMetric("abc." + MetricNames.TASK_BUSY_TIME)).isNotNull();
+        store.updateCurrentExecutionAttempts(Collections.singleton(jobDetail));
+        assertThat(taskMetricStore.getMetric("8.abc." + MetricNames.TASK_BUSY_TIME)).isNull();
+        assertThat(subtaskMetricStore.getMetric("abc." + MetricNames.TASK_BUSY_TIME)).isNull();
+    }
+
     @Nonnull
     private static Set<Integer> getTaskMetricStoreIndexes(
             MetricStore.TaskMetricStore taskMetricStore) {
@@ -226,7 +260,8 @@
         Map<String, Map<Integer, CurrentAttempts>> currentExecutionAttempts =
                 Collections.singletonMap(
                         "taskid",
-                        Collections.singletonMap(8, new CurrentAttempts(1, currentAttempts)));
+                        Collections.singletonMap(
+                                8, new CurrentAttempts(1, currentAttempts, false)));
         JobDetails jobDetail =
                 new JobDetails(
                         JOB_ID,
@@ -286,6 +321,8 @@
         QueryScopeInfo.TaskQueryScopeInfo speculativeTask =
                 new QueryScopeInfo.TaskQueryScopeInfo(JOB_ID.toString(), "taskid", 8, 2, "abc");
         MetricDump.CounterDump cd52 = new MetricDump.CounterDump(speculativeTask, "metric5", 14);
+        MetricDump.CounterDump cd52BusyTime =
+                new MetricDump.CounterDump(speculativeTask, MetricNames.TASK_BUSY_TIME, 400);
 
         QueryScopeInfo.OperatorQueryScopeInfo operator =
                 new QueryScopeInfo.OperatorQueryScopeInfo(
@@ -346,6 +383,8 @@
         store.add(jmCd8);
         store.add(jmCd9);
 
+        store.add(cd52BusyTime);
+
         return store;
     }
 }