METRON-197: Validation should be the last step in the ParserBolt closes apache/incubator-metron#143
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/BulkWriterComponent.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/BulkWriterComponent.java
index 320d497..ad437d1 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/BulkWriterComponent.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/BulkWriterComponent.java
@@ -59,7 +59,7 @@
   }
 
   public void error(Exception e, Iterable<Tuple> tuples) {
-    tuples.forEach(t -> collector.fail(t));
+    tuples.forEach(t -> collector.ack(t));
     LOG.error("Failing " + Iterables.size(tuples) + " tuples", e);
     ErrorUtils.handleError(collector, e, Constants.ERROR_STREAM);
   }
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java
index 332ded0..04415e1 100644
--- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java
@@ -144,8 +144,7 @@
     for(int i = 0; i < 5; i++) {
       bulkMessageWriterBolt.execute(tuple);
     }
-    verify(outputCollector, times(0)).ack(tuple);
-    verify(outputCollector, times(5)).fail(tuple);
+    verify(outputCollector, times(5)).ack(tuple);
     verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM), any(Values.class));
     verify(outputCollector, times(1)).reportError(any(Throwable.class));
   }
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
index e3b5fd0..46a49fc 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
@@ -45,10 +45,7 @@
 import org.json.simple.JSONObject;
 
 import java.io.Serializable;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.function.Function;
 
 public class ParserBolt extends ConfiguredParserBolt implements Serializable {
@@ -140,35 +137,41 @@
     byte[] originalMessage = tuple.getBinary(0);
     SensorParserConfig sensorParserConfig = getSensorParserConfig();
     try {
-      boolean ackTuple = true;
+      //we want to ack the tuple in the situation where we are not doing a bulk write,
+      //otherwise we want to defer to the writerComponent, which will ack on bulk commit.
+      boolean ackTuple = !isBulk;
+      int numWritten = 0;
       if(sensorParserConfig != null) {
         List<FieldValidator> fieldValidations = getConfigurations().getFieldValidations();
-        List<JSONObject> messages = parser.parse(originalMessage);
-        for (JSONObject message : messages) {
-          if (parser.validate(message)) {
+        Optional<List<JSONObject>> messages = parser.parseOptional(originalMessage);
+        for (JSONObject message : messages.orElse(Collections.emptyList())) {
+          if (parser.validate(message) && filter != null && filter.emitTuple(message)) {
+            message.put(Constants.SENSOR_TYPE, getSensorType());
+            for (FieldTransformer handler : sensorParserConfig.getFieldTransformations()) {
+              if (handler != null) {
+                handler.transformAndUpdate(message, sensorParserConfig.getParserConfig());
+              }
+            }
             if(!isGloballyValid(message, fieldValidations)) {
               message.put(Constants.SENSOR_TYPE, getSensorType()+ ".invalid");
               collector.emit(Constants.INVALID_STREAM, new Values(message));
             }
-            else if (filter != null && filter.emitTuple(message)) {
-              ackTuple = !isBulk;
-              message.put(Constants.SENSOR_TYPE, getSensorType());
-              for (FieldTransformer handler : sensorParserConfig.getFieldTransformations()) {
-                if (handler != null) {
-                  handler.transformAndUpdate(message, sensorParserConfig.getParserConfig());
-                }
-              }
+            else {
+              numWritten++;
               writerComponent.write(getSensorType(), tuple, message, messageWriter, writerTransformer.apply(getConfigurations()));
             }
           }
         }
       }
-      if(ackTuple) {
+      //if we are supposed to ack the tuple OR if we've never passed this tuple to the bulk writer
+      //(meaning that no message passed parser validation, the message filter, and global validation)
+      //then we want to handle the ack ourselves.
+      if(ackTuple || numWritten == 0) {
         collector.ack(tuple);
       }
     } catch (Throwable ex) {
       ErrorUtils.handleError(collector, ex, Constants.ERROR_STREAM);
-      collector.fail(tuple);
+      collector.ack(tuple);
     }
   }
 
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParser.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParser.java
index 439f06d..740b897 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParser.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParser.java
@@ -20,10 +20,15 @@
 import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 public interface MessageParser<T> extends Configurable{
-	void init();
-	List<T> parse(byte[] rawMessage);
-	boolean validate(T message);
+  void init();
+  List<T> parse(byte[] rawMessage);
+  default Optional<List<T>> parseOptional(byte[] parseMessage) {
+    return Optional.of(parse(parseMessage));
+  }
+
+  boolean validate(T message);
 
 }
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
index 73aad23..bc0bd70 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
@@ -43,10 +43,7 @@
 import org.mockito.Mock;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
@@ -103,6 +100,7 @@
           }
         };
       }
+
     };
     parserBolt.setCuratorFramework(client);
     parserBolt.setTreeCache(cache);
@@ -120,7 +118,7 @@
     final JSONObject finalMessage1 = (JSONObject) jsonParser.parse("{ \"field1\":\"value1\", \"source.type\":\"" + sensorType + "\" }");
     final JSONObject finalMessage2 = (JSONObject) jsonParser.parse("{ \"field2\":\"value2\", \"source.type\":\"" + sensorType + "\" }");
     when(tuple.getBinary(0)).thenReturn(sampleBinary);
