blob: 48317c9e0e237eb9671b4aa365248b572de71890 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.grok;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import io.krakens.grok.api.Grok;
import io.krakens.grok.api.Match;
public class GrokRecordReader implements RecordReader {
private final BufferedReader reader;
private final Grok grok;
private final NoMatchStrategy noMatchStrategy;
private final RecordSchema schemaFromGrok;
private RecordSchema schema;
private String nextLine;
Map<String, Object> nextMap = null;
static final String STACK_TRACE_COLUMN_NAME = "stackTrace";
static final String RAW_MESSAGE_NAME = "_raw";
private static final Pattern STACK_TRACE_PATTERN = Pattern.compile(
"^\\s*(?:(?: |\\t)+at )|"
+ "(?:(?: |\\t)+\\[CIRCULAR REFERENCE\\:)|"
+ "(?:Caused by\\: )|"
+ "(?:Suppressed\\: )|"
+ "(?:\\s+... \\d+ (?:more|common frames? omitted)$)");
public GrokRecordReader(final InputStream in, final Grok grok, final RecordSchema schema, final RecordSchema schemaFromGrok, final NoMatchStrategy noMatchStrategy) {
this.reader = new BufferedReader(new InputStreamReader(in));
this.grok = grok;
this.schema = schema;
this.noMatchStrategy = noMatchStrategy;
this.schemaFromGrok = schemaFromGrok;
}
@Override
public void close() throws IOException {
reader.close();
}
@Override
public Record nextRecord(final boolean coerceTypes, final boolean dropUnknownFields) throws IOException, MalformedRecordException {
Map<String, Object> valueMap = nextMap;
nextMap = null;
StringBuilder raw = new StringBuilder();
int iterations = 0;
while (valueMap == null || valueMap.isEmpty()) {
iterations++;
final String line = nextLine == null ? reader.readLine() : nextLine;
raw.append(line);
nextLine = null; // ensure that we don't process nextLine again
if (line == null) {
return null;
}
final Match match = grok.match(line);
valueMap = match.capture();
if((valueMap == null || valueMap.isEmpty()) && noMatchStrategy.equals(NoMatchStrategy.RAW)) {
break;
}
}
if (iterations == 0 && nextLine != null) {
raw.append(nextLine);
}
// Read the next line to see if it matches the pattern (in which case we will simply leave it for
// the next call to nextRecord()) or we will attach it to the previously read record.
String stackTrace = null;
final StringBuilder trailingText = new StringBuilder();
while ((nextLine = reader.readLine()) != null) {
final Match nextLineMatch = grok.match(nextLine);
final Map<String, Object> nextValueMap = nextLineMatch.capture();
if (nextValueMap.isEmpty() && !noMatchStrategy.equals(NoMatchStrategy.RAW)) {
// next line did not match. Check if it indicates a Stack Trace. If so, read until
// the stack trace ends. Otherwise, append the next line to the last field in the record.
if (isStartOfStackTrace(nextLine)) {
stackTrace = readStackTrace(nextLine);
raw.append("\n").append(stackTrace);
break;
} else if (noMatchStrategy.equals(NoMatchStrategy.APPEND)) {
trailingText.append("\n").append(nextLine);
raw.append("\n").append(nextLine);
}
} else {
// The next line matched our pattern.
nextMap = nextValueMap;
break;
}
}
final Record record = createRecord(valueMap, trailingText, stackTrace, raw.toString(), coerceTypes, dropUnknownFields);
return record;
}
private Record createRecord(final Map<String, Object> valueMap, final StringBuilder trailingText, final String stackTrace, final String raw, final boolean coerceTypes, final boolean dropUnknown) {
final Map<String, Object> converted = new HashMap<>();
if(valueMap != null && !valueMap.isEmpty()) {
for (final Map.Entry<String, Object> entry : valueMap.entrySet()) {
final String fieldName = entry.getKey();
final Object rawValue = entry.getValue();
final Object normalizedValue;
if (rawValue instanceof List) {
final List<?> list = (List<?>) rawValue;
List<?> nonNullElements = list.stream().filter(Objects::nonNull).collect(Collectors.toList());
if (nonNullElements.size() == 0) {
normalizedValue = null;
} else if (nonNullElements.size() == 1) {
normalizedValue = nonNullElements.get(0).toString();
} else {
final String[] array = new String[list.size()];
for (int i = 0; i < list.size(); i++) {
final Object rawObject = list.get(i);
array[i] = rawObject == null ? null : rawObject.toString();
}
normalizedValue = array;
}
} else {
normalizedValue = rawValue == null ? null : rawValue.toString();
}
final Optional<RecordField> optionalRecordField = schema.getField(fieldName);
final Object coercedValue;
if (coerceTypes && optionalRecordField.isPresent()) {
final RecordField field = optionalRecordField.get();
final DataType fieldType = field.getDataType();
coercedValue = convert(fieldType, normalizedValue, fieldName);
} else {
coercedValue = normalizedValue;
}
converted.put(fieldName, coercedValue);
}
// If there is any trailing text, determine the last column from the grok schema
// and then append the trailing text to it.
if (noMatchStrategy.equals(NoMatchStrategy.APPEND) && trailingText.length() > 0) {
String lastPopulatedFieldName = null;
final List<RecordField> schemaFields = schemaFromGrok.getFields();
for (int i = schemaFields.size() - 1; i >= 0; i--) {
final RecordField field = schemaFields.get(i);
Object value = converted.get(field.getFieldName());
if (value != null) {
lastPopulatedFieldName = field.getFieldName();
break;
}
for (final String alias : field.getAliases()) {
value = converted.get(alias);
if (value != null) {
lastPopulatedFieldName = alias;
break;
}
}
}
if (lastPopulatedFieldName != null) {
final Object value = converted.get(lastPopulatedFieldName);
if (value == null) {
converted.put(lastPopulatedFieldName, trailingText.toString());
} else if (value instanceof String) { // if not a String it is a List and we will just drop the trailing text
converted.put(lastPopulatedFieldName, (String) value + trailingText.toString());
}
}
}
}
converted.put(STACK_TRACE_COLUMN_NAME, stackTrace);
converted.put(RAW_MESSAGE_NAME, raw);
return new MapRecord(schema, converted);
}
private boolean isStartOfStackTrace(final String line) {
if (line == null) {
return false;
}
// Stack Traces are generally of the form:
// java.lang.IllegalArgumentException: My message
// at org.apache.nifi.MyClass.myMethod(MyClass.java:48)
// at java.lang.Thread.run(Thread.java:745) [na:1.8.0_60]
// Caused by: java.net.SocketTimeoutException: null
// ... 13 common frames omitted
int index = line.indexOf("Exception: ");
if (index < 0) {
index = line.indexOf("Error: ");
}
if (index < 0) {
return false;
}
if (line.indexOf(" ") < index) {
return false;
}
return true;
}
private String readStackTrace(final String firstLine) throws IOException {
final StringBuilder sb = new StringBuilder(firstLine);
String line;
while ((line = reader.readLine()) != null) {
if (isLineInStackTrace(line)) {
sb.append("\n").append(line);
} else {
nextLine = line;
break;
}
}
return sb.toString();
}
private boolean isLineInStackTrace(final String line) {
return STACK_TRACE_PATTERN.matcher(line).find();
}
protected Object convert(final DataType fieldType, final Object rawValue, final String fieldName) {
if (fieldType == null) {
return rawValue;
}
if (rawValue == null) {
return null;
}
// If string is empty then return an empty string if field type is STRING. If field type is
// anything else, we can't really convert it so return null
final boolean fieldEmpty = rawValue instanceof String && ((String) rawValue).isEmpty();
if (fieldEmpty && fieldType.getFieldType() != RecordFieldType.STRING) {
return null;
}
return DataTypeUtils.convertType(rawValue, fieldType, fieldName);
}
@Override
public RecordSchema getSchema() {
return schema;
}
}