[FLINK-31895][runtime] Add End-to-end integration tests for failure labels

This closes #23405
diff --git a/flink-end-to-end-tests/flink-failure-enricher-test/pom.xml b/flink-end-to-end-tests/flink-failure-enricher-test/pom.xml
new file mode 100644
index 0000000..d3c6a7f
--- /dev/null
+++ b/flink-end-to-end-tests/flink-failure-enricher-test/pom.xml
@@ -0,0 +1,86 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+    -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+	<parent>
+		<artifactId>flink-end-to-end-tests</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>1.19-SNAPSHOT</version>
+	</parent>
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<artifactId>flink-failure-enricher-test</artifactId>
+	<name>Flink : E2E Tests : Failure Enricher Test</name>
+	<packaging>jar</packaging>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-annotations</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>FailureEnricherTest</id>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<finalName>FailureEnricherTest</finalName>
+							<transformers>
+								<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+									<mainClass>org.apache.flink.streaming.tests.FailureEnricherTestProgram</mainClass>
+								</transformer>
+							</transformers>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+
+</project>
+
diff --git a/flink-end-to-end-tests/flink-failure-enricher-test/src/main/java/org/apache/flink/runtime/enricher/CustomTestFailureEnricher.java b/flink-end-to-end-tests/flink-failure-enricher-test/src/main/java/org/apache/flink/runtime/enricher/CustomTestFailureEnricher.java
new file mode 100644
index 0000000..e51578e
--- /dev/null
+++ b/flink-end-to-end-tests/flink-failure-enricher-test/src/main/java/org/apache/flink/runtime/enricher/CustomTestFailureEnricher.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.enricher;
+
+import org.apache.flink.core.failure.FailureEnricher;
+import org.apache.flink.util.FlinkException;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+/** The custom enricher for test. */
+public class CustomTestFailureEnricher implements FailureEnricher {
+
+    private final Set<String> outputKeys;
+
+    public CustomTestFailureEnricher() {
+        this.outputKeys = Collections.singleton("type");
+    }
+
+    @Override
+    public Set<String> getOutputKeys() {
+        return outputKeys;
+    }
+
+    @Override
+    public CompletableFuture<Map<String, String>> processFailure(Throwable cause, Context context) {
+        if (cause instanceof FlinkException) {
+            return CompletableFuture.completedFuture(Collections.singletonMap("type", "system"));
+        } else {
+            return CompletableFuture.completedFuture(Collections.singletonMap("type", "user"));
+        }
+    }
+}
diff --git a/flink-end-to-end-tests/flink-failure-enricher-test/src/main/java/org/apache/flink/runtime/enricher/CustomTestFailureEnricherFactory.java b/flink-end-to-end-tests/flink-failure-enricher-test/src/main/java/org/apache/flink/runtime/enricher/CustomTestFailureEnricherFactory.java
new file mode 100644
index 0000000..b8a8f61
--- /dev/null
+++ b/flink-end-to-end-tests/flink-failure-enricher-test/src/main/java/org/apache/flink/runtime/enricher/CustomTestFailureEnricherFactory.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.enricher;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.failure.FailureEnricher;
+import org.apache.flink.core.failure.FailureEnricherFactory;
+
+/** The factory of {@link CustomTestFailureEnricher}. */
+public class CustomTestFailureEnricherFactory implements FailureEnricherFactory {
+
+    @Override
+    public FailureEnricher createFailureEnricher(Configuration conf) {
+        return new CustomTestFailureEnricher();
+    }
+}
diff --git a/flink-end-to-end-tests/flink-failure-enricher-test/src/main/java/org/apache/flink/streaming/tests/FailureEnricherTestProgram.java b/flink-end-to-end-tests/flink-failure-enricher-test/src/main/java/org/apache/flink/streaming/tests/FailureEnricherTestProgram.java
new file mode 100644
index 0000000..d4585bc
--- /dev/null
+++ b/flink-end-to-end-tests/flink-failure-enricher-test/src/main/java/org/apache/flink/streaming/tests/FailureEnricherTestProgram.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/**
+ * End-to-end test program for verifying that the {@link
+ * org.apache.flink.configuration.JobManagerOptions#FAILURE_ENRICHERS_LIST}. We test this by
+ * creating a {@code CustomTestFailureEnricherFactory} and {@code CustomTestFailureEnricher} which
+ * will add label for the failure. And we will add this jar to plugin/failure-enricher package, then
+ * submit this job and verify the exceptions through restful api in test_failure_enricher.sh script.
+ */
+public class FailureEnricherTestProgram {
+
+    public static void main(String[] args) throws Exception {
+
+        final ParameterTool params = ParameterTool.fromArgs(args);
+
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+        env.fromElements("Hello")
+                .map(
+                        (MapFunction<String, String>)
+                                value -> {
+                                    throw new RuntimeException("Expect exception");
+                                })
+                .writeAsText(params.getRequired("output"), FileSystem.WriteMode.OVERWRITE);
+
+        env.execute("Failure Enricher Test");
+    }
+}
diff --git a/flink-end-to-end-tests/flink-failure-enricher-test/src/main/resources/META-INF/services/org.apache.flink.core.failure.FailureEnricherFactory b/flink-end-to-end-tests/flink-failure-enricher-test/src/main/resources/META-INF/services/org.apache.flink.core.failure.FailureEnricherFactory
new file mode 100644
index 0000000..096ef5d
--- /dev/null
+++ b/flink-end-to-end-tests/flink-failure-enricher-test/src/main/resources/META-INF/services/org.apache.flink.core.failure.FailureEnricherFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.runtime.enricher.CustomTestFailureEnricherFactory
diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml
index 8a8e84d..27eae87 100644
--- a/flink-end-to-end-tests/pom.xml
+++ b/flink-end-to-end-tests/pom.xml
@@ -75,6 +75,7 @@
 		<module>flink-end-to-end-tests-jdbc-driver</module>
 		<module>flink-end-to-end-tests-hive</module>
 		<module>flink-end-to-end-tests-restclient</module>
