[FLINK-26537][statefun] Allow disabling StatefulFunctionsConfigValidator validation for classloader.parent-first-patterns.additional
diff --git a/docs/content/docs/deployment/configurations.md b/docs/content/docs/deployment/configurations.md
index a866225..4deb8e8 100644
--- a/docs/content/docs/deployment/configurations.md
+++ b/docs/content/docs/deployment/configurations.md
@@ -75,5 +75,14 @@
             <td>Integer</td>
             <td>The max number of async operations per task before backpressure is applied.</td>
         </tr>
+        <tr>
+            <td><h5>statefun.embedded</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Set to 'true' if Flink is running this job from an uber jar, rather than using statefun-specific docker images.
+                This disables the validation of whether 'classloader.parent-first-patterns.additional' 
+                contains 'org.apache.flink.statefun', 'org.apache.kafka' and 'com.google.protobuf' patterns.
+                It is then up to the creator of the uber jar to ensure that the three dependencies (statefun, kafka and protobuf) don't have version conflicts.</td>
+        </tr>
 	</tbody>
 </table>
\ No newline at end of file
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java
index 20f18f4..36a1419 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java
@@ -107,6 +107,13 @@
           .withDescription(
               "The name of the remote module entity to look for. Also supported, file:///...");
 
+  public static final ConfigOption<Boolean> EMBEDDED =
+      ConfigOptions.key("statefun.embedded")
+          .booleanType()
+          .defaultValue(false)
+          .withDescription(
+              "True if Flink is running this job from an uber jar, rather than using statefun-specific docker images");
+
   /**
    * Creates a new {@link StatefulFunctionsConfig} based on the default configurations in the
    * current environment set via the {@code flink-conf.yaml}.
@@ -134,7 +141,9 @@
 
   private String remoteModuleName;
 
-  private Map<String, String> globalConfigurations = new HashMap<>();
+  private boolean embedded;
+
+  private final Map<String, String> globalConfigurations = new HashMap<>();
 
   /**
    * Create a new configuration object based on the values set in flink-conf.
@@ -149,6 +158,7 @@
     this.feedbackBufferSize = configuration.get(TOTAL_MEMORY_USED_FOR_FEEDBACK_CHECKPOINTING);
     this.maxAsyncOperationsPerTask = configuration.get(ASYNC_MAX_OPERATIONS_PER_TASK);
     this.remoteModuleName = configuration.get(REMOTE_MODULE_NAME);
+    this.embedded = configuration.getBoolean(EMBEDDED);
 
     for (String key : configuration.keySet()) {
       if (key.startsWith(MODULE_CONFIG_PREFIX)) {
@@ -234,6 +244,19 @@
     this.remoteModuleName = Objects.requireNonNull(remoteModuleName);
   }
 
+  /** Returns whether the job was launched in embedded mode (see {@linkplain #EMBEDDED}). */
+  public boolean isEmbedded() {
+    return embedded;
+  }
+
+  /**
+   * Sets the embedded mode. If true, disables certain validation steps. See documentation:
+   * Configurations.
+   */
+  public void setEmbedded(boolean embedded) {
+    this.embedded = embedded;
+  }
+
   /**
    * Retrieves the universe provider for loading modules.
    *
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigValidator.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigValidator.java
index 0defdf8..aabdf60 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigValidator.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigValidator.java
@@ -18,12 +18,7 @@
 
 package org.apache.flink.statefun.flink.core;
 
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Locale;
-import java.util.Set;
+import java.util.*;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
@@ -42,8 +37,10 @@
 
   public static final int MAX_CONCURRENT_CHECKPOINTS = 1;
 
-  static void validate(Configuration configuration) {
-    validateParentFirstClassloaderPatterns(configuration);
+  static void validate(boolean isEmbedded, Configuration configuration) {
+    if (!isEmbedded) {
+      validateParentFirstClassloaderPatterns(configuration);
+    }
     validateCustomPayloadSerializerClassName(configuration);
     validateNoHeapBackedTimers(configuration);
     validateUnalignedCheckpointsDisabled(configuration);
@@ -70,10 +67,9 @@
   }
 
   private static void validateCustomPayloadSerializerClassName(Configuration configuration) {
-
-    MessageFactoryType factoryType =
+    final MessageFactoryType factoryType =
         configuration.get(StatefulFunctionsConfig.USER_MESSAGE_SERIALIZER);
-    String customPayloadSerializerClassName =
+    final String customPayloadSerializerClassName =
         configuration.get(StatefulFunctionsConfig.USER_MESSAGE_CUSTOM_PAYLOAD_SERIALIZER_CLASS);
 
     if (factoryType == MessageFactoryType.WITH_CUSTOM_PAYLOADS) {
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java
index 3b5dace..c86ef58 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java
@@ -19,7 +19,6 @@
 
 import java.net.URL;
 import java.net.URLClassLoader;
-import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.flink.api.java.utils.ParameterTool;
@@ -36,19 +35,21 @@
   private static final AtomicInteger FEEDBACK_INVOCATION_ID_SEQ = new AtomicInteger();
 
   public static void main(String... args) throws Exception {
-    ParameterTool parameterTool = ParameterTool.fromArgs(args);
-    Map<String, String> globalConfigurations = parameterTool.toMap();
-
+    ParameterTool argsParameterTool = ParameterTool.fromArgs(args);
     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
     Configuration flinkConfig = FlinkConfigExtractor.reflectivelyExtractFromEnv(env);
-    StatefulFunctionsConfigValidator.validate(flinkConfig);
 
     StatefulFunctionsConfig stateFunConfig =
-        StatefulFunctionsConfig.fromFlinkConfiguration(flinkConfig);
-    stateFunConfig.addAllGlobalConfigurations(globalConfigurations);
+        StatefulFunctionsConfig.fromFlinkConfiguration(
+            ParameterTool.fromMap(flinkConfig.toMap())
+                .mergeWith(argsParameterTool)
+                .getConfiguration());
+
+    stateFunConfig.addAllGlobalConfigurations(argsParameterTool.toMap());
     stateFunConfig.setProvider(new StatefulFunctionsUniverses.ClassPathUniverseProvider());
 
+    StatefulFunctionsConfigValidator.validate(stateFunConfig.isEmbedded(), flinkConfig);
+
     main(env, stateFunConfig);
   }
 
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigTest.java
index dc794bb..769cd98 100644
--- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigTest.java
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigTest.java
@@ -91,7 +91,7 @@
     Configuration configuration = baseConfiguration();
     configuration.set(
         StatefulFunctionsConfig.USER_MESSAGE_SERIALIZER, MessageFactoryType.WITH_CUSTOM_PAYLOADS);
-    StatefulFunctionsConfigValidator.validate(configuration);
+    StatefulFunctionsConfigValidator.validate(false, configuration);
   }
 
   @Test(expected = StatefulFunctionsInvalidConfigException.class)
@@ -101,6 +101,6 @@
         StatefulFunctionsConfig.USER_MESSAGE_SERIALIZER, MessageFactoryType.WITH_KRYO_PAYLOADS);
     configuration.set(
         StatefulFunctionsConfig.USER_MESSAGE_CUSTOM_PAYLOAD_SERIALIZER_CLASS, serializerClassName);
-    StatefulFunctionsConfigValidator.validate(configuration);
+    StatefulFunctionsConfigValidator.validate(false, configuration);
   }
 }