KAFKA-16046: also fix stores for outer join (#15073)

This is the corollary to #15061 for outer joins, which don't use timestamped KV stores either (compared to just window stores).

Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Lucas Brutschy <lbrutschy@confluent.io>
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
index b54ba53..3d11c45 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
@@ -45,7 +45,7 @@
     @Override
     public StateStore build() {
         final KeyValueBytesStoreSupplier supplier = materialized.storeSupplier() == null
-                ? dslStoreSuppliers().keyValueStore(new DslKeyValueParams(materialized.storeName()))
+                ? dslStoreSuppliers().keyValueStore(new DslKeyValueParams(materialized.storeName(), true))
                 : (KeyValueBytesStoreSupplier) materialized.storeSupplier();
 
         final StoreBuilder<?> builder;
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/OuterStreamJoinStoreFactory.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/OuterStreamJoinStoreFactory.java
index 123d1ea..978bf39 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/OuterStreamJoinStoreFactory.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/OuterStreamJoinStoreFactory.java
@@ -95,11 +95,12 @@
         final TimestampedKeyAndJoinSideSerde<K> timestampedKeyAndJoinSideSerde = new TimestampedKeyAndJoinSideSerde<>(streamJoined.keySerde());
         final LeftOrRightValueSerde<V1, V2> leftOrRightValueSerde = new LeftOrRightValueSerde<>(streamJoined.valueSerde(), streamJoined.otherValueSerde());
 
+        final DslKeyValueParams dslKeyValueParams = new DslKeyValueParams(name, false);
         final KeyValueBytesStoreSupplier supplier;
 
         if (passedInDslStoreSuppliers != null) {
             // case 1: dslStoreSuppliers was explicitly passed in
-            supplier = passedInDslStoreSuppliers.keyValueStore(new DslKeyValueParams(name));
+            supplier = passedInDslStoreSuppliers.keyValueStore(dslKeyValueParams);
         } else if (streamJoined.thisStoreSupplier() != null) {
             // case 2: thisStoreSupplier was explicitly passed in, we match
             // the type for that one
@@ -110,12 +111,12 @@
             } else {
                 // couldn't determine the type of bytes store for thisStoreSupplier,
                 // fallback to the default
-                supplier = dslStoreSuppliers().keyValueStore(new DslKeyValueParams(name));
+                supplier = dslStoreSuppliers().keyValueStore(dslKeyValueParams);
             }
         } else {
             // case 3: nothing was explicitly passed in, fallback to default which
             // was configured via either the TopologyConfig or StreamsConfig globally
-            supplier = dslStoreSuppliers().keyValueStore(new DslKeyValueParams(name));
+            supplier = dslStoreSuppliers().keyValueStore(dslKeyValueParams);
         }
 
         final StoreBuilder<KeyValueStore<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>>
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/BuiltInDslStoreSuppliers.java b/streams/src/main/java/org/apache/kafka/streams/state/BuiltInDslStoreSuppliers.java
index 71e7ac8..eedcafd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/BuiltInDslStoreSuppliers.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/BuiltInDslStoreSuppliers.java
@@ -36,7 +36,9 @@
 
         @Override
         public KeyValueBytesStoreSupplier keyValueStore(final DslKeyValueParams params) {
-            return Stores.persistentTimestampedKeyValueStore(params.name());
+            return params.isTimestamped()
+                    ? Stores.persistentTimestampedKeyValueStore(params.name())
+                    : Stores.persistentKeyValueStore(params.name());
         }
 
         @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/DslKeyValueParams.java b/streams/src/main/java/org/apache/kafka/streams/state/DslKeyValueParams.java
index 1077da4..7447d6c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/DslKeyValueParams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/DslKeyValueParams.java
@@ -25,19 +25,26 @@
 public class DslKeyValueParams {
 
     private final String name;
+    private final boolean isTimestamped;
 
     /**
-     * @param name the name of the store (cannot be {@code null})
+     * @param name          the name of the store (cannot be {@code null})
+     * @param isTimestamped whether the returned stores should be timestamped, see ({@link TimestampedKeyValueStore}
      */
-    public DslKeyValueParams(final String name) {
+    public DslKeyValueParams(final String name, final boolean isTimestamped) {
         Objects.requireNonNull(name);
         this.name = name;
+        this.isTimestamped = isTimestamped;
     }
 
     public String name() {
         return name;
     }
 
+    public boolean isTimestamped() {
+        return isTimestamped;
+    }
+
     @Override
     public boolean equals(final Object o) {
         if (this == o) {
@@ -47,18 +54,20 @@
             return false;
         }
         final DslKeyValueParams that = (DslKeyValueParams) o;
-        return Objects.equals(name, that.name);
+        return isTimestamped == that.isTimestamped
+                && Objects.equals(name, that.name);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(name);
+        return Objects.hash(name, isTimestamped);
     }
 
     @Override
     public String toString() {
         return "DslKeyValueParams{" +
                 "name='" + name + '\'' +
+                "isTimestamped=" + isTimestamped +
                 '}';
     }
 }
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/DslWindowParams.java b/streams/src/main/java/org/apache/kafka/streams/state/DslWindowParams.java
index 672afc6..9a7f0fe 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/DslWindowParams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/DslWindowParams.java
@@ -108,7 +108,8 @@
                 && Objects.equals(retentionPeriod, that.retentionPeriod)
                 && Objects.equals(windowSize, that.windowSize)
                 && Objects.equals(emitStrategy, that.emitStrategy)
-                && Objects.equals(isSlidingWindow, that.isSlidingWindow);
+                && Objects.equals(isSlidingWindow, that.isSlidingWindow)
+                && Objects.equals(isTimestamped, that.isTimestamped);
     }
 
     @Override
@@ -119,7 +120,8 @@
                 windowSize,
                 retainDuplicates,
                 emitStrategy,
-                isSlidingWindow
+                isSlidingWindow,
+                isTimestamped
         );
     }
 
