NIFI-9329 - Expose event validation in ParseCEF processor (#5477)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ParseCEF.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ParseCEF.java
index 002c048..4d8c7f6 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ParseCEF.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ParseCEF.java
@@ -144,6 +144,19 @@
             .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
             .required(true)
             .defaultValue("false")
+            .allowableValues("true", "false")
+            .build();
+
+    public static final PropertyDescriptor VALIDATE_DATA = new PropertyDescriptor.Builder()
+            .name("VALIDATE_DATA")
+            .displayName("Validate the CEF event")
+            .description("If set to true, the event will be validated against the CEF standard (revision 23). If the event is invalid, the "
+                    + "FlowFile will be routed to the failure relationship. If this property is set to false, the event will be processed "
+                    + "without validating the data.")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .required(true)
+            .defaultValue("true")
+            .allowableValues("true", "false")
             .build();
 
     public static final String UTC = "UTC";
@@ -187,6 +200,7 @@
         properties.add(FIELDS_DESTINATION);
         properties.add(APPEND_RAW_MESSAGE_TO_JSON);
         properties.add(INCLUDE_CUSTOM_EXTENSIONS);
+        properties.add(VALIDATE_DATA);
         properties.add(TIME_REPRESENTATION);
         properties.add(DATETIME_REPRESENTATION);
         return properties;
@@ -247,7 +261,8 @@
             // parcefoneLocale defaults to en_US, so this should not fail. But we force failure in case the custom
             // validator failed to identify an invalid Locale
             final Locale parcefoneLocale = Locale.forLanguageTag(context.getProperty(DATETIME_REPRESENTATION).getValue());
-            event = parser.parse(buffer, true, parcefoneLocale);
+            final boolean validateData = context.getProperty(VALIDATE_DATA).asBoolean();
+            event = parser.parse(buffer, validateData, parcefoneLocale);
 
         } catch (Exception e) {
             // This should never trigger but adding in here as a fencing mechanism to
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestParseCEF.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestParseCEF.java
index 009c4aa..94c61ca 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestParseCEF.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestParseCEF.java
@@ -342,5 +342,34 @@
         Assert.assertEquals(200, extension.get("http_response").asInt());
     }
 
+    @Test
+    public void testDataValidation() throws Exception {
+        String invalidEvent = sample1 + " proto=ICMP"; // according to the standard, proto can be either tcp or udp.
+
+        final TestRunner runner = TestRunners.newTestRunner(new ParseCEF());
+        runner.setProperty(ParseCEF.FIELDS_DESTINATION, ParseCEF.DESTINATION_CONTENT);
+        runner.setProperty(ParseCEF.TIME_REPRESENTATION, ParseCEF.UTC);
+        runner.enqueue(invalidEvent.getBytes());
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ParseCEF.REL_FAILURE, 1);
+
+        runner.clearTransferState();
+        runner.setProperty(ParseCEF.VALIDATE_DATA, "false");
+        runner.enqueue(invalidEvent.getBytes());
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ParseCEF.REL_SUCCESS, 1);
+
+        final MockFlowFile mff = runner.getFlowFilesForRelationship(ParseCEF.REL_SUCCESS).get(0);
+
+        byte [] rawJson = mff.toByteArray();
+
+        JsonNode results = new ObjectMapper().readTree(rawJson);
+
+        JsonNode extension = results.get("extension");
+        Assert.assertEquals("ICMP", extension.get("proto").asText());
+    }
+
 }