NIFI-9203 Improve GrokReader to be able to handle complex grok expression properly.

This closes #5376.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
index bae70fa..1553926 100755
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
@@ -153,6 +153,7 @@
                         <exclude>src/test/resources/grok/nifi-log-sample-multiline-with-stacktrace.log</exclude>
                         <exclude>src/test/resources/grok/nifi-log-sample.log</exclude>
                         <exclude>src/test/resources/grok/single-line-log-messages.txt</exclude>
+                        <exclude>src/test/resources/grok/grok_patterns.txt</exclude>
                         <exclude>src/test/resources/json/bank-account-array-different-schemas.json</exclude>
                         <exclude>src/test/resources/json/bank-account-array-optional-balance.json</exclude>
                         <exclude>src/test/resources/json/bank-account-array.json</exclude>
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
index 07383f5..e7459d7 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
@@ -55,6 +55,7 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.EnumSet;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -191,7 +192,7 @@
     }
 
     static RecordSchema createRecordSchema(final Grok grok) {
-        final List<RecordField> fields = new ArrayList<>();
+        final Set<RecordField> fields = new LinkedHashSet<>();
 
         String grokExpression = grok.getOriginalGrokPattern();
         populateSchemaFieldNames(grok, grokExpression, fields);
@@ -199,11 +200,12 @@
         fields.add(new RecordField(GrokRecordReader.STACK_TRACE_COLUMN_NAME, RecordFieldType.STRING.getDataType(), true));
         fields.add(new RecordField(GrokRecordReader.RAW_MESSAGE_NAME, RecordFieldType.STRING.getDataType(), true));
 
-        final RecordSchema schema = new SimpleRecordSchema(fields);
+        final RecordSchema schema = new SimpleRecordSchema(new ArrayList<>(fields));
+
         return schema;
     }
 
