/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.store.splunk;
import com.splunk.JobExportArgs;
import com.splunk.Service;
import com.univocity.parsers.common.processor.RowListProcessor;
import com.univocity.parsers.csv.CsvParser;
import com.univocity.parsers.csv.CsvParserSettings;
import org.apache.drill.common.AutoCloseables;
import org.apache.drill.common.exceptions.CustomErrorContext;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
import org.apache.drill.exec.physical.resultSet.RowSetLoader;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.store.base.filter.ExprNode;
import org.apache.drill.exec.util.Utilities;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.accessor.ScalarWriter;
import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
import org.apache.drill.shaded.guava.com.google.common.base.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.InputStream;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
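/**
 * Reads the results of a Splunk export search within Drill's managed scan framework. The reader
 * builds an SPL query from the scan spec and any pushed-down filters, streams the result set from
 * the Splunk export API as CSV, and writes each row to Drill vectors through per-column writers.
 *
 * <p>An illustrative example only (assuming the plugin is registered as {@code splunk}): raw SPL
 * can be sent through the special {@code spl} index handled in {@code buildQueryString()}:</p>
 * <pre>{@code
 * SELECT * FROM splunk.spl WHERE spl = 'search index=main | head 100'
 * }</pre>
 */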
public class SplunkBatchReader implements ManagedReader<SchemaNegotiator> {
private static final Logger logger = LoggerFactory.getLogger(SplunkBatchReader.class);
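  // Splunk does not report column types, so fields known to be integers or timestamps are listed
  // here explicitly; every other column is read as VARCHAR.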
private static final List<String> INT_COLS = new ArrayList<>(Arrays.asList("date_hour", "date_mday", "date_minute", "date_second", "date_year", "linecount"));
private static final List<String> TS_COLS = new ArrayList<>(Arrays.asList("_indextime", "_time"));
private static final String EARLIEST_TIME_COLUMN = "earliestTime";
private static final String LATEST_TIME_COLUMN = "latestTime";
private final SplunkPluginConfig config;
private final SplunkSubScan subScan;
private final List<SchemaPath> projectedColumns;
private final Service splunkService;
private final SplunkScanSpec subScanSpec;
private final CsvParserSettings csvSettings;
private JobExportArgs exportArgs;
private InputStream searchResults;
private CsvParser csvReader;
private String[] firstRow;
private CustomErrorContext errorContext;
private List<SplunkColumnWriter> columnWriters;
private SchemaBuilder builder;
private RowSetLoader rowWriter;
private Stopwatch timer;
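  /**
   * Creates the reader for a single sub-scan: connects to Splunk using the plugin configuration
   * and configures the univocity CSV parser (line-separator detection enabled, column size capped
   * at Drill's maximum vector buffer size) used to read the export results.
   */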
public SplunkBatchReader(SplunkPluginConfig config, SplunkSubScan subScan) {
this.config = config;
this.subScan = subScan;
this.projectedColumns = subScan.getColumns();
this.subScanSpec = subScan.getScanSpec();
SplunkConnection connection = new SplunkConnection(config);
this.splunkService = connection.connect();
this.csvSettings = new CsvParserSettings();
csvSettings.setLineSeparatorDetectionEnabled(true);
RowListProcessor rowProcessor = new RowListProcessor();
csvSettings.setProcessor(rowProcessor);
csvSettings.setMaxCharsPerColumn(ValueVector.MAX_BUFFER_SIZE);
}
@Override
public boolean open(SchemaNegotiator negotiator) {
    timer = Stopwatch.createStarted();
this.errorContext = negotiator.parentErrorContext();
String queryString = buildQueryString();
logger.debug("Query Sent to Splunk: {}", queryString);
// Execute the query
searchResults = splunkService.export(queryString, exportArgs);
logger.debug("Time to execute query: {} milliseconds", timer.elapsed().getNano() / 100000);
    /*
      Splunk produces poor output from the API. Of the available choices, CSV was the easiest to
      deal with. Unfortunately, the data is not consistent: some fields are quoted and some are not.
    */
this.csvReader = new CsvParser(csvSettings);
logger.debug("Time to open CSV Parser: {} milliseconds", timer.elapsed().getNano() / 100000);
csvReader.beginParsing(searchResults, "utf-8");
logger.debug("Time to open input stream: {} milliseconds", timer.elapsed().getNano() / 100000);
// Build the Schema
builder = new SchemaBuilder();
TupleMetadata drillSchema = buildSchema();
negotiator.tableSchema(drillSchema, false);
ResultSetLoader resultLoader = negotiator.build();
// Create ScalarWriters
rowWriter = resultLoader.writer();
populateWriterArray();
logger.debug("Completed open function in {} milliseconds", timer.elapsed().getNano() / 100000);
return true;
}
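  /**
   * Fills the current batch one row at a time until the batch is full or the Splunk result
   * stream is exhausted.
   */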
@Override
public boolean next() {
while (!rowWriter.isFull()) {
if (!processRow()) {
return false;
}
}
return true;
}
@Override
public void close() {
timer.stop();
if (searchResults != null) {
AutoCloseables.closeSilently(searchResults);
searchResults = null;
}
}
  /**
   * Splunk returns the data in CSV format with some fields escaped and some not. Splunk does not
   * have the concept of data types, or at least does not expose that metadata through the API, so
   * the best solution is to keep a list of columns known to be a specific type, such as _time,
   * _indextime and the various date components, map those to the appropriate Drill types, and map
   * everything else to VARCHAR.
   */
private TupleMetadata buildSchema() {
this.firstRow = csvReader.parseNext();
// Case for empty dataset
if (firstRow == null) {
return builder.buildSchema();
}
// Parse the first row
for (String value : firstRow) {
if (INT_COLS.contains(value)) {
builder.addNullable(value, MinorType.INT);
} else if (TS_COLS.contains(value)) {
builder.addNullable(value, MinorType.TIMESTAMP);
} else {
try {
builder.addNullable(value, MinorType.VARCHAR);
} catch (Exception e) {
logger.warn("Splunk attempted to add duplicate column {}", value);
}
}
}
logger.debug("Time to build schmea: {} milliseconds", timer.elapsed().getNano() / 100000);
return builder.buildSchema();
}
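  /**
   * Creates one column writer per header column, in the same order as the CSV header row, so that
   * each value in a data row can be written by position.
   */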
private void populateWriterArray() {
// Case for empty result set
if (firstRow == null || firstRow.length == 0) {
return;
}
columnWriters = new ArrayList<>();
int colPosition = 0;
for (String value : firstRow) {
if (INT_COLS.contains(value)) {
columnWriters.add(new IntColumnWriter(value, rowWriter, colPosition));
} else if (TS_COLS.contains(value)) {
columnWriters.add(new TimestampColumnWriter(value, rowWriter, colPosition));
} else {
columnWriters.add(new StringColumnWriter(value, rowWriter, colPosition));
}
colPosition++;
}
logger.debug("Time to populate writer array: {} milliseconds", timer.elapsed().getNano() / 100000);
}
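  /**
   * Reads the next CSV row from the Splunk export stream and writes it to the current batch.
   * @return true if a row was written, false if the stream is exhausted
   */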
private boolean processRow() {
String[] nextRow = csvReader.parseNext();
if (nextRow == null) {
return false;
}
rowWriter.start();
for (SplunkColumnWriter columnWriter : columnWriters) {
columnWriter.load(nextRow);
}
rowWriter.save();
return true;
}
  /**
   * Checks whether the query is a star query. For our purposes, a star query is anything that
   * projects only the wildcard {@code **} or the special pushdown fields, which are not projected
   * as regular columns.
   * @return true if it is a star query, false if not
   */
  private boolean isStarQuery() {
    if (Utilities.isStarQuery(projectedColumns)) {
      return true;
    }
    for (SchemaPath path : projectedColumns) {
      if (path.nameEquals("**")) {
        return true;
      } else {
        // Only the special pushdown fields are consistent with a star query, so compare the
        // column name against each of them.
        for (SplunkUtils.SPECIAL_FIELDS specialField : SplunkUtils.SPECIAL_FIELDS.values()) {
          if (path.nameEquals(specialField.name())) {
            return true;
          }
        }
        return false;
      }
    }
    return false;
  }
/**
* Determines whether a field is a Splunk multifield.
* @param fieldValue The field to be tested
* @return True if a multifield, false if not.
*/
protected static boolean isMultiField(String fieldValue) {
return (fieldValue.startsWith("{") && fieldValue.endsWith("}"));
}
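  /**
   * Assembles the SPL query sent to Splunk from the scan spec, the pushed-down filters and the
   * configured time bounds. The special "spl" index bypasses the query builder and sends the
   * user-supplied SPL verbatim.
   */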
  private String buildQueryString() {
String earliestTime = null;
String latestTime = null;
Map<String, ExprNode.ColRelOpConstNode> filters = subScan.getFilters();
exportArgs = new JobExportArgs();
// Set to normal search mode
exportArgs.setSearchMode(JobExportArgs.SearchMode.NORMAL);
// Set all time stamps to epoch seconds
exportArgs.setTimeFormat("%s");
// Set output mode to CSV
exportArgs.setOutputMode(JobExportArgs.OutputMode.CSV);
exportArgs.setEnableLookups(true);
    // Splunk searches perform best when they are time bound. This allows the user to set
    // default time boundaries in the config; these are overridden by filter pushdowns.
if (filters != null && filters.containsKey(EARLIEST_TIME_COLUMN)) {
earliestTime = filters.get(EARLIEST_TIME_COLUMN).value.value.toString();
// Remove from map
filters.remove(EARLIEST_TIME_COLUMN);
}
if (filters != null && filters.containsKey(LATEST_TIME_COLUMN)) {
latestTime = filters.get(LATEST_TIME_COLUMN).value.value.toString();
// Remove from map so they are not pushed down into the query
filters.remove(LATEST_TIME_COLUMN);
}
if (earliestTime == null) {
earliestTime = config.getEarliestTime();
}
if (latestTime == null) {
latestTime = config.getLatestTime();
}
logger.debug("Query time bounds: {} and {}", earliestTime, latestTime);
exportArgs.setEarliestTime(earliestTime);
exportArgs.setLatestTime(latestTime);
    // Special case: if the user wishes to send arbitrary SPL to Splunk, they can query the "spl"
    // index and supply the query in the spl filter.
if (subScanSpec.getIndexName().equalsIgnoreCase("spl")) {
if (filters == null || filters.get("spl") == null) {
throw UserException
.validationError()
.message("SPL cannot be empty when querying spl table.")
.addContext(errorContext)
.build(logger);
}
return filters.get("spl").value.value.toString();
}
SplunkQueryBuilder builder = new SplunkQueryBuilder(subScanSpec.getIndexName());
// Set the sourcetype
if (filters != null && filters.containsKey("sourcetype")) {
String sourcetype = filters.get("sourcetype").value.value.toString();
builder.addSourceType(sourcetype);
filters.remove("sourcetype");
}
    // Push down the selected fields for non-star queries.
    if (!isStarQuery()) {
builder.addField(projectedColumns);
}
// Apply filters
builder.addFilters(filters);
// Apply limits
if (subScan.getMaxRecords() > 0) {
builder.addLimit(subScan.getMaxRecords());
}
return builder.build();
}
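  /**
   * Base class for the per-column writers. Each writer knows its column name, its position in the
   * CSV row and the scalar writer used to store the value in the Drill vector.
   */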
public abstract static class SplunkColumnWriter {
final String colName;
ScalarWriter columnWriter;
RowSetLoader rowWriter;
int columnIndex;
public SplunkColumnWriter(String colName, RowSetLoader rowWriter, int columnIndex) {
this.colName = colName;
this.rowWriter = rowWriter;
this.columnWriter = rowWriter.scalar(colName);
this.columnIndex = columnIndex;
}
public void load(String[] record) {}
}
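  /**
   * Writes VARCHAR columns; empty strings from Splunk are stored as NULL.
   */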
public static class StringColumnWriter extends SplunkColumnWriter {
StringColumnWriter(String colName, RowSetLoader rowWriter, int columnIndex) {
super(colName, rowWriter, columnIndex);
}
@Override
public void load(String[] record) {
String value = record[columnIndex];
if (Strings.isNullOrEmpty(value)) {
columnWriter.setNull();
} else {
columnWriter.setString(value);
}
}
}
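  /**
   * Writes the known integer columns such as the date components and linecount.
   */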
public static class IntColumnWriter extends SplunkColumnWriter {
IntColumnWriter(String colName, RowSetLoader rowWriter, int columnIndex) {
super(colName, rowWriter, columnIndex);
}
@Override
public void load(String[] record) {
int value = Integer.parseInt(record[columnIndex]);
columnWriter.setInt(value);
}
}
  /**
   * Writes the two known timestamp columns in Splunk, _time and _indextime. Splunk returns them
   * as epoch seconds (per the %s time format set on the export job); they are converted to epoch
   * milliseconds here.
   */
public static class TimestampColumnWriter extends SplunkColumnWriter {
TimestampColumnWriter(String colName, RowSetLoader rowWriter, int columnIndex) {
super(colName, rowWriter, columnIndex);
}
@Override
public void load(String[] record) {
long value = Long.parseLong(record[columnIndex]) * 1000;
columnWriter.setTimestamp(Instant.ofEpochMilli(value));
}
}
}