/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.store.httpd;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import nl.basjes.parse.core.exceptions.DissectionFailure;
import nl.basjes.parse.core.exceptions.InvalidDissectorException;
import nl.basjes.parse.core.exceptions.MissingDissectorsException;

import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.impl.OutputMutator;
import org.apache.drill.exec.planner.common.DrillStatsTable.TableStatistics;
import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.AbstractRecordReader;
import org.apache.drill.exec.store.RecordReader;
import org.apache.drill.exec.store.RecordWriter;
import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
import org.apache.drill.exec.store.dfs.easy.EasyWriter;
import org.apache.drill.exec.store.dfs.easy.FileWork;
import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HttpdLogFormatPlugin extends EasyFormatPlugin<HttpdLogFormatConfig> {

  private static final Logger logger = LoggerFactory.getLogger(HttpdLogFormatPlugin.class);

  private static final String PLUGIN_EXTENSION = "httpd";

  // Despite its name, this constant caps the number of records written into a single batch by
  // next(); it is a row count, not a byte count.
  private static final int VECTOR_MEMORY_ALLOCATION = 4095;
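
  /*
   * Illustrative format configuration for this plugin (a sketch only: the JSON shape and key names
   * are assumed from the HttpdLogFormatConfig getters used below, not taken from this file):
   *
   *   "httpd": {
   *     "type": "httpd",
   *     "logFormat": "%h %l %u %t \"%r\" %>s %b",
   *     "timestampFormat": "dd/MMM/yyyy:HH:mm:ss ZZ"
   *   }
   */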

  public HttpdLogFormatPlugin(final String name, final DrillbitContext context, final Configuration fsConf,
      final StoragePluginConfig storageConfig, final HttpdLogFormatConfig formatConfig) {
    super(name, context, fsConf, storageConfig, formatConfig,
        true,   // readable
        false,  // writable
        true,   // blockSplittable
        true,   // compressible
        Collections.singletonList(PLUGIN_EXTENSION), PLUGIN_EXTENSION);
  }

  @Override
  public boolean supportsStatistics() {
    return false;
  }

  @Override
  public TableStatistics readStatistics(FileSystem fs, Path statsTablePath) {
    throw new UnsupportedOperationException("unimplemented");
  }

  @Override
  public void writeStatistics(TableStatistics statistics, FileSystem fs, Path statsTablePath) {
    throw new UnsupportedOperationException("unimplemented");
  }

  /**
   * This class performs the work of the plugin: all record-reading logic lives here. An httpd log
   * record is a single line terminated by a newline character.
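   * <p>
   * For example (an illustrative log format borrowed from the Apache httpd documentation, not from
   * this plugin's configuration), with logFormat {@code %h %l %u %t \"%r\" %>s %b} a record looks like:
   * <pre>
   * 127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] "GET /apache_pb.gif HTTP/1.0" 200 2326
   * </pre>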
   */
  private class HttpdLogRecordReader extends AbstractRecordReader {

    private final DrillFileSystem fs;
    private final FileWork work;
    private final FragmentContext fragmentContext;
    private ComplexWriter writer;
    private HttpdParser parser;
    private LineRecordReader lineReader;
    private LongWritable lineNumber;

    public HttpdLogRecordReader(final FragmentContext context, final DrillFileSystem fs, final FileWork work,
        final List<SchemaPath> columns) {
      this.fs = fs;
      this.work = work;
      this.fragmentContext = context;
      setColumns(columns);
    }

    /**
     * The query fields passed in are formatted the way Drill requires; they must be converted back
     * into the names the parser understands.
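     * <p>
     * For example (illustrative only; the exact names depend on HttpdParser's naming scheme): a Drill
     * column such as {@code request_receive_time} would map back to a dotted parser field name like
     * {@code TIME.STAMP:request.receive.time}.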
     *
     * @return Map with Drill field names as keys and parser field names as values
     */
    private Map<String, String> makeParserFields() {
      Map<String, String> fieldMapping = new HashMap<>();
      for (final SchemaPath sp : getColumns()) {
        String drillField = sp.getRootSegment().getPath();
        try {
          String parserField = HttpdParser.parserFormattedFieldName(drillField);
          fieldMapping.put(drillField, parserField);
        } catch (Exception e) {
          // Skip fields that cannot be converted rather than failing the query.
          logger.warn("Could not map field {} to a parser field name; skipping it", drillField, e);
        }
      }
      return fieldMapping;
    }

    @Override
    public void setup(final OperatorContext context, final OutputMutator output) throws ExecutionSetupException {
      try {
        /*
         * Extract the list of field names for the parser to use if this is NOT a star query. For a
         * star query, pass null so the parser builds all possible fields.
         */
        final Map<String, String> fieldMapping = !isStarQuery() ? makeParserFields() : null;
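        // The parser writes parsed values directly into Drill's value vectors via the root map of the batch.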
        writer = new VectorContainerWriter(output);
        parser = new HttpdParser(writer.rootAsMap(), context.getManagedBuffer(),
            HttpdLogFormatPlugin.this.getConfig().getLogFormat(),
            HttpdLogFormatPlugin.this.getConfig().getTimestampFormat(),
            fieldMapping);
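        // Build a Hadoop FileSplit covering only the byte range of the file assigned to this reader.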
        final Path path = fs.makeQualified(work.getPath());
        FileSplit split = new FileSplit(path, work.getStart(), work.getLength(), new String[]{""});
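        // Iterate the split line by line with Hadoop's TextInputFormat, honoring Drill's configured
        // line-reader buffer size.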
        TextInputFormat inputFormat = new TextInputFormat();
        JobConf job = new JobConf(fs.getConf());
        job.setInt("io.file.buffer.size", fragmentContext.getConfig().getInt(ExecConstants.TEXT_LINE_READER_BUFFER_SIZE));
        job.setInputFormat(inputFormat.getClass());
        lineReader = (LineRecordReader) inputFormat.getRecordReader(split, job, Reporter.NULL);
        lineNumber = lineReader.createKey();
      } catch (NoSuchMethodException | MissingDissectorsException | InvalidDissectorException e) {
        throw handleAndGenerate("Failure creating HttpdParser", e);
      } catch (IOException e) {
        throw handleAndGenerate("Failure creating HttpdRecordReader", e);
      }
    }
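
    /**
     * Wraps a low-level failure in a {@link UserException} annotated with the split context. The
     * method always throws; it declares a return type only so call sites can write
     * {@code throw handleAndGenerate(...)}.
     */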
    private RuntimeException handleAndGenerate(final String s, final Exception e) {
      throw UserException.dataReadError(e)
          .message(s + "\n%s", e.getMessage())
          .addContext("Path", work.getPath())
          .addContext("Split Start", work.getStart())
          .addContext("Split Length", work.getLength())
          // lineNumber is still null if setup failed before the line reader was created.
          .addContext("Local Line Number", lineNumber == null ? 0 : lineNumber.get())
          .build(logger);
    }

    /**
     * Reads a batch of records (lines) from the split and parses each one into the output vectors.
     *
     * @return the number of records in this batch; 0 once the split is exhausted
     */
    @Override
    public int next() {
      try {
        final Text line = lineReader.createValue();
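        // Allocate fresh vector space and rewind the writer before filling this batch.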
        writer.allocate();
        writer.reset();
        int recordCount = 0;
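        // Fill the batch: stop at the per-batch record cap or when the split runs out of lines.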
        while (recordCount < VECTOR_MEMORY_ALLOCATION && lineReader.next(lineNumber, line)) {
          writer.setPosition(recordCount);
          parser.parse(line.toString());
          recordCount++;
        }
        writer.setValueCount(recordCount);
        return recordCount;
      } catch (DissectionFailure | InvalidDissectorException | MissingDissectorsException | IOException e) {
        throw handleAndGenerate("Failure while parsing log record.", e);
      }
    }

    @Override
    public void close() throws Exception {
      try {
        if (lineReader != null) {
          lineReader.close();
        }
      } catch (IOException e) {
        logger.warn("Failure while closing Httpd reader.", e);
      }
    }

    @Override
    public String toString() {
      return "HttpdLogRecordReader[Path=" + work.getPath()
          + ", Start=" + work.getStart()
          + ", Length=" + work.getLength()
          + ", Line=" + lineNumber.get()
          + "]";
    }
  }

  /**
   * This plugin supports projection push down into the parser: only the fields the query asks for
   * are parsed. If no specific fields are asked for, all possible fields are returned.
   *
   * @return true
   */
  @Override
  public boolean supportsPushDown() {
    return true;
  }

  @Override
  public RecordReader getRecordReader(final FragmentContext context, final DrillFileSystem dfs,
      final FileWork fileWork, final List<SchemaPath> columns, final String userName) {
    return new HttpdLogRecordReader(context, dfs, fileWork, columns);
  }

  @Override
  public RecordWriter getRecordWriter(final FragmentContext context, final EasyWriter writer) {
    throw new UnsupportedOperationException("Drill doesn't currently support writing HTTPd logs");
  }

  @Override
  public int getReaderOperatorType() {
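    // "HTPPD" (sic) is kept as-is: the misspelling is assumed to match the constant name generated
    // from Drill's UserBitShared protobuf definition.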
    return CoreOperatorType.HTPPD_LOG_SUB_SCAN_VALUE;
  }

  @Override
  public int getWriterOperatorType() {
    throw new UnsupportedOperationException();
  }
}