/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.table.descriptors;

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.util.Preconditions;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_VERSION;
import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES;
import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES_KEY;
import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES_VALUE;
import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SINK_PARTITIONER;
import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SINK_PARTITIONER_CLASS;
import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SINK_PARTITIONER_VALUE_CUSTOM;
import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SINK_PARTITIONER_VALUE_FIXED;
import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SINK_PARTITIONER_VALUE_ROUND_ROBIN;
import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS;
import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS_OFFSET;
import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS_PARTITION;
import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_STARTUP_MODE;
import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_TOPIC;
import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_TYPE_VALUE_KAFKA;

/**
 * Connector descriptor for the Apache Kafka message queue.
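 *
 * <p>A typical declaration using the fluent API of this class (the topic name and
 * property values below are placeholders):
 * <pre>{@code
 * new Kafka()
 *     .version("0.11")
 *     .topic("test-topic")
 *     .startFromEarliest()
 *     .property("bootstrap.servers", "localhost:9092");
 * }</pre>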
 */
public class Kafka extends ConnectorDescriptor {

    private String version;
    private String topic;
    private StartupMode startupMode;
    private Map<Integer, Long> specificOffsets;
    private Map<String, String> kafkaProperties;
    private String sinkPartitionerType;
    private Class<? extends FlinkKafkaPartitioner> sinkPartitionerClass;

    /**
     * Connector descriptor for the Apache Kafka message queue.
     */
    public Kafka() {
        super(CONNECTOR_TYPE_VALUE_KAFKA, 1, true);
    }

    /**
     * Sets the Kafka version to be used.
     *
     * @param version Kafka version. E.g., "0.8", "0.11", etc.
     */
    public Kafka version(String version) {
        Preconditions.checkNotNull(version);
        this.version = version;
        return this;
    }

    /**
     * Sets the topic from which the table is read.
     *
     * @param topic The topic from which the table is read.
     */
    public Kafka topic(String topic) {
        Preconditions.checkNotNull(topic);
        this.topic = topic;
        return this;
    }

    /**
     * Sets the configuration properties for the Kafka consumer. Resets previously set properties.
     *
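     * <p>For example (server address and group id are placeholders):
     * <pre>{@code
     * Properties props = new Properties();
     * props.setProperty("bootstrap.servers", "localhost:9092");
     * props.setProperty("group.id", "test-group");
     * new Kafka().properties(props);
     * }</pre>
     *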
     * @param properties The configuration properties for the Kafka consumer.
     */
    public Kafka properties(Properties properties) {
        Preconditions.checkNotNull(properties);
        if (this.kafkaProperties == null) {
            this.kafkaProperties = new HashMap<>();
        }
        this.kafkaProperties.clear();
        properties.forEach((k, v) -> this.kafkaProperties.put((String) k, (String) v));
        return this;
    }

    /**
     * Adds a configuration property for the Kafka consumer.
     *
     * @param key property key for the Kafka consumer
     * @param value property value for the Kafka consumer
     */
    public Kafka property(String key, String value) {
        Preconditions.checkNotNull(key);
        Preconditions.checkNotNull(value);
        if (this.kafkaProperties == null) {
            this.kafkaProperties = new HashMap<>();
        }
        kafkaProperties.put(key, value);
        return this;
    }

    /**
     * Configures to start reading from the earliest offset for all partitions.
     *
     * @see FlinkKafkaConsumerBase#setStartFromEarliest()
     */
    public Kafka startFromEarliest() {
        this.startupMode = StartupMode.EARLIEST;
        this.specificOffsets = null;
        return this;
    }

    /**
     * Configures to start reading from the latest offset for all partitions.
     *
     * @see FlinkKafkaConsumerBase#setStartFromLatest()
     */
    public Kafka startFromLatest() {
        this.startupMode = StartupMode.LATEST;
        this.specificOffsets = null;
        return this;
    }

    /**
     * Configures to start reading from any committed group offsets found in ZooKeeper / Kafka brokers.
     *
     * @see FlinkKafkaConsumerBase#setStartFromGroupOffsets()
     */
    public Kafka startFromGroupOffsets() {
        this.startupMode = StartupMode.GROUP_OFFSETS;
        this.specificOffsets = null;
        return this;
    }

    /**
     * Configures to start reading partitions from specific offsets, set independently for each partition.
     * Resets previously set offsets.
     *
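     * <p>For example:
     * <pre>{@code
     * Map<Integer, Long> offsets = new HashMap<>();
     * offsets.put(0, 42L);  // start reading partition 0 from offset 42
     * offsets.put(1, 99L);  // start reading partition 1 from offset 99
     * new Kafka().startFromSpecificOffsets(offsets);
     * }</pre>
     *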
     * @param specificOffsets the specified offsets for partitions
     * @see FlinkKafkaConsumerBase#setStartFromSpecificOffsets(Map)
     */
    public Kafka startFromSpecificOffsets(Map<Integer, Long> specificOffsets) {
        this.startupMode = StartupMode.SPECIFIC_OFFSETS;
        this.specificOffsets = Preconditions.checkNotNull(specificOffsets);
        return this;
    }

    /**
     * Configures to start reading partitions from specific offsets and specifies the given offset for
     * the given partition.
     *
     * @param partition partition index
     * @param specificOffset partition offset to start reading from
     * @see FlinkKafkaConsumerBase#setStartFromSpecificOffsets(Map)
     */
    public Kafka startFromSpecificOffset(int partition, long specificOffset) {
        this.startupMode = StartupMode.SPECIFIC_OFFSETS;
        if (this.specificOffsets == null) {
            this.specificOffsets = new HashMap<>();
        }
        this.specificOffsets.put(partition, specificOffset);
        return this;
    }

    /**
     * Configures how to partition records from Flink's partitions into Kafka's partitions.
     *
     * <p>This strategy ensures that each Flink partition ends up in one Kafka partition.
     *
     * <p>Note: One Kafka partition can contain multiple Flink partitions. Examples:
     *
     * <p>More Flink partitions than Kafka partitions. Some (or all) Kafka partitions contain
     * the output of more than one Flink partition:
     * <pre>
     *     Flink Sinks        Kafka Partitions
     *         1    ----------------&gt;    1
     *         2    --------------/
     *         3    -------------/
     *         4    ------------/
     * </pre>
     *
     * <p>Fewer Flink partitions than Kafka partitions:
     * <pre>
     *     Flink Sinks        Kafka Partitions
     *         1    ----------------&gt;    1
     *         2    ----------------&gt;    2
     *         3
     *         4
     *         5
     * </pre>
     *
     * @see org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner
     */
    public Kafka sinkPartitionerFixed() {
        sinkPartitionerType = CONNECTOR_SINK_PARTITIONER_VALUE_FIXED;
        sinkPartitionerClass = null;
        return this;
    }

    /**
     * Configures how to partition records from Flink's partitions into Kafka's partitions.
     *
     * <p>This strategy ensures that records will be distributed to Kafka partitions in a
     * round-robin fashion.
     *
     * <p>Note: This strategy is useful to avoid an unbalanced partitioning. However, it will
     * cause a lot of network connections between all the Flink instances and all the Kafka brokers.
     */
    public Kafka sinkPartitionerRoundRobin() {
        sinkPartitionerType = CONNECTOR_SINK_PARTITIONER_VALUE_ROUND_ROBIN;
        sinkPartitionerClass = null;
        return this;
    }

    /**
     * Configures how to partition records from Flink's partitions into Kafka's partitions.
     *
     * <p>This strategy allows for a custom partitioner by providing an implementation
     * of {@link FlinkKafkaPartitioner}.
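     *
     * <p>A minimal sketch of a custom partitioner (illustrative only; {@code Row} stands in
     * for the sink's record type, and {@code partition} is assumed to be the single abstract
     * method declared by {@link FlinkKafkaPartitioner}):
     * <pre>{@code
     * public class FirstPartitioner extends FlinkKafkaPartitioner<Row> {
     *     @Override
     *     public int partition(Row record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
     *         // Illustrative strategy: route every record to the first available partition.
     *         return partitions[0];
     *     }
     * }
     *
     * new Kafka().sinkPartitionerCustom(FirstPartitioner.class);
     * }</pre>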
     */
    public Kafka sinkPartitionerCustom(Class<? extends FlinkKafkaPartitioner> partitionerClass) {
        sinkPartitionerType = CONNECTOR_SINK_PARTITIONER_VALUE_CUSTOM;
        sinkPartitionerClass = Preconditions.checkNotNull(partitionerClass);
        return this;
    }

    /**
     * Internal method for connector properties conversion.
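     *
     * <p>For example, a descriptor built as {@code new Kafka().version("0.11").topic("test")}
     * produces a map with entries for {@link ConnectorDescriptorValidator#CONNECTOR_VERSION}
     * and {@link KafkaValidator#CONNECTOR_TOPIC}.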
     */
    @Override
    protected Map<String, String> toConnectorProperties() {
        final DescriptorProperties properties = new DescriptorProperties();

        if (version != null) {
            properties.putString(CONNECTOR_VERSION, version);
        }

        if (topic != null) {
            properties.putString(CONNECTOR_TOPIC, topic);
        }

        if (startupMode != null) {
            properties.putString(CONNECTOR_STARTUP_MODE, KafkaValidator.normalizeStartupMode(startupMode));
        }

        if (specificOffsets != null) {
            final List<List<String>> values = new ArrayList<>();
            for (Map.Entry<Integer, Long> specificOffset : specificOffsets.entrySet()) {
                values.add(Arrays.asList(specificOffset.getKey().toString(), specificOffset.getValue().toString()));
            }
            properties.putIndexedFixedProperties(
                CONNECTOR_SPECIFIC_OFFSETS,
                Arrays.asList(CONNECTOR_SPECIFIC_OFFSETS_PARTITION, CONNECTOR_SPECIFIC_OFFSETS_OFFSET),
                values);
        }

        if (kafkaProperties != null) {
            properties.putIndexedFixedProperties(
                CONNECTOR_PROPERTIES,
                Arrays.asList(CONNECTOR_PROPERTIES_KEY, CONNECTOR_PROPERTIES_VALUE),
                this.kafkaProperties.entrySet().stream()
                    .map(e -> Arrays.asList(e.getKey(), e.getValue()))
                    .collect(Collectors.toList())
            );
        }

        if (sinkPartitionerType != null) {
            properties.putString(CONNECTOR_SINK_PARTITIONER, sinkPartitionerType);
            if (sinkPartitionerClass != null) {
                properties.putClass(CONNECTOR_SINK_PARTITIONER_CLASS, sinkPartitionerClass);
            }
        }

        return properties.asMap();
    }
}