-    private static void populateSchemaFieldNames(final Grok grok, String grokExpression, final List<RecordField> fields) {
+    private static void populateSchemaFieldNames(final Grok grok, String grokExpression, final Collection<RecordField> fields) {
         final Set<String> namedGroups = GrokUtils.getNameGroups(GrokUtils.GROK_PATTERN.pattern());
         while (grokExpression.length() > 0) {
             final Matcher matcher = GrokUtils.GROK_PATTERN.matcher(grokExpression);
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java
index ebcefe5..48317c9 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java
@@ -24,8 +24,10 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 import org.apache.nifi.serialization.MalformedRecordException;
 import org.apache.nifi.serialization.RecordReader;
@@ -142,12 +144,19 @@
                 final Object normalizedValue;
                 if (rawValue instanceof List) {
                     final List<?> list = (List<?>) rawValue;
-                    final String[] array = new String[list.size()];
-                    for (int i = 0; i < list.size(); i++) {
-                        final Object rawObject = list.get(i);
-                        array[i] = rawObject == null ? null : rawObject.toString();
+                    List<?> nonNullElements = list.stream().filter(Objects::nonNull).collect(Collectors.toList());
+                    if (nonNullElements.size() == 0) {
+                        normalizedValue = null;
+                    } else if (nonNullElements.size() == 1) {
+                        normalizedValue = nonNullElements.get(0).toString();
+                    } else {
+                        final String[] array = new String[list.size()];
+                        for (int i = 0; i < list.size(); i++) {
+                            final Object rawObject = list.get(i);
+                            array[i] = rawObject == null ? null : rawObject.toString();
+                        }
+                        normalizedValue = array;
                     }
-                    normalizedValue = array;
                 } else {
                     normalizedValue = rawValue == null ? null : rawValue.toString();
                 }
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokReader.java
new file mode 100644
index 0000000..4c29c1b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokReader.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.grok;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.util.EqualsWrapper;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Function;
+
+public class TestGrokReader {
+    private TestRunner runner;
+    private List<Record> records;
+
+    private static final PropertyDescriptor READER = new PropertyDescriptor.Builder()
+        .name("reader")
+        .identifiesControllerService(GrokReader.class)
+        .build();
+
+    @BeforeEach
+    void setUp() {
+        Processor processor = new AbstractProcessor() {
+            Relationship SUCCESS = new Relationship.Builder()
+                .name("success")
+                .build();
+
+            @Override
+            public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+                FlowFile flowFile = session.get();
+                final RecordReaderFactory readerFactory = context.getProperty(READER).asControllerService(RecordReaderFactory.class);
+
+                try (final InputStream in = session.read(flowFile);
+                     final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) {
+                    Record record;
+                    while ((record = reader.nextRecord()) != null) {
+                        records.add(record);
+                    }
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+
+                session.transfer(flowFile, SUCCESS);
+            }
+
+            @Override
+            protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+                return Arrays.asList(READER);
+            }
+
+            @Override
+            public Set<Relationship> getRelationships() {
+                return new HashSet<>(Arrays.asList(SUCCESS));
+            }
+        };
+
+        runner = TestRunners.newTestRunner(processor);
+
+        records = new ArrayList<>();
+    }
+
+    @Test
+    void testComplexGrokExpression() throws Exception {
+        // GIVEN
+        String input = "1021-09-09 09:03:06 127.0.0.1 nifi[1000]: LogMessage" + System.lineSeparator()
+            + "October 19 19:13:16 127.0.0.1 nifi[1000]: LogMessage2" + System.lineSeparator();
+
+        String grokPatternFile = "src/test/resources/grok/grok_patterns.txt";
+        String grokExpression = "%{LINE}";
+
+        SimpleRecordSchema expectedSchema = new SimpleRecordSchema(Arrays.asList(
+            new RecordField("timestamp", RecordFieldType.STRING.getDataType()),
+            new RecordField("facility", RecordFieldType.STRING.getDataType()),
+            new RecordField("priority", RecordFieldType.STRING.getDataType()),
+            new RecordField("logsource", RecordFieldType.STRING.getDataType()),
+            new RecordField("program", RecordFieldType.STRING.getDataType()),
+            new RecordField("pid", RecordFieldType.STRING.getDataType()),
+            new RecordField("message", RecordFieldType.STRING.getDataType()),
+            new RecordField("stackTrace", RecordFieldType.STRING.getDataType()),
+            new RecordField("_raw", RecordFieldType.STRING.getDataType())
+        ));
+
+        List<Record> expectedRecords = Arrays.asList(
+            new MapRecord(expectedSchema, new HashMap<String, Object>() {{
+                put("timestamp", "1021-09-09 09:03:06");
+                put("facility", null);
+                put("priority", null);
+                put("logsource", "127.0.0.1");
+                put("program", "nifi");
+                put("pid", "1000");
+                put("message", " LogMessage");
+                put("stackstrace", null);
+                put("_raw", "1021-09-09 09:03:06 127.0.0.1 nifi[1000]: LogMessage");
+            }}),
+            new MapRecord(expectedSchema, new HashMap<String, Object>() {{
+                put("timestamp", "October 19 19:13:16");
+                put("facility", null);
+                put("priority", null);
+                put("logsource", "127.0.0.1");
+                put("program", "nifi");
+                put("pid", "1000");
+                put("message", " LogMessage2");
+                put("stackstrace", null);
+                put("_raw", "October 19 19:13:16 127.0.0.1 nifi[1000]: LogMessage2");
+            }})
+        );
+
+        // WHEN
+        GrokReader grokReader = new GrokReader();
+
+        runner.addControllerService("grokReader", grokReader);
+        runner.setProperty(READER, "grokReader");
+
+        runner.setProperty(grokReader, GrokReader.PATTERN_FILE, grokPatternFile);
+        runner.setProperty(grokReader, GrokReader.GROK_EXPRESSION, grokExpression);
+
+        runner.enableControllerService(grokReader);
+
+        runner.enqueue(input);
+        runner.run();
+
+        // THEN
+        List<Function<Record, Object>> propertyProviders = Arrays.asList(
+            Record::getSchema,
+            Record::getValues
+        );
+
+        List<EqualsWrapper<Record>> wrappedExpected = EqualsWrapper.wrapList(expectedRecords, propertyProviders);
+        List<EqualsWrapper<Record>> wrappedActual = EqualsWrapper.wrapList(records, propertyProviders);
+
+        Assertions.assertEquals(wrappedExpected, wrappedActual);
+    }
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/grok/grok_patterns.txt b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/grok/grok_patterns.txt
new file mode 100644
index 0000000..1d4642e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/grok/grok_patterns.txt
@@ -0,0 +1,4 @@
+SYSLOGBASE_ISO8601 %{TIMESTAMP_ISO8601:timestamp} (?:%{SYSLOGFACILITY} )?%{SYSLOGHOST:logsource} %{SYSLOGPROG}:
+LINE_1 %{SYSLOGBASE}%{GREEDYDATA:message}
+LINE_2 %{SYSLOGBASE_ISO8601}%{GREEDYDATA:message}
+LINE (?:%{LINE_1}|%{LINE_2})