blob: da94919ff67b922f85ed532c263ddfb1b9fd90dd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.store.http;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.commons.lang3.StringUtils;
import org.apache.drill.common.PlanStringBuilder;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.metastore.MetadataProviderManager;
import org.apache.drill.exec.physical.base.AbstractGroupScan;
import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.ScanStats;
import org.apache.drill.exec.physical.base.SubScan;
import org.apache.drill.exec.planner.logical.DrillScanRel;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.store.http.HttpPaginatorConfig.PaginatorMethod;
import org.apache.drill.exec.store.http.util.SimpleHttp;
import org.apache.drill.exec.util.Utilities;
import org.apache.drill.metastore.metadata.TableMetadata;
import org.apache.drill.metastore.metadata.TableMetadataProvider;
import com.google.common.base.Preconditions;
/**
 * Group scan for the HTTP storage plugin.
 *
 * <p>Instances are treated as immutable: applying columns, filters or a limit
 * yields a new scan through one of the copy constructors. Scan statistics are
 * computed once per instance, in the constructor, and cached. Because the
 * statistics depend on the other fields, every constructor assigns all of its
 * inputs <em>before</em> calling {@link #computeScanStats()}.
 */
@JsonTypeName("http-scan")
public class HttpGroupScan extends AbstractGroupScan {

  /** Value of {@link #maxRecords} meaning "no limit has been pushed down". */
  private static final int NO_LIMIT = -1;

  private final List<SchemaPath> columns;
  private final HttpScanSpec httpScanSpec;
  private final Map<String, String> filters;
  private final ScanStats scanStats;
  private final double filterSelectivity;
  private final int maxRecords;
  private final String username;

  // Used only in planner, not serialized
  private int hashCode;
  private MetadataProviderManager metadataProviderManager;

  /**
   * Creates a new group scan from the storage plugin.
   *
   * @param scanSpec the scan specification; also supplies the query user name
   * @param metadataProviderManager source of schema/table metadata, may be null
   */
  public HttpGroupScan (HttpScanSpec scanSpec, MetadataProviderManager metadataProviderManager) {
    super(scanSpec.queryUserName());
    this.httpScanSpec = scanSpec;
    this.username = scanSpec.queryUserName();
    this.columns = ALL_COLUMNS;
    this.filters = null;
    this.filterSelectivity = 0.0;
    this.maxRecords = NO_LIMIT;
    this.metadataProviderManager = metadataProviderManager;
    // Must be last: the stats read the fields assigned above.
    this.scanStats = computeScanStats();
  }

  /**
   * Copies the group scan during many stages of Calcite operation.
   *
   * @param that the scan to copy
   */
  public HttpGroupScan(HttpGroupScan that) {
    super(that);
    this.httpScanSpec = that.httpScanSpec;
    this.columns = that.columns;
    this.filters = that.filters;
    this.filterSelectivity = that.filterSelectivity;
    this.maxRecords = that.maxRecords;
    this.username = that.username;
    this.metadataProviderManager = that.metadataProviderManager;

    // Calcite makes many copies in the later stage of planning
    // without changing anything. Retain the previous stats.
    this.scanStats = that.scanStats;
  }

  /**
   * Applies columns. Oddly called multiple times, even when
   * the scan already has columns.
   *
   * @param that the scan to copy
   * @param columns the projected columns for the new scan
   */
  public HttpGroupScan(HttpGroupScan that, List<SchemaPath> columns) {
    super(that);
    this.httpScanSpec = that.httpScanSpec;
    this.columns = columns;

    // Oddly called later in planning, after earlier assigning columns,
    // to again assign columns. Retain filters and limit, but compute new stats.
    this.filters = that.filters;
    this.filterSelectivity = that.filterSelectivity;
    this.username = that.username;
    this.maxRecords = that.maxRecords;
    this.metadataProviderManager = that.metadataProviderManager;
    // Must be last: the stats read the fields assigned above.
    this.scanStats = computeScanStats();
  }

  /**
   * Adds a filter to the scan.
   *
   * @param that the scan to copy
   * @param filters the filters for the new scan
   * @param filterSelectivity the selectivity applied to the row estimate when
   *        filters are present
   */
  public HttpGroupScan(HttpGroupScan that, Map<String, String> filters,
      double filterSelectivity) {
    super(that);
    this.columns = that.columns;
    this.httpScanSpec = that.httpScanSpec;
    this.username = that.username;

    // Applies a filter.
    this.filters = filters;
    this.filterSelectivity = filterSelectivity;
    this.maxRecords = that.maxRecords;
    this.metadataProviderManager = that.metadataProviderManager;
    // Must be last: the stats read the fields assigned above.
    this.scanStats = computeScanStats();
  }

  /**
   * Adds a limit to the scan.
   *
   * @param that the scan to copy
   * @param maxRecords the record limit for the new scan
   */
  public HttpGroupScan(HttpGroupScan that, int maxRecords) {
    super(that);
    this.columns = that.columns;
    this.httpScanSpec = that.httpScanSpec;
    this.username = that.username;

    // Retains the existing filters; only the limit changes.
    this.filters = that.filters;
    this.filterSelectivity = that.filterSelectivity;
    this.maxRecords = maxRecords;
    this.metadataProviderManager = that.metadataProviderManager;
    // Must be last: the stats read the new limit assigned above.
    this.scanStats = computeScanStats();
  }

