| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.pulsar.io.kafka; |
| |
| import java.time.Duration; |
| import java.util.Collections; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.Optional; |
| import java.util.Properties; |
| import java.util.concurrent.CompletableFuture; |
| import lombok.Getter; |
| import lombok.extern.slf4j.Slf4j; |
| import org.apache.commons.lang3.StringUtils; |
| import org.apache.kafka.clients.CommonClientConfigs; |
| import org.apache.kafka.clients.consumer.Consumer; |
| import org.apache.kafka.clients.consumer.ConsumerConfig; |
| import org.apache.kafka.clients.consumer.ConsumerRecord; |
| import org.apache.kafka.clients.consumer.ConsumerRecords; |
| import org.apache.kafka.clients.consumer.KafkaConsumer; |
| import org.apache.kafka.common.config.SaslConfigs; |
| import org.apache.kafka.common.config.SslConfigs; |
| import org.apache.pulsar.client.api.Schema; |
| import org.apache.pulsar.common.schema.KeyValue; |
| import org.apache.pulsar.common.schema.KeyValueEncodingType; |
| import org.apache.pulsar.functions.api.KVRecord; |
| import org.apache.pulsar.functions.api.Record; |
| import org.apache.pulsar.io.core.SourceContext; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * Simple Kafka Source to transfer messages from a Kafka topic. |
| */ |
| public abstract class KafkaAbstractSource<V> extends KafkaPushSource<V> { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(KafkaAbstractSource.class); |
| |
| private volatile Consumer<Object, Object> consumer; |
| private volatile boolean running = false; |
| private KafkaSourceConfig kafkaSourceConfig; |
| private Thread runnerThread; |
| |
| @Override |
| public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception { |
| kafkaSourceConfig = KafkaSourceConfig.load(config); |
| Objects.requireNonNull(kafkaSourceConfig.getTopic(), "Kafka topic is not set"); |
| Objects.requireNonNull(kafkaSourceConfig.getBootstrapServers(), "Kafka bootstrapServers is not set"); |
| Objects.requireNonNull(kafkaSourceConfig.getGroupId(), "Kafka consumer group id is not set"); |
| if (kafkaSourceConfig.getFetchMinBytes() <= 0) { |
| throw new IllegalArgumentException("Invalid Kafka Consumer fetchMinBytes : " |
| + kafkaSourceConfig.getFetchMinBytes()); |
| } |
| if (kafkaSourceConfig.isAutoCommitEnabled() && kafkaSourceConfig.getAutoCommitIntervalMs() <= 0) { |
| throw new IllegalArgumentException("Invalid Kafka Consumer autoCommitIntervalMs : " |
| + kafkaSourceConfig.getAutoCommitIntervalMs()); |
| } |
| if (kafkaSourceConfig.getSessionTimeoutMs() <= 0) { |
| throw new IllegalArgumentException("Invalid Kafka Consumer sessionTimeoutMs : " |
| + kafkaSourceConfig.getSessionTimeoutMs()); |
| } |
| if (kafkaSourceConfig.getHeartbeatIntervalMs() <= 0) { |
| throw new IllegalArgumentException("Invalid Kafka Consumer heartbeatIntervalMs : " |
| + kafkaSourceConfig.getHeartbeatIntervalMs()); |
| } |
| |
| Properties props = new Properties(); |
| if (kafkaSourceConfig.getConsumerConfigProperties() != null) { |
| props.putAll(kafkaSourceConfig.getConsumerConfigProperties()); |
| } |
| props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaSourceConfig.getBootstrapServers()); |
| if (StringUtils.isNotEmpty(kafkaSourceConfig.getSecurityProtocol())) { |
| props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, kafkaSourceConfig.getSecurityProtocol()); |
| } |
| if (StringUtils.isNotEmpty(kafkaSourceConfig.getSaslMechanism())) { |
| props.put(SaslConfigs.SASL_MECHANISM, kafkaSourceConfig.getSaslMechanism()); |
| } |
| if (StringUtils.isNotEmpty(kafkaSourceConfig.getSaslJaasConfig())) { |
| props.put(SaslConfigs.SASL_JAAS_CONFIG, kafkaSourceConfig.getSaslJaasConfig()); |
| } |
| if (StringUtils.isNotEmpty(kafkaSourceConfig.getSslEnabledProtocols())) { |
| props.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, kafkaSourceConfig.getSslEnabledProtocols()); |
| } |
| if (StringUtils.isNotEmpty(kafkaSourceConfig.getSslEndpointIdentificationAlgorithm())) { |
| props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, |
| kafkaSourceConfig.getSslEndpointIdentificationAlgorithm()); |
| } |
| if (StringUtils.isNotEmpty(kafkaSourceConfig.getSslTruststoreLocation())) { |
| props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkaSourceConfig.getSslTruststoreLocation()); |
| } |
| if (StringUtils.isNotEmpty(kafkaSourceConfig.getSslTruststorePassword())) { |
| props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, kafkaSourceConfig.getSslTruststorePassword()); |
| } |
| props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaSourceConfig.getGroupId()); |
| props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, String.valueOf(kafkaSourceConfig.getFetchMinBytes())); |
| props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, |
| String.valueOf(kafkaSourceConfig.getAutoCommitIntervalMs())); |
| props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, String.valueOf(kafkaSourceConfig.getSessionTimeoutMs())); |
| props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, |
| String.valueOf(kafkaSourceConfig.getHeartbeatIntervalMs())); |
| props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaSourceConfig.getAutoOffsetReset()); |
| props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, kafkaSourceConfig.getKeyDeserializationClass()); |
| props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, kafkaSourceConfig.getValueDeserializationClass()); |
| try { |
| consumer = new KafkaConsumer<>(beforeCreateConsumer(props)); |
| } catch (Exception ex) { |
| throw new IllegalArgumentException("Unable to instantiate Kafka consumer", ex); |
| } |
| this.start(); |
| } |
| |
| protected Properties beforeCreateConsumer(Properties props) { |
| return props; |
| } |
| |
| @Override |
| public void close() throws InterruptedException { |
| LOG.info("Stopping kafka source"); |
| running = false; |
| if (runnerThread != null) { |
| runnerThread.interrupt(); |
| runnerThread.join(); |
| runnerThread = null; |
| } |
| if (consumer != null) { |
| consumer.close(); |
| consumer = null; |
| } |
| LOG.info("Kafka source stopped."); |
| } |
| |
| @SuppressWarnings("unchecked") |
| public void start() { |
| LOG.info("Starting subscribe kafka source on {}", kafkaSourceConfig.getTopic()); |
| consumer.subscribe(Collections.singletonList(kafkaSourceConfig.getTopic())); |
| runnerThread = new Thread(() -> { |
| LOG.info("Kafka source started."); |
| while (running) { |
| try { |
| ConsumerRecords<Object, Object> consumerRecords = consumer.poll(Duration.ofSeconds(1L)); |
| CompletableFuture<?>[] futures = new CompletableFuture<?>[consumerRecords.count()]; |
| int index = 0; |
| for (ConsumerRecord<Object, Object> consumerRecord : consumerRecords) { |
| KafkaRecord record = buildRecord(consumerRecord); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Write record {} {} {}", record.getKey(), record.getValue(), record.getSchema()); |
| } |
| consume(record); |
| futures[index] = record.getCompletableFuture(); |
| index++; |
| } |
| if (!kafkaSourceConfig.isAutoCommitEnabled()) { |
| CompletableFuture.allOf(futures).get(); |
| consumer.commitSync(); |
| } |
| } catch (Exception e) { |
| LOG.error("Error while processing records", e); |
| notifyError(e); |
| break; |
| } |
| } |
| }); |
| running = true; |
| runnerThread.setName("Kafka Source Thread"); |
| runnerThread.start(); |
| } |
| |
| public abstract KafkaRecord buildRecord(ConsumerRecord<Object, Object> consumerRecord); |
| |
| @Slf4j |
| protected static class KafkaRecord<V> implements Record<V> { |
| private final ConsumerRecord<String, ?> record; |
| private final V value; |
| private final Schema<V> schema; |
| |
| @Getter |
| private final CompletableFuture<Void> completableFuture = new CompletableFuture<>(); |
| |
| public KafkaRecord(ConsumerRecord<String, ?> record, V value, Schema<V> schema) { |
| this.record = record; |
| this.value = value; |
| this.schema = schema; |
| } |
| @Override |
| public Optional<String> getPartitionId() { |
| return Optional.of(Integer.toString(record.partition())); |
| } |
| |
| @Override |
| public Optional<Integer> getPartitionIndex() { |
| return Optional.of(record.partition()); |
| } |
| |
| @Override |
| public Optional<Long> getRecordSequence() { |
| return Optional.of(record.offset()); |
| } |
| |
| @Override |
| public Optional<String> getKey() { |
| return Optional.ofNullable(record.key()); |
| } |
| |
| @Override |
| public V getValue() { |
| return value; |
| } |
| |
| @Override |
| public void ack() { |
| completableFuture.complete(null); |
| } |
| |
| @Override |
| public Schema<V> getSchema() { |
| return schema; |
| } |
| } |
| protected static class KeyValueKafkaRecord<V> extends KafkaRecord implements KVRecord<Object, Object> { |
| |
| private final Schema<Object> keySchema; |
| private final Schema<Object> valueSchema; |
| |
| public KeyValueKafkaRecord(ConsumerRecord record, KeyValue value, |
| Schema<Object> keySchema, Schema<Object> valueSchema) { |
| super(record, value, null); |
| this.keySchema = keySchema; |
| this.valueSchema = valueSchema; |
| } |
| |
| @Override |
| public Schema<Object> getKeySchema() { |
| return keySchema; |
| } |
| |
| @Override |
| public Schema<Object> getValueSchema() { |
| return valueSchema; |
| } |
| |
| @Override |
| public KeyValueEncodingType getKeyValueEncodingType() { |
| return KeyValueEncodingType.SEPARATED; |
| } |
| } |
| } |