KAFKA-16046: also fix stores for outer join (#15073)
This is the corollary to #15061 for outer joins, which don't use timestamped KV stores either (compared to just window stores).
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Lucas Brutschy <lbrutschy@confluent.io>
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
index b54ba53..3d11c45 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
@@ -45,7 +45,7 @@
@Override
public StateStore build() {
final KeyValueBytesStoreSupplier supplier = materialized.storeSupplier() == null
- ? dslStoreSuppliers().keyValueStore(new DslKeyValueParams(materialized.storeName()))
+ ? dslStoreSuppliers().keyValueStore(new DslKeyValueParams(materialized.storeName(), true))
: (KeyValueBytesStoreSupplier) materialized.storeSupplier();
final StoreBuilder<?> builder;
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/OuterStreamJoinStoreFactory.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/OuterStreamJoinStoreFactory.java
index 123d1ea..978bf39 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/OuterStreamJoinStoreFactory.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/OuterStreamJoinStoreFactory.java
@@ -95,11 +95,12 @@
final TimestampedKeyAndJoinSideSerde<K> timestampedKeyAndJoinSideSerde = new TimestampedKeyAndJoinSideSerde<>(streamJoined.keySerde());
final LeftOrRightValueSerde<V1, V2> leftOrRightValueSerde = new LeftOrRightValueSerde<>(streamJoined.valueSerde(), streamJoined.otherValueSerde());
+ final DslKeyValueParams dslKeyValueParams = new DslKeyValueParams(name, false);
final KeyValueBytesStoreSupplier supplier;
if (passedInDslStoreSuppliers != null) {
// case 1: dslStoreSuppliers was explicitly passed in
- supplier = passedInDslStoreSuppliers.keyValueStore(new DslKeyValueParams(name));
+ supplier = passedInDslStoreSuppliers.keyValueStore(dslKeyValueParams);
} else if (streamJoined.thisStoreSupplier() != null) {
// case 2: thisStoreSupplier was explicitly passed in, we match
// the type for that one
@@ -110,12 +111,12 @@
} else {
// couldn't determine the type of bytes store for thisStoreSupplier,
// fallback to the default
- supplier = dslStoreSuppliers().keyValueStore(new DslKeyValueParams(name));
+ supplier = dslStoreSuppliers().keyValueStore(dslKeyValueParams);
}
} else {
// case 3: nothing was explicitly passed in, fallback to default which
// was configured via either the TopologyConfig or StreamsConfig globally
- supplier = dslStoreSuppliers().keyValueStore(new DslKeyValueParams(name));
+ supplier = dslStoreSuppliers().keyValueStore(dslKeyValueParams);
}
final StoreBuilder<KeyValueStore<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>>
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/BuiltInDslStoreSuppliers.java b/streams/src/main/java/org/apache/kafka/streams/state/BuiltInDslStoreSuppliers.java
index 71e7ac8..eedcafd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/BuiltInDslStoreSuppliers.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/BuiltInDslStoreSuppliers.java
@@ -36,7 +36,9 @@
@Override
public KeyValueBytesStoreSupplier keyValueStore(final DslKeyValueParams params) {
- return Stores.persistentTimestampedKeyValueStore(params.name());
+ return params.isTimestamped()
+ ? Stores.persistentTimestampedKeyValueStore(params.name())
+ : Stores.persistentKeyValueStore(params.name());
}
@Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/DslKeyValueParams.java b/streams/src/main/java/org/apache/kafka/streams/state/DslKeyValueParams.java
index 1077da4..7447d6c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/DslKeyValueParams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/DslKeyValueParams.java
@@ -25,19 +25,26 @@
public class DslKeyValueParams {
private final String name;
+ private final boolean isTimestamped;
/**
- * @param name the name of the store (cannot be {@code null})
+ * @param name the name of the store (cannot be {@code null})
+ * @param isTimestamped whether the returned stores should be timestamped, see ({@link TimestampedKeyValueStore}
*/
- public DslKeyValueParams(final String name) {
+ public DslKeyValueParams(final String name, final boolean isTimestamped) {
Objects.requireNonNull(name);
this.name = name;
+ this.isTimestamped = isTimestamped;
}
public String name() {
return name;
}
+ public boolean isTimestamped() {
+ return isTimestamped;
+ }
+
@Override
public boolean equals(final Object o) {
if (this == o) {
@@ -47,18 +54,20 @@
return false;
}
final DslKeyValueParams that = (DslKeyValueParams) o;
- return Objects.equals(name, that.name);
+ return isTimestamped == that.isTimestamped
+ && Objects.equals(name, that.name);
}
@Override
public int hashCode() {
- return Objects.hash(name);
+ return Objects.hash(name, isTimestamped);
}
@Override
public String toString() {
return "DslKeyValueParams{" +
"name='" + name + '\'' +
+ "isTimestamped=" + isTimestamped +
'}';
}
}
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/DslWindowParams.java b/streams/src/main/java/org/apache/kafka/streams/state/DslWindowParams.java
index 672afc6..9a7f0fe 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/DslWindowParams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/DslWindowParams.java
@@ -108,7 +108,8 @@
&& Objects.equals(retentionPeriod, that.retentionPeriod)
&& Objects.equals(windowSize, that.windowSize)
&& Objects.equals(emitStrategy, that.emitStrategy)
- && Objects.equals(isSlidingWindow, that.isSlidingWindow);
+ && Objects.equals(isSlidingWindow, that.isSlidingWindow)
+ && Objects.equals(isTimestamped, that.isTimestamped);
}
@Override
@@ -119,7 +120,8 @@
windowSize,
retainDuplicates,
emitStrategy,
- isSlidingWindow
+ isSlidingWindow,
+ isTimestamped
);
}
@@ -132,6 +134,7 @@
", retainDuplicates=" + retainDuplicates +
", emitStrategy=" + emitStrategy +
", isSlidingWindow=" + isSlidingWindow +
+ ", isTimestamped=" + isTimestamped +
'}';
}
}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index e059da4..72422f0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -52,7 +52,7 @@
import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
import org.apache.kafka.streams.state.internals.InMemorySessionStore;
import org.apache.kafka.streams.state.internals.InMemoryWindowStore;
-import org.apache.kafka.streams.state.internals.RocksDBTimestampedStore;
+import org.apache.kafka.streams.state.internals.RocksDBStore;
import org.apache.kafka.streams.state.internals.RocksDBWindowStore;
import org.apache.kafka.streams.state.internals.WrappedStateStore;
import org.apache.kafka.test.MockApiProcessorSupplier;
@@ -1299,7 +1299,7 @@
assertTypesForStateStore(topology.stateStores(),
InMemoryWindowStore.class,
RocksDBWindowStore.class,
- RocksDBTimestampedStore.class);
+ RocksDBStore.class);
}
@Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
index 8133e25..099dc5b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
@@ -33,8 +33,12 @@
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.StreamJoined;
import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.state.BuiltInDslStoreSuppliers;
+import org.apache.kafka.streams.state.DslKeyValueParams;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.internals.WrappedStateStore;
import org.apache.kafka.streams.test.TestRecord;
import org.apache.kafka.test.MockApiProcessor;
import org.apache.kafka.test.MockApiProcessorSupplier;
@@ -50,9 +54,11 @@
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
import static java.time.Duration.ZERO;
import static java.time.Duration.ofMillis;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
public class KStreamKStreamOuterJoinTest {
@@ -1308,4 +1314,47 @@
new KeyValueTimestamp<>(0, "dummy+null", 1103L)
);
}
+
+ public static class CapturingStoreSuppliers extends BuiltInDslStoreSuppliers.RocksDBDslStoreSuppliers {
+
+ final AtomicReference<KeyValueBytesStoreSupplier> capture = new AtomicReference<>();
+
+ @Override
+ public KeyValueBytesStoreSupplier keyValueStore(final DslKeyValueParams params) {
+ final KeyValueBytesStoreSupplier result = super.keyValueStore(params);
+ capture.set(result);
+ return result;
+ }
+ }
+
+ @Test
+ public void shouldJoinWithNonTimestampedStore() {
+ final CapturingStoreSuppliers suppliers = new CapturingStoreSuppliers();
+ final StreamJoined<Integer, String, String> streamJoined =
+ StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
+ .withDslStoreSuppliers(suppliers);
+
+ final StreamsBuilder builder = new StreamsBuilder();
+
+ final KStream<Integer, String> stream1;
+ final KStream<Integer, String> stream2;
+ final KStream<Integer, String> joined;
+ final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
+ stream1 = builder.stream(topic1, consumed);
+ stream2 = builder.stream(topic2, consumed);
+
+ joined = stream1.outerJoin(
+ stream2,
+ MockValueJoiner.TOSTRING_JOINER,
+ JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100L)),
+ streamJoined
+ );
+ joined.process(supplier);
+
+ // create a TTD so that the topology gets built
+ try (final TopologyTestDriver ignored = new TopologyTestDriver(builder.build(PROPS), PROPS)) {
+ assertThat("Expected stream joined to supply builders that create non-timestamped stores",
+ !WrappedStateStore.isTimestamped(suppliers.capture.get().get()));
+ }
+ }
}