blob: b9703c66a5e01e391aad77e21f7636fcc0b3d1c7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.syslog;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.syslog.attributes.Syslog5424Attributes;
import org.apache.nifi.syslog.attributes.SyslogAttributes;
import org.apache.nifi.syslog.keyproviders.SimpleKeyProvider;
import org.apache.nifi.syslog.parsers.StrictSyslog5424Parser;
import org.apache.nifi.syslog.utils.NifiStructuredDataPolicy;
import org.apache.nifi.syslog.utils.NilHandlingPolicy;
import org.junit.jupiter.api.Test;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
 * Tests for {@link Syslog5424RecordReader}, driven by the sample log files under
 * {@code src/test/resources/syslog/syslog5424/}.
 *
 * <p>Every test builds a {@link StrictSyslog5424Parser} with {@link NilHandlingPolicy#NULL},
 * {@link NifiStructuredDataPolicy#MAP_OF_MAPS} and a {@link SimpleKeyProvider}, and reads records
 * using the schema returned by {@code Syslog5424Reader#createRecordSchema()}.</p>
 */
public class TestSyslog5424RecordReader {
    private static final Charset CHARSET = StandardCharsets.UTF_8;

    /** Directory (relative to the module root) holding the sample syslog files. */
    private static final String RESOURCE_DIR = "src/test/resources/syslog/syslog5424/";

    /** Structured-data element ids present in the single-line sample messages. */
    private static final String SD_ID_1 = "exampleSDID@32473";
    private static final String SD_ID_2 = "exampleSDID@32480";

    private static final String expectedVersion = "1";
    private static final String expectedMessage = "Removing instance";
    private static final String expectedAppName = "d0602076-b14a-4c55-852a-981e7afeed38";
    private static final String expectedHostName = "loggregator";
    private static final String expectedPri = "14";
    private static final String expectedProcId = "DEA";
    private static final Timestamp expectedTimestamp = Timestamp.from(Instant.from(DateTimeFormatter.ISO_DATE_TIME.parse("2014-06-20T09:14:07+00:00")));
    private static final String expectedMessageId = "MSG-01";
    private static final String expectedFacility = "1";
    private static final String expectedSeverity = "6";
    private static final String expectedIUT1 = "3";
    private static final String expectedIUT2 = "4";
    private static final String expectedEventSource1 = "Application";
    private static final String expectedEventSource2 = "Other Application";
    private static final String expectedEventID1 = "1011";
    private static final String expectedEventID2 = "2022";
    private static final String expectedRawMessage = "<14>1 2014-06-20T09:14:07+00:00 loggregator d0602076-b14a-4c55-852a-981e7afeed38 DEA MSG-01 "
            + "[exampleSDID@32473 iut=\"3\" eventSource=\"Application\" eventID=\"1011\"][exampleSDID@32480 iut=\"4\" eventSource=\"Other Application\" "
            + "eventID=\"2022\"] Removing instance";

    /**
     * Reads {@code log_all.txt} with {@code true} as the reader's second constructor argument and
     * checks every header field, the message id, the raw message and both structured-data elements.
     * Exactly one record is expected in the file.
     */
    @Test
    public void testParseSingleLine() throws IOException, MalformedRecordException {
        final Syslog5424Reader reader = new Syslog5424Reader();
        // The record reader is declared as a resource so it is closed even when an assertion fails.
        try (final InputStream fis = new FileInputStream(RESOURCE_DIR + "log_all.txt");
             final Syslog5424RecordReader deserializer =
                     new Syslog5424RecordReader(newParser(), true, CHARSET, fis, reader.createRecordSchema())) {
            final Record record = deserializer.nextRecord();

            assertCommonHeaderFields(record);
            assertEquals(expectedMessageId, record.getAsString(Syslog5424Attributes.MESSAGEID.key()));
            assertEquals(expectedRawMessage, record.getAsString(Syslog5424Reader.RAW_MESSAGE_NAME));
            assertStructuredData(record);

            // The file holds a single message, so the next read signals end of input.
            assertNull(deserializer.nextRecord());
        }
    }

    /**
     * Reads {@code log.txt} with {@code false} as the reader's second constructor argument and
     * checks that the message id and the raw message field are {@code null} while all other
     * header fields and the structured data match the expected values.
     */
    @Test
    public void testParseSingleLineSomeNulls() throws IOException, MalformedRecordException {
        final Syslog5424Reader reader = new Syslog5424Reader();
        try (final InputStream fis = new FileInputStream(RESOURCE_DIR + "log.txt");
             final Syslog5424RecordReader deserializer =
                     new Syslog5424RecordReader(newParser(), false, CHARSET, fis, reader.createRecordSchema())) {
            final Record record = deserializer.nextRecord();

            assertCommonHeaderFields(record);
            assertNull(record.getAsString(Syslog5424Attributes.MESSAGEID.key()));
            assertNull(record.getAsString(Syslog5424Reader.RAW_MESSAGE_NAME));
            assertStructuredData(record);

            assertNull(deserializer.nextRecord());
        }
    }

    /**
     * Reads {@code log_mix.txt} to exhaustion and expects exactly three records, each with
     * non-null values.
     */
    @Test
    public void testParseMultipleLine() throws IOException, MalformedRecordException {
        final Syslog5424Reader reader = new Syslog5424Reader();
        try (final InputStream fis = new FileInputStream(RESOURCE_DIR + "log_mix.txt");
             final Syslog5424RecordReader deserializer =
                     new Syslog5424RecordReader(newParser(), false, CHARSET, fis, reader.createRecordSchema())) {
            int count = 0;
            for (Record record = deserializer.nextRecord(); record != null; record = deserializer.nextRecord()) {
                assertNotNull(record.getValues());
                count++;
            }
            assertEquals(3, count);
        }
    }

    /**
     * Reads {@code log_mix_in_error.txt}, tolerating failures from {@code nextRecord()}, and
     * expects three successful reads inside the loop and exactly one failed read.
     */
    @Test
    public void testParseMultipleLineWithError() throws IOException, MalformedRecordException {
        final Syslog5424Reader reader = new Syslog5424Reader();
        try (final InputStream fis = new FileInputStream(RESOURCE_DIR + "log_mix_in_error.txt");
             final Syslog5424RecordReader deserializer =
                     new Syslog5424RecordReader(newParser(), false, CHARSET, fis, reader.createRecordSchema())) {
            Record record = deserializer.nextRecord();
            // Counts nextRecord() calls made inside the loop that returned without throwing.
            // This includes the final call that returns null and excludes the read above.
            int successfulReads = 0;
            int exceptionCount = 0;
            while (record != null) {
                assertNotNull(record.getValues());
                try {
                    record = deserializer.nextRecord();
                    successfulReads++;
                } catch (Exception e) {
                    // A failed read is expected for this file. 'record' keeps its previous
                    // non-null value, so the loop simply attempts the next read.
                    exceptionCount++;
                }
            }
            assertEquals(3, successfulReads);
            assertEquals(1, exceptionCount);
        }
    }

    /** Builds the parser configuration shared by every test in this class. */
    private static StrictSyslog5424Parser newParser() {
        return new StrictSyslog5424Parser(
                NilHandlingPolicy.NULL,
                NifiStructuredDataPolicy.MAP_OF_MAPS,
                new SimpleKeyProvider());
    }

    /** Asserts the header fields that carry the same expected values in both single-line samples. */
    private static void assertCommonHeaderFields(final Record record) {
        assertNotNull(record.getValues());
        assertEquals(expectedVersion, record.getAsString(SyslogAttributes.VERSION.key()));
        assertEquals(expectedMessage, record.getAsString(SyslogAttributes.BODY.key()));
        assertEquals(expectedAppName, record.getAsString(Syslog5424Attributes.APP_NAME.key()));
        assertEquals(expectedHostName, record.getAsString(SyslogAttributes.HOSTNAME.key()));
        assertEquals(expectedPri, record.getAsString(SyslogAttributes.PRIORITY.key()));
        assertEquals(expectedSeverity, record.getAsString(SyslogAttributes.SEVERITY.key()));
        assertEquals(expectedFacility, record.getAsString(SyslogAttributes.FACILITY.key()));
        assertEquals(expectedProcId, record.getAsString(Syslog5424Attributes.PROCID.key()));
        assertEquals(expectedTimestamp, record.getValue(SyslogAttributes.TIMESTAMP.key()));
    }

    /** Asserts that the record's structured-data value is a map holding both expected elements. */
    @SuppressWarnings("unchecked")
    private static void assertStructuredData(final Record record) {
        final Object rawStructured = record.getValue(Syslog5424Attributes.STRUCTURED_BASE.key());
        assertNotNull(rawStructured);
        // Unchecked: the tests rely on the structured data being exposed as nested maps.
        final Map<String, Object> structured = (Map<String, Object>) rawStructured;
        assertStructuredElement(structured, SD_ID_1, expectedIUT1, expectedEventSource1, expectedEventID1);
        assertStructuredElement(structured, SD_ID_2, expectedIUT2, expectedEventSource2, expectedEventID2);
    }

    /**
     * Asserts that {@code structured} contains the element {@code sdId} and that its
     * {@code iut}, {@code eventSource} and {@code eventID} parameters have the given values.
     */
    @SuppressWarnings("unchecked")
    private static void assertStructuredElement(final Map<String, Object> structured, final String sdId,
                                                final String iut, final String eventSource, final String eventId) {
        assertTrue(structured.containsKey(sdId));
        final Map<String, Object> element = (Map<String, Object>) structured.get(sdId);
        assertTrue(element.containsKey("iut"));
        assertTrue(element.containsKey("eventSource"));
        assertTrue(element.containsKey("eventID"));
        assertEquals(iut, element.get("iut").toString());
        assertEquals(eventSource, element.get("eventSource").toString());
        assertEquals(eventId, element.get("eventID").toString());
    }
}