/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.integration;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;

import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Properties;

import kafka.admin.AdminUtils;
import kafka.log.LogConfig;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;

import scala.Tuple2;
import scala.collection.Iterator;
import scala.collection.Map;

import static org.junit.Assert.assertTrue;
/**
 * Integration tests for the internal topics that Kafka Streams creates, such as state changelog topics.
 */
public class InternalTopicIntegrationTest {
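    // Embedded single-node Kafka + ZooKeeper cluster, started once for the whole test class via @ClassRule.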
    @ClassRule
    public static EmbeddedSingleNodeKafkaCluster cluster = new EmbeddedSingleNodeKafkaCluster();

    private static final String DEFAULT_INPUT_TOPIC = "inputTopic";
    private static final String DEFAULT_OUTPUT_TOPIC = "outputTopic";
    private static final int DEFAULT_ZK_SESSION_TIMEOUT_MS = 10 * 1000;
    private static final int DEFAULT_ZK_CONNECTION_TIMEOUT_MS = 8 * 1000;

    @BeforeClass
    public static void startKafkaCluster() throws Exception {
        cluster.createTopic(DEFAULT_INPUT_TOPIC);
        cluster.createTopic(DEFAULT_OUTPUT_TOPIC);
    }

    /**
     * Validates that all state changelog topics are configured with log compaction.
     *
     * @return true if every changelog topic has cleanup.policy=compact, false otherwise
     */
    private boolean isUsingCompactionForStateChangelogTopics() {
        boolean valid = true;

        // Note: You must initialize the ZkClient with ZKStringSerializer. If you don't, then
        // createTopic() will only seem to work (it will return without error): the topic will exist
        // only in ZooKeeper and will be returned when listing topics, but Kafka itself does not
        // create the topic.
        ZkClient zkClient = new ZkClient(
            cluster.zKConnectString(),
            DEFAULT_ZK_SESSION_TIMEOUT_MS,
            DEFAULT_ZK_CONNECTION_TIMEOUT_MS,
            ZKStringSerializer$.MODULE$);
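        // The embedded test cluster does not enable ZooKeeper security, so ZkUtils is created without ACLs.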
        boolean isSecure = false;
        ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(cluster.zKConnectString()), isSecure);

        Map<String, Properties> topicConfigs = AdminUtils.fetchAllTopicConfigs(zkUtils);
        Iterator it = topicConfigs.iterator();
        while (it.hasNext()) {
            Tuple2<String, Properties> topicConfig = (Tuple2<String, Properties>) it.next();
            String topic = topicConfig._1;
            Properties prop = topicConfig._2;

            // State changelog topics should be compacted.
            if (topic.endsWith(ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX)) {
                if (!prop.containsKey(LogConfig.CleanupPolicyProp()) ||
                    !prop.getProperty(LogConfig.CleanupPolicyProp()).equals(LogConfig.Compact())) {
                    valid = false;
                    break;
                }
            }
        }
        zkClient.close();
        return valid;
    }

    @Test
    public void shouldCompactTopicsForStateChangelogs() throws Exception {
        List<String> inputValues = Arrays.asList("hello", "world", "world", "hello world");

        //
        // Step 1: Configure and start a simple word count topology
        //
        final Serde<String> stringSerde = Serdes.String();
        final Serde<Long> longSerde = Serdes.Long();

        Properties streamsConfiguration = new Properties();
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "compact-topics-integration-test");
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
        streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, cluster.zKConnectString());
        streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");

        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> textLines = builder.stream(DEFAULT_INPUT_TOPIC);
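        // countByKey("Counts") below materializes the aggregate in a state store named "Counts". Kafka
        // Streams backs that store with an internal changelog topic (typically named
        // "<application.id>-Counts-changelog"), which is the topic this test expects to be compacted.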
        KStream<String, Long> wordCounts = textLines
            .flatMapValues(new ValueMapper<String, Iterable<String>>() {
                @Override
                public Iterable<String> apply(String value) {
                    return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
                }
            }).map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
                @Override
                public KeyValue<String, String> apply(String key, String value) {
                    return new KeyValue<String, String>(value, value);
                }
            }).countByKey("Counts").toStream();

        wordCounts.to(stringSerde, longSerde, DEFAULT_OUTPUT_TOPIC);

        // Remove any state from previous test runs
        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);

        KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
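        // Starting the application triggers creation of the internal changelog topic for the "Counts" store.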
        streams.start();

        //
        // Step 2: Produce some input data to the input topic.
        //
        Properties producerConfig = new Properties();
        producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
        producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
        producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
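        // Producing synchronously ensures all records are acknowledged by the broker before we proceed.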
        IntegrationTestUtils.produceValuesSynchronously(DEFAULT_INPUT_TOPIC, inputValues, producerConfig);

        //
        // Step 3: Verify that the state changelog topics are compacted.
        //
        streams.close();
        assertTrue(isUsingCompactionForStateChangelogTopics());
    }
}