/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.kafka.streams.integration;

import kafka.utils.MockTime;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.test.TestUtils;

import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertThat;

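/**
 * Tests all nine combinations of inner, left, and outer KTable-KTable joins by chaining two joins
 * over three input tables and comparing the records received on the output topic with the
 * expected result for each combination.
 */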
@RunWith(Parameterized.class)
public class KTableKTableJoinIntegrationTest {
    private static final int NUM_BROKERS = 1;

    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);

    private static final MockTime MOCK_TIME = CLUSTER.time;
    private static final String TABLE_1 = "table1";
    private static final String TABLE_2 = "table2";
    private static final String TABLE_3 = "table3";
    private static final String OUTPUT = "output-";
    private static final Properties CONSUMER_CONFIG = new Properties();
    private static Properties streamsConfig;

    private KafkaStreams streams;

    @Parameterized.Parameter(value = 0)
    public JoinType joinType1;

    @Parameterized.Parameter(value = 1)
    public JoinType joinType2;
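
    // Expected records on the output topic for this combination of join types. A null value is a
    // tombstone, emitted while the chained join has no result for the key; the literal "null"
    // inside a value comes from string-concatenating the missing side of a left/outer join.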
    @Parameterized.Parameter(value = 2)
    public List<KeyValue<String, String>> expectedResult;

    // Single parameter, use Object[]
    @Parameterized.Parameters
    public static Object[] parameters() {
        return new Object[][]{
            {JoinType.INNER, JoinType.INNER, Arrays.asList(
                new KeyValue<>("a", null),
                new KeyValue<>("b", null),
                new KeyValue<>("c", null),
                new KeyValue<>("a", null),
                new KeyValue<>("b", null),
                new KeyValue<>("b", "B1-B2-B3"),
                new KeyValue<>("c", null)
            )},
            {JoinType.INNER, JoinType.LEFT, Arrays.asList(
                new KeyValue<>("a", null),
                new KeyValue<>("b", null),
                new KeyValue<>("c", null),
                new KeyValue<>("a", null),
                new KeyValue<>("b", null),
                new KeyValue<>("b", "B1-B2-B3"),
                new KeyValue<>("c", null)
            )},
            {JoinType.INNER, JoinType.OUTER, Arrays.asList(
                new KeyValue<>("a", "null-A3"),
                new KeyValue<>("b", "null-B3"),
                new KeyValue<>("c", "null-C3"),
                new KeyValue<>("a", "null-A3"),
                new KeyValue<>("b", "null-B3"),
                new KeyValue<>("b", "B1-B2-B3"),
                new KeyValue<>("c", "null-C3")
            )},
            {JoinType.LEFT, JoinType.INNER, Arrays.asList(
                new KeyValue<>("a", null),
                new KeyValue<>("b", null),
                new KeyValue<>("c", null),
                new KeyValue<>("a", "A1-null-A3"),
                new KeyValue<>("b", "B1-null-B3"),
                new KeyValue<>("b", "B1-B2-B3"),
                new KeyValue<>("c", null)
            )},
            {JoinType.LEFT, JoinType.LEFT, Arrays.asList(
                new KeyValue<>("a", null),
                new KeyValue<>("b", null),
                new KeyValue<>("c", null),
                new KeyValue<>("a", "A1-null-A3"),
                new KeyValue<>("b", "B1-null-B3"),
                new KeyValue<>("b", "B1-B2-B3"),
                new KeyValue<>("c", null)
            )},
            {JoinType.LEFT, JoinType.OUTER, Arrays.asList(
                new KeyValue<>("a", "null-A3"),
                new KeyValue<>("b", "null-B3"),
                new KeyValue<>("c", "null-C3"),
                new KeyValue<>("a", "A1-null-A3"),
                new KeyValue<>("b", "B1-null-B3"),
                new KeyValue<>("b", "B1-B2-B3"),
                new KeyValue<>("c", "null-C3")
            )},
            {JoinType.OUTER, JoinType.INNER, Arrays.asList(
                new KeyValue<>("a", null),
                new KeyValue<>("b", null),
                new KeyValue<>("c", null),
                new KeyValue<>("a", "A1-null-A3"),
                new KeyValue<>("b", "B1-null-B3"),
                new KeyValue<>("b", "B1-B2-B3"),
                new KeyValue<>("c", "null-C2-C3")
            )},
            {JoinType.OUTER, JoinType.LEFT, Arrays.asList(
                new KeyValue<>("a", null),
                new KeyValue<>("b", null),
                new KeyValue<>("c", null),
                new KeyValue<>("a", "A1-null-A3"),
                new KeyValue<>("b", "B1-null-B3"),
                new KeyValue<>("b", "B1-B2-B3"),
                new KeyValue<>("c", "null-C2-C3")
            )},
            {JoinType.OUTER, JoinType.OUTER, Arrays.asList(
                new KeyValue<>("a", "null-A3"),
                new KeyValue<>("b", "null-B3"),
                new KeyValue<>("c", "null-C3"),
                new KeyValue<>("a", "A1-null-A3"),
                new KeyValue<>("b", "B1-null-B3"),
                new KeyValue<>("b", "B1-B2-B3"),
                new KeyValue<>("c", "null-C2-C3")
            )}
        };
    }
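
    /**
     * Creates the input and output topics, prepares the Streams and producer configurations, and
     * seeds the three input tables. Table 3 is produced first so that its data is already
     * available when the results of joining table 1 with table 2 arrive.
     */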
    @BeforeClass
    public static void beforeTest() throws Exception {
        CLUSTER.createTopic(TABLE_1);
        CLUSTER.createTopic(TABLE_2);
        CLUSTER.createTopic(TABLE_3);
        CLUSTER.createTopic(OUTPUT);

        streamsConfig = new Properties();
        streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
        streamsConfig.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
        streamsConfig.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        streamsConfig.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
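        // Commit as often as possible and disable record caching, so that every intermediate
        // update produced by the joins reaches the output topic instead of being compacted away.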
        streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1);
        streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

        final Properties producerConfig = new Properties();
        producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
        producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
        producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        final List<KeyValue<String, String>> table1 = Arrays.asList(
            new KeyValue<>("a", "A1"),
            new KeyValue<>("b", "B1")
        );
        final List<KeyValue<String, String>> table2 = Arrays.asList(
            new KeyValue<>("b", "B2"),
            new KeyValue<>("c", "C2")
        );
        final List<KeyValue<String, String>> table3 = Arrays.asList(
            new KeyValue<>("a", "A3"),
            new KeyValue<>("b", "B3"),
            new KeyValue<>("c", "C3")
        );

        // put table 3 first, to make sure data is there when joining T1 with T2
        IntegrationTestUtils.produceKeyValuesSynchronously(TABLE_3, table3, producerConfig, MOCK_TIME);
        IntegrationTestUtils.produceKeyValuesSynchronously(TABLE_1, table1, producerConfig, MOCK_TIME);
        IntegrationTestUtils.produceKeyValuesSynchronously(TABLE_2, table2, producerConfig, MOCK_TIME);

        CONSUMER_CONFIG.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
        CONSUMER_CONFIG.put(ConsumerConfig.GROUP_ID_CONFIG, "ktable-ktable-consumer");
        CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    }

    @Before
    public void before() throws Exception {
        IntegrationTestUtils.purgeLocalStreamsState(streamsConfig);
    }

    @After
    public void after() throws Exception {
        if (streams != null) {
            streams.close();
            streams = null;
        }
        IntegrationTestUtils.purgeLocalStreamsState(streamsConfig);
    }

    private enum JoinType {
        INNER, LEFT, OUTER
    }
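
    /**
     * Builds the topology {@code ((table1 JOIN table2) JOIN table3) -> OUTPUT}, where each JOIN
     * is the inner, left, or outer join selected by the test parameters.
     */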
    private KafkaStreams prepareTopology() {
        final KStreamBuilder builder = new KStreamBuilder();

        final KTable<String, String> table1 = builder.table(TABLE_1, TABLE_1);
        final KTable<String, String> table2 = builder.table(TABLE_2, TABLE_2);
        final KTable<String, String> table3 = builder.table(TABLE_3, TABLE_3);

        join(join(table1, table2, joinType1), table3, joinType2).to(OUTPUT);

        return new KafkaStreams(builder, new StreamsConfig(streamsConfig));
    }
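
    /**
     * Applies the given join type to the two tables. The joiner concatenates both values with a
     * dash, so when a left or outer join fires without a match on one side, the missing value
     * shows up in the result as the literal string "null".
     */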
    private KTable<String, String> join(final KTable<String, String> first,
                                        final KTable<String, String> second,
                                        final JoinType joinType) {
        final ValueJoiner<String, String, String> joiner = new ValueJoiner<String, String, String>() {
            @Override
            public String apply(final String value1, final String value2) {
                return value1 + "-" + value2;
            }
        };

        switch (joinType) {
            case INNER:
                return first.join(second, joiner);
            case LEFT:
                return first.leftJoin(second, joiner);
            case OUTER:
                return first.outerJoin(second, joiner);
            default:
                throw new RuntimeException("Unknown join type.");
        }
    }
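
    /**
     * Starts the topology for this parameter combination and asserts that the records received on
     * the output topic match the expected result.
     */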
    @Test
    public void shouldJoinThreeKTables() throws Exception {
        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, joinType1 + "-" + joinType2 + "-ktable-ktable-join");

        streams = prepareTopology();
        streams.start();

        final List<KeyValue<String, String>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
            CONSUMER_CONFIG,
            OUTPUT,
            expectedResult.size());

        assertThat(result, equalTo(expectedResult));
    }
}