blob: 9cbdee9924ffdf3db09c5ff5f0c98556d0a83f32 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package storm.kafka.bolt;
import backtype.storm.Config;
import backtype.storm.Constants;
import backtype.storm.task.GeneralTopologyContext;
import backtype.storm.task.IOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.TupleImpl;
import backtype.storm.tuple.Values;
import backtype.storm.utils.TupleUtils;
import backtype.storm.utils.Utils;
import kafka.api.OffsetRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.Message;
import kafka.message.MessageAndOffset;
import org.junit.*;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import storm.kafka.*;
import storm.kafka.trident.GlobalPartitionInformation;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Properties;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
 * Integration tests for {@link KafkaBolt}: each test spins up an embedded
 * {@link KafkaTestBroker}, pushes a tuple through the bolt, and verifies both
 * the collector interaction (ack/fail) and, where applicable, the message
 * actually written to the broker via a {@link SimpleConsumer}.
 */
public class KafkaBoltTest {

    private static final String TEST_TOPIC = "test-topic";
    // How long to wait for the async producer to flush before verifying.
    private static final long ASYNC_DELIVERY_WAIT_MS = 1000L;

    private KafkaTestBroker broker;
    private KafkaBolt bolt;
    private Config config = new Config();
    private KafkaConfig kafkaConfig;
    private SimpleConsumer simpleConsumer;

    @Mock
    private IOutputCollector collector;

    @Before
    public void initMocks() {
        MockitoAnnotations.initMocks(this);
        broker = new KafkaTestBroker();
        setupKafkaConsumer();
        config.put(KafkaBolt.TOPIC, TEST_TOPIC);
        bolt = generateStringSerializerBolt();
    }

    @After
    public void shutdown() {
        simpleConsumer.close();
        broker.shutdown();
        bolt.cleanup();
    }

    /** Points a SimpleConsumer at partition 0 of the embedded broker. */
    private void setupKafkaConsumer() {
        GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation();
        globalPartitionInformation.addPartition(0, Broker.fromString(broker.getBrokerConnectionString()));
        BrokerHosts brokerHosts = new StaticHosts(globalPartitionInformation);
        kafkaConfig = new KafkaConfig(brokerHosts, TEST_TOPIC);
        simpleConsumer = new SimpleConsumer("localhost", broker.getPort(), 60000, 1024, "testClient");
    }

    /** Tick tuples must be acked immediately, never forwarded to Kafka. */
    @Test
    public void shouldAcknowledgeTickTuples() throws Exception {
        // Given
        Tuple tickTuple = mockTickTuple();
        // When
        bolt.execute(tickTuple);
        // Then
        verify(collector).ack(tickTuple);
    }

    @Test
    public void executeWithKey() throws Exception {
        String message = "value-123";
        String key = "key-123";
        Tuple tuple = generateTestTuple(key, message);
        bolt.execute(tuple);
        verify(collector).ack(tuple);
        verifyMessage(key, message);
    }

    /* test synchronous sending */
    @Test
    public void executeWithByteArrayKeyAndMessageSync() {
        boolean async = false;
        boolean fireAndForget = false;
        bolt = generateDefaultSerializerBolt(async, fireAndForget);
        String keyString = "test-key";
        String messageString = "test-message";
        // Explicit charset: getBytes() without one depends on the platform default.
        byte[] key = keyString.getBytes(StandardCharsets.UTF_8);
        byte[] message = messageString.getBytes(StandardCharsets.UTF_8);
        Tuple tuple = generateTestTuple(key, message);
        bolt.execute(tuple);
        verify(collector).ack(tuple);
        verifyMessage(keyString, messageString);
    }

    /* test asynchronous sending (default) */
    @Test
    public void executeWithByteArrayKeyAndMessageAsync() {
        boolean async = true;
        boolean fireAndForget = false;
        bolt = generateDefaultSerializerBolt(async, fireAndForget);
        String keyString = "test-key";
        String messageString = "test-message";
        byte[] key = keyString.getBytes(StandardCharsets.UTF_8);
        byte[] message = messageString.getBytes(StandardCharsets.UTF_8);
        Tuple tuple = generateTestTuple(key, message);
        bolt.execute(tuple);
        awaitAsyncDelivery();
        verify(collector).ack(tuple);
        verifyMessage(keyString, messageString);
    }

    /* test with fireAndForget option enabled */
    @Test
    public void executeWithByteArrayKeyAndMessageFire() {
        boolean async = true;
        boolean fireAndForget = true;
        bolt = generateDefaultSerializerBolt(async, fireAndForget);
        String keyString = "test-key";
        String messageString = "test-message";
        byte[] key = keyString.getBytes(StandardCharsets.UTF_8);
        byte[] message = messageString.getBytes(StandardCharsets.UTF_8);
        Tuple tuple = generateTestTuple(key, message);
        bolt.execute(tuple);
        awaitAsyncDelivery();
        verify(collector).ack(tuple);
        verifyMessage(keyString, messageString);
    }

    /* test bolt specified properties */
    @Test
    public void executeWithBoltSpecifiedProperties() {
        boolean async = false;
        boolean fireAndForget = false;
        bolt = defaultSerializerBoltWithSpecifiedProperties(async, fireAndForget);
        String keyString = "test-key";
        String messageString = "test-message";
        byte[] key = keyString.getBytes(StandardCharsets.UTF_8);
        byte[] message = messageString.getBytes(StandardCharsets.UTF_8);
        Tuple tuple = generateTestTuple(key, message);
        bolt.execute(tuple);
        verify(collector).ack(tuple);
        verifyMessage(keyString, messageString);
    }

