/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.store.mapr.db.json;
import java.util.List;
import java.util.NavigableMap;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.metastore.store.FileSystemMetadataProviderManager;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.ScanStats;
import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
import org.apache.drill.exec.planner.index.MapRDBStatistics;
import org.apache.drill.exec.planner.cost.PluginCost;
import org.apache.drill.exec.planner.index.Statistics;
import org.apache.drill.exec.store.dfs.FileSystemPlugin;
import org.apache.drill.exec.store.mapr.db.MapRDBFormatPlugin;
import org.apache.drill.exec.store.mapr.db.MapRDBSubScan;
import org.apache.drill.exec.store.mapr.db.MapRDBSubScanSpec;
import org.apache.drill.exec.store.mapr.db.RestrictedMapRDBSubScan;
import org.apache.drill.exec.store.mapr.db.RestrictedMapRDBSubScanSpec;
import org.apache.drill.exec.store.mapr.db.TabletFragmentInfo;
/**
* A RestrictedJsonTableGroupScan encapsulates (along with a subscan) the functionality
* for doing restricted (i.e skip) scan rather than sequential scan. The skipping is based
* on a supplied set of row keys (primary keys) from a join operator.
*/
@JsonTypeName("restricted-json-scan")
public class RestrictedJsonTableGroupScan extends JsonTableGroupScan {
@JsonCreator
public RestrictedJsonTableGroupScan(@JsonProperty("userName") String userName,
@JsonProperty("storage") FileSystemPlugin storagePlugin,
@JsonProperty("format") MapRDBFormatPlugin formatPlugin,
@JsonProperty("scanSpec") JsonScanSpec scanSpec, /* scan spec of the original table */
@JsonProperty("columns") List<SchemaPath> columns,
@JsonProperty("") MapRDBStatistics statistics,
@JsonProperty("schema") TupleMetadata schema) {
super(userName, storagePlugin, formatPlugin, scanSpec, columns,
statistics, FileSystemMetadataProviderManager.getMetadataProviderForSchema(schema));
}
// TODO: this method needs to be fully implemented
protected RestrictedMapRDBSubScanSpec getSubScanSpec(TabletFragmentInfo tfi) {
JsonScanSpec spec = scanSpec;
RestrictedMapRDBSubScanSpec subScanSpec =
new RestrictedMapRDBSubScanSpec(
spec.getTableName(),
getRegionsToScan().get(tfi), spec.getSerializedFilter(), getUserName());
return subScanSpec;
}
protected NavigableMap<TabletFragmentInfo, String> getRegionsToScan() {
return getRegionsToScan(formatPlugin.getRestrictedScanRangeSizeMB());
}
@Override
public MapRDBSubScan getSpecificScan(int minorFragmentId) {
assert minorFragmentId < endpointFragmentMapping.size() : String.format(
"Mappings length [%d] should be greater than minor fragment id [%d] but it isn't.", endpointFragmentMapping.size(),
minorFragmentId);
RestrictedMapRDBSubScan subscan =
new RestrictedMapRDBSubScan(getUserName(), formatPlugin,
getEndPointFragmentMapping(minorFragmentId), columns, maxRecordsToRead, TABLE_JSON, getSchema());
return subscan;
}
private List<RestrictedMapRDBSubScanSpec> getEndPointFragmentMapping(int minorFragmentId) {
List<RestrictedMapRDBSubScanSpec> restrictedSubScanSpecList = Lists.newArrayList();
List<MapRDBSubScanSpec> subScanSpecList = endpointFragmentMapping.get(minorFragmentId);
for (MapRDBSubScanSpec s : subScanSpecList) {
restrictedSubScanSpecList.add((RestrictedMapRDBSubScanSpec) s);
}
return restrictedSubScanSpecList;
}
/**
* Private constructor, used for cloning.
* @param that The RestrictedJsonTableGroupScan to clone
*/
private RestrictedJsonTableGroupScan(RestrictedJsonTableGroupScan that) {
super(that);
}
@Override
public GroupScan clone(JsonScanSpec scanSpec) {
RestrictedJsonTableGroupScan newScan = new RestrictedJsonTableGroupScan(this);
newScan.scanSpec = scanSpec;
newScan.resetRegionsToScan(); // resetting will force recalculation
return newScan;
}
@Override
public GroupScan clone(List<SchemaPath> columns) {
RestrictedJsonTableGroupScan newScan = new RestrictedJsonTableGroupScan(this);
newScan.columns = columns;
return newScan;
}
@Override
@JsonIgnore
public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
Preconditions.checkArgument(children.isEmpty());
return new RestrictedJsonTableGroupScan(this);
}
@Override
public ScanStats getScanStats() {
// TODO: ideally here we should use the rowcount from index scan, and multiply a factor of restricted scan
double rowCount;
PluginCost pluginCostModel = formatPlugin.getPluginCostModel();
final int avgColumnSize = pluginCostModel.getAverageColumnSize(this);
int numColumns = (columns == null || columns.isEmpty()) ? STAR_COLS: columns.size();
// Get the restricted group scan row count - same as the right side index rows
rowCount = computeRestrictedScanRowcount();
// Get the average row size of the primary table
double avgRowSize = stats.getAvgRowSize(null, true);
if (avgRowSize == Statistics.AVG_ROWSIZE_UNKNOWN || avgRowSize == 0) {
avgRowSize = avgColumnSize * numColumns;
}
// restricted scan does random lookups and each row may belong to a different block, with the number
// of blocks upper bounded by the total num blocks in the primary table
double totalBlocksPrimary = Math.ceil((avgRowSize * fullTableRowCount)/pluginCostModel.getBlockSize(this));
double numBlocks = Math.min(totalBlocksPrimary, rowCount);
double diskCost = numBlocks * pluginCostModel.getRandomBlockReadCost(this);
// For non-covering plans, the dominating cost would be of the join back. Reduce it using the factor
// for biasing towards non-covering plans.
diskCost *= stats.getRowKeyJoinBackIOFactor();
logger.debug("RestrictedJsonGroupScan:{} rowCount:{}, avgRowSize:{}, blocks:{}, totalBlocks:{}, diskCost:{}",
System.identityHashCode(this), rowCount, avgRowSize, numBlocks, totalBlocksPrimary, diskCost);
return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, rowCount, 1, diskCost);
}
private double computeRestrictedScanRowcount() {
double rowCount = Statistics.ROWCOUNT_UNKNOWN;
// The rowcount should be the same as the build side which was FORCED by putting it in forcedRowCountMap
if (forcedRowCountMap.get(null) != null) {
rowCount = forcedRowCountMap.get(null);
}
// If limit pushdown has occurred - factor it in the rowcount
if (rowCount == Statistics.ROWCOUNT_UNKNOWN || rowCount == 0) {
rowCount = (0.001f * fullTableRowCount);
}
if (this.maxRecordsToRead > 0) {
rowCount = Math.min(rowCount, this.maxRecordsToRead);
}
return rowCount;
}
@Override
public boolean isRestrictedScan() {
return true;
}
@Override
public String toString() {
return "RestrictedJsonTableGroupScan [ScanSpec=" + scanSpec + ", columns=" + columns
+ ", rowcount=" + computeRestrictedScanRowcount()
+ (maxRecordsToRead > 0 ? ", limit=" + maxRecordsToRead : "")
+ (getMaxParallelizationWidth() > 0 ? ", maxwidth=" + getMaxParallelizationWidth() : "") + "]";
}
}