blob: 28b22d5a77d525ea60d8aae684fe82b89a44644e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.controller;
import org.apache.kafka.common.DirectoryId;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.InconsistentClusterIdException;
import org.apache.kafka.common.errors.InvalidRegistrationException;
import org.apache.kafka.common.errors.StaleBrokerEpochException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.BrokerRegistrationRequestData;
import org.apache.kafka.common.message.ControllerRegistrationRequestData;
import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.common.metadata.FenceBrokerRecord;
import org.apache.kafka.common.metadata.PartitionChangeRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerEndpoint;
import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerEndpointCollection;
import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.image.writer.ImageWriterOptions;
import org.apache.kafka.metadata.BrokerRegistration;
import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
import org.apache.kafka.metadata.BrokerRegistrationInControlledShutdownChange;
import org.apache.kafka.metadata.BrokerRegistrationReply;
import org.apache.kafka.metadata.FinalizedControllerFeatures;
import org.apache.kafka.metadata.RecordTestUtils;
import org.apache.kafka.metadata.VersionRange;
import org.apache.kafka.metadata.placement.ClusterDescriber;
import org.apache.kafka.metadata.placement.PartitionAssignment;
import org.apache.kafka.metadata.placement.PlacementSpec;
import org.apache.kafka.metadata.placement.UsableBroker;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.KRaftVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.common.TestFeatureVersion;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.stream.Stream;
import static java.util.Arrays.asList;
import static org.apache.kafka.server.common.MetadataVersion.IBP_3_3_IV2;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Timeout(value = 40)
public class ClusterControlManagerTest {
@ParameterizedTest
@EnumSource(value = MetadataVersion.class, names = {"IBP_3_0_IV1", "IBP_3_3_IV2"})
public void testReplay(MetadataVersion metadataVersion) {
MockTime time = new MockTime(0, 0, 0);
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
FeatureControlManager featureControl = new FeatureControlManager.Builder().
setSnapshotRegistry(snapshotRegistry).
setQuorumFeatures(new QuorumFeatures(0,
QuorumFeatures.defaultSupportedFeatureMap(true),
Collections.singletonList(0))).
build();
ClusterControlManager clusterControl = new ClusterControlManager.Builder().
setTime(time).
setSnapshotRegistry(snapshotRegistry).
setSessionTimeoutNs(1000).
setFeatureControlManager(featureControl).
setBrokerShutdownHandler((brokerId, isCleanShutdown, records) -> { }).
build();
clusterControl.activate();
assertFalse(clusterControl.isUnfenced(0));
RegisterBrokerRecord brokerRecord = new RegisterBrokerRecord().setBrokerEpoch(100).setBrokerId(1);
brokerRecord.endPoints().add(new BrokerEndpoint().
setSecurityProtocol(SecurityProtocol.PLAINTEXT.id).
setPort((short) 9092).
setName("PLAINTEXT").
setHost("example.com"));
clusterControl.replay(brokerRecord, 100L);
clusterControl.checkBrokerEpoch(1, 100);
assertThrows(StaleBrokerEpochException.class,
() -> clusterControl.checkBrokerEpoch(1, 101));
assertThrows(StaleBrokerEpochException.class,
() -> clusterControl.checkBrokerEpoch(2, 100));
assertFalse(clusterControl.isUnfenced(0));
assertFalse(clusterControl.isUnfenced(1));
if (metadataVersion.isLessThan(IBP_3_3_IV2)) {
UnfenceBrokerRecord unfenceBrokerRecord =
new UnfenceBrokerRecord().setId(1).setEpoch(100);
clusterControl.replay(unfenceBrokerRecord);
} else {
BrokerRegistrationChangeRecord changeRecord =
new BrokerRegistrationChangeRecord().setBrokerId(1).setBrokerEpoch(100).setFenced(BrokerRegistrationFencingChange.UNFENCE.value());
clusterControl.replay(changeRecord);
}
assertFalse(clusterControl.isUnfenced(0));
assertTrue(clusterControl.isUnfenced(1));
if (metadataVersion.isLessThan(IBP_3_3_IV2)) {
FenceBrokerRecord fenceBrokerRecord =
new FenceBrokerRecord().setId(1).setEpoch(100);
clusterControl.replay(fenceBrokerRecord);
} else {
BrokerRegistrationChangeRecord changeRecord =
new BrokerRegistrationChangeRecord().setBrokerId(1).setBrokerEpoch(100).setFenced(BrokerRegistrationFencingChange.FENCE.value());
clusterControl.replay(changeRecord);
}
assertFalse(clusterControl.isUnfenced(0));
assertFalse(clusterControl.isUnfenced(1));
}
@Test
public void testReplayRegisterBrokerRecord() {
MockTime time = new MockTime(0, 0, 0);
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
FeatureControlManager featureControl = new FeatureControlManager.Builder().
setSnapshotRegistry(snapshotRegistry).
setQuorumFeatures(new QuorumFeatures(0,
QuorumFeatures.defaultSupportedFeatureMap(true),
Collections.singletonList(0))).
build();
ClusterControlManager clusterControl = new ClusterControlManager.Builder().
setClusterId("fPZv1VBsRFmnlRvmGcOW9w").
setTime(time).
setSnapshotRegistry(snapshotRegistry).
setSessionTimeoutNs(1000).
setFeatureControlManager(featureControl).
setBrokerShutdownHandler((brokerId, isCleanShutdown, records) -> { }).
build();
assertFalse(clusterControl.isUnfenced(0));
assertFalse(clusterControl.inControlledShutdown(0));
RegisterBrokerRecord brokerRecord = new RegisterBrokerRecord().
setBrokerEpoch(100).
setBrokerId(0).
setRack(null).
setFenced(true).
setInControlledShutdown(true);
brokerRecord.endPoints().add(new BrokerEndpoint().
setSecurityProtocol(SecurityProtocol.PLAINTEXT.id).
setPort((short) 9092).
setName("PLAINTEXT").
setHost("example.com"));
clusterControl.replay(brokerRecord, 100L);
assertFalse(clusterControl.isUnfenced(0));
assertTrue(clusterControl.inControlledShutdown(0));
brokerRecord.setInControlledShutdown(false);
clusterControl.replay(brokerRecord, 100L);
assertFalse(clusterControl.isUnfenced(0));
assertFalse(clusterControl.inControlledShutdown(0));
assertEquals(100L, clusterControl.registerBrokerRecordOffset(brokerRecord.brokerId()).getAsLong());
brokerRecord.setFenced(false);
clusterControl.replay(brokerRecord, 100L);
assertTrue(clusterControl.isUnfenced(0));
assertFalse(clusterControl.inControlledShutdown(0));
}
@Test
public void testReplayBrokerRegistrationChangeRecord() {
MockTime time = new MockTime(0, 0, 0);
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
FeatureControlManager featureControl = new FeatureControlManager.Builder().
setSnapshotRegistry(snapshotRegistry).
setQuorumFeatures(new QuorumFeatures(0,
QuorumFeatures.defaultSupportedFeatureMap(true),
Collections.singletonList(0))).
build();
ClusterControlManager clusterControl = new ClusterControlManager.Builder().
setClusterId("fPZv1VBsRFmnlRvmGcOW9w").
setTime(time).
setSnapshotRegistry(snapshotRegistry).
setSessionTimeoutNs(1000).
setFeatureControlManager(featureControl).
setBrokerShutdownHandler((brokerId, isCleanShutdown, records) -> { }).
build();
assertFalse(clusterControl.isUnfenced(0));
assertFalse(clusterControl.inControlledShutdown(0));
RegisterBrokerRecord brokerRecord = new RegisterBrokerRecord().
setBrokerEpoch(100).
setBrokerId(0).
setRack(null).
setFenced(false);
brokerRecord.endPoints().add(new BrokerEndpoint().
setSecurityProtocol(SecurityProtocol.PLAINTEXT.id).
setPort((short) 9092).
setName("PLAINTEXT").
setHost("example.com"));
clusterControl.replay(brokerRecord, 100L);
assertTrue(clusterControl.isUnfenced(0));
assertFalse(clusterControl.inControlledShutdown(0));
BrokerRegistrationChangeRecord registrationChangeRecord = new BrokerRegistrationChangeRecord()
.setBrokerId(0)
.setBrokerEpoch(100)
.setInControlledShutdown(BrokerRegistrationInControlledShutdownChange.IN_CONTROLLED_SHUTDOWN.value());
clusterControl.replay(registrationChangeRecord);
assertTrue(clusterControl.isUnfenced(0));
assertTrue(clusterControl.inControlledShutdown(0));
registrationChangeRecord = new BrokerRegistrationChangeRecord()
.setBrokerId(0)
.setBrokerEpoch(100)
.setFenced(BrokerRegistrationFencingChange.UNFENCE.value());
clusterControl.replay(registrationChangeRecord);
assertTrue(clusterControl.isUnfenced(0));
assertTrue(clusterControl.inControlledShutdown(0));
}
@Test
public void testRegistrationWithIncorrectClusterId() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
FeatureControlManager featureControl = new FeatureControlManager.Builder().
setSnapshotRegistry(snapshotRegistry).
setQuorumFeatures(new QuorumFeatures(0,
QuorumFeatures.defaultSupportedFeatureMap(true),
Collections.singletonList(0))).
build();
ClusterControlManager clusterControl = new ClusterControlManager.Builder().
setClusterId("fPZv1VBsRFmnlRvmGcOW9w").
setTime(new MockTime(0, 0, 0)).
setSnapshotRegistry(snapshotRegistry).
setSessionTimeoutNs(1000).
setFeatureControlManager(featureControl).
setBrokerShutdownHandler((brokerId, isCleanShutdown, records) -> { }).
build();
clusterControl.activate();
assertThrows(InconsistentClusterIdException.class, () ->
clusterControl.registerBroker(new BrokerRegistrationRequestData().
setClusterId("WIjw3grwRZmR2uOpdpVXbg").
setBrokerId(0).
setRack(null).
setIncarnationId(Uuid.fromString("0H4fUu1xQEKXFYwB1aBjhg")),
123L,
new FinalizedControllerFeatures(Collections.emptyMap(), 456L),
false));
}
private static Stream<Arguments> metadataVersions() {
return Stream.of(
MetadataVersion.IBP_3_3_IV2,
MetadataVersion.IBP_3_3_IV3,
MetadataVersion.IBP_3_7_IV2, // introduces directory assignment
MetadataVersion.latestTesting()
).map(Arguments::of);
}
@ParameterizedTest
@MethodSource("metadataVersions")
public void testRegisterBrokerRecordVersion(MetadataVersion metadataVersion) {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
FeatureControlManager featureControl = new FeatureControlManager.Builder().
setSnapshotRegistry(snapshotRegistry).
setQuorumFeatures(new QuorumFeatures(0,
QuorumFeatures.defaultSupportedFeatureMap(true),
Collections.singletonList(0))).
setMetadataVersion(metadataVersion).
build();
ClusterControlManager clusterControl = new ClusterControlManager.Builder().
setClusterId("fPZv1VBsRFmnlRvmGcOW9w").
setTime(new MockTime(0, 0, 0)).
setSnapshotRegistry(snapshotRegistry).
setSessionTimeoutNs(1000).
setFeatureControlManager(featureControl).
setBrokerShutdownHandler((brokerId, isCleanShutdown, records) -> { }).
build();
clusterControl.activate();
List<Uuid> logDirs = metadataVersion.isDirectoryAssignmentSupported() ? asList(
Uuid.fromString("63k9SN1nQOS0dFHSCIMA0A"),
Uuid.fromString("Vm1MjsOCR1OjDDydOsDbzg")
) : Collections.emptyList();
ControllerResult<BrokerRegistrationReply> result = clusterControl.registerBroker(
new BrokerRegistrationRequestData().
setClusterId("fPZv1VBsRFmnlRvmGcOW9w").
setBrokerId(0).
setLogDirs(logDirs).
setRack(null).
setIncarnationId(Uuid.fromString("0H4fUu1xQEKXFYwB1aBjhg")),
123L,
new FinalizedControllerFeatures(Collections.emptyMap(), 456L),
false);
short expectedVersion = metadataVersion.registerBrokerRecordVersion();
assertEquals(
Collections.singletonList(new ApiMessageAndVersion(new RegisterBrokerRecord().
setBrokerEpoch(123L).
setBrokerId(0).
setRack(null).
setIncarnationId(Uuid.fromString("0H4fUu1xQEKXFYwB1aBjhg")).
setFenced(true).
setLogDirs(logDirs).
setFeatures(new RegisterBrokerRecord.BrokerFeatureCollection(Collections.singletonList(
new RegisterBrokerRecord.BrokerFeature().
setName(MetadataVersion.FEATURE_NAME).
setMinSupportedVersion((short) 1).
setMaxSupportedVersion((short) 1)).iterator())).
setInControlledShutdown(false), expectedVersion)),
result.records());
}
@Test
public void testUnregister() {
RegisterBrokerRecord brokerRecord = new RegisterBrokerRecord().
setBrokerId(1).
setBrokerEpoch(100).
setIncarnationId(Uuid.fromString("fPZv1VBsRFmnlRvmGcOW9w")).
setRack("arack");
brokerRecord.endPoints().add(new BrokerEndpoint().
setSecurityProtocol(SecurityProtocol.PLAINTEXT.id).
setPort((short) 9092).
setName("PLAINTEXT").
setHost("example.com"));
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
FeatureControlManager featureControl = new FeatureControlManager.Builder().
setSnapshotRegistry(snapshotRegistry).
setQuorumFeatures(new QuorumFeatures(0,
QuorumFeatures.defaultSupportedFeatureMap(true),
Collections.singletonList(0))).
build();
ClusterControlManager clusterControl = new ClusterControlManager.Builder().
setTime(new MockTime(0, 0, 0)).
setSnapshotRegistry(snapshotRegistry).
setSessionTimeoutNs(1000).
setFeatureControlManager(featureControl).
setBrokerShutdownHandler((brokerId, isCleanShutdown, records) -> { }).
build();
clusterControl.activate();
clusterControl.replay(brokerRecord, 100L);
assertEquals(new BrokerRegistration.Builder().
setId(1).
setEpoch(100).
setIncarnationId(Uuid.fromString("fPZv1VBsRFmnlRvmGcOW9w")).
setListeners(Collections.singletonMap("PLAINTEXT",
new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "example.com", 9092))).
setRack(Optional.of("arack")).
setFenced(true).
setInControlledShutdown(false).build(),
clusterControl.brokerRegistrations().get(1));
assertEquals(100L, clusterControl.registerBrokerRecordOffset(brokerRecord.brokerId()).getAsLong());
UnregisterBrokerRecord unregisterRecord = new UnregisterBrokerRecord().
setBrokerId(1).
setBrokerEpoch(100);
clusterControl.replay(unregisterRecord);
assertFalse(clusterControl.brokerRegistrations().containsKey(1));
assertFalse(clusterControl.registerBrokerRecordOffset(brokerRecord.brokerId()).isPresent());
}
@ParameterizedTest
@ValueSource(ints = {3, 10})
public void testPlaceReplicas(int numUsableBrokers) {
MockTime time = new MockTime(0, 0, 0);
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
FeatureControlManager featureControl = new FeatureControlManager.Builder().
setSnapshotRegistry(snapshotRegistry).
setQuorumFeatures(new QuorumFeatures(0,
QuorumFeatures.defaultSupportedFeatureMap(true),
Collections.singletonList(0))).
build();
ClusterControlManager clusterControl = new ClusterControlManager.Builder().
setTime(time).
setSnapshotRegistry(snapshotRegistry).
setSessionTimeoutNs(1000).
setFeatureControlManager(featureControl).
setBrokerShutdownHandler((brokerId, isCleanShutdown, records) -> { }).
build();
clusterControl.activate();
for (int i = 0; i < numUsableBrokers; i++) {
RegisterBrokerRecord brokerRecord =
new RegisterBrokerRecord().setBrokerEpoch(100).setBrokerId(i);
brokerRecord.endPoints().add(new BrokerEndpoint().
setSecurityProtocol(SecurityProtocol.PLAINTEXT.id).
setPort((short) 9092).
setName("PLAINTEXT").
setHost("example.com"));
clusterControl.replay(brokerRecord, 100L);
UnfenceBrokerRecord unfenceRecord =
new UnfenceBrokerRecord().setId(i).setEpoch(100);
clusterControl.replay(unfenceRecord);
clusterControl.heartbeatManager().touch(i, false, 0);
}
for (int i = 0; i < numUsableBrokers; i++) {
assertTrue(clusterControl.isUnfenced(i),
String.format("broker %d was not unfenced.", i));
}
for (int i = 0; i < 100; i++) {
List<PartitionAssignment> results = clusterControl.replicaPlacer().place(
new PlacementSpec(0,
1,
(short) 3),
new ClusterDescriber() {
@Override
public Iterator<UsableBroker> usableBrokers() {
return clusterControl.usableBrokers();
}
@Override
public Uuid defaultDir(int brokerId) {
return DirectoryId.UNASSIGNED;
}
}
).assignments();
HashSet<Integer> seen = new HashSet<>();
for (Integer result : results.get(0).replicas()) {
assertTrue(result >= 0);
assertTrue(result < numUsableBrokers);
assertTrue(seen.add(result));
}
}
}
@ParameterizedTest
@EnumSource(value = MetadataVersion.class, names = {"IBP_3_3_IV2", "IBP_3_3_IV3"})
public void testRegistrationsToRecords(MetadataVersion metadataVersion) {
MockTime time = new MockTime(0, 0, 0);
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
FeatureControlManager featureControl = new FeatureControlManager.Builder().
setSnapshotRegistry(snapshotRegistry).
setQuorumFeatures(new QuorumFeatures(0,
QuorumFeatures.defaultSupportedFeatureMap(true),
Collections.singletonList(0))).
setMetadataVersion(metadataVersion).
build();
ClusterControlManager clusterControl = new ClusterControlManager.Builder().
setTime(time).
setSnapshotRegistry(snapshotRegistry).
setSessionTimeoutNs(1000).
setFeatureControlManager(featureControl).
setBrokerShutdownHandler((brokerId, isCleanShutdown, records) -> { }).
build();
clusterControl.activate();
assertFalse(clusterControl.isUnfenced(0));
for (int i = 0; i < 3; i++) {
RegisterBrokerRecord brokerRecord = new RegisterBrokerRecord().
setBrokerEpoch(100).setBrokerId(i).setRack(null);
brokerRecord.endPoints().add(new BrokerEndpoint().
setSecurityProtocol(SecurityProtocol.PLAINTEXT.id).
setPort((short) 9092 + i).
setName("PLAINTEXT").
setHost("example.com"));
clusterControl.replay(brokerRecord, 100L);
}
for (int i = 0; i < 2; i++) {
UnfenceBrokerRecord unfenceBrokerRecord =
new UnfenceBrokerRecord().setId(i).setEpoch(100);
clusterControl.replay(unfenceBrokerRecord);
}
BrokerRegistrationChangeRecord registrationChangeRecord =
new BrokerRegistrationChangeRecord().
setBrokerId(0).
setBrokerEpoch(100).
setInControlledShutdown(BrokerRegistrationInControlledShutdownChange.
IN_CONTROLLED_SHUTDOWN.value());
clusterControl.replay(registrationChangeRecord);
short expectedVersion = metadataVersion.registerBrokerRecordVersion();
ImageWriterOptions options = new ImageWriterOptions.Builder().
setMetadataVersion(metadataVersion).
setLossHandler(__ -> { }).
build();
assertEquals(new ApiMessageAndVersion(new RegisterBrokerRecord().
setBrokerEpoch(100).setBrokerId(0).setRack(null).
setEndPoints(new BrokerEndpointCollection(Collections.singleton(
new BrokerEndpoint().setSecurityProtocol(SecurityProtocol.PLAINTEXT.id).
setPort((short) 9092).
setName("PLAINTEXT").
setHost("example.com")).iterator())).
setInControlledShutdown(metadataVersion.isInControlledShutdownStateSupported()).
setFenced(false), expectedVersion),
clusterControl.brokerRegistrations().get(0).toRecord(options));
assertEquals(new ApiMessageAndVersion(new RegisterBrokerRecord().
setBrokerEpoch(100).setBrokerId(1).setRack(null).
setEndPoints(new BrokerEndpointCollection(Collections.singleton(
new BrokerEndpoint().setSecurityProtocol(SecurityProtocol.PLAINTEXT.id).
setPort((short) 9093).
setName("PLAINTEXT").
setHost("example.com")).iterator())).
setFenced(false), expectedVersion),
clusterControl.brokerRegistrations().get(1).toRecord(options));
assertEquals(new ApiMessageAndVersion(new RegisterBrokerRecord().
setBrokerEpoch(100).setBrokerId(2).setRack(null).
setEndPoints(new BrokerEndpointCollection(Collections.singleton(
new BrokerEndpoint().setSecurityProtocol(SecurityProtocol.PLAINTEXT.id).
setPort((short) 9094).
setName("PLAINTEXT").
setHost("example.com")).iterator())).
setFenced(true), expectedVersion),
clusterControl.brokerRegistrations().get(2).toRecord(options));
}
@Test
public void testRegistrationWithUnsupportedFeature() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
Map<String, VersionRange> supportedFeatures = new HashMap<>();
supportedFeatures.put(MetadataVersion.FEATURE_NAME, VersionRange.of(
MetadataVersion.IBP_3_1_IV0.featureLevel(),
MetadataVersion.IBP_3_7_IV0.featureLevel()));
supportedFeatures.put(TestFeatureVersion.FEATURE_NAME, VersionRange.of(
TestFeatureVersion.TEST_0.featureLevel(),
TestFeatureVersion.TEST_1.featureLevel()));
FeatureControlManager featureControl = new FeatureControlManager.Builder().
setSnapshotRegistry(snapshotRegistry).
setQuorumFeatures(new QuorumFeatures(0, supportedFeatures, Collections.singletonList(0))).
setMetadataVersion(MetadataVersion.IBP_3_7_IV0).
build();
ClusterControlManager clusterControl = new ClusterControlManager.Builder().
setClusterId("fPZv1VBsRFmnlRvmGcOW9w").
setTime(new MockTime(0, 0, 0)).
setSnapshotRegistry(snapshotRegistry).
setFeatureControlManager(featureControl).
setBrokerShutdownHandler((brokerId, isCleanShutdown, records) -> { }).
build();
clusterControl.activate();
FeatureLevelRecord testFeatureRecord = new FeatureLevelRecord().
setName(TestFeatureVersion.FEATURE_NAME).setFeatureLevel((short) 1);
featureControl.replay(testFeatureRecord);
List<Uuid> logDirs = asList(Uuid.fromString("yJGxmjfbQZSVFAlNM3uXZg"), Uuid.fromString("Mj3CW3OSRi29cFeNJlXuAQ"));
BrokerRegistrationRequestData baseRequest = new BrokerRegistrationRequestData().
setClusterId("fPZv1VBsRFmnlRvmGcOW9w").
setBrokerId(0).
setRack(null).
setIncarnationId(Uuid.fromString("0H4fUu1xQEKXFYwB1aBjhg")).
setLogDirs(logDirs);
assertEquals("Unable to register because the broker does not support finalized version 1 of " +
"test.feature.version. The broker wants a version between 0 and 0, inclusive.",
assertThrows(UnsupportedVersionException.class,
() -> clusterControl.registerBroker(
baseRequest.setFeatures(new BrokerRegistrationRequestData.FeatureCollection(
Collections.singleton(new BrokerRegistrationRequestData.Feature().
setName(MetadataVersion.FEATURE_NAME).
setMinSupportedVersion(MetadataVersion.IBP_3_1_IV0.featureLevel()).
setMaxSupportedVersion(MetadataVersion.IBP_3_7_IV0.featureLevel())).iterator())),
123L,
featureControl.finalizedFeatures(Long.MAX_VALUE),
false)).getMessage());
}
@Test
public void testRegistrationWithUnsupportedKraftVersion() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
Map<String, VersionRange> supportedFeatures = new HashMap<>();
supportedFeatures.put(MetadataVersion.FEATURE_NAME, VersionRange.of(
MetadataVersion.IBP_3_1_IV0.featureLevel(),
MetadataVersion.IBP_3_9_IV0.featureLevel()));
supportedFeatures.put(KRaftVersion.FEATURE_NAME, VersionRange.of(
KRaftVersion.KRAFT_VERSION_1.featureLevel(),
KRaftVersion.KRAFT_VERSION_1.featureLevel()));
FeatureControlManager featureControl = new FeatureControlManager.Builder().
setSnapshotRegistry(snapshotRegistry).
setQuorumFeatures(new QuorumFeatures(0, supportedFeatures, Collections.singletonList(0))).
setMetadataVersion(MetadataVersion.IBP_3_9_IV0).
build();
ClusterControlManager clusterControl = new ClusterControlManager.Builder().
setClusterId("fPZv1VBsRFmnlRvmGcOW9w").
setTime(new MockTime(0, 0, 0)).
setSnapshotRegistry(snapshotRegistry).
setFeatureControlManager(featureControl).
setBrokerShutdownHandler((brokerId, isCleanShutdown, records) -> { }).
build();
clusterControl.activate();
List<Uuid> logDirs = asList(Uuid.fromString("yJGxmjfbQZSVFAlNM3uXZg"), Uuid.fromString("Mj3CW3OSRi29cFeNJlXuAQ"));
BrokerRegistrationRequestData baseRequest = new BrokerRegistrationRequestData().
setClusterId("fPZv1VBsRFmnlRvmGcOW9w").
setBrokerId(0).
setRack(null).
setIncarnationId(Uuid.fromString("0H4fUu1xQEKXFYwB1aBjhg")).
setLogDirs(logDirs);
// quorum controller passes in the latest kraft version to populate finalized features
Map<String, Short> updatedFeaturesMap = new HashMap<>(featureControl.finalizedFeatures(Long.MAX_VALUE).featureMap());
updatedFeaturesMap.put(KRaftVersion.FEATURE_NAME, KRaftVersion.KRAFT_VERSION_1.featureLevel());
FinalizedControllerFeatures updatedFinalizedFeatures = new FinalizedControllerFeatures(updatedFeaturesMap, Long.MAX_VALUE);
assertEquals("Unable to register because the broker does not support finalized version 1 of " +
"kraft.version. The broker wants a version between 0 and 0, inclusive.",
assertThrows(UnsupportedVersionException.class,
() -> clusterControl.registerBroker(
baseRequest.setFeatures(new BrokerRegistrationRequestData.FeatureCollection(
Collections.singleton(new BrokerRegistrationRequestData.Feature().
setName(MetadataVersion.FEATURE_NAME).
setMinSupportedVersion(MetadataVersion.IBP_3_9_IV0.featureLevel()).
setMaxSupportedVersion(MetadataVersion.IBP_3_9_IV0.featureLevel())).iterator())),
123L,
updatedFinalizedFeatures,
false)).getMessage());
assertEquals("Unable to register because the broker does not support finalized version 1 of " +
"kraft.version. The broker wants a version between 0 and 0, inclusive.",
assertThrows(UnsupportedVersionException.class,
() -> clusterControl.registerBroker(
baseRequest.setFeatures(new BrokerRegistrationRequestData.FeatureCollection(
Arrays.asList(
new BrokerRegistrationRequestData.Feature().
setName(MetadataVersion.FEATURE_NAME).
setMinSupportedVersion(MetadataVersion.IBP_3_9_IV0.featureLevel()).
setMaxSupportedVersion(MetadataVersion.IBP_3_9_IV0.featureLevel()),
new BrokerRegistrationRequestData.Feature().
setName(KRaftVersion.FEATURE_NAME).
setMinSupportedVersion(KRaftVersion.KRAFT_VERSION_0.featureLevel()).
setMaxSupportedVersion(KRaftVersion.KRAFT_VERSION_0.featureLevel())).iterator())),
123L,
updatedFinalizedFeatures,
false)).getMessage());
clusterControl.registerBroker(
baseRequest.setFeatures(new BrokerRegistrationRequestData.FeatureCollection(
Arrays.asList(
new BrokerRegistrationRequestData.Feature().
setName(MetadataVersion.FEATURE_NAME).
setMinSupportedVersion(MetadataVersion.IBP_3_9_IV0.featureLevel()).
setMaxSupportedVersion(MetadataVersion.IBP_3_9_IV0.featureLevel()),
new BrokerRegistrationRequestData.Feature().
setName(KRaftVersion.FEATURE_NAME).
setMinSupportedVersion(KRaftVersion.KRAFT_VERSION_1.featureLevel()).
setMaxSupportedVersion(KRaftVersion.KRAFT_VERSION_1.featureLevel())).iterator())),
123L,
updatedFinalizedFeatures,
false);
}
@Test
public void testRegistrationWithUnsupportedMetadataVersion() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
FeatureControlManager featureControl = new FeatureControlManager.Builder().
setSnapshotRegistry(snapshotRegistry).
setQuorumFeatures(new QuorumFeatures(0,
Collections.singletonMap(MetadataVersion.FEATURE_NAME, VersionRange.of(
MetadataVersion.IBP_3_1_IV0.featureLevel(),
MetadataVersion.IBP_3_3_IV0.featureLevel())),
Collections.singletonList(0))).
setMetadataVersion(MetadataVersion.IBP_3_3_IV0).
build();
ClusterControlManager clusterControl = new ClusterControlManager.Builder().
setClusterId("fPZv1VBsRFmnlRvmGcOW9w").
setTime(new MockTime(0, 0, 0)).
setSnapshotRegistry(snapshotRegistry).
setFeatureControlManager(featureControl).
setBrokerShutdownHandler((brokerId, isCleanShutdown, records) -> { }).
build();
clusterControl.activate();
assertEquals("Unable to register because the broker does not support finalized version 4 of " +
"metadata.version. The broker wants a version between 1 and 1, inclusive.",
assertThrows(UnsupportedVersionException.class,
() -> clusterControl.registerBroker(
new BrokerRegistrationRequestData().
setClusterId("fPZv1VBsRFmnlRvmGcOW9w").
setBrokerId(0).
setRack(null).
setIncarnationId(Uuid.fromString("0H4fUu1xQEKXFYwB1aBjhg")),
123L,
featureControl.finalizedFeatures(Long.MAX_VALUE),
false)).getMessage());
assertEquals("Unable to register because the broker does not support finalized version 4 of " +
"metadata.version. The broker wants a version between 7 and 7, inclusive.",
assertThrows(UnsupportedVersionException.class,
() -> clusterControl.registerBroker(
new BrokerRegistrationRequestData().
setClusterId("fPZv1VBsRFmnlRvmGcOW9w").
setBrokerId(0).
setRack(null).
setFeatures(new BrokerRegistrationRequestData.FeatureCollection(
Collections.singleton(new BrokerRegistrationRequestData.Feature().
setName(MetadataVersion.FEATURE_NAME).
setMinSupportedVersion(MetadataVersion.IBP_3_3_IV3.featureLevel()).
setMaxSupportedVersion(MetadataVersion.IBP_3_3_IV3.featureLevel())).iterator())).
setIncarnationId(Uuid.fromString("0H4fUu1xQEKXFYwB1aBjhg")),
123L,
featureControl.finalizedFeatures(Long.MAX_VALUE),
false)).getMessage());
}
@Test
public void testRegisterControlWithOlderMetadataVersion() {
FeatureControlManager featureControl = new FeatureControlManager.Builder().
setMetadataVersion(MetadataVersion.IBP_3_3_IV0).
build();
ClusterControlManager clusterControl = new ClusterControlManager.Builder().
setClusterId("fPZv1VBsRFmnlRvmGcOW9w").
setFeatureControlManager(featureControl).
setBrokerShutdownHandler((brokerId, isCleanShutdown, records) -> { }).
build();
clusterControl.activate();
assertEquals("The current MetadataVersion is too old to support controller registrations.",
assertThrows(UnsupportedVersionException.class, () -> clusterControl.registerController(
new ControllerRegistrationRequestData().setControllerId(1))).getMessage());
}
@Test
public void testRegisterWithDuplicateDirectoryId() {
ClusterControlManager clusterControl = new ClusterControlManager.Builder().
setClusterId("QzZZEtC7SxucRM29Xdzijw").
setFeatureControlManager(new FeatureControlManager.Builder().build()).
setBrokerShutdownHandler((brokerId, isCleanShutdown, records) -> { }).
build();
RegisterBrokerRecord brokerRecord = new RegisterBrokerRecord().setBrokerEpoch(100).setBrokerId(0).setLogDirs(asList(
Uuid.fromString("yJGxmjfbQZSVFAlNM3uXZg"),
Uuid.fromString("Mj3CW3OSRi29cFeNJlXuAQ")
));
brokerRecord.endPoints().add(new BrokerEndpoint().setSecurityProtocol(SecurityProtocol.PLAINTEXT.id).setPort((short) 9092).setName("PLAINTEXT").setHost("127.0.0.1"));
clusterControl.replay(brokerRecord, 100L);
clusterControl.activate();
assertDoesNotThrow(() ->
registerNewBrokerWithDirs(clusterControl, 0, asList(Uuid.fromString("yJGxmjfbQZSVFAlNM3uXZg"), Uuid.fromString("Mj3CW3OSRi29cFeNJlXuAQ"))),
"it should be possible to re-register the same broker with the same directories"
);
assertEquals("No directories specified in request", assertThrows(InvalidRegistrationException.class, () ->
registerNewBrokerWithDirs(clusterControl, 1, Collections.emptyList())
).getMessage());
assertEquals("Broker 0 is already registered with directory Mj3CW3OSRi29cFeNJlXuAQ", assertThrows(InvalidRegistrationException.class, () ->
registerNewBrokerWithDirs(clusterControl, 1, asList(Uuid.fromString("TyNK6XSSQJaJc2q9uflNHg"), Uuid.fromString("Mj3CW3OSRi29cFeNJlXuAQ")))
).getMessage());
assertEquals("Reserved directory ID in request", assertThrows(InvalidRegistrationException.class, () ->
registerNewBrokerWithDirs(clusterControl, 1, asList(Uuid.fromString("TyNK6XSSQJaJc2q9uflNHg"), DirectoryId.UNASSIGNED))
).getMessage());
assertEquals("Duplicate directory ID in request", assertThrows(InvalidRegistrationException.class, () ->
registerNewBrokerWithDirs(clusterControl, 1, asList(Uuid.fromString("aR6lssMrSeyXRf65hiUovQ"), Uuid.fromString("aR6lssMrSeyXRf65hiUovQ")))
).getMessage());
}
void registerNewBrokerWithDirs(ClusterControlManager clusterControl, int brokerId, List<Uuid> dirs) {
BrokerRegistrationRequestData data = new BrokerRegistrationRequestData().setBrokerId(brokerId)
.setClusterId(clusterControl.clusterId())
.setIncarnationId(new Uuid(brokerId, brokerId))
.setLogDirs(dirs);
FinalizedControllerFeatures finalizedFeatures = new FinalizedControllerFeatures(Collections.emptyMap(), 456L);
ControllerResult<BrokerRegistrationReply> result = clusterControl.registerBroker(data, 123L, finalizedFeatures, false);
RecordTestUtils.replayAll(clusterControl, result.records());
}
@Test
public void testHasOnlineDir() {
ClusterControlManager clusterControl = new ClusterControlManager.Builder().
setClusterId("pjvUwj3ZTEeSVQmUiH3IJw").
setFeatureControlManager(new FeatureControlManager.Builder().build()).
setBrokerShutdownHandler((brokerId, isCleanShutdown, records) -> { }).
build();
clusterControl.activate();
registerNewBrokerWithDirs(clusterControl, 1, asList(Uuid.fromString("dir1SEbpRuG1dcpTRGOvJw"), Uuid.fromString("dir2xaEwR2m3JHTiy7PWwA")));
assertTrue(clusterControl.registration(1).hasOnlineDir(Uuid.fromString("dir1SEbpRuG1dcpTRGOvJw")));
assertTrue(clusterControl.hasOnlineDir(1, Uuid.fromString("dir1SEbpRuG1dcpTRGOvJw")));
assertTrue(clusterControl.hasOnlineDir(1, Uuid.fromString("dir2xaEwR2m3JHTiy7PWwA")));
assertTrue(clusterControl.hasOnlineDir(1, DirectoryId.UNASSIGNED));
assertTrue(clusterControl.hasOnlineDir(1, DirectoryId.MIGRATING));
assertFalse(clusterControl.hasOnlineDir(1, Uuid.fromString("otherAA1QFK4U1GWzkjZ5A")));
assertFalse(clusterControl.hasOnlineDir(77, Uuid.fromString("8xVRVs6UQHGVonA9SRYseQ")));
assertFalse(clusterControl.hasOnlineDir(1, DirectoryId.LOST));
}
@Test
public void testDefaultDir() {
ClusterControlManager clusterControl = new ClusterControlManager.Builder().
setClusterId("pjvUwj3ZTEeSVQmUiH3IJw").
setFeatureControlManager(new FeatureControlManager.Builder().build()).
setBrokerShutdownHandler((brokerId, isCleanShutdown, records) -> { }).
build();
clusterControl.activate();
RegisterBrokerRecord brokerRecord = new RegisterBrokerRecord().setBrokerEpoch(100).setBrokerId(1).setLogDirs(Collections.emptyList());
brokerRecord.endPoints().add(new BrokerEndpoint().setSecurityProtocol(SecurityProtocol.PLAINTEXT.id).setPort((short) 9092).setName("PLAINTEXT").setHost("127.0.0.1"));
clusterControl.replay(brokerRecord, 100L);
registerNewBrokerWithDirs(clusterControl, 2, Collections.singletonList(Uuid.fromString("singleOnlineDirectoryA")));
registerNewBrokerWithDirs(clusterControl, 3, asList(Uuid.fromString("s4fRmyNFSH6J0vI8AVA5ew"), Uuid.fromString("UbtxBcqYSnKUEMcnTyZFWw")));
assertEquals(DirectoryId.MIGRATING, clusterControl.defaultDir(1));
assertEquals(Uuid.fromString("singleOnlineDirectoryA"), clusterControl.defaultDir(2));
assertEquals(DirectoryId.UNASSIGNED, clusterControl.defaultDir(3));
assertEquals(DirectoryId.UNASSIGNED, clusterControl.defaultDir(4));
}
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testReRegistrationAndBrokerEpoch(boolean newIncarnationId) {
ClusterControlManager clusterControl = new ClusterControlManager.Builder().
setClusterId("pjvUwj3ZTEeSVQmUiH3IJw").
setFeatureControlManager(new FeatureControlManager.Builder().build()).
setBrokerShutdownHandler((brokerId, isCleanShutdown, records) -> { }).
build();
clusterControl.activate();
RecordTestUtils.replayAll(clusterControl, clusterControl.registerBroker(
new BrokerRegistrationRequestData().
setBrokerId(1).
setClusterId(clusterControl.clusterId()).
setIncarnationId(Uuid.fromString("mISEfEFwQIuaD1gKCc5tzQ")).
setLogDirs(Arrays.asList(Uuid.fromString("Vv1gzkM2QpuE-PPrIc6XEw"))),
100,
new FinalizedControllerFeatures(Collections.emptyMap(), 100L),
false).
records());
RecordTestUtils.replayAll(clusterControl, clusterControl.registerBroker(
new BrokerRegistrationRequestData().
setBrokerId(1).
setClusterId(clusterControl.clusterId()).
setIncarnationId(newIncarnationId ?
Uuid.fromString("07OOcU7MQFeSmGAFPP2Zww") : Uuid.fromString("mISEfEFwQIuaD1gKCc5tzQ")).
setLogDirs(Arrays.asList(Uuid.fromString("Vv1gzkM2QpuE-PPrIc6XEw"))),
111,
new FinalizedControllerFeatures(Collections.emptyMap(), 100L),
false).
records());
if (newIncarnationId) {
assertEquals(Uuid.fromString("07OOcU7MQFeSmGAFPP2Zww"),
clusterControl.brokerRegistrations().get(1).incarnationId());
assertEquals(111,
clusterControl.brokerRegistrations().get(1).epoch());
} else {
assertEquals(Uuid.fromString("mISEfEFwQIuaD1gKCc5tzQ"),
clusterControl.brokerRegistrations().get(1).incarnationId());
assertEquals(100,
clusterControl.brokerRegistrations().get(1).epoch());
}
}
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testReRegistrationWithCleanShutdownDetection(boolean isCleanShutdown) {
ClusterControlManager clusterControl = new ClusterControlManager.Builder().
setClusterId("pjvUwj3ZTEeSVQmUiH3IJw").
setFeatureControlManager(new FeatureControlManager.Builder().build()).
setBrokerShutdownHandler((brokerId, cleanShutdown, records) -> {
if (!cleanShutdown) {
records.add(new ApiMessageAndVersion(new PartitionChangeRecord(), PartitionChangeRecord.HIGHEST_SUPPORTED_VERSION));
}
}).
build();
clusterControl.activate();
List<ApiMessageAndVersion> records = clusterControl.registerBroker(
new BrokerRegistrationRequestData().
setBrokerId(1).
setClusterId(clusterControl.clusterId()).
setIncarnationId(Uuid.fromString("mISEfEFwQIuaD1gKCc5tzQ")).
setLogDirs(Arrays.asList(Uuid.fromString("Vv1gzkM2QpuE-PPrIc6XEw"))),
100,
new FinalizedControllerFeatures(Collections.emptyMap(), 100L),
true).
records();
records.add(new ApiMessageAndVersion(new BrokerRegistrationChangeRecord().
setBrokerId(1).setBrokerEpoch(100).
setInControlledShutdown(BrokerRegistrationInControlledShutdownChange.IN_CONTROLLED_SHUTDOWN.value()),
(short) 1));
RecordTestUtils.replayAll(clusterControl, records);
records = clusterControl.registerBroker(
new BrokerRegistrationRequestData().
setBrokerId(1).
setClusterId(clusterControl.clusterId()).
setIncarnationId(Uuid.fromString("07OOcU7MQFeSmGAFPP2Zww")).
setPreviousBrokerEpoch(isCleanShutdown ? 100 : 10).
setLogDirs(Arrays.asList(Uuid.fromString("Vv1gzkM2QpuE-PPrIc6XEw"))),
111,
new FinalizedControllerFeatures(Collections.emptyMap(), 100L),
true).records();
RecordTestUtils.replayAll(clusterControl, records);
assertEquals(Uuid.fromString("07OOcU7MQFeSmGAFPP2Zww"),
clusterControl.brokerRegistrations().get(1).incarnationId());
assertFalse(clusterControl.brokerRegistrations().get(1).inControlledShutdown());
assertEquals(111, clusterControl.brokerRegistrations().get(1).epoch());
if (isCleanShutdown) {
assertEquals(1, records.size());
} else {
assertEquals(2, records.size());
}
}
@Test
public void testBrokerContactTimesAreUpdatedOnClusterControlActivation() {
MockTime time = new MockTime(0L, 20L, 1000L);
ClusterControlManager clusterControl = new ClusterControlManager.Builder().
setClusterId("pjvUwj3ZTEeSVQmUiH3IJw").
setFeatureControlManager(new FeatureControlManager.Builder().build()).
setBrokerShutdownHandler((brokerId, isCleanShutdown, records) -> { }).
setTime(time).
build();
clusterControl.replay(new RegisterBrokerRecord().
setBrokerEpoch(100).
setBrokerId(0).
setLogDirs(asList(Uuid.fromString("Mj3CW3OSRi29cFeNJlXuAQ"))), 10002);
clusterControl.replay(new RegisterBrokerRecord().
setBrokerEpoch(123).
setBrokerId(1).
setLogDirs(asList(Uuid.fromString("TyNK6XSSQJaJc2q9uflNHg"))), 10005);
clusterControl.activate();
assertEquals(OptionalLong.of(1000L), clusterControl.heartbeatManager().tracker().
contactTime(new BrokerIdAndEpoch(0, 100)));
assertEquals(OptionalLong.of(1000L), clusterControl.heartbeatManager().tracker().
contactTime(new BrokerIdAndEpoch(1, 123)));
assertEquals(OptionalLong.empty(), clusterControl.heartbeatManager().tracker().
contactTime(new BrokerIdAndEpoch(1, 124)));
assertEquals(OptionalLong.empty(), clusterControl.heartbeatManager().tracker().
contactTime(new BrokerIdAndEpoch(2, 100)));
}
}