  /**
   * Deserialize a group scan. Not called in normal operation. Probably used
   * only if Drill executes a logical plan. The metadata provider manager is
   * not part of the serialized form and is left unset here.
   */
  @JsonCreator
  public HttpGroupScan(
      @JsonProperty("columns") List<SchemaPath> columns,
      @JsonProperty("httpScanSpec") HttpScanSpec httpScanSpec,
      @JsonProperty("filters") Map<String, String> filters,
      @JsonProperty("filterSelectivity") double selectivity,
      @JsonProperty("maxRecords") int maxRecords
  ) {
    super(httpScanSpec.queryUserName());
    this.columns = columns;
    this.httpScanSpec = httpScanSpec;
    this.username = httpScanSpec.queryUserName();
    this.filters = filters;
    this.filterSelectivity = selectivity;
    this.maxRecords = maxRecords;
    // Must be last: the stats read the fields assigned above.
    this.scanStats = computeScanStats();
  }

  /**
   * Handles the case when the pagination columns are not projected by adding
   * them to the projection list. The caller's list is never modified; when
   * columns need to be added, a copy is returned.
   *
   * @param columns the current projection list
   * @return the same list when nothing has to be added, otherwise a new list
   *         with the paginator columns appended
   */
  private List<SchemaPath> addColumnsToSchemaPath(List<SchemaPath> columns) {
    HttpApiConfig apiConfig = httpScanSpec.connectionConfig();
    HttpPaginatorConfig paginatorConfig = apiConfig.paginator();

    // Without a paginator there are no pagination columns to add.
    if (paginatorConfig == null) {
      return columns;
    }

    boolean noDataPath = StringUtils.isEmpty(apiConfig.dataPath());

    // Only relevant for index pagination, or when a data path is configured.
    if (paginatorConfig.getMethodType() != PaginatorMethod.INDEX && noDataPath) {
      return columns;
    }

    // Next, if the query is a star query, we don't need to modify anything.
    if (columns == ALL_COLUMNS && noDataPath) {
      return columns;
    }

    // Work on a copy: the incoming list may be shared (e.g. ALL_COLUMNS or
    // another scan's column list) and must not be altered.
    List<SchemaPath> augmented = new ArrayList<>(columns);
    if (StringUtils.isNotEmpty(paginatorConfig.hasMoreParam())) {
      augmented.add(SchemaPath.parseFromString(cleanUpColumnName(paginatorConfig.hasMoreParam())));
    }
    if (StringUtils.isNotEmpty(paginatorConfig.indexParam())) {
      augmented.add(SchemaPath.parseFromString(cleanUpColumnName(paginatorConfig.indexParam())));
    }
    if (StringUtils.isNotEmpty(paginatorConfig.nextPageParam())) {
      augmented.add(SchemaPath.parseFromString(cleanUpColumnName(paginatorConfig.nextPageParam())));
    }
    return augmented;
  }

  @JsonProperty("columns")
  public List<SchemaPath> columns() { return columns; }

  @JsonProperty("httpScanSpec")
  public HttpScanSpec httpScanSpec() { return httpScanSpec; }

  @JsonProperty("filters")
  public Map<String, String> filters() { return filters; }

  @JsonProperty("filterSelectivity")
  public double selectivity() { return filterSelectivity; }

  @JsonProperty("maxRecords")
  public int maxRecords() { return maxRecords; }

  /** No-op: this scan does nothing with the endpoint assignments. */
  @Override
  public void applyAssignments(List<DrillbitEndpoint> endpoints) { }

  /** Always reports a parallelization width of one. */
  @Override
  @JsonIgnore
  public int getMaxParallelizationWidth() {
    return 1;
  }

  @Override
  public boolean canPushdownProjects(List<SchemaPath> columns) {
    return true;
  }

  /** Returns the connection configuration held by the scan spec. */
  @JsonIgnore
  public HttpApiConfig getHttpConfig() {
    return httpScanSpec.connectionConfig();
  }

  /**
   * Builds the sub scan from this scan's spec, columns, filters, limit and
   * schema. The minor fragment id is not used.
   */
  @Override
  public SubScan getSpecificScan(int minorFragmentId) {
    return new HttpSubScan(httpScanSpec, columns, filters, maxRecords, getSchema());
  }

  @Override
  public GroupScan clone(List<SchemaPath> columns) {
    return new HttpGroupScan(this, columns);
  }

  @Override
  @JsonIgnore
  public String getDigest() {
    return toString();
  }

