/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.networking.NetworkFailuresProxy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.util.NetUtils;

import kafka.admin.AdminUtils;
import kafka.common.KafkaException;
import kafka.metrics.KafkaMetricsReporter;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.apache.commons.collections.list.UnmodifiableList;
import org.apache.commons.io.FileUtils;
import org.apache.curator.test.TestingServer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.net.BindException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;

import scala.collection.mutable.ArraySeq;

import static org.apache.flink.util.NetUtils.hostAndPortToUrlString;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

/**
* An implementation of the KafkaServerProvider for Kafka 0.10.
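*
* <p>A rough usage sketch; {@code createConfig()} is assumed to be a factory on the
* {@link KafkaTestEnvironment} base class, everything else used here is defined in this file:
* <pre>{@code
* KafkaTestEnvironment env = new KafkaTestEnvironmentImpl();
* Config config = KafkaTestEnvironment.createConfig();
* config.setKafkaServersNumber(1);
* env.prepare(config);
* env.createTestTopic("test-topic", 1, 1, new Properties());
* Properties props = env.getStandardProperties();
* // ... run test consumers and producers against the local broker ...
* env.deleteTestTopic("test-topic");
* env.shutdown();
* }</pre>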
*/
public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
protected static final Logger LOG = LoggerFactory.getLogger(KafkaTestEnvironmentImpl.class);
private File tmpZkDir;
private File tmpKafkaParent;
private List<File> tmpKafkaDirs;
private List<KafkaServer> brokers;
private TestingServer zookeeper;
private String zookeeperConnectionString;
private String brokerConnectionString = "";
private Properties standardProps;
private Config config;
// the default ZooKeeper session timeout of 6 seconds is too small for Travis; use 30 seconds
private int zkTimeout = 30000;
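/**
* Returns the comma-separated bootstrap servers string of all started brokers
* (built in {@code prepare(Config)}, with a trailing comma).
*/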
public String getBrokerConnectionString() {
return brokerConnectionString;
}
@Override
public Properties getStandardProperties() {
return standardProps;
}
@Override
public Properties getSecureProperties() {
Properties prop = new Properties();
if (config.isSecureMode()) {
prop.put("security.inter.broker.protocol", "SASL_PLAINTEXT");
prop.put("security.protocol", "SASL_PLAINTEXT");
prop.put("sasl.kerberos.service.name", "kafka");
//add special timeout for Travis
prop.setProperty("zookeeper.session.timeout.ms", String.valueOf(zkTimeout));
prop.setProperty("zookeeper.connection.timeout.ms", String.valueOf(zkTimeout));
prop.setProperty("metadata.fetch.timeout.ms", "120000");
}
return prop;
}
@Override
public String getVersion() {
return "0.10";
}
@Override
public List<KafkaServer> getBrokers() {
return brokers;
}
@Override
public <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> topics, KeyedDeserializationSchema<T> readSchema, Properties props) {
return new FlinkKafkaConsumer010<>(topics, readSchema, props);
}
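/**
* Reads all currently available records from the given partition: polls repeatedly with the
* given timeout, stops as soon as a poll returns no records, then commits the final offsets.
*/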
@Override
public <K, V> Collection<ConsumerRecord<K, V>> getAllRecordsFromTopic(Properties properties, String topic, int partition, long timeout) {
List<ConsumerRecord<K, V>> result = new ArrayList<>();
try (KafkaConsumer<K, V> consumer = new KafkaConsumer<>(properties)) {
consumer.assign(Arrays.asList(new TopicPartition(topic, partition)));
while (true) {
boolean processedAtLeastOneRecord = false;
// wait for new records with timeout and break the loop if we didn't get any
Iterator<ConsumerRecord<K, V>> iterator = consumer.poll(timeout).iterator();
while (iterator.hasNext()) {
ConsumerRecord<K, V> record = iterator.next();
result.add(record);
processedAtLeastOneRecord = true;
}
if (!processedAtLeastOneRecord) {
break;
}
}
consumer.commitSync();
}
return UnmodifiableList.decorate(result);
}
@Override
public <T> StreamSink<T> getProducerSink(String topic, KeyedSerializationSchema<T> serSchema, Properties props, FlinkKafkaPartitioner<T> partitioner) {
FlinkKafkaProducer010<T> prod = new FlinkKafkaProducer010<>(topic, serSchema, props, partitioner);
prod.setFlushOnCheckpoint(true);
return new StreamSink<>(prod);
}
@Override
public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props, FlinkKafkaPartitioner<T> partitioner) {
FlinkKafkaProducer010<T> prod = new FlinkKafkaProducer010<>(topic, serSchema, props, partitioner);
prod.setFlushOnCheckpoint(true);
return stream.addSink(prod);
}
@Override
public <T> DataStreamSink<T> writeToKafkaWithTimestamps(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props) {
FlinkKafkaProducer010<T> prod = new FlinkKafkaProducer010<>(topic, serSchema, props);
prod.setFlushOnCheckpoint(true);
prod.setWriteTimestampToKafka(true);
return stream.addSink(prod);
}
@Override
public KafkaOffsetHandler createOffsetHandler() {
return new KafkaOffsetHandlerImpl();
}
@Override
public void restartBroker(int leaderId) throws Exception {
brokers.set(leaderId, getKafkaServer(leaderId, tmpKafkaDirs.get(leaderId)));
}
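/**
* Fetches the topic metadata from ZooKeeper and returns the broker id of the leader of the
* first partition, retrying until the metadata no longer reports an error.
*/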
@Override
public int getLeaderToShutDown(String topic) throws Exception {
ZkUtils zkUtils = getZkUtils();
try {
MetadataResponse.PartitionMetadata firstPart = null;
do {
if (firstPart != null) {
LOG.info("Unable to find leader. error code {}", firstPart.error().code());
// not the first try. Sleep a bit
Thread.sleep(150);
}
List<MetadataResponse.PartitionMetadata> partitionMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkUtils).partitionMetadata();
firstPart = partitionMetadata.get(0);
}
while (firstPart.error().code() != 0);
return firstPart.leader().id();
} finally {
zkUtils.close();
}
}
@Override
public int getBrokerId(KafkaServer server) {
return server.config().brokerId();
}
@Override
public boolean isSecureRunSupported() {
return true;
}
@Override
public void prepare(Config config) {
// increase the timeout since the ZooKeeper connection for secure setups takes a long time on Travis
if (config.isSecureMode()) {
// run only one Kafka server to avoid multiple ZooKeeper connections from many instances, which hits the Travis timeout
config.setKafkaServersNumber(1);
zkTimeout = zkTimeout * 15;
}
this.config = config;
File tempDir = new File(System.getProperty("java.io.tmpdir"));
tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString()));
assertTrue("cannot create zookeeper temp dir", tmpZkDir.mkdirs());
tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir-" + (UUID.randomUUID().toString()));
assertTrue("cannot create kafka temp dir", tmpKafkaParent.mkdirs());
tmpKafkaDirs = new ArrayList<>(config.getKafkaServersNumber());
for (int i = 0; i < config.getKafkaServersNumber(); i++) {
File tmpDir = new File(tmpKafkaParent, "server-" + i);
assertTrue("cannot create kafka temp dir", tmpDir.mkdir());
tmpKafkaDirs.add(tmpDir);
}
zookeeper = null;
brokers = null;
try {
zookeeper = new TestingServer(-1, tmpZkDir);
zookeeperConnectionString = zookeeper.getConnectString();
LOG.info("Starting Zookeeper with zookeeperConnectionString: {}", zookeeperConnectionString);
LOG.info("Starting KafkaServer");
brokers = new ArrayList<>(config.getKafkaServersNumber());
ListenerName listenerName = ListenerName.forSecurityProtocol(config.isSecureMode() ? SecurityProtocol.SASL_PLAINTEXT : SecurityProtocol.PLAINTEXT);
for (int i = 0; i < config.getKafkaServersNumber(); i++) {
KafkaServer kafkaServer = getKafkaServer(i, tmpKafkaDirs.get(i));
brokers.add(kafkaServer);
brokerConnectionString += hostAndPortToUrlString(KAFKA_HOST, kafkaServer.socketServer().boundPort(listenerName));
brokerConnectionString += ",";
}
LOG.info("ZK and KafkaServer started.");
}
catch (Throwable t) {
t.printStackTrace();
fail("Test setup failed: " + t.getMessage());
}
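// default client properties exposed to tests via getStandardProperties()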
standardProps = new Properties();
standardProps.setProperty("zookeeper.connect", zookeeperConnectionString);
standardProps.setProperty("bootstrap.servers", brokerConnectionString);
standardProps.setProperty("group.id", "flink-tests");
standardProps.setProperty("enable.auto.commit", "false");
standardProps.setProperty("zookeeper.session.timeout.ms", String.valueOf(zkTimeout));
standardProps.setProperty("zookeeper.connection.timeout.ms", String.valueOf(zkTimeout));
standardProps.setProperty("auto.offset.reset", "earliest"); // read from the beginning. (earliest is kafka 0.10 value)
standardProps.setProperty("max.partition.fetch.bytes", "256"); // make a lot of fetches (MESSAGES MUST BE SMALLER!)
}
@Override
public void shutdown() throws Exception {
for (KafkaServer broker : brokers) {
if (broker != null) {
broker.shutdown();
}
}
brokers.clear();
if (zookeeper != null) {
try {
zookeeper.stop();
}
catch (Exception e) {
LOG.warn("ZK.stop() failed", e);
}
zookeeper = null;
}
// clean up the temp spaces
if (tmpKafkaParent != null && tmpKafkaParent.exists()) {
try {
FileUtils.deleteDirectory(tmpKafkaParent);
}
catch (Exception e) {
// ignore
}
}
if (tmpZkDir != null && tmpZkDir.exists()) {
try {
FileUtils.deleteDirectory(tmpZkDir);
}
catch (Exception e) {
// ignore
}
}
super.shutdown();
}
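/**
* Creates a fresh ZooKeeper connection with the standard test timeouts; callers are
* responsible for closing the returned {@link ZkUtils}.
*/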
public ZkUtils getZkUtils() {
ZkClient creator = new ZkClient(zookeeperConnectionString, Integer.valueOf(standardProps.getProperty("zookeeper.session.timeout.ms")),
Integer.valueOf(standardProps.getProperty("zookeeper.connection.timeout.ms")), new ZooKeeperStringSerializer());
return ZkUtils.apply(creator, false);
}
@Override
public void createTestTopic(String topic, int numberOfPartitions, int replicationFactor, Properties topicConfig) {
// create topic with one client
LOG.info("Creating topic {}", topic);
ZkUtils zkUtils = getZkUtils();
try {
AdminUtils.createTopic(zkUtils, topic, numberOfPartitions, replicationFactor, topicConfig, kafka.admin.RackAwareMode.Enforced$.MODULE$);
} finally {
zkUtils.close();
}
// validate that the topic has been created
final long deadline = System.nanoTime() + 30_000_000_000L;
do {
try {
if (config.isSecureMode()) {
// increase the wait time since ZooKeeper timeouts occur frequently on Travis
int wait = zkTimeout / 100;
LOG.info("waiting for {} msecs before the topic {} can be checked", wait, topic);
Thread.sleep(wait);
} else {
Thread.sleep(100);
}
} catch (InterruptedException e) {
// restore interrupted state
Thread.currentThread().interrupt();
}
// we could use AdminUtils.topicExists(zkUtils, topic) here, but its results are
// not always correct.
// create a new ZK utils connection
ZkUtils checkZKConn = getZkUtils();
if (AdminUtils.topicExists(checkZKConn, topic)) {
checkZKConn.close();
return;
}
checkZKConn.close();
}
while (System.nanoTime() < deadline);
fail("Test topic could not be created");
}
@Override
public void deleteTestTopic(String topic) {
ZkUtils zkUtils = getZkUtils();
try {
LOG.info("Deleting topic {}", topic);
AdminUtils.deleteTopic(zkUtils, topic);
} finally {
zkUtils.close();
}
}
/**
* Copied from com.github.sakserv.minicluster.KafkaLocalBrokerIntegrationTest (ASL licensed).
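*
* <p>Starts a local broker on a free port and retries a limited number of times when the
* chosen port turns out to be taken (a {@link BindException} wrapped in a {@link KafkaException}).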
*/
protected KafkaServer getKafkaServer(int brokerId, File tmpFolder) throws Exception {
Properties kafkaProperties = new Properties();
// properties have to be Strings
kafkaProperties.put("advertised.host.name", KAFKA_HOST);
kafkaProperties.put("broker.id", Integer.toString(brokerId));
kafkaProperties.put("log.dir", tmpFolder.toString());
kafkaProperties.put("zookeeper.connect", zookeeperConnectionString);
kafkaProperties.put("message.max.bytes", String.valueOf(50 * 1024 * 1024));
kafkaProperties.put("replica.fetch.max.bytes", String.valueOf(50 * 1024 * 1024));
// for CI stability, increase zookeeper session timeout
kafkaProperties.put("zookeeper.session.timeout.ms", zkTimeout);
kafkaProperties.put("zookeeper.connection.timeout.ms", zkTimeout);
if (config.getKafkaServerProperties() != null) {
kafkaProperties.putAll(config.getKafkaServerProperties());
}
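// pick a free port and start the broker; retry a few times since the port can be taken
// by another process between the availability check and the actual bind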
final int numTries = 5;
for (int i = 1; i <= numTries; i++) {
int kafkaPort = NetUtils.getAvailablePort();
kafkaProperties.put("port", Integer.toString(kafkaPort));
if (config.isHideKafkaBehindProxy()) {
NetworkFailuresProxy proxy = createProxy(KAFKA_HOST, kafkaPort);
kafkaProperties.put("advertised.port", proxy.getLocalPort());
}
// to support a secure Kafka cluster
if (config.isSecureMode()) {
LOG.info("Adding Kafka secure configurations");
kafkaProperties.put("listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);
kafkaProperties.put("advertised.listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);
kafkaProperties.putAll(getSecureProperties());
}
KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties);
try {
scala.Option<String> stringNone = scala.Option.apply(null);
KafkaServer server = new KafkaServer(kafkaConfig, Time.SYSTEM, stringNone, new ArraySeq<KafkaMetricsReporter>(0));
server.startup();
return server;
}
catch (KafkaException e) {
if (e.getCause() instanceof BindException) {
// port conflict, retry...
LOG.info("Port conflict when starting Kafka Broker. Retrying...");
}
else {
throw e;
}
}
}
throw new Exception("Could not start Kafka after " + numTries + " retries due to port conflicts.");
}
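/**
* Offset handler backed by a plain {@link KafkaConsumer}, used to read and set committed
* offsets for the test consumer group configured in {@code standardProps}.
*/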
private class KafkaOffsetHandlerImpl implements KafkaOffsetHandler {
private final KafkaConsumer<byte[], byte[]> offsetClient;
public KafkaOffsetHandlerImpl() {
Properties props = new Properties();
props.putAll(standardProps);
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
offsetClient = new KafkaConsumer<>(props);
}
@Override
public Long getCommittedOffset(String topicName, int partition) {
OffsetAndMetadata committed = offsetClient.committed(new TopicPartition(topicName, partition));
return (committed != null) ? committed.offset() : null;
}
@Override
public void setCommittedOffset(String topicName, int partition, long offset) {
Map<TopicPartition, OffsetAndMetadata> partitionAndOffset = new HashMap<>();
partitionAndOffset.put(new TopicPartition(topicName, partition), new OffsetAndMetadata(offset));
offsetClient.commitSync(partitionAndOffset);
}
@Override
public void close() {
offsetClient.close();
}
}
}