Merge branch 'STORM-2903-1.0.x-merge' into 1.0.x-branch
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index e7f5156..e8ecb3e 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -503,9 +503,11 @@
/*if a null tuple is not configured to be emitted, it should be marked as emitted and acked immediately
* to allow its offset to be commited to Kafka*/
LOG.debug("Not emitting null tuple for record [{}] as defined in configuration.", record);
- msgId.setNullTuple(true);
- offsetManagers.get(tp).addToEmitMsgs(msgId.offset());
- ack(msgId);
+ if (isAtLeastOnceProcessing()) {
+ msgId.setNullTuple(true);
+ offsetManagers.get(tp).addToEmitMsgs(msgId.offset());
+ ack(msgId);
+ }
}
}
return false;
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/NullRecordTranslator.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/NullRecordTranslator.java
new file mode 100644
index 0000000..f2b2f98
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/NullRecordTranslator.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka;
+
+import java.util.Collections;
+import java.util.List;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.storm.kafka.spout.RecordTranslator;
+import org.apache.storm.tuple.Fields;
+
+public class NullRecordTranslator<K, V> implements RecordTranslator<K, V> {
+ // Test helper: turns every consumer record into a null tuple so tests can drive the spout's null-tuple handling.
+ @Override
+ public List<Object> apply(ConsumerRecord<K, V> record) {
+ // Always null; the record argument is never read.
+ return null;
+ }
+
+ @Override
+ public Fields getFieldsFor(String stream) {
+ return new Fields("topic", "key", "value"); // same three fields for any stream; the stream argument is ignored
+ }
+
+ @Override
+ public List<String> streams() {
+ return Collections.singletonList("default"); // exactly one stream, named "default"
+ }
+}
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
index 082cc58..ca16237 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
@@ -21,8 +21,10 @@
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyList;
import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isNull;
@@ -44,6 +46,7 @@
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.NullRecordTranslator;
import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
import org.apache.storm.kafka.spout.internal.CommitMetadataManager;
import org.apache.storm.spout.SpoutOutputCollector;
@@ -64,7 +67,7 @@
@Captor
private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture;
-
+
private final TopologyContext contextMock = mock(TopologyContext.class);
private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class);
private final Map<String, Object> conf = new HashMap<>();
@@ -96,7 +99,7 @@
inOrder.verify(consumerMock).poll(anyLong());
inOrder.verify(consumerMock).commitSync(commitCapture.capture());
inOrder.verify(collectorMock).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyList());
-
+
CommitMetadataManager metadataManager = new CommitMetadataManager(contextMock, KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE);
Map<TopicPartition, OffsetAndMetadata> committedOffsets = commitCapture.getValue();
assertThat(committedOffsets.get(partition).offset(), is(0L));
@@ -191,7 +194,7 @@
.setTupleTrackingEnforced(true)
.build();
try (SimulatedTime time = new SimulatedTime()) {
- KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition);
+ KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition);
when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition,
SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 0, 1))));
@@ -204,13 +207,13 @@
assertThat("Should have captured a message id", msgIdCaptor.getValue(), not(nullValue()));
spout.ack(msgIdCaptor.getValue());
-
+
Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + spoutConfig.getOffsetsCommitPeriodMs());
-
+
when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.<TopicPartition, List<ConsumerRecord<String, String>>>emptyMap()));
-
+
spout.nextTuple();
-
+
verify(consumerMock, never()).commitSync(argThat(new ArgumentMatcher<Map<TopicPartition, OffsetAndMetadata>>() {
@Override
public boolean matches(Object arg) {
@@ -228,27 +231,27 @@
.setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE)
.setTupleTrackingEnforced(true)
.build();
-
+
try (SimulatedTime time = new SimulatedTime()) {
- KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition);
+ KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition);
when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition,
SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 0, 1))));
spout.nextTuple();
-
+
when(consumerMock.position(partition)).thenReturn(1L);
ArgumentCaptor<KafkaSpoutMessageId> msgIdCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
verify(collectorMock).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyList(), msgIdCaptor.capture());
assertThat("Should have captured a message id", msgIdCaptor.getValue(), not(nullValue()));
-
+
Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + spoutConfig.getOffsetsCommitPeriodMs());
-
+
spout.nextTuple();
-
+
verify(consumerMock).commitAsync(commitCapture.capture(), isNull(OffsetCommitCallback.class));
-
+
CommitMetadataManager metadataManager = new CommitMetadataManager(contextMock, KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE);
Map<TopicPartition, OffsetAndMetadata> committedOffsets = commitCapture.getValue();
assertThat(committedOffsets.get(partition).offset(), is(1L));
@@ -256,4 +259,32 @@
}
}
+ private void doFilterNullTupleTest(KafkaSpoutConfig.ProcessingGuarantee processingGuarantee) {
+ // STORM-3059: with a translator that returns null for every record, nextTuple() must emit nothing.
+ KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1)
+ .setProcessingGuarantee(processingGuarantee)
+ .setTupleTrackingEnforced(true)
+ .setRecordTranslator(new NullRecordTranslator<String, String>())
+ .build();
+
+ KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition);
+ // The mocked consumer's poll returns the records built by createRecords(partition, 0, 1).
+ when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition,
+ SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 0, 1))));
+
+ spout.nextTuple();
+ // Nothing may reach the collector: no three-argument emit call at all.
+ verify(collectorMock, never()).emit(anyString(), anyList(), any());
+ }
+
+ @Test
+ public void testAtMostOnceModeCanFilterNullTuples() {
+ doFilterNullTupleTest(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE); // shared null-tuple scenario, run under AT_MOST_ONCE
+ }
+
+ @Test
+ public void testNoGuaranteeModeCanFilterNullTuples() {
+ doFilterNullTupleTest(KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE); // shared null-tuple scenario, run under NO_GUARANTEE
+ }
+
}
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutNullTupleTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutNullTupleTest.java
index 159366b..54393f7 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutNullTupleTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutNullTupleTest.java
@@ -18,13 +18,10 @@
package org.apache.storm.kafka.spout;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
-import org.apache.storm.tuple.Fields;
import org.apache.storm.utils.Time;
import org.junit.Test;
-import java.util.List;
import java.util.regex.Pattern;
import static org.mockito.Matchers.any;
@@ -33,6 +30,8 @@
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
+import org.apache.storm.kafka.NullRecordTranslator;
+
public class KafkaSpoutNullTupleTest extends KafkaSpoutAbstractTest {
public KafkaSpoutNullTupleTest() {
@@ -42,11 +41,10 @@
@Override
KafkaSpoutConfig<String, String> createSpoutConfig() {
-
return KafkaSpoutConfig.builder("127.0.0.1:" + kafkaUnitRule.getKafkaUnit().getKafkaPort(),
Pattern.compile(SingleTopicKafkaSpoutConfiguration.TOPIC))
.setOffsetCommitPeriodMs(commitOffsetPeriodMs)
- .setRecordTranslator(new NullRecordExtractor())
+ .setRecordTranslator(new NullRecordTranslator<String, String>())
.build();
}
@@ -72,32 +70,4 @@
verifyAllMessagesCommitted(messageCount);
}
- private class NullRecordExtractor implements RecordTranslator {
-
- @Override
- public List<Object> apply(ConsumerRecord record) {
- return null;
-
- }
-
- @Override
- public Fields getFieldsFor(String stream) {
- return new Fields("topic", "key", "value");
- }
-
- /**
- * @return the list of streams that this will handle.
- */
- @Override
- public List<String> streams() {
- return null;
- }
-
- @Override
- public Object apply(Object record) {
- return null;
- }
- }
-
-
}