blob: 47a9fc6d400ae49140c36b5e2e555f477a3ed645 [file] [log] [blame]
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yahoo.pulsar.storm;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import org.testng.collections.Maps;
import com.yahoo.pulsar.client.api.ClientConfiguration;
import com.yahoo.pulsar.client.api.ConsumerConfiguration;
import com.yahoo.pulsar.client.api.Message;
import com.yahoo.pulsar.client.api.Producer;
import com.yahoo.pulsar.client.api.ProducerConsumerBase;
import com.yahoo.pulsar.client.api.SubscriptionType;
import com.yahoo.pulsar.common.policies.data.PersistentTopicStats;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Values;
public class PulsarSpoutTest extends ProducerConsumerBase {
public final String serviceUrl = "http://127.0.0.1:" + BROKER_WEBSERVICE_PORT;
public final String topic = "persistent://my-property/use/my-ns/my-topic1";
public final String subscriptionName = "my-subscriber-name";
protected PulsarSpoutConfiguration pulsarSpoutConf;
protected ConsumerConfiguration consumerConf;
protected PulsarSpout spout;
protected MockSpoutOutputCollector mockCollector;
protected Producer producer;
@Override
@BeforeMethod
public void beforeMethod(Method m) throws Exception {
super.beforeMethod(m);
setup();
}
@Override
protected void setup() throws Exception {
super.internalSetup();
super.producerBaseSetup();
pulsarSpoutConf = new PulsarSpoutConfiguration();
pulsarSpoutConf.setServiceUrl(serviceUrl);
pulsarSpoutConf.setTopic(topic);
pulsarSpoutConf.setSubscriptionName(subscriptionName);
pulsarSpoutConf.setMessageToValuesMapper(messageToValuesMapper);
pulsarSpoutConf.setFailedRetriesTimeout(1, TimeUnit.SECONDS);
pulsarSpoutConf.setMaxFailedRetries(2);
pulsarSpoutConf.setSharedConsumerEnabled(true);
pulsarSpoutConf.setMetricsTimeIntervalInSecs(60);
consumerConf = new ConsumerConfiguration();
consumerConf.setSubscriptionType(SubscriptionType.Shared);
spout = new PulsarSpout(pulsarSpoutConf, new ClientConfiguration(), consumerConf);
mockCollector = new MockSpoutOutputCollector();
SpoutOutputCollector collector = new SpoutOutputCollector(mockCollector);
TopologyContext context = mock(TopologyContext.class);
when(context.getThisComponentId()).thenReturn("test-spout-" + methodName);
when(context.getThisTaskId()).thenReturn(0);
spout.open(Maps.newHashMap(), context, collector);
producer = pulsarClient.createProducer(topic);
}
@AfterMethod
public void cleanup() throws Exception {
producer.close();
spout.close();
super.internalCleanup();
}
@SuppressWarnings("serial")
public static MessageToValuesMapper messageToValuesMapper = new MessageToValuesMapper() {
@Override
public Values toValues(Message msg) {
if ("message to be dropped".equals(new String(msg.getData()))) {
return null;
}
return new Values(new String(msg.getData()));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
};
@Test
public void testBasic() throws Exception {
String msgContent = "hello world";
producer.send(msgContent.getBytes());
spout.nextTuple();
Assert.assertTrue(mockCollector.emitted());
Assert.assertTrue(msgContent.equals(mockCollector.getTupleData()));
spout.ack(mockCollector.getLastMessage());
}
@Test
public void testRedeliverOnFail() throws Exception {
String msgContent = "hello world";
producer.send(msgContent.getBytes());
spout.nextTuple();
spout.fail(mockCollector.getLastMessage());
mockCollector.reset();
Thread.sleep(150);
spout.nextTuple();
Assert.assertTrue(mockCollector.emitted());
Assert.assertTrue(msgContent.equals(mockCollector.getTupleData()));
spout.ack(mockCollector.getLastMessage());
}
@Test
public void testNoRedeliverOnAck() throws Exception {
String msgContent = "hello world";
producer.send(msgContent.getBytes());
spout.nextTuple();
spout.ack(mockCollector.getLastMessage());
mockCollector.reset();
spout.nextTuple();
Assert.assertFalse(mockCollector.emitted());
Assert.assertNull(mockCollector.getTupleData());
}
@Test
public void testLimitedRedeliveriesOnTimeout() throws Exception {
String msgContent = "chuck norris";
producer.send(msgContent.getBytes());
long startTime = System.currentTimeMillis();
while (startTime + pulsarSpoutConf.getFailedRetriesTimeout(TimeUnit.MILLISECONDS) > System
.currentTimeMillis()) {
mockCollector.reset();
spout.nextTuple();
Assert.assertTrue(mockCollector.emitted());
Assert.assertTrue(msgContent.equals(mockCollector.getTupleData()));
spout.fail(mockCollector.getLastMessage());
// wait to avoid backoff
Thread.sleep(500);
}
spout.nextTuple();
spout.fail(mockCollector.getLastMessage());
mockCollector.reset();
Thread.sleep(500);
spout.nextTuple();
Assert.assertFalse(mockCollector.emitted());
Assert.assertNull(mockCollector.getTupleData());
}
@Test
public void testLimitedRedeliveriesOnCount() throws Exception {
String msgContent = "hello world";
producer.send(msgContent.getBytes());
spout.nextTuple();
Assert.assertTrue(mockCollector.emitted());
Assert.assertTrue(msgContent.equals(mockCollector.getTupleData()));
spout.fail(mockCollector.getLastMessage());
mockCollector.reset();
Thread.sleep(150);
spout.nextTuple();
Assert.assertTrue(mockCollector.emitted());
Assert.assertTrue(msgContent.equals(mockCollector.getTupleData()));
spout.fail(mockCollector.getLastMessage());
mockCollector.reset();
Thread.sleep(300);
spout.nextTuple();
Assert.assertTrue(mockCollector.emitted());
Assert.assertTrue(msgContent.equals(mockCollector.getTupleData()));
spout.fail(mockCollector.getLastMessage());
mockCollector.reset();
Thread.sleep(500);
spout.nextTuple();
Assert.assertFalse(mockCollector.emitted());
Assert.assertNull(mockCollector.getTupleData());
}
@Test
public void testBackoffOnRetry() throws Exception {
String msgContent = "chuck norris";
producer.send(msgContent.getBytes());
spout.nextTuple();
spout.fail(mockCollector.getLastMessage());
mockCollector.reset();
// due to backoff we should not get the message again immediately
spout.nextTuple();
Assert.assertFalse(mockCollector.emitted());
Assert.assertNull(mockCollector.getTupleData());
Thread.sleep(100);
spout.nextTuple();
Assert.assertTrue(mockCollector.emitted());
Assert.assertTrue(msgContent.equals(mockCollector.getTupleData()));
spout.ack(mockCollector.getLastMessage());
}
@Test
public void testMessageDrop() throws Exception {
String msgContent = "message to be dropped";
producer.send(msgContent.getBytes());
spout.nextTuple();
Assert.assertFalse(mockCollector.emitted());
Assert.assertNull(mockCollector.getTupleData());
}
@SuppressWarnings({ "rawtypes" })
@Test
public void testMetrics() throws Exception {
spout.resetMetrics();
String msgContent = "hello world";
producer.send(msgContent.getBytes());
spout.nextTuple();
Map metrics = spout.getMetrics();
Assert.assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_MESSAGES_RECEIVED)).longValue(), 1);
Assert.assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_MESSAGES_EMITTED)).longValue(), 1);
Assert.assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_PENDING_FAILED_MESSAGES)).longValue(), 0);
Assert.assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_PENDING_ACKS)).longValue(), 1);
Assert.assertEquals(((Double) metrics.get(PulsarSpout.CONSUMER_RATE)).doubleValue(),
1.0 / pulsarSpoutConf.getMetricsTimeIntervalInSecs());
Assert.assertEquals(((Double) metrics.get(PulsarSpout.CONSUMER_THROUGHPUT_BYTES)).doubleValue(),
((double) msgContent.getBytes().length) / pulsarSpoutConf.getMetricsTimeIntervalInSecs());
spout.fail(mockCollector.getLastMessage());
metrics = spout.getMetrics();
Assert.assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_MESSAGES_RECEIVED)).longValue(), 1);
Assert.assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_MESSAGES_EMITTED)).longValue(), 1);
Assert.assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_PENDING_FAILED_MESSAGES)).longValue(), 1);
Assert.assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_PENDING_ACKS)).longValue(), 0);
Thread.sleep(150);
spout.nextTuple();
metrics = spout.getMetrics();
Assert.assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_MESSAGES_RECEIVED)).longValue(), 1);
Assert.assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_MESSAGES_EMITTED)).longValue(), 2);
Assert.assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_PENDING_FAILED_MESSAGES)).longValue(), 1);
Assert.assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_PENDING_ACKS)).longValue(), 1);
spout.ack(mockCollector.getLastMessage());
metrics = (Map) spout.getValueAndReset();
Assert.assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_MESSAGES_RECEIVED)).longValue(), 1);
Assert.assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_MESSAGES_EMITTED)).longValue(), 2);
Assert.assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_PENDING_FAILED_MESSAGES)).longValue(), 0);
Assert.assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_PENDING_ACKS)).longValue(), 0);
}
@Test
public void testSharedConsumer() throws Exception {
PersistentTopicStats topicStats = admin.persistentTopics().getStats(topic);
Assert.assertEquals(topicStats.subscriptions.get(subscriptionName).consumers.size(), 1);
PulsarSpout otherSpout = new PulsarSpout(pulsarSpoutConf, new ClientConfiguration(), consumerConf);
MockSpoutOutputCollector otherMockCollector = new MockSpoutOutputCollector();
SpoutOutputCollector collector = new SpoutOutputCollector(otherMockCollector);
TopologyContext context = mock(TopologyContext.class);
when(context.getThisComponentId()).thenReturn("test-spout-" + methodName);
when(context.getThisTaskId()).thenReturn(1);
otherSpout.open(Maps.newHashMap(), context, collector);
topicStats = admin.persistentTopics().getStats(topic);
Assert.assertEquals(topicStats.subscriptions.get(subscriptionName).consumers.size(), 1);
otherSpout.close();
topicStats = admin.persistentTopics().getStats(topic);
Assert.assertEquals(topicStats.subscriptions.get(subscriptionName).consumers.size(), 1);
}
@Test
public void testNoSharedConsumer() throws Exception {
PersistentTopicStats topicStats = admin.persistentTopics().getStats(topic);
Assert.assertEquals(topicStats.subscriptions.get(subscriptionName).consumers.size(), 1);
pulsarSpoutConf.setSharedConsumerEnabled(false);
PulsarSpout otherSpout = new PulsarSpout(pulsarSpoutConf, new ClientConfiguration(), consumerConf);
MockSpoutOutputCollector otherMockCollector = new MockSpoutOutputCollector();
SpoutOutputCollector collector = new SpoutOutputCollector(otherMockCollector);
TopologyContext context = mock(TopologyContext.class);
when(context.getThisComponentId()).thenReturn("test-spout-" + methodName);
when(context.getThisTaskId()).thenReturn(1);
otherSpout.open(Maps.newHashMap(), context, collector);
topicStats = admin.persistentTopics().getStats(topic);
Assert.assertEquals(topicStats.subscriptions.get(subscriptionName).consumers.size(), 2);
otherSpout.close();
topicStats = admin.persistentTopics().getStats(topic);
Assert.assertEquals(topicStats.subscriptions.get(subscriptionName).consumers.size(), 1);
}
@Test
public void testSerializability() throws Exception {
// test serializability with no auth
PulsarSpout spoutWithNoAuth = new PulsarSpout(pulsarSpoutConf, new ClientConfiguration());
TestUtil.testSerializability(spoutWithNoAuth);
}
}