| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.kafka.common.requests; |
| |
| import org.apache.kafka.common.TopicPartition; |
| import org.apache.kafka.common.errors.UnsupportedVersionException; |
| import org.apache.kafka.common.protocol.ApiKeys; |
| import org.apache.kafka.common.protocol.Errors; |
| import org.apache.kafka.common.protocol.types.Struct; |
| import org.apache.kafka.common.utils.CollectionUtils; |
| |
| import java.nio.ByteBuffer; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| |
| public class ListOffsetRequest extends AbstractRequest { |
| public static final long EARLIEST_TIMESTAMP = -2L; |
| public static final long LATEST_TIMESTAMP = -1L; |
| |
| public static final int CONSUMER_REPLICA_ID = -1; |
| public static final int DEBUGGING_REPLICA_ID = -2; |
| |
| private static final String REPLICA_ID_KEY_NAME = "replica_id"; |
| private static final String ISOLATION_LEVEL_KEY_NAME = "isolation_level"; |
| private static final String TOPICS_KEY_NAME = "topics"; |
| |
| // topic level field names |
| private static final String TOPIC_KEY_NAME = "topic"; |
| private static final String PARTITIONS_KEY_NAME = "partitions"; |
| |
| // partition level field names |
| private static final String PARTITION_KEY_NAME = "partition"; |
| private static final String TIMESTAMP_KEY_NAME = "timestamp"; |
| private static final String MAX_NUM_OFFSETS_KEY_NAME = "max_num_offsets"; |
| |
| private final int replicaId; |
| private final IsolationLevel isolationLevel; |
| private final Map<TopicPartition, PartitionData> offsetData; |
| private final Map<TopicPartition, Long> partitionTimestamps; |
| private final Set<TopicPartition> duplicatePartitions; |
| |
| public static class Builder extends AbstractRequest.Builder<ListOffsetRequest> { |
| private final int replicaId; |
| private final short minVersion; |
| private final IsolationLevel isolationLevel; |
| private Map<TopicPartition, PartitionData> offsetData = null; |
| private Map<TopicPartition, Long> partitionTimestamps = null; |
| |
| public static Builder forReplica(short desiredVersion, int replicaId) { |
| return new Builder((short) 0, desiredVersion, replicaId, IsolationLevel.READ_UNCOMMITTED); |
| } |
| |
| public static Builder forConsumer(boolean requireTimestamp, IsolationLevel isolationLevel) { |
| // If we need a timestamp in the response, the minimum RPC version we can send is v1. Otherwise, v0 is OK. |
| short minVersion = 0; |
| if (isolationLevel == IsolationLevel.READ_COMMITTED) |
| minVersion = 2; |
| else if (requireTimestamp) |
| minVersion = 1; |
| return new Builder(minVersion, null, CONSUMER_REPLICA_ID, isolationLevel); |
| } |
| |
| private Builder(short minVersion, Short desiredVersion, int replicaId, IsolationLevel isolationLevel) { |
| super(ApiKeys.LIST_OFFSETS, desiredVersion); |
| this.minVersion = minVersion; |
| this.replicaId = replicaId; |
| this.isolationLevel = isolationLevel; |
| } |
| |
| public Builder setOffsetData(Map<TopicPartition, PartitionData> offsetData) { |
| this.offsetData = offsetData; |
| return this; |
| } |
| |
| public Builder setTargetTimes(Map<TopicPartition, Long> partitionTimestamps) { |
| this.partitionTimestamps = partitionTimestamps; |
| return this; |
| } |
| |
| @Override |
| public ListOffsetRequest build(short version) { |
| if (version < minVersion) { |
| throw new UnsupportedVersionException("Cannot create a v" + version + " ListOffsetRequest because " + |
| "we require features supported only in " + minVersion + " or later."); |
| } |
| if (version == 0) { |
| if (offsetData == null) { |
| if (partitionTimestamps == null) { |
| throw new RuntimeException("Must set partitionTimestamps or offsetData when creating a v0 " + |
| "ListOffsetRequest"); |
| } else { |
| offsetData = new HashMap<>(); |
| for (Map.Entry<TopicPartition, Long> entry: partitionTimestamps.entrySet()) { |
| offsetData.put(entry.getKey(), |
| new PartitionData(entry.getValue(), 1)); |
| } |
| this.partitionTimestamps = null; |
| } |
| } |
| } else { |
| if (offsetData != null) { |
| throw new RuntimeException("Cannot create a v" + version + " ListOffsetRequest with v0 " + |
| "PartitionData."); |
| } else if (partitionTimestamps == null) { |
| throw new RuntimeException("Must set partitionTimestamps when creating a v" + |
| version + " ListOffsetRequest"); |
| } |
| } |
| Map<TopicPartition, ?> m = (version == 0) ? offsetData : partitionTimestamps; |
| return new ListOffsetRequest(replicaId, m, isolationLevel, version); |
| } |
| |
| @Override |
| public String toString() { |
| StringBuilder bld = new StringBuilder(); |
| bld.append("(type=ListOffsetRequest") |
| .append(", replicaId=").append(replicaId); |
| if (offsetData != null) { |
| bld.append(", offsetData=").append(offsetData); |
| } |
| if (partitionTimestamps != null) { |
| bld.append(", partitionTimestamps=").append(partitionTimestamps); |
| } |
| bld.append(", minVersion=").append(minVersion); |
| bld.append(")"); |
| return bld.toString(); |
| } |
| } |
| |
| /** |
| * This class is only used by ListOffsetRequest v0 which has been deprecated. |
| */ |
| @Deprecated |
| public static final class PartitionData { |
| public final long timestamp; |
| public final int maxNumOffsets; |
| |
| public PartitionData(long timestamp, int maxNumOffsets) { |
| this.timestamp = timestamp; |
| this.maxNumOffsets = maxNumOffsets; |
| } |
| |
| @Override |
| public String toString() { |
| StringBuilder bld = new StringBuilder(); |
| bld.append("{timestamp: ").append(timestamp). |
| append(", maxNumOffsets: ").append(maxNumOffsets). |
| append("}"); |
| return bld.toString(); |
| } |
| } |
| |
| /** |
| * Private constructor with a specified version. |
| */ |
| @SuppressWarnings("unchecked") |
| private ListOffsetRequest(int replicaId, Map<TopicPartition, ?> targetTimes, IsolationLevel isolationLevel, short version) { |
| super(version); |
| this.replicaId = replicaId; |
| this.isolationLevel = isolationLevel; |
| this.offsetData = version == 0 ? (Map<TopicPartition, PartitionData>) targetTimes : null; |
| this.partitionTimestamps = version >= 1 ? (Map<TopicPartition, Long>) targetTimes : null; |
| this.duplicatePartitions = Collections.emptySet(); |
| } |
| |
| public ListOffsetRequest(Struct struct, short version) { |
| super(version); |
| Set<TopicPartition> duplicatePartitions = new HashSet<>(); |
| replicaId = struct.getInt(REPLICA_ID_KEY_NAME); |
| isolationLevel = struct.hasField(ISOLATION_LEVEL_KEY_NAME) ? |
| IsolationLevel.forId(struct.getByte(ISOLATION_LEVEL_KEY_NAME)) : |
| IsolationLevel.READ_UNCOMMITTED; |
| offsetData = new HashMap<>(); |
| partitionTimestamps = new HashMap<>(); |
| for (Object topicResponseObj : struct.getArray(TOPICS_KEY_NAME)) { |
| Struct topicResponse = (Struct) topicResponseObj; |
| String topic = topicResponse.getString(TOPIC_KEY_NAME); |
| for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) { |
| Struct partitionResponse = (Struct) partitionResponseObj; |
| int partition = partitionResponse.getInt(PARTITION_KEY_NAME); |
| long timestamp = partitionResponse.getLong(TIMESTAMP_KEY_NAME); |
| TopicPartition tp = new TopicPartition(topic, partition); |
| if (partitionResponse.hasField(MAX_NUM_OFFSETS_KEY_NAME)) { |
| int maxNumOffsets = partitionResponse.getInt(MAX_NUM_OFFSETS_KEY_NAME); |
| PartitionData partitionData = new PartitionData(timestamp, maxNumOffsets); |
| offsetData.put(tp, partitionData); |
| } else { |
| if (partitionTimestamps.put(tp, timestamp) != null) |
| duplicatePartitions.add(tp); |
| } |
| } |
| } |
| this.duplicatePartitions = duplicatePartitions; |
| } |
| |
| @Override |
| @SuppressWarnings("deprecation") |
| public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { |
| Map<TopicPartition, ListOffsetResponse.PartitionData> responseData = new HashMap<>(); |
| |
| short versionId = version(); |
| if (versionId == 0) { |
| for (Map.Entry<TopicPartition, PartitionData> entry : offsetData.entrySet()) { |
| ListOffsetResponse.PartitionData partitionResponse = new ListOffsetResponse.PartitionData( |
| Errors.forException(e), Collections.<Long>emptyList()); |
| responseData.put(entry.getKey(), partitionResponse); |
| } |
| } else { |
| for (Map.Entry<TopicPartition, Long> entry : partitionTimestamps.entrySet()) { |
| ListOffsetResponse.PartitionData partitionResponse = new ListOffsetResponse.PartitionData( |
| Errors.forException(e), -1L, -1L); |
| responseData.put(entry.getKey(), partitionResponse); |
| } |
| } |
| |
| switch (versionId) { |
| case 0: |
| case 1: |
| case 2: |
| return new ListOffsetResponse(throttleTimeMs, responseData); |
| default: |
| throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", |
| versionId, this.getClass().getSimpleName(), ApiKeys.LIST_OFFSETS.latestVersion())); |
| } |
| } |
| |
| public int replicaId() { |
| return replicaId; |
| } |
| |
| public IsolationLevel isolationLevel() { |
| return isolationLevel; |
| } |
| |
| @Deprecated |
| public Map<TopicPartition, PartitionData> offsetData() { |
| return offsetData; |
| } |
| |
| public Map<TopicPartition, Long> partitionTimestamps() { |
| return partitionTimestamps; |
| } |
| |
| public Set<TopicPartition> duplicatePartitions() { |
| return duplicatePartitions; |
| } |
| |
| public static ListOffsetRequest parse(ByteBuffer buffer, short version) { |
| return new ListOffsetRequest(ApiKeys.LIST_OFFSETS.parseRequest(version, buffer), version); |
| } |
| |
| @Override |
| protected Struct toStruct() { |
| short version = version(); |
| Struct struct = new Struct(ApiKeys.LIST_OFFSETS.requestSchema(version)); |
| |
| Map<TopicPartition, ?> targetTimes = partitionTimestamps == null ? offsetData : partitionTimestamps; |
| Map<String, Map<Integer, Object>> topicsData = CollectionUtils.groupDataByTopic(targetTimes); |
| |
| struct.set(REPLICA_ID_KEY_NAME, replicaId); |
| |
| if (struct.hasField(ISOLATION_LEVEL_KEY_NAME)) |
| struct.set(ISOLATION_LEVEL_KEY_NAME, isolationLevel.id()); |
| List<Struct> topicArray = new ArrayList<>(); |
| for (Map.Entry<String, Map<Integer, Object>> topicEntry: topicsData.entrySet()) { |
| Struct topicData = struct.instance(TOPICS_KEY_NAME); |
| topicData.set(TOPIC_KEY_NAME, topicEntry.getKey()); |
| List<Struct> partitionArray = new ArrayList<>(); |
| for (Map.Entry<Integer, Object> partitionEntry : topicEntry.getValue().entrySet()) { |
| if (version == 0) { |
| PartitionData offsetPartitionData = (PartitionData) partitionEntry.getValue(); |
| Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME); |
| partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey()); |
| partitionData.set(TIMESTAMP_KEY_NAME, offsetPartitionData.timestamp); |
| partitionData.set(MAX_NUM_OFFSETS_KEY_NAME, offsetPartitionData.maxNumOffsets); |
| partitionArray.add(partitionData); |
| } else { |
| Long timestamp = (Long) partitionEntry.getValue(); |
| Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME); |
| partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey()); |
| partitionData.set(TIMESTAMP_KEY_NAME, timestamp); |
| partitionArray.add(partitionData); |
| } |
| } |
| topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray()); |
| topicArray.add(topicData); |
| } |
| struct.set(TOPICS_KEY_NAME, topicArray.toArray()); |
| return struct; |
| } |
| } |