    /** Gives the async producer time to deliver; restores the interrupt flag if woken. */
    private static void awaitAsyncDelivery() {
        try {
            Thread.sleep(ASYNC_DELIVERY_WAIT_MS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Builds the producer properties shared by every bolt in this test class,
     * parameterized on the key/value serializer class.
     */
    private Properties producerProperties(String serializerClass) {
        Properties props = new Properties();
        props.put("request.required.acks", "1");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("bootstrap.servers", broker.getBrokerConnectionString());
        props.put("key.serializer", serializerClass);
        props.put("value.serializer", serializerClass);
        props.put("metadata.fetch.timeout.ms", 1000);
        return props;
    }

    /** Synchronous bolt using String key/value serializers (no linger.ms override). */
    private KafkaBolt generateStringSerializerBolt() {
        Properties props = producerProperties("org.apache.kafka.common.serialization.StringSerializer");
        KafkaBolt bolt = new KafkaBolt().withProducerProperties(props);
        bolt.prepare(config, null, new OutputCollector(collector));
        bolt.setAsync(false);
        return bolt;
    }

    /** Bolt using byte-array serializers with linger.ms=0 so sends flush promptly. */
    private KafkaBolt generateDefaultSerializerBolt(boolean async, boolean fireAndForget) {
        Properties props = producerProperties("org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("linger.ms", 0);
        KafkaBolt bolt = new KafkaBolt().withProducerProperties(props);
        bolt.prepare(config, null, new OutputCollector(collector));
        bolt.setAsync(async);
        bolt.setFireAndForget(fireAndForget);
        return bolt;
    }

    /**
     * Previously a byte-for-byte duplicate of {@link #generateDefaultSerializerBolt};
     * kept as a separate entry point to preserve the intent of
     * {@link #executeWithBoltSpecifiedProperties} (bolt-specified vs. config-specified
     * properties), but now delegates instead of repeating the setup.
     */
    private KafkaBolt defaultSerializerBoltWithSpecifiedProperties(boolean async, boolean fireAndForget) {
        return generateDefaultSerializerBolt(async, fireAndForget);
    }

    @Test
    public void executeWithoutKey() throws Exception {
        String message = "value-234";
        Tuple tuple = generateTestTuple(message);
        bolt.execute(tuple);
        verify(collector).ack(tuple);
        verifyMessage(null, message);
    }

    @Test
    public void executeWithBrokerDown() throws Exception {
        broker.shutdown();
        String message = "value-234";
        Tuple tuple = generateTestTuple(message);
        bolt.execute(tuple);
        verify(collector).fail(tuple);
    }

    /**
     * Fetches the most recent message from partition 0 and asserts its key and
     * payload equal the expected values. A null {@code key} asserts that the
     * message was written without a key.
     */
    private boolean verifyMessage(String key, String message) {
        long lastMessageOffset = KafkaUtils.getOffset(simpleConsumer, kafkaConfig.topic, 0, OffsetRequest.LatestTime()) - 1;
        ByteBufferMessageSet messageAndOffsets = KafkaUtils.fetchMessages(kafkaConfig, simpleConsumer,
                new Partition(Broker.fromString(broker.getBrokerConnectionString()), 0), lastMessageOffset);
        MessageAndOffset messageAndOffset = messageAndOffsets.iterator().next();
        Message kafkaMessage = messageAndOffset.message();
        ByteBuffer messageKeyBuffer = kafkaMessage.key();
        String keyString = null;
        // Decode with an explicit charset to match the UTF-8 encoding used above.
        String messageString = new String(Utils.toByteArray(kafkaMessage.payload()), StandardCharsets.UTF_8);
        if (messageKeyBuffer != null) {
            keyString = new String(Utils.toByteArray(messageKeyBuffer), StandardCharsets.UTF_8);
        }
        assertEquals(key, keyString);
        assertEquals(message, messageString);
        return true;
    }

    /** Builds a two-field ("key", "message") tuple from an anonymous context. */
    private Tuple generateTestTuple(Object key, Object message) {
        TopologyBuilder builder = new TopologyBuilder();
        GeneralTopologyContext topologyContext = new GeneralTopologyContext(builder.createTopology(), new Config(), new HashMap(), new HashMap(), new HashMap(), "") {
            @Override
            public Fields getComponentOutputFields(String componentId, String streamId) {
                return new Fields("key", "message");
            }
        };
        return new TupleImpl(topologyContext, new Values(key, message), 1, "");
    }

    /** Builds a single-field ("message") tuple, i.e. a key-less record. */
    private Tuple generateTestTuple(Object message) {
        TopologyBuilder builder = new TopologyBuilder();
        GeneralTopologyContext topologyContext = new GeneralTopologyContext(builder.createTopology(), new Config(), new HashMap(), new HashMap(), new HashMap(), "") {
            @Override
            public Fields getComponentOutputFields(String componentId, String streamId) {
                return new Fields("message");
            }
        };
        return new TupleImpl(topologyContext, new Values(message), 1, "");
    }

    /** Mocks a system tick tuple; the assert is a sanity check on the mock itself. */
    private Tuple mockTickTuple() {
        Tuple tuple = mock(Tuple.class);
        when(tuple.getSourceComponent()).thenReturn(Constants.SYSTEM_COMPONENT_ID);
        when(tuple.getSourceStreamId()).thenReturn(Constants.SYSTEM_TICK_STREAM_ID);
        // Sanity check
        assertTrue(TupleUtils.isTick(tuple));
        return tuple;
    }
}