blob: 2bba98bf2c60b79f7b0001e9ef452f04b79967b3 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.flink.table;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.rest.RestService;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.doris.flink.sink.writer.RowDataSerializer;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.Properties;
import java.util.stream.Collectors;
import static org.apache.doris.flink.sink.writer.LoadConstants.COLUMNS_KEY;
import static org.apache.doris.flink.sink.writer.LoadConstants.CSV;
import static org.apache.doris.flink.sink.writer.LoadConstants.DORIS_DELETE_SIGN;
import static org.apache.doris.flink.sink.writer.LoadConstants.FIELD_DELIMITER_DEFAULT;
import static org.apache.doris.flink.sink.writer.LoadConstants.FIELD_DELIMITER_KEY;
import static org.apache.doris.flink.sink.writer.LoadConstants.FORMAT_KEY;
/**
* DorisDynamicTableSink
**/
public class DorisDynamicTableSink implements DynamicTableSink {
private static final Logger LOG = LoggerFactory.getLogger(DorisDynamicTableSink.class);
private final DorisOptions options;
private final DorisReadOptions readOptions;
private final DorisExecutionOptions executionOptions;
private final TableSchema tableSchema;
private final Integer sinkParallelism;
public DorisDynamicTableSink(DorisOptions options,
DorisReadOptions readOptions,
DorisExecutionOptions executionOptions,
TableSchema tableSchema,
Integer sinkParallelism) {
this.options = options;
this.readOptions = readOptions;
this.executionOptions = executionOptions;
this.tableSchema = tableSchema;
this.sinkParallelism = sinkParallelism;
}
@Override
public ChangelogMode getChangelogMode(ChangelogMode changelogMode) {
return ChangelogMode.newBuilder()
.addContainedKind(RowKind.INSERT)
.addContainedKind(RowKind.DELETE)
.addContainedKind(RowKind.UPDATE_AFTER)
.build();
}
@Override
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
Properties loadProperties = executionOptions.getStreamLoadProp();
boolean deletable = RestService.isUniqueKeyType(options, readOptions, LOG) && executionOptions.getDeletable();
if (!loadProperties.containsKey(COLUMNS_KEY)) {
String[] fieldNames = tableSchema.getFieldNames();
Preconditions.checkState(fieldNames != null && fieldNames.length > 0);
String columns = String.join(",", Arrays.stream(fieldNames).map(item -> String.format("`%s`", item.trim().replace("`", ""))).collect(Collectors.toList()));
if (deletable) {
columns = String.format("%s,%s", columns, DORIS_DELETE_SIGN);
}
loadProperties.put(COLUMNS_KEY, columns);
}
RowDataSerializer.Builder serializerBuilder = RowDataSerializer.builder();
serializerBuilder.setFieldNames(tableSchema.getFieldNames())
.setFieldType(tableSchema.getFieldDataTypes())
.setType(loadProperties.getProperty(FORMAT_KEY, CSV))
.enableDelete(deletable)
.setFieldDelimiter(loadProperties.getProperty(FIELD_DELIMITER_KEY, FIELD_DELIMITER_DEFAULT));
DorisSink.Builder<RowData> dorisSinkBuilder = DorisSink.builder();
dorisSinkBuilder.setDorisOptions(options)
.setDorisReadOptions(readOptions)
.setDorisExecutionOptions(executionOptions)
.setSerializer(serializerBuilder.build());
return SinkProvider.of(dorisSinkBuilder.build(), sinkParallelism);
}
@Override
public DynamicTableSink copy() {
return new DorisDynamicTableSink(options, readOptions, executionOptions, tableSchema, sinkParallelism);
}
@Override
public String asSummaryString() {
return "Doris Table Sink";
}
}