[FLINK-26537][statefun] Allow disabling StatefulFunctionsConfigValidator validation for classloader.parent-first-patterns.additional
diff --git a/docs/content/docs/deployment/configurations.md b/docs/content/docs/deployment/configurations.md
index a866225..4deb8e8 100644
--- a/docs/content/docs/deployment/configurations.md
+++ b/docs/content/docs/deployment/configurations.md
@@ -75,5 +75,14 @@
<td>Integer</td>
<td>The max number of async operations per task before backpressure is applied.</td>
</tr>
+ <tr>
+ <td><h5>statefun.embedded</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Set to 'true' if Flink is running this job from an uber jar, rather than using statefun-specific docker images.
+ This disables the validation of whether 'classloader.parent-first-patterns.additional'
+ contains 'org.apache.flink.statefun', 'org.apache.kafka' and 'com.google.protobuf' patterns.
+ It is then up to the creator of the uber jar to ensure that the three dependencies (statefun, kafka and protobuf) don't have version conflicts.</td>
+ </tr>
</tbody>
</table>
\ No newline at end of file
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java
index 20f18f4..36a1419 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java
@@ -107,6 +107,13 @@
.withDescription(
"The name of the remote module entity to look for. Also supported, file:///...");
+ public static final ConfigOption<Boolean> EMBEDDED =
+ ConfigOptions.key("statefun.embedded")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "True if Flink is running this job from an uber jar, rather than using statefun-specific docker images");
+
/**
* Creates a new {@link StatefulFunctionsConfig} based on the default configurations in the
* current environment set via the {@code flink-conf.yaml}.
@@ -134,7 +141,9 @@
private String remoteModuleName;
- private Map<String, String> globalConfigurations = new HashMap<>();
+ private boolean embedded;
+
+ private final Map<String, String> globalConfigurations = new HashMap<>();
/**
* Create a new configuration object based on the values set in flink-conf.
@@ -149,6 +158,7 @@
this.feedbackBufferSize = configuration.get(TOTAL_MEMORY_USED_FOR_FEEDBACK_CHECKPOINTING);
this.maxAsyncOperationsPerTask = configuration.get(ASYNC_MAX_OPERATIONS_PER_TASK);
this.remoteModuleName = configuration.get(REMOTE_MODULE_NAME);
+ this.embedded = configuration.getBoolean(EMBEDDED);
for (String key : configuration.keySet()) {
if (key.startsWith(MODULE_CONFIG_PREFIX)) {
@@ -234,6 +244,19 @@
this.remoteModuleName = Objects.requireNonNull(remoteModuleName);
}
+ /** Returns whether the job was launched in embedded mode (see {@linkplain #EMBEDDED}). */
+ public boolean isEmbedded() {
+ return embedded;
+ }
+
+ /**
+ * Sets the embedded mode. If true, disables certain validation steps. See documentation:
+ * Configurations.
+ */
+ public void setEmbedded(boolean embedded) {
+ this.embedded = embedded;
+ }
+
/**
* Retrieves the universe provider for loading modules.
*
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigValidator.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigValidator.java
index 0defdf8..aabdf60 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigValidator.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigValidator.java
@@ -18,12 +18,7 @@
package org.apache.flink.statefun.flink.core;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Locale;
-import java.util.Set;
+import java.util.*;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
@@ -42,8 +37,10 @@
public static final int MAX_CONCURRENT_CHECKPOINTS = 1;
- static void validate(Configuration configuration) {
- validateParentFirstClassloaderPatterns(configuration);
+ static void validate(boolean isEmbedded, Configuration configuration) {
+ if (!isEmbedded) {
+ validateParentFirstClassloaderPatterns(configuration);
+ }
validateCustomPayloadSerializerClassName(configuration);
validateNoHeapBackedTimers(configuration);
validateUnalignedCheckpointsDisabled(configuration);
@@ -70,10 +67,9 @@
}
private static void validateCustomPayloadSerializerClassName(Configuration configuration) {
-
- MessageFactoryType factoryType =
+ final MessageFactoryType factoryType =
configuration.get(StatefulFunctionsConfig.USER_MESSAGE_SERIALIZER);
- String customPayloadSerializerClassName =
+ final String customPayloadSerializerClassName =
configuration.get(StatefulFunctionsConfig.USER_MESSAGE_CUSTOM_PAYLOAD_SERIALIZER_CLASS);
if (factoryType == MessageFactoryType.WITH_CUSTOM_PAYLOADS) {
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java
index 3b5dace..c86ef58 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java
@@ -19,7 +19,6 @@
import java.net.URL;
import java.net.URLClassLoader;
-import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.java.utils.ParameterTool;
@@ -36,19 +35,21 @@
private static final AtomicInteger FEEDBACK_INVOCATION_ID_SEQ = new AtomicInteger();
public static void main(String... args) throws Exception {
- ParameterTool parameterTool = ParameterTool.fromArgs(args);
- Map<String, String> globalConfigurations = parameterTool.toMap();
-
+ ParameterTool argsParameterTool = ParameterTool.fromArgs(args);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
Configuration flinkConfig = FlinkConfigExtractor.reflectivelyExtractFromEnv(env);
- StatefulFunctionsConfigValidator.validate(flinkConfig);
StatefulFunctionsConfig stateFunConfig =
- StatefulFunctionsConfig.fromFlinkConfiguration(flinkConfig);
- stateFunConfig.addAllGlobalConfigurations(globalConfigurations);
+ StatefulFunctionsConfig.fromFlinkConfiguration(
+ ParameterTool.fromMap(flinkConfig.toMap())
+ .mergeWith(argsParameterTool)
+ .getConfiguration());
+
+ stateFunConfig.addAllGlobalConfigurations(argsParameterTool.toMap());
stateFunConfig.setProvider(new StatefulFunctionsUniverses.ClassPathUniverseProvider());
+ StatefulFunctionsConfigValidator.validate(stateFunConfig.isEmbedded(), flinkConfig);
+
main(env, stateFunConfig);
}
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigTest.java
index dc794bb..769cd98 100644
--- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigTest.java
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigTest.java
@@ -91,7 +91,7 @@
Configuration configuration = baseConfiguration();
configuration.set(
StatefulFunctionsConfig.USER_MESSAGE_SERIALIZER, MessageFactoryType.WITH_CUSTOM_PAYLOADS);
- StatefulFunctionsConfigValidator.validate(configuration);
+ StatefulFunctionsConfigValidator.validate(false, configuration);
}
@Test(expected = StatefulFunctionsInvalidConfigException.class)
@@ -101,6 +101,6 @@
StatefulFunctionsConfig.USER_MESSAGE_SERIALIZER, MessageFactoryType.WITH_KRYO_PAYLOADS);
configuration.set(
StatefulFunctionsConfig.USER_MESSAGE_CUSTOM_PAYLOAD_SERIALIZER_CLASS, serializerClassName);
- StatefulFunctionsConfigValidator.validate(configuration);
+ StatefulFunctionsConfigValidator.validate(false, configuration);
}
}