MINOR: Enable streams rebalance protocol in EosIntegrationTest (#20592)

Remove stalling instance in EOSIntegrationTest, since it doesn’t matter
what it thinks what the assignment is but blocks the test with streams
group protocol

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
index d874333..46ace65 100644
--- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
+++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
@@ -72,9 +72,10 @@
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -169,6 +170,15 @@
 
     private String stateTmpDir;
 
+    private static java.util.stream.Stream<Arguments> groupProtocolAndProcessingThreadsParameters() {
+        return java.util.stream.Stream.of(
+            Arguments.of("classic", true),
+            Arguments.of("classic", false),
+            Arguments.of("streams", true),
+            Arguments.of("streams", false)
+        );
+    }
+
     @BeforeEach
     public void createTopics() throws Exception {
         applicationId = "appId-" + TEST_NUMBER.getAndIncrement();
@@ -181,16 +191,19 @@
         CLUSTER.createTopic(MULTI_PARTITION_INPUT_TOPIC, NUM_TOPIC_PARTITIONS, 1);
         CLUSTER.createTopic(MULTI_PARTITION_THROUGH_TOPIC, NUM_TOPIC_PARTITIONS, 1);
         CLUSTER.createTopic(MULTI_PARTITION_OUTPUT_TOPIC, NUM_TOPIC_PARTITIONS, 1);
+        CLUSTER.setGroupStandbyReplicas(applicationId, 1);
     }
 
-    @Test
-    public void shouldBeAbleToRunWithEosEnabled() throws Exception {
-        runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, null, SINGLE_PARTITION_OUTPUT_TOPIC, false);
+    @ParameterizedTest
+    @ValueSource(strings = {"classic", "streams"})
+    public void shouldBeAbleToRunWithEosEnabled(final String groupProtocol) throws Exception {
+        runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, null, SINGLE_PARTITION_OUTPUT_TOPIC, false, groupProtocol);
     }
 
-    @Test
-    public void shouldCommitCorrectOffsetIfInputTopicIsTransactional() throws Exception {
-        runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, null, SINGLE_PARTITION_OUTPUT_TOPIC, true);
+    @ParameterizedTest
+    @ValueSource(strings = {"classic", "streams"})
+    public void shouldCommitCorrectOffsetIfInputTopicIsTransactional(final String groupProtocol) throws Exception {
+        runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, null, SINGLE_PARTITION_OUTPUT_TOPIC, true, groupProtocol);
 
         try (final Admin adminClient = Admin.create(mkMap(mkEntry(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers())));
              final Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(mkMap(
@@ -215,36 +228,42 @@
         }
     }
 
-    @Test
-    public void shouldBeAbleToRestartAfterClose() throws Exception {
-        runSimpleCopyTest(2, SINGLE_PARTITION_INPUT_TOPIC, null, SINGLE_PARTITION_OUTPUT_TOPIC, false);
+    @ParameterizedTest
+    @ValueSource(strings = {"classic", "streams"})
+    public void shouldBeAbleToRestartAfterClose(final String groupProtocol) throws Exception {
+        runSimpleCopyTest(2, SINGLE_PARTITION_INPUT_TOPIC, null, SINGLE_PARTITION_OUTPUT_TOPIC, false, groupProtocol);
     }
 
-    @Test
-    public void shouldBeAbleToCommitToMultiplePartitions() throws Exception {
-        runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, null, MULTI_PARTITION_OUTPUT_TOPIC, false);
+    @ParameterizedTest
+    @ValueSource(strings = {"classic", "streams"})
+    public void shouldBeAbleToCommitToMultiplePartitions(final String groupProtocol) throws Exception {
+        runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, null, MULTI_PARTITION_OUTPUT_TOPIC, false, groupProtocol);
     }
 
-    @Test
-    public void shouldBeAbleToCommitMultiplePartitionOffsets() throws Exception {
-        runSimpleCopyTest(1, MULTI_PARTITION_INPUT_TOPIC, null, SINGLE_PARTITION_OUTPUT_TOPIC, false);
+    @ParameterizedTest
+    @ValueSource(strings = {"classic", "streams"})
+    public void shouldBeAbleToCommitMultiplePartitionOffsets(final String groupProtocol) throws Exception {
+        runSimpleCopyTest(1, MULTI_PARTITION_INPUT_TOPIC, null, SINGLE_PARTITION_OUTPUT_TOPIC, false, groupProtocol);
     }
 
-    @Test
-    public void shouldBeAbleToRunWithTwoSubtopologies() throws Exception {
-        runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, SINGLE_PARTITION_THROUGH_TOPIC, SINGLE_PARTITION_OUTPUT_TOPIC, false);
+    @ParameterizedTest
+    @ValueSource(strings = {"classic", "streams"})
+    public void shouldBeAbleToRunWithTwoSubtopologies(final String groupProtocol) throws Exception {
+        runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, SINGLE_PARTITION_THROUGH_TOPIC, SINGLE_PARTITION_OUTPUT_TOPIC, false, groupProtocol);
     }
 
-    @Test
-    public void shouldBeAbleToRunWithTwoSubtopologiesAndMultiplePartitions() throws Exception {
-        runSimpleCopyTest(1, MULTI_PARTITION_INPUT_TOPIC, MULTI_PARTITION_THROUGH_TOPIC, MULTI_PARTITION_OUTPUT_TOPIC, false);
+    @ParameterizedTest
+    @ValueSource(strings = {"classic", "streams"})
+    public void shouldBeAbleToRunWithTwoSubtopologiesAndMultiplePartitions(final String groupProtocol) throws Exception {
+        runSimpleCopyTest(1, MULTI_PARTITION_INPUT_TOPIC, MULTI_PARTITION_THROUGH_TOPIC, MULTI_PARTITION_OUTPUT_TOPIC, false, groupProtocol);
     }
 
     private void runSimpleCopyTest(final int numberOfRestarts,
                                    final String inputTopic,
                                    final String throughTopic,
                                    final String outputTopic,
-                                   final boolean inputTopicTransactional) throws Exception {
+                                   final boolean inputTopicTransactional,
+                                   final String groupProtocol) throws Exception {
         final StreamsBuilder builder = new StreamsBuilder();
         final KStream<Long, Long> input = builder.stream(inputTopic);
         KStream<Long, Long> output = input;
@@ -263,6 +282,7 @@
         properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
         properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), MAX_POLL_INTERVAL_MS - 1);
         properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), MAX_POLL_INTERVAL_MS);
