KAFKA-15623 (1/N) Migrate streams tests module to JUnit 5 (#16254)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
index 4cc2c1a..e9b950e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
@@ -35,17 +35,20 @@
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.junit.MockitoJUnitRunner;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
import java.util.Random;
-import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-@RunWith(MockitoJUnitRunner.StrictStubs.class)
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.STRICT_STUBS)
public class AbstractStreamTest {
@Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/ChangedSerdeTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/ChangedSerdeTest.java
index e0c66ab..f56b9fb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/ChangedSerdeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/ChangedSerdeTest.java
@@ -22,8 +22,7 @@
import org.apache.kafka.common.utils.ByteUtils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
-import org.junit.Assert;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.nio.ByteBuffer;
import java.util.Collections;
@@ -32,7 +31,7 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
-import static org.junit.Assert.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertThrows;
public class ChangedSerdeTest {
private static final String TOPIC = "some-topic";
@@ -107,7 +106,7 @@
buffer.position(serialized.length - 1);
buffer.put((byte) -1);
- Assert.assertThrows(
+ assertThrows(
StreamsException.class,
() -> CHANGED_STRING_DESERIALIZER.deserialize(TOPIC, serialized));
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/CogroupedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/CogroupedKStreamImplTest.java
index 59a922c..7744ba4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/CogroupedKStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/CogroupedKStreamImplTest.java
@@ -18,8 +18,8 @@
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.assertThrows;
-import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.Properties;
import org.apache.kafka.common.serialization.IntegerDeserializer;
@@ -54,8 +54,8 @@
import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.StreamsTestUtils;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
public class CogroupedKStreamImplTest {
private final Consumed<String, String> stringConsumed = Consumed.with(Serdes.String(), Serdes.String());
@@ -80,7 +80,7 @@
private static final Initializer<Integer> SUM_INITIALIZER = () -> 0;
- @Before
+ @BeforeEach
public void setup() {
final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> stream = builder.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java
index e7e0c88..aed6c6b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java
@@ -17,7 +17,7 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serdes;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.nio.ByteBuffer;
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java
index cb0218b..e091d81 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java
@@ -29,14 +29,14 @@
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.StreamsTestUtils;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
-import static org.junit.Assert.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
public class GlobalKTableJoinsTest {
@@ -47,7 +47,7 @@
private KStream<String, String> stream;
private KeyValueMapper<String, String, String> keyValueMapper;
- @Before
+ @BeforeEach
public void setUp() {
final Consumed<String, String> consumed = Consumed.with(Serdes.String(), Serdes.String());
global = builder.globalTable(globalTopic, consumed);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
index 3517dfc6..f822cf9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
@@ -53,7 +53,7 @@
import org.apache.kafka.test.MockTimestampExtractor;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.StreamsTestUtils;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.util.Collections;
import java.util.HashSet;
@@ -68,12 +68,12 @@
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertThrows;
-import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
public class InternalStreamsBuilderTest {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
index 8076475..cbe51f7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
@@ -44,8 +44,8 @@
import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.MockReducer;
import org.apache.kafka.test.StreamsTestUtils;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.Arrays;
@@ -57,9 +57,9 @@
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
public class KGroupedStreamImplTest {
@@ -70,7 +70,7 @@
private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
- @Before
+ @BeforeEach
public void before() {
final KStream<String, String> stream = builder.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
groupedStream = stream.groupByKey(Grouped.with(Serdes.String(), Serdes.String()));
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
index 3b5295c..ce96a70 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
@@ -39,17 +39,17 @@
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.MockReducer;
import org.apache.kafka.test.StreamsTestUtils;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import java.util.Map;
import java.util.Properties;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
public class KGroupedTableImplTest {
@@ -59,7 +59,7 @@
private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.Integer());
private final String topic = "input";
- @Before
+ @BeforeEach
public void before() {
groupedTable = builder
.table("blah", Consumed.with(Serdes.String(), Serdes.String()))
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
index d1b2477..c580971 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
@@ -28,12 +28,12 @@
import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.StreamsTestUtils;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.util.List;
import java.util.Properties;
-import static org.junit.Assert.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
public class KStreamBranchTest {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
index c4e9de6..1065b4d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
@@ -27,11 +27,11 @@
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.StreamsTestUtils;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.util.Properties;
-import static org.junit.Assert.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
public class KStreamFilterTest {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
index 69bb32c..02fd466 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
@@ -30,7 +30,7 @@
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.StreamsTestUtils;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.time.Instant;
@@ -39,8 +39,8 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
public class KStreamFlatMapTest {
private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
index 413b628..a6fdaf7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
@@ -28,14 +28,14 @@
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.StreamsTestUtils;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Properties;
-import static org.junit.Assert.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
public class KStreamFlatMapValuesTest {
private final String topicName = "topic";
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformTest.java
index 9ea0863..0490c6a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformTest.java
@@ -23,13 +23,15 @@
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentMatchers;
import org.mockito.InOrder;
import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
import java.util.Arrays;
import java.util.Collections;
@@ -41,7 +43,8 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-@RunWith(MockitoJUnitRunner.StrictStubs.class)
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.STRICT_STUBS)
public class KStreamFlatTransformTest {
private Number inputKey;
@@ -55,7 +58,7 @@
private KStreamFlatTransformProcessor<Number, Number, Integer, Integer> processor;
- @Before
+ @BeforeEach
public void setUp() {
inputKey = 1;
inputValue = 10;
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValuesTest.java
index 4b9e2f8..9f541ad 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValuesTest.java
@@ -33,15 +33,18 @@
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentMatchers;
import org.mockito.InOrder;
import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
-@RunWith(MockitoJUnitRunner.StrictStubs.class)
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.STRICT_STUBS)
public class KStreamFlatTransformValuesTest {
private Integer inputKey;
@@ -55,7 +58,7 @@
private KStreamFlatTransformValuesProcessor<Integer, Integer, String> processor;
- @Before
+ @BeforeEach
public void setUp() {
inputKey = 1;
inputValue = 10;
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java
index 35db245..8e1b9d0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java
@@ -27,7 +27,7 @@
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.test.StreamsTestUtils;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.Arrays;
@@ -35,7 +35,7 @@
import java.util.Locale;
import java.util.Properties;
-import static org.junit.Assert.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
public class KStreamForeachTest {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
index f38c95b..9a5a0dc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
@@ -35,9 +35,9 @@
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.StreamsTestUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.time.Instant;
@@ -45,7 +45,7 @@
import java.util.Properties;
import java.util.Set;
-import static org.junit.Assert.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
public class KStreamGlobalKTableJoinTest {
private static final KeyValueTimestamp[] EMPTY = new KeyValueTimestamp[0];
@@ -60,7 +60,7 @@
private MockApiProcessor<Integer, String, Void, Void> processor;
private StreamsBuilder builder;
- @Before
+ @BeforeEach
public void setUp() {
// use un-versioned store by default
init(Optional.empty());
@@ -104,7 +104,7 @@
inputTableTopic = driver.createInputTopic(globalTableTopic, new StringSerializer(), new StringSerializer());
}
- @After
+ @AfterEach
public void cleanup() {
driver.close();
}
@@ -140,7 +140,7 @@
final Collection<Set<String>> copartitionGroups =
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
- assertEquals("KStream-GlobalKTable joins do not need to be co-partitioned", 0, copartitionGroups.size());
+ assertEquals(0, copartitionGroups.size(), "KStream-GlobalKTable joins do not need to be co-partitioned");
}
@Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
index acd61d7..f933adc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
@@ -35,9 +35,9 @@
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.StreamsTestUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.time.Instant;
@@ -45,7 +45,7 @@
import java.util.Properties;
import java.util.Set;
-import static org.junit.Assert.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
public class KStreamGlobalKTableLeftJoinTest {
private static final KeyValueTimestamp[] EMPTY = new KeyValueTimestamp[0];
@@ -60,7 +60,7 @@
private TopologyTestDriver driver;
private StreamsBuilder builder;
- @Before
+ @BeforeEach
public void setUp() {
// use un-versioned store by default
init(Optional.empty());
@@ -104,7 +104,7 @@
inputTableTopic = driver.createInputTopic(globalTableTopic, new StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ofMillis(1L));
}
- @After
+ @AfterEach
public void cleanup() {
driver.close();
}
@@ -140,7 +140,7 @@
final Collection<Set<String>> copartitionGroups =
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
- assertEquals("KStream-GlobalKTable joins do not need to be co-partitioned", 0, copartitionGroups.size());
+ assertEquals(0, copartitionGroups.size(), "KStream-GlobalKTable joins do not need to be co-partitioned");
}
@Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index 81a2f1d..d51e01c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -77,8 +77,8 @@
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.StreamsTestUtils;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.time.Instant;
@@ -105,10 +105,10 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.hamcrest.core.IsNull.notNullValue;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertThrows;
-import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
public class KStreamImplTest {
@@ -203,7 +203,7 @@
private final Serde<String> mySerde = new Serdes.StringSerde();
- @Before
+ @BeforeEach
public void before() {
builder = new StreamsBuilder();
testStream = builder.stream("source");
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplValueJoinerWithKeyTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplValueJoinerWithKeyTest.java
index b35a7b2..27bc3ad 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplValueJoinerWithKeyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplValueJoinerWithKeyTest.java
@@ -36,8 +36,8 @@
import org.apache.kafka.streams.kstream.StreamJoined;
import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
import org.apache.kafka.test.StreamsTestUtils;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import java.util.Collections;
import java.util.List;
@@ -45,7 +45,7 @@
import static java.time.Duration.ofHours;
import static java.time.Duration.ofMillis;
-import static org.junit.Assert.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
public class KStreamImplValueJoinerWithKeyTest {
@@ -72,7 +72,7 @@
private final KeyValueMapper<String, Integer, String> keyValueMapper =
(k, v) -> k;
- @Before
+ @BeforeEach
public void setup() {
builder = new StreamsBuilder();
leftStream = builder.stream(leftTopic, Consumed.with(Serdes.String(), Serdes.Integer()));
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
index 3c602ee..d312543 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
@@ -61,7 +61,7 @@
import org.apache.kafka.test.MockInternalNewProcessorContext;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.GenericInMemoryKeyValueStore;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import java.time.Duration;
@@ -89,9 +89,9 @@
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThrows;
-import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
public class KStreamKStreamJoinTest {
private final String topic1 = "topic1";
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
index 0db1f28..0645864 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
@@ -34,7 +34,7 @@
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.StreamsTestUtils;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.time.Instant;
@@ -46,7 +46,7 @@
import static java.time.Duration.ZERO;
import static java.time.Duration.ofMillis;
-import static org.junit.Assert.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
public class KStreamKStreamLeftJoinTest {
private static final KeyValueTimestamp[] EMPTY = new KeyValueTimestamp[0];
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
index c836617..b985bf0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
@@ -44,7 +44,7 @@
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.StreamsTestUtils;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.time.Instant;
@@ -58,7 +58,7 @@
import static java.time.Duration.ZERO;
import static java.time.Duration.ofMillis;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
public class KStreamKStreamOuterJoinTest {
private final String topic1 = "topic1";
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoinTest.java
index acd53b0..5cdd55e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoinTest.java
@@ -37,7 +37,7 @@
import org.apache.kafka.test.MockApiProcessor;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.StreamsTestUtils;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
public class KStreamKStreamSelfJoinTest {
private final String topic1 = "topic1";
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
index cfb40ee..eb4537a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
@@ -19,8 +19,8 @@
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import java.time.Duration;
import java.time.Instant;
@@ -52,9 +52,9 @@
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.StreamsTestUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
public class KStreamKTableJoinTest {
private static final KeyValueTimestamp<?, ?>[] EMPTY = new KeyValueTimestamp[0];
@@ -70,7 +70,7 @@
private StreamsBuilder builder;
private final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
- @Before
+ @BeforeEach
public void setUp() {
builder = new StreamsBuilder();
@@ -89,7 +89,7 @@
processor = supplier.theCapturedProcessor();
}
- @After
+ @AfterEach
public void cleanup() {
driver.close();
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
index 5699274..059454c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
@@ -31,9 +31,9 @@
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.StreamsTestUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.time.Instant;
@@ -44,7 +44,7 @@
import java.util.Random;
import java.util.Set;
-import static org.junit.Assert.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
public class KStreamKTableLeftJoinTest {
private static final KeyValueTimestamp[] EMPTY = new KeyValueTimestamp[0];
@@ -59,7 +59,7 @@
private MockApiProcessor<Integer, String, Void, Void> processor;
private StreamsBuilder builder;
- @Before
+ @BeforeEach
public void setUp() {
builder = new StreamsBuilder();
@@ -80,7 +80,7 @@
processor = supplier.theCapturedProcessor();
}
- @After
+ @AfterEach
public void cleanup() {
driver.close();
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
index c171b87..bcbd64e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
@@ -29,7 +29,7 @@
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.StreamsTestUtils;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.time.Instant;
@@ -37,8 +37,8 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
public class KStreamMapTest {
private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
index f3a773b..a904fa5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
@@ -28,11 +28,11 @@
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.StreamsTestUtils;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.util.Properties;
-import static org.junit.Assert.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
public class KStreamMapValuesTest {
private final String topicName = "topic";
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java
index f04278b..3bbeba6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java
@@ -27,14 +27,14 @@
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.test.StreamsTestUtils;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
public class KStreamPeekTest {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java
index 591b81a..7e7151f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java
@@ -20,20 +20,23 @@
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
-import static org.junit.Assert.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
-@RunWith(MockitoJUnitRunner.StrictStubs.class)
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.STRICT_STUBS)
public class KStreamPrintTest {
private ByteArrayOutputStream byteOutStream;
@@ -42,7 +45,7 @@
@Mock
private ProcessorContext<Void, Void> processorContext;
- @Before
+ @BeforeEach
public void setUp() {
byteOutStream = new ByteArrayOutputStream();
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamRepartitionTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamRepartitionTest.java
index 9dfabea..1eeacba 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamRepartitionTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamRepartitionTest.java
@@ -35,10 +35,12 @@
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.test.TestRecord;
import org.apache.kafka.test.StreamsTestUtils;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.junit.MockitoJUnitRunner;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
import java.time.Duration;
import java.time.Instant;
@@ -50,9 +52,9 @@
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
@@ -61,7 +63,8 @@
import static org.mockito.Mockito.when;
@SuppressWarnings("deprecation")
-@RunWith(MockitoJUnitRunner.StrictStubs.class)
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.STRICT_STUBS)
public class KStreamRepartitionTest {
private final String inputTopic = "input-topic";
@@ -69,7 +72,7 @@
private StreamsBuilder builder;
- @Before
+ @BeforeEach
public void setUp() {
builder = new StreamsBuilder();
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java
index b1e9d4b..6ec5c1c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java
@@ -27,7 +27,7 @@
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.StreamsTestUtils;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.time.Instant;
@@ -35,7 +35,7 @@
import java.util.Map;
import java.util.Properties;
-import static org.junit.Assert.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
public class KStreamSelectKeyTest {
private final String topicName = "topic_key_select";
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index f72f089..52020e0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -50,21 +50,17 @@
import org.apache.kafka.test.MockRecordCollector;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
import static java.time.Duration.ofMillis;
-import static java.util.Arrays.asList;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
@@ -72,11 +68,10 @@
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
-@RunWith(Parameterized.class)
public class KStreamSessionWindowAggregateProcessorTest {
private static final long GAP_MS = 5 * 60 * 1000L;
@@ -95,27 +90,14 @@
private KStreamSessionWindowAggregate<String, String, Long> sessionAggregator;
private Processor<String, String, Windowed<String>, Change<Long>> processor;
private SessionStore<String, Long> sessionStore;
-
- @Parameterized.Parameter
+
public EmitStrategy.StrategyType type;
- @Parameterized.Parameters(name = "{0}")
- public static Collection<Object[]> data() {
- return asList(new Object[][] {
- {EmitStrategy.StrategyType.ON_WINDOW_UPDATE},
- {EmitStrategy.StrategyType.ON_WINDOW_CLOSE}
- });
- }
-
private EmitStrategy emitStrategy;
private boolean emitFinal;
- @Before
- public void setup() {
- setup(true);
- }
-
- private void setup(final boolean enableCache) {
+ private void setup(final EmitStrategy.StrategyType inputType, final boolean enableCaching) {
+ type = inputType;
// Always process
final Properties prop = StreamsTestUtils.getStreamsConfig();
prop.put(StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_WINDOWED_AGGREGATION, 0);
@@ -159,7 +141,7 @@
context.setTime(0L);
TaskMetrics.droppedRecordsSensor(threadId, context.taskId().toString(), streamsMetrics);
- initStore(enableCache);
+ initStore(enableCaching);
processor.init(context);
}
@@ -182,14 +164,16 @@
sessionStore.init((StateStoreContext) context, sessionStore);
}
- @After
+ @AfterEach
public void closeStore() {
sessionStore.close();
processor.close();
}
- @Test
- public void shouldCreateSingleSessionWhenWithinGap() {
+ @ParameterizedTest
+ @EnumSource(EmitStrategy.StrategyType.class)
+ public void shouldCreateSingleSessionWhenWithinGap(final EmitStrategy.StrategyType inputType) {
+ setup(inputType, true);
processor.process(new Record<>("john", "first", 0L));
processor.process(new Record<>("john", "second", 500L));
@@ -200,8 +184,10 @@
}
}
- @Test
- public void shouldMergeSessions() {
+ @ParameterizedTest
+ @EnumSource(EmitStrategy.StrategyType.class)
+ public void shouldMergeSessions(final EmitStrategy.StrategyType inputType) {
+ setup(inputType, true);
final String sessionId = "mel";
processor.process(new Record<>(sessionId, "first", 0L));
assertTrue(sessionStore.findSessions(sessionId, 0, 0).hasNext());
@@ -223,8 +209,10 @@
}
}
- @Test
- public void shouldUpdateSessionIfTheSameTime() {
+ @ParameterizedTest
+ @EnumSource(EmitStrategy.StrategyType.class)
+ public void shouldUpdateSessionIfTheSameTime(final EmitStrategy.StrategyType inputType) {
+ setup(inputType, true);
processor.process(new Record<>("mel", "first", 0L));
processor.process(new Record<>("mel", "second", 0L));
try (final KeyValueIterator<Windowed<String>, Long> iterator =
@@ -234,8 +222,10 @@
}
}
- @Test
- public void shouldHaveMultipleSessionsForSameIdWhenTimestampApartBySessionGap() {
+ @ParameterizedTest
+ @EnumSource(EmitStrategy.StrategyType.class)
+ public void shouldHaveMultipleSessionsForSameIdWhenTimestampApartBySessionGap(final EmitStrategy.StrategyType inputType) {
+ setup(inputType, true);
final String sessionId = "mel";
long now = 0;
processor.process(new Record<>(sessionId, "first", now));
@@ -284,8 +274,10 @@
}
}
- @Test
- public void shouldRemoveMergedSessionsFromStateStore() {
+ @ParameterizedTest
+ @EnumSource(EmitStrategy.StrategyType.class)
+ public void shouldRemoveMergedSessionsFromStateStore(final EmitStrategy.StrategyType inputType) {
+ setup(inputType, true);
processor.process(new Record<>("a", "1", 0L));
// first ensure it is in the store
@@ -305,8 +297,10 @@
}
}
- @Test
- public void shouldHandleMultipleSessionsAndMerging() {
+ @ParameterizedTest
+ @EnumSource(EmitStrategy.StrategyType.class)
+ public void shouldHandleMultipleSessionsAndMerging(final EmitStrategy.StrategyType inputType) {
+ setup(inputType, true);
processor.process(new Record<>("a", "1", 0L));
processor.process(new Record<>("b", "1", 0L));
processor.process(new Record<>("c", "1", 0L));
@@ -376,8 +370,10 @@
}
}
- @Test
- public void shouldGetAggregatedValuesFromValueGetter() {
+ @ParameterizedTest
+ @EnumSource(EmitStrategy.StrategyType.class)
+ public void shouldGetAggregatedValuesFromValueGetter(final EmitStrategy.StrategyType inputType) {
+ setup(inputType, true);
final KTableValueGetter<Windowed<String>, Long> getter = sessionAggregator.view().get();
getter.init(context);
processor.process(new Record<>("a", "1", 0L));
@@ -389,8 +385,10 @@
assertEquals(2L, t1);
}
- @Test
- public void shouldImmediatelyForwardNewSessionWhenNonCachedStore() {
+ @ParameterizedTest
+ @EnumSource(EmitStrategy.StrategyType.class)
+ public void shouldImmediatelyForwardNewSessionWhenNonCachedStore(final EmitStrategy.StrategyType inputType) {
+ setup(inputType, true);
if (emitFinal)
return;
@@ -420,8 +418,10 @@
);
}
- @Test
- public void shouldImmediatelyForwardRemovedSessionsWhenMerging() {
+ @ParameterizedTest
+ @EnumSource(EmitStrategy.StrategyType.class)
+ public void shouldImmediatelyForwardRemovedSessionsWhenMerging(final EmitStrategy.StrategyType inputType) {
+ setup(inputType, true);
if (emitFinal)
return;
@@ -449,9 +449,10 @@
);
}
- @Test
- public void shouldLogAndMeterWhenSkippingNullKeyWithBuiltInMetrics() {
- setup(false);
+ @ParameterizedTest
+ @EnumSource(EmitStrategy.StrategyType.class)
+ public void shouldLogAndMeterWhenSkippingNullKeyWithBuiltInMetrics(final EmitStrategy.StrategyType inputType) {
+ setup(inputType, false);
context.setRecordContext(
new ProcessorRecordContext(-1, -2, -3, "topic", new RecordHeaders())
);
@@ -476,9 +477,10 @@
);
}
- @Test
- public void shouldLogAndMeterWhenSkippingLateRecordWithZeroGrace() {
- setup(false);
+ @ParameterizedTest
+ @EnumSource(EmitStrategy.StrategyType.class)
+ public void shouldLogAndMeterWhenSkippingLateRecordWithZeroGrace(final EmitStrategy.StrategyType inputType) {
+ setup(inputType, false);
final Processor<String, String, Windowed<String>, Change<Long>> processor = new KStreamSessionWindowAggregate<>(
SessionWindows.ofInactivityGapAndGrace(ofMillis(10L), ofMillis(0L)),
STORE_NAME,
@@ -542,9 +544,10 @@
);
}
- @Test
- public void shouldLogAndMeterWhenSkippingLateRecordWithNonzeroGrace() {
- setup(false);
+ @ParameterizedTest
+ @EnumSource(EmitStrategy.StrategyType.class)
+ public void shouldLogAndMeterWhenSkippingLateRecordWithNonzeroGrace(final EmitStrategy.StrategyType inputType) {
+ setup(inputType, false);
final Processor<String, String, Windowed<String>, Change<Long>> processor = new KStreamSessionWindowAggregate<>(
SessionWindows.ofInactivityGapAndGrace(ofMillis(10L), ofMillis(1L)),
STORE_NAME,
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
index 0eed798..c947566 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
@@ -57,15 +57,13 @@
import org.apache.kafka.test.MockReducer;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.Matcher;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.Arguments;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
@@ -74,7 +72,7 @@
import java.util.Properties;
import java.util.Random;
import java.util.stream.Collectors;
-import org.junit.runners.Parameterized.Parameter;
+import java.util.stream.Stream;
import static java.time.Duration.ofMillis;
import static java.util.Arrays.asList;
@@ -87,32 +85,25 @@
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
-@RunWith(Parameterized.class)
public class KStreamSlidingWindowAggregateTest {
-
- @Parameterized.Parameters(name = "{0}_inorder:{1}_cache:{2}")
- public static Collection<Object[]> data() {
- return Arrays.asList(new Object[][] {
- {StrategyType.ON_WINDOW_UPDATE, true, true},
- {StrategyType.ON_WINDOW_UPDATE, true, false},
- {StrategyType.ON_WINDOW_UPDATE, false, true},
- {StrategyType.ON_WINDOW_UPDATE, false, false},
- {StrategyType.ON_WINDOW_CLOSE, true, true},
- {StrategyType.ON_WINDOW_CLOSE, true, false},
- {StrategyType.ON_WINDOW_CLOSE, false, true},
- {StrategyType.ON_WINDOW_CLOSE, false, false}
- });
+
+ public static Stream<Arguments> data() {
+ return Stream.of(
+ Arguments.of(StrategyType.ON_WINDOW_UPDATE, true, true),
+ Arguments.of(StrategyType.ON_WINDOW_UPDATE, true, false),
+ Arguments.of(StrategyType.ON_WINDOW_UPDATE, false, true),
+ Arguments.of(StrategyType.ON_WINDOW_UPDATE, false, false),
+ Arguments.of(StrategyType.ON_WINDOW_CLOSE, true, true),
+ Arguments.of(StrategyType.ON_WINDOW_CLOSE, true, false),
+ Arguments.of(StrategyType.ON_WINDOW_CLOSE, false, true),
+ Arguments.of(StrategyType.ON_WINDOW_CLOSE, false, false)
+ );
}
- @Parameter
public StrategyType type;
-
- @Parameter(1)
public boolean inOrderIterator;
-
- @Parameter(2)
public boolean withCache;
private boolean emitFinal;
@@ -120,15 +111,19 @@
private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
private final String threadId = Thread.currentThread().getName();
-
- @Before
- public void before() {
+
+ public void setup(final StrategyType inputType, final boolean inputInOrderIterator, final boolean inputWithCache) {
+ type = inputType;
+ inOrderIterator = inputInOrderIterator;
+ withCache = inputWithCache;
emitFinal = type.equals(StrategyType.ON_WINDOW_CLOSE);
emitStrategy = StrategyType.forType(type);
}
- @Test
- public void testAggregateSmallInput() {
+ @ParameterizedTest
+ @MethodSource("data")
+ public void testAggregateSmallInput(final StrategyType inputType, final boolean inputInOrderIterator, final boolean inputWithCache) {
+ setup(inputType, inputInOrderIterator, inputWithCache);
final StreamsBuilder builder = new StreamsBuilder();
final String topic = "topic";
@@ -223,8 +218,10 @@
assertEquals(expected, actual);
}
- @Test
- public void testReduceSmallInput() {
+ @ParameterizedTest
+ @MethodSource("data")
+ public void testReduceSmallInput(final StrategyType inputType, final boolean inputInOrderIterator, final boolean inputWithCache) {
+ setup(inputType, inputInOrderIterator, inputWithCache);
final StreamsBuilder builder = new StreamsBuilder();
final String topic = "topic";
final WindowBytesStoreSupplier storeSupplier = setupWindowBytesStoreSupplier(1);
@@ -288,8 +285,10 @@
assertEquals(expected, actual);
}
- @Test
- public void testAggregateLargeInput() {
+ @ParameterizedTest
+ @MethodSource("data")
+ public void testAggregateLargeInput(final StrategyType inputType, final boolean inputInOrderIterator, final boolean inputWithCache) {
+ setup(inputType, inputInOrderIterator, inputWithCache);
final StreamsBuilder builder = new StreamsBuilder();
final String topic1 = "topic1";
final long grace = emitFinal ? 10L : 50L;
@@ -507,8 +506,10 @@
}
}
- @Test
- public void testJoin() {
+ @ParameterizedTest
+ @MethodSource("data")
+ public void testJoin(final StrategyType inputType, final boolean inputInOrderIterator, final boolean inputWithCache) {
+ setup(inputType, inputInOrderIterator, inputWithCache);
final StreamsBuilder builder = new StreamsBuilder();
final String topic1 = "topic1";
final String topic2 = "topic2";
@@ -655,8 +656,10 @@
}
}
- @Test
- public void testEarlyRecordsSmallInput() {
+ @ParameterizedTest
+ @MethodSource("data")
+ public void testEarlyRecordsSmallInput(final StrategyType inputType, final boolean inputInOrderIterator, final boolean inputWithCache) {
+ setup(inputType, inputInOrderIterator, inputWithCache);
final StreamsBuilder builder = new StreamsBuilder();
final String topic = "topic";
@@ -777,8 +780,10 @@
assertEquals(expected, actual);
}
- @Test
- public void testEarlyRecordsRepeatedInput() {
+ @ParameterizedTest
+ @MethodSource("data")
+ public void testEarlyRecordsRepeatedInput(final StrategyType inputType, final boolean inputInOrderIterator, final boolean inputWithCache) {
+ setup(inputType, inputInOrderIterator, inputWithCache);
final StreamsBuilder builder = new StreamsBuilder();
final String topic = "topic";
@@ -859,8 +864,10 @@
assertEquals(expected, actual);
}
- @Test
- public void testEarlyRecordsLargeInput() {
+ @ParameterizedTest
+ @MethodSource("data")
+ public void testEarlyRecordsLargeInput(final StrategyType inputType, final boolean inputInOrderIterator, final boolean inputWithCache) {
+ setup(inputType, inputInOrderIterator, inputWithCache);
final StreamsBuilder builder = new StreamsBuilder();
final String topic = "topic";
final WindowBytesStoreSupplier storeSupplier = setupWindowBytesStoreSupplier(1);
@@ -1012,8 +1019,10 @@
}
}
- @Test
- public void testEarlyNoGracePeriodSmallInput() {
+ @ParameterizedTest
+ @MethodSource("data")
+ public void testEarlyNoGracePeriodSmallInput(final StrategyType inputType, final boolean inputInOrderIterator, final boolean inputWithCache) {
+ setup(inputType, inputInOrderIterator, inputWithCache);
final StreamsBuilder builder = new StreamsBuilder();
final String topic = "topic";
@@ -1110,8 +1119,10 @@
assertEquals(expected, actual);
}
- @Test
- public void testNoGracePeriodSmallInput() {
+ @ParameterizedTest
+ @MethodSource("data")
+ public void testNoGracePeriodSmallInput(final StrategyType inputType, final boolean inputInOrderIterator, final boolean inputWithCache) {
+ setup(inputType, inputInOrderIterator, inputWithCache);
final StreamsBuilder builder = new StreamsBuilder();
final String topic = "topic";
@@ -1200,8 +1211,10 @@
assertEquals(expected, actual);
}
- @Test
- public void testEarlyNoGracePeriodLargeInput() {
+ @ParameterizedTest
+ @MethodSource("data")
+ public void testEarlyNoGracePeriodLargeInput(final StrategyType inputType, final boolean inputInOrderIterator, final boolean inputWithCache) {
+ setup(inputType, inputInOrderIterator, inputWithCache);
final StreamsBuilder builder = new StreamsBuilder();
final String topic = "topic";
final WindowBytesStoreSupplier storeSupplier =
@@ -1319,8 +1332,10 @@
}
}
- @Test
- public void testNoGracePeriodLargeInput() {
+ @ParameterizedTest
+ @MethodSource("data")
+ public void testNoGracePeriodLargeInput(final StrategyType inputType, final boolean inputInOrderIterator, final boolean inputWithCache) {
+ setup(inputType, inputInOrderIterator, inputWithCache);
final StreamsBuilder builder = new StreamsBuilder();
final String topic = "topic";
final WindowBytesStoreSupplier storeSupplier =
@@ -1442,8 +1457,10 @@
}
}
- @Test
- public void shouldLogAndMeterWhenSkippingNullKey() {
+ @ParameterizedTest
+ @MethodSource("data")
+ public void shouldLogAndMeterWhenSkippingNullKey(final StrategyType inputType, final boolean inputInOrderIterator, final boolean inputWithCache) {
+ setup(inputType, inputInOrderIterator, inputWithCache);
final String builtInMetricsVersion = StreamsConfig.METRICS_LATEST;
final StreamsBuilder builder = new StreamsBuilder();
final String topic = "topic";
@@ -1473,8 +1490,10 @@
}
}
- @Test
- public void shouldLogAndMeterWhenSkippingExpiredWindowByGrace() {
+ @ParameterizedTest
+ @MethodSource("data")
+ public void shouldLogAndMeterWhenSkippingExpiredWindowByGrace(final StrategyType inputType, final boolean inputInOrderIterator, final boolean inputWithCache) {
+ setup(inputType, inputInOrderIterator, inputWithCache);
final String builtInMetricsVersion = StreamsConfig.METRICS_LATEST;
final StreamsBuilder builder = new StreamsBuilder();
final String topic = "topic";
@@ -1550,9 +1569,10 @@
}
}
- @Test
- public void testAggregateRandomInput() {
-
+ @ParameterizedTest
+ @MethodSource("data")
+ public void testAggregateRandomInput(final StrategyType inputType, final boolean inputInOrderIterator, final boolean inputWithCache) {
+ setup(inputType, inputInOrderIterator, inputWithCache);
final StreamsBuilder builder = new StreamsBuilder();
final String topic1 = "topic1";
final WindowBytesStoreSupplier storeSupplier =
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSplitTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSplitTest.java
index 29eaf1a..c85685e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSplitTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSplitTest.java
@@ -32,14 +32,14 @@
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.test.StreamsTestUtils;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
import java.util.function.Consumer;
-import static org.junit.Assert.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
public class KStreamSplitTest {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
index 00b9d10..23235bd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
@@ -32,13 +32,13 @@
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.StreamsTestUtils;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.time.Instant;
import java.util.Properties;
-import static org.junit.Assert.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
public class KStreamTransformTest {
private static final String TOPIC_NAME = "topic";
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
index 0063c65..eab907e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
@@ -34,24 +34,26 @@
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.NoOpValueTransformerWithKeySupplier;
import org.apache.kafka.test.StreamsTestUtils;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
import java.util.Properties;
import static org.hamcrest.CoreMatchers.isA;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.mockito.Mockito.mock;
-@RunWith(MockitoJUnitRunner.StrictStubs.class)
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.STRICT_STUBS)
public class KStreamTransformValuesTest {
private final String topicName = "topic";
private final MockProcessorSupplier<Integer, Integer, Void, Void> supplier = new MockProcessorSupplier<>();
private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.Integer());
- @Mock
- private InternalProcessorContext context;
+ private InternalProcessorContext context = mock(InternalProcessorContext.class);
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
@Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
index 4562795..58b04d5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
@@ -18,7 +18,7 @@
import java.io.File;
import java.io.IOException;
-import java.util.Collection;
+
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
@@ -63,15 +63,14 @@
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.Matcher;
-import org.junit.Before;
-import org.junit.Test;
import java.time.Duration;
import java.util.List;
import java.util.Properties;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameter;
+import java.util.stream.Stream;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.Arguments;
import static java.time.Duration.ofMillis;
import static java.util.Arrays.asList;
@@ -83,44 +82,43 @@
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThrows;
-import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
-@RunWith(Parameterized.class)
public class KStreamWindowAggregateTest {
private static final String WINDOW_STORE_NAME = "dummy-store-name";
private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
private final String threadId = Thread.currentThread().getName();
- @Parameter
public StrategyType type;
- @Parameter(1)
public boolean withCache;
private EmitStrategy emitStrategy;
private boolean emitFinal;
- @Parameterized.Parameters(name = "{0}_cache:{1}")
- public static Collection<Object[]> getEmitStrategy() {
- return asList(new Object[][] {
- {StrategyType.ON_WINDOW_UPDATE, true},
- {StrategyType.ON_WINDOW_UPDATE, false},
- {StrategyType.ON_WINDOW_CLOSE, true},
- {StrategyType.ON_WINDOW_CLOSE, false}
- });
+ public static Stream<Arguments> getEmitStrategy() {
+ return Stream.of(
+ Arguments.of(StrategyType.ON_WINDOW_UPDATE, true),
+ Arguments.of(StrategyType.ON_WINDOW_UPDATE, false),
+ Arguments.of(StrategyType.ON_WINDOW_CLOSE, true),
+ Arguments.of(StrategyType.ON_WINDOW_CLOSE, false)
+ );
}
-
- @Before
- public void before() {
+
+ public void setup(final StrategyType inputType, final boolean inputWithCache) {
+ type = inputType;
+ withCache = inputWithCache;
emitFinal = type.equals(StrategyType.ON_WINDOW_CLOSE);
emitStrategy = StrategyType.forType(type);
}
- @Test
- public void testAggBasic() {
+ @ParameterizedTest
+ @MethodSource("getEmitStrategy")
+ public void testAggBasic(final StrategyType inputType, final boolean inputWithCache) {
+ setup(inputType, inputWithCache);
final StreamsBuilder builder = new StreamsBuilder();
final String topic1 = "topic1";
@@ -213,8 +211,10 @@
}
}
- @Test
- public void testJoin() {
+ @ParameterizedTest
+ @MethodSource("getEmitStrategy")
+ public void testJoin(final StrategyType inputType, final boolean inputWithCache) {
+ setup(inputType, inputWithCache);
final StreamsBuilder builder = new StreamsBuilder();
final String topic1 = "topic1";
final String topic2 = "topic2";
@@ -461,8 +461,10 @@
);
}
- @Test
- public void shouldLogAndMeterWhenSkippingNullKey() {
+ @ParameterizedTest
+ @MethodSource("getEmitStrategy")
+ public void shouldLogAndMeterWhenSkippingNullKey(final StrategyType inputType, final boolean inputWithCache) {
+ setup(inputType, inputWithCache);
final StreamsBuilder builder = new StreamsBuilder();
final String topic = "topic";
@@ -487,8 +489,10 @@
}
}
- @Test
- public void shouldLogAndMeterWhenSkippingExpiredWindow() {
+ @ParameterizedTest
+ @MethodSource("getEmitStrategy")
+ public void shouldLogAndMeterWhenSkippingExpiredWindow(final StrategyType inputType, final boolean inputWithCache) {
+ setup(inputType, inputWithCache);
final StreamsBuilder builder = new StreamsBuilder();
final String topic = "topic";
@@ -571,8 +575,10 @@
}
}
- @Test
- public void shouldLogAndMeterWhenSkippingExpiredWindowByGrace() {
+ @ParameterizedTest
+ @MethodSource("getEmitStrategy")
+ public void shouldLogAndMeterWhenSkippingExpiredWindowByGrace(final StrategyType inputType, final boolean inputWithCache) {
+ setup(inputType, inputWithCache);
final StreamsBuilder builder = new StreamsBuilder();
final String topic = "topic";
@@ -626,8 +632,10 @@
}
}
- @Test
- public void shouldNotEmitFinalIfNotProgressEnough() throws IOException {
+ @ParameterizedTest
+ @MethodSource("getEmitStrategy")
+ public void shouldNotEmitFinalIfNotProgressEnough(final StrategyType inputType, final boolean inputWithCache) throws IOException {
+ setup(inputType, inputWithCache);
final File stateDir = TestUtils.tempDirectory();
final long windowSize = 10L;
final Windows<TimeWindow> windows = TimeWindows.ofSizeAndGrace(ofMillis(windowSize), ofMillis(5)).advanceBy(ofMillis(5));
@@ -714,8 +722,10 @@
}
}
- @Test
- public void shouldEmitWithInterval0() throws IOException {
+ @ParameterizedTest
+ @MethodSource("getEmitStrategy")
+ public void shouldEmitWithInterval0(final StrategyType inputType, final boolean inputWithCache) throws IOException {
+ setup(inputType, inputWithCache);
final File stateDir = TestUtils.tempDirectory();
final long windowSize = 10L;
final Windows<TimeWindow> windows = TimeWindows.ofSizeAndGrace(ofMillis(windowSize), ofMillis(5)).advanceBy(ofMillis(5));
@@ -781,8 +791,10 @@
}
}
- @Test
- public void shouldEmitWithLargeInterval() throws IOException {
+ @ParameterizedTest
+ @MethodSource("getEmitStrategy")
+ public void shouldEmitWithLargeInterval(final StrategyType inputType, final boolean inputWithCache) throws IOException {
+ setup(inputType, inputWithCache);
final File stateDir = TestUtils.tempDirectory();
final long windowSize = 10L;
final Windows<TimeWindow> windows = TimeWindows.ofSizeAndGrace(ofMillis(windowSize), ofMillis(5)).advanceBy(ofMillis(5));
@@ -880,8 +892,10 @@
}
}
- @Test
- public void shouldEmitFromLastEmitTime() throws IOException {
+ @ParameterizedTest
+ @MethodSource("getEmitStrategy")
+ public void shouldEmitFromLastEmitTime(final StrategyType inputType, final boolean inputWithCache) throws IOException {
+ setup(inputType, inputWithCache);
final File stateDir = TestUtils.tempDirectory();
final long windowSize = 10L;
final Windows<TimeWindow> windows = TimeWindows.ofSizeAndGrace(ofMillis(windowSize), ofMillis(5)).advanceBy(ofMillis(5));
@@ -960,8 +974,10 @@
}
}
- @Test
- public void showThrowIfEmitFinalUsedWithUnlimitedWindow() {
+ @ParameterizedTest
+ @MethodSource("getEmitStrategy")
+ public void showThrowIfEmitFinalUsedWithUnlimitedWindow(final StrategyType inputType, final boolean inputWithCache) {
+ setup(inputType, inputWithCache);
if (emitFinal) {
final IllegalArgumentException e = assertThrows(
IllegalArgumentException.class, () -> new KStreamWindowAggregate<>(
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
index ab48892..63c743c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
@@ -45,8 +45,7 @@
import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.TestUtils;
-import org.junit.Assert;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
@@ -60,7 +59,9 @@
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkProperties;
-import static org.junit.Assert.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
public class KTableAggregateTest {
private final Serde<String> stringSerde = Serdes.String();
@@ -479,8 +480,8 @@
final NoEqualsImpl a = new NoEqualsImpl("1");
final NoEqualsImpl b = new NoEqualsImpl("1");
- Assert.assertNotEquals(a, b);
- Assert.assertNotSame(a, b);
+ assertNotEquals(a, b);
+ assertNotSame(a, b);
inputTopic.pipeInput(a, a, 8);
inputTopic.pipeInput(b, b, 9);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
index cb39813..d744ee2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
@@ -41,8 +41,8 @@
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.MockReducer;
import org.apache.kafka.test.StreamsTestUtils;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.time.Instant;
@@ -51,15 +51,15 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
@SuppressWarnings("unchecked")
public class KTableFilterTest {
private final Consumed<String, Integer> consumed = Consumed.with(Serdes.String(), Serdes.Integer());
private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.Integer());
- @Before
+ @BeforeEach
public void setUp() {
// disable caching at the config level
props.setProperty(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, "0");
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
index dee1488..d379944 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
@@ -52,10 +52,12 @@
import org.apache.kafka.test.MockReducer;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.StreamsTestUtils;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.junit.MockitoJUnitRunner;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
import java.lang.reflect.Field;
import java.util.List;
@@ -64,14 +66,15 @@
import static java.util.Arrays.asList;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;
@SuppressWarnings("unchecked")
-@RunWith(MockitoJUnitRunner.StrictStubs.class)
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.STRICT_STUBS)
public class KTableImplTest {
private final Consumed<String, String> stringConsumed = Consumed.with(Serdes.String(), Serdes.String());
private final Consumed<String, String> consumed = Consumed.with(Serdes.String(), Serdes.String());
@@ -81,7 +84,7 @@
private KTable<String, String> table;
- @Before
+ @BeforeEach
public void setUp() {
table = new StreamsBuilder().table("test");
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableForeignKeyJoinScenarioTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableForeignKeyJoinScenarioTest.java
index f85596c..2987b2e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableForeignKeyJoinScenarioTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableForeignKeyJoinScenarioTest.java
@@ -35,9 +35,7 @@
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.utils.UniqueTopicSerdeScope;
import org.apache.kafka.test.TestUtils;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
+import org.junit.jupiter.api.Test;
import java.util.Collections;
import java.util.Map;
@@ -47,7 +45,6 @@
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkProperties;
import static org.apache.kafka.common.utils.Utils.mkSet;
-import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
@@ -57,9 +54,6 @@
private static final String RIGHT_TABLE = "right_table";
private static final String OUTPUT = "output-topic";
- @Rule
- public TestName testName = new TestName();
-
@Test
public void shouldWorkWithDefaultSerdes() {
final StreamsBuilder builder = new StreamsBuilder();
@@ -240,7 +234,6 @@
private void validateTopologyCanProcessData(final StreamsBuilder builder) {
final Properties config = new Properties();
- final String safeTestName = safeUniqueTestName(testName);
config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class.getName());
config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
config.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java
index e0f5799..551417e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java
@@ -37,7 +37,7 @@
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.StreamsTestUtils;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.time.Instant;
@@ -50,9 +50,9 @@
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
public class KTableKTableInnerJoinTest {
private static final KeyValueTimestamp<?, ?>[] EMPTY = new KeyValueTimestamp[0];
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
index ee73849..a48fc5e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
@@ -42,7 +42,7 @@
import org.apache.kafka.test.MockReducer;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.StreamsTestUtils;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.time.Instant;
@@ -58,9 +58,9 @@
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
public class KTableKTableLeftJoinTest {
private final String topic1 = "topic1";
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
index 12e503b..c50a948 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
@@ -35,7 +35,7 @@
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.StreamsTestUtils;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.time.Instant;
@@ -48,9 +48,9 @@
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
public class KTableKTableOuterJoinTest {
private final String topic1 = "topic1";
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoinTest.java
index 9fcffac..8bce70b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoinTest.java
@@ -26,7 +26,7 @@
import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.common.utils.LogCaptureAppender.Event;
import org.apache.kafka.test.StreamsTestUtils;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.util.Properties;
import java.util.stream.Collectors;
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
index 360c37e..8b348fa 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
@@ -28,13 +28,13 @@
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.StreamsTestUtils;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
-import static org.junit.Assert.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
public class KTableMapKeysTest {
private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
index a7858b6..d7e5487 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
@@ -35,7 +35,7 @@
import org.apache.kafka.test.MockApiProcessor;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.StreamsTestUtils;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.time.Instant;
@@ -44,9 +44,9 @@
import static java.util.Arrays.asList;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
@SuppressWarnings("unchecked")
public class KTableMapValuesTest {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java
index 89aa17a..53bb310 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java
@@ -23,15 +23,15 @@
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.test.GenericInMemoryTimestampedKeyValueStore;
import org.apache.kafka.test.InternalMockProcessorContext;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.util.HashSet;
import java.util.Set;
import static java.util.Collections.emptySet;
import static java.util.Collections.singleton;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
public class KTableReduceTest {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
index 2fc5834..2342a50 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
@@ -40,8 +40,8 @@
import org.apache.kafka.test.MockApiProcessor;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.StreamsTestUtils;
-import org.junit.Ignore;
-import org.junit.Test;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.time.Instant;
@@ -52,9 +52,9 @@
import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
public class KTableSourceTest {
private final Consumed<String, String> stringConsumed = Consumed.with(Serdes.String(), Serdes.String());
@@ -91,7 +91,7 @@
supplier.theCapturedProcessor().processed());
}
- @Ignore // we have disabled KIP-557 until KAFKA-12508 can be properly addressed
+ @Disabled // we have disabled KIP-557 until KAFKA-12508 can be properly addressed
@Test
public void testKTableSourceEmitOnChange() {
final StreamsBuilder builder = new StreamsBuilder();
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
index 3138c71..43e8f1d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
@@ -48,12 +48,14 @@
import org.apache.kafka.test.MockReducer;
import org.apache.kafka.test.NoOpValueTransformerWithKeySupplier;
import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
import java.util.ArrayList;
import java.util.Arrays;
@@ -65,15 +67,16 @@
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.isA;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.fail;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.verify;
-@RunWith(MockitoJUnitRunner.StrictStubs.class)
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.STRICT_STUBS)
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class KTableTransformValuesTest {
private static final String QUERYABLE_NAME = "queryable-store";
@@ -101,7 +104,7 @@
@Mock
private ValueTransformerWithKey<String, String, String> transformer;
- @After
+ @AfterEach
public void cleanup() {
if (driver != null) {
driver.close();
@@ -109,7 +112,7 @@
}
}
- @Before
+ @BeforeEach
public void setUp() {
capture = new MockProcessorSupplier<>();
builder = new StreamsBuilder();
@@ -335,7 +338,7 @@
new KeyValueTimestamp<>("B", "B->b!", 10),
new KeyValueTimestamp<>("D", "D->null!", 15)
));
- assertNull("Store should not be materialized", driver.getKeyValueStore(QUERYABLE_NAME));
+ assertNull(driver.getKeyValueStore(QUERYABLE_NAME), "Store should not be materialized");
}
@Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/MaterializedInternalTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/MaterializedInternalTest.java
index 6a9f872..c7f2676 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/MaterializedInternalTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/MaterializedInternalTest.java
@@ -33,10 +33,11 @@
import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.test.StreamsTestUtils;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
import java.util.Properties;
@@ -44,15 +45,15 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-@RunWith(MockitoJUnitRunner.StrictStubs.class)
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.STRICT_STUBS)
public class MaterializedInternalTest {
- @Mock
- private InternalNameProvider nameProvider;
- @Mock
- private KeyValueBytesStoreSupplier supplier;
+ private InternalNameProvider nameProvider = mock(InternalNameProvider.class);
+ private KeyValueBytesStoreSupplier supplier = mock(KeyValueBytesStoreSupplier.class);
private final String prefix = "prefix";
@Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/NamedInternalTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/NamedInternalTest.java
index 1c4c700..fef5aa7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/NamedInternalTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/NamedInternalTest.java
@@ -16,9 +16,9 @@
*/
package org.apache.kafka.streams.kstream.internals;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
-import static org.junit.Assert.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
public class NamedInternalTest {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerTest.java
index 87cf257..7373a77 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerTest.java
@@ -19,16 +19,19 @@
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.junit.MockitoJUnitRunner;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
-@RunWith(MockitoJUnitRunner.StrictStubs.class)
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.STRICT_STUBS)
public class SessionCacheFlushListenerTest {
@Test
public void shouldForwardKeyNewValueOldValueAndTimestamp() {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowTest.java
index df23ddd..ba7483d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowTest.java
@@ -16,11 +16,11 @@
*/
package org.apache.kafka.streams.kstream.internals;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThrows;
-import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
public class SessionWindowTest {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedCogroupedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedCogroupedKStreamImplTest.java
index eee7cc5..5415c46 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedCogroupedKStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedCogroupedKStreamImplTest.java
@@ -20,7 +20,7 @@
import static java.time.Duration.ofMillis;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
@@ -47,8 +47,8 @@
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.StreamsTestUtils;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
@SuppressWarnings("deprecation")
public class SessionWindowedCogroupedKStreamImplTest {
@@ -67,7 +67,7 @@
private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
- @Before
+ @BeforeEach
public void setup() {
final KStream<String, String> stream = builder.stream(TOPIC, Consumed
.with(Serdes.String(), Serdes.String()));
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
index 7bd0636..5156721 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
@@ -46,14 +46,11 @@
import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.MockReducer;
import org.apache.kafka.test.StreamsTestUtils;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
@@ -63,10 +60,9 @@
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
-@RunWith(Parameterized.class)
public class SessionWindowedKStreamImplTest {
private static final String TOPIC = "input";
private final StreamsBuilder builder = new StreamsBuilder();
@@ -75,21 +71,12 @@
private SessionWindowedKStream<String, String> stream;
- @Parameterized.Parameter
public EmitStrategy.StrategyType type;
private boolean emitFinal;
- @Parameterized.Parameters(name = "{0}")
- public static Collection<Object[]> data() {
- return asList(new Object[][] {
- {EmitStrategy.StrategyType.ON_WINDOW_UPDATE},
- {EmitStrategy.StrategyType.ON_WINDOW_CLOSE}
- });
- }
-
- @Before
- public void before() {
+ public void setup(final EmitStrategy.StrategyType inputType) {
+ type = inputType;
final EmitStrategy emitStrategy = EmitStrategy.StrategyType.forType(type);
emitFinal = type.equals(EmitStrategy.StrategyType.ON_WINDOW_CLOSE);
@@ -99,14 +86,18 @@
.emitStrategy(emitStrategy);
}
- @Test
- public void shouldCountSessionWindowedWithCachingDisabled() {
+ @ParameterizedTest
+ @EnumSource(EmitStrategy.StrategyType.class)
+ public void shouldCountSessionWindowedWithCachingDisabled(final EmitStrategy.StrategyType inputType) {
+ setup(inputType);
props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
shouldCountSessionWindowed();
}
- @Test
- public void shouldCountSessionWindowedWithCachingEnabled() {
+ @ParameterizedTest
+ @EnumSource(EmitStrategy.StrategyType.class)
+ public void shouldCountSessionWindowedWithCachingEnabled(final EmitStrategy.StrategyType inputType) {
+ setup(inputType);
shouldCountSessionWindowed();
}
@@ -146,8 +137,10 @@
}
}
- @Test
- public void shouldReduceWindowed() {
+ @ParameterizedTest
+ @EnumSource(EmitStrategy.StrategyType.class)
+ public void shouldReduceWindowed(final EmitStrategy.StrategyType inputType) {
+ setup(inputType);
final MockApiProcessorSupplier<Windowed<String>, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
stream.reduce(MockReducer.STRING_ADDER)
.toStream()
@@ -183,8 +176,10 @@
}
}
- @Test
- public void shouldAggregateSessionWindowed() {
+ @ParameterizedTest
+ @EnumSource(EmitStrategy.StrategyType.class)
+ public void shouldAggregateSessionWindowed(final EmitStrategy.StrategyType inputType) {
+ setup(inputType);
final MockApiProcessorSupplier<Windowed<String>, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
stream.aggregate(MockInitializer.STRING_INIT,
MockAggregator.TOSTRING_ADDER,
@@ -222,8 +217,10 @@
}
}
- @Test
- public void shouldMaterializeCount() {
+ @ParameterizedTest
+ @EnumSource(EmitStrategy.StrategyType.class)
+ public void shouldMaterializeCount(final EmitStrategy.StrategyType inputType) {
+ setup(inputType);
stream.count(Materialized.as("count-store"));
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
@@ -248,8 +245,10 @@
}
}
- @Test
- public void shouldMaterializeReduced() {
+ @ParameterizedTest
+ @EnumSource(EmitStrategy.StrategyType.class)
+ public void shouldMaterializeReduced(final EmitStrategy.StrategyType inputType) {
+ setup(inputType);
stream.reduce(MockReducer.STRING_ADDER, Materialized.as("reduced"));
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
@@ -275,8 +274,10 @@
}
}
- @Test
- public void shouldMaterializeAggregated() {
+ @ParameterizedTest
+ @EnumSource(EmitStrategy.StrategyType.class)
+ public void shouldMaterializeAggregated(final EmitStrategy.StrategyType inputType) {
+ setup(inputType);
stream.aggregate(
MockInitializer.STRING_INIT,
MockAggregator.TOSTRING_ADDER,
@@ -305,28 +306,38 @@
}
}
- @Test
- public void shouldThrowNullPointerOnAggregateIfInitializerIsNull() {
+ @ParameterizedTest
+ @EnumSource(EmitStrategy.StrategyType.class)
+ public void shouldThrowNullPointerOnAggregateIfInitializerIsNull(final EmitStrategy.StrategyType inputType) {
+ setup(inputType);
assertThrows(NullPointerException.class, () -> stream.aggregate(null, MockAggregator.TOSTRING_ADDER, sessionMerger));
}
- @Test
- public void shouldThrowNullPointerOnAggregateIfAggregatorIsNull() {
+ @ParameterizedTest
+ @EnumSource(EmitStrategy.StrategyType.class)
+ public void shouldThrowNullPointerOnAggregateIfAggregatorIsNull(final EmitStrategy.StrategyType inputType) {
+ setup(inputType);
assertThrows(NullPointerException.class, () -> stream.aggregate(MockInitializer.STRING_INIT, null, sessionMerger));
}
- @Test
- public void shouldThrowNullPointerOnAggregateIfMergerIsNull() {
+ @ParameterizedTest
+ @EnumSource(EmitStrategy.StrategyType.class)
+ public void shouldThrowNullPointerOnAggregateIfMergerIsNull(final EmitStrategy.StrategyType inputType) {
+ setup(inputType);
assertThrows(NullPointerException.class, () -> stream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, null));
}
- @Test
- public void shouldThrowNullPointerOnReduceIfReducerIsNull() {
+ @ParameterizedTest
+ @EnumSource(EmitStrategy.StrategyType.class)
+ public void shouldThrowNullPointerOnReduceIfReducerIsNull(final EmitStrategy.StrategyType inputType) {
+ setup(inputType);
assertThrows(NullPointerException.class, () -> stream.reduce(null));
}
- @Test
- public void shouldThrowNullPointerOnMaterializedAggregateIfInitializerIsNull() {
+ @ParameterizedTest
+ @EnumSource(EmitStrategy.StrategyType.class)
+ public void shouldThrowNullPointerOnMaterializedAggregateIfInitializerIsNull(final EmitStrategy.StrategyType inputType) {
+ setup(inputType);
assertThrows(NullPointerException.class, () -> stream.aggregate(
null,
MockAggregator.TOSTRING_ADDER,
@@ -334,8 +345,10 @@
Materialized.as("store")));
}
- @Test
- public void shouldThrowNullPointerOnMaterializedAggregateIfAggregatorIsNull() {
+ @ParameterizedTest
+ @EnumSource(EmitStrategy.StrategyType.class)
+ public void shouldThrowNullPointerOnMaterializedAggregateIfAggregatorIsNull(final EmitStrategy.StrategyType inputType) {
+ setup(inputType);
assertThrows(NullPointerException.class, () -> stream.aggregate(
MockInitializer.STRING_INIT,
null,
@@ -343,8 +356,10 @@
Materialized.as("store")));
}
- @Test
- public void shouldThrowNullPointerOnMaterializedAggregateIfMergerIsNull() {
+ @ParameterizedTest
+ @EnumSource(EmitStrategy.StrategyType.class)
+ public void shouldThrowNullPointerOnMaterializedAggregateIfMergerIsNull(final EmitStrategy.StrategyType inputType) {
+ setup(inputType);
assertThrows(NullPointerException.class, () -> stream.aggregate(
MockInitializer.STRING_INIT,
MockAggregator.TOSTRING_ADDER,
@@ -353,8 +368,10 @@
}
@SuppressWarnings("unchecked")
- @Test
- public void shouldThrowNullPointerOnMaterializedAggregateIfMaterializedIsNull() {
+ @ParameterizedTest
+ @EnumSource(EmitStrategy.StrategyType.class)
+ public void shouldThrowNullPointerOnMaterializedAggregateIfMaterializedIsNull(final EmitStrategy.StrategyType inputType) {
+ setup(inputType);
assertThrows(NullPointerException.class, () -> stream.aggregate(
MockInitializer.STRING_INIT,
MockAggregator.TOSTRING_ADDER,
@@ -362,29 +379,39 @@
(Materialized) null));
}
- @Test
- public void shouldThrowNullPointerOnMaterializedReduceIfReducerIsNull() {
+ @ParameterizedTest
+ @EnumSource(EmitStrategy.StrategyType.class)
+ public void shouldThrowNullPointerOnMaterializedReduceIfReducerIsNull(final EmitStrategy.StrategyType inputType) {
+ setup(inputType);
assertThrows(NullPointerException.class, () -> stream.reduce(null, Materialized.as("store")));
}
- @Test
+ @ParameterizedTest
+ @EnumSource(EmitStrategy.StrategyType.class)
@SuppressWarnings("unchecked")
- public void shouldThrowNullPointerOnMaterializedReduceIfMaterializedIsNull() {
+ public void shouldThrowNullPointerOnMaterializedReduceIfMaterializedIsNull(final EmitStrategy.StrategyType inputType) {
+ setup(inputType);
assertThrows(NullPointerException.class, () -> stream.reduce(MockReducer.STRING_ADDER, (Materialized) null));
}
- @Test
- public void shouldThrowNullPointerOnMaterializedReduceIfNamedIsNull() {
+ @ParameterizedTest
+ @EnumSource(EmitStrategy.StrategyType.class)
+ public void shouldThrowNullPointerOnMaterializedReduceIfNamedIsNull(final EmitStrategy.StrategyType inputType) {
+ setup(inputType);
assertThrows(NullPointerException.class, () -> stream.reduce(MockReducer.STRING_ADDER, (Named) null));
}
- @Test
- public void shouldThrowNullPointerOnCountIfMaterializedIsNull() {
+ @ParameterizedTest
+ @EnumSource(EmitStrategy.StrategyType.class)
+ public void shouldThrowNullPointerOnCountIfMaterializedIsNull(final EmitStrategy.StrategyType inputType) {
+ setup(inputType);
assertThrows(NullPointerException.class, () -> stream.count((Materialized<String, Long, SessionStore<Bytes, byte[]>>) null));
}
- @Test
- public void shouldNotEnableCachingWithEmitFinal() {
+ @ParameterizedTest
+ @EnumSource(EmitStrategy.StrategyType.class)
+ public void shouldNotEnableCachingWithEmitFinal(final EmitStrategy.StrategyType inputType) {
+ setup(inputType);
if (!emitFinal)
return;
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImplTest.java
index 52ff858..4e4bc70 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImplTest.java
@@ -20,8 +20,8 @@
import static java.time.Duration.ofMillis;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import java.util.LinkedList;
import java.util.List;
@@ -51,8 +51,8 @@
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.StreamsTestUtils;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
@SuppressWarnings("deprecation")
public class SlidingWindowedCogroupedKStreamImplTest {
@@ -69,7 +69,7 @@
private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
- @Before
+ @BeforeEach
public void setup() {
final KStream<String, String> stream = builder.stream(TOPIC, Consumed
.with(Serdes.String(), Serdes.String()));
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImplTest.java
index 64df966..23a835c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImplTest.java
@@ -41,8 +41,8 @@
import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.MockReducer;
import org.apache.kafka.test.StreamsTestUtils;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.List;
@@ -52,7 +52,7 @@
import static java.time.Instant.ofEpochMilli;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertThrows;
public class SlidingWindowedKStreamImplTest {
@@ -61,7 +61,7 @@
private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
private TimeWindowedKStream<String, String> windowedStream;
- @Before
+ @BeforeEach
public void before() {
final KStream<String, String> stream = builder.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
windowedStream = stream.
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
index 7b521ab..ea4bcc7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
@@ -49,7 +49,7 @@
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.test.TestRecord;
import org.apache.kafka.test.TestUtils;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.util.Comparator;
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressTopologyTest.java
index 7493cfd..578f3e9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressTopologyTest.java
@@ -32,7 +32,7 @@
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.Stores;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.time.Duration;
@@ -42,7 +42,7 @@
import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertThrows;
@SuppressWarnings("deprecation")
public class SuppressTopologyTest {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowTest.java
index bdfdb16..f766366 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowTest.java
@@ -17,15 +17,15 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.kstream.TimeWindows;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.util.Map;
import static java.time.Duration.ofMillis;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThrows;
-import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
public class TimeWindowTest {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedCogroupedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedCogroupedKStreamImplTest.java
index cd9ca19..2fbee12 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedCogroupedKStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedCogroupedKStreamImplTest.java
@@ -20,7 +20,7 @@
import static java.time.Duration.ofMillis;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
@@ -48,8 +48,8 @@
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.StreamsTestUtils;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
@SuppressWarnings("deprecation")
public class TimeWindowedCogroupedKStreamImplTest {
@@ -68,7 +68,7 @@
private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
- @Before
+ @BeforeEach
public void setup() {
final KStream<String, String> stream = builder.stream(TOPIC, Consumed
.with(Serdes.String(), Serdes.String()));
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
index a963e50..104cf15 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
@@ -17,8 +17,6 @@
package org.apache.kafka.streams.kstream.internals;
-import java.util.ArrayList;
-import java.util.Collection;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
@@ -45,25 +43,24 @@
import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.MockReducer;
import org.apache.kafka.test.StreamsTestUtils;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.Arguments;
+import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.Collections;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameter;
+import java.util.stream.Stream;
import static java.time.Duration.ofMillis;
import static java.time.Instant.ofEpochMilli;
import static java.util.Arrays.asList;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
-@RunWith(Parameterized.class)
public class TimeWindowedKStreamImplTest {
private static final String TOPIC = "input";
private static final Windowed<String> KEY_1_WINDOW_0 = new Windowed<>("1", new TimeWindow(0L, 500L));
@@ -75,27 +72,25 @@
private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
private TimeWindowedKStream<String, String> windowedStream;
- @Parameter
public StrategyType type;
- @Parameter(1)
public boolean withCache;
private EmitStrategy emitStrategy;
private boolean emitFinal;
- @Parameterized.Parameters(name = "{0}_cache:{1}")
- public static Collection<Object[]> data() {
- return asList(new Object[][] {
- {StrategyType.ON_WINDOW_UPDATE, true},
- {StrategyType.ON_WINDOW_UPDATE, false},
- {StrategyType.ON_WINDOW_CLOSE, true},
- {StrategyType.ON_WINDOW_CLOSE, false}
- });
+ public static Stream<Arguments> data() {
+ return Stream.of(
+ Arguments.of(StrategyType.ON_WINDOW_UPDATE, true),
+ Arguments.of(StrategyType.ON_WINDOW_UPDATE, false),
+ Arguments.of(StrategyType.ON_WINDOW_CLOSE, true),
+ Arguments.of(StrategyType.ON_WINDOW_CLOSE, false)
+ );
}
- @Before
- public void before() {
+ public void setup(final StrategyType inputType, final boolean inputWithCache) {
+ type = inputType;
+ withCache = inputWithCache;
emitFinal = type.equals(StrategyType.ON_WINDOW_CLOSE);
emitStrategy = StrategyType.forType(type);
final KStream<String, String> stream = builder.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
@@ -103,8 +98,10 @@
.windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(500L)));
}
- @Test
- public void shouldCountWindowed() {
+ @ParameterizedTest
+ @MethodSource("data")
+ public void shouldCountWindowed(final StrategyType inputType, final boolean inputWithCache) {
+ setup(inputType, inputWithCache);
final MockApiProcessorSupplier<Windowed<String>, Long, Void, Void> supplier = new MockApiProcessorSupplier<>();
windowedStream
.emitStrategy(emitStrategy)
@@ -141,8 +138,10 @@
}
}
- @Test
- public void shouldReduceWindowed() {
+ @ParameterizedTest
+ @MethodSource("data")
+ public void shouldReduceWindowed(final StrategyType inputType, final boolean inputWithCache) {
+ setup(inputType, inputWithCache);
final MockApiProcessorSupplier<Windowed<String>, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
windowedStream
.emitStrategy(emitStrategy)
@@ -179,8 +178,10 @@
}
}
- @Test
- public void shouldAggregateWindowed() {
+ @ParameterizedTest
+ @MethodSource("data")
+ public void shouldAggregateWindowed(final StrategyType inputType, final boolean inputWithCache) {
+ setup(inputType, inputWithCache);
final MockApiProcessorSupplier<Windowed<String>, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
windowedStream
.emitStrategy(emitStrategy)
@@ -221,8 +222,10 @@
}
}
- @Test
- public void shouldMaterializeCount() {
+ @ParameterizedTest
+ @MethodSource("data")
+ public void shouldMaterializeCount(final StrategyType inputType, final boolean inputWithCache) {
+ setup(inputType, inputWithCache);
windowedStream
.emitStrategy(emitStrategy)
.count(
@@ -286,8 +289,10 @@
}
}
- @Test
- public void shouldMaterializeReduced() {
+ @ParameterizedTest
+ @MethodSource("data")
+ public void shouldMaterializeReduced(final StrategyType inputType, final boolean inputWithCache) {
+ setup(inputType, inputWithCache);
windowedStream.reduce(
MockReducer.STRING_ADDER,
setMaterializedCache(Materialized.<String, String, WindowStore<Bytes, byte[]>>as("reduced")
@@ -336,8 +341,10 @@
}
}
- @Test
- public void shouldMaterializeAggregated() {
+ @ParameterizedTest
+ @MethodSource("data")
+ public void shouldMaterializeAggregated(final StrategyType inputType, final boolean inputWithCache) {
+ setup(inputType, inputWithCache);
windowedStream.aggregate(
MockInitializer.STRING_INIT,
MockAggregator.TOSTRING_ADDER,
@@ -386,31 +393,41 @@
}
}
- @Test
- public void shouldThrowNullPointerOnAggregateIfInitializerIsNull() {
+ @ParameterizedTest
+ @MethodSource("data")
+ public void shouldThrowNullPointerOnAggregateIfInitializerIsNull(final StrategyType inputType, final boolean inputWithCache) {
+ setup(inputType, inputWithCache);
assertThrows(NullPointerException.class, () -> windowedStream.aggregate(null, MockAggregator.TOSTRING_ADDER));
}
- @Test
- public void shouldThrowNullPointerOnAggregateIfAggregatorIsNull() {
+ @ParameterizedTest
+ @MethodSource("data")
+ public void shouldThrowNullPointerOnAggregateIfAggregatorIsNull(final StrategyType inputType, final boolean inputWithCache) {
+ setup(inputType, inputWithCache);
assertThrows(NullPointerException.class, () -> windowedStream.aggregate(MockInitializer.STRING_INIT, null));
}
- @Test
- public void shouldThrowNullPointerOnReduceIfReducerIsNull() {
+ @ParameterizedTest
+ @MethodSource("data")
+ public void shouldThrowNullPointerOnReduceIfReducerIsNull(final StrategyType inputType, final boolean inputWithCache) {
+ setup(inputType, inputWithCache);
assertThrows(NullPointerException.class, () -> windowedStream.reduce(null));
}
- @Test
- public void shouldThrowNullPointerOnMaterializedAggregateIfInitializerIsNull() {
+ @ParameterizedTest
+ @MethodSource("data")
+ public void shouldThrowNullPointerOnMaterializedAggregateIfInitializerIsNull(final StrategyType inputType, final boolean inputWithCache) {
+ setup(inputType, inputWithCache);
assertThrows(NullPointerException.class, () -> windowedStream.aggregate(
null,
MockAggregator.TOSTRING_ADDER,
setMaterializedCache(Materialized.as("store"))));
}
- @Test
- public void shouldThrowNullPointerOnMaterializedAggregateIfAggregatorIsNull() {
+ @ParameterizedTest
+ @MethodSource("data")
+ public void shouldThrowNullPointerOnMaterializedAggregateIfAggregatorIsNull(final StrategyType inputType, final boolean inputWithCache) {
+ setup(inputType, inputWithCache);
assertThrows(NullPointerException.class, () -> windowedStream.aggregate(
MockInitializer.STRING_INIT,
null,
@@ -418,38 +435,48 @@
}
@SuppressWarnings("unchecked")
- @Test
- public void shouldThrowNullPointerOnMaterializedAggregateIfMaterializedIsNull() {
+ @ParameterizedTest
+ @MethodSource("data")
+ public void shouldThrowNullPointerOnMaterializedAggregateIfMaterializedIsNull(final StrategyType inputType, final boolean inputWithCache) {
+ setup(inputType, inputWithCache);
assertThrows(NullPointerException.class, () -> windowedStream.aggregate(
MockInitializer.STRING_INIT,
MockAggregator.TOSTRING_ADDER,
(Materialized) null));
}
- @Test
- public void shouldThrowNullPointerOnMaterializedReduceIfReducerIsNull() {
+ @ParameterizedTest
+ @MethodSource("data")
+ public void shouldThrowNullPointerOnMaterializedReduceIfReducerIsNull(final StrategyType inputType, final boolean inputWithCache) {
+ setup(inputType, inputWithCache);
assertThrows(NullPointerException.class, () -> windowedStream.reduce(
null,
setMaterializedCache(Materialized.as("store"))));
}
- @Test
+ @ParameterizedTest
+ @MethodSource("data")
@SuppressWarnings("unchecked")
- public void shouldThrowNullPointerOnMaterializedReduceIfMaterializedIsNull() {
+ public void shouldThrowNullPointerOnMaterializedReduceIfMaterializedIsNull(final StrategyType inputType, final boolean inputWithCache) {
+ setup(inputType, inputWithCache);
assertThrows(NullPointerException.class, () -> windowedStream.reduce(
MockReducer.STRING_ADDER,
(Materialized) null));
}
- @Test
- public void shouldThrowNullPointerOnMaterializedReduceIfNamedIsNull() {
+ @ParameterizedTest
+ @MethodSource("data")
+ public void shouldThrowNullPointerOnMaterializedReduceIfNamedIsNull(final StrategyType inputType, final boolean inputWithCache) {
+ setup(inputType, inputWithCache);
assertThrows(NullPointerException.class, () -> windowedStream.reduce(
MockReducer.STRING_ADDER,
(Named) null));
}
- @Test
- public void shouldThrowNullPointerOnCountIfMaterializedIsNull() {
+ @ParameterizedTest
+ @MethodSource("data")
+ public void shouldThrowNullPointerOnCountIfMaterializedIsNull(final StrategyType inputType, final boolean inputWithCache) {
+ setup(inputType, inputWithCache);
assertThrows(NullPointerException.class, () -> windowedStream.count((Materialized<String, Long, WindowStore<Bytes, byte[]>>) null));
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListenerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListenerTest.java
index 530cfad..f580273 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListenerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListenerTest.java
@@ -19,16 +19,19 @@
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.state.ValueAndTimestamp;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.junit.MockitoJUnitRunner;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
-@RunWith(MockitoJUnitRunner.StrictStubs.class)
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.STRICT_STUBS)
public class TimestampedCacheFlushListenerTest {
@Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java
index 24c3526..b13df64 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java
@@ -21,15 +21,18 @@
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.internals.WrappedStateStore;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.junit.MockitoJUnitRunner;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-@RunWith(MockitoJUnitRunner.StrictStubs.class)
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.STRICT_STUBS)
public class TimestampedTupleForwarderTest {
@Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TransformerSupplierAdapterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TransformerSupplierAdapterTest.java
index 4d8b95f..8949b1e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TransformerSupplierAdapterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TransformerSupplierAdapterTest.java
@@ -24,10 +24,11 @@
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.StoreBuilder;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
import static org.hamcrest.core.IsEqual.equalTo;
import static org.hamcrest.core.IsSame.sameInstance;
@@ -37,17 +38,17 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-@RunWith(MockitoJUnitRunner.StrictStubs.class)
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.STRICT_STUBS)
public class TransformerSupplierAdapterTest {
- @Mock
- private ProcessorContext context;
- @Mock
- private Transformer<String, String, KeyValue<Integer, Integer>> transformer;
- @Mock
- private TransformerSupplier<String, String, KeyValue<Integer, Integer>> transformerSupplier;
- @Mock
- private Set<StoreBuilder<?>> stores;
+ private ProcessorContext context = mock(ProcessorContext.class);
+ @SuppressWarnings("unchecked")
+ private Transformer<String, String, KeyValue<Integer, Integer>> transformer = mock(Transformer.class);
+ @SuppressWarnings("unchecked")
+ private TransformerSupplier<String, String, KeyValue<Integer, Integer>> transformerSupplier = mock(TransformerSupplier.class);
+ @SuppressWarnings("unchecked")
+ private Set<StoreBuilder<?>> stores = mock(Set.class);
final String key = "Hello";
final String value = "World";
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindowTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindowTest.java
index f8e5731..2c78bb6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindowTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindowTest.java
@@ -16,10 +16,10 @@
*/
package org.apache.kafka.streams.kstream.internals;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
-import static org.junit.Assert.assertThrows;
-import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
public class UnlimitedWindowTest {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
index dc87c1e..8c186e2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
@@ -23,14 +23,14 @@
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.kstream.TimeWindowedSerializer;
import org.apache.kafka.streams.kstream.Windowed;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;
-import static org.junit.Assert.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
public class WindowedStreamPartitionerTest {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchemaTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchemaTest.java
index eb2c57e..61f3127 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchemaTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchemaTest.java
@@ -18,12 +18,12 @@
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.nio.ByteBuffer;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
public class CombinedKeySchemaTest {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplierTest.java
index 6818fd4..58decf7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplierTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplierTest.java
@@ -30,7 +30,7 @@
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.internals.Murmur3;
import org.apache.kafka.test.MockInternalNewProcessorContext;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.List;
@@ -41,8 +41,8 @@
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.collection.IsEmptyCollection.empty;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
public class ResponseJoinProcessorSupplierTest {
private static final StringSerializer STRING_SERIALIZER = new StringSerializer();
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinProcessorSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinProcessorSupplierTest.java
index 5cfe5ad..5e647e5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinProcessorSupplierTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinProcessorSupplierTest.java
@@ -29,8 +29,8 @@
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.ValueAndTimestamp;
-import org.junit.Assert;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
+import static org.junit.jupiter.api.Assertions.assertEquals;
public class SubscriptionJoinProcessorSupplierTest {
final Map<String, ValueAndTimestamp<String>> fks = Collections.singletonMap(
@@ -46,7 +46,7 @@
@Test
public void shouldDetectVersionChange() {
// This test serves as a reminder to add new tests once we bump SubscriptionWrapper version.
- Assert.assertEquals(SubscriptionWrapper.VERSION_1, SubscriptionWrapper.CURRENT_VERSION);
+ assertEquals(SubscriptionWrapper.VERSION_1, SubscriptionWrapper.CURRENT_VERSION);
}
@Test
@@ -69,8 +69,8 @@
);
processor.process(record);
final List<CapturedForward<? extends String, ? extends SubscriptionResponseWrapper<String>>> forwarded = context.forwarded();
- Assert.assertEquals(1, forwarded.size());
- Assert.assertEquals(
+ assertEquals(1, forwarded.size());
+ assertEquals(
new Record<>(
"pk1",
new SubscriptionResponseWrapper<>(
@@ -103,8 +103,8 @@
);
processor.process(record);
final List<CapturedForward<? extends String, ? extends SubscriptionResponseWrapper<String>>> forwarded = context.forwarded();
- Assert.assertEquals(1, forwarded.size());
- Assert.assertEquals(
+ assertEquals(1, forwarded.size());
+ assertEquals(
new Record<>(
"pk1",
new SubscriptionResponseWrapper<>(
@@ -138,8 +138,8 @@
);
processor.process(record);
final List<CapturedForward<? extends String, ? extends SubscriptionResponseWrapper<String>>> forwarded = context.forwarded();
- Assert.assertEquals(1, forwarded.size());
- Assert.assertEquals(
+ assertEquals(1, forwarded.size());
+ assertEquals(
new Record<>(
"pk1",
new SubscriptionResponseWrapper<>(
@@ -173,8 +173,8 @@
);
processor.process(record);
final List<CapturedForward<? extends String, ? extends SubscriptionResponseWrapper<String>>> forwarded = context.forwarded();
- Assert.assertEquals(1, forwarded.size());
- Assert.assertEquals(
+ assertEquals(1, forwarded.size());
+ assertEquals(
new Record<>(
"pk1",
new SubscriptionResponseWrapper<>(
@@ -208,8 +208,8 @@
processor.process(record);
// propagate matched FK
List<CapturedForward<? extends String, ? extends SubscriptionResponseWrapper<String>>> forwarded = context.forwarded();
- Assert.assertEquals(1, forwarded.size());
- Assert.assertEquals(
+ assertEquals(1, forwarded.size());
+ assertEquals(
new Record<>(
"pk1",
new SubscriptionResponseWrapper<>(
@@ -229,8 +229,8 @@
processor.process(record);
// propagate null if there is no match
forwarded = context.forwarded();
- Assert.assertEquals(2, forwarded.size());
- Assert.assertEquals(
+ assertEquals(2, forwarded.size());
+ assertEquals(
new Record<>(
"pk1",
new SubscriptionResponseWrapper<>(
@@ -262,8 +262,8 @@
);
processor.process(record);
List<CapturedForward<? extends String, ? extends SubscriptionResponseWrapper<String>>> forwarded = context.forwarded();
- Assert.assertEquals(1, forwarded.size());
- Assert.assertEquals(
+ assertEquals(1, forwarded.size());
+ assertEquals(
new Record<>(
"pk1",
new SubscriptionResponseWrapper<>(
@@ -283,8 +283,8 @@
processor.process(record);
// propagate null if there is no match
forwarded = context.forwarded();
- Assert.assertEquals(2, forwarded.size());
- Assert.assertEquals(
+ assertEquals(2, forwarded.size());
+ assertEquals(
new Record<>(
"pk1",
new SubscriptionResponseWrapper<>(
@@ -316,7 +316,7 @@
);
processor.process(record);
final List<CapturedForward<? extends String, ? extends SubscriptionResponseWrapper<String>>> forwarded = context.forwarded();
- Assert.assertEquals(0, forwarded.size());
+ assertEquals(0, forwarded.size());
}
@Test
@@ -337,7 +337,7 @@
);
processor.process(record);
final List<CapturedForward<? extends String, ? extends SubscriptionResponseWrapper<String>>> forwarded = context.forwarded();
- Assert.assertEquals(0, forwarded.size());
+ assertEquals(0, forwarded.size());
}
private KTableValueGetterSupplier<String, String> valueGetterSupplier(final Map<String, ValueAndTimestamp<String>> map) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplierTest.java
index dc04d56..e90ba15 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplierTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplierTest.java
@@ -40,10 +40,12 @@
import org.apache.kafka.test.MockInternalNewProcessorContext;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
public class SubscriptionReceiveProcessorSupplierTest {
@@ -64,13 +66,13 @@
Serdes.String()
);
- @Before
+ @BeforeEach
public void before() {
stateDir = TestUtils.tempDirectory();
context = new MockInternalNewProcessorContext<>(props, new TaskId(0, 0), stateDir);
}
- @After
+ @AfterEach
public void after() throws IOException {
if (stateStore != null) {
stateStore.close();
@@ -81,7 +83,7 @@
@Test
public void shouldDetectVersionChange() {
// This test serves as a reminder to add new tests once we bump SubscriptionWrapper version.
- Assert.assertEquals(SubscriptionWrapper.VERSION_1, SubscriptionWrapper.CURRENT_VERSION);
+ assertEquals(SubscriptionWrapper.VERSION_1, SubscriptionWrapper.CURRENT_VERSION);
}
@Test
@@ -127,9 +129,9 @@
final List<CapturedForward<? extends CombinedKey<String, String>,
? extends Change<ValueAndTimestamp<SubscriptionWrapper<String>>>>> forwarded = context.forwarded();
- Assert.assertNull(stateStore.get(key));
- Assert.assertEquals(1, forwarded.size());
- Assert.assertEquals(
+ assertNull(stateStore.get(key));
+ assertEquals(1, forwarded.size());
+ assertEquals(
record.withKey(new CombinedKey<>(FK, PK1))
.withValue(new Change<>(newValue, oldValue)),
forwarded.get(0).record()
@@ -178,9 +180,9 @@
final List<CapturedForward<? extends CombinedKey<String, String>,
? extends Change<ValueAndTimestamp<SubscriptionWrapper<String>>>>> forwarded = context.forwarded();
- Assert.assertNull(stateStore.get(key));
- Assert.assertEquals(1, forwarded.size());
- Assert.assertEquals(
+ assertNull(stateStore.get(key));
+ assertEquals(1, forwarded.size());
+ assertEquals(
record.withKey(new CombinedKey<>(FK, PK1))
.withValue(new Change<>(newValue, oldValue)),
forwarded.get(0).record()
@@ -230,9 +232,9 @@
final List<CapturedForward<? extends CombinedKey<String, String>,
? extends Change<ValueAndTimestamp<SubscriptionWrapper<String>>>>> forwarded = context.forwarded();
- Assert.assertNull(stateStore.get(key));
- Assert.assertEquals(1, forwarded.size());
- Assert.assertEquals(
+ assertNull(stateStore.get(key));
+ assertEquals(1, forwarded.size());
+ assertEquals(
record.withKey(new CombinedKey<>(FK, PK1))
.withValue(new Change<>(newValue, oldValue)),
forwarded.get(0).record()
@@ -282,9 +284,9 @@
final List<CapturedForward<? extends CombinedKey<String, String>,
? extends Change<ValueAndTimestamp<SubscriptionWrapper<String>>>>> forwarded = context.forwarded();
- Assert.assertNull(stateStore.get(key));
- Assert.assertEquals(1, forwarded.size());
- Assert.assertEquals(
+ assertNull(stateStore.get(key));
+ assertEquals(1, forwarded.size());
+ assertEquals(
record.withKey(new CombinedKey<>(FK, PK1))
.withValue(new Change<>(newValue, oldValue)),
forwarded.get(0).record()
@@ -334,9 +336,9 @@
final List<CapturedForward<? extends CombinedKey<String, String>,
? extends Change<ValueAndTimestamp<SubscriptionWrapper<String>>>>> forwarded = context.forwarded();
- Assert.assertEquals(newValue, stateStore.get(key));
- Assert.assertEquals(1, forwarded.size());
- Assert.assertEquals(
+ assertEquals(newValue, stateStore.get(key));
+ assertEquals(1, forwarded.size());
+ assertEquals(
record.withKey(new CombinedKey<>(FK, PK1))
.withValue(new Change<>(newValue, oldValue)),
forwarded.get(0).record()
@@ -386,9 +388,9 @@
final List<CapturedForward<? extends CombinedKey<String, String>,
? extends Change<ValueAndTimestamp<SubscriptionWrapper<String>>>>> forwarded = context.forwarded();
- Assert.assertEquals(newValue, stateStore.get(key));
- Assert.assertEquals(1, forwarded.size());
- Assert.assertEquals(
+ assertEquals(newValue, stateStore.get(key));
+ assertEquals(1, forwarded.size());
+ assertEquals(
record.withKey(new CombinedKey<>(FK, PK1))
.withValue(new Change<>(newValue, oldValue)),
forwarded.get(0).record()
@@ -438,9 +440,9 @@
final List<CapturedForward<? extends CombinedKey<String, String>,
? extends Change<ValueAndTimestamp<SubscriptionWrapper<String>>>>> forwarded = context.forwarded();
- Assert.assertEquals(newValue, stateStore.get(key));
- Assert.assertEquals(1, forwarded.size());
- Assert.assertEquals(
+ assertEquals(newValue, stateStore.get(key));
+ assertEquals(1, forwarded.size());
+ assertEquals(
record.withKey(new CombinedKey<>(FK, PK1))
.withValue(new Change<>(newValue, oldValue)),
forwarded.get(0).record()
@@ -490,9 +492,9 @@
final List<CapturedForward<? extends CombinedKey<String, String>,
? extends Change<ValueAndTimestamp<SubscriptionWrapper<String>>>>> forwarded = context.forwarded();
- Assert.assertEquals(newValue, stateStore.get(key));
- Assert.assertEquals(1, forwarded.size());
- Assert.assertEquals(
+ assertEquals(newValue, stateStore.get(key));
+ assertEquals(1, forwarded.size());
+ assertEquals(
record.withKey(new CombinedKey<>(FK, PK1))
.withValue(new Change<>(newValue, oldValue)),
forwarded.get(0).record()
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerdeTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerdeTest.java
index 167c1f9..886fd57 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerdeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerdeTest.java
@@ -22,15 +22,15 @@
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.state.internals.Murmur3;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.util.Map;
import static java.util.Objects.requireNonNull;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
public class SubscriptionResponseWrapperSerdeTest {
private static final class NonNullableSerde<T> implements Serde<T>, Serializer<T>, Deserializer<T> {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplierTest.java
index 2f36676..fc77593 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplierTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplierTest.java
@@ -25,7 +25,7 @@
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.internals.Murmur3;
import org.apache.kafka.test.MockInternalNewProcessorContext;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import static org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ResponseJoinProcessorSupplierTest.getDroppedRecordsRateMetric;
import static org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ResponseJoinProcessorSupplierTest.getDroppedRecordsTotalMetric;
@@ -37,8 +37,8 @@
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.empty;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
public class SubscriptionSendProcessorSupplierTest {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerdeTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerdeTest.java
index 709a94b..42f3998 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerdeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerdeTest.java
@@ -21,12 +21,12 @@
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.state.internals.Murmur3;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
@SuppressWarnings({"unchecked", "rawtypes"})
public class SubscriptionWrapperSerdeTest {
@@ -246,16 +246,16 @@
assertThrows(NullPointerException.class, () -> swSerde.serializer().serialize(null, wrapper));
}
- @Test (expected = UnsupportedVersionException.class)
+ @Test
public void shouldThrowExceptionOnUnsupportedVersionTest() {
final String originalKey = "originalKey";
final long[] hashedValue = null;
final Integer primaryPartition = 10;
- new SubscriptionWrapper<>(
+ assertThrows(UnsupportedVersionException.class, () -> new SubscriptionWrapper<>(
hashedValue,
SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE,
originalKey,
(byte) 0x80,
- primaryPartition);
+ primaryPartition));
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
index 91e8b61..87262b9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
@@ -26,12 +26,12 @@
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.StoreFactory;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import static java.time.Duration.ofMillis;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.fail;
+import static org.junit.jupiter.api.Assertions.fail;
public class GraphGraceSearchUtilTest {
@Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
index 8edf3c4..5f03025 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
@@ -39,7 +39,7 @@
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.ProcessorContext;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.util.ArrayList;
@@ -51,7 +51,7 @@
import java.util.regex.Pattern;
import static java.time.Duration.ofMillis;
-import static org.junit.Assert.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
@SuppressWarnings("deprecation")
public class StreamsGraphTest {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNodeTest.java
index 6a0e326..2ddf399 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNodeTest.java
@@ -19,9 +19,9 @@
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
-import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertTrue;
public class TableProcessorNodeTest {
private static class TestProcessor implements Processor<String, String, String, String> {
@@ -44,11 +44,11 @@
final String asString = node.toString();
final String expected = "storeFactory=null";
assertTrue(
- String.format(
- "Expected toString to return string with \"%s\", received: %s",
- expected,
- asString),
- asString.contains(expected)
+ asString.contains(expected),
+ String.format(
+ "Expected toString to return string with \"%s\", received: %s",
+ expected,
+ asString)
);
}
}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNodeTest.java
index 892b90a..b741a90 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNodeTest.java
@@ -23,23 +23,24 @@
import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
import org.apache.kafka.streams.kstream.internals.graph.TableSourceNode.TableSourceNodeBuilder;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
-
-@RunWith(MockitoJUnitRunner.StrictStubs.class)
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.STRICT_STUBS)
public class TableSourceNodeTest {
private static final String STORE_NAME = "store-name";
private static final String TOPIC = "input-topic";
- @Mock
- private InternalTopologyBuilder topologyBuilder;
+ private InternalTopologyBuilder topologyBuilder = mock(InternalTopologyBuilder.class);
@Test
public void shouldConnectStateStoreToInputTopicIfInputTopicIsUsedAsChangelog() {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java
index 4ad3700..16eb91c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java
@@ -35,9 +35,11 @@
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.Matcher;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.junit.MockitoJUnitRunner;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
import java.time.Duration;
import java.util.Map;
@@ -51,7 +53,8 @@
import static org.hamcrest.core.Is.is;
import static org.mockito.Mockito.mock;
-@RunWith(MockitoJUnitRunner.StrictStubs.class)
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.STRICT_STUBS)
public class KTableSuppressProcessorMetricsTest {
private static final long ARBITRARY_LONG = 5L;
private static final TaskId TASK_ID = new TaskId(0, 0);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
index 991b5a8..6cb9011 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
@@ -40,9 +40,11 @@
import org.hamcrest.BaseMatcher;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.junit.MockitoJUnitRunner;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
@@ -61,10 +63,11 @@
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.fail;
+import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.mock;
-@RunWith(MockitoJUnitRunner.StrictStubs.class)
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.STRICT_STUBS)
public class KTableSuppressProcessorTest {
private static final long ARBITRARY_LONG = 5L;