blob: ddf6391fc1e7cda08b35bea2cc2e0dfc050666a0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.kafka.spout;
import java.io.Serializable;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
public class KafkaSpoutMessageId implements Serializable {
private final TopicPartition topicPart;
private final long offset;
private int numFails = 0;
/**
* false if the record was emitted using a form of collector.emit(...). true
* when skipping null tuples as configured by the user in KafkaSpoutConfig
*/
private boolean nullTuple;
public KafkaSpoutMessageId(ConsumerRecord<?, ?> consumerRecord) {
this(consumerRecord, false);
}
public KafkaSpoutMessageId(ConsumerRecord<?, ?> consumerRecord, boolean nullTuple) {
this(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), consumerRecord.offset(), nullTuple);
}
public KafkaSpoutMessageId(TopicPartition topicPart, long offset) {
this(topicPart, offset, false);
}
/**
* Creates a new KafkaSpoutMessageId.
* @param topicPart The topic partition this message belongs to
* @param offset The offset of this message
* @param nullTuple True if this message is being skipped as a null tuple
*/
public KafkaSpoutMessageId(TopicPartition topicPart, long offset, boolean nullTuple) {
this.topicPart = topicPart;
this.offset = offset;
this.nullTuple = nullTuple;
}
public int partition() {
return topicPart.partition();
}
public String topic() {
return topicPart.topic();
}
public long offset() {
return offset;
}
public int numFails() {
return numFails;
}
public void incrementNumFails() {
++numFails;
}
public TopicPartition getTopicPartition() {
return topicPart;
}
public boolean isNullTuple() {
return nullTuple;
}
public void setNullTuple(boolean nullTuple) {
this.nullTuple = nullTuple;
}
@Override
public String toString() {
return "{"
+ "topic-partition=" + topicPart
+ ", offset=" + offset
+ ", numFails=" + numFails
+ ", nullTuple=" + nullTuple
+ '}';
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
KafkaSpoutMessageId messageId = (KafkaSpoutMessageId) o;
if (offset != messageId.offset) {
return false;
}
return topicPart.equals(messageId.topicPart);
}
@Override
public int hashCode() {
int result = topicPart.hashCode();
result = 31 * result + (int) (offset ^ (offset >>> 32));
return result;
}
}