KAFKA-19666: Remove old restoration codepath from PauseResumeIntegrationTest [2/N] (#20463)
Clean up `PauseResumeIntegrationTest`
Reviewers: Lucas Brutschy <lucasbru@apache.org>
diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java
index 29f5276..e0f080f 100644
--- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java
+++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java
@@ -43,9 +43,8 @@
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
import java.time.Duration;
import java.util.ArrayList;
@@ -122,7 +121,7 @@
appId = safeUniqueTestName(testInfo);
}
- private Properties props(final boolean stateUpdaterEnabled) {
+ private Properties props() {
final Properties properties = new Properties();
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
@@ -134,7 +133,6 @@
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000);
- properties.put(StreamsConfig.InternalConfig.STATE_UPDATER_ENABLED, stateUpdaterEnabled);
return properties;
}
@@ -151,10 +149,9 @@
IntegrationTestUtils.produceKeyValuesSynchronously(topic, records, producerConfig, CLUSTER.time);
}
- @ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void shouldPauseAndResumeKafkaStreams(final boolean stateUpdaterEnabled) throws Exception {
- kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1, stateUpdaterEnabled);
+ @Test
+ public void shouldPauseAndResumeKafkaStreams() throws Exception {
+ kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1);
kafkaStreams.start();
waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, STARTUP_TIMEOUT);
@@ -176,10 +173,9 @@
assertTopicSize(OUTPUT_STREAM_1, 10);
}
- @ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void shouldAllowForTopologiesToStartPaused(final boolean stateUpdaterEnabled) throws Exception {
- kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1, stateUpdaterEnabled);
+ @Test
+ public void shouldAllowForTopologiesToStartPaused() throws Exception {
+ kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1);
kafkaStreams.pause();
kafkaStreams.start();
waitForApplicationState(singletonList(kafkaStreams), State.REBALANCING, STARTUP_TIMEOUT);
@@ -197,11 +193,10 @@
assertTopicSize(OUTPUT_STREAM_1, 5);
}
- @ParameterizedTest
- @ValueSource(booleans = {true, false})
+ @Test
@SuppressWarnings("deprecation")
- public void shouldPauseAndResumeKafkaStreamsWithNamedTopologies(final boolean stateUpdaterEnabled) throws Exception {
- streamsNamedTopologyWrapper = new KafkaStreamsNamedTopologyWrapper(props(stateUpdaterEnabled));
+ public void shouldPauseAndResumeKafkaStreamsWithNamedTopologies() throws Exception {
+ streamsNamedTopologyWrapper = new KafkaStreamsNamedTopologyWrapper(props());
final NamedTopologyBuilder builder1 = getNamedTopologyBuilder1();
final NamedTopologyBuilder builder2 = getNamedTopologyBuilder2();
@@ -233,11 +228,10 @@
awaitOutput(OUTPUT_STREAM_1, 5, COUNT_OUTPUT_DATA2);
}
- @ParameterizedTest
- @ValueSource(booleans = {true, false})
+ @Test
@SuppressWarnings("deprecation")
- public void shouldPauseAndResumeAllKafkaStreamsWithNamedTopologies(final boolean stateUpdaterEnabled) throws Exception {
- streamsNamedTopologyWrapper = new KafkaStreamsNamedTopologyWrapper(props(stateUpdaterEnabled));
+ public void shouldPauseAndResumeAllKafkaStreamsWithNamedTopologies() throws Exception {
+ streamsNamedTopologyWrapper = new KafkaStreamsNamedTopologyWrapper(props());
final NamedTopologyBuilder builder1 = getNamedTopologyBuilder1();
final NamedTopologyBuilder builder2 = getNamedTopologyBuilder2();
@@ -270,11 +264,10 @@
assertTopicSize(OUTPUT_STREAM_2, 5);
}
- @ParameterizedTest
- @ValueSource(booleans = {true, false})
+ @Test
@SuppressWarnings("deprecation")
- public void shouldAllowForNamedTopologiesToStartPaused(final boolean stateUpdaterEnabled) throws Exception {
- streamsNamedTopologyWrapper = new KafkaStreamsNamedTopologyWrapper(props(stateUpdaterEnabled));
+ public void shouldAllowForNamedTopologiesToStartPaused() throws Exception {
+ streamsNamedTopologyWrapper = new KafkaStreamsNamedTopologyWrapper(props());
final NamedTopologyBuilder builder1 = getNamedTopologyBuilder1();
final NamedTopologyBuilder builder2 = getNamedTopologyBuilder2();
@@ -301,19 +294,18 @@
awaitOutput(OUTPUT_STREAM_2, 5, COUNT_OUTPUT_DATA);
}
- @ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void pauseResumeShouldWorkAcrossInstances(final boolean stateUpdaterEnabled) throws Exception {
+ @Test
+ public void pauseResumeShouldWorkAcrossInstances() throws Exception {
produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
- kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1, stateUpdaterEnabled);
+ kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1);
kafkaStreams.pause();
kafkaStreams.start();
waitForApplicationState(singletonList(kafkaStreams), State.REBALANCING, STARTUP_TIMEOUT);
assertTrue(kafkaStreams.isPaused());
- kafkaStreams2 = buildKafkaStreams(OUTPUT_STREAM_2, stateUpdaterEnabled);
+ kafkaStreams2 = buildKafkaStreams(OUTPUT_STREAM_2);
kafkaStreams2.pause();
kafkaStreams2.start();
waitForApplicationState(singletonList(kafkaStreams2), State.REBALANCING, STARTUP_TIMEOUT);
@@ -331,12 +323,11 @@
awaitOutput(OUTPUT_STREAM_1, 5, COUNT_OUTPUT_DATA);
}
- @ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void pausedTopologyShouldNotRestoreStateStores(final boolean stateUpdaterEnabled) throws Exception {
- final Properties properties1 = props(stateUpdaterEnabled);
+ @Test
+ public void pausedTopologyShouldNotRestoreStateStores() throws Exception {
+ final Properties properties1 = props();
properties1.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
- final Properties properties2 = props(stateUpdaterEnabled);
+ final Properties properties2 = props();
properties2.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
@@ -380,8 +371,8 @@
assertEquals(stateStoreLag1, stateStoreLag2);
}
- private KafkaStreams buildKafkaStreams(final String outputTopic, final boolean stateUpdaterEnabled) {
- return buildKafkaStreams(outputTopic, props(stateUpdaterEnabled));
+ private KafkaStreams buildKafkaStreams(final String outputTopic) {
+ return buildKafkaStreams(outputTopic, props());
}
private KafkaStreams buildKafkaStreams(final String outputTopic, final Properties properties) {