Introduce an injectable ServerSocketFactory so tests can pre-bind controller ports (PreboundSocketFactoryManager), thread it through SocketServer/SharedServer/BrokerServer/ControllerServer, only advertise the kraft.version finalized feature when its level is > 0, and migrate KafkaClusterTestKit formatting to the new Formatter with dynamic-voter support.
diff --git a/core/src/main/java/kafka/server/ServerSocketFactory.java b/core/src/main/java/kafka/server/ServerSocketFactory.java
new file mode 100644
index 0000000..427ab18
--- /dev/null
+++ b/core/src/main/java/kafka/server/ServerSocketFactory.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.network.Selectable;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketException;
+import java.nio.channels.ServerSocketChannel;
+
+public interface ServerSocketFactory {
+    ServerSocketChannel openServerSocket(
+        String listenerName,
+        InetSocketAddress socketAddress,
+        int listenBacklogSize,
+        int recvBufferSize
+    ) throws IOException;
+
+    class KafkaServerSocketFactory implements ServerSocketFactory {
+        public static final KafkaServerSocketFactory INSTANCE = new KafkaServerSocketFactory();
+
+        @Override
+        public ServerSocketChannel openServerSocket(
+                String listenerName,
+                InetSocketAddress socketAddress,
+                int listenBacklogSize,
+                int recvBufferSize
+        ) throws IOException {
+            ServerSocketChannel socketChannel = null;
+            try {
+                socketChannel = ServerSocketChannel.open();
+                socketChannel.configureBlocking(false);
+                if (recvBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE) {
+                    socketChannel.socket().setReceiveBufferSize(recvBufferSize);
+                }
+                socketChannel.socket().bind(socketAddress, listenBacklogSize);
+            } catch (SocketException e) {
+                Utils.closeQuietly(socketChannel, "server socket");
+                throw new KafkaException(String.format("Socket server failed to bind to %s:%d: %s.",
+                    socketAddress.getHostString(), socketAddress.getPort(), e.getMessage()), e);
+            }
+            return socketChannel;
+        }
+    }
+}
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index d854510..d65eaa3 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -29,7 +29,8 @@
 import kafka.network.Processor._
 import kafka.network.RequestChannel.{CloseConnectionResponse, EndThrottlingResponse, NoOpResponse, SendResponse, StartThrottlingResponse}
 import kafka.network.SocketServer._
-import kafka.server.{ApiVersionManager, BrokerReconfigurable, KafkaConfig}
+import kafka.server.ServerSocketFactory.KafkaServerSocketFactory
+import kafka.server.{ApiVersionManager, BrokerReconfigurable, KafkaConfig, ServerSocketFactory}
 import org.apache.kafka.common.message.ApiMessageType.ListenerType
 import kafka.utils._
 import org.apache.kafka.common.config.ConfigException
@@ -74,12 +75,14 @@
  *      Acceptor has 1 Processor thread that has its own selector and read requests from the socket.
  *      1 Handler thread that handles requests and produces responses back to the processor thread for writing.
  */
-class SocketServer(val config: KafkaConfig,
-                   val metrics: Metrics,
-                   val time: Time,
-                   val credentialProvider: CredentialProvider,
-                   val apiVersionManager: ApiVersionManager)
-  extends Logging with BrokerReconfigurable {
+class SocketServer(
+  val config: KafkaConfig,
+  val metrics: Metrics,
+  val time: Time,
+  val credentialProvider: CredentialProvider,
+  val apiVersionManager: ApiVersionManager,
+  val socketFactory: ServerSocketFactory = KafkaServerSocketFactory.INSTANCE
+) extends Logging with BrokerReconfigurable {
 
   private val metricsGroup = new KafkaMetricsGroup(this.getClass)
 
@@ -720,23 +723,17 @@
    * Create a server socket to listen for connections on.
    */
   private def openServerSocket(host: String, port: Int, listenBacklogSize: Int): ServerSocketChannel = {
-    val socketAddress =
-      if (Utils.isBlank(host))
-        new InetSocketAddress(port)
-      else
-        new InetSocketAddress(host, port)
-    val serverChannel = ServerSocketChannel.open()
-    try {
-      serverChannel.configureBlocking(false)
-      if (recvBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
-        serverChannel.socket().setReceiveBufferSize(recvBufferSize)
-      serverChannel.socket.bind(socketAddress, listenBacklogSize)
-      info(s"Awaiting socket connections on ${socketAddress.getHostString}:${serverChannel.socket.getLocalPort}.")
-    } catch {
-      case e: SocketException =>
-        Utils.closeQuietly(serverChannel, "server socket")
-        throw new KafkaException(s"Socket server failed to bind to ${socketAddress.getHostString}:$port: ${e.getMessage}.", e)
+    val socketAddress = if (Utils.isBlank(host)) {
+      new InetSocketAddress(port)
+    } else {
+      new InetSocketAddress(host, port)
     }
+    val serverChannel = socketServer.socketFactory.openServerSocket(
+      endPoint.listenerName.value(),
+      socketAddress,
+      listenBacklogSize,
+      recvBufferSize)
+    info(s"Awaiting socket connections on ${socketAddress.getHostString}:${serverChannel.socket.getLocalPort}.")
     serverChannel
   }
 
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index 08bf559..b31ce8a 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -257,7 +257,12 @@
       // Create and start the socket server acceptor threads so that the bound port is known.
       // Delay starting processors until the end of the initialization sequence to ensure
       // that credentials have been loaded before processing authentications.
-      socketServer = new SocketServer(config, metrics, time, credentialProvider, apiVersionManager)
+      socketServer = new SocketServer(config,
+        metrics,
+        time,
+        credentialProvider,
+        apiVersionManager,
+        sharedServer.socketFactory)
 
       clientQuotaMetadataManager = new ClientQuotaMetadataManager(quotaManagers, socketServer.connectionQuotas)
 
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala
index 1008fe2..017499e 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -185,7 +185,8 @@
         metrics,
         time,
         credentialProvider,
-        apiVersionManager)
+        apiVersionManager,
+        sharedServer.socketFactory)
 
       val listenerInfo = ListenerInfo
         .create(config.effectiveAdvertisedControllerListeners.map(_.toJava).asJava)
diff --git a/core/src/main/scala/kafka/server/KafkaRaftServer.scala b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
index dbc6f76..43738ec 100644
--- a/core/src/main/scala/kafka/server/KafkaRaftServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
@@ -20,6 +20,7 @@
 import java.util.concurrent.CompletableFuture
 import kafka.log.UnifiedLog
 import kafka.metrics.KafkaMetricsReporter
+import kafka.server.ServerSocketFactory.KafkaServerSocketFactory
 import kafka.utils.{CoreUtils, Logging, Mx4jLoader, VerifiableProperties}
 import org.apache.kafka.common.config.{ConfigDef, ConfigResource}
 import org.apache.kafka.common.internals.Topic
@@ -73,6 +74,7 @@
     CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumVoters)),
     QuorumConfig.parseBootstrapServers(config.quorumBootstrapServers),
     new StandardFaultHandlerFactory(),
