/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.controller;

import java.io.File;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.message.RequestHeaderData;
import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
import org.apache.kafka.common.metadata.ZkMigrationStateRecord;
import org.apache.kafka.common.requests.AlterPartitionRequest;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.message.AllocateProducerIdsRequestData;
import org.apache.kafka.common.message.AlterPartitionRequestData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData.ReassignableTopic;
import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
import org.apache.kafka.common.message.BrokerRegistrationRequestData.Listener;
import org.apache.kafka.common.message.BrokerRegistrationRequestData.ListenerCollection;
import org.apache.kafka.common.message.BrokerRegistrationRequestData;
import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic;
import org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignmentCollection;
import org.apache.kafka.common.message.CreateTopicsRequestData;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection;
import org.apache.kafka.common.message.CreateTopicsResponseData;
import org.apache.kafka.common.message.ElectLeadersRequestData;
import org.apache.kafka.common.message.ElectLeadersResponseData;
import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.ProducerIdsRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerEndpoint;
import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerEndpointCollection;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.controller.QuorumController.ConfigResourceExistenceChecker;
import org.apache.kafka.metadata.BrokerHeartbeatReply;
import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
import org.apache.kafka.metadata.BrokerRegistrationReply;
import org.apache.kafka.metadata.FinalizedControllerFeatures;
import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.metadata.RecordTestUtils;
import org.apache.kafka.metadata.authorizer.StandardAuthorizer;
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
import org.apache.kafka.metadata.migration.ZkMigrationState;
import org.apache.kafka.metadata.util.BatchFileWriter;
import org.apache.kafka.metalog.LocalLogManager;
import org.apache.kafka.metalog.LocalLogManagerTestEnv;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.fault.FaultHandlerException;
import org.apache.kafka.snapshot.FileRawSnapshotReader;
import org.apache.kafka.snapshot.RawSnapshotReader;
import org.apache.kafka.snapshot.Snapshots;
import org.apache.kafka.test.TestUtils;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static java.util.function.Function.identity;
import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET;
import static org.apache.kafka.common.config.ConfigResource.Type.BROKER;
import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
import static org.apache.kafka.controller.ConfigurationControlManagerTest.BROKER0;
import static org.apache.kafka.controller.ConfigurationControlManagerTest.SCHEMA;
import static org.apache.kafka.controller.ConfigurationControlManagerTest.entry;
import static org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT;
import static org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.brokerFeatures;
import static org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.pause;
import static org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.registerBrokersAndUnfence;
import static org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.sendBrokerHeartbeat;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

@Timeout(value = 40)
public class QuorumControllerTest {
private final static Logger log = LoggerFactory.getLogger(QuorumControllerTest.class);

static final BootstrapMetadata SIMPLE_BOOTSTRAP = BootstrapMetadata.
fromVersion(MetadataVersion.IBP_3_3_IV3, "test-provided bootstrap");

/**
* Test setting some configuration values and reading them back.
*/
@Test
public void testConfigurationOperations() throws Throwable {
try (
LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).
build();
QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).
setControllerBuilderInitializer(controllerBuilder -> {
controllerBuilder.setConfigSchema(SCHEMA);
}).
build()
) {
controlEnv.activeController().registerBroker(ANONYMOUS_CONTEXT,
new BrokerRegistrationRequestData().
setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.latest())).
setBrokerId(0).
setClusterId(logEnv.clusterId())).get();
testConfigurationOperations(controlEnv.activeController());
}
}
private void testConfigurationOperations(QuorumController controller) throws Throwable {
assertEquals(Collections.singletonMap(BROKER0, ApiError.NONE),
controller.incrementalAlterConfigs(ANONYMOUS_CONTEXT, Collections.singletonMap(
BROKER0, Collections.singletonMap("baz", entry(SET, "123"))), true).get());
assertEquals(Collections.singletonMap(BROKER0,
new ResultOrError<>(Collections.emptyMap())),
controller.describeConfigs(ANONYMOUS_CONTEXT, Collections.singletonMap(
BROKER0, Collections.emptyList())).get());
assertEquals(Collections.singletonMap(BROKER0, ApiError.NONE),
controller.incrementalAlterConfigs(ANONYMOUS_CONTEXT, Collections.singletonMap(
BROKER0, Collections.singletonMap("baz", entry(SET, "123"))), false).get());
assertEquals(Collections.singletonMap(BROKER0, new ResultOrError<>(Collections.
singletonMap("baz", "123"))),
controller.describeConfigs(ANONYMOUS_CONTEXT, Collections.singletonMap(
BROKER0, Collections.emptyList())).get());
}
/**
* Test that an incrementalAlterConfigs operation doesn't complete until the records
* can be written to the metadata log.
*/
@Test
public void testDelayedConfigurationOperations() throws Throwable {
try (
LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).
build();
QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).
setControllerBuilderInitializer(controllerBuilder -> {
controllerBuilder.setConfigSchema(SCHEMA);
}).
build()
) {
controlEnv.activeController().registerBroker(ANONYMOUS_CONTEXT,
new BrokerRegistrationRequestData().
setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.latest())).
setBrokerId(0).
setClusterId(logEnv.clusterId())).get();
testDelayedConfigurationOperations(logEnv, controlEnv.activeController());
}
}
private void testDelayedConfigurationOperations(
LocalLogManagerTestEnv logEnv,
QuorumController controller
) throws Throwable {
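// Cap how far the local log managers will replay. Records appended beyond this
// offset stay invisible to the controller, so the alter-configs future below
// cannot complete yet.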
logEnv.logManagers().forEach(m -> m.setMaxReadOffset(1L));
CompletableFuture<Map<ConfigResource, ApiError>> future1 =
controller.incrementalAlterConfigs(ANONYMOUS_CONTEXT, Collections.singletonMap(
BROKER0, Collections.singletonMap("baz", entry(SET, "123"))), false);
assertFalse(future1.isDone());
assertEquals(Collections.singletonMap(BROKER0,
new ResultOrError<>(Collections.emptyMap())),
controller.describeConfigs(ANONYMOUS_CONTEXT, Collections.singletonMap(
BROKER0, Collections.emptyList())).get());
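// Raise the read limit so the pending records become visible and the operation completes.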
logEnv.logManagers().forEach(m -> m.setMaxReadOffset(4L));
assertEquals(Collections.singletonMap(BROKER0, ApiError.NONE), future1.get());
}
@Test
public void testFenceMultipleBrokers() throws Throwable {
List<Integer> allBrokers = Arrays.asList(1, 2, 3, 4, 5);
List<Integer> brokersToKeepUnfenced = Arrays.asList(1);
List<Integer> brokersToFence = Arrays.asList(2, 3, 4, 5);
short replicationFactor = (short) allBrokers.size();
short numberOfPartitions = (short) allBrokers.size();
long sessionTimeoutMillis = 1000;
try (
LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).
build();
QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).
setControllerBuilderInitializer(controllerBuilder -> {
controllerBuilder.setConfigSchema(SCHEMA);
}).
setSessionTimeoutMillis(OptionalLong.of(sessionTimeoutMillis)).
setBootstrapMetadata(SIMPLE_BOOTSTRAP).
build()
) {
ListenerCollection listeners = new ListenerCollection();
listeners.add(new Listener().setName("PLAINTEXT").setHost("localhost").setPort(9092));
QuorumController active = controlEnv.activeController();
Map<Integer, Long> brokerEpochs = new HashMap<>();
for (Integer brokerId : allBrokers) {
CompletableFuture<BrokerRegistrationReply> reply = active.registerBroker(
ANONYMOUS_CONTEXT,
new BrokerRegistrationRequestData().
setBrokerId(brokerId).
setClusterId(active.clusterId()).
setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_3_IV3)).
setIncarnationId(Uuid.randomUuid()).
setListeners(listeners));
brokerEpochs.put(brokerId, reply.get().epoch());
}
// The brokers have only been registered; they should still be fenced
allBrokers.forEach(brokerId -> {
assertFalse(active.clusterControl().isUnfenced(brokerId),
"Broker " + brokerId + " should have been fenced");
});
// Unfence all brokers and create a topic foo
sendBrokerHeartbeat(active, allBrokers, brokerEpochs);
CreateTopicsRequestData createTopicsRequestData = new CreateTopicsRequestData().setTopics(
new CreatableTopicCollection(Collections.singleton(
new CreatableTopic().setName("foo").setNumPartitions(numberOfPartitions).
setReplicationFactor(replicationFactor)).iterator()));
CreateTopicsResponseData createTopicsResponseData = active.createTopics(
ANONYMOUS_CONTEXT, createTopicsRequestData,
Collections.singleton("foo")).get();
assertEquals(Errors.NONE, Errors.forCode(createTopicsResponseData.topics().find("foo").errorCode()));
Uuid topicIdFoo = createTopicsResponseData.topics().find("foo").topicId();
// Fence some of the brokers
TestUtils.waitForCondition(() -> {
sendBrokerHeartbeat(active, brokersToKeepUnfenced, brokerEpochs);
for (Integer brokerId : brokersToFence) {
if (active.clusterControl().isUnfenced(brokerId)) {
return false;
}
}
return true;
}, sessionTimeoutMillis * 3,
"Fencing of brokers did not process within expected time"
);
// Send another heartbeat to the brokers we want to keep alive
sendBrokerHeartbeat(active, brokersToKeepUnfenced, brokerEpochs);
// At this point only the brokers we want fenced should be fenced.
brokersToKeepUnfenced.forEach(brokerId -> {
assertTrue(active.clusterControl().isUnfenced(brokerId),
"Broker " + brokerId + " should have been unfenced");
});
brokersToFence.forEach(brokerId -> {
assertFalse(active.clusterControl().isUnfenced(brokerId),
"Broker " + brokerId + " should have been fenced");
});
// Verify the ISR and leader for partition 0 of topic foo
int[] expectedIsr = {1};
int[] isrFoo = active.replicationControl().getPartition(topicIdFoo, 0).isr;
assertArrayEquals(expectedIsr, isrFoo, "The ISR for topic foo was " + Arrays.toString(isrFoo) +
". It is expected to be " + Arrays.toString(expectedIsr));
int fooLeader = active.replicationControl().getPartition(topicIdFoo, 0).leader;
assertEquals(expectedIsr[0], fooLeader);
// Check that there are imbalanced partitions
assertTrue(active.replicationControl().arePartitionLeadersImbalanced());
}
}
@Test
public void testBalancePartitionLeaders() throws Throwable {
List<Integer> allBrokers = Arrays.asList(1, 2, 3);
List<Integer> brokersToKeepUnfenced = Arrays.asList(1, 2);
List<Integer> brokersToFence = Collections.singletonList(3);
short replicationFactor = (short) allBrokers.size();
short numberOfPartitions = (short) allBrokers.size();
long sessionTimeoutMillis = 1000;
long leaderImbalanceCheckIntervalNs = 1_000_000_000;
try (
LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).
build();
QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).
setControllerBuilderInitializer(controllerBuilder -> {
controllerBuilder.setConfigSchema(SCHEMA);
}).
setSessionTimeoutMillis(OptionalLong.of(sessionTimeoutMillis)).
setLeaderImbalanceCheckIntervalNs(OptionalLong.of(leaderImbalanceCheckIntervalNs)).
setBootstrapMetadata(SIMPLE_BOOTSTRAP).
build()
) {
ListenerCollection listeners = new ListenerCollection();
listeners.add(new Listener().setName("PLAINTEXT").setHost("localhost").setPort(9092));
QuorumController active = controlEnv.activeController();
Map<Integer, Long> brokerEpochs = new HashMap<>();
for (Integer brokerId : allBrokers) {
CompletableFuture<BrokerRegistrationReply> reply = active.registerBroker(
ANONYMOUS_CONTEXT,
new BrokerRegistrationRequestData().
setBrokerId(brokerId).
setClusterId(active.clusterId()).
setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_3_IV3)).
setIncarnationId(Uuid.randomUuid()).
setListeners(listeners));
brokerEpochs.put(brokerId, reply.get().epoch());
}
// The brokers have only been registered; they should still be fenced
allBrokers.forEach(brokerId -> {
assertFalse(active.clusterControl().isUnfenced(brokerId),
"Broker " + brokerId + " should have been fenced");
});
// Unfence all brokers and create a topic foo
sendBrokerHeartbeat(active, allBrokers, brokerEpochs);
CreateTopicsRequestData createTopicsRequestData = new CreateTopicsRequestData().setTopics(
new CreatableTopicCollection(Collections.singleton(
new CreatableTopic().setName("foo").setNumPartitions(numberOfPartitions).
setReplicationFactor(replicationFactor)).iterator()));
CreateTopicsResponseData createTopicsResponseData = active.createTopics(
ANONYMOUS_CONTEXT, createTopicsRequestData, Collections.singleton("foo")).get();
assertEquals(Errors.NONE, Errors.forCode(createTopicsResponseData.topics().find("foo").errorCode()));
Uuid topicIdFoo = createTopicsResponseData.topics().find("foo").topicId();
// Fence some of the brokers
TestUtils.waitForCondition(
() -> {
sendBrokerHeartbeat(active, brokersToKeepUnfenced, brokerEpochs);
for (Integer brokerId : brokersToFence) {
if (active.clusterControl().isUnfenced(brokerId)) {
return false;
}
}
return true;
},
sessionTimeoutMillis * 3,
"Fencing of brokers did not process within expected time"
);
// Send another heartbeat to the brokers we want to keep alive
sendBrokerHeartbeat(active, brokersToKeepUnfenced, brokerEpochs);
// At this point only the brokers we want fenced should be fenced.
brokersToKeepUnfenced.forEach(brokerId -> {
assertTrue(active.clusterControl().isUnfenced(brokerId),
"Broker " + brokerId + " should have been unfenced");
});
brokersToFence.forEach(brokerId -> {
assertFalse(active.clusterControl().isUnfenced(brokerId),
"Broker " + brokerId + " should have been fenced");
});
// Check that there are imbalanced partitions
assertTrue(active.replicationControl().arePartitionLeadersImbalanced());
// Re-register all fenced brokers
for (Integer brokerId : brokersToFence) {
CompletableFuture<BrokerRegistrationReply> reply = active.registerBroker(
ANONYMOUS_CONTEXT,
new BrokerRegistrationRequestData().
setBrokerId(brokerId).
setClusterId(active.clusterId()).
setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_3_IV3)).
setIncarnationId(Uuid.randomUuid()).
setListeners(listeners));
brokerEpochs.put(brokerId, reply.get().epoch());
}
// Unfence all brokers
sendBrokerHeartbeat(active, allBrokers, brokerEpochs);
// Let the newly unfenced broker, 3, rejoin the ISR of the imbalanced partition
Set<TopicIdPartition> imbalancedPartitions = active.replicationControl().imbalancedPartitions();
assertEquals(1, imbalancedPartitions.size());
int imbalancedPartitionId = imbalancedPartitions.iterator().next().partitionId();
PartitionRegistration partitionRegistration = active.replicationControl().getPartition(topicIdFoo, imbalancedPartitionId);
AlterPartitionRequestData.PartitionData partitionData = new AlterPartitionRequestData.PartitionData()
.setPartitionIndex(imbalancedPartitionId)
.setLeaderEpoch(partitionRegistration.leaderEpoch)
.setPartitionEpoch(partitionRegistration.partitionEpoch)
.setNewIsrWithEpochs(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(1, 2, 3)));
AlterPartitionRequestData.TopicData topicData = new AlterPartitionRequestData.TopicData()
.setTopicName("foo");
topicData.partitions().add(partitionData);
AlterPartitionRequestData alterPartitionRequest = new AlterPartitionRequestData()
.setBrokerId(partitionRegistration.leader)
.setBrokerEpoch(brokerEpochs.get(partitionRegistration.leader));
alterPartitionRequest.topics().add(topicData);
active.alterPartition(ANONYMOUS_CONTEXT, new AlterPartitionRequest
.Builder(alterPartitionRequest, false).build((short) 0).data()).get();
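// While waiting for the leader balancer to run, keep re-sending heartbeats every
// half session timeout so that none of the brokers get fenced again.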
AtomicLong lastHeartbeatMs = new AtomicLong(getMonotonicMs(active.time()));
sendBrokerHeartbeat(active, allBrokers, brokerEpochs);
// Check that partitions are balanced
TestUtils.waitForCondition(
() -> {
long currentMonotonicMs = getMonotonicMs(active.time());
if (currentMonotonicMs > lastHeartbeatMs.get() + (sessionTimeoutMillis / 2)) {
lastHeartbeatMs.set(currentMonotonicMs);
sendBrokerHeartbeat(active, allBrokers, brokerEpochs);
}
return !active.replicationControl().arePartitionLeadersImbalanced();
},
TimeUnit.MILLISECONDS.convert(leaderImbalanceCheckIntervalNs * 10, TimeUnit.NANOSECONDS),
"Leaders were not balanced after unfencing all of the brokers"
);
}
}
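
// Converts the (possibly mocked) Time's monotonic nanosecond clock into milliseconds
// for coarse elapsed-time comparisons.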
private static long getMonotonicMs(Time time) {
return TimeUnit.NANOSECONDS.toMillis(time.nanoseconds());
}
@Test
public void testNoOpRecordWriteAfterTimeout() throws Throwable {
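// The active controller appends a NoOpRecord whenever the metadata log has been idle
// for maxIdleIntervalNs, so the high watermark should keep advancing even with no
// external requests.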
long maxIdleIntervalNs = 1_000;
long maxReplicationDelayMs = 60_000;
try (
LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3).
build();
QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).
setControllerBuilderInitializer(controllerBuilder -> {
controllerBuilder.setConfigSchema(SCHEMA);
controllerBuilder.setMaxIdleIntervalNs(OptionalLong.of(maxIdleIntervalNs));
}).
build()
) {
ListenerCollection listeners = new ListenerCollection();
listeners.add(new Listener().setName("PLAINTEXT").setHost("localhost").setPort(9092));
QuorumController active = controlEnv.activeController();
LocalLogManager localLogManager = logEnv
.logManagers()
.stream()
.filter(logManager -> logManager.nodeId().equals(OptionalInt.of(active.nodeId())))
.findAny()
.get();
TestUtils.waitForCondition(
() -> localLogManager.highWatermark().isPresent(),
maxReplicationDelayMs,
"High watermark was not established"
);
final long firstHighWatermark = localLogManager.highWatermark().getAsLong();
TestUtils.waitForCondition(
() -> localLogManager.highWatermark().getAsLong() > firstHighWatermark,
maxReplicationDelayMs,
"Active controller didn't write NoOpRecord the first time"
);
// Do it again to make sure that we are not counting the leader change record
final long secondHighWatermark = localLogManager.highWatermark().getAsLong();
TestUtils.waitForCondition(
() -> localLogManager.highWatermark().getAsLong() > secondHighWatermark,
maxReplicationDelayMs,
"Active controller didn't write NoOpRecord the second time"
);
}
}
@Test
public void testUnregisterBroker() throws Throwable {
try (
LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).
build();
QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).
setControllerBuilderInitializer(controllerBuilder -> {
controllerBuilder.setConfigSchema(SCHEMA);
}).
build()
) {
ListenerCollection listeners = new ListenerCollection();
listeners.add(new Listener().setName("PLAINTEXT").
setHost("localhost").setPort(9092));
QuorumController active = controlEnv.activeController();
CompletableFuture<BrokerRegistrationReply> reply = active.registerBroker(
ANONYMOUS_CONTEXT,
new BrokerRegistrationRequestData().
setBrokerId(0).
setClusterId(active.clusterId()).
setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwBA")).
setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.latest())).
setListeners(listeners));
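// Broker epochs are the log offsets of the corresponding RegisterBrokerRecords; the
// records written during controller activation occupy the earlier offsets, so this
// first registration is expected to land at offset 3.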
assertEquals(3L, reply.get().epoch());
CreateTopicsRequestData createTopicsRequestData =
new CreateTopicsRequestData().setTopics(
new CreatableTopicCollection(Collections.singleton(
new CreatableTopic().setName("foo").setNumPartitions(1).
setReplicationFactor((short) 1)).iterator()));
assertEquals(Errors.INVALID_REPLICATION_FACTOR.code(), active.createTopics(
ANONYMOUS_CONTEXT,
createTopicsRequestData, Collections.singleton("foo")).get().
topics().find("foo").errorCode());
assertEquals("Unable to replicate the partition 1 time(s): All brokers " +
"are currently fenced.", active.createTopics(ANONYMOUS_CONTEXT,
createTopicsRequestData, Collections.singleton("foo")).
get().topics().find("foo").errorMessage());
assertEquals(new BrokerHeartbeatReply(true, false, false, false),
active.processBrokerHeartbeat(ANONYMOUS_CONTEXT, new BrokerHeartbeatRequestData().
setWantFence(false).setBrokerEpoch(3L).setBrokerId(0).
setCurrentMetadataOffset(100000L)).get());
assertEquals(Errors.NONE.code(), active.createTopics(ANONYMOUS_CONTEXT,
createTopicsRequestData, Collections.singleton("foo")).
get().topics().find("foo").errorCode());
CompletableFuture<TopicIdPartition> topicPartitionFuture = active.appendReadEvent(
"debugGetPartition", OptionalLong.empty(), () -> {
Iterator<TopicIdPartition> iterator = active.
replicationControl().brokersToIsrs().iterator(0, true);
assertTrue(iterator.hasNext());
return iterator.next();
});
assertEquals(0, topicPartitionFuture.get().partitionId());
active.unregisterBroker(ANONYMOUS_CONTEXT, 0).get();
topicPartitionFuture = active.appendReadEvent(
"debugGetPartition", OptionalLong.empty(), () -> {
Iterator<TopicIdPartition> iterator = active.
replicationControl().brokersToIsrs().partitionsWithNoLeader();
assertTrue(iterator.hasNext());
return iterator.next();
});
assertEquals(0, topicPartitionFuture.get().partitionId());
}
}
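
// Builds the BrokerFeatureCollection that a RegisterBrokerRecord carries for the
// metadata.version feature, spanning the given supported range.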
private RegisterBrokerRecord.BrokerFeatureCollection registrationFeatures(
MetadataVersion minVersion,
MetadataVersion maxVersion
) {
RegisterBrokerRecord.BrokerFeatureCollection features = new RegisterBrokerRecord.BrokerFeatureCollection();
features.add(new RegisterBrokerRecord.BrokerFeature().
setName(MetadataVersion.FEATURE_NAME).
setMinSupportedVersion(minVersion.featureLevel()).
setMaxSupportedVersion(maxVersion.featureLevel()));
return features;
}
@Test
public void testSnapshotSaveAndLoad() throws Throwable {
final int numBrokers = 4;
Map<Integer, Long> brokerEpochs = new HashMap<>();
RawSnapshotReader reader = null;
Uuid fooId;
try (
LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3).
build();
QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).
setControllerBuilderInitializer(controllerBuilder -> {
controllerBuilder.setConfigSchema(SCHEMA);
}).
setBootstrapMetadata(SIMPLE_BOOTSTRAP).
build()
) {
QuorumController active = controlEnv.activeController();
for (int i = 0; i < numBrokers; i++) {
BrokerRegistrationReply reply = active.registerBroker(ANONYMOUS_CONTEXT,
new BrokerRegistrationRequestData().
setBrokerId(i).
setRack(null).
setClusterId(active.clusterId()).
setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_3_IV3)).
setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB" + i)).
setListeners(new ListenerCollection(Arrays.asList(new Listener().
setName("PLAINTEXT").setHost("localhost").
setPort(9092 + i)).iterator()))).get();
brokerEpochs.put(i, reply.epoch());
}
for (int i = 0; i < numBrokers - 1; i++) {
assertEquals(new BrokerHeartbeatReply(true, false, false, false),
active.processBrokerHeartbeat(ANONYMOUS_CONTEXT, new BrokerHeartbeatRequestData().
setWantFence(false).setBrokerEpoch(brokerEpochs.get(i)).
setBrokerId(i).setCurrentMetadataOffset(100000L)).get());
}
CreateTopicsResponseData fooData = active.createTopics(ANONYMOUS_CONTEXT,
new CreateTopicsRequestData().setTopics(
new CreatableTopicCollection(Collections.singleton(
new CreatableTopic().setName("foo").setNumPartitions(-1).
setReplicationFactor((short) -1).
setAssignments(new CreatableReplicaAssignmentCollection(
Arrays.asList(new CreatableReplicaAssignment().
setPartitionIndex(0).
setBrokerIds(Arrays.asList(0, 1, 2)),
new CreatableReplicaAssignment().
setPartitionIndex(1).
setBrokerIds(Arrays.asList(1, 2, 0))).
iterator()))).iterator())),
Collections.singleton("foo")).get();
fooId = fooData.topics().find("foo").topicId();
active.allocateProducerIds(ANONYMOUS_CONTEXT,
new AllocateProducerIdsRequestData().setBrokerId(0).setBrokerEpoch(brokerEpochs.get(0))).get();
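// Close the controller environment so everything is flushed to the local log, then
// verify the complete record history against the expected sequence.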
controlEnv.close();
assertEquals(generateTestRecords(fooId, brokerEpochs), logEnv.allRecords());
}
}
private List<ApiMessageAndVersion> generateTestRecords(Uuid fooId, Map<Integer, Long> brokerEpochs) {
return Arrays.asList(
new ApiMessageAndVersion(new FeatureLevelRecord().
setName(MetadataVersion.FEATURE_NAME).
setFeatureLevel(MetadataVersion.IBP_3_3_IV3.featureLevel()), (short) 0),
new ApiMessageAndVersion(new RegisterBrokerRecord().
setBrokerId(0).setBrokerEpoch(brokerEpochs.get(0)).
setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB0")).
setEndPoints(new BrokerEndpointCollection(
Arrays.asList(new BrokerEndpoint().setName("PLAINTEXT").setHost("localhost").
setPort(9092).setSecurityProtocol((short) 0)).iterator())).
setFeatures(registrationFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_3_IV3)).
setRack(null).
setFenced(true), (short) 1),
new ApiMessageAndVersion(new RegisterBrokerRecord().
setBrokerId(1).setBrokerEpoch(brokerEpochs.get(1)).
setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB1")).
setEndPoints(new BrokerEndpointCollection(Arrays.asList(
new BrokerEndpoint().setName("PLAINTEXT").setHost("localhost").
setPort(9093).setSecurityProtocol((short) 0)).iterator())).
setFeatures(registrationFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_3_IV3)).
setRack(null).
setFenced(true), (short) 1),
new ApiMessageAndVersion(new RegisterBrokerRecord().
setBrokerId(2).setBrokerEpoch(brokerEpochs.get(2)).
setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB2")).
setEndPoints(new BrokerEndpointCollection(
Arrays.asList(new BrokerEndpoint().setName("PLAINTEXT").setHost("localhost").
setPort(9094).setSecurityProtocol((short) 0)).iterator())).
setFeatures(registrationFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_3_IV3)).
setRack(null).
setFenced(true), (short) 1),
new ApiMessageAndVersion(new RegisterBrokerRecord().
setBrokerId(3).setBrokerEpoch(brokerEpochs.get(3)).
setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB3")).
setEndPoints(new BrokerEndpointCollection(Arrays.asList(
new BrokerEndpoint().setName("PLAINTEXT").setHost("localhost").
setPort(9095).setSecurityProtocol((short) 0)).iterator())).
setFeatures(registrationFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_3_IV3)).
setRack(null).
setFenced(true), (short) 1),
new ApiMessageAndVersion(new BrokerRegistrationChangeRecord().
setBrokerId(0).
setBrokerEpoch(brokerEpochs.get(0)).
setFenced(BrokerRegistrationFencingChange.UNFENCE.value()), (short) 0),
new ApiMessageAndVersion(new BrokerRegistrationChangeRecord().
setBrokerId(1).
setBrokerEpoch(brokerEpochs.get(1)).
setFenced(BrokerRegistrationFencingChange.UNFENCE.value()), (short) 0),
new ApiMessageAndVersion(new BrokerRegistrationChangeRecord().
setBrokerId(2).
setBrokerEpoch(brokerEpochs.get(2)).
setFenced(BrokerRegistrationFencingChange.UNFENCE.value()), (short) 0),
new ApiMessageAndVersion(new TopicRecord().
setName("foo").setTopicId(fooId), (short) 0),
new ApiMessageAndVersion(new PartitionRecord().setPartitionId(0).
setTopicId(fooId).setReplicas(Arrays.asList(0, 1, 2)).
setIsr(Arrays.asList(0, 1, 2)).setRemovingReplicas(Collections.emptyList()).
setAddingReplicas(Collections.emptyList()).setLeader(0).setLeaderEpoch(0).
setPartitionEpoch(0), (short) 0),
new ApiMessageAndVersion(new PartitionRecord().setPartitionId(1).
setTopicId(fooId).setReplicas(Arrays.asList(1, 2, 0)).
setIsr(Arrays.asList(1, 2, 0)).setRemovingReplicas(Collections.emptyList()).
setAddingReplicas(Collections.emptyList()).setLeader(1).setLeaderEpoch(0).
setPartitionEpoch(0), (short) 0),
new ApiMessageAndVersion(new ProducerIdsRecord().
setBrokerId(0).
setBrokerEpoch(brokerEpochs.get(0)).
setNextProducerId(1000), (short) 0));
}
/**
* Test that certain controller operations time out if they stay on the controller
* queue for too long.
*/
@Test
public void testTimeouts() throws Throwable {
try (
LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).build();
QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).
setControllerBuilderInitializer(controllerBuilder -> {
controllerBuilder.setConfigSchema(SCHEMA);
}).
build()
) {
QuorumController controller = controlEnv.activeController();
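// pause() blocks the controller's event queue until the returned latch is counted
// down, so the requests below sit on the queue past their deadlines.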
CountDownLatch countDownLatch = pause(controller);
long now = controller.time().nanoseconds();
ControllerRequestContext context0 = new ControllerRequestContext(
new RequestHeaderData(), KafkaPrincipal.ANONYMOUS, OptionalLong.of(now));
CompletableFuture<CreateTopicsResponseData> createFuture =
controller.createTopics(context0, new CreateTopicsRequestData().setTimeoutMs(0).
setTopics(new CreatableTopicCollection(Collections.singleton(
new CreatableTopic().setName("foo")).iterator())),
Collections.emptySet());
CompletableFuture<Map<Uuid, ApiError>> deleteFuture =
controller.deleteTopics(context0, Collections.singletonList(Uuid.ZERO_UUID));
CompletableFuture<Map<String, ResultOrError<Uuid>>> findTopicIdsFuture =
controller.findTopicIds(context0, Collections.singletonList("foo"));
CompletableFuture<Map<Uuid, ResultOrError<String>>> findTopicNamesFuture =
controller.findTopicNames(context0, Collections.singletonList(Uuid.ZERO_UUID));
CompletableFuture<List<CreatePartitionsTopicResult>> createPartitionsFuture =
controller.createPartitions(context0, Collections.singletonList(
new CreatePartitionsTopic()), false);
CompletableFuture<ElectLeadersResponseData> electLeadersFuture =
controller.electLeaders(context0, new ElectLeadersRequestData().setTimeoutMs(0).
setTopicPartitions(null));
CompletableFuture<AlterPartitionReassignmentsResponseData> alterReassignmentsFuture =
controller.alterPartitionReassignments(context0,
new AlterPartitionReassignmentsRequestData().setTimeoutMs(0).
setTopics(Collections.singletonList(new ReassignableTopic())));
CompletableFuture<ListPartitionReassignmentsResponseData> listReassignmentsFuture =
controller.listPartitionReassignments(context0,
new ListPartitionReassignmentsRequestData().setTopics(null).setTimeoutMs(0));
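// Busy-wait until the mock clock ticks past the deadline (now) carried by context0,
// guaranteeing the queued operations have expired before the queue is unblocked.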
while (controller.time().nanoseconds() == now) {
Thread.sleep(0, 10);
}
countDownLatch.countDown();
assertYieldsTimeout(createFuture);
assertYieldsTimeout(deleteFuture);
assertYieldsTimeout(findTopicIdsFuture);
assertYieldsTimeout(findTopicNamesFuture);
assertYieldsTimeout(createPartitionsFuture);
assertYieldsTimeout(electLeadersFuture);
assertYieldsTimeout(alterReassignmentsFuture);
assertYieldsTimeout(listReassignmentsFuture);
}
}
private static void assertYieldsTimeout(Future<?> future) {
assertEquals(TimeoutException.class, assertThrows(ExecutionException.class,
() -> future.get()).getCause().getClass());
}
/**
* Test that certain controller operations finish immediately without putting an event
* on the controller queue, if there is nothing to do.
*/
@Test
public void testEarlyControllerResults() throws Throwable {
try (
LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).
build();
QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).
setControllerBuilderInitializer(controllerBuilder -> {
controllerBuilder.setConfigSchema(SCHEMA);
}).
build()
) {
QuorumController controller = controlEnv.activeController();
CountDownLatch countDownLatch = pause(controller);
CompletableFuture<CreateTopicsResponseData> createFuture =
controller.createTopics(ANONYMOUS_CONTEXT, new CreateTopicsRequestData().
setTimeoutMs(120000), Collections.emptySet());
CompletableFuture<Map<Uuid, ApiError>> deleteFuture =
controller.deleteTopics(ANONYMOUS_CONTEXT, Collections.emptyList());
CompletableFuture<Map<String, ResultOrError<Uuid>>> findTopicIdsFuture =
controller.findTopicIds(ANONYMOUS_CONTEXT, Collections.emptyList());
CompletableFuture<Map<Uuid, ResultOrError<String>>> findTopicNamesFuture =
controller.findTopicNames(ANONYMOUS_CONTEXT, Collections.emptyList());
CompletableFuture<List<CreatePartitionsTopicResult>> createPartitionsFuture =
controller.createPartitions(ANONYMOUS_CONTEXT, Collections.emptyList(), false);
CompletableFuture<ElectLeadersResponseData> electLeadersFuture =
controller.electLeaders(ANONYMOUS_CONTEXT, new ElectLeadersRequestData());
CompletableFuture<AlterPartitionReassignmentsResponseData> alterReassignmentsFuture =
controller.alterPartitionReassignments(ANONYMOUS_CONTEXT,
new AlterPartitionReassignmentsRequestData());
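// With the event queue still paused, all of these no-op requests should nevertheless
// complete immediately.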
createFuture.get();
deleteFuture.get();
findTopicIdsFuture.get();
findTopicNamesFuture.get();
createPartitionsFuture.get();
electLeadersFuture.get();
alterReassignmentsFuture.get();
countDownLatch.countDown();
}
}
@Disabled // TODO: need to fix leader election in LocalLog.
public void testMissingInMemorySnapshot() throws Exception {
int numBrokers = 3;
int numPartitions = 3;
String topicName = "topic-name";
try (
LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).
build();
QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).
setControllerBuilderInitializer(controllerBuilder -> {
controllerBuilder.setConfigSchema(SCHEMA);
}).
build()
) {
QuorumController controller = controlEnv.activeController();
Map<Integer, Long> brokerEpochs = registerBrokersAndUnfence(controller, numBrokers);
// Create a lot of partitions
List<CreatableReplicaAssignment> partitions = IntStream
.range(0, numPartitions)
.mapToObj(partitionIndex -> new CreatableReplicaAssignment()
.setPartitionIndex(partitionIndex)
.setBrokerIds(Arrays.asList(0, 1, 2))
)
.collect(Collectors.toList());
Uuid topicId = controller.createTopics(ANONYMOUS_CONTEXT, new CreateTopicsRequestData()
.setTopics(new CreatableTopicCollection(Collections.singleton(new CreatableTopic()
.setName(topicName)
.setNumPartitions(-1)
.setReplicationFactor((short) -1)
.setAssignments(new CreatableReplicaAssignmentCollection(partitions.iterator()))
).iterator())),
Collections.singleton("foo")).get().topics().find(topicName).topicId();
// Build AlterPartition data for every partition
List<AlterPartitionRequestData.PartitionData> alterPartitions = IntStream
.range(0, numPartitions)
.mapToObj(partitionIndex -> {
PartitionRegistration partitionRegistration = controller.replicationControl().getPartition(
topicId,
partitionIndex
);
return new AlterPartitionRequestData.PartitionData()
.setPartitionIndex(partitionIndex)
.setLeaderEpoch(partitionRegistration.leaderEpoch)
.setPartitionEpoch(partitionRegistration.partitionEpoch)
.setNewIsrWithEpochs(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(0, 1)));
})
.collect(Collectors.toList());
AlterPartitionRequestData.TopicData topicData = new AlterPartitionRequestData.TopicData()
.setTopicName(topicName);
topicData.partitions().addAll(alterPartitions);
int leaderId = 0;
AlterPartitionRequestData alterPartitionRequest = new AlterPartitionRequestData()
.setBrokerId(leaderId)
.setBrokerEpoch(brokerEpochs.get(leaderId));
alterPartitionRequest.topics().add(topicData);
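// Force the local log leader to resign after committing only part of the next
// non-atomic batch, so the AlterPartition request is applied partially.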
logEnv.logManagers().get(0).resignAfterNonAtomicCommit();
int oldClaimEpoch = controller.curClaimEpoch();
assertThrows(ExecutionException.class,
() -> controller.alterPartition(ANONYMOUS_CONTEXT, new AlterPartitionRequest
.Builder(alterPartitionRequest, false).build((short) 0).data()).get());
// Wait for the controller to become active again
assertSame(controller, controlEnv.activeController());
assertTrue(
oldClaimEpoch < controller.curClaimEpoch(),
String.format("oldClaimEpoch = %s, newClaimEpoch = %s", oldClaimEpoch, controller.curClaimEpoch())
);
// Since the alterPartition only partially committed, we expect
// some partitions to still have broker 2 in the ISR.
int partitionsWithReplica2 = Utils.toList(
controller
.replicationControl()
.brokersToIsrs()
.partitionsWithBrokerInIsr(2)
).size();
int partitionsWithReplica0 = Utils.toList(
controller
.replicationControl()
.brokersToIsrs()
.partitionsWithBrokerInIsr(0)
).size();
assertEquals(numPartitions, partitionsWithReplica0);
assertNotEquals(0, partitionsWithReplica2);
assertTrue(
partitionsWithReplica0 > partitionsWithReplica2,
String.format(
"partitionsWithReplica0 = %s, partitionsWithReplica2 = %s",
partitionsWithReplica0,
partitionsWithReplica2
)
);
}
}
@Test
public void testConfigResourceExistenceChecker() throws Throwable {
try (
LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3).
build();
QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).
setControllerBuilderInitializer(controllerBuilder -> {
controllerBuilder.setConfigSchema(SCHEMA);
}).
build()
) {
QuorumController active = controlEnv.activeController();
registerBrokersAndUnfence(active, 5);
active.createTopics(ANONYMOUS_CONTEXT, new CreateTopicsRequestData().
setTopics(new CreatableTopicCollection(Collections.singleton(
new CreatableTopic().setName("foo").
setReplicationFactor((short) 3).
setNumPartitions(1)).iterator())),
Collections.singleton("foo")).get();
ConfigResourceExistenceChecker checker =
active.new ConfigResourceExistenceChecker();
// A ConfigResource with type=BROKER and name=(empty string) represents
// the default broker resource. It is used to set cluster configs.
checker.accept(new ConfigResource(BROKER, ""));
// Broker 3 exists, so we can set a configuration for it.
checker.accept(new ConfigResource(BROKER, "3"));
// Broker 10 does not exist, so this should throw an exception.
assertThrows(BrokerIdNotRegisteredException.class,
() -> checker.accept(new ConfigResource(BROKER, "10")));
// Topic foo exists, so we can set a configuration for it.
checker.accept(new ConfigResource(TOPIC, "foo"));
// Topic bar does not exist, so this should throw an exception.
assertThrows(UnknownTopicOrPartitionException.class,
() -> checker.accept(new ConfigResource(TOPIC, "bar")));
}
}
private static final Uuid FOO_ID = Uuid.fromString("igRktLOnR8ektWHr79F8mw");
private static final Map<Integer, Long> ALL_ZERO_BROKER_EPOCHS =
IntStream.of(0, 1, 2, 3).boxed().collect(Collectors.toMap(identity(), __ -> 0L));
@Test
public void testFatalMetadataReplayErrorOnActive() throws Throwable {
try (
LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3).
build();
QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).
build()
) {
QuorumController active = controlEnv.activeController();
CompletableFuture<Void> future = active.appendWriteEvent("errorEvent",
OptionalLong.empty(), () -> {
return ControllerResult.of(Collections.singletonList(new ApiMessageAndVersion(
new ConfigRecord().
setName(null).
setResourceName(null).
setResourceType((byte) 255).
setValue(null), (short) 0)), null);
});
assertThrows(ExecutionException.class, () -> future.get());
assertEquals(NullPointerException.class, controlEnv.fatalFaultHandler(active.nodeId())
.firstException().getCause().getClass());
controlEnv.ignoreFatalFaults();
}
}
@Test
public void testFatalMetadataErrorDuringSnapshotLoading() throws Exception {
InitialSnapshot invalidSnapshot = new InitialSnapshot(Collections.unmodifiableList(Arrays.asList(
new ApiMessageAndVersion(new PartitionRecord(), (short) 0)))
);
LocalLogManagerTestEnv.Builder logEnvBuilder = new LocalLogManagerTestEnv.Builder(3)
.setSnapshotReader(FileRawSnapshotReader.open(
invalidSnapshot.tempDir.toPath(),
new OffsetAndEpoch(0, 0)
));
try (LocalLogManagerTestEnv logEnv = logEnvBuilder.build()) {
try (QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).build()) {
TestUtils.waitForCondition(() -> controlEnv.controllers().stream().allMatch(controller -> {
return controlEnv.fatalFaultHandler(controller.nodeId()).firstException() != null;
}),
"At least one controller failed to detect the fatal fault"
);
controlEnv.ignoreFatalFaults();
}
}
}
@Test
public void testFatalMetadataErrorDuringLogLoading() throws Exception {
try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3).build()) {
logEnv.appendInitialRecords(Collections.unmodifiableList(Arrays.asList(
new ApiMessageAndVersion(new PartitionRecord(), (short) 0))
));
try (QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).build()) {
TestUtils.waitForCondition(() -> controlEnv.controllers().stream().allMatch(controller -> {
return controlEnv.fatalFaultHandler(controller.nodeId()).firstException() != null;
}),
"At least one controller failed to detect the fatal fault"
);
controlEnv.ignoreFatalFaults();
}
}
}
private static void assertInitialLoadFuturesNotComplete(List<StandardAuthorizer> authorizers) {
for (int i = 0; i < authorizers.size(); i++) {
assertFalse(authorizers.get(i).initialLoadFuture().isDone(),
"authorizer " + i + " should not have completed loading.");
}
}
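
/**
 * Writes the given records into an on-disk snapshot at offset/epoch 0 so that a
 * LocalLogManagerTestEnv can load them at startup.
 */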
static class InitialSnapshot implements AutoCloseable {
File tempDir = null;
BatchFileWriter writer = null;
public InitialSnapshot(List<ApiMessageAndVersion> records) throws Exception {
tempDir = TestUtils.tempDirectory();
Path path = Snapshots.snapshotPath(tempDir.toPath(), new OffsetAndEpoch(0, 0));
writer = BatchFileWriter.open(path);
writer.append(records);
writer.close();
writer = null;
}
@Override
public void close() throws Exception {
Utils.closeQuietly(writer, "BatchFileWriter");
Utils.delete(tempDir);
}
}
private final static List<ApiMessageAndVersion> PRE_PRODUCTION_RECORDS =
Collections.unmodifiableList(Arrays.asList(
new ApiMessageAndVersion(new RegisterBrokerRecord().
setBrokerEpoch(42).
setBrokerId(123).
setIncarnationId(Uuid.fromString("v78Gbc6sQXK0y5qqRxiryw")).
setRack(null),
(short) 0),
new ApiMessageAndVersion(new UnfenceBrokerRecord().
setEpoch(42).
setId(123),
(short) 0),
new ApiMessageAndVersion(new TopicRecord().
setName("bar").
setTopicId(Uuid.fromString("cxBT72dK4si8Ied1iP4wBA")),
(short) 0)));
private final static BootstrapMetadata COMPLEX_BOOTSTRAP = BootstrapMetadata.fromRecords(
Arrays.asList(
new ApiMessageAndVersion(new FeatureLevelRecord().
setName(MetadataVersion.FEATURE_NAME).
setFeatureLevel(MetadataVersion.IBP_3_3_IV1.featureLevel()),
(short) 0),
new ApiMessageAndVersion(new ConfigRecord().
setResourceType(BROKER.id()).
setResourceName("").
setName("foo").
setValue("bar"),
(short) 0)),
"test bootstrap");
@Test
public void testUpgradeFromPreProductionVersion() throws Exception {
try (
InitialSnapshot initialSnapshot = new InitialSnapshot(PRE_PRODUCTION_RECORDS);
LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3).
setSnapshotReader(FileRawSnapshotReader.open(
initialSnapshot.tempDir.toPath(), new OffsetAndEpoch(0, 0))).
build();
QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).
setControllerBuilderInitializer(controllerBuilder -> {
controllerBuilder.setConfigSchema(SCHEMA);
}).
setBootstrapMetadata(COMPLEX_BOOTSTRAP).
build()
) {
QuorumController active = controlEnv.activeController();
TestUtils.waitForCondition(() ->
active.featureControl().metadataVersion().equals(MetadataVersion.IBP_3_0_IV1),
"Failed to get a metadata version of " + MetadataVersion.IBP_3_0_IV1);
// The ConfigRecord in our bootstrap should not have been applied, since there
// were already records present.
assertEquals(Collections.emptyMap(), active.configurationControl().
getConfigs(new ConfigResource(BROKER, "")));
}
}
@Test
public void testInsertBootstrapRecordsToEmptyLog() throws Exception {
try (
LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3).
build();
QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).
setControllerBuilderInitializer(controllerBuilder -> {
controllerBuilder.setConfigSchema(SCHEMA);
}).
setBootstrapMetadata(COMPLEX_BOOTSTRAP).
build()
) {
QuorumController active = controlEnv.activeController();
ControllerRequestContext ctx = new ControllerRequestContext(
new RequestHeaderData(), KafkaPrincipal.ANONYMOUS, OptionalLong.of(Long.MAX_VALUE));
TestUtils.waitForCondition(() -> {
FinalizedControllerFeatures features = active.finalizedFeatures(ctx).get();
Optional<Short> metadataVersionOpt = features.get(MetadataVersion.FEATURE_NAME);
return Optional.of(MetadataVersion.IBP_3_3_IV1.featureLevel()).equals(metadataVersionOpt);
}, "Failed to see expected metadata version from bootstrap metadata");
TestUtils.waitForCondition(() -> {
ConfigResource defaultBrokerResource = new ConfigResource(BROKER, "");
Map<ConfigResource, Collection<String>> configs = Collections.singletonMap(
defaultBrokerResource,
Collections.emptyList()
);
Map<ConfigResource, ResultOrError<Map<String, String>>> results =
active.describeConfigs(ctx, configs).get();
ResultOrError<Map<String, String>> resultOrError = results.get(defaultBrokerResource);
return resultOrError.isResult() &&
Collections.singletonMap("foo", "bar").equals(resultOrError.result());
}, "Failed to see expected config change from bootstrap metadata");
}
}
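
/**
 * An appender that verifies batching behavior: records must arrive in order, each
 * BrokerRegistrationChangeRecord's brokerId must equal the offset at which the record
 * is appended, and apply() returns the offset after the last appended record.
 */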
static class TestAppender implements Function<List<ApiMessageAndVersion>, Long> {
private long offset = 0;
@Override
public Long apply(List<ApiMessageAndVersion> apiMessageAndVersions) {
for (ApiMessageAndVersion apiMessageAndVersion : apiMessageAndVersions) {
BrokerRegistrationChangeRecord record =
(BrokerRegistrationChangeRecord) apiMessageAndVersion.message();
assertEquals((int) offset, record.brokerId());
offset++;
}
return offset;
}
}
private static ApiMessageAndVersion rec(int i) {
return new ApiMessageAndVersion(new BrokerRegistrationChangeRecord().setBrokerId(i),
(short) 0);
}
@Test
public void testAppendRecords() {
TestAppender appender = new TestAppender();
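// With maxRecordsPerBatch = 2, the five records should be split into non-atomic
// batches of 2, 2, and 1, and the final offset (5) returned.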
assertEquals(5, QuorumController.appendRecords(log,
ControllerResult.of(Arrays.asList(rec(0), rec(1), rec(2), rec(3), rec(4)), null),
2,
appender));
}
@Test
public void testAppendRecordsAtomically() {
TestAppender appender = new TestAppender();
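// An atomic batch cannot be split, so five records with maxRecordsPerBatch = 2 must
// be rejected outright.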
assertEquals("Attempted to atomically commit 5 records, but maxRecordsPerBatch is 2",
assertThrows(IllegalStateException.class, () ->
QuorumController.appendRecords(log,
ControllerResult.atomicOf(Arrays.asList(rec(0), rec(1), rec(2), rec(3), rec(4)), null),
2,
appender)).getMessage());
}
@Test
public void testBootstrapZkMigrationRecord() throws Exception {
assertEquals(ZkMigrationState.PRE_MIGRATION,
checkBootstrapZkMigrationRecord(MetadataVersion.IBP_3_4_IV0, true));
assertEquals(ZkMigrationState.NONE,
checkBootstrapZkMigrationRecord(MetadataVersion.IBP_3_4_IV0, false));
assertEquals(ZkMigrationState.NONE,
checkBootstrapZkMigrationRecord(MetadataVersion.IBP_3_3_IV0, false));
assertEquals(
"The bootstrap metadata.version 3.3-IV0 does not support ZK migrations. Cannot continue with ZK migrations enabled.",
assertThrows(FaultHandlerException.class, () ->
checkBootstrapZkMigrationRecord(MetadataVersion.IBP_3_3_IV0, true)).getCause().getMessage()
);
}
public ZkMigrationState checkBootstrapZkMigrationRecord(
MetadataVersion metadataVersion,
boolean migrationEnabled
) throws Exception {
try (
LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).build();
QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).
setControllerBuilderInitializer(controllerBuilder -> {
controllerBuilder.setZkMigrationEnabled(migrationEnabled);
}).
setBootstrapMetadata(BootstrapMetadata.fromVersion(metadataVersion, "test")).
build();
) {
QuorumController active = controlEnv.activeController();
return active.appendReadEvent("read migration state", OptionalLong.empty(),
() -> active.featureControl().zkMigrationState()).get(30, TimeUnit.SECONDS);
}
}
@Test
public void testUpgradeMigrationStateFrom34() throws Exception {
try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).build()) {
// In 3.4, we only wrote a PRE_MIGRATION record to the log, defined there as enum value 1.
// In 3.5+ software, enum value 1 is redefined as MIGRATION.
BootstrapMetadata bootstrapMetadata = BootstrapMetadata.fromVersion(MetadataVersion.IBP_3_4_IV0, "test");
List<ApiMessageAndVersion> initialRecords = new ArrayList<>(bootstrapMetadata.records());
initialRecords.add(ZkMigrationState.of((byte) 1).toRecord());
logEnv.appendInitialRecords(initialRecords);
try (
QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).
setControllerBuilderInitializer(controllerBuilder -> {
controllerBuilder.setZkMigrationEnabled(true);
}).
setBootstrapMetadata(bootstrapMetadata).
build();
) {
QuorumController active = controlEnv.activeController();
assertEquals(ZkMigrationState.MIGRATION, active.featureControl().zkMigrationState());
assertFalse(active.featureControl().inPreMigrationMode());
}
}
}
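
/**
 * Replays the given migration state (if any) into a fresh FeatureControlManager,
 * generates the activation records the controller would write for this bootstrap,
 * replays them, and returns the manager so the resulting state can be inspected.
 */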
FeatureControlManager getActivationRecords(
MetadataVersion metadataVersion,
Optional<ZkMigrationState> stateInLog,
boolean zkMigrationEnabled
) {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
FeatureControlManager featureControlManager = new FeatureControlManager.Builder()
.setSnapshotRegistry(snapshotRegistry)
.setMetadataVersion(metadataVersion)
.build();
stateInLog.ifPresent(zkMigrationState ->
featureControlManager.replay((ZkMigrationStateRecord) zkMigrationState.toRecord().message()));
List<ApiMessageAndVersion> records = QuorumController.generateActivationRecords(
log,
!stateInLog.isPresent(),
zkMigrationEnabled,
BootstrapMetadata.fromVersion(metadataVersion, "test"),
featureControlManager);
RecordTestUtils.replayAll(featureControlManager, records);
return featureControlManager;
}
@Test
public void testActivationRecords33() {
FeatureControlManager featureControl;
assertEquals(
"The bootstrap metadata.version 3.3-IV0 does not support ZK migrations. Cannot continue with ZK migrations enabled.",
assertThrows(RuntimeException.class, () -> getActivationRecords(MetadataVersion.IBP_3_3_IV0, Optional.empty(), true)).getMessage()
);
featureControl = getActivationRecords(MetadataVersion.IBP_3_3_IV0, Optional.empty(), false);
assertEquals(MetadataVersion.IBP_3_3_IV0, featureControl.metadataVersion());
assertEquals(ZkMigrationState.NONE, featureControl.zkMigrationState());
assertEquals(
"Should not have ZK migrations enabled on a cluster running metadata.version 3.3-IV0",
assertThrows(RuntimeException.class, () -> getActivationRecords(MetadataVersion.IBP_3_3_IV0, Optional.of(ZkMigrationState.NONE), true)).getMessage()
);
featureControl = getActivationRecords(MetadataVersion.IBP_3_3_IV0, Optional.of(ZkMigrationState.NONE), false);
assertEquals(MetadataVersion.IBP_3_3_IV0, featureControl.metadataVersion());
assertEquals(ZkMigrationState.NONE, featureControl.zkMigrationState());
}
@Test
public void testActivationRecords34() {
FeatureControlManager featureControl;
// Empty log with migrations enabled: enter PRE_MIGRATION
featureControl = getActivationRecords(MetadataVersion.IBP_3_4_IV0, Optional.empty(), true);
assertEquals(MetadataVersion.IBP_3_4_IV0, featureControl.metadataVersion());
assertEquals(ZkMigrationState.PRE_MIGRATION, featureControl.zkMigrationState());
// Empty log with migrations disabled: no migration state
featureControl = getActivationRecords(MetadataVersion.IBP_3_4_IV0, Optional.empty(), false);
assertEquals(MetadataVersion.IBP_3_4_IV0, featureControl.metadataVersion());
assertEquals(ZkMigrationState.NONE, featureControl.zkMigrationState());
// NONE in the log means the cluster was created in KRaft mode, so enabling migrations is an error
assertEquals(
"Should not have ZK migrations enabled on a cluster that was created in KRaft mode.",
assertThrows(RuntimeException.class, () -> getActivationRecords(MetadataVersion.IBP_3_4_IV0, Optional.of(ZkMigrationState.NONE), true)).getMessage()
);
featureControl = getActivationRecords(MetadataVersion.IBP_3_4_IV0, Optional.of(ZkMigrationState.NONE), false);
assertEquals(MetadataVersion.IBP_3_4_IV0, featureControl.metadataVersion());
assertEquals(ZkMigrationState.NONE, featureControl.zkMigrationState());
// PRE_MIGRATION in the log with migrations still enabled: remain in PRE_MIGRATION
featureControl = getActivationRecords(MetadataVersion.IBP_3_4_IV0, Optional.of(ZkMigrationState.PRE_MIGRATION), true);
assertEquals(MetadataVersion.IBP_3_4_IV0, featureControl.metadataVersion());
assertEquals(ZkMigrationState.PRE_MIGRATION, featureControl.zkMigrationState());
// MIGRATION in the log with migrations still enabled: remain in MIGRATION
featureControl = getActivationRecords(MetadataVersion.IBP_3_4_IV0, Optional.of(ZkMigrationState.MIGRATION), true);
assertEquals(MetadataVersion.IBP_3_4_IV0, featureControl.metadataVersion());
assertEquals(ZkMigrationState.MIGRATION, featureControl.zkMigrationState());
// MIGRATION in the log with migrations now disabled: finalize to POST_MIGRATION
featureControl = getActivationRecords(MetadataVersion.IBP_3_4_IV0, Optional.of(ZkMigrationState.MIGRATION), false);
assertEquals(MetadataVersion.IBP_3_4_IV0, featureControl.metadataVersion());
assertEquals(ZkMigrationState.POST_MIGRATION, featureControl.zkMigrationState());
// POST_MIGRATION in the log: stay in POST_MIGRATION regardless of the flag
featureControl = getActivationRecords(MetadataVersion.IBP_3_4_IV0, Optional.of(ZkMigrationState.POST_MIGRATION), true);
assertEquals(MetadataVersion.IBP_3_4_IV0, featureControl.metadataVersion());
assertEquals(ZkMigrationState.POST_MIGRATION, featureControl.zkMigrationState());
featureControl = getActivationRecords(MetadataVersion.IBP_3_4_IV0, Optional.of(ZkMigrationState.POST_MIGRATION), false);
assertEquals(MetadataVersion.IBP_3_4_IV0, featureControl.metadataVersion());
assertEquals(ZkMigrationState.POST_MIGRATION, featureControl.zkMigrationState());
}
@Test
public void testActivationRecordsNonEmptyLog() {
FeatureControlManager featureControl;
featureControl = getActivationRecords(MetadataVersion.IBP_3_4_IV0, Optional.empty(), true);
assertEquals(MetadataVersion.IBP_3_4_IV0, featureControl.metadataVersion());
assertEquals(ZkMigrationState.PRE_MIGRATION, featureControl.zkMigrationState());
}
@Test
public void testMigrationsEnabledForOldBootstrapMetadataVersion() throws Exception {
try (
LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).build();
) {
QuorumControllerTestEnv.Builder controlEnvBuilder = new QuorumControllerTestEnv.Builder(logEnv).
setControllerBuilderInitializer(controllerBuilder -> {
controllerBuilder.setZkMigrationEnabled(true);
}).
setBootstrapMetadata(BootstrapMetadata.fromVersion(MetadataVersion.IBP_3_3_IV0, "test"));
QuorumControllerTestEnv controlEnv = controlEnvBuilder.build();
QuorumController active = controlEnv.activeController();
assertEquals(ZkMigrationState.NONE, active.appendReadEvent("read migration state", OptionalLong.empty(),
() -> active.featureControl().zkMigrationState()).get(30, TimeUnit.SECONDS));
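// Closing the test env re-checks the fault handlers; enabling migrations with a
// pre-3.4 bootstrap metadata.version should have registered a fatal fault.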
assertThrows(FaultHandlerException.class, controlEnv::close);
}
}
}