  @Override
  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
    Preconditions.checkArgument(children.isEmpty());
    return new HttpGroupScan(this);
  }

  /**
   * Reads the schema through the metadata provider manager.
   *
   * @return the schema, or null when no manager is set or the schema cannot
   *         be read
   */
  public TupleMetadata getSchema() {
    if (metadataProviderManager == null) {
      return null;
    }
    try {
      return metadataProviderManager.getSchemaProvider().read().getSchema();
    } catch (IOException | NullPointerException e) {
      // Best effort: a missing link in the chain above (surfacing as an NPE)
      // or a failed read is reported to the caller as "no schema".
      return null;
    }
  }

  /** Returns the table metadata, or null when no metadata provider is available. */
  @Override
  public TableMetadata getTableMetadata() {
    TableMetadataProvider provider = getMetadataProvider();
    return provider == null ? null : provider.getTableMetadata();
  }

  /** Returns the table metadata provider, or null when no manager is set. */
  @Override
  public TableMetadataProvider getMetadataProvider() {
    if (metadataProviderManager == null) {
      return null;
    }
    return metadataProviderManager.getTableMetadataProvider();
  }

  @Override
  public ScanStats getScanStats() {
    // Since this class is immutable, compute stats once and cache
    // them. If the scan changes (adding columns, adding filters), we
    // get a new scan without cached stats.
    return scanStats;
  }

  /**
   * Computes the cost estimate for this scan from the columns, filters,
   * filter selectivity and limit. All of those fields must already be
   * assigned when this is called from a constructor.
   */
  private ScanStats computeScanStats() {

    // If this config allows filters, then make the default
    // cost very high to force the planner to choose the version
    // with filters.
    // NOTE(review): the CPU cost 1E112 looks like it may be a typo for 1E12 - confirm.
    if (allowsFilters() && !hasFilters()) {
      return new ScanStats(ScanStats.GroupScanProperty.ESTIMATED_TOTAL_COST,
          1E9, 1E112, 1E12);
    }

    // No good estimates at all, just make up something. Make it smaller if there is a limit.
    double estRowCount = 10_000;

    // Only a non-negative value is a real limit; NO_LIMIT (-1) must not
    // shrink (or negate) the estimate. Cap at 10_000 and halve so the
    // limited scan is cheaper and the limit is pushed down.
    if (maxRecords >= 0) {
      estRowCount = Math.min(maxRecords, 10_000) / 2.0;
    }

    // NOTE this was important! if the predicates don't make the query more
    // efficient they won't get pushed down
    if (hasFilters()) {
      estRowCount *= filterSelectivity;
    }

    double estColCount = Utilities.isStarQuery(columns) ? DrillScanRel.STAR_COLUMN_COST : columns.size();
    double valueCount = estRowCount * estColCount;
    double cpuCost = valueCount;
    double ioCost = valueCount;

    // Force the caller to use our costs rather than the
    // defaults (which sets IO cost to zero).
    return new ScanStats(ScanStats.GroupScanProperty.ESTIMATED_TOTAL_COST,
        estRowCount, cpuCost, ioCost);
  }

  /** Returns true when a filter map has been set on this scan. */
  @JsonIgnore
  public boolean hasFilters() {
    return filters != null;
  }

  @Override
  public boolean supportsLimitPushdown() {
    return true;
  }

  /**
   * Applies a limit to the scan.
   *
   * @param maxRecords the limit to apply
   * @return a new scan carrying the limit, or null when the limit is the
   *         same as the one this scan already has
   */
  @Override
  public GroupScan applyLimit(int maxRecords) {
    if (maxRecords == this.maxRecords) {
      return null;
    }
    return new HttpGroupScan(this, maxRecords);
  }

  @Override
  public String toString() {
    return new PlanStringBuilder(this)
        .field("scan spec", httpScanSpec)
        .field("columns", columns)
        .field("filters", filters)
        .field("maxRecords", maxRecords)
        .toString();
  }

  @Override
  public int hashCode() {

    // Hash code is cached since Calcite calls this method many times.
    if (hashCode == 0) {
      // Don't include cost; it is derived. The limit is included so that
      // hashing agrees with equals() and with the digest from toString().
      hashCode = Objects.hash(httpScanSpec, columns, filters, maxRecords);
    }
    return hashCode;
  }

  @JsonIgnore
  public boolean allowsFilters() {
    // Return true if the query has either parameters specified in the URL or URL params.
    return (getHttpConfig().params() != null) || SimpleHttp.hasURLParameters(getHttpConfig().getHttpUrl());
  }

  /** Wraps the column name in backticks unless it already starts/ends with one. */
  private String cleanUpColumnName(String columnName) {
    if (! columnName.startsWith("`")) {
      columnName = "`" + columnName;
    }
    if (! columnName.endsWith("`")) {
      columnName = columnName + "`";
    }
    return columnName;
  }

  @Override
  public boolean equals(Object obj) {
    if (this == obj) {
      return true;
    }
    if (obj == null || getClass() != obj.getClass()) {
      return false;
    }

    // Don't include cost; it is derived. The limit is compared so that scans
    // differing only by limit (and hence by digest) are not considered equal.
    HttpGroupScan other = (HttpGroupScan) obj;
    return maxRecords == other.maxRecords()
        && Objects.equals(httpScanSpec, other.httpScanSpec())
        && Objects.equals(columns, other.columns())
        && Objects.equals(filters, other.filters());
  }
}