blob: 13fd81127b8984f87b6c7ce11ff998bc158f8437 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.kafka.pubsub;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyDescriptor.Builder;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.VerifiableProcessor;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import java.net.UnknownHostException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.DO_NOT_ADD_KEY_AS_ATTRIBUTE;
import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.HEX_ENCODING;
import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_ENCODING;
@CapabilityDescription("Consumes messages from Apache Kafka specifically built against the Kafka 2.6 Consumer API. "
+ "The complementary NiFi processor for sending messages is PublishKafkaRecord_2_6. Please note that, at this time, the Processor assumes that "
+ "all records that are retrieved from a given partition have the same schema. If any of the Kafka messages are pulled but cannot be parsed or written with the "
+ "configured Record Reader or Record Writer, the contents of the message will be written to a separate FlowFile, and that FlowFile will be transferred to the "
+ "'parse.failure' relationship. Otherwise, each FlowFile is sent to the 'success' relationship and may contain many individual messages within the single FlowFile. "
+ "A 'record.count' attribute is added to indicate how many messages are contained in the FlowFile. No two Kafka messages will be placed into the same FlowFile if they "
+ "have different schemas, or if they have different values for a message header that is included by the <Headers to Add as Attributes> property.")
@Tags({"Kafka", "Get", "Record", "csv", "avro", "json", "Ingest", "Ingress", "Topic", "PubSub", "Consume", "2.6"})
@WritesAttributes({
@WritesAttribute(attribute = "record.count", description = "The number of records received"),
@WritesAttribute(attribute = "mime.type", description = "The MIME Type that is provided by the configured Record Writer"),
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_PARTITION, description = "The partition of the topic the records are from"),
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_TIMESTAMP, description = "The timestamp of the message in the partition of the topic."),
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_TOPIC, description = "The topic records are from")
})
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.",
description = "These properties will be added on the Kafka configuration after loading any provided configuration properties."
+ " In the event a dynamic property represents a property that was already set, its value will be ignored and WARN message logged."
+ " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration.",
expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY)
@SeeAlso({ConsumeKafka_2_6.class, PublishKafka_2_6.class, PublishKafkaRecord_2_6.class})
public class ConsumeKafkaRecord_2_6 extends AbstractProcessor implements VerifiableProcessor {
static final AllowableValue OFFSET_EARLIEST = new AllowableValue("earliest", "earliest", "Automatically reset the offset to the earliest offset");
static final AllowableValue OFFSET_LATEST = new AllowableValue("latest", "latest", "Automatically reset the offset to the latest offset");
static final AllowableValue OFFSET_NONE = new AllowableValue("none", "none", "Throw exception to the consumer if no previous offset is found for the consumer's group");
static final AllowableValue TOPIC_NAME = new AllowableValue("names", "names", "Topic is a full topic name or comma separated list of names");
static final AllowableValue TOPIC_PATTERN = new AllowableValue("pattern", "pattern", "Topic is a regex using the Java Pattern syntax");
static final PropertyDescriptor TOPICS = new Builder()
.name("topic")
.displayName("Topic Name(s)")
.description("The name of the Kafka Topic(s) to pull from. More than one can be supplied if comma separated.")
.required(true)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
static final PropertyDescriptor TOPIC_TYPE = new Builder()
.name("topic_type")
.displayName("Topic Name Format")
.description("Specifies whether the Topic(s) provided are a comma separated list of names or a single regular expression")
.required(true)
.allowableValues(TOPIC_NAME, TOPIC_PATTERN)
.defaultValue(TOPIC_NAME.getValue())
.build();
static final PropertyDescriptor RECORD_READER = new Builder()
.name("record-reader")
.displayName("Record Reader")
.description("The Record Reader to use for incoming FlowFiles")
.identifiesControllerService(RecordReaderFactory.class)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.required(true)
.build();
static final PropertyDescriptor RECORD_WRITER = new Builder()
.name("record-writer")
.displayName("Record Writer")
.description("The Record Writer to use in order to serialize the data before sending to Kafka")
.identifiesControllerService(RecordSetWriterFactory.class)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.required(true)
.build();
static final PropertyDescriptor GROUP_ID = new Builder()
.name("group.id")
.displayName("Group ID")
.description("A Group ID is used to identify consumers that are within the same consumer group. Corresponds to Kafka's 'group.id' property.")
.required(true)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
static final PropertyDescriptor AUTO_OFFSET_RESET = new Builder()
.name("auto.offset.reset")
.displayName("Offset Reset")
.description("Allows you to manage the condition when there is no initial offset in Kafka or if the current offset does not exist any "
+ "more on the server (e.g. because that data has been deleted). Corresponds to Kafka's 'auto.offset.reset' property.")
.required(true)
.allowableValues(OFFSET_EARLIEST, OFFSET_LATEST, OFFSET_NONE)
.defaultValue(OFFSET_LATEST.getValue())
.build();
static final PropertyDescriptor MAX_POLL_RECORDS = new Builder()
.name("max.poll.records")
.displayName("Max Poll Records")
.description("Specifies the maximum number of records Kafka should return in a single poll.")
.required(false)
.defaultValue("10000")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build();
static final PropertyDescriptor MAX_UNCOMMITTED_TIME = new Builder()
.name("max-uncommit-offset-wait")
.displayName("Max Uncommitted Time")
.description("Specifies the maximum amount of time allowed to pass before offsets must be committed. "
+ "This value impacts how often offsets will be committed. Committing offsets less often increases "
+ "throughput but also increases the window of potential data duplication in the event of a rebalance "
+ "or JVM restart between commits. This value is also related to maximum poll records and the use "
+ "of a message demarcator. When using a message demarcator we can have far more uncommitted messages "
+ "than when we're not as there is much less for us to keep track of in memory.")
.required(false)
.defaultValue("1 secs")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build();
static final PropertyDescriptor COMMS_TIMEOUT = new Builder()
.name("Communications Timeout")
.displayName("Communications Timeout")
.description("Specifies the timeout that the consumer should use when communicating with the Kafka Broker")
.required(true)
.defaultValue("60 secs")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build();
static final PropertyDescriptor HONOR_TRANSACTIONS = new Builder()
.name("honor-transactions")
.displayName("Honor Transactions")
.description("Specifies whether or not NiFi should honor transactional guarantees when communicating with Kafka. If false, the Processor will use an \"isolation level\" of "
+ "read_uncomitted. This means that messages will be received as soon as they are written to Kafka but will be pulled, even if the producer cancels the transactions. If "
+ "this value is true, NiFi will not receive any messages for which the producer's transaction was canceled, but this can result in some latency since the consumer must wait "
+ "for the producer to finish its entire transaction instead of pulling as the messages become available.")
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.allowableValues("true", "false")
.defaultValue("true")
.required(true)
.build();
static final PropertyDescriptor MESSAGE_HEADER_ENCODING = new Builder()
.name("message-header-encoding")
.displayName("Message Header Encoding")
.description("Any message header that is found on a Kafka message will be added to the outbound FlowFile as an attribute. "
+ "This property indicates the Character Encoding to use for deserializing the headers.")
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
.defaultValue("UTF-8")
.required(false)
.build();
static final PropertyDescriptor HEADER_NAME_REGEX = new Builder()
.name("header-name-regex")
.displayName("Headers to Add as Attributes (Regex)")
.description("A Regular Expression that is matched against all message headers. "
+ "Any message header whose name matches the regex will be added to the FlowFile as an Attribute. "
+ "If not specified, no Header values will be added as FlowFile attributes. If two messages have a different value for the same header and that header is selected by "
+ "the provided regex, then those two messages must be added to different FlowFiles. As a result, users should be cautious about using a regex like "
+ "\".*\" if messages are expected to have header values that are unique per message, such as an identifier or timestamp, because it will prevent NiFi from bundling "
+ "the messages together efficiently.")
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.required(false)
.build();
static final PropertyDescriptor SEPARATE_BY_KEY = new Builder()
.name("separate-by-key")
.displayName("Separate By Key")
.description("If true, two Records will only be added to the same FlowFile if both of the Kafka Messages have identical keys.")
.required(false)
.allowableValues("true", "false")
.defaultValue("false")
.build();
static final PropertyDescriptor KEY_ATTRIBUTE_ENCODING = new PropertyDescriptor.Builder()
.name("key-attribute-encoding")
.displayName("Key Attribute Encoding")
.description("If the <Separate By Key> property is set to true, FlowFiles that are emitted have an attribute named '" + KafkaProcessorUtils.KAFKA_KEY +
"'. This property dictates how the value of the attribute should be encoded.")
.required(true)
.defaultValue(UTF8_ENCODING.getValue())
.allowableValues(UTF8_ENCODING, HEX_ENCODING, DO_NOT_ADD_KEY_AS_ATTRIBUTE)
.build();
static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("FlowFiles received from Kafka. Depending on demarcation strategy it is a flow file per message or a bundle of messages grouped by topic and partition.")
.build();
static final Relationship REL_PARSE_FAILURE = new Relationship.Builder()
.name("parse.failure")
.description("If a message from Kafka cannot be parsed using the configured Record Reader, the contents of the "
+ "message will be routed to this Relationship as its own individual FlowFile.")
.build();
static final List<PropertyDescriptor> DESCRIPTORS;
static final Set<Relationship> RELATIONSHIPS;
private volatile ConsumerPool consumerPool = null;
private final Set<ConsumerLease> activeLeases = Collections.synchronizedSet(new HashSet<>());
static {
List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(KafkaProcessorUtils.BOOTSTRAP_SERVERS);
descriptors.add(TOPICS);
descriptors.add(TOPIC_TYPE);
descriptors.add(RECORD_READER);
descriptors.add(RECORD_WRITER);
descriptors.add(HONOR_TRANSACTIONS);
descriptors.add(KafkaProcessorUtils.SECURITY_PROTOCOL);
descriptors.add(KafkaProcessorUtils.SASL_MECHANISM);
descriptors.add(KafkaProcessorUtils.KERBEROS_CREDENTIALS_SERVICE);
descriptors.add(KafkaProcessorUtils.JAAS_SERVICE_NAME);
descriptors.add(KafkaProcessorUtils.USER_PRINCIPAL);
descriptors.add(KafkaProcessorUtils.USER_KEYTAB);
descriptors.add(KafkaProcessorUtils.USERNAME);
descriptors.add(KafkaProcessorUtils.PASSWORD);
descriptors.add(KafkaProcessorUtils.TOKEN_AUTH);
descriptors.add(KafkaProcessorUtils.SSL_CONTEXT_SERVICE);
descriptors.add(GROUP_ID);
descriptors.add(SEPARATE_BY_KEY);
descriptors.add(KEY_ATTRIBUTE_ENCODING);
descriptors.add(AUTO_OFFSET_RESET);
descriptors.add(MESSAGE_HEADER_ENCODING);
descriptors.add(HEADER_NAME_REGEX);
descriptors.add(MAX_POLL_RECORDS);
descriptors.add(MAX_UNCOMMITTED_TIME);
descriptors.add(COMMS_TIMEOUT);
DESCRIPTORS = Collections.unmodifiableList(descriptors);
final Set<Relationship> rels = new HashSet<>();
rels.add(REL_SUCCESS);
rels.add(REL_PARSE_FAILURE);
RELATIONSHIPS = Collections.unmodifiableSet(rels);
}
@Override
public Set<Relationship> getRelationships() {
return RELATIONSHIPS;
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return DESCRIPTORS;
}
@OnStopped
public void close() {
final ConsumerPool pool = consumerPool;
consumerPool = null;
if (pool != null) {
pool.close();
}
}
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new Builder()
.description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.")
.name(propertyDescriptorName)
.addValidator(new KafkaProcessorUtils.KafkaConfigValidator(ConsumerConfig.class))
.dynamic(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
}
@Override
protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
final Collection<ValidationResult> validationResults = KafkaProcessorUtils.validateCommonProperties(validationContext);
final ValidationResult consumerPartitionsResult = ConsumerPartitionsUtil.validateConsumePartitions(validationContext.getAllProperties());
validationResults.add(consumerPartitionsResult);
final boolean explicitPartitionMapping = ConsumerPartitionsUtil.isPartitionAssignmentExplicit(validationContext.getAllProperties());
if (explicitPartitionMapping) {
final String topicType = validationContext.getProperty(TOPIC_TYPE).getValue();
if (TOPIC_PATTERN.getValue().equals(topicType)) {
validationResults.add(new ValidationResult.Builder()
.subject(TOPIC_TYPE.getDisplayName())
.input(TOPIC_PATTERN.getDisplayName())
.valid(false)
.explanation("It is not valid to explicitly assign topic partitions and also using a Topic Pattern. "
+ "Topic Partitions may be assigned only if explicitly specifying topic names also.")
.build());
}
}
return validationResults;
}
private synchronized ConsumerPool getConsumerPool(final ProcessContext context) {
ConsumerPool pool = consumerPool;
if (pool != null) {
return pool;
}
final ConsumerPool consumerPool = createConsumerPool(context, getLogger());
final boolean explicitAssignment = ConsumerPartitionsUtil.isPartitionAssignmentExplicit(context.getAllProperties());
if (explicitAssignment) {
final int numAssignedPartitions = ConsumerPartitionsUtil.getPartitionAssignmentCount(context.getAllProperties());
// Request from Kafka the number of partitions for the topics that we are consuming from. Then ensure that we have
// all of the partitions assigned.
final int partitionCount = consumerPool.getPartitionCount();
if (partitionCount != numAssignedPartitions) {
context.yield();
consumerPool.close();
throw new ProcessException("Illegal Partition Assignment: There are " + numAssignedPartitions + " partitions statically assigned using the partitions.* property names, but the Kafka" +
" topic(s) have " + partitionCount + " partitions");
}
}
this.consumerPool = consumerPool;
return consumerPool;
}
protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
final int maxLeases = context.getMaxConcurrentTasks();
final long maxUncommittedTime = context.getProperty(MAX_UNCOMMITTED_TIME).asTimePeriod(TimeUnit.MILLISECONDS);
final Map<String, Object> props = new HashMap<>();
KafkaProcessorUtils.buildCommonKafkaProperties(context, ConsumerConfig.class, props);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
final String topicListing = context.getProperty(ConsumeKafkaRecord_2_6.TOPICS).evaluateAttributeExpressions().getValue();
final String topicType = context.getProperty(ConsumeKafkaRecord_2_6.TOPIC_TYPE).evaluateAttributeExpressions().getValue();
final List<String> topics = new ArrayList<>();
final String securityProtocol = context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue();
final String bootstrapServers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
final boolean honorTransactions = context.getProperty(HONOR_TRANSACTIONS).asBoolean();
final int commsTimeoutMillis = context.getProperty(COMMS_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
props.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, commsTimeoutMillis);
final String charsetName = context.getProperty(MESSAGE_HEADER_ENCODING).evaluateAttributeExpressions().getValue();
final Charset charset = Charset.forName(charsetName);
final String headerNameRegex = context.getProperty(HEADER_NAME_REGEX).getValue();
final Pattern headerNamePattern = headerNameRegex == null ? null : Pattern.compile(headerNameRegex);
final boolean separateByKey = context.getProperty(SEPARATE_BY_KEY).asBoolean();
final String keyEncoding = context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue();
final int[] partitionsToConsume;
try {
partitionsToConsume = ConsumerPartitionsUtil.getPartitionsForHost(context.getAllProperties(), getLogger());
} catch (final UnknownHostException uhe) {
throw new ProcessException("Could not determine localhost's hostname", uhe);
}
if (topicType.equals(TOPIC_NAME.getValue())) {
for (final String topic : topicListing.split(",", 100)) {
final String trimmedName = topic.trim();
if (!trimmedName.isEmpty()) {
topics.add(trimmedName);
}
}
return new ConsumerPool(maxLeases, readerFactory, writerFactory, props, topics, maxUncommittedTime, securityProtocol,
bootstrapServers, log, honorTransactions, charset, headerNamePattern, separateByKey, keyEncoding, partitionsToConsume);
} else if (topicType.equals(TOPIC_PATTERN.getValue())) {
final Pattern topicPattern = Pattern.compile(topicListing.trim());
return new ConsumerPool(maxLeases, readerFactory, writerFactory, props, topicPattern, maxUncommittedTime, securityProtocol,
bootstrapServers, log, honorTransactions, charset, headerNamePattern, separateByKey, keyEncoding, partitionsToConsume);
} else {
getLogger().error("Subscription type has an unknown value {}", new Object[] {topicType});
return null;
}
}
@OnUnscheduled
public void interruptActiveThreads() {
// There are known issues with the Kafka client library that result in the client code hanging
// indefinitely when unable to communicate with the broker. In order to address this, we will wait
// up to 30 seconds for the Threads to finish and then will call Consumer.wakeup() to trigger the
// thread to wakeup when it is blocked, waiting on a response.
final long nanosToWait = TimeUnit.SECONDS.toNanos(5L);
final long start = System.nanoTime();
while (System.nanoTime() - start < nanosToWait && !activeLeases.isEmpty()) {
try {
Thread.sleep(100L);
} catch (final InterruptedException ie) {
Thread.currentThread().interrupt();
return;
}
}
if (!activeLeases.isEmpty()) {
int count = 0;
for (final ConsumerLease lease : activeLeases) {
getLogger().info("Consumer {} has not finished after waiting 30 seconds; will attempt to wake-up the lease", new Object[] {lease});
lease.wakeup();
count++;
}
getLogger().info("Woke up {} consumers", new Object[] {count});
}
activeLeases.clear();
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
final ConsumerPool pool = getConsumerPool(context);
if (pool == null) {
context.yield();
return;
}
try (final ConsumerLease lease = pool.obtainConsumer(session, context)) {
if (lease == null) {
context.yield();
return;
}
activeLeases.add(lease);
try {
while (this.isScheduled() && lease.continuePolling()) {
lease.poll();
}
if (this.isScheduled() && !lease.commit()) {
context.yield();
}
} catch (final WakeupException we) {
getLogger().warn("Was interrupted while trying to communicate with Kafka with lease {}. "
+ "Will roll back session and discard any partially received data.", new Object[] {lease});
} catch (final KafkaException kex) {
getLogger().error("Exception while interacting with Kafka so will close the lease {} due to {}",
new Object[]{lease, kex}, kex);
} catch (final Throwable t) {
getLogger().error("Exception while processing data from kafka so will close the lease {} due to {}",
new Object[]{lease, t}, t);
} finally {
activeLeases.remove(lease);
}
}
}
@Override
public List<ConfigVerificationResult> verify(final ProcessContext context, final ComponentLog verificationLogger, final Map<String, String> attributes) {
try (final ConsumerPool consumerPool = createConsumerPool(context, verificationLogger)) {
return consumerPool.verifyConfiguration();
}
}
}