[FLINK-35659][testutils] Fix TestFileSystem Connector error setting execution mode
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/utils/FactoryMocks.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/utils/FactoryMocks.java
index 41e9a39..0f9879f 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/utils/FactoryMocks.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/utils/FactoryMocks.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.factories.utils;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.catalog.CatalogTable;
@@ -55,6 +56,11 @@
 
     public static DynamicTableSource createTableSource(
             ResolvedSchema schema, Map<String, String> options) {
+        return createTableSource(schema, options, new Configuration());
+    }
+
+    public static DynamicTableSource createTableSource(
+            ResolvedSchema schema, Map<String, String> options, ReadableConfig readableConfig) {
         return FactoryUtil.createDynamicTableSource(
                 null,
                 IDENTIFIER,
@@ -66,7 +72,7 @@
                                 options),
                         schema),
                 Collections.emptyMap(),
-                new Configuration(),
+                readableConfig,
                 FactoryMocks.class.getClassLoader(),
                 false);
     }
diff --git a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/connector/file/table/TestFileSystemTableSource.java b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/connector/file/table/TestFileSystemTableSource.java
index 3158f03..4a71fcdc 100644
--- a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/connector/file/table/TestFileSystemTableSource.java
+++ b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/connector/file/table/TestFileSystemTableSource.java
@@ -19,9 +19,7 @@
 package org.apache.flink.connector.file.table;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.connector.file.src.FileSourceSplit;
 import org.apache.flink.connector.file.src.TestFileSource;
@@ -42,11 +40,14 @@
 @Internal
 public class TestFileSystemTableSource extends FileSystemTableSource {
 
+    private final boolean isStreamingMode;
+
     public TestFileSystemTableSource(
             ObjectIdentifier tableIdentifier,
             DataType physicalRowDataType,
             List<String> partitionKeys,
             ReadableConfig tableOptions,
+            boolean isStreamingMode,
             @Nullable DecodingFormat<BulkFormat<RowData, FileSourceSplit>> bulkReaderFormat,
             @Nullable DecodingFormat<DeserializationSchema<RowData>> deserializationFormat) {
         super(
@@ -56,6 +57,7 @@
                 tableOptions,
                 bulkReaderFormat,
                 deserializationFormat);
+        this.isStreamingMode = isStreamingMode;
     }
 
     @Override
@@ -79,8 +81,6 @@
                                                         new NonSplittingRecursiveAllDirEnumerator(
                                                                 regex)));
 
-        boolean isStreamingMode =
-                tableOptions.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING;
         fileSourceBuilder.setStreamingMode(isStreamingMode);
 
         return SourceProvider.of(fileSourceBuilder.build());
diff --git a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/TestFileSystemTableFactory.java b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/TestFileSystemTableFactory.java
index a2fc7a2..98d9caf 100644
--- a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/TestFileSystemTableFactory.java
+++ b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/TestFileSystemTableFactory.java
@@ -19,6 +19,8 @@
 package org.apache.flink.table.file.testutils;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.connector.file.table.FileSystemConnectorOptions;
 import org.apache.flink.connector.file.table.FileSystemTableFactory;
 import org.apache.flink.connector.file.table.TestFileSystemTableSource;
@@ -62,11 +64,15 @@
 
         FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
         validate(helper);
+        boolean isStreamingMode =
+                context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
+                        == RuntimeExecutionMode.STREAMING;
         return new TestFileSystemTableSource(
                 context.getObjectIdentifier(),
                 context.getPhysicalRowDataType(),
                 context.getCatalogTable().getPartitionKeys(),
                 helper.getOptions(),
+                isStreamingMode,
                 discoverDecodingFormat(context, BulkReaderFormatFactory.class),
                 discoverDecodingFormat(context, DeserializationFormatFactory.class));
     }
diff --git a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/TestFileSystemTableFactoryTest.java b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/TestFileSystemTableFactoryTest.java
index 79eab16..009ede7 100644
--- a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/TestFileSystemTableFactoryTest.java
+++ b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/TestFileSystemTableFactoryTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.file.testutils;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.file.table.FileSystemTableSink;
 import org.apache.flink.connector.file.table.TestFileSystemTableSource;
 import org.apache.flink.table.api.DataTypes;
@@ -25,13 +26,17 @@
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
 import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
 
 import org.junit.jupiter.api.Test;
 
 import java.util.HashMap;
 import java.util.Map;
 
+import static org.apache.flink.api.common.RuntimeExecutionMode.BATCH;
+import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
 import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSink;
 import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSource;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -64,4 +69,43 @@
         DynamicTableSink sink = createTableSink(SCHEMA, options);
         assertThat(sink).isInstanceOf(FileSystemTableSink.class);
     }
+
+    @Test
+    void testCreateUnboundedSource() {
+        Map<String, String> options = new HashMap<>();
+        options.put(FactoryUtil.CONNECTOR.key(), "test-filesystem");
+        options.put("path", "/tmp");
+        options.put("format", "testcsv");
+        options.put("source.monitor-interval", "5S");
+
+        DynamicTableSource source = createTableSource(SCHEMA, options);
+        assertThat(source).isInstanceOf(TestFileSystemTableSource.class);
+
+        // assert source is unbounded when specify source.monitor-interval
+        ScanTableSource.ScanRuntimeProvider scanRuntimeProvider =
+                ((TestFileSystemTableSource) source)
+                        .getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
+        assertThat(scanRuntimeProvider.isBounded()).isFalse();
+    }
+
+    @Test
+    void testCreateBoundedSource() {
+        Map<String, String> options = new HashMap<>();
+        options.put(FactoryUtil.CONNECTOR.key(), "test-filesystem");
+        options.put("path", "/tmp");
+        options.put("format", "testcsv");
+        options.put("source.monitor-interval", "5S");
+
+        Configuration configuration = new Configuration();
+        configuration.set(RUNTIME_MODE, BATCH);
+
+        DynamicTableSource source = createTableSource(SCHEMA, options, configuration);
+        assertThat(source).isInstanceOf(TestFileSystemTableSource.class);
+
+        // assert source is unbounded when specify source.monitor-interval
+        ScanTableSource.ScanRuntimeProvider scanRuntimeProvider =
+                ((TestFileSystemTableSource) source)
+                        .getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
+        assertThat(scanRuntimeProvider.isBounded()).isTrue();
+    }
 }