blob: 4f2d9e179a700c0f2b5eab2a8af05aae47136749 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.store.http;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.drill.common.PlanStringBuilder;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.physical.base.AbstractGroupScan;
import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.ScanStats;
import org.apache.drill.exec.physical.base.SubScan;
import org.apache.drill.exec.planner.logical.DrillScanRel;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.util.Utilities;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
@JsonTypeName("http-scan")
public class HttpGroupScan extends AbstractGroupScan {
private final List<SchemaPath> columns;
private final HttpScanSpec httpScanSpec;
private final Map<String, String> filters;
private final ScanStats scanStats;
private final double filterSelectivity;
private final int maxRecords;
// Used only in planner, not serialized
private int hashCode;
/**
* Creates a new group scan from the storage plugin.
*/
public HttpGroupScan (HttpScanSpec scanSpec) {
super("no-user");
this.httpScanSpec = scanSpec;
this.columns = ALL_COLUMNS;
this.filters = null;
this.filterSelectivity = 0.0;
this.scanStats = computeScanStats();
this.maxRecords = -1;
}
/**
* Copies the group scan during many stages of Calcite operation.
*/
public HttpGroupScan(HttpGroupScan that) {
super(that);
this.httpScanSpec = that.httpScanSpec;
this.columns = that.columns;
this.filters = that.filters;
this.filterSelectivity = that.filterSelectivity;
this.maxRecords = that.maxRecords;
// Calcite makes many copies in the later stage of planning
// without changing anything. Retain the previous stats.
this.scanStats = that.scanStats;
}
/**
* Applies columns. Oddly called multiple times, even when
* the scan already has columns.
*/
public HttpGroupScan(HttpGroupScan that, List<SchemaPath> columns) {
super(that);
this.columns = columns;
this.httpScanSpec = that.httpScanSpec;
// Oddly called later in planning, after earlier assigning columns,
// to again assign columns. Retain filters, but compute new stats.
this.filters = that.filters;
this.filterSelectivity = that.filterSelectivity;
this.scanStats = computeScanStats();
this.maxRecords = that.maxRecords;
}
/**
* Adds a filter to the scan.
*/
public HttpGroupScan(HttpGroupScan that, Map<String, String> filters,
double filterSelectivity) {
super(that);
this.columns = that.columns;
this.httpScanSpec = that.httpScanSpec;
// Applies a filter.
this.filters = filters;
this.filterSelectivity = filterSelectivity;
this.scanStats = computeScanStats();
this.maxRecords = that.maxRecords;
}
/**
* Adds a limit to the scan.
*/
public HttpGroupScan(HttpGroupScan that, int maxRecords) {
super(that);
this.columns = that.columns;
this.httpScanSpec = that.httpScanSpec;
// Applies a filter.
this.filters = that.filters;
this.filterSelectivity = that.filterSelectivity;
this.scanStats = computeScanStats();
this.maxRecords = maxRecords;
}
/**
* Deserialize a group scan. Not called in normal operation. Probably used
* only if Drill executes a logical plan.
*/
@JsonCreator
public HttpGroupScan(
@JsonProperty("columns") List<SchemaPath> columns,
@JsonProperty("httpScanSpec") HttpScanSpec httpScanSpec,
@JsonProperty("filters") Map<String, String> filters,
@JsonProperty("filterSelectivity") double selectivity,
@JsonProperty("maxRecords") int maxRecords
) {
super("no-user");
this.columns = columns;
this.httpScanSpec = httpScanSpec;
this.filters = filters;
this.filterSelectivity = selectivity;
this.scanStats = computeScanStats();
this.maxRecords = maxRecords;
}
@JsonProperty("columns")
public List<SchemaPath> columns() { return columns; }
@JsonProperty("httpScanSpec")
public HttpScanSpec httpScanSpec() { return httpScanSpec; }
@JsonProperty("filters")
public Map<String, String> filters() { return filters; }
@JsonProperty("filterSelectivity")
public double selectivity() { return filterSelectivity; }
@Override
public void applyAssignments(List<DrillbitEndpoint> endpoints) { }
@Override
@JsonIgnore
public int getMaxParallelizationWidth() {
return 1;
}
@Override
public boolean canPushdownProjects(List<SchemaPath> columns) {
return true;
}
@JsonIgnore
public HttpApiConfig getHttpConfig() {
return httpScanSpec.connectionConfig();
}
@Override
public SubScan getSpecificScan(int minorFragmentId) {
return new HttpSubScan(httpScanSpec, columns, filters, maxRecords);
}
@Override
public GroupScan clone(List<SchemaPath> columns) {
return new HttpGroupScan(this, columns);
}
@Override
@JsonIgnore
public String getDigest() {
return toString();
}
@JsonProperty("maxRecords")
public int maxRecords() { return maxRecords; }
@Override
public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
Preconditions.checkArgument(children.isEmpty());
return new HttpGroupScan(this);
}
@Override
public ScanStats getScanStats() {
// Since this class is immutable, compute stats once and cache
// them. If the scan changes (adding columns, adding filters), we
// get a new scan without cached stats.
return scanStats;
}
private ScanStats computeScanStats() {
// If this config allows filters, then make the default
// cost very high to force the planner to choose the version
// with filters.
if (allowsFilters() && !hasFilters()) {
return new ScanStats(ScanStats.GroupScanProperty.ESTIMATED_TOTAL_COST,
1E9, 1E112, 1E12);
}
// No good estimates at all, just make up something.
double estRowCount = 10_000;
// NOTE this was important! if the predicates don't make the query more
// efficient they won't get pushed down
if (hasFilters()) {
estRowCount *= filterSelectivity;
}
double estColCount = Utilities.isStarQuery(columns) ? DrillScanRel.STAR_COLUMN_COST : columns.size();
double valueCount = estRowCount * estColCount;
double cpuCost = valueCount;
double ioCost = valueCount;
// Force the caller to use our costs rather than the
// defaults (which sets IO cost to zero).
return new ScanStats(ScanStats.GroupScanProperty.ESTIMATED_TOTAL_COST,
estRowCount, cpuCost, ioCost);
}
@JsonIgnore
public boolean hasFilters() {
return filters != null;
}
@Override
public boolean supportsLimitPushdown() {
return true;
}
@Override
public GroupScan applyLimit(int maxRecords) {
if (maxRecords == this.maxRecords) {
return null;
}
return new HttpGroupScan(this, maxRecords);
}
@Override
public String toString() {
return new PlanStringBuilder(this)
.field("scan spec", httpScanSpec)
.field("columns", columns)
.field("filters", filters)
.field("maxRecords", maxRecords)
.toString();
}
@Override
public int hashCode() {
// Hash code is cached since Calcite calls this method many times.
if (hashCode == 0) {
// Don't include cost; it is derived.
hashCode = Objects.hash(httpScanSpec, columns, filters);
}
return hashCode;
}
@JsonIgnore
public boolean allowsFilters() {
return getHttpConfig().params() != null;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
// Don't include cost; it is derived.
HttpGroupScan other = (HttpGroupScan) obj;
return Objects.equals(httpScanSpec, other.httpScanSpec())
&& Objects.equals(columns, other.columns())
&& Objects.equals(filters, other.filters());
}
}