+    KafkaServerSocketFactory.INSTANCE,
   )
 
   private val broker: Option[BrokerServer] = if (config.processRoles.contains(ProcessRole.BrokerRole)) {
diff --git a/core/src/main/scala/kafka/server/SharedServer.scala b/core/src/main/scala/kafka/server/SharedServer.scala
index 4a675ee..913a09d 100644
--- a/core/src/main/scala/kafka/server/SharedServer.scala
+++ b/core/src/main/scala/kafka/server/SharedServer.scala
@@ -99,7 +99,8 @@
   private val _metrics: Metrics,
   val controllerQuorumVotersFuture: CompletableFuture[JMap[Integer, InetSocketAddress]],
   val bootstrapServers: JCollection[InetSocketAddress],
-  val faultHandlerFactory: FaultHandlerFactory
+  val faultHandlerFactory: FaultHandlerFactory,
+  val socketFactory: ServerSocketFactory
 ) extends Logging {
   private val logContext: LogContext = new LogContext(s"[SharedServer id=${sharedServerConfig.nodeId}] ")
   this.logIdent = logContext.logPrefix
diff --git a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
index f8353df..832d1e3 100644
--- a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
+++ b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
@@ -550,8 +550,10 @@
   override def features(): FinalizedFeatures = {
     val image = _currentImage
     val finalizedFeatures = new java.util.HashMap[String, java.lang.Short](image.features().finalizedVersions())
-    finalizedFeatures.put(KRaftVersion.FEATURE_NAME, kraftVersionSupplier.get().featureLevel())
-
+    val kraftVersionLevel = kraftVersionSupplier.get().featureLevel()
+    if (kraftVersionLevel > 0) {
+      finalizedFeatures.put(KRaftVersion.FEATURE_NAME, kraftVersionLevel)
+    }
     new FinalizedFeatures(image.features().metadataVersion(),
       finalizedFeatures,
       image.highestOffsetAndEpoch().offset,
diff --git a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
index f7cc416..ecd63ea 100644
--- a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
+++ b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
@@ -29,20 +29,15 @@
 import kafka.zk.EmbeddedZookeeper;
 
 import org.apache.kafka.clients.admin.Admin;
-import org.apache.kafka.common.metadata.FeatureLevelRecord;
 import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.metadata.BrokerState;
-import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
-import org.apache.kafka.server.common.ApiMessageAndVersion;
-import org.apache.kafka.server.common.MetadataVersion;
 
 import org.junit.jupiter.api.extension.AfterTestExecutionCallback;
 import org.junit.jupiter.api.extension.BeforeTestExecutionCallback;
 import org.junit.jupiter.api.extension.Extension;
 import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
 
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -246,26 +241,16 @@
 
         public void format() throws Exception {
             if (formated.compareAndSet(false, true)) {
-                List<ApiMessageAndVersion> records = new ArrayList<>();
-                records.add(
-                    new ApiMessageAndVersion(new FeatureLevelRecord().
-                        setName(MetadataVersion.FEATURE_NAME).
-                        setFeatureLevel(clusterConfig.metadataVersion().featureLevel()), (short) 0));
-
-                clusterConfig.features().forEach((feature, version) -> {
-                    records.add(
-                        new ApiMessageAndVersion(new FeatureLevelRecord().
-                            setName(feature.featureName()).
-                            setFeatureLevel(version), (short) 0));
-                });
-
-                TestKitNodes nodes = new TestKitNodes.Builder()
-                        .setBootstrapMetadata(BootstrapMetadata.fromRecords(records, "testkit"))
+                TestKitNodes.Builder nodesBuilder = new TestKitNodes.Builder()
+                        .setBootstrapMetadataVersion(clusterConfig.metadataVersion())
                         .setCombined(isCombined)
                         .setNumBrokerNodes(clusterConfig.numBrokers())
                         .setNumDisksPerBroker(clusterConfig.numDisksPerBroker())
                         .setPerServerProperties(clusterConfig.perServerOverrideProperties())
-                        .setNumControllerNodes(clusterConfig.numControllers()).build();
+                        .setNumControllerNodes(clusterConfig.numControllers());
+                clusterConfig.features().forEach((feature, version) ->
+                    nodesBuilder.setFeature(feature.featureName(), version));
+                TestKitNodes nodes = nodesBuilder.build();
                 KafkaClusterTestKit.Builder builder = new KafkaClusterTestKit.Builder(nodes);
                 if (Boolean.parseBoolean(clusterConfig.serverProperties()
                         .getOrDefault("zookeeper.metadata.migration.enable", "false"))) {
diff --git a/core/src/test/java/kafka/test/server/ReconfigurableQuorumIntegrationTest.java b/core/src/test/java/kafka/test/server/ReconfigurableQuorumIntegrationTest.java
new file mode 100644
index 0000000..9f5b003
--- /dev/null
+++ b/core/src/test/java/kafka/test/server/ReconfigurableQuorumIntegrationTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test.server;
+
+import kafka.testkit.KafkaClusterTestKit;
+import kafka.testkit.TestKitNodes;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.FeatureMetadata;
+import org.apache.kafka.clients.admin.QuorumInfo;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.server.common.KRaftVersion;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ReconfigurableQuorumIntegrationTest {
+    static void checkKRaftVersions(Admin admin, short finalized) throws Exception {
+        FeatureMetadata featureMetadata = admin.describeFeatures().featureMetadata().get();
+        if (finalized > 0) {
+            assertTrue(featureMetadata.finalizedFeatures().containsKey(KRaftVersion.FEATURE_NAME));
+            assertEquals(finalized, featureMetadata.finalizedFeatures().
+                    get(KRaftVersion.FEATURE_NAME).minVersionLevel());
+            assertEquals(finalized, featureMetadata.finalizedFeatures().
+                    get(KRaftVersion.FEATURE_NAME).maxVersionLevel());
+        } else {
+            assertFalse(featureMetadata.finalizedFeatures().containsKey(KRaftVersion.FEATURE_NAME));
+        }
+        assertEquals((short) 0, featureMetadata.supportedFeatures().
+                get(KRaftVersion.FEATURE_NAME).minVersion());
+        assertEquals((short) 1, featureMetadata.supportedFeatures().
+                get(KRaftVersion.FEATURE_NAME).maxVersion());
+    }
+
+    @Test
+    public void testCreateAndDestroyNonReconfigurableCluster() throws Exception {
+        try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+            new TestKitNodes.Builder().
+                setNumBrokerNodes(1).
+                setNumControllerNodes(1).
+                    build()).build()
+        ) {
+            cluster.format();
+            cluster.startup();
+            try (Admin admin = Admin.create(cluster.clientProperties())) {
+                TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
+                    checkKRaftVersions(admin, (short) 0);
+                });
+            }
+        }
+    }
+
+    @Test
+    public void testCreateAndDestroyReconfigurableCluster() throws Exception {
+        try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+            new TestKitNodes.Builder().
+                setNumBrokerNodes(1).
+                setNumControllerNodes(1).
+                setFeature(KRaftVersion.FEATURE_NAME, (short) 1).
+                    build()).build()
+        ) {
+            cluster.format();
+            cluster.startup();
+            try (Admin admin = Admin.create(cluster.clientProperties())) {
+                TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
+                    checkKRaftVersions(admin, (short) 1);
+                });
+            }
+        }
+    }
+
+    static Map<Integer, Uuid> findVoterDirs(Admin admin) throws Exception {
+        QuorumInfo quorumInfo = admin.describeMetadataQuorum().quorumInfo().get();
+        Map<Integer, Uuid> result = new TreeMap<>();
+        quorumInfo.voters().forEach(v -> {
+            result.put(v.replicaId(), v.replicaDirectoryId());
+        });
+        return result;
+    }
+
+    @Test
+    public void testRemoveController() throws Exception {
+        try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+            new TestKitNodes.Builder().
+                setNumBrokerNodes(1).
+                setNumControllerNodes(3).
+                setFeature(KRaftVersion.FEATURE_NAME, (short) 1).
+                    build()).build()
+        ) {
+            cluster.format();
+            cluster.startup();
+            try (Admin admin = Admin.create(cluster.clientProperties())) {
+                TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
+                    Map<Integer, Uuid> voters = findVoterDirs(admin);
+                    assertEquals(new HashSet<>(Arrays.asList(3000, 3001, 3002)), voters.keySet());
+                    for (int replicaId : new int[] {3000, 3001, 3002}) {
+                        assertNotEquals(Uuid.ZERO_UUID, voters.get(replicaId));
+                    }
+                });
+                admin.removeRaftVoter(3000, cluster.nodes().
+                    controllerNodes().get(3000).metadataUuid()).all().get();
+            }
+        }
+    }
+}
diff --git a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
index 615ffa2..c12e3cf 100644
--- a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
+++ b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
@@ -34,12 +34,13 @@
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.controller.Controller;
-import org.apache.kafka.metadata.bootstrap.BootstrapDirectory;
-import org.apache.kafka.metadata.properties.MetaProperties;
 import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble;