+		<module>flink-failure-enricher-test</module>
     </modules>
 
 	<dependencies>
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index 7589d28..57f4fa2 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -221,6 +221,8 @@
 
     run_test "Shaded Hadoop S3A with credentials provider end-to-end test" "$END_TO_END_DIR/test-scripts/test_batch_wordcount.sh hadoop_with_provider"
 
+    run_test "Failure Enricher end-to-end test" "$END_TO_END_DIR/test-scripts/test_failure_enricher.sh" "skip_check_exceptions"
+
     if [[ `uname -i` != 'aarch64' ]]; then
         run_test "PyFlink end-to-end test" "$END_TO_END_DIR/test-scripts/test_pyflink.sh" "skip_check_exceptions"
     fi
diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh
index 27997bd..a7c4ce5 100644
--- a/flink-end-to-end-tests/test-scripts/common.sh
+++ b/flink-end-to-end-tests/test-scripts/common.sh
@@ -691,6 +691,13 @@
   set_config_key "metrics.reporter.slf4j.filter.includes" "*:${METRIC_NAME_PATTERN}"
 }
 
+function get_job_exceptions {
+  local job_id=$1
+  local json=$(curl ${CURL_SSL_ARGS} -s ${REST_PROTOCOL}://${NODENAME}:8081/jobs/${job_id}/exceptions)
+
+  echo ${json}
+}
+
 function get_job_metric {
   local job_id=$1
   local metric_name=$2
diff --git a/flink-end-to-end-tests/test-scripts/test_failure_enricher.sh b/flink-end-to-end-tests/test-scripts/test_failure_enricher.sh
new file mode 100755
index 0000000..40cd485
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_failure_enricher.sh
@@ -0,0 +1,77 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")"/common.sh
+
+TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-failure-enricher-test/target/FailureEnricherTest.jar
+
+echo "Moving the custom failure enricher to plugins/failure-enricher package."
+
+mkdir ${FLINK_DIR}/plugins/failure-enricher
+cp $TEST_PROGRAM_JAR ${FLINK_DIR}/plugins/failure-enricher/
+
+set_config_key "jobmanager.failure-enrichers" "org.apache.flink.runtime.enricher.CustomTestFailureEnricher"
+
+echo "Testing FailureEnricher function."
+
+start_cluster
+
+echo "Submitting job."
+
+CLIENT_OUTPUT=$($FLINK_DIR/bin/flink run -p 1 $TEST_PROGRAM_JAR --output $TEST_DATA_DIR/out/cl_out_pf)
+
+# first need get the jobid
+JOB_ID=$(echo "${CLIENT_OUTPUT}" | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+if [[ -z $JOB_ID ]]; then
+  echo "ERROR: Job could not be submitted."
+  echo "${CLIENT_OUTPUT}"
+  exit 1
+fi
+wait_job_terminal_state "${JOB_ID}" "FAILED"
+
+# then call the restful api get the exceptions
+exceptions_json=$(get_job_exceptions ${JOB_ID})
+if [[ -z ${exceptions_json} ]]; then
+  echo "ERROR: Could not get exceptions of ${jobid}."
+  echo "${CLIENT_OUTPUT}"
+  exit 1
+fi
+
+failure_labels=$(echo $exceptions_json | grep -o '"failureLabels":{[^}]*}' | sed 's/"failureLabels":{\([^}]*\)}/\1/')
+if [[ -z ${failure_labels} ]]; then
+  echo "ERROR: Could not get the failure labels from the exceptions(${exceptions_json})."
+  exit 1
+fi
+
+# verify the exception label info
+except_labels='"type":"user"'
+
+if [[ $failure_labels == *"$except_labels"* ]]; then
+  echo "The test is as expected, failure_labels(${failure_labels}) contains the expected label information."
+else
+  echo "ERROR: The failure_labels(${failure_labels}) does not contain the expected label information(${except_labels})."
+  exit 1
+fi
+
+# stop the cluster
+function cleanup {
+    stop_cluster
+}
+on_exit cleanup