blob: de5b32f4203a6a3310aa5c1fe81aa94745e983b0 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.flink.table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkV2Provider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Preconditions;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.rest.RestService;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.doris.flink.sink.writer.serializer.RowDataSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.Properties;
import java.util.stream.Collectors;
import static org.apache.doris.flink.sink.writer.LoadConstants.COLUMNS_KEY;
import static org.apache.doris.flink.sink.writer.LoadConstants.CSV;
import static org.apache.doris.flink.sink.writer.LoadConstants.DORIS_DELETE_SIGN;
import static org.apache.doris.flink.sink.writer.LoadConstants.FIELD_DELIMITER_DEFAULT;
import static org.apache.doris.flink.sink.writer.LoadConstants.FIELD_DELIMITER_KEY;
import static org.apache.doris.flink.sink.writer.LoadConstants.FORMAT_KEY;
/** DorisDynamicTableSink. */
public class DorisDynamicTableSink implements DynamicTableSink {
private static final Logger LOG = LoggerFactory.getLogger(DorisDynamicTableSink.class);
private final DorisOptions options;
private final DorisReadOptions readOptions;
private final DorisExecutionOptions executionOptions;
private final TableSchema tableSchema;
private final Integer sinkParallelism;
public DorisDynamicTableSink(
DorisOptions options,
DorisReadOptions readOptions,
DorisExecutionOptions executionOptions,
TableSchema tableSchema,
Integer sinkParallelism) {
this.options = options;
this.readOptions = readOptions;
this.executionOptions = executionOptions;
this.tableSchema = tableSchema;
this.sinkParallelism = sinkParallelism;
}
@Override
public ChangelogMode getChangelogMode(ChangelogMode changelogMode) {
if (executionOptions.getIgnoreUpdateBefore()) {
return ChangelogMode.upsert();
} else {
return ChangelogMode.all();
}
}
@Override
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
Properties loadProperties = executionOptions.getStreamLoadProp();
boolean deletable =
executionOptions.getDeletable()
&& RestService.isUniqueKeyType(options, readOptions, LOG);
if (!loadProperties.containsKey(COLUMNS_KEY)) {
String[] fieldNames = tableSchema.getFieldNames();
Preconditions.checkState(fieldNames != null && fieldNames.length > 0);
String columns =
String.join(
",",
Arrays.stream(fieldNames)
.map(
item ->
String.format(
"`%s`", item.trim().replace("`", "")))
.collect(Collectors.toList()));
if (deletable) {
columns = String.format("%s,%s", columns, DORIS_DELETE_SIGN);
}
loadProperties.put(COLUMNS_KEY, columns);
}
RowDataSerializer.Builder serializerBuilder = RowDataSerializer.builder();
serializerBuilder
.setFieldNames(tableSchema.getFieldNames())
.setFieldType(tableSchema.getFieldDataTypes())
.setType(loadProperties.getProperty(FORMAT_KEY, CSV))
.enableDelete(deletable)
.setFieldDelimiter(
loadProperties.getProperty(FIELD_DELIMITER_KEY, FIELD_DELIMITER_DEFAULT));
DorisSink.Builder<RowData> dorisSinkBuilder = DorisSink.builder();
dorisSinkBuilder
.setDorisOptions(options)
.setDorisReadOptions(readOptions)
.setDorisExecutionOptions(executionOptions)
.setSerializer(serializerBuilder.build());
return SinkV2Provider.of(dorisSinkBuilder.build(), sinkParallelism);
}
@Override
public DynamicTableSink copy() {
return new DorisDynamicTableSink(
options, readOptions, executionOptions, tableSchema, sinkParallelism);
}
@Override
public String asSummaryString() {
return "Doris Table Sink";
}
}