+import org.apache.kafka.metadata.storage.Formatter;
 import org.apache.kafka.network.SocketServerConfigs;
+import org.apache.kafka.raft.DynamicVoters;
 import org.apache.kafka.raft.QuorumConfig;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.KRaftVersion;
 import org.apache.kafka.server.config.KRaftConfigs;
 import org.apache.kafka.server.config.ServerConfigs;
 import org.apache.kafka.server.fault.FaultHandler;
@@ -52,6 +53,7 @@
 import org.slf4j.event.Level;
 
 import java.io.File;
+import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.file.Files;
 import java.nio.file.Paths;
@@ -151,6 +153,7 @@
         private TestKitNodes nodes;
         private final Map<String, Object> configProps = new HashMap<>();
         private final SimpleFaultHandlerFactory faultHandlerFactory = new SimpleFaultHandlerFactory();
+        private final PreboundSocketFactoryManager socketFactoryManager = new PreboundSocketFactoryManager();
 
         public Builder(TestKitNodes nodes) {
             this.nodes = nodes;
@@ -161,7 +164,7 @@
             return this;
         }
 
-        private KafkaConfig createNodeConfig(TestKitNode node) {
+        private KafkaConfig createNodeConfig(TestKitNode node) throws IOException {
             TestKitNode brokerNode = nodes.brokerNodes().get(node.id());
             TestKitNode controllerNode = nodes.controllerNodes().get(node.id());
 
@@ -198,13 +201,18 @@
                 nodes.interBrokerListenerName().value());
             props.putIfAbsent(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER");
 
-            // Note: we can't accurately set controller.quorum.voters yet, since we don't
-            // yet know what ports each controller will pick.  Set it to a dummy string
-            // for now as a placeholder.
-            String uninitializedQuorumVotersString = nodes.controllerNodes().keySet().stream().
-                    map(n -> String.format("%d@0.0.0.0:0", n)).
-                    collect(Collectors.joining(","));
-            props.put(QuorumConfig.QUORUM_VOTERS_CONFIG, uninitializedQuorumVotersString);
+            StringBuilder quorumVoterStringBuilder = new StringBuilder();
+            String prefix = "";
+            for (int nodeId : nodes.controllerNodes().keySet()) {
+                quorumVoterStringBuilder.append(prefix).
+                    append(nodeId).
+                    append("@").
+                    append("localhost").
+                    append(":").
+                    append(socketFactoryManager.getOrCreatePortForListener(nodeId, "CONTROLLER"));
+                prefix = ",";
+            }
+            props.put(QuorumConfig.QUORUM_VOTERS_CONFIG, quorumVoterStringBuilder.toString());
 
             // reduce log cleaner offset map memory usage
             props.put(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, "2097152");
@@ -233,6 +241,9 @@
             try {
                 baseDirectory = new File(nodes.baseDirectory());
                 for (TestKitNode node : nodes.controllerNodes().values()) {
+                    socketFactoryManager.getOrCreatePortForListener(node.id(), "CONTROLLER");
+                }
+                for (TestKitNode node : nodes.controllerNodes().values()) {
                     setupNodeDirectories(baseDirectory, node.metadataDirectory(), Collections.emptyList());
                     SharedServer sharedServer = new SharedServer(
                         createNodeConfig(node),
@@ -241,7 +252,8 @@
                         new Metrics(),
                         connectFutureManager.future,
                         Collections.emptyList(),
-                        faultHandlerFactory
+                        faultHandlerFactory,
+                        socketFactoryManager.getOrCreateSocketFactory(node.id())
                     );
                     ControllerServer controller = null;
                     try {
@@ -265,18 +277,20 @@
                     jointServers.put(node.id(), sharedServer);
                 }
                 for (TestKitNode node : nodes.brokerNodes().values()) {
-                    SharedServer sharedServer = jointServers.computeIfAbsent(
-                        node.id(),
-                        id -> new SharedServer(
+                    SharedServer sharedServer = jointServers.get(node.id());
+                    if (sharedServer == null) {
+                        sharedServer = new SharedServer(
                             createNodeConfig(node),
                             node.initialMetaPropertiesEnsemble(),
                             Time.SYSTEM,
                             new Metrics(),
                             connectFutureManager.future,
                             Collections.emptyList(),
-                            faultHandlerFactory
-                        )
-                    );
+                            faultHandlerFactory,
+                            socketFactoryManager.getOrCreateSocketFactory(node.id())
+                        );
+                        jointServers.put(node.id(), sharedServer);
+                    }
                     BrokerServer broker = null;
                     try {
                         broker = new BrokerServer(sharedServer);
@@ -298,6 +312,7 @@
                 if (baseDirectory != null) {
                     Utils.delete(baseDirectory);
                 }
+                socketFactoryManager.close();
                 throw e;
             }
             return new KafkaClusterTestKit(
@@ -306,7 +321,8 @@
                     brokers,
                     connectFutureManager,
                     baseDirectory,
-                    faultHandlerFactory);
+                    faultHandlerFactory,
+                    socketFactoryManager);
         }
 
         private String listeners(int node) {
@@ -349,6 +365,7 @@
     private final ControllerQuorumVotersFutureManager controllerQuorumVotersFutureManager;
     private final File baseDirectory;
     private final SimpleFaultHandlerFactory faultHandlerFactory;
+    private final PreboundSocketFactoryManager socketFactoryManager;
 
     private KafkaClusterTestKit(
         TestKitNodes nodes,
@@ -356,7 +373,8 @@
         Map<Integer, BrokerServer> brokers,
         ControllerQuorumVotersFutureManager controllerQuorumVotersFutureManager,
         File baseDirectory,
-        SimpleFaultHandlerFactory faultHandlerFactory
+        SimpleFaultHandlerFactory faultHandlerFactory,
+        PreboundSocketFactoryManager socketFactoryManager
     ) {
         /*
           Number of threads = Total number of brokers + Total number of controllers + Total number of Raft Managers
@@ -371,6 +389,7 @@
         this.controllerQuorumVotersFutureManager = controllerQuorumVotersFutureManager;
         this.baseDirectory = baseDirectory;
         this.faultHandlerFactory = faultHandlerFactory;
+        this.socketFactoryManager = socketFactoryManager;
     }
 
     public void format() throws Exception {
@@ -404,22 +423,44 @@
         boolean writeMetadataDirectory
     ) {
         try {
-            MetaPropertiesEnsemble.Copier copier =
-                new MetaPropertiesEnsemble.Copier(MetaPropertiesEnsemble.EMPTY);
-            for (Entry<String, MetaProperties> entry : ensemble.logDirProps().entrySet()) {
-                String logDir = entry.getKey();
-                if (writeMetadataDirectory || (!ensemble.metadataLogDir().equals(Optional.of(logDir)))) {
-                    log.trace("Adding {} to the list of directories to format.", logDir);
-                    copier.setLogDirProps(logDir, entry.getValue());
-                }
+            Formatter formatter = new Formatter();
+            formatter.setNodeId(ensemble.nodeId().getAsInt());
+            formatter.setClusterId(ensemble.clusterId().get());
+            if (writeMetadataDirectory) {
+                formatter.setDirectories(ensemble.logDirProps().keySet());
+            } else {
+                formatter.setDirectories(ensemble.logDirProps().keySet().stream().
+                    filter(d -> !ensemble.metadataLogDir().get().equals(d)).
+                    collect(Collectors.toSet()));
             }
-            copier.setPreWriteHandler((logDir, isNew, metaProperties) -> {
-                log.info("Formatting {}.", logDir);
-                Files.createDirectories(Paths.get(logDir));
-                BootstrapDirectory bootstrapDirectory = new BootstrapDirectory(logDir, Optional.empty());
-                bootstrapDirectory.writeBinaryFile(nodes.bootstrapMetadata());
-            });
-            copier.writeLogDirChanges();
+            if (formatter.directories().isEmpty()) {
+                return;
+            }
+            formatter.setReleaseVersion(nodes.bootstrapMetadata().metadataVersion());
+            formatter.setFeatureLevel(KRaftVersion.FEATURE_NAME,
+                nodes.bootstrapMetadata().featureLevel(KRaftVersion.FEATURE_NAME));
+            formatter.setUnstableFeatureVersionsEnabled(true);
+            formatter.setIgnoreFormatted(false);
+            formatter.setControllerListenerName("CONTROLLER");
+            if (writeMetadataDirectory) {
+                formatter.setMetadataLogDirectory(ensemble.metadataLogDir().get());
+            } else {
+                formatter.setMetadataLogDirectory(Optional.empty());
+            }
+            if (nodes.bootstrapMetadata().featureLevel(KRaftVersion.FEATURE_NAME) > 0) {
+                StringBuilder dynamicVotersBuilder = new StringBuilder();
+                String prefix = "";
+                for (TestKitNode controllerNode : nodes.controllerNodes().values()) {
+                    int port = socketFactoryManager.
+                        getOrCreatePortForListener(controllerNode.id(), "CONTROLLER");
+                    dynamicVotersBuilder.append(prefix);
+                    prefix = ",";
+                    dynamicVotersBuilder.append(String.format("%d@localhost:%d:%s",
+                        controllerNode.id(), port, controllerNode.metadataUuid()));
+                }
+                formatter.setInitialVoters(DynamicVoters.parse(dynamicVotersBuilder.toString()));
+            }
+            formatter.run();
         } catch (Exception e) {
             throw new RuntimeException("Failed to format node " + ensemble.nodeId(), e);
         }
@@ -636,6 +677,7 @@
             throw e;
         } finally {
             ThreadUtils.shutdownExecutorServiceQuietly(executorService, 5, TimeUnit.MINUTES);
+            socketFactoryManager.close();
         }
         waitForAllThreads();
         faultHandlerFactory.fatalFaultHandler().maybeRethrowFirstException();
diff --git a/core/src/test/java/kafka/testkit/PreboundSocketFactoryManager.java b/core/src/test/java/kafka/testkit/PreboundSocketFactoryManager.java
new file mode 100644
index 0000000..673699c
--- /dev/null
+++ b/core/src/test/java/kafka/testkit/PreboundSocketFactoryManager.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.testkit;
+
+import kafka.server.ServerSocketFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.channels.ServerSocketChannel;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+public class PreboundSocketFactoryManager implements AutoCloseable {
+    private static final Logger LOG = LoggerFactory.getLogger(PreboundSocketFactoryManager.class);
+
+    private class PreboundSocketFactory implements ServerSocketFactory {
+        private final int nodeId;
+
+        private PreboundSocketFactory(int nodeId) {
+            this.nodeId = nodeId;
+        }
+
+        @Override
+        public ServerSocketChannel openServerSocket(
+                String listenerName,
+                InetSocketAddress socketAddress,
+                int listenBacklogSize,
+                int recvBufferSize
+        ) throws IOException {
+            ServerSocketChannel socketChannel = getSocketForListenerAndMarkAsUsed(
+                nodeId,
+                listenerName);
+            if (socketChannel != null) {
+                return socketChannel;
+            }
+            return KafkaServerSocketFactory.INSTANCE.openServerSocket(
+                listenerName,
+                socketAddress,
+                listenBacklogSize,
+                recvBufferSize);
+        }
+    }
+
+    /**
+     * True if this manager is closed.
+     */
+    private boolean closed = false;
+
+    /**
+     * Maps node IDs to socket factory objects.
+     * Protected by the object lock.
+     */
+    private final Map<Integer, PreboundSocketFactory> factories = new HashMap<>();
+
+    /**
+     * Maps node IDs to maps of listener names to ports.
+     * Protected by the object lock.
+     */
+    private final Map<Integer, Map<String, ServerSocketChannel>> sockets = new HashMap<>();
+
+    /**
+     * Maps node IDs to set of the listeners that were used.
+     * Protected by the object lock.
+     */
+    private final Map<Integer, Set<String>> usedSockets = new HashMap<>();
+
+    /**
+     * Get a socket from this manager, mark it as used, and return it.
+     *
+     * @param nodeId        The ID of the node.
+     * @param listener      The listener for the socket.
+     *
+     * @return              null if the socket was not found; the socket, otherwise.
+     */
+    public synchronized ServerSocketChannel getSocketForListenerAndMarkAsUsed(
+        int nodeId,
+        String listener
+    ) {
+        Map<String, ServerSocketChannel> socketsForNode = sockets.get(nodeId);
+        if (socketsForNode == null) {
+            return null;
+        }
+        ServerSocketChannel socket = socketsForNode.get(listener);
+        if (socket == null) {
+            return null;
+        }
+        usedSockets.computeIfAbsent(nodeId, __ -> new HashSet<>()).add(listener);
+        return socket;
+    }
+
+    /**
+     * Get or create a socket factory object associated with a given node ID.
+     *
+     * @param nodeId        The ID of the node.
+     *
+     * @return              The socket factory.
+     */
+    public synchronized PreboundSocketFactory getOrCreateSocketFactory(int nodeId) {
+        return factories.computeIfAbsent(nodeId, __ -> new PreboundSocketFactory(nodeId));
+    }
+
+    /**
+     * Get a specific port number. The port will be created if it does not already exist.
+     *
+     * @param nodeId        The ID of the node.
+     * @param listener      The listener for the socket.
+     *
+     * @return              The port number.
+     */
+    public synchronized int getOrCreatePortForListener(
+        int nodeId,
+        String listener
+    ) throws IOException {
+        Map<String, ServerSocketChannel> socketsForNode =
+            sockets.computeIfAbsent(nodeId, __ -> new HashMap<>());
+        ServerSocketChannel socketChannel = socketsForNode.get(listener);
+        if (socketChannel == null) {
+            if (closed) {
+                throw new IllegalStateException("Cannot open new socket: manager is closed.");
+            }
+            socketChannel = ServerSocketFactory.KafkaServerSocketFactory.INSTANCE.openServerSocket(
+                listener,
+                new InetSocketAddress(0),
+                -1,
+                -1);
+            socketsForNode.put(listener, socketChannel);
+        }
+        InetSocketAddress socketAddress = (InetSocketAddress) socketChannel.getLocalAddress();
+        return socketAddress.getPort();
+    }
+
+    @Override
+    public synchronized void close() throws Exception {
+        if (closed) {
+            return;
+        }
+        closed = true;
+        // Close all sockets that haven't been used by a SocketServer. (We don't want to close the
+        // ones that have been used by a SocketServer because that is the responsibility of that
+        // SocketServer.)
+        for (Entry<Integer, Map<String, ServerSocketChannel>> socketsEntry : sockets.entrySet()) {
+            Set<String> usedListeners = usedSockets.getOrDefault(
+                socketsEntry.getKey(), Collections.emptySet());
+            for (Entry<String, ServerSocketChannel> entry : socketsEntry.getValue().entrySet()) {
+                if (!usedListeners.contains(entry.getKey())) {
+                    try {
+                        entry.getValue().close();
+                    } catch (Exception e) {
+                        LOG.error("Error closing socket", e);
+                    }
+                }
+            }
+        }
+    }
+}
diff --git a/core/src/test/java/kafka/testkit/TestKitNode.java b/core/src/test/java/kafka/testkit/TestKitNode.java
index 2293c91..78f785e 100644
--- a/core/src/test/java/kafka/testkit/TestKitNode.java
+++ b/core/src/test/java/kafka/testkit/TestKitNode.java
@@ -17,6 +17,8 @@
 
 package kafka.testkit;
 
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.metadata.properties.MetaProperties;
 import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble;
 
 import java.util.Map;
@@ -35,6 +37,18 @@
         return initialMetaPropertiesEnsemble().logDirProps().keySet();
     }
 
+    default Uuid metadataUuid() {
+        MetaProperties props = initialMetaPropertiesEnsemble().logDirProps().get(
+            initialMetaPropertiesEnsemble().metadataLogDir().get());
+        return props.directoryId().get();
+    }
+
+    default String voterString(int port) {
+        // Delegate to metadataUuid() so the metadata-log-directory id lookup
+        // lives in a single place.
+        return String.format("%d@localhost:%d:%s", id(), port, metadataUuid());
+    }
+
     MetaPropertiesEnsemble initialMetaPropertiesEnsemble();
 
     Map<String, String> propertyOverrides();
diff --git a/core/src/test/java/kafka/testkit/TestKitNodes.java b/core/src/test/java/kafka/testkit/TestKitNodes.java
index b716f66..3bc0b6c 100644
--- a/core/src/test/java/kafka/testkit/TestKitNodes.java
+++ b/core/src/test/java/kafka/testkit/TestKitNodes.java
@@ -51,8 +51,15 @@
         private int numBrokerNodes;
         private int numDisksPerBroker = 1;
         private Map<Integer, Map<String, String>> perServerProperties = Collections.emptyMap();
-        private BootstrapMetadata bootstrapMetadata = BootstrapMetadata.
-            fromVersion(MetadataVersion.latestTesting(), "testkit");
+        private BootstrapMetadata bootstrapMetadata;
+
+        public Builder() {
+            this(BootstrapMetadata.fromVersion(MetadataVersion.latestTesting(), "testkit"));
+        }
+
+        public Builder(BootstrapMetadata bootstrapMetadata) {
+            this.bootstrapMetadata = bootstrapMetadata;
+        }
 
         public Builder setClusterId(String clusterId) {
             this.clusterId = clusterId;
@@ -60,12 +67,13 @@
         }
 
         public Builder setBootstrapMetadataVersion(MetadataVersion metadataVersion) {
-            this.bootstrapMetadata = BootstrapMetadata.fromVersion(metadataVersion, "testkit");
+            this.bootstrapMetadata = bootstrapMetadata.copyWithFeatureRecord(
+                    MetadataVersion.FEATURE_NAME, metadataVersion.featureLevel());
             return this;
         }
 
-        public Builder setBootstrapMetadata(BootstrapMetadata bootstrapMetadata) {
-            this.bootstrapMetadata = bootstrapMetadata;
+        public Builder setFeature(String featureName, short level) {
+            this.bootstrapMetadata = bootstrapMetadata.copyWithFeatureRecord(featureName, level);
             return this;
         }
 
diff --git a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
index 7e74ced..dee3cc7 100644
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
@@ -1389,8 +1389,7 @@
         setName("num.io.threads").
         setValue("9"), 0.toShort))
     val cluster = new KafkaClusterTestKit.Builder(
-      new TestKitNodes.Builder().
-        setBootstrapMetadata(BootstrapMetadata.fromRecords(bootstrapRecords, "testRecords")).
+      new TestKitNodes.Builder(BootstrapMetadata.fromRecords(bootstrapRecords, "testRecords")).
         setNumBrokerNodes(1).
         setNumControllerNodes(1).build()).
       build()
diff --git a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
index 969e7e7..ae7f04f 100755
--- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
@@ -17,6 +17,8 @@
 
 package kafka.server
 
+import kafka.server.ServerSocketFactory.KafkaServerSocketFactory
+
 import java.io.File
 import java.net.InetSocketAddress
 import java.util
@@ -129,7 +131,8 @@
       new Metrics(),
       controllerQuorumVotersFuture,
       controllerQuorumVotersFuture.get().values(),
-      faultHandlerFactory
+      faultHandlerFactory,
+      KafkaServerSocketFactory.INSTANCE,
     )
     var broker: BrokerServer = null
     try {
@@ -369,7 +372,8 @@
       new Metrics(),
       controllerQuorumVotersFuture,
       Collections.emptyList(),
-      faultHandlerFactory
+      faultHandlerFactory,
+      KafkaServerSocketFactory.INSTANCE,
     )
     var controllerServer: ControllerServer = null
     try {
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java b/metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java
index 1de6a1f..2dc6d9a 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java
@@ -134,19 +134,43 @@
         return source;
     }
 
-    public BootstrapMetadata copyWithOnlyVersion() {
-        ApiMessageAndVersion versionRecord = null;
+    public short featureLevel(String featureName) {
+        short result = 0;
         for (ApiMessageAndVersion record : records) {
-            if (recordToMetadataVersion(record.message()).isPresent()) {
-                versionRecord = record;
+            if (record.message() instanceof FeatureLevelRecord) {
+                FeatureLevelRecord message = (FeatureLevelRecord) record.message();
+                if (message.name().equals(featureName)) {
+                    result = message.featureLevel();
+                }
             }
         }
-        if (versionRecord == null) {
-            throw new RuntimeException("No FeatureLevelRecord for " + MetadataVersion.FEATURE_NAME +
-                    " was found in " + source);
+        return result;
+    }
+
+    public BootstrapMetadata copyWithFeatureRecord(String featureName, short level) {
+        List<ApiMessageAndVersion> newRecords = new ArrayList<>();
+        int i = 0;
+        while (i < records.size()) {
+            if (records.get(i).message() instanceof FeatureLevelRecord) {
+                FeatureLevelRecord record = (FeatureLevelRecord) records.get(i).message();
+                if (record.name().equals(featureName)) {
+                    FeatureLevelRecord newRecord = record.duplicate();
+                    newRecord.setFeatureLevel(level);
+                    newRecords.add(new ApiMessageAndVersion(newRecord, (short) 0));
+                    newRecords.addAll(records.subList(i + 1, records.size()));
+                    break;
+                }
+            }
+            newRecords.add(records.get(i));
+            i++;
         }
-        return new BootstrapMetadata(Collections.singletonList(versionRecord),
-                metadataVersion, source);
+        if (i == records.size()) {
+            FeatureLevelRecord newRecord = new FeatureLevelRecord().
+                setName(featureName).
+                setFeatureLevel(level);
+            newRecords.add(new ApiMessageAndVersion(newRecord, (short) 0));
+        }
+        return BootstrapMetadata.fromRecords(newRecords, source);
     }
 
     @Override
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java b/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java
index 72995fb..952ef85 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java
@@ -125,7 +125,7 @@
     /**
      * The metadata log directory.
      */
-    private String metadataLogDirectory = null;
+    private Optional<String> metadataLogDirectory = null;
 
     /**
      * The initial KIP-853 voters.
@@ -167,6 +167,10 @@
         return this;
     }
 
+    public Collection<String> directories() {
+        return directories;
+    }
+
     public Formatter setReleaseVersion(MetadataVersion releaseVersion) {
         this.releaseVersion = releaseVersion;
         return this;
@@ -198,6 +202,11 @@
     }
 
     public Formatter setMetadataLogDirectory(String metadataLogDirectory) {
+        this.metadataLogDirectory = Optional.of(metadataLogDirectory);
+        return this;
+    }
+
+    public Formatter setMetadataLogDirectory(Optional<String> metadataLogDirectory) {
         this.metadataLogDirectory = metadataLogDirectory;
         return this;
     }
@@ -234,10 +243,12 @@
         if (metadataLogDirectory == null) {
             throw new FormatterException("You must specify the metadata log directory.");
         }
-        if (!directories.contains(metadataLogDirectory)) {
-            throw new FormatterException("The specified metadata log directory, " + metadataLogDirectory +
+        metadataLogDirectory.ifPresent(d -> {
+            if (!directories.contains(d)) {
+                throw new FormatterException("The specified metadata log directory, " + d +
                     " was not one of the given directories: " + directories);
-        }
+            }
+        });
         releaseVersion = calculateEffectiveReleaseVersion();
         featureLevels = calculateEffectiveFeatureLevels();
         this.bootstrapMetadata = calculateBootstrapMetadata();
@@ -401,7 +412,7 @@
             Map<String, DirectoryType> directoryTypes = new HashMap<>();
             for (String emptyLogDir : ensemble.emptyLogDirs()) {
                 DirectoryType directoryType = DirectoryType.calculate(emptyLogDir,
-                    metadataLogDirectory,
+                    metadataLogDirectory.orElse(""),
                     nodeId,
                     initialControllers);
                 directoryTypes.put(emptyLogDir, directoryType);
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadataTest.java b/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadataTest.java
index 5118941..edd75c5 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadataTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadataTest.java
@@ -24,6 +24,7 @@
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
@@ -71,10 +72,50 @@
                 () -> BootstrapMetadata.fromRecords(emptyList(), "quux")).getMessage());
     }
 
+    private static final ApiMessageAndVersion MV_10 =
+        new ApiMessageAndVersion(new FeatureLevelRecord().
+            setName(FEATURE_NAME).
+            setFeatureLevel((short) 10), (short) 0);
+
+    private static final ApiMessageAndVersion MV_11 =
+        new ApiMessageAndVersion(new FeatureLevelRecord().
+            setName(FEATURE_NAME).
+            setFeatureLevel((short) 11), (short) 0);
+
+    private static final ApiMessageAndVersion FOO_1 =
+        new ApiMessageAndVersion(new FeatureLevelRecord().
+            setName("foo").
+            setFeatureLevel((short) 1), (short) 0);
+
+    private static final ApiMessageAndVersion FOO_2 =
+        new ApiMessageAndVersion(new FeatureLevelRecord().
+            setName("foo").
+            setFeatureLevel((short) 2), (short) 0);
+
     @Test
-    public void testCopyWithOnlyVersion() {
-        assertEquals(new BootstrapMetadata(SAMPLE_RECORDS1.subList(2, 3), IBP_3_3_IV2, "baz"),
-                BootstrapMetadata.fromRecords(SAMPLE_RECORDS1, "baz").copyWithOnlyVersion());
+    public void testCopyWithNewFeatureRecord() {
+        assertEquals(BootstrapMetadata.fromRecords(Arrays.asList(MV_10, FOO_1), "src"),
+            BootstrapMetadata.fromRecords(Arrays.asList(MV_10), "src").
+                copyWithFeatureRecord("foo", (short) 1));
+    }
+
+    @Test
+    public void testCopyWithModifiedFeatureRecord() {
+        assertEquals(BootstrapMetadata.fromRecords(Arrays.asList(MV_10, FOO_2), "src"),
+            BootstrapMetadata.fromRecords(Arrays.asList(MV_10, FOO_1), "src").
+                copyWithFeatureRecord("foo", (short) 2));
+    }
+
+    @Test
+    public void testFeatureLevelForFeatureThatIsNotSet() {
+        assertEquals((short) 0, BootstrapMetadata.
+            fromRecords(Arrays.asList(MV_10), "src").featureLevel("foo"));
+    }
+
+    @Test
+    public void testFeatureLevelForFeature() {
+        assertEquals((short) 2, BootstrapMetadata.
+            fromRecords(Arrays.asList(MV_10, FOO_2), "src").featureLevel("foo"));
     }
 
     static final List<ApiMessageAndVersion> RECORDS_WITH_OLD_METADATA_VERSION = Collections.singletonList(