/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.metrics.jmx.JMXReporter;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.test.util.MiniClusterResource;
import org.apache.flink.test.util.SuccessException;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.TestLogger;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import scala.concurrent.duration.FiniteDuration;

import static org.junit.Assert.fail;

/**
 * The base for the Kafka tests. It brings up:
 * <ul>
 * <li>A ZooKeeper mini cluster</li>
 * <li>Three Kafka Brokers (mini clusters)</li>
 * <li>A Flink mini cluster</li>
 * </ul>
 *
 * <p>Code in this test is based on the following GitHub repository:
 * <a href="https://github.com/sakserv/hadoop-mini-clusters">
 * https://github.com/sakserv/hadoop-mini-clusters</a> (ASL licensed),
 * as per commit <i>bc6b2b2d5f6424d5f377aa6c0871e82a956462ef</i></p>
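 *
 * <p>A typical subclass drives a test job against the embedded brokers roughly as follows.
 * This is a minimal sketch only: the class name, topic name, and job body are illustrative
 * and not part of this base class.
 * <pre>{@code
 * public class MyKafkaITCase extends KafkaTestBase {
 *
 *     public void testTopicRoundTrip() throws Exception { // would carry a JUnit @Test annotation
 *         createTestTopic("my-topic", 1, 1);
 *
 *         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 *         // build a pipeline reading from / writing to "my-topic" here, using standardProps
 *         // and brokerConnectionStrings, and throw SuccessException once all data is verified
 *         tryExecutePropagateExceptions(env, "my-topic round trip");
 *
 *         deleteTestTopic("my-topic");
 *     }
 * }
 * }</pre>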
 */
@SuppressWarnings("serial")
public abstract class KafkaTestBase extends TestLogger {

    protected static final Logger LOG = LoggerFactory.getLogger(KafkaTestBase.class);

    protected static final int NUMBER_OF_KAFKA_SERVERS = 3;

    protected static final int NUM_TMS = 1;

    protected static final int TM_SLOTS = 8;

    protected static String brokerConnectionStrings;

    protected static Properties standardProps;

    @ClassRule
    public static MiniClusterResource flink = new MiniClusterResource(
        new MiniClusterResource.MiniClusterResourceConfiguration(
            getFlinkConfiguration(), NUM_TMS, TM_SLOTS), true);

    protected static FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS);

    protected static KafkaTestEnvironment kafkaServer;

    @ClassRule
    public static TemporaryFolder tempFolder = new TemporaryFolder();

    protected static Properties secureProps = new Properties();

    // ------------------------------------------------------------------------
    //  Setup and teardown of the mini clusters
    // ------------------------------------------------------------------------

    @BeforeClass
    public static void prepare() throws ClassNotFoundException {
        prepare(true);
    }

    public static void prepare(boolean hideKafkaBehindProxy) throws ClassNotFoundException {
        LOG.info("-------------------------------------------------------------------------");
        LOG.info("    Starting KafkaTestBase ");
        LOG.info("-------------------------------------------------------------------------");

        startClusters(false, hideKafkaBehindProxy);
    }

    @AfterClass
    public static void shutDownServices() throws Exception {
        LOG.info("-------------------------------------------------------------------------");
        LOG.info("    Shut down KafkaTestBase ");
        LOG.info("-------------------------------------------------------------------------");

        TestStreamEnvironment.unsetAsContext();
        shutdownClusters();

        LOG.info("-------------------------------------------------------------------------");
        LOG.info("    KafkaTestBase finished");
        LOG.info("-------------------------------------------------------------------------");
    }
    protected static Configuration getFlinkConfiguration() {
        Configuration flinkConfig = new Configuration();
        flinkConfig.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, "5 s");
        flinkConfig.setString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL, "1 s");
        flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L);
        flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
        flinkConfig.setString(
            ConfigConstants.METRICS_REPORTER_PREFIX + "my_reporter." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX,
            JMXReporter.class.getName());
        return flinkConfig;
    }
    protected static void startClusters(boolean secureMode, boolean hideKafkaBehindProxy) throws ClassNotFoundException {
        // dynamically load the implementation for the test
        Class<?> clazz = Class.forName("org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl");
        kafkaServer = (KafkaTestEnvironment) InstantiationUtil.instantiate(clazz);

        LOG.info("Starting KafkaTestBase.prepare() for Kafka " + kafkaServer.getVersion());

        kafkaServer.prepare(kafkaServer.createConfig()
            .setKafkaServersNumber(NUMBER_OF_KAFKA_SERVERS)
            .setSecureMode(secureMode)
            .setHideKafkaBehindProxy(hideKafkaBehindProxy));

        standardProps = kafkaServer.getStandardProperties();
        brokerConnectionStrings = kafkaServer.getBrokerConnectionString();

        if (secureMode) {
            if (!kafkaServer.isSecureRunSupported()) {
                throw new IllegalStateException(
                    "Attempting to test in secure mode but secure mode not supported by the KafkaTestEnvironment.");
            }
            secureProps = kafkaServer.getSecureProperties();
        }
    }
    protected static void shutdownClusters() throws Exception {
        if (secureProps != null) {
            secureProps.clear();
        }

        // guard against an NPE masking the real failure when startClusters() threw
        // before the Kafka test environment was instantiated
        if (kafkaServer != null) {
            kafkaServer.shutdown();
        }
    }

    // ------------------------------------------------------------------------
    //  Execution utilities
    // ------------------------------------------------------------------------

    protected static void tryExecutePropagateExceptions(StreamExecutionEnvironment see, String name) throws Exception {
        try {
            see.execute(name);
        }
        catch (ProgramInvocationException | JobExecutionException root) {
            Throwable cause = root.getCause();

            // search for nested SuccessExceptions
            int depth = 0;
            while (!(cause instanceof SuccessException)) {
                if (cause == null || depth++ == 20) {
                    throw root;
                }
                else {
                    cause = cause.getCause();
                }
            }
            // a SuccessException was found: swallow it, the job finished the way the test intended
        }
    }
    protected static void createTestTopic(String topic, int numberOfPartitions, int replicationFactor) {
        kafkaServer.createTestTopic(topic, numberOfPartitions, replicationFactor);
    }

    protected static void deleteTestTopic(String topic) {
        kafkaServer.deleteTestTopic(topic);
    }
    /**
     * We handle the timeout manually instead of using JUnit's timeout so that hitting the deadline
     * is reported as an assertion failure rather than a timeout error: once the deadline passes we
     * assume that records are missing because of a bug, not that the test has simply run out of time.
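     *
     * <p>A minimal sketch of a call site (the topic name, values, and timeout are illustrative;
     * real tests typically derive the properties from {@code standardProps}):
     * <pre>{@code
     * Set<Integer> expected = new HashSet<>(Arrays.asList(1, 2, 3));
     * assertAtLeastOnceForTopic(new Properties(), "my-topic", 0, expected, 60_000L);
     * }</pre>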
     */
    protected void assertAtLeastOnceForTopic(
            Properties properties,
            String topic,
            int partition,
            Set<Integer> expectedElements,
            long timeoutMillis) throws Exception {

        long startMillis = System.currentTimeMillis();
        Set<Integer> actualElements = new HashSet<>();

        // work on a copy so that the caller's properties are not mutated (mirroring
        // assertExactlyOnceForTopic), and configure the deserializers once rather
        // than on every polling round
        Properties consumerProperties = new Properties();
        consumerProperties.putAll(properties);
        consumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
        consumerProperties.put("value.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");

        // until we timeout...
        while (System.currentTimeMillis() < startMillis + timeoutMillis) {
            // query kafka for new records ...
            Collection<ConsumerRecord<Integer, Integer>> records =
                kafkaServer.getAllRecordsFromTopic(consumerProperties, topic, partition, 100);

            for (ConsumerRecord<Integer, Integer> record : records) {
                actualElements.add(record.value());
            }

            // succeed if we got all expectedElements
            if (actualElements.containsAll(expectedElements)) {
                return;
            }
        }

        fail(String.format("Expected to contain all of: <%s>, but was: <%s>", expectedElements, actualElements));
    }
    /**
     * We handle the timeout manually instead of using JUnit's timeout so that hitting the deadline
     * is reported as an assertion failure rather than a timeout error: once the deadline passes we
     * assume that records are missing because of a bug, not that the test has simply run out of time.
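     *
     * <p>A minimal sketch of a call site (the topic name and values are illustrative, and
     * {@code java.util.Arrays} is assumed to be imported). Unlike the at-least-once check,
     * the expected elements form an ordered list that must match exactly:
     * <pre>{@code
     * assertExactlyOnceForTopic(standardProps, "my-topic", 0, Arrays.asList(1, 2, 3), 60_000L);
     * }</pre>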
     */
    protected void assertExactlyOnceForTopic(
            Properties properties,
            String topic,
            int partition,
            List<Integer> expectedElements,
            long timeoutMillis) throws Exception {

        long startMillis = System.currentTimeMillis();
        List<Integer> actualElements = new ArrayList<>();

        Properties consumerProperties = new Properties();
        consumerProperties.putAll(properties);
        consumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
        consumerProperties.put("value.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
        // only read records from committed transactions, as required for an exactly-once check
        consumerProperties.put("isolation.level", "read_committed");

        // until we timeout...
        while (System.currentTimeMillis() < startMillis + timeoutMillis) {
            // query kafka for new records ...
            Collection<ConsumerRecord<Integer, Integer>> records =
                kafkaServer.getAllRecordsFromTopic(consumerProperties, topic, partition, 1000);

            for (ConsumerRecord<Integer, Integer> record : records) {
                actualElements.add(record.value());
            }

            // succeed if we got all expectedElements
            if (actualElements.equals(expectedElements)) {
                return;
            }

            // fail early if we already have too many elements
            if (actualElements.size() > expectedElements.size()) {
                break;
            }
        }

        fail(String.format("Expected number of elements: <%s>, but was: <%s>", expectedElements.size(), actualElements.size()));
    }
}