// blob: 235365db638d3258f11eeb1861cd6a55a3f31918
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.tests.util.kafka;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.tests.util.AutoClosableProcess;
import org.apache.flink.tests.util.CommandLineWrapper;
import org.apache.flink.tests.util.TestUtils;
import org.apache.flink.tests.util.activation.OperatingSystemRestriction;
import org.apache.flink.tests.util.cache.DownloadCache;
import org.apache.flink.tests.util.util.FileUtils;
import org.apache.flink.util.OperatingSystem;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.IOException;
import java.io.PrintStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* {@link KafkaResource} that downloads kafka and sets up a local kafka cluster with the bundled
* zookeeper.
*/
public class LocalStandaloneKafkaResource implements KafkaResource {
/** Logger used for progress and diagnostic messages of this resource. */
private static final Logger LOG = LoggerFactory.getLogger(LocalStandaloneKafkaResource.class);
/**
 * Matches text containing {@code dataDir=} (captured as group 1); applied to {@code
 * config/zookeeper.properties} when redirecting the ZooKeeper data directory.
 */
private static final Pattern ZK_DATA_DIR_PATTERN = Pattern.compile(".*(dataDir=).*");
/**
 * Matches text containing {@code log.dirs=} (captured as group 1); applied to {@code
 * config/server.properties} when redirecting the Kafka log directory.
 */
private static final Pattern KAFKA_LOG_DIR_PATTERN = Pattern.compile(".*(log\\.dirs=).*");
/** Host name used to reach ZooKeeper. */
private static final String ZOOKEEPER_HOST = "localhost";
/** Port used to reach ZooKeeper. */
private static final int ZOOKEEPER_PORT = 2181;
/** {@code host:port} string passed to the Kafka/ZooKeeper command line tools. */
private static final String ZOOKEEPER_ADDRESS = ZOOKEEPER_HOST + ':' + ZOOKEEPER_PORT;
/** Host name used to reach the Kafka broker. */
private static final String KAFKA_HOST = "localhost";
/** Port used to reach the Kafka broker. */
private static final int KAFKA_PORT = 9092;
/** {@code host:port} string passed to the Kafka command line tools. */
private static final String KAFKA_ADDRESS = KAFKA_HOST + ':' + KAFKA_PORT;
/** Temporary folder holding the download directory and the extracted Kafka distribution. */
private final TemporaryFolder tmp = new TemporaryFolder();
/** Cache through which the Kafka archive is obtained. */
private final DownloadCache downloadCache = DownloadCache.get();
/** Kafka version to download; inserted into the download URL. */
private final String kafkaVersion;
/** Absolute path of the extracted Kafka distribution; assigned in {@link #before()}. */
private Path kafkaDir;
/** Directory that receives a copy of the logs after a test failure; {@code null} disables it. */
@Nullable private Path logBackupDirectory;
LocalStandaloneKafkaResource(final String kafkaVersion, @Nullable Path logBackupDirectory) {
OperatingSystemRestriction.forbid(
String.format(
"The %s relies on UNIX utils and shell scripts.",
getClass().getSimpleName()),
OperatingSystem.WINDOWS);
this.kafkaVersion = kafkaVersion;
this.logBackupDirectory = logBackupDirectory;
}
/**
 * Builds the archive.apache.org download URL of the Kafka distribution archive (the {@code
 * kafka_2.11-<version>.tgz} file) for the given version.
 *
 * @param kafkaVersion Kafka version, e.g. {@code 2.2.0}
 * @return URL of the {@code .tgz} archive
 */
private static String getKafkaDownloadUrl(final String kafkaVersion) {
    final String urlTemplate = "https://archive.apache.org/dist/kafka/%s/kafka_2.11-%s.tgz";
    return String.format(urlTemplate, kafkaVersion, kafkaVersion);
}
/**
 * Prepares the resource: creates the temporary folder, initializes the download cache,
 * allocates the Kafka directory, then sets up the distribution and starts the cluster.
 *
 * @throws Exception if any of the setup steps fails
 */
@Override
public void before() throws Exception {
    tmp.create();
    downloadCache.before();

    final Path kafkaRoot = tmp.newFolder("kafka").toPath();
    this.kafkaDir = kafkaRoot.toAbsolutePath();

    setupKafkaDist();
    setupKafkaCluster();
}
/**
 * Obtains the Kafka archive through the download cache, extracts it into {@link #kafkaDir}
 * and rewrites the ZooKeeper data directory and the Kafka log directory so that both point
 * into {@link #kafkaDir}.
 *
 * @throws IOException if downloading, extracting or rewriting the configuration fails
 */
private void setupKafkaDist() throws IOException {
    final Path downloadDirectory = tmp.newFolder("getOrDownload").toPath();
    final Path kafkaArchive =
            downloadCache.getOrDownload(getKafkaDownloadUrl(kafkaVersion), downloadDirectory);

    LOG.info("Kafka location: {}", kafkaDir.toAbsolutePath());
    AutoClosableProcess.runBlocking(
            CommandLineWrapper.tar(kafkaArchive)
                    .extract()
                    .zipped()
                    .strip(1)
                    .targetDir(kafkaDir)
                    .build());

    // The paths are spliced into a regex replacement string, where '$' and '\' are
    // special. Quote them so that an unusual temp directory name cannot corrupt the
    // configuration or trigger an exception in Matcher#replaceAll.
    final String zookeeperDataDir =
            Matcher.quoteReplacement(
                    kafkaDir.resolve("zookeeper").toAbsolutePath().toString());
    final String kafkaLogDir =
            Matcher.quoteReplacement(kafkaDir.resolve("kafka").toAbsolutePath().toString());

    LOG.info("Updating ZooKeeper properties");
    FileUtils.replace(
            kafkaDir.resolve(Paths.get("config", "zookeeper.properties")),
            ZK_DATA_DIR_PATTERN,
            matcher -> matcher.replaceAll("$1" + zookeeperDataDir));

    LOG.info("Updating Kafka properties");
    FileUtils.replace(
            kafkaDir.resolve(Paths.get("config", "server.properties")),
            KAFKA_LOG_DIR_PATTERN,
            matcher -> matcher.replaceAll("$1" + kafkaLogDir));
}
/**
 * Launches the ZooKeeper and Kafka start scripts with the {@code -daemon} flag and then polls
 * every 500 ms, first until ZooKeeper and afterwards until Kafka is reported as running.
 *
 * <p>If the thread is interrupted while waiting, the interrupt flag is restored and the
 * respective wait loop is abandoned.
 *
 * @throws IOException if one of the start scripts or a status query fails
 */
private void setupKafkaCluster() throws IOException {
    LOG.info("Starting zookeeper");
    final String zookeeperStartScript =
            kafkaDir.resolve(Paths.get("bin", "zookeeper-server-start.sh")).toString();
    final String zookeeperConfig =
            kafkaDir.resolve(Paths.get("config", "zookeeper.properties")).toString();
    AutoClosableProcess.runBlocking(zookeeperStartScript, "-daemon", zookeeperConfig);

    LOG.info("Starting kafka");
    final String kafkaStartScript =
            kafkaDir.resolve(Paths.get("bin", "kafka-server-start.sh")).toString();
    final String kafkaConfig =
            kafkaDir.resolve(Paths.get("config", "server.properties")).toString();
    AutoClosableProcess.runBlocking(kafkaStartScript, "-daemon", kafkaConfig);

    // Poll until ZooKeeper answers.
    while (!isZookeeperRunning(kafkaDir)) {
        LOG.info("Waiting for ZooKeeper to start.");
        try {
            Thread.sleep(500L);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            break;
        }
    }

    // Poll until the broker shows up.
    while (!isKafkaRunning(kafkaDir)) {
        LOG.info("Waiting for Kafka to start.");
        try {
            Thread.sleep(500L);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            break;
        }
    }
}
/**
 * Cleans up after a successful test: stops Kafka and ZooKeeper, notifies the download cache
 * and deletes the temporary folder.
 */
@Override
public void afterTestSuccess() {
    this.shutdownResource();
    this.downloadCache.afterTestSuccess();
    this.tmp.delete();
}
/**
 * Cleans up after a failed test: stops Kafka and ZooKeeper, backs up the logs (before the
 * temporary folder is deleted), notifies the download cache and deletes the temporary folder.
 */
@Override
public void afterTestFailure() {
    this.shutdownResource();
    this.backupLogs();
    this.downloadCache.afterTestFailure();
    this.tmp.delete();
}
/**
 * Stops Kafka and then ZooKeeper via their stop scripts, polling every 500 ms until each one
 * is no longer reported as running.
 *
 * <p>An {@link IOException} during either shutdown is logged as a warning and does not
 * prevent the other shutdown from being attempted. An interrupt while waiting restores the
 * interrupt flag and ends the respective wait loop.
 */
private void shutdownResource() {
    // Stop the broker first.
    try {
        final String kafkaStopScript =
                kafkaDir.resolve(Paths.get("bin", "kafka-server-stop.sh")).toString();
        AutoClosableProcess.runBlocking(kafkaStopScript);
        while (isKafkaRunning(kafkaDir)) {
            try {
                Thread.sleep(500L);
            } catch (InterruptedException interrupted) {
                Thread.currentThread().interrupt();
                break;
            }
        }
    } catch (IOException e) {
        LOG.warn("Error while shutting down kafka.", e);
    }

    // Then stop ZooKeeper.
    try {
        final String zookeeperStopScript =
                kafkaDir.resolve(Paths.get("bin", "zookeeper-server-stop.sh")).toString();
        AutoClosableProcess.runBlocking(zookeeperStopScript);
        while (isZookeeperRunning(kafkaDir)) {
            try {
                Thread.sleep(500L);
            } catch (InterruptedException interrupted) {
                Thread.currentThread().interrupt();
                break;
            }
        }
    } catch (IOException e) {
        LOG.warn("Error while shutting down zookeeper.", e);
    }
}
/**
 * Copies the {@code logs} directory of the Kafka distribution into a fresh {@code
 * kafka-<uuid>} sub-directory of {@link #logBackupDirectory}.
 *
 * <p>Does nothing if no backup directory was configured. I/O errors are logged as a warning
 * and otherwise ignored.
 */
private void backupLogs() {
    if (logBackupDirectory == null) {
        return;
    }

    final String backupName = "kafka-" + UUID.randomUUID().toString();
    final Path targetDirectory = logBackupDirectory.resolve(backupName);
    try {
        Files.createDirectories(targetDirectory);
        TestUtils.copyDirectory(kafkaDir.resolve("logs"), targetDirectory);
        LOG.info("Backed up logs to {}.", targetDirectory);
    } catch (IOException e) {
        LOG.warn("An error has occurred while backing up logs to {}.", targetDirectory, e);
    }
}
/**
 * Checks whether ZooKeeper is running by issuing a broker status query and discarding its
 * output.
 *
 * @param kafkaDir root directory of the Kafka distribution
 * @return {@code true} if the query completed, {@code false} if it failed with an {@link
 *     IOException}
 */
private static boolean isZookeeperRunning(final Path kafkaDir) {
    boolean reachable;
    try {
        queryBrokerStatus(kafkaDir, ignoredLine -> {});
        reachable = true;
    } catch (final IOException e) {
        // we get an exception if zookeeper isn't running
        reachable = false;
    }
    return reachable;
}
/**
 * Checks whether the Kafka broker is running by issuing a broker status query and looking
 * for a line containing {@code "port":} in its output.
 *
 * @param kafkaDir root directory of the Kafka distribution
 * @return {@code true} if such a line was seen; {@code false} if not, or if the query failed
 *     with an {@link IOException}
 * @throws IOException declared for callers; query failures are handled internally
 */
private static boolean isKafkaRunning(final Path kafkaDir) throws IOException {
    final AtomicBoolean brokerRegistered = new AtomicBoolean(false);
    try {
        queryBrokerStatus(
                kafkaDir,
                line -> {
                    // Once a matching line was seen the flag stays set.
                    if (line.contains("\"port\":")) {
                        brokerRegistered.set(true);
                    }
                });
    } catch (final IOException e) {
        // we get an exception if zookeeper isn't running
        return false;
    }
    return brokerRegistered.get();
}
/**
 * Runs {@code bin/zookeeper-shell.sh <zookeeper-address> get /brokers/ids/0} and forwards the
 * process' standard output to the given consumer.
 *
 * @param kafkaDir root directory of the Kafka distribution
 * @param stdoutProcessor receives the standard output of the shell process
 * @throws IOException if running the shell fails
 */
private static void queryBrokerStatus(
        final Path kafkaDir, final Consumer<String> stdoutProcessor) throws IOException {
    final String zookeeperShell =
            kafkaDir.resolve(Paths.get("bin", "zookeeper-shell.sh")).toString();

    AutoClosableProcess.create(zookeeperShell, ZOOKEEPER_ADDRESS, "get", "/brokers/ids/0")
            .setStdoutProcessor(stdoutProcessor)
            .runBlocking();
}
/**
 * Creates a topic by invoking {@code bin/kafka-topics.sh --create} against the local
 * ZooKeeper address.
 *
 * @param replicationFactor replication factor of the topic
 * @param numPartitions number of partitions of the topic
 * @param topic name of the topic
 * @throws IOException if running the script fails
 */
@Override
public void createTopic(int replicationFactor, int numPartitions, String topic)
        throws IOException {
    final String topicsScript =
            kafkaDir.resolve(Paths.get("bin", "kafka-topics.sh")).toString();
    final String replication = String.valueOf(replicationFactor);
    final String partitions = String.valueOf(numPartitions);

    AutoClosableProcess.runBlocking(
            topicsScript,
            "--create",
            "--zookeeper",
            ZOOKEEPER_ADDRESS,
            "--replication-factor",
            replication,
            "--partitions",
            partitions,
            "--topic",
            topic);
}
/**
 * Sends the given messages to the topic through the console producer and waits for the
 * producer process to exit.
 *
 * @param topic topic to write to
 * @param messages messages to send, one per line
 * @throws IOException if running the producer fails
 */
@Override
public void sendMessages(String topic, String... messages) throws IOException {
    sendMessagesAndWait(createSendMessageArguments(topic), messages);
}
/**
 * Sends keyed messages to the topic through the console producer, with {@code
 * parse.key=true} and the given key separator, and waits for the producer process to exit.
 *
 * @param topic topic to write to
 * @param keySeparator separator between key and value within each message
 * @param messages messages to send, one per line
 * @throws IOException if running the producer fails
 */
@Override
public void sendKeyedMessages(String topic, String keySeparator, String... messages)
        throws IOException {
    // Copy, because the base argument list has a fixed size.
    final List<String> producerArgs = new ArrayList<>(createSendMessageArguments(topic));
    Collections.addAll(
            producerArgs,
            "--property",
            "parse.key=true",
            "--property",
            "key.separator=" + keySeparator);
    sendMessagesAndWait(producerArgs, messages);
}
/**
 * Builds the base command line of {@code bin/kafka-console-producer.sh} for the given topic.
 *
 * @param topic topic to write to
 * @return fixed-size list with the producer script and its arguments
 */
private List<String> createSendMessageArguments(String topic) {
    final String producerScript =
            kafkaDir.resolve(Paths.get("bin", "kafka-console-producer.sh")).toString();
    return Arrays.asList(producerScript, "--broker-list", KAFKA_ADDRESS, "--topic", topic);
}
/**
 * Starts the given producer command, writes every message as a UTF-8 line to the process'
 * standard input, closes that stream and waits for the process to exit.
 *
 * <p>If the thread is interrupted while waiting, the interrupt flag is restored and the
 * method returns.
 *
 * @param kafkaArgs producer command line
 * @param messages messages to write, one per line
 * @throws IOException if starting the process or setting up the stream fails
 */
private void sendMessagesAndWait(List<String> kafkaArgs, String... messages)
        throws IOException {
    final String[] command = kafkaArgs.toArray(new String[0]);
    try (AutoClosableProcess producer = AutoClosableProcess.runNonBlocking(command)) {
        // The stream is closed before waiting so that the producer sees the end of input.
        try (PrintStream producerStdin =
                new PrintStream(
                        producer.getProcess().getOutputStream(),
                        true,
                        StandardCharsets.UTF_8.name())) {
            Arrays.stream(messages).forEach(producerStdin::println);
            producerStdin.flush();
        }

        try {
            // wait until the process shuts down on its own
            // this is the only reliable way to ensure the producer has actually processed our
            // input
            producer.getProcess().waitFor();
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}
/**
 * Reads messages from the beginning of the topic with the console consumer, polling every
 * 500 ms for up to 120 seconds until the expected number of lines has been collected.
 *
 * @param expectedNumMessages number of messages to read
 * @param groupId consumer group id passed to the consumer
 * @param topic topic to read from
 * @return the lines collected from the consumer's standard output
 * @throws IOException if the consumer cannot be run, or if the number of collected messages
 *     differs from {@code expectedNumMessages} when waiting ends
 */
@Override
public List<String> readMessage(int expectedNumMessages, String groupId, String topic)
        throws IOException {
    // Filled by the stdout processor; synchronized because it is read while polling.
    final List<String> received =
            Collections.synchronizedList(new ArrayList<>(expectedNumMessages));

    try (final AutoClosableProcess consumer =
            AutoClosableProcess.create(
                            kafkaDir.resolve(Paths.get("bin", "kafka-console-consumer.sh"))
                                    .toString(),
                            "--bootstrap-server",
                            KAFKA_ADDRESS,
                            "--from-beginning",
                            "--max-messages",
                            String.valueOf(expectedNumMessages),
                            "--topic",
                            topic,
                            "--consumer-property",
                            "group.id=" + groupId)
                    .setStdoutProcessor(received::add)
                    .runNonBlocking()) {

        final Deadline timeout = Deadline.fromNow(Duration.ofSeconds(120));
        while (timeout.hasTimeLeft() && received.size() < expectedNumMessages) {
            LOG.info(
                    "Waiting for messages. Received {}/{}.",
                    received.size(),
                    expectedNumMessages);
            try {
                Thread.sleep(500);
            } catch (InterruptedException interrupted) {
                Thread.currentThread().interrupt();
                break;
            }
        }

        if (received.size() == expectedNumMessages) {
            return received;
        }
        throw new IOException("Could not read expected number of messages.");
    }
}
/**
 * Changes the partition count of a topic by invoking {@code bin/kafka-topics.sh --alter}
 * against the local ZooKeeper address.
 *
 * @param numPartitions new number of partitions
 * @param topic name of the topic
 * @throws IOException if running the script fails
 */
@Override
public void setNumPartitions(int numPartitions, String topic) throws IOException {
    final String topicsScript =
            kafkaDir.resolve(Paths.get("bin", "kafka-topics.sh")).toString();
    final String partitions = String.valueOf(numPartitions);

    AutoClosableProcess.runBlocking(
            topicsScript,
            "--alter",
            "--topic",
            topic,
            "--partitions",
            partitions,
            "--zookeeper",
            ZOOKEEPER_ADDRESS);
}
/**
 * Determines the partition count of a topic by parsing the {@code PartitionCount:} value from
 * the output of {@code bin/kafka-topics.sh --describe}.
 *
 * @param topic name of the topic
 * @return the first partition count found in the output, or {@code -1} if none was found
 * @throws IOException if running the script fails
 */
@Override
public int getNumPartitions(String topic) throws IOException {
    final Pattern countPattern = Pattern.compile(".*PartitionCount:\\s*([0-9]+).*");
    final AtomicReference<Integer> count = new AtomicReference<>(-1);
    final String topicsScript =
            kafkaDir.resolve(Paths.get("bin", "kafka-topics.sh")).toString();

    AutoClosableProcess.create(
                    topicsScript,
                    "--describe",
                    "--topic",
                    topic,
                    "--zookeeper",
                    ZOOKEEPER_ADDRESS)
            .setStdoutProcessor(
                    line -> {
                        final Matcher countMatcher = countPattern.matcher(line);
                        if (!countMatcher.matches()) {
                            return;
                        }
                        // Only the first match is recorded.
                        count.compareAndSet(-1, Integer.parseInt(countMatcher.group(1)));
                    })
            .runBlocking();

    return count.get();
}
/**
 * Determines the latest offset of a partition by running {@code kafka.tools.GetOffsetShell}
 * (with {@code --time -1}) and parsing the trailing number of the first output line that
 * matches {@code <something>:<something>:<digits>}.
 *
 * <p>The offset is parsed as a {@code long}, matching the return type, so offsets beyond
 * {@link Integer#MAX_VALUE} are handled.
 *
 * @param topic name of the topic
 * @param partition index of the partition
 * @return the offset reported for the partition
 * @throws IOException if running the tool fails or no offset could be found in its output
 */
@Override
public long getPartitionOffset(String topic, int partition) throws IOException {
    final Pattern partitionOffsetPattern = Pattern.compile(".*:.*:([0-9]+)");
    // null means "not found yet"; this avoids relying on boxed-value identity for a sentinel.
    final AtomicReference<Long> partitionOffsetFound = new AtomicReference<>();
    AutoClosableProcess.create(
                    kafkaDir.resolve(Paths.get("bin", "kafka-run-class.sh")).toString(),
                    "kafka.tools.GetOffsetShell",
                    "--broker-list",
                    KAFKA_ADDRESS,
                    "--topic",
                    topic,
                    "--partitions",
                    String.valueOf(partition),
                    "--time",
                    "-1")
            .setStdoutProcessor(
                    line -> {
                        final Matcher matcher = partitionOffsetPattern.matcher(line);
                        if (matcher.matches()) {
                            // Only the first match is recorded.
                            partitionOffsetFound.compareAndSet(
                                    null, Long.parseLong(matcher.group(1)));
                        }
                    })
            .runBlocking();

    final Long partitionOffset = partitionOffsetFound.get();
    if (partitionOffset == null) {
        throw new IOException("Could not determine partition offset.");
    }
    return partitionOffset;
}
/**
 * Returns the address of the single local Kafka broker.
 *
 * @return singleton collection with the unresolved broker address
 */
@Override
public Collection<InetSocketAddress> getBootstrapServerAddresses() {
    final InetSocketAddress brokerAddress =
            InetSocketAddress.createUnresolved(KAFKA_HOST, KAFKA_PORT);
    return Collections.singletonList(brokerAddress);
}
/**
 * Returns the address of the local ZooKeeper instance.
 *
 * @return unresolved address built from the ZooKeeper host and port
 */
@Override
public InetSocketAddress getZookeeperAddress() {
    // Must point at ZooKeeper, not at the Kafka broker.
    return InetSocketAddress.createUnresolved(ZOOKEEPER_HOST, ZOOKEEPER_PORT);
}
}