/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.beam.sdk.extensions.sql.meta.provider.kafka;

import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.Row;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;

/**
 * {@code BeamKafkaTable} represents a Kafka topic, as a source or a target. Subclasses must
 * implement the conversion between {@code Row} and {@code KV<byte[], byte[]>}.
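 *
 * <p>For illustration, a minimal sketch of such a subclass; the {@code BytesToRow} and
 * {@code RowToBytes} transforms below are hypothetical placeholders for user-defined
 * conversion logic:
 *
 * <pre>{@code
 * public class MyKafkaTable extends BeamKafkaTable {
 *   public MyKafkaTable(Schema schema, String bootstrapServers, List<String> topics) {
 *     super(schema, bootstrapServers, topics);
 *   }
 *
 *   @Override
 *   public PTransform<PCollection<KV<byte[], byte[]>>, PCollection<Row>> getPTransformForInput() {
 *     return new BytesToRow(getSchema()); // hypothetical: decode byte records into Rows
 *   }
 *
 *   @Override
 *   public PTransform<PCollection<Row>, PCollection<KV<byte[], byte[]>>> getPTransformForOutput() {
 *     return new RowToBytes(); // hypothetical: encode Rows into byte records
 *   }
 * }
 * }</pre>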
*/
public abstract class BeamKafkaTable extends BaseBeamTable {
  private String bootstrapServers;
  private List<String> topics;
  private List<TopicPartition> topicPartitions;
  // Default to an empty map so buildIOReader never passes null to KafkaIO when no consumer
  // config updates have been supplied.
  private Map<String, Object> configUpdates = new HashMap<>();

  protected BeamKafkaTable(Schema beamSchema) {
    super(beamSchema);
  }

  public BeamKafkaTable(Schema beamSchema, String bootstrapServers, List<String> topics) {
    super(beamSchema);
    this.bootstrapServers = bootstrapServers;
    this.topics = topics;
  }

  public BeamKafkaTable(
      Schema beamSchema, List<TopicPartition> topicPartitions, String bootstrapServers) {
    super(beamSchema);
    this.bootstrapServers = bootstrapServers;
    this.topicPartitions = topicPartitions;
  }
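
  /**
   * Registers Kafka consumer properties that are applied when the read transform is built,
   * replacing any previously registered updates. For example (a hypothetical caller):
   *
   * <pre>{@code
   * Map<String, Object> updates = new HashMap<>();
   * updates.put("auto.offset.reset", "earliest"); // hypothetical setting
   * table.updateConsumerProperties(updates);
   * }</pre>
   */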
  public BeamKafkaTable updateConsumerProperties(Map<String, Object> configUpdates) {
    this.configUpdates = configUpdates;
    return this;
  }
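
  /** A Kafka topic is an unbounded stream, so this table always reports itself as unbounded. */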
  @Override
  public PCollection.IsBounded isBounded() {
    return PCollection.IsBounded.UNBOUNDED;
  }
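
  /**
   * Returns the transform that decodes raw Kafka {@code KV<byte[], byte[]>} records into
   * {@link Row}s matching the table schema.
   */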
  public abstract PTransform<PCollection<KV<byte[], byte[]>>, PCollection<Row>>
      getPTransformForInput();
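
  /**
   * Returns the transform that encodes {@link Row}s into the raw {@code KV<byte[], byte[]>}
   * records that are written to Kafka.
   */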
  public abstract PTransform<PCollection<Row>, PCollection<KV<byte[], byte[]>>>
      getPTransformForOutput();
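
  /**
   * Builds the source transform: reads raw byte records from Kafka and converts them to
   * {@link Row}s using {@link #getPTransformForInput()}.
   */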
  @Override
  public PCollection<Row> buildIOReader(PBegin begin) {
    KafkaIO.Read<byte[], byte[]> kafkaRead = null;
    if (topics != null) {
      kafkaRead =
          KafkaIO.<byte[], byte[]>read()
              .withBootstrapServers(bootstrapServers)
              .withTopics(topics)
              .withConsumerConfigUpdates(configUpdates)
              .withKeyDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of())
              .withValueDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of());
    } else if (topicPartitions != null) {
      kafkaRead =
          KafkaIO.<byte[], byte[]>read()
              .withBootstrapServers(bootstrapServers)
              .withTopicPartitions(topicPartitions)
              .withConsumerConfigUpdates(configUpdates)
              .withKeyDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of())
              .withValueDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of());
    } else {
      throw new IllegalArgumentException("Either topics or topicPartitions must be configured.");
    }
    return begin
        .apply("read", kafkaRead.withoutMetadata())
        .apply("in_format", getPTransformForInput())
        .setRowSchema(getSchema());
  }
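
  /**
   * Builds the sink transform: converts {@link Row}s to raw byte records using
   * {@link #getPTransformForOutput()} and writes them to the single configured topic.
   */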
  @Override
  public POutput buildIOWriter(PCollection<Row> input) {
    checkArgument(
        topics != null && topics.size() == 1,
        "Exactly one topic must be configured as the output.");
    return input
        .apply("out_reformat", getPTransformForOutput())
        .apply(
            "persistent",
            KafkaIO.<byte[], byte[]>write()
                .withBootstrapServers(bootstrapServers)
                .withTopic(topics.get(0))
                .withKeySerializer(ByteArraySerializer.class)
                .withValueSerializer(ByteArraySerializer.class));
  }

  public String getBootstrapServers() {
    return bootstrapServers;
  }

  public List<String> getTopics() {
    return topics;
  }
}