Merge branch 'STORM-2903-1.0.x-merge' into 1.0.x-branch
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index e7f5156..e8ecb3e 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -503,9 +503,12 @@
                 /* if a null tuple is not configured to be emitted, it should be marked as emitted and acked immediately
                 * to allow its offset to be committed to Kafka */
                 LOG.debug("Not emitting null tuple for record [{}] as defined in configuration.", record);
-                msgId.setNullTuple(true);
-                offsetManagers.get(tp).addToEmitMsgs(msgId.offset());
-                ack(msgId);
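+                //offsetManagers is only populated for at-least-once processing; other guarantees commit from the consumer position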
+                if (isAtLeastOnceProcessing()) {
+                    msgId.setNullTuple(true);
+                    offsetManagers.get(tp).addToEmitMsgs(msgId.offset());
+                    ack(msgId);
+                }
             }
         }
         return false;
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/NullRecordTranslator.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/NullRecordTranslator.java
new file mode 100644
index 0000000..f2b2f98
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/NullRecordTranslator.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka;
+
+import java.util.Collections;
+import java.util.List;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.storm.kafka.spout.RecordTranslator;
+import org.apache.storm.tuple.Fields;
+
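+/**
+ * A {@link RecordTranslator} that translates every record to null, for testing the spout's handling of filtered (null) tuples.
+ */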
+public class NullRecordTranslator<K, V> implements RecordTranslator<K, V> {
+
+    @Override
+    public List<Object> apply(ConsumerRecord<K, V> record) {
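+        // a null translation tells the spout not to emit a tuple for this record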
+        return null;
+    }
+
+    @Override
+    public Fields getFieldsFor(String stream) {
+        return new Fields("topic", "key", "value");
+    }
+
+    @Override
+    public List<String> streams() {
+        return Collections.singletonList("default");
+    }
+}
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
index 082cc58..ca16237 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
@@ -21,8 +21,10 @@
 import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyList;
 import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.argThat;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.isNull;
@@ -44,6 +46,7 @@
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.NullRecordTranslator;
 import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
 import org.apache.storm.kafka.spout.internal.CommitMetadataManager;
 import org.apache.storm.spout.SpoutOutputCollector;
@@ -256,4 +259,33 @@
         }
     }
 
+    private void doFilterNullTupleTest(KafkaSpoutConfig.ProcessingGuarantee processingGuarantee) {
+        // STORM-3059: a record translated to null must be filtered out, regardless of processing guarantee
+        KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1)
+            .setProcessingGuarantee(processingGuarantee)
+            .setTupleTrackingEnforced(true)
+            .setRecordTranslator(new NullRecordTranslator<String, String>())
+            .build();
+
+        KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition);
+
+        when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition,
+            SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 0, 1))));
+
+        spout.nextTuple();
+
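+        // the null-translated record must not be emitted on any stream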
+        verify(collectorMock, never()).emit(anyString(), anyList(), any());
+    }
+
+    @Test
+    public void testAtMostOnceModeCanFilterNullTuples() {
+        doFilterNullTupleTest(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE);
+    }
+
+    @Test
+    public void testNoGuaranteeModeCanFilterNullTuples() {
+        doFilterNullTupleTest(KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE);
+    }
+
 }
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutNullTupleTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutNullTupleTest.java
index 159366b..54393f7 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutNullTupleTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutNullTupleTest.java
@@ -18,13 +18,10 @@
 package org.apache.storm.kafka.spout;
 
 
-import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
-import org.apache.storm.tuple.Fields;
 import org.apache.storm.utils.Time;
 import org.junit.Test;
 
-import java.util.List;
 import java.util.regex.Pattern;
 
 import static org.mockito.Matchers.any;
@@ -33,6 +30,8 @@
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 
+import org.apache.storm.kafka.NullRecordTranslator;
+
 public class KafkaSpoutNullTupleTest extends KafkaSpoutAbstractTest {
 
     public KafkaSpoutNullTupleTest() {
@@ -42,11 +41,10 @@
 
     @Override
     KafkaSpoutConfig<String, String> createSpoutConfig() {
-
         return KafkaSpoutConfig.builder("127.0.0.1:" + kafkaUnitRule.getKafkaUnit().getKafkaPort(),
                 Pattern.compile(SingleTopicKafkaSpoutConfiguration.TOPIC))
                 .setOffsetCommitPeriodMs(commitOffsetPeriodMs)
-                .setRecordTranslator(new NullRecordExtractor())
+                .setRecordTranslator(new NullRecordTranslator<String, String>())
                 .build();
     }
 
@@ -72,32 +70,4 @@
         verifyAllMessagesCommitted(messageCount);
     }
 
-    private class NullRecordExtractor implements RecordTranslator {
-
-        @Override
-        public List<Object> apply(ConsumerRecord record) {
-            return null;
-
-        }
-
-        @Override
-        public Fields getFieldsFor(String stream) {
-            return new Fields("topic", "key", "value");
-        }
-
-        /**
-         * @return the list of streams that this will handle.
-         */
-        @Override
-        public List<String> streams() {
-            return null;
-        }
-
-        @Override
-        public Object apply(Object record) {
-            return null;
-        }
-    }
-
-
 }