| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.nifi.processors.kafka.pubsub; |
| |
| import org.apache.kafka.clients.consumer.Consumer; |
| import org.apache.kafka.clients.consumer.ConsumerRecord; |
| import org.apache.kafka.clients.consumer.ConsumerRecords; |
| import org.apache.kafka.clients.consumer.KafkaConsumer; |
| import org.apache.kafka.clients.consumer.OffsetAndMetadata; |
| import org.apache.kafka.common.KafkaException; |
| import org.apache.kafka.common.PartitionInfo; |
| import org.apache.kafka.common.TopicPartition; |
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.ConfigVerificationResult.Outcome;
import org.apache.nifi.logging.ComponentLog;
| import org.apache.nifi.processor.ProcessContext; |
| import org.apache.nifi.processor.ProcessSession; |
| import org.apache.nifi.serialization.RecordReader; |
| import org.apache.nifi.serialization.RecordReaderFactory; |
| import org.apache.nifi.serialization.RecordSetWriterFactory; |
| |
| import java.io.ByteArrayInputStream; |
| import java.io.Closeable; |
| import java.io.InputStream; |
| import java.nio.charset.Charset; |
| import java.time.Duration; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Queue; |
| import java.util.Set; |
| import java.util.concurrent.BlockingQueue; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.regex.Pattern; |
| import java.util.stream.Collectors; |
| |
| /** |
* A pool of Kafka Consumers for a given topic. Consumers can be obtained by
* calling 'obtainConsumer'. Once a lease is closed, its consumer is returned
* to the pool and is immediately available for reuse.
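* <p>
* Typical usage from a processor's {@code onTrigger} (a minimal sketch; error handling and
* scheduling checks are elided, and {@code pool}, {@code session} and {@code context} are
* assumed to be supplied by the caller):
* <pre>{@code
* try (final ConsumerLease lease = pool.obtainConsumer(session, context)) {
*     if (lease == null) {
*         context.yield(); // no lease available or necessary right now
*         return;
*     }
*     lease.poll();   // consume a batch of messages
*     lease.commit(); // commit offsets along with the NiFi session
* }
* }</pre>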
| */ |
| public class ConsumerPool implements Closeable { |
| |
| private final BlockingQueue<SimpleConsumerLease> pooledLeases; |
| private final List<String> topics; |
| private final Pattern topicPattern; |
| private final Map<String, Object> kafkaProperties; |
| private final long maxWaitMillis; |
| private final ComponentLog logger; |
| private final byte[] demarcatorBytes; |
| private final String keyEncoding; |
| private final String securityProtocol; |
| private final String bootstrapServers; |
| private final boolean honorTransactions; |
| private final RecordReaderFactory readerFactory; |
| private final RecordSetWriterFactory writerFactory; |
| private final Charset headerCharacterSet; |
| private final Pattern headerNamePattern; |
| private final boolean separateByKey; |
| private final int[] partitionsToConsume; |
| private final AtomicLong consumerCreatedCountRef = new AtomicLong(); |
| private final AtomicLong consumerClosedCountRef = new AtomicLong(); |
| private final AtomicLong leasesObtainedCountRef = new AtomicLong(); |
| private final Queue<List<TopicPartition>> availableTopicPartitions = new LinkedBlockingQueue<>(); |
| |
| /** |
* Creates a pool of KafkaConsumer objects that can grow up to the given
* maximum number of concurrent leases. Consumers are lazily initialized;
* we may elect not to create the maximum number of configured consumers
* if the broker-reported lag for all topics is below a certain threshold.
*
* @param maxConcurrentLeases max allowable consumers at once
* @param demarcator bytes to use as a demarcator between messages; null or
* empty means no demarcator
* @param separateByKey whether messages with different keys must be kept in
* separate output bundles
* @param kafkaProperties properties to use to initialize kafka consumers
* @param topics the topics to subscribe to
* @param maxWaitMillis maximum time to wait for a given lease to acquire
* data before committing
* @param keyEncoding the encoding to use for the key of a kafka message if
* found
* @param securityProtocol the security protocol used
* @param bootstrapServers the bootstrap servers
* @param logger the logger to report any errors/warnings
* @param honorTransactions whether to consume only committed messages
* (i.e., isolation.level=read_committed)
* @param headerCharacterSet the character set used to decode message header values
* @param headerNamePattern pattern of message header names to add as attributes;
* null means no headers are added
* @param partitionsToConsume the partitions to assign explicitly to this pool's
* consumers; null means partitions are assigned via group subscription
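*
* <p>
* A minimal sketch of the kafka properties a caller might supply (the exact set depends on
* the processor configuration; the values shown here are illustrative only):
* <pre>{@code
* final Map<String, Object> props = new HashMap<>();
* props.put("bootstrap.servers", "localhost:9092");
* props.put("group.id", "my-consumer-group");
* props.put("key.deserializer", ByteArrayDeserializer.class.getName());
* props.put("value.deserializer", ByteArrayDeserializer.class.getName());
* props.put("enable.auto.commit", "false"); // offsets are committed through the lease instead
* }</pre>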
| */ |
| public ConsumerPool( |
| final int maxConcurrentLeases, |
| final byte[] demarcator, |
| final boolean separateByKey, |
| final Map<String, Object> kafkaProperties, |
| final List<String> topics, |
| final long maxWaitMillis, |
| final String keyEncoding, |
| final String securityProtocol, |
| final String bootstrapServers, |
| final ComponentLog logger, |
| final boolean honorTransactions, |
| final Charset headerCharacterSet, |
| final Pattern headerNamePattern, |
| final int[] partitionsToConsume) { |
| this.pooledLeases = new LinkedBlockingQueue<>(); |
| this.maxWaitMillis = maxWaitMillis; |
| this.logger = logger; |
| this.demarcatorBytes = demarcator; |
| this.keyEncoding = keyEncoding; |
| this.securityProtocol = securityProtocol; |
| this.bootstrapServers = bootstrapServers; |
| this.kafkaProperties = Collections.unmodifiableMap(kafkaProperties); |
| this.topics = Collections.unmodifiableList(topics); |
| this.topicPattern = null; |
| this.readerFactory = null; |
| this.writerFactory = null; |
| this.honorTransactions = honorTransactions; |
| this.headerCharacterSet = headerCharacterSet; |
| this.headerNamePattern = headerNamePattern; |
| this.separateByKey = separateByKey; |
| this.partitionsToConsume = partitionsToConsume; |
| enqueueAssignedPartitions(partitionsToConsume); |
| } |
| |
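/**
* As above, but subscribes to topics matching the given Pattern rather than an explicit list of topic names.
*/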
| public ConsumerPool( |
| final int maxConcurrentLeases, |
| final byte[] demarcator, |
| final boolean separateByKey, |
| final Map<String, Object> kafkaProperties, |
| final Pattern topics, |
| final long maxWaitMillis, |
| final String keyEncoding, |
| final String securityProtocol, |
| final String bootstrapServers, |
| final ComponentLog logger, |
| final boolean honorTransactions, |
| final Charset headerCharacterSet, |
| final Pattern headerNamePattern, |
| final int[] partitionsToConsume) { |
| this.pooledLeases = new LinkedBlockingQueue<>(); |
| this.maxWaitMillis = maxWaitMillis; |
| this.logger = logger; |
| this.demarcatorBytes = demarcator; |
| this.keyEncoding = keyEncoding; |
| this.securityProtocol = securityProtocol; |
| this.bootstrapServers = bootstrapServers; |
| this.kafkaProperties = Collections.unmodifiableMap(kafkaProperties); |
| this.topics = null; |
| this.topicPattern = topics; |
| this.readerFactory = null; |
| this.writerFactory = null; |
| this.honorTransactions = honorTransactions; |
| this.headerCharacterSet = headerCharacterSet; |
| this.headerNamePattern = headerNamePattern; |
| this.separateByKey = separateByKey; |
| this.partitionsToConsume = partitionsToConsume; |
| enqueueAssignedPartitions(partitionsToConsume); |
| } |
| |
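/**
* Record-oriented variant: consumed messages are parsed with the given RecordReaderFactory and written
* with the given RecordSetWriterFactory. Subscribes to topics matching the given Pattern.
*/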
| public ConsumerPool( |
| final int maxConcurrentLeases, |
| final RecordReaderFactory readerFactory, |
| final RecordSetWriterFactory writerFactory, |
| final Map<String, Object> kafkaProperties, |
| final Pattern topics, |
| final long maxWaitMillis, |
| final String securityProtocol, |
| final String bootstrapServers, |
| final ComponentLog logger, |
| final boolean honorTransactions, |
| final Charset headerCharacterSet, |
| final Pattern headerNamePattern, |
| final boolean separateByKey, |
| final String keyEncoding, |
| final int[] partitionsToConsume) { |
| this.pooledLeases = new LinkedBlockingQueue<>(); |
| this.maxWaitMillis = maxWaitMillis; |
| this.logger = logger; |
| this.demarcatorBytes = null; |
| this.readerFactory = readerFactory; |
| this.writerFactory = writerFactory; |
| this.securityProtocol = securityProtocol; |
| this.bootstrapServers = bootstrapServers; |
| this.kafkaProperties = Collections.unmodifiableMap(kafkaProperties); |
| this.topics = null; |
| this.topicPattern = topics; |
| this.honorTransactions = honorTransactions; |
| this.headerCharacterSet = headerCharacterSet; |
| this.headerNamePattern = headerNamePattern; |
| this.separateByKey = separateByKey; |
| this.keyEncoding = keyEncoding; |
| this.partitionsToConsume = partitionsToConsume; |
| enqueueAssignedPartitions(partitionsToConsume); |
| } |
| |
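/**
* Record-oriented variant that subscribes to an explicit list of topic names.
*/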
| public ConsumerPool( |
| final int maxConcurrentLeases, |
| final RecordReaderFactory readerFactory, |
| final RecordSetWriterFactory writerFactory, |
| final Map<String, Object> kafkaProperties, |
| final List<String> topics, |
| final long maxWaitMillis, |
| final String securityProtocol, |
| final String bootstrapServers, |
| final ComponentLog logger, |
| final boolean honorTransactions, |
| final Charset headerCharacterSet, |
| final Pattern headerNamePattern, |
| final boolean separateByKey, |
| final String keyEncoding, |
| final int[] partitionsToConsume) { |
| this.pooledLeases = new LinkedBlockingQueue<>(); |
| this.maxWaitMillis = maxWaitMillis; |
| this.logger = logger; |
| this.demarcatorBytes = null; |
| this.readerFactory = readerFactory; |
| this.writerFactory = writerFactory; |
| this.securityProtocol = securityProtocol; |
| this.bootstrapServers = bootstrapServers; |
| this.kafkaProperties = Collections.unmodifiableMap(kafkaProperties); |
| this.topics = topics; |
| this.topicPattern = null; |
| this.honorTransactions = honorTransactions; |
| this.headerCharacterSet = headerCharacterSet; |
| this.headerNamePattern = headerNamePattern; |
| this.separateByKey = separateByKey; |
| this.keyEncoding = keyEncoding; |
| this.partitionsToConsume = partitionsToConsume; |
| enqueueAssignedPartitions(partitionsToConsume); |
| } |
| |
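/**
* Determines the number of partitions for the configured topics. All explicitly named topics are
* expected to share the same partition count.
*
* @return the partition count shared by all configured topics, or -1 if a topic pattern is used
*/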
| public int getPartitionCount() { |
// Partition counting requires explicit topic names; when a topic pattern is used instead, topics is null, so return -1
| if (topics == null || topics.isEmpty()) { |
| return -1; |
| } |
| |
| int partitionsEachTopic = 0; |
| try (final Consumer<byte[], byte[]> consumer = createKafkaConsumer()) { |
| for (final String topicName : topics) { |
final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topicName);
if (partitionInfos == null) {
throw new IllegalStateException("Could not determine the partitions for topic " + topicName);
}

final int partitionsThisTopic = partitionInfos.size();
if (partitionsEachTopic != 0 && partitionsThisTopic != partitionsEachTopic) {
throw new IllegalStateException("The specified topics do not have the same number of partitions");
}
| |
| partitionsEachTopic = partitionsThisTopic; |
| } |
| } |
| |
| return partitionsEachTopic; |
| } |
| |
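/**
* Verifies that the pool's configuration can reach Kafka: checks connectivity via the consumer group
* metadata, determines per-topic offsets when explicit topic names are configured, and, if a Record
* Reader is configured, attempts to parse a sample of consumed records.
*
* @return the outcome of each verification step performed
*/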
| public List<ConfigVerificationResult> verifyConfiguration() { |
| final List<ConfigVerificationResult> verificationResults = new ArrayList<>(); |
| |
| // Get a SimpleConsumerLease that we can use to communicate with Kafka |
| SimpleConsumerLease lease = pooledLeases.poll(); |
| if (lease == null) { |
| lease = createConsumerLease(); |
| if (lease == null) { |
| verificationResults.add(new ConfigVerificationResult.Builder() |
| .verificationStepName("Attempt connection") |
| .outcome(Outcome.FAILED) |
| .explanation("Could not obtain a Lease") |
| .build()); |
| |
| return verificationResults; |
| } |
| } |
| |
| try { |
| final Consumer<byte[], byte[]> consumer = lease.consumer; |
| try { |
| consumer.groupMetadata(); |
| } catch (final Exception e) { |
| logger.error("Failed to fetch Consumer Group Metadata in order to verify processor configuration", e); |
| verificationResults.add(new ConfigVerificationResult.Builder() |
| .verificationStepName("Attempt connection") |
| .outcome(Outcome.FAILED) |
| .explanation("Could not fetch Consumer Group Metadata: " + e) |
| .build()); |
| } |
| |
| try { |
| if (topicPattern == null) { |
| final Map<String, Long> messagesToConsumePerTopic = new HashMap<>(); |
| |
| for (final String topicName : topics) { |
| final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topicName); |
| |
| final Set<TopicPartition> topicPartitions = partitionInfos.stream() |
| .map(partitionInfo -> new TopicPartition(partitionInfo.topic(), partitionInfo.partition())) |
| .collect(Collectors.toSet()); |
| |
| final Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions, Duration.ofSeconds(30)); |
| final Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(topicPartitions, Duration.ofSeconds(30)); |
| final Map<TopicPartition, OffsetAndMetadata> committedOffsets = consumer.committed(topicPartitions, Duration.ofSeconds(30)); |
| |
| for (final TopicPartition topicPartition : endOffsets.keySet()) { |
| long endOffset = endOffsets.get(topicPartition); |
// When no messages have been added to a topic, the end offset is 0. However, once messages are
// added, the end offset points to where the next message will be; i.e., after the first message
// it goes from 0 to 1. We want the offset of the last message, not the offset of where the next
// one will be. So we subtract one.
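// For example, with 10 messages (offsets 0-9), the reported end offset is 10 and we use 9; with a
// committed offset of 3, the computation below reports 9 - 3 = 6 messages left to consume.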
| if (endOffset > 0) { |
| endOffset--; |
| } |
| |
| final long beginningOffset = beginningOffsets.getOrDefault(topicPartition, 0L); |
| if (endOffset <= beginningOffset) { |
| messagesToConsumePerTopic.merge(topicPartition.topic(), 0L, Long::sum); |
| continue; |
| } |
| |
| final OffsetAndMetadata offsetAndMetadata = committedOffsets.get(topicPartition); |
| final long committedOffset = offsetAndMetadata == null ? 0L : offsetAndMetadata.offset(); |
| |
| final long currentOffset = Math.max(beginningOffset, committedOffset); |
| final long messagesToConsume = endOffset - currentOffset; |
| |
| messagesToConsumePerTopic.merge(topicPartition.topic(), messagesToConsume, Long::sum); |
| } |
| } |
| |
| verificationResults.add(new ConfigVerificationResult.Builder() |
| .verificationStepName("Check Offsets") |
| .outcome(Outcome.SUCCESSFUL) |
| .explanation("Successfully determined offsets for " + messagesToConsumePerTopic.size() + " topics. Number of messages left to consume per topic: " + messagesToConsumePerTopic) |
| .build()); |
| |
| logger.info("Successfully determined offsets for {} topics. Number of messages left to consume per topic: {}", messagesToConsumePerTopic.size(), messagesToConsumePerTopic); |
| } else { |
| verificationResults.add(new ConfigVerificationResult.Builder() |
| .verificationStepName("Determine Topic Offsets") |
| .outcome(Outcome.SKIPPED) |
| .explanation("Cannot determine Topic Offsets because a Topic Wildcard was used instead of an explicit Topic Name") |
| .build()); |
| } |
| } catch (final Exception e) { |
| logger.error("Failed to determine Topic Offsets in order to verify configuration", e); |
| |
| verificationResults.add(new ConfigVerificationResult.Builder() |
| .verificationStepName("Determine Topic Offsets") |
| .outcome(Outcome.FAILED) |
| .explanation("Could not fetch Topic Offsets: " + e) |
| .build()); |
| } |
| |
| if (readerFactory != null) { |
| final ConfigVerificationResult checkDataResult = checkRecordIsParsable(lease); |
| verificationResults.add(checkDataResult); |
| } |
| |
| return verificationResults; |
| } finally { |
| lease.close(true); |
| } |
| } |
| |
| private ConfigVerificationResult checkRecordIsParsable(final SimpleConsumerLease consumerLease) { |
| final ConsumerRecords<byte[], byte[]> consumerRecords = consumerLease.consumer.poll(Duration.ofSeconds(30)); |
| |
| final Map<String, Integer> parseFailuresPerTopic = new HashMap<>(); |
| final Map<String, String> latestParseFailureDescription = new HashMap<>(); |
| final Map<String, Integer> recordsPerTopic = new HashMap<>(); |
| |
| for (final ConsumerRecord<byte[], byte[]> consumerRecord : consumerRecords) { |
| recordsPerTopic.merge(consumerRecord.topic(), 1, Integer::sum); |
| final Map<String, String> attributes = consumerLease.getAttributes(consumerRecord); |
| |
| final byte[] recordBytes = consumerRecord.value() == null ? new byte[0] : consumerRecord.value(); |
| try (final InputStream in = new ByteArrayInputStream(recordBytes)) { |
| final RecordReader reader = readerFactory.createRecordReader(attributes, in, recordBytes.length, logger); |
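// Drain the reader; we care only about whether parsing succeeds, not about the records themselves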
| while (reader.nextRecord() != null) { |
| } |
| } catch (final Exception e) { |
| parseFailuresPerTopic.merge(consumerRecord.topic(), 1, Integer::sum); |
| latestParseFailureDescription.put(consumerRecord.topic(), e.toString()); |
| } |
| } |
| |
| // Note here that we do not commit the offsets. We will just let the consumer close without committing the offsets, which |
| // will roll back the consumption of the messages. |
| if (recordsPerTopic.isEmpty()) { |
| return new ConfigVerificationResult.Builder() |
| .verificationStepName("Parse Records") |
| .outcome(Outcome.SKIPPED) |
| .explanation("Received no messages to attempt parsing within the 30 second timeout") |
| .build(); |
| } |
| |
| if (parseFailuresPerTopic.isEmpty()) { |
| return new ConfigVerificationResult.Builder() |
| .verificationStepName("Parse Records") |
| .outcome(Outcome.SUCCESSFUL) |
| .explanation("Was able to parse all Records consumed from topics. Number of Records consumed from each topic: " + recordsPerTopic) |
| .build(); |
| } else { |
| final Map<String, String> failureDescriptions = new HashMap<>(); |
| for (final String topic : recordsPerTopic.keySet()) { |
| final int records = recordsPerTopic.get(topic); |
| final Integer failures = parseFailuresPerTopic.get(topic); |
| final String failureReason = latestParseFailureDescription.get(topic); |
| final String description = "Failed to parse " + failures + " out of " + records + " records. Sample failure reason: " + failureReason; |
| failureDescriptions.put(topic, description); |
| } |
| |
| return new ConfigVerificationResult.Builder() |
| .verificationStepName("Parse Records") |
| .outcome(Outcome.FAILED) |
| .explanation("With the configured Record Reader, failed to parse at least one Record. Failures per topic: " + failureDescriptions) |
| .build(); |
| } |
| } |
| |
| /** |
| * Obtains a consumer from the pool if one is available or lazily |
| * initializes a new one if deemed necessary. |
| * |
* @param session the session with which the consumer lease will be
* associated
* @param processContext the ProcessContext with which the consumer
* lease will be associated
| * @return consumer to use or null if not available or necessary |
| */ |
| public ConsumerLease obtainConsumer(final ProcessSession session, final ProcessContext processContext) { |
| // If there are any partition assignments that do not have leases in our pool, create the leases and add them to the pool. |
| // This is not necessary for us to handle if using automatic subscriptions because the Kafka protocol will ensure that each consumer |
| // has the appropriate partitions. However, if we are using explicit assignment, it's important to create these leases and add them |
| // to our pool in order to avoid starvation. E.g., if we have only a single concurrent task and 5 partitions assigned, we cannot simply |
| // wait until pooledLeases.poll() returns null to create a new ConsumerLease, as doing so may result in constantly pulling from only a |
| // single partition (since we'd get a Lease for Partition 1, then use it, and put it back in the pool). |
| recreateAssignedConsumers(); |
| |
| SimpleConsumerLease lease = pooledLeases.poll(); |
| if (lease == null) { |
| lease = createConsumerLease(); |
| if (lease == null) { |
| return null; |
| } |
| } |
| |
| lease.setProcessSession(session, processContext); |
| |
| leasesObtainedCountRef.incrementAndGet(); |
| return lease; |
| } |
| |
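// Turn any available explicit partition assignments into pooled leases so that every assigned
// partition can be consumed, even when there are fewer concurrent tasks than partitions.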
| private void recreateAssignedConsumers() { |
| List<TopicPartition> topicPartitions; |
| while ((topicPartitions = availableTopicPartitions.poll()) != null) { |
| final SimpleConsumerLease simpleConsumerLease = createConsumerLease(topicPartitions); |
| pooledLeases.add(simpleConsumerLease); |
| } |
| } |
| |
| private SimpleConsumerLease createConsumerLease() { |
| if (partitionsToConsume != null) { |
| logger.debug("Cannot obtain lease to communicate with Kafka. Since partitions are explicitly assigned, cannot create a new lease."); |
| return null; |
| } |
| |
| final Consumer<byte[], byte[]> consumer = createKafkaConsumer(); |
| consumerCreatedCountRef.incrementAndGet(); |
| |
| /* |
| * For now return a new consumer lease. But we could later elect to |
| * have this return null if we determine the broker indicates that |
| * the lag time on all topics being monitored is sufficiently low. |
| * For now we should encourage conservative use of threads because |
| * having too many means we'll have at best useless threads sitting |
| * around doing frequent network calls and at worst having consumers |
| * sitting idle which could prompt excessive rebalances. |
| */ |
| final SimpleConsumerLease lease = new SimpleConsumerLease(consumer, null); |
| |
| // This subscription tightly couples the lease to the given |
| // consumer. They cannot be separated from then on. |
| if (topics == null) { |
| consumer.subscribe(topicPattern, lease); |
| } else { |
| consumer.subscribe(topics, lease); |
| } |
| |
| return lease; |
| } |
| |
| private SimpleConsumerLease createConsumerLease(final List<TopicPartition> topicPartitions) { |
| final Consumer<byte[], byte[]> consumer = createKafkaConsumer(); |
| consumerCreatedCountRef.incrementAndGet(); |
| consumer.assign(topicPartitions); |
| |
return new SimpleConsumerLease(consumer, topicPartitions);
| } |
| |
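/**
* Seeds the queue of explicitly assigned partitions. Each queue element holds one TopicPartition per
* configured topic for a single partition number, and backs exactly one consumer lease at a time.
*/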
| private void enqueueAssignedPartitions(final int[] partitionsToConsume) { |
| if (partitionsToConsume == null) { |
| return; |
| } |
| |
| for (final int partition : partitionsToConsume) { |
| final List<TopicPartition> topicPartitions = createTopicPartitions(partition); |
| availableTopicPartitions.offer(topicPartitions); |
| } |
| } |
| |
| private List<TopicPartition> createTopicPartitions(final int partition) { |
| final List<TopicPartition> topicPartitions = new ArrayList<>(); |
| for (final String topic : topics) { |
| final TopicPartition topicPartition = new TopicPartition(topic, partition); |
| topicPartitions.add(topicPartition); |
| } |
| |
| return topicPartitions; |
| } |
| |
| /** |
| * Exposed as protected method for easier unit testing |
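* <p>
* A test can, for example, override this method to supply a Kafka {@code MockConsumer}
* (a hypothetical sketch; the constructor arguments are placeholders):
* <pre>{@code
* final ConsumerPool pool = new ConsumerPool(maxLeases, demarcator, false, props, topics,
*         maxWaitMillis, keyEncoding, securityProtocol, bootstrapServers, logger, true,
*         StandardCharsets.UTF_8, null, null) {
*     protected Consumer<byte[], byte[]> createKafkaConsumer() {
*         return new MockConsumer<>(OffsetResetStrategy.EARLIEST);
*     }
* };
* }</pre>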
| * |
| * @return consumer |
* @throws KafkaException if unable to create the consumer with the given properties
| */ |
| protected Consumer<byte[], byte[]> createKafkaConsumer() { |
| final Map<String, Object> properties = new HashMap<>(kafkaProperties); |
| if (honorTransactions) { |
| properties.put("isolation.level", "read_committed"); |
| } else { |
| properties.put("isolation.level", "read_uncommitted"); |
| } |
return new KafkaConsumer<>(properties);
| } |
| |
| /** |
| * Closes all consumers in the pool. Can be safely called repeatedly. |
| */ |
| @Override |
| public void close() { |
| final List<SimpleConsumerLease> leases = new ArrayList<>(); |
| pooledLeases.drainTo(leases); |
leases.forEach(lease -> lease.close(true));
| } |
| |
| private void closeConsumer(final Consumer<?, ?> consumer) { |
| consumerClosedCountRef.incrementAndGet(); |
| try { |
| consumer.unsubscribe(); |
| } catch (Exception e) { |
| logger.warn("Failed while unsubscribing " + consumer, e); |
| } |
| |
| try { |
| consumer.close(); |
| } catch (Exception e) { |
| logger.warn("Failed while closing " + consumer, e); |
| } |
| } |
| |
| PoolStats getPoolStats() { |
| return new PoolStats(consumerCreatedCountRef.get(), consumerClosedCountRef.get(), leasesObtainedCountRef.get()); |
| } |
| |
| private class SimpleConsumerLease extends ConsumerLease { |
| private final Consumer<byte[], byte[]> consumer; |
| private final List<TopicPartition> assignedPartitions; |
| private volatile ProcessSession session; |
| private volatile ProcessContext processContext; |
| private volatile boolean closedConsumer; |
| |
| private SimpleConsumerLease(final Consumer<byte[], byte[]> consumer, final List<TopicPartition> assignedPartitions) { |
| super(maxWaitMillis, consumer, demarcatorBytes, keyEncoding, securityProtocol, bootstrapServers, |
| readerFactory, writerFactory, logger, headerCharacterSet, headerNamePattern, separateByKey); |
| this.consumer = consumer; |
| this.assignedPartitions = assignedPartitions; |
| } |
| |
| void setProcessSession(final ProcessSession session, final ProcessContext context) { |
| this.session = session; |
| this.processContext = context; |
| } |
| |
| @Override |
| public List<TopicPartition> getAssignedPartitions() { |
| return assignedPartitions; |
| } |
| |
| @Override |
| public void yield() { |
| if (processContext != null) { |
| processContext.yield(); |
| } |
| } |
| |
| @Override |
| public ProcessSession getProcessSession() { |
| return session; |
| } |
| |
| @Override |
| public void close() { |
| super.close(); |
| close(false); |
| } |
| |
| public void close(final boolean forceClose) { |
| if (closedConsumer) { |
| return; |
| } |
| |
| super.close(); |
| if (session != null) { |
| session.rollback(); |
| setProcessSession(null, null); |
| } |
| |
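// Try to return this lease to the pool for reuse; if that is not possible (forced close, poisoned
// lease, or the pool is full), tear down the underlying consumer instead.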
| if (forceClose || isPoisoned() || !pooledLeases.offer(this)) { |
| closedConsumer = true; |
| closeConsumer(consumer); |
| |
| // If explicit topic/partition assignment is used, make the assignments for this Lease available again. |
| if (assignedPartitions != null) { |
| logger.debug("Adding partitions {} back to the pool", assignedPartitions); |
| availableTopicPartitions.offer(assignedPartitions); |
| } |
| } |
| } |
| } |
| |
| |
| static final class PoolStats { |
| |
| final long consumerCreatedCount; |
| final long consumerClosedCount; |
| final long leasesObtainedCount; |
| |
| PoolStats( |
| final long consumerCreatedCount, |
| final long consumerClosedCount, |
| final long leasesObtainedCount |
| ) { |
| this.consumerCreatedCount = consumerCreatedCount; |
| this.consumerClosedCount = consumerClosedCount; |
| this.leasesObtainedCount = leasesObtainedCount; |
| } |
| |
| @Override |
| public String toString() { |
| return "Created Consumers [" + consumerCreatedCount + "]\n" |
| + "Closed Consumers [" + consumerClosedCount + "]\n" |
| + "Leases Obtained [" + leasesObtainedCount + "]\n"; |
| } |
| |
| } |
| |
| } |