blob: 99674044835a478c0f4096c5e6ba0da9ab5ed4ac [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.kafka;
import java.util.BitSet;
import org.apache.nifi.flowfile.FlowFile;
/**
* Context object that serves as a bridge between the content of a FlowFile and
* Kafka message(s). It contains all necessary information to allow
* {@link KafkaPublisher} to determine how a each content of the
* {@link FlowFile} must be sent to Kafka.
*/
final class SplittableMessageContext {
private final String topicName;
private final String delimiterPattern;
private final byte[] keyBytes;
private volatile BitSet failedSegments;
/**
* @param topicName
* the name of the Kafka topic
* @param keyBytes
* the instance of byte[] representing the key. Can be null.
* @param delimiterPattern
* the string representing the delimiter regex pattern. Can be
* null. For cases where it is null the EOF pattern will be used
* - "(\\W)\\Z".
*/
SplittableMessageContext(String topicName, byte[] keyBytes, String delimiterPattern) {
this.topicName = topicName;
this.keyBytes = keyBytes;
this.delimiterPattern = delimiterPattern != null ? delimiterPattern : "(\\W)\\Z";
}
/**
*
*/
@Override
public String toString() {
return "topic: '" + topicName + "'; delimiter: '" + delimiterPattern + "'";
}
/**
*
*/
void setFailedSegments(int... failedSegments) {
this.failedSegments = new BitSet();
for (int failedSegment : failedSegments) {
this.failedSegments.set(failedSegment);
}
}
/**
*
*/
void setFailedSegmentsAsByteArray(byte[] failedSegments) {
this.failedSegments = BitSet.valueOf(failedSegments);
}
/**
* Returns the list of integers representing the segments (chunks) of the
* delimited content stream that had failed to be sent to Kafka topic.
*/
BitSet getFailedSegments() {
return this.failedSegments;
}
/**
* Returns the name of the Kafka topic
*/
String getTopicName() {
return this.topicName;
}
/**
* Returns the value of the delimiter regex pattern.
*/
String getDelimiterPattern() {
return this.delimiterPattern;
}
/**
* Returns the key bytes as String
*/
String getKeyBytesAsString() {
return new String(this.keyBytes);
}
/**
* Returns the key bytes
*/
byte[] getKeyBytes() {
return this.keyBytes;
}
}