Flink: Adds uid-suffix write option to prevent operator UID hash collisions (#14063)

diff --git a/docs/docs/flink-configuration.md b/docs/docs/flink-configuration.md
index 11d37ed..7e09396 100644
--- a/docs/docs/flink-configuration.md
+++ b/docs/docs/flink-configuration.md
@@ -159,6 +159,7 @@
 | compression-level                       | Table write.(fileformat).compression-level | Overrides this table's compression level for Parquet and Avro tables for this write                                                             |
 | compression-strategy                    | Table write.orc.compression-strategy       | Overrides this table's compression strategy for ORC tables for this write                                                                       |
 | write-parallelism                       | Upstream operator parallelism              | Overrides the writer parallelism                                                                                                                |
+| uid-suffix                              | As per table property                      | Overrides the uid suffix used in the underlying IcebergSink for this table                                                                      |
 
 #### Range distribution statistics type
 
diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
index 222a1e8..66fd098 100644
--- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
+++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
@@ -208,6 +208,14 @@
         .parse();
   }
 
+  public String uidSuffix() {
+    return confParser
+        .stringConf()
+        .option(FlinkWriteOptions.UID_SUFFIX.key())
+        .defaultValue(FlinkWriteOptions.UID_SUFFIX.defaultValue())
+        .parse();
+  }
+
   public Integer writeParallelism() {
     return confParser.intConf().option(FlinkWriteOptions.WRITE_PARALLELISM.key()).parseOptional();
   }
diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
index 6bdb01c..e68e64a 100644
--- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
+++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
@@ -87,4 +87,8 @@
   @Experimental
   public static final ConfigOption<Duration> TABLE_REFRESH_INTERVAL =
       ConfigOptions.key("table-refresh-interval").durationType().noDefaultValue();
+
+  //  specify the uidSuffix to be used for the underlying IcebergSink
+  public static final ConfigOption<String> UID_SUFFIX =
+      ConfigOptions.key("uid-suffix").stringType().defaultValue("");
 }
diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
index 752882a..5932302 100644
--- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
+++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
@@ -321,7 +321,6 @@
 
   public static class Builder implements IcebergSinkBuilder<Builder> {
     private TableLoader tableLoader;
-    private String uidSuffix = "";
     private Function<String, DataStream<RowData>> inputCreator = null;
     @Deprecated private TableSchema tableSchema;
     private ResolvedSchema resolvedSchema;
@@ -597,7 +596,7 @@
      * @return {@link Builder} to connect the iceberg table.
      */
     public Builder uidSuffix(String newSuffix) {
-      this.uidSuffix = newSuffix;
+      writeOptions.put(FlinkWriteOptions.UID_SUFFIX.key(), newSuffix);
       return this;
     }
 
@@ -666,11 +665,12 @@
 
       FlinkMaintenanceConfig flinkMaintenanceConfig =
           new FlinkMaintenanceConfig(table, writeOptions, readableConfig);
+
       return new IcebergSink(
           tableLoader,
           table,
           snapshotSummary,
-          uidSuffix,
+          flinkWriteConf.uidSuffix(),
           SinkUtil.writeProperties(flinkWriteConf.dataFileFormat(), flinkWriteConf, table),
           resolvedSchema != null
               ? toFlinkRowType(table.schema(), resolvedSchema)
@@ -691,7 +691,7 @@
     @Override
     public DataStreamSink<RowData> append() {
       IcebergSink sink = build();
-      String suffix = defaultSuffix(uidSuffix, table.name());
+      String suffix = defaultSuffix(sink.uidSuffix, table.name());
       DataStream<RowData> rowDataInput = inputCreator.apply(suffix);
       // Please note that V2 sink framework will apply the uid here to the framework created
       // operators like writer,
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
index d99f657..572ff68 100644
--- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
@@ -263,4 +263,59 @@
       sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
     }
   }
+
+  // Test multiple IcebergSink instances writing to the same table in separate DAG branches.
+  // This ensures the v2 sink can handle multiple sink operators for the same table
+  // without naming collisions or operator conflicts when using statement set execution.
+  @TestTemplate
+  public void testIcebergSinkDifferentDAG() throws Exception {
+    assumeThat(useV2Sink).isTrue();
+
+    // Disable sink reuse optimization to force creation of two separate IcebergSink instances
+    getTableEnv()
+        .getConfig()
+        .getConfiguration()
+        .setString("table.optimizer.reuse-sink-enabled", "false");
+
+    // Register the rows into a temporary table.
+    getTableEnv()
+        .createTemporaryView(
+            "sourceTable",
+            getTableEnv()
+                .fromValues(
+                    SimpleDataUtil.FLINK_SCHEMA.toSourceRowDataType(),
+                    Expressions.row(1, "hello"),
+                    Expressions.row(2, "world"),
+                    Expressions.row(3, (String) null),
+                    Expressions.row(null, "bar")));
+
+    getTableEnv()
+        .createTemporaryView(
+            "sourceTable1",
+            getTableEnv()
+                .fromValues(
+                    SimpleDataUtil.FLINK_SCHEMA.toSourceRowDataType(),
+                    Expressions.row(1, "hello"),
+                    Expressions.row(2, "world"),
+                    Expressions.row(3, (String) null),
+                    Expressions.row(null, "bar")));
+
+    // Redirect the records from source table to destination table.
+    sql(
+        "EXECUTE STATEMENT SET\n"
+            + "BEGIN\n"
+            + "INSERT INTO %s /*+ OPTIONS('uid-suffix'='source1') */ SELECT id,data from sourceTable;\n"
+            + "INSERT INTO %s /*+ OPTIONS('uid-suffix'='source2') */ SELECT id,data from sourceTable1;\n"
+            + "END;",
+        TABLE_NAME, TABLE_NAME);
+
+    // Assert the table records as expected.
+    SimpleDataUtil.assertTableRecords(
+        icebergTable,
+        Lists.newArrayList(
+            SimpleDataUtil.createRecord(1, "hello"),
+            SimpleDataUtil.createRecord(2, "world"),
+            SimpleDataUtil.createRecord(3, null),
+            SimpleDataUtil.createRecord(null, "bar")));
+  }
 }