/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.streams.integration;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.FloatSerializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.server.util.MockTime;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.utils.UniqueTopicSerdeScope;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.function.Function;

import static java.time.Duration.ofSeconds;
import static java.util.Arrays.asList;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
import static org.junit.jupiter.api.Assertions.assertEquals;
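
/**
 * Integration test for two chained KTable-KTable foreign-key inner joins:
 * table1 joins table2 on a key extracted from table1's value, and that result
 * joins table3 on a second extracted key. Three Streams instances run the same
 * topology against an embedded single-broker cluster, and the joined records
 * are verified on the output topic.
 */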
@Timeout(600)
@Tag("integration")
public class KTableKTableForeignKeyInnerJoinMultiIntegrationTest {
    private static final int NUM_BROKERS = 1;
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
    private static final MockTime MOCK_TIME = CLUSTER.time;
    private static final String TABLE_1 = "table1";
    private static final String TABLE_2 = "table2";
    private static final String TABLE_3 = "table3";
    private static final String OUTPUT = "output-";
private final Properties streamsConfig = getStreamsConfig();
private final Properties streamsConfigTwo = getStreamsConfig();
private final Properties streamsConfigThree = getStreamsConfig();
private KafkaStreams streams;
private KafkaStreams streamsTwo;
private KafkaStreams streamsThree;
    private static final Properties CONSUMER_CONFIG = new Properties();
    private static final Properties PRODUCER_CONFIG_1 = new Properties();
    private static final Properties PRODUCER_CONFIG_2 = new Properties();
    private static final Properties PRODUCER_CONFIG_3 = new Properties();

    @BeforeAll
public static void startCluster() throws IOException, InterruptedException {
CLUSTER.start();
//Use multiple partitions to ensure distribution of keys.
CLUSTER.createTopic(TABLE_1, 3, 1);
CLUSTER.createTopic(TABLE_2, 5, 1);
CLUSTER.createTopic(TABLE_3, 7, 1);
CLUSTER.createTopic(OUTPUT, 11, 1);
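
        //Per-topic producer configs; each serializer pair matches the corresponding table's key/value types.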
PRODUCER_CONFIG_1.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
PRODUCER_CONFIG_1.put(ProducerConfig.ACKS_CONFIG, "all");
PRODUCER_CONFIG_1.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
PRODUCER_CONFIG_1.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, FloatSerializer.class);
PRODUCER_CONFIG_2.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
PRODUCER_CONFIG_2.put(ProducerConfig.ACKS_CONFIG, "all");
PRODUCER_CONFIG_2.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
PRODUCER_CONFIG_2.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
PRODUCER_CONFIG_3.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
PRODUCER_CONFIG_3.put(ProducerConfig.ACKS_CONFIG, "all");
PRODUCER_CONFIG_3.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
PRODUCER_CONFIG_3.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
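
        //table1's Float values double as foreign keys into table2 (truncated to an int, then stringified).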
final List<KeyValue<Integer, Float>> table1 = asList(
new KeyValue<>(1, 1.33f),
new KeyValue<>(2, 2.22f),
new KeyValue<>(3, -1.22f), //Won't be joined in yet.
new KeyValue<>(4, -2.22f) //Won't be joined in at all.
);
        //Partitions pre-computed using the default Murmur2 hash to ensure that multiple partitions are exercised
        //(the per-record notes below assume a 3-partition layout).
final List<KeyValue<String, Long>> table2 = asList(
new KeyValue<>("0", 0L), //partition 2
new KeyValue<>("1", 10L), //partition 0
new KeyValue<>("2", 20L), //partition 2
new KeyValue<>("3", 30L), //partition 2
new KeyValue<>("4", 40L), //partition 1
new KeyValue<>("5", 50L), //partition 0
new KeyValue<>("6", 60L), //partition 1
new KeyValue<>("7", 70L), //partition 0
new KeyValue<>("8", 80L), //partition 0
new KeyValue<>("9", 90L) //partition 2
);
        //A single record keyed by 10, the only key the second join's extractor can resolve.
final List<KeyValue<Integer, String>> table3 = Collections.singletonList(
new KeyValue<>(10, "waffle")
);

        IntegrationTestUtils.produceKeyValuesSynchronously(TABLE_1, table1, PRODUCER_CONFIG_1, MOCK_TIME);
IntegrationTestUtils.produceKeyValuesSynchronously(TABLE_2, table2, PRODUCER_CONFIG_2, MOCK_TIME);
IntegrationTestUtils.produceKeyValuesSynchronously(TABLE_3, table3, PRODUCER_CONFIG_3, MOCK_TIME);

        CONSUMER_CONFIG.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
CONSUMER_CONFIG.put(ConsumerConfig.GROUP_ID_CONFIG, "ktable-ktable-consumer");
CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
}

    @AfterAll
public static void closeCluster() {
CLUSTER.stop();
}

    @BeforeEach
public void before() throws IOException {
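        //Give each of the three Streams instances its own state directory so they don't collide on disk.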
final String stateDirBasePath = TestUtils.tempDirectory().getPath();
streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, stateDirBasePath + "-1");
streamsConfigTwo.put(StreamsConfig.STATE_DIR_CONFIG, stateDirBasePath + "-2");
streamsConfigThree.put(StreamsConfig.STATE_DIR_CONFIG, stateDirBasePath + "-3");
}

    @AfterEach
public void after() throws IOException {
if (streams != null) {
streams.close();
streams = null;
}
if (streamsTwo != null) {
streamsTwo.close();
streamsTwo = null;
}
if (streamsThree != null) {
streamsThree.close();
streamsThree = null;
}
IntegrationTestUtils.purgeLocalStreamsState(asList(streamsConfig, streamsConfigTwo, streamsConfigThree));
}

    @Test
public void shouldInnerJoinMultiPartitionQueryable() throws Exception {
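        //Only key 1 survives both joins: its extracted key "1" matches table2's ("1", 10L), and the joined
        //value contains "value2=10", which the second extractor maps to table3's lone key 10 ("waffle").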
final Set<KeyValue<Integer, String>> expectedOne = new HashSet<>();
expectedOne.add(new KeyValue<>(1, "value1=1.33,value2=10,value3=waffle"));
verifyKTableKTableJoin(expectedOne);
}

    private void verifyKTableKTableJoin(final Set<KeyValue<Integer, String>> expectedResult) throws Exception {
final String innerJoinType = "INNER";
final String queryableName = innerJoinType + "-store1";
final String queryableNameTwo = innerJoinType + "-store2";
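        //Three instances share one application id, so the input partitions (and join work) are split among them.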
streams = prepareTopology(queryableName, queryableNameTwo, streamsConfig);
streamsTwo = prepareTopology(queryableName, queryableNameTwo, streamsConfigTwo);
streamsThree = prepareTopology(queryableName, queryableNameTwo, streamsConfigThree);
final List<KafkaStreams> kafkaStreamsList = asList(streams, streamsTwo, streamsThree);
startApplicationAndWaitUntilRunning(kafkaStreamsList, ofSeconds(120));
final Set<KeyValue<Integer, String>> result = new HashSet<>(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
CONSUMER_CONFIG,
OUTPUT,
expectedResult.size()));
assertEquals(expectedResult, result);
}

    private static Properties getStreamsConfig() {
final Properties streamsConfig = new Properties();
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "KTable-FKJ-Multi");
streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
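        //Disable record caching and commit frequently so join results reach the output topic promptly.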
streamsConfig.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
return streamsConfig;
}

    private static KafkaStreams prepareTopology(final String queryableName,
final String queryableNameTwo,
final Properties streamsConfig) {
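        //UniqueTopicSerdeScope decorates each serde to track the topics it is used for, so the test
        //can catch a serde instance being reused across topics.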
final UniqueTopicSerdeScope serdeScope = new UniqueTopicSerdeScope();
final StreamsBuilder builder = new StreamsBuilder();
final KTable<Integer, Float> table1 = builder.table(
TABLE_1,
Consumed.with(serdeScope.decorateSerde(Serdes.Integer(), streamsConfig, true),
serdeScope.decorateSerde(Serdes.Float(), streamsConfig, false))
);
final KTable<String, Long> table2 = builder.table(
TABLE_2,
Consumed.with(serdeScope.decorateSerde(Serdes.String(), streamsConfig, true),
serdeScope.decorateSerde(Serdes.Long(), streamsConfig, false))
);
final KTable<Integer, String> table3 = builder.table(
TABLE_3,
Consumed.with(serdeScope.decorateSerde(Serdes.Integer(), streamsConfig, true),
serdeScope.decorateSerde(Serdes.String(), streamsConfig, false))
);
final Materialized<Integer, String, KeyValueStore<Bytes, byte[]>> materialized;
if (queryableName != null) {
materialized = Materialized.<Integer, String, KeyValueStore<Bytes, byte[]>>as(queryableName)
.withKeySerde(serdeScope.decorateSerde(Serdes.Integer(), streamsConfig, true))
.withValueSerde(serdeScope.decorateSerde(Serdes.String(), streamsConfig, false))
.withCachingDisabled();
} else {
throw new RuntimeException("Current implementation of joinOnForeignKey requires a materialized store");
}
final Materialized<Integer, String, KeyValueStore<Bytes, byte[]>> materializedTwo;
if (queryableNameTwo != null) {
materializedTwo = Materialized.<Integer, String, KeyValueStore<Bytes, byte[]>>as(queryableNameTwo)
.withKeySerde(serdeScope.decorateSerde(Serdes.Integer(), streamsConfig, true))
.withValueSerde(serdeScope.decorateSerde(Serdes.String(), streamsConfig, false))
.withCachingDisabled();
} else {
throw new RuntimeException("Current implementation of joinOnForeignKey requires a materialized store");
}
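        //Derives table2's String key from table1's Float value by truncating it to an int (e.g. 1.33f -> "1").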
final Function<Float, String> tableOneKeyExtractor = value -> Integer.toString((int) value.floatValue());
        final Function<String, Integer> joinedTableKeyExtractor = value -> {
            //Hardwired to return the desired foreign key as a test shortcut
            if (value.contains("value2=10")) {
                return 10;
            } else {
                return 0;
            }
        };
final ValueJoiner<Float, Long, String> joiner = (value1, value2) -> "value1=" + value1 + ",value2=" + value2;
final ValueJoiner<String, String, String> joinerTwo = (value1, value2) -> value1 + ",value3=" + value2;
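        //Chain the two foreign-key joins: table1 joins table2 via tableOneKeyExtractor, then the result
        //joins table3 via joinedTableKeyExtractor; each join materializes a queryable store.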
table1.join(table2, tableOneKeyExtractor, joiner, materialized)
.join(table3, joinedTableKeyExtractor, joinerTwo, materializedTwo)
.toStream()
.to(OUTPUT,
Produced.with(serdeScope.decorateSerde(Serdes.Integer(), streamsConfig, true),
serdeScope.decorateSerde(Serdes.String(), streamsConfig, false)));
return new KafkaStreams(builder.build(streamsConfig), streamsConfig);
}
}