[FLINK-31895][runtime] Add End-to-end integration tests for failure labels
This closes #23405
diff --git a/flink-end-to-end-tests/flink-failure-enricher-test/pom.xml b/flink-end-to-end-tests/flink-failure-enricher-test/pom.xml
new file mode 100644
index 0000000..d3c6a7f
--- /dev/null
+++ b/flink-end-to-end-tests/flink-failure-enricher-test/pom.xml
@@ -0,0 +1,86 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <parent>
+ <artifactId>flink-end-to-end-tests</artifactId>
+ <groupId>org.apache.flink</groupId>
+ <version>1.19-SNAPSHOT</version>
+ </parent>
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>flink-failure-enricher-test</artifactId>
+ <name>Flink : E2E Tests : Failure Enricher Test</name>
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-annotations</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>FailureEnricherTest</id>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <finalName>FailureEnricherTest</finalName> <!-- test_failure_enricher.sh reads target/FailureEnricherTest.jar -->
+ <transformers>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ <mainClass>org.apache.flink.streaming.tests.FailureEnricherTestProgram</mainClass> <!-- the test script runs the jar without naming an entry class -->
+ </transformer>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
+
diff --git a/flink-end-to-end-tests/flink-failure-enricher-test/src/main/java/org/apache/flink/runtime/enricher/CustomTestFailureEnricher.java b/flink-end-to-end-tests/flink-failure-enricher-test/src/main/java/org/apache/flink/runtime/enricher/CustomTestFailureEnricher.java
new file mode 100644
index 0000000..e51578e
--- /dev/null
+++ b/flink-end-to-end-tests/flink-failure-enricher-test/src/main/java/org/apache/flink/runtime/enricher/CustomTestFailureEnricher.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.enricher;
+
+import org.apache.flink.core.failure.FailureEnricher;
+import org.apache.flink.util.FlinkException;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+/** Test enricher that attaches a single {@code "type"} label to every failure. */
+public class CustomTestFailureEnricher implements FailureEnricher {
+
+    private final Set<String> outputKeys;
+
+    /** Registers {@code "type"} as the only label key this enricher emits. */
+    public CustomTestFailureEnricher() {
+        outputKeys = Collections.singleton("type");
+    }
+
+    /** Returns the singleton key set created in the constructor. */
+    @Override
+    public Set<String> getOutputKeys() {
+        return outputKeys;
+    }
+
+    /** Labels {@link FlinkException} causes {@code "system"}, anything else {@code "user"}. */
+    @Override
+    public CompletableFuture<Map<String, String>> processFailure(Throwable cause, Context context) {
+        final String label = cause instanceof FlinkException ? "system" : "user";
+        return CompletableFuture.completedFuture(Collections.singletonMap("type", label));
+    }
+}
diff --git a/flink-end-to-end-tests/flink-failure-enricher-test/src/main/java/org/apache/flink/runtime/enricher/CustomTestFailureEnricherFactory.java b/flink-end-to-end-tests/flink-failure-enricher-test/src/main/java/org/apache/flink/runtime/enricher/CustomTestFailureEnricherFactory.java
new file mode 100644
index 0000000..b8a8f61
--- /dev/null
+++ b/flink-end-to-end-tests/flink-failure-enricher-test/src/main/java/org/apache/flink/runtime/enricher/CustomTestFailureEnricherFactory.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.enricher;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.failure.FailureEnricher;
+import org.apache.flink.core.failure.FailureEnricherFactory;
+
+/** Factory that creates {@link CustomTestFailureEnricher} instances. */
+public class CustomTestFailureEnricherFactory implements FailureEnricherFactory {
+    /** Builds a new enricher; the given configuration is not read. */
+    @Override
+    public FailureEnricher createFailureEnricher(Configuration conf) {
+        return new CustomTestFailureEnricher();
+    }
+}
diff --git a/flink-end-to-end-tests/flink-failure-enricher-test/src/main/java/org/apache/flink/streaming/tests/FailureEnricherTestProgram.java b/flink-end-to-end-tests/flink-failure-enricher-test/src/main/java/org/apache/flink/streaming/tests/FailureEnricherTestProgram.java
new file mode 100644
index 0000000..d4585bc
--- /dev/null
+++ b/flink-end-to-end-tests/flink-failure-enricher-test/src/main/java/org/apache/flink/streaming/tests/FailureEnricherTestProgram.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/**
+ * End-to-end test program for verifying failure enrichers configured through {@link
+ * org.apache.flink.configuration.JobManagerOptions#FAILURE_ENRICHERS_LIST}. The module ships a
+ * {@code CustomTestFailureEnricherFactory} and a {@code CustomTestFailureEnricher} that attach a
+ * label to each failure. The test_failure_enricher.sh script copies this jar into the
+ * plugins/failure-enricher directory, submits this job, and checks the labels via the REST API.
+ */
+public class FailureEnricherTestProgram {
+
+    public static void main(String[] args) throws Exception {
+        final ParameterTool params = ParameterTool.fromArgs(args);
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+        // Every record makes the mapper throw a plain RuntimeException.
+        final MapFunction<String, String> alwaysFailing =
+                value -> {
+                    throw new RuntimeException("Expect exception");
+                };
+
+        env.fromElements("Hello")
+                .map(alwaysFailing)
+                .writeAsText(params.getRequired("output"), FileSystem.WriteMode.OVERWRITE);
+
+        env.execute("Failure Enricher Test");
+    }
+}
diff --git a/flink-end-to-end-tests/flink-failure-enricher-test/src/main/resources/META-INF/services/org.apache.flink.core.failure.FailureEnricherFactory b/flink-end-to-end-tests/flink-failure-enricher-test/src/main/resources/META-INF/services/org.apache.flink.core.failure.FailureEnricherFactory
new file mode 100644
index 0000000..096ef5d
--- /dev/null
+++ b/flink-end-to-end-tests/flink-failure-enricher-test/src/main/resources/META-INF/services/org.apache.flink.core.failure.FailureEnricherFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.runtime.enricher.CustomTestFailureEnricherFactory
diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml
index 8a8e84d..27eae87 100644
--- a/flink-end-to-end-tests/pom.xml
+++ b/flink-end-to-end-tests/pom.xml
@@ -75,6 +75,7 @@
<module>flink-end-to-end-tests-jdbc-driver</module>
<module>flink-end-to-end-tests-hive</module>
<module>flink-end-to-end-tests-restclient</module>
+ <module>flink-failure-enricher-test</module>
</modules>
<dependencies>
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index 7589d28..57f4fa2 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -221,6 +221,8 @@
run_test "Shaded Hadoop S3A with credentials provider end-to-end test" "$END_TO_END_DIR/test-scripts/test_batch_wordcount.sh hadoop_with_provider"
+ run_test "Failure Enricher end-to-end test" "$END_TO_END_DIR/test-scripts/test_failure_enricher.sh" "skip_check_exceptions"
+
if [[ `uname -i` != 'aarch64' ]]; then
run_test "PyFlink end-to-end test" "$END_TO_END_DIR/test-scripts/test_pyflink.sh" "skip_check_exceptions"
fi
diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh
index 27997bd..a7c4ce5 100644
--- a/flink-end-to-end-tests/test-scripts/common.sh
+++ b/flink-end-to-end-tests/test-scripts/common.sh
@@ -691,6 +691,13 @@
set_config_key "metrics.reporter.slf4j.filter.includes" "*:${METRIC_NAME_PATTERN}"
}
+function get_job_exceptions {
+    # Print the /jobs/<job_id>/exceptions REST response for job $1 verbatim (no splitting/globbing).
+    local job_id=$1
+    local json=$(curl ${CURL_SSL_ARGS} -s "${REST_PROTOCOL}://${NODENAME}:8081/jobs/${job_id}/exceptions")
+    echo "${json}"
+}
+
function get_job_metric {
local job_id=$1
local metric_name=$2
diff --git a/flink-end-to-end-tests/test-scripts/test_failure_enricher.sh b/flink-end-to-end-tests/test-scripts/test_failure_enricher.sh
new file mode 100755
index 0000000..40cd485
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_failure_enricher.sh
@@ -0,0 +1,77 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")"/common.sh
+
+# End-to-end check that a failure enricher installed under plugins/failure-enricher labels a failing
+# job: the job's exceptions REST response must contain the label "type":"user".
+TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-failure-enricher-test/target/FailureEnricherTest.jar
+
+echo "Moving the custom failure enricher to plugins/failure-enricher package."
+mkdir -p "${FLINK_DIR}/plugins/failure-enricher"
+cp "$TEST_PROGRAM_JAR" "${FLINK_DIR}/plugins/failure-enricher/"
+
+set_config_key "jobmanager.failure-enrichers" "org.apache.flink.runtime.enricher.CustomTestFailureEnricher"
+
+# Register the cleanup before the cluster starts so it also runs on every "exit 1" path below.
+function cleanup {
+    stop_cluster
+}
+on_exit cleanup
+
+echo "Testing FailureEnricher function."
+start_cluster
+
+echo "Submitting job."
+CLIENT_OUTPUT=$("$FLINK_DIR"/bin/flink run -p 1 "$TEST_PROGRAM_JAR" --output "$TEST_DATA_DIR/out/cl_out_pf")
+
+# The job id is the last word of the client's "Job has been submitted with JobID" line.
+JOB_ID=$(echo "${CLIENT_OUTPUT}" | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+if [[ -z $JOB_ID ]]; then
+    echo "ERROR: Job could not be submitted."
+    echo "${CLIENT_OUTPUT}"
+    exit 1
+fi
+wait_job_terminal_state "${JOB_ID}" "FAILED"
+
+# Fetch the job's exceptions through the REST API.
+exceptions_json=$(get_job_exceptions "${JOB_ID}")
+if [[ -z ${exceptions_json} ]]; then
+    echo "ERROR: Could not get exceptions of ${JOB_ID}."
+    echo "${CLIENT_OUTPUT}"
+    exit 1
+fi
+
+# Keep only the contents of each "failureLabels":{...} object found in the response.
+failure_labels=$(echo "${exceptions_json}" | grep -o '"failureLabels":{[^}]*}' | sed 's/"failureLabels":{\([^}]*\)}/\1/')
+if [[ -z ${failure_labels} ]]; then
+    echo "ERROR: Could not get the failure labels from the exceptions(${exceptions_json})."
+    exit 1
+fi
+
+# The test program throws a plain RuntimeException, so the "user" label is expected.
+expected_labels='"type":"user"'
+
+if [[ $failure_labels == *"$expected_labels"* ]]; then
+    echo "The test is as expected, failure_labels(${failure_labels}) contains the expected label information."
+else
+    echo "ERROR: The failure_labels(${failure_labels}) does not contain the expected label information(${expected_labels})."
+    exit 1
+fi