/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.flink.streaming.connectors.kafka.v2;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.kafka.v2.common.TableBaseInfo;
import org.apache.flink.streaming.connectors.kafka.v2.input.Kafka08TableSource;
import org.apache.flink.streaming.connectors.kafka.v2.sink.Kafka08OutputFormat;
import org.apache.flink.streaming.connectors.kafka.v2.sink.Kafka08TableSink;
import org.apache.flink.table.api.RichTableSchema;
import org.apache.flink.table.api.types.TypeConverters;
import org.apache.flink.table.dataformat.GenericRow;
import org.apache.flink.table.factories.BatchCompatibleTableSinkFactory;
import org.apache.flink.table.factories.BatchTableSourceFactory;
import org.apache.flink.table.factories.StreamTableSinkFactory;
import org.apache.flink.table.factories.StreamTableSourceFactory;
import org.apache.flink.table.sinks.BatchCompatibleStreamTableSink;
import org.apache.flink.table.sinks.StreamTableSink;
import org.apache.flink.table.sources.BatchTableSource;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.table.util.TableProperties;
import org.apache.flink.types.Row;
import org.apache.flink.util.StringUtils;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import static org.apache.flink.streaming.connectors.kafka.v2.common.util.SourceUtils.toRowTypeInfo;
import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
/**
 * Table factory for Kafka 0.8 table sources and sinks. Matched during factory
 * discovery on the {@code connector.type=KAFKA08} context.
 */
public class Kafka08TableFactory extends KafkaBaseTableFactory implements
StreamTableSourceFactory<GenericRow>,
StreamTableSinkFactory<Tuple2<Boolean, Row>>,
BatchTableSourceFactory<GenericRow>,
BatchCompatibleTableSinkFactory<Tuple2<Boolean, Row>> {
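
	/**
	 * Builds a {@link Kafka08TableSource} from the given connector properties. Exactly one of
	 * the topic or topic-pattern properties must be set; batch mode is not supported yet, so
	 * the source is always created in streaming mode.
	 */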
private Kafka08TableSource createSource(Map<String, String> props) {
TableProperties properties = new TableProperties();
properties.putProperties(props);
RichTableSchema schema = properties.readSchemaFromProperties(null);
String topicStr = properties.getString(KafkaOptions.TOPIC);
String topicPatternStr = properties.getString(KafkaOptions.TOPIC_PATTERN);
Properties prop = getProperties(
Kafka08Options.ESSENTIAL_CONSUMER_KEYS,
Kafka08Options.OPTIONAL_CONSUMER_KEYS,
properties);
long partitionDiscoveryIntervalMs = properties.getLong(KafkaOptions.PARTITION_DISCOVERY_INTERVAL_MS);
		// Store the interval as a String: Properties#getProperty() ignores non-String values,
		// so putting the raw long here would make the setting invisible to downstream readers.
		prop.put(KafkaOptions.KAFKA_KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, String.valueOf(partitionDiscoveryIntervalMs));
// TODO: support batch mode.
boolean isBatchMode = false;
if (!StringUtils.isNullOrWhitespaceOnly(topicStr)) {
List<String> topics = Arrays.asList(topicStr.split(","));
return new Kafka08TableSource(topics, null, prop, getStartupMode(properties), -1, isBatchMode,
TypeConverters.toBaseRowTypeInfo(schema.getResultType()));
} else if (!StringUtils.isNullOrWhitespaceOnly(topicPatternStr)) {
return new Kafka08TableSource(null, topicPatternStr, prop, getStartupMode(properties), -1, isBatchMode,
TypeConverters.toBaseRowTypeInfo(schema.getResultType()));
} else {
throw new RuntimeException("No sufficient parameters for Kafka08." +
"topic or topic pattern needed.");
}
}
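
	/**
	 * Builds a {@link Kafka08TableSink} from the given connector properties, using a
	 * user-provided {@link KafkaConverter} when one is configured and the default otherwise.
	 */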
private Kafka08TableSink createSink(Map<String, String> props) {
TableProperties properties = new TableProperties();
properties.putProperties(props);
RichTableSchema schema = properties.readSchemaFromProperties(null);
String topic = properties.getString(KafkaOptions.TOPIC);
Properties prop = getProperties(
Kafka08Options.ESSENTIAL_PRODUCER_KEYS,
Kafka08Options.OPTIONAL_PRODUCER_KEYS,
properties);
KafkaConverter kafkaConverter;
String convertClassStr = properties.getString(KafkaOptions.OPTIONAL_CONVERTER_CLASS);
		if (!StringUtils.isNullOrWhitespaceOnly(convertClassStr)) {
			try {
				Class<?> converterClass = Thread.currentThread()
						.getContextClassLoader().loadClass(convertClassStr);
				// getDeclaredConstructor().newInstance() replaces the deprecated Class#newInstance().
				kafkaConverter = (KafkaConverter) converterClass.getDeclaredConstructor().newInstance();
			} catch (Exception e) {
				throw new RuntimeException(
						"Failed to load or instantiate Kafka converter class: " + convertClassStr, e);
}
} else {
kafkaConverter = new DefaultKafkaConverter();
}
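
		// Propagate table metadata (header fields, row type, primary keys, user params)
		// to converters that are schema-aware, i.e. that extend TableBaseInfo.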
if (kafkaConverter instanceof TableBaseInfo) {
TableBaseInfo tableBaseInfo = (TableBaseInfo) kafkaConverter;
tableBaseInfo.setHeaderFields(schema.getHeaderFields())
.setRowTypeInfo(toRowTypeInfo(schema.getResultRowType()))
.setPrimaryKeys(schema.getPrimaryKeys())
.setUserParamsMap(properties.toMap());
}
Kafka08OutputFormat.Builder builder = new Kafka08OutputFormat.Builder();
builder.setKafkaConverter(kafkaConverter)
.setProperties(prop)
.setTopic(topic)
.setRowTypeInfo(toRowTypeInfo(schema.getResultRowType()));
return (Kafka08TableSink) new Kafka08TableSink(builder, schema)
.configure(schema.getColumnNames(), schema.getColumnTypes());
}
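
	/** Lists every property key this factory accepts; used for validation during factory discovery. */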
@Override
public List<String> supportedProperties() {
List<String> ret = new ArrayList<>();
ret.addAll(Kafka08Options.ESSENTIAL_CONSUMER_KEYS);
ret.addAll(Kafka08Options.ESSENTIAL_PRODUCER_KEYS);
ret.addAll(Kafka08Options.OPTIONAL_CONSUMER_KEYS);
ret.addAll(Kafka08Options.OPTIONAL_PRODUCER_KEYS);
ret.addAll(KafkaOptions.SUPPORTED_KEYS);
return ret;
}
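
	/** Identifies the connector declarations this factory is responsible for. */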
@Override
public Map<String, String> requiredContext() {
Map<String, String> context = new HashMap<>();
		context.put(CONNECTOR_TYPE, "KAFKA08");
context.put(CONNECTOR_PROPERTY_VERSION, "1"); // backwards compatibility
return context;
}
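
	// The factory methods below delegate to the shared createSource/createSink helpers,
	// so the batch and stream variants are configured identically.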
	@Override
	public BatchCompatibleStreamTableSink<Tuple2<Boolean, Row>> createBatchCompatibleTableSink(Map<String, String> properties) {
		return createSink(properties);
	}

	@Override
	public BatchTableSource<GenericRow> createBatchTableSource(Map<String, String> properties) {
		return createSource(properties);
	}

	@Override
	public StreamTableSink<Tuple2<Boolean, Row>> createStreamTableSink(Map<String, String> properties) {
		return createSink(properties);
	}

	@Override
	public StreamTableSource<GenericRow> createStreamTableSource(Map<String, String> properties) {
		return createSource(properties);
	}
}