blob: 23bd9a5c19eceb9c51b90587cb04604bbf9fa7c5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tools;
import kafka.utils.TestUtils;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestDefaults;
import org.apache.kafka.server.common.AdminCommandFailedException;
import org.mockito.ArgumentCaptor;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import scala.jdk.javaapi.CollectionConverters;
import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ClusterTestDefaults(brokers = 3, serverProperties = {
@ClusterConfigProperty(key = "auto.create.topics.enable", value = "false"),
@ClusterConfigProperty(key = "auto.leader.rebalance.enable", value = "false"),
@ClusterConfigProperty(key = "controlled.shutdown.enable", value = "true"),
@ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "2")
})
public class LeaderElectionCommandTest {
private final ClusterInstance cluster;
int broker2 = 1;
int broker3 = 2;
public LeaderElectionCommandTest(ClusterInstance cluster) {
this.cluster = cluster;
}
@ClusterTest
public void testAllTopicPartition() throws InterruptedException, ExecutionException {
String topic = "unclean-topic";
int partition = 0;
List<Integer> assignment = asList(broker2, broker3);
cluster.waitForReadyBrokers();
try (Admin client = cluster.admin()) {
createTopic(client, topic, Collections.singletonMap(partition, assignment));
TopicPartition topicPartition = new TopicPartition(topic, partition);
TestUtils.assertLeader(client, topicPartition, broker2);
cluster.shutdownBroker(broker3);
TestUtils.waitForBrokersOutOfIsr(client,
CollectionConverters.asScala(singletonList(topicPartition)).toSet(),
CollectionConverters.asScala(singletonList(broker3)).toSet()
);
cluster.shutdownBroker(broker2);
TestUtils.assertNoLeader(client, topicPartition);
cluster.startBroker(broker3);
TestUtils.waitForOnlineBroker(client, broker3);
assertEquals(0, LeaderElectionCommand.mainNoExit(
"--bootstrap-server", cluster.bootstrapServers(),
"--election-type", "unclean",
"--all-topic-partitions"
));
TestUtils.assertLeader(client, topicPartition, broker3);
}
}
@ClusterTest
public void testAdminConfigCustomTimeouts() throws Exception {
String defaultApiTimeoutMs = String.valueOf(110000);
String requestTimeoutMs = String.valueOf(55000);
Path adminConfigPath = tempAdminConfig(defaultApiTimeoutMs, requestTimeoutMs);
try (final MockedStatic<Admin> mockedAdmin = Mockito.mockStatic(Admin.class)) {
assertEquals(1, LeaderElectionCommand.mainNoExit(
"--bootstrap-server", cluster.bootstrapServers(),
"--election-type", "unclean", "--all-topic-partitions",
"--admin.config", adminConfigPath.toString()
));
ArgumentCaptor<Properties> argumentCaptor = ArgumentCaptor.forClass(Properties.class);
mockedAdmin.verify(() -> Admin.create(argumentCaptor.capture()));
// verify that properties provided to admin client are the overridden properties
final Properties actualProps = argumentCaptor.getValue();
assertEquals(actualProps.get(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG), requestTimeoutMs);
assertEquals(actualProps.get(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG), defaultApiTimeoutMs);
}
}
@ClusterTest
public void testTopicPartition() throws InterruptedException, ExecutionException {
String topic = "unclean-topic";
int partition = 0;
List<Integer> assignment = asList(broker2, broker3);
cluster.waitForReadyBrokers();
try (Admin client = cluster.admin()) {
createTopic(client, topic, Collections.singletonMap(partition, assignment));
TopicPartition topicPartition = new TopicPartition(topic, partition);
TestUtils.assertLeader(client, topicPartition, broker2);
cluster.shutdownBroker(broker3);
TestUtils.waitForBrokersOutOfIsr(client,
CollectionConverters.asScala(singletonList(topicPartition)).toSet(),
CollectionConverters.asScala(singletonList(broker3)).toSet()
);
cluster.shutdownBroker(broker2);
TestUtils.assertNoLeader(client, topicPartition);
cluster.startBroker(broker3);
TestUtils.waitForOnlineBroker(client, broker3);
assertEquals(0, LeaderElectionCommand.mainNoExit(
"--bootstrap-server", cluster.bootstrapServers(),
"--election-type", "unclean",
"--topic", topic,
"--partition", Integer.toString(partition)
));
TestUtils.assertLeader(client, topicPartition, broker3);
}
}
@ClusterTest
public void testPathToJsonFile() throws Exception {
String topic = "unclean-topic";
int partition = 0;
List<Integer> assignment = asList(broker2, broker3);
cluster.waitForReadyBrokers();
Map<Integer, List<Integer>> partitionAssignment = new HashMap<>();
partitionAssignment.put(partition, assignment);
try (Admin client = cluster.admin()) {
createTopic(client, topic, partitionAssignment);
TopicPartition topicPartition = new TopicPartition(topic, partition);
TestUtils.assertLeader(client, topicPartition, broker2);
cluster.shutdownBroker(broker3);
TestUtils.waitForBrokersOutOfIsr(client,
CollectionConverters.asScala(singletonList(topicPartition)).toSet(),
CollectionConverters.asScala(singletonList(broker3)).toSet()
);
cluster.shutdownBroker(broker2);
TestUtils.assertNoLeader(client, topicPartition);
cluster.startBroker(broker3);
TestUtils.waitForOnlineBroker(client, broker3);
Path topicPartitionPath = tempTopicPartitionFile(singletonList(topicPartition));
assertEquals(0, LeaderElectionCommand.mainNoExit(
"--bootstrap-server", cluster.bootstrapServers(),
"--election-type", "unclean",
"--path-to-json-file", topicPartitionPath.toString()
));
TestUtils.assertLeader(client, topicPartition, broker3);
}
}
@ClusterTest
public void testPreferredReplicaElection() throws InterruptedException, ExecutionException {
String topic = "preferred-topic";
int partition = 0;
List<Integer> assignment = asList(broker2, broker3);
cluster.waitForReadyBrokers();
try (Admin client = cluster.admin()) {
Map<Integer, List<Integer>> partitionAssignment = new HashMap<>();
partitionAssignment.put(partition, assignment);
createTopic(client, topic, partitionAssignment);
TopicPartition topicPartition = new TopicPartition(topic, partition);
TestUtils.assertLeader(client, topicPartition, broker2);
cluster.shutdownBroker(broker2);
TestUtils.assertLeader(client, topicPartition, broker3);
cluster.startBroker(broker2);
TestUtils.waitForBrokersInIsr(client, topicPartition,
CollectionConverters.asScala(singletonList(broker2)).toSet()
);
assertEquals(0, LeaderElectionCommand.mainNoExit(
"--bootstrap-server", cluster.bootstrapServers(),
"--election-type", "preferred",
"--all-topic-partitions"
));
TestUtils.assertLeader(client, topicPartition, broker2);
}
}
@ClusterTest
public void testTopicDoesNotExist() {
Throwable e = assertThrows(AdminCommandFailedException.class, () -> LeaderElectionCommand.run(
Duration.ofSeconds(30),
"--bootstrap-server", cluster.bootstrapServers(),
"--election-type", "preferred",
"--topic", "unknown-topic-name",
"--partition", "0"
));
assertInstanceOf(UnknownTopicOrPartitionException.class, e.getSuppressed()[0]);
}
@ClusterTest
public void testElectionResultOutput() throws Exception {
String topic = "non-preferred-topic";
int partition0 = 0;
int partition1 = 1;
List<Integer> assignment0 = asList(broker2, broker3);
List<Integer> assignment1 = asList(broker3, broker2);
cluster.waitForReadyBrokers();
TopicPartition topicPartition0;
TopicPartition topicPartition1;
try (Admin client = cluster.admin()) {
Map<Integer, List<Integer>> partitionAssignment = new HashMap<>();
partitionAssignment.put(partition0, assignment0);
partitionAssignment.put(partition1, assignment1);
createTopic(client, topic, partitionAssignment);
topicPartition0 = new TopicPartition(topic, partition0);
topicPartition1 = new TopicPartition(topic, partition1);
TestUtils.assertLeader(client, topicPartition0, broker2);
TestUtils.assertLeader(client, topicPartition1, broker3);
cluster.shutdownBroker(broker2);
TestUtils.assertLeader(client, topicPartition0, broker3);
cluster.startBroker(broker2);
TestUtils.waitForBrokersInIsr(client, topicPartition0,
CollectionConverters.asScala(singletonList(broker2)).toSet()
);
TestUtils.waitForBrokersInIsr(client, topicPartition1,
CollectionConverters.asScala(singletonList(broker2)).toSet()
);
}
Path topicPartitionPath = tempTopicPartitionFile(asList(topicPartition0, topicPartition1));
String output = ToolsTestUtils.captureStandardOut(() ->
LeaderElectionCommand.mainNoExit(
"--bootstrap-server", cluster.bootstrapServers(),
"--election-type", "preferred",
"--path-to-json-file", topicPartitionPath.toString()
));
Iterator<String> electionResultOutputIter = Arrays.stream(output.split("\n")).iterator();
assertTrue(electionResultOutputIter.hasNext());
String firstLine = electionResultOutputIter.next();
assertTrue(firstLine.contains(String.format(
"Successfully completed leader election (PREFERRED) for partitions %s", topicPartition0)),
String.format("Unexpected output: %s", firstLine));
assertTrue(electionResultOutputIter.hasNext());
String secondLine = electionResultOutputIter.next();
assertTrue(secondLine.contains(String.format("Valid replica already elected for partitions %s", topicPartition1)),
String.format("Unexpected output: %s", secondLine));
}
private void createTopic(Admin admin, String topic, Map<Integer, List<Integer>> replicaAssignment) throws ExecutionException, InterruptedException {
NewTopic newTopic = new NewTopic(topic, replicaAssignment);
List<NewTopic> newTopics = singletonList(newTopic);
CreateTopicsResult createTopicResult = admin.createTopics(newTopics);
createTopicResult.all().get();
}
private Path tempTopicPartitionFile(List<TopicPartition> partitions) throws Exception {
java.io.File file = TestUtils.tempFile("leader-election-command", ".json");
String jsonString = stringifyTopicPartitions(new HashSet<>(partitions));
Files.writeString(file.toPath(), jsonString);
return file.toPath();
}
private Path tempAdminConfig(String defaultApiTimeoutMs, String requestTimeoutMs) throws Exception {
String content = "default.api.timeout.ms=" + defaultApiTimeoutMs + "\nrequest.timeout.ms=" + requestTimeoutMs;
java.io.File file = TestUtils.tempFile("admin-config", ".properties");
Files.writeString(file.toPath(), content);
return file.toPath();
}
private String stringifyTopicPartitions(Set<TopicPartition> topicPartitions) {
StringBuilder sb = new StringBuilder();
sb.append("{\"partitions\":[");
Iterator<TopicPartition> iterator = topicPartitions.iterator();
while (iterator.hasNext()) {
TopicPartition topicPartition = iterator.next();
sb.append("{\"topic\":\"")
.append(topicPartition.topic())
.append("\",\"partition\":")
.append(topicPartition.partition()).append("}");
if (iterator.hasNext()) {
sb.append(",");
}
}
sb.append("]}");
return sb.toString();
}
}