blob: 2faf6ef8a307ea363686f21c9f57b4fe781dc631 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.store.googlesheets;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.drill.common.PlanStringBuilder;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.metastore.MetadataProviderManager;
import org.apache.drill.exec.physical.base.AbstractGroupScan;
import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.ScanStats;
import org.apache.drill.exec.physical.base.SubScan;
import org.apache.drill.exec.planner.logical.DrillScanRel;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.store.StoragePluginRegistry;
import org.apache.drill.exec.store.base.filter.ExprNode;
import org.apache.drill.exec.util.Utilities;
import org.apache.drill.metastore.metadata.TableMetadata;
import org.apache.drill.metastore.metadata.TableMetadataProvider;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@JsonTypeName("googlesheets-group-scan")
public class GoogleSheetsGroupScan extends AbstractGroupScan {
private final GoogleSheetsScanSpec scanSpec;
private final GoogleSheetsStoragePluginConfig config;
private final List<SchemaPath> columns;
private final String pluginName;
private final Map<String, ExprNode.ColRelOpConstNode> filters;
private final ScanStats scanStats;
private final double filterSelectivity;
private final int maxRecords;
private final GoogleSheetsStoragePlugin plugin;
private int hashCode;
private MetadataProviderManager metadataProviderManager;
// Initial Constructor
public GoogleSheetsGroupScan(String userName,
GoogleSheetsScanSpec scanSpec,
GoogleSheetsStoragePlugin plugin,
MetadataProviderManager metadataProviderManager) {
super(userName);
this.scanSpec = scanSpec;
this.config = scanSpec.getConfig();
this.columns = ALL_COLUMNS;
this.pluginName = plugin.getName();
this.filters = null;
this.filterSelectivity = 0.0;
this.maxRecords = -1;
this.scanStats = computeScanStats();
this.plugin = plugin;
this.metadataProviderManager = metadataProviderManager;
}
// Copy Constructor
public GoogleSheetsGroupScan(GoogleSheetsGroupScan that) {
super(that);
this.scanSpec = that.scanSpec;
this.config = that.config;
this.columns = that.columns;
this.filters = that.filters;
this.pluginName = that.pluginName;
this.filterSelectivity = that.filterSelectivity;
this.scanStats = that.scanStats;
this.maxRecords = that.maxRecords;
this.plugin = that.plugin;
this.metadataProviderManager = that.metadataProviderManager;
this.hashCode = hashCode();
}
/**
* Constructor for applying a limit.
* @param that The previous group scan without the limit.
* @param maxRecords The desired limit, pushed down from Calcite
*/
public GoogleSheetsGroupScan(GoogleSheetsGroupScan that, int maxRecords) {
super(that);
this.scanSpec = that.scanSpec;
this.config = that.config;
this.columns = that.columns;
this.pluginName = that.pluginName;
this.filters = that.filters;
this.filterSelectivity = that.filterSelectivity;
this.maxRecords = maxRecords;
this.plugin = that.plugin;
this.metadataProviderManager = that.metadataProviderManager;
this.scanStats = computeScanStats();
}
/**
* Constructor for applying columns (Projection pushdown).
* @param that The previous GroupScan, without the columns
* @param columns The list of columns to push down
*/
public GoogleSheetsGroupScan(GoogleSheetsGroupScan that, List<SchemaPath> columns) {
super(that);
this.scanSpec = that.scanSpec;
this.config = scanSpec.getConfig();
this.columns = columns;
this.filters = that.filters;
this.pluginName = that.pluginName;
this.filterSelectivity = that.filterSelectivity;
this.maxRecords = that.maxRecords;
this.plugin = that.plugin;
this.metadataProviderManager = that.metadataProviderManager;
this.scanStats = computeScanStats();
}
/**
* Constructor for applying a filter
* @param that Previous group scan w/o filters
* @param filters The list of filters
* @param filterSelectivity The filter selectivity
*/
public GoogleSheetsGroupScan(GoogleSheetsGroupScan that,
Map<String, ExprNode.ColRelOpConstNode> filters,
double filterSelectivity) {
super(that);
this.scanSpec = that.scanSpec;
this.config = that.config;
this.columns = that.columns;
this.filters = filters;
this.pluginName = that.pluginName;
this.filterSelectivity = filterSelectivity;
this.maxRecords = that.maxRecords;
this.plugin = that.plugin;
this.metadataProviderManager = that.metadataProviderManager;
this.scanStats = computeScanStats();
}
@JsonCreator
public GoogleSheetsGroupScan(
@JsonProperty("userName") String userName,
@JsonProperty("scanSpec") GoogleSheetsScanSpec scanSpec,
@JsonProperty("columns") List<SchemaPath> columns,
@JsonProperty("filters") Map<String, ExprNode.ColRelOpConstNode> filters,
@JsonProperty("filterSelectivity") double selectivity,
@JsonProperty("maxRecords") int maxRecords,
@JacksonInject StoragePluginRegistry plugins
) {
super(userName);
this.scanSpec = scanSpec;
this.config = scanSpec.getConfig();
this.columns = columns;
this.filters = filters;
this.filterSelectivity = selectivity;
this.maxRecords = maxRecords;
this.scanStats = computeScanStats();
this.plugin = plugins.resolve(config, GoogleSheetsStoragePlugin.class);
this.pluginName = plugin.getName();
}
@JsonProperty("scanSpec")
public GoogleSheetsScanSpec scanSpec() {
return scanSpec;
}
@JsonProperty("config")
public GoogleSheetsStoragePluginConfig config() {
return config;
}
@JsonProperty("columns")
public List<SchemaPath> columns() {
return columns;
}
@JsonProperty("filters")
public Map<String, ExprNode.ColRelOpConstNode> filters() {
return filters;
}
@JsonProperty("maxRecords")
public int maxRecords() {
return maxRecords;
}
@Override
public void applyAssignments(List<DrillbitEndpoint> endpoints) {
}
public TupleMetadata getSchema() {
if (metadataProviderManager == null) {
return null;
}
try {
return metadataProviderManager.getSchemaProvider().read().getSchema();
} catch (IOException | NullPointerException e) {
return null;
}
}
@Override
public TableMetadata getTableMetadata() {
if (getMetadataProvider() == null) {
return null;
}
return getMetadataProvider().getTableMetadata();
}
@Override
public TableMetadataProvider getMetadataProvider() {
if (metadataProviderManager == null) {
return null;
}
return metadataProviderManager.getTableMetadataProvider();
}
@Override
@JsonIgnore
public boolean canPushdownProjects(List<SchemaPath> columns) {
return true;
}
@JsonIgnore
public boolean allowsFilters() {
return true;
}
@Override
public SubScan getSpecificScan(int minorFragmentId) {
return new GoogleSheetsSubScan(userName, config, scanSpec, columns, filters, maxRecords, getSchema());
}
@Override
public int getMaxParallelizationWidth() {
return 1;
}
@Override
public GroupScan clone(List<SchemaPath> columns) {
return new GoogleSheetsGroupScan(this, columns);
}
@Override
public boolean supportsLimitPushdown() {
return true;
}
@Override
public GroupScan applyLimit(int maxRecords) {
if (maxRecords == this.maxRecords) {
return null;
}
return new GoogleSheetsGroupScan(this, maxRecords);
}
@Override
public String getDigest() {
return toString();
}
@Override
public ScanStats getScanStats() {
// Since this class is immutable, compute stats once and cache
// them. If the scan changes (adding columns, adding filters), we
// get a new scan without cached stats.
return scanStats;
}
private ScanStats computeScanStats() {
// If this config allows filters, then make the default
// cost very high to force the planner to choose the version
// with filters.
if (!hasFilters()) {
return new ScanStats(ScanStats.GroupScanProperty.ESTIMATED_TOTAL_COST,
1E9, 1E112, 1E12);
}
// No good estimates at all, just make up something.
// TODO It may be possible to obtain this from the Google SDK.
double estRowCount = 10_000;
if (maxRecords > 0) {
estRowCount = maxRecords;
}
// NOTE this was important! if the predicates don't make the query more
// efficient they won't get pushed down
if (hasFilters()) {
estRowCount *= filterSelectivity;
}
double estColCount = Utilities.isStarQuery(columns) ? DrillScanRel.STAR_COLUMN_COST : columns.size();
double valueCount = estRowCount * estColCount;
double cpuCost = valueCount;
double ioCost = valueCount;
// Force the caller to use our costs rather than the
// defaults (which sets IO cost to zero).
return new ScanStats(ScanStats.GroupScanProperty.ESTIMATED_TOTAL_COST,
estRowCount, cpuCost, ioCost);
}
@JsonIgnore
public boolean hasFilters() {
return filters != null;
}
@Override
public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
Preconditions.checkArgument(children.isEmpty());
return new GoogleSheetsGroupScan(this);
}
@Override
public int hashCode() {
if (hashCode == 0) {
hashCode = Objects.hash(scanSpec, config, columns, filters, maxRecords, pluginName);
}
return hashCode;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
GoogleSheetsGroupScan other = (GoogleSheetsGroupScan) obj;
return Objects.equals(scanSpec, other.scanSpec) &&
Objects.equals(config, other.config) &&
Objects.equals(columns, other.columns) &&
Objects.equals(filters, other.filters) &&
Objects.equals(maxRecords, other.maxRecords);
}
@Override
public String toString() {
return new PlanStringBuilder(this)
.field("scanSpec", scanSpec)
.field("filters", filters)
.field("columns", columns)
.field("field selectivity", filterSelectivity)
.field("maxRecords", maxRecords)
.toString();
}
}