blob: a776e3fe10afcae41ce968a93178fa9cc324f0ac [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.image.loader.LogDeltaManifest;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.fault.FaultHandler;
import org.junit.jupiter.api.Test;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.BDDMockito.willAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
public class MetadataVersionConfigValidatorTest {
private static final LogDeltaManifest TEST_MANIFEST = LogDeltaManifest.newBuilder()
.provenance(MetadataProvenance.EMPTY)
.leaderAndEpoch(LeaderAndEpoch.UNKNOWN)
.numBatches(1)
.elapsedNs(90)
.numBytes(88)
.build();
public static final MetadataProvenance TEST_PROVENANCE =
new MetadataProvenance(50, 3, 8000);
void testWith(MetadataVersion metadataVersion, KafkaConfig config, FaultHandler faultHandler) throws Exception {
try (MetadataVersionConfigValidator validator = new MetadataVersionConfigValidator(config, faultHandler)) {
MetadataDelta delta = new MetadataDelta.Builder()
.setImage(MetadataImage.EMPTY)
.build();
if (metadataVersion != null) {
delta.replay(new FeatureLevelRecord().
setName(MetadataVersion.FEATURE_NAME).
setFeatureLevel(metadataVersion.featureLevel()));
}
MetadataImage image = delta.apply(TEST_PROVENANCE);
validator.onMetadataUpdate(delta, image, TEST_MANIFEST);
}
}
@Test
void testValidatesConfigOnMetadataChange() throws Exception {
MetadataVersion metadataVersion = MetadataVersion.IBP_3_7_IV2;
KafkaConfig config = mock(KafkaConfig.class);
FaultHandler faultHandler = mock(FaultHandler.class);
when(config.brokerId()).thenReturn(8);
testWith(metadataVersion, config, faultHandler);
verify(config, times(1)).validateWithMetadataVersion(eq(metadataVersion));
verifyNoMoreInteractions(faultHandler);
}
@SuppressWarnings("ThrowableNotThrown")
@Test
void testInvokesFaultHandlerOnException() throws Exception {
MetadataVersion metadataVersion = MetadataVersion.IBP_3_7_IV2;
Exception exception = new Exception();
KafkaConfig config = mock(KafkaConfig.class);
FaultHandler faultHandler = mock(FaultHandler.class);
when(faultHandler.handleFault(any(), any())).thenReturn(new RuntimeException("returned exception"));
when(config.brokerId()).thenReturn(8);
willAnswer(invocation -> {
throw exception;
}).given(config).validateWithMetadataVersion(eq(metadataVersion));
testWith(metadataVersion, config, faultHandler);
verify(config, times(1)).validateWithMetadataVersion(eq(metadataVersion));
verify(faultHandler, times(1)).handleFault(
eq("Broker configuration does not support the cluster MetadataVersion"),
eq(exception));
}
}