/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.store.splunk;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.drill.common.PlanStringBuilder;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.physical.base.AbstractGroupScan;
import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.ScanStats;
import org.apache.drill.exec.physical.base.SubScan;
import org.apache.drill.exec.planner.logical.DrillScanRel;
import org.apache.drill.exec.proto.CoordinationProtos;
import org.apache.drill.exec.store.base.filter.ExprNode;
import org.apache.drill.exec.util.Utilities;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import java.util.List;
import java.util.Map;
import java.util.Objects;
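
/**
 * Group scan for the Splunk storage plugin. Describes a Splunk query
 * during planning. The planner refines the scan by making copies that
 * carry more information: projected columns, pushed-down filters and a
 * pushed-down limit. Each refined copy recomputes its scan statistics
 * so that the planner prefers the more specific plan.
 * <p>
 * Illustrative sketch only (the planner drives these calls, not user
 * code; {@code scanSpec} and {@code projectedCols} are placeholders):
 * <pre>{@code
 * SplunkGroupScan scan = new SplunkGroupScan(scanSpec); // all columns
 * scan = (SplunkGroupScan) scan.clone(projectedCols);   // project pushdown
 * scan = (SplunkGroupScan) scan.applyLimit(10);         // limit pushdown
 * }</pre>
 */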
public class SplunkGroupScan extends AbstractGroupScan {
private final SplunkPluginConfig config;
private final List<SchemaPath> columns;
private final SplunkScanSpec splunkScanSpec;
private final Map<String, ExprNode.ColRelOpConstNode> filters;
private final ScanStats scanStats;
private final double filterSelectivity;
private final int maxRecords;
private int hashCode;
/**
* Creates a new group scan from the storage plugin.
*/
  public SplunkGroupScan(SplunkScanSpec scanSpec) {
super("no-user");
this.splunkScanSpec = scanSpec;
this.config = scanSpec.getConfig();
this.columns = ALL_COLUMNS;
this.filters = null;
this.filterSelectivity = 0.0;
this.maxRecords = -1;
this.scanStats = computeScanStats();
}
/**
* Copies the group scan during many stages of Calcite operation.
*/
public SplunkGroupScan(SplunkGroupScan that) {
super(that);
this.config = that.config;
this.splunkScanSpec = that.splunkScanSpec;
this.columns = that.columns;
this.filters = that.filters;
this.filterSelectivity = that.filterSelectivity;
this.maxRecords = that.maxRecords;
// Calcite makes many copies in the later stage of planning
// without changing anything. Retain the previous stats.
this.scanStats = that.scanStats;
}
/**
* Applies columns. Oddly called multiple times, even when
* the scan already has columns.
*/
public SplunkGroupScan(SplunkGroupScan that, List<SchemaPath> columns) {
super(that);
this.columns = columns;
this.splunkScanSpec = that.splunkScanSpec;
this.config = that.config;
// Oddly called later in planning, after earlier assigning columns,
// to again assign columns. Retain filters, but compute new stats.
this.filters = that.filters;
this.filterSelectivity = that.filterSelectivity;
this.maxRecords = that.maxRecords;
this.scanStats = computeScanStats();
}
/**
* Adds a filter to the scan.
*/
public SplunkGroupScan(SplunkGroupScan that, Map<String, ExprNode.ColRelOpConstNode> filters,
double filterSelectivity) {
super(that);
this.columns = that.columns;
this.splunkScanSpec = that.splunkScanSpec;
this.config = that.config;
// Applies a filter.
this.filters = filters;
this.filterSelectivity = filterSelectivity;
this.maxRecords = that.maxRecords;
this.scanStats = computeScanStats();
}
/**
   * Deserializes a group scan from JSON. Not called in normal operation;
   * probably used only if Drill executes a logical plan.
*/
@JsonCreator
public SplunkGroupScan(
@JsonProperty("config") SplunkPluginConfig config,
@JsonProperty("columns") List<SchemaPath> columns,
@JsonProperty("splunkScanSpec") SplunkScanSpec splunkScanSpec,
@JsonProperty("filters") Map<String, ExprNode.ColRelOpConstNode> filters,
@JsonProperty("filterSelectivity") double selectivity,
@JsonProperty("maxRecords") int maxRecords
) {
super("no-user");
this.config = config;
this.columns = columns;
this.splunkScanSpec = splunkScanSpec;
this.filters = filters;
this.filterSelectivity = selectivity;
this.maxRecords = maxRecords;
this.scanStats = computeScanStats();
}
  /**
   * Applies a limit to the group scan.
   *
   * @param that the group scan to copy
   * @param maxRecords the pushed-down limit
   */
public SplunkGroupScan(SplunkGroupScan that, int maxRecords) {
super(that);
this.columns = that.columns;
// Apply the limit
this.maxRecords = maxRecords;
this.splunkScanSpec = that.splunkScanSpec;
this.config = that.config;
this.filters = that.filters;
this.filterSelectivity = that.filterSelectivity;
this.scanStats = computeScanStats();
}
@JsonProperty("config")
public SplunkPluginConfig config() { return config; }
@JsonProperty("columns")
public List<SchemaPath> columns() { return columns; }
@JsonProperty("splunkScanSpec")
public SplunkScanSpec splunkScanSpec() { return splunkScanSpec; }
@JsonProperty("filters")
public Map<String, ExprNode.ColRelOpConstNode> filters() { return filters; }
@JsonProperty("maxRecords")
public int maxRecords() { return maxRecords; }
@JsonProperty("filterSelectivity")
public double selectivity() { return filterSelectivity; }
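  /**
   * No endpoint assignments are needed: this scan always runs as a
   * single fragment (see {@link #getMaxParallelizationWidth()}).
   */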
@Override
public void applyAssignments(List<CoordinationProtos.DrillbitEndpoint> endpoints) { }
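  /**
   * Creates the execution-time sub-scan for a minor fragment, carrying
   * the pushed-down columns, filters and limit.
   */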
@Override
public SubScan getSpecificScan(int minorFragmentId) {
return new SplunkSubScan(config, splunkScanSpec, columns, filters, maxRecords);
}
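  /**
   * A width of 1 tells the planner not to parallelize this scan.
   */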
@Override
public int getMaxParallelizationWidth() {
return 1;
}
@Override
public boolean canPushdownProjects(List<SchemaPath> columns) {
return true;
}
@Override
public boolean supportsLimitPushdown() {
return true;
}
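  /**
   * Pushes a LIMIT into the scan. Returning null signals the planner
   * that the limit is unchanged and no new scan is needed.
   */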
@Override
public GroupScan applyLimit(int maxRecords) {
if (maxRecords == this.maxRecords) {
return null;
}
return new SplunkGroupScan(this, maxRecords);
}
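  /**
   * The digest identifies this node during planning; {@link #toString()}
   * includes the pushed-down state so that differently refined scans do
   * not compare as identical.
   */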
@Override
public String getDigest() {
return toString();
}
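  /**
   * A scan is a leaf operator and never has children, so a plain copy
   * is sufficient.
   */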
@Override
public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
Preconditions.checkArgument(children.isEmpty());
return new SplunkGroupScan(this);
}
@Override
public ScanStats getScanStats() {
// Since this class is immutable, compute stats once and cache
// them. If the scan changes (adding columns, adding filters), we
// get a new scan without cached stats.
return scanStats;
}
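  /**
   * Computes planner cost estimates. Splunk exposes no row-count
   * metadata, so the numbers here are heuristics. For example, with the
   * made-up base estimate of 100,000 rows and a pushed-down filter
   * selectivity of 0.1, the estimated row count drops to 10,000, which
   * makes the filtered plan cheaper than the unfiltered one.
   */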
private ScanStats computeScanStats() {
// If this config allows filters, then make the default
// cost very high to force the planner to choose the version
// with filters.
if (allowsFilters() && !hasFilters() && !hasLimit()) {
return new ScanStats(ScanStats.GroupScanProperty.ESTIMATED_TOTAL_COST,
          1E9, 1E12, 1E12);
}
// No good estimates at all, just make up something.
double estRowCount = 100_000;
    // NOTE: this is important! If the filters do not make the query
    // cheaper, the planner will not push them down.
if (hasFilters()) {
estRowCount *= filterSelectivity;
}
    if (maxRecords > 0) {
      // Assume a pushed-down limit halves the row count.
      estRowCount /= 2;
    }
double estColCount = Utilities.isStarQuery(columns) ? DrillScanRel.STAR_COLUMN_COST : columns.size();
double valueCount = estRowCount * estColCount;
double cpuCost = valueCount;
double ioCost = valueCount;
// Force the caller to use our costs rather than the
// defaults (which sets IO cost to zero).
return new ScanStats(ScanStats.GroupScanProperty.ESTIMATED_TOTAL_COST,
estRowCount, cpuCost, ioCost);
}
@JsonIgnore
public boolean hasFilters() {
return filters != null;
}
@JsonIgnore
  public boolean hasLimit() {
    // maxRecords is -1 until a limit is pushed down.
    return maxRecords != -1;
  }
@JsonIgnore
public boolean allowsFilters() {
return true;
}
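  /**
   * Implements projection pushdown: the planner calls this to narrow
   * the scan to the projected columns.
   */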
@Override
public GroupScan clone(List<SchemaPath> columns) {
return new SplunkGroupScan(this, columns);
}
@Override
public int hashCode() {
// Hash code is cached since Calcite calls this method many times.
if (hashCode == 0) {
// Don't include cost; it is derived.
      hashCode = Objects.hash(splunkScanSpec, config, columns, maxRecords);
}
return hashCode;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
// Don't include cost; it is derived.
SplunkGroupScan other = (SplunkGroupScan) obj;
return Objects.equals(splunkScanSpec, other.splunkScanSpec())
&& Objects.equals(config, other.config())
&& Objects.equals(columns, other.columns())
      && maxRecords == other.maxRecords();
}
@Override
public String toString() {
return new PlanStringBuilder(this)
.field("config", config)
.field("scan spec", splunkScanSpec)
.field("columns", columns)
.field("maxRecords", maxRecords)
.toString();
}
}