/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.types.TypeConverters;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.descriptors.KafkaValidator;
import org.apache.flink.table.descriptors.SchemaValidator;
import org.apache.flink.table.factories.DeserializationSchemaFactory;
import org.apache.flink.table.factories.SerializationSchemaFactory;
import org.apache.flink.table.factories.StreamTableSinkFactory;
import org.apache.flink.table.factories.StreamTableSourceFactory;
import org.apache.flink.table.factories.TableFactoryService;
import org.apache.flink.table.sinks.StreamTableSink;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.table.util.TableSchemaUtil;
import org.apache.flink.types.Row;
import org.apache.flink.util.InstantiationUtil;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;

import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_VERSION;
import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT;
import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES;
import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES_KEY;
import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES_VALUE;
import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SINK_PARTITIONER;
import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SINK_PARTITIONER_CLASS;
import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SINK_PARTITIONER_VALUE_CUSTOM;
import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SINK_PARTITIONER_VALUE_FIXED;
import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SINK_PARTITIONER_VALUE_ROUND_ROBIN;
import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS;
import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS_OFFSET;
import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS_PARTITION;
import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_STARTUP_MODE;
import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_TOPIC;
import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_TYPE_VALUE_KAFKA;
import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_TIMESTAMPS_CLASS;
import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_TIMESTAMPS_FROM;
import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_TIMESTAMPS_SERIALIZED;
import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_TIMESTAMPS_TYPE;
import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_WATERMARKS_CLASS;
import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_WATERMARKS_DELAY;
import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_WATERMARKS_SERIALIZED;
import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_WATERMARKS_TYPE;
import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA;
import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_FROM;
import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_NAME;
import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_PROCTIME;
import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_TYPE;
import static org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE;
import static org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE_VALUE_APPEND;

/**
* Factory for creating configured instances of {@link KafkaTableSource} and
* {@link KafkaTableSink}.
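*
* <p>Illustration (hypothetical values): a matching descriptor carries properties such as
* {@code connector.type=kafka}, {@code connector.version=0.11},
* {@code update-mode=append}, an indexed {@code schema.#.*} section, and a
* {@code format.*} section that selects the (de)serialization format.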
*/
public abstract class KafkaTableSourceSinkFactoryBase implements
StreamTableSourceFactory<Row>,
StreamTableSinkFactory<Row> {

@Override
public Map<String, String> requiredContext() {
Map<String, String> context = new HashMap<>();
context.put(UPDATE_MODE(), UPDATE_MODE_VALUE_APPEND()); // append mode
context.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE_KAFKA); // kafka
context.put(CONNECTOR_VERSION, kafkaVersion()); // version
context.put(CONNECTOR_PROPERTY_VERSION, "1"); // backwards compatibility
return context;
}

@Override
public List<String> supportedProperties() {
List<String> properties = new ArrayList<>();
// kafka
properties.add(CONNECTOR_TOPIC);
properties.add(CONNECTOR_PROPERTIES);
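// '#' below is an index wildcard: a single pattern covers connector.properties.0.key,
// connector.properties.1.key, and so on (illustrative indices)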
properties.add(CONNECTOR_PROPERTIES + ".#." + CONNECTOR_PROPERTIES_KEY);
properties.add(CONNECTOR_PROPERTIES + ".#." + CONNECTOR_PROPERTIES_VALUE);
properties.add(CONNECTOR_STARTUP_MODE);
properties.add(CONNECTOR_SPECIFIC_OFFSETS + ".#." + CONNECTOR_SPECIFIC_OFFSETS_PARTITION);
properties.add(CONNECTOR_SPECIFIC_OFFSETS + ".#." + CONNECTOR_SPECIFIC_OFFSETS_OFFSET);
properties.add(CONNECTOR_SINK_PARTITIONER);
properties.add(CONNECTOR_SINK_PARTITIONER_CLASS);
// schema
properties.add(SCHEMA() + ".#." + SCHEMA_TYPE());
properties.add(SCHEMA() + ".#." + SCHEMA_NAME());
properties.add(SCHEMA() + ".#." + SCHEMA_FROM());
// time attributes
properties.add(SCHEMA() + ".#." + SCHEMA_PROCTIME());
properties.add(SCHEMA() + ".#." + ROWTIME_TIMESTAMPS_TYPE());
properties.add(SCHEMA() + ".#." + ROWTIME_TIMESTAMPS_FROM());
properties.add(SCHEMA() + ".#." + ROWTIME_TIMESTAMPS_CLASS());
properties.add(SCHEMA() + ".#." + ROWTIME_TIMESTAMPS_SERIALIZED());
properties.add(SCHEMA() + ".#." + ROWTIME_WATERMARKS_TYPE());
properties.add(SCHEMA() + ".#." + ROWTIME_WATERMARKS_CLASS());
properties.add(SCHEMA() + ".#." + ROWTIME_WATERMARKS_SERIALIZED());
properties.add(SCHEMA() + ".#." + ROWTIME_WATERMARKS_DELAY());
// format wildcard
properties.add(FORMAT + ".*");
return properties;
}

@Override
public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
final DescriptorProperties descriptorProperties = getValidatedProperties(properties);
final String topic = descriptorProperties.getString(CONNECTOR_TOPIC);
final DeserializationSchema<Row> deserializationSchema = getDeserializationSchema(properties);
final StartupOptions startupOptions = getStartupOptions(descriptorProperties, topic);
final TableSchema schema = descriptorProperties.getTableSchema(SCHEMA());
return createKafkaTableSource(
schema,
SchemaValidator.deriveProctimeAttribute(descriptorProperties),
SchemaValidator.deriveRowtimeAttributes(descriptorProperties),
SchemaValidator.deriveFieldMapping(
descriptorProperties,
Optional.of(deserializationSchema.getProducedType())),
topic,
getKafkaProperties(descriptorProperties),
deserializationSchema,
startupOptions.startupMode,
startupOptions.specificOffsets);
}

@Override
public StreamTableSink<Row> createStreamTableSink(Map<String, String> properties) {
final DescriptorProperties descriptorProperties = getValidatedProperties(properties);
final TableSchema schema = descriptorProperties.getTableSchema(SCHEMA());
final String topic = descriptorProperties.getString(CONNECTOR_TOPIC);
final Optional<String> proctime = SchemaValidator.deriveProctimeAttribute(descriptorProperties);
final List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors =
SchemaValidator.deriveRowtimeAttributes(descriptorProperties);
// see also FLINK-9870
if (proctime.isPresent() || !rowtimeAttributeDescriptors.isEmpty() ||
checkForCustomFieldMapping(descriptorProperties, schema)) {
throw new TableException("Time attributes and custom field mappings are not supported yet.");
}
return createKafkaTableSink(
schema,
topic,
getKafkaProperties(descriptorProperties),
getFlinkKafkaPartitioner(descriptorProperties),
getSerializationSchema(properties));
}

// --------------------------------------------------------------------------------------------
// For version-specific factories
// --------------------------------------------------------------------------------------------

/**
* Returns the Kafka version.
*/
protected abstract String kafkaVersion();

/**
* @return True if the Kafka source supports Kafka timestamps, false otherwise.
*/
protected abstract boolean supportsKafkaTimestamps();

/**
* Constructs the version-specific Kafka table source.
*
* @param schema Schema of the produced table.
* @param proctimeAttribute Field name of the processing time attribute.
* @param rowtimeAttributeDescriptors Descriptors for rowtime attributes.
* @param fieldMapping Mapping for the fields of the table schema to
* fields of the physical returned type.
* @param topic Kafka topic to consume.
* @param properties Properties for the Kafka consumer.
* @param deserializationSchema Deserialization schema for decoding records from Kafka.
* @param startupMode Startup mode for the contained consumer.
* @param specificStartupOffsets Specific startup offsets; only relevant when startup
* mode is {@link StartupMode#SPECIFIC_OFFSETS}.
*/
protected abstract KafkaTableSource createKafkaTableSource(
TableSchema schema,
Optional<String> proctimeAttribute,
List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
Map<String, String> fieldMapping,
String topic,
Properties properties,
DeserializationSchema<Row> deserializationSchema,
StartupMode startupMode,
Map<KafkaTopicPartition, Long> specificStartupOffsets);

/**
* Constructs the version-specific Kafka table sink.
*
* @param schema Schema of the produced table.
* @param topic Kafka topic to write to.
* @param properties Properties for the Kafka producer.
* @param partitioner Partitioner to select the Kafka partition for each record.
* @param serializationSchema Serialization schema for encoding records to Kafka.
*/
protected abstract KafkaTableSink createKafkaTableSink(
TableSchema schema,
String topic,
Properties properties,
Optional<FlinkKafkaPartitioner<Row>> partitioner,
SerializationSchema<Row> serializationSchema);

// --------------------------------------------------------------------------------------------
// Helper methods
// --------------------------------------------------------------------------------------------
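
/**
* Wraps the raw property map in a {@link DescriptorProperties} view and runs the schema
* and Kafka validators over it before any value is read.
*/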
private DescriptorProperties getValidatedProperties(Map<String, String> properties) {
final DescriptorProperties descriptorProperties = new DescriptorProperties(true);
descriptorProperties.putProperties(properties);
// allow Kafka timestamps to be used, but watermarks cannot be received from the source
new SchemaValidator(true, supportsKafkaTimestamps(), false).validate(descriptorProperties);
new KafkaValidator().validate(descriptorProperties);
return descriptorProperties;
}
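
/**
* Discovers a {@link DeserializationSchemaFactory} for the given 'format.*' properties via
* {@link TableFactoryService} and lets it create the row deserializer.
*/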
private DeserializationSchema<Row> getDeserializationSchema(Map<String, String> properties) {
@SuppressWarnings("unchecked")
final DeserializationSchemaFactory<Row> formatFactory = TableFactoryService.find(
DeserializationSchemaFactory.class,
properties,
this.getClass().getClassLoader());
return formatFactory.createDeserializationSchema(properties);
}
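
/**
* Symmetric to getDeserializationSchema(Map): discovers a {@link SerializationSchemaFactory}
* for the 'format.*' properties and lets it create the row serializer.
*/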
private SerializationSchema<Row> getSerializationSchema(Map<String, String> properties) {
@SuppressWarnings("unchecked")
final SerializationSchemaFactory<Row> formatFactory = TableFactoryService.find(
SerializationSchemaFactory.class,
properties,
this.getClass().getClassLoader());
return formatFactory.createSerializationSchema(properties);
}
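
/**
* Translates the indexed connector properties into Kafka client properties. For example
* (illustrative values), the pair connector.properties.0.key=bootstrap.servers and
* connector.properties.0.value=localhost:9092 becomes the single client property
* bootstrap.servers=localhost:9092.
*/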
private Properties getKafkaProperties(DescriptorProperties descriptorProperties) {
final Properties kafkaProperties = new Properties();
final List<Map<String, String>> propsList = descriptorProperties.getFixedIndexedProperties(
CONNECTOR_PROPERTIES,
Arrays.asList(CONNECTOR_PROPERTIES_KEY, CONNECTOR_PROPERTIES_VALUE));
propsList.forEach(kv -> kafkaProperties.put(
descriptorProperties.getString(kv.get(CONNECTOR_PROPERTIES_KEY)),
descriptorProperties.getString(kv.get(CONNECTOR_PROPERTIES_VALUE))
));
return kafkaProperties;
}
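
/**
* Derives the startup mode (defaulting to group offsets) and, for the 'specific-offsets'
* mode, collects the per-partition offsets. For example (illustrative values),
* connector.specific-offsets.0.partition=0 and connector.specific-offsets.0.offset=42
* make the consumer start reading partition 0 of the topic at offset 42.
*/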
private StartupOptions getStartupOptions(
DescriptorProperties descriptorProperties,
String topic) {
final Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
final StartupMode startupMode = descriptorProperties
.getOptionalString(CONNECTOR_STARTUP_MODE)
.map(modeString -> {
switch (modeString) {
case KafkaValidator.CONNECTOR_STARTUP_MODE_VALUE_EARLIEST:
return StartupMode.EARLIEST;
case KafkaValidator.CONNECTOR_STARTUP_MODE_VALUE_LATEST:
return StartupMode.LATEST;
case KafkaValidator.CONNECTOR_STARTUP_MODE_VALUE_GROUP_OFFSETS:
return StartupMode.GROUP_OFFSETS;
case KafkaValidator.CONNECTOR_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS:
final List<Map<String, String>> offsetList = descriptorProperties.getFixedIndexedProperties(
CONNECTOR_SPECIFIC_OFFSETS,
Arrays.asList(CONNECTOR_SPECIFIC_OFFSETS_PARTITION, CONNECTOR_SPECIFIC_OFFSETS_OFFSET));
offsetList.forEach(kv -> {
final int partition = descriptorProperties.getInt(kv.get(CONNECTOR_SPECIFIC_OFFSETS_PARTITION));
final long offset = descriptorProperties.getLong(kv.get(CONNECTOR_SPECIFIC_OFFSETS_OFFSET));
final KafkaTopicPartition topicPartition = new KafkaTopicPartition(topic, partition);
specificOffsets.put(topicPartition, offset);
});
return StartupMode.SPECIFIC_OFFSETS;
default:
throw new TableException("Unsupported startup mode. Validator should have checked that.");
}
}).orElse(StartupMode.GROUP_OFFSETS);
final StartupOptions options = new StartupOptions();
options.startupMode = startupMode;
options.specificOffsets = specificOffsets;
return options;
}
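
/**
* Maps the 'connector.sink-partitioner' property to a partitioner: 'fixed' yields a
* {@link FlinkFixedPartitioner}, 'round-robin' yields Optional.empty() so the Kafka
* producer falls back to its own partitioning, and 'custom' instantiates the class named
* by 'connector.sink-partitioner-class' via reflection, e.g. (hypothetical)
* org.example.MyPartitioner.
*/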
@SuppressWarnings("unchecked")
private Optional<FlinkKafkaPartitioner<Row>> getFlinkKafkaPartitioner(DescriptorProperties descriptorProperties) {
return descriptorProperties
.getOptionalString(CONNECTOR_SINK_PARTITIONER)
.flatMap((String partitionerString) -> {
switch (partitionerString) {
case CONNECTOR_SINK_PARTITIONER_VALUE_FIXED:
return Optional.of(new FlinkFixedPartitioner<>());
case CONNECTOR_SINK_PARTITIONER_VALUE_ROUND_ROBIN:
return Optional.empty();
case CONNECTOR_SINK_PARTITIONER_VALUE_CUSTOM:
final Class<? extends FlinkKafkaPartitioner> partitionerClass =
descriptorProperties.getClass(CONNECTOR_SINK_PARTITIONER_CLASS, FlinkKafkaPartitioner.class);
return Optional.of(InstantiationUtil.instantiate(partitionerClass));
default:
throw new TableException("Unsupported sink partitioner. Validator should have checked that.");
}
});
}
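
/**
* Returns true if the derived field mapping is not a pure identity mapping of the table
* schema, i.e. if any column is renamed, dropped, or remapped.
*/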
private boolean checkForCustomFieldMapping(DescriptorProperties descriptorProperties, TableSchema schema) {
final Map<String, String> fieldMapping = SchemaValidator.deriveFieldMapping(
descriptorProperties,
// until FLINK-9870 is fixed we assume that the table schema is the output type
Optional.of(TypeConverters.createExternalTypeInfoFromDataType(TableSchemaUtil.toRowType(schema))));
return fieldMapping.size() != schema.getColumnNames().length ||
!fieldMapping.entrySet().stream().allMatch(mapping -> mapping.getKey().equals(mapping.getValue()));
}
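
/**
* Simple holder for the consumer startup mode and, when applicable, the specific offsets.
*/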
private static class StartupOptions {
private StartupMode startupMode;
private Map<KafkaTopicPartition, Long> specificOffsets;
}
}