blob: 4c29c1b7ba99a68227fd634ea7715bdea532a44d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.grok;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.util.EqualsWrapper;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
/**
 * Tests for {@link GrokReader}, driven through a minimal in-test processor that reads every
 * incoming FlowFile with the configured reader and collects the resulting records.
 */
public class TestGrokReader {
    private static final PropertyDescriptor READER = new PropertyDescriptor.Builder()
            .name("reader")
            .identifiesControllerService(GrokReader.class)
            .build();

    private TestRunner runner;

    /** Records produced by the reader during the current test, in read order. */
    private List<Record> records;

    @BeforeEach
    void setUp() {
        Processor processor = new AbstractProcessor() {
            private final Relationship SUCCESS = new Relationship.Builder()
                    .name("success")
                    .build();

            /**
             * Reads all records from the incoming FlowFile via the configured
             * {@link RecordReaderFactory}, collects them into {@code records}, and routes
             * the FlowFile to success.
             */
            @Override
            public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
                final FlowFile flowFile = session.get();
                if (flowFile == null) {
                    // Nothing queued for this trigger; session.read(null) would otherwise fail.
                    return;
                }

                final RecordReaderFactory readerFactory = context.getProperty(READER).asControllerService(RecordReaderFactory.class);
                try (final InputStream in = session.read(flowFile);
                     final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) {
                    Record record;
                    while ((record = reader.nextRecord()) != null) {
                        records.add(record);
                    }
                } catch (Exception e) {
                    throw new ProcessException("Failed to read records from " + flowFile, e);
                }

                session.transfer(flowFile, SUCCESS);
            }

            @Override
            protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
                return Arrays.asList(READER);
            }

            @Override
            public Set<Relationship> getRelationships() {
                return new HashSet<>(Arrays.asList(SUCCESS));
            }
        };

        runner = TestRunners.newTestRunner(processor);
        records = new ArrayList<>();
    }

    @Test
    void testComplexGrokExpression() throws Exception {
        // GIVEN
        String input = "1021-09-09 09:03:06 127.0.0.1 nifi[1000]: LogMessage" + System.lineSeparator()
                + "October 19 19:13:16 127.0.0.1 nifi[1000]: LogMessage2" + System.lineSeparator();
        String grokPatternFile = "src/test/resources/grok/grok_patterns.txt";
        String grokExpression = "%{LINE}";

        SimpleRecordSchema expectedSchema = new SimpleRecordSchema(Arrays.asList(
                new RecordField("timestamp", RecordFieldType.STRING.getDataType()),
                new RecordField("facility", RecordFieldType.STRING.getDataType()),
                new RecordField("priority", RecordFieldType.STRING.getDataType()),
                new RecordField("logsource", RecordFieldType.STRING.getDataType()),
                new RecordField("program", RecordFieldType.STRING.getDataType()),
                new RecordField("pid", RecordFieldType.STRING.getDataType()),
                new RecordField("message", RecordFieldType.STRING.getDataType()),
                new RecordField("stackTrace", RecordFieldType.STRING.getDataType()),
                new RecordField("_raw", RecordFieldType.STRING.getDataType())
        ));

        List<Record> expectedRecords = Arrays.asList(
                new MapRecord(expectedSchema, expectedValues(
                        "1021-09-09 09:03:06",
                        " LogMessage",
                        "1021-09-09 09:03:06 127.0.0.1 nifi[1000]: LogMessage")),
                new MapRecord(expectedSchema, expectedValues(
                        "October 19 19:13:16",
                        " LogMessage2",
                        "October 19 19:13:16 127.0.0.1 nifi[1000]: LogMessage2"))
        );

        // WHEN
        GrokReader grokReader = new GrokReader();
        runner.addControllerService("grokReader", grokReader);
        runner.setProperty(READER, "grokReader");
        runner.setProperty(grokReader, GrokReader.PATTERN_FILE, grokPatternFile);
        runner.setProperty(grokReader, GrokReader.GROK_EXPRESSION, grokExpression);
        runner.enableControllerService(grokReader);

        runner.enqueue(input);
        runner.run();

        // THEN
        List<Function<Record, Object>> propertyProviders = Arrays.asList(
                Record::getSchema,
                Record::getValues
        );

        List<EqualsWrapper<Record>> wrappedExpected = EqualsWrapper.wrapList(expectedRecords, propertyProviders);
        List<EqualsWrapper<Record>> wrappedActual = EqualsWrapper.wrapList(records, propertyProviders);

        Assertions.assertEquals(wrappedExpected, wrappedActual);
    }

    /**
     * Builds the expected field values for one log line of the test input. The line-specific
     * parts are passed in; the remaining fields are identical for every line.
     *
     * @param timestamp expected value of the {@code timestamp} field
     * @param message   expected value of the {@code message} field
     * @param raw       expected value of the {@code _raw} field (the full original line)
     * @return a mutable map keyed by schema field name
     */
    private static Map<String, Object> expectedValues(final String timestamp, final String message, final String raw) {
        final Map<String, Object> values = new HashMap<>();
        values.put("timestamp", timestamp);
        values.put("facility", null);
        values.put("priority", null);
        values.put("logsource", "127.0.0.1");
        values.put("program", "nifi");
        values.put("pid", "1000");
        values.put("message", message);
        // Key must match the schema field name exactly; MapRecord looks values up by field name.
        values.put("stackTrace", null);
        values.put("_raw", raw);
        return values;
    }
}