KAFKA-19668: processValue() must be declared as value-changing operation (#20470)
With "merge.repartition.topic" optimization enabled, Kafka Streams tries
to push repartition topics upstream, to be able to merge multiple
repartition topics from different downstream branches together.
However, it is not safe to push a repartition topic if the parent node
is value-changing: because of potentially changing data types, the
topology might become invalid, and fail with serde error at runtime.
The optimization itself work correctly, however, processValues() is not
correctly declared as value-changing, what can lead to invalid
topologies.
Reviewers: Bill Bejeck <bill@confluent.io>, Lucas Brutschy
<lbrutschy@confluent.io>
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
index e56a4cb..2dbf48d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
@@ -75,7 +75,7 @@
public StreamsBuilder() {
topology = new Topology();
internalTopologyBuilder = topology.internalTopologyBuilder;
- internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder);
+ internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder, false);
}
/**
@@ -87,7 +87,14 @@
public StreamsBuilder(final TopologyConfig topologyConfigs) {
topology = newTopology(topologyConfigs);
internalTopologyBuilder = topology.internalTopologyBuilder;
- internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder);
+ internalStreamsBuilder = new InternalStreamsBuilder(
+ internalTopologyBuilder,
+ TopologyConfig.InternalConfig.getBoolean(
+ topologyConfigs.originals(),
+ TopologyConfig.InternalConfig.ENABLE_PROCESS_PROCESSVALUE_FIX,
+ false
+ )
+ );
}
protected Topology newTopology(final TopologyConfig topologyConfigs) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java b/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java
index da8c246..fd76f07 100644
--- a/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java
@@ -34,6 +34,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.function.Supplier;
@@ -86,6 +87,28 @@
*/
@SuppressWarnings("deprecation")
public final class TopologyConfig extends AbstractConfig {
+
+ public static class InternalConfig {
+ // Cf https://issues.apache.org/jira/browse/KAFKA-19668
+ public static final String ENABLE_PROCESS_PROCESSVALUE_FIX = "__enable.process.processValue.fix__";
+
+ public static boolean getBoolean(final Map<String, Object> configs, final String key, final boolean defaultValue) {
+ final Object value = configs.getOrDefault(key, defaultValue);
+ if (value instanceof Boolean) {
+ return (boolean) value;
+ } else if (value instanceof String) {
+ return Boolean.parseBoolean((String) value);
+ } else {
+ log.warn(
+ "Invalid value ({}) on internal configuration '{}'. Please specify a true/false value.",
+ value,
+ key
+ );
+ return defaultValue;
+ }
+ }
+ }
+
private static final ConfigDef CONFIG;
static {
CONFIG = new ConfigDef()
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index 968276b..6460313 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -72,6 +72,7 @@
private static final String TABLE_SOURCE_SUFFIX = "-source";
final InternalTopologyBuilder internalTopologyBuilder;
+ private final boolean processProcessValueFixEnabled;
private final AtomicInteger index = new AtomicInteger(0);
private final AtomicInteger buildPriorityIndex = new AtomicInteger(0);
@@ -91,8 +92,10 @@
}
};
- public InternalStreamsBuilder(final InternalTopologyBuilder internalTopologyBuilder) {
+ public InternalStreamsBuilder(final InternalTopologyBuilder internalTopologyBuilder,
+ final boolean processProcessValueFixEnabled) {
this.internalTopologyBuilder = internalTopologyBuilder;
+ this.processProcessValueFixEnabled = processProcessValueFixEnabled;
}
public <K, V> KStream<K, V> stream(final Collection<String> topics,
@@ -709,4 +712,7 @@
return internalTopologyBuilder;
}
+ public boolean processProcessValueFixEnabled() {
+ return processProcessValueFixEnabled;
+ }
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 1927aed..02342a9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -1306,7 +1306,12 @@
final ProcessorToStateConnectorNode<? super K, ? super V> processNode = new ProcessorToStateConnectorNode<>(
name,
new ProcessorParameters<>(processorSupplier, name),
- stateStoreNames);
+ stateStoreNames
+ );
+ if (builder.processProcessValueFixEnabled()) {
+ processNode.setKeyChangingOperation(true);
+ processNode.setValueChangingOperation(true);
+ }
builder.addGraphNode(graphNode, processNode);
@@ -1350,7 +1355,11 @@
final ProcessorToStateConnectorNode<? super K, ? super V> processNode = new ProcessorToStateConnectorNode<>(
name,
new ProcessorParameters<>(processorSupplier, name),
- stateStoreNames);
+ stateStoreNames
+ );
+ if (builder.processProcessValueFixEnabled()) {
+ processNode.setValueChangingOperation(true);
+ }
builder.addGraphNode(graphNode, processNode);
// cannot inherit value serde
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
index 9ac3ff7..0a13a41 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
@@ -81,7 +81,7 @@
private static final String APP_ID = "app-id";
- private final InternalStreamsBuilder builder = new InternalStreamsBuilder(new InternalTopologyBuilder());
+ private final InternalStreamsBuilder builder = new InternalStreamsBuilder(new InternalTopologyBuilder(), false);
private final ConsumedInternal<String, String> consumed = new ConsumedInternal<>(Consumed.with(null, null));
private final String storePrefix = "prefix-";
private final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized = new MaterializedInternal<>(Materialized.as("test-store"), builder, storePrefix);
@@ -93,7 +93,7 @@
assertEquals("Y-0000000001", builder.newProcessorName("Y-"));
assertEquals("Z-0000000002", builder.newProcessorName("Z-"));
- final InternalStreamsBuilder newBuilder = new InternalStreamsBuilder(new InternalTopologyBuilder());
+ final InternalStreamsBuilder newBuilder = new InternalStreamsBuilder(new InternalTopologyBuilder(), false);
assertEquals("X-0000000000", newBuilder.newProcessorName("X-"));
assertEquals("Y-0000000001", newBuilder.newProcessorName("Y-"));
@@ -106,7 +106,7 @@
assertEquals("Y-STATE-STORE-0000000001", builder.newStoreName("Y-"));
assertEquals("Z-STATE-STORE-0000000002", builder.newStoreName("Z-"));
- final InternalStreamsBuilder newBuilder = new InternalStreamsBuilder(new InternalTopologyBuilder());
+ final InternalStreamsBuilder newBuilder = new InternalStreamsBuilder(new InternalTopologyBuilder(), false);
assertEquals("X-STATE-STORE-0000000000", newBuilder.newStoreName("X-"));
assertEquals("Y-STATE-STORE-0000000001", newBuilder.newStoreName("Y-"));
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/MaterializedInternalTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/MaterializedInternalTest.java
index 24600c5..a3eaadd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/MaterializedInternalTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/MaterializedInternalTest.java
@@ -95,7 +95,7 @@
final InternalTopologyBuilder topologyBuilder = new InternalTopologyBuilder(
new TopologyConfig("my-topology", config, topologyOverrides));
- final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(topologyBuilder);
+ final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(topologyBuilder, false);
final MaterializedInternal<Object, Object, KeyValueStore<Bytes, byte[]>> materialized =
new MaterializedInternal<>(Materialized.as(supplier), internalStreamsBuilder, prefix);
@@ -113,7 +113,7 @@
final InternalTopologyBuilder topologyBuilder = new InternalTopologyBuilder(
new TopologyConfig("my-topology", config, topologyOverrides));
- final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(topologyBuilder);
+ final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(topologyBuilder, false);
final MaterializedInternal<Object, Object, KeyValueStore<Bytes, byte[]>> materialized =
new MaterializedInternal<>(Materialized.as(supplier), internalStreamsBuilder, prefix);
@@ -129,7 +129,7 @@
final InternalTopologyBuilder topologyBuilder = new InternalTopologyBuilder(
new TopologyConfig("my-topology", config, topologyOverrides));
- final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(topologyBuilder);
+ final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(topologyBuilder, false);
final MaterializedInternal<Object, Object, KeyValueStore<Bytes, byte[]>> materialized =
new MaterializedInternal<>(Materialized.as(supplier), internalStreamsBuilder, prefix);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
index e046ba1..1675619 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
@@ -22,6 +22,7 @@
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyConfig;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Branched;
import org.apache.kafka.streams.kstream.Consumed;
@@ -36,6 +37,8 @@
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.processor.api.ContextualFixedKeyProcessor;
+import org.apache.kafka.streams.processor.api.FixedKeyRecord;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
@@ -127,7 +130,7 @@
initializer = () -> "";
aggregator = (aggKey, value, aggregate) -> aggregate + value.length();
final ProcessorSupplier<String, String, String, String> processorSupplier =
- () -> new Processor<String, String, String, String>() {
+ () -> new Processor<>() {
private ProcessorContext<String, String> context;
@Override
@@ -185,14 +188,163 @@
}
@Test
- public void shouldNotOptimizeWithValueOrKeyChangingOperatorsAfterInitialKeyChange() {
+ public void shouldPartiallyOptimizeWithValueOrKeyChangingOperatorsAfterInitialKeyChangeWithFixDisabled() {
+ final Topology attemptedOptimize = getTopologyWithChangingValuesAfterChangingKey(StreamsConfig.OPTIMIZE, false);
+ final Topology noOptimization = getTopologyWithChangingValuesAfterChangingKey(StreamsConfig.NO_OPTIMIZATION, false);
- final Topology attemptedOptimize = getTopologyWithChangingValuesAfterChangingKey(StreamsConfig.OPTIMIZE);
- final Topology noOptimization = getTopologyWithChangingValuesAfterChangingKey(StreamsConfig.NO_OPTIMIZATION);
+ System.out.println(attemptedOptimize.describe().toString());
+ System.out.println(noOptimization.describe().toString());
+ assertEquals("Topologies:\n" +
+ " Sub-topology: 0\n" +
+ " Source: KSTREAM-SOURCE-0000000000 (topics: [input])\n" +
+ " --> KSTREAM-KEY-SELECT-0000000001\n" +
+ " Processor: KSTREAM-KEY-SELECT-0000000001 (stores: [])\n" +
+ " --> KSTREAM-FLATMAPVALUES-0000000010, KSTREAM-MAPVALUES-0000000002, KSTREAM-PROCESSVALUES-0000000018\n" +
+ " <-- KSTREAM-SOURCE-0000000000\n" +
+ " Processor: KSTREAM-FLATMAPVALUES-0000000010 (stores: [])\n" +
+ " --> KSTREAM-FILTER-0000000014\n" +
+ " <-- KSTREAM-KEY-SELECT-0000000001\n" +
+ " Processor: KSTREAM-MAPVALUES-0000000002 (stores: [])\n" +
+ " --> KSTREAM-FILTER-0000000006\n" +
+ " <-- KSTREAM-KEY-SELECT-0000000001\n" +
+ " Processor: KSTREAM-PROCESSVALUES-0000000018 (stores: [])\n" +
+ " --> KSTREAM-FILTER-0000000022\n" +
+ " <-- KSTREAM-KEY-SELECT-0000000001\n" +
+ " Processor: KSTREAM-FILTER-0000000006 (stores: [])\n" +
+ " --> KSTREAM-SINK-0000000005\n" +
+ " <-- KSTREAM-MAPVALUES-0000000002\n" +
+ " Processor: KSTREAM-FILTER-0000000014 (stores: [])\n" +
+ " --> KSTREAM-SINK-0000000013\n" +
+ " <-- KSTREAM-FLATMAPVALUES-0000000010\n" +
+ " Processor: KSTREAM-FILTER-0000000022 (stores: [])\n" +
+ " --> KSTREAM-SINK-0000000021\n" +
+ " <-- KSTREAM-PROCESSVALUES-0000000018\n" +
+ " Sink: KSTREAM-SINK-0000000005 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition)\n" +
+ " <-- KSTREAM-FILTER-0000000006\n" +
+ " Sink: KSTREAM-SINK-0000000013 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000011-repartition)\n" +
+ " <-- KSTREAM-FILTER-0000000014\n" +
+ " Sink: KSTREAM-SINK-0000000021 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000019-repartition)\n" +
+ " <-- KSTREAM-FILTER-0000000022\n" +
+ "\n" +
+ " Sub-topology: 1\n" +
+ " Source: KSTREAM-SOURCE-0000000007 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition])\n" +
+ " --> KSTREAM-AGGREGATE-0000000004\n" +
+ " Processor: KSTREAM-AGGREGATE-0000000004 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000003])\n" +
+ " --> KTABLE-TOSTREAM-0000000008\n" +
+ " <-- KSTREAM-SOURCE-0000000007\n" +
+ " Processor: KTABLE-TOSTREAM-0000000008 (stores: [])\n" +
+ " --> KSTREAM-SINK-0000000009\n" +
+ " <-- KSTREAM-AGGREGATE-0000000004\n" +
+ " Sink: KSTREAM-SINK-0000000009 (topic: output)\n" +
+ " <-- KTABLE-TOSTREAM-0000000008\n" +
+ "\n" +
+ " Sub-topology: 2\n" +
+ " Source: KSTREAM-SOURCE-0000000015 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000011-repartition])\n" +
+ " --> KSTREAM-AGGREGATE-0000000012\n" +
+ " Processor: KSTREAM-AGGREGATE-0000000012 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000011])\n" +
+ " --> KTABLE-TOSTREAM-0000000016\n" +
+ " <-- KSTREAM-SOURCE-0000000015\n" +
+ " Processor: KTABLE-TOSTREAM-0000000016 (stores: [])\n" +
+ " --> KSTREAM-SINK-0000000017\n" +
+ " <-- KSTREAM-AGGREGATE-0000000012\n" +
+ " Sink: KSTREAM-SINK-0000000017 (topic: windowed-output)\n" +
+ " <-- KTABLE-TOSTREAM-0000000016\n" +
+ "\n" +
+ " Sub-topology: 3\n" +
+ " Source: KSTREAM-SOURCE-0000000023 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000019-repartition])\n" +
+ " --> KSTREAM-AGGREGATE-0000000020\n" +
+ " Processor: KSTREAM-AGGREGATE-0000000020 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000019])\n" +
+ " --> KTABLE-TOSTREAM-0000000024\n" +
+ " <-- KSTREAM-SOURCE-0000000023\n" +
+ " Processor: KTABLE-TOSTREAM-0000000024 (stores: [])\n" +
+ " --> KSTREAM-SINK-0000000025\n" +
+ " <-- KSTREAM-AGGREGATE-0000000020\n" +
+ " Sink: KSTREAM-SINK-0000000025 (topic: output)\n" +
+ " <-- KTABLE-TOSTREAM-0000000024\n" +
+ "\n",
+ noOptimization.describe().toString()
+ );
+ assertEquals("Topologies:\n" +
+ " Sub-topology: 0\n" +
+ " Source: KSTREAM-SOURCE-0000000000 (topics: [input])\n" +
+ " --> KSTREAM-KEY-SELECT-0000000001\n" +
+ " Processor: KSTREAM-KEY-SELECT-0000000001 (stores: [])\n" +
+ " --> KSTREAM-FLATMAPVALUES-0000000010, KSTREAM-MAPVALUES-0000000002, KSTREAM-PROCESSVALUES-0000000018\n" +
+ " <-- KSTREAM-SOURCE-0000000000\n" +
+ " Processor: KSTREAM-FLATMAPVALUES-0000000010 (stores: [])\n" +
+ " --> KSTREAM-FILTER-0000000014\n" +
+ " <-- KSTREAM-KEY-SELECT-0000000001\n" +
+ " Processor: KSTREAM-MAPVALUES-0000000002 (stores: [])\n" +
+ " --> KSTREAM-FILTER-0000000006\n" +
+ " <-- KSTREAM-KEY-SELECT-0000000001\n" +
+ " Processor: KSTREAM-PROCESSVALUES-0000000018 (stores: [])\n" +
+ " --> KSTREAM-FILTER-0000000022\n" +
+ " <-- KSTREAM-KEY-SELECT-0000000001\n" +
+ " Processor: KSTREAM-FILTER-0000000006 (stores: [])\n" +
+ " --> KSTREAM-SINK-0000000005\n" +
+ " <-- KSTREAM-MAPVALUES-0000000002\n" +
+ " Processor: KSTREAM-FILTER-0000000014 (stores: [])\n" +
+ " --> KSTREAM-SINK-0000000013\n" +
+ " <-- KSTREAM-FLATMAPVALUES-0000000010\n" +
+ " Processor: KSTREAM-FILTER-0000000022 (stores: [])\n" +
+ " --> KSTREAM-SINK-0000000021\n" +
+ " <-- KSTREAM-PROCESSVALUES-0000000018\n" +
+ " Sink: KSTREAM-SINK-0000000005 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition)\n" +
+ " <-- KSTREAM-FILTER-0000000006\n" +
+ " Sink: KSTREAM-SINK-0000000013 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000011-repartition)\n" +
+ " <-- KSTREAM-FILTER-0000000014\n" +
+ " Sink: KSTREAM-SINK-0000000021 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000019-repartition)\n" +
+ " <-- KSTREAM-FILTER-0000000022\n" +
+ "\n" +
+ " Sub-topology: 1\n" +
+ " Source: KSTREAM-SOURCE-0000000007 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition])\n" +
+ " --> KSTREAM-AGGREGATE-0000000004\n" +
+ " Processor: KSTREAM-AGGREGATE-0000000004 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000003])\n" +
+ " --> KTABLE-TOSTREAM-0000000008\n" +
+ " <-- KSTREAM-SOURCE-0000000007\n" +
+ " Processor: KTABLE-TOSTREAM-0000000008 (stores: [])\n" +
+ " --> KSTREAM-SINK-0000000009\n" +
+ " <-- KSTREAM-AGGREGATE-0000000004\n" +
+ " Sink: KSTREAM-SINK-0000000009 (topic: output)\n" +
+ " <-- KTABLE-TOSTREAM-0000000008\n" +
+ "\n" +
+ " Sub-topology: 2\n" +
+ " Source: KSTREAM-SOURCE-0000000015 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000011-repartition])\n" +
+ " --> KSTREAM-AGGREGATE-0000000012\n" +
+ " Processor: KSTREAM-AGGREGATE-0000000012 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000011])\n" +
+ " --> KTABLE-TOSTREAM-0000000016\n" +
+ " <-- KSTREAM-SOURCE-0000000015\n" +
+ " Processor: KTABLE-TOSTREAM-0000000016 (stores: [])\n" +
+ " --> KSTREAM-SINK-0000000017\n" +
+ " <-- KSTREAM-AGGREGATE-0000000012\n" +
+ " Sink: KSTREAM-SINK-0000000017 (topic: windowed-output)\n" +
+ " <-- KTABLE-TOSTREAM-0000000016\n" +
+ "\n" +
+ " Sub-topology: 3\n" +
+ " Source: KSTREAM-SOURCE-0000000023 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000019-repartition])\n" +
+ " --> KSTREAM-AGGREGATE-0000000020\n" +
+ " Processor: KSTREAM-AGGREGATE-0000000020 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000019])\n" +
+ " --> KTABLE-TOSTREAM-0000000024\n" +
+ " <-- KSTREAM-SOURCE-0000000023\n" +
+ " Processor: KTABLE-TOSTREAM-0000000024 (stores: [])\n" +
+ " --> KSTREAM-SINK-0000000025\n" +
+ " <-- KSTREAM-AGGREGATE-0000000020\n" +
+ " Sink: KSTREAM-SINK-0000000025 (topic: output)\n" +
+ " <-- KTABLE-TOSTREAM-0000000024\n\n",
+ noOptimization.describe().toString()
+ );
+ assertEquals(3, getCountOfRepartitionTopicsFound(attemptedOptimize.describe().toString()));
+ assertEquals(3, getCountOfRepartitionTopicsFound(noOptimization.describe().toString()));
+ }
+
+ @Test
+ public void shouldNotOptimizeWithValueOrKeyChangingOperatorsAfterInitialKeyChangeWithFixEnabled() {
+ final Topology attemptedOptimize = getTopologyWithChangingValuesAfterChangingKey(StreamsConfig.OPTIMIZE, true);
+ final Topology noOptimization = getTopologyWithChangingValuesAfterChangingKey(StreamsConfig.NO_OPTIMIZATION, true);
assertEquals(attemptedOptimize.describe().toString(), noOptimization.describe().toString());
- assertEquals(2, getCountOfRepartitionTopicsFound(attemptedOptimize.describe().toString()));
- assertEquals(2, getCountOfRepartitionTopicsFound(noOptimization.describe().toString()));
+ assertEquals(3, getCountOfRepartitionTopicsFound(attemptedOptimize.describe().toString()));
+ assertEquals(3, getCountOfRepartitionTopicsFound(noOptimization.describe().toString()));
}
@Test
@@ -227,20 +379,30 @@
assertEquals(2, getCountOfRepartitionTopicsFound(noOptimization.describe().toString()));
}
- private Topology getTopologyWithChangingValuesAfterChangingKey(final String optimizeConfig) {
-
- final StreamsBuilder builder = new StreamsBuilder();
+ private Topology getTopologyWithChangingValuesAfterChangingKey(final String optimizeConfig,
+ final boolean enableFix) {
final Properties properties = new Properties();
+ properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "application-id");
+ properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, optimizeConfig);
+ properties.put(TopologyConfig.InternalConfig.ENABLE_PROCESS_PROCESSVALUE_FIX, enableFix);
+
+ final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(properties)));
final KStream<String, String> inputStream = builder.stream("input");
final KStream<String, String> mappedKeyStream = inputStream.selectKey((k, v) -> k + v);
mappedKeyStream.mapValues(v -> v.toUpperCase(Locale.getDefault())).groupByKey().count().toStream().to("output");
mappedKeyStream.flatMapValues(v -> Arrays.asList(v.split("\\s"))).groupByKey().windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(5000))).count().toStream().to("windowed-output");
+ mappedKeyStream.processValues(
+ () -> new ContextualFixedKeyProcessor<>() {
+ @Override
+ public void process(final FixedKeyRecord<String, String> record) {
+ context().forward(record.withValue(record.value().toUpperCase(Locale.getDefault())));
+ }
+ }).groupByKey().count().toStream().to("output");
return builder.build(properties);
-
}
private Topology getTopologyWithRepartitionOperation(final String optimizeConfig) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java
index afb89fa..57753df 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java
@@ -30,6 +30,7 @@
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyConfig;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Consumed;
@@ -42,6 +43,8 @@
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.StreamJoined;
+import org.apache.kafka.streams.processor.api.ContextualFixedKeyProcessor;
+import org.apache.kafka.streams.processor.api.FixedKeyRecord;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.Stores;
@@ -220,6 +223,38 @@
assertThat(joinedOutputTopic.readKeyValuesToMap(), equalTo(keyValueListToMap(expectedJoinKeyValues)));
}
+ @Test
+ public void shouldNotPushRepartitionAcrossValueChangingOperation() {
+ streamsConfiguration.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
+ streamsConfiguration.setProperty(TopologyConfig.InternalConfig.ENABLE_PROCESS_PROCESSVALUE_FIX, "true");
+
+ final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(streamsConfiguration)));
+
+ builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), Serdes.String()).withName("sourceStream"))
+ .map((k, v) -> KeyValue.pair(k.toUpperCase(Locale.getDefault()), v))
+ .processValues(() -> new ContextualFixedKeyProcessor<String, String, Integer>() {
+ @Override
+ public void process(final FixedKeyRecord<String, String> record) {
+ context().forward(record.withValue(record.value().length()));
+ }
+ })
+ .groupByKey(Grouped.valueSerde(new Serdes.IntegerSerde()))
+ .reduce(Integer::sum)
+ .toStream()
+ .to(AGGREGATION_TOPIC);
+
+ final Topology topology = builder.build(streamsConfiguration);
+
+ topologyTestDriver = new TopologyTestDriver(topology, streamsConfiguration);
+
+ final TestInputTopic<String, String> inputTopic = topologyTestDriver.createInputTopic(INPUT_TOPIC, stringSerializer, stringSerializer);
+ final TestOutputTopic<String, Integer> outputTopic = topologyTestDriver.createOutputTopic(AGGREGATION_TOPIC, stringDeserializer, new IntegerDeserializer());
+
+ inputTopic.pipeKeyValueList(getKeyValues());
+
+ assertThat(outputTopic.readKeyValuesToMap(), equalTo(keyValueListToMap(expectedAggKeyValues)));
+ }
+
private <K, V> Map<K, V> keyValueListToMap(final List<KeyValue<K, V>> keyValuePairs) {
final Map<K, V> map = new HashMap<>();
for (final KeyValue<K, V> pair : keyValuePairs) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 213ff8a..8fcc449 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -206,7 +206,7 @@
private final ChangelogReader changelogReader = new MockChangelogReader();
private StateDirectory stateDirectory = null;
private final InternalTopologyBuilder internalTopologyBuilder = new InternalTopologyBuilder();
- private final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder);
+ private final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder, false);
private StreamThread thread = null;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index 89989a1..fc04015 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -2622,7 +2622,7 @@
builder = new CorruptedInternalTopologyBuilder();
topologyMetadata = new TopologyMetadata(builder, new StreamsConfig(configProps(parameterizedConfig)));
- final InternalStreamsBuilder streamsBuilder = new InternalStreamsBuilder(builder);
+ final InternalStreamsBuilder streamsBuilder = new InternalStreamsBuilder(builder, false);
final KStream<String, String> inputTopic = streamsBuilder.stream(singleton("topic1"), new ConsumedInternal<>(Consumed.with(null, null)));
final KTable<String, String> inputTable = streamsBuilder.table("topic2", new ConsumedInternal<>(Consumed.with(null, null)), new MaterializedInternal<>(Materialized.as("store")));