+        properties.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, groupProtocol);
 
         for (int i = 0; i < numberOfRestarts; ++i) {
             final Properties config = StreamsTestUtils.getStreamsConfig(
@@ -326,8 +346,9 @@
         return recordsPerKey;
     }
 
-    @Test
-    public void shouldBeAbleToPerformMultipleTransactions() throws Exception {
+    @ParameterizedTest
+    @ValueSource(strings = {"classic", "streams"})
+    public void shouldBeAbleToPerformMultipleTransactions(final String groupProtocol) throws Exception {
         final StreamsBuilder builder = new StreamsBuilder();
         builder.stream(SINGLE_PARTITION_INPUT_TOPIC).to(SINGLE_PARTITION_OUTPUT_TOPIC);
 
@@ -337,6 +358,7 @@
         properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
         properties.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000");
         properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        properties.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, groupProtocol);
 
         final Properties config = StreamsTestUtils.getStreamsConfig(
             applicationId,
@@ -374,8 +396,8 @@
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void shouldNotViolateEosIfOneTaskFails(final boolean processingThreadsEnabled) throws Exception {
+    @MethodSource("groupProtocolAndProcessingThreadsParameters")
+    public void shouldNotViolateEosIfOneTaskFails(final String groupProtocol, final boolean processingThreadsEnabled) throws Exception {
 
         // this test writes 10 + 5 + 5 records per partition (running with 2 partitions)
         // the app is supposed to copy all 40 records into the output topic
@@ -386,7 +408,7 @@
         // -> the failure only kills one thread
         // after fail over, we should read 40 committed records (even if 50 record got written)
 
-        try (final KafkaStreams streams = getKafkaStreams("dummy", false, "appDir", 2, processingThreadsEnabled)) {
+        try (final KafkaStreams streams = getKafkaStreams("dummy", false, "appDir", 2, groupProtocol, processingThreadsEnabled)) {
             startApplicationAndWaitUntilRunning(streams);
 
             final List<KeyValue<Long, Long>> committedDataBeforeFailure = prepareData(0L, 10L, 0L, 1L);
@@ -476,8 +498,8 @@
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void shouldNotViolateEosIfOneTaskFailsWithState(final boolean processingThreadsEnabled) throws Exception {
+    @MethodSource("groupProtocolAndProcessingThreadsParameters")
+    public void shouldNotViolateEosIfOneTaskFailsWithState(final String groupProtocol, final boolean processingThreadsEnabled) throws Exception {
 
         // this test updates a store with 10 + 5 + 5 records per partition (running with 2 partitions)
         // the app is supposed to emit all 40 update records into the output topic
@@ -493,7 +515,7 @@
 
         // We need more processing time under "with state" situation, so increasing the max.poll.interval.ms
         // to avoid unexpected rebalance during test, which will cause unexpected fail over triggered
-        try (final KafkaStreams streams = getKafkaStreams("dummy", true, "appDir", 2, processingThreadsEnabled)) {
+        try (final KafkaStreams streams = getKafkaStreams("dummy", true, "appDir", 2, groupProtocol, processingThreadsEnabled)) {
             startApplicationAndWaitUntilRunning(streams);
 
             final List<KeyValue<Long, Long>> committedDataBeforeFailure = prepareData(0L, 10L, 0L, 1L);
@@ -594,8 +616,8 @@
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void shouldNotViolateEosIfOneTaskGetsFencedUsingIsolatedAppInstances(final boolean processingThreadsEnabled) throws Exception {
+    @MethodSource("groupProtocolAndProcessingThreadsParameters")
+    public void shouldNotViolateEosIfOneTaskGetsFencedUsingIsolatedAppInstances(final String groupProtocol, final boolean processingThreadsEnabled) throws Exception {
         // this test writes 10 + 5 + 5 + 10 records per partition (running with 2 partitions)
         // the app is supposed to copy all 60 records into the output topic
         //
@@ -607,10 +629,9 @@
         //
         // afterward, the "stalling" thread resumes, and another rebalance should get triggered
         // we write the remaining 20 records and verify to read 60 result records
-
         try (
-            final KafkaStreams streams1 = getKafkaStreams("streams1", false, "appDir1", 1, processingThreadsEnabled);
-            final KafkaStreams streams2 = getKafkaStreams("streams2", false, "appDir2", 1, processingThreadsEnabled)
+            final KafkaStreams streams1 = getKafkaStreams("streams1", false, "appDir1", 1, groupProtocol, processingThreadsEnabled);
+            final KafkaStreams streams2 = getKafkaStreams("streams2", false, "appDir2", 1, groupProtocol, processingThreadsEnabled)
         ) {
             startApplicationAndWaitUntilRunning(streams1);
             startApplicationAndWaitUntilRunning(streams2);
@@ -667,13 +688,10 @@
                 "Expected a host to start stalling"
             );
             final String observedStallingHost = stallingHost.get();
-            final KafkaStreams stallingInstance;
             final KafkaStreams remainingInstance;
             if ("streams1".equals(observedStallingHost)) {
-                stallingInstance = streams1;
                 remainingInstance = streams2;
             } else if ("streams2".equals(observedStallingHost)) {
-                stallingInstance = streams2;
                 remainingInstance = streams1;
             } else {
                 throw new IllegalArgumentException("unexpected host name: " + observedStallingHost);
@@ -683,8 +701,7 @@
             // the assignment is. We only really care that the remaining instance only sees one host
             // that owns both partitions.
             waitForCondition(
-                () -> stallingInstance.metadataForAllStreamsClients().size() == 2
-                    && remainingInstance.metadataForAllStreamsClients().size() == 1
+                () -> remainingInstance.metadataForAllStreamsClients().size() == 1
                     && remainingInstance.metadataForAllStreamsClients().iterator().next().topicPartitions().size() == 2,
                 MAX_WAIT_TIME_MS,
                 () -> "Should have rebalanced.\n" +
@@ -755,12 +772,12 @@
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void shouldWriteLatestOffsetsToCheckpointOnShutdown(final boolean processingThreadsEnabled) throws Exception {
+    @MethodSource("groupProtocolAndProcessingThreadsParameters")
+    public void shouldWriteLatestOffsetsToCheckpointOnShutdown(final String groupProtocol, final boolean processingThreadsEnabled) throws Exception {
         final List<KeyValue<Long, Long>> writtenData = prepareData(0L, 10, 0L, 1L);
         final List<KeyValue<Long, Long>> expectedResult = computeExpectedResult(writtenData);
 
-        try (final KafkaStreams streams = getKafkaStreams("streams", true, "appDir", 1, processingThreadsEnabled)) {
+        try (final KafkaStreams streams = getKafkaStreams("streams", true, "appDir", 1, groupProtocol, processingThreadsEnabled)) {
             writeInputData(writtenData);
 
             startApplicationAndWaitUntilRunning(streams);
@@ -787,9 +804,9 @@
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
+    @MethodSource("groupProtocolAndProcessingThreadsParameters")
     public void shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoring(
-            final boolean processingThreadsEnabled) throws Exception {
+            final String groupProtocol, final boolean processingThreadsEnabled) throws Exception {
 
         final Properties streamsConfiguration = new Properties();
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
@@ -801,6 +818,7 @@
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(applicationId).getPath());
         streamsConfiguration.put(InternalConfig.PROCESSING_THREADS_ENABLED, processingThreadsEnabled);
+        streamsConfiguration.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, groupProtocol);
         streamsConfiguration.put(StreamsConfig.restoreConsumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 100);
         final String stateStoreName = "stateStore";
 
@@ -934,8 +952,13 @@
     static final AtomicReference<TaskId> TASK_WITH_DATA = new AtomicReference<>();
     static final AtomicBoolean DID_REVOKE_IDLE_TASK = new AtomicBoolean(false);
 
-    @Test
-    public void shouldNotCommitActiveTasksWithPendingInputIfRevokedTaskDidNotMakeProgress() throws Exception {
+    @ParameterizedTest
+    @ValueSource(strings = {"classic", "streams"})
+    public void shouldNotCommitActiveTasksWithPendingInputIfRevokedTaskDidNotMakeProgress(final String groupProtocol) throws Exception {
+        // Reset static variables to ensure test isolation
+        TASK_WITH_DATA.set(null);
+        DID_REVOKE_IDLE_TASK.set(false);
+
         final AtomicBoolean requestCommit = new AtomicBoolean(false);
 
         final StreamsBuilder builder = new StreamsBuilder();
@@ -970,6 +993,7 @@
         properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), MAX_POLL_INTERVAL_MS);
         properties.put(StreamsConfig.producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), Integer.MAX_VALUE);
         properties.put(StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG, TestTaskAssignor.class.getName());
+        properties.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, groupProtocol);
 
         final Properties config = StreamsTestUtils.getStreamsConfig(
             applicationId,
@@ -1003,9 +1027,9 @@
             // add second thread, to trigger rebalance
             // expect idle task to get revoked -- this should not trigger a TX commit
             streams.addStreamThread();
-
-            waitForCondition(DID_REVOKE_IDLE_TASK::get, "Idle Task was not revoked as expected.");
-
+            if (groupProtocol.equals("classic")) {
+                waitForCondition(DID_REVOKE_IDLE_TASK::get, "Idle Task was not revoked as expected.");
+            }
             // best-effort sanity check (might pass and not detect issue in slow environments)
             try {
                 readResult(SINGLE_PARTITION_OUTPUT_TOPIC, 1, "consumer", 10_000L);
@@ -1104,6 +1128,7 @@
                                          final boolean withState,
                                          final String appDir,
                                          final int numberOfStreamsThreads,
+                                         final String groupProtocol,
                                          final boolean processingThreadsEnabled) {
         commitRequested = new AtomicInteger(0);
         errorInjected = new AtomicBoolean(false);
@@ -1212,6 +1237,7 @@
         properties.put(StreamsConfig.STATE_DIR_CONFIG, stateTmpDir + appDir);
         properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, dummyHostName + ":2142");
         properties.put(InternalConfig.PROCESSING_THREADS_ENABLED, processingThreadsEnabled);
+        properties.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, groupProtocol);
 
         final Properties config = StreamsTestUtils.getStreamsConfig(
             applicationId,