| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.flink.streaming.connectors.kafka.table; |
| |
| import org.apache.flink.annotation.Internal; |
| import org.apache.flink.api.common.ExecutionConfig; |
| import org.apache.flink.api.common.serialization.SerializationSchema; |
| import org.apache.flink.api.common.typeinfo.TypeInformation; |
| import org.apache.flink.api.common.typeutils.TypeSerializer; |
| import org.apache.flink.connector.base.DeliveryGuarantee; |
| import org.apache.flink.connector.kafka.sink.KafkaSink; |
| import org.apache.flink.connector.kafka.sink.KafkaSinkBuilder; |
| import org.apache.flink.streaming.api.datastream.DataStreamSink; |
| import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; |
| import org.apache.flink.table.api.DataTypes; |
| import org.apache.flink.table.connector.ChangelogMode; |
| import org.apache.flink.table.connector.format.EncodingFormat; |
| import org.apache.flink.table.connector.sink.DataStreamSinkProvider; |
| import org.apache.flink.table.connector.sink.DynamicTableSink; |
| import org.apache.flink.table.connector.sink.SinkProvider; |
| import org.apache.flink.table.connector.sink.abilities.SupportsWritingMetadata; |
| import org.apache.flink.table.data.ArrayData; |
| import org.apache.flink.table.data.MapData; |
| import org.apache.flink.table.data.RowData; |
| import org.apache.flink.table.types.DataType; |
| import org.apache.flink.table.types.logical.LogicalType; |
| import org.apache.flink.table.types.utils.DataTypeUtils; |
| |
| import org.apache.kafka.clients.producer.ProducerConfig; |
| import org.apache.kafka.common.header.Header; |
| |
| import javax.annotation.Nullable; |
| |
| import java.io.Serializable; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.LinkedHashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.Properties; |
| import java.util.function.Function; |
| import java.util.stream.Stream; |
| |
| import static org.apache.flink.util.Preconditions.checkNotNull; |
| |
| /** A version-agnostic Kafka {@link DynamicTableSink}. */ |
| @Internal |
| public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetadata { |
| |
| // -------------------------------------------------------------------------------------------- |
| // Mutable attributes |
| // -------------------------------------------------------------------------------------------- |
| |
| /** Metadata that is appended at the end of a physical sink row. */ |
| protected List<String> metadataKeys; |
| |
| // -------------------------------------------------------------------------------------------- |
| // Format attributes |
| // -------------------------------------------------------------------------------------------- |
| |
/** Data type of the consumed rows, including any appended metadata columns. */
| protected DataType consumedDataType; |
| |
| /** Data type to configure the formats. */ |
| protected final DataType physicalDataType; |
| |
| /** Optional format for encoding keys to Kafka. */ |
| protected final @Nullable EncodingFormat<SerializationSchema<RowData>> keyEncodingFormat; |
| |
| /** Format for encoding values to Kafka. */ |
| protected final EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat; |
| |
/** Indices that determine the key fields and their source positions in the consumed row. */
| protected final int[] keyProjection; |
| |
/** Indices that determine the value fields and their source positions in the consumed row. */
| protected final int[] valueProjection; |
| |
| /** Prefix that needs to be removed from fields when constructing the physical data type. */ |
| protected final @Nullable String keyPrefix; |
| |
| // -------------------------------------------------------------------------------------------- |
| // Kafka-specific attributes |
| // -------------------------------------------------------------------------------------------- |
| |
| /** The defined delivery guarantee. */ |
| private final DeliveryGuarantee deliveryGuarantee; |
| |
| /** |
| * If the {@link #deliveryGuarantee} is {@link DeliveryGuarantee#EXACTLY_ONCE} the value is the |
| * prefix for all ids of opened Kafka transactions. |
| */ |
| @Nullable private final String transactionalIdPrefix; |
| |
| /** The Kafka topic to write to. */ |
| protected final String topic; |
| |
| /** Properties for the Kafka producer. */ |
| protected final Properties properties; |
| |
| /** Partitioner to select Kafka partition for each item. */ |
| protected final @Nullable FlinkKafkaPartitioner<RowData> partitioner; |
| |
| /** |
| * Flag to determine sink mode. In upsert mode sink transforms the delete/update-before message |
| * to tombstone message. |
| */ |
| protected final boolean upsertMode; |
| |
/** Sink buffer flush config, which is currently only supported in upsert mode. */
| protected final SinkBufferFlushMode flushMode; |
| |
/** Parallelism of the physical Kafka producer. */
| protected final @Nullable Integer parallelism; |
| |
| public KafkaDynamicSink( |
| DataType consumedDataType, |
| DataType physicalDataType, |
| @Nullable EncodingFormat<SerializationSchema<RowData>> keyEncodingFormat, |
| EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat, |
| int[] keyProjection, |
| int[] valueProjection, |
| @Nullable String keyPrefix, |
| String topic, |
| Properties properties, |
| @Nullable FlinkKafkaPartitioner<RowData> partitioner, |
| DeliveryGuarantee deliveryGuarantee, |
| boolean upsertMode, |
| SinkBufferFlushMode flushMode, |
| @Nullable Integer parallelism, |
| @Nullable String transactionalIdPrefix) { |
| // Format attributes |
| this.consumedDataType = |
| checkNotNull(consumedDataType, "Consumed data type must not be null."); |
| this.physicalDataType = |
| checkNotNull(physicalDataType, "Physical data type must not be null."); |
| this.keyEncodingFormat = keyEncodingFormat; |
| this.valueEncodingFormat = |
| checkNotNull(valueEncodingFormat, "Value encoding format must not be null."); |
| this.keyProjection = checkNotNull(keyProjection, "Key projection must not be null."); |
| this.valueProjection = checkNotNull(valueProjection, "Value projection must not be null."); |
| this.keyPrefix = keyPrefix; |
| this.transactionalIdPrefix = transactionalIdPrefix; |
| // Mutable attributes |
| this.metadataKeys = Collections.emptyList(); |
| // Kafka-specific attributes |
| this.topic = checkNotNull(topic, "Topic must not be null."); |
| this.properties = checkNotNull(properties, "Properties must not be null."); |
| this.partitioner = partitioner; |
| this.deliveryGuarantee = |
| checkNotNull(deliveryGuarantee, "DeliveryGuarantee must not be null."); |
| this.upsertMode = upsertMode; |
| this.flushMode = checkNotNull(flushMode); |
| if (flushMode.isEnabled() && !upsertMode) { |
| throw new IllegalArgumentException( |
| "Sink buffer flush is only supported in upsert-kafka."); |
| } |
| this.parallelism = parallelism; |
| } |
| |
| @Override |
| public ChangelogMode getChangelogMode(ChangelogMode requestedMode) { |
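// The accepted changelog mode is determined by what the value encoding format can encode.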
| return valueEncodingFormat.getChangelogMode(); |
| } |
| |
| @Override |
| public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { |
| final SerializationSchema<RowData> keySerialization = |
| createSerialization(context, keyEncodingFormat, keyProjection, keyPrefix); |
| |
| final SerializationSchema<RowData> valueSerialization = |
| createSerialization(context, valueEncodingFormat, valueProjection, null); |
| |
| final KafkaSinkBuilder<RowData> sinkBuilder = KafkaSink.builder(); |
| final List<LogicalType> physicalChildren = physicalDataType.getLogicalType().getChildren(); |
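// Forward the transactional id prefix if provided; exactly-once delivery uses it as the
// prefix for the ids of the Kafka transactions opened by this sink.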
| if (transactionalIdPrefix != null) { |
| sinkBuilder.setTransactionalIdPrefix(transactionalIdPrefix); |
| } |
| final KafkaSink<RowData> kafkaSink = |
| sinkBuilder |
| .setDeliverGuarantee(deliveryGuarantee) |
| .setBootstrapServers( |
| properties.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG).toString()) |
| .setKafkaProducerConfig(properties) |
| .setRecordSerializer( |
| new DynamicKafkaRecordSerializationSchema( |
| topic, |
| partitioner, |
| keySerialization, |
| valueSerialization, |
| getFieldGetters(physicalChildren, keyProjection), |
| getFieldGetters(physicalChildren, valueProjection), |
| hasMetadata(), |
| getMetadataPositions(physicalChildren), |
| upsertMode)) |
| .build(); |
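// In upsert mode with buffer flush enabled, wrap the KafkaSink in a ReducingUpsertSink that
// buffers changes per key and periodically emits the reduced updates.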
| if (flushMode.isEnabled() && upsertMode) { |
| return (DataStreamSinkProvider) |
| dataStream -> { |
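// If object reuse is enabled, incoming rows may be mutated by the runtime, so
// buffered rows have to be defensively copied via the row serializer.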
| final boolean objectReuse = |
| dataStream |
| .getExecutionEnvironment() |
| .getConfig() |
| .isObjectReuseEnabled(); |
| final ReducingUpsertSink<?> sink = |
| new ReducingUpsertSink<>( |
| kafkaSink, |
| physicalDataType, |
| keyProjection, |
| flushMode, |
| objectReuse |
| ? createRowDataTypeSerializer( |
| context, |
| dataStream.getExecutionConfig()) |
| ::copy |
| : Function.identity()); |
| final DataStreamSink<RowData> end = dataStream.sinkTo(sink); |
| if (parallelism != null) { |
| end.setParallelism(parallelism); |
| } |
| return end; |
| }; |
| } |
| return SinkProvider.of(kafkaSink, parallelism); |
| } |
| |
| @Override |
| public Map<String, DataType> listWritableMetadata() { |
| final Map<String, DataType> metadataMap = new LinkedHashMap<>(); |
| Stream.of(WritableMetadata.values()) |
| .forEachOrdered(m -> metadataMap.put(m.key, m.dataType)); |
| return metadataMap; |
| } |
| |
| @Override |
| public void applyWritableMetadata(List<String> metadataKeys, DataType consumedDataType) { |
| this.metadataKeys = metadataKeys; |
| this.consumedDataType = consumedDataType; |
| } |
| |
| @Override |
| public DynamicTableSink copy() { |
| final KafkaDynamicSink copy = |
| new KafkaDynamicSink( |
| consumedDataType, |
| physicalDataType, |
| keyEncodingFormat, |
| valueEncodingFormat, |
| keyProjection, |
| valueProjection, |
| keyPrefix, |
| topic, |
| properties, |
| partitioner, |
| deliveryGuarantee, |
| upsertMode, |
| flushMode, |
| parallelism, |
| transactionalIdPrefix); |
| copy.metadataKeys = metadataKeys; |
| return copy; |
| } |
| |
| @Override |
| public String asSummaryString() { |
| return "Kafka table sink"; |
| } |
| |
| @Override |
| public boolean equals(Object o) { |
| if (this == o) { |
| return true; |
| } |
| if (o == null || getClass() != o.getClass()) { |
| return false; |
| } |
| final KafkaDynamicSink that = (KafkaDynamicSink) o; |
| return Objects.equals(metadataKeys, that.metadataKeys) |
| && Objects.equals(consumedDataType, that.consumedDataType) |
| && Objects.equals(physicalDataType, that.physicalDataType) |
| && Objects.equals(keyEncodingFormat, that.keyEncodingFormat) |
| && Objects.equals(valueEncodingFormat, that.valueEncodingFormat) |
| && Arrays.equals(keyProjection, that.keyProjection) |
| && Arrays.equals(valueProjection, that.valueProjection) |
| && Objects.equals(keyPrefix, that.keyPrefix) |
| && Objects.equals(topic, that.topic) |
| && Objects.equals(properties, that.properties) |
| && Objects.equals(partitioner, that.partitioner) |
| && Objects.equals(deliveryGuarantee, that.deliveryGuarantee) |
&& upsertMode == that.upsertMode
| && Objects.equals(flushMode, that.flushMode) |
| && Objects.equals(transactionalIdPrefix, that.transactionalIdPrefix) |
| && Objects.equals(parallelism, that.parallelism); |
| } |
| |
| @Override |
| public int hashCode() { |
| return Objects.hash( |
| metadataKeys, |
| consumedDataType, |
| physicalDataType, |
| keyEncodingFormat, |
| valueEncodingFormat, |
| keyProjection, |
| valueProjection, |
| keyPrefix, |
| topic, |
| properties, |
| partitioner, |
| deliveryGuarantee, |
| upsertMode, |
| flushMode, |
| transactionalIdPrefix, |
| parallelism); |
| } |
| |
| // -------------------------------------------------------------------------------------------- |
| |
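/**
* Creates a serializer for the consumed row type; it is used to defensively copy buffered rows
* when object reuse is enabled.
*/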
| private TypeSerializer<RowData> createRowDataTypeSerializer( |
| Context context, ExecutionConfig executionConfig) { |
| final TypeInformation<RowData> typeInformation = |
| context.createTypeInformation(consumedDataType); |
| return typeInformation.createSerializer(executionConfig); |
| } |
| |
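/**
* Returns, for every writable metadata column, its position in the consumed row, or -1 if the
* metadata key was not requested. Metadata columns are appended after the physical columns.
*/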
| private int[] getMetadataPositions(List<LogicalType> physicalChildren) { |
| return Stream.of(WritableMetadata.values()) |
| .mapToInt( |
| m -> { |
| final int pos = metadataKeys.indexOf(m.key); |
| if (pos < 0) { |
| return -1; |
| } |
| return physicalChildren.size() + pos; |
| }) |
| .toArray(); |
| } |
| |
| private boolean hasMetadata() { |
return !metadataKeys.isEmpty();
| } |
| |
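/** Creates field getters for the projected physical columns of the consumed row. */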
private RowData.FieldGetter[] getFieldGetters(
List<LogicalType> physicalChildren, int[] projection) {
return Arrays.stream(projection)
.mapToObj(
targetField ->
RowData.createFieldGetter(
physicalChildren.get(targetField), targetField))
.toArray(RowData.FieldGetter[]::new);
}
| |
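/**
* Creates the runtime encoder for the given format, projected onto the selected physical
* columns and with the optional key prefix stripped; returns null if no format is configured
* (e.g. when no key format is defined).
*/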
| private @Nullable SerializationSchema<RowData> createSerialization( |
| DynamicTableSink.Context context, |
| @Nullable EncodingFormat<SerializationSchema<RowData>> format, |
| int[] projection, |
| @Nullable String prefix) { |
| if (format == null) { |
| return null; |
| } |
| DataType physicalFormatDataType = |
| DataTypeUtils.projectRow(this.physicalDataType, projection); |
| if (prefix != null) { |
| physicalFormatDataType = DataTypeUtils.stripRowPrefix(physicalFormatDataType, prefix); |
| } |
| return format.createRuntimeEncoder(context, physicalFormatDataType); |
| } |
| |
| // -------------------------------------------------------------------------------------------- |
| // Metadata handling |
| // -------------------------------------------------------------------------------------------- |
| |
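/** Writable metadata columns that the sink can persist into the Kafka record: {@code headers} and {@code timestamp}. */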
| enum WritableMetadata { |
| HEADERS( |
| "headers", |
| // key and value of the map are nullable to make handling easier in queries |
| DataTypes.MAP(DataTypes.STRING().nullable(), DataTypes.BYTES().nullable()) |
| .nullable(), |
| new MetadataConverter() { |
| private static final long serialVersionUID = 1L; |
| |
| @Override |
| public Object read(RowData row, int pos) { |
| if (row.isNullAt(pos)) { |
| return null; |
| } |
| final MapData map = row.getMap(pos); |
| final ArrayData keyArray = map.keyArray(); |
| final ArrayData valueArray = map.valueArray(); |
| final List<Header> headers = new ArrayList<>(); |
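// Only entries with both a non-null key and a non-null value are written as Kafka headers.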
| for (int i = 0; i < keyArray.size(); i++) { |
| if (!keyArray.isNullAt(i) && !valueArray.isNullAt(i)) { |
| final String key = keyArray.getString(i).toString(); |
| final byte[] value = valueArray.getBinary(i); |
| headers.add(new KafkaHeader(key, value)); |
| } |
| } |
| return headers; |
| } |
| }), |
| |
| TIMESTAMP( |
| "timestamp", |
| DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).nullable(), |
| new MetadataConverter() { |
| private static final long serialVersionUID = 1L; |
| |
| @Override |
| public Object read(RowData row, int pos) { |
| if (row.isNullAt(pos)) { |
| return null; |
| } |
| return row.getTimestamp(pos, 3).getMillisecond(); |
| } |
| }); |
| |
| final String key; |
| |
| final DataType dataType; |
| |
| final MetadataConverter converter; |
| |
| WritableMetadata(String key, DataType dataType, MetadataConverter converter) { |
| this.key = key; |
| this.dataType = dataType; |
| this.converter = converter; |
| } |
| } |
| |
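/** Converter that extracts a writable metadata value from the consumed row at a given position. */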
| interface MetadataConverter extends Serializable { |
| Object read(RowData consumedRow, int pos); |
| } |
| |
| // -------------------------------------------------------------------------------------------- |
| |
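/** Simple {@link Header} implementation that wraps a string key and a byte array value. */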
| private static class KafkaHeader implements Header { |
| |
| private final String key; |
| |
| private final byte[] value; |
| |
| KafkaHeader(String key, byte[] value) { |
| this.key = key; |
| this.value = value; |
| } |
| |
| @Override |
| public String key() { |
| return key; |
| } |
| |
| @Override |
| public byte[] value() { |
| return value; |
| } |
| } |
| } |