blob: 97a7aac411baba371f912e394cf00705597b9c1d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.errors.TopologyBuilderException;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.state.Stores;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.PrintStream;
import java.lang.reflect.Array;
import java.util.HashSet;
import java.util.Set;
/**
 * {@link KStream} implementation backed by a {@link KStreamBuilder}.
 *
 * <p>Each operator registers one or more nodes (processors, sinks, state stores) on the shared
 * builder under a freshly generated name and, when the operator yields a stream or table, wraps
 * the last registered node in a new {@code KStreamImpl} / {@code KTableImpl}.
 *
 * @param <K> key type of the records flowing through this stream
 * @param <V> value type of the records flowing through this stream
 */
public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V> {

    // Prefixes passed to topology.newName(...) when registering the node for each operator.
    private static final String AGGREGATE_NAME = "KSTREAM-AGGREGATE-";

    private static final String BRANCH_NAME = "KSTREAM-BRANCH-";

    private static final String BRANCHCHILD_NAME = "KSTREAM-BRANCHCHILD-";

    private static final String FILTER_NAME = "KSTREAM-FILTER-";

    private static final String FLATMAP_NAME = "KSTREAM-FLATMAP-";

    private static final String FLATMAPVALUES_NAME = "KSTREAM-FLATMAPVALUES-";

    public static final String JOINTHIS_NAME = "KSTREAM-JOINTHIS-";

    public static final String JOINOTHER_NAME = "KSTREAM-JOINOTHER-";

    public static final String LEFTJOIN_NAME = "KSTREAM-LEFTJOIN-";

    private static final String MAP_NAME = "KSTREAM-MAP-";

    private static final String MAPVALUES_NAME = "KSTREAM-MAPVALUES-";

    public static final String MERGE_NAME = "KSTREAM-MERGE-";

    public static final String OUTERTHIS_NAME = "KSTREAM-OUTERTHIS-";

    public static final String OUTEROTHER_NAME = "KSTREAM-OUTEROTHER-";

    private static final String PROCESSOR_NAME = "KSTREAM-PROCESSOR-";

    private static final String PRINTING_NAME = "KSTREAM-PRINTER-";

    private static final String REDUCE_NAME = "KSTREAM-REDUCE-";

    public static final String SINK_NAME = "KSTREAM-SINK-";

    public static final String SOURCE_NAME = "KSTREAM-SOURCE-";

    private static final String TRANSFORM_NAME = "KSTREAM-TRANSFORM-";

    private static final String TRANSFORMVALUES_NAME = "KSTREAM-TRANSFORMVALUES-";

    private static final String WINDOWED_NAME = "KSTREAM-WINDOWED-";

    private static final String FOREACH_NAME = "KSTREAM-FOREACH-";

    /**
     * Wraps an already registered node as a stream.
     *
     * @param topology    builder the node belongs to
     * @param name        name of the node this stream represents
     * @param sourceNodes names of the source nodes feeding this stream; several operators in this
     *                    class pass {@code null} here after the key may have changed
     */
    public KStreamImpl(KStreamBuilder topology, String name, Set<String> sourceNodes) {
        super(topology, name, sourceNodes);
    }

    /**
     * Appends a {@code KStreamFilter} node (boolean flag {@code false}) fed by this stream's node.
     *
     * @param predicate predicate handed to the filter processor
     * @return a stream rooted at the new node, keeping this stream's source nodes
     */
    @Override
    public KStream<K, V> filter(Predicate<K, V> predicate) {
        String name = topology.newName(FILTER_NAME);

        topology.addProcessor(name, new KStreamFilter<>(predicate, false), this.name);

        return new KStreamImpl<>(topology, name, sourceNodes);
    }

    /**
     * Appends a {@code KStreamFilter} node with the boolean flag set to {@code true}, i.e. the
     * opposite of what {@link #filter(Predicate)} passes (presumably inverting the predicate --
     * confirm in {@code KStreamFilter}).
     *
     * @param predicate predicate handed to the filter processor
     * @return a stream rooted at the new node, keeping this stream's source nodes
     */
    @Override
    public KStream<K, V> filterNot(final Predicate<K, V> predicate) {
        String name = topology.newName(FILTER_NAME);

        topology.addProcessor(name, new KStreamFilter<>(predicate, true), this.name);

        return new KStreamImpl<>(topology, name, sourceNodes);
    }

    /**
     * Appends a {@code KStreamMap} node fed by this stream's node.
     *
     * @param mapper maps each record to a new key/value pair
     * @return a stream rooted at the new node; its source nodes are {@code null} because the
     *         mapper may produce a different key
     */
    @Override
    public <K1, V1> KStream<K1, V1> map(KeyValueMapper<K, V, KeyValue<K1, V1>> mapper) {
        String name = topology.newName(MAP_NAME);

        topology.addProcessor(name, new KStreamMap<>(mapper), this.name);

        return new KStreamImpl<>(topology, name, null);
    }

    /**
     * Appends a {@code KStreamMapValues} node fed by this stream's node.
     *
     * @param mapper maps each value to a new value; the key type is unchanged
     * @return a stream rooted at the new node, keeping this stream's source nodes
     */
    @Override
    public <V1> KStream<K, V1> mapValues(ValueMapper<V, V1> mapper) {
        String name = topology.newName(MAPVALUES_NAME);

        topology.addProcessor(name, new KStreamMapValues<>(mapper), this.name);

        return new KStreamImpl<>(topology, name, sourceNodes);
    }

    /**
     * Shorthand for {@link #print(Serde, Serde)} with both serdes {@code null}.
     */
    @Override
    public void print() {
        print(null, null);
    }

    /**
     * Appends a terminal {@code KeyValuePrinter} node built from the given serdes. No output
     * stream is supplied, so the printer's own default destination applies (presumably standard
     * out -- confirm in {@code KeyValuePrinter}).
     *
     * @param keySerde key serde passed to the printer; may be {@code null}
     * @param valSerde value serde passed to the printer; may be {@code null}
     */
    @Override
    public void print(Serde<K> keySerde, Serde<V> valSerde) {
        String name = topology.newName(PRINTING_NAME);
        topology.addProcessor(name, new KeyValuePrinter<>(keySerde, valSerde), this.name);
    }

    /**
     * Shorthand for {@link #writeAsText(String, Serde, Serde)} with both serdes {@code null}.
     *
     * @param filePath path of the file to write to
     */
    @Override
    public void writeAsText(String filePath) {
        writeAsText(filePath, null, null);
    }

    /**
     * Opens {@code filePath} for writing and appends a terminal {@code KeyValuePrinter} node that
     * is given a {@link PrintStream} on that file.
     *
     * @param filePath path of the file to write to
     * @param keySerde key serde passed to the printer; may be {@code null}
     * @param valSerde value serde passed to the printer; may be {@code null}
     * @throws TopologyBuilderException if the file cannot be opened for writing
     */
    @Override
    public void writeAsText(String filePath, Serde<K> keySerde, Serde<V> valSerde) {
        String name = topology.newName(PRINTING_NAME);
        try {
            // The stream is handed over to the printer node; it is not closed here.
            PrintStream printStream = new PrintStream(new FileOutputStream(filePath));
            topology.addProcessor(name, new KeyValuePrinter<>(printStream, keySerde, valSerde), this.name);
        } catch (FileNotFoundException e) {
            // NOTE(review): only e.getMessage() survives; attach e as the cause if
            // TopologyBuilderException offers a (String, Throwable) constructor.
            String message = "Unable to write stream to file at [" + filePath + "] " + e.getMessage();
            throw new TopologyBuilderException(message);
        }
    }

    /**
     * Appends a {@code KStreamFlatMap} node fed by this stream's node.
     *
     * @param mapper maps each record to zero or more key/value pairs
     * @return a stream rooted at the new node; its source nodes are {@code null} because the
     *         mapper may produce different keys
     */
    @Override
    public <K1, V1> KStream<K1, V1> flatMap(KeyValueMapper<K, V, Iterable<KeyValue<K1, V1>>> mapper) {
        String name = topology.newName(FLATMAP_NAME);

        topology.addProcessor(name, new KStreamFlatMap<>(mapper), this.name);

        return new KStreamImpl<>(topology, name, null);
    }

    /**
     * Appends a {@code KStreamFlatMapValues} node fed by this stream's node.
     *
     * @param mapper maps each value to zero or more values; the key type is unchanged
     * @return a stream rooted at the new node, keeping this stream's source nodes
     */
    @Override
    public <V1> KStream<K, V1> flatMapValues(ValueMapper<V, Iterable<V1>> mapper) {
        String name = topology.newName(FLATMAPVALUES_NAME);

        topology.addProcessor(name, new KStreamFlatMapValues<>(mapper), this.name);

        return new KStreamImpl<>(topology, name, sourceNodes);
    }

    /**
     * Appends one {@code KStreamBranch} node and, below it, one {@code KStreamPassThrough} child
     * per predicate.
     *
     * @param predicates predicates handed (as a defensive copy) to the branch processor
     * @return one stream per predicate, in the same order, each rooted at its own child node and
     *         keeping this stream's source nodes
     */
    @Override
    @SuppressWarnings("unchecked")
    public KStream<K, V>[] branch(Predicate<K, V>... predicates) {
        String branchName = topology.newName(BRANCH_NAME);

        // Clone so that later changes to the caller's array cannot affect the registered processor.
        topology.addProcessor(branchName, new KStreamBranch(predicates.clone()), this.name);

        // Generic arrays cannot be created directly, hence the reflective allocation.
        KStream<K, V>[] branchChildren = (KStream<K, V>[]) Array.newInstance(KStream.class, predicates.length);
        for (int i = 0; i < predicates.length; i++) {
            String childName = topology.newName(BRANCHCHILD_NAME);

            topology.addProcessor(childName, new KStreamPassThrough<K, V>(), branchName);

            branchChildren[i] = new KStreamImpl<>(topology, childName, sourceNodes);
        }

        return branchChildren;
    }

    /**
     * Registers a single {@code KStreamPassThrough} node whose parents are the nodes of all given
     * streams.
     *
     * @param topology builder to register the merge node on
     * @param streams  streams to merge; each must be a {@code KStreamImpl}
     * @return a stream rooted at the merge node. Its source nodes are the union of the inputs'
     *         source nodes, or {@code null} as soon as any input has {@code null} source nodes
     * @throws ClassCastException if an element of {@code streams} is not a {@code KStreamImpl}
     */
    public static <K, V> KStream<K, V> merge(KStreamBuilder topology, KStream<K, V>[] streams) {
        String name = topology.newName(MERGE_NAME);
        String[] parentNames = new String[streams.length];
        Set<String> allSourceNodes = new HashSet<>();

        for (int i = 0; i < streams.length; i++) {
            KStreamImpl<K, V> stream = (KStreamImpl<K, V>) streams[i];

            parentNames[i] = stream.name;

            // Once one input has unknown (null) sources the merged result stays null.
            if (allSourceNodes != null) {
                if (stream.sourceNodes != null) {
                    allSourceNodes.addAll(stream.sourceNodes);
                } else {
                    allSourceNodes = null;
                }
            }
        }

        topology.addProcessor(name, new KStreamPassThrough<>(), parentNames);

        return new KStreamImpl<>(topology, name, allSourceNodes);
    }

    /**
     * Writes this stream to {@code topic} via {@link #to(Serde, Serde, StreamPartitioner, String)}
     * and then creates a new stream reading from the same topic with the same serdes.
     *
     * @param keySerde    key serde for both the sink and the new source; may be {@code null}
     * @param valSerde    value serde for both the sink and the new source; may be {@code null}
     * @param partitioner partitioner for the sink; may be {@code null}
     * @param topic       topic to write to and read back from
     * @return the stream returned by {@code topology.stream(keySerde, valSerde, topic)}
     */
    @Override
    public KStream<K, V> through(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<K, V> partitioner, String topic) {
        to(keySerde, valSerde, partitioner, topic);

        return topology.stream(keySerde, valSerde, topic);
    }

    /**
     * Appends a terminal {@code KStreamForeach} node fed by this stream's node.
     *
     * @param action action handed to the foreach processor
     */
    @Override
    public void foreach(ForeachAction<K, V> action) {
        String name = topology.newName(FOREACH_NAME);

        topology.addProcessor(name, new KStreamForeach<>(action), this.name);
    }

    /**
     * Same as the four-argument {@code through} with a {@code null} partitioner.
     */
    @Override
    public KStream<K, V> through(Serde<K> keySerde, Serde<V> valSerde, String topic) {
        return through(keySerde, valSerde, null, topic);
    }

    /**
     * Same as the four-argument {@code through} with {@code null} serdes.
     */
    @Override
    public KStream<K, V> through(StreamPartitioner<K, V> partitioner, String topic) {
        return through(null, null, partitioner, topic);
    }

    /**
     * Same as the four-argument {@code through} with {@code null} serdes and partitioner.
     */
    @Override
    public KStream<K, V> through(String topic) {
        return through(null, null, null, topic);
    }

    /**
     * Same as the four-argument {@code to} with {@code null} serdes and partitioner.
     */
    @Override
    public void to(String topic) {
        to(null, null, null, topic);
    }

    /**
     * Same as the four-argument {@code to} with {@code null} serdes.
     */
    @Override
    public void to(StreamPartitioner<K, V> partitioner, String topic) {
        to(null, null, partitioner, topic);
    }

    /**
     * Same as the four-argument {@code to} with a {@code null} partitioner.
     */
    @Override
    public void to(Serde<K> keySerde, Serde<V> valSerde, String topic) {
        to(keySerde, valSerde, null, topic);
    }

    /**
     * Appends a sink node that writes this stream to {@code topic}.
     *
     * <p>Each serde is resolved to its serializer independently; a {@code null} serde yields a
     * {@code null} serializer. When no partitioner is given and the key serializer is a
     * {@code WindowedSerializer}, a {@code WindowedStreamPartitioner} built on that serializer is
     * used instead.
     *
     * @param keySerde    key serde; may be {@code null}
     * @param valSerde    value serde; may be {@code null}
     * @param partitioner partitioner for the sink; may be {@code null}
     * @param topic       topic to write to
     */
    @SuppressWarnings("unchecked")
    @Override
    public void to(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<K, V> partitioner, String topic) {
        String name = topology.newName(SINK_NAME);

        Serializer<K> keySerializer = keySerde == null ? null : keySerde.serializer();
        // Guard on valSerde itself: testing keySerde here would drop a value serde given without
        // a key serde, and dereference a null value serde given alongside a key serde.
        Serializer<V> valSerializer = valSerde == null ? null : valSerde.serializer();

        // instanceof is false for null, so no separate null check is needed.
        if (partitioner == null && keySerializer instanceof WindowedSerializer) {
            WindowedSerializer<Object> windowedSerializer = (WindowedSerializer<Object>) keySerializer;
            partitioner = (StreamPartitioner<K, V>) new WindowedStreamPartitioner<Object, V>(windowedSerializer);
        }

        topology.addSink(name, topic, keySerializer, valSerializer, partitioner, this.name);
    }

    /**
     * Appends a {@code KStreamTransform} node and connects it to the named state stores.
     *
     * @param transformerSupplier supplier handed to the transform processor
     * @param stateStoreNames     names of the state stores to connect to the new node
     * @return a stream rooted at the new node; its source nodes are {@code null} because the
     *         transformer may produce a different key
     */
    @Override
    public <K1, V1> KStream<K1, V1> transform(TransformerSupplier<K, V, KeyValue<K1, V1>> transformerSupplier, String... stateStoreNames) {
        String name = topology.newName(TRANSFORM_NAME);

        topology.addProcessor(name, new KStreamTransform<>(transformerSupplier), this.name);
        topology.connectProcessorAndStateStores(name, stateStoreNames);

        return new KStreamImpl<>(topology, name, null);
    }

    /**
     * Appends a {@code KStreamTransformValues} node and connects it to the named state stores.
     *
     * @param valueTransformerSupplier supplier handed to the transform-values processor
     * @param stateStoreNames          names of the state stores to connect to the new node
     * @return a stream rooted at the new node, keeping this stream's source nodes
     */
    @Override
    public <V1> KStream<K, V1> transformValues(ValueTransformerSupplier<V, V1> valueTransformerSupplier, String... stateStoreNames) {
        String name = topology.newName(TRANSFORMVALUES_NAME);

        topology.addProcessor(name, new KStreamTransformValues<>(valueTransformerSupplier), this.name);
        topology.connectProcessorAndStateStores(name, stateStoreNames);

        return new KStreamImpl<>(topology, name, sourceNodes);
    }

    /**
     * Appends the caller's processor as a terminal node and connects it to the named state stores.
     *
     * @param processorSupplier supplier registered directly as the processor node
     * @param stateStoreNames   names of the state stores to connect to the new node
     */
    @Override
    public void process(final ProcessorSupplier<K, V> processorSupplier, String... stateStoreNames) {
        String name = topology.newName(PROCESSOR_NAME);

        topology.addProcessor(name, processorSupplier, this.name);
        topology.connectProcessorAndStateStores(name, stateStoreNames);
    }

    /**
     * Windowed stream-stream join with explicit serdes; delegates to the private {@code join}
     * with {@code outer = false}.
     */
    @Override
    public <V1, R> KStream<K, R> join(
            KStream<K, V1> other,
            ValueJoiner<V, V1, R> joiner,
            JoinWindows windows,
            Serde<K> keySerde,
            Serde<V> thisValueSerde,
            Serde<V1> otherValueSerde) {

        return join(other, joiner, windows, keySerde, thisValueSerde, otherValueSerde, false);
    }

    /**
     * Windowed stream-stream join with all serdes {@code null}; delegates to the private
     * {@code join} with {@code outer = false}.
     */
    @Override
    public <V1, R> KStream<K, R> join(
            KStream<K, V1> other,
            ValueJoiner<V, V1, R> joiner,
            JoinWindows windows) {

        return join(other, joiner, windows, null, null, null, false);
    }

    /**
     * Windowed stream-stream outer join with explicit serdes; delegates to the private
     * {@code join} with {@code outer = true}.
     */
    @Override
    public <V1, R> KStream<K, R> outerJoin(
            KStream<K, V1> other,
            ValueJoiner<V, V1, R> joiner,
            JoinWindows windows,
            Serde<K> keySerde,
            Serde<V> thisValueSerde,
            Serde<V1> otherValueSerde) {

        return join(other, joiner, windows, keySerde, thisValueSerde, otherValueSerde, true);
    }

    /**
     * Windowed stream-stream outer join with all serdes {@code null}; delegates to the private
     * {@code join} with {@code outer = true}.
     */
    @Override
    public <V1, R> KStream<K, R> outerJoin(
            KStream<K, V1> other,
            ValueJoiner<V, V1, R> joiner,
            JoinWindows windows) {

        return join(other, joiner, windows, null, null, null, true);
    }

    /**
     * Shared implementation of {@code join} and {@code outerJoin}.
     *
     * <p>Registers, in this order: a {@code KStreamJoinWindow} node per side, a
     * {@code KStreamKStreamJoin} node per side (each given the name of the opposite side's
     * store, the second one with the reversed joiner), a {@code KStreamPassThrough} node merging
     * both join nodes, and two persistent windowed stores named {@code windows.name() + "-this"}
     * and {@code windows.name() + "-other"}.
     *
     * @param other           stream to join with; must be a {@code KStreamImpl}
     * @param joiner          combines a value of this stream with a value of {@code other}
     * @param windows         join window specification (name, before/after, retention, segments)
     * @param keySerde        key serde for both stores; may be {@code null}
     * @param thisValueSerde  value serde for the "-this" store; may be {@code null}
     * @param otherValueSerde value serde for the "-other" store; may be {@code null}
     * @param outer           flag forwarded to both {@code KStreamKStreamJoin} processors; also
     *                        selects the OUTER* instead of the JOIN* node name prefixes
     * @return a stream rooted at the merge node, with the source nodes returned by
     *         {@code ensureJoinableWith}
     */
    @SuppressWarnings("unchecked")
    private <V1, R> KStream<K, R> join(
            KStream<K, V1> other,
            ValueJoiner<V, V1, R> joiner,
            JoinWindows windows,
            Serde<K> keySerde,
            Serde<V> thisValueSerde,
            Serde<V1> otherValueSerde,
            boolean outer) {

        Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);

        StateStoreSupplier thisWindow = Stores.create(windows.name() + "-this")
                .withKeys(keySerde)
                .withValues(thisValueSerde)
                .persistent()
                .windowed(windows.maintainMs(), windows.segments, true)
                .build();

        StateStoreSupplier otherWindow = Stores.create(windows.name() + "-other")
                .withKeys(keySerde)
                .withValues(otherValueSerde)
                .persistent()
                .windowed(windows.maintainMs(), windows.segments, true)
                .build();

        KStreamJoinWindow<K, V> thisWindowedStream = new KStreamJoinWindow<>(thisWindow.name(), windows.before + windows.after + 1, windows.maintainMs());
        KStreamJoinWindow<K, V1> otherWindowedStream = new KStreamJoinWindow<>(otherWindow.name(), windows.before + windows.after + 1, windows.maintainMs());

        // Each join processor is given the store name of the opposite side.
        KStreamKStreamJoin<K, R, V, V1> joinThis = new KStreamKStreamJoin<>(otherWindow.name(), windows.before, windows.after, joiner, outer);
        KStreamKStreamJoin<K, R, V1, V> joinOther = new KStreamKStreamJoin<>(thisWindow.name(), windows.before, windows.after, reverseJoiner(joiner), outer);

        KStreamPassThrough<K, R> joinMerge = new KStreamPassThrough<>();

        String thisWindowStreamName = topology.newName(WINDOWED_NAME);
        String otherWindowStreamName = topology.newName(WINDOWED_NAME);
        String joinThisName = outer ? topology.newName(OUTERTHIS_NAME) : topology.newName(JOINTHIS_NAME);
        String joinOtherName = outer ? topology.newName(OUTEROTHER_NAME) : topology.newName(JOINOTHER_NAME);
        String joinMergeName = topology.newName(MERGE_NAME);

        topology.addProcessor(thisWindowStreamName, thisWindowedStream, this.name);
        topology.addProcessor(otherWindowStreamName, otherWindowedStream, ((KStreamImpl<K, V1>) other).name);
        topology.addProcessor(joinThisName, joinThis, thisWindowStreamName);
        topology.addProcessor(joinOtherName, joinOther, otherWindowStreamName);
        topology.addProcessor(joinMergeName, joinMerge, joinThisName, joinOtherName);
        // NOTE(review): both stores are attached to the two windowing nodes only, while the join
        // nodes are the ones given the store names -- confirm store access is not enforced per node.
        topology.addStateStore(thisWindow, thisWindowStreamName, otherWindowStreamName);
        topology.addStateStore(otherWindow, thisWindowStreamName, otherWindowStreamName);

        return new KStreamImpl<>(topology, joinMergeName, allSourceNodes);
    }

    /**
     * Windowed stream-stream left join.
     *
     * <p>Registers a {@code KStreamJoinWindow} node below {@code other}, a
     * {@code KStreamKStreamJoin} node below this stream (constructed with its last flag set to
     * {@code true}, the position where {@code join} passes {@code outer}), and one persistent
     * windowed store named {@code windows.name() + "-other"} attached to both nodes.
     *
     * @param other           stream to join with; must be a {@code KStreamImpl}
     * @param joiner          combines a value of this stream with a value of {@code other}
     * @param windows         join window specification
     * @param keySerde        key serde for the store; may be {@code null}
     * @param otherValueSerde value serde for the store; may be {@code null}
     * @return a stream rooted at the join node, with the source nodes returned by
     *         {@code ensureJoinableWith}
     */
    @SuppressWarnings("unchecked")
    @Override
    public <V1, R> KStream<K, R> leftJoin(
            KStream<K, V1> other,
            ValueJoiner<V, V1, R> joiner,
            JoinWindows windows,
            Serde<K> keySerde,
            Serde<V1> otherValueSerde) {

        Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);

        StateStoreSupplier otherWindow = Stores.create(windows.name() + "-other")
                .withKeys(keySerde)
                .withValues(otherValueSerde)
                .persistent()
                .windowed(windows.maintainMs(), windows.segments, true)
                .build();

        KStreamJoinWindow<K, V1> otherWindowedStream = new KStreamJoinWindow<>(otherWindow.name(), windows.before + windows.after + 1, windows.maintainMs());
        KStreamKStreamJoin<K, R, V, V1> joinThis = new KStreamKStreamJoin<>(otherWindow.name(), windows.before, windows.after, joiner, true);

        String otherWindowStreamName = topology.newName(WINDOWED_NAME);
        String joinThisName = topology.newName(LEFTJOIN_NAME);

        topology.addProcessor(otherWindowStreamName, otherWindowedStream, ((KStreamImpl<K, V1>) other).name);
        topology.addProcessor(joinThisName, joinThis, this.name);
        topology.addStateStore(otherWindow, joinThisName, otherWindowStreamName);

        return new KStreamImpl<>(topology, joinThisName, allSourceNodes);
    }

    /**
     * Windowed stream-stream left join with both serdes {@code null}.
     */
    @Override
    public <V1, R> KStream<K, R> leftJoin(
            KStream<K, V1> other,
            ValueJoiner<V, V1, R> joiner,
            JoinWindows windows) {

        return leftJoin(other, joiner, windows, null, null);
    }

    /**
     * Stream-table left join: appends a {@code KStreamKTableLeftJoin} node below this stream and
     * connects this stream's node with the table's node.
     *
     * @param other  table to join with; must be a {@code KTableImpl}
     * @param joiner combines a stream value with a table value
     * @return a stream rooted at the join node, with the source nodes returned by
     *         {@code ensureJoinableWith}
     */
    @SuppressWarnings("unchecked")
    @Override
    public <V1, R> KStream<K, R> leftJoin(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner) {
        Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);

        String name = topology.newName(LEFTJOIN_NAME);

        topology.addProcessor(name, new KStreamKTableLeftJoin<>((KTableImpl<K, ?, V1>) other, joiner), this.name);
        topology.connectProcessors(this.name, ((KTableImpl<K, ?, V1>) other).name);

        return new KStreamImpl<>(topology, name, allSourceNodes);
    }

    /**
     * Windowed reduce: appends a {@code KStreamWindowReduce} node backed by a persistent windowed
     * store named {@code windows.name()}.
     *
     * @param reducer       combines two values into one
     * @param windows       window specification; its name is also the store name
     * @param keySerde      key serde for the store; may be {@code null}
     * @param aggValueSerde value serde for the store; may be {@code null}
     * @return a table rooted at the reduce node, keeping this stream's source nodes
     */
    @Override
    public <W extends Window> KTable<Windowed<K>, V> reduceByKey(Reducer<V> reducer,
                                                                 Windows<W> windows,
                                                                 Serde<K> keySerde,
                                                                 Serde<V> aggValueSerde) {

        String reduceName = topology.newName(REDUCE_NAME);

        KStreamWindowReduce<K, V, W> reduceSupplier = new KStreamWindowReduce<>(windows, windows.name(), reducer);

        StateStoreSupplier reduceStore = Stores.create(windows.name())
                .withKeys(keySerde)
                .withValues(aggValueSerde)
                .persistent()
                .windowed(windows.maintainMs(), windows.segments, false)
                .build();

        // register the reduce processor and attach its backing store
        topology.addProcessor(reduceName, reduceSupplier, this.name);
        topology.addStateStore(reduceStore, reduceName);

        // expose the reduce node as a KTable that keeps this stream's source nodes
        return new KTableImpl<>(topology, reduceName, reduceSupplier, sourceNodes);
    }

    /**
     * Windowed reduce with both serdes {@code null}.
     */
    @Override
    public <W extends Window> KTable<Windowed<K>, V> reduceByKey(Reducer<V> reducer,
                                                                 Windows<W> windows) {

        return reduceByKey(reducer, windows, null, null);
    }

    /**
     * Non-windowed reduce: appends a {@code KStreamReduce} node backed by a persistent store
     * named {@code name}.
     *
     * @param reducer       combines two values into one
     * @param keySerde      key serde for the store; may be {@code null}
     * @param aggValueSerde value serde for the store; may be {@code null}
     * @param name          name of the backing store
     * @return a table rooted at the reduce node, keeping this stream's source nodes
     */
    @Override
    public KTable<K, V> reduceByKey(Reducer<V> reducer,
                                    Serde<K> keySerde,
                                    Serde<V> aggValueSerde,
                                    String name) {

        String reduceName = topology.newName(REDUCE_NAME);

        KStreamReduce<K, V> reduceSupplier = new KStreamReduce<>(name, reducer);

        StateStoreSupplier reduceStore = Stores.create(name)
                .withKeys(keySerde)
                .withValues(aggValueSerde)
                .persistent()
                .build();

        // register the reduce processor and attach its backing store
        topology.addProcessor(reduceName, reduceSupplier, this.name);
        topology.addStateStore(reduceStore, reduceName);

        // expose the reduce node as a KTable that keeps this stream's source nodes
        return new KTableImpl<>(topology, reduceName, reduceSupplier, sourceNodes);
    }

    /**
     * Non-windowed reduce with both serdes {@code null}.
     */
    @Override
    public KTable<K, V> reduceByKey(Reducer<V> reducer, String name) {

        return reduceByKey(reducer, null, null, name);
    }

    /**
     * Windowed aggregation: appends a {@code KStreamWindowAggregate} node backed by a persistent
     * windowed store named {@code windows.name()}.
     *
     * @param initializer   produces the initial aggregate
     * @param aggregator    folds a record into the current aggregate
     * @param windows       window specification; its name is also the store name
     * @param keySerde      key serde for the store; may be {@code null}
     * @param aggValueSerde aggregate serde for the store; may be {@code null}
     * @return a table rooted at the aggregate node, keeping this stream's source nodes
     */
    @Override
    public <T, W extends Window> KTable<Windowed<K>, T> aggregateByKey(Initializer<T> initializer,
                                                                       Aggregator<K, V, T> aggregator,
                                                                       Windows<W> windows,
                                                                       Serde<K> keySerde,
                                                                       Serde<T> aggValueSerde) {

        String aggregateName = topology.newName(AGGREGATE_NAME);

        KStreamAggProcessorSupplier<K, Windowed<K>, V, T> aggregateSupplier = new KStreamWindowAggregate<>(windows, windows.name(), initializer, aggregator);

        StateStoreSupplier aggregateStore = Stores.create(windows.name())
                .withKeys(keySerde)
                .withValues(aggValueSerde)
                .persistent()
                .windowed(windows.maintainMs(), windows.segments, false)
                .build();

        // register the aggregate processor and attach its backing store
        topology.addProcessor(aggregateName, aggregateSupplier, this.name);
        topology.addStateStore(aggregateStore, aggregateName);

        // expose the aggregate node as a KTable that keeps this stream's source nodes
        return new KTableImpl<Windowed<K>, T, T>(topology, aggregateName, aggregateSupplier, sourceNodes);
    }

    /**
     * Windowed aggregation with both serdes {@code null}.
     */
    @Override
    public <T, W extends Window> KTable<Windowed<K>, T> aggregateByKey(Initializer<T> initializer,
                                                                       Aggregator<K, V, T> aggregator,
                                                                       Windows<W> windows) {

        return aggregateByKey(initializer, aggregator, windows, null, null);
    }

    /**
     * Non-windowed aggregation: appends a {@code KStreamAggregate} node backed by a persistent
     * store named {@code name}.
     *
     * @param initializer   produces the initial aggregate
     * @param aggregator    folds a record into the current aggregate
     * @param keySerde      key serde for the store; may be {@code null}
     * @param aggValueSerde aggregate serde for the store; may be {@code null}
     * @param name          name of the backing store
     * @return a table rooted at the aggregate node, keeping this stream's source nodes
     */
    @Override
    public <T> KTable<K, T> aggregateByKey(Initializer<T> initializer,
                                           Aggregator<K, V, T> aggregator,
                                           Serde<K> keySerde,
                                           Serde<T> aggValueSerde,
                                           String name) {

        String aggregateName = topology.newName(AGGREGATE_NAME);

        KStreamAggProcessorSupplier<K, K, V, T> aggregateSupplier = new KStreamAggregate<>(name, initializer, aggregator);

        StateStoreSupplier aggregateStore = Stores.create(name)
                .withKeys(keySerde)
                .withValues(aggValueSerde)
                .persistent()
                .build();

        // register the aggregate processor and attach its backing store
        topology.addProcessor(aggregateName, aggregateSupplier, this.name);
        topology.addStateStore(aggregateStore, aggregateName);

        // expose the aggregate node as a KTable that keeps this stream's source nodes
        return new KTableImpl<>(topology, aggregateName, aggregateSupplier, sourceNodes);
    }

    /**
     * Non-windowed aggregation with both serdes {@code null}.
     */
    @Override
    public <T> KTable<K, T> aggregateByKey(Initializer<T> initializer,
                                           Aggregator<K, V, T> aggregator,
                                           String name) {

        return aggregateByKey(initializer, aggregator, null, null, name);
    }

    /**
     * Windowed count: a windowed {@code aggregateByKey} that starts at {@code 0L}, adds
     * {@code 1L} per record and stores the aggregate with {@code Serdes.Long()}.
     *
     * @param windows  window specification
     * @param keySerde key serde for the store; may be {@code null}
     * @return a table of per-window record counts
     */
    @Override
    public <W extends Window> KTable<Windowed<K>, Long> countByKey(Windows<W> windows,
                                                                   Serde<K> keySerde) {
        return this.aggregateByKey(countInitializer(), this.countAggregator(), windows, keySerde, Serdes.Long());
    }

    /**
     * Windowed count with a {@code null} key serde.
     */
    @Override
    public <W extends Window> KTable<Windowed<K>, Long> countByKey(Windows<W> windows) {

        return countByKey(windows, null);
    }

    /**
     * Non-windowed count: a non-windowed {@code aggregateByKey} that starts at {@code 0L}, adds
     * {@code 1L} per record and stores the aggregate with {@code Serdes.Long()}.
     *
     * @param keySerde key serde for the store; may be {@code null}
     * @param name     name of the backing store
     * @return a table of record counts
     */
    @Override
    public KTable<K, Long> countByKey(Serde<K> keySerde, String name) {
        return this.aggregateByKey(countInitializer(), this.countAggregator(), keySerde, Serdes.Long(), name);
    }

    /**
     * Non-windowed count with a {@code null} key serde.
     */
    @Override
    public KTable<K, Long> countByKey(String name) {

        return countByKey(null, name);
    }

    /** Creates the initializer shared by the {@code countByKey} overloads: every count starts at zero. */
    private static Initializer<Long> countInitializer() {
        return new Initializer<Long>() {
            @Override
            public Long apply() {
                return 0L;
            }
        };
    }

    /** Creates the aggregator shared by the {@code countByKey} overloads: each record adds one. */
    private Aggregator<K, V, Long> countAggregator() {
        return new Aggregator<K, V, Long>() {
            @Override
            public Long apply(K aggKey, V value, Long aggregate) {
                return aggregate + 1L;
            }
        };
    }
}