test
diff --git a/core/src/main/java/kafka/server/ServerSocketFactory.java b/core/src/main/java/kafka/server/ServerSocketFactory.java
new file mode 100644
index 0000000..427ab18
--- /dev/null
+++ b/core/src/main/java/kafka/server/ServerSocketFactory.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.network.Selectable;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketException;
+import java.nio.channels.ServerSocketChannel;
+
+/**
+ * Factory for the server socket channels that a socket server listens on.
+ */
+public interface ServerSocketFactory {
+    /**
+     * Open a server socket channel for a listener.
+     *
+     * @param listenerName          The name of the listener the socket is for.
+     * @param socketAddress         The address to bind the socket to.
+     * @param listenBacklogSize     The backlog size to use when binding.
+     * @param recvBufferSize        The receive buffer size to request.
+     *
+     * @return                      The server socket channel.
+     */
+    ServerSocketChannel openServerSocket(
+        String listenerName,
+        InetSocketAddress socketAddress,
+        int listenBacklogSize,
+        int recvBufferSize
+    ) throws IOException;
+
+    /**
+     * The default factory. It opens a new non-blocking channel and binds it to the requested
+     * address; the listener name is not used.
+     */
+    class KafkaServerSocketFactory implements ServerSocketFactory {
+        public static final KafkaServerSocketFactory INSTANCE = new KafkaServerSocketFactory();
+
+        /**
+         * Open a non-blocking channel, set its receive buffer size unless recvBufferSize is
+         * Selectable.USE_DEFAULT_BUFFER_SIZE, and bind it. The channel is closed again if any
+         * of these steps fails.
+         *
+         * @throws KafkaException   If a SocketException occurs during any of these steps.
+         * @throws IOException      If any other I/O error occurs.
+         */
+        @Override
+        public ServerSocketChannel openServerSocket(
+            String listenerName,
+            InetSocketAddress socketAddress,
+            int listenBacklogSize,
+            int recvBufferSize
+        ) throws IOException {
+            ServerSocketChannel socketChannel = null;
+            try {
+                socketChannel = ServerSocketChannel.open();
+                socketChannel.configureBlocking(false);
+                if (recvBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE) {
+                    socketChannel.socket().setReceiveBufferSize(recvBufferSize);
+                }
+                socketChannel.socket().bind(socketAddress, listenBacklogSize);
+            } catch (SocketException e) {
+                Utils.closeQuietly(socketChannel, "server socket");
+                throw new KafkaException(String.format("Socket server failed to bind to %s:%d: %s.",
+                    socketAddress.getHostString(), socketAddress.getPort(), e.getMessage()), e);
+            } catch (IOException | RuntimeException e) {
+                // Failures other than SocketException must not leak the opened channel: close it
+                // and rethrow the exception unchanged.
+                Utils.closeQuietly(socketChannel, "server socket");
+                throw e;
+            }
+            return socketChannel;
+        }
+    }
+}
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index d854510..d65eaa3 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -29,7 +29,8 @@
import kafka.network.Processor._
import kafka.network.RequestChannel.{CloseConnectionResponse, EndThrottlingResponse, NoOpResponse, SendResponse, StartThrottlingResponse}
import kafka.network.SocketServer._
-import kafka.server.{ApiVersionManager, BrokerReconfigurable, KafkaConfig}
+import kafka.server.ServerSocketFactory.KafkaServerSocketFactory
+import kafka.server.{ApiVersionManager, BrokerReconfigurable, KafkaConfig, ServerSocketFactory}
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import kafka.utils._
import org.apache.kafka.common.config.ConfigException
@@ -74,12 +75,14 @@
* Acceptor has 1 Processor thread that has its own selector and read requests from the socket.
* 1 Handler thread that handles requests and produces responses back to the processor thread for writing.
*/
-class SocketServer(val config: KafkaConfig,
- val metrics: Metrics,
- val time: Time,
- val credentialProvider: CredentialProvider,
- val apiVersionManager: ApiVersionManager)
- extends Logging with BrokerReconfigurable {
+class SocketServer(
+ val config: KafkaConfig,
+ val metrics: Metrics,
+ val time: Time,
+ val credentialProvider: CredentialProvider,
+ val apiVersionManager: ApiVersionManager,
+ val socketFactory: ServerSocketFactory = KafkaServerSocketFactory.INSTANCE
+) extends Logging with BrokerReconfigurable {
private val metricsGroup = new KafkaMetricsGroup(this.getClass)
@@ -720,23 +723,17 @@
* Create a server socket to listen for connections on.
*/
private def openServerSocket(host: String, port: Int, listenBacklogSize: Int): ServerSocketChannel = {
- val socketAddress =
- if (Utils.isBlank(host))
- new InetSocketAddress(port)
- else
- new InetSocketAddress(host, port)
- val serverChannel = ServerSocketChannel.open()
- try {
- serverChannel.configureBlocking(false)
- if (recvBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
- serverChannel.socket().setReceiveBufferSize(recvBufferSize)
- serverChannel.socket.bind(socketAddress, listenBacklogSize)
- info(s"Awaiting socket connections on ${socketAddress.getHostString}:${serverChannel.socket.getLocalPort}.")
- } catch {
- case e: SocketException =>
- Utils.closeQuietly(serverChannel, "server socket")
- throw new KafkaException(s"Socket server failed to bind to ${socketAddress.getHostString}:$port: ${e.getMessage}.", e)
+ val socketAddress = if (Utils.isBlank(host)) {
+ new InetSocketAddress(port)
+ } else {
+ new InetSocketAddress(host, port)
}
+ val serverChannel = socketServer.socketFactory.openServerSocket(
+ endPoint.listenerName.value(),
+ socketAddress,
+ listenBacklogSize,
+ recvBufferSize)
+ info(s"Awaiting socket connections on ${socketAddress.getHostString}:${serverChannel.socket.getLocalPort}.")
serverChannel
}
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index 08bf559..b31ce8a 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -257,7 +257,12 @@
// Create and start the socket server acceptor threads so that the bound port is known.
// Delay starting processors until the end of the initialization sequence to ensure
// that credentials have been loaded before processing authentications.
- socketServer = new SocketServer(config, metrics, time, credentialProvider, apiVersionManager)
+ socketServer = new SocketServer(config,
+ metrics,
+ time,
+ credentialProvider,
+ apiVersionManager,
+ sharedServer.socketFactory)
clientQuotaMetadataManager = new ClientQuotaMetadataManager(quotaManagers, socketServer.connectionQuotas)
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala
index 1008fe2..017499e 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -185,7 +185,8 @@
metrics,
time,
credentialProvider,
- apiVersionManager)
+ apiVersionManager,
+ sharedServer.socketFactory)
val listenerInfo = ListenerInfo
.create(config.effectiveAdvertisedControllerListeners.map(_.toJava).asJava)
diff --git a/core/src/main/scala/kafka/server/KafkaRaftServer.scala b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
index dbc6f76..43738ec 100644
--- a/core/src/main/scala/kafka/server/KafkaRaftServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
@@ -20,6 +20,7 @@
import java.util.concurrent.CompletableFuture
import kafka.log.UnifiedLog
import kafka.metrics.KafkaMetricsReporter
+import kafka.server.ServerSocketFactory.KafkaServerSocketFactory
import kafka.utils.{CoreUtils, Logging, Mx4jLoader, VerifiableProperties}
import org.apache.kafka.common.config.{ConfigDef, ConfigResource}
import org.apache.kafka.common.internals.Topic
@@ -73,6 +74,7 @@
CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumVoters)),
QuorumConfig.parseBootstrapServers(config.quorumBootstrapServers),
new StandardFaultHandlerFactory(),
+ KafkaServerSocketFactory.INSTANCE,
)
private val broker: Option[BrokerServer] = if (config.processRoles.contains(ProcessRole.BrokerRole)) {
diff --git a/core/src/main/scala/kafka/server/SharedServer.scala b/core/src/main/scala/kafka/server/SharedServer.scala
index 4a675ee..913a09d 100644
--- a/core/src/main/scala/kafka/server/SharedServer.scala
+++ b/core/src/main/scala/kafka/server/SharedServer.scala
@@ -99,7 +99,8 @@
private val _metrics: Metrics,
val controllerQuorumVotersFuture: CompletableFuture[JMap[Integer, InetSocketAddress]],
val bootstrapServers: JCollection[InetSocketAddress],
- val faultHandlerFactory: FaultHandlerFactory
+ val faultHandlerFactory: FaultHandlerFactory,
+ val socketFactory: ServerSocketFactory
) extends Logging {
private val logContext: LogContext = new LogContext(s"[SharedServer id=${sharedServerConfig.nodeId}] ")
this.logIdent = logContext.logPrefix
diff --git a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
index f8353df..832d1e3 100644
--- a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
+++ b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
@@ -550,8 +550,10 @@
override def features(): FinalizedFeatures = {
val image = _currentImage
val finalizedFeatures = new java.util.HashMap[String, java.lang.Short](image.features().finalizedVersions())
- finalizedFeatures.put(KRaftVersion.FEATURE_NAME, kraftVersionSupplier.get().featureLevel())
-
+ val kraftVersionLevel = kraftVersionSupplier.get().featureLevel()
+ if (kraftVersionLevel > 0) {
+ finalizedFeatures.put(KRaftVersion.FEATURE_NAME, kraftVersionLevel)
+ }
new FinalizedFeatures(image.features().metadataVersion(),
finalizedFeatures,
image.highestOffsetAndEpoch().offset,
diff --git a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
index f7cc416..ecd63ea 100644
--- a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
+++ b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
@@ -29,20 +29,15 @@
import kafka.zk.EmbeddedZookeeper;
import org.apache.kafka.clients.admin.Admin;
-import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.metadata.BrokerState;
-import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
-import org.apache.kafka.server.common.ApiMessageAndVersion;
-import org.apache.kafka.server.common.MetadataVersion;
import org.junit.jupiter.api.extension.AfterTestExecutionCallback;
import org.junit.jupiter.api.extension.BeforeTestExecutionCallback;
import org.junit.jupiter.api.extension.Extension;
import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@@ -246,26 +241,16 @@
public void format() throws Exception {
if (formated.compareAndSet(false, true)) {
- List<ApiMessageAndVersion> records = new ArrayList<>();
- records.add(
- new ApiMessageAndVersion(new FeatureLevelRecord().
- setName(MetadataVersion.FEATURE_NAME).
- setFeatureLevel(clusterConfig.metadataVersion().featureLevel()), (short) 0));
-
- clusterConfig.features().forEach((feature, version) -> {
- records.add(
- new ApiMessageAndVersion(new FeatureLevelRecord().
- setName(feature.featureName()).
- setFeatureLevel(version), (short) 0));
- });
-
- TestKitNodes nodes = new TestKitNodes.Builder()
- .setBootstrapMetadata(BootstrapMetadata.fromRecords(records, "testkit"))
+ TestKitNodes.Builder nodesBuilder = new TestKitNodes.Builder()
+ .setBootstrapMetadataVersion(clusterConfig.metadataVersion())
.setCombined(isCombined)
.setNumBrokerNodes(clusterConfig.numBrokers())
.setNumDisksPerBroker(clusterConfig.numDisksPerBroker())
.setPerServerProperties(clusterConfig.perServerOverrideProperties())
- .setNumControllerNodes(clusterConfig.numControllers()).build();
+ .setNumControllerNodes(clusterConfig.numControllers());
+ clusterConfig.features().forEach((feature, version) ->
+ nodesBuilder.setFeature(feature.featureName(), version));
+ TestKitNodes nodes = nodesBuilder.build();
KafkaClusterTestKit.Builder builder = new KafkaClusterTestKit.Builder(nodes);
if (Boolean.parseBoolean(clusterConfig.serverProperties()
.getOrDefault("zookeeper.metadata.migration.enable", "false"))) {
diff --git a/core/src/test/java/kafka/test/server/ReconfigurableQuorumIntegrationTest.java b/core/src/test/java/kafka/test/server/ReconfigurableQuorumIntegrationTest.java
new file mode 100644
index 0000000..9f5b003
--- /dev/null
+++ b/core/src/test/java/kafka/test/server/ReconfigurableQuorumIntegrationTest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test.server;
+
+import kafka.testkit.KafkaClusterTestKit;
+import kafka.testkit.TestKitNodes;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.FeatureMetadata;
+import org.apache.kafka.clients.admin.QuorumInfo;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.server.common.KRaftVersion;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Integration tests that start small KRaft clusters with the test kit and check the
+ * KRaftVersion feature levels and voter set reported through the Admin client.
+ */
+public class ReconfigurableQuorumIntegrationTest {
+    /**
+     * Assert that the Admin client reports the expected KRaftVersion feature levels.
+     *
+     * @param admin         The admin client to query.
+     * @param finalized     The expected finalized level. If it is not positive, the feature is
+     *                      expected to be missing from the finalized features.
+     */
+    static void checkKRaftVersions(Admin admin, short finalized) throws Exception {
+        FeatureMetadata metadata = admin.describeFeatures().featureMetadata().get();
+        boolean isFinalized = metadata.finalizedFeatures().containsKey(KRaftVersion.FEATURE_NAME);
+        if (finalized <= 0) {
+            assertFalse(isFinalized);
+        } else {
+            assertTrue(isFinalized);
+            assertEquals(finalized,
+                metadata.finalizedFeatures().get(KRaftVersion.FEATURE_NAME).minVersionLevel());
+            assertEquals(finalized,
+                metadata.finalizedFeatures().get(KRaftVersion.FEATURE_NAME).maxVersionLevel());
+        }
+        // The supported range is expected to be 0..1 regardless of the finalized level.
+        assertEquals((short) 0,
+            metadata.supportedFeatures().get(KRaftVersion.FEATURE_NAME).minVersion());
+        assertEquals((short) 1,
+            metadata.supportedFeatures().get(KRaftVersion.FEATURE_NAME).maxVersion());
+    }
+
+    @Test
+    public void testCreateAndDestroyNonReconfigurableCluster() throws Exception {
+        TestKitNodes nodes = new TestKitNodes.Builder()
+            .setNumBrokerNodes(1)
+            .setNumControllerNodes(1)
+            .build();
+        try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(nodes).build()) {
+            cluster.format();
+            cluster.startup();
+            try (Admin admin = Admin.create(cluster.clientProperties())) {
+                TestUtils.retryOnExceptionWithTimeout(30_000,
+                    () -> checkKRaftVersions(admin, (short) 0));
+            }
+        }
+    }
+
+    @Test
+    public void testCreateAndDestroyReconfigurableCluster() throws Exception {
+        TestKitNodes nodes = new TestKitNodes.Builder()
+            .setNumBrokerNodes(1)
+            .setNumControllerNodes(1)
+            .setFeature(KRaftVersion.FEATURE_NAME, (short) 1)
+            .build();
+        try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(nodes).build()) {
+            cluster.format();
+            cluster.startup();
+            try (Admin admin = Admin.create(cluster.clientProperties())) {
+                TestUtils.retryOnExceptionWithTimeout(30_000,
+                    () -> checkKRaftVersions(admin, (short) 1));
+            }
+        }
+    }
+
+    /**
+     * Describe the metadata quorum and collect the directory ID of each voter.
+     *
+     * @param admin         The admin client to query.
+     *
+     * @return              A sorted map from voter replica ID to voter directory ID.
+     */
+    static Map<Integer, Uuid> findVoterDirs(Admin admin) throws Exception {
+        Map<Integer, Uuid> dirsByReplicaId = new TreeMap<>();
+        QuorumInfo quorumInfo = admin.describeMetadataQuorum().quorumInfo().get();
+        quorumInfo.voters().forEach(
+            voter -> dirsByReplicaId.put(voter.replicaId(), voter.replicaDirectoryId()));
+        return dirsByReplicaId;
+    }
+
+    @Test
+    public void testRemoveController() throws Exception {
+        TestKitNodes nodes = new TestKitNodes.Builder()
+            .setNumBrokerNodes(1)
+            .setNumControllerNodes(3)
+            .setFeature(KRaftVersion.FEATURE_NAME, (short) 1)
+            .build();
+        try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(nodes).build()) {
+            cluster.format();
+            cluster.startup();
+            try (Admin admin = Admin.create(cluster.clientProperties())) {
+                // Wait until all three controllers are voters with a non-zero directory ID.
+                TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
+                    Map<Integer, Uuid> voters = findVoterDirs(admin);
+                    assertEquals(new HashSet<>(Arrays.asList(3000, 3001, 3002)), voters.keySet());
+                    for (int replicaId : new int[] {3000, 3001, 3002}) {
+                        assertNotEquals(Uuid.ZERO_UUID, voters.get(replicaId));
+                    }
+                });
+                Uuid removedVoterDir = cluster.nodes().controllerNodes().get(3000).metadataUuid();
+                admin.removeRaftVoter(3000, removedVoterDir).all().get();
+            }
+        }
+    }
+}
diff --git a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
index 615ffa2..c12e3cf 100644
--- a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
+++ b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
@@ -34,12 +34,13 @@
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.controller.Controller;
-import org.apache.kafka.metadata.bootstrap.BootstrapDirectory;
-import org.apache.kafka.metadata.properties.MetaProperties;
import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble;
+import org.apache.kafka.metadata.storage.Formatter;
import org.apache.kafka.network.SocketServerConfigs;
+import org.apache.kafka.raft.DynamicVoters;
import org.apache.kafka.raft.QuorumConfig;
import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.KRaftVersion;
import org.apache.kafka.server.config.KRaftConfigs;
import org.apache.kafka.server.config.ServerConfigs;
import org.apache.kafka.server.fault.FaultHandler;
@@ -52,6 +53,7 @@
import org.slf4j.event.Level;
import java.io.File;
+import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.file.Files;
import java.nio.file.Paths;
@@ -151,6 +153,7 @@
private TestKitNodes nodes;
private final Map<String, Object> configProps = new HashMap<>();
private final SimpleFaultHandlerFactory faultHandlerFactory = new SimpleFaultHandlerFactory();
+ private final PreboundSocketFactoryManager socketFactoryManager = new PreboundSocketFactoryManager();
public Builder(TestKitNodes nodes) {
this.nodes = nodes;
@@ -161,7 +164,7 @@
return this;
}
- private KafkaConfig createNodeConfig(TestKitNode node) {
+ private KafkaConfig createNodeConfig(TestKitNode node) throws IOException {
TestKitNode brokerNode = nodes.brokerNodes().get(node.id());
TestKitNode controllerNode = nodes.controllerNodes().get(node.id());
@@ -198,13 +201,18 @@
nodes.interBrokerListenerName().value());
props.putIfAbsent(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER");
- // Note: we can't accurately set controller.quorum.voters yet, since we don't
- // yet know what ports each controller will pick. Set it to a dummy string
- // for now as a placeholder.
- String uninitializedQuorumVotersString = nodes.controllerNodes().keySet().stream().
- map(n -> String.format("%d@0.0.0.0:0", n)).
- collect(Collectors.joining(","));
- props.put(QuorumConfig.QUORUM_VOTERS_CONFIG, uninitializedQuorumVotersString);
+ StringBuilder quorumVoterStringBuilder = new StringBuilder();
+ String prefix = "";
+ for (int nodeId : nodes.controllerNodes().keySet()) {
+ quorumVoterStringBuilder.append(prefix).
+ append(nodeId).
+ append("@").
+ append("localhost").
+ append(":").
+ append(socketFactoryManager.getOrCreatePortForListener(nodeId, "CONTROLLER"));
+ prefix = ",";
+ }
+ props.put(QuorumConfig.QUORUM_VOTERS_CONFIG, quorumVoterStringBuilder.toString());
// reduce log cleaner offset map memory usage
props.put(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, "2097152");
@@ -233,6 +241,9 @@
try {
baseDirectory = new File(nodes.baseDirectory());
for (TestKitNode node : nodes.controllerNodes().values()) {
+ socketFactoryManager.getOrCreatePortForListener(node.id(), "CONTROLLER");
+ }
+ for (TestKitNode node : nodes.controllerNodes().values()) {
setupNodeDirectories(baseDirectory, node.metadataDirectory(), Collections.emptyList());
SharedServer sharedServer = new SharedServer(
createNodeConfig(node),
@@ -241,7 +252,8 @@
new Metrics(),
connectFutureManager.future,
Collections.emptyList(),
- faultHandlerFactory
+ faultHandlerFactory,
+ socketFactoryManager.getOrCreateSocketFactory(node.id())
);
ControllerServer controller = null;
try {
@@ -265,18 +277,20 @@
jointServers.put(node.id(), sharedServer);
}
for (TestKitNode node : nodes.brokerNodes().values()) {
- SharedServer sharedServer = jointServers.computeIfAbsent(
- node.id(),
- id -> new SharedServer(
+ SharedServer sharedServer = jointServers.get(node.id());
+ if (sharedServer == null) {
+ sharedServer = new SharedServer(
createNodeConfig(node),
node.initialMetaPropertiesEnsemble(),
Time.SYSTEM,
new Metrics(),
connectFutureManager.future,
Collections.emptyList(),
- faultHandlerFactory
- )
- );
+ faultHandlerFactory,
+ socketFactoryManager.getOrCreateSocketFactory(node.id())
+ );
+ jointServers.put(node.id(), sharedServer);
+ }
BrokerServer broker = null;
try {
broker = new BrokerServer(sharedServer);
@@ -298,6 +312,7 @@
if (baseDirectory != null) {
Utils.delete(baseDirectory);
}
+ socketFactoryManager.close();
throw e;
}
return new KafkaClusterTestKit(
@@ -306,7 +321,8 @@
brokers,
connectFutureManager,
baseDirectory,
- faultHandlerFactory);
+ faultHandlerFactory,
+ socketFactoryManager);
}
private String listeners(int node) {
@@ -349,6 +365,7 @@
private final ControllerQuorumVotersFutureManager controllerQuorumVotersFutureManager;
private final File baseDirectory;
private final SimpleFaultHandlerFactory faultHandlerFactory;
+ private final PreboundSocketFactoryManager socketFactoryManager;
private KafkaClusterTestKit(
TestKitNodes nodes,
@@ -356,7 +373,8 @@
Map<Integer, BrokerServer> brokers,
ControllerQuorumVotersFutureManager controllerQuorumVotersFutureManager,
File baseDirectory,
- SimpleFaultHandlerFactory faultHandlerFactory
+ SimpleFaultHandlerFactory faultHandlerFactory,
+ PreboundSocketFactoryManager socketFactoryManager
) {
/*
Number of threads = Total number of brokers + Total number of controllers + Total number of Raft Managers
@@ -371,6 +389,7 @@
this.controllerQuorumVotersFutureManager = controllerQuorumVotersFutureManager;
this.baseDirectory = baseDirectory;
this.faultHandlerFactory = faultHandlerFactory;
+ this.socketFactoryManager = socketFactoryManager;
}
public void format() throws Exception {
@@ -404,22 +423,44 @@
boolean writeMetadataDirectory
) {
try {
- MetaPropertiesEnsemble.Copier copier =
- new MetaPropertiesEnsemble.Copier(MetaPropertiesEnsemble.EMPTY);
- for (Entry<String, MetaProperties> entry : ensemble.logDirProps().entrySet()) {
- String logDir = entry.getKey();
- if (writeMetadataDirectory || (!ensemble.metadataLogDir().equals(Optional.of(logDir)))) {
- log.trace("Adding {} to the list of directories to format.", logDir);
- copier.setLogDirProps(logDir, entry.getValue());
- }
+ Formatter formatter = new Formatter();
+ formatter.setNodeId(ensemble.nodeId().getAsInt());
+ formatter.setClusterId(ensemble.clusterId().get());
+ if (writeMetadataDirectory) {
+ formatter.setDirectories(ensemble.logDirProps().keySet());
+ } else {
+ formatter.setDirectories(ensemble.logDirProps().keySet().stream().
+ filter(d -> !ensemble.metadataLogDir().get().equals(d)).
+ collect(Collectors.toSet()));
}
- copier.setPreWriteHandler((logDir, isNew, metaProperties) -> {
- log.info("Formatting {}.", logDir);
- Files.createDirectories(Paths.get(logDir));
- BootstrapDirectory bootstrapDirectory = new BootstrapDirectory(logDir, Optional.empty());
- bootstrapDirectory.writeBinaryFile(nodes.bootstrapMetadata());
- });
- copier.writeLogDirChanges();
+ if (formatter.directories().isEmpty()) {
+ return;
+ }
+ formatter.setReleaseVersion(nodes.bootstrapMetadata().metadataVersion());
+ formatter.setFeatureLevel(KRaftVersion.FEATURE_NAME,
+ nodes.bootstrapMetadata().featureLevel(KRaftVersion.FEATURE_NAME));
+ formatter.setUnstableFeatureVersionsEnabled(true);
+ formatter.setIgnoreFormatted(false);
+ formatter.setControllerListenerName("CONTROLLER");
+ if (writeMetadataDirectory) {
+ formatter.setMetadataLogDirectory(ensemble.metadataLogDir().get());
+ } else {
+ formatter.setMetadataLogDirectory(Optional.empty());
+ }
+ if (nodes.bootstrapMetadata().featureLevel(KRaftVersion.FEATURE_NAME) > 0) {
+ StringBuilder dynamicVotersBuilder = new StringBuilder();
+ String prefix = "";
+ for (TestKitNode controllerNode : nodes.controllerNodes().values()) {
+ int port = socketFactoryManager.
+ getOrCreatePortForListener(controllerNode.id(), "CONTROLLER");
+ dynamicVotersBuilder.append(prefix);
+ prefix = ",";
+ dynamicVotersBuilder.append(String.format("%d@localhost:%d:%s",
+ controllerNode.id(), port, controllerNode.metadataUuid()));
+ }
+ formatter.setInitialVoters(DynamicVoters.parse(dynamicVotersBuilder.toString()));
+ }
+ formatter.run();
} catch (Exception e) {
throw new RuntimeException("Failed to format node " + ensemble.nodeId(), e);
}
@@ -636,6 +677,7 @@
throw e;
} finally {
ThreadUtils.shutdownExecutorServiceQuietly(executorService, 5, TimeUnit.MINUTES);
+ socketFactoryManager.close();
}
waitForAllThreads();
faultHandlerFactory.fatalFaultHandler().maybeRethrowFirstException();
diff --git a/core/src/test/java/kafka/testkit/PreboundSocketFactoryManager.java b/core/src/test/java/kafka/testkit/PreboundSocketFactoryManager.java
new file mode 100644
index 0000000..673699c
--- /dev/null
+++ b/core/src/test/java/kafka/testkit/PreboundSocketFactoryManager.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.testkit;
+
+import kafka.server.ServerSocketFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.channels.ServerSocketChannel;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+/**
+ * Manages server sockets that are bound before the nodes that use them are started, so that
+ * their port numbers can be known ahead of time. Sockets are tracked per node ID and listener
+ * name, and are handed to a node through the {@link ServerSocketFactory} created for it.
+ */
+public class PreboundSocketFactoryManager implements AutoCloseable {
+    private static final Logger LOG = LoggerFactory.getLogger(PreboundSocketFactoryManager.class);
+
+    /**
+     * A socket factory for a single node. It returns the prebound socket for a listener when
+     * the manager has one, and opens a new socket with the default factory otherwise.
+     */
+    private class PreboundSocketFactory implements ServerSocketFactory {
+        private final int nodeId;
+
+        private PreboundSocketFactory(int nodeId) {
+            this.nodeId = nodeId;
+        }
+
+        @Override
+        public ServerSocketChannel openServerSocket(
+            String listenerName,
+            InetSocketAddress socketAddress,
+            int listenBacklogSize,
+            int recvBufferSize
+        ) throws IOException {
+            ServerSocketChannel socketChannel = getSocketForListenerAndMarkAsUsed(
+                nodeId,
+                listenerName);
+            if (socketChannel != null) {
+                return socketChannel;
+            }
+            return KafkaServerSocketFactory.INSTANCE.openServerSocket(
+                listenerName,
+                socketAddress,
+                listenBacklogSize,
+                recvBufferSize);
+        }
+    }
+
+    /**
+     * True if this manager is closed.
+     * Protected by the object lock.
+     */
+    private boolean closed = false;
+
+    /**
+     * Maps node IDs to socket factory objects.
+     * Protected by the object lock.
+     */
+    private final Map<Integer, PreboundSocketFactory> factories = new HashMap<>();
+
+    /**
+     * Maps node IDs to maps of listener names to prebound server sockets.
+     * Protected by the object lock.
+     */
+    private final Map<Integer, Map<String, ServerSocketChannel>> sockets = new HashMap<>();
+
+    /**
+     * Maps node IDs to set of the listeners that were used.
+     * Protected by the object lock.
+     */
+    private final Map<Integer, Set<String>> usedSockets = new HashMap<>();
+
+    /**
+     * Get a socket from this manager, mark it as used, and return it.
+     *
+     * @param nodeId      The ID of the node.
+     * @param listener    The listener for the socket.
+     *
+     * @return            null if the socket was not found; the socket, otherwise.
+     */
+    public synchronized ServerSocketChannel getSocketForListenerAndMarkAsUsed(
+        int nodeId,
+        String listener
+    ) {
+        Map<String, ServerSocketChannel> socketsForNode = sockets.get(nodeId);
+        if (socketsForNode == null) {
+            return null;
+        }
+        ServerSocketChannel socket = socketsForNode.get(listener);
+        if (socket == null) {
+            return null;
+        }
+        usedSockets.computeIfAbsent(nodeId, __ -> new HashSet<>()).add(listener);
+        return socket;
+    }
+
+    /**
+     * Get or create a socket factory object associated with a given node ID.
+     *
+     * @param nodeId      The ID of the node.
+     *
+     * @return            The socket factory.
+     */
+    public synchronized PreboundSocketFactory getOrCreateSocketFactory(int nodeId) {
+        return factories.computeIfAbsent(nodeId, __ -> new PreboundSocketFactory(nodeId));
+    }
+
+    /**
+     * Get the port number of the prebound socket for a listener. The socket is opened, and bound
+     * to a system-chosen port, if it does not already exist.
+     *
+     * @param nodeId      The ID of the node.
+     * @param listener    The listener for the socket.
+     *
+     * @return            The port number.
+     *
+     * @throws RuntimeException   If a new socket is needed but this manager is closed.
+     */
+    public synchronized int getOrCreatePortForListener(
+        int nodeId,
+        String listener
+    ) throws IOException {
+        Map<String, ServerSocketChannel> socketsForNode = sockets.get(nodeId);
+        ServerSocketChannel socketChannel =
+            socketsForNode == null ? null : socketsForNode.get(listener);
+        if (socketChannel == null) {
+            if (closed) {
+                throw new RuntimeException("Cannot open new socket: manager is closed.");
+            }
+            // Port 0 lets the system pick a free port, and a backlog of -1 makes bind() use its
+            // default. The receive buffer size of -1 presumably equals
+            // Selectable.USE_DEFAULT_BUFFER_SIZE, leaving the buffer size untouched -- confirm.
+            socketChannel = ServerSocketFactory.KafkaServerSocketFactory.INSTANCE.openServerSocket(
+                listener,
+                new InetSocketAddress(0),
+                -1,
+                -1);
+            // Only record the socket once it exists, so that a failed or rejected request
+            // leaves the bookkeeping untouched.
+            sockets.computeIfAbsent(nodeId, __ -> new HashMap<>()).put(listener, socketChannel);
+        }
+        InetSocketAddress socketAddress = (InetSocketAddress) socketChannel.getLocalAddress();
+        return socketAddress.getPort();
+    }
+
+    /**
+     * Close this manager. Closing more than once has no further effect.
+     */
+    @Override
+    public synchronized void close() throws Exception {
+        if (closed) {
+            return;
+        }
+        closed = true;
+        // Close all sockets that haven't been used by a SocketServer. (We don't want to close the
+        // ones that have been used by a SocketServer because that is the responsibility of that
+        // SocketServer.)
+        for (Entry<Integer, Map<String, ServerSocketChannel>> socketsEntry : sockets.entrySet()) {
+            Set<String> usedListeners = usedSockets.getOrDefault(
+                socketsEntry.getKey(), Collections.emptySet());
+            for (Entry<String, ServerSocketChannel> entry : socketsEntry.getValue().entrySet()) {
+                if (!usedListeners.contains(entry.getKey())) {
+                    try {
+                        entry.getValue().close();
+                    } catch (Exception e) {
+                        LOG.error("Error closing socket", e);
+                    }
+                }
+            }
+        }
+    }
+}
diff --git a/core/src/test/java/kafka/testkit/TestKitNode.java b/core/src/test/java/kafka/testkit/TestKitNode.java
index 2293c91..78f785e 100644
--- a/core/src/test/java/kafka/testkit/TestKitNode.java
+++ b/core/src/test/java/kafka/testkit/TestKitNode.java
@@ -17,6 +17,8 @@
package kafka.testkit;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.metadata.properties.MetaProperties;
import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble;
import java.util.Map;
@@ -35,6 +37,18 @@
return initialMetaPropertiesEnsemble().logDirProps().keySet();
}
+ /**
+ * Look up the directory ID recorded for this node's metadata log directory.
+ *
+ * @return The directory ID from the initial meta.properties of the metadata log directory.
+ */
+ default Uuid metadataUuid() {
+ Map<String, MetaProperties> propsByLogDir = initialMetaPropertiesEnsemble().logDirProps();
+ MetaProperties metadataDirProps = propsByLogDir.get(
+ initialMetaPropertiesEnsemble().metadataLogDir().get());
+ return metadataDirProps.directoryId().get();
+ }
+
+ /**
+ * Build the voter string for this node, in the form id@localhost:port:directoryId.
+ *
+ * @param port The port to put in the voter string.
+ *
+ * @return The formatted voter string, using the directory ID of the metadata log directory.
+ */
+ default String voterString(int port) {
+ Map<String, MetaProperties> propsByLogDir = initialMetaPropertiesEnsemble().logDirProps();
+ MetaProperties metadataDirProps = propsByLogDir.get(
+ initialMetaPropertiesEnsemble().metadataLogDir().get());
+ return String.format("%d@localhost:%d:%s",
+ id(),
+ port,
+ metadataDirProps.directoryId().get());
+ }
+
MetaPropertiesEnsemble initialMetaPropertiesEnsemble();
Map<String, String> propertyOverrides();
diff --git a/core/src/test/java/kafka/testkit/TestKitNodes.java b/core/src/test/java/kafka/testkit/TestKitNodes.java
index b716f66..3bc0b6c 100644
--- a/core/src/test/java/kafka/testkit/TestKitNodes.java
+++ b/core/src/test/java/kafka/testkit/TestKitNodes.java
@@ -51,8 +51,15 @@
private int numBrokerNodes;
private int numDisksPerBroker = 1;
private Map<Integer, Map<String, String>> perServerProperties = Collections.emptyMap();
- private BootstrapMetadata bootstrapMetadata = BootstrapMetadata.
- fromVersion(MetadataVersion.latestTesting(), "testkit");
+ private BootstrapMetadata bootstrapMetadata;
+
+ /**
+ * Create a builder whose bootstrap metadata is generated from
+ * MetadataVersion.latestTesting() with the source name "testkit".
+ */
+ public Builder() {
+ this(BootstrapMetadata.fromVersion(MetadataVersion.latestTesting(), "testkit"));
+ }
+
+ /**
+ * Create a builder that starts from the given bootstrap metadata.
+ *
+ * @param bootstrapMetadata The initial bootstrap metadata; it is stored as-is.
+ */
+ public Builder(BootstrapMetadata bootstrapMetadata) {
+ this.bootstrapMetadata = bootstrapMetadata;
+ }
public Builder setClusterId(String clusterId) {
this.clusterId = clusterId;
@@ -60,12 +67,13 @@
}
+ /**
+ * Set the metadata version in the bootstrap metadata.
+ *
+ * The current bootstrap metadata is replaced by a copy in which the
+ * MetadataVersion.FEATURE_NAME feature has the feature level of the given version.
+ *
+ * @param metadataVersion The metadata version to bootstrap with.
+ *
+ * @return This builder.
+ */
public Builder setBootstrapMetadataVersion(MetadataVersion metadataVersion) {
- this.bootstrapMetadata = BootstrapMetadata.fromVersion(metadataVersion, "testkit");
+ this.bootstrapMetadata = bootstrapMetadata.copyWithFeatureRecord(
+ MetadataVersion.FEATURE_NAME, metadataVersion.featureLevel());
return this;
}
- public Builder setBootstrapMetadata(BootstrapMetadata bootstrapMetadata) {
- this.bootstrapMetadata = bootstrapMetadata;
+ /**
+ * Set the level of a feature in the bootstrap metadata.
+ *
+ * The current bootstrap metadata is replaced by the result of copyWithFeatureRecord.
+ *
+ * @param featureName The name of the feature.
+ * @param level The level to set for the feature.
+ *
+ * @return This builder.
+ */
+ public Builder setFeature(String featureName, short level) {
+ this.bootstrapMetadata = bootstrapMetadata.copyWithFeatureRecord(featureName, level);
return this;
}
diff --git a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
index 7e74ced..dee3cc7 100644
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
@@ -1389,8 +1389,7 @@
setName("num.io.threads").
setValue("9"), 0.toShort))
val cluster = new KafkaClusterTestKit.Builder(
- new TestKitNodes.Builder().
- setBootstrapMetadata(BootstrapMetadata.fromRecords(bootstrapRecords, "testRecords")).
+ new TestKitNodes.Builder(BootstrapMetadata.fromRecords(bootstrapRecords, "testRecords")).
setNumBrokerNodes(1).
setNumControllerNodes(1).build()).
build()
diff --git a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
index 969e7e7..ae7f04f 100755
--- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
@@ -17,6 +17,8 @@
package kafka.server
+import kafka.server.ServerSocketFactory.KafkaServerSocketFactory
+
import java.io.File
import java.net.InetSocketAddress
import java.util
@@ -129,7 +131,8 @@
new Metrics(),
controllerQuorumVotersFuture,
controllerQuorumVotersFuture.get().values(),
- faultHandlerFactory
+ faultHandlerFactory,
+ KafkaServerSocketFactory.INSTANCE,
)
var broker: BrokerServer = null
try {
@@ -369,7 +372,8 @@
new Metrics(),
controllerQuorumVotersFuture,
Collections.emptyList(),
- faultHandlerFactory
+ faultHandlerFactory,
+ KafkaServerSocketFactory.INSTANCE,
)
var controllerServer: ControllerServer = null
try {
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java b/metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java
index 1de6a1f..2dc6d9a 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java
@@ -134,19 +134,43 @@
return source;
}
- public BootstrapMetadata copyWithOnlyVersion() {
- ApiMessageAndVersion versionRecord = null;
+ /**
+ * Find the level of a feature in these bootstrap records.
+ *
+ * @param featureName The name of the feature to look up.
+ *
+ * @return The level from the last FeatureLevelRecord with that name, or 0 if
+ * no record has that name.
+ */
+ public short featureLevel(String featureName) {
+ short result = 0;
for (ApiMessageAndVersion record : records) {
- if (recordToMetadataVersion(record.message()).isPresent()) {
- versionRecord = record;
+ if (record.message() instanceof FeatureLevelRecord) {
+ FeatureLevelRecord message = (FeatureLevelRecord) record.message();
+ if (message.name().equals(featureName)) {
+ // No break: a later record with the same name overrides an earlier one.
+ result = message.featureLevel();
+ }
}
}
- if (versionRecord == null) {
- throw new RuntimeException("No FeatureLevelRecord for " + MetadataVersion.FEATURE_NAME +
- " was found in " + source);
+ return result;
+ }
+
+ /**
+ * Create a copy of this bootstrap metadata in which the given feature has the given level.
+ *
+ * Every existing record is carried over in its original order. Each FeatureLevelRecord
+ * named featureName is replaced by a duplicate with the new level; if there is none, a
+ * new FeatureLevelRecord is appended at the end.
+ *
+ * @param featureName The name of the feature to set.
+ * @param level The level to set for the feature.
+ *
+ * @return The new BootstrapMetadata, built from the updated records and the same source.
+ */
+ public BootstrapMetadata copyWithFeatureRecord(String featureName, short level) {
+ List<ApiMessageAndVersion> newRecords = new ArrayList<>();
+ boolean replaced = false;
+ for (ApiMessageAndVersion existing : records) {
+ // By default a record is copied unchanged, whatever its type, so that records
+ // other than the targeted feature level are not lost.
+ ApiMessageAndVersion copied = existing;
+ if (existing.message() instanceof FeatureLevelRecord) {
+ FeatureLevelRecord record = (FeatureLevelRecord) existing.message();
+ if (record.name().equals(featureName)) {
+ // Duplicate before mutating so this object's own records stay untouched.
+ FeatureLevelRecord newRecord = record.duplicate();
+ newRecord.setFeatureLevel(level);
+ copied = new ApiMessageAndVersion(newRecord, (short) 0);
+ replaced = true;
+ }
+ }
+ newRecords.add(copied);
}
- return new BootstrapMetadata(Collections.singletonList(versionRecord),
- metadataVersion, source);
+ if (!replaced) {
+ FeatureLevelRecord newRecord = new FeatureLevelRecord().
+ setName(featureName).
+ setFeatureLevel(level);
+ newRecords.add(new ApiMessageAndVersion(newRecord, (short) 0));
+ }
+ return BootstrapMetadata.fromRecords(newRecords, source);
}
@Override
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java b/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java
index 72995fb..952ef85 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java
@@ -125,7 +125,7 @@
/**
* The metadata log directory.
*/
- private String metadataLogDirectory = null;
+ // Stays null until setMetadataLogDirectory is called. null and Optional.empty() are
+ // treated differently: a null value is rejected with "You must specify the metadata log
+ // directory.", while an empty Optional is accepted.
+ private Optional<String> metadataLogDirectory = null;
/**
* The initial KIP-853 voters.
@@ -167,6 +167,10 @@
return this;
}
+ /**
+ * Get the directories configured on this formatter.
+ *
+ * @return The directories collection held by this formatter; it is returned directly,
+ * without copying.
+ */
+ public Collection<String> directories() {
+ return directories;
+ }
+
public Formatter setReleaseVersion(MetadataVersion releaseVersion) {
this.releaseVersion = releaseVersion;
return this;
@@ -198,6 +202,11 @@
}
+ /**
+ * Set the metadata log directory.
+ *
+ * @param metadataLogDirectory The metadata log directory. Must not be null, because
+ * Optional.of throws NullPointerException for null.
+ *
+ * @return This formatter.
+ */
public Formatter setMetadataLogDirectory(String metadataLogDirectory) {
+ this.metadataLogDirectory = Optional.of(metadataLogDirectory);
+ return this;
+ }
+
+ /**
+ * Set the metadata log directory from an Optional.
+ *
+ * @param metadataLogDirectory The metadata log directory; it is stored as-is. When it is
+ * empty, the check that the directory is one of the given
+ * directories is skipped.
+ *
+ * @return This formatter.
+ */
+ public Formatter setMetadataLogDirectory(Optional<String> metadataLogDirectory) {
this.metadataLogDirectory = metadataLogDirectory;
return this;
}
@@ -234,10 +243,12 @@
if (metadataLogDirectory == null) {
throw new FormatterException("You must specify the metadata log directory.");
}
- if (!directories.contains(metadataLogDirectory)) {
- throw new FormatterException("The specified metadata log directory, " + metadataLogDirectory +
+ metadataLogDirectory.ifPresent(d -> {
+ if (!directories.contains(d)) {
+ throw new FormatterException("The specified metadata log directory, " + d +
" was not one of the given directories: " + directories);
- }
+ }
+ });
releaseVersion = calculateEffectiveReleaseVersion();
featureLevels = calculateEffectiveFeatureLevels();
this.bootstrapMetadata = calculateBootstrapMetadata();
@@ -401,7 +412,7 @@
Map<String, DirectoryType> directoryTypes = new HashMap<>();
for (String emptyLogDir : ensemble.emptyLogDirs()) {
DirectoryType directoryType = DirectoryType.calculate(emptyLogDir,
- metadataLogDirectory,
+ metadataLogDirectory.orElseGet(() -> ""),
nodeId,
initialControllers);
directoryTypes.put(emptyLogDir, directoryType);
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadataTest.java b/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadataTest.java
index 5118941..edd75c5 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadataTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadataTest.java
@@ -24,6 +24,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -71,10 +72,50 @@
() -> BootstrapMetadata.fromRecords(emptyList(), "quux")).getMessage());
}
+ /**
+ * Wrap a FeatureLevelRecord with the given name and level in an ApiMessageAndVersion
+ * whose version is 0.
+ */
+ private static ApiMessageAndVersion featureLevelRecord(String name, short level) {
+ FeatureLevelRecord record = new FeatureLevelRecord().
+ setName(name).
+ setFeatureLevel(level);
+ return new ApiMessageAndVersion(record, (short) 0);
+ }
+
+ // The FEATURE_NAME feature at levels 10 and 11.
+ private static final ApiMessageAndVersion MV_10 = featureLevelRecord(FEATURE_NAME, (short) 10);
+
+ private static final ApiMessageAndVersion MV_11 = featureLevelRecord(FEATURE_NAME, (short) 11);
+
+ // An arbitrary feature named "foo" at levels 1 and 2.
+ private static final ApiMessageAndVersion FOO_1 = featureLevelRecord("foo", (short) 1);
+
+ private static final ApiMessageAndVersion FOO_2 = featureLevelRecord("foo", (short) 2);
+
@Test
- public void testCopyWithOnlyVersion() {
- assertEquals(new BootstrapMetadata(SAMPLE_RECORDS1.subList(2, 3), IBP_3_3_IV2, "baz"),
- BootstrapMetadata.fromRecords(SAMPLE_RECORDS1, "baz").copyWithOnlyVersion());
+ public void testCopyWithNewFeatureRecord() {
+ // Setting a feature that has no record yet appends a record for it.
+ BootstrapMetadata expected =
+ BootstrapMetadata.fromRecords(Arrays.asList(MV_10, FOO_1), "src");
+ BootstrapMetadata withoutFoo =
+ BootstrapMetadata.fromRecords(Arrays.asList(MV_10), "src");
+ assertEquals(expected, withoutFoo.copyWithFeatureRecord("foo", (short) 1));
+ }
+
+ @Test
+ public void testCopyWithModifiedFeatureRecord() {
+ // Setting a feature that already has a record changes that record's level.
+ BootstrapMetadata expected =
+ BootstrapMetadata.fromRecords(Arrays.asList(MV_10, FOO_2), "src");
+ BootstrapMetadata withFooAtLevel1 =
+ BootstrapMetadata.fromRecords(Arrays.asList(MV_10, FOO_1), "src");
+ assertEquals(expected, withFooAtLevel1.copyWithFeatureRecord("foo", (short) 2));
+ }
+
+ @Test
+ public void testFeatureLevelForFeatureThatIsNotSet() {
+ // A feature with no FeatureLevelRecord reports level 0.
+ BootstrapMetadata withoutFoo =
+ BootstrapMetadata.fromRecords(Arrays.asList(MV_10), "src");
+ short actualLevel = withoutFoo.featureLevel("foo");
+ assertEquals((short) 0, actualLevel);
+ }
+
+ @Test
+ public void testFeatureLevelForFeature() {
+ // A feature with a FeatureLevelRecord reports that record's level.
+ BootstrapMetadata withFooAtLevel2 =
+ BootstrapMetadata.fromRecords(Arrays.asList(MV_10, FOO_2), "src");
+ short actualLevel = withFooAtLevel2.featureLevel("foo");
+ assertEquals((short) 2, actualLevel);
}
static final List<ApiMessageAndVersion> RECORDS_WITH_OLD_METADATA_VERSION = Collections.singletonList(