// blob: eb694bb9a811f99744d1e447330641cafddde1a1 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package storm.kafka;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import kafka.api.OffsetRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import storm.kafka.trident.GlobalPartitionInformation;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.utils.Utils;
import com.google.common.collect.ImmutableMap;
/**
 * Tests for {@link KafkaUtils}: fetching messages and offsets through a
 * {@link SimpleConsumer} connected to a {@link KafkaTestBroker}, turning fetched
 * messages into tuples with the different schemes, and assigning partitions to tasks.
 */
public class KafkaUtilsTest {
    /** Topic used by the default {@link KafkaConfig} built in {@link #setup()}. */
    private static final String TEST_TOPIC = "testTopic";
    private static final Logger LOG = LoggerFactory.getLogger(KafkaUtilsTest.class);

    private KafkaTestBroker broker;
    private SimpleConsumer simpleConsumer;
    private KafkaConfig config;
    private BrokerHosts brokerHosts;

    /**
     * Starts a fresh test broker, registers partition 0 of {@link #TEST_TOPIC} against it
     * as static host information, and opens a consumer to the broker's port on localhost.
     */
    @Before
    public void setup() {
        broker = new KafkaTestBroker();
        GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation(TEST_TOPIC);
        globalPartitionInformation.addPartition(0, Broker.fromString(broker.getBrokerConnectionString()));
        brokerHosts = new StaticHosts(globalPartitionInformation);
        config = new KafkaConfig(brokerHosts, TEST_TOPIC);
        simpleConsumer = new SimpleConsumer("localhost", broker.getPort(), 60000, 1024, "testClient");
    }

    /**
     * Closes the consumer and shuts the broker down. The broker shutdown sits in a
     * {@code finally} so it still happens when closing the consumer throws.
     */
    @After
    public void shutdown() {
        try {
            simpleConsumer.close();
        } finally {
            broker.shutdown();
        }
    }

    /** Fetching from {@link #TEST_TOPIC} without ever producing to it must fail the fetch. */
    @Test(expected = FailedFetchException.class)
    public void topicDoesNotExist() throws Exception {
        KafkaUtils.fetchMessages(config, simpleConsumer, firstPartition(TEST_TOPIC), 0);
    }

    /** Fetching through a consumer that points at a broker which was shut down must fail the fetch. */
    @Test(expected = FailedFetchException.class)
    public void brokerIsDown() throws Exception {
        int port = broker.getPort();
        broker.shutdown();
        // Dedicated consumer for the dead broker; named differently so it does not shadow the field.
        SimpleConsumer deadBrokerConsumer = new SimpleConsumer("localhost", port, 100, 1024, "testClient");
        try {
            KafkaUtils.fetchMessages(config, deadBrokerConsumer, firstPartition(TEST_TOPIC), OffsetRequest.LatestTime());
        } finally {
            deadBrokerConsumer.close();
        }
    }

