KAFKA-19666: Remove old restoration codepath from PauseResumeIntegrationTest [2/N] (#20463)

Clean up `PauseResumeIntegrationTest`

Reviewers: Lucas Brutschy <lucasbru@apache.org>
diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java
index 29f5276..e0f080f 100644
--- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java
+++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java
@@ -43,9 +43,8 @@
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
 
 import java.time.Duration;
 import java.util.ArrayList;
@@ -122,7 +121,7 @@
         appId = safeUniqueTestName(testInfo);
     }
 
-    private Properties props(final boolean stateUpdaterEnabled) {
+    private Properties props() {
         final Properties properties = new Properties();
         properties.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
         properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
@@ -134,7 +133,6 @@
         properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
         properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000);
-        properties.put(StreamsConfig.InternalConfig.STATE_UPDATER_ENABLED, stateUpdaterEnabled);
         return properties;
     }
 
@@ -151,10 +149,9 @@
         IntegrationTestUtils.produceKeyValuesSynchronously(topic, records, producerConfig, CLUSTER.time);
     }
 
-    @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void shouldPauseAndResumeKafkaStreams(final boolean stateUpdaterEnabled) throws Exception {
-        kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1, stateUpdaterEnabled);
+    @Test
+    public void shouldPauseAndResumeKafkaStreams() throws Exception {
+        kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1);
         kafkaStreams.start();
         waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, STARTUP_TIMEOUT);
 
@@ -176,10 +173,9 @@
         assertTopicSize(OUTPUT_STREAM_1, 10);
     }
 
-    @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void shouldAllowForTopologiesToStartPaused(final boolean stateUpdaterEnabled) throws Exception {
-        kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1, stateUpdaterEnabled);
+    @Test
+    public void shouldAllowForTopologiesToStartPaused() throws Exception {
+        kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1);
         kafkaStreams.pause();
         kafkaStreams.start();
         waitForApplicationState(singletonList(kafkaStreams), State.REBALANCING, STARTUP_TIMEOUT);
@@ -197,11 +193,10 @@
         assertTopicSize(OUTPUT_STREAM_1, 5);
     }
 
-    @ParameterizedTest
-    @ValueSource(booleans = {true, false})
+    @Test
     @SuppressWarnings("deprecation")
-    public void shouldPauseAndResumeKafkaStreamsWithNamedTopologies(final boolean stateUpdaterEnabled) throws Exception {
-        streamsNamedTopologyWrapper = new KafkaStreamsNamedTopologyWrapper(props(stateUpdaterEnabled));
+    public void shouldPauseAndResumeKafkaStreamsWithNamedTopologies() throws Exception {
+        streamsNamedTopologyWrapper = new KafkaStreamsNamedTopologyWrapper(props());
         final NamedTopologyBuilder builder1 = getNamedTopologyBuilder1();
         final NamedTopologyBuilder builder2 = getNamedTopologyBuilder2();
 
@@ -233,11 +228,10 @@
         awaitOutput(OUTPUT_STREAM_1, 5, COUNT_OUTPUT_DATA2);
     }
 
-    @ParameterizedTest
-    @ValueSource(booleans = {true, false})
+    @Test
     @SuppressWarnings("deprecation")
-    public void shouldPauseAndResumeAllKafkaStreamsWithNamedTopologies(final boolean stateUpdaterEnabled) throws Exception {
-        streamsNamedTopologyWrapper = new KafkaStreamsNamedTopologyWrapper(props(stateUpdaterEnabled));
+    public void shouldPauseAndResumeAllKafkaStreamsWithNamedTopologies() throws Exception {
+        streamsNamedTopologyWrapper = new KafkaStreamsNamedTopologyWrapper(props());
         final NamedTopologyBuilder builder1 = getNamedTopologyBuilder1();
         final NamedTopologyBuilder builder2 = getNamedTopologyBuilder2();
 
@@ -270,11 +264,10 @@
         assertTopicSize(OUTPUT_STREAM_2, 5);
     }
 
-    @ParameterizedTest
-    @ValueSource(booleans = {true, false})
+    @Test
     @SuppressWarnings("deprecation")
-    public void shouldAllowForNamedTopologiesToStartPaused(final boolean stateUpdaterEnabled) throws Exception {
-        streamsNamedTopologyWrapper = new KafkaStreamsNamedTopologyWrapper(props(stateUpdaterEnabled));
+    public void shouldAllowForNamedTopologiesToStartPaused() throws Exception {
+        streamsNamedTopologyWrapper = new KafkaStreamsNamedTopologyWrapper(props());
         final NamedTopologyBuilder builder1 = getNamedTopologyBuilder1();
         final NamedTopologyBuilder builder2 = getNamedTopologyBuilder2();
 
@@ -301,19 +294,18 @@
         awaitOutput(OUTPUT_STREAM_2, 5, COUNT_OUTPUT_DATA);
     }
 
-    @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void pauseResumeShouldWorkAcrossInstances(final boolean stateUpdaterEnabled) throws Exception {
+    @Test
+    public void pauseResumeShouldWorkAcrossInstances() throws Exception {
         produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
 
-        kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1, stateUpdaterEnabled);
+        kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1);
         kafkaStreams.pause();
         kafkaStreams.start();
 
         waitForApplicationState(singletonList(kafkaStreams), State.REBALANCING, STARTUP_TIMEOUT);
         assertTrue(kafkaStreams.isPaused());
 
-        kafkaStreams2 = buildKafkaStreams(OUTPUT_STREAM_2, stateUpdaterEnabled);
+        kafkaStreams2 = buildKafkaStreams(OUTPUT_STREAM_2);
         kafkaStreams2.pause();
         kafkaStreams2.start();
         waitForApplicationState(singletonList(kafkaStreams2), State.REBALANCING, STARTUP_TIMEOUT);
@@ -331,12 +323,11 @@
         awaitOutput(OUTPUT_STREAM_1, 5, COUNT_OUTPUT_DATA);
     }
 
-    @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void pausedTopologyShouldNotRestoreStateStores(final boolean stateUpdaterEnabled) throws Exception {
-        final Properties properties1 = props(stateUpdaterEnabled);
+    @Test
+    public void pausedTopologyShouldNotRestoreStateStores() throws Exception {
+        final Properties properties1 = props();
         properties1.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
-        final Properties properties2 = props(stateUpdaterEnabled);
+        final Properties properties2 = props();
         properties2.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
         produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
 
@@ -380,8 +371,8 @@
         assertEquals(stateStoreLag1, stateStoreLag2);
     }
 
-    private KafkaStreams buildKafkaStreams(final String outputTopic, final boolean stateUpdaterEnabled) {
-        return buildKafkaStreams(outputTopic, props(stateUpdaterEnabled));
+    private KafkaStreams buildKafkaStreams(final String outputTopic) {
+        return buildKafkaStreams(outputTopic, props());
     }
 
     private KafkaStreams buildKafkaStreams(final String outputTopic, final Properties properties) {