Flink: Add uid-suffix write option to prevent operator UID hash collisions (#14063)
diff --git a/docs/docs/flink-configuration.md b/docs/docs/flink-configuration.md
index 11d37ed..7e09396 100644
--- a/docs/docs/flink-configuration.md
+++ b/docs/docs/flink-configuration.md
@@ -159,6 +159,7 @@
| compression-level | Table write.(fileformat).compression-level | Overrides this table's compression level for Parquet and Avro tables for this write |
| compression-strategy | Table write.orc.compression-strategy | Overrides this table's compression strategy for ORC tables for this write |
| write-parallelism | Upstream operator parallelism | Overrides the writer parallelism |
+| uid-suffix | Empty string | Overrides the uid suffix used for the underlying IcebergSink for this write |
#### Range distribution statistics type
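For context, the new option is meant to be set per write, most naturally through a SQL hint on the INSERT target, which is the same mechanism the new test below exercises. A minimal sketch, assuming a registered Iceberg table `db.tbl` and a `sourceTable` view (both placeholder names):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
    // The hint key matches the ConfigOption added in FlinkWriteOptions below.
    tEnv.executeSql(
        "INSERT INTO db.tbl /*+ OPTIONS('uid-suffix'='nightly-load') */ "
            + "SELECT id, data FROM sourceTable");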
diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
index 222a1e8..66fd098 100644
--- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
+++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
@@ -208,6 +208,14 @@
.parse();
}
+ public String uidSuffix() {
+ return confParser
+ .stringConf()
+ .option(FlinkWriteOptions.UID_SUFFIX.key())
+ .defaultValue(FlinkWriteOptions.UID_SUFFIX.defaultValue())
+ .parse();
+ }
+
public Integer writeParallelism() {
return confParser.intConf().option(FlinkWriteOptions.WRITE_PARALLELISM.key()).parseOptional();
}
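A minimal sketch of how the new accessor resolves, assuming the existing FlinkWriteConf constructor FlinkWriteConf(Table, Map<String, String>, ReadableConfig) and its usual precedence of per-write options over the Flink configuration (`table` is a placeholder for a loaded Iceberg table):

    import java.util.Map;
    import org.apache.flink.configuration.Configuration;
    import org.apache.iceberg.relocated.com.google.common.collect.Maps;

    Map<String, String> writeOptions = Maps.newHashMap();
    writeOptions.put(FlinkWriteOptions.UID_SUFFIX.key(), "ingest-a");
    FlinkWriteConf conf = new FlinkWriteConf(table, writeOptions, new Configuration());
    String suffix = conf.uidSuffix(); // "ingest-a"; returns the "" default when unset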
diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
index 6bdb01c..e68e64a 100644
--- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
+++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
@@ -87,4 +87,8 @@
@Experimental
public static final ConfigOption<Duration> TABLE_REFRESH_INTERVAL =
ConfigOptions.key("table-refresh-interval").durationType().noDefaultValue();
+
+ // Specifies the uid suffix to be used for the underlying IcebergSink.
+ public static final ConfigOption<String> UID_SUFFIX =
+ ConfigOptions.key("uid-suffix").stringType().defaultValue("");
}
diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
index 752882a..5932302 100644
--- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
+++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
@@ -321,7 +321,6 @@
public static class Builder implements IcebergSinkBuilder<Builder> {
private TableLoader tableLoader;
- private String uidSuffix = "";
private Function<String, DataStream<RowData>> inputCreator = null;
@Deprecated private TableSchema tableSchema;
private ResolvedSchema resolvedSchema;
@@ -597,7 +596,7 @@
* @return {@link Builder} to connect the iceberg table.
*/
public Builder uidSuffix(String newSuffix) {
- this.uidSuffix = newSuffix;
+ writeOptions.put(FlinkWriteOptions.UID_SUFFIX.key(), newSuffix);
return this;
}
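With the builder field gone, uidSuffix(...) and a raw write option now funnel through the same key, so the two calls below should be equivalent (a sketch assuming the builder's existing set(String, String) pass-through into writeOptions; `input` and `tableLoader` are placeholders):

    IcebergSink.forRowData(input)
        .tableLoader(tableLoader)
        .uidSuffix("ingest-a")
        .append();

    // Equivalent, since uidSuffix(...) now just stores the write option:
    IcebergSink.forRowData(input)
        .tableLoader(tableLoader)
        .set(FlinkWriteOptions.UID_SUFFIX.key(), "ingest-a")
        .append();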
@@ -666,11 +665,12 @@
FlinkMaintenanceConfig flinkMaintenanceConfig =
new FlinkMaintenanceConfig(table, writeOptions, readableConfig);
+
return new IcebergSink(
tableLoader,
table,
snapshotSummary,
- uidSuffix,
+ flinkWriteConf.uidSuffix(),
SinkUtil.writeProperties(flinkWriteConf.dataFileFormat(), flinkWriteConf, table),
resolvedSchema != null
? toFlinkRowType(table.schema(), resolvedSchema)
@@ -691,7 +691,7 @@
@Override
public DataStreamSink<RowData> append() {
IcebergSink sink = build();
- String suffix = defaultSuffix(uidSuffix, table.name());
+ String suffix = defaultSuffix(sink.uidSuffix, table.name());
DataStream<RowData> rowDataInput = inputCreator.apply(suffix);
// Please note that V2 sink framework will apply the uid here to the framework created
// operators like writer,
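Note that the option's "" default keeps flinkWriteConf.uidSuffix() non-null, so the existing fallback in defaultSuffix(...) still applies. Its presumed shape, inferred from the call site above (not part of this diff):

    // When no suffix is configured, fall back to the table name so operator
    // uids stay stable per table.
    private static String defaultSuffix(String uidSuffix, String defaultSuffix) {
      if (uidSuffix == null || uidSuffix.isEmpty()) {
        return defaultSuffix;
      }
      return uidSuffix;
    }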
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
index d99f657..572ff68 100644
--- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
@@ -263,4 +263,59 @@
sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
}
}
+
+ // Test multiple IcebergSink instances writing to the same table in separate DAG branches.
+ // This ensures the v2 sink can handle multiple sink operators for the same table
+ // without naming collisions or operator conflicts when using statement set execution.
+ @TestTemplate
+ public void testIcebergSinkDifferentDAG() throws Exception {
+ assumeThat(useV2Sink).isTrue();
+
+ // Disable sink reuse optimization to force creation of two separate IcebergSink instances
+ getTableEnv()
+ .getConfig()
+ .getConfiguration()
+ .setString("table.optimizer.reuse-sink-enabled", "false");
+
+ // Register the rows into two temporary source tables.
+ getTableEnv()
+ .createTemporaryView(
+ "sourceTable",
+ getTableEnv()
+ .fromValues(
+ SimpleDataUtil.FLINK_SCHEMA.toSourceRowDataType(),
+ Expressions.row(1, "hello"),
+ Expressions.row(2, "world"),
+ Expressions.row(3, (String) null),
+ Expressions.row(null, "bar")));
+
+ getTableEnv()
+ .createTemporaryView(
+ "sourceTable1",
+ getTableEnv()
+ .fromValues(
+ SimpleDataUtil.FLINK_SCHEMA.toSourceRowDataType(),
+ Expressions.row(1, "hello"),
+ Expressions.row(2, "world"),
+ Expressions.row(3, (String) null),
+ Expressions.row(null, "bar")));
+
+ // Write the records from both source tables to the destination table in one statement set.
+ sql(
+ "EXECUTE STATEMENT SET\n"
+ + "BEGIN\n"
+ + "INSERT INTO %s /*+ OPTIONS('uid-suffix'='source1') */ SELECT id,data from sourceTable;\n"
+ + "INSERT INTO %s /*+ OPTIONS('uid-suffix'='source2') */ SELECT id,data from sourceTable1;\n"
+ + "END;",
+ TABLE_NAME, TABLE_NAME);
+
+ // Assert the table records as expected.
+ SimpleDataUtil.assertTableRecords(
+ icebergTable,
+ Lists.newArrayList(
+ SimpleDataUtil.createRecord(1, "hello"),
+ SimpleDataUtil.createRecord(2, "world"),
+ SimpleDataUtil.createRecord(3, null),
+ SimpleDataUtil.createRecord(null, "bar")));
+ }
}
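The DataStream-API analogue of the test above: give each branch writing to the same table its own suffix so the two sinks' operator uids cannot collide. A sketch with placeholder streams, loader, and environment:

    // Two branches of one job writing to the same Iceberg table.
    IcebergSink.forRowData(streamA)
        .tableLoader(tableLoader)
        .uidSuffix("branch-a")
        .append();

    IcebergSink.forRowData(streamB)
        .tableLoader(tableLoader)
        .uidSuffix("branch-b")
        .append();

    env.execute("multi-branch iceberg write");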