blob: ed7febb22457e36f63a7ca99fc6dd5a63865bb76 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.crunch.kafka.offset.hdfs;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import org.apache.commons.lang.StringUtils;
import org.apache.kafka.common.requests.ListOffsetRequest;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Set;
/**
* Simple object to represent a collection of Kafka Topic and Partition offset information to make storing
* this information easier.
*/
@JsonDeserialize(builder = Offsets.Builder.class)
@JsonInclude(JsonInclude.Include.NON_NULL)
public class Offsets {
private final long offsetsAsOfTime;
private final List<PartitionOffset> offsets;
private Offsets(long asOfTime, List<PartitionOffset> offsets) {
offsetsAsOfTime = asOfTime;
this.offsets = offsets;
}
/**
* Returns the time in milliseconds since epoch that the offset information was retrieved or valid as of.
*
* @return the time in milliseconds since epoch that the offset information was retrieved or valid as of.
*/
@JsonProperty("asOfTime")
public long getAsOfTime() {
return offsetsAsOfTime;
}
/**
* The collection of offset information for specific topics and partitions.
*
* @return collection of offset information for specific topics and partitions.
*/
@JsonProperty("offsets")
public List<PartitionOffset> getOffsets() {
return offsets;
}
@Override
public int hashCode() {
return Objects.hash(offsetsAsOfTime, offsets);
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (obj instanceof Offsets) {
Offsets that = (Offsets) obj;
return this.offsetsAsOfTime == that.offsetsAsOfTime
&& this.offsets.equals(that.offsets);
}
return false;
}
/**
* Builder for the {@link Offsets}.
*/
@JsonIgnoreProperties(ignoreUnknown = true)
public static class Builder {
private long asOf = -1;
private List<PartitionOffset> offsets = Collections.emptyList();
/**
* Creates a new Builder instance.
*
* @return a new Builder instance.
*/
public static Builder newBuilder() {
return new Builder();
}
/**
* Sets the as of time for the collection of offsets.
*
* @param asOfTime the as of time for the offsets.
* @return builder instance
* @throws IllegalArgumentException if the {@code asOfTime} is less than 0.
*/
@JsonProperty("asOfTime")
public Builder setAsOfTime(long asOfTime) {
if (asOfTime < 0) {
throw new IllegalArgumentException("The 'asOfTime' cannot be less than 0.");
}
this.asOf = asOfTime;
return this;
}
/**
* Sets the collection of offsets.
*
* @param offsets the collection of offsets
* @return builder instance
* @throws IllegalArgumentException if the {@code offsets} is {@code null}.
*/
@JsonProperty("offsets")
public Builder setOffsets(List<PartitionOffset> offsets) {
if (offsets == null) {
throw new IllegalArgumentException("The 'offsets' cannot be 'null'.");
}
List<PartitionOffset> sortedOffsets = new LinkedList<>(offsets);
Collections.sort(sortedOffsets);
this.offsets = Collections.unmodifiableList(sortedOffsets);
return this;
}
/**
* Builds an instance.
*
* @return a built instance
* @throws IllegalStateException if the {@link #setAsOfTime(long) asOfTime} is not set or the specified
* {@link #setOffsets(List) offsets} contains duplicate entries for a topic partition.
*/
public Offsets build() {
if (asOf < 0) {
throw new IllegalStateException("The 'asOfTime' cannot be less than 0.");
}
Set<String> uniqueTopicPartitions = new HashSet<>();
for(PartitionOffset partitionOffset : offsets){
uniqueTopicPartitions.add(partitionOffset.getTopic()+partitionOffset.getPartition());
}
if (uniqueTopicPartitions.size() != offsets.size()) {
throw new IllegalStateException("The 'offsets' contains duplicate entries for a topic and partition.");
}
return new Offsets(asOf, offsets);
}
}
/**
* Simple object that represents a specific topic, partition, and its offset value.
*/
@JsonDeserialize(builder = PartitionOffset.Builder.class)
@JsonInclude(JsonInclude.Include.NON_NULL)
public static class PartitionOffset implements Comparable<PartitionOffset> {
private final String topic;
private final int partition;
private final long offset;
private PartitionOffset(String topic, int partition, long offset) {
this.topic = topic;
this.partition = partition;
this.offset = offset;
}
/**
* Returns the topic
*
* @return the topic
*/
public String getTopic() {
return topic;
}
/**
* Returns the partition
*
* @return the partition
*/
public int getPartition() {
return partition;
}
/**
* Returns the offset
*
* @return the offset
*/
public long getOffset() {
return offset;
}
@Override
public int compareTo(PartitionOffset other) {
int compare = topic.compareTo(other.topic);
if (compare == 0) {
compare = Integer.compare(partition, other.partition);
if (compare == 0) {
return Long.compare(offset, other.offset);
}
}
return compare;
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (obj instanceof PartitionOffset) {
PartitionOffset that = (PartitionOffset) obj;
return compareTo(that) == 0;
}
return false;
}
@Override
public int hashCode() {
return Objects.hash(topic, partition, offset);
}
/**
* Builder for {@link PartitionOffset}
*/
@JsonIgnoreProperties(ignoreUnknown = true)
public static class Builder {
private String topic;
private int partition = -1;
private long offset = ListOffsetRequest.EARLIEST_TIMESTAMP;
/**
* Creates a new builder instance.
*
* @return a new builder instance.
*/
public static Builder newBuilder() {
return new Builder();
}
/**
* Set the {@code topic} for the partition offset being built
*
* @param topic the topic for the partition offset being built.
* @return builder instance
* @throws IllegalArgumentException if the {@code topic} is {@code null} or empty.
*/
@JsonProperty("topic")
public Builder setTopic(String topic) {
if (StringUtils.isBlank(topic)) {
throw new IllegalArgumentException("The 'topic' cannot be null or empty.");
}
this.topic = topic;
return this;
}
/**
* Set the {@code partition} for the partition offset being built
*
* @param partition the partition for the partition offset being built.
* @return builder instance
* @throws IllegalArgumentException if the {@code partition} is less than 0.
*/
@JsonProperty("partition")
public Builder setPartition(int partition) {
if (partition < 0) {
throw new IllegalArgumentException("The 'partition' cannot be less than 0.");
}
this.partition = partition;
return this;
}
/**
* Set the {@code offset} for the partition offset being built. If the {@code offset} is not
* set then it defaults to {@link ListOffsetRequest#EARLIEST_TIMESTAMP}.
*
* @param offset the topic for the partition offset being built.
* @return builder instance
*/
@JsonProperty("offset")
public Builder setOffset(long offset) {
this.offset = offset;
return this;
}
/**
* Builds a PartitionOffset instance.
*
* @return the built PartitionOffset instance.
* @throws IllegalStateException if the {@code topic} or {@code partition} are never set or configured
* to invalid values.
*/
public PartitionOffset build() {
if (StringUtils.isBlank(topic)) {
throw new IllegalStateException("The 'topic' cannot be null or empty.");
}
if (partition < 0) {
throw new IllegalStateException("The 'partition' cannot be less than 0.");
}
return new PartitionOffset(topic, partition, offset);
}
}
}
}