blob: d9a976872c256995042e005f44f1f18754f74e5f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.kafka.streams.integration;
import kafka.utils.MockTime;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.test.MockKeyValueMapper;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
@RunWith(Parameterized.class)
public class KStreamRepartitionJoinTest {
private static final int NUM_BROKERS = 1;
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
private final MockTime mockTime = CLUSTER.time;
private static final long WINDOW_SIZE = TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS);
private KStreamBuilder builder;
private Properties streamsConfiguration;
private KStream<Long, Integer> streamOne;
private KStream<Integer, String> streamTwo;
private KStream<Integer, String> streamFour;
private ValueJoiner<Integer, String, String> valueJoiner;
private KeyValueMapper<Long, Integer, KeyValue<Integer, Integer>> keyMapper;
private final List<String>
expectedStreamOneTwoJoin = Arrays.asList("1:A", "2:B", "3:C", "4:D", "5:E");
private KafkaStreams kafkaStreams;
private String streamOneInput;
private String streamTwoInput;
private String streamFourInput;
private static volatile int testNo = 0;
@Parameter
public long cacheSizeBytes;
//Single parameter, use Object[]
@Parameters
public static Object[] data() {
return new Object[] {0, 10 * 1024 * 1024L};
}
@Before
public void before() {
testNo++;
String applicationId = "kstream-repartition-join-test-" + testNo;
builder = new KStreamBuilder();
createTopics();
streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1);
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes);
streamOne = builder.stream(Serdes.Long(), Serdes.Integer(), streamOneInput);
streamTwo = builder.stream(Serdes.Integer(), Serdes.String(), streamTwoInput);
streamFour = builder.stream(Serdes.Integer(), Serdes.String(), streamFourInput);
valueJoiner = new ValueJoiner<Integer, String, String>() {
@Override
public String apply(final Integer value1, final String value2) {
return value1 + ":" + value2;
}
};
keyMapper = MockKeyValueMapper.SelectValueKeyValueMapper();
}
@After
public void whenShuttingDown() throws IOException {
if (kafkaStreams != null) {
kafkaStreams.close();
}
IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
}
@Test
public void shouldCorrectlyRepartitionOnJoinOperations() throws Exception {
produceMessages();
final ExpectedOutputOnTopic mapOne = mapStreamOneAndJoin();
final ExpectedOutputOnTopic mapBoth = mapBothStreamsAndJoin();
final ExpectedOutputOnTopic mapMapJoin = mapMapJoin();
final ExpectedOutputOnTopic selectKeyJoin = selectKeyAndJoin();
final ExpectedOutputOnTopic flatMapJoin = flatMapJoin();
final ExpectedOutputOnTopic mapRhs = joinMappedRhsStream();
final ExpectedOutputOnTopic mapJoinJoin = joinTwoMappedStreamsOneThatHasBeenPreviouslyJoined();
final ExpectedOutputOnTopic leftJoin = mapBothStreamsAndLeftJoin();
startStreams();
verifyCorrectOutput(mapOne);
verifyCorrectOutput(mapBoth);
verifyCorrectOutput(mapMapJoin);
verifyCorrectOutput(selectKeyJoin);
verifyCorrectOutput(flatMapJoin);
verifyCorrectOutput(mapRhs);
verifyCorrectOutput(mapJoinJoin);
verifyLeftJoin(leftJoin);
}
private ExpectedOutputOnTopic mapStreamOneAndJoin() {
String mapOneStreamAndJoinOutput = "map-one-join-output-" + testNo;
doJoin(streamOne.map(keyMapper), streamTwo, mapOneStreamAndJoinOutput);
return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, mapOneStreamAndJoinOutput);
}
private ExpectedOutputOnTopic mapBothStreamsAndJoin() throws Exception {
final KStream<Integer, Integer> map1 = streamOne.map(keyMapper);
final KStream<Integer, String> map2 = streamTwo.map(MockKeyValueMapper.<Integer, String>NoOpKeyValueMapper());
doJoin(map1, map2, "map-both-streams-and-join-" + testNo);
return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, "map-both-streams-and-join-" + testNo);
}
private ExpectedOutputOnTopic mapMapJoin() throws Exception {
final KStream<Integer, Integer> mapMapStream = streamOne.map(
new KeyValueMapper<Long, Integer, KeyValue<Long, Integer>>() {
@Override
public KeyValue<Long, Integer> apply(final Long key, final Integer value) {
if (value == null) {
return new KeyValue<>(null, null);
}
return new KeyValue<>(key + value, value);
}
}).map(keyMapper);
final String outputTopic = "map-map-join-" + testNo;
doJoin(mapMapStream, streamTwo, outputTopic);
return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, outputTopic);
}
public ExpectedOutputOnTopic selectKeyAndJoin() throws ExecutionException, InterruptedException {
final KStream<Integer, Integer> keySelected =
streamOne.selectKey(MockKeyValueMapper.<Long, Integer>SelectValueMapper());
final String outputTopic = "select-key-join-" + testNo;
doJoin(keySelected, streamTwo, outputTopic);
return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, outputTopic);
}
private ExpectedOutputOnTopic flatMapJoin() throws Exception {
final KStream<Integer, Integer> flatMapped = streamOne.flatMap(
new KeyValueMapper<Long, Integer, Iterable<KeyValue<Integer, Integer>>>() {
@Override
public Iterable<KeyValue<Integer, Integer>> apply(final Long key, final Integer value) {
return Collections.singletonList(new KeyValue<>(value, value));
}
});
final String outputTopic = "flat-map-join-" + testNo;
doJoin(flatMapped, streamTwo, outputTopic);
return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, outputTopic);
}
private ExpectedOutputOnTopic joinMappedRhsStream() throws Exception {
final ValueJoiner<String, Integer, String> joiner = new ValueJoiner<String, Integer, String>() {
@Override
public String apply(final String value1, final Integer value2) {
return value1 + ":" + value2;
}
};
final String output = "join-rhs-stream-mapped-" + testNo;
CLUSTER.createTopic(output);
streamTwo
.join(streamOne.map(keyMapper),
joiner,
getJoinWindow(),
Serdes.Integer(),
Serdes.String(),
Serdes.Integer())
.to(Serdes.Integer(), Serdes.String(), output);
return new ExpectedOutputOnTopic(Arrays.asList("A:1", "B:2", "C:3", "D:4", "E:5"), output);
}
public ExpectedOutputOnTopic mapBothStreamsAndLeftJoin() throws Exception {
final KStream<Integer, Integer> map1 = streamOne.map(keyMapper);
final KStream<Integer, String> map2 = streamTwo.map(MockKeyValueMapper.<Integer, String>NoOpKeyValueMapper());
final String outputTopic = "left-join-" + testNo;
CLUSTER.createTopic(outputTopic);
map1.leftJoin(map2,
valueJoiner,
getJoinWindow(),
Serdes.Integer(),
Serdes.Integer(),
Serdes.String())
.to(Serdes.Integer(), Serdes.String(), outputTopic);
return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, outputTopic);
}
private ExpectedOutputOnTopic joinTwoMappedStreamsOneThatHasBeenPreviouslyJoined() throws Exception {
final KStream<Integer, Integer> map1 = streamOne.map(keyMapper);
final KeyValueMapper<Integer, String, KeyValue<Integer, String>>
kvMapper = MockKeyValueMapper.NoOpKeyValueMapper();
final KStream<Integer, String> map2 = streamTwo.map(kvMapper);
final KStream<Integer, String> join = map1.join(map2,
valueJoiner,
getJoinWindow(),
Serdes.Integer(),
Serdes.Integer(),
Serdes.String());
final ValueJoiner<String, String, String> joiner = new ValueJoiner<String, String, String>() {
@Override
public String apply(final String value1, final String value2) {
return value1 + ":" + value2;
}
};
final String topic = "map-join-join-" + testNo;
CLUSTER.createTopic(topic);
join.map(kvMapper)
.join(streamFour.map(kvMapper),
joiner,
getJoinWindow(),
Serdes.Integer(),
Serdes.String(),
Serdes.String())
.to(Serdes.Integer(), Serdes.String(), topic);
return new ExpectedOutputOnTopic(Arrays.asList("1:A:A", "2:B:B", "3:C:C", "4:D:D", "5:E:E"), topic);
}
private JoinWindows getJoinWindow() {
return (JoinWindows) JoinWindows.of(WINDOW_SIZE).until(3 * WINDOW_SIZE);
}
private class ExpectedOutputOnTopic {
private final List<String> expectedOutput;
private final String outputTopic;
ExpectedOutputOnTopic(final List<String> expectedOutput, final String outputTopic) {
this.expectedOutput = expectedOutput;
this.outputTopic = outputTopic;
}
}
private void verifyCorrectOutput(final ExpectedOutputOnTopic expectedOutputOnTopic)
throws InterruptedException {
assertThat(receiveMessages(new StringDeserializer(),
expectedOutputOnTopic.expectedOutput.size(),
expectedOutputOnTopic.outputTopic),
is(expectedOutputOnTopic.expectedOutput));
}
private void verifyLeftJoin(final ExpectedOutputOnTopic expectedOutputOnTopic)
throws InterruptedException, ExecutionException {
final List<String> received = receiveMessages(new StringDeserializer(), expectedOutputOnTopic
.expectedOutput.size(), expectedOutputOnTopic.outputTopic);
if (!received.equals(expectedOutputOnTopic.expectedOutput)) {
produceToStreamOne();
verifyCorrectOutput(expectedOutputOnTopic.expectedOutput, expectedOutputOnTopic.outputTopic);
}
}
private void produceMessages()
throws ExecutionException, InterruptedException {
produceToStreamOne();
produceStreamTwoInputTo(streamTwoInput);
produceStreamTwoInputTo(streamFourInput);
}
private void produceStreamTwoInputTo(final String streamTwoInput)
throws ExecutionException, InterruptedException {
IntegrationTestUtils.produceKeyValuesSynchronously(
streamTwoInput,
Arrays.asList(
new KeyValue<>(1, "A"),
new KeyValue<>(2, "B"),
new KeyValue<>(3, "C"),
new KeyValue<>(4, "D"),
new KeyValue<>(5, "E")),
TestUtils.producerConfig(
CLUSTER.bootstrapServers(),
IntegerSerializer.class,
StringSerializer.class,
new Properties()),
mockTime);
}
private void produceToStreamOne()
throws ExecutionException, InterruptedException {
IntegrationTestUtils.produceKeyValuesSynchronously(
streamOneInput,
Arrays.asList(
new KeyValue<>(10L, 1),
new KeyValue<>(5L, 2),
new KeyValue<>(12L, 3),
new KeyValue<>(15L, 4),
new KeyValue<>(20L, 5),
new KeyValue<Long, Integer>(70L, null)), // nulls should be filtered
TestUtils.producerConfig(
CLUSTER.bootstrapServers(),
LongSerializer.class,
IntegerSerializer.class,
new Properties()),
mockTime);
}
private void createTopics() {
streamOneInput = "stream-one-" + testNo;
streamTwoInput = "stream-two-" + testNo;
streamFourInput = "stream-four-" + testNo;
CLUSTER.createTopic(streamOneInput);
CLUSTER.createTopic(streamTwoInput);
CLUSTER.createTopic(streamFourInput);
}
private void startStreams() {
kafkaStreams = new KafkaStreams(builder, streamsConfiguration);
kafkaStreams.start();
}
private List<String> receiveMessages(final Deserializer<?> valueDeserializer,
final int numMessages, final String topic) throws InterruptedException {
final Properties config = new Properties();
config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kstream-test");
config.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
IntegerDeserializer.class.getName());
config.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
valueDeserializer.getClass().getName());
final List<String> received = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(
config,
topic,
numMessages,
60 * 1000);
Collections.sort(received);
return received;
}
private void verifyCorrectOutput(final List<String> expectedMessages,
final String topic) throws InterruptedException {
assertThat(receiveMessages(new StringDeserializer(), expectedMessages.size(), topic),
is(expectedMessages));
}
private void doJoin(final KStream<Integer, Integer> lhs,
final KStream<Integer, String> rhs,
final String outputTopic) {
CLUSTER.createTopic(outputTopic);
lhs.join(rhs,
valueJoiner,
getJoinWindow(),
Serdes.Integer(),
Serdes.Integer(),
Serdes.String())
.to(Serdes.Integer(), Serdes.String(), outputTopic);
}
}