[Feature] Support insert overwrite (#544)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
index 17db7d2..f43d49b 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
@@ -21,18 +21,23 @@
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkV2Provider;
+import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Preconditions;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.connection.SimpleJdbcConnectionProvider;
+import org.apache.doris.flink.exception.DorisSystemException;
import org.apache.doris.flink.rest.RestService;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.doris.flink.sink.writer.serializer.RowDataSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.sql.Connection;
+import java.sql.Statement;
import java.util.Arrays;
import java.util.Objects;
import java.util.Properties;
@@ -46,13 +51,14 @@
import static org.apache.doris.flink.sink.writer.LoadConstants.FORMAT_KEY;
/** DorisDynamicTableSink. */
-public class DorisDynamicTableSink implements DynamicTableSink {
+public class DorisDynamicTableSink implements DynamicTableSink, SupportsOverwrite {
private static final Logger LOG = LoggerFactory.getLogger(DorisDynamicTableSink.class);
private final DorisOptions options;
private final DorisReadOptions readOptions;
private final DorisExecutionOptions executionOptions;
private final TableSchema tableSchema;
private final Integer sinkParallelism;
+ private boolean overwrite = false;
public DorisDynamicTableSink(
DorisOptions options,
@@ -115,13 +121,46 @@
.setDorisReadOptions(readOptions)
.setDorisExecutionOptions(executionOptions)
.setSerializer(serializerBuilder.build());
- return SinkV2Provider.of(dorisSinkBuilder.build(), sinkParallelism);
+ DorisSink<RowData> dorisSink = dorisSinkBuilder.build();
+
+ // for insert overwrite
+ if (overwrite) {
+ if (context.isBounded()) {
+ // execute jdbc query to truncate table
+ Preconditions.checkArgument(
+ options.getJdbcUrl() != null, "jdbc-url is required for Overwrite mode.");
+ // todo: should be written to a temporary table first,
+ // and then use GlobalCommitter to perform the rename.
+ truncateTable();
+ } else {
+                throw new IllegalStateException("Streaming mode does not support overwrite.");
+ }
+ }
+ return SinkV2Provider.of(dorisSink, sinkParallelism);
+ }
+
+ private void truncateTable() {
+ String truncateQuery = "TRUNCATE TABLE " + options.getTableIdentifier();
+ SimpleJdbcConnectionProvider jdbcConnectionProvider =
+ new SimpleJdbcConnectionProvider(options);
+ try (Connection connection = jdbcConnectionProvider.getOrEstablishConnection();
+ Statement statement = connection.createStatement()) {
+ LOG.info("Executing truncate query: {}", truncateQuery);
+ statement.execute(truncateQuery);
+ } catch (Exception e) {
+ LOG.error("Failed to execute truncate query: {}", truncateQuery, e);
+ throw new DorisSystemException(
+ String.format("Failed to execute truncate query: %s", truncateQuery), e);
+ }
}
@Override
public DynamicTableSink copy() {
- return new DorisDynamicTableSink(
- options, readOptions, executionOptions, tableSchema, sinkParallelism);
+ DorisDynamicTableSink sink =
+ new DorisDynamicTableSink(
+ options, readOptions, executionOptions, tableSchema, sinkParallelism);
+ sink.overwrite = overwrite;
+ return sink;
}
@Override
@@ -142,11 +181,18 @@
&& Objects.equals(readOptions, that.readOptions)
&& Objects.equals(executionOptions, that.executionOptions)
&& Objects.equals(tableSchema, that.tableSchema)
- && Objects.equals(sinkParallelism, that.sinkParallelism);
+                && Objects.equals(sinkParallelism, that.sinkParallelism)
+                && overwrite == that.overwrite;
}
@Override
public int hashCode() {
- return Objects.hash(options, readOptions, executionOptions, tableSchema, sinkParallelism);
+ return Objects.hash(
+ options, readOptions, executionOptions, tableSchema, sinkParallelism, overwrite);
+ }
+
+ @Override
+ public void applyOverwrite(boolean overwrite) {
+ this.overwrite = overwrite;
}
}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
index 80986ea..dd74bb1 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
@@ -25,6 +25,7 @@
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.StringUtils;
@@ -53,6 +54,7 @@
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
import static org.apache.flink.api.common.JobStatus.FINISHED;
import static org.apache.flink.api.common.JobStatus.RUNNING;
@@ -68,6 +70,7 @@
static final String TABLE_CSV_BATCH_TBL = "tbl_csv_batch_tbl";
static final String TABLE_CSV_BATCH_DS = "tbl_csv_batch_DS";
static final String TABLE_GROUP_COMMIT = "tbl_group_commit";
+ static final String TABLE_OVERWRITE = "tbl_overwrite";
static final String TABLE_GZ_FORMAT = "tbl_gz_format";
static final String TABLE_CSV_JM = "tbl_csv_jm";
static final String TABLE_CSV_TM = "tbl_csv_tm";
@@ -556,6 +559,59 @@
ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, query, 2);
}
+ @Test
+ public void testTableOverwrite() throws Exception {
+ initializeTable(TABLE_OVERWRITE);
+ // mock data
+ ContainerUtils.executeSQLStatement(
+ getDorisQueryConnection(),
+ LOG,
+ String.format(
+ "INSERT INTO %s.%s values('history-data',12)", DATABASE, TABLE_OVERWRITE));
+
+        List<String> expectedHistory = Arrays.asList("history-data,12");
+        String query =
+                String.format("select name,age from %s.%s order by 1", DATABASE, TABLE_OVERWRITE);
+        ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expectedHistory, query, 2);
+
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(DEFAULT_PARALLELISM);
+ env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+ final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+ String sinkDDL =
+ String.format(
+ "CREATE TABLE doris_overwrite_sink ("
+ + " name STRING,"
+ + " age INT"
+ + ") WITH ("
+ + " 'connector' = '"
+ + DorisConfigOptions.IDENTIFIER
+ + "',"
+ + " 'fenodes' = '%s',"
+ + " 'table.identifier' = '%s',"
+ + " 'jdbc-url' = '%s',"
+ + " 'username' = '%s',"
+ + " 'password' = '%s',"
+ + " 'sink.label-prefix' = '"
+ + UUID.randomUUID()
+ + "'"
+ + ")",
+ getFenodes(),
+ DATABASE + "." + TABLE_OVERWRITE,
+ getDorisQueryUrl(),
+ getDorisUsername(),
+ getDorisPassword());
+ tEnv.executeSql(sinkDDL);
+ TableResult tableResult =
+ tEnv.executeSql(
+ "INSERT OVERWRITE doris_overwrite_sink SELECT 'doris',1 union all SELECT 'overwrite',2 union all SELECT 'flink',3");
+
+ tableResult.await(25000, TimeUnit.MILLISECONDS);
+ List<String> expected = Arrays.asList("doris,1", "flink,3", "overwrite,2");
+ ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, query, 2);
+ }
+
private void initializeTable(String table) {
ContainerUtils.executeSQLStatement(
getDorisQueryConnection(),