/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.sources.DefinedFieldMapping;
import org.apache.flink.table.sources.DefinedProctimeAttribute;
import org.apache.flink.table.sources.DefinedRowtimeAttributes;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.table.util.TableConnectorUtil;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;

/**
* A version-agnostic Kafka {@link StreamTableSource}.
*
* <p>Version-specific table sources need to extend this class and
* override {@link #createKafkaConsumer(String, Properties, DeserializationSchema)}
* to create the version-specific Kafka consumer.
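*
* <p>For illustration, a version-specific subclass might look like the following
* sketch. The class name {@code MyKafkaTableSource} and the delegated-to consumer
* {@code FlinkKafkaConsumer010} are examples only, not part of this class's contract:
*
* <pre>{@code
* public class MyKafkaTableSource extends KafkaTableSourceBase {
*
*     public MyKafkaTableSource(TableSchema schema, String topic,
*             Properties properties, DeserializationSchema<Row> deserializationSchema) {
*         super(schema, topic, properties, deserializationSchema);
*     }
*
*     // overrides the abstract factory method of this class
*     protected FlinkKafkaConsumerBase<Row> createKafkaConsumer(
*             String topic, Properties properties,
*             DeserializationSchema<Row> deserializationSchema) {
*         return new FlinkKafkaConsumer010<>(topic, deserializationSchema, properties);
*     }
* }
* }</pre>
*
* <p>The resulting table source can then be registered with a table environment,
* e.g. via {@code tableEnv.registerTableSource("kafkaTable", source)}.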
*/
@Internal
public abstract class KafkaTableSourceBase implements
StreamTableSource<Row>,
DefinedProctimeAttribute,
DefinedRowtimeAttributes,
DefinedFieldMapping {
// common table source attributes
/** The schema of the table. */
private final TableSchema schema;
/** Field name of the processing time attribute, empty if no processing time field is defined. */
private final Optional<String> proctimeAttribute;
/** Descriptor for a rowtime attribute. */
private final List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors;
/** Mapping for the fields of the table schema to fields of the physical returned type. */
private final Optional<Map<String, String>> fieldMapping;
// Kafka-specific attributes
/** The Kafka topic to consume. */
private final String topic;
/** Properties for the Kafka consumer. */
private final Properties properties;
/** Deserialization schema for decoding records from Kafka. */
private final DeserializationSchema<Row> deserializationSchema;
/** The startup mode for the contained consumer (default is {@link StartupMode#GROUP_OFFSETS}). */
private final StartupMode startupMode;
/** Specific startup offsets; only relevant when startup mode is {@link StartupMode#SPECIFIC_OFFSETS}. */
private final Map<KafkaTopicPartition, Long> specificStartupOffsets;
/**
* Creates a generic Kafka {@link StreamTableSource}.
*
* @param schema Schema of the produced table.
* @param proctimeAttribute Field name of the processing time attribute.
* @param rowtimeAttributeDescriptors Descriptors of the rowtime attributes.
* @param fieldMapping Mapping for the fields of the table schema to
* fields of the physical returned type.
* @param topic Kafka topic to consume.
* @param properties Properties for the Kafka consumer.
* @param deserializationSchema Deserialization schema for decoding records from Kafka.
* @param startupMode Startup mode for the contained consumer.
* @param specificStartupOffsets Specific startup offsets; only relevant when startup
* mode is {@link StartupMode#SPECIFIC_OFFSETS}.
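*
* <p>A rowtime descriptor might be assembled as in the following sketch, where
* {@code ExistingField} and {@code AscendingTimestamps} are one possible choice
* of timestamp extractor and watermark strategy:
*
* <pre>{@code
* RowtimeAttributeDescriptor rowtime = new RowtimeAttributeDescriptor(
*     "rowtime",                    // attribute of the table schema
*     new ExistingField("ts"),      // read the timestamp from physical field "ts"
*     new AscendingTimestamps());   // generate watermarks for ascending timestamps
* }</pre>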
*/
protected KafkaTableSourceBase(
TableSchema schema,
Optional<String> proctimeAttribute,
List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
Optional<Map<String, String>> fieldMapping,
String topic,
Properties properties,
DeserializationSchema<Row> deserializationSchema,
StartupMode startupMode,
Map<KafkaTopicPartition, Long> specificStartupOffsets) {
this.schema = Preconditions.checkNotNull(schema, "Schema must not be null.");
this.proctimeAttribute = validateProctimeAttribute(proctimeAttribute);
this.rowtimeAttributeDescriptors = validateRowtimeAttributeDescriptors(rowtimeAttributeDescriptors);
this.fieldMapping = fieldMapping;
this.topic = Preconditions.checkNotNull(topic, "Topic must not be null.");
this.properties = Preconditions.checkNotNull(properties, "Properties must not be null.");
this.deserializationSchema = Preconditions.checkNotNull(
deserializationSchema, "Deserialization schema must not be null.");
this.startupMode = Preconditions.checkNotNull(startupMode, "Startup mode must not be null.");
this.specificStartupOffsets = Preconditions.checkNotNull(
specificStartupOffsets, "Specific offsets must not be null.");
}
/**
* Creates a generic Kafka {@link StreamTableSource}.
*
* @param schema Schema of the produced table.
* @param topic Kafka topic to consume.
* @param properties Properties for the Kafka consumer.
* @param deserializationSchema Deserialization schema for decoding records from Kafka.
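*
* <p>A hypothetical subclass (such as {@code MyKafkaTableSource} from the
* class-level example) could then be created as follows; the broker address and
* group id are placeholder values:
*
* <pre>{@code
* Properties props = new Properties();
* props.setProperty("bootstrap.servers", "localhost:9092");
* props.setProperty("group.id", "my-group");
* KafkaTableSourceBase source =
*     new MyKafkaTableSource(schema, "my-topic", props, deserializationSchema);
* }</pre>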
*/
protected KafkaTableSourceBase(
TableSchema schema,
String topic,
Properties properties,
DeserializationSchema<Row> deserializationSchema) {
this(
schema,
Optional.empty(),
Collections.emptyList(),
Optional.empty(),
topic, properties,
deserializationSchema,
StartupMode.GROUP_OFFSETS,
Collections.emptyMap());
}
/**
* NOTE: This method is for internal use only when defining a TableSource.
* Do not use it in Table API programs.
*/
@Override
public DataStream<Row> getDataStream(StreamExecutionEnvironment env) {
DeserializationSchema<Row> deserializationSchema = getDeserializationSchema();
// Version-specific Kafka consumer
FlinkKafkaConsumerBase<Row> kafkaConsumer = getKafkaConsumer(topic, properties, deserializationSchema);
return env.addSource(kafkaConsumer).name(explainSource());
}
@Override
public TypeInformation<Row> getReturnType() {
return deserializationSchema.getProducedType();
}
@Override
public TableSchema getTableSchema() {
return schema;
}
@Override
public String getProctimeAttribute() {
return proctimeAttribute.orElse(null);
}
@Override
public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {
return rowtimeAttributeDescriptors;
}
@Override
public Map<String, String> getFieldMapping() {
return fieldMapping.orElse(null);
}
@Override
public String explainSource() {
return TableConnectorUtil.generateRuntimeName(this.getClass(), schema.getFieldNames());
}
/**
* Returns the properties for the Kafka consumer.
*
* @return properties for the Kafka consumer.
*/
public Properties getProperties() {
return properties;
}
/**
* Returns the deserialization schema.
*
* @return The deserialization schema.
*/
public DeserializationSchema<Row> getDeserializationSchema() {
return deserializationSchema;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final KafkaTableSourceBase that = (KafkaTableSourceBase) o;
return Objects.equals(schema, that.schema) &&
Objects.equals(proctimeAttribute, that.proctimeAttribute) &&
Objects.equals(rowtimeAttributeDescriptors, that.rowtimeAttributeDescriptors) &&
Objects.equals(fieldMapping, that.fieldMapping) &&
Objects.equals(topic, that.topic) &&
Objects.equals(properties, that.properties) &&
Objects.equals(deserializationSchema, that.deserializationSchema) &&
startupMode == that.startupMode &&
Objects.equals(specificStartupOffsets, that.specificStartupOffsets);
}
@Override
public int hashCode() {
return Objects.hash(
schema,
proctimeAttribute,
rowtimeAttributeDescriptors,
fieldMapping,
topic,
properties,
deserializationSchema,
startupMode,
specificStartupOffsets);
}
/**
* Returns a version-specific Kafka consumer with the start position configured.
*
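* <p>When the startup mode is {@link StartupMode#SPECIFIC_OFFSETS}, the offsets
* given to the constructor are applied to the created consumer. A minimal sketch
* of building such an offset map (topic, partitions, and offsets are example values):
*
* <pre>{@code
* Map<KafkaTopicPartition, Long> offsets = new HashMap<>();
* offsets.put(new KafkaTopicPartition("my-topic", 0), 23L);
* offsets.put(new KafkaTopicPartition("my-topic", 1), 42L);
* }</pre>
*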
* @param topic Kafka topic to consume.
* @param properties Properties for the Kafka consumer.
* @param deserializationSchema Deserialization schema to use for Kafka records.
* @return The version-specific Kafka consumer
*/
protected FlinkKafkaConsumerBase<Row> getKafkaConsumer(
String topic,
Properties properties,
DeserializationSchema<Row> deserializationSchema) {
FlinkKafkaConsumerBase<Row> kafkaConsumer =
createKafkaConsumer(topic, properties, deserializationSchema);
switch (startupMode) {
case EARLIEST:
kafkaConsumer.setStartFromEarliest();
break;
case LATEST:
kafkaConsumer.setStartFromLatest();
break;
case GROUP_OFFSETS:
kafkaConsumer.setStartFromGroupOffsets();
break;
case SPECIFIC_OFFSETS:
kafkaConsumer.setStartFromSpecificOffsets(specificStartupOffsets);
break;
}
return kafkaConsumer;
}
//////// VALIDATION FOR PARAMETERS
/**
* Validates a field of the schema to be the processing time attribute.
*
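* <p>The attribute must be declared in the schema with type
* {@code Types.SQL_TIMESTAMP()}. A minimal sketch of such a schema (field
* names are examples):
*
* <pre>{@code
* TableSchema schema = new TableSchema(
*     new String[]{"user", "proctime"},
*     new TypeInformation<?>[]{Types.STRING(), Types.SQL_TIMESTAMP()});
* }</pre>
*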
* @param proctimeAttribute The name of the field that becomes the processing time field.
* @return The validated processing time attribute, or empty if none is defined.
*/
private Optional<String> validateProctimeAttribute(Optional<String> proctimeAttribute) {
return proctimeAttribute.map((attribute) -> {
// validate that field exists and is of correct type
Optional<TypeInformation<?>> tpe = schema.getFieldType(attribute);
if (!tpe.isPresent()) {
throw new ValidationException("Processing time attribute '" + attribute + "' is not present in TableSchema.");
} else if (tpe.get() != Types.SQL_TIMESTAMP()) {
throw new ValidationException("Processing time attribute '" + attribute + "' is not of type SQL_TIMESTAMP.");
}
return attribute;
});
}
/**
* Validates a list of fields to be rowtime attributes.
*
* @param rowtimeAttributeDescriptors The descriptors of the rowtime attributes.
* @return The validated list of rowtime attribute descriptors.
*/
private List<RowtimeAttributeDescriptor> validateRowtimeAttributeDescriptors(List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors) {
Preconditions.checkNotNull(rowtimeAttributeDescriptors, "List of rowtime attributes must not be null.");
// validate that all declared fields exist and are of correct type
for (RowtimeAttributeDescriptor desc : rowtimeAttributeDescriptors) {
String rowtimeAttribute = desc.getAttributeName();
Optional<TypeInformation<?>> tpe = schema.getFieldType(rowtimeAttribute);
if (!tpe.isPresent()) {
throw new ValidationException("Rowtime attribute '" + rowtimeAttribute + "' is not present in TableSchema.");
} else if (tpe.get() != Types.SQL_TIMESTAMP()) {
throw new ValidationException("Rowtime attribute '" + rowtimeAttribute + "' is not of type SQL_TIMESTAMP.");
}
}
return rowtimeAttributeDescriptors;
}
//////// ABSTRACT METHODS FOR SUBCLASSES
/**
* Creates a version-specific Kafka consumer.
*
* @param topic Kafka topic to consume.
* @param properties Properties for the Kafka consumer.
* @param deserializationSchema Deserialization schema to use for Kafka records.
* @return The version-specific Kafka consumer
*/
protected abstract FlinkKafkaConsumerBase<Row> createKafkaConsumer(
String topic,
Properties properties,
DeserializationSchema<Row> deserializationSchema);
}