/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.twill.kafka.client;

import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.collect.Queues;
import com.google.common.util.concurrent.Futures;
import org.apache.twill.common.Cancellable;
import org.apache.twill.internal.Services;
import org.apache.twill.internal.kafka.EmbeddedKafkaServer;
import org.apache.twill.internal.kafka.client.ZKKafkaClientService;
import org.apache.twill.internal.utils.Networks;
import org.apache.twill.internal.zookeeper.InMemoryZKServer;
import org.apache.twill.zookeeper.ZKClientService;
import org.apache.zookeeper.CreateMode;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Tests for the Kafka client: publishing and consuming messages against an
 * embedded Kafka server backed by an in-memory ZooKeeper.
 */
public class KafkaTest {
  private static final Logger LOG = LoggerFactory.getLogger(KafkaTest.class);

@ClassRule
public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
private static InMemoryZKServer zkServer;
private static EmbeddedKafkaServer kafkaServer;
private static ZKClientService zkClientService;
  private static KafkaClientService kafkaClient;

@BeforeClass
public static void init() throws Exception {
zkServer = InMemoryZKServer.builder().setDataDir(TMP_FOLDER.newFolder()).build();
zkServer.startAndWait();
// Extract the kafka.tgz and start the kafka server
kafkaServer = new EmbeddedKafkaServer(generateKafkaConfig(zkServer.getConnectionStr()));
kafkaServer.startAndWait();
zkClientService = ZKClientService.Builder.of(zkServer.getConnectionStr()).build();
kafkaClient = new ZKKafkaClientService(zkClientService);
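    // Start the ZooKeeper client then the Kafka client, and block until both are running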
Services.chainStart(zkClientService, kafkaClient).get();
  }

@AfterClass
public static void finish() throws Exception {
Services.chainStop(kafkaClient, zkClientService).get();
kafkaServer.stopAndWait();
zkServer.stopAndWait();
}
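
  /**
   * Tests that a consumer resumes receiving messages after the Kafka broker
   * is stopped and restarted with the same configuration.
   */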
@Test
public void testKafkaClientReconnect() throws Exception {
String topic = "backoff";
Properties kafkaServerConfig = generateKafkaConfig(zkServer.getConnectionStr() + "/backoff");
EmbeddedKafkaServer server = new EmbeddedKafkaServer(kafkaServerConfig);
ZKClientService zkClient = ZKClientService.Builder.of(zkServer.getConnectionStr() + "/backoff").build();
zkClient.startAndWait();
try {
zkClient.create("/", null, CreateMode.PERSISTENT).get();
ZKKafkaClientService kafkaClient = new ZKKafkaClientService(zkClient);
kafkaClient.startAndWait();
try {
server.startAndWait();
try {
          // Publish a message
createPublishThread(kafkaClient, topic, Compression.NONE, "First message", 1).start();
// Create a consumer
final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
Cancellable cancel = kafkaClient.getConsumer().prepare().add(topic, 0, 0)
.consume(new KafkaConsumer.MessageCallback() {
@Override
public long onReceived(Iterator<FetchedMessage> messages) {
long nextOffset = -1L;
while (messages.hasNext()) {
FetchedMessage message = messages.next();
nextOffset = message.getNextOffset();
queue.offer(Charsets.UTF_8.decode(message.getPayload()).toString());
}
return nextOffset;
}
@Override
public void finished() {
}
});
// Wait for the first message
Assert.assertEquals("0 First message", queue.poll(60, TimeUnit.SECONDS));
// Shutdown the server
server.stopAndWait();
          // Start the server again. A new instance with the same config is needed
          // since a Guava Service cannot be restarted.
server = new EmbeddedKafkaServer(kafkaServerConfig);
server.startAndWait();
          // Wait a little while to make sure the change is reflected in the broker service
TimeUnit.SECONDS.sleep(3);
// Publish another message
createPublishThread(kafkaClient, topic, Compression.NONE, "Second message", 1).start();
// Should be able to get the second message
Assert.assertEquals("0 Second message", queue.poll(60, TimeUnit.SECONDS));
cancel.cancel();
} finally {
kafkaClient.stopAndWait();
}
} finally {
server.stopAndWait();
}
} finally {
zkClient.stopAndWait();
}
}
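
  /**
   * Tests publishing with GZIP, Snappy, and no compression from concurrent threads,
   * and consuming all 30 messages from the beginning of the topic.
   */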
@Test
public void testKafkaClient() throws Exception {
String topic = "testClient";
Thread t1 = createPublishThread(kafkaClient, topic, Compression.GZIP, "GZIP Testing message", 10);
Thread t2 = createPublishThread(kafkaClient, topic, Compression.NONE, "Testing message", 10);
t1.start();
t2.start();
Thread t3 = createPublishThread(kafkaClient, topic, Compression.SNAPPY, "Snappy Testing message", 10);
t2.join();
t3.start();
final CountDownLatch latch = new CountDownLatch(30);
final CountDownLatch stopLatch = new CountDownLatch(1);
    Cancellable cancel = kafkaClient.getConsumer().prepare().add(topic, 0, 0)
      .consume(new KafkaConsumer.MessageCallback() {
@Override
public long onReceived(Iterator<FetchedMessage> messages) {
long nextOffset = -1;
while (messages.hasNext()) {
FetchedMessage message = messages.next();
nextOffset = message.getNextOffset();
LOG.info(Charsets.UTF_8.decode(message.getPayload()).toString());
latch.countDown();
}
return nextOffset;
}
@Override
public void finished() {
stopLatch.countDown();
}
});
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
cancel.cancel();
Assert.assertTrue(stopLatch.await(1, TimeUnit.SECONDS));
}
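
  /**
   * Tests that a consumer can skip messages by returning an offset beyond
   * {@code getNextOffset()} from the {@code onReceived} callback.
   */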
@Test
public void testKafkaClientSkipNext() throws Exception {
String topic = "testClientSkipNext";
    // Publish 30 messages with indices matching their offsets, in the range 0 - 29
Thread t1 = createPublishThread(kafkaClient, topic, Compression.GZIP, "GZIP Testing message", 10);
t1.start();
t1.join();
Thread t2 = createPublishThread(kafkaClient, topic, Compression.NONE, "Testing message", 10, 10);
t2.start();
t2.join();
Thread t3 = createPublishThread(kafkaClient, topic, Compression.SNAPPY, "Snappy Testing message", 10, 20);
t3.start();
t3.join();
final CountDownLatch stopLatch = new CountDownLatch(1);
final BlockingQueue<Long> offsetQueue = new LinkedBlockingQueue<>();
Cancellable cancel = kafkaClient.getConsumer().prepare().add(topic, 0, 0).consume(
new KafkaConsumer.MessageCallback() {
@Override
public long onReceived(Iterator<FetchedMessage> messages) {
long nextOffset = -1L;
if (messages.hasNext()) {
FetchedMessage message = messages.next();
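          // Advance the next fetch offset one past getNextOffset() to skip every other message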
nextOffset = message.getNextOffset() + 1;
offsetQueue.offer(message.getOffset());
LOG.info(Charsets.UTF_8.decode(message.getPayload()).toString());
return nextOffset;
}
return nextOffset;
}
@Override
public void finished() {
stopLatch.countDown();
}
});
    // Only 15 messages (offsets 0, 2, ..., 28) should arrive, since onReceived returns
    // message.getNextOffset() + 1 as the next offset to read, skipping every other message
for (long i = 0; i < 30; i += 2) {
Assert.assertEquals(i, (long) offsetQueue.poll(60, TimeUnit.SECONDS));
}
Assert.assertNull(offsetQueue.poll(2, TimeUnit.SECONDS));
cancel.cancel();
Assert.assertTrue(stopLatch.await(1, TimeUnit.SECONDS));
}
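
  /**
   * Tests that a publisher and consumer survive a broker address change: the broker
   * is restarted on a new random port while keeping the same log directory.
   */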
@Test
public void testBrokerChange() throws Exception {
// Create a new namespace in ZK for Kafka server for this test case
String connectionStr = zkServer.getConnectionStr() + "/broker_change";
ZKClientService zkClient = ZKClientService.Builder.of(connectionStr).build();
zkClient.startAndWait();
zkClient.create("/", null, CreateMode.PERSISTENT).get();
// Start a new kafka server
File logDir = TMP_FOLDER.newFolder();
EmbeddedKafkaServer server = new EmbeddedKafkaServer(generateKafkaConfig(connectionStr, logDir));
server.startAndWait();
// Start a Kafka client
KafkaClientService kafkaClient = new ZKKafkaClientService(zkClient);
kafkaClient.startAndWait();
// Attach a consumer
final BlockingQueue<String> consumedMessages = Queues.newLinkedBlockingQueue();
kafkaClient.getConsumer()
.prepare().addFromBeginning("test", 0).consume(new KafkaConsumer.MessageCallback() {
@Override
public long onReceived(Iterator<FetchedMessage> messages) {
long nextOffset = -1L;
while (messages.hasNext()) {
FetchedMessage message = messages.next();
nextOffset = message.getNextOffset();
consumedMessages.add(Charsets.UTF_8.decode(message.getPayload()).toString());
}
return nextOffset;
}
@Override
public void finished() {
// No-op
}
});
// Get a publisher and publish a message
KafkaPublisher publisher = kafkaClient.getPublisher(KafkaPublisher.Ack.FIRE_AND_FORGET, Compression.NONE);
publisher.prepare("test").add(Charsets.UTF_8.encode("Message 0"), 0).send().get();
// Should receive one message
Assert.assertEquals("Message 0", consumedMessages.poll(5, TimeUnit.SECONDS));
    // Now shutdown and restart the server on a different port
server.stopAndWait();
server = new EmbeddedKafkaServer(generateKafkaConfig(connectionStr, logDir));
server.startAndWait();
    // Wait a little while to make sure the change is reflected in the broker service
TimeUnit.SECONDS.sleep(3);
// Now publish again with the same publisher. It should succeed and the consumer should receive the message.
publisher.prepare("test").add(Charsets.UTF_8.encode("Message 1"), 0).send().get();
Assert.assertEquals("Message 1", consumedMessages.poll(5, TimeUnit.SECONDS));
kafkaClient.stopAndWait();
zkClient.stopAndWait();
server.stopAndWait();
}
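
  /**
   * Creates a thread that publishes {@code count} messages to the given topic,
   * numbering them from 0.
   */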
private Thread createPublishThread(final KafkaClient kafkaClient, final String topic,
final Compression compression, final String message, final int count) {
return createPublishThread(kafkaClient, topic, compression, message, count, 0);
}
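
  /**
   * Creates a thread that publishes {@code count} messages to the given topic with the
   * given compression, prefixing each payload with its index starting at {@code base}.
   */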
private Thread createPublishThread(final KafkaClient kafkaClient, final String topic, final Compression compression,
final String message, final int count, final int base) {
return new Thread(() -> {
KafkaPublisher publisher = kafkaClient.getPublisher(KafkaPublisher.Ack.ALL_RECEIVED, compression);
KafkaPublisher.Preparer preparer = publisher.prepare(topic);
for (int i = 0; i < count; i++) {
preparer.add(Charsets.UTF_8.encode((base + i) + " " + message), 0);
}
Futures.getUnchecked(preparer.send());
});
}
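
  /**
   * Generates a Kafka broker configuration using a random port and a new log directory.
   */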
private static Properties generateKafkaConfig(String zkConnectStr) throws IOException {
return generateKafkaConfig(zkConnectStr, TMP_FOLDER.newFolder());
}
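
  /**
   * Generates a Kafka broker configuration with a random port, the given ZooKeeper
   * connection string, and the given log directory.
   */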
private static Properties generateKafkaConfig(String zkConnectStr, File logDir) throws IOException {
int port = Networks.getRandomPort();
Preconditions.checkState(port > 0, "Failed to get random port.");
Properties prop = new Properties();
prop.setProperty("log.dir", logDir.getAbsolutePath());
prop.setProperty("port", Integer.toString(port));
prop.setProperty("broker.id", "1");
prop.setProperty("socket.send.buffer.bytes", "1048576");
prop.setProperty("socket.receive.buffer.bytes", "1048576");
prop.setProperty("socket.request.max.bytes", "104857600");
prop.setProperty("num.partitions", "1");
prop.setProperty("log.retention.hours", "1");
prop.setProperty("log.flush.interval.messages", "10000");
prop.setProperty("log.flush.interval.ms", "1000");
prop.setProperty("log.segment.bytes", "536870912");
prop.setProperty("message.send.max.retries", "10");
prop.setProperty("zookeeper.connect", zkConnectStr);
prop.setProperty("zookeeper.connection.timeout.ms", "1000000");
prop.setProperty("default.replication.factor", "1");
    // Use a really small log file size to force some flushes to happen
prop.setProperty("log.file.size", "1024");
prop.setProperty("log.default.flush.interval.ms", "1000");
return prop;
}
}