@@ -132,6 +134,7 @@
                 ", retainDuplicates=" + retainDuplicates +
                 ", emitStrategy=" + emitStrategy +
                 ", isSlidingWindow=" + isSlidingWindow +
+                ", isTimestamped=" + isTimestamped +
                 '}';
     }
 }
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index e059da4..72422f0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -52,7 +52,7 @@
 import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
 import org.apache.kafka.streams.state.internals.InMemorySessionStore;
 import org.apache.kafka.streams.state.internals.InMemoryWindowStore;
-import org.apache.kafka.streams.state.internals.RocksDBTimestampedStore;
+import org.apache.kafka.streams.state.internals.RocksDBStore;
 import org.apache.kafka.streams.state.internals.RocksDBWindowStore;
 import org.apache.kafka.streams.state.internals.WrappedStateStore;
 import org.apache.kafka.test.MockApiProcessorSupplier;
@@ -1299,7 +1299,7 @@
         assertTypesForStateStore(topology.stateStores(),
                 InMemoryWindowStore.class,
                 RocksDBWindowStore.class,
-                RocksDBTimestampedStore.class);
+                RocksDBStore.class);
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
index 8133e25..099dc5b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
@@ -33,8 +33,12 @@
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.StreamJoined;
 import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.state.BuiltInDslStoreSuppliers;
+import org.apache.kafka.streams.state.DslKeyValueParams;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.internals.WrappedStateStore;
 import org.apache.kafka.streams.test.TestRecord;
 import org.apache.kafka.test.MockApiProcessor;
 import org.apache.kafka.test.MockApiProcessorSupplier;
@@ -50,9 +54,11 @@
 import java.util.HashSet;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static java.time.Duration.ZERO;
 import static java.time.Duration.ofMillis;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 
 public class KStreamKStreamOuterJoinTest {
@@ -1308,4 +1314,47 @@
             new KeyValueTimestamp<>(0, "dummy+null", 1103L)
         );
     }
+
+    public static class CapturingStoreSuppliers extends BuiltInDslStoreSuppliers.RocksDBDslStoreSuppliers {
+
+        final AtomicReference<KeyValueBytesStoreSupplier> capture = new AtomicReference<>();
+
+        @Override
+        public KeyValueBytesStoreSupplier keyValueStore(final DslKeyValueParams params) {
+            final KeyValueBytesStoreSupplier result = super.keyValueStore(params);
+            capture.set(result);
+            return result;
+        }
+    }
+
+    @Test
+    public void shouldJoinWithNonTimestampedStore() {
+        final CapturingStoreSuppliers suppliers = new CapturingStoreSuppliers();
+        final StreamJoined<Integer, String, String> streamJoined =
+                StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
+                        .withDslStoreSuppliers(suppliers);
+
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KStream<Integer, String> stream1;
+        final KStream<Integer, String> stream2;
+        final KStream<Integer, String> joined;
+        final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
+        stream1 = builder.stream(topic1, consumed);
+        stream2 = builder.stream(topic2, consumed);
+
+        joined = stream1.outerJoin(
+                stream2,
+                MockValueJoiner.TOSTRING_JOINER,
+                JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100L)),
+                streamJoined
+        );
+        joined.process(supplier);
+
+        // create a TTD so that the topology gets built
+        try (final TopologyTestDriver ignored = new TopologyTestDriver(builder.build(PROPS), PROPS)) {
+            assertThat("Expected stream joined to supply builders that create non-timestamped stores",
+                    !WrappedStateStore.isTimestamped(suppliers.capture.get().get()));
+        }
+    }
 }