blob: 51d4cb46f64ed75bf7dba83792ecb2fcf21418b4 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.TopologyBuilderException;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KGroupedTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.StreamPartitioner;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.PrintStream;
import java.util.Set;
/**
 * The implementation class of {@link KTable}.
 * <p>
 * Each instance is a handle on one processor node in the topology being assembled by a
 * {@link KStreamBuilder}. The inherited {@code name} identifies the node that emits this
 * table's updates, and {@link #processorSupplier} is the supplier of that node. Operators such
 * as {@link #filter(Predicate)}, {@link #mapValues(ValueMapper)} or
 * {@link #join(KTable, ValueJoiner)} register new processor nodes whose parent is this node and
 * return a new {@code KTableImpl} wrapping them.
 * <p>
 * This node's output values are {@link Change} instances. {@link #foreach(ForeachAction)} and
 * {@link #toStream()} unwrap them to {@code Change#newValue}.
 *
 * @param <K> the key type
 * @param <S> the source's (parent's) value type
 * @param <V> the value type
 */
public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, V> {

    // Prefixes passed to topology.newName(..) to build the names of the processor nodes this class adds.
    private static final String FILTER_NAME = "KTABLE-FILTER-";
    private static final String FOREACH_NAME = "KTABLE-FOREACH-";
    public static final String JOINTHIS_NAME = "KTABLE-JOINTHIS-";
    public static final String JOINOTHER_NAME = "KTABLE-JOINOTHER-";
    public static final String LEFTTHIS_NAME = "KTABLE-LEFTTHIS-";
    public static final String LEFTOTHER_NAME = "KTABLE-LEFTOTHER-";
    private static final String MAPVALUES_NAME = "KTABLE-MAPVALUES-";
    public static final String MERGE_NAME = "KTABLE-MERGE-";
    public static final String OUTERTHIS_NAME = "KTABLE-OUTERTHIS-";
    public static final String OUTEROTHER_NAME = "KTABLE-OUTEROTHER-";
    // NOTE(review): uses a KSTREAM- prefix unlike its KTABLE- siblings; left unchanged because the
    // generated node names are visible in the built topology.
    private static final String PRINTING_NAME = "KSTREAM-PRINTER-";
    private static final String SELECT_NAME = "KTABLE-SELECT-";
    public static final String SOURCE_NAME = "KTABLE-SOURCE-";
    private static final String TOSTREAM_NAME = "KTABLE-TOSTREAM-";

    /**
     * Supplier of the processor node named {@code name}. It is inspected by
     * {@link #valueGetterSupplier()} and {@link #enableSendingOldValues()} to decide how to
     * read from, or reconfigure, this table.
     */
    public final ProcessorSupplier<?, ?> processorSupplier;

    /** Key serde used if this table's source gets materialized; {@code null} when not supplied. */
    private final Serde<K> keySerde;

    /** Value serde used if this table's source gets materialized; {@code null} when not supplied. */
    private final Serde<V> valSerde;

    /** Set once {@link #enableSendingOldValues()} has been propagated to {@link #processorSupplier}. */
    private boolean sendOldValues = false;

    /**
     * Creates a table handle without serdes (both are {@code null}).
     *
     * @param topology          the builder the node belongs to
     * @param name              the name of the processor node that emits this table's updates
     * @param processorSupplier the supplier of that processor node
     * @param sourceNodes       the source-node names this table derives from
     */
    public KTableImpl(KStreamBuilder topology,
                      String name,
                      ProcessorSupplier<?, ?> processorSupplier,
                      Set<String> sourceNodes) {
        this(topology, name, processorSupplier, sourceNodes, null, null);
    }

    /**
     * Creates a table handle.
     *
     * @param topology          the builder the node belongs to
     * @param name              the name of the processor node that emits this table's updates
     * @param processorSupplier the supplier of that processor node
     * @param sourceNodes       the source-node names this table derives from
     * @param keySerde          key serde used by {@code materialize}; may be {@code null}
     * @param valSerde          value serde used by {@code materialize}; may be {@code null}
     */
    public KTableImpl(KStreamBuilder topology,
                      String name,
                      ProcessorSupplier<?, ?> processorSupplier,
                      Set<String> sourceNodes,
                      Serde<K> keySerde,
                      Serde<V> valSerde) {
        super(topology, name, sourceNodes);
        this.processorSupplier = processorSupplier;
        this.keySerde = keySerde;
        this.valSerde = valSerde;
    }

    @Override
    public KTable<K, V> filter(final Predicate<K, V> predicate) {
        return doFilter(predicate, false);
    }

    @Override
    public KTable<K, V> filterNot(final Predicate<K, V> predicate) {
        return doFilter(predicate, true);
    }

    /**
     * Adds a {@link KTableFilter} node below this one (shared body of filter and filterNot).
     *
     * @param predicate the user predicate
     * @param filterNot passed to {@link KTableFilter} as its third argument: {@code false} for
     *                  {@link #filter(Predicate)}, {@code true} for {@link #filterNot(Predicate)}
     * @return the filtered table, sharing this table's source nodes
     */
    private KTable<K, V> doFilter(Predicate<K, V> predicate, boolean filterNot) {
        String filterName = topology.newName(FILTER_NAME);
        KTableProcessorSupplier<K, V, V> filterSupplier = new KTableFilter<>(this, predicate, filterNot);
        topology.addProcessor(filterName, filterSupplier, this.name);
        return new KTableImpl<>(topology, filterName, filterSupplier, sourceNodes);
    }

    @Override
    public <V1> KTable<K, V1> mapValues(ValueMapper<V, V1> mapper) {
        String mapValuesName = topology.newName(MAPVALUES_NAME);
        KTableProcessorSupplier<K, V, V1> mapValuesSupplier = new KTableMapValues<>(this, mapper);
        topology.addProcessor(mapValuesName, mapValuesSupplier, this.name);
        return new KTableImpl<>(topology, mapValuesName, mapValuesSupplier, sourceNodes);
    }

    /** Adds a {@link KeyValuePrinter} node below this table with {@code null} key and value serdes. */
    @Override
    public void print() {
        print(null, null);
    }

    /**
     * Adds a {@link KeyValuePrinter} node below this table. The given serdes are handed to the
     * printer as-is; {@code null} is allowed (see {@link #print()}).
     */
    @Override
    public void print(Serde<K> keySerde, Serde<V> valSerde) {
        String printerName = topology.newName(PRINTING_NAME);
        topology.addProcessor(printerName, new KeyValuePrinter<>(keySerde, valSerde), this.name);
    }

    /** Same as {@link #writeAsText(String, Serde, Serde)} with {@code null} serdes. */
    @Override
    public void writeAsText(String filePath) {
        writeAsText(filePath, null, null);
    }

    /**
     * Adds a {@link KeyValuePrinter} node below this table that prints to the file at
     * {@code filePath}. The file is opened immediately, while the topology is being built.
     *
     * @throws TopologyBuilderException if the file cannot be opened for writing; the underlying
     *                                  {@link FileNotFoundException} is attached as the cause
     */
    @Override
    public void writeAsText(String filePath, Serde<K> keySerde, Serde<V> valSerde) {
        String printerName = topology.newName(PRINTING_NAME);
        PrintStream printStream;
        try {
            printStream = new PrintStream(new FileOutputStream(filePath));
        } catch (FileNotFoundException e) {
            String message = "Unable to write stream to file at [" + filePath + "] " + e.getMessage();
            TopologyBuilderException exception = new TopologyBuilderException(message);
            // Preserve the original I/O failure instead of keeping only its message text.
            exception.initCause(e);
            throw exception;
        }
        try {
            topology.addProcessor(printerName, new KeyValuePrinter<>(printStream, keySerde, valSerde), this.name);
        } catch (RuntimeException e) {
            // The printer was never registered, so nothing else holds the stream: close it here.
            printStream.close();
            throw e;
        }
    }

    /**
     * Writes this table to {@code topic} (see {@link #to(Serde, Serde, StreamPartitioner, String)}),
     * then returns a new table read back from that topic via {@code topology.table(..)}.
     */
    @Override
    public KTable<K, V> through(Serde<K> keySerde,
                                Serde<V> valSerde,
                                StreamPartitioner<K, V> partitioner,
                                String topic) {
        to(keySerde, valSerde, partitioner, topic);
        return topology.table(keySerde, valSerde, topic);
    }

    /** Adds a node that calls {@code action} with each key and the {@code newValue} of its update. */
    @Override
    public void foreach(final ForeachAction<K, V> action) {
        String foreachName = topology.newName(FOREACH_NAME);
        ForeachAction<K, Change<V>> onNewValue = new ForeachAction<K, Change<V>>() {
            @Override
            public void apply(K key, Change<V> value) {
                action.apply(key, value.newValue);
            }
        };
        KStreamForeach<K, Change<V>> foreachSupplier = new KStreamForeach<>(onNewValue);
        topology.addProcessor(foreachName, foreachSupplier, this.name);
    }

    @Override
    public KTable<K, V> through(Serde<K> keySerde, Serde<V> valSerde, String topic) {
        return through(keySerde, valSerde, null, topic);
    }

    @Override
    public KTable<K, V> through(StreamPartitioner<K, V> partitioner, String topic) {
        return through(null, null, partitioner, topic);
    }

    @Override
    public KTable<K, V> through(String topic) {
        return through(null, null, null, topic);
    }

    @Override
    public void to(String topic) {
        to(null, null, null, topic);
    }

    @Override
    public void to(StreamPartitioner<K, V> partitioner, String topic) {
        to(null, null, partitioner, topic);
    }

    @Override
    public void to(Serde<K> keySerde, Serde<V> valSerde, String topic) {
        to(keySerde, valSerde, null, topic);
    }

    /** Converts this table to a stream ({@link #toStream()}) and writes that stream to {@code topic}. */
    @Override
    public void to(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<K, V> partitioner, String topic) {
        this.toStream().to(keySerde, valSerde, partitioner, topic);
    }

    /**
     * Adds a node that maps each {@link Change} to its {@code newValue} and returns the resulting
     * stream, which shares this table's source nodes.
     */
    @Override
    public KStream<K, V> toStream() {
        String toStreamName = topology.newName(TOSTREAM_NAME);
        ValueMapper<Change<V>, V> newValueOnly = new ValueMapper<Change<V>, V>() {
            @Override
            public V apply(Change<V> change) {
                return change.newValue;
            }
        };
        topology.addProcessor(toStreamName, new KStreamMapValues<K, Change<V>, V>(newValueOnly), this.name);
        return new KStreamImpl<>(topology, toStreamName, sourceNodes);
    }

    /** Equivalent to {@code toStream().selectKey(mapper)}. */
    @Override
    public <K1> KStream<K1, V> toStream(KeyValueMapper<K, V, K1> mapper) {
        return toStream().selectKey(mapper);
    }

    @SuppressWarnings("unchecked")
    @Override
    public <V1, R> KTable<K, R> join(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner) {
        Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);
        String joinThisName = topology.newName(JOINTHIS_NAME);
        String joinOtherName = topology.newName(JOINOTHER_NAME);
        String joinMergeName = topology.newName(MERGE_NAME);
        KTableImpl<K, ?, V1> otherTable = (KTableImpl<K, ?, V1>) other;
        KTableKTableJoin<K, R, V, V1> joinThis = new KTableKTableJoin<>(this, otherTable, joiner);
        KTableKTableJoin<K, R, V1, V> joinOther = new KTableKTableJoin<>(otherTable, this, reverseJoiner(joiner));
        return buildJoin(otherTable, allSourceNodes, joinThisName, joinOtherName, joinMergeName, joinThis, joinOther);
    }

    @SuppressWarnings("unchecked")
    @Override
    public <V1, R> KTable<K, R> outerJoin(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner) {
        Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);
        String joinThisName = topology.newName(OUTERTHIS_NAME);
        String joinOtherName = topology.newName(OUTEROTHER_NAME);
        String joinMergeName = topology.newName(MERGE_NAME);
        KTableImpl<K, ?, V1> otherTable = (KTableImpl<K, ?, V1>) other;
        KTableKTableOuterJoin<K, R, V, V1> joinThis = new KTableKTableOuterJoin<>(this, otherTable, joiner);
        KTableKTableOuterJoin<K, R, V1, V> joinOther = new KTableKTableOuterJoin<>(otherTable, this, reverseJoiner(joiner));
        return buildJoin(otherTable, allSourceNodes, joinThisName, joinOtherName, joinMergeName, joinThis, joinOther);
    }

    @SuppressWarnings("unchecked")
    @Override
    public <V1, R> KTable<K, R> leftJoin(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner) {
        Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);
        String joinThisName = topology.newName(LEFTTHIS_NAME);
        String joinOtherName = topology.newName(LEFTOTHER_NAME);
        String joinMergeName = topology.newName(MERGE_NAME);
        KTableImpl<K, ?, V1> otherTable = (KTableImpl<K, ?, V1>) other;
        // Updates from this (left) side use left-join semantics; updates from the other side use
        // the mirrored right-join processor with a reversed joiner.
        KTableKTableLeftJoin<K, R, V, V1> joinThis = new KTableKTableLeftJoin<>(this, otherTable, joiner);
        KTableKTableRightJoin<K, R, V1, V> joinOther = new KTableKTableRightJoin<>(otherTable, this, reverseJoiner(joiner));
        return buildJoin(otherTable, allSourceNodes, joinThisName, joinOtherName, joinMergeName, joinThis, joinOther);
    }

    /**
     * Shared wiring for the table-table joins. It registers {@code joinThis} below this table,
     * registers {@code joinOther} below {@code other}, and adds a {@link KTableKTableJoinMerger}
     * node fed by both. Processor-node names must already have been generated by the caller.
     *
     * @param other          the right-hand table of the join
     * @param allSourceNodes the combined source nodes returned by {@code ensureJoinableWith}
     * @param joinThisName   name for the node driven by this table's updates
     * @param joinOtherName  name for the node driven by {@code other}'s updates
     * @param joinMergeName  name for the merger node
     * @param joinThis       processor supplier triggered by this table
     * @param joinOther      processor supplier triggered by {@code other}
     * @return the joined table, backed by the merger node
     */
    private <V1, R> KTable<K, R> buildJoin(KTableImpl<K, ?, V1> other,
                                           Set<String> allSourceNodes,
                                           String joinThisName,
                                           String joinOtherName,
                                           String joinMergeName,
                                           ProcessorSupplier<?, ?> joinThis,
                                           ProcessorSupplier<?, ?> joinOther) {
        KTableKTableJoinMerger<K, R> joinMerge = new KTableKTableJoinMerger<>(
                new KTableImpl<K, V, R>(topology, joinThisName, joinThis, this.sourceNodes),
                new KTableImpl<K, V1, R>(topology, joinOtherName, joinOther, other.sourceNodes)
        );
        topology.addProcessor(joinThisName, joinThis, this.name);
        topology.addProcessor(joinOtherName, joinOther, other.name);
        topology.addProcessor(joinMergeName, joinMerge, joinThisName, joinOtherName);
        return new KTableImpl<>(topology, joinMergeName, joinMerge, allSourceNodes);
    }

    /**
     * Adds a {@link KTableRepartitionMap} node that selects a new key and value with
     * {@code selector}, and enables sending old values on this table.
     *
     * @return a grouped table built from the select node, with this table's node as its source parent
     */
    @Override
    public <K1, V1> KGroupedTable<K1, V1> groupBy(KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
                                                 Serde<K1> keySerde,
                                                 Serde<V1> valueSerde) {
        String selectName = topology.newName(SELECT_NAME);
        KTableProcessorSupplier<K, V, KeyValue<K1, V1>> selectSupplier = new KTableRepartitionMap<>(this, selector);
        topology.addProcessor(selectName, selectSupplier, this.name);
        // The select step maps both the old and the new value of each update to an aggregate
        // key/value, so this (parent) table must be told to send old values.
        this.enableSendingOldValues();
        return new KGroupedTableImpl<>(topology, selectName, this.name, keySerde, valueSerde);
    }

    /** Same as {@link #groupBy(KeyValueMapper, Serde, Serde)} with {@code null} serdes. */
    @Override
    public <K1, V1> KGroupedTable<K1, V1> groupBy(KeyValueMapper<K, V, KeyValue<K1, V1>> selector) {
        return this.groupBy(selector, null, null);
    }

    /**
     * Returns a supplier of value getters for this table, dispatching on the type of
     * {@link #processorSupplier}:
     * <ul>
     *   <li>{@link KTableSource}: materializes the source first, then returns a
     *       {@link KTableSourceValueGetterSupplier} for the source topic (the same name
     *       {@code materialize} passes to its {@link KTableStoreSupplier}).</li>
     *   <li>{@link KStreamAggProcessorSupplier}: delegates to its {@code view()}.</li>
     *   <li>anything else: assumed to be a {@link KTableProcessorSupplier}; delegates to its
     *       {@code view()}. A different type fails with {@link ClassCastException}.</li>
     * </ul>
     */
    @SuppressWarnings("unchecked")
    KTableValueGetterSupplier<K, V> valueGetterSupplier() {
        if (processorSupplier instanceof KTableSource) {
            KTableSource<K, V> source = (KTableSource<K, V>) processorSupplier;
            materialize(source);
            return new KTableSourceValueGetterSupplier<>(source.topic);
        } else if (processorSupplier instanceof KStreamAggProcessorSupplier) {
            return ((KStreamAggProcessorSupplier<?, K, S, V>) processorSupplier).view();
        } else {
            return ((KTableProcessorSupplier<K, S, V>) processorSupplier).view();
        }
    }

    /**
     * Asks {@link #processorSupplier} to forward old values along with new ones. It uses the same
     * type dispatch as {@link #valueGetterSupplier()}, and a source table is materialized first.
     * Only the first call has any effect.
     */
    @SuppressWarnings("unchecked")
    void enableSendingOldValues() {
        if (sendOldValues) {
            return;
        }
        if (processorSupplier instanceof KTableSource) {
            KTableSource<K, V> source = (KTableSource<K, V>) processorSupplier;
            materialize(source);
            source.enableSendingOldValues();
        } else if (processorSupplier instanceof KStreamAggProcessorSupplier) {
            ((KStreamAggProcessorSupplier<?, K, S, V>) processorSupplier).enableSendingOldValues();
        } else {
            ((KTableProcessorSupplier<K, S, V>) processorSupplier).enableSendingOldValues();
        }
        sendOldValues = true;
    }

    /** @return whether {@link #enableSendingOldValues()} has been applied to this table */
    boolean sendingOldValueEnabled() {
        return sendOldValues;
    }

    /**
     * If {@code source} is not yet materialized, registers a {@link KTableStoreSupplier} for
     * {@code source.topic}, using this table's serdes, on this processor node. The source is then
     * marked as materialized. Locking on {@code source} makes the check-and-register atomic.
     * NOTE(review): presumably this guards a source supplier shared by several handles; confirm.
     */
    private void materialize(KTableSource<K, ?> source) {
        synchronized (source) {
            if (!source.isMaterialized()) {
                StateStoreSupplier storeSupplier =
                        new KTableStoreSupplier<>(source.topic, keySerde, valSerde, null);
                // mark this state as non internal hence it is read directly from a user topic
                topology.addStateStore(storeSupplier, false, name);
                source.materialize();
            }
        }
    }
}