| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.kafka.common.requests; |
| |
| import org.apache.kafka.common.TopicPartition; |
| import org.apache.kafka.common.errors.UnsupportedVersionException; |
| import org.apache.kafka.common.network.ListenerName; |
| import org.apache.kafka.common.protocol.ApiKeys; |
| import org.apache.kafka.common.protocol.Errors; |
| import org.apache.kafka.common.protocol.SecurityProtocol; |
| import org.apache.kafka.common.protocol.types.Struct; |
| import org.apache.kafka.common.utils.Utils; |
| |
| import java.nio.ByteBuffer; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| |
| public class UpdateMetadataRequest extends AbstractRequest { |
| public static class Builder extends AbstractRequest.Builder<UpdateMetadataRequest> { |
| private final int controllerId; |
| private final int controllerEpoch; |
| private final Map<TopicPartition, PartitionState> partitionStates; |
| private final Set<Broker> liveBrokers; |
| |
| public Builder(short version, int controllerId, int controllerEpoch, |
| Map<TopicPartition, PartitionState> partitionStates, Set<Broker> liveBrokers) { |
| super(ApiKeys.UPDATE_METADATA_KEY, version); |
| this.controllerId = controllerId; |
| this.controllerEpoch = controllerEpoch; |
| this.partitionStates = partitionStates; |
| this.liveBrokers = liveBrokers; |
| } |
| |
| @Override |
| public UpdateMetadataRequest build(short version) { |
| if (version == 0) { |
| for (Broker broker : liveBrokers) { |
| if (broker.endPoints.size() != 1 || broker.endPoints.get(0).securityProtocol != SecurityProtocol.PLAINTEXT) { |
| throw new UnsupportedVersionException("UpdateMetadataRequest v0 only handles PLAINTEXT endpoints"); |
| } |
| } |
| } |
| return new UpdateMetadataRequest(version, controllerId, controllerEpoch, partitionStates, liveBrokers); |
| } |
| |
| @Override |
| public String toString() { |
| StringBuilder bld = new StringBuilder(); |
| bld.append("(type: UpdateMetadataRequest="). |
| append(", controllerId=").append(controllerId). |
| append(", controllerEpoch=").append(controllerEpoch). |
| append(", partitionStates=").append(partitionStates). |
| append(", liveBrokers=").append(Utils.join(liveBrokers, ", ")). |
| append(")"); |
| return bld.toString(); |
| } |
| } |
| |
| public static final class Broker { |
| public final int id; |
| public final List<EndPoint> endPoints; |
| public final String rack; // introduced in V2 |
| |
| public Broker(int id, List<EndPoint> endPoints, String rack) { |
| this.id = id; |
| this.endPoints = endPoints; |
| this.rack = rack; |
| } |
| |
| @Override |
| public String toString() { |
| StringBuilder bld = new StringBuilder(); |
| bld.append("(id=").append(id); |
| bld.append(", endPoints=").append(Utils.join(endPoints, ",")); |
| bld.append(", rack=").append(rack); |
| bld.append(")"); |
| return bld.toString(); |
| } |
| } |
| |
| public static final class EndPoint { |
| public final String host; |
| public final int port; |
| public final SecurityProtocol securityProtocol; |
| public final ListenerName listenerName; // introduced in V3 |
| |
| public EndPoint(String host, int port, SecurityProtocol securityProtocol, ListenerName listenerName) { |
| this.host = host; |
| this.port = port; |
| this.securityProtocol = securityProtocol; |
| this.listenerName = listenerName; |
| } |
| |
| @Override |
| public String toString() { |
| return "(host=" + host + ", port=" + port + ", listenerName=" + listenerName + |
| ", securityProtocol=" + securityProtocol + ")"; |
| } |
| } |
| |
| private static final String CONTROLLER_ID_KEY_NAME = "controller_id"; |
| private static final String CONTROLLER_EPOCH_KEY_NAME = "controller_epoch"; |
| private static final String PARTITION_STATES_KEY_NAME = "partition_states"; |
| private static final String LIVE_BROKERS_KEY_NAME = "live_brokers"; |
| |
| // PartitionState key names |
| private static final String TOPIC_KEY_NAME = "topic"; |
| private static final String PARTITION_KEY_NAME = "partition"; |
| private static final String LEADER_KEY_NAME = "leader"; |
| private static final String LEADER_EPOCH_KEY_NAME = "leader_epoch"; |
| private static final String ISR_KEY_NAME = "isr"; |
| private static final String ZK_VERSION_KEY_NAME = "zk_version"; |
| private static final String REPLICAS_KEY_NAME = "replicas"; |
| |
| // Broker key names |
| private static final String BROKER_ID_KEY_NAME = "id"; |
| private static final String ENDPOINTS_KEY_NAME = "end_points"; |
| private static final String RACK_KEY_NAME = "rack"; |
| |
| // EndPoint key names |
| private static final String HOST_KEY_NAME = "host"; |
| private static final String PORT_KEY_NAME = "port"; |
| private static final String LISTENER_NAME_KEY_NAME = "listener_name"; |
| private static final String SECURITY_PROTOCOL_TYPE_KEY_NAME = "security_protocol_type"; |
| |
| private final int controllerId; |
| private final int controllerEpoch; |
| private final Map<TopicPartition, PartitionState> partitionStates; |
| private final Set<Broker> liveBrokers; |
| |
/**
 * Creates a request directly from its parts; called from {@link Builder#build(short)}.
 * The map and set are stored by reference, not copied.
 */
private UpdateMetadataRequest(short version,
                              int controllerId,
                              int controllerEpoch,
                              Map<TopicPartition, PartitionState> partitionStates,
                              Set<Broker> liveBrokers) {
    super(version);
    this.partitionStates = partitionStates;
    this.liveBrokers = liveBrokers;
    this.controllerId = controllerId;
    this.controllerEpoch = controllerEpoch;
}
| |
| public UpdateMetadataRequest(Struct struct, short versionId) { |
| super(versionId); |
| Map<TopicPartition, PartitionState> partitionStates = new HashMap<>(); |
| for (Object partitionStateDataObj : struct.getArray(PARTITION_STATES_KEY_NAME)) { |
| Struct partitionStateData = (Struct) partitionStateDataObj; |
| String topic = partitionStateData.getString(TOPIC_KEY_NAME); |
| int partition = partitionStateData.getInt(PARTITION_KEY_NAME); |
| int controllerEpoch = partitionStateData.getInt(CONTROLLER_EPOCH_KEY_NAME); |
| int leader = partitionStateData.getInt(LEADER_KEY_NAME); |
| int leaderEpoch = partitionStateData.getInt(LEADER_EPOCH_KEY_NAME); |
| |
| Object[] isrArray = partitionStateData.getArray(ISR_KEY_NAME); |
| List<Integer> isr = new ArrayList<>(isrArray.length); |
| for (Object r : isrArray) |
| isr.add((Integer) r); |
| |
| int zkVersion = partitionStateData.getInt(ZK_VERSION_KEY_NAME); |
| |
| Object[] replicasArray = partitionStateData.getArray(REPLICAS_KEY_NAME); |
| List<Integer> replicas = new ArrayList<>(replicasArray.length); |
| for (Object r : replicasArray) |
| replicas.add((Integer) r); |
| |
| PartitionState partitionState = new PartitionState(controllerEpoch, leader, leaderEpoch, isr, zkVersion, replicas); |
| partitionStates.put(new TopicPartition(topic, partition), partitionState); |
| |
| } |
| |
| Set<Broker> liveBrokers = new HashSet<>(); |
| |
| for (Object brokerDataObj : struct.getArray(LIVE_BROKERS_KEY_NAME)) { |
| Struct brokerData = (Struct) brokerDataObj; |
| int brokerId = brokerData.getInt(BROKER_ID_KEY_NAME); |
| |
| // V0 |
| if (brokerData.hasField(HOST_KEY_NAME)) { |
| String host = brokerData.getString(HOST_KEY_NAME); |
| int port = brokerData.getInt(PORT_KEY_NAME); |
| List<EndPoint> endPoints = new ArrayList<>(1); |
| SecurityProtocol securityProtocol = SecurityProtocol.PLAINTEXT; |
| endPoints.add(new EndPoint(host, port, securityProtocol, ListenerName.forSecurityProtocol(securityProtocol))); |
| liveBrokers.add(new Broker(brokerId, endPoints, null)); |
| } else { // V1, V2 or V3 |
| List<EndPoint> endPoints = new ArrayList<>(); |
| for (Object endPointDataObj : brokerData.getArray(ENDPOINTS_KEY_NAME)) { |
| Struct endPointData = (Struct) endPointDataObj; |
| int port = endPointData.getInt(PORT_KEY_NAME); |
| String host = endPointData.getString(HOST_KEY_NAME); |
| short protocolTypeId = endPointData.getShort(SECURITY_PROTOCOL_TYPE_KEY_NAME); |
| SecurityProtocol securityProtocol = SecurityProtocol.forId(protocolTypeId); |
| String listenerName; |
| if (endPointData.hasField(LISTENER_NAME_KEY_NAME)) // V3 |
| listenerName = endPointData.getString(LISTENER_NAME_KEY_NAME); |
| else |
| listenerName = securityProtocol.name; |
| endPoints.add(new EndPoint(host, port, securityProtocol, new ListenerName(listenerName))); |
| } |
| String rack = null; |
| if (brokerData.hasField(RACK_KEY_NAME)) { // V2 |
| rack = brokerData.getString(RACK_KEY_NAME); |
| } |
| liveBrokers.add(new Broker(brokerId, endPoints, rack)); |
| } |
| } |
| controllerId = struct.getInt(CONTROLLER_ID_KEY_NAME); |
| controllerEpoch = struct.getInt(CONTROLLER_EPOCH_KEY_NAME); |
| this.partitionStates = partitionStates; |
| this.liveBrokers = liveBrokers; |
| } |
| |
| @Override |
| protected Struct toStruct() { |
| short version = version(); |
| Struct struct = new Struct(ApiKeys.UPDATE_METADATA_KEY.requestSchema(version)); |
| struct.set(CONTROLLER_ID_KEY_NAME, controllerId); |
| struct.set(CONTROLLER_EPOCH_KEY_NAME, controllerEpoch); |
| |
| List<Struct> partitionStatesData = new ArrayList<>(partitionStates.size()); |
| for (Map.Entry<TopicPartition, PartitionState> entry : partitionStates.entrySet()) { |
| Struct partitionStateData = struct.instance(PARTITION_STATES_KEY_NAME); |
| TopicPartition topicPartition = entry.getKey(); |
| partitionStateData.set(TOPIC_KEY_NAME, topicPartition.topic()); |
| partitionStateData.set(PARTITION_KEY_NAME, topicPartition.partition()); |
| PartitionState partitionState = entry.getValue(); |
| partitionStateData.set(CONTROLLER_EPOCH_KEY_NAME, partitionState.controllerEpoch); |
| partitionStateData.set(LEADER_KEY_NAME, partitionState.leader); |
| partitionStateData.set(LEADER_EPOCH_KEY_NAME, partitionState.leaderEpoch); |
| partitionStateData.set(ISR_KEY_NAME, partitionState.isr.toArray()); |
| partitionStateData.set(ZK_VERSION_KEY_NAME, partitionState.zkVersion); |
| partitionStateData.set(REPLICAS_KEY_NAME, partitionState.replicas.toArray()); |
| partitionStatesData.add(partitionStateData); |
| } |
| struct.set(PARTITION_STATES_KEY_NAME, partitionStatesData.toArray()); |
| |
| List<Struct> brokersData = new ArrayList<>(liveBrokers.size()); |
| for (Broker broker : liveBrokers) { |
| Struct brokerData = struct.instance(LIVE_BROKERS_KEY_NAME); |
| brokerData.set(BROKER_ID_KEY_NAME, broker.id); |
| |
| if (version == 0) { |
| EndPoint endPoint = broker.endPoints.get(0); |
| brokerData.set(HOST_KEY_NAME, endPoint.host); |
| brokerData.set(PORT_KEY_NAME, endPoint.port); |
| } else { |
| List<Struct> endPointsData = new ArrayList<>(broker.endPoints.size()); |
| for (EndPoint endPoint : broker.endPoints) { |
| Struct endPointData = brokerData.instance(ENDPOINTS_KEY_NAME); |
| endPointData.set(PORT_KEY_NAME, endPoint.port); |
| endPointData.set(HOST_KEY_NAME, endPoint.host); |
| endPointData.set(SECURITY_PROTOCOL_TYPE_KEY_NAME, endPoint.securityProtocol.id); |
| if (version >= 3) |
| endPointData.set(LISTENER_NAME_KEY_NAME, endPoint.listenerName.value()); |
| endPointsData.add(endPointData); |
| |
| } |
| brokerData.set(ENDPOINTS_KEY_NAME, endPointsData.toArray()); |
| if (version >= 2) { |
| brokerData.set(RACK_KEY_NAME, broker.rack); |
| } |
| } |
| |
| brokersData.add(brokerData); |
| } |
| struct.set(LIVE_BROKERS_KEY_NAME, brokersData.toArray()); |
| |
| return struct; |
| } |
| |
| @Override |
| public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { |
| short versionId = version(); |
| if (versionId <= 3) |
| return new UpdateMetadataResponse(Errors.forException(e)); |
| else |
| throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", |
| versionId, this.getClass().getSimpleName(), ApiKeys.UPDATE_METADATA_KEY.latestVersion())); |
| } |
| |
| public int controllerId() { |
| return controllerId; |
| } |
| |
| public int controllerEpoch() { |
| return controllerEpoch; |
| } |
| |
| public Map<TopicPartition, PartitionState> partitionStates() { |
| return partitionStates; |
| } |
| |
| public Set<Broker> liveBrokers() { |
| return liveBrokers; |
| } |
| |
| public static UpdateMetadataRequest parse(ByteBuffer buffer, short version) { |
| return new UpdateMetadataRequest(ApiKeys.UPDATE_METADATA_KEY.parseRequest(version, buffer), version); |
| } |
| |
| } |