/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.server.util.MockTime;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Repartitioned;
import org.apache.kafka.streams.kstream.TableJoined;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.utils.UniqueTopicSerdeScope;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;

import static java.time.Duration.ofSeconds;
import static java.util.Arrays.asList;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsEqual.equalTo;
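
/**
 * Integration test for KTable-KTable foreign-key inner joins whose internal topics are driven by
 * custom partitioners supplied through {@link TableJoined} (KIP-775). It verifies that a join over
 * custom-partitioned tables produces the expected results, and that a partitioner returning more
 * than one partition is rejected at runtime.
 */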
@Timeout(600)
@Tag("integration")
public class KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest {
    private static final int NUM_BROKERS = 1;
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
    private static final MockTime MOCK_TIME = CLUSTER.time;
    private static final String TABLE_1 = "table1";
    private static final String TABLE_2 = "table2";
    private static final String OUTPUT = "output-";
    private static final Properties CONSUMER_CONFIG = new Properties();
    private static final Properties PRODUCER_CONFIG_1 = new Properties();
    private static final Properties PRODUCER_CONFIG_2 = new Properties();

    private Properties streamsConfig;
    private Properties streamsConfigTwo;
    private Properties streamsConfigThree;
    private KafkaStreams streams;
    private KafkaStreams streamsTwo;
    private KafkaStreams streamsThree;
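
    /**
     * A partitioner that deliberately returns several partitions. The FK join requires the set
     * returned by {@link #partitions} to be a singleton, so topologies using this partitioner are
     * expected to fail. The deprecated single-partition method returns null because the
     * overriding {@link #partitions} method takes precedence and is the one consulted.
     */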
static class MultiPartitioner implements StreamPartitioner<String, Void> {
@Override
@Deprecated
public Integer partition(final String topic, final String key, final Void value, final int numPartitions) {
return null;
}
@Override
public Optional<Set<Integer>> partitions(final String topic, final String key, final Void value, final int numPartitions) {
return Optional.of(new HashSet<>(Arrays.asList(0, 1, 2)));
}
}
@BeforeAll
public static void startCluster() throws IOException, InterruptedException {
CLUSTER.start();
        // Use multiple partitions to ensure distribution of keys.
CLUSTER.createTopic(TABLE_1, 4, 1);
CLUSTER.createTopic(TABLE_2, 4, 1);
CLUSTER.createTopic(OUTPUT, 4, 1);
PRODUCER_CONFIG_1.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
PRODUCER_CONFIG_1.put(ProducerConfig.ACKS_CONFIG, "all");
PRODUCER_CONFIG_1.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
PRODUCER_CONFIG_1.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
PRODUCER_CONFIG_2.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
PRODUCER_CONFIG_2.put(ProducerConfig.ACKS_CONFIG, "all");
PRODUCER_CONFIG_2.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
PRODUCER_CONFIG_2.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
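
        // Seed TABLE_1 with four records whose keys and values embed the foreign key "ID123",
        // and TABLE_2 with the single record that every TABLE_1 record joins against.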
final List<KeyValue<String, String>> table1 = asList(
new KeyValue<>("ID123-1", "ID123-A1"),
new KeyValue<>("ID123-2", "ID123-A2"),
new KeyValue<>("ID123-3", "ID123-A3"),
new KeyValue<>("ID123-4", "ID123-A4")
);
final List<KeyValue<String, String>> table2 = asList(
new KeyValue<>("ID123", "BBB")
);
IntegrationTestUtils.produceKeyValuesSynchronously(TABLE_1, table1, PRODUCER_CONFIG_1, MOCK_TIME);
IntegrationTestUtils.produceKeyValuesSynchronously(TABLE_2, table2, PRODUCER_CONFIG_2, MOCK_TIME);
CONSUMER_CONFIG.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
CONSUMER_CONFIG.put(ConsumerConfig.GROUP_ID_CONFIG, "ktable-ktable-consumer");
CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
}
@AfterAll
public static void closeCluster() {
CLUSTER.stop();
}
@BeforeEach
public void before(final TestInfo testInfo) throws IOException {
final String stateDirBasePath = TestUtils.tempDirectory().getPath();
final String safeTestName = safeUniqueTestName(getClass(), testInfo);
streamsConfig = getStreamsConfig(safeTestName);
streamsConfigTwo = getStreamsConfig(safeTestName);
streamsConfigThree = getStreamsConfig(safeTestName);
streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, stateDirBasePath + "-1");
streamsConfigTwo.put(StreamsConfig.STATE_DIR_CONFIG, stateDirBasePath + "-2");
streamsConfigThree.put(StreamsConfig.STATE_DIR_CONFIG, stateDirBasePath + "-3");
}
@AfterEach
public void after() throws IOException {
if (streams != null) {
streams.close();
streams = null;
}
if (streamsTwo != null) {
streamsTwo.close();
streamsTwo = null;
}
if (streamsThree != null) {
streamsThree.close();
streamsThree = null;
}
IntegrationTestUtils.purgeLocalStreamsState(asList(streamsConfig, streamsConfigTwo, streamsConfigThree));
}
@Test
public void shouldInnerJoinMultiPartitionQueryable() throws Exception {
final Set<KeyValue<String, String>> expectedOne = new HashSet<>();
expectedOne.add(new KeyValue<>("ID123-1", "value1=ID123-A1,value2=BBB"));
expectedOne.add(new KeyValue<>("ID123-2", "value1=ID123-A2,value2=BBB"));
expectedOne.add(new KeyValue<>("ID123-3", "value1=ID123-A3,value2=BBB"));
expectedOne.add(new KeyValue<>("ID123-4", "value1=ID123-A4,value2=BBB"));
verifyKTableKTableJoin(expectedOne);
}
@Test
public void shouldThrowIllegalArgumentExceptionWhenCustomPartitionerReturnsMultiplePartitions() throws Exception {
final String innerJoinType = "INNER";
final String queryableName = innerJoinType + "-store1";
streams = prepareTopologyWithNonSingletonPartitions(queryableName, streamsConfig);
streamsTwo = prepareTopologyWithNonSingletonPartitions(queryableName, streamsConfigTwo);
streamsThree = prepareTopologyWithNonSingletonPartitions(queryableName, streamsConfigThree);
final List<KafkaStreams> kafkaStreamsList = asList(streams, streamsTwo, streamsThree);
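        // Each instance should hit the singleton-set violation inside the FK join; assert on the
        // error message as it reaches the uncaught exception handler.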
        for (final KafkaStreams stream : kafkaStreamsList) {
stream.setUncaughtExceptionHandler(e -> {
assertThat(e.getCause().getMessage(), equalTo("The partitions returned by StreamPartitioner#partitions method when used for FK join should be a singleton set"));
return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
});
}
startApplicationAndWaitUntilRunning(kafkaStreamsList, ofSeconds(120));
        // the streams applications should have shut down into `ERROR` due to the IllegalArgumentException
        waitForApplicationState(Arrays.asList(streams, streamsTwo, streamsThree), KafkaStreams.State.ERROR, ofSeconds(60));
}
private void verifyKTableKTableJoin(final Set<KeyValue<String, String>> expectedResult) throws Exception {
final String innerJoinType = "INNER";
final String queryableName = innerJoinType + "-store1";
streams = prepareTopology(queryableName, streamsConfig);
streamsTwo = prepareTopology(queryableName, streamsConfigTwo);
streamsThree = prepareTopology(queryableName, streamsConfigThree);
final List<KafkaStreams> kafkaStreamsList = asList(streams, streamsTwo, streamsThree);
startApplicationAndWaitUntilRunning(kafkaStreamsList, ofSeconds(120));
final Set<KeyValue<String, String>> result = new HashSet<>(waitUntilMinKeyValueRecordsReceived(
CONSUMER_CONFIG,
OUTPUT,
expectedResult.size()));
        assertThat(result, equalTo(expectedResult));
}
private Properties getStreamsConfig(final String testName) {
final Properties streamsConfig = new Properties();
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "KTable-FKJ-Partitioner-" + testName);
streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfig.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
return streamsConfig;
}
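
    /**
     * Builds the join topology: TABLE_1 is repartitioned by the foreign key embedded in its
     * primary key and TABLE_2 by its own key, and the same partitioning scheme is handed to the
     * FK join through {@link TableJoined} so the join's internal subscription and response topics
     * stay aligned with the repartitioned tables.
     */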
private static KafkaStreams prepareTopology(final String queryableName, final Properties streamsConfig) {
final UniqueTopicSerdeScope serdeScope = new UniqueTopicSerdeScope();
final StreamsBuilder builder = new StreamsBuilder();
final KTable<String, String> table1 = builder.stream(TABLE_1,
Consumed.with(serdeScope.decorateSerde(Serdes.String(), streamsConfig, true), serdeScope.decorateSerde(Serdes.String(), streamsConfig, false)))
.repartition(repartitionA())
.toTable(Named.as("table.a"));
final KTable<String, String> table2 = builder
.stream(TABLE_2,
Consumed.with(serdeScope.decorateSerde(Serdes.String(), streamsConfig, true), serdeScope.decorateSerde(Serdes.String(), streamsConfig, false)))
.repartition(repartitionB())
.toTable(Named.as("table.b"));
final Materialized<String, String, KeyValueStore<Bytes, byte[]>> materialized;
if (queryableName != null) {
materialized = Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(queryableName)
.withKeySerde(serdeScope.decorateSerde(Serdes.String(), streamsConfig, true))
.withValueSerde(serdeScope.decorateSerde(Serdes.String(), streamsConfig, false))
.withCachingDisabled();
} else {
throw new RuntimeException("Current implementation of joinOnForeignKey requires a materialized store");
}
final ValueJoiner<String, String, String> joiner = (value1, value2) -> "value1=" + value1 + ",value2=" + value2;
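        // The first partitioner mirrors repartitionA() (left table, partitioned by the embedded
        // foreign key); the second mirrors repartitionB() (right table, partitioned by its key).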
final TableJoined<String, String> tableJoined = TableJoined.with(
(topic, key, value, numPartitions) -> Math.abs(getKeyB(key).hashCode()) % numPartitions,
(topic, key, value, numPartitions) -> Math.abs(key.hashCode()) % numPartitions
);
table1.join(table2, KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest::getKeyB, joiner, tableJoined, materialized)
.toStream()
.to(OUTPUT,
Produced.with(serdeScope.decorateSerde(Serdes.String(), streamsConfig, true),
serdeScope.decorateSerde(Serdes.String(), streamsConfig, false)));
return new KafkaStreams(builder.build(streamsConfig), streamsConfig);
}
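
    /**
     * Same topology as {@link #prepareTopology}, except the left-hand partitioner handed to
     * {@link TableJoined} is {@link MultiPartitioner}, which returns several partitions and is
     * expected to fail the FK join at runtime.
     */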
private static KafkaStreams prepareTopologyWithNonSingletonPartitions(final String queryableName, final Properties streamsConfig) {
final UniqueTopicSerdeScope serdeScope = new UniqueTopicSerdeScope();
final StreamsBuilder builder = new StreamsBuilder();
final KTable<String, String> table1 = builder.stream(TABLE_1,
Consumed.with(serdeScope.decorateSerde(Serdes.String(), streamsConfig, true), serdeScope.decorateSerde(Serdes.String(), streamsConfig, false)))
.repartition(repartitionA())
.toTable(Named.as("table.a"));
final KTable<String, String> table2 = builder
.stream(TABLE_2,
Consumed.with(serdeScope.decorateSerde(Serdes.String(), streamsConfig, true), serdeScope.decorateSerde(Serdes.String(), streamsConfig, false)))
.repartition(repartitionB())
.toTable(Named.as("table.b"));
final Materialized<String, String, KeyValueStore<Bytes, byte[]>> materialized;
if (queryableName != null) {
materialized = Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(queryableName)
.withKeySerde(serdeScope.decorateSerde(Serdes.String(), streamsConfig, true))
.withValueSerde(serdeScope.decorateSerde(Serdes.String(), streamsConfig, false))
.withCachingDisabled();
} else {
throw new RuntimeException("Current implementation of joinOnForeignKey requires a materialized store");
}
final ValueJoiner<String, String, String> joiner = (value1, value2) -> "value1=" + value1 + ",value2=" + value2;
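        // MultiPartitioner returns three partitions instead of a singleton set, which the FK join
        // rejects.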
final TableJoined<String, String> tableJoined = TableJoined.with(
new MultiPartitioner(),
(topic, key, value, numPartitions) -> Math.abs(key.hashCode()) % numPartitions
);
table1.join(table2, KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest::getKeyB, joiner, tableJoined, materialized)
.toStream()
.to(OUTPUT,
Produced.with(serdeScope.decorateSerde(Serdes.String(), streamsConfig, true),
serdeScope.decorateSerde(Serdes.String(), streamsConfig, false)));
return new KafkaStreams(builder.build(streamsConfig), streamsConfig);
}
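
    // Partitions TABLE_1's repartition topic by the foreign key embedded in each primary key,
    // e.g. "ID123-1" is placed by the hash of "ID123".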
private static Repartitioned<String, String> repartitionA() {
final Repartitioned<String, String> repartitioned = Repartitioned.as("a");
return repartitioned.withKeySerde(Serdes.String()).withValueSerde(Serdes.String())
.withStreamPartitioner((topic, key, value, numPartitions) -> Math.abs(getKeyB(key).hashCode()) % numPartitions)
.withNumberOfPartitions(4);
}
private static Repartitioned<String, String> repartitionB() {
final Repartitioned<String, String> repartitioned = Repartitioned.as("b");
return repartitioned.withKeySerde(Serdes.String()).withValueSerde(Serdes.String())
.withStreamPartitioner((topic, key, value, numPartitions) -> Math.abs(key.hashCode()) % numPartitions)
.withNumberOfPartitions(4);
}
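
    // Extracts the foreign-key prefix before the first '-', e.g. "ID123-A1" -> "ID123".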
private static String getKeyB(final String value) {
return value.substring(0, value.indexOf("-"));
}
}