/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.message.MetadataResponseData;
import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseBroker;
import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseBrokerCollection;
import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition;
import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic;
import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopicCollection;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.MockClusterResourceListener;
import org.apache.kafka.test.TestUtils;
import org.junit.Test;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import static org.apache.kafka.test.TestUtils.assertOptional;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
public class MetadataTest {
private long refreshBackoffMs = 100;
private long metadataExpireMs = 1000;
private Metadata metadata = new Metadata(refreshBackoffMs, metadataExpireMs, new LogContext(),
new ClusterResourceListeners());
private static MetadataResponse emptyMetadataResponse() {
return MetadataResponse.prepareResponse(
Collections.emptyList(),
null,
-1,
Collections.emptyList());
}
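/**
* update() after close() must fail: a closed Metadata instance is expected to throw
* IllegalStateException rather than accept new metadata.
*/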
@Test(expected = IllegalStateException.class)
public void testMetadataUpdateAfterClose() {
metadata.close();
metadata.update(emptyMetadataResponse(), 1000);
}
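/**
* Exercises timeToNextUpdate() for one combination of refreshBackoffMs and metadataExpireMs:
* after a successful update the larger of the two bounds the wait, an explicit requestUpdate()
* shrinks it to the backoff alone, and once both intervals have elapsed the method returns 0.
*/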
private static void checkTimeToNextUpdate(long refreshBackoffMs, long metadataExpireMs) {
long now = 10000;
// Metadata timeToNextUpdate implicitly relies on the premise that currentTimeMillis is always
// larger than metadataExpireMs and refreshBackoffMs.
// This is not a problem in practice, since all users of Metadata call update() immediately after
// its construction.
if (metadataExpireMs > now || refreshBackoffMs > now) {
throw new IllegalArgumentException(
"metadataExpireMs and refreshBackoffMs must be smaller than 'now'");
}
long largerOfBackoffAndExpire = Math.max(refreshBackoffMs, metadataExpireMs);
Metadata metadata = new Metadata(refreshBackoffMs, metadataExpireMs, new LogContext(),
new ClusterResourceListeners());
assertEquals(0, metadata.timeToNextUpdate(now));
// lastSuccessfulRefreshMs updated to now.
metadata.update(emptyMetadataResponse(), now);
// The last update was successful, so the remaining time until the current metadata expires should be returned.
assertEquals(largerOfBackoffAndExpire, metadata.timeToNextUpdate(now));
// Metadata update requested explicitly
metadata.requestUpdate();
// Update requested so metadataExpireMs should no longer take effect.
assertEquals(refreshBackoffMs, metadata.timeToNextUpdate(now));
// Reset needUpdate to false.
metadata.update(emptyMetadataResponse(), now);
assertEquals(largerOfBackoffAndExpire, metadata.timeToNextUpdate(now));
// Both metadataExpireMs and refreshBackoffMs elapsed.
now += largerOfBackoffAndExpire;
assertEquals(0, metadata.timeToNextUpdate(now));
assertEquals(0, metadata.timeToNextUpdate(now + 1));
}
@Test
public void testTimeToNextUpdate() {
checkTimeToNextUpdate(100, 1000);
checkTimeToNextUpdate(1000, 100);
checkTimeToNextUpdate(0, 0);
checkTimeToNextUpdate(0, 100);
checkTimeToNextUpdate(100, 0);
}
@Test
public void testTimeToNextUpdate_RetryBackoff() {
long now = 10000;
// lastRefreshMs updated to now.
metadata.failedUpdate(now, null);
// Backing off. Remaining time until next try should be returned.
assertEquals(refreshBackoffMs, metadata.timeToNextUpdate(now));
// Even though a metadata update was requested explicitly, the backoff is still respected.
metadata.requestUpdate();
assertEquals(refreshBackoffMs, metadata.timeToNextUpdate(now));
// refreshBackoffMs elapsed.
now += refreshBackoffMs;
// It should return 0 to allow the next attempt.
assertEquals(0, metadata.timeToNextUpdate(now));
assertEquals(0, metadata.timeToNextUpdate(now + 1));
}
/**
* Prior to Kafka version 2.4 (which coincides with Metadata version 9), the broker does not propagate leader epoch
* information accurately while a reassignment is in progress, so we cannot rely on it. This is explained in more
* detail in MetadataResponse's constructor.
*/
@Test
public void testIgnoreLeaderEpochInOlderMetadataResponse() {
TopicPartition tp = new TopicPartition("topic", 0);
MetadataResponsePartition partitionMetadata = new MetadataResponsePartition()
.setPartitionIndex(tp.partition())
.setLeaderId(5)
.setLeaderEpoch(10)
.setReplicaNodes(Arrays.asList(1, 2, 3))
.setIsrNodes(Arrays.asList(1, 2, 3))
.setOfflineReplicas(Collections.emptyList())
.setErrorCode(Errors.NONE.code());
MetadataResponseTopic topicMetadata = new MetadataResponseTopic()
.setName(tp.topic())
.setErrorCode(Errors.NONE.code())
.setPartitions(Collections.singletonList(partitionMetadata))
.setIsInternal(false);
MetadataResponseTopicCollection topics = new MetadataResponseTopicCollection();
topics.add(topicMetadata);
MetadataResponseData data = new MetadataResponseData()
.setClusterId("clusterId")
.setControllerId(0)
.setTopics(topics)
.setBrokers(new MetadataResponseBrokerCollection());
for (short version = ApiKeys.METADATA.oldestVersion(); version < 9; version++) {
Struct struct = data.toStruct(version);
MetadataResponse response = new MetadataResponse(struct, version);
assertFalse(response.hasReliableLeaderEpochs());
metadata.update(response, 100);
assertTrue(metadata.partitionInfoIfCurrent(tp).isPresent());
MetadataCache.PartitionInfoAndEpoch info = metadata.partitionInfoIfCurrent(tp).get();
assertEquals(-1, info.epoch());
}
for (short version = 9; version <= ApiKeys.METADATA.latestVersion(); version++) {
Struct struct = data.toStruct(version);
MetadataResponse response = new MetadataResponse(struct, version);
assertTrue(response.hasReliableLeaderEpochs());
metadata.update(response, 100);
assertTrue(metadata.partitionInfoIfCurrent(tp).isPresent());
MetadataCache.PartitionInfoAndEpoch info = metadata.partitionInfoIfCurrent(tp).get();
assertEquals(10, info.epoch());
}
}
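/**
* A metadata response carrying an older leader epoch must be ignored: the cached partition
* state (including the ISR) and the last-seen epoch remain unchanged.
*/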
@Test
public void testStaleMetadata() {
TopicPartition tp = new TopicPartition("topic", 0);
MetadataResponsePartition partitionMetadata = new MetadataResponsePartition()
.setPartitionIndex(tp.partition())
.setLeaderId(1)
.setLeaderEpoch(10)
.setReplicaNodes(Arrays.asList(1, 2, 3))
.setIsrNodes(Arrays.asList(1, 2, 3))
.setOfflineReplicas(Collections.emptyList())
.setErrorCode(Errors.NONE.code());
MetadataResponseTopic topicMetadata = new MetadataResponseTopic()
.setName(tp.topic())
.setErrorCode(Errors.NONE.code())
.setPartitions(Collections.singletonList(partitionMetadata))
.setIsInternal(false);
MetadataResponseTopicCollection topics = new MetadataResponseTopicCollection();
topics.add(topicMetadata);
MetadataResponseData data = new MetadataResponseData()
.setClusterId("clusterId")
.setControllerId(0)
.setTopics(topics)
.setBrokers(new MetadataResponseBrokerCollection());
metadata.update(new MetadataResponse(data), 100);
// Older epoch with changed ISR should be ignored
partitionMetadata
.setPartitionIndex(tp.partition())
.setLeaderId(1)
.setLeaderEpoch(9)
.setReplicaNodes(Arrays.asList(1, 2, 3))
.setIsrNodes(Arrays.asList(1, 2))
.setOfflineReplicas(Collections.emptyList())
.setErrorCode(Errors.NONE.code());
metadata.update(new MetadataResponse(data), 101);
assertEquals(Optional.of(10), metadata.lastSeenLeaderEpoch(tp));
assertTrue(metadata.partitionInfoIfCurrent(tp).isPresent());
MetadataCache.PartitionInfoAndEpoch info = metadata.partitionInfoIfCurrent(tp).get();
List<Integer> cachedIsr = Arrays.stream(info.partitionInfo().inSyncReplicas())
.map(Node::id).collect(Collectors.toList());
assertEquals(Arrays.asList(1, 2, 3), cachedIsr);
assertEquals(10, info.epoch());
}
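/**
* A failed update applies the retry backoff to timeToNextUpdate() but leaves
* lastSuccessfulUpdate() pointing at the last successful refresh.
*/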
@Test
public void testFailedUpdate() {
long time = 100;
metadata.update(emptyMetadataResponse(), time);
assertEquals(100, metadata.timeToNextUpdate(1000));
metadata.failedUpdate(1100, null);
assertEquals(100, metadata.timeToNextUpdate(1100));
assertEquals(100, metadata.lastSuccessfulUpdate());
metadata.update(emptyMetadataResponse(), time);
assertEquals(100, metadata.timeToNextUpdate(1000));
}
@Test
public void testClusterListenerGetsNotifiedOfUpdate() {
long time = 0;
MockClusterResourceListener mockClusterListener = new MockClusterResourceListener();
ClusterResourceListeners listeners = new ClusterResourceListeners();
listeners.maybeAdd(mockClusterListener);
metadata = new Metadata(refreshBackoffMs, metadataExpireMs, new LogContext(), listeners);
String hostName = "www.example.com";
metadata.bootstrap(Collections.singletonList(new InetSocketAddress(hostName, 9002)), time);
assertFalse("ClusterResourceListener should not called when metadata is updated with bootstrap Cluster",
MockClusterResourceListener.IS_ON_UPDATE_CALLED.get());
Map<String, Integer> partitionCounts = new HashMap<>();
partitionCounts.put("topic", 1);
partitionCounts.put("topic1", 1);
MetadataResponse metadataResponse = TestUtils.metadataUpdateWith("dummy", 1, partitionCounts);
metadata.update(metadataResponse, 100);
assertEquals("MockClusterResourceListener did not get cluster metadata correctly",
"dummy", mockClusterListener.clusterResource().clusterId());
assertTrue("MockClusterResourceListener should be called when metadata is updated with non-bootstrap Cluster",
MockClusterResourceListener.IS_ON_UPDATE_CALLED.get());
}
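/**
* An update should be requested only when a strictly newer leader epoch is observed:
* epochs[i] is the epoch seen at step i and updateResult[i] is whether a metadata update
* is expected to be requested as a result.
*/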
@Test
public void testRequestUpdate() {
assertFalse(metadata.updateRequested());
int[] epochs = {42, 42, 41, 41, 42, 43, 43, 42, 41, 44};
boolean[] updateResult = {true, false, false, false, false, true, false, false, false, true};
TopicPartition tp = new TopicPartition("topic", 0);
for (int i = 0; i < epochs.length; i++) {
metadata.updateLastSeenEpochIfNewer(tp, epochs[i]);
if (updateResult[i]) {
assertTrue("Expected metadata update to be requested [" + i + "]", metadata.updateRequested());
} else {
assertFalse("Did not expect metadata update to be requested [" + i + "]", metadata.updateRequested());
}
metadata.update(emptyMetadataResponse(), 0L);
assertFalse(metadata.updateRequested());
}
}
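/**
* Leader epochs only move forward: responses carrying an older epoch are rejected, and the
* last-seen epoch is retained even when the topic temporarily disappears from the metadata.
*/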
@Test
public void testRejectOldMetadata() {
Map<String, Integer> partitionCounts = new HashMap<>();
partitionCounts.put("topic-1", 1);
TopicPartition tp = new TopicPartition("topic-1", 0);
metadata.update(emptyMetadataResponse(), 0L);
// First epoch seen, accept it
{
MetadataResponse metadataResponse = TestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), partitionCounts, _tp -> 100);
metadata.update(metadataResponse, 10L);
assertNotNull(metadata.fetch().partition(tp));
assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 100);
}
// Fake an empty ISR, but with an older epoch, should reject it
{
MetadataResponse metadataResponse = TestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), partitionCounts, _tp -> 99,
(error, partition, leader, leaderEpoch, replicas, isr, offlineReplicas) ->
new MetadataResponse.PartitionMetadata(error, partition, leader, leaderEpoch, replicas, Collections.emptyList(), offlineReplicas));
metadata.update(metadataResponse, 20L);
assertEquals(metadata.fetch().partition(tp).inSyncReplicas().length, 1);
assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 100);
}
// Fake an empty ISR, with same epoch, accept it
{
MetadataResponse metadataResponse = TestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), partitionCounts, _tp -> 100,
(error, partition, leader, leaderEpoch, replicas, isr, offlineReplicas) ->
new MetadataResponse.PartitionMetadata(error, partition, leader, leaderEpoch, replicas, Collections.emptyList(), offlineReplicas));
metadata.update(metadataResponse, 20L);
assertEquals(metadata.fetch().partition(tp).inSyncReplicas().length, 0);
assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 100);
}
// Empty metadata response, should not keep old partition but should keep the last-seen epoch
{
MetadataResponse metadataResponse = TestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), Collections.emptyMap());
metadata.update(metadataResponse, 20L);
assertNull(metadata.fetch().partition(tp));
assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 100);
}
// Back in the metadata, with old epoch, should not get added
{
MetadataResponse metadataResponse = TestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), partitionCounts, _tp -> 99);
metadata.update(metadataResponse, 10L);
assertNull(metadata.fetch().partition(tp));
assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 100);
}
}
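/**
* updateLastSeenEpochIfNewer() returns true only for the first epoch seen for a partition
* or for a strictly newer one; it never lowers the last-seen epoch.
*/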
@Test
public void testMaybeRequestUpdate() {
TopicPartition tp = new TopicPartition("topic-1", 0);
metadata.update(emptyMetadataResponse(), 0L);
assertTrue(metadata.updateLastSeenEpochIfNewer(tp, 1));
assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 1);
metadata.update(emptyMetadataResponse(), 1L);
assertFalse(metadata.updateLastSeenEpochIfNewer(tp, 1));
assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 1);
metadata.update(emptyMetadataResponse(), 2L);
assertFalse(metadata.updateLastSeenEpochIfNewer(tp, 0));
assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 1);
metadata.update(emptyMetadataResponse(), 3L);
assertTrue(metadata.updateLastSeenEpochIfNewer(tp, 2));
assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 2);
}
@Test
public void testOutOfBandEpochUpdate() {
Map<String, Integer> partitionCounts = new HashMap<>();
partitionCounts.put("topic-1", 5);
TopicPartition tp = new TopicPartition("topic-1", 0);
metadata.update(emptyMetadataResponse(), 0L);
assertTrue(metadata.updateLastSeenEpochIfNewer(tp, 99));
// Update epoch to 100
MetadataResponse metadataResponse = TestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), partitionCounts, _tp -> 100);
metadata.update(metadataResponse, 10L);
assertNotNull(metadata.fetch().partition(tp));
assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 100);
// Simulate a leader epoch from another response, like a fetch response (not yet implemented)
assertTrue(metadata.updateLastSeenEpochIfNewer(tp, 101));
// The partition stays in the cache, but current partition info is not available since it is stale
assertNotNull(metadata.fetch().partition(tp));
assertEquals(metadata.fetch().partitionCountForTopic("topic-1").longValue(), 5);
assertFalse(metadata.partitionInfoIfCurrent(tp).isPresent());
assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 101);
// Metadata with older epoch is rejected, metadata state is unchanged
metadata.update(metadataResponse, 20L);
assertNotNull(metadata.fetch().partition(tp));
assertEquals(metadata.fetch().partitionCountForTopic("topic-1").longValue(), 5);
assertFalse(metadata.partitionInfoIfCurrent(tp).isPresent());
assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 101);
// Metadata with equal or newer epoch is accepted
metadataResponse = TestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), partitionCounts, _tp -> 101);
metadata.update(metadataResponse, 30L);
assertNotNull(metadata.fetch().partition(tp));
assertEquals(metadata.fetch().partitionCountForTopic("topic-1").longValue(), 5);
assertTrue(metadata.partitionInfoIfCurrent(tp).isPresent());
assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 101);
}
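/**
* Metadata responses without leader epoch information must still be usable: the partition
* info is cached and current even though lastSeenLeaderEpoch() is empty.
*/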
@Test
public void testNoEpoch() {
metadata.update(emptyMetadataResponse(), 0L);
MetadataResponse metadataResponse = TestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1));
metadata.update(metadataResponse, 10L);
TopicPartition tp = new TopicPartition("topic-1", 0);
// no epoch
assertFalse(metadata.lastSeenLeaderEpoch(tp).isPresent());
// still works
assertTrue(metadata.partitionInfoIfCurrent(tp).isPresent());
assertEquals(metadata.partitionInfoIfCurrent(tp).get().partitionInfo().partition(), 0);
assertEquals(metadata.partitionInfoIfCurrent(tp).get().partitionInfo().leader().id(), 0);
}
@Test
public void testClusterCopy() {
Map<String, Integer> counts = new HashMap<>();
Map<String, Errors> errors = new HashMap<>();
counts.put("topic1", 2);
counts.put("topic2", 3);
counts.put(Topic.GROUP_METADATA_TOPIC_NAME, 3);
errors.put("topic3", Errors.INVALID_TOPIC_EXCEPTION);
errors.put("topic4", Errors.TOPIC_AUTHORIZATION_FAILED);
MetadataResponse metadataResponse = TestUtils.metadataUpdateWith("dummy", 4, errors, counts);
metadata.update(metadataResponse, 0L);
Cluster cluster = metadata.fetch();
assertEquals(cluster.clusterResource().clusterId(), "dummy");
assertEquals(cluster.nodes().size(), 4);
// topic counts
assertEquals(cluster.invalidTopics(), Collections.singleton("topic3"));
assertEquals(cluster.unauthorizedTopics(), Collections.singleton("topic4"));
assertEquals(cluster.topics().size(), 3);
assertEquals(cluster.internalTopics(), Collections.singleton(Topic.GROUP_METADATA_TOPIC_NAME));
// partition counts
assertEquals(cluster.partitionsForTopic("topic1").size(), 2);
assertEquals(cluster.partitionsForTopic("topic2").size(), 3);
// Sentinel instances
InetSocketAddress address = InetSocketAddress.createUnresolved("localhost", 0);
Cluster fromMetadata = MetadataCache.bootstrap(Collections.singletonList(address)).cluster();
Cluster fromCluster = Cluster.bootstrap(Collections.singletonList(address));
assertEquals(fromMetadata, fromCluster);
Cluster fromMetadataEmpty = MetadataCache.empty().cluster();
Cluster fromClusterEmpty = Cluster.empty();
assertEquals(fromMetadataEmpty, fromClusterEmpty);
}
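/**
* Request-version bookkeeping: if the request version is bumped while a metadata request is
* in flight, the response for the stale version must not clear the pending update flag;
* only a response for the latest version does.
*/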
@Test
public void testRequestVersion() {
Time time = new MockTime();
metadata.requestUpdate();
Metadata.MetadataRequestAndVersion versionAndBuilder = metadata.newMetadataRequestAndVersion();
metadata.update(versionAndBuilder.requestVersion,
TestUtils.metadataUpdateWith(1, Collections.singletonMap("topic", 1)), time.milliseconds());
assertFalse(metadata.updateRequested());
// bump the request version for new topics added to the metadata
metadata.requestUpdateForNewTopics();
// simulating a bump while a metadata request is in flight
versionAndBuilder = metadata.newMetadataRequestAndVersion();
metadata.requestUpdateForNewTopics();
metadata.update(versionAndBuilder.requestVersion,
TestUtils.metadataUpdateWith(1, Collections.singletonMap("topic", 1)), time.milliseconds());
// metadata update is still needed
assertTrue(metadata.updateRequested());
// the next update will resolve it
versionAndBuilder = metadata.newMetadataRequestAndVersion();
metadata.update(versionAndBuilder.requestVersion,
TestUtils.metadataUpdateWith(1, Collections.singletonMap("topic", 1)), time.milliseconds());
assertFalse(metadata.updateRequested());
}
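/**
* An INVALID_TOPIC_EXCEPTION topic error surfaces as an InvalidTopicException from
* maybeThrowAnyException(); the error is cleared once thrown and is also cleared by a
* subsequent error-free update.
*/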
@Test
public void testInvalidTopicError() {
Time time = new MockTime();
String invalidTopic = "topic dfsa";
MetadataResponse invalidTopicResponse = TestUtils.metadataUpdateWith("clusterId", 1,
Collections.singletonMap(invalidTopic, Errors.INVALID_TOPIC_EXCEPTION), Collections.emptyMap());
metadata.update(invalidTopicResponse, time.milliseconds());
InvalidTopicException e = assertThrows(InvalidTopicException.class, () -> metadata.maybeThrowAnyException());
assertEquals(Collections.singleton(invalidTopic), e.invalidTopics());
// We clear the exception once it has been raised to the user
metadata.maybeThrowAnyException();
// Reset the invalid topic error
metadata.update(invalidTopicResponse, time.milliseconds());
// If we get a good update, the error should clear even if we haven't had a chance to raise it to the user
metadata.update(emptyMetadataResponse(), time.milliseconds());
metadata.maybeThrowAnyException();
}
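/**
* The TOPIC_AUTHORIZATION_FAILED counterpart of the test above: the error surfaces as a
* TopicAuthorizationException and is cleared once thrown or after a clean update.
*/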
@Test
public void testTopicAuthorizationError() {
Time time = new MockTime();
String invalidTopic = "foo";
MetadataResponse unauthorizedTopicResponse = TestUtils.metadataUpdateWith("clusterId", 1,
Collections.singletonMap(invalidTopic, Errors.TOPIC_AUTHORIZATION_FAILED), Collections.emptyMap());
metadata.update(unauthorizedTopicResponse, time.milliseconds());
TopicAuthorizationException e = assertThrows(TopicAuthorizationException.class, () -> metadata.maybeThrowAnyException());
assertEquals(Collections.singleton(invalidTopic), e.unauthorizedTopics());
// We clear the exception once it has been raised to the user
metadata.maybeThrowAnyException();
// Reset the unauthorized topic error
metadata.update(unauthorizedTopicResponse, time.milliseconds());
// If we get a good update, the error should clear even if we haven't had a chance to raise it to the user
metadata.update(emptyMetadataResponse(), time.milliseconds());
metadata.maybeThrowAnyException();
}
@Test
public void testMetadataTopicErrors() {
Time time = new MockTime();
Map<String, Errors> topicErrors = new HashMap<>(3);
topicErrors.put("invalidTopic", Errors.INVALID_TOPIC_EXCEPTION);
topicErrors.put("sensitiveTopic1", Errors.TOPIC_AUTHORIZATION_FAILED);
topicErrors.put("sensitiveTopic2", Errors.TOPIC_AUTHORIZATION_FAILED);
MetadataResponse metadataResponse = TestUtils.metadataUpdateWith("clusterId", 1, topicErrors, Collections.emptyMap());
metadata.update(metadataResponse, time.milliseconds());
TopicAuthorizationException e1 = assertThrows(TopicAuthorizationException.class,
() -> metadata.maybeThrowExceptionForTopic("sensitiveTopic1"));
assertEquals(Collections.singleton("sensitiveTopic1"), e1.unauthorizedTopics());
// We clear the exception once it has been raised to the user
metadata.maybeThrowAnyException();
metadata.update(metadataResponse, time.milliseconds());
TopicAuthorizationException e2 = assertThrows(TopicAuthorizationException.class,
() -> metadata.maybeThrowExceptionForTopic("sensitiveTopic2"));
assertEquals(Collections.singleton("sensitiveTopic2"), e2.unauthorizedTopics());
metadata.maybeThrowAnyException();
metadata.update(metadataResponse, time.milliseconds());
InvalidTopicException e3 = assertThrows(InvalidTopicException.class,
() -> metadata.maybeThrowExceptionForTopic("invalidTopic"));
assertEquals(Collections.singleton("invalidTopic"), e3.invalidTopics());
metadata.maybeThrowAnyException();
// Other topics should not throw an exception, but checking them should clear the existing exception
metadata.update(metadataResponse, time.milliseconds());
metadata.maybeThrowExceptionForTopic("anotherTopic");
metadata.maybeThrowAnyException();
}
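/**
* nodeIfOnline() must return empty for a replica listed as offline, while nodeById() still
* resolves the same broker as a cluster member.
*/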
@Test
public void testNodeIfOffline() {
Map<String, Integer> partitionCounts = new HashMap<>();
partitionCounts.put("topic-1", 1);
Node node0 = new Node(0, "localhost", 9092);
Node node1 = new Node(1, "localhost", 9093);
MetadataResponse metadataResponse = TestUtils.metadataUpdateWith("dummy", 2, Collections.emptyMap(), partitionCounts, _tp -> 99,
(error, partition, leader, leaderEpoch, replicas, isr, offlineReplicas) ->
new MetadataResponse.PartitionMetadata(error, partition, node0, leaderEpoch,
Collections.singletonList(node0), Collections.emptyList(), Collections.singletonList(node1)));
metadata.update(emptyMetadataResponse(), 0L);
metadata.update(metadataResponse, 10L);
TopicPartition tp = new TopicPartition("topic-1", 0);
assertOptional(metadata.fetch().nodeIfOnline(tp, 0), node -> assertEquals(node.id(), 0));
assertFalse(metadata.fetch().nodeIfOnline(tp, 1).isPresent());
assertEquals(metadata.fetch().nodeById(0).id(), 0);
assertEquals(metadata.fetch().nodeById(1).id(), 1);
}
@Test
public void testLeaderMetadataInconsistentWithBrokerMetadata() {
// Tests a reordering scenario which can lead to inconsistent leader state.
// A partition initially has one broker offline. That broker comes online and
// is elected leader. The client sees these two events in the opposite order.
TopicPartition tp = new TopicPartition("topic", 0);
Node node0 = new Node(0, "localhost", 9092);
Node node1 = new Node(1, "localhost", 9093);
Node node2 = new Node(2, "localhost", 9094);
// The first metadata response received from the broker (epoch=10)
MetadataResponsePartition firstPartitionMetadata = new MetadataResponsePartition()
.setPartitionIndex(tp.partition())
.setErrorCode(Errors.NONE.code())
.setLeaderEpoch(10)
.setLeaderId(0)
.setReplicaNodes(Arrays.asList(0, 1, 2))
.setIsrNodes(Arrays.asList(0, 1, 2))
.setOfflineReplicas(Collections.emptyList());
// The second metadata response received carries stale metadata (epoch=8)
MetadataResponsePartition secondPartitionMetadata = new MetadataResponsePartition()
.setPartitionIndex(tp.partition())
.setErrorCode(Errors.NONE.code())
.setLeaderEpoch(8)
.setLeaderId(1)
.setReplicaNodes(Arrays.asList(0, 1, 2))
.setIsrNodes(Arrays.asList(1, 2))
.setOfflineReplicas(Collections.singletonList(0));
metadata.update(new MetadataResponse(new MetadataResponseData()
.setTopics(buildTopicCollection(tp.topic(), firstPartitionMetadata))
.setBrokers(buildBrokerCollection(Arrays.asList(node0, node1, node2)))),
10L);
metadata.update(new MetadataResponse(new MetadataResponseData()
.setTopics(buildTopicCollection(tp.topic(), secondPartitionMetadata))
.setBrokers(buildBrokerCollection(Arrays.asList(node1, node2)))),
20L);
assertNull(metadata.fetch().leaderFor(tp));
assertEquals(Optional.of(10), metadata.lastSeenLeaderEpoch(tp));
assertTrue(metadata.leaderAndEpoch(tp).leader.isEmpty());
}
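/**
* Wraps a single partition's metadata in a one-topic MetadataResponseTopicCollection.
*/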
private MetadataResponseTopicCollection buildTopicCollection(String topic, MetadataResponsePartition partitionMetadata) {
MetadataResponseTopic topicMetadata = new MetadataResponseTopic()
.setErrorCode(Errors.NONE.code())
.setName(topic)
.setIsInternal(false);
topicMetadata.setPartitions(Collections.singletonList(partitionMetadata));
MetadataResponseTopicCollection topics = new MetadataResponseTopicCollection();
topics.add(topicMetadata);
return topics;
}
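/**
* Converts a list of Nodes into the MetadataResponseBrokerCollection wire representation.
*/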
private MetadataResponseBrokerCollection buildBrokerCollection(List<Node> nodes) {
MetadataResponseBrokerCollection brokers = new MetadataResponseBrokerCollection();
for (Node node : nodes) {
MetadataResponseBroker broker = new MetadataResponseBroker()
.setNodeId(node.id())
.setHost(node.host())
.setPort(node.port())
.setRack(node.rack());
brokers.add(broker);
}
return brokers;
}
}