/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.kafka.pubsub;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.SchemaValidationException;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import javax.xml.bind.DatatypeConverter;
import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_6.REL_PARSE_FAILURE;
import static org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_6.REL_SUCCESS;
import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.HEX_ENCODING;
import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_ENCODING;
/**
* This class represents a lease to access a Kafka Consumer object. The lease is
* intended to be obtained from a ConsumerPool. The lease is closeable to allow
* for the clean model of a try w/resources whereby non-exceptional cases mean
* the lease will be returned to the pool for future use by others. A given
* lease may only belong to a single thread at a time.
*/
public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListener {
private final long maxWaitMillis;
private final Consumer<byte[], byte[]> kafkaConsumer;
private final ComponentLog logger;
private final byte[] demarcatorBytes;
private final String keyEncoding;
private final String securityProtocol;
private final String bootstrapServers;
private final RecordSetWriterFactory writerFactory;
private final RecordReaderFactory readerFactory;
private final Charset headerCharacterSet;
private final Pattern headerNamePattern;
private final boolean separateByKey;
private boolean poisoned = false;
//used for tracking demarcated flowfiles by their TopicPartition so we can append
//to them on subsequent poll calls
private final Map<BundleInformation, BundleTracker> bundleMap = new HashMap<>();
private final Map<TopicPartition, OffsetAndMetadata> uncommittedOffsetsMap = new HashMap<>();
private long leaseStartNanos = -1;
private boolean lastPollEmpty = false;
private int totalMessages = 0;
ConsumerLease(
final long maxWaitMillis,
final Consumer<byte[], byte[]> kafkaConsumer,
final byte[] demarcatorBytes,
final String keyEncoding,
final String securityProtocol,
final String bootstrapServers,
final RecordReaderFactory readerFactory,
final RecordSetWriterFactory writerFactory,
final ComponentLog logger,
final Charset headerCharacterSet,
final Pattern headerNamePattern,
final boolean separateByKey) {
this.maxWaitMillis = maxWaitMillis;
this.kafkaConsumer = kafkaConsumer;
this.demarcatorBytes = demarcatorBytes;
this.keyEncoding = keyEncoding;
this.securityProtocol = securityProtocol;
this.bootstrapServers = bootstrapServers;
this.readerFactory = readerFactory;
this.writerFactory = writerFactory;
this.logger = logger;
this.headerCharacterSet = headerCharacterSet;
this.headerNamePattern = headerNamePattern;
this.separateByKey = separateByKey;
}
/**
* clears out internal state elements excluding session and consumer as
* those are managed by the pool itself
*/
private void resetInternalState() {
bundleMap.clear();
uncommittedOffsetsMap.clear();
leaseStartNanos = -1;
lastPollEmpty = false;
totalMessages = 0;
}
/**
* Kafka will call this method whenever it is about to rebalance the
* consumers for the given partitions. We'll simply take this to mean that
* we need to quickly commit what we've got and will return the consumer to
* the pool. This method will be called during the poll() method call of
* this class and will be called by the same thread calling poll according
* to the Kafka API docs. After this method executes the session and kafka
* offsets are committed and this lease is closed.
*
* @param partitions partitions being reassigned
*/
@Override
public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {
logger.debug("Rebalance Alert: Partitions '{}' revoked for lease '{}' with consumer '{}'", new Object[]{partitions, this, kafkaConsumer});
//force a commit here. Can reuse the session and consumer after this but must commit now to avoid duplicates if kafka reassigns partition
commit();
}
/**
* This will be called by Kafka when the rebalance has completed. We don't
* need to do anything with this information other than optionally log it as
* by this point we've committed what we've got and moved on.
*
* @param partitions topic partition set being reassigned
*/
@Override
public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
logger.debug("Rebalance Alert: Partitions '{}' assigned for lease '{}' with consumer '{}'", new Object[]{partitions, this, kafkaConsumer});
}
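/**
* Returns the topic partitions currently assigned to this lease. This base implementation
* returns null; implementations that track partition assignment are expected to override it.
*/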
public List<TopicPartition> getAssignedPartitions() {
return null;
}
/**
* Executes a poll on the underlying Kafka Consumer and creates any new
* flowfiles necessary or appends to existing ones if in demarcation mode.
*/
void poll() {
/**
* Implementation note:
* Even if ConsumeKafka is not scheduled to poll for longer than session.timeout.ms (defaults to 10 sec),
* e.g. because downstream connection back-pressure is engaged, the Kafka consumer sends heartbeats from a
* background thread. If this situation lasts longer than max.poll.interval.ms (defaults to 5 min), the Kafka
* consumer sends a Leave Group request to the Group Coordinator. When the ConsumeKafka processor is scheduled
* again, the Kafka client checks whether this client instance is still a part of the consumer group. If not,
* it rejoins before polling messages. This behavior comes from Kafka KIP-62 and is available from Kafka
* client 0.10.1.0 onward.
*/
try {
final ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(Duration.ofMillis(10));
lastPollEmpty = records.count() == 0;
processRecords(records);
} catch (final ProcessException pe) {
throw pe;
} catch (final Throwable t) {
this.poison();
throw t;
}
}
/**
* Notifies Kafka to commit the offsets for the specified topic/partition
* pairs to the specified offsets w/the given metadata. This can offer
* higher performance than the other commitOffsets call as it allows the
* kafka client to collect more data from Kafka before committing the
* offsets.
*
* @return false if we didn't do anything and the caller should probably yield;
* true if we committed new data
*/
boolean commit() {
if (uncommittedOffsetsMap.isEmpty()) {
resetInternalState();
return false;
}
try {
/**
* Committing the nifi session then the offsets means we have an at
* least once guarantee here. If we reversed the order we'd have at
* most once.
*/
final Collection<FlowFile> bundledFlowFiles = getBundles();
if (!bundledFlowFiles.isEmpty()) {
getProcessSession().transfer(bundledFlowFiles, REL_SUCCESS);
}
getProcessSession().commitAsync(() -> {
final Map<TopicPartition, OffsetAndMetadata> offsetsMap = uncommittedOffsetsMap;
kafkaConsumer.commitSync(offsetsMap);
resetInternalState();
});
return true;
} catch (final IOException ioe) {
poison();
logger.error("Failed to finish writing out FlowFile bundle", ioe);
throw new ProcessException(ioe);
} catch (final KafkaException kex) {
poison();
logger.warn("Duplicates are likely as we were able to commit the process"
+ " session but received an exception from Kafka while committing"
+ " offsets.");
throw kex;
} catch (final Throwable t) {
poison();
throw t;
}
}
/**
* Indicates whether we should continue polling for data. If we are not
* writing data with a demarcator then we're writing individual flow files
* per kafka message therefore we must be very mindful of memory usage for
* the flow file objects (not their content) being held in memory. The
* content of kafka messages will be written to the content repository
* immediately upon each poll call but we must still be mindful of how much
* memory can be used in each poll call. We will indicate that we should
* stop polling if our last poll call produced no new results, if we've been
* polling and processing data longer than the specified maximum polling
* time, if we have reached our specified max flow file limit, or if a
* rebalance has been initiated for one of the partitions we're watching;
* otherwise true.
*
* @return true if should keep polling; false otherwise
*/
boolean continuePolling() {
//stop if the last poll produced no new data
if (lastPollEmpty) {
return false;
}
//stop if we've gone past our desired max uncommitted wait time
if (leaseStartNanos < 0) {
leaseStartNanos = System.nanoTime();
}
final long durationMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - leaseStartNanos);
if (durationMillis > maxWaitMillis) {
return false;
}
//stop if we've generated enough flowfiles that we need to be concerned about memory usage for the objects
if (bundleMap.size() > 200) { //a magic number - the number of simultaneous bundles to track
return false;
} else {
return totalMessages < 1000; //admittedly a magic number - good candidate for a processor property
}
}
/**
* Indicates that the underlying session and consumer should be immediately
* considered invalid. Once closed the session will be rolled back and the
* pool should destroy the underlying consumer. This is useful if due to
* external reasons, such as the processor no longer being scheduled, this
* lease should be terminated immediately.
*/
private void poison() {
poisoned = true;
}
/**
* @return true if this lease has been poisoned; false otherwise
*/
boolean isPoisoned() {
return poisoned;
}
/**
* Trigger the consumer's {@link KafkaConsumer#wakeup() wakeup()} method.
*/
public void wakeup() {
kafkaConsumer.wakeup();
}
/**
* Method that is intended to be overridden by the pool that created this
* ConsumerLease object. It should ensure that the session used to create
* this lease is rolled back and that the underlying kafka consumer is
* either returned to the pool for continued use or destroyed if this
* lease has been poisoned. It can only be called once. Calling it more than
* once can result in undefined and non-threadsafe behavior.
*/
@Override
public void close() {
resetInternalState();
}
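/**
* @return the ProcessSession associated with this lease
*/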
public abstract ProcessSession getProcessSession();
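/**
* Requests that the owning processor yield scheduling; invoked here after communication failures with Kafka.
*/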
public abstract void yield();
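/**
* Routes each batch of polled records to the appropriate write strategy (demarcated, record-oriented,
* or one FlowFile per message) and tracks the next offset to commit for each topic partition.
*/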
private void processRecords(final ConsumerRecords<byte[], byte[]> records) {
records.partitions().stream().forEach(partition -> {
List<ConsumerRecord<byte[], byte[]>> messages = records.records(partition);
if (!messages.isEmpty()) {
//update maximum offset map for this topic partition
long maxOffset = messages.stream()
.mapToLong(record -> record.offset())
.max()
.getAsLong();
//write records to content repository and session
if (demarcatorBytes != null) {
writeDemarcatedData(getProcessSession(), messages, partition);
} else if (readerFactory != null && writerFactory != null) {
writeRecordData(getProcessSession(), messages, partition);
} else {
messages.stream().forEach(message -> {
writeData(getProcessSession(), message, partition);
});
}
totalMessages += messages.size();
uncommittedOffsetsMap.put(partition, new OffsetAndMetadata(maxOffset + 1L));
}
});
}
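/**
* Encodes the raw Kafka message key as either a hex string or a UTF-8 string, depending on the
* configured key encoding; returns null if the key is null or the encoding is unrecognized.
*/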
private static String encodeKafkaKey(final byte[] key, final String encoding) {
if (key == null) {
return null;
}
if (HEX_ENCODING.getValue().equals(encoding)) {
return DatatypeConverter.printHexBinary(key);
} else if (UTF8_ENCODING.getValue().equals(encoding)) {
return new String(key, StandardCharsets.UTF_8);
} else {
return null; // won't happen because it is guaranteed by the Allowable Values
}
}
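/**
* Finalizes all tracked bundles and returns the FlowFiles that should be transferred; record-oriented
* bundles that produced no records are removed instead.
*/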
private Collection<FlowFile> getBundles() throws IOException {
final List<FlowFile> flowFiles = new ArrayList<>();
for (final BundleTracker tracker : bundleMap.values()) {
final boolean includeBundle = processBundle(tracker);
if (includeBundle) {
flowFiles.add(tracker.flowFile);
}
}
return flowFiles;
}
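/**
* Finishes the record set for a record-oriented bundle, removing the FlowFile if no records were
* written, and populates the standard kafka attributes on the bundle's FlowFile.
*
* @return true if the FlowFile should be transferred; false if it was removed
*/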
private boolean processBundle(final BundleTracker bundle) throws IOException {
final RecordSetWriter writer = bundle.recordWriter;
if (writer != null) {
final WriteResult writeResult;
try {
writeResult = writer.finishRecordSet();
} finally {
writer.close();
}
if (writeResult.getRecordCount() == 0) {
getProcessSession().remove(bundle.flowFile);
return false;
}
final Map<String, String> attributes = new HashMap<>();
attributes.putAll(writeResult.getAttributes());
attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
bundle.flowFile = getProcessSession().putAllAttributes(bundle.flowFile, attributes);
}
populateAttributes(bundle);
return true;
}
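/**
* Writes a single Kafka message to its own FlowFile and transfers it to the success relationship.
*/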
private void writeData(final ProcessSession session, ConsumerRecord<byte[], byte[]> record, final TopicPartition topicPartition) {
FlowFile flowFile = session.create();
final BundleTracker tracker = new BundleTracker(record, topicPartition, keyEncoding);
tracker.incrementRecordCount(1);
final byte[] value = record.value();
if (value != null) {
flowFile = session.write(flowFile, out -> {
out.write(value);
});
}
flowFile = session.putAllAttributes(flowFile, getAttributes(record));
tracker.updateFlowFile(flowFile);
populateAttributes(tracker);
session.transfer(tracker.flowFile, REL_SUCCESS);
}
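/**
* Appends a batch of Kafka messages, separated by the configured demarcator bytes, to the FlowFile
* tracked for each bundle, creating a new FlowFile the first time a bundle is seen in this lease.
*/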
private void writeDemarcatedData(final ProcessSession session, final List<ConsumerRecord<byte[], byte[]>> records, final TopicPartition topicPartition) {
// Group the Records by their BundleInformation
final Map<BundleInformation, List<ConsumerRecord<byte[], byte[]>>> map = records.stream()
.collect(Collectors.groupingBy(rec -> new BundleInformation(topicPartition, null, getAttributes(rec), separateByKey ? rec.key() : null)));
for (final Map.Entry<BundleInformation, List<ConsumerRecord<byte[], byte[]>>> entry : map.entrySet()) {
final BundleInformation bundleInfo = entry.getKey();
final List<ConsumerRecord<byte[], byte[]>> recordList = entry.getValue();
final boolean demarcateFirstRecord;
BundleTracker tracker = bundleMap.get(bundleInfo);
FlowFile flowFile;
if (tracker == null) {
tracker = new BundleTracker(recordList.get(0), topicPartition, keyEncoding);
flowFile = session.create();
flowFile = session.putAllAttributes(flowFile, bundleInfo.attributes);
tracker.updateFlowFile(flowFile);
demarcateFirstRecord = false; //have not yet written records for this topic/partition in this lease
} else {
demarcateFirstRecord = true; //have already been writing records for this topic/partition in this lease
}
flowFile = tracker.flowFile;
tracker.incrementRecordCount(recordList.size());
flowFile = session.append(flowFile, out -> {
boolean useDemarcator = demarcateFirstRecord;
for (final ConsumerRecord<byte[], byte[]> record : recordList) {
if (useDemarcator) {
out.write(demarcatorBytes);
}
final byte[] value = record.value();
if (value != null) {
out.write(record.value());
}
useDemarcator = true;
}
});
tracker.updateFlowFile(flowFile);
bundleMap.put(bundleInfo, tracker);
}
}
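/**
* Routes a message that could not be parsed to the parse.failure relationship, applying the standard
* Kafka attributes to the resulting FlowFile.
*/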
private void handleParseFailure(final ConsumerRecord<byte[], byte[]> consumerRecord, final ProcessSession session, final Exception cause) {
handleParseFailure(consumerRecord, session, cause, "Failed to parse message from Kafka using the configured Record Reader. "
+ "Will route message as its own FlowFile to the 'parse.failure' relationship");
}
private void handleParseFailure(final ConsumerRecord<byte[], byte[]> consumerRecord, final ProcessSession session, final Exception cause, final String message) {
// If we are unable to parse the data, we need to transfer it to 'parse failure' relationship
final Map<String, String> attributes = getAttributes(consumerRecord);
attributes.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(consumerRecord.offset()));
attributes.put(KafkaProcessorUtils.KAFKA_TIMESTAMP, String.valueOf(consumerRecord.timestamp()));
attributes.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(consumerRecord.partition()));
attributes.put(KafkaProcessorUtils.KAFKA_TOPIC, consumerRecord.topic());
FlowFile failureFlowFile = session.create();
final byte[] value = consumerRecord.value();
if (value != null) {
failureFlowFile = session.write(failureFlowFile, out -> out.write(value));
}
failureFlowFile = session.putAllAttributes(failureFlowFile, attributes);
final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, consumerRecord.topic());
session.getProvenanceReporter().receive(failureFlowFile, transitUri);
session.transfer(failureFlowFile, REL_PARSE_FAILURE);
if (cause == null) {
logger.error(message);
} else {
logger.error(message, cause);
}
session.adjustCounter("Parse Failures", 1, false);
}
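/**
* Maps Kafka record headers whose names match the configured header name pattern to FlowFile
* attributes, decoding header values with the configured header character set.
*/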
protected Map<String, String> getAttributes(final ConsumerRecord<?, ?> consumerRecord) {
final Map<String, String> attributes = new HashMap<>();
if (headerNamePattern == null) {
return attributes;
}
for (final Header header : consumerRecord.headers()) {
final String attributeName = header.key();
final byte[] attributeValue = header.value();
if (headerNamePattern.matcher(attributeName).matches() && attributeValue != null) {
attributes.put(attributeName, new String(attributeValue, headerCharacterSet));
}
}
return attributes;
}
private void writeRecordData(final ProcessSession session, final List<ConsumerRecord<byte[], byte[]>> records, final TopicPartition topicPartition) {
// Obtain a RecordReader for each message and route the parsed records into per-schema / per-key
// bundles. Parse failures are routed individually to the parse.failure relationship, while IO
// failures cause the Kafka offsets for this partition to be rolled back so the batch can be re-polled.
RecordSetWriter writer = null;
try {
for (final ConsumerRecord<byte[], byte[]> consumerRecord : records) {
final Map<String, String> attributes = getAttributes(consumerRecord);
final byte[] recordBytes = consumerRecord.value() == null ? new byte[0] : consumerRecord.value();
try (final InputStream in = new ByteArrayInputStream(recordBytes)) {
final RecordReader reader;
try {
reader = readerFactory.createRecordReader(attributes, in, recordBytes.length, logger);
} catch (final IOException e) {
yield();
rollback(topicPartition);
handleParseFailure(consumerRecord, session, e, "Failed to parse message from Kafka due to comms failure. Will roll back session and try again momentarily.");
closeWriter(writer);
return;
} catch (final Exception e) {
handleParseFailure(consumerRecord, session, e);
continue;
}
try {
Record record;
while ((record = reader.nextRecord()) != null) {
// Determine the bundle for this record.
final RecordSchema recordSchema = record.getSchema();
final BundleInformation bundleInfo = new BundleInformation(topicPartition, recordSchema, attributes, separateByKey ? consumerRecord.key() : null);
BundleTracker tracker = bundleMap.get(bundleInfo);
if (tracker == null) {
FlowFile flowFile = session.create();
flowFile = session.putAllAttributes(flowFile, attributes);
final OutputStream rawOut = session.write(flowFile);
final RecordSchema writeSchema;
try {
writeSchema = writerFactory.getSchema(flowFile.getAttributes(), recordSchema);
} catch (final Exception e) {
logger.error("Failed to obtain Schema for FlowFile. Will roll back the Kafka message offsets.", e);
rollback(topicPartition);
yield();
throw new ProcessException(e);
}
writer = writerFactory.createWriter(logger, writeSchema, rawOut, flowFile);
writer.beginRecordSet();
tracker = new BundleTracker(consumerRecord, topicPartition, keyEncoding, writer);
tracker.updateFlowFile(flowFile);
bundleMap.put(bundleInfo, tracker);
} else {
writer = tracker.recordWriter;
}
try {
writer.write(record);
} catch (final RuntimeException re) {
handleParseFailure(consumerRecord, session, re, "Failed to write message from Kafka using the configured Record Writer. "
+ "Will route message as its own FlowFile to the 'parse.failure' relationship");
continue;
}
tracker.incrementRecordCount(1L);
session.adjustCounter("Records Received", 1L, false);
}
} catch (final IOException | MalformedRecordException | SchemaValidationException e) {
handleParseFailure(consumerRecord, session, e);
continue;
}
}
}
} catch (final Exception e) {
logger.error("Failed to properly receive messages from Kafka. Will roll back session and any un-committed offsets from Kafka.", e);
closeWriter(writer);
rollback(topicPartition);
throw new ProcessException(e);
}
}
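/**
* Closes the given Record Writer, logging rather than propagating any failure to close it.
*/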
private void closeWriter(final RecordSetWriter writer) {
try {
if (writer != null) {
writer.close();
}
} catch (final Exception ioe) {
logger.warn("Failed to close Record Writer", ioe);
}
}
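/**
* Seeks the consumer back to the last uncommitted offset tracked for the given topic partition
* (falling back to the last committed offset, or 0) so the failed batch will be polled again.
*/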
private void rollback(final TopicPartition topicPartition) {
try {
OffsetAndMetadata offsetAndMetadata = uncommittedOffsetsMap.get(topicPartition);
if (offsetAndMetadata == null) {
offsetAndMetadata = kafkaConsumer.committed(topicPartition);
}
final long offset = offsetAndMetadata == null ? 0L : offsetAndMetadata.offset();
kafkaConsumer.seek(topicPartition, offset);
} catch (final Exception rollbackException) {
logger.warn("Attempted to rollback Kafka message offset but was unable to do so", rollbackException);
}
}
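/**
* Adds the standard kafka.* attributes (and record.count for record-oriented bundles containing
* multiple records) to the bundle's FlowFile and reports a provenance receive event.
*/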
private void populateAttributes(final BundleTracker tracker) {
final Map<String, String> kafkaAttrs = new HashMap<>();
kafkaAttrs.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(tracker.initialOffset));
kafkaAttrs.put(KafkaProcessorUtils.KAFKA_TIMESTAMP, String.valueOf(tracker.initialTimestamp));
// If we have a kafka key, we will add it as an attribute only if
// the FlowFile contains a single Record, or if the Records have been separated by Key,
// because we then know that even though there are multiple Records, they all have the same key.
if (tracker.key != null && (tracker.totalRecords == 1 || separateByKey)) {
if (!keyEncoding.equalsIgnoreCase(KafkaProcessorUtils.DO_NOT_ADD_KEY_AS_ATTRIBUTE.getValue())) {
kafkaAttrs.put(KafkaProcessorUtils.KAFKA_KEY, tracker.key);
}
}
kafkaAttrs.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(tracker.partition));
kafkaAttrs.put(KafkaProcessorUtils.KAFKA_TOPIC, tracker.topic);
if (tracker.totalRecords > 1) {
// Add a record.count attribute to remain consistent with other record-oriented processors. If not
// reading/writing records, then use "kafka.count" attribute.
if (tracker.recordWriter == null) {
kafkaAttrs.put(KafkaProcessorUtils.KAFKA_COUNT, String.valueOf(tracker.totalRecords));
} else {
kafkaAttrs.put("record.count", String.valueOf(tracker.totalRecords));
}
}
final FlowFile newFlowFile = getProcessSession().putAllAttributes(tracker.flowFile, kafkaAttrs);
final long executionDurationMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - leaseStartNanos);
final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, tracker.topic);
getProcessSession().getProvenanceReporter().receive(newFlowFile, transitUri, executionDurationMillis);
tracker.updateFlowFile(newFlowFile);
}
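/**
* Tracks the FlowFile, record count, and initial Kafka coordinates (topic, partition, offset,
* timestamp, encoded key) for a single bundle of messages.
*/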
private static class BundleTracker {
final long initialOffset;
final long initialTimestamp;
final int partition;
final String topic;
final String key;
final RecordSetWriter recordWriter;
FlowFile flowFile;
long totalRecords = 0;
private BundleTracker(final ConsumerRecord<byte[], byte[]> initialRecord, final TopicPartition topicPartition, final String keyEncoding) {
this(initialRecord, topicPartition, keyEncoding, null);
}
private BundleTracker(final ConsumerRecord<byte[], byte[]> initialRecord, final TopicPartition topicPartition, final String keyEncoding, final RecordSetWriter recordWriter) {
this.initialOffset = initialRecord.offset();
this.initialTimestamp = initialRecord.timestamp();
this.partition = topicPartition.partition();
this.topic = topicPartition.topic();
this.recordWriter = recordWriter;
this.key = encodeKafkaKey(initialRecord.key(), keyEncoding);
}
private void incrementRecordCount(final long count) {
totalRecords += count;
}
private void updateFlowFile(final FlowFile flowFile) {
this.flowFile = flowFile;
}
}
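/**
* Grouping key for bundles: records are appended to the same bundle only when they share the same
* topic partition, schema, attributes, and (when separating by key) message key.
*/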
private static class BundleInformation {
private final TopicPartition topicPartition;
private final RecordSchema schema;
private final Map<String, String> attributes;
private final byte[] messageKey;
public BundleInformation(final TopicPartition topicPartition, final RecordSchema schema, final Map<String, String> attributes, final byte[] messageKey) {
this.topicPartition = topicPartition;
this.schema = schema;
this.attributes = attributes;
this.messageKey = messageKey;
}
@Override
public int hashCode() {
return 41 + Objects.hash(topicPartition, schema, attributes) + 37 * Arrays.hashCode(messageKey);
}
@Override
public boolean equals(final Object obj) {
if (obj == this) {
return true;
}
if (obj == null) {
return false;
}
if (!(obj instanceof BundleInformation)) {
return false;
}
final BundleInformation other = (BundleInformation) obj;
return Objects.equals(topicPartition, other.topicPartition) && Objects.equals(schema, other.schema) && Objects.equals(attributes, other.attributes)
&& Arrays.equals(this.messageKey, other.messageKey);
}
}
}