-    when(parser.parse(sampleBinary)).thenReturn(messages);
+    when(parser.parseOptional(sampleBinary)).thenReturn(Optional.of(messages));
     when(parser.validate(eq(messages.get(0)))).thenReturn(true);
     when(parser.validate(eq(messages.get(1)))).thenReturn(false);
     parserBolt.execute(tuple);
@@ -166,7 +164,7 @@
   verify(parser, times(1)).init();
   verify(batchWriter, times(1)).init(any(), any());
   when(parser.validate(any())).thenReturn(true);
-  when(parser.parse(any())).thenReturn(ImmutableList.of(new JSONObject()));
+  when(parser.parseOptional(any())).thenReturn(Optional.of(ImmutableList.of(new JSONObject())));
   when(filter.emitTuple(any())).thenReturn(true);
   parserBolt.withMessageFilter(filter);
   parserBolt.execute(t1);
@@ -202,7 +200,7 @@
     verify(parser, times(1)).init();
     verify(batchWriter, times(1)).init(any(), any());
     when(parser.validate(any())).thenReturn(true);
-    when(parser.parse(any())).thenReturn(ImmutableList.of(new JSONObject()));
+    when(parser.parseOptional(any())).thenReturn(Optional.of(ImmutableList.of(new JSONObject())));
     parserBolt.withMessageFilter(filter);
     parserBolt.execute(t1);
     verify(outputCollector, times(1)).ack(t1);
@@ -236,7 +234,7 @@
     verify(parser, times(1)).init();
     verify(batchWriter, times(1)).init(any(), any());
     when(parser.validate(any())).thenReturn(true);
-    when(parser.parse(any())).thenReturn(ImmutableList.of(new JSONObject()));
+    when(parser.parseOptional(any())).thenReturn(Optional.of(ImmutableList.of(new JSONObject())));
     when(filter.emitTuple(any())).thenReturn(true);
     parserBolt.withMessageFilter(filter);
     parserBolt.execute(t1);
@@ -271,7 +269,7 @@
     verify(parser, times(1)).init();
     verify(batchWriter, times(1)).init(any(), any());
     when(parser.validate(any())).thenReturn(true);
-    when(parser.parse(any())).thenReturn(ImmutableList.of(new JSONObject()));
+    when(parser.parseOptional(any())).thenReturn(Optional.of(ImmutableList.of(new JSONObject())));
     when(filter.emitTuple(any())).thenReturn(true);
     parserBolt.withMessageFilter(filter);
     writeNonBatch(outputCollector, parserBolt, t1);
@@ -320,46 +318,20 @@
     when(parser.parse(any())).thenReturn(ImmutableList.of(new JSONObject()));
     when(filter.emitTuple(any())).thenReturn(true);
     parserBolt.withMessageFilter(filter);
-    writeNonBatch(outputCollector, parserBolt, t1);
-    writeNonBatch(outputCollector, parserBolt, t2);
-    writeNonBatch(outputCollector, parserBolt, t3);
-    writeNonBatch(outputCollector, parserBolt, t4);
+    parserBolt.execute(t1);
+    parserBolt.execute(t2);
+    parserBolt.execute(t3);
+    parserBolt.execute(t4);
     parserBolt.execute(t5);
-    verify(outputCollector, times(0)).ack(t1);
-    verify(outputCollector, times(1)).fail(t1);
-    verify(outputCollector, times(0)).ack(t2);
-    verify(outputCollector, times(1)).fail(t2);
-    verify(outputCollector, times(0)).ack(t3);
-    verify(outputCollector, times(1)).fail(t3);
-    verify(outputCollector, times(0)).ack(t4);
-    verify(outputCollector, times(1)).fail(t4);
-    verify(outputCollector, times(0)).ack(t5);
-    verify(outputCollector, times(1)).fail(t5);
-
+    verify(outputCollector, times(1)).ack(t1);
+    verify(outputCollector, times(1)).ack(t2);
+    verify(outputCollector, times(1)).ack(t3);
+    verify(outputCollector, times(1)).ack(t4);
+    verify(outputCollector, times(1)).ack(t5);
 
   }
   private static void writeNonBatch(OutputCollector collector, ParserBolt bolt, Tuple t) {
     bolt.execute(t);
-    verify(collector, times(0)).ack(t);
   }
 
-/*=======
-    verify(writer, times(1)).init();
-    byte[] sampleBinary = "some binary message".getBytes();
-    JSONParser jsonParser = new JSONParser();
-    final JSONObject sampleMessage1 = (JSONObject) jsonParser.parse("{ \"field1\":\"value1\" }");
-    final JSONObject sampleMessage2 = (JSONObject) jsonParser.parse("{ \"field2\":\"value2\" }");
-    List<JSONObject> messages = new ArrayList<JSONObject>() {{
-      add(sampleMessage1);
-      add(sampleMessage2);
-    }};
-    final JSONObject finalMessage1 = (JSONObject) jsonParser.parse("{ \"field1\":\"value1\", \"source.type\":\"" + sensorType + "\" }");
-    when(tuple.getBinary(0)).thenReturn(sampleBinary);
-    when(parser.parse(sampleBinary)).thenReturn(messages);
-    when(parser.validate(any(JSONObject.class))).thenReturn(true);
-    parserBolt.execute(tuple);
-    verify(writer, times(1)).write(eq(sensorType), any(Configurations.class), eq(tuple), eq(finalMessage1));
-    verify(outputCollector, times(1)).ack(tuple);
-  }
->>>>>>> master*/
 }