NIFI-9203 Improve GrokReader to handle complex grok expressions properly.
This closes #5376.
Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
index bae70fa..1553926 100755
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
@@ -153,6 +153,7 @@
<exclude>src/test/resources/grok/nifi-log-sample-multiline-with-stacktrace.log</exclude>
<exclude>src/test/resources/grok/nifi-log-sample.log</exclude>
<exclude>src/test/resources/grok/single-line-log-messages.txt</exclude>
+ <exclude>src/test/resources/grok/grok_patterns.txt</exclude>
<exclude>src/test/resources/json/bank-account-array-different-schemas.json</exclude>
<exclude>src/test/resources/json/bank-account-array-optional-balance.json</exclude>
<exclude>src/test/resources/json/bank-account-array.json</exclude>
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
index 07383f5..e7459d7 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
@@ -55,6 +55,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -191,7 +192,7 @@
}
static RecordSchema createRecordSchema(final Grok grok) {
- final List<RecordField> fields = new ArrayList<>();
+ final Set<RecordField> fields = new LinkedHashSet<>();
String grokExpression = grok.getOriginalGrokPattern();
populateSchemaFieldNames(grok, grokExpression, fields);
@@ -199,11 +200,12 @@
fields.add(new RecordField(GrokRecordReader.STACK_TRACE_COLUMN_NAME, RecordFieldType.STRING.getDataType(), true));
fields.add(new RecordField(GrokRecordReader.RAW_MESSAGE_NAME, RecordFieldType.STRING.getDataType(), true));
- final RecordSchema schema = new SimpleRecordSchema(fields);
+ final RecordSchema schema = new SimpleRecordSchema(new ArrayList<>(fields));
+
return schema;
}
- private static void populateSchemaFieldNames(final Grok grok, String grokExpression, final List<RecordField> fields) {
+ private static void populateSchemaFieldNames(final Grok grok, String grokExpression, final Collection<RecordField> fields) {
final Set<String> namedGroups = GrokUtils.getNameGroups(GrokUtils.GROK_PATTERN.pattern());
while (grokExpression.length() > 0) {
final Matcher matcher = GrokUtils.GROK_PATTERN.matcher(grokExpression);
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java
index ebcefe5..48317c9 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java
@@ -24,8 +24,10 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.regex.Pattern;
+import java.util.stream.Collectors;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
@@ -142,12 +144,19 @@
final Object normalizedValue;
if (rawValue instanceof List) {
final List<?> list = (List<?>) rawValue;
- final String[] array = new String[list.size()];
- for (int i = 0; i < list.size(); i++) {
- final Object rawObject = list.get(i);
- array[i] = rawObject == null ? null : rawObject.toString();
+ List<?> nonNullElements = list.stream().filter(Objects::nonNull).collect(Collectors.toList());
+ if (nonNullElements.size() == 0) {
+ normalizedValue = null;
+ } else if (nonNullElements.size() == 1) {
+ normalizedValue = nonNullElements.get(0).toString();
+ } else {
+ final String[] array = new String[list.size()];
+ for (int i = 0; i < list.size(); i++) {
+ final Object rawObject = list.get(i);
+ array[i] = rawObject == null ? null : rawObject.toString();
+ }
+ normalizedValue = array;
}
- normalizedValue = array;
} else {
normalizedValue = rawValue == null ? null : rawValue.toString();
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokReader.java
new file mode 100644
index 0000000..4c29c1b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokReader.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.grok;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.util.EqualsWrapper;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Function;
+
+public class TestGrokReader {
+    private TestRunner runner;
+    private List<Record> records;
+    // Property through which the test processor looks up the GrokReader controller service.
+    private static final PropertyDescriptor READER = new PropertyDescriptor.Builder()
+            .name("reader")
+            .identifiesControllerService(GrokReader.class)
+            .build();
+
+    @BeforeEach
+    void setUp() {
+        Processor processor = new AbstractProcessor() {
+            private final Relationship SUCCESS = new Relationship.Builder()
+                    .name("success")
+                    .build();
+            // Reads the incoming FlowFile with the configured reader and collects every record into 'records'.
+            @Override
+            public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+                FlowFile flowFile = session.get();
+                final RecordReaderFactory readerFactory = context.getProperty(READER).asControllerService(RecordReaderFactory.class);
+
+                try (final InputStream in = session.read(flowFile);
+                     final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) {
+                    Record record;
+                    while ((record = reader.nextRecord()) != null) {
+                        records.add(record);
+                    }
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+
+                session.transfer(flowFile, SUCCESS);
+            }
+
+            @Override
+            protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+                return Arrays.asList(READER);
+            }
+
+            @Override
+            public Set<Relationship> getRelationships() {
+                return new HashSet<>(Arrays.asList(SUCCESS));
+            }
+        };
+
+        runner = TestRunners.newTestRunner(processor);
+
+        records = new ArrayList<>();
+    }
+    // %{LINE} alternates between two patterns that both capture 'message'; each field is expected once, as a String.
+    @Test
+    void testComplexGrokExpression() throws Exception {
+        // GIVEN
+        String input = "1021-09-09 09:03:06 127.0.0.1 nifi[1000]: LogMessage" + System.lineSeparator()
+                + "October 19 19:13:16 127.0.0.1 nifi[1000]: LogMessage2" + System.lineSeparator();
+
+        String grokPatternFile = "src/test/resources/grok/grok_patterns.txt";
+        String grokExpression = "%{LINE}";
+
+        SimpleRecordSchema expectedSchema = new SimpleRecordSchema(Arrays.asList(
+                new RecordField("timestamp", RecordFieldType.STRING.getDataType()),
+                new RecordField("facility", RecordFieldType.STRING.getDataType()),
+                new RecordField("priority", RecordFieldType.STRING.getDataType()),
+                new RecordField("logsource", RecordFieldType.STRING.getDataType()),
+                new RecordField("program", RecordFieldType.STRING.getDataType()),
+                new RecordField("pid", RecordFieldType.STRING.getDataType()),
+                new RecordField("message", RecordFieldType.STRING.getDataType()),
+                new RecordField("stackTrace", RecordFieldType.STRING.getDataType()),
+                new RecordField("_raw", RecordFieldType.STRING.getDataType())
+        ));
+
+        List<Record> expectedRecords = Arrays.asList(
+                new MapRecord(expectedSchema, new HashMap<String, Object>() {{
+                    put("timestamp", "1021-09-09 09:03:06");
+                    put("facility", null);
+                    put("priority", null);
+                    put("logsource", "127.0.0.1");
+                    put("program", "nifi");
+                    put("pid", "1000");
+                    put("message", " LogMessage");
+                    put("stackTrace", null); // key must match the "stackTrace" field of expectedSchema
+                    put("_raw", "1021-09-09 09:03:06 127.0.0.1 nifi[1000]: LogMessage");
+                }}),
+                new MapRecord(expectedSchema, new HashMap<String, Object>() {{
+                    put("timestamp", "October 19 19:13:16");
+                    put("facility", null);
+                    put("priority", null);
+                    put("logsource", "127.0.0.1");
+                    put("program", "nifi");
+                    put("pid", "1000");
+                    put("message", " LogMessage2");
+                    put("stackTrace", null); // key must match the "stackTrace" field of expectedSchema
+                    put("_raw", "October 19 19:13:16 127.0.0.1 nifi[1000]: LogMessage2");
+                }})
+        );
+
+        // WHEN
+        GrokReader grokReader = new GrokReader();
+
+        runner.addControllerService("grokReader", grokReader);
+        runner.setProperty(READER, "grokReader");
+
+        runner.setProperty(grokReader, GrokReader.PATTERN_FILE, grokPatternFile);
+        runner.setProperty(grokReader, GrokReader.GROK_EXPRESSION, grokExpression);
+
+        runner.enableControllerService(grokReader);
+
+        runner.enqueue(input);
+        runner.run();
+
+        // THEN
+        List<Function<Record, Object>> propertyProviders = Arrays.asList(
+                Record::getSchema,
+                Record::getValues
+        );
+
+        List<EqualsWrapper<Record>> wrappedExpected = EqualsWrapper.wrapList(expectedRecords, propertyProviders);
+        List<EqualsWrapper<Record>> wrappedActual = EqualsWrapper.wrapList(records, propertyProviders);
+
+        Assertions.assertEquals(wrappedExpected, wrappedActual);
+    }
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/grok/grok_patterns.txt b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/grok/grok_patterns.txt
new file mode 100644
index 0000000..1d4642e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/grok/grok_patterns.txt
@@ -0,0 +1,4 @@
+SYSLOGBASE_ISO8601 %{TIMESTAMP_ISO8601:timestamp} (?:%{SYSLOGFACILITY} )?%{SYSLOGHOST:logsource} %{SYSLOGPROG}:
+LINE_1 %{SYSLOGBASE}%{GREEDYDATA:message}
+LINE_2 %{SYSLOGBASE_ISO8601}%{GREEDYDATA:message}
+LINE (?:%{LINE_1}|%{LINE_2})