Merge remote-tracking branch 'origin' into testRemoteLogManagerRemoteMetrics
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index 174b032..2a27103 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -1185,5 +1185,4 @@
 
         return loggers.setLevel(namespace, level);
     }
-
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
index a8a6e78..f33fa6f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
@@ -122,6 +122,14 @@
                             Callback<Created<ConnectorInfo>> callback);
 
     /**
+     * Patch the configuration for a connector.
+     * @param connName name of the connector
+     * @param configPatch the connector's configuration patch; {@code null} values remove the corresponding keys
+     * @param callback callback to invoke when the configuration has been written
+     */
+    void patchConnectorConfig(String connName, Map<String, String> configPatch, Callback<Created<ConnectorInfo>> callback);
+
+    /**
      * Delete a connector and its configuration.
      * @param connName name of the connector
      * @param callback callback to invoke when the configuration has been written
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 22a9640..f3f2ae7 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -1096,48 +1096,7 @@
         log.trace("Submitting connector config write request {}", connName);
         addRequest(
             () -> {
-                validateConnectorConfig(config, callback.chainStaging((error, configInfos) -> {
-                    if (error != null) {
-                        callback.onCompletion(error, null);
-                        return;
-                    }
-
-                    // Complete the connector config write via another herder request in order to
-                    // perform the write to the backing store (or forward to the leader) during
-                    // the "external request" portion of the tick loop
-                    addRequest(
-                        () -> {
-                            if (maybeAddConfigErrors(configInfos, callback)) {
-                                return null;
-                            }
-
-                            log.trace("Handling connector config request {}", connName);
-                            if (!isLeader()) {
-                                callback.onCompletion(new NotLeaderException("Only the leader can set connector configs.", leaderUrl()), null);
-                                return null;
-                            }
-                            boolean exists = configState.contains(connName);
-                            if (!allowReplace && exists) {
-                                callback.onCompletion(new AlreadyExistsException("Connector " + connName + " already exists"), null);
-                                return null;
-                            }
-
-                            log.trace("Submitting connector config {} {} {}", connName, allowReplace, configState.connectors());
-                            writeToConfigTopicAsLeader(
-                                    "writing a config for connector " + connName + " to the config topic",
-                                    () -> configBackingStore.putConnectorConfig(connName, config, targetState)
-                            );
-
-                            // Note that we use the updated connector config despite the fact that we don't have an updated
-                            // snapshot yet. The existing task info should still be accurate.
-                            ConnectorInfo info = new ConnectorInfo(connName, config, configState.tasks(connName),
-                                connectorType(config));
-                            callback.onCompletion(null, new Created<>(!exists, info));
-                            return null;
-                        },
-                        forwardErrorAndTickThreadStages(callback)
-                    );
-                }));
+                doPutConnectorConfig(connName, config, targetState, allowReplace, callback);
                 return null;
             },
             forwardErrorAndTickThreadStages(callback)
@@ -1145,6 +1104,75 @@
     }
 
     @Override
+    public void patchConnectorConfig(String connName, Map<String, String> configPatch, Callback<Created<ConnectorInfo>> callback) {
+        log.trace("Submitting connector config patch request {}", connName);
+        addRequest(() -> {
+            // This reduces (but does not completely eliminate) the chance of race conditions.
+            if (!isLeader()) {
+                callback.onCompletion(new NotLeaderException("Only the leader can set connector configs.", leaderUrl()), null);
+                return null;
+            }
+
+            ConnectorInfo connectorInfo = connectorInfo(connName);
+            if (connectorInfo == null) {
+                callback.onCompletion(new NotFoundException("Connector " + connName + " not found", null), null);
+            } else {
+                Map<String, String> patchedConfig = ConnectUtils.patchConfig(connectorInfo.config(), configPatch);
+                doPutConnectorConfig(connName, patchedConfig, null, true, callback);
+            }
+            return null;
+        }, forwardErrorAndTickThreadStages(callback));
+    }
+
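+    /**
+     * Validate the given connector config and, if validation succeeds, write it to the config topic.
+     * Used by both the put and patch connector config request paths.
+     */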
+    private void doPutConnectorConfig(
+            String connName,
+            Map<String, String> config,
+            TargetState targetState,
+            boolean allowReplace,
+            Callback<Created<ConnectorInfo>> callback) {
+        validateConnectorConfig(config, callback.chainStaging((error, configInfos) -> {
+            if (error != null) {
+                callback.onCompletion(error, null);
+                return;
+            }
+
+            // Complete the connector config write via another herder request in order to
+            // perform the write to the backing store (or forward to the leader) during
+            // the "external request" portion of the tick loop
+            addRequest(
+                    () -> {
+                        if (maybeAddConfigErrors(configInfos, callback)) {
+                            return null;
+                        }
+
+                        log.trace("Handling connector config request {}", connName);
+                        if (!isLeader()) {
+                            callback.onCompletion(new NotLeaderException("Only the leader can set connector configs.", leaderUrl()), null);
+                            return null;
+                        }
+                        boolean exists = configState.contains(connName);
+                        if (!allowReplace && exists) {
+                            callback.onCompletion(new AlreadyExistsException("Connector " + connName + " already exists"), null);
+                            return null;
+                        }
+
+                        log.trace("Submitting connector config {} {} {}", connName, allowReplace, configState.connectors());
+                        writeToConfigTopicAsLeader(
+                                "writing a config for connector " + connName + " to the config topic",
+                                () -> configBackingStore.putConnectorConfig(connName, config, targetState)
+                        );
+
+                        // Note that we use the updated connector config despite the fact that we don't have an updated
+                        // snapshot yet. The existing task info should still be accurate.
+                        ConnectorInfo info = new ConnectorInfo(connName, config, configState.tasks(connName),
+                                connectorType(config));
+                        callback.onCompletion(null, new Created<>(!exists, info));
+                        return null;
+                    },
+                    forwardErrorAndTickThreadStages(callback)
+            );
+        }));
+    }
+
+    @Override
     public void stopConnector(final String connName, final Callback<Void> callback) {
         log.trace("Submitting request to transition connector {} to STOPPED state", connName);
 
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
index 7a0f02c..532af87 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
@@ -242,6 +242,19 @@
         return response.entity(createdInfo.result()).build();
     }
 
+    @PATCH
+    @Path("/{connector}/config")
+    public Response patchConnectorConfig(final @PathParam("connector") String connector,
+                                         final @Context HttpHeaders headers,
+                                         final @Parameter(hidden = true) @QueryParam("forward") Boolean forward,
+                                         final Map<String, String> connectorConfigPatch) throws Throwable {
+        FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>();
+        herder.patchConnectorConfig(connector, connectorConfigPatch, cb);
+        Herder.Created<ConnectorInfo> createdInfo = requestHandler.completeOrForwardRequest(cb, "/connectors/" + connector + "/config",
+                "PATCH", headers, connectorConfigPatch, new TypeReference<ConnectorInfo>() { }, new CreatedConnectorInfoTranslator(), forward);
+        return Response.ok().entity(createdInfo.result()).build();
+    }
+
     @POST
     @Path("/{connector}/restart")
     @Operation(summary = "Restart the specified connector")
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
index e1386bf..e773eee 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
@@ -47,6 +47,7 @@
 import org.apache.kafka.connect.storage.MemoryStatusBackingStore;
 import org.apache.kafka.connect.storage.StatusBackingStore;
 import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.ConnectUtils;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -247,6 +248,31 @@
     }
 
     @Override
+    public synchronized void patchConnectorConfig(String connName, Map<String, String> configPatch, Callback<Created<ConnectorInfo>> callback) {
+        try {
+            ConnectorInfo connectorInfo = connectorInfo(connName);
+            if (connectorInfo == null) {
+                callback.onCompletion(new NotFoundException("Connector " + connName + " not found", null), null);
+                return;
+            }
+
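+            // Apply the patch to the existing config and validate the result before writing it.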
+            Map<String, String> patchedConfig = ConnectUtils.patchConfig(connectorInfo.config(), configPatch);
+            validateConnectorConfig(patchedConfig, (error, configInfos) -> {
+                if (error != null) {
+                    callback.onCompletion(error, null);
+                    return;
+                }
+
+                requestExecutorService.submit(
+                        () -> putConnectorConfig(connName, patchedConfig, null, true, callback, configInfos)
+                );
+            });
+        } catch (Throwable e) {
+            callback.onCompletion(e, null);
+        }
+    }
+
+    @Override
     public synchronized void stopConnector(String connName, Callback<Void> callback) {
         try {
             removeConnectorTasks(connName);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java
index 3a2c88b..06ea5a6 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java
@@ -29,6 +29,7 @@
 import org.slf4j.LoggerFactory;
 
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -218,4 +219,28 @@
     public static String className(Object o) {
         return o != null ? o.getClass().getName() : "null";
     }
+
+    /**
+     * Apply a patch to a connector config.
+     *
+     * <p>In the output, the values from the patch will override the values from the config.
+     * {@code null} values will cause the corresponding key to be removed completely.
+     * @param config the config to be patched.
+     * @param patch the patch.
+     * @return the output config map.
+     */
+    public static Map<String, String> patchConfig(
+            Map<String, String> config,
+            Map<String, String> patch
+    ) {
+        Map<String, String> result = new HashMap<>(config);
+        patch.forEach((k, v) -> {
+            if (v != null) {
+                result.put(k, v);
+            } else {
+                result.remove(k);
+            }
+        });
+        return result;
+    }
 }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
index 35bfeab..65f2dc8 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
@@ -773,6 +773,43 @@
         connect.assertions().assertConnectorDoesNotExist(CONNECTOR_NAME, "Connector wasn't deleted in time");
     }
 
+    @Test
+    public void testPatchConnectorConfig() throws Exception {
+        connect = connectBuilder.build();
+        // start the clusters
+        connect.start();
+
+        connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
+                "Initial group of workers did not start in time.");
+
+        connect.kafka().createTopic(TOPIC_NAME);
+
+        Map<String, String> props = defaultSinkConnectorProps(TOPIC_NAME);
+        props.put("unaffected-key", "unaffected-value");
+        props.put("to-be-deleted-key", "value");
+        props.put(TASKS_MAX_CONFIG, "2");
+
+        Map<String, String> patch = new HashMap<>();
+        patch.put(TASKS_MAX_CONFIG, "3");  // this value will be changed
+        patch.put("to-be-added-key", "value");
+        patch.put("to-be-deleted-key", null);
+
+        connect.configureConnector(CONNECTOR_NAME, props);
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, 2,
+                "connector and tasks did not start in time");
+
+        connect.patchConnectorConfig(CONNECTOR_NAME, patch);
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, 3,
+                "connector and tasks did not reconfigure and restart in time");
+
+        Map<String, String> expectedConfig = new HashMap<>(props);
+        expectedConfig.put("name", CONNECTOR_NAME);
+        expectedConfig.put("to-be-added-key", "value");
+        expectedConfig.put(TASKS_MAX_CONFIG, "3");
+        expectedConfig.remove("to-be-deleted-key");
+        assertEquals(expectedConfig, connect.connectorInfo(CONNECTOR_NAME).config());
+    }
+
     private Map<String, String> defaultSinkConnectorProps(String topics) {
         // setup props for the sink connector
         Map<String, String> props = new HashMap<>();
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index fa05e55..f4a7cd2 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -2337,6 +2337,133 @@
     }
 
     @Test
+    public void testPatchConnectorConfigNotFound() {
+        when(member.memberId()).thenReturn("leader");
+        expectRebalance(0, Collections.emptyList(), Collections.emptyList(), true);
+        when(statusBackingStore.connectors()).thenReturn(Collections.emptySet());
+
+        ClusterConfigState clusterConfigState = new ClusterConfigState(
+                0,
+                null,
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptySet(),
+                Collections.emptySet());
+        expectConfigRefreshAndSnapshot(clusterConfigState);
+
+        Map<String, String> connConfigPatch = new HashMap<>();
+        connConfigPatch.put("foo1", "baz1");
+
+        FutureCallback<Herder.Created<ConnectorInfo>> patchCallback = new FutureCallback<>();
+        herder.patchConnectorConfig(CONN2, connConfigPatch, patchCallback);
+        herder.tick();
+        assertTrue(patchCallback.isDone());
+        ExecutionException exception = assertThrows(ExecutionException.class, patchCallback::get);
+        assertInstanceOf(NotFoundException.class, exception.getCause());
+    }
+
+    @Test
+    public void testPatchConnectorConfigNotALeader() {
+        when(member.memberId()).thenReturn("not-leader");
+
+        // The connector is pre-existing due to the mocks.
+        ClusterConfigState originalSnapshot = new ClusterConfigState(
+                1,
+                null,
+                Collections.singletonMap(CONN1, 0),
+                Collections.singletonMap(CONN1, CONN1_CONFIG),
+                Collections.singletonMap(CONN1, TargetState.STARTED),
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptySet(),
+                Collections.emptySet());
+        expectConfigRefreshAndSnapshot(originalSnapshot);
+        when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0);
+
+        // Patch the connector config.
+
+        expectMemberEnsureActive();
+        expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList(), false);
+
+        FutureCallback<Herder.Created<ConnectorInfo>> patchCallback = new FutureCallback<>();
+        herder.patchConnectorConfig(CONN1, new HashMap<>(), patchCallback);
+        herder.tick();
+        assertTrue(patchCallback.isDone());
+        ExecutionException exception = assertThrows(ExecutionException.class, patchCallback::get);
+        assertInstanceOf(ConnectException.class, exception.getCause());
+    }
+
+    @Test
+    public void testPatchConnectorConfig() throws Exception {
+        when(member.memberId()).thenReturn("leader");
+        expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList(), true);
+        when(statusBackingStore.connectors()).thenReturn(Collections.emptySet());
+
+        Map<String, String> originalConnConfig = new HashMap<>(CONN1_CONFIG);
+        originalConnConfig.put("foo0", "unaffected");
+        originalConnConfig.put("foo1", "will-be-changed");
+        originalConnConfig.put("foo2", "will-be-removed");
+
+        // The connector is pre-existing due to the mocks.
+
+        ClusterConfigState originalSnapshot = new ClusterConfigState(
+                1,
+                null,
+                Collections.singletonMap(CONN1, 0),
+                Collections.singletonMap(CONN1, originalConnConfig),
+                Collections.singletonMap(CONN1, TargetState.STARTED),
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptySet(),
+                Collections.emptySet());
+        expectConfigRefreshAndSnapshot(originalSnapshot);
+        when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0);
+
+        expectMemberPoll();
+
+        // Patch the connector config.
+        Map<String, String> connConfigPatch = new HashMap<>();
+        connConfigPatch.put("foo1", "changed");
+        connConfigPatch.put("foo2", null);
+        connConfigPatch.put("foo3", "added");
+
+        Map<String, String> patchedConnConfig = new HashMap<>(originalConnConfig);
+        patchedConnConfig.put("foo0", "unaffected");
+        patchedConnConfig.put("foo1", "changed");
+        patchedConnConfig.remove("foo2");
+        patchedConnConfig.put("foo3", "added");
+
+        expectMemberEnsureActive();
+        expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList(), true);
+
+        ArgumentCaptor<Callback<ConfigInfos>> validateCallback = ArgumentCaptor.forClass(Callback.class);
+        doAnswer(invocation -> {
+            validateCallback.getValue().onCompletion(null, CONN1_CONFIG_INFOS);
+            return null;
+        }).when(herder).validateConnectorConfig(eq(patchedConnConfig), validateCallback.capture());
+
+        FutureCallback<Herder.Created<ConnectorInfo>> patchCallback = new FutureCallback<>();
+        herder.patchConnectorConfig(CONN1, connConfigPatch, patchCallback);
+        herder.tick();
+        assertTrue(patchCallback.isDone());
+        assertEquals(patchedConnConfig, patchCallback.get().result().config());
+
+        // This is effectively the main check of this test:
+        // we validate that what's written in the config storage is the patched config.
+        verify(configBackingStore).putConnectorConfig(eq(CONN1), eq(patchedConnConfig), isNull());
+        verifyNoMoreInteractions(configBackingStore);
+
+        // No need to check herder.connectorConfig explicitly:
+        // all the related parts are mocked, and the correctness of the config is checked by the eq() matchers in the mocked calls above.
+    }
+
+    @Test
     public void testKeyRotationWhenWorkerBecomesLeader() throws Exception {
         long rotationTtlDelay = DistributedConfig.INTER_WORKER_KEY_TTL_MS_MS_DEFAULT;
         when(member.memberId()).thenReturn("member");
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
index 65f27f7..2386b25 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
@@ -109,6 +109,17 @@
         CONNECTOR_CONFIG.put("sample_config", "test_config");
     }
 
+    private static final Map<String, String> CONNECTOR_CONFIG_PATCH = new HashMap<>();
+    static {
+        CONNECTOR_CONFIG_PATCH.put("sample_config", "test_config_new");
+        CONNECTOR_CONFIG_PATCH.put("sample_config_2", "test_config_2");
+    }
+
+    private static final Map<String, String> CONNECTOR_CONFIG_PATCHED = new HashMap<>(CONNECTOR_CONFIG);
+    static {
+        CONNECTOR_CONFIG_PATCHED.putAll(CONNECTOR_CONFIG_PATCH);
+    }
+
     private static final Map<String, String> CONNECTOR_CONFIG_CONTROL_SEQUENCES = new HashMap<>();
     static {
         CONNECTOR_CONFIG_CONTROL_SEQUENCES.put("name", CONNECTOR_NAME_CONTROL_SEQUENCES1);
@@ -589,6 +600,37 @@
     }
 
     @Test
+    public void testPatchConnectorConfig() throws Throwable {
+        final ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> cb = ArgumentCaptor.forClass(Callback.class);
+        expectAndCallbackResult(cb, new Herder.Created<>(true, new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG_PATCHED, CONNECTOR_TASK_NAMES,
+                ConnectorType.SINK))
+        ).when(herder).patchConnectorConfig(eq(CONNECTOR_NAME), eq(CONNECTOR_CONFIG_PATCH), cb.capture());
+
+        connectorsResource.patchConnectorConfig(CONNECTOR_NAME, NULL_HEADERS, FORWARD, CONNECTOR_CONFIG_PATCH);
+    }
+
+    @Test
+    public void testPatchConnectorConfigLeaderRedirect() throws Throwable {
+        final ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> cb = ArgumentCaptor.forClass(Callback.class);
+        expectAndCallbackNotLeaderException(cb)
+                .when(herder).patchConnectorConfig(eq(CONNECTOR_NAME), eq(CONNECTOR_CONFIG_PATCH), cb.capture());
+        when(restClient.httpRequest(eq(LEADER_URL + "connectors/" + CONNECTOR_NAME + "/config?forward=false"), eq("PATCH"), isNull(), eq(CONNECTOR_CONFIG_PATCH), any()))
+                .thenReturn(new RestClient.HttpResponse<>(200, new HashMap<>(CONNECTOR_CONFIG_PATCHED), null));
+
+        connectorsResource.patchConnectorConfig(CONNECTOR_NAME, NULL_HEADERS, FORWARD, CONNECTOR_CONFIG_PATCH);
+    }
+
+    @Test
+    public void testPatchConnectorConfigNotFound() throws Throwable {
+        final ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> cb = ArgumentCaptor.forClass(Callback.class);
+        expectAndCallbackException(cb, new NotFoundException("Connector " + CONNECTOR_NAME + " not found"))
+                .when(herder).patchConnectorConfig(eq(CONNECTOR_NAME), eq(CONNECTOR_CONFIG_PATCH), cb.capture());
+
+        assertThrows(NotFoundException.class, () -> connectorsResource.patchConnectorConfig(
+                CONNECTOR_NAME, NULL_HEADERS, FORWARD, CONNECTOR_CONFIG_PATCH));
+    }
+
+    @Test
     public void testGetConnectorTaskConfigs() throws Throwable {
         final ArgumentCaptor<Callback<List<TaskInfo>>> cb = ArgumentCaptor.forClass(Callback.class);
         expectAndCallbackResult(cb, TASK_INFOS).when(herder).taskConfigs(eq(CONNECTOR_NAME), cb.capture());
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
index bed08ff..e8ab2ad 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
@@ -77,7 +77,9 @@
 import java.util.Optional;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
+import static java.util.Collections.emptyList;
 import static java.util.Collections.singleton;
 import static java.util.Collections.singletonList;
 import static java.util.Collections.singletonMap;
@@ -752,6 +754,77 @@
     }
 
     @Test
+    public void testPatchConnectorConfigNotFound() {
+        Map<String, String> connConfigPatch = new HashMap<>();
+        connConfigPatch.put("foo1", "baz1");
+
+        Callback<Herder.Created<ConnectorInfo>> patchCallback = mock(Callback.class);
+        herder.patchConnectorConfig(CONNECTOR_NAME, connConfigPatch, patchCallback);
+
+        ArgumentCaptor<NotFoundException> exceptionCaptor = ArgumentCaptor.forClass(NotFoundException.class);
+        verify(patchCallback).onCompletion(exceptionCaptor.capture(), isNull());
+        assertEquals("Connector " + CONNECTOR_NAME + " not found", exceptionCaptor.getValue().getMessage());
+    }
+
+    @Test
+    public void testPatchConnectorConfig() throws ExecutionException, InterruptedException, TimeoutException {
+        // Create the connector.
+        Map<String, String> originalConnConfig = connectorConfig(SourceSink.SOURCE);
+        originalConnConfig.put("foo0", "unaffected");
+        originalConnConfig.put("foo1", "will-be-changed");
+        originalConnConfig.put("foo2", "will-be-removed");
+
+        Map<String, String> connConfigPatch = new HashMap<>();
+        connConfigPatch.put("foo1", "changed");
+        connConfigPatch.put("foo2", null);
+        connConfigPatch.put("foo3", "added");
+
+        Map<String, String> patchedConnConfig = new HashMap<>(originalConnConfig);
+        patchedConnConfig.put("foo0", "unaffected");
+        patchedConnConfig.put("foo1", "changed");
+        patchedConnConfig.remove("foo2");
+        patchedConnConfig.put("foo3", "added");
+
+        expectAdd(SourceSink.SOURCE);
+        expectConfigValidation(SourceSink.SOURCE, originalConnConfig, patchedConnConfig);
+
+        expectConnectorStartingWithoutTasks(originalConnConfig, SourceSink.SOURCE);
+
+        herder.putConnectorConfig(CONNECTOR_NAME, originalConnConfig, false, createCallback);
+        createCallback.get(1000L, TimeUnit.SECONDS);
+
+        expectConnectorStartingWithoutTasks(patchedConnConfig, SourceSink.SOURCE);
+
+        FutureCallback<Herder.Created<ConnectorInfo>> patchCallback = new FutureCallback<>();
+        herder.patchConnectorConfig(CONNECTOR_NAME, connConfigPatch, patchCallback);
+
+        Map<String, String> returnedConfig = patchCallback.get(1000L, TimeUnit.SECONDS).result().config();
+        assertEquals(patchedConnConfig, returnedConfig);
+
+        // Also check the returned config when requested.
+        FutureCallback<Map<String, String>> configCallback = new FutureCallback<>();
+        herder.connectorConfig(CONNECTOR_NAME, configCallback);
+
+        Map<String, String> returnedConfig2 = configCallback.get(1000L, TimeUnit.SECONDS);
+        assertEquals(patchedConnConfig, returnedConfig2);
+    }
+
+    private void expectConnectorStartingWithoutTasks(Map<String, String> config, SourceSink sourceSink) {
+        doNothing().when(worker).stopAndAwaitConnector(CONNECTOR_NAME);
+        final ArgumentCaptor<Callback<TargetState>> onStart = ArgumentCaptor.forClass(Callback.class);
+        doAnswer(invocation -> {
+            onStart.getValue().onCompletion(null, TargetState.STARTED);
+            return true;
+        }).when(worker).startConnector(eq(CONNECTOR_NAME), any(Map.class), any(),
+                eq(herder), eq(TargetState.STARTED), onStart.capture());
+        ConnectorConfig connConfig = sourceSink == SourceSink.SOURCE ?
+                new SourceConnectorConfig(plugins, config, true) :
+                new SinkConnectorConfig(plugins, config);
+        when(worker.connectorTaskConfigs(CONNECTOR_NAME, connConfig))
+                .thenReturn(emptyList());
+    }
+
+    @Test
     public void testPutTaskConfigs() {
         Callback<Void> cb = mock(Callback.class);
 
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/ConnectUtilsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/ConnectUtilsTest.java
index 0bdfa0d..42fe5d1 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/ConnectUtilsTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/ConnectUtilsTest.java
@@ -169,4 +169,24 @@
         assertEquals(expectedClientIdBase, actualClientIdBase);
     }
 
+    @Test
+    public void testPatchConfig() {
+        HashMap<String, String> config = new HashMap<>();
+        config.put("unaffected-key", "unaffected-value");
+        config.put("to-be-changed-key", "to-be-changed-value-old");
+        config.put("to-be-deleted-key", "to-be-deleted-value");
+
+        HashMap<String, String> patch = new HashMap<>();
+        patch.put("to-be-changed-key", "to-be-changed-value-new");
+        patch.put("to-be-deleted-key", null);
+        patch.put("to-be-added-key", "to-be-added-value");
+
+        HashMap<String, String> expectedResult = new HashMap<>();
+        expectedResult.put("unaffected-key", "unaffected-value");
+        expectedResult.put("to-be-changed-key", "to-be-changed-value-new");
+        expectedResult.put("to-be-added-key", "to-be-added-value");
+
+        Map<String, String> result = ConnectUtils.patchConfig(config, patch);
+        assertEquals(expectedResult, result);
+    }
 }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java
index 0e58d63..a37de76 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java
@@ -272,6 +272,43 @@
     }
 
     /**
+     * Patch the config of a connector.
+     *
+     * @param connName   the name of the connector
+     * @param connConfigPatch the configuration patch
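+     * @return the body of the response returned by the PATCH request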
+     * @throws ConnectRestException if the REST API returns an error status
+     * @throws ConnectException if the configuration fails to be serialized or if the request could not be sent
+     */
+    public String patchConnectorConfig(String connName, Map<String, String> connConfigPatch) {
+        String url = endpointForResource(String.format("connectors/%s/config", connName));
+        return doPatchConnectorConfig(url, connConfigPatch);
+    }
+
+    /**
+     * Execute a PATCH request with the given connector configuration on the given URL endpoint.
+     *
+     * @param url        the full URL of the endpoint that corresponds to the given REST resource
+     * @param connConfigPatch the configuration patch
+     * @throws ConnectRestException if the REST API returns an error status
+     * @throws ConnectException if the configuration fails to be serialized or if the request could not be sent
+     */
+    protected String doPatchConnectorConfig(String url, Map<String, String> connConfigPatch) {
+        ObjectMapper mapper = new ObjectMapper();
+        String content;
+        try {
+            content = mapper.writeValueAsString(connConfigPatch);
+        } catch (IOException e) {
+            throw new ConnectException("Could not serialize connector configuration and execute PATCH request");
+        }
+        Response response = requestPatch(url, content);
+        if (response.getStatus() < Response.Status.BAD_REQUEST.getStatusCode()) {
+            return responseToString(response);
+        }
+        throw new ConnectRestException(response.getStatus(),
+                "Could not execute PATCH request. Error response: " + responseToString(response));
+    }
+
+    /**
      * Delete an existing connector.
      *
      * @param connName name of the connector to be deleted
diff --git a/core/src/test/java/kafka/test/ClusterConfig.java b/core/src/test/java/kafka/test/ClusterConfig.java
index 4a713f3..65d7786 100644
--- a/core/src/test/java/kafka/test/ClusterConfig.java
+++ b/core/src/test/java/kafka/test/ClusterConfig.java
@@ -53,14 +53,14 @@
     private final Map<String, String> adminClientProperties;
     private final Map<String, String> saslServerProperties;
     private final Map<String, String> saslClientProperties;
-    private final Map<Integer, Map<String, String>> perBrokerOverrideProperties;
+    private final Map<Integer, Map<String, String>> perServerProperties;
 
     @SuppressWarnings("checkstyle:ParameterNumber")
     private ClusterConfig(Type type, int brokers, int controllers, int disksPerBroker, String name, boolean autoStart,
                   SecurityProtocol securityProtocol, String listenerName, File trustStoreFile,
                   MetadataVersion metadataVersion, Map<String, String> serverProperties, Map<String, String> producerProperties,
                   Map<String, String> consumerProperties, Map<String, String> adminClientProperties, Map<String, String> saslServerProperties,
-                  Map<String, String> saslClientProperties, Map<Integer, Map<String, String>> perBrokerOverrideProperties) {
+                  Map<String, String> saslClientProperties, Map<Integer, Map<String, String>> perServerProperties) {
         // do fail fast. the following values are invalid for both zk and kraft modes.
         if (brokers < 0) throw new IllegalArgumentException("Number of brokers must be greater or equal to zero.");
         if (controllers < 0) throw new IllegalArgumentException("Number of controller must be greater or equal to zero.");
@@ -82,7 +82,7 @@
         this.adminClientProperties = Objects.requireNonNull(adminClientProperties);
         this.saslServerProperties = Objects.requireNonNull(saslServerProperties);
         this.saslClientProperties = Objects.requireNonNull(saslClientProperties);
-        this.perBrokerOverrideProperties = Objects.requireNonNull(perBrokerOverrideProperties);
+        this.perServerProperties = Objects.requireNonNull(perServerProperties);
     }
 
     public Type clusterType() {
@@ -149,8 +149,8 @@
         return metadataVersion;
     }
 
-    public Map<Integer, Map<String, String>> perBrokerOverrideProperties() {
-        return perBrokerOverrideProperties;
+    public Map<Integer, Map<String, String>> perServerOverrideProperties() {
+        return perServerProperties;
     }
 
     public Map<String, String> nameTags() {
@@ -195,7 +195,7 @@
                 .setAdminClientProperties(clusterConfig.adminClientProperties)
                 .setSaslServerProperties(clusterConfig.saslServerProperties)
                 .setSaslClientProperties(clusterConfig.saslClientProperties)
-                .setPerBrokerProperties(clusterConfig.perBrokerOverrideProperties);
+                .setPerServerProperties(clusterConfig.perServerProperties);
     }
 
     public static class Builder {
@@ -215,7 +215,7 @@
         private Map<String, String> adminClientProperties = Collections.emptyMap();
         private Map<String, String> saslServerProperties = Collections.emptyMap();
         private Map<String, String> saslClientProperties = Collections.emptyMap();
-        private Map<Integer, Map<String, String>> perBrokerOverrideProperties = Collections.emptyMap();
+        private Map<Integer, Map<String, String>> perServerProperties = Collections.emptyMap();
 
         private Builder() {}
 
@@ -299,9 +299,9 @@
             return this;
         }
 
-        public Builder setPerBrokerProperties(Map<Integer, Map<String, String>> perBrokerOverrideProperties) {
-            this.perBrokerOverrideProperties = Collections.unmodifiableMap(
-                    perBrokerOverrideProperties.entrySet().stream()
+        public Builder setPerServerProperties(Map<Integer, Map<String, String>> perServerProperties) {
+            this.perServerProperties = Collections.unmodifiableMap(
+                    perServerProperties.entrySet().stream()
                             .collect(Collectors.toMap(Map.Entry::getKey, e -> Collections.unmodifiableMap(new HashMap<>(e.getValue())))));
             return this;
         }
@@ -309,8 +309,7 @@
         public ClusterConfig build() {
             return new ClusterConfig(type, brokers, controllers, disksPerBroker, name, autoStart, securityProtocol, listenerName,
                     trustStoreFile, metadataVersion, serverProperties, producerProperties, consumerProperties,
-                    adminClientProperties, saslServerProperties, saslClientProperties,
-                    perBrokerOverrideProperties);
+                    adminClientProperties, saslServerProperties, saslClientProperties, perServerProperties);
         }
     }
 }
diff --git a/core/src/test/java/kafka/test/ClusterConfigTest.java b/core/src/test/java/kafka/test/ClusterConfigTest.java
index 790ca7d..1ad1659 100644
--- a/core/src/test/java/kafka/test/ClusterConfigTest.java
+++ b/core/src/test/java/kafka/test/ClusterConfigTest.java
@@ -62,7 +62,7 @@
                 .setAdminClientProperties(Collections.singletonMap("admin_client", "admin_client_value"))
                 .setSaslClientProperties(Collections.singletonMap("sasl_client", "sasl_client_value"))
                 .setSaslServerProperties(Collections.singletonMap("sasl_server", "sasl_server_value"))
-                .setPerBrokerProperties(Collections.singletonMap(0, Collections.singletonMap("broker_0", "broker_0_value")))
+                .setPerServerProperties(Collections.singletonMap(0, Collections.singletonMap("broker_0", "broker_0_value")))
                 .build();
 
         Map<String, Object> clusterConfigFields = fields(clusterConfig);
diff --git a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
index 9024da0..b74a5f5 100644
--- a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
+++ b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
@@ -26,6 +26,9 @@
 import kafka.test.annotation.Type;
 import kafka.test.junit.ClusterTestExtensions;
 import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.Config;
+import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.clients.admin.DescribeLogDirsResult;
 import org.apache.kafka.clients.consumer.GroupProtocol;
 import org.apache.kafka.server.common.MetadataVersion;
@@ -33,13 +36,13 @@
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.extension.ExtendWith;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 
-
 import static org.apache.kafka.clients.consumer.GroupProtocol.CLASSIC;
 import static org.apache.kafka.clients.consumer.GroupProtocol.CONSUMER;
 import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG;
@@ -49,6 +52,7 @@
 @Disabled
 @ClusterTestDefaults(clusterType = Type.ZK, serverProperties = {
     @ClusterConfigProperty(key = "default.key", value = "default.value"),
+    @ClusterConfigProperty(id = 0, key = "queued.max.requests", value = "100"),
 })   // Set defaults for a few params in @ClusterTest(s)
 @ExtendWith(ClusterTestExtensions.class)
 public class ClusterTestExtensionsTest {
@@ -91,30 +95,59 @@
     @ClusterTests({
         @ClusterTest(name = "cluster-tests-1", clusterType = Type.ZK, serverProperties = {
             @ClusterConfigProperty(key = "foo", value = "bar"),
-            @ClusterConfigProperty(key = "spam", value = "eggs")
+            @ClusterConfigProperty(key = "spam", value = "eggs"),
+            @ClusterConfigProperty(id = 86400, key = "baz", value = "qux"), // this one will be ignored as there is no broker with id 86400
         }),
         @ClusterTest(name = "cluster-tests-2", clusterType = Type.KRAFT, serverProperties = {
             @ClusterConfigProperty(key = "foo", value = "baz"),
             @ClusterConfigProperty(key = "spam", value = "eggz"),
-            @ClusterConfigProperty(key = "default.key", value = "overwrite.value")
+            @ClusterConfigProperty(key = "default.key", value = "overwrite.value"),
+            @ClusterConfigProperty(id = 0, key = "queued.max.requests", value = "200"),
+            @ClusterConfigProperty(id = 3000, key = "queued.max.requests", value = "300")
         }),
         @ClusterTest(name = "cluster-tests-3", clusterType = Type.CO_KRAFT, serverProperties = {
             @ClusterConfigProperty(key = "foo", value = "baz"),
             @ClusterConfigProperty(key = "spam", value = "eggz"),
-            @ClusterConfigProperty(key = "default.key", value = "overwrite.value")
+            @ClusterConfigProperty(key = "default.key", value = "overwrite.value"),
+            @ClusterConfigProperty(id = 0, key = "queued.max.requests", value = "200")
         })
     })
-    public void testClusterTests() {
-        if (clusterInstance.clusterType().equals(ClusterInstance.ClusterType.ZK)) {
+    public void testClusterTests() throws ExecutionException, InterruptedException {
+        if (!clusterInstance.isKRaftTest()) {
             Assertions.assertEquals("bar", clusterInstance.config().serverProperties().get("foo"));
             Assertions.assertEquals("eggs", clusterInstance.config().serverProperties().get("spam"));
             Assertions.assertEquals("default.value", clusterInstance.config().serverProperties().get("default.key"));
-        } else if (clusterInstance.clusterType().equals(ClusterInstance.ClusterType.RAFT)) {
+
+            // assert that broker 0 has the property queued.max.requests set to 100 from ClusterTestDefaults
+            try (Admin admin = clusterInstance.createAdminClient()) {
+                ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, "0");
+                Map<ConfigResource, Config> configs = admin.describeConfigs(Collections.singletonList(configResource)).all().get();
+                Assertions.assertEquals(1, configs.size());
+                Assertions.assertEquals("100", configs.get(configResource).get("queued.max.requests").value());
+            }
+        } else {
             Assertions.assertEquals("baz", clusterInstance.config().serverProperties().get("foo"));
             Assertions.assertEquals("eggz", clusterInstance.config().serverProperties().get("spam"));
             Assertions.assertEquals("overwrite.value", clusterInstance.config().serverProperties().get("default.key"));
-        } else {
-            Assertions.fail("Unknown cluster type " + clusterInstance.clusterType());
+
+            // assert that broker 0 has the property queued.max.requests set to 200 from ClusterTest,
+            // which overrides the value 100 set in ClusterTestDefaults
+            try (Admin admin = clusterInstance.createAdminClient()) {
+                ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, "0");
+                Map<ConfigResource, Config> configs = admin.describeConfigs(Collections.singletonList(configResource)).all().get();
+                Assertions.assertEquals(1, configs.size());
+                Assertions.assertEquals("200", configs.get(configResource).get("queued.max.requests").value());
+            }
+            // In non-combined KRaft mode, assert that controller 3000 has the property queued.max.requests set to 300
+            if (clusterInstance.config().clusterType() == Type.KRAFT) {
+                try (Admin admin = Admin.create(Collections.singletonMap(
+                        AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, clusterInstance.bootstrapControllers()))) {
+                    ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, "3000");
+                    Map<ConfigResource, Config> configs = admin.describeConfigs(Collections.singletonList(configResource)).all().get();
+                    Assertions.assertEquals(1, configs.size());
+                    Assertions.assertEquals("300", configs.get(configResource).get("queued.max.requests").value());
+                }
+            }
         }
     }
 
diff --git a/core/src/test/java/kafka/test/annotation/ClusterConfigProperty.java b/core/src/test/java/kafka/test/annotation/ClusterConfigProperty.java
index eb1434d..fa98c00 100644
--- a/core/src/test/java/kafka/test/annotation/ClusterConfigProperty.java
+++ b/core/src/test/java/kafka/test/annotation/ClusterConfigProperty.java
@@ -27,6 +27,27 @@
 @Target({ElementType.ANNOTATION_TYPE})
 @Retention(RetentionPolicy.RUNTIME)
 public @interface ClusterConfigProperty {
+    /**
+     * The config applies to the controller/broker with the specified id. The default is -1, indicating that the
+     * property applies to all controller/broker servers. Note that the "controller" here refers to the KRaft
+     * quorum controller. The id numbering depends on the {@link kafka.test.annotation.Type}:
+     * <ul>
+     *  <li> Under {@link kafka.test.annotation.Type#ZK}, the broker id starts from
+     *  {@link kafka.testkit.TestKitNodes#BROKER_ID_OFFSET 0} and increases by 1
+     *  with each additional broker, and there is no controller server under this mode. </li>
+     *  <li> Under {@link kafka.test.annotation.Type#KRAFT}, the broker id starts from
+     *  {@link kafka.testkit.TestKitNodes#BROKER_ID_OFFSET 0}, the controller id
+     *  starts from {@link kafka.testkit.TestKitNodes#CONTROLLER_ID_OFFSET 3000},
+     *  and each increases by 1 with every additional broker/controller.</li>
+     *  <li> Under {@link kafka.test.annotation.Type#CO_KRAFT}, the broker id and controller id both start from
+     *  {@link kafka.testkit.TestKitNodes#BROKER_ID_OFFSET 0}
+     *  and increase by 1 with each additional broker/controller.</li>
+     * </ul>
+     *
+     * If the id doesn't correspond to any broker/controller server, an IllegalArgumentException is thrown.
+     * @return the controller/broker id
+     */
+    int id() default -1;
     String key();
     String value();
 }
diff --git a/core/src/test/java/kafka/test/junit/ClusterTestExtensions.java b/core/src/test/java/kafka/test/junit/ClusterTestExtensions.java
index bdf3b2b..6f5ad19 100644
--- a/core/src/test/java/kafka/test/junit/ClusterTestExtensions.java
+++ b/core/src/test/java/kafka/test/junit/ClusterTestExtensions.java
@@ -33,11 +33,13 @@
 
 import java.lang.reflect.Method;
 import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 /**
@@ -141,14 +143,15 @@
     private void processClusterTest(ExtensionContext context, ClusterTest annot, ClusterTestDefaults defaults,
                                     Consumer<TestTemplateInvocationContext> testInvocations) {
         Type type = annot.clusterType() == Type.DEFAULT ? defaults.clusterType() : annot.clusterType();
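+        // Properties declared without an id (default -1) apply to every broker/controller.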
+        Map<String, String> serverProperties = Stream.concat(Arrays.stream(defaults.serverProperties()), Arrays.stream(annot.serverProperties()))
+                .filter(e -> e.id() == -1)
+                .collect(Collectors.toMap(ClusterConfigProperty::key, ClusterConfigProperty::value, (a, b) -> b));
 
-        Map<String, String> serverProperties = new HashMap<>();
-        for (ClusterConfigProperty property : defaults.serverProperties()) {
-            serverProperties.put(property.key(), property.value());
-        }
-        for (ClusterConfigProperty property : annot.serverProperties()) {
-            serverProperties.put(property.key(), property.value());
-        }
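+        // Properties declared with an explicit id are grouped per broker/controller id.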
+        Map<Integer, Map<String, String>> perServerProperties = Stream.concat(Arrays.stream(defaults.serverProperties()), Arrays.stream(annot.serverProperties()))
+                .filter(e -> e.id() != -1)
+                .collect(Collectors.groupingBy(ClusterConfigProperty::id, Collectors.mapping(Function.identity(),
+                        Collectors.toMap(ClusterConfigProperty::key, ClusterConfigProperty::value, (a, b) -> b))));
+
         ClusterConfig config = ClusterConfig.builder()
                 .setType(type)
                 .setBrokers(annot.brokers() == 0 ? defaults.brokers() : annot.brokers())
@@ -158,6 +161,7 @@
                 .setName(annot.name().trim().isEmpty() ? null : annot.name())
                 .setListenerName(annot.listener().trim().isEmpty() ? null : annot.listener())
                 .setServerProperties(serverProperties)
+                .setPerServerProperties(perServerProperties)
                 .setSecurityProtocol(annot.securityProtocol())
                 .setMetadataVersion(annot.metadataVersion())
                 .build();
diff --git a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
index e14db91..57bd796 100644
--- a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
+++ b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
@@ -94,8 +94,8 @@
                         setBootstrapMetadataVersion(clusterConfig.metadataVersion()).
                         setCombined(isCombined).
                         setNumBrokerNodes(clusterConfig.numBrokers()).
+                        setPerServerProperties(clusterConfig.perServerOverrideProperties()).
                         setNumDisksPerBroker(clusterConfig.numDisksPerBroker()).
-                        setPerBrokerProperties(clusterConfig.perBrokerOverrideProperties()).
                         setNumControllerNodes(clusterConfig.numControllers()).build();
                 KafkaClusterTestKit.Builder builder = new KafkaClusterTestKit.Builder(nodes);
 
@@ -104,7 +104,7 @@
                     builder.setConfigProp("zookeeper.connect", String.format("localhost:%d", zkReference.get().port()));
                 }
                 // Copy properties into the TestKit builder
-                clusterConfig.serverProperties().forEach((key, value) -> builder.setConfigProp(key.toString(), value.toString()));
+                clusterConfig.serverProperties().forEach(builder::setConfigProp);
                 // KAFKA-12512 need to pass security protocol and listener name here
                 KafkaClusterTestKit cluster = builder.build();
                 clusterReference.set(cluster);
diff --git a/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java b/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
index 7496aff..ab23aed 100644
--- a/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
+++ b/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
@@ -301,7 +301,7 @@
         public void modifyConfigs(Seq<Properties> props) {
             super.modifyConfigs(props);
             for (int i = 0; i < props.length(); i++) {
-                props.apply(i).putAll(clusterConfig.perBrokerOverrideProperties().getOrDefault(i, Collections.emptyMap()));
+                props.apply(i).putAll(clusterConfig.perServerOverrideProperties().getOrDefault(i, Collections.emptyMap()));
             }
         }
 
diff --git a/core/src/test/java/kafka/testkit/BrokerNode.java b/core/src/test/java/kafka/testkit/BrokerNode.java
index baa75e7..7c49761 100644
--- a/core/src/test/java/kafka/testkit/BrokerNode.java
+++ b/core/src/test/java/kafka/testkit/BrokerNode.java
@@ -87,10 +87,10 @@
             Objects.requireNonNull(baseDirectory);
             Objects.requireNonNull(clusterId);
             if (id == -1) {
-                throw new RuntimeException("You must set the node id.");
+                throw new IllegalArgumentException("You must set the node id.");
             }
             if (numLogDirectories < 1) {
-                throw new RuntimeException("The value of numLogDirectories should be at least 1.");
+                throw new IllegalArgumentException("The value of numLogDirectories should be at least 1.");
             }
             List<String> logDataDirectories = IntStream
                 .range(0, numLogDirectories)
diff --git a/core/src/test/java/kafka/testkit/BrokerNodeTest.java b/core/src/test/java/kafka/testkit/BrokerNodeTest.java
index 24918f8..12d7fd3 100644
--- a/core/src/test/java/kafka/testkit/BrokerNodeTest.java
+++ b/core/src/test/java/kafka/testkit/BrokerNodeTest.java
@@ -26,13 +26,13 @@
     @Test
     public void testInvalidBuilder() {
         Assertions.assertEquals("You must set the node id.",
-                Assertions.assertThrows(RuntimeException.class, () -> BrokerNode.builder()
+                Assertions.assertThrows(IllegalArgumentException.class, () -> BrokerNode.builder()
                         .setBaseDirectory("foo")
                         .setClusterId(Uuid.randomUuid())
                         .build()).getMessage());
 
         Assertions.assertEquals("The value of numLogDirectories should be at least 1.",
-                Assertions.assertThrows(RuntimeException.class, () -> BrokerNode.builder()
+                Assertions.assertThrows(IllegalArgumentException.class, () -> BrokerNode.builder()
                         .setBaseDirectory("foo")
                         .setClusterId(Uuid.randomUuid())
                         .setId(0)
diff --git a/core/src/test/java/kafka/testkit/ControllerNode.java b/core/src/test/java/kafka/testkit/ControllerNode.java
index 1a440e4..cbb1c09 100644
--- a/core/src/test/java/kafka/testkit/ControllerNode.java
+++ b/core/src/test/java/kafka/testkit/ControllerNode.java
@@ -23,6 +23,9 @@
 import org.apache.kafka.metadata.properties.MetaPropertiesVersion;
 
 import java.io.File;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 
@@ -36,6 +39,7 @@
         private String baseDirectory;
         private Uuid clusterId;
         private boolean combined;
+        private Map<String, String> propertyOverrides = Collections.emptyMap();
 
         private Builder() {}
 
@@ -63,12 +67,17 @@
             return this;
         }
 
+        public Builder setPropertyOverrides(Map<String, String> propertyOverrides) {
+            this.propertyOverrides = Collections.unmodifiableMap(new HashMap<>(propertyOverrides));
+            return this;
+        }
+
         public ControllerNode build() {
             if (id == -1) {
-                throw new RuntimeException("You must set the node id.");
+                throw new IllegalArgumentException("You must set the node id.");
             }
             if (baseDirectory == null) {
-                throw new RuntimeException("You must set the base directory.");
+                throw new IllegalArgumentException("You must set the base directory.");
             }
             String metadataDirectory = new File(baseDirectory,
                 combined ? String.format("combined_%d_0", id) : String.format("controller_%d", id)).getAbsolutePath();
@@ -81,7 +90,7 @@
                 setNodeId(id).
                 setDirectoryId(copier.generateValidDirectoryId()).
                 build());
-            return new ControllerNode(copier.copy(), combined);
+            return new ControllerNode(copier.copy(), combined, propertyOverrides);
         }
     }
 
@@ -89,12 +98,16 @@
 
     private final boolean combined;
 
+    private final Map<String, String> propertyOverrides;
+
     private ControllerNode(
         MetaPropertiesEnsemble initialMetaPropertiesEnsemble,
-        boolean combined
+        boolean combined,
+        Map<String, String> propertyOverrides
     ) {
         this.initialMetaPropertiesEnsemble = Objects.requireNonNull(initialMetaPropertiesEnsemble);
         this.combined = combined;
+        this.propertyOverrides = Objects.requireNonNull(propertyOverrides);
     }
 
     @Override
@@ -106,4 +119,8 @@
     public boolean combined() {
         return combined;
     }
+
+    public Map<String, String> propertyOverrides() {
+        return propertyOverrides;
+    }
 }
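For illustration only (not part of this patch): a minimal sketch of exercising the new setPropertyOverrides hook on ControllerNode.Builder directly. setBaseDirectory, setClusterId, setCombined, setPropertyOverrides and propertyOverrides() all appear in this patch; setId and the chosen config key are assumptions made for the sketch.

import java.util.Collections;

import org.apache.kafka.common.Uuid;

import kafka.testkit.ControllerNode;

public class ControllerNodeOverrideSketch {
    public static void main(String[] args) {
        // setId(...) is assumed by analogy with BrokerNode.Builder; the other setters
        // and propertyOverrides() are the ones added or shown in this patch.
        ControllerNode controller = ControllerNode.builder()
            .setId(3000)                       // CONTROLLER_ID_OFFSET for the first controller
            .setBaseDirectory("/tmp/testkit")  // placeholder directory
            .setClusterId(Uuid.randomUuid())
            .setCombined(false)
            .setPropertyOverrides(Collections.singletonMap("num.io.threads", "4"))
            .build();
        System.out.println(controller.propertyOverrides()); // prints {num.io.threads=4}
    }
}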
diff --git a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
index 16673b9..4d34ce0 100644
--- a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
+++ b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
@@ -209,6 +209,10 @@
             if (brokerNode != null) {
                 props.putAll(brokerNode.propertyOverrides());
             }
+            // Add associated controller node property overrides
+            if (controllerNode != null) {
+                props.putAll(controllerNode.propertyOverrides());
+            }
             props.putIfAbsent(KafkaConfig$.MODULE$.UnstableMetadataVersionsEnableProp(), "true");
             props.putIfAbsent(KafkaConfig$.MODULE$.UnstableApiVersionsEnableProp(), "true");
             return new KafkaConfig(props, false, Option.empty());
diff --git a/core/src/test/java/kafka/testkit/KafkaClusterTestKitTest.java b/core/src/test/java/kafka/testkit/KafkaClusterTestKitTest.java
index 3fbf9f9..1987c1d 100644
--- a/core/src/test/java/kafka/testkit/KafkaClusterTestKitTest.java
+++ b/core/src/test/java/kafka/testkit/KafkaClusterTestKitTest.java
@@ -23,7 +23,10 @@
 
 import java.nio.file.Paths;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -35,7 +38,7 @@
     @ParameterizedTest
     @ValueSource(ints = {0, -1})
     public void testCreateClusterWithBadNumDisksThrows(int disks) {
-        RuntimeException e = assertThrowsExactly(RuntimeException.class, () -> new KafkaClusterTestKit.Builder(
+        IllegalArgumentException e = assertThrowsExactly(IllegalArgumentException.class, () -> new KafkaClusterTestKit.Builder(
                 new TestKitNodes.Builder()
                         .setNumBrokerNodes(1)
                         .setNumDisksPerBroker(disks)
@@ -47,7 +50,7 @@
 
     @Test
     public void testCreateClusterWithBadNumOfControllers() {
-        RuntimeException e = assertThrowsExactly(RuntimeException.class, () -> new KafkaClusterTestKit.Builder(
+        IllegalArgumentException e = assertThrowsExactly(IllegalArgumentException.class, () -> new KafkaClusterTestKit.Builder(
             new TestKitNodes.Builder()
                 .setNumBrokerNodes(1)
                 .setNumControllerNodes(-1)
@@ -58,7 +61,7 @@
 
     @Test
     public void testCreateClusterWithBadNumOfBrokers() {
-        RuntimeException e = assertThrowsExactly(RuntimeException.class, () -> new KafkaClusterTestKit.Builder(
+        IllegalArgumentException e = assertThrowsExactly(IllegalArgumentException.class, () -> new KafkaClusterTestKit.Builder(
             new TestKitNodes.Builder()
                 .setNumBrokerNodes(-1)
                 .setNumControllerNodes(1)
@@ -67,6 +70,22 @@
         assertEquals("Invalid negative value for numBrokerNodes", e.getMessage());
     }
 
+    @Test
+    public void testCreateClusterWithBadPerServerProperties() {
+        Map<Integer, Map<String, String>> perServerProperties = new HashMap<>();
+        perServerProperties.put(100, Collections.singletonMap("foo", "foo1"));
+        perServerProperties.put(200, Collections.singletonMap("bar", "bar1"));
+
+        IllegalArgumentException e = assertThrowsExactly(IllegalArgumentException.class, () -> new KafkaClusterTestKit.Builder(
+                new TestKitNodes.Builder()
+                        .setNumBrokerNodes(1)
+                        .setNumControllerNodes(1)
+                        .setPerServerProperties(perServerProperties)
+                        .build())
+        );
+        assertEquals("Unknown server id 100, 200 in perServerProperties, the existent server ids are 0, 3000", e.getMessage());
+    }
+
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
     public void testCreateClusterAndCloseWithMultipleLogDirs(boolean combined) {
diff --git a/core/src/test/java/kafka/testkit/TestKitNodes.java b/core/src/test/java/kafka/testkit/TestKitNodes.java
index 8cc1a23..df486a1 100644
--- a/core/src/test/java/kafka/testkit/TestKitNodes.java
+++ b/core/src/test/java/kafka/testkit/TestKitNodes.java
@@ -32,15 +32,19 @@
 import java.util.TreeMap;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
+import java.util.stream.Stream;
 
 public class TestKitNodes {
+    public static final int CONTROLLER_ID_OFFSET = 3000;
+    public static final int BROKER_ID_OFFSET = 0;
+
     public static class Builder {
         private boolean combined;
         private Uuid clusterId;
         private int numControllerNodes;
         private int numBrokerNodes;
         private int numDisksPerBroker = 1;
-        private Map<Integer, Map<String, String>> perBrokerProperties = Collections.emptyMap();
+        private Map<Integer, Map<String, String>> perServerProperties = Collections.emptyMap();
         private BootstrapMetadata bootstrapMetadata = BootstrapMetadata.
             fromVersion(MetadataVersion.latestTesting(), "testkit");
 
@@ -79,22 +83,22 @@
             return this;
         }
 
-        public Builder setPerBrokerProperties(Map<Integer, Map<String, String>> perBrokerProperties) {
-            this.perBrokerProperties = Collections.unmodifiableMap(
-                perBrokerProperties.entrySet().stream()
+        public Builder setPerServerProperties(Map<Integer, Map<String, String>> perServerProperties) {
+            this.perServerProperties = Collections.unmodifiableMap(
+                perServerProperties.entrySet().stream()
                     .collect(Collectors.toMap(Map.Entry::getKey, e -> Collections.unmodifiableMap(new HashMap<>(e.getValue())))));
             return this;
         }
 
         public TestKitNodes build() {
             if (numControllerNodes < 0) {
-                throw new RuntimeException("Invalid negative value for numControllerNodes");
+                throw new IllegalArgumentException("Invalid negative value for numControllerNodes");
             }
             if (numBrokerNodes < 0) {
-                throw new RuntimeException("Invalid negative value for numBrokerNodes");
+                throw new IllegalArgumentException("Invalid negative value for numBrokerNodes");
             }
             if (numDisksPerBroker <= 0) {
-                throw new RuntimeException("Invalid value for numDisksPerBroker");
+                throw new IllegalArgumentException("Invalid value for numDisksPerBroker");
             }
 
             String baseDirectory = TestUtils.tempDirectory().getAbsolutePath();
@@ -102,13 +106,28 @@
                 clusterId = Uuid.randomUuid();
             }
 
-            List<Integer> controllerNodeIds = IntStream.range(startControllerId(), startControllerId() + numControllerNodes)
+            int controllerId = combined ? BROKER_ID_OFFSET : BROKER_ID_OFFSET + CONTROLLER_ID_OFFSET;
+            List<Integer> controllerNodeIds = IntStream.range(controllerId, controllerId + numControllerNodes)
                 .boxed()
                 .collect(Collectors.toList());
-            List<Integer> brokerNodeIds = IntStream.range(startBrokerId(), startBrokerId() + numBrokerNodes)
+            List<Integer> brokerNodeIds = IntStream.range(BROKER_ID_OFFSET, BROKER_ID_OFFSET + numBrokerNodes)
                 .boxed()
                 .collect(Collectors.toList());
 
+            String unknownIds = perServerProperties.keySet().stream()
+                    .filter(id -> !controllerNodeIds.contains(id))
+                    .filter(id -> !brokerNodeIds.contains(id))
+                    .map(Object::toString)
+                    .collect(Collectors.joining(", "));
+            if (!unknownIds.isEmpty()) {
+                throw new IllegalArgumentException(
+                        String.format("Unknown server id %s in perServerProperties, the existent server ids are %s",
+                                unknownIds,
+                                Stream.concat(brokerNodeIds.stream(), controllerNodeIds.stream())
+                                        .map(Object::toString)
+                                        .collect(Collectors.joining(", "))));
+            }
+
             TreeMap<Integer, ControllerNode> controllerNodes = new TreeMap<>();
             for (int id : controllerNodeIds) {
                 ControllerNode controllerNode = ControllerNode.builder()
@@ -116,6 +135,7 @@
                     .setBaseDirectory(baseDirectory)
                     .setClusterId(clusterId)
                     .setCombined(brokerNodeIds.contains(id))
+                    .setPropertyOverrides(perServerProperties.getOrDefault(id, Collections.emptyMap()))
                     .build();
                 controllerNodes.put(id, controllerNode);
             }
@@ -128,7 +148,7 @@
                     .setBaseDirectory(baseDirectory)
                     .setClusterId(clusterId)
                     .setCombined(controllerNodeIds.contains(id))
-                    .setPropertyOverrides(perBrokerProperties.getOrDefault(id, Collections.emptyMap()))
+                    .setPropertyOverrides(perServerProperties.getOrDefault(id, Collections.emptyMap()))
                     .build();
                 brokerNodes.put(id, brokerNode);
             }
@@ -139,17 +159,6 @@
                 controllerNodes,
                 brokerNodes);
         }
-
-        private int startBrokerId() {
-            return 0;
-        }
-
-        private int startControllerId() {
-            if (combined) {
-                return startBrokerId();
-            }
-            return startBrokerId() + 3000;
-        }
     }
 
     private final String baseDirectory;
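For illustration only (not part of this patch): a rough sketch of how the renamed setPerServerProperties builder method lets a test target brokers and controllers by id, using the new BROKER_ID_OFFSET and CONTROLLER_ID_OFFSET constants. The format()/startup()/close() calls and the override keys reflect typical KafkaClusterTestKit usage and are assumptions, not confirmed by these hunks.

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import kafka.testkit.KafkaClusterTestKit;
import kafka.testkit.TestKitNodes;

public class PerServerPropertiesSketch {
    public static void main(String[] args) throws Exception {
        // Broker ids start at BROKER_ID_OFFSET (0) and controller ids at
        // CONTROLLER_ID_OFFSET (3000), so one map can now target either kind of node.
        Map<Integer, Map<String, String>> perServerProperties = new HashMap<>();
        perServerProperties.put(TestKitNodes.BROKER_ID_OFFSET,
            Collections.singletonMap("log.retention.ms", "60000"));
        perServerProperties.put(TestKitNodes.CONTROLLER_ID_OFFSET,
            Collections.singletonMap("num.io.threads", "4"));

        TestKitNodes nodes = new TestKitNodes.Builder()
            .setNumBrokerNodes(1)
            .setNumControllerNodes(1)
            .setPerServerProperties(perServerProperties)
            .build();

        // format()/startup()/close() are assumed from typical testkit usage.
        try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(nodes).build()) {
            cluster.format();
            cluster.startup();
            // ... run assertions against the cluster ...
        }
    }
}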
diff --git a/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala b/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala
index 77f7816..0ed7002 100644
--- a/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala
@@ -49,8 +49,8 @@
       .setBrokers(3)
       .setAutoStart(false)
       .setServerProperties(serverProperties)
-      .setPerBrokerProperties(perBrokerProperties)
-      .build());
+      .setPerServerProperties(perBrokerProperties)
+      .build())
   }
 }
 
diff --git a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
index e7c2d81..67606c3 100644
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
@@ -402,7 +402,7 @@
     val nodes = new TestKitNodes.Builder()
       .setNumControllerNodes(1)
       .setNumBrokerNodes(3)
-      .setPerBrokerProperties(brokerPropertyOverrides)
+      .setPerServerProperties(brokerPropertyOverrides)
       .build()
 
     doOnStartedKafkaCluster(nodes) { implicit cluster =>
@@ -430,7 +430,7 @@
       .setNumControllerNodes(1)
       .setNumBrokerNodes(3)
       .setNumDisksPerBroker(1)
-      .setPerBrokerProperties(brokerPropertyOverrides)
+      .setPerServerProperties(brokerPropertyOverrides)
       .build()
 
     doOnStartedKafkaCluster(nodes) { implicit cluster =>
diff --git a/docs/connect.html b/docs/connect.html
index c068798..449f066 100644
--- a/docs/connect.html
+++ b/docs/connect.html
@@ -291,6 +291,7 @@
         <li><code>GET /connectors/{name}</code> - get information about a specific connector</li>
         <li><code>GET /connectors/{name}/config</code> - get the configuration parameters for a specific connector</li>
         <li><code>PUT /connectors/{name}/config</code> - update the configuration parameters for a specific connector</li>
+        <li><code>PATCH /connectors/{name}/config</code> - patch the configuration parameters for a specific connector, where a <code>null</code> value in the JSON body indicates that the key should be removed from the final configuration</li>
         <li><code>GET /connectors/{name}/status</code> - get current status of the connector, including if it is running, failed, paused, etc., which worker it is assigned to, error information if it has failed, and the state of all its tasks</li>
         <li><code>GET /connectors/{name}/tasks</code> - get a list of tasks currently running for a connector along with their configurations</li>
         <li><code>GET /connectors/{name}/tasks-config</code> - get the configuration of all tasks for a specific connector. This endpoint is deprecated and will be removed in the next major release. Please use the <code>GET /connectors/{name}/tasks</code> endpoint instead. Note that the response structures of the two endpoints differ slightly, please refer to the <a href="/{{version}}/generated/connect_rest.yaml">OpenAPI documentation</a> for more details</li>
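For illustration only (not part of this patch): a hedged sketch of calling the new PATCH endpoint from Java 11's HttpClient. The connector name, worker address (the usual default REST port 8083) and the patched keys are placeholders.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class PatchConnectorConfigExample {
    public static void main(String[] args) throws Exception {
        // Keys mapped to null are removed from the resulting configuration;
        // all other keys are added or overwritten.
        String patch = "{\"tasks.max\":\"4\",\"transforms\":null}";

        HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create("http://localhost:8083/connectors/my-connector/config"))
            .header("Content-Type", "application/json")
            .method("PATCH", HttpRequest.BodyPublishers.ofString(patch))
            .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
            .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}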
diff --git a/docs/security.html b/docs/security.html
index 895f2b0..5a49ce6 100644
--- a/docs/security.html
+++ b/docs/security.html
@@ -2219,6 +2219,48 @@
             <td>Group</td>
             <td></td>
         </tr>
+        <tr>
+            <td>CONSUMER_GROUP_DESCRIBE (69)</td>
+            <td>Read</td>
+            <td>Group</td>
+            <td></td>
+        </tr>
+        <tr>
+            <td>CONTROLLER_REGISTRATION (70)</td>
+            <td>ClusterAction</td>
+            <td>Cluster</td>
+            <td></td>
+        </tr>
+        <tr>
+            <td>GET_TELEMETRY_SUBSCRIPTIONS (71)</td>
+            <td></td>
+            <td></td>
+            <td>No authorization check is performed for this request.</td>
+        </tr>
+        <tr>
+            <td>PUSH_TELEMETRY (72)</td>
+            <td></td>
+            <td></td>
+            <td>No authorization check is performed for this request.</td>
+        </tr>
+        <tr>
+            <td>ASSIGN_REPLICAS_TO_DIRS (73)</td>
+            <td>ClusterAction</td>
+            <td>Cluster</td>
+            <td></td>
+        </tr>
+        <tr>
+            <td>LIST_CLIENT_METRICS_RESOURCES (74)</td>
+            <td>DescribeConfigs</td>
+            <td>Cluster</td>
+            <td></td>
+        </tr>
+        <tr>
+            <td>DESCRIBE_TOPIC_PARTITIONS (75)</td>
+            <td>Describe</td>
+            <td>Topic</td>
+            <td></td>
+        </tr>
         </tbody>
     </table>
 
diff --git a/docs/streams/developer-guide/config-streams.html b/docs/streams/developer-guide/config-streams.html
index 40b3594..eb951a3 100644
--- a/docs/streams/developer-guide/config-streams.html
+++ b/docs/streams/developer-guide/config-streams.html
@@ -75,8 +75,8 @@
               <li><a class="reference internal" href="#default-production-exception-handler" id="id24">default.production.exception.handler</a></li>
               <li><a class="reference internal" href="#timestamp-extractor" id="id15">default.timestamp.extractor</a></li>
               <li><a class="reference internal" href="#default-value-serde" id="id9">default.value.serde</a></li>
-              <li><a class="reference internal" href="#default-windowed-key-serde-inner" id="id32">default.windowed.key.serde.inner</a></li>
-              <li><a class="reference internal" href="#default-windowed-value-serde-inner" id="id33">default.windowed.value.serde.inner</a></li>
+              <li><a class="reference internal" href="#default-windowed-key-serde-inner" id="id32">default.windowed.key.serde.inner (deprecated) </a></li>
+              <li><a class="reference internal" href="#default-windowed-value-serde-inner" id="id33">default.windowed.value.serde.inner (deprecated) </a></li>
               <li><a class="reference internal" href="#max-task-idle-ms" id="id28">max.task.idle.ms</a></li>
               <li><a class="reference internal" href="#max-warmup-replicas" id="id29">max.warmup.replicas</a></li>
               <li><a class="reference internal" href="#num-standby-replicas" id="id10">num.standby.replicas</a></li>
@@ -91,6 +91,7 @@
               <li><a class="reference internal" href="#rocksdb-config-setter" id="id20">rocksdb.config.setter</a></li>
               <li><a class="reference internal" href="#state-dir" id="id14">state.dir</a></li>
               <li><a class="reference internal" href="#topology-optimization" id="id31">topology.optimization</a></li>
+              <li><a class="reference internal" href="#windowed-inner-class-serde" id="id38">windowed.inner.class.serde</a></li>
             </ul>
           </li>
           <li><a class="reference internal" href="#kafka-consumers-and-producer-configuration-parameters" id="id16">Kafka consumers and producer configuration parameters</a>
@@ -159,29 +160,29 @@
         <p>There are several Kafka and Kafka Streams configuration options that need to be configured explicitly for resiliency in face of broker failures:</p>
         <table border="1" class="non-scrolling-table docutils">
           <thead valign="bottom">
-          <tr class="row-odd"><th class="head">Parameter Name</th>
+          <tr class="row-even"><th class="head">Parameter Name</th>
             <th class="head">Corresponding Client</th>
             <th class="head">Default value</th>
             <th class="head">Consider setting to</th>
           </tr>
           </thead>
           <tbody valign="top">
-          <tr class="row-even"><td>acks</td>
+          <tr class="row-odd"><td>acks</td>
             <td>Producer</td>
             <td><code class="docutils literal"><span class="pre">acks=1</span></code></td>
             <td><code class="docutils literal"><span class="pre">acks=all</span></code></td>
           </tr>
-          <tr class="row-odd"><td>replication.factor (for broker version 2.3 or older)/td>
+          <tr class="row-even"><td>replication.factor (for broker version 2.3 or older)/td>
             <td>Streams</td>
             <td><code class="docutils literal"><span class="pre">-1</span></code></td>
             <td><code class="docutils literal"><span class="pre">3</span></code></td>
           </tr>
-          <tr class="row-even"><td>min.insync.replicas</td>
+          <tr class="row-odd"><td>min.insync.replicas</td>
             <td>Broker</td>
             <td><code class="docutils literal"><span class="pre">1</span></code></td>
             <td><code class="docutils literal"><span class="pre">2</span></code></td>
           </tr>
-          <tr class="row-odd"><td>num.standby.replicas</td>
+          <tr class="row-even"><td>num.standby.replicas</td>
             <td>Streams</td>
             <td><code class="docutils literal"><span class="pre">0</span></code></td>
             <td><code class="docutils literal"><span class="pre">1</span></code></td>
@@ -241,24 +242,29 @@
           </tr>
           </thead>
           <tbody valign="top">
-          <tr class="row-odd"><td>acceptable.recovery.lag</td>
+          <tr class="row-even"><td>acceptable.recovery.lag</td>
             <td>Medium</td>
             <td colspan="2">The maximum acceptable lag (number of offsets to catch up) for an instance to be considered caught-up and ready for the active task.</td>
             <td><code class="docutils literal"><span class="pre">10000</span></code></td>
           </tr>
-          <tr class="row-even"><td>application.server</td>
+          <tr class="row-odd"><td>application.server</td>
             <td>Low</td>
             <td colspan="2">A host:port pair pointing to an embedded user defined endpoint that can be used for discovering the locations of
               state stores within a single Kafka Streams application. The value of this must be different for each instance
               of the application.</td>
             <td>the empty string</td>
           </tr>
-          <tr class="row-odd"><td>buffered.records.per.partition</td>
+          <tr class="row-even"><td>buffered.records.per.partition</td>
             <td>Low</td>
             <td colspan="2">The maximum number of records to buffer per partition.</td>
             <td><code class="docutils literal"><span class="pre">1000</span></code></td>
           </tr>
-          <tr class="row-even"><td>cache.max.bytes.buffering</td>
+          <tr class="row-odd"><td>statestore.cache.max.bytes</td>
+            <td>Medium</td>
+            <td colspan="2">Maximum number of memory bytes to be used for record caches across all threads.</td>
+            <td>10485760</td>
+          </tr>
+          <tr class="row-even"><td>cache.max.bytes.buffering (Deprecated. Use statestore.cache.max.bytes instead.)</td>
             <td>Medium</td>
             <td colspan="2">Maximum number of memory bytes to be used for record caches across all threads.</td>
             <td>10485760 bytes</td>
@@ -301,12 +307,12 @@
               set by the user or all serdes must be passed in explicitly (see also default.key.serde).</td>
             <td><code class="docutils literal"><span class="pre">null</span></code></td>
           </tr>
-          <tr class="row-even"><td>default.windowed.key.serde.inner</td>
+          <tr class="row-even"><td>default.windowed.key.serde.inner (Deprecated. Use windowed.inner.class.serde instead.)</td>
             <td>Medium</td>
             <td colspan="2">Default serializer/deserializer for the inner class of windowed keys, implementing the <code class="docutils literal"><span class="pre">Serde</span></code> interface.</td>
             <td><code class="docutils literal"><span class="pre">null</span></code></td>
           </tr>
-          <tr class="row-odd"><td>default.windowed.value.serde.inner</td>
+          <tr class="row-odd"><td>default.windowed.value.serde.inner (Deprecated. Use windowed.inner.class.serde instead.)</td>
             <td>Medium</td>
             <td colspan="2">Default serializer/deserializer for the inner class of windowed values, implementing the <code class="docutils literal"><span class="pre">Serde</span></code> interface.</td>
             <td><code class="docutils literal"><span class="pre">null</span></code></td>
@@ -327,8 +333,9 @@
               the <code>org.apache.kafka.streams.state.DslStoreSuppliers</code> interface.
             </td>
             <td><code>BuiltInDslStoreSuppliers.RocksDBDslStoreSuppliers</code></td>
+            <td>null</td>
           </tr>
-          <tr class="row-even"><td>max.task.idle.ms</td>
+          <tr class="row-odd"><td>max.task.idle.ms</td>
             <td>Medium</td>
             <td colspan="2">
               <p>
@@ -347,37 +354,37 @@
             </td>
             <td>0 milliseconds</td>
           </tr>
-          <tr class="row-odd"><td>max.warmup.replicas</td>
+          <tr class="row-even"><td>max.warmup.replicas</td>
             <td>Medium</td>
             <td colspan="2">The maximum number of warmup replicas (extra standbys beyond the configured num.standbys) that can be assigned at once.</td>
             <td><code class="docutils literal"><span class="pre">2</span></code></td>
           </tr>
-          <tr class="row-even"><td>metric.reporters</td>
+          <tr class="row-odd"><td>metric.reporters</td>
             <td>Low</td>
             <td colspan="2">A list of classes to use as metrics reporters.</td>
             <td>the empty list</td>
           </tr>
-          <tr class="row-odd"><td>metrics.num.samples</td>
+          <tr class="row-even"><td>metrics.num.samples</td>
             <td>Low</td>
             <td colspan="2">The number of samples maintained to compute metrics.</td>
             <td><code class="docutils literal"><span class="pre">2</span></code></td>
           </tr>
-          <tr class="row-even"><td>metrics.recording.level</td>
+          <tr class="row-odd"><td>metrics.recording.level</td>
             <td>Low</td>
             <td colspan="2">The highest recording level for metrics.</td>
             <td><code class="docutils literal"><span class="pre">INFO</span></code></td>
           </tr>
-          <tr class="row-odd"><td>metrics.sample.window.ms</td>
+          <tr class="row-even"><td>metrics.sample.window.ms</td>
             <td>Low</td>
             <td colspan="2">The window of time in milliseconds a metrics sample is computed over.</td>
             <td>30000 milliseconds (30 seconds)</td>
           </tr>
-          <tr class="row-even"><td>num.standby.replicas</td>
+          <tr class="row-odd"><td>num.standby.replicas</td>
             <td>High</td>
             <td colspan="2">The number of standby replicas for each task.</td>
             <td><code class="docutils literal"><span class="pre">0</span></code></td>
           </tr>
-          <tr class="row-odd"><td>num.stream.threads</td>
+          <tr class="row-even"><td>num.stream.threads</td>
             <td>Medium</td>
             <td colspan="2">The number of threads to execute stream processing.</td>
             <td><code class="docutils literal"><span class="pre">1</span></code></td>
@@ -406,7 +413,7 @@
               clients with different tag values.</td>
             <td>the empty list</td>
           </tr>
-          <tr class="row-even"><td>replication.factor</td>
+          <tr class="row-odd"><td>replication.factor</td>
             <td>Medium</td>
             <td colspan="2">The replication factor for changelog topics and repartition topics created by the application.
               The default of <code>-1</code> (meaning: use broker default replication factor) requires broker version 2.4 or newer.</td>
@@ -432,22 +439,22 @@
             <td colspan="2">Directory location for state stores.</td>
             <td><code class="docutils literal"><span class="pre">/${java.io.tmpdir}/kafka-streams</span></code></td>
           </tr>
-          <tr class="row-odd"><td>task.timeout.ms</td>
+          <tr class="row-even"><td>task.timeout.ms</td>
             <td>Medium</td>
             <td colspan="2">The maximum amount of time in milliseconds a task might stall due to internal errors and retries until an error is raised. For a timeout of <code>0 ms</code>, a task would raise an error for the first internal error. For any timeout larger than <code>0 ms</code>, a task will retry at least once before an error is raised.</td>
             <td>300000 milliseconds (5 minutes)</td>
           </tr>
-          <tr class="row-even"><td>topology.optimization</td>
+          <tr class="row-odd"><td>topology.optimization</td>
             <td>Medium</td>
-            <td colspan="2">A configuration telling Kafka Streams if it should optimize the topology and what optimizations to apply. Acceptable values are: <code>StreamsConfig.NO_OPTIMIZATION</code> (<code>none</code>), <code>StreamsConfig.OPTIMIZE</code> (<code>all</code>) or a comma separated list of specific optimizations: (<code>StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS</code> (<code>reuse.ktable.source.topics</code>), <code>StreamsConfig.MERGE_REPARTITION_TOPICS</code> (<code>merge.repartition.topics</code>)). </td>
-            <td><code>NO_OPTIMIZATION</code></td>
+            <td colspan="2">A configuration telling Kafka Streams if it should optimize the topology and what optimizations to apply. Acceptable values are: <code>StreamsConfig.NO_OPTIMIZATION</code> (<code>none</code>), <code>StreamsConfig.OPTIMIZE</code> (<code>all</code>) or a comma separated list of specific optimizations: (<code>StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS</code> (<code>reuse.ktable.source.topics</code>), <code>StreamsConfig.MERGE_REPARTITION_TOPICS</code> (<code>merge.repartition.topics</code>)).</td>
+            <td><code>NO_OPTIMIZATION</code></td>
           </tr>
-          <tr class="row-odd"><td>upgrade.from</td>
+          <tr class="row-even"><td>upgrade.from</td>
             <td>Medium</td>
             <td colspan="2">The version you are upgrading from during a rolling upgrade.</td>
             <td>See <a class="reference internal" href="#streams-developer-guide-upgrade-from"><span class="std std-ref">Upgrade From</span></a></td>
           </tr>
-          <tr class="row-even"><td>windowstore.changelog.additional.retention.ms</td>
+          <tr class="row-odd"><td>windowstore.changelog.additional.retention.ms</td>
             <td>Low</td>
             <td colspan="2">Added to a windows maintainMs to ensure data is not deleted from the log prematurely. Allows for clock drift.</td>
             <td>86400000 milliseconds (1 day)</td>
@@ -676,7 +683,7 @@
             </div></blockquote>
         </div>
         <div class="section" id="default-windowed-key-serde-inner">
-          <h4><a class="toc-backref" href="#id32">default.windowed.key.serde.inner</a><a class="headerlink" href="#default-windowed-key-serde-inner" title="Permalink to this headline"></a></h4>
+          <h4><a class="toc-backref" href="#id32">default.windowed.key.serde.inner</a><a class="headerlink" href="#default-windowed-key-serde-inner" title="Permalink to this headline"></a> (Deprecated.)</h4>
           <blockquote>
             <div><p>The default Serializer/Deserializer class for the inner class of windowed keys. Serialization and deserialization in Kafka Streams happens
               whenever data needs to be materialized, for example:</p>
@@ -689,7 +696,7 @@
             </div></blockquote>
         </div>
         <div class="section" id="default-windowed-value-serde-inner">
-          <h4><a class="toc-backref" href="#id33">default.windowed.value.serde.inner</a><a class="headerlink" href="#default-windowed-value-serde-inner" title="Permalink to this headline"></a></h4>
+          <h4><a class="toc-backref" href="#id33">default.windowed.value.serde.inner</a><a class="headerlink" href="#default-windowed-value-serde-inner" title="Permalink to this headline"></a>(Deprecated.)</h4>
           <blockquote>
             <div><p>The default Serializer/Deserializer class for the inner class of windowed values. Serialization and deserialization in Kafka Streams happens
               happens whenever data needs to be materialized, for example:</p>
@@ -1029,6 +1036,18 @@
           </p>
         </div></blockquote>
     </div>
+    <div class="section" id="windowed.inner.class.serde">
+      <h4><a class="toc-backref" href="#id31">windowed.inner.class.serde</a><a class="headerlink" href="#windowed.inner.class.serde" title="Permalink to this headline"></a></h4>
+      <blockquote>
+        <div>
+          <p>
+            Serde for the inner class of a windowed record. Must implement the org.apache.kafka.common.serialization.Serde interface.
+          </p>
+          <p>
+            Note that this config is only used by plain consumer/producer clients that set a windowed de/serializer type via configs. For Kafka Streams applications that deal with windowed types, you must pass in the inner serde type when you instantiate the windowed serde object for your topology.
+          </p>
+        </div></blockquote>
+    </div>
     <div class="section" id="upgrade-from">
       <span id="streams-developer-guide-upgrade-from"></span><h4><a class="toc-backref" href="#id14">upgrade.from</a><a class="headerlink" href="#upgrade-from" title="Permalink to this headline"></a></h4>
       <blockquote>
@@ -1120,7 +1139,7 @@
           <td>Consumer</td>
           <td><code class="docutils literal"><span class="pre">1000</span></code></td>
         </tr>
-        <tr class="row-even">
+        <tr class="row-odd">
           <td>client.id</td>
           <td>-</td>
           <td><code class="docutils literal"><span class="pre">&lt;application.id&gt;-&lt;random-UUID&gt;</span></code></td>
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 44c064f..3831346 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -437,7 +437,7 @@
      * If you enable this feature Kafka Streams will use more resources (like broker connections)
      * compared to {@link #AT_LEAST_ONCE "at_least_once"} and {@link #EXACTLY_ONCE_V2 "exactly_once_v2"}.
      *
-     * @deprecated Since 3.0.0, will be removed in 4.0. Use {@link #EXACTLY_ONCE_V2 "exactly_once_v2"} instead.
+     * @deprecated since 3.0.0, will be removed in 4.0. Use {@link #EXACTLY_ONCE_V2 "exactly_once_v2"} instead.
      */
     @SuppressWarnings("WeakerAccess")
     @Deprecated
@@ -450,7 +450,7 @@
      * If you enable this feature Kafka Streams will use fewer resources (like broker connections)
      * compared to the {@link #EXACTLY_ONCE} (deprecated) case.
      *
-     * @deprecated Since 3.0.0, will be removed in 4.0. Use {@link #EXACTLY_ONCE_V2 "exactly_once_v2"} instead.
+     * @deprecated since 3.0.0, will be removed in 4.0. Use {@link #EXACTLY_ONCE_V2 "exactly_once_v2"} instead.
      */
     @SuppressWarnings("WeakerAccess")
     @Deprecated
@@ -499,7 +499,8 @@
     public static final String BUILT_IN_METRICS_VERSION_CONFIG = "built.in.metrics.version";
     private static final String BUILT_IN_METRICS_VERSION_DOC = "Version of the built-in metrics to use.";
 
-    /** {@code cache.max.bytes.buffering} */
+    /** {@code cache.max.bytes.buffering}
+     * @deprecated since 3.4.0. Use {@link #STATESTORE_CACHE_MAX_BYTES_CONFIG "statestore.cache.max.bytes"} instead. */
     @SuppressWarnings("WeakerAccess")
     @Deprecated
     public static final String CACHE_MAX_BYTES_BUFFERING_CONFIG = "cache.max.bytes.buffering";
@@ -571,14 +572,16 @@
     static final String DSL_STORE_SUPPLIERS_CLASS_DOC = "Defines which store implementations to plug in to DSL operators. Must implement the <code>org.apache.kafka.streams.state.DslStoreSuppliers</code> interface.";
     static final Class<?> DSL_STORE_SUPPLIERS_CLASS_DEFAULT = BuiltInDslStoreSuppliers.RocksDBDslStoreSuppliers.class;
 
-    /** {@code default.windowed.key.serde.inner} */
+    /** {@code default.windowed.key.serde.inner}
+     * @deprecated since 3.0.0. Use {@link #WINDOWED_INNER_CLASS_SERDE "windowed.inner.class.serde"} instead. */
     @SuppressWarnings("WeakerAccess")
     @Deprecated
     public static final String DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS = "default.windowed.key.serde.inner";
     private static final String DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS_DOC = "Default serializer / deserializer for the inner class of a windowed key. Must implement the " +
         "<code>org.apache.kafka.common.serialization.Serde</code> interface.";
 
-    /** {@code default.windowed.value.serde.inner} */
+    /** {@code default.windowed.value.serde.inner}
+     * @deprecated since 3.0.0. Use {@link #WINDOWED_INNER_CLASS_SERDE "windowed.inner.class.serde"} instead. */
     @SuppressWarnings("WeakerAccess")
     @Deprecated
     public static final String DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS = "default.windowed.value.serde.inner";
@@ -589,7 +592,7 @@
     private static final String WINDOWED_INNER_CLASS_SERDE_DOC = " Default serializer / deserializer for the inner class of a windowed record. Must implement the " +
         "<code>org.apache.kafka.common.serialization.Serde</code> interface. Note that setting this config in KafkaStreams application would result " +
         "in an error as it is meant to be used only from Plain consumer client.";
-
+        
     /** {@code default key.serde} */
     @SuppressWarnings("WeakerAccess")
     public static final String DEFAULT_KEY_SERDE_CLASS_CONFIG = "default.key.serde";
@@ -654,7 +657,8 @@
     @SuppressWarnings("WeakerAccess")
     public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG;
 
-    /** {@code auto.include.jmx.reporter} */
+    /** {@code auto.include.jmx.reporter}
+     * @deprecated and will be removed in 4.0.0. Use {@link JMX_REPORTER "jmx.reporter"} instead. */
     @Deprecated
     public static final String AUTO_INCLUDE_JMX_REPORTER_CONFIG = CommonClientConfigs.AUTO_INCLUDE_JMX_REPORTER_CONFIG;
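For illustration only (not part of this patch): a small sketch of the migration the new deprecation notes point at, from cache.max.bytes.buffering to statestore.cache.max.bytes. The application id and bootstrap servers are placeholders.

import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

public class CacheConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");             // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
        // Old (deprecated): props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
        props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 10 * 1024 * 1024L);
        new StreamsConfig(props); // validates the configuration
    }
}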
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/api/RecordMetadata.java b/streams/src/main/java/org/apache/kafka/streams/processor/api/RecordMetadata.java
index ab88b89..c24b9a7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/api/RecordMetadata.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/api/RecordMetadata.java
@@ -17,13 +17,14 @@
 package org.apache.kafka.streams.processor.api;
 
 import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
+import org.apache.kafka.streams.processor.Punctuator;
 
 public interface RecordMetadata {
     /**
      * Return the topic name of the current input record; could be {@code null} if it is not
      * available.
      *
-     * <p> For example, if this method is invoked within a @link Punctuator#punctuate(long)
+     * <p> For example, if this method is invoked within a {@link Punctuator#punctuate(long)
      * punctuation callback}, or while processing a record that was forwarded by a punctuation
      * callback, the record won't have an associated topic.
      * Another example is
@@ -39,7 +40,7 @@
      * Return the partition id of the current input record; could be {@code -1} if it is not
      * available.
      *
-     * <p> For example, if this method is invoked within a @link Punctuator#punctuate(long)
+     * <p> For example, if this method is invoked within a {@link Punctuator#punctuate(long)
      * punctuation callback}, or while processing a record that was forwarded by a punctuation
      * callback, the record won't have an associated partition id.
      * Another example is
@@ -55,7 +56,7 @@
      * Return the offset of the current input record; could be {@code -1} if it is not
      * available.
      *
-     * <p> For example, if this method is invoked within a @link Punctuator#punctuate(long)
+     * <p> For example, if this method is invoked within a {@link Punctuator#punctuate(long)
      * punctuation callback}, or while processing a record that was forwarded by a punctuation
      * callback, the record won't have an associated offset.
      * Another example is
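For illustration only (not part of this patch): a short sketch of the behaviour the corrected {@link Punctuator} references describe — recordMetadata() is present while processing an input record but empty inside a punctuation callback.

import java.time.Duration;
import java.util.Optional;

import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.api.RecordMetadata;

public class MetadataAwareProcessor implements Processor<String, String, String, String> {
    private ProcessorContext<String, String> context;

    @Override
    public void init(final ProcessorContext<String, String> context) {
        this.context = context;
        // Inside a punctuation callback there is no current input record,
        // so recordMetadata() is empty, matching the Javadoc above.
        context.schedule(Duration.ofSeconds(10), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
            final Optional<RecordMetadata> metadata = context.recordMetadata();
            System.out.println("metadata present during punctuation? " + metadata.isPresent());
        });
    }

    @Override
    public void process(final Record<String, String> record) {
        // While processing an input record, topic/partition/offset are available.
        context.recordMetadata().ifPresent(m ->
            System.out.println(m.topic() + "-" + m.partition() + "@" + m.offset()));
        context.forward(record);
    }
}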
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/assignment/ApplicationState.java b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/ApplicationState.java
new file mode 100644
index 0000000..e78bc9d
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/ApplicationState.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.assignment;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+import org.apache.kafka.streams.errors.TaskAssignmentException;
+
+/**
+ * A read-only metadata class representing the state of the application and the current rebalance.
+ * This class wraps all the input parameters to the task assignment, including the current state
+ * of each KafkaStreams client with at least one StreamThread participating in this rebalance, the
+ * assignment-related configs, and the tasks to be assigned.
+ */
+public interface ApplicationState {
+    /**
+     * @param computeTaskLags whether to include task lag information in the returned metadata. Note that passing
+     * in "true" will result in a remote call to fetch changelog topic end offsets, and you should pass in "false" unless
+     * you specifically need the task lag information.
+     *
+     * @return a map from the {@code processId} to {@link KafkaStreamsState} for all KafkaStreams clients in this app
+     *
+     * @throws TaskAssignmentException if a retriable error occurs while computing KafkaStreamsState metadata. Re-throw
+     *                                 this exception to have Kafka Streams retry the rebalance by returning the same
+     *                                 assignment and scheduling an immediate followup rebalance
+     */
+    Map<ProcessId, KafkaStreamsState> kafkaStreamsStates(boolean computeTaskLags);
+
+    /**
+     * @return a simple container class with the Streams configs relevant to assignment
+     */
+    AssignmentConfigs assignmentConfigs();
+
+    /**
+     * @return the set of all tasks in this topology which must be assigned
+     */
+    Set<TaskId> allTasks();
+
+    /**
+     *
+     * @return the set of stateful and changelogged tasks in this topology
+     */
+    Set<TaskId> statefulTasks();
+
+    /**
+     *
+     * @return the set of stateless or changelog-less tasks in this topology
+     */
+    Set<TaskId> statelessTasks();
+}
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/assignment/AssignmentConfigs.java b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/AssignmentConfigs.java
new file mode 100644
index 0000000..5fbb860
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/AssignmentConfigs.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.assignment;
+
+import java.util.List;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.streams.StreamsConfig;
+
+/**
+ * Assignment related configs for the Kafka Streams {@link TaskAssignor}.
+ */
+public class AssignmentConfigs {
+    private final long acceptableRecoveryLag;
+    private final int maxWarmupReplicas;
+    private final int numStandbyReplicas;
+    private final long probingRebalanceIntervalMs;
+    private final List<String> rackAwareAssignmentTags;
+    private final int rackAwareTrafficCost;
+    private final int rackAwareNonOverlapCost;
+    private final String rackAwareAssignmentStrategy;
+
+    public AssignmentConfigs(final StreamsConfig configs) {
+        this(
+            configs.getLong(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG),
+            configs.getInt(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG),
+            configs.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG),
+            configs.getLong(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG),
+            configs.getList(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG),
+            configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG),
+            configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG),
+            configs.getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG)
+        );
+    }
+
+    public AssignmentConfigs(final long acceptableRecoveryLag,
+                             final int maxWarmupReplicas,
+                             final int numStandbyReplicas,
+                             final long probingRebalanceIntervalMs,
+                             final List<String> rackAwareAssignmentTags,
+                             final int rackAwareTrafficCost,
+                             final int rackAwareNonOverlapCost,
+                             final String rackAwareAssignmentStrategy
+    ) {
+        this.acceptableRecoveryLag = validated(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, acceptableRecoveryLag);
+        this.maxWarmupReplicas = validated(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, maxWarmupReplicas);
+        this.numStandbyReplicas = validated(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, numStandbyReplicas);
+        this.probingRebalanceIntervalMs = validated(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, probingRebalanceIntervalMs);
+        this.rackAwareAssignmentTags = validated(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG, rackAwareAssignmentTags);
+        this.rackAwareTrafficCost = validated(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG,
+            rackAwareTrafficCost
+        );
+        this.rackAwareNonOverlapCost = validated(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG,
+            rackAwareNonOverlapCost
+        );
+        this.rackAwareAssignmentStrategy = validated(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG,
+            rackAwareAssignmentStrategy
+        );
+    }
+
+    /**
+     * The configured acceptable recovery lag according to
+     * {@link StreamsConfig#ACCEPTABLE_RECOVERY_LAG_CONFIG}
+     */
+    public long acceptableRecoveryLag() {
+        return acceptableRecoveryLag;
+    }
+
+    /**
+     * The maximum warmup replicas as configured via
+     * {@link StreamsConfig#MAX_WARMUP_REPLICAS_CONFIG}
+     */
+    public int maxWarmupReplicas() {
+        return maxWarmupReplicas;
+    }
+
+    /**
+     * The number of standby replicas as configured via
+     * {@link StreamsConfig#NUM_STANDBY_REPLICAS_CONFIG}
+     */
+    public int numStandbyReplicas() {
+        return numStandbyReplicas;
+    }
+
+    /**
+     * The probing rebalance interval in milliseconds as configured via
+     * {@link StreamsConfig#PROBING_REBALANCE_INTERVAL_MS_CONFIG}
+     */
+    public long probingRebalanceIntervalMs() {
+        return probingRebalanceIntervalMs;
+    }
+
+    /**
+     * The rack-aware assignment tags as configured via
+     * {@link StreamsConfig#RACK_AWARE_ASSIGNMENT_TAGS_CONFIG}
+     */
+    public List<String> rackAwareAssignmentTags() {
+        return rackAwareAssignmentTags;
+    }
+
+    /**
+     * The rack-aware assignment traffic cost as configured via
+     * {@link StreamsConfig#RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG}
+     */
+    public int rackAwareTrafficCost() {
+        return rackAwareTrafficCost;
+    }
+
+    /**
+     * The rack-aware assignment non-overlap cost as configured via
+     * {@link StreamsConfig#RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG}
+     */
+    public int rackAwareNonOverlapCost() {
+        return rackAwareNonOverlapCost;
+    }
+
+    /**
+     * The rack-aware assignment strategy as configured via
+     * {@link StreamsConfig#RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG}
+     */
+    public String rackAwareAssignmentStrategy() {
+        return rackAwareAssignmentStrategy;
+    }
+
+    private static <T> T validated(final String configKey, final T value) {
+        final ConfigDef.Validator validator = StreamsConfig.configDef().configKeys().get(configKey).validator;
+        if (validator != null) {
+            validator.ensureValid(configKey, value);
+        }
+        return value;
+    }
+}
\ No newline at end of file
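For illustration only (not part of this patch): a minimal sketch of constructing the new AssignmentConfigs from a StreamsConfig and reading back a couple of the assignment-related values; the configured values are placeholders.

import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.assignment.AssignmentConfigs;

public class AssignmentConfigsSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "assignor-demo");      // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
        props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);

        // The new class simply snapshots the assignment-related Streams configs.
        AssignmentConfigs configs = new AssignmentConfigs(new StreamsConfig(props));
        System.out.println("standbys=" + configs.numStandbyReplicas()
            + ", acceptable recovery lag=" + configs.acceptableRecoveryLag());
    }
}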
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/assignment/KafkaStreamsAssignment.java b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/KafkaStreamsAssignment.java
new file mode 100644
index 0000000..f0c8fcf
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/KafkaStreamsAssignment.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.assignment;
+
+import java.time.Instant;
+import java.util.Set;
+import org.apache.kafka.streams.processor.TaskId;
+
+/**
+ * A simple interface for the assignor to return the desired placement of active and standby tasks on
+ * KafkaStreams clients.
+ */
+public interface KafkaStreamsAssignment {
+    /**
+     *
+     * @return the {@code ProcessId} associated with this {@code KafkaStreamsAssignment}
+     */
+    ProcessId processId();
+
+    /**
+     *
+     * @return a set of assigned tasks that are part of this {@code KafkaStreamsAssignment}
+     */
+    Set<AssignedTask> assignment();
+
+    /**
+     * @return the followup rebalance deadline in epoch time, after which this KafkaStreams
+     * client will trigger a new rebalance
+     */
+    Instant followupRebalanceDeadline();
+
+    class AssignedTask {
+        private final TaskId id;
+        private final Type taskType;
+
+        public AssignedTask(final TaskId id, final Type taskType) {
+            this.id = id;
+            this.taskType = taskType;
+        }
+
+        public enum Type {
+            ACTIVE,
+            STANDBY
+        }
+
+        /**
+         *
+         * @return the id of the {@code AssignedTask}
+         */
+        public TaskId id() {
+            return id;
+        }
+
+        /**
+         *
+         * @return the type of the {@code AssignedTask}
+         */
+        public Type type() {
+            return taskType;
+        }
+    }
+}
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/assignment/KafkaStreamsState.java b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/KafkaStreamsState.java
new file mode 100644
index 0000000..002ca36
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/KafkaStreamsState.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.assignment;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.SortedSet;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.state.HostInfo;
+
+/**
+ * A read-only metadata class representing the current state of each KafkaStreams client with at least one StreamThread participating in this rebalance
+ */
+public interface KafkaStreamsState {
+    /**
+     * @return the processId of the application instance running on this KafkaStreams client
+     */
+    ProcessId processId();
+
+    /**
+     * Returns the number of processing threads available to work on tasks for this KafkaStreams client,
+     * which represents its overall capacity for work relative to other KafkaStreams clients.
+     *
+     * @return the number of processing threads on this KafkaStreams client
+     */
+    int numProcessingThreads();
+
+    /**
+     * @return the set of consumer client ids for this KafkaStreams client
+     */
+    SortedSet<String> consumerClientIds();
+
+    /**
+     * @return the set of all active tasks owned by consumers on this KafkaStreams client since the previous rebalance
+     */
+    SortedSet<TaskId> previousActiveTasks();
+
+    /**
+     * @return the set of all standby tasks owned by consumers on this KafkaStreams client since the previous rebalance
+     */
+    SortedSet<TaskId> previousStandbyTasks();
+
+    /**
+     * Returns the total lag across all logged stores in the task. Equal to the end offset sum if this client
+     * did not have any state for this task on disk.
+     *
+     * @return the end offset sum minus the offset sum, or {@code Task.LATEST_OFFSET}
+     *         if this was previously an active running task on this client
+     *
+     * @throws UnsupportedOperationException if the user did not request task lags be computed.
+     */
+    long lagFor(final TaskId task);
+
+    /**
+     * @return the previous tasks assigned to this consumer ordered by lag, filtered for any tasks that don't exist in this assignment
+     *
+     * @throws UnsupportedOperationException if the user did not request task lags be computed.
+     */
+    SortedSet<TaskId> prevTasksByLag(final String consumerClientId);
+
+    /**
+     * Returns a map containing all (and only) stateful tasks in the topology, keyed by {@link TaskId} and
+     * mapped to their "offset lag sum". This is computed as the difference between the changelog end offset
+     * and the current offset, summed across all logged state stores in the task.
+     *
+     * @return a map from all stateful tasks to their lag sum
+     *
+     * @throws UnsupportedOperationException if the user did not request task lags be computed.
+     */
+    Map<TaskId, Long> statefulTasksToLagSums();
+
+    /**
+     * The {@link HostInfo} of this KafkaStreams client, if set via the
+     * {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_SERVER_CONFIG application.server} config
+     *
+     * @return the host info for this KafkaStreams client if configured, else {@code Optional.empty()}
+     */
+    Optional<HostInfo> hostInfo();
+
+    /**
+     * The client tags for this KafkaStreams client, if any have been set via configs using the
+     * {@link org.apache.kafka.streams.StreamsConfig#clientTagPrefix}
+     * <p>
+     * These can be used for any purpose, for example to enable the rack-aware standby task assignor.
+     *
+     * @return all the client tags found in this KafkaStreams client's {@link org.apache.kafka.streams.StreamsConfig}
+     */
+    Map<String, String> clientTags();
+}
\ No newline at end of file
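/*
 * Illustrative sketch (not part of this patch): one way an assignor implementation might read the
 * per-client metadata exposed by KafkaStreamsState above. Only methods declared in that interface are
 * used; the helper class and method names here are hypothetical.
 */
import java.util.Collection;
import java.util.Comparator;
import java.util.Optional;

import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.assignment.KafkaStreamsState;

final class KafkaStreamsStateExample {

    // Pick the client with the smallest lag for the given task; if task lags were not requested
    // (lagFor throws UnsupportedOperationException), fall back to the client with the most threads.
    static Optional<KafkaStreamsState> leastLaggingClient(final Collection<KafkaStreamsState> clients,
                                                          final TaskId task) {
        try {
            return clients.stream().min(Comparator.comparingLong(client -> client.lagFor(task)));
        } catch (final UnsupportedOperationException lagsNotComputed) {
            return clients.stream().max(Comparator.comparingInt(KafkaStreamsState::numProcessingThreads));
        }
    }
}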
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/assignment/ProcessId.java b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/ProcessId.java
new file mode 100644
index 0000000..dfc430c
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/ProcessId.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.assignment;
+
+import java.util.UUID;
+
+/** A simple wrapper around UUID that abstracts a Process ID */
+public class ProcessId {
+
+    private final UUID id;
+
+    public ProcessId(final UUID id) {
+        this.id = id;
+    }
+
+    /**
+     * @return the underlying {@code UUID} that this {@code ProcessId} wraps
+     */
+    public UUID id() {
+        return id;
+    }
+}
\ No newline at end of file
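/*
 * Illustrative sketch (not part of this patch): ProcessId is used as a Map key by the assignment APIs
 * added below (e.g. Map<ProcessId, KafkaStreamsAssignment>), which assumes value semantics. This
 * hypothetical wrapper shows the equals/hashCode/toString members such an id class would typically
 * delegate to its wrapped UUID; whether ProcessId itself does so is not shown in this diff.
 */
import java.util.Objects;
import java.util.UUID;

final class UuidWrapperExample {

    private final UUID id;

    UuidWrapperExample(final UUID id) {
        this.id = Objects.requireNonNull(id, "id");
    }

    @Override
    public boolean equals(final Object other) {
        // Two wrappers are equal iff they wrap the same UUID value
        return other instanceof UuidWrapperExample && id.equals(((UuidWrapperExample) other).id);
    }

    @Override
    public int hashCode() {
        return id.hashCode();
    }

    @Override
    public String toString() {
        return id.toString();
    }
}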
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java
new file mode 100644
index 0000000..3945f95
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.assignment;
+
+import java.util.Map;
+import java.util.SortedSet;
+import org.apache.kafka.streams.processor.TaskId;
+
+/**
+ * A set of utilities to help implement task assignment via the {@link TaskAssignor}
+ */
+public final class TaskAssignmentUtils {
+
+    private TaskAssignmentUtils() {}
+
+    /**
+     * Assign standby tasks to KafkaStreams clients according to the default logic.
+     * <p>
+     * If rack-aware client tags are configured, the rack-aware standby task assignor will be used
+     *
+     * @param applicationState        the metadata and other info describing the current application state
+     * @param kafkaStreamsAssignments the current assignment of tasks to KafkaStreams clients
+     *
+     * @return a new map of KafkaStreamsAssignments, keyed by ProcessId, updated with the default
+     *         standby assignment
+     */
+    public static Map<ProcessId, KafkaStreamsAssignment> defaultStandbyTaskAssignment(
+        final ApplicationState applicationState,
+        final Map<ProcessId, KafkaStreamsAssignment> kafkaStreamsAssignments
+    ) {
+        throw new UnsupportedOperationException("Not Implemented.");
+    }
+
+    /**
+     * Optimize the active task assignment for rack-awareness
+     *
+     * @param applicationState        the metadata and other info describing the current application state
+     * @param kafkaStreamsAssignments the current assignment of tasks to KafkaStreams clients
+     * @param tasks                   the set of tasks to reassign if possible. Must already be assigned
+     *                                to a KafkaStreams client
+     *
+     * @return a new map of KafkaStreamsAssignments, keyed by ProcessId, updated with the default
+     *         rack-aware assignment for active tasks
+     */
+    public static Map<ProcessId, KafkaStreamsAssignment> optimizeRackAwareActiveTasks(
+        final ApplicationState applicationState,
+        final Map<ProcessId, KafkaStreamsAssignment> kafkaStreamsAssignments,
+        final SortedSet<TaskId> tasks
+    ) {
+        throw new UnsupportedOperationException("Not Implemented.");
+    }
+
+    /**
+     * Optimize the standby task assignment for rack-awareness
+     *
+     * @param applicationState        the metadata and other info describing the current application state
+     * @param kafkaStreamsAssignments the current assignment of tasks to KafkaStreams clients
+     *
+     * @return a new map of KafkaStreamsAssignments, keyed by ProcessId, updated with the default
+     *         rack-aware assignment for standby tasks
+     */
+    public static Map<ProcessId, KafkaStreamsAssignment> optimizeRackAwareStandbyTasks(
+        final ApplicationState applicationState,
+        final Map<ProcessId, KafkaStreamsAssignment> kafkaStreamsAssignments
+    ) {
+        throw new UnsupportedOperationException("Not Implemented.");
+    }
+
+    /**
+     * Return a "no-op" assignment that just copies the previous assignment of tasks to KafkaStreams clients
+     *
+     * @param applicationState the metadata and other info describing the current application state
+     *
+     * @return a new map containing an assignment that replicates exactly the previous assignment reported
+     *         in the applicationState
+     */
+    public static Map<ProcessId, KafkaStreamsAssignment> identityAssignment(
+        final ApplicationState applicationState
+    ) {
+        throw new UnsupportedOperationException("Not Implemented.");
+    }
+}
\ No newline at end of file
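/*
 * Illustrative sketch (not part of this patch): how the utilities above might be composed. Only the
 * signatures declared in TaskAssignmentUtils are used; note that in this patch they still throw
 * UnsupportedOperationException, so this shows the shape of the API rather than runnable behavior.
 */
import java.util.Map;

import org.apache.kafka.streams.processor.assignment.ApplicationState;
import org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment;
import org.apache.kafka.streams.processor.assignment.ProcessId;
import org.apache.kafka.streams.processor.assignment.TaskAssignmentUtils;

final class TaskAssignmentUtilsExample {

    // Start from the previous assignment, add default standbys, then optimize their rack placement.
    static Map<ProcessId, KafkaStreamsAssignment> sketchAssignment(final ApplicationState applicationState) {
        Map<ProcessId, KafkaStreamsAssignment> assignment =
            TaskAssignmentUtils.identityAssignment(applicationState);
        assignment = TaskAssignmentUtils.defaultStandbyTaskAssignment(applicationState, assignment);
        assignment = TaskAssignmentUtils.optimizeRackAwareStandbyTasks(applicationState, assignment);
        return assignment;
    }
}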
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignor.java
new file mode 100644
index 0000000..38a9a03
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignor.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.assignment;
+
+import java.util.Collection;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.GroupAssignment;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.GroupSubscription;
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
+import org.apache.kafka.streams.errors.TaskAssignmentException;
+
+/**
+ * A TaskAssignor is responsible for creating a TaskAssignment from a given
+ * {@code ApplicationState}.
+ * The implementation may also override the {@code onAssignmentComputed} callback to get insight into
+ * the result of the computed assignment.
+ */
+public interface TaskAssignor extends Configurable {
+
+    /**
+     * NONE: no error detected
+     * ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES: multiple KafkaStreams clients assigned with the same active task
+     * ACTIVE_AND_STANDBY_TASK_ASSIGNED_TO_SAME_KAFKASTREAMS: active task and standby task assigned to the same KafkaStreams client
+     * INVALID_STANDBY_TASK: stateless task assigned as a standby task
+     * UNKNOWN_PROCESS_ID: unrecognized ProcessId not matching any of the participating consumers
+     * UNKNOWN_TASK_ID: unrecognized TaskId not matching any of the tasks to be assigned
+     */
+    enum AssignmentError {
+        NONE,
+        ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES,
+        ACTIVE_AND_STANDBY_TASK_ASSIGNED_TO_SAME_KAFKASTREAMS,
+        INVALID_STANDBY_TASK,
+        UNKNOWN_PROCESS_ID,
+        UNKNOWN_TASK_ID
+    }
+
+    /**
+     * @param applicationState the metadata for this Kafka Streams application
+     *
+     * @return the assignment of active and standby tasks to KafkaStreams clients
+     *
+     * @throws TaskAssignmentException if an error occurs during assignment and you wish for the rebalance to be retried;
+     *                                 throwing this exception keeps the assignment unchanged and automatically
+     *                                 schedules an immediate follow-up rebalance
+     */
+    TaskAssignment assign(ApplicationState applicationState);
+
+    /**
+     * This callback can be used to observe the final assignment returned to the brokers and check for any errors that
+     * were detected while processing the returned assignment. If any errors were found, the corresponding
+     * {@code AssignmentError} is passed to this callback and a StreamsException will be thrown after it returns. The
+     * StreamsException will be thrown up to kill the StreamThread and can be handled as any other uncaught exception
+     * would if the application has registered a {@link StreamsUncaughtExceptionHandler}.
+     *
+     * @param assignment   the final assignment returned to the Kafka broker
+     * @param subscription the original subscription passed into the assignor
+     * @param error        the corresponding error type if one was detected while processing the returned assignment,
+     *                     or AssignmentError.NONE if the returned assignment was valid
+     */
+    default void onAssignmentComputed(GroupAssignment assignment, GroupSubscription subscription, AssignmentError error) {}
+
+    /**
+     * Wrapper class for the final assignment of active and standby tasks to individual
+     * KafkaStreams clients.
+     */
+    class TaskAssignment {
+        private final Collection<KafkaStreamsAssignment> assignment;
+
+        public TaskAssignment(final Collection<KafkaStreamsAssignment> assignment) {
+            this.assignment = assignment;
+        }
+
+        /**
+         * @return the assignment of tasks to KafkaStreams clients.
+         */
+        public Collection<KafkaStreamsAssignment> assignment() {
+            return assignment;
+        }
+    }
+}
\ No newline at end of file
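/*
 * Illustrative sketch (not part of this patch): a minimal TaskAssignor implementation wired to the
 * TaskAssignmentUtils helpers above, plus an onAssignmentComputed override that surfaces errors.
 * The class name and the use of System.err are assumptions made for this example only.
 */
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.GroupAssignment;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.GroupSubscription;
import org.apache.kafka.streams.processor.assignment.ApplicationState;
import org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment;
import org.apache.kafka.streams.processor.assignment.ProcessId;
import org.apache.kafka.streams.processor.assignment.TaskAssignmentUtils;
import org.apache.kafka.streams.processor.assignment.TaskAssignor;

public class ExampleStickyAssignor implements TaskAssignor {

    @Override
    public void configure(final Map<String, ?> configs) {
        // no custom configuration needed for this sketch
    }

    @Override
    public TaskAssignment assign(final ApplicationState applicationState) {
        // Keep the previous placement and let the default logic place standby tasks.
        final Map<ProcessId, KafkaStreamsAssignment> assignment =
            TaskAssignmentUtils.defaultStandbyTaskAssignment(
                applicationState,
                TaskAssignmentUtils.identityAssignment(applicationState));
        return new TaskAssignment(assignment.values());
    }

    @Override
    public void onAssignmentComputed(final GroupAssignment assignment,
                                     final GroupSubscription subscription,
                                     final AssignmentError error) {
        if (error != AssignmentError.NONE) {
            // A StreamsException will be thrown after this callback returns; record context here.
            System.err.println("Task assignment was rejected with error: " + error);
        }
    }
}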
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopics.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopics.java
index 39e429a..3a7680d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopics.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopics.java
@@ -110,6 +110,13 @@
             .collect(Collectors.toSet());
     }
 
+    public Set<String> missingSourceTopics() {
+        return missingInputTopicsBySubtopology.entrySet().stream()
+                .map(entry -> entry.getValue())
+                .flatMap(missingTopicSet -> missingTopicSet.stream())
+                .collect(Collectors.toSet());
+    }
+
     public Queue<StreamsException> missingSourceTopicExceptions() {
         return missingInputTopicsBySubtopology.entrySet().stream().map(entry -> {
             final Set<String> missingSourceTopics = entry.getValue();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index fb4e45c..7d1632e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -523,7 +523,9 @@
         final boolean isMissingInputTopics = !repartitionTopics.missingSourceTopicExceptions().isEmpty();
         if (isMissingInputTopics) {
             if (!taskManager.topologyMetadata().hasNamedTopologies()) {
-                throw new MissingSourceTopicException("Missing source topics.");
+                final String errorMsg = String.format("Missing source topics. %s", repartitionTopics.missingSourceTopics());
+                log.error(errorMsg);
+                throw new MissingSourceTopicException(errorMsg);
             } else {
                 nonFatalExceptionsToHandle.addAll(repartitionTopics.missingSourceTopicExceptions());
             }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
index ba2883b..1e73103 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
@@ -53,7 +53,9 @@
         // NB: all task management is already handled by:
         // org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment
         if (assignmentErrorCode.get() == AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code()) {
-            log.error("Received error code {}", AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA);
+            log.error("Received error code {}. {}",
+                    AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.codeName(),
+                    AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.description());
             taskManager.handleRebalanceComplete();
             throw new MissingSourceTopicException("One or more source topics were missing during rebalance");
         } else if (assignmentErrorCode.get() == AssignorError.VERSION_PROBING.code()) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorError.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorError.java
index 2104144..5bf36ff 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorError.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorError.java
@@ -19,20 +19,31 @@
 public enum AssignorError {
     // Note: this error code should be reserved for fatal errors, as the receiving clients are future-proofed
     // to throw an exception upon an unrecognized error code.
-    NONE(0),
-    INCOMPLETE_SOURCE_TOPIC_METADATA(1),
-    VERSION_PROBING(2), // not actually used anymore, but we may hit it during a rolling upgrade from earlier versions
-    ASSIGNMENT_ERROR(3),
-    SHUTDOWN_REQUESTED(4);
+    NONE(0, "NONE", "NONE"),
+    INCOMPLETE_SOURCE_TOPIC_METADATA(1, "INCOMPLETE_SOURCE_TOPIC_METADATA", "Missing metadata for source topics. Check the group leader logs for details."),
+    VERSION_PROBING(2, "VERSION_PROBING", "Could not read internal rebalance metadata due to unknown encoding version."), // not actually used anymore, but we may hit it during a rolling upgrade from earlier versions
+    ASSIGNMENT_ERROR(3, "ASSIGNMENT_ERROR", "Internal task assignment error. Check the group leader logs for details."),
+    SHUTDOWN_REQUESTED(4, "SHUTDOWN_REQUESTED", "A KafkaStreams instance encountered a fatal error and requested a shutdown for the entire application.");
 
     private final int code;
+    private final String codeName;
+    private final String description;
 
-    AssignorError(final int code) {
+    AssignorError(final int code, final String codeName, final String description) {
         this.code = code;
+        this.codeName = codeName;
+        this.description = description;
     }
 
     public int code() {
         return code;
     }
 
+    public String codeName() {
+        return codeName;
+    }
+
+    public String description() {
+        return description;
+    }
 }
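/*
 * Illustrative sketch (not part of this patch): how the new codeName()/description() accessors can be
 * combined into one human-readable message, similar to the logging change in StreamsRebalanceListener
 * above. The helper class and method names are hypothetical.
 */
import org.apache.kafka.streams.processor.internals.assignment.AssignorError;

final class AssignorErrorMessageExample {

    // e.g. "INCOMPLETE_SOURCE_TOPIC_METADATA (code 1): Missing metadata for source topics. ..."
    static String describe(final AssignorError error) {
        return String.format("%s (code %d): %s", error.codeName(), error.code(), error.description());
    }
}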
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index dea8e0e..f8b50f7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -66,18 +66,13 @@
 import org.apache.kafka.test.MockSourceNode;
 import org.apache.kafka.test.MockTimestampExtractor;
 import org.apache.kafka.test.TestUtils;
-import org.easymock.EasyMock;
-import org.easymock.EasyMockRunner;
-import org.easymock.IMocksControl;
-import org.easymock.Mock;
-import org.easymock.MockType;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.mockito.Mockito;
-import org.mockito.MockitoSession;
-import org.mockito.quality.Strictness;
+import org.mockito.InOrder;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
 
 import java.io.File;
 import java.io.IOException;
@@ -107,7 +102,6 @@
 import static org.apache.kafka.common.utils.Utils.mkSet;
 import static org.apache.kafka.streams.StreamsConfig.AT_LEAST_ONCE;
 import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE_V2;
-import static org.apache.kafka.streams.processor.internals.Task.State.CLOSED;
 import static org.apache.kafka.streams.processor.internals.Task.State.CREATED;
 import static org.apache.kafka.streams.processor.internals.Task.State.RESTORING;
 import static org.apache.kafka.streams.processor.internals.Task.State.RUNNING;
@@ -128,12 +122,18 @@
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
-@RunWith(EasyMockRunner.class)
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
 public class StreamTaskTest {
 
     private static final String APPLICATION_ID = "stream-task-test";
@@ -188,11 +188,11 @@
     private StreamTask task;
     private long punctuatedAt;
 
-    @Mock(type = MockType.NICE)
+    @Mock
     private ProcessorStateManager stateManager;
-    @Mock(type = MockType.NICE)
+    @Mock
     private RecordCollector recordCollector;
-    @Mock(type = MockType.NICE)
+    @Mock
     private ThreadCache cache;
 
     private final Punctuator punctuator = new Punctuator() {
@@ -201,7 +201,6 @@
             punctuatedAt = timestamp;
         }
     };
-    private MockitoSession mockito;
 
     private static ProcessorTopology withRepartitionTopics(final List<ProcessorNode<?, ?, ?, ?>> processorNodes,
                                                            final Map<String, SourceNode<?, ?>> sourcesByTopic,
@@ -265,16 +264,14 @@
 
     @Before
     public void setup() {
-        mockito = Mockito.mockitoSession()
-            .initMocks(this)
-            .strictness(Strictness.STRICT_STUBS)
-            .startMocking();
-        EasyMock.expect(stateManager.taskId()).andStubReturn(taskId);
-        EasyMock.expect(stateManager.taskType()).andStubReturn(TaskType.ACTIVE);
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
 
         consumer.assign(asList(partition1, partition2));
         consumer.updateBeginningOffsets(mkMap(mkEntry(partition1, 0L), mkEntry(partition2, 0L)));
         stateDirectory = new StateDirectory(createConfig("100"), new MockTime(), true, false);
+        // Unless we initialise a lock on the state directory we cannot unlock it successfully during teardown
+        stateDirectory.initializeProcessId();
     }
 
     @After
@@ -292,18 +289,16 @@
             task.closeDirty();
             task = null;
         }
+        stateDirectory.close();
         Utils.delete(BASE_DIR);
-        mockito.finishMocking();
     }
 
     @Test
-    public void shouldThrowLockExceptionIfFailedToLockStateDirectory() throws IOException {
-        stateDirectory = EasyMock.createNiceMock(StateDirectory.class);
-        EasyMock.expect(stateDirectory.lock(taskId)).andReturn(false);
-        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet());
-        stateManager.registerStore(stateStore, stateStore.stateRestoreCallback, null);
-        EasyMock.expectLastCall();
-        EasyMock.replay(stateDirectory, stateManager);
+    public void shouldThrowLockExceptionIfFailedToLockStateDirectory() {
+        // Clean up state directory created as part of setup
+        stateDirectory.close();
+        stateDirectory = mock(StateDirectory.class);
+        when(stateDirectory.lock(taskId)).thenReturn(false);
 
         task = createStatefulTask(createConfig("100"), false);
 
@@ -312,55 +307,43 @@
 
     @Test
     public void shouldNotAttemptToLockIfNoStores() {
-        stateDirectory = EasyMock.createNiceMock(StateDirectory.class);
-        EasyMock.replay(stateDirectory);
+        // Clean up state directory created as part of setup
+        stateDirectory.close();
+        stateDirectory = mock(StateDirectory.class);
 
         task = createStatelessTask(createConfig("100"));
 
         task.initializeIfNeeded();
 
         // should fail if lock is called
-        EasyMock.verify(stateDirectory);
+        verify(stateDirectory, never()).lock(any());
     }
 
     @Test
-    public void shouldAttemptToDeleteStateDirectoryWhenCloseDirtyAndEosEnabled() throws IOException {
-        final IMocksControl ctrl = EasyMock.createNiceControl();
-        final ProcessorStateManager stateManager = ctrl.createMock(ProcessorStateManager.class);
-        EasyMock.expect(stateManager.taskType()).andStubReturn(TaskType.ACTIVE);
-        stateDirectory = ctrl.createMock(StateDirectory.class);
+    public void shouldAttemptToDeleteStateDirectoryWhenCloseDirtyAndEosEnabled() {
+        // Clean up state directory created as part of setup
+        stateDirectory.close();
+        stateDirectory = mock(StateDirectory.class);
 
-        stateManager.registerGlobalStateStores(emptyList());
-        EasyMock.expectLastCall();
-
-        EasyMock.expect(stateManager.taskId()).andReturn(taskId);
-
-        EasyMock.expect(stateDirectory.lock(taskId)).andReturn(true);
-
-        stateManager.close();
-        EasyMock.expectLastCall();
-
-        stateManager.transitionTaskState(SUSPENDED);
-        EasyMock.expectLastCall();
-
-        stateManager.transitionTaskState(CLOSED);
-        EasyMock.expectLastCall();
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
         // The `baseDir` will be accessed when attempting to delete the state store.
-        EasyMock.expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store"));
+        when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store"));
 
-        stateDirectory.unlock(taskId);
-        EasyMock.expectLastCall();
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        final InOrder inOrder = inOrder(stateManager, stateDirectory);
 
         task = createStatefulTask(createConfig(StreamsConfig.EXACTLY_ONCE_V2, "100"), true, stateManager);
         task.suspend();
         task.closeDirty();
         task = null;
 
-        ctrl.verify();
+        inOrder.verify(stateManager).taskType();
+        inOrder.verify(stateManager).registerGlobalStateStores(emptyList());
+        inOrder.verify(stateManager).taskId();
+        inOrder.verify(stateDirectory).lock(taskId);
+        inOrder.verify(stateManager).close();
+        inOrder.verify(stateManager).baseDir();
+        inOrder.verify(stateDirectory).unlock(taskId);
     }
 
     @Test
@@ -374,17 +357,16 @@
         consumer.seek(partition1, 10L);
         consumer.seek(partition2, 15L);
 
+        @SuppressWarnings("unchecked")
         final java.util.function.Consumer<Set<TopicPartition>> resetter =
-            EasyMock.mock(java.util.function.Consumer.class);
-        resetter.accept(Collections.emptySet());
-        EasyMock.expectLastCall();
-        EasyMock.replay(resetter);
+            mock(java.util.function.Consumer.class);
 
         task.initializeIfNeeded();
         task.completeRestoration(resetter);
 
         assertThat(consumer.position(partition1), equalTo(5L));
         assertThat(consumer.position(partition2), equalTo(15L));
+        verify(resetter).accept(Collections.emptySet());
     }
 
     @Test
@@ -411,25 +393,25 @@
 
         shouldNotSeek.set(new AssertionError("Should not seek"));
 
-        final java.util.function.Consumer<Set<TopicPartition>> resetter =
-            EasyMock.mock(java.util.function.Consumer.class);
-        resetter.accept(Collections.singleton(partition1));
-        EasyMock.expectLastCall();
-        EasyMock.replay(resetter);
+        // Keep a separate copy of the arguments passed to Consumer#accept, because the
+        // underlying data structure is emptied afterwards and would be reported as empty
+        // at verification time.
+        final Set<TopicPartition> partitionsAtCall = new HashSet<>();
 
         task.initializeIfNeeded();
-        task.completeRestoration(resetter);
+        task.completeRestoration(partitionsAtCall::addAll);
 
         // because we mocked the `resetter` positions don't change
         assertThat(consumer.position(partition1), equalTo(5L));
         assertThat(consumer.position(partition2), equalTo(15L));
-        EasyMock.verify(resetter);
+        assertThat(partitionsAtCall, equalTo(Collections.singleton(partition1)));
     }
 
     @Test
     public void shouldReadCommittedStreamTimeAndProcessorMetadataOnInitialize() {
-        stateDirectory = EasyMock.createNiceMock(StateDirectory.class);
-        EasyMock.replay(stateDirectory);
+        // Clean up state directory created as part of setup
+        stateDirectory.close();
+        stateDirectory = mock(StateDirectory.class);
 
         final ProcessorMetadata processorMetadata = new ProcessorMetadata(mkMap(
             mkEntry("key1", 1L),
@@ -453,8 +435,9 @@
 
     @Test
     public void shouldReadCommittedStreamTimeAndMergeProcessorMetadataOnInitialize() {
-        stateDirectory = EasyMock.createNiceMock(StateDirectory.class);
-        EasyMock.replay(stateDirectory);
+        // Clean up state directory created as part of setup
+        stateDirectory.close();
+        stateDirectory = mock(StateDirectory.class);
 
         final ProcessorMetadata processorMetadata1 = new ProcessorMetadata(mkMap(
             mkEntry("key1", 1L),
@@ -493,15 +476,12 @@
     }
 
     @Test
-    public void shouldTransitToRestoringThenRunningAfterCreation() throws IOException {
-        stateDirectory = EasyMock.createNiceMock(StateDirectory.class);
-        EasyMock.expect(stateDirectory.lock(taskId)).andReturn(true);
-        EasyMock.expect(stateManager.changelogPartitions()).andReturn(singleton(changelogPartition));
-        EasyMock.expect(stateManager.changelogOffsets()).andReturn(singletonMap(changelogPartition, 10L));
-        stateManager.registerStore(stateStore, stateStore.stateRestoreCallback, null);
-        EasyMock.expectLastCall();
-        EasyMock.expect(recordCollector.offsets()).andReturn(emptyMap()).anyTimes();
-        EasyMock.replay(stateDirectory, stateManager, recordCollector);
+    public void shouldTransitToRestoringThenRunningAfterCreation() {
+        // Clean up state directory created as part of setup
+        stateDirectory.close();
+        stateDirectory = mock(StateDirectory.class);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
+        when(stateManager.changelogOffsets()).thenReturn(singletonMap(changelogPartition, 10L));
 
         task = createStatefulTask(createConfig("100"), true);
 
@@ -523,8 +503,6 @@
         assertEquals(RUNNING, task.state());
         assertTrue(source1.initialized);
         assertTrue(source2.initialized);
-
-        EasyMock.verify(stateDirectory);
     }
 
     @Test
@@ -727,8 +705,6 @@
             }
         };
 
-        EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.emptyMap()); // restoration checkpoint
-
         task = createStatelessTaskWithForwardingTopology(evenKeyForwardingSourceNode);
         task.initializeIfNeeded();
         task.completeRestoration(noOpResetter -> { });
@@ -1445,6 +1421,7 @@
         assertThat("task is not idling", !task.timeCurrentIdlingStarted().isPresent());
     }
 
+    @Test
     public void shouldPunctuateSystemTimeWhenIntervalElapsed() {
         task = createStatelessTask(createConfig("100"));
         task.initializeIfNeeded();
@@ -1577,11 +1554,6 @@
 
     @Test
     public void shouldWrapKafkaExceptionWithStreamsExceptionWhenProcess() {
-        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
-        EasyMock.expect(stateManager.changelogOffsets()).andReturn(emptyMap()).anyTimes();
-        EasyMock.expect(recordCollector.offsets()).andReturn(emptyMap()).anyTimes();
-        EasyMock.replay(stateManager, recordCollector);
-
         task = createFaultyStatefulTask(createConfig("100"));
 
         task.initializeIfNeeded();
@@ -1604,13 +1576,11 @@
     }
 
     @Test
-    public void shouldReadCommittedOffsetAndRethrowTimeoutWhenCompleteRestoration() throws IOException {
-        stateDirectory = EasyMock.createNiceMock(StateDirectory.class);
-        EasyMock.expect(stateDirectory.lock(taskId)).andReturn(true);
-        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
-        EasyMock.expect(stateManager.changelogOffsets()).andReturn(emptyMap()).anyTimes();
-
-        EasyMock.replay(recordCollector, stateDirectory, stateManager);
+    public void shouldReadCommittedOffsetAndRethrowTimeoutWhenCompleteRestoration() {
+        // Clean up state directory created as part of setup
+        stateDirectory.close();
+        stateDirectory = mock(StateDirectory.class);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
         task = createDisconnectedTask(createConfig("100"));
 
@@ -1620,15 +1590,11 @@
     }
 
     @Test
-    public void shouldReInitializeTopologyWhenResuming() throws IOException {
-        stateDirectory = EasyMock.createNiceMock(StateDirectory.class);
-        EasyMock.expect(stateDirectory.lock(taskId)).andReturn(true);
-        EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap())
-            .andThrow(new AssertionError("Should not try to read offsets")).anyTimes();
-        EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.emptyMap());
-        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
-
-        EasyMock.replay(recordCollector, stateDirectory, stateManager);
+    public void shouldReInitializeTopologyWhenResuming() {
+        // Clean up state directory created as part of setup
+        stateDirectory.close();
+        stateDirectory = mock(StateDirectory.class);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
         task = createStatefulTask(createConfig("100"), true);
 
@@ -1652,81 +1618,64 @@
         assertTrue(source1.initialized);
         assertTrue(source2.initialized);
 
-        EasyMock.verify(stateManager, recordCollector);
-
-        EasyMock.reset(recordCollector);
-        EasyMock.expect(recordCollector.offsets()).andReturn(emptyMap());
-        EasyMock.replay(recordCollector);
         assertThat("Map did not contain the partition", task.highWaterMark().containsKey(partition1));
+
+        verify(recordCollector).offsets();
     }
 
     @Test
     public void shouldNotCheckpointOffsetsAgainOnCommitIfSnapshotNotChangedMuch() {
         final Long offset = 543L;
 
-        EasyMock.expect(recordCollector.offsets()).andReturn(singletonMap(changelogPartition, offset)).anyTimes();
-        stateManager.checkpoint();
-        EasyMock.expectLastCall().once();
-        EasyMock.expect(stateManager.changelogOffsets())
-            .andReturn(singletonMap(changelogPartition, 10L)) // restoration checkpoint
-            .andReturn(singletonMap(changelogPartition, 10L))
-            .andReturn(singletonMap(changelogPartition, 20L));
-        EasyMock.expectLastCall();
-        EasyMock.replay(stateManager, recordCollector);
+        when(recordCollector.offsets()).thenReturn(singletonMap(changelogPartition, offset));
+        when(stateManager.changelogOffsets())
+            .thenReturn(singletonMap(changelogPartition, 10L)) // restoration checkpoint
+            .thenReturn(singletonMap(changelogPartition, 10L))
+            .thenReturn(singletonMap(changelogPartition, 20L));
 
         task = createStatefulTask(createConfig("100"), true);
 
         task.initializeIfNeeded();
-        task.completeRestoration(noOpResetter -> { });
+        task.completeRestoration(noOpResetter -> { }); // should checkpoint
 
         task.prepareCommit();
-        task.postCommit(true);   // should checkpoint
+        task.postCommit(true); // should checkpoint
 
         task.prepareCommit();
-        task.postCommit(false);   // should not checkpoint
+        task.postCommit(false); // should not checkpoint
 
-        EasyMock.verify(stateManager, recordCollector);
         assertThat("Map was empty", task.highWaterMark().size() == 2);
+
+        verify(stateManager, times(2)).checkpoint();
     }
 
     @Test
     public void shouldCheckpointOffsetsOnCommitIfSnapshotMuchChanged() {
         final Long offset = 543L;
 
-        EasyMock.expect(recordCollector.offsets()).andReturn(singletonMap(changelogPartition, offset)).anyTimes();
-        stateManager.checkpoint();
-        EasyMock.expectLastCall().times(2);
-        EasyMock.expect(stateManager.changelogPartitions()).andReturn(singleton(changelogPartition));
-        EasyMock.expect(stateManager.changelogOffsets())
-            .andReturn(singletonMap(changelogPartition, 0L))
-            .andReturn(singletonMap(changelogPartition, 10L))
-            .andReturn(singletonMap(changelogPartition, 12000L));
-        stateManager.registerStore(stateStore, stateStore.stateRestoreCallback, null);
-        EasyMock.expectLastCall();
-        EasyMock.replay(stateManager, recordCollector);
+        when(recordCollector.offsets()).thenReturn(singletonMap(changelogPartition, offset));
+        when(stateManager.changelogOffsets())
+            .thenReturn(singletonMap(changelogPartition, 0L))
+            .thenReturn(singletonMap(changelogPartition, 10L))
+            .thenReturn(singletonMap(changelogPartition, 12000L));
 
         task = createStatefulTask(createConfig("100"), true);
 
         task.initializeIfNeeded();
-        task.completeRestoration(noOpResetter -> { });
+        task.completeRestoration(noOpResetter -> { }); // should checkpoint
         task.prepareCommit();
-        task.postCommit(true);
+        task.postCommit(true); // should checkpoint
 
         task.prepareCommit();
-        task.postCommit(false);
+        task.postCommit(false); // should checkpoint since the offset delta is greater than the threshold
 
-        EasyMock.verify(recordCollector);
         assertThat("Map was empty", task.highWaterMark().size() == 2);
+
+        verify(stateManager, times(3)).checkpoint();
     }
 
     @Test
     public void shouldNotCheckpointOffsetsOnCommitIfEosIsEnabled() {
-        EasyMock.expect(stateManager.changelogPartitions()).andReturn(singleton(changelogPartition));
-        stateManager.registerStore(stateStore, stateStore.stateRestoreCallback, null);
-        EasyMock.expectLastCall();
-        EasyMock.expect(recordCollector.offsets()).andReturn(emptyMap()).anyTimes();
-        EasyMock.replay(stateManager, recordCollector);
-
         task = createStatefulTask(createConfig(StreamsConfig.EXACTLY_ONCE_V2, "100"), true);
 
         task.initializeIfNeeded();
@@ -1792,14 +1741,6 @@
 
     @Test
     public void shouldCloseStateManagerEvenDuringFailureOnUncleanTaskClose() {
-        EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.emptyMap()); // restoration checkpoint
-        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
-        EasyMock.expectLastCall();
-        stateManager.close();
-        EasyMock.expectLastCall();
-        EasyMock.expect(recordCollector.offsets()).andReturn(emptyMap()).anyTimes();
-        EasyMock.replay(stateManager, recordCollector);
-
         task = createFaultyStatefulTask(createConfig("100"));
 
         task.initializeIfNeeded();
@@ -1807,8 +1748,7 @@
 
         assertThrows(RuntimeException.class, () -> task.suspend());
         task.closeDirty();
-
-        EasyMock.verify(stateManager);
+        verify(stateManager).close();
     }
 
     @Test
@@ -1823,11 +1763,6 @@
         consumer.assign(asList(partition1, repartition));
         consumer.updateBeginningOffsets(mkMap(mkEntry(repartition, 0L)));
 
-        EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap()); // restoration checkpoint
-        EasyMock.expect(stateManager.changelogPartitions()).andStubReturn(Collections.emptySet());
-        EasyMock.expect(recordCollector.offsets()).andReturn(emptyMap()).anyTimes();
-        EasyMock.replay(stateManager, recordCollector);
-
         final StreamsConfig config = createConfig();
         final InternalProcessorContext context = new ProcessorContextImpl(
             taskId,
@@ -1875,9 +1810,6 @@
 
     @Test
     public void shouldThrowStreamsExceptionWhenFetchCommittedFailed() {
-        EasyMock.expect(stateManager.changelogPartitions()).andReturn(singleton(partition1));
-        EasyMock.replay(stateManager);
-
         final Consumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
             @Override
             public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition> partitions) {
@@ -1912,42 +1844,31 @@
 
     @Test
     public void shouldSkipCheckpointingSuspendedCreatedTask() {
-        stateManager.checkpoint();
-        EasyMock.expectLastCall().andThrow(new AssertionError("Should not have tried to checkpoint"));
-        EasyMock.expect(recordCollector.offsets()).andReturn(emptyMap()).anyTimes();
-        EasyMock.replay(stateManager, recordCollector);
-
         task = createStatefulTask(createConfig("100"), true);
         task.suspend();
         task.postCommit(true);
+
+        verify(stateManager, never()).checkpoint();
     }
 
     @Test
     public void shouldCheckpointForSuspendedTask() {
-        stateManager.checkpoint();
-        EasyMock.expectLastCall().once();
-        EasyMock.expect(stateManager.changelogOffsets())
-                .andReturn(singletonMap(partition1, 1L));
-        EasyMock.expect(recordCollector.offsets()).andReturn(emptyMap()).anyTimes();
-        EasyMock.replay(stateManager, recordCollector);
+        when(stateManager.changelogOffsets()).thenReturn(singletonMap(partition1, 1L));
 
         task = createStatefulTask(createConfig("100"), true);
         task.initializeIfNeeded();
         task.suspend();
         task.postCommit(true);
-        EasyMock.verify(stateManager);
+
+        verify(stateManager).checkpoint();
     }
 
     @Test
     public void shouldNotCheckpointForSuspendedRunningTaskWithSmallProgress() {
-        EasyMock.expect(stateManager.changelogOffsets())
-                .andReturn(singletonMap(partition1, 0L)) // restoration checkpoint
-                .andReturn(singletonMap(partition1, 1L))
-                .andReturn(singletonMap(partition1, 2L));
-        EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap());
-        stateManager.checkpoint();
-        EasyMock.expectLastCall().once(); // checkpoint should only be called once
-        EasyMock.replay(stateManager, recordCollector);
+        when(stateManager.changelogOffsets())
+                .thenReturn(singletonMap(partition1, 0L)) // restoration checkpoint
+                .thenReturn(singletonMap(partition1, 1L))
+                .thenReturn(singletonMap(partition1, 2L));
 
         task = createStatefulTask(createConfig("100"), true);
         task.initializeIfNeeded();
@@ -1958,64 +1879,58 @@
 
         task.suspend();
         task.postCommit(false);
-        EasyMock.verify(stateManager);
+
+        verify(stateManager).checkpoint();
     }
 
     @Test
     public void shouldCheckpointForSuspendedRunningTaskWithLargeProgress() {
-        EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()); // restoration checkpoint
-        EasyMock.expect(stateManager.changelogOffsets())
-                .andReturn(singletonMap(partition1, 0L))
-                .andReturn(singletonMap(partition1, 12000L))
-                .andReturn(singletonMap(partition1, 24000L));
-        stateManager.checkpoint();
-        EasyMock.expectLastCall().times(2);
-        EasyMock.replay(stateManager, recordCollector);
+        when(stateManager.changelogOffsets())
+                .thenReturn(singletonMap(partition1, 0L))
+                .thenReturn(singletonMap(partition1, 12000L))
+                .thenReturn(singletonMap(partition1, 24000L));
 
         task = createStatefulTask(createConfig("100"), true);
         task.initializeIfNeeded();
-        task.completeRestoration(noOpResetter -> { });
+        task.completeRestoration(noOpResetter -> { }); // should checkpoint
 
         task.prepareCommit();
-        task.postCommit(false);
+        task.postCommit(false); // should checkpoint since the offset delta is greater than the threshold
 
         task.suspend();
-        task.postCommit(false);
-        EasyMock.verify(stateManager);
+        task.postCommit(false); // should checkpoint since the offset delta is greater than the threshold
+
+        verify(stateManager, times(3)).checkpoint();
     }
 
     @Test
     public void shouldCheckpointWhileUpdateSnapshotWithTheConsumedOffsetsForSuspendedRunningTask() {
         final Map<TopicPartition, Long> checkpointableOffsets = singletonMap(partition1, 1L);
-        stateManager.checkpoint();
-        EasyMock.expectLastCall().once();
-        stateManager.updateChangelogOffsets(EasyMock.eq(checkpointableOffsets));
-        EasyMock.expectLastCall().once();
-        EasyMock.expect(stateManager.changelogOffsets())
-                .andReturn(Collections.emptyMap()) // restoration checkpoint
-                .andReturn(checkpointableOffsets);
-        EasyMock.expect(recordCollector.offsets()).andReturn(checkpointableOffsets).times(2);
-        EasyMock.replay(stateManager, recordCollector);
+        when(stateManager.changelogOffsets())
+                .thenReturn(Collections.emptyMap()) // restoration checkpoint
+                .thenReturn(checkpointableOffsets);
+        when(recordCollector.offsets()).thenReturn(checkpointableOffsets);
 
         task = createStatefulTask(createConfig(), true);
         task.initializeIfNeeded();
-        task.completeRestoration(noOpResetter -> { });
+        task.completeRestoration(noOpResetter -> { }); // should checkpoint
         task.addRecords(partition1, singleton(getConsumerRecordWithOffsetAsTimestamp(partition1, 10)));
         task.addRecords(partition2, singleton(getConsumerRecordWithOffsetAsTimestamp(partition2, 10)));
         task.process(100L);
         assertTrue(task.commitNeeded());
 
         task.suspend();
-        task.postCommit(true);
-        EasyMock.verify(stateManager, recordCollector);
+        task.postCommit(true); // should checkpoint
+
+        verify(stateManager, times(2)).checkpoint();
+        verify(stateManager, times(2)).updateChangelogOffsets(checkpointableOffsets);
+        verify(recordCollector, times(2)).offsets();
     }
 
     @Test
     public void shouldReturnStateManagerChangelogOffsets() {
-        EasyMock.expect(stateManager.changelogOffsets()).andReturn(singletonMap(partition1, 50L)).anyTimes();
-        EasyMock.expect(stateManager.changelogPartitions()).andReturn(singleton(partition1)).anyTimes();
-        EasyMock.expect(recordCollector.offsets()).andReturn(emptyMap()).anyTimes();
-        EasyMock.replay(stateManager, recordCollector);
+        when(stateManager.changelogOffsets()).thenReturn(singletonMap(partition1, 50L));
+        when(stateManager.changelogPartitions()).thenReturn(singleton(partition1));
 
         task = createOptimizedStatefulTask(createConfig("100"), consumer);
 
@@ -2030,13 +1945,6 @@
 
     @Test
     public void shouldNotCheckpointOnCloseCreated() {
-        stateManager.flush();
-        EasyMock.expectLastCall().andThrow(new AssertionError("Flush should not be called")).anyTimes();
-        stateManager.checkpoint();
-        EasyMock.expectLastCall().andThrow(new AssertionError("Checkpoint should not be called")).anyTimes();
-        EasyMock.expect(recordCollector.offsets()).andReturn(emptyMap()).anyTimes();
-        EasyMock.replay(stateManager, recordCollector);
-
         final MetricName metricName = setupCloseTaskMetric();
 
         task = createOptimizedStatefulTask(createConfig("100"), consumer);
@@ -2048,66 +1956,47 @@
         assertFalse(source1.initialized);
         assertFalse(source1.closed);
 
-        EasyMock.verify(stateManager, recordCollector);
-
         final double expectedCloseTaskMetric = 1.0;
         verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName);
+
+        verify(stateManager, never()).flush();
+        verify(stateManager, never()).checkpoint();
     }
 
     @Test
     public void shouldCheckpointOnCloseRestoringIfNoProgress() {
-        stateManager.flush();
-        EasyMock.expectLastCall().once();
-        stateManager.checkpoint();
-        EasyMock.expectLastCall().once();
-        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
-        EasyMock.expect(stateManager.changelogOffsets()).andReturn(emptyMap()).anyTimes();
-        EasyMock.expect(recordCollector.offsets()).andReturn(emptyMap()).anyTimes();
-        EasyMock.replay(stateManager, recordCollector);
-
         task = createOptimizedStatefulTask(createConfig("100"), consumer);
 
         task.initializeIfNeeded();
-        task.completeRestoration(noOpResetter -> { });
+        task.completeRestoration(noOpResetter -> { }); // should flush and checkpoint
         task.suspend();
         task.prepareCommit();
-        task.postCommit(true);
+        task.postCommit(true); // should flush and checkpoint
         task.closeClean();
 
         assertEquals(Task.State.CLOSED, task.state());
 
-        EasyMock.verify(stateManager);
+        verify(stateManager, times(2)).flush();
+        verify(stateManager, times(2)).checkpoint();
     }
 
     @Test
     public void shouldAlwaysCheckpointStateIfEnforced() {
-        stateManager.flush();
-        EasyMock.expectLastCall().once();
-        stateManager.checkpoint();
-        EasyMock.expectLastCall().once();
-        EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
-        EasyMock.expect(recordCollector.offsets()).andStubReturn(Collections.emptyMap());
-        EasyMock.replay(stateManager, recordCollector);
-
         task = createOptimizedStatefulTask(createConfig("100"), consumer);
 
         task.initializeIfNeeded();
         task.maybeCheckpoint(true);
 
-        EasyMock.verify(stateManager);
+        verify(stateManager).flush();
+        verify(stateManager).checkpoint();
     }
 
     @Test
     public void shouldOnlyCheckpointStateWithBigAdvanceIfNotEnforced() {
-        stateManager.flush();
-        EasyMock.expectLastCall().once();
-        stateManager.checkpoint();
-        EasyMock.expectLastCall().once();
-        EasyMock.expect(stateManager.changelogOffsets())
-                .andReturn(Collections.singletonMap(partition1, 50L))
-                .andReturn(Collections.singletonMap(partition1, 11000L))
-                .andReturn(Collections.singletonMap(partition1, 12000L));
-        EasyMock.replay(stateManager);
+        when(stateManager.changelogOffsets())
+                .thenReturn(Collections.singletonMap(partition1, 50L))
+                .thenReturn(Collections.singletonMap(partition1, 11000L))
+                .thenReturn(Collections.singletonMap(partition1, 12000L));
 
         task = createOptimizedStatefulTask(createConfig("100"), consumer);
         task.initializeIfNeeded();
@@ -2119,7 +2008,8 @@
         task.maybeCheckpoint(false);  // this should not checkpoint
         assertEquals(Collections.singletonMap(partition1, 11000L), task.offsetSnapshotSinceLastFlush);
 
-        EasyMock.verify(stateManager);
+        verify(stateManager).flush();
+        verify(stateManager).checkpoint();
     }
 
     @Test
@@ -2127,15 +2017,10 @@
         final long offset = 543L;
         final long consumedOffset = 345L;
 
-        EasyMock.expect(recordCollector.offsets()).andReturn(singletonMap(changelogPartition, offset)).anyTimes();
-        EasyMock.expectLastCall();
-        stateManager.checkpoint();
-        EasyMock.expectLastCall().once();
-        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
-        EasyMock.expect(stateManager.changelogOffsets())
-                .andReturn(singletonMap(partition1, offset + 10000L)) // restoration checkpoint
-                .andReturn(singletonMap(partition1, offset + 12000L));
-        EasyMock.replay(recordCollector, stateManager);
+        when(recordCollector.offsets()).thenReturn(singletonMap(changelogPartition, offset));
+        when(stateManager.changelogOffsets())
+                .thenReturn(singletonMap(partition1, offset + 10000L)) // restoration checkpoint
+                .thenReturn(singletonMap(partition1, offset + 12000L));
 
         task = createOptimizedStatefulTask(createConfig(), consumer);
         task.initializeIfNeeded();
@@ -2151,26 +2036,20 @@
 
         assertEquals(SUSPENDED, task.state());
 
-        EasyMock.verify(stateManager);
+        verify(stateManager).checkpoint();
     }
 
     @Test
     public void shouldThrowExceptionOnCloseCleanError() {
         final long offset = 543L;
 
-        EasyMock.expect(recordCollector.offsets()).andReturn(emptyMap()).anyTimes();
-        stateManager.checkpoint();
-        EasyMock.expectLastCall().once();
-        EasyMock.expect(stateManager.changelogPartitions()).andReturn(singleton(changelogPartition)).anyTimes();
-        EasyMock.expect(stateManager.changelogOffsets()).andReturn(singletonMap(changelogPartition, offset)).anyTimes();
-        stateManager.close();
-        EasyMock.expectLastCall().andThrow(new ProcessorStateException("KABOOM!")).anyTimes();
-        EasyMock.replay(recordCollector, stateManager);
+        when(stateManager.changelogOffsets()).thenReturn(singletonMap(changelogPartition, offset));
+        doThrow(new ProcessorStateException("KABOOM!")).when(stateManager).close();
         final MetricName metricName = setupCloseTaskMetric();
 
         task = createOptimizedStatefulTask(createConfig("100"), consumer);
         task.initializeIfNeeded();
-        task.completeRestoration(noOpResetter -> { });
+        task.completeRestoration(noOpResetter -> { }); // should checkpoint
 
         task.addRecords(partition1, singletonList(getConsumerRecordWithOffsetAsTimestamp(partition1, offset)));
         task.process(100L);
@@ -2178,40 +2057,22 @@
 
         task.suspend();
         task.prepareCommit();
-        task.postCommit(true);
+        task.postCommit(true); // should checkpoint
         assertThrows(ProcessorStateException.class, () -> task.closeClean());
 
         final double expectedCloseTaskMetric = 0.0;
         verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName);
 
-        EasyMock.verify(stateManager);
-        EasyMock.reset(stateManager);
-        EasyMock.expect(stateManager.changelogPartitions()).andStubReturn(singleton(changelogPartition));
-        stateManager.close();
-        EasyMock.expectLastCall();
-        EasyMock.replay(stateManager);
+        verify(stateManager, times(2)).checkpoint();
+        verify(stateManager).close();
     }
 
     @Test
     public void shouldThrowOnCloseCleanFlushError() {
         final long offset = 543L;
 
-        stateManager.flush(); // restoration checkpoint
-        EasyMock.expectLastCall();
-        stateManager.checkpoint(); // checkpoint upon restoration
-        EasyMock.expectLastCall();
-        EasyMock.expect(recordCollector.offsets()).andReturn(singletonMap(changelogPartition, offset));
-        stateManager.flushCache();
-        EasyMock.expectLastCall().andThrow(new ProcessorStateException("KABOOM!")).anyTimes();
-        stateManager.flush();
-        EasyMock.expectLastCall().andThrow(new AssertionError("Flush should not be called")).anyTimes();
-        stateManager.checkpoint();
-        EasyMock.expectLastCall().andThrow(new AssertionError("Checkpoint should not be called")).anyTimes();
-        stateManager.close();
-        EasyMock.expectLastCall().andThrow(new AssertionError("Close should not be called!")).anyTimes();
-        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
-        EasyMock.expect(stateManager.changelogOffsets()).andReturn(emptyMap()).anyTimes();
-        EasyMock.replay(recordCollector, stateManager);
+        when(recordCollector.offsets()).thenReturn(singletonMap(changelogPartition, offset));
+        doThrow(new ProcessorStateException("KABOOM!")).when(stateManager).flushCache();
         final MetricName metricName = setupCloseTaskMetric();
 
         task = createOptimizedStatefulTask(createConfig("100"), consumer);
@@ -2229,24 +2090,16 @@
         final double expectedCloseTaskMetric = 0.0;
         verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName);
 
-        EasyMock.verify(stateManager);
-        EasyMock.reset(stateManager);
-        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
-        EasyMock.replay(stateManager);
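+        // flush and checkpoint each happen once during restoration; the flushCache failure must keep close from being called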
+        verify(stateManager).flush();
+        verify(stateManager).checkpoint();
+        verify(stateManager, never()).close();
     }
 
     @Test
     public void shouldThrowOnCloseCleanCheckpointError() {
         final long offset = 54300L;
-        EasyMock.expect(recordCollector.offsets()).andReturn(emptyMap());
-        stateManager.checkpoint();
-        EasyMock.expectLastCall().andThrow(new ProcessorStateException("KABOOM!")).anyTimes();
-        stateManager.close();
-        EasyMock.expectLastCall().andThrow(new AssertionError("Close should not be called!")).anyTimes();
-        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
-        EasyMock.expect(stateManager.changelogOffsets())
-                .andReturn(singletonMap(partition1, offset));
-        EasyMock.replay(recordCollector, stateManager);
+        doThrow(new ProcessorStateException("KABOOM!")).when(stateManager).checkpoint();
+        when(stateManager.changelogOffsets()).thenReturn(singletonMap(partition1, offset));
         final MetricName metricName = setupCloseTaskMetric();
 
         task = createOptimizedStatefulTask(createConfig("100"), consumer);
@@ -2265,36 +2118,22 @@
         final double expectedCloseTaskMetric = 0.0;
         verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName);
 
-        EasyMock.verify(stateManager);
-        EasyMock.reset(stateManager);
-        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
-        stateManager.close();
-        EasyMock.expectLastCall();
-        EasyMock.replay(stateManager);
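+        // the checkpoint failure must prevent close from being called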
+        verify(stateManager, never()).close();
     }
 
     @Test
     public void shouldNotThrowFromStateManagerCloseInCloseDirty() {
-        stateManager.close();
-        EasyMock.expectLastCall().andThrow(new RuntimeException("KABOOM!")).anyTimes();
-        EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.emptyMap()).anyTimes();
-        EasyMock.replay(stateManager);
+        doThrow(new RuntimeException("KABOOM!")).when(stateManager).close();
 
         task = createOptimizedStatefulTask(createConfig("100"), consumer);
         task.initializeIfNeeded();
 
         task.suspend();
-        task.closeDirty();
-
-        EasyMock.verify(stateManager);
+        assertDoesNotThrow(() -> task.closeDirty());
     }
 
     @Test
     public void shouldUnregisterMetricsInCloseClean() {
-        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
-        EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
-        EasyMock.replay(stateManager, recordCollector);
-
         task = createOptimizedStatefulTask(createConfig("100"), consumer);
 
         task.suspend();
@@ -2305,10 +2144,6 @@
 
     @Test
     public void shouldUnregisterMetricsInCloseDirty() {
-        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
-        EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
-        EasyMock.replay(stateManager, recordCollector);
-
         task = createOptimizedStatefulTask(createConfig("100"), consumer);
 
         task.suspend();
@@ -2319,12 +2154,6 @@
 
     @Test
     public void shouldUnregisterMetricsAndCloseInPrepareRecycle() {
-        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
-        stateManager.recycle();
-        EasyMock.expectLastCall().once();
-        EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
-        EasyMock.replay(stateManager, recordCollector);
-
         task = createOptimizedStatefulTask(createConfig("100"), consumer);
 
         task.suspend();
@@ -2332,19 +2161,18 @@
         task.prepareRecycle();
         assertThat(getTaskMetrics(), empty());
         assertThat(task.state(), is(Task.State.CLOSED));
+
+        verify(stateManager).recycle();
     }
 
     @Test
     public void shouldFlushStateManagerAndRecordCollector() {
-        stateManager.flush();
-        EasyMock.expectLastCall().once();
-        recordCollector.flush();
-        EasyMock.expectLastCall().once();
-        EasyMock.replay(stateManager, recordCollector);
-
         task = createStatefulTask(createConfig("100"), false);
 
         task.flush();
+
+        verify(stateManager).flushCache();
+        verify(recordCollector).flush();
     }
 
     @Test
@@ -2367,10 +2195,6 @@
 
     @Test
     public void closeShouldBeIdempotent() {
-        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
-        EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
-        EasyMock.replay(stateManager, recordCollector);
-
         task = createOptimizedStatefulTask(createConfig("100"), consumer);
 
         task.suspend();
@@ -2379,9 +2203,6 @@
         // close calls are idempotent since we are already in closed
         task.closeClean();
         task.closeDirty();
-
-        EasyMock.reset(stateManager);
-        EasyMock.replay(stateManager);
     }
 
     @Test
@@ -2427,14 +2248,6 @@
 
     @Test
     public void shouldPrepareRecycleSuspendedTask() {
-        EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.emptyMap()).anyTimes();
-        stateManager.recycle();
-        EasyMock.expectLastCall().once();
-        recordCollector.closeClean();
-        EasyMock.expectLastCall().once();
-        EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()).once();
-        EasyMock.replay(stateManager, recordCollector);
-
         task = createStatefulTask(createConfig("100"), true);
         assertThrows(IllegalStateException.class, () -> task.prepareRecycle()); // CREATED
 
@@ -2448,12 +2261,12 @@
         task.prepareRecycle(); // SUSPENDED
         assertThat(task.state(), is(Task.State.CLOSED));
 
-        EasyMock.verify(stateManager, recordCollector);
+        verify(stateManager).recycle();
+        verify(recordCollector).closeClean();
     }
 
     @Test
     public void shouldAlwaysSuspendCreatedTasks() {
-        EasyMock.replay(stateManager);
         task = createStatefulTask(createConfig("100"), true);
         assertThat(task.state(), equalTo(CREATED));
         task.suspend();
@@ -2462,8 +2275,6 @@
 
     @Test
     public void shouldAlwaysSuspendRestoringTasks() {
-        EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.emptyMap()).anyTimes();
-        EasyMock.replay(stateManager);
         task = createStatefulTask(createConfig("100"), true);
         task.initializeIfNeeded();
         assertThat(task.state(), equalTo(RESTORING));
@@ -2473,9 +2284,6 @@
 
     @Test
     public void shouldAlwaysSuspendRunningTasks() {
-        EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()); // restoration checkpoint
-        EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.emptyMap()).anyTimes();
-        EasyMock.replay(stateManager, recordCollector);
         task = createFaultyStatefulTask(createConfig("100"));
         task.initializeIfNeeded();
         task.completeRestoration(noOpResetter -> { });
@@ -2494,8 +2302,6 @@
                 null
         );
         final StreamsMetricsImpl metrics = new StreamsMetricsImpl(this.metrics, "test", StreamsConfig.METRICS_LATEST, time);
-        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet());
-        EasyMock.replay(stateManager);
 
         // The processor topology is missing the topics
         final ProcessorTopology topology = withSources(emptyList(), mkMap());
@@ -2843,11 +2649,6 @@
         source1.addChild(processorStreamTime);
         source1.addChild(processorSystemTime);
 
-        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet());
-        EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.emptyMap()).anyTimes();
-        EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
-        EasyMock.replay(stateManager, recordCollector);
-
         final InternalProcessorContext context = new ProcessorContextImpl(
             taskId,
             config,
@@ -2885,11 +2686,6 @@
         source1.addChild(processorSystemTime);
         source2.addChild(processorSystemTime);
 
-        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet());
-        EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.emptyMap()).anyTimes();
-        EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
-        EasyMock.replay(stateManager, recordCollector);
-
         final InternalProcessorContext context = new ProcessorContextImpl(
             taskId,
             config,
@@ -2924,10 +2720,6 @@
 
         sourceNode.addChild(processorStreamTime);
 
-        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet());
-        EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
-        EasyMock.replay(stateManager, recordCollector);
-
         final StreamsConfig config = createConfig();
 
         final InternalProcessorContext context = new ProcessorContextImpl(
@@ -2957,8 +2749,6 @@
     }
 
     private void createTimeoutTask(final String eosConfig) {
-        EasyMock.replay(stateManager);
-
         final ProcessorTopology topology = withSources(
             singletonList(timeoutSource),
             mkMap(mkEntry(topic1, timeoutSource))
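The StreamTaskTest.java hunks above all apply the same mechanical EasyMock-to-Mockito translation: expect(...).andReturn(...) becomes when(...).thenReturn(...), expectLastCall().andThrow(...) on a void method becomes doThrow(...).when(mock).method(), and the replay/verify/reset ceremony collapses into plain after-the-fact verify(...) with times(n) and never(). The standalone Java sketch below illustrates that pattern; it is not part of the patch, and StateManagerLike plus every other name in it are invented purely for illustration.

// Standalone sketch of the EasyMock -> Mockito translation used throughout the hunks above.
// Not part of the patch: StateManagerLike and all names here are invented for illustration only.
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.Collections;
import java.util.Map;

public class MockitoMigrationSketch {

    // Hypothetical stand-in for the mocked state manager.
    interface StateManagerLike {
        Map<String, Long> changelogOffsets();
        void checkpoint();
        void flush();
        void close();
    }

    public static void main(String[] args) {
        StateManagerLike stateManager = mock(StateManagerLike.class);

        // EasyMock expect(...).andReturn(...).anyTimes()  ->  when(...).thenReturn(...)
        when(stateManager.changelogOffsets())
            .thenReturn(Collections.singletonMap("changelog-0", 543L));

        // EasyMock expectLastCall().andThrow(...) on a void method  ->  doThrow(...).when(mock).method()
        doThrow(new RuntimeException("KABOOM!")).when(stateManager).close();

        // Exercise the mock roughly the way the task tests do.
        stateManager.changelogOffsets();
        stateManager.checkpoint();   // e.g. restoration checkpoint
        stateManager.checkpoint();   // e.g. post-commit checkpoint
        try {
            stateManager.close();
        } catch (RuntimeException swallowed) {
            // closeDirty-style handling: the failure from close() is swallowed.
        }

        // EasyMock replay/verify/reset cycle  ->  plain after-the-fact verification.
        verify(stateManager, times(2)).checkpoint();
        verify(stateManager).close();
        verify(stateManager, never()).flush();
    }
}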
diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java
index c145135..e9eb3cf 100644
--- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java
@@ -17,10 +17,8 @@
 package org.apache.kafka.tools.consumer.group;
 
 import kafka.test.ClusterInstance;
-import kafka.test.annotation.ClusterConfigProperty;
-import kafka.test.annotation.ClusterTest;
-import kafka.test.annotation.ClusterTestDefaults;
-import kafka.test.annotation.Type;
+import kafka.test.ClusterGenerator;
+import kafka.test.annotation.ClusterTemplate;
 import kafka.test.junit.ClusterTestExtensions;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.admin.Admin;
@@ -45,7 +43,6 @@
 import org.junit.jupiter.api.extension.ExtendWith;
 
 import java.time.Duration;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -59,28 +56,21 @@
 
 @Disabled
 @Tag("integration")
-@ClusterTestDefaults(clusterType = Type.ALL, serverProperties = {
-        @ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
-        @ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
-        @ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true")
-})
 @ExtendWith(ClusterTestExtensions.class)
 public class DeleteOffsetsConsumerGroupCommandIntegrationTest {
     public static final String TOPIC_PREFIX = "foo.";
     public static final String GROUP_PREFIX = "test.group.";
     private final ClusterInstance clusterInstance;
 
-    private final Iterable<Map<String, Object>> consumerConfigs;
-
     DeleteOffsetsConsumerGroupCommandIntegrationTest(ClusterInstance clusterInstance) {
         this.clusterInstance = clusterInstance;
-        this.consumerConfigs = clusterInstance.isKRaftTest()
-                ? Arrays.asList(Collections.singletonMap(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name()),
-                Collections.singletonMap(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name()))
-                : Collections.singletonList(Collections.emptyMap());
     }
 
-    @ClusterTest
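+    // Referenced by name from the @ClusterTemplate annotations below; delegates cluster generation to the shared ConsumerGroupCommandTestUtils helper.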
+    private static void generator(ClusterGenerator clusterGenerator) {
+        ConsumerGroupCommandTestUtils.generator(clusterGenerator);
+    }
+
+    @ClusterTemplate("generator")
     public void testDeleteOffsetsNonExistingGroup() {
         String group = "missing.group";
         String topic = "foo:1";
@@ -90,91 +80,91 @@
         }
     }
 
-    @ClusterTest
+    @ClusterTemplate("generator")
     public void testDeleteOffsetsOfStableConsumerGroupWithTopicPartition() {
-        for (Map<String, Object> consumerConfig: consumerConfigs) {
-            String topic = TOPIC_PREFIX + consumerConfig.getOrDefault(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name());
-            String group = GROUP_PREFIX + consumerConfig.getOrDefault(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name());
+        for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
+            String topic = TOPIC_PREFIX + groupProtocol.name();
+            String group = GROUP_PREFIX + groupProtocol.name();
             createTopic(topic);
             Runnable validateRunnable = getValidateRunnable(topic, group, 0, 0, Errors.GROUP_SUBSCRIBED_TO_TOPIC);
-            testWithConsumerGroup(topic, group, consumerConfig, true, validateRunnable);
+            testWithConsumerGroup(topic, group, groupProtocol, true, validateRunnable);
             removeTopic(topic);
         }
     }
 
-    @ClusterTest
+    @ClusterTemplate("generator")
     public void testDeleteOffsetsOfStableConsumerGroupWithTopicOnly() {
-        for (Map<String, Object> consumerConfig: consumerConfigs) {
-            String topic = TOPIC_PREFIX + consumerConfig.getOrDefault(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name());
-            String group = GROUP_PREFIX + consumerConfig.getOrDefault(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name());
+        for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
+            String topic = TOPIC_PREFIX + groupProtocol.name();
+            String group = GROUP_PREFIX + groupProtocol.name();
             createTopic(topic);
             Runnable validateRunnable = getValidateRunnable(topic, group, -1, 0, Errors.GROUP_SUBSCRIBED_TO_TOPIC);
-            testWithConsumerGroup(topic, group, consumerConfig, true, validateRunnable);
+            testWithConsumerGroup(topic, group, groupProtocol, true, validateRunnable);
             removeTopic(topic);
         }
     }
 
-    @ClusterTest
+    @ClusterTemplate("generator")
     public void testDeleteOffsetsOfStableConsumerGroupWithUnknownTopicPartition() {
-        for (Map<String, Object> consumerConfig: consumerConfigs) {
-            String topic = TOPIC_PREFIX + consumerConfig.getOrDefault(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name());
-            String group = GROUP_PREFIX + consumerConfig.getOrDefault(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name());
+        for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
+            String topic = TOPIC_PREFIX + groupProtocol.name();
+            String group = GROUP_PREFIX + groupProtocol.name();
             Runnable validateRunnable = getValidateRunnable("foobar", group, 0, 0, Errors.UNKNOWN_TOPIC_OR_PARTITION);
-            testWithConsumerGroup(topic, group, consumerConfig, true, validateRunnable);
+            testWithConsumerGroup(topic, group, groupProtocol, true, validateRunnable);
         }
     }
 
-    @ClusterTest
+    @ClusterTemplate("generator")
     public void testDeleteOffsetsOfStableConsumerGroupWithUnknownTopicOnly() {
-        for (Map<String, Object> consumerConfig: consumerConfigs) {
-            String topic = TOPIC_PREFIX + consumerConfig.getOrDefault(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name());
-            String group = GROUP_PREFIX + consumerConfig.getOrDefault(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name());
+        for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
+            String topic = TOPIC_PREFIX + groupProtocol.name();
+            String group = GROUP_PREFIX + groupProtocol.name();
             Runnable validateRunnable = getValidateRunnable("foobar", group, -1, -1, Errors.UNKNOWN_TOPIC_OR_PARTITION);
-            testWithConsumerGroup(topic, group, consumerConfig, true, validateRunnable);
+            testWithConsumerGroup(topic, group, groupProtocol, true, validateRunnable);
         }
     }
 
-    @ClusterTest
+    @ClusterTemplate("generator")
     public void testDeleteOffsetsOfEmptyConsumerGroupWithTopicPartition() {
-        for (Map<String, Object> consumerConfig: consumerConfigs) {
-            String topic = TOPIC_PREFIX + consumerConfig.getOrDefault(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name());
-            String group = GROUP_PREFIX + consumerConfig.getOrDefault(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name());
+        for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
+            String topic = TOPIC_PREFIX + groupProtocol.name();
+            String group = GROUP_PREFIX + groupProtocol.name();
             createTopic(topic);
             Runnable validateRunnable = getValidateRunnable(topic, group, 0, 0, Errors.NONE);
-            testWithConsumerGroup(topic, group, consumerConfig, false, validateRunnable);
+            testWithConsumerGroup(topic, group, groupProtocol, false, validateRunnable);
             removeTopic(topic);
         }
     }
 
-    @ClusterTest
+    @ClusterTemplate("generator")
     public void testDeleteOffsetsOfEmptyConsumerGroupWithTopicOnly() {
-        for (Map<String, Object> consumerConfig: consumerConfigs) {
-            String topic = TOPIC_PREFIX + consumerConfig.getOrDefault(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name());
-            String group = GROUP_PREFIX + consumerConfig.getOrDefault(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name());
+        for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
+            String topic = TOPIC_PREFIX + groupProtocol.name();
+            String group = GROUP_PREFIX + groupProtocol.name();
             createTopic(topic);
             Runnable validateRunnable = getValidateRunnable(topic, group, -1, 0, Errors.NONE);
-            testWithConsumerGroup(topic, group, consumerConfig, false, validateRunnable);
+            testWithConsumerGroup(topic, group, groupProtocol, false, validateRunnable);
             removeTopic(topic);
         }
     }
 
-    @ClusterTest
+    @ClusterTemplate("generator")
     public void testDeleteOffsetsOfEmptyConsumerGroupWithUnknownTopicPartition() {
-        for (Map<String, Object> consumerConfig: consumerConfigs) {
-            String topic = TOPIC_PREFIX + consumerConfig.getOrDefault(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name());
-            String group = GROUP_PREFIX + consumerConfig.getOrDefault(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name());
+        for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
+            String topic = TOPIC_PREFIX + groupProtocol.name();
+            String group = GROUP_PREFIX + groupProtocol.name();
             Runnable validateRunnable = getValidateRunnable("foobar", group, 0, 0, Errors.UNKNOWN_TOPIC_OR_PARTITION);
-            testWithConsumerGroup(topic, group, consumerConfig, false, validateRunnable);
+            testWithConsumerGroup(topic, group, groupProtocol, false, validateRunnable);
         }
     }
 
-    @ClusterTest
+    @ClusterTemplate("generator")
     public void testDeleteOffsetsOfEmptyConsumerGroupWithUnknownTopicOnly() {
-        for (Map<String, Object> consumerConfig: consumerConfigs) {
-            String topic = TOPIC_PREFIX + consumerConfig.getOrDefault(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name());
-            String group = GROUP_PREFIX + consumerConfig.getOrDefault(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name());
+        for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
+            String topic = TOPIC_PREFIX + groupProtocol.name();
+            String group = GROUP_PREFIX + groupProtocol.name();
             Runnable validateRunnable = getValidateRunnable("foobar", group, -1, -1, Errors.UNKNOWN_TOPIC_OR_PARTITION);
-            testWithConsumerGroup(topic, group, consumerConfig, false, validateRunnable);
+            testWithConsumerGroup(topic, group, groupProtocol, false, validateRunnable);
         }
     }
 
@@ -219,11 +209,11 @@
     }
     private void testWithConsumerGroup(String inputTopic,
                                        String inputGroup,
-                                       Map<String, Object> consumerConfig,
+                                       GroupProtocol groupProtocol,
                                        boolean isStable,
                                        Runnable validateRunnable) {
         produceRecord(inputTopic);
-        try (Consumer<byte[], byte[]> consumer = createConsumer(inputGroup, consumerConfig)) {
+        try (Consumer<byte[], byte[]> consumer = createConsumer(inputGroup, groupProtocol)) {
             consumer.subscribe(Collections.singletonList(inputTopic));
             ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(DEFAULT_MAX_WAIT_MS));
             Assertions.assertNotEquals(0, records.count());
@@ -253,9 +243,10 @@
         return new KafkaProducer<>(config);
     }
 
-    private Consumer<byte[], byte[]> createConsumer(String group, Map<String, Object> config) {
-        Map<String, Object> consumerConfig = new HashMap<>(config);
+    private Consumer<byte[], byte[]> createConsumer(String group, GroupProtocol groupProtocol) {
+        Map<String, Object> consumerConfig = new HashMap<>();
         consumerConfig.putIfAbsent(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers());
+        consumerConfig.putIfAbsent(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
         consumerConfig.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         consumerConfig.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, group);
         consumerConfig.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());