blob: 0d1e49af64507fffc308cab03e3487f032aaee92 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.apache.samza.test.framework;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.samza.application.SamzaApplication;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.KafkaConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.execution.TestStreamManager;
import org.apache.samza.runtime.ApplicationRunner;
import org.apache.samza.runtime.ApplicationRunners;
import org.apache.samza.test.harness.IntegrationTestHarness;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.kafka.clients.consumer.ConsumerConfig.*;
import static org.apache.kafka.clients.producer.ProducerConfig.*;
* Harness for writing integration tests for {@link SamzaApplication}s.
* <p> This provides the following features for its sub-classes:
* <ul>
* <li>
* Automatic Setup and teardown: Any non-trivial integration test brings up components like Zookeeper
* servers, Kafka brokers, Kafka producers and Kafka consumers. This harness initializes each
* of these components in {@link #setUp()} and shuts down each of them cleanly in {@link #tearDown()}.
* {@link #setUp()} and {@link #tearDown()} are automatically invoked from the Junit runner.
* <li>
* Interaction with Kafka: The harness provides convenience methods to interact with Kafka brokers, consumers
* and producers - for instance, methods to create topics, produce and consume messages.
* <li>
* Config defaults: Often Samza integration tests have to setup config boiler plate
* to perform even simple tasks like producing and consuming messages. This harness provides default string
* serdes for producing / consuming messages and a default system-alias named "kafka" that uses the
* {@link org.apache.samza.system.kafka.KafkaSystemFactory}.
* <li>
* Debugging: At times, it is convenient to debug integration tests locally from an IDE. This harness
* runs all its components (including Kafka brokers, Zookeeper servers and Samza) locally.
* </ul>
* <p> <i> Implementation Notes: </i> <br/>
* State persistence: {@link #tearDown()} clears all associated state (including topics and metadata) in Kafka and
* Zookeeper. Hence, the state is not durable across invocations of {@link #tearDown()} <br/>
* Execution model: {@link SamzaApplication}s are run as their own {@link org.apache.samza.job.local.ThreadJob}s.
* Similarly, embedded Kafka servers and Zookeeper servers are run as their own threads.
* {@link #produceMessage(String, int, String, String)} and {@link #consumeMessages(Collection, int)} are blocking calls.
* <h3>Usage Example</h3>
* Here is an actual test that publishes a message into Kafka, runs an application, and verifies consumption
* from the output topic.
* <pre> {@code
* class MyTest extends StreamApplicationIntegrationTestHarness {
* private final StreamApplication myApp = new MyStreamApplication();
* private final Collection<String> outputTopics = Collections.singletonList("output-topic");
* @Test
* public void test() {
* createTopic("mytopic", 1);
* produceMessage("mytopic", 0, "key1", "val1");
* runApplication(myApp, "myApp", null);
* List<ConsumerRecord<String, String>> messages = consumeMessages(outputTopics)
* Assert.assertEquals(messages.size(), 1);
* }
* }}</pre>
public class StreamApplicationIntegrationTestHarness extends IntegrationTestHarness {
private static final Logger LOG = LoggerFactory.getLogger(StreamApplicationIntegrationTestHarness.class);
private static final Duration POLL_TIMEOUT_MS = Duration.ofSeconds(20);
private static final int DEFAULT_REPLICATION_FACTOR = 1;
private int numEmptyPolls = 3;
* Creates a kafka topic with the provided name and the number of partitions
* @param topicName the name of the topic
* @param numPartitions the number of partitions in the topic
public boolean createTopic(String topicName, int numPartitions) {
return createTopic(topicName, numPartitions, DEFAULT_REPLICATION_FACTOR);
* Produces a message to the provided topic partition.
* @param topicName the topic to produce messages to
* @param partitionId the topic partition to produce messages to
* @param key the key in the message
* @param val the value in the message
public void produceMessage(String topicName, int partitionId, String key, String val) {
producer.send(new ProducerRecord<>(topicName, partitionId, key, val));
producer.flush();"Sent key: {} val: {} to topic: {} on partition: {}", key, val, topicName, partitionId);
public int clusterSize() {
return Integer.parseInt(KafkaConfig.TOPIC_DEFAULT_REPLICATION_FACTOR());
* Read messages from the provided list of topics until {@param threshold} messages have been read or until
* {@link #numEmptyPolls} polls return no messages.
* The default poll time out is determined by {@link #POLL_TIMEOUT_MS} and the number of empty polls are
* determined by {@link #numEmptyPolls}
* @param topics the list of topics to consume from
* @param threshold the number of messages to consume
* @return the list of {@link ConsumerRecord}s whose size can be atmost {@param threshold}
public List<ConsumerRecord<String, String>> consumeMessages(Collection<String> topics, int threshold) {
int emptyPollCount = 0;
List<ConsumerRecord<String, String>> recordList = new ArrayList<>();
while (emptyPollCount < numEmptyPolls && recordList.size() < threshold) {
ConsumerRecords<String, String> records = consumer.poll(POLL_TIMEOUT_MS);"Read {} messages from topics: {}", records.count(), StringUtils.join(topics, ","));
if (!records.isEmpty()) {
Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
while (iterator.hasNext() && recordList.size() < threshold) {
ConsumerRecord record =;"Read key: {} val: {} from topic: {} on partition: {}",
record.key(), record.value(), record.topic(), record.partition());
emptyPollCount = 0;
} else {
return recordList;
* Executes the provided {@link org.apache.samza.application.StreamApplication} as a {@link org.apache.samza.job.local.ThreadJob}. The
* {@link org.apache.samza.application.StreamApplication} runs in its own separate thread.
* @param appName the name of the application
* @param overriddenConfigs configs to override
* @return RunApplicationContext which contains objects created within runApplication, to be used for verification
* if necessary
protected RunApplicationContext runApplication(SamzaApplication streamApplication,
String appName,
Map<String, String> overriddenConfigs) {
Map<String, String> configMap = new HashMap<>();
configMap.put(ApplicationConfig.APP_RUNNER_CLASS, "org.apache.samza.runtime.LocalApplicationRunner");
configMap.put(ApplicationConfig.APP_NAME, appName);
configMap.put(ApplicationConfig.APP_CLASS, streamApplication.getClass().getCanonicalName());
configMap.put("serializers.registry.json.class", "org.apache.samza.serializers.JsonSerdeFactory");
configMap.put("serializers.registry.string.class", "org.apache.samza.serializers.StringSerdeFactory");
configMap.put("systems.kafka.samza.factory", "org.apache.samza.system.kafka.KafkaSystemFactory");
configMap.put("systems.kafka.consumer.zookeeper.connect", zkConnect());
configMap.put("systems.kafka.producer.bootstrap.servers", bootstrapUrl());
configMap.put("systems.kafka.samza.key.serde", "string");
configMap.put("systems.kafka.samza.msg.serde", "string");
configMap.put("systems.kafka.samza.offset.default", "oldest");
configMap.put("job.coordinator.system", "kafka");
configMap.put("job.default.system", "kafka");
configMap.put("job.coordinator.replication.factor", "1");
configMap.put("", "1000");
configMap.put("task.checkpoint.factory", TestStreamManager.MockCheckpointManagerFactory.class.getName());
// This is to prevent tests from taking a long time to stop after they're done. The issue is that
// tearDown currently doesn't call runner.kill(app), and shuts down the Kafka and ZK servers immediately.
// The test process then exits, triggering the SamzaContainer shutdown hook, which in turn tries to flush any
// store changelogs, which then get stuck trying to produce to the stopped Kafka server.
// Calling runner.kill doesn't work since RemoteApplicationRunner creates a new ThreadJob instance when
// kill is called. We can't use LocalApplicationRunner since ZkJobCoordinator doesn't currently create
// changelog streams. Hence we just force an unclean shutdown here to. This _should be_ OK
// since the test method has already executed by the time the shutdown hook is called. The side effect is
// that buffered state (e.g. changelog contents) might not be flushed correctly after the test run.
configMap.put("", "1");
if (overriddenConfigs != null) {
Config config = new MapConfig(configMap);
ApplicationRunner runner = ApplicationRunners.getApplicationRunner(streamApplication, config);
executeRun(runner, config);
return new RunApplicationContext(runner, config);
* Container for any necessary context created during runApplication. Allows tests to access objects created within
* runApplication in order to do verification.
protected static class RunApplicationContext {
private final ApplicationRunner runner;
private final Config config;
private RunApplicationContext(ApplicationRunner runner, Config config) {
this.runner = runner;
this.config = config;
public ApplicationRunner getRunner() {
return this.runner;
public Config getConfig() {
return this.config;
protected Properties createProducerConfigs() {
Properties producerProps = super.createProducerConfigs();
* Stream application tests uses string serde for both key and value.
* The default serde for the test producer in IntegrationTestHarness uses
* byte serde for value and string for key.
return producerProps;
protected Properties createConsumerConfigs() {
Properties consumerProps = super.createConsumerConfigs();
* Stream application tests uses string serde for both key and value.
* The default serde for the test producer in IntegrationTestHarness uses
* byte serde for value and string for key.
return consumerProps;