METRON-197: Validation should be the last step in the ParserBolt closes apache/incubator-metron#143
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/BulkWriterComponent.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/BulkWriterComponent.java
index 320d497..ad437d1 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/BulkWriterComponent.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/BulkWriterComponent.java
@@ -59,7 +59,7 @@
}
public void error(Exception e, Iterable<Tuple> tuples) {
- tuples.forEach(t -> collector.fail(t));
+ tuples.forEach(t -> collector.ack(t));
LOG.error("Failing " + Iterables.size(tuples) + " tuples", e);
ErrorUtils.handleError(collector, e, Constants.ERROR_STREAM);
}
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java
index 332ded0..04415e1 100644
--- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java
@@ -144,8 +144,7 @@
for(int i = 0; i < 5; i++) {
bulkMessageWriterBolt.execute(tuple);
}
- verify(outputCollector, times(0)).ack(tuple);
- verify(outputCollector, times(5)).fail(tuple);
+ verify(outputCollector, times(5)).ack(tuple);
verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM), any(Values.class));
verify(outputCollector, times(1)).reportError(any(Throwable.class));
}
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
index e3b5fd0..46a49fc 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
@@ -45,10 +45,7 @@
import org.json.simple.JSONObject;
import java.io.Serializable;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.function.Function;
public class ParserBolt extends ConfiguredParserBolt implements Serializable {
@@ -140,35 +137,41 @@
byte[] originalMessage = tuple.getBinary(0);
SensorParserConfig sensorParserConfig = getSensorParserConfig();
try {
- boolean ackTuple = true;
+ //we want to ack the tuple in the situation where we are not doing a bulk write
+ //otherwise we want to defer to the writerComponent who will ack on bulk commit.
+ boolean ackTuple = !isBulk;
+ int numWritten = 0;
if(sensorParserConfig != null) {
List<FieldValidator> fieldValidations = getConfigurations().getFieldValidations();
- List<JSONObject> messages = parser.parse(originalMessage);
- for (JSONObject message : messages) {
- if (parser.validate(message)) {
+ Optional<List<JSONObject>> messages = parser.parseOptional(originalMessage);
+ for (JSONObject message : messages.orElse(Collections.emptyList())) {
+ if (parser.validate(message) && filter != null && filter.emitTuple(message)) {
+ message.put(Constants.SENSOR_TYPE, getSensorType());
+ for (FieldTransformer handler : sensorParserConfig.getFieldTransformations()) {
+ if (handler != null) {
+ handler.transformAndUpdate(message, sensorParserConfig.getParserConfig());
+ }
+ }
if(!isGloballyValid(message, fieldValidations)) {
message.put(Constants.SENSOR_TYPE, getSensorType()+ ".invalid");
collector.emit(Constants.INVALID_STREAM, new Values(message));
}
- else if (filter != null && filter.emitTuple(message)) {
- ackTuple = !isBulk;
- message.put(Constants.SENSOR_TYPE, getSensorType());
- for (FieldTransformer handler : sensorParserConfig.getFieldTransformations()) {
- if (handler != null) {
- handler.transformAndUpdate(message, sensorParserConfig.getParserConfig());
- }
- }
+ else {
+ numWritten++;
writerComponent.write(getSensorType(), tuple, message, messageWriter, writerTransformer.apply(getConfigurations()));
}
}
}
}
- if(ackTuple) {
+ //if we are supposed to ack the tuple OR if we've never passed this tuple to the bulk writer
+ //(meaning that none of the messages are valid either globally or locally)
+ //then we want to handle the ack ourselves.
+ if(ackTuple || numWritten == 0) {
collector.ack(tuple);
}
} catch (Throwable ex) {
ErrorUtils.handleError(collector, ex, Constants.ERROR_STREAM);
- collector.fail(tuple);
+ collector.ack(tuple);
}
}
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParser.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParser.java
index 439f06d..740b897 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParser.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParser.java
@@ -20,10 +20,15 @@
import java.io.Serializable;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
public interface MessageParser<T> extends Configurable{
- void init();
- List<T> parse(byte[] rawMessage);
- boolean validate(T message);
+ void init();
+ List<T> parse(byte[] rawMessage);
+ default Optional<List<T>> parseOptional(byte[] parseMessage) {
+ return Optional.of(parse(parseMessage));
+ }
+
+ boolean validate(T message);
}
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
index 73aad23..bc0bd70 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
@@ -43,10 +43,7 @@
import org.mockito.Mock;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
@@ -103,6 +100,7 @@
}
};
}
+
};
parserBolt.setCuratorFramework(client);
parserBolt.setTreeCache(cache);
@@ -120,7 +118,7 @@
final JSONObject finalMessage1 = (JSONObject) jsonParser.parse("{ \"field1\":\"value1\", \"source.type\":\"" + sensorType + "\" }");
final JSONObject finalMessage2 = (JSONObject) jsonParser.parse("{ \"field2\":\"value2\", \"source.type\":\"" + sensorType + "\" }");
when(tuple.getBinary(0)).thenReturn(sampleBinary);
- when(parser.parse(sampleBinary)).thenReturn(messages);
+ when(parser.parseOptional(sampleBinary)).thenReturn(Optional.of(messages));
when(parser.validate(eq(messages.get(0)))).thenReturn(true);
when(parser.validate(eq(messages.get(1)))).thenReturn(false);
parserBolt.execute(tuple);
@@ -166,7 +164,7 @@
verify(parser, times(1)).init();
verify(batchWriter, times(1)).init(any(), any());
when(parser.validate(any())).thenReturn(true);
- when(parser.parse(any())).thenReturn(ImmutableList.of(new JSONObject()));
+ when(parser.parseOptional(any())).thenReturn(Optional.of(ImmutableList.of(new JSONObject())));
when(filter.emitTuple(any())).thenReturn(true);
parserBolt.withMessageFilter(filter);
parserBolt.execute(t1);
@@ -202,7 +200,7 @@
verify(parser, times(1)).init();
verify(batchWriter, times(1)).init(any(), any());
when(parser.validate(any())).thenReturn(true);
- when(parser.parse(any())).thenReturn(ImmutableList.of(new JSONObject()));
+ when(parser.parseOptional(any())).thenReturn(Optional.of(ImmutableList.of(new JSONObject())));
parserBolt.withMessageFilter(filter);
parserBolt.execute(t1);
verify(outputCollector, times(1)).ack(t1);
@@ -236,7 +234,7 @@
verify(parser, times(1)).init();
verify(batchWriter, times(1)).init(any(), any());
when(parser.validate(any())).thenReturn(true);
- when(parser.parse(any())).thenReturn(ImmutableList.of(new JSONObject()));
+ when(parser.parseOptional(any())).thenReturn(Optional.of(ImmutableList.of(new JSONObject())));
when(filter.emitTuple(any())).thenReturn(true);
parserBolt.withMessageFilter(filter);
parserBolt.execute(t1);
@@ -271,7 +269,7 @@
verify(parser, times(1)).init();
verify(batchWriter, times(1)).init(any(), any());
when(parser.validate(any())).thenReturn(true);
- when(parser.parse(any())).thenReturn(ImmutableList.of(new JSONObject()));
+ when(parser.parseOptional(any())).thenReturn(Optional.of(ImmutableList.of(new JSONObject())));
when(filter.emitTuple(any())).thenReturn(true);
parserBolt.withMessageFilter(filter);
writeNonBatch(outputCollector, parserBolt, t1);
@@ -320,46 +318,20 @@
when(parser.parse(any())).thenReturn(ImmutableList.of(new JSONObject()));
when(filter.emitTuple(any())).thenReturn(true);
parserBolt.withMessageFilter(filter);
- writeNonBatch(outputCollector, parserBolt, t1);
- writeNonBatch(outputCollector, parserBolt, t2);
- writeNonBatch(outputCollector, parserBolt, t3);
- writeNonBatch(outputCollector, parserBolt, t4);
+ parserBolt.execute(t1);
+ parserBolt.execute(t2);
+ parserBolt.execute(t3);
+ parserBolt.execute(t4);
parserBolt.execute(t5);
- verify(outputCollector, times(0)).ack(t1);
- verify(outputCollector, times(1)).fail(t1);
- verify(outputCollector, times(0)).ack(t2);
- verify(outputCollector, times(1)).fail(t2);
- verify(outputCollector, times(0)).ack(t3);
- verify(outputCollector, times(1)).fail(t3);
- verify(outputCollector, times(0)).ack(t4);
- verify(outputCollector, times(1)).fail(t4);
- verify(outputCollector, times(0)).ack(t5);
- verify(outputCollector, times(1)).fail(t5);
-
+ verify(outputCollector, times(1)).ack(t1);
+ verify(outputCollector, times(1)).ack(t2);
+ verify(outputCollector, times(1)).ack(t3);
+ verify(outputCollector, times(1)).ack(t4);
+ verify(outputCollector, times(1)).ack(t5);
}
private static void writeNonBatch(OutputCollector collector, ParserBolt bolt, Tuple t) {
bolt.execute(t);
- verify(collector, times(0)).ack(t);
}
-/*=======
- verify(writer, times(1)).init();
- byte[] sampleBinary = "some binary message".getBytes();
- JSONParser jsonParser = new JSONParser();
- final JSONObject sampleMessage1 = (JSONObject) jsonParser.parse("{ \"field1\":\"value1\" }");
- final JSONObject sampleMessage2 = (JSONObject) jsonParser.parse("{ \"field2\":\"value2\" }");
- List<JSONObject> messages = new ArrayList<JSONObject>() {{
- add(sampleMessage1);
- add(sampleMessage2);
- }};
- final JSONObject finalMessage1 = (JSONObject) jsonParser.parse("{ \"field1\":\"value1\", \"source.type\":\"" + sensorType + "\" }");
- when(tuple.getBinary(0)).thenReturn(sampleBinary);
- when(parser.parse(sampleBinary)).thenReturn(messages);
- when(parser.validate(any(JSONObject.class))).thenReturn(true);
- parserBolt.execute(tuple);
- verify(writer, times(1)).write(eq(sensorType), any(Configurations.class), eq(tuple), eq(finalMessage1));
- verify(outputCollector, times(1)).ack(tuple);
- }
->>>>>>> master*/
}