NIFI-9329 - Expose event validation in ParseCEF processor (#5477)
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ParseCEF.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ParseCEF.java
index 002c048..4d8c7f6 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ParseCEF.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ParseCEF.java
@@ -144,6 +144,19 @@
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.required(true)
.defaultValue("false")
+ .allowableValues("true", "false")
+ .build();
+
+ public static final PropertyDescriptor VALIDATE_DATA = new PropertyDescriptor.Builder()
+ .name("VALIDATE_DATA")
+ .displayName("Validate the CEF event")
+ .description("If set to true, the event will be validated against the CEF standard (revision 23). If the event is invalid, the "
+ + "FlowFile will be routed to the failure relationship. If this property is set to false, the event will be processed "
+ + "without validating the data.")
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .required(true)
+ .defaultValue("true")
+ .allowableValues("true", "false")
.build();
public static final String UTC = "UTC";
@@ -187,6 +200,7 @@
properties.add(FIELDS_DESTINATION);
properties.add(APPEND_RAW_MESSAGE_TO_JSON);
properties.add(INCLUDE_CUSTOM_EXTENSIONS);
+ properties.add(VALIDATE_DATA);
properties.add(TIME_REPRESENTATION);
properties.add(DATETIME_REPRESENTATION);
return properties;
@@ -247,7 +261,8 @@
// parcefoneLocale defaults to en_US, so this should not fail. But we force failure in case the custom
// validator failed to identify an invalid Locale
final Locale parcefoneLocale = Locale.forLanguageTag(context.getProperty(DATETIME_REPRESENTATION).getValue());
- event = parser.parse(buffer, true, parcefoneLocale);
+ final boolean validateData = context.getProperty(VALIDATE_DATA).asBoolean();
+ event = parser.parse(buffer, validateData, parcefoneLocale);
} catch (Exception e) {
// This should never trigger but adding in here as a fencing mechanism to
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestParseCEF.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestParseCEF.java
index 009c4aa..94c61ca 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestParseCEF.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestParseCEF.java
@@ -342,5 +342,34 @@
Assert.assertEquals(200, extension.get("http_response").asInt());
}
+ @Test
+ public void testDataValidation() throws Exception {
+ String invalidEvent = sample1 + " proto=ICMP"; // according to the standard, proto can be either tcp or udp.
+
+ final TestRunner runner = TestRunners.newTestRunner(new ParseCEF());
+ runner.setProperty(ParseCEF.FIELDS_DESTINATION, ParseCEF.DESTINATION_CONTENT);
+ runner.setProperty(ParseCEF.TIME_REPRESENTATION, ParseCEF.UTC);
+ runner.enqueue(invalidEvent.getBytes());
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(ParseCEF.REL_FAILURE, 1);
+
+ runner.clearTransferState();
+ runner.setProperty(ParseCEF.VALIDATE_DATA, "false");
+ runner.enqueue(invalidEvent.getBytes());
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(ParseCEF.REL_SUCCESS, 1);
+
+ final MockFlowFile mff = runner.getFlowFilesForRelationship(ParseCEF.REL_SUCCESS).get(0);
+
+ byte [] rawJson = mff.toByteArray();
+
+ JsonNode results = new ObjectMapper().readTree(rawJson);
+
+ JsonNode extension = results.get("extension");
+ Assert.assertEquals("ICMP", extension.get("proto").asText());
+ }
+
}