MINOR: Enable streams rebalance protocol in EosIntegrationTest (#20592)
Remove stalling instance in EOSIntegrationTest, since it doesn’t matter
what it thinks what the assignment is but blocks the test with streams
group protocol
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
index d874333..46ace65 100644
--- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
+++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
@@ -72,9 +72,10 @@
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -169,6 +170,15 @@
private String stateTmpDir;
+ private static java.util.stream.Stream<Arguments> groupProtocolAndProcessingThreadsParameters() {
+ return java.util.stream.Stream.of(
+ Arguments.of("classic", true),
+ Arguments.of("classic", false),
+ Arguments.of("streams", true),
+ Arguments.of("streams", false)
+ );
+ }
+
@BeforeEach
public void createTopics() throws Exception {
applicationId = "appId-" + TEST_NUMBER.getAndIncrement();
@@ -181,16 +191,19 @@
CLUSTER.createTopic(MULTI_PARTITION_INPUT_TOPIC, NUM_TOPIC_PARTITIONS, 1);
CLUSTER.createTopic(MULTI_PARTITION_THROUGH_TOPIC, NUM_TOPIC_PARTITIONS, 1);
CLUSTER.createTopic(MULTI_PARTITION_OUTPUT_TOPIC, NUM_TOPIC_PARTITIONS, 1);
+ CLUSTER.setGroupStandbyReplicas(applicationId, 1);
}
- @Test
- public void shouldBeAbleToRunWithEosEnabled() throws Exception {
- runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, null, SINGLE_PARTITION_OUTPUT_TOPIC, false);
+ @ParameterizedTest
+ @ValueSource(strings = {"classic", "streams"})
+ public void shouldBeAbleToRunWithEosEnabled(final String groupProtocol) throws Exception {
+ runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, null, SINGLE_PARTITION_OUTPUT_TOPIC, false, groupProtocol);
}
- @Test
- public void shouldCommitCorrectOffsetIfInputTopicIsTransactional() throws Exception {
- runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, null, SINGLE_PARTITION_OUTPUT_TOPIC, true);
+ @ParameterizedTest
+ @ValueSource(strings = {"classic", "streams"})
+ public void shouldCommitCorrectOffsetIfInputTopicIsTransactional(final String groupProtocol) throws Exception {
+ runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, null, SINGLE_PARTITION_OUTPUT_TOPIC, true, groupProtocol);
try (final Admin adminClient = Admin.create(mkMap(mkEntry(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers())));
final Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(mkMap(
@@ -215,36 +228,42 @@
}
}
- @Test
- public void shouldBeAbleToRestartAfterClose() throws Exception {
- runSimpleCopyTest(2, SINGLE_PARTITION_INPUT_TOPIC, null, SINGLE_PARTITION_OUTPUT_TOPIC, false);
+ @ParameterizedTest
+ @ValueSource(strings = {"classic", "streams"})
+ public void shouldBeAbleToRestartAfterClose(final String groupProtocol) throws Exception {
+ runSimpleCopyTest(2, SINGLE_PARTITION_INPUT_TOPIC, null, SINGLE_PARTITION_OUTPUT_TOPIC, false, groupProtocol);
}
- @Test
- public void shouldBeAbleToCommitToMultiplePartitions() throws Exception {
- runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, null, MULTI_PARTITION_OUTPUT_TOPIC, false);
+ @ParameterizedTest
+ @ValueSource(strings = {"classic", "streams"})
+ public void shouldBeAbleToCommitToMultiplePartitions(final String groupProtocol) throws Exception {
+ runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, null, MULTI_PARTITION_OUTPUT_TOPIC, false, groupProtocol);
}
- @Test
- public void shouldBeAbleToCommitMultiplePartitionOffsets() throws Exception {
- runSimpleCopyTest(1, MULTI_PARTITION_INPUT_TOPIC, null, SINGLE_PARTITION_OUTPUT_TOPIC, false);
+ @ParameterizedTest
+ @ValueSource(strings = {"classic", "streams"})
+ public void shouldBeAbleToCommitMultiplePartitionOffsets(final String groupProtocol) throws Exception {
+ runSimpleCopyTest(1, MULTI_PARTITION_INPUT_TOPIC, null, SINGLE_PARTITION_OUTPUT_TOPIC, false, groupProtocol);
}
- @Test
- public void shouldBeAbleToRunWithTwoSubtopologies() throws Exception {
- runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, SINGLE_PARTITION_THROUGH_TOPIC, SINGLE_PARTITION_OUTPUT_TOPIC, false);
+ @ParameterizedTest
+ @ValueSource(strings = {"classic", "streams"})
+ public void shouldBeAbleToRunWithTwoSubtopologies(final String groupProtocol) throws Exception {
+ runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, SINGLE_PARTITION_THROUGH_TOPIC, SINGLE_PARTITION_OUTPUT_TOPIC, false, groupProtocol);
}
- @Test
- public void shouldBeAbleToRunWithTwoSubtopologiesAndMultiplePartitions() throws Exception {
- runSimpleCopyTest(1, MULTI_PARTITION_INPUT_TOPIC, MULTI_PARTITION_THROUGH_TOPIC, MULTI_PARTITION_OUTPUT_TOPIC, false);
+ @ParameterizedTest
+ @ValueSource(strings = {"classic", "streams"})
+ public void shouldBeAbleToRunWithTwoSubtopologiesAndMultiplePartitions(final String groupProtocol) throws Exception {
+ runSimpleCopyTest(1, MULTI_PARTITION_INPUT_TOPIC, MULTI_PARTITION_THROUGH_TOPIC, MULTI_PARTITION_OUTPUT_TOPIC, false, groupProtocol);
}
private void runSimpleCopyTest(final int numberOfRestarts,
final String inputTopic,
final String throughTopic,
final String outputTopic,
- final boolean inputTopicTransactional) throws Exception {
+ final boolean inputTopicTransactional,
+ final String groupProtocol) throws Exception {
final StreamsBuilder builder = new StreamsBuilder();
final KStream<Long, Long> input = builder.stream(inputTopic);
KStream<Long, Long> output = input;
@@ -263,6 +282,7 @@
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), MAX_POLL_INTERVAL_MS - 1);
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), MAX_POLL_INTERVAL_MS);
+ properties.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, groupProtocol);
for (int i = 0; i < numberOfRestarts; ++i) {
final Properties config = StreamsTestUtils.getStreamsConfig(
@@ -326,8 +346,9 @@
return recordsPerKey;
}
- @Test
- public void shouldBeAbleToPerformMultipleTransactions() throws Exception {
+ @ParameterizedTest
+ @ValueSource(strings = {"classic", "streams"})
+ public void shouldBeAbleToPerformMultipleTransactions(final String groupProtocol) throws Exception {
final StreamsBuilder builder = new StreamsBuilder();
builder.stream(SINGLE_PARTITION_INPUT_TOPIC).to(SINGLE_PARTITION_OUTPUT_TOPIC);
@@ -337,6 +358,7 @@
properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
properties.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ properties.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, groupProtocol);
final Properties config = StreamsTestUtils.getStreamsConfig(
applicationId,
@@ -374,8 +396,8 @@
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void shouldNotViolateEosIfOneTaskFails(final boolean processingThreadsEnabled) throws Exception {
+ @MethodSource("groupProtocolAndProcessingThreadsParameters")
+ public void shouldNotViolateEosIfOneTaskFails(final String groupProtocol, final boolean processingThreadsEnabled) throws Exception {
// this test writes 10 + 5 + 5 records per partition (running with 2 partitions)
// the app is supposed to copy all 40 records into the output topic
@@ -386,7 +408,7 @@
// -> the failure only kills one thread
// after fail over, we should read 40 committed records (even if 50 record got written)
- try (final KafkaStreams streams = getKafkaStreams("dummy", false, "appDir", 2, processingThreadsEnabled)) {
+ try (final KafkaStreams streams = getKafkaStreams("dummy", false, "appDir", 2, groupProtocol, processingThreadsEnabled)) {
startApplicationAndWaitUntilRunning(streams);
final List<KeyValue<Long, Long>> committedDataBeforeFailure = prepareData(0L, 10L, 0L, 1L);
@@ -476,8 +498,8 @@
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void shouldNotViolateEosIfOneTaskFailsWithState(final boolean processingThreadsEnabled) throws Exception {
+ @MethodSource("groupProtocolAndProcessingThreadsParameters")
+ public void shouldNotViolateEosIfOneTaskFailsWithState(final String groupProtocol, final boolean processingThreadsEnabled) throws Exception {
// this test updates a store with 10 + 5 + 5 records per partition (running with 2 partitions)
// the app is supposed to emit all 40 update records into the output topic
@@ -493,7 +515,7 @@
// We need more processing time under "with state" situation, so increasing the max.poll.interval.ms
// to avoid unexpected rebalance during test, which will cause unexpected fail over triggered
- try (final KafkaStreams streams = getKafkaStreams("dummy", true, "appDir", 2, processingThreadsEnabled)) {
+ try (final KafkaStreams streams = getKafkaStreams("dummy", true, "appDir", 2, groupProtocol, processingThreadsEnabled)) {
startApplicationAndWaitUntilRunning(streams);
final List<KeyValue<Long, Long>> committedDataBeforeFailure = prepareData(0L, 10L, 0L, 1L);
@@ -594,8 +616,8 @@
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void shouldNotViolateEosIfOneTaskGetsFencedUsingIsolatedAppInstances(final boolean processingThreadsEnabled) throws Exception {
+ @MethodSource("groupProtocolAndProcessingThreadsParameters")
+ public void shouldNotViolateEosIfOneTaskGetsFencedUsingIsolatedAppInstances(final String groupProtocol, final boolean processingThreadsEnabled) throws Exception {
// this test writes 10 + 5 + 5 + 10 records per partition (running with 2 partitions)
// the app is supposed to copy all 60 records into the output topic
//
@@ -607,10 +629,9 @@
//
// afterward, the "stalling" thread resumes, and another rebalance should get triggered
// we write the remaining 20 records and verify to read 60 result records
-
try (
- final KafkaStreams streams1 = getKafkaStreams("streams1", false, "appDir1", 1, processingThreadsEnabled);
- final KafkaStreams streams2 = getKafkaStreams("streams2", false, "appDir2", 1, processingThreadsEnabled)
+ final KafkaStreams streams1 = getKafkaStreams("streams1", false, "appDir1", 1, groupProtocol, processingThreadsEnabled);
+ final KafkaStreams streams2 = getKafkaStreams("streams2", false, "appDir2", 1, groupProtocol, processingThreadsEnabled)
) {
startApplicationAndWaitUntilRunning(streams1);
startApplicationAndWaitUntilRunning(streams2);
@@ -667,13 +688,10 @@
"Expected a host to start stalling"
);
final String observedStallingHost = stallingHost.get();
- final KafkaStreams stallingInstance;
final KafkaStreams remainingInstance;
if ("streams1".equals(observedStallingHost)) {
- stallingInstance = streams1;
remainingInstance = streams2;
} else if ("streams2".equals(observedStallingHost)) {
- stallingInstance = streams2;
remainingInstance = streams1;
} else {
throw new IllegalArgumentException("unexpected host name: " + observedStallingHost);
@@ -683,8 +701,7 @@
// the assignment is. We only really care that the remaining instance only sees one host
// that owns both partitions.
waitForCondition(
- () -> stallingInstance.metadataForAllStreamsClients().size() == 2
- && remainingInstance.metadataForAllStreamsClients().size() == 1
+ () -> remainingInstance.metadataForAllStreamsClients().size() == 1
&& remainingInstance.metadataForAllStreamsClients().iterator().next().topicPartitions().size() == 2,
MAX_WAIT_TIME_MS,
() -> "Should have rebalanced.\n" +
@@ -755,12 +772,12 @@
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void shouldWriteLatestOffsetsToCheckpointOnShutdown(final boolean processingThreadsEnabled) throws Exception {
+ @MethodSource("groupProtocolAndProcessingThreadsParameters")
+ public void shouldWriteLatestOffsetsToCheckpointOnShutdown(final String groupProtocol, final boolean processingThreadsEnabled) throws Exception {
final List<KeyValue<Long, Long>> writtenData = prepareData(0L, 10, 0L, 1L);
final List<KeyValue<Long, Long>> expectedResult = computeExpectedResult(writtenData);
- try (final KafkaStreams streams = getKafkaStreams("streams", true, "appDir", 1, processingThreadsEnabled)) {
+ try (final KafkaStreams streams = getKafkaStreams("streams", true, "appDir", 1, groupProtocol, processingThreadsEnabled)) {
writeInputData(writtenData);
startApplicationAndWaitUntilRunning(streams);
@@ -787,9 +804,9 @@
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
+ @MethodSource("groupProtocolAndProcessingThreadsParameters")
public void shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoring(
- final boolean processingThreadsEnabled) throws Exception {
+ final String groupProtocol, final boolean processingThreadsEnabled) throws Exception {
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
@@ -801,6 +818,7 @@
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(applicationId).getPath());
streamsConfiguration.put(InternalConfig.PROCESSING_THREADS_ENABLED, processingThreadsEnabled);
+ streamsConfiguration.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, groupProtocol);
streamsConfiguration.put(StreamsConfig.restoreConsumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 100);
final String stateStoreName = "stateStore";
@@ -934,8 +952,13 @@
static final AtomicReference<TaskId> TASK_WITH_DATA = new AtomicReference<>();
static final AtomicBoolean DID_REVOKE_IDLE_TASK = new AtomicBoolean(false);
- @Test
- public void shouldNotCommitActiveTasksWithPendingInputIfRevokedTaskDidNotMakeProgress() throws Exception {
+ @ParameterizedTest
+ @ValueSource(strings = {"classic", "streams"})
+ public void shouldNotCommitActiveTasksWithPendingInputIfRevokedTaskDidNotMakeProgress(final String groupProtocol) throws Exception {
+ // Reset static variables to ensure test isolation
+ TASK_WITH_DATA.set(null);
+ DID_REVOKE_IDLE_TASK.set(false);
+
final AtomicBoolean requestCommit = new AtomicBoolean(false);
final StreamsBuilder builder = new StreamsBuilder();
@@ -970,6 +993,7 @@
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), MAX_POLL_INTERVAL_MS);
properties.put(StreamsConfig.producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), Integer.MAX_VALUE);
properties.put(StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG, TestTaskAssignor.class.getName());
+ properties.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, groupProtocol);
final Properties config = StreamsTestUtils.getStreamsConfig(
applicationId,
@@ -1003,9 +1027,9 @@
// add second thread, to trigger rebalance
// expect idle task to get revoked -- this should not trigger a TX commit
streams.addStreamThread();
-
- waitForCondition(DID_REVOKE_IDLE_TASK::get, "Idle Task was not revoked as expected.");
-
+ if (groupProtocol.equals("classic")) {
+ waitForCondition(DID_REVOKE_IDLE_TASK::get, "Idle Task was not revoked as expected.");
+ }
// best-effort sanity check (might pass and not detect issue in slow environments)
try {
readResult(SINGLE_PARTITION_OUTPUT_TOPIC, 1, "consumer", 10_000L);
@@ -1104,6 +1128,7 @@
final boolean withState,
final String appDir,
final int numberOfStreamsThreads,
+ final String groupProtocol,
final boolean processingThreadsEnabled) {
commitRequested = new AtomicInteger(0);
errorInjected = new AtomicBoolean(false);
@@ -1212,6 +1237,7 @@
properties.put(StreamsConfig.STATE_DIR_CONFIG, stateTmpDir + appDir);
properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, dummyHostName + ":2142");
properties.put(InternalConfig.PROCESSING_THREADS_ENABLED, processingThreadsEnabled);
+ properties.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, groupProtocol);
final Properties config = StreamsTestUtils.getStreamsConfig(
applicationId,