    /** Sends one message and checks that fetching at (latest offset - 1) returns that payload. */
    @Test
    public void fetchMessage() throws Exception {
        String value = "test";
        createTopicAndSendMessage(value);
        long offset = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, OffsetRequest.LatestTime()) - 1;
        ByteBufferMessageSet messageAndOffsets =
                KafkaUtils.fetchMessages(config, simpleConsumer, firstPartition(TEST_TOPIC), offset);
        byte[] payload = Utils.toByteArray(messageAndOffsets.iterator().next().message().payload());
        // Decode explicitly instead of relying on the platform default charset.
        String message = new String(payload, "UTF-8");
        assertThat(message, is(equalTo(value)));
    }

    /**
     * With {@code useStartOffsetTimeIfOffsetOutOfRange} switched off, fetching at the invalid
     * offset -99 is expected to surface as a {@link FailedFetchException}.
     */
    @Test(expected = FailedFetchException.class)
    public void fetchMessagesWithInvalidOffsetAndDefaultHandlingDisabled() throws Exception {
        config.useStartOffsetTimeIfOffsetOutOfRange = false;
        KafkaUtils.fetchMessages(config, simpleConsumer, firstPartition(TEST_TOPIC), -99);
    }

    /**
     * With a freshly built config for "newTopic" (so {@code useStartOffsetTimeIfOffsetOutOfRange}
     * keeps whatever {@link KafkaConfig} defaults to -- presumably enabled, going by the test
     * name), fetching at the invalid offset -99 is expected to raise
     * {@link TopicOffsetOutOfRangeException}.
     */
    @Test(expected = TopicOffsetOutOfRangeException.class)
    public void fetchMessagesWithInvalidOffsetAndDefaultHandlingEnabled() throws Exception {
        config = new KafkaConfig(brokerHosts, "newTopic");
        String value = "test";
        createTopicAndSendMessage(value);
        KafkaUtils.fetchMessages(config, simpleConsumer, firstPartition("newTopic"), -99);
    }

    /**
     * With {@code ignoreZkOffsets} off and {@code startOffsetTime} set to earliest, the
     * config-driven offset lookup must equal the offset returned for the earliest time.
     */
    @Test
    public void getOffsetFromConfigAndDontForceFromStart() {
        config.ignoreZkOffsets = false;
        config.startOffsetTime = OffsetRequest.EarliestTime();
        createTopicAndSendMessage();
        long earliestOffset = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, OffsetRequest.EarliestTime());
        long offsetFromConfig = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, config);
        assertThat(earliestOffset, is(equalTo(offsetFromConfig)));
    }

    /**
     * With {@code ignoreZkOffsets} on and {@code startOffsetTime} set to earliest, the
     * config-driven offset lookup must equal the offset returned for the earliest time.
     */
    @Test
    public void getOffsetFromConfigAndFroceFromStart() {
        config.ignoreZkOffsets = true;
        config.startOffsetTime = OffsetRequest.EarliestTime();
        createTopicAndSendMessage();
        long earliestOffset = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, OffsetRequest.EarliestTime());
        long offsetFromConfig = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, config);
        assertThat(earliestOffset, is(equalTo(offsetFromConfig)));
    }

    /** A key-less message read through a key/value scheme must yield just the value. */
    @Test
    public void generateTuplesWithoutKeyAndKeyValueScheme() {
        config.scheme = new KeyValueSchemeAsMultiScheme(new StringKeyValueScheme());
        runGetValueOnlyTuplesTest();
    }

    /** A keyed message read through a key/value scheme must yield a single-entry key-to-value map. */
    @Test
    public void generateTuplesWithKeyAndKeyValueScheme() {
        config.scheme = new KeyValueSchemeAsMultiScheme(new StringKeyValueScheme());
        config.useStartOffsetTimeIfOffsetOutOfRange = false;
        String value = "value";
        String key = "key";
        createTopicAndSendMessage(key, value);
        ByteBufferMessageSet messageAndOffsets = getLastMessage();
        for (MessageAndOffset msg : messageAndOffsets) {
            Iterable<List<Object>> lists = KafkaUtils.generateTuples(config, msg.message(), config.topic);
            assertEquals(ImmutableMap.of(key, value), lists.iterator().next().get(0));
        }
    }

    /** A key-less message read through a plain string scheme must yield just the value. */
    @Test
    public void generateTupelsWithValueScheme() {
        config.scheme = new SchemeAsMultiScheme(new StringScheme());
        runGetValueOnlyTuplesTest();
    }

    /** {@link StringMultiSchemeWithTopic} must emit the value followed by the topic name. */
    @Test
    public void generateTuplesWithValueAndStringMultiSchemeWithTopic() {
        config.scheme = new StringMultiSchemeWithTopic();
        String value = "value";
        createTopicAndSendMessage(value);
        ByteBufferMessageSet messageAndOffsets = getLastMessage();
        for (MessageAndOffset msg : messageAndOffsets) {
            Iterable<List<Object>> lists = KafkaUtils.generateTuples(config, msg.message(), config.topic);
            List<Object> list = lists.iterator().next();
            assertEquals(value, list.get(0));
            assertEquals(config.topic, list.get(1));
        }
    }

    /** A keyed message read through a value-only scheme must still yield the value. */
    @Test
    public void generateTuplesWithValueSchemeAndKeyValueMessage() {
        config.scheme = new SchemeAsMultiScheme(new StringScheme());
        String value = "value";
        String key = "key";
        createTopicAndSendMessage(key, value);
        ByteBufferMessageSet messageAndOffsets = getLastMessage();
        for (MessageAndOffset msg : messageAndOffsets) {
            Iterable<List<Object>> lists = KafkaUtils.generateTuples(config, msg.message(), config.topic);
            assertEquals(value, lists.iterator().next().get(0));
        }
    }

    /**
     * The message-and-metadata scheme must emit the value, then the partition number, then
     * the offset that were handed to {@code generateTuples}.
     */
    @Test
    public void generateTuplesWithMessageAndMetadataScheme() {
        String value = "value";
        Partition mockPartition = Mockito.mock(Partition.class);
        mockPartition.partition = 0;
        long offset = 0L;
        MessageMetadataSchemeAsMultiScheme scheme =
                new MessageMetadataSchemeAsMultiScheme(new StringMessageAndMetadataScheme());
        createTopicAndSendMessage(null, value);
        ByteBufferMessageSet messageAndOffsets = getLastMessage();
        for (MessageAndOffset msg : messageAndOffsets) {
            Iterable<List<Object>> lists = KafkaUtils.generateTuples(scheme, msg.message(), mockPartition, offset);
            List<Object> values = lists.iterator().next();
            assertEquals("Message is incorrect", value, values.get(0));
            assertEquals("Partition is incorrect", mockPartition.partition, values.get(1));
            assertEquals("Offset is incorrect", offset, values.get(2));
        }
    }

    /**
     * Builds a {@link Partition} for partition 0 of {@code topic}, hosted on the test broker.
     *
     * @param topic topic the partition belongs to
     * @return partition 0 of {@code topic} on the broker's connection string
     */
    private Partition firstPartition(String topic) {
        return new Partition(Broker.fromString(broker.getBrokerConnectionString()), topic, 0);
    }

    /**
     * Fetches starting at (latest offset - 1) of partition 0, i.e. the most recently written
     * message. Fails the test if nothing comes back, because the callers only assert inside a
     * loop over the returned set and would otherwise pass without checking anything.
     *
     * @return the fetched message set, guaranteed to contain at least one message
     */
    private ByteBufferMessageSet getLastMessage() {
        long offsetOfLastMessage =
                KafkaUtils.getOffset(simpleConsumer, config.topic, 0, OffsetRequest.LatestTime()) - 1;
        ByteBufferMessageSet messages =
                KafkaUtils.fetchMessages(config, simpleConsumer, firstPartition(TEST_TOPIC), offsetOfLastMessage);
        Assert.assertTrue("Expected at least one message at offset " + offsetOfLastMessage,
                messages.iterator().hasNext());
        return messages;
    }

    /**
     * Sends a key-less message and checks that the currently configured scheme produces a
     * tuple whose first element is the message value.
     */
    private void runGetValueOnlyTuplesTest() {
        String value = "value";
        createTopicAndSendMessage(null, value);
        ByteBufferMessageSet messageAndOffsets = getLastMessage();
        for (MessageAndOffset msg : messageAndOffsets) {
            Iterable<List<Object>> lists = KafkaUtils.generateTuples(config, msg.message(), config.topic);
            assertEquals(value, lists.iterator().next().get(0));
        }
    }

    /** Sends a key-less message with the fixed value "someValue" to {@code config.topic}. */
    private void createTopicAndSendMessage() {
        createTopicAndSendMessage(null, "someValue");
    }

    /**
     * Sends a key-less message to {@code config.topic}.
     *
     * @param value message payload
     */
    private void createTopicAndSendMessage(String value) {
        createTopicAndSendMessage(null, value);
    }

    /**
     * Sends one record to {@code config.topic} through a new {@link KafkaProducer} and waits
     * for the send to complete. Any failure is logged and then fails the running test.
     *
     * @param key   record key, may be {@code null}
     * @param value record value
     */
    private void createTopicAndSendMessage(String key, String value) {
        Properties p = new Properties();
        p.put("acks", "1");
        p.put("bootstrap.servers", broker.getBrokerConnectionString());
        p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        p.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        p.put("metadata.fetch.timeout.ms", 1000);
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(p);
        try {
            // get() blocks until the send has completed, making the send synchronous.
            producer.send(new ProducerRecord<String, String>(config.topic, key, value)).get();
        } catch (Exception e) {
            // Log before failing: Assert.fail always throws, so nothing after it would run.
            LOG.error("Failed to do synchronous sending due to " + e, e);
            Assert.fail(e.getMessage());
        } finally {
            producer.close();
        }
    }

    /** 16 partitions over 16 tasks: every task gets exactly one partition. */
    @Test
    public void assignOnePartitionPerTask() {
        runPartitionToTaskMappingTest(16, 1);
    }

    /** 16 partitions over 8 tasks: every task gets exactly two partitions. */
    @Test
    public void assignTwoPartitionsPerTask() {
        runPartitionToTaskMappingTest(16, 2);
    }

    /** 32 partitions over a single task: that task gets all 32. */
    @Test
    public void assignAllPartitionsToOneTask() {
        runPartitionToTaskMappingTest(32, 32);
    }

    /**
     * Builds partition info with {@code numPartitions} partitions, spreads it over
     * {@code numPartitions / partitionsPerTask} tasks and checks that every task index is
     * assigned exactly {@code partitionsPerTask} partitions.
     *
     * @param numPartitions     total number of partitions to build
     * @param partitionsPerTask number of partitions each task is expected to receive
     */
    public void runPartitionToTaskMappingTest(int numPartitions, int partitionsPerTask) {
        GlobalPartitionInformation globalPartitionInformation = TestUtils.buildPartitionInfo(numPartitions);
        List<GlobalPartitionInformation> partitions = new ArrayList<GlobalPartitionInformation>();
        partitions.add(globalPartitionInformation);
        int numTasks = numPartitions / partitionsPerTask;
        for (int i = 0; i < numTasks; i++) {
            assertEquals(partitionsPerTask, KafkaUtils.calculatePartitionsForTask(partitions, numTasks, i).size());
        }
    }

    /** One partition over two tasks: task 0 gets the partition, task 1 gets nothing. */
    @Test
    public void moreTasksThanPartitions() {
        GlobalPartitionInformation globalPartitionInformation = TestUtils.buildPartitionInfo(1);
        List<GlobalPartitionInformation> partitions = new ArrayList<GlobalPartitionInformation>();
        partitions.add(globalPartitionInformation);
        int numTasks = 2;
        assertEquals(1, KafkaUtils.calculatePartitionsForTask(partitions, numTasks, 0).size());
        assertEquals(0, KafkaUtils.calculatePartitionsForTask(partitions, numTasks, 1).size());
    }

    /** Asking for task index 1 when there is only one task must be rejected. */
    @Test(expected = IllegalArgumentException.class)
    public void assignInvalidTask() {
        GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation(TEST_TOPIC);
        List<GlobalPartitionInformation> partitions = new ArrayList<GlobalPartitionInformation>();
        partitions.add(globalPartitionInformation);
        KafkaUtils.calculatePartitionsForTask(partitions, 1, 1);
    }
}