| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.kafka.streams.kstream.internals; |
| |
| import org.apache.kafka.common.serialization.Serde; |
| import org.apache.kafka.common.utils.Bytes; |
| import org.apache.kafka.streams.Consumed; |
| import org.apache.kafka.streams.KeyValue; |
| import org.apache.kafka.streams.errors.TopologyException; |
| import org.apache.kafka.streams.kstream.ForeachAction; |
| import org.apache.kafka.streams.kstream.KGroupedTable; |
| import org.apache.kafka.streams.kstream.KStream; |
| import org.apache.kafka.streams.kstream.KTable; |
| import org.apache.kafka.streams.kstream.KeyValueMapper; |
| import org.apache.kafka.streams.kstream.Materialized; |
| import org.apache.kafka.streams.kstream.Predicate; |
| import org.apache.kafka.streams.kstream.Serialized; |
| import org.apache.kafka.streams.kstream.ValueJoiner; |
| import org.apache.kafka.streams.kstream.ValueMapper; |
| import org.apache.kafka.streams.kstream.ValueMapperWithKey; |
| import org.apache.kafka.streams.processor.FailOnInvalidTimestamp; |
| import org.apache.kafka.streams.processor.ProcessorSupplier; |
| import org.apache.kafka.streams.processor.StreamPartitioner; |
| import org.apache.kafka.streams.state.KeyValueStore; |
| import org.apache.kafka.streams.state.StoreBuilder; |
| |
| import java.io.FileNotFoundException; |
| import java.io.PrintWriter; |
| import java.io.UnsupportedEncodingException; |
| import java.nio.charset.StandardCharsets; |
| import java.util.Objects; |
| import java.util.Set; |
| |
| /** |
| * The implementation class of {@link KTable}. |
| * |
| * @param <K> the key type |
| * @param <S> the source's (parent's) value type |
| * @param <V> the value type |
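| * <p> |
| * A rough sketch of the DSL usage that instances of this class back; the topic and store |
| * names below are illustrative placeholders, and default String serdes are assumed: |
| * <pre>{@code |
| * StreamsBuilder builder = new StreamsBuilder(); |
| * KTable<String, String> table = builder.table("input-topic"); |
| * KTable<String, String> nonEmpty = table.filter((key, value) -> value != null && !value.isEmpty()); |
| * KTable<String, Long> lengths = nonEmpty.mapValues( |
| *     value -> (long) value.length(), |
| *     Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("lengths-store")); |
| * }</pre> |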
| */ |
| public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, V> { |
| |
| // TODO: these two fields can be package-private after KStreamBuilder is removed |
| public static final String SOURCE_NAME = "KTABLE-SOURCE-"; |
| |
| public static final String STATE_STORE_NAME = "STATE-STORE-"; |
| |
| private static final String FILTER_NAME = "KTABLE-FILTER-"; |
| |
| private static final String FOREACH_NAME = "KTABLE-FOREACH-"; |
| |
| private static final String JOINTHIS_NAME = "KTABLE-JOINTHIS-"; |
| |
| private static final String JOINOTHER_NAME = "KTABLE-JOINOTHER-"; |
| |
| private static final String MAPVALUES_NAME = "KTABLE-MAPVALUES-"; |
| |
| private static final String MERGE_NAME = "KTABLE-MERGE-"; |
| |
| private static final String PRINTING_NAME = "KSTREAM-PRINTER-"; |
| |
| private static final String SELECT_NAME = "KTABLE-SELECT-"; |
| |
| private static final String TOSTREAM_NAME = "KTABLE-TOSTREAM-"; |
| |
| private final ProcessorSupplier<?, ?> processorSupplier; |
| |
| private final KeyValueMapper<K, V, String> defaultKeyValueMapper; |
| |
| private final String queryableStoreName; |
| private final boolean isQueryable; |
| |
| private boolean sendOldValues = false; |
| private final Serde<K> keySerde; |
| private final Serde<V> valSerde; |
| |
| public KTableImpl(final InternalStreamsBuilder builder, |
| final String name, |
| final ProcessorSupplier<?, ?> processorSupplier, |
| final Set<String> sourceNodes, |
| final String queryableStoreName, |
| final boolean isQueryable) { |
| this(builder, name, processorSupplier, null, null, sourceNodes, queryableStoreName, isQueryable); |
| } |
| |
| public KTableImpl(final InternalStreamsBuilder builder, |
| final String name, |
| final ProcessorSupplier<?, ?> processorSupplier, |
| final Serde<K> keySerde, |
| final Serde<V> valSerde, |
| final Set<String> sourceNodes, |
| final String queryableStoreName, |
| final boolean isQueryable) { |
| super(builder, name, sourceNodes); |
| this.processorSupplier = processorSupplier; |
| this.queryableStoreName = queryableStoreName; |
| this.keySerde = keySerde; |
| this.valSerde = valSerde; |
| this.isQueryable = isQueryable; |
| this.defaultKeyValueMapper = new KeyValueMapper<K, V, String>() { |
| @Override |
| public String apply(K key, V value) { |
| return String.format("%s, %s", key, value); |
| } |
| }; |
| } |
| |
| @Override |
| public String queryableStoreName() { |
| if (!isQueryable) { |
| return null; |
| } |
| return this.queryableStoreName; |
| } |
| |
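| /** |
| * Shared implementation of {@code filter} and {@code filterNot} for the deprecated |
| * {@code StateStoreSupplier} overloads: adds a {@link KTableFilter} processor and, if a |
| * store supplier is given, materializes the result so it is queryable under that store's name. |
| */ |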
| @SuppressWarnings("deprecation") |
| private KTable<K, V> doFilter(final Predicate<? super K, ? super V> predicate, |
| final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier, |
| final boolean isFilterNot) { |
| Objects.requireNonNull(predicate, "predicate can't be null"); |
| String name = builder.newProcessorName(FILTER_NAME); |
| String internalStoreName = null; |
| if (storeSupplier != null) { |
| internalStoreName = storeSupplier.name(); |
| } |
| KTableProcessorSupplier<K, V, V> processorSupplier = new KTableFilter<>(this, predicate, isFilterNot, internalStoreName); |
| builder.internalTopologyBuilder.addProcessor(name, processorSupplier, this.name); |
| if (storeSupplier != null) { |
| builder.internalTopologyBuilder.addStateStore(storeSupplier, name); |
| return new KTableImpl<>(builder, name, processorSupplier, this.keySerde, this.valSerde, sourceNodes, internalStoreName, true); |
| } else { |
| return new KTableImpl<>(builder, name, processorSupplier, sourceNodes, this.queryableStoreName, false); |
| } |
| } |
| |
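| /** |
| * Shared implementation of {@code filter} and {@code filterNot} for the {@link Materialized} |
| * overloads: the filtered view is always backed by a key-value store and is queryable under |
| * the store name carried by the given {@code MaterializedInternal}. |
| */ |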
| private KTable<K, V> doFilter(final Predicate<? super K, ? super V> predicate, |
| final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized, |
| final boolean filterNot) { |
| String name = builder.newProcessorName(FILTER_NAME); |
| |
| KTableProcessorSupplier<K, V, V> processorSupplier = new KTableFilter<>(this, |
| predicate, |
| filterNot, |
| materialized.storeName()); |
| builder.internalTopologyBuilder.addProcessor(name, processorSupplier, this.name); |
| |
| final StoreBuilder<KeyValueStore<K, V>> storeBuilder = new KeyValueStoreMaterializer<>(materialized).materialize(); |
| builder.internalTopologyBuilder.addStateStore(storeBuilder, name); |
| |
| return new KTableImpl<>(builder, |
| name, |
| processorSupplier, |
| this.keySerde, |
| this.valSerde, |
| sourceNodes, |
| storeBuilder.name(), |
| true); |
| } |
| |
| @Override |
| public KTable<K, V> filter(final Predicate<? super K, ? super V> predicate) { |
| return filter(predicate, (String) null); |
| } |
| |
| @Override |
| public KTable<K, V> filter(final Predicate<? super K, ? super V> predicate, |
| final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) { |
| Objects.requireNonNull(predicate, "predicate can't be null"); |
| Objects.requireNonNull(materialized, "materialized can't be null"); |
| return doFilter(predicate, new MaterializedInternal<>(materialized, builder, FILTER_NAME), false); |
| } |
| |
| @SuppressWarnings("deprecation") |
| @Override |
| public KTable<K, V> filter(final Predicate<? super K, ? super V> predicate, |
| final String queryableStoreName) { |
| org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier = null; |
| if (queryableStoreName != null) { |
| storeSupplier = keyValueStore(this.keySerde, this.valSerde, queryableStoreName); |
| } |
| return doFilter(predicate, storeSupplier, false); |
| } |
| |
| @SuppressWarnings("deprecation") |
| @Override |
| public KTable<K, V> filter(final Predicate<? super K, ? super V> predicate, |
| final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) { |
| Objects.requireNonNull(storeSupplier, "storeSupplier can't be null"); |
| return doFilter(predicate, storeSupplier, false); |
| } |
| |
| @Override |
| public KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate) { |
| return filterNot(predicate, (String) null); |
| } |
| |
| @Override |
| public KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate, |
| final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) { |
| Objects.requireNonNull(predicate, "predicate can't be null"); |
| Objects.requireNonNull(materialized, "materialized can't be null"); |
| return doFilter(predicate, new MaterializedInternal<>(materialized, builder, FILTER_NAME), true); |
| } |
| |
| @SuppressWarnings("deprecation") |
| @Override |
| public KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate, |
| final String queryableStoreName) { |
| org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier = null; |
| if (queryableStoreName != null) { |
| storeSupplier = keyValueStore(this.keySerde, this.valSerde, queryableStoreName); |
| } |
| return doFilter(predicate, storeSupplier, true); |
| } |
| |
| @SuppressWarnings("deprecation") |
| @Override |
| public KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate, |
| final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) { |
| Objects.requireNonNull(storeSupplier, "storeSupplier can't be null"); |
| return doFilter(predicate, storeSupplier, true); |
| } |
| |
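| /** |
| * Shared implementation of the deprecated {@code mapValues} overloads: adds a |
| * {@link KTableMapValues} processor and, if a store supplier is given, materializes the |
| * mapped view so it is queryable under that store's name. |
| */ |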
| @SuppressWarnings("deprecation") |
| private <V1> KTable<K, V1> doMapValues(final ValueMapperWithKey<? super K, ? super V, ? extends V1> mapper, |
| final Serde<V1> valueSerde, |
| final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) { |
| Objects.requireNonNull(mapper, "mapper can't be null"); |
| String name = builder.newProcessorName(MAPVALUES_NAME); |
| String internalStoreName = null; |
| if (storeSupplier != null) { |
| internalStoreName = storeSupplier.name(); |
| } |
| KTableProcessorSupplier<K, V, V1> processorSupplier = new KTableMapValues<>(this, mapper, internalStoreName); |
| builder.internalTopologyBuilder.addProcessor(name, processorSupplier, this.name); |
| if (storeSupplier != null) { |
| builder.internalTopologyBuilder.addStateStore(storeSupplier, name); |
| return new KTableImpl<>(builder, name, processorSupplier, this.keySerde, valueSerde, sourceNodes, internalStoreName, true); |
| } else { |
| return new KTableImpl<>(builder, name, processorSupplier, sourceNodes, this.queryableStoreName, false); |
| } |
| } |
| |
| @Override |
| public <V1> KTable<K, V1> mapValues(final ValueMapper<? super V, ? extends V1> mapper) { |
| return doMapValues(withKey(mapper), null, null); |
| } |
| |
| @Override |
| public <VR> KTable<K, VR> mapValues(final ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper) { |
| return doMapValues(mapper, null, null); |
| } |
| |
| @Override |
| public <VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper, |
| final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) { |
| return mapValues(withKey(mapper), materialized); |
| } |
| |
| @Override |
| public <VR> KTable<K, VR> mapValues(final ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper, |
| final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) { |
| Objects.requireNonNull(mapper, "mapper can't be null"); |
| Objects.requireNonNull(materialized, "materialized can't be null"); |
| final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal |
| = new MaterializedInternal<>(materialized, builder, MAPVALUES_NAME); |
| final String name = builder.newProcessorName(MAPVALUES_NAME); |
| final KTableProcessorSupplier<K, V, VR> processorSupplier = new KTableMapValues<>( |
| this, |
| mapper, |
| materializedInternal.storeName()); |
| builder.internalTopologyBuilder.addProcessor(name, processorSupplier, this.name); |
| builder.internalTopologyBuilder.addStateStore( |
| new KeyValueStoreMaterializer<>(materializedInternal).materialize(), |
| name); |
| return new KTableImpl<>(builder, name, processorSupplier, sourceNodes, materializedInternal.storeName(), true); |
| } |
| |
| @SuppressWarnings("deprecation") |
| @Override |
| public <V1> KTable<K, V1> mapValues(final ValueMapper<? super V, ? extends V1> mapper, |
| final Serde<V1> valueSerde, |
| final String queryableStoreName) { |
| return mapValues(withKey(mapper), Materialized.<K, V1, KeyValueStore<Bytes, byte[]>>as(queryableStoreName). |
| withValueSerde(valueSerde).withKeySerde(this.keySerde)); |
| } |
| |
| @SuppressWarnings("deprecation") |
| @Override |
| public <V1> KTable<K, V1> mapValues(final ValueMapper<? super V, ? extends V1> mapper, |
| final Serde<V1> valueSerde, |
| final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) { |
| Objects.requireNonNull(storeSupplier, "storeSupplier can't be null"); |
| return doMapValues(withKey(mapper), valueSerde, storeSupplier); |
| } |
| |
| @SuppressWarnings("deprecation") |
| @Override |
| public void print() { |
| print(null, null, this.name); |
| } |
| |
| @SuppressWarnings("deprecation") |
| @Override |
| public void print(final String label) { |
| print(null, null, label); |
| } |
| |
| @SuppressWarnings("deprecation") |
| @Override |
| public void print(final Serde<K> keySerde, |
| final Serde<V> valSerde) { |
| print(keySerde, valSerde, this.name); |
| } |
| |
| @SuppressWarnings({"unchecked", "deprecation"}) |
| @Override |
| public void print(final Serde<K> keySerde, |
| final Serde<V> valSerde, |
| final String label) { |
| Objects.requireNonNull(label, "label can't be null"); |
| final String name = builder.newProcessorName(PRINTING_NAME); |
| builder.internalTopologyBuilder.addProcessor(name, new KStreamPrint<>(new PrintForeachAction(null, defaultKeyValueMapper, label)), this.name); |
| } |
| |
| @SuppressWarnings("deprecation") |
| @Override |
| public void writeAsText(final String filePath) { |
| writeAsText(filePath, this.name, null, null); |
| } |
| |
| @SuppressWarnings("deprecation") |
| @Override |
| public void writeAsText(final String filePath, |
| final String label) { |
| writeAsText(filePath, label, null, null); |
| } |
| |
| @SuppressWarnings("deprecation") |
| @Override |
| public void writeAsText(final String filePath, |
| final Serde<K> keySerde, |
| final Serde<V> valSerde) { |
| writeAsText(filePath, this.name, keySerde, valSerde); |
| } |
| |
| /** |
| * @throws TopologyException if {@code filePath} is an empty string or the file cannot be opened for writing |
| */ |
| @SuppressWarnings({"unchecked", "deprecation"}) |
| @Override |
| public void writeAsText(final String filePath, |
| final String label, |
| final Serde<K> keySerde, |
| final Serde<V> valSerde) { |
| Objects.requireNonNull(filePath, "filePath can't be null"); |
| Objects.requireNonNull(label, "label can't be null"); |
| if (filePath.trim().isEmpty()) { |
| throw new TopologyException("filePath can't be an empty string"); |
| } |
| String name = builder.newProcessorName(PRINTING_NAME); |
| try { |
| PrintWriter printWriter = new PrintWriter(filePath, StandardCharsets.UTF_8.name()); |
| builder.internalTopologyBuilder.addProcessor(name, new KStreamPrint<>(new PrintForeachAction(printWriter, defaultKeyValueMapper, label)), this.name); |
| } catch (final FileNotFoundException | UnsupportedEncodingException e) { |
| throw new TopologyException(String.format("Unable to write stream to file at [%s] %s", filePath, e.getMessage())); |
| } |
| } |
| |
| @SuppressWarnings("deprecation") |
| @Override |
| public void foreach(final ForeachAction<? super K, ? super V> action) { |
| Objects.requireNonNull(action, "action can't be null"); |
| String name = builder.newProcessorName(FOREACH_NAME); |
| KStreamPeek<K, Change<V>> processorSupplier = new KStreamPeek<>(new ForeachAction<K, Change<V>>() { |
| @Override |
| public void apply(K key, Change<V> value) { |
| action.apply(key, value.newValue); |
| } |
| }, false); |
| builder.internalTopologyBuilder.addProcessor(name, processorSupplier, this.name); |
| } |
| |
| @SuppressWarnings("deprecation") |
| @Override |
| public KTable<K, V> through(final Serde<K> keySerde, |
| final Serde<V> valSerde, |
| final StreamPartitioner<? super K, ? super V> partitioner, |
| final String topic, |
| final String queryableStoreName) { |
| to(keySerde, valSerde, partitioner, topic); |
| |
| // honor the requested queryable store name if one was given, otherwise let the builder generate one |
| final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized = queryableStoreName == null |
| ? Materialized.<K, V, KeyValueStore<Bytes, byte[]>>with(keySerde, valSerde) |
| : Materialized.<K, V, KeyValueStore<Bytes, byte[]>>as(queryableStoreName).withKeySerde(keySerde).withValueSerde(valSerde); |
| |
| return builder.table(topic, |
| new ConsumedInternal<>(keySerde, valSerde, new FailOnInvalidTimestamp(), null), |
| new MaterializedInternal<>(materialized, builder, KTableImpl.TOSTREAM_NAME)); |
| } |
| |
| @SuppressWarnings("deprecation") |
| @Override |
| public KTable<K, V> through(final Serde<K> keySerde, |
| final Serde<V> valSerde, |
| final StreamPartitioner<? super K, ? super V> partitioner, |
| final String topic, |
| final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) { |
| Objects.requireNonNull(storeSupplier, "storeSupplier can't be null"); |
| to(keySerde, valSerde, partitioner, topic); |
| |
| final ConsumedInternal<K, V> consumed = new ConsumedInternal<>(Consumed.with(keySerde, valSerde, new FailOnInvalidTimestamp(), null)); |
| return builder.table(topic, consumed, storeSupplier); |
| } |
| |
| @SuppressWarnings("deprecation") |
| @Override |
| public KTable<K, V> through(final Serde<K> keySerde, |
| final Serde<V> valSerde, |
| final StreamPartitioner<? super K, ? super V> partitioner, |
| final String topic) { |
| return through(keySerde, valSerde, partitioner, topic, (String) null); |
| } |
| |
| @SuppressWarnings("deprecation") |
| @Override |
| public KTable<K, V> through(final Serde<K> keySerde, |
| final Serde<V> valSerde, |
| final String topic, |
| final String queryableStoreName) { |
| return through(keySerde, valSerde, null, topic, queryableStoreName); |
| } |
| |
| @SuppressWarnings("deprecation") |
| @Override |
| public KTable<K, V> through(final Serde<K> keySerde, |
| final Serde<V> valSerde, |
| final String topic, |
| final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) { |
| Objects.requireNonNull(storeSupplier, "storeSupplier can't be null"); |
| return through(keySerde, valSerde, null, topic, storeSupplier); |
| } |
| |
| @SuppressWarnings("deprecation") |
| @Override |
| public KTable<K, V> through(final Serde<K> keySerde, |
| final Serde<V> valSerde, |
| final String topic) { |
| return through(keySerde, valSerde, null, topic, (String) null); |
| } |
| |
| @SuppressWarnings("deprecation") |
| @Override |
| public KTable<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner, |
| final String topic, |
| final String queryableStoreName) { |
| return through(null, null, partitioner, topic, queryableStoreName); |
| } |
| |
| @SuppressWarnings("deprecation") |
| @Override |
| public KTable<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner, |
| final String topic, |
| final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) { |
| Objects.requireNonNull(storeSupplier, "storeSupplier can't be null"); |
| return through(null, null, partitioner, topic, storeSupplier); |
| } |
| |
| @SuppressWarnings("deprecation") |
| @Override |
| public KTable<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner, |
| final String topic) { |
| return through(null, null, partitioner, topic, (String) null); |
| } |
| |
| @SuppressWarnings("deprecation") |
| @Override |
| public KTable<K, V> through(final String topic, |
| final String queryableStoreName) { |
| return through(null, null, null, topic, queryableStoreName); |
| } |
| |
| @SuppressWarnings("deprecation") |
| @Override |
| public KTable<K, V> through(final String topic, |
| final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) { |
| Objects.requireNonNull(storeSupplier, "storeSupplier can't be null"); |
| return through(null, null, null, topic, storeSupplier); |
| } |
| |
| @SuppressWarnings("deprecation") |
| @Override |
| public KTable<K, V> through(final String topic) { |
| return through(null, null, null, topic, (String) null); |
| } |
| |
| @SuppressWarnings("deprecation") |
| @Override |
| public void to(final String topic) { |
| to(null, null, null, topic); |
| } |
| |
| @SuppressWarnings("deprecation") |
| @Override |
| public void to(final StreamPartitioner<? super K, ? super V> partitioner, |
| final String topic) { |
| to(null, null, partitioner, topic); |
| } |
| |
| @SuppressWarnings("deprecation") |
| @Override |
| public void to(final Serde<K> keySerde, |
| final Serde<V> valSerde, |
| final String topic) { |
| this.toStream().to(keySerde, valSerde, null, topic); |
| } |
| |
| @SuppressWarnings("deprecation") |
| @Override |
| public void to(final Serde<K> keySerde, |
| final Serde<V> valSerde, |
| final StreamPartitioner<? super K, ? super V> partitioner, |
| final String topic) { |
| this.toStream().to(keySerde, valSerde, partitioner, topic); |
| } |
| |
| @Override |
| public KStream<K, V> toStream() { |
| String name = builder.newProcessorName(TOSTREAM_NAME); |
| |
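| // strip the Change wrapper produced by the upstream KTable processors and forward |
| // only the new value, turning the changelog view into a plain record stream |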
| builder.internalTopologyBuilder.addProcessor(name, new KStreamMapValues<>(new ValueMapperWithKey<K, Change<V>, V>() { |
| @Override |
| public V apply(final K key, final Change<V> change) { |
| return change.newValue; |
| } |
| }), this.name); |
| |
| return new KStreamImpl<>(builder, name, sourceNodes, false); |
| } |
| |
| @Override |
| public <K1> KStream<K1, V> toStream(final KeyValueMapper<? super K, ? super V, ? extends K1> mapper) { |
| return toStream().selectKey(mapper); |
| } |
| |
| @Override |
| public <V1, R> KTable<K, R> join(final KTable<K, V1> other, |
| final ValueJoiner<? super V, ? super V1, ? extends R> joiner) { |
| return doJoin(other, joiner, null, false, false); |
| } |
| |
| @Override |
| public <VO, VR> KTable<K, VR> join(final KTable<K, VO> other, |
| final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, |
| final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) { |
| Objects.requireNonNull(other, "other can't be null"); |
| Objects.requireNonNull(joiner, "joiner can't be null"); |
| Objects.requireNonNull(materialized, "materialized can't be null"); |
| return doJoin(other, joiner, new MaterializedInternal<>(materialized, builder, MERGE_NAME), false, false); |
| } |
| |
| @SuppressWarnings("deprecation") |
| @Override |
| public <V1, R> KTable<K, R> join(final KTable<K, V1> other, |
| final ValueJoiner<? super V, ? super V1, ? extends R> joiner, |
| final Serde<R> joinSerde, |
| final String queryableStoreName) { |
| return doJoin(other, joiner, false, false, joinSerde, queryableStoreName); |
| } |
| |
| @SuppressWarnings("deprecation") |
| @Override |
| public <V1, R> KTable<K, R> join(final KTable<K, V1> other, |
| final ValueJoiner<? super V, ? super V1, ? extends R> joiner, |
| final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) { |
| Objects.requireNonNull(storeSupplier, "storeSupplier can't be null"); |
| return doJoin(other, joiner, false, false, storeSupplier); |
| } |
| |
| @Override |
| public <V1, R> KTable<K, R> outerJoin(final KTable<K, V1> other, |
| final ValueJoiner<? super V, ? super V1, ? extends R> joiner) { |
| return doJoin(other, joiner, null, true, true); |
| } |
| |
| @Override |
| public <VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other, |
| final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, |
| final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) { |
| Objects.requireNonNull(materialized, "materialized can't be null"); |
| return doJoin(other, joiner, new MaterializedInternal<>(materialized, builder, MERGE_NAME), true, true); |
| } |
| |
| @SuppressWarnings("deprecation") |
| @Override |
| public <V1, R> KTable<K, R> outerJoin(final KTable<K, V1> other, |
| final ValueJoiner<? super V, ? super V1, ? extends R> joiner, |
| final Serde<R> joinSerde, |
| final String queryableStoreName) { |
| return doJoin(other, joiner, true, true, joinSerde, queryableStoreName); |
| } |
| |
| @SuppressWarnings("deprecation") |
| @Override |
| public <V1, R> KTable<K, R> outerJoin(final KTable<K, V1> other, |
| final ValueJoiner<? super V, ? super V1, ? extends R> joiner, |
| final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) { |
| Objects.requireNonNull(storeSupplier, "storeSupplier can't be null"); |
| return doJoin(other, joiner, true, true, storeSupplier); |
| } |
| |
| @Override |
| public <V1, R> KTable<K, R> leftJoin(final KTable<K, V1> other, |
| final ValueJoiner<? super V, ? super V1, ? extends R> joiner) { |
| return doJoin(other, joiner, null, true, false); |
| } |
| |
| @Override |
| public <VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other, |
| final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, |
| final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) { |
| Objects.requireNonNull(materialized, "materialized can't be null"); |
| return doJoin(other, |
| joiner, |
| new MaterializedInternal<>(materialized, builder, MERGE_NAME), |
| true, |
| false); |
| } |
| |
| @SuppressWarnings("deprecation") |
| @Override |
| public <V1, R> KTable<K, R> leftJoin(final KTable<K, V1> other, |
| final ValueJoiner<? super V, ? super V1, ? extends R> joiner, |
| final Serde<R> joinSerde, |
| final String queryableStoreName) { |
| return doJoin(other, joiner, true, false, joinSerde, queryableStoreName); |
| } |
| |
| @SuppressWarnings("deprecation") |
| @Override |
| public <V1, R> KTable<K, R> leftJoin(final KTable<K, V1> other, |
| final ValueJoiner<? super V, ? super V1, ? extends R> joiner, |
| final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) { |
| Objects.requireNonNull(storeSupplier, "storeSupplier can't be null"); |
| return doJoin(other, joiner, true, false, storeSupplier); |
| } |
| |
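| /** |
| * Join path for the deprecated overloads that take a join serde and store name: resolves the |
| * optional {@code queryableStoreName} into a {@code StateStoreSupplier} and delegates to the |
| * supplier-based variant below. |
| */ |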
| @SuppressWarnings({"unchecked", "deprecation"}) |
| private <V1, R> KTable<K, R> doJoin(final KTable<K, V1> other, |
| final ValueJoiner<? super V, ? super V1, ? extends R> joiner, |
| final boolean leftOuter, |
| final boolean rightOuter, |
| final Serde<R> joinSerde, |
| final String queryableStoreName) { |
| Objects.requireNonNull(other, "other can't be null"); |
| Objects.requireNonNull(joiner, "joiner can't be null"); |
| |
| final org.apache.kafka.streams.processor.StateStoreSupplier storeSupplier |
| = queryableStoreName == null ? null : keyValueStore(this.keySerde, joinSerde, queryableStoreName); |
| |
| return doJoin(other, joiner, leftOuter, rightOuter, storeSupplier); |
| } |
| |
| @SuppressWarnings({"unchecked", "deprecation"}) |
| private <V1, R> KTable<K, R> doJoin(final KTable<K, V1> other, |
| final ValueJoiner<? super V, ? super V1, ? extends R> joiner, |
| final boolean leftOuter, |
| final boolean rightOuter, |
| final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) { |
| Objects.requireNonNull(other, "other can't be null"); |
| Objects.requireNonNull(joiner, "joiner can't be null"); |
| final String joinMergeName = builder.newProcessorName(MERGE_NAME); |
| final String internalQueryableName = storeSupplier == null ? null : storeSupplier.name(); |
| final KTable<K, R> result = buildJoin((AbstractStream<K>) other, |
| joiner, |
| leftOuter, |
| rightOuter, |
| joinMergeName, |
| internalQueryableName); |
| |
| if (internalQueryableName != null) { |
| builder.internalTopologyBuilder.addStateStore(storeSupplier, joinMergeName); |
| } |
| |
| return result; |
| } |
| |
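| /** |
| * Join path for the {@link Materialized} overloads: builds the join topology and, if a |
| * materialization is requested, attaches the resulting state store to the merge processor. |
| */ |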
| @SuppressWarnings("unchecked") |
| private <VO, VR> KTable<K, VR> doJoin(final KTable<K, VO> other, |
| final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, |
| final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materialized, |
| final boolean leftOuter, |
| final boolean rightOuter) { |
| Objects.requireNonNull(other, "other can't be null"); |
| Objects.requireNonNull(joiner, "joiner can't be null"); |
| final String internalQueryableName = materialized == null ? null : materialized.storeName(); |
| final String joinMergeName = builder.newProcessorName(MERGE_NAME); |
| final KTable<K, VR> result = buildJoin((AbstractStream<K>) other, |
| joiner, |
| leftOuter, |
| rightOuter, |
| joinMergeName, |
| internalQueryableName); |
| |
| if (materialized != null) { |
| final StoreBuilder<KeyValueStore<K, VR>> storeBuilder |
| = new KeyValueStoreMaterializer<>(materialized).materialize(); |
| builder.internalTopologyBuilder.addStateStore(storeBuilder, joinMergeName); |
| } |
| return result; |
| } |
| |
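| /** |
| * Wires the KTable-KTable join topology: one join processor per side (inner, left, or outer |
| * depending on the flags), a merge processor combining both halves, and store connections so |
| * each join processor can look up the other side's current value by key. |
| */ |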
| @SuppressWarnings("unchecked") |
| private <V1, R> KTable<K, R> buildJoin(final AbstractStream<K> other, |
| final ValueJoiner<? super V, ? super V1, ? extends R> joiner, |
| final boolean leftOuter, |
| final boolean rightOuter, |
| final String joinMergeName, |
| final String internalQueryableName) { |
| final Set<String> allSourceNodes = ensureJoinableWith(other); |
| |
| if (leftOuter) { |
| enableSendingOldValues(); |
| } |
| if (rightOuter) { |
| ((KTableImpl) other).enableSendingOldValues(); |
| } |
| |
| final String joinThisName = builder.newProcessorName(JOINTHIS_NAME); |
| final String joinOtherName = builder.newProcessorName(JOINOTHER_NAME); |
| |
| |
| final KTableKTableAbstractJoin<K, R, V, V1> joinThis; |
| final KTableKTableAbstractJoin<K, R, V1, V> joinOther; |
| |
| if (!leftOuter) { // inner |
| joinThis = new KTableKTableInnerJoin<>(this, (KTableImpl<K, ?, V1>) other, joiner); |
| joinOther = new KTableKTableInnerJoin<>((KTableImpl<K, ?, V1>) other, this, reverseJoiner(joiner)); |
| } else if (!rightOuter) { // left |
| joinThis = new KTableKTableLeftJoin<>(this, (KTableImpl<K, ?, V1>) other, joiner); |
| joinOther = new KTableKTableRightJoin<>((KTableImpl<K, ?, V1>) other, this, reverseJoiner(joiner)); |
| } else { // outer |
| joinThis = new KTableKTableOuterJoin<>(this, (KTableImpl<K, ?, V1>) other, joiner); |
| joinOther = new KTableKTableOuterJoin<>((KTableImpl<K, ?, V1>) other, this, reverseJoiner(joiner)); |
| } |
| |
| final KTableKTableJoinMerger<K, R> joinMerge = new KTableKTableJoinMerger<>( |
| new KTableImpl<K, V, R>(builder, joinThisName, joinThis, sourceNodes, this.queryableStoreName, false), |
| new KTableImpl<K, V1, R>(builder, joinOtherName, joinOther, ((KTableImpl<K, ?, ?>) other).sourceNodes, |
| ((KTableImpl<K, ?, ?>) other).queryableStoreName, false), |
| internalQueryableName |
| ); |
| |
| builder.internalTopologyBuilder.addProcessor(joinThisName, joinThis, this.name); |
| builder.internalTopologyBuilder.addProcessor(joinOtherName, joinOther, ((KTableImpl) other).name); |
| builder.internalTopologyBuilder.addProcessor(joinMergeName, joinMerge, joinThisName, joinOtherName); |
| builder.internalTopologyBuilder.connectProcessorAndStateStores(joinThisName, ((KTableImpl) other).valueGetterSupplier().storeNames()); |
| builder.internalTopologyBuilder.connectProcessorAndStateStores(joinOtherName, valueGetterSupplier().storeNames()); |
| return new KTableImpl<>(builder, joinMergeName, joinMerge, allSourceNodes, internalQueryableName, internalQueryableName != null); |
| } |
| |
| @SuppressWarnings("deprecation") |
| @Override |
| public <K1, V1> KGroupedTable<K1, V1> groupBy(final KeyValueMapper<? super K, ? super V, KeyValue<K1, V1>> selector, |
| final Serde<K1> keySerde, |
| final Serde<V1> valueSerde) { |
| return groupBy(selector, Serialized.with(keySerde, valueSerde)); |
| } |
| |
| @Override |
| public <K1, V1> KGroupedTable<K1, V1> groupBy(final KeyValueMapper<? super K, ? super V, KeyValue<K1, V1>> selector) { |
| return this.groupBy(selector, Serialized.<K1, V1>with(null, null)); |
| } |
| |
| @Override |
| public <K1, V1> KGroupedTable<K1, V1> groupBy(final KeyValueMapper<? super K, ? super V, KeyValue<K1, V1>> selector, |
| final Serialized<K1, V1> serialized) { |
| Objects.requireNonNull(selector, "selector can't be null"); |
| Objects.requireNonNull(serialized, "serialized can't be null"); |
| String selectName = builder.newProcessorName(SELECT_NAME); |
| |
| KTableProcessorSupplier<K, V, KeyValue<K1, V1>> selectSupplier = new KTableRepartitionMap<>(this, selector); |
| |
| // select the aggregate key and values (old and new); this requires the parent to send old values |
| builder.internalTopologyBuilder.addProcessor(selectName, selectSupplier, this.name); |
| this.enableSendingOldValues(); |
| final SerializedInternal<K1, V1> serializedInternal = new SerializedInternal<>(serialized); |
| return new KGroupedTableImpl<>(builder, |
| selectName, |
| this.name, |
| serializedInternal.keySerde(), |
| serializedInternal.valueSerde()); |
| } |
| |
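| /** |
| * Exposes a value getter for this table so downstream operators (e.g. joins) can look up the |
| * current value by key; dispatches on the concrete type of the underlying processor supplier. |
| */ |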
| @SuppressWarnings("unchecked") |
| KTableValueGetterSupplier<K, V> valueGetterSupplier() { |
| if (processorSupplier instanceof KTableSource) { |
| KTableSource<K, V> source = (KTableSource<K, V>) processorSupplier; |
| return new KTableSourceValueGetterSupplier<>(source.storeName); |
| } else if (processorSupplier instanceof KStreamAggProcessorSupplier) { |
| return ((KStreamAggProcessorSupplier<?, K, S, V>) processorSupplier).view(); |
| } else { |
| return ((KTableProcessorSupplier<K, S, V>) processorSupplier).view(); |
| } |
| } |
| |
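| /** |
| * Switches the underlying processor to include the old value in each forwarded {@link Change}; |
| * callers such as {@link #groupBy} and the outer-join branches of {@code buildJoin} rely on |
| * the old value to correctly update downstream state. |
| */ |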
| @SuppressWarnings("unchecked") |
| void enableSendingOldValues() { |
| if (!sendOldValues) { |
| if (processorSupplier instanceof KTableSource) { |
| KTableSource<K, V> source = (KTableSource<K, V>) processorSupplier; |
| source.enableSendingOldValues(); |
| } else if (processorSupplier instanceof KStreamAggProcessorSupplier) { |
| ((KStreamAggProcessorSupplier<?, K, S, V>) processorSupplier).enableSendingOldValues(); |
| } else { |
| ((KTableProcessorSupplier<K, S, V>) processorSupplier).enableSendingOldValues(); |
| } |
| sendOldValues = true; |
| } |
| } |
| |
| boolean sendingOldValueEnabled() { |
| return sendOldValues; |
| } |
| |
| } |