blob: cc55e5711edd4fce87f9d9e60d697b2384a518a0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.integration;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.server.util.MockTime;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import static java.time.Duration.ofSeconds;
import static java.util.Arrays.asList;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsEqual.equalTo;
/**
 * Integration test for a windowed {@link KStream}-{@link KStream} outer join, run against an
 * embedded single-broker Kafka cluster.
 */
@Timeout(600)
@Tag("integration")
public class KStreamKStreamIntegrationTest {
    private static final int NUM_BROKERS = 1;
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
    private static final MockTime MOCK_TIME = CLUSTER.time;
    private static final String LEFT_STREAM = "leftStream";
    private static final String RIGHT_STREAM = "rightStream";
    private static final String OUTPUT = "output";
    // Shared client configs. Both are populated once in startCluster(), because the bootstrap
    // servers are only known after the cluster has started.
    private static final Properties CONSUMER_CONFIG = new Properties();
    private static final Properties PRODUCER_CONFIG = new Properties();

    private Properties streamsConfig;
    private KafkaStreams streams;

    /**
     * Starts the embedded cluster, creates the input and output topics, and fills in the shared
     * consumer and producer configurations.
     */
    @BeforeAll
    public static void startCluster() throws Exception {
        CLUSTER.start();
        // Use multiple partitions to ensure distribution of keys.
        CLUSTER.createTopic(LEFT_STREAM, 4, 1);
        CLUSTER.createTopic(RIGHT_STREAM, 4, 1);
        CLUSTER.createTopic(OUTPUT, 4, 1);

        CONSUMER_CONFIG.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
        CONSUMER_CONFIG.put(ConsumerConfig.GROUP_ID_CONFIG, "result-consumer");
        CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        // Configured once here, next to the consumer config, rather than being re-populated
        // on every verification run.
        PRODUCER_CONFIG.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
        PRODUCER_CONFIG.put(ProducerConfig.ACKS_CONFIG, "all");
        PRODUCER_CONFIG.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        PRODUCER_CONFIG.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    }

    /** Stops the embedded cluster once all tests in this class have run. */
    @AfterAll
    public static void closeCluster() {
        CLUSTER.stop();
    }

    /**
     * Builds a per-test Streams configuration. The application id is derived from the test name,
     * and the state directory is a fresh temporary directory.
     */
    @BeforeEach
    public void before(final TestInfo testInfo) throws IOException {
        final String stateDirBasePath = TestUtils.tempDirectory().getPath();
        final String safeTestName = safeUniqueTestName(getClass(), testInfo);
        streamsConfig = getStreamsConfig(safeTestName);
        streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, stateDirBasePath);
    }

    /**
     * Closes the Streams instance started by the test, if any, and then purges local Streams
     * state through {@link IntegrationTestUtils#purgeLocalStreamsState}.
     */
    @AfterEach
    public void after() throws IOException {
        if (streams != null) {
            streams.close();
            streams = null;
        }
        IntegrationTestUtils.purgeLocalStreamsState(streamsConfig);
    }

    /**
     * Checks that each first-batch left record with no right-side match is emitted once,
     * joined with {@code null}.
     */
    @Test
    public void shouldOuterJoin() throws Exception {
        final Set<KeyValue<String, String>> expected = new HashSet<>();
        expected.add(new KeyValue<>("Key-1", "value1=left-1a,value2=null"));
        expected.add(new KeyValue<>("Key-2", "value1=left-2a,value2=null"));
        expected.add(new KeyValue<>("Key-3", "value1=left-3a,value2=null"));
        expected.add(new KeyValue<>("Key-4", "value1=left-4a,value2=null"));
        verifyKStreamKStreamOuterJoin(expected);
    }

    /**
     * Starts the join topology and runs the scenario.
     *
     * <p>The scenario produces two batches of records to the left stream only, advancing
     * {@code MOCK_TIME} by 10 seconds between them. It then asserts that the distinct set of
     * records read from {@code OUTPUT} equals {@code expectedResult}.
     *
     * @param expectedResult the exact set of output records expected
     */
    private void verifyKStreamKStreamOuterJoin(final Set<KeyValue<String, String>> expectedResult) throws Exception {
        streams = prepareTopology(streamsConfig);
        startApplicationAndWaitUntilRunning(Collections.singletonList(streams), ofSeconds(120));

        final List<KeyValue<String, String>> left1 = asList(
            new KeyValue<>("Key-1", "left-1a"),
            new KeyValue<>("Key-2", "left-2a"),
            new KeyValue<>("Key-3", "left-3a"),
            new KeyValue<>("Key-4", "left-4a")
        );
        final List<KeyValue<String, String>> left2 = asList(
            new KeyValue<>("Key-1", "left-1b"),
            new KeyValue<>("Key-2", "left-2b"),
            new KeyValue<>("Key-3", "left-3b"),
            new KeyValue<>("Key-4", "left-4b")
        );

        IntegrationTestUtils.produceKeyValuesSynchronously(LEFT_STREAM, left1, PRODUCER_CONFIG, MOCK_TIME);
        // The 10 s jump is far beyond the 10 ms no-grace join window used in prepareTopology.
        // NOTE(review): this assumes record timestamps are taken from MOCK_TIME, so the second
        // batch moves stream time past the first batch's windows; confirm in IntegrationTestUtils.
        MOCK_TIME.sleep(10000);
        IntegrationTestUtils.produceKeyValuesSynchronously(LEFT_STREAM, left2, PRODUCER_CONFIG, MOCK_TIME);

        final Set<KeyValue<String, String>> result = new HashSet<>(waitUntilMinKeyValueRecordsReceived(
            CONSUMER_CONFIG,
            OUTPUT,
            expectedResult.size()));

        // Hamcrest's signature is assertThat(actual, matcher), so the observed result goes first.
        assertThat(result, equalTo(expectedResult));
    }

    /**
     * Builds the Streams configuration shared by all tests.
     *
     * <p>It reads input from the earliest offset, commits every 100 ms, and uses String serdes
     * for keys and values.
     *
     * @param testName sanitized test name, appended to the application id
     * @return a new, mutable {@link Properties} instance
     */
    private Properties getStreamsConfig(final String testName) {
        final Properties streamsConfig = new Properties();
        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "KStream-KStream-join" + testName);
        streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
        streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
        streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
        streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
        return streamsConfig;
    }

    /**
     * Builds the topology under test and wraps it in a {@link KafkaStreams} instance that has
     * not been started.
     *
     * <p>The topology is {@code LEFT_STREAM} outer-joined with {@code RIGHT_STREAM} over a 10 ms
     * window with no grace period, and it writes to {@code OUTPUT}. The joiner formats each pair
     * as {@code value1=<left>,value2=<right>}; a missing side prints as {@code null}.
     */
    private static KafkaStreams prepareTopology(final Properties streamsConfig) {
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> stream1 = builder.stream(LEFT_STREAM);
        final KStream<String, String> stream2 = builder.stream(RIGHT_STREAM);
        final ValueJoiner<String, String, String> joiner = (value1, value2) -> "value1=" + value1 + ",value2=" + value2;
        stream1.outerJoin(stream2, joiner, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(10))).to(OUTPUT);
        return new KafkaStreams(builder.build(streamsConfig), streamsConfig);
    }
}