[FLINK-35659][testutils] Fix TestFileSystem Connector error setting execution mode
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/utils/FactoryMocks.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/utils/FactoryMocks.java
index 41e9a39..0f9879f 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/utils/FactoryMocks.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/utils/FactoryMocks.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.factories.utils;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.CatalogTable;
@@ -55,6 +56,11 @@
public static DynamicTableSource createTableSource(
ResolvedSchema schema, Map<String, String> options) {
+ return createTableSource(schema, options, new Configuration());
+ }
+
+ public static DynamicTableSource createTableSource(
+ ResolvedSchema schema, Map<String, String> options, ReadableConfig readableConfig) {
return FactoryUtil.createDynamicTableSource(
null,
IDENTIFIER,
@@ -66,7 +72,7 @@
options),
schema),
Collections.emptyMap(),
- new Configuration(),
+ readableConfig,
FactoryMocks.class.getClassLoader(),
false);
}
diff --git a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/connector/file/table/TestFileSystemTableSource.java b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/connector/file/table/TestFileSystemTableSource.java
index 3158f03..4a71fcdc 100644
--- a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/connector/file/table/TestFileSystemTableSource.java
+++ b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/connector/file/table/TestFileSystemTableSource.java
@@ -19,9 +19,7 @@
package org.apache.flink.connector.file.table;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.TestFileSource;
@@ -42,11 +40,14 @@
@Internal
public class TestFileSystemTableSource extends FileSystemTableSource {
+ private final boolean isStreamingMode;
+
public TestFileSystemTableSource(
ObjectIdentifier tableIdentifier,
DataType physicalRowDataType,
List<String> partitionKeys,
ReadableConfig tableOptions,
+ boolean isStreamingMode,
@Nullable DecodingFormat<BulkFormat<RowData, FileSourceSplit>> bulkReaderFormat,
@Nullable DecodingFormat<DeserializationSchema<RowData>> deserializationFormat) {
super(
@@ -56,6 +57,7 @@
tableOptions,
bulkReaderFormat,
deserializationFormat);
+ this.isStreamingMode = isStreamingMode;
}
@Override
@@ -79,8 +81,6 @@
new NonSplittingRecursiveAllDirEnumerator(
regex)));
- boolean isStreamingMode =
- tableOptions.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING;
fileSourceBuilder.setStreamingMode(isStreamingMode);
return SourceProvider.of(fileSourceBuilder.build());
diff --git a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/TestFileSystemTableFactory.java b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/TestFileSystemTableFactory.java
index a2fc7a2..98d9caf 100644
--- a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/TestFileSystemTableFactory.java
+++ b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/TestFileSystemTableFactory.java
@@ -19,6 +19,8 @@
package org.apache.flink.table.file.testutils;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.connector.file.table.FileSystemConnectorOptions;
import org.apache.flink.connector.file.table.FileSystemTableFactory;
import org.apache.flink.connector.file.table.TestFileSystemTableSource;
@@ -62,11 +64,15 @@
FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
validate(helper);
+ boolean isStreamingMode =
+ context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
+ == RuntimeExecutionMode.STREAMING;
return new TestFileSystemTableSource(
context.getObjectIdentifier(),
context.getPhysicalRowDataType(),
context.getCatalogTable().getPartitionKeys(),
helper.getOptions(),
+ isStreamingMode,
discoverDecodingFormat(context, BulkReaderFormatFactory.class),
discoverDecodingFormat(context, DeserializationFormatFactory.class));
}
diff --git a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/TestFileSystemTableFactoryTest.java b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/TestFileSystemTableFactoryTest.java
index 79eab16..009ede7 100644
--- a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/TestFileSystemTableFactoryTest.java
+++ b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/TestFileSystemTableFactoryTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.file.testutils;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.file.table.FileSystemTableSink;
import org.apache.flink.connector.file.table.TestFileSystemTableSource;
import org.apache.flink.table.api.DataTypes;
@@ -25,13 +26,17 @@
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.Map;
+import static org.apache.flink.api.common.RuntimeExecutionMode.BATCH;
+import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSink;
import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSource;
import static org.assertj.core.api.Assertions.assertThat;
@@ -64,4 +69,43 @@
DynamicTableSink sink = createTableSink(SCHEMA, options);
assertThat(sink).isInstanceOf(FileSystemTableSink.class);
}
+
+ @Test
+ void testCreateUnboundedSource() {
+ Map<String, String> options = new HashMap<>();
+ options.put(FactoryUtil.CONNECTOR.key(), "test-filesystem");
+ options.put("path", "/tmp");
+ options.put("format", "testcsv");
+ options.put("source.monitor-interval", "5S");
+
+ DynamicTableSource source = createTableSource(SCHEMA, options);
+ assertThat(source).isInstanceOf(TestFileSystemTableSource.class);
+
+ // assert source is unbounded when specify source.monitor-interval
+ ScanTableSource.ScanRuntimeProvider scanRuntimeProvider =
+ ((TestFileSystemTableSource) source)
+ .getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
+ assertThat(scanRuntimeProvider.isBounded()).isFalse();
+ }
+
+ @Test
+ void testCreateBoundedSource() {
+ Map<String, String> options = new HashMap<>();
+ options.put(FactoryUtil.CONNECTOR.key(), "test-filesystem");
+ options.put("path", "/tmp");
+ options.put("format", "testcsv");
+ options.put("source.monitor-interval", "5S");
+
+ Configuration configuration = new Configuration();
+ configuration.set(RUNTIME_MODE, BATCH);
+
+ DynamicTableSource source = createTableSource(SCHEMA, options, configuration);
+ assertThat(source).isInstanceOf(TestFileSystemTableSource.class);
+
+ // assert source is unbounded when specify source.monitor-interval
+ ScanTableSource.ScanRuntimeProvider scanRuntimeProvider =
+ ((TestFileSystemTableSource) source)
+ .getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
+ assertThat(scanRuntimeProvider.isBounded()).isTrue();
+ }
}