blob: 0f3988758918a7871deef2470995783e460efa02 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.store.syslog;
import io.netty.buffer.DrillBuf;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.expr.holders.VarCharHolder;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.impl.OutputMutator;
import org.apache.drill.exec.store.AbstractRecordReader;
import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.dfs.easy.FileWork;
import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
import org.apache.drill.exec.vector.complex.writer.BaseWriter;
import org.realityforge.jsyslog.message.StructuredDataParameter;
import org.realityforge.jsyslog.message.SyslogMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.List;
import java.util.Map;
public class SyslogRecordReader extends AbstractRecordReader {
private static final Logger logger = LoggerFactory.getLogger(SyslogRecordReader.class);
private static final int MAX_RECORDS_PER_BATCH = 4096;
private final DrillFileSystem fileSystem;
private final FileWork fileWork;
private final String userName;
private BufferedReader reader;
private DrillBuf buffer;
private VectorContainerWriter writer;
private final int maxErrors;
private final boolean flattenStructuredData;
private int errorCount;
private int lineCount;
private final List<SchemaPath> projectedColumns;
private String line;
public SyslogRecordReader(FragmentContext context,
DrillFileSystem fileSystem,
FileWork fileWork,
List<SchemaPath> columns,
String userName,
SyslogFormatConfig config) throws OutOfMemoryException {
this.fileSystem = fileSystem;
this.fileWork = fileWork;
this.userName = userName;
this.maxErrors = config.getMaxErrors();
this.errorCount = 0;
this.buffer = context.getManagedBuffer().reallocIfNeeded(4096);
this.projectedColumns = columns;
this.flattenStructuredData = config.getFlattenStructuredData();
setColumns(columns);
}
@Override
public void setup(final OperatorContext context, final OutputMutator output) {
openFile();
this.writer = new VectorContainerWriter(output);
}
private void openFile() {
InputStream in;
try {
in = fileSystem.openPossiblyCompressedStream(fileWork.getPath());
} catch (Exception e) {
throw UserException
.dataReadError(e)
.message("Failed to open open input file: %s", fileWork.getPath())
.addContext("User name", this.userName)
.build(logger);
}
this.lineCount = 0;
reader = new BufferedReader(new InputStreamReader(in));
}
@Override
public int next() {
this.writer.allocate();
this.writer.reset();
int recordCount = 0;
try {
BaseWriter.MapWriter map = this.writer.rootAsMap();
String line;
while (recordCount < MAX_RECORDS_PER_BATCH && (line = this.reader.readLine()) != null) {
lineCount++;
// Skip empty lines
line = line.trim();
if (line.length() == 0) {
continue;
}
this.line = line;
try {
SyslogMessage parsedMessage = SyslogMessage.parseStructuredSyslogMessage(line);
this.writer.setPosition(recordCount);
map.start();
if (isStarQuery()) {
writeAllColumns(map, parsedMessage);
} else {
writeProjectedColumns(map, parsedMessage);
}
map.end();
recordCount++;
} catch (Exception e) {
errorCount++;
if (errorCount > maxErrors) {
throw UserException
.dataReadError()
.message("Maximum Error Threshold Exceeded: ")
.addContext("Line: " + lineCount)
.addContext(e.getMessage())
.build(logger);
}
}
}
this.writer.setValueCount(recordCount);
return recordCount;
} catch (final Exception e) {
errorCount++;
if (errorCount > maxErrors) {
throw UserException.dataReadError()
.message("Error parsing file")
.addContext(e.getMessage())
.build(logger);
}
}
return recordCount;
}
private void writeAllColumns(BaseWriter.MapWriter map, SyslogMessage parsedMessage) {
long milliseconds = 0;
try {
milliseconds = parsedMessage.getTimestamp().getMillis();
} catch (final Exception e) {
errorCount++;
if (errorCount > maxErrors) {
throw UserException.dataReadError()
.message("Syslog Format Plugin: Error parsing date")
.addContext(e.getMessage())
.build(logger);
}
}
map.timeStamp("event_date").writeTimeStamp(milliseconds);
map.integer("severity_code").writeInt(parsedMessage.getLevel().ordinal());
map.integer("facility_code").writeInt(parsedMessage.getFacility().ordinal());
mapStringField("severity", parsedMessage.getLevel().name(), map);
mapStringField("facility", parsedMessage.getFacility().name(), map);
mapStringField("ip", parsedMessage.getHostname(), map);
mapStringField("app_name", parsedMessage.getAppName(), map);
mapStringField("process_id", parsedMessage.getProcId(), map);
mapStringField("message_id", parsedMessage.getMsgId(), map);
if (parsedMessage.getStructuredData() != null) {
mapStringField("structured_data_text", parsedMessage.getStructuredData().toString(), map);
Map<String, List<StructuredDataParameter>> structuredData = parsedMessage.getStructuredData();
if (flattenStructuredData) {
mapFlattenedStructuredData(structuredData, map);
} else {
mapComplexField("structured_data", structuredData, map);
}
}
mapStringField("message", parsedMessage.getMessage(), map);
}
private void writeProjectedColumns(BaseWriter.MapWriter map, SyslogMessage parsedMessage) throws UserException {
String columnName;
for (SchemaPath col : projectedColumns) {
//Case for nested fields
if (col.getAsNamePart().hasChild()) {
String fieldName = col.getAsNamePart().getChild().getName();
mapStructuredDataField(fieldName, map, parsedMessage);
} else {
columnName = col.getAsNamePart().getName();
//Extracts fields from structured data IF the user selected to flatten these fields
if ((!columnName.equals("structured_data_text")) && columnName.startsWith("structured_data_")) {
String fieldName = columnName.replace("structured_data_", "");
String value = getFieldFromStructuredData(fieldName, parsedMessage);
mapStringField(columnName, value, map);
} else {
switch (columnName) {
case "event_date":
long milliseconds = parsedMessage.getTimestamp().getMillis(); //TODO put in try/catch
map.timeStamp("event_date").writeTimeStamp(milliseconds);
break;
case "severity_code":
map.integer("severity_code").writeInt(parsedMessage.getLevel().ordinal());
break;
case "facility_code":
map.integer("facility_code").writeInt(parsedMessage.getFacility().ordinal());
break;
case "severity":
mapStringField("severity", parsedMessage.getLevel().name(), map);
break;
case "facility":
mapStringField("facility", parsedMessage.getFacility().name(), map);
break;
case "ip":
mapStringField("ip", parsedMessage.getHostname(), map);
break;
case "app_name":
mapStringField("app_name", parsedMessage.getAppName(), map);
break;
case "process_id":
mapStringField("process_id", parsedMessage.getProcId(), map);
break;
case "msg_id":
mapStringField("message_id", parsedMessage.getMsgId(), map);
break;
case "structured_data":
if (parsedMessage.getStructuredData() != null) {
Map<String, List<StructuredDataParameter>> structured_data = parsedMessage.getStructuredData();
mapComplexField("structured_data", structured_data, map);
}
break;
case "structured_data_text":
if (parsedMessage.getStructuredData() != null) {
mapStringField("structured_data_text", parsedMessage.getStructuredData().toString(), map);
} else {
mapStringField("structured_data_text", "", map);
}
break;
case "message":
mapStringField("message", parsedMessage.getMessage(), map);
break;
case "_raw":
mapStringField("_raw", this.line, map);
break;
default:
mapStringField(columnName, "", map);
}
}
}
}
}
//Helper function to map strings
private void mapStringField(String name, String value, BaseWriter.MapWriter map) {
if (value == null) {
return;
}
try {
byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
int stringLength = bytes.length;
this.buffer = buffer.reallocIfNeeded(stringLength);
this.buffer.setBytes(0, bytes, 0, stringLength);
map.varChar(name).writeVarChar(0, stringLength, buffer);
} catch (Exception e) {
throw UserException
.dataWriteError()
.addContext("Could not write string: ")
.addContext(e.getMessage())
.build(logger);
}
}
//Helper function to flatten structured data
private void mapFlattenedStructuredData(Map<String, List<StructuredDataParameter>> data, BaseWriter.MapWriter map) {
for (Map.Entry<String, List<StructuredDataParameter>> entry : data.entrySet()) {
for (StructuredDataParameter parameter : entry.getValue()) {
String fieldName = "structured_data_" + parameter.getName();
String fieldValue = parameter.getValue();
mapStringField(fieldName, fieldValue, map);
}
}
}
//Gets field from the Structured Data Construct
private String getFieldFromStructuredData(String fieldName, SyslogMessage parsedMessage) {
for (Map.Entry<String, List<StructuredDataParameter>> entry : parsedMessage.getStructuredData().entrySet()) {
for (StructuredDataParameter d : entry.getValue()) {
if (d.getName().equals(fieldName)) {
return d.getValue();
}
}
}
return null;
}
//Helper function to map arrays
private void mapComplexField(String mapName, Map<String, List<StructuredDataParameter>> data, BaseWriter.MapWriter map) {
for (Map.Entry<String, List<StructuredDataParameter>> entry : data.entrySet()) {
List<StructuredDataParameter> dataParameters = entry.getValue();
String fieldName;
String fieldValue;
for (StructuredDataParameter parameter : dataParameters) {
fieldName = parameter.getName();
fieldValue = parameter.getValue();
VarCharHolder rowHolder = new VarCharHolder();
byte[] rowStringBytes = fieldValue.getBytes();
this.buffer.reallocIfNeeded(rowStringBytes.length);
this.buffer.setBytes(0, rowStringBytes);
rowHolder.start = 0;
rowHolder.end = rowStringBytes.length;
rowHolder.buffer = this.buffer;
map.map(mapName).varChar(fieldName).write(rowHolder);
}
}
}
private void mapStructuredDataField(String fieldName, BaseWriter.MapWriter map, SyslogMessage parsedMessage) {
String fieldValue = getFieldFromStructuredData(fieldName, parsedMessage);
VarCharHolder rowHolder = new VarCharHolder();
byte[] rowStringBytes = fieldValue.getBytes();
this.buffer.reallocIfNeeded(rowStringBytes.length);
this.buffer.setBytes(0, rowStringBytes);
rowHolder.start = 0;
rowHolder.end = rowStringBytes.length;
rowHolder.buffer = this.buffer;
map.map("structured_data").varChar(fieldName).write(rowHolder);
}
public SimpleDateFormat getValidDateObject(String d) {
SimpleDateFormat tempDateFormat;
if (d != null && !d.isEmpty()) {
tempDateFormat = new SimpleDateFormat(d);
} else {
throw UserException
.parseError()
.message("Invalid date format")
.build(logger);
}
return tempDateFormat;
}
public void close() throws Exception {
this.reader.close();
}
}