blob: 14577c19675056f7c5bb9e6a67af8526271eb75a [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.planner;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.BaseTableRef;
import org.apache.doris.analysis.BinaryPredicate;
import org.apache.doris.analysis.CastExpr;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.InPredicate;
import org.apache.doris.analysis.IntLiteral;
import org.apache.doris.analysis.PartitionNames;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.SortInfo;
import org.apache.doris.analysis.TableSample;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.analysis.TupleId;
import org.apache.doris.catalog.ColocateTableIndex;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DistributionInfo;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.HashDistributionInfo;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.MaterializedIndexMeta;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Partition.PartitionState;
import org.apache.doris.catalog.PartitionInfo;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.PartitionType;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.Util;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.resource.Tag;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.statistics.StatsDeriveResult;
import org.apache.doris.statistics.StatsRecursiveDerive;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TColumn;
import org.apache.doris.thrift.TExplainLevel;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TOlapScanNode;
import org.apache.doris.thrift.TPaloScanRange;
import org.apache.doris.thrift.TPlanNode;
import org.apache.doris.thrift.TPlanNodeType;
import org.apache.doris.thrift.TPrimitiveType;
import org.apache.doris.thrift.TPushAggOp;
import org.apache.doris.thrift.TScanRange;
import org.apache.doris.thrift.TScanRangeLocation;
import org.apache.doris.thrift.TScanRangeLocations;
import org.apache.doris.thrift.TSortInfo;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
// Full scan of an Olap table.
public class OlapScanNode extends ScanNode {
private static final Logger LOG = LogManager.getLogger(OlapScanNode.class);
// average compression ratio in doris storage engine
private static final int COMPRESSION_RATIO = 5;
// Scan range locations collected by addScanRangeLocations() and handed out by
// getScanRangeLocations(long).
private List<TScanRangeLocations> result = new ArrayList<>();
/*
 * When the field value is ON, the storage engine can return the data directly
 * without pre-aggregation.
 * When the field value is OFF, the storage engine needs to aggregate the data
 * before returning it to the scan node.
 * For example:
 * Aggregate table: k1, k2, v1 sum
 * Field value is ON
 * Query1: select k1, sum(v1) from table group by k1
 * The aggregation function in the query is the same as the one in the schema,
 * so the field value is ON and the query can scan the data directly.
 *
 * Field value is OFF
 * Query1: select k1, k2 from table
 * The aggregation info is null.
 * Query2: select k1, min(v1) from table group by k1
 * The aggregation function in the query is min, which differs from the schema,
 * so the data stored in the storage engine needs to be merged first before
 * being returned to the scan node.
 *
 * There are currently two places that modify this variable:
 * 1. The turnOffPreAgg() method of SingleNodePlanner.
 * This method will only be called on the left deepest OlapScanNode of the plan
 * tree,
 * while other nodes are false by default (because the Aggregation operation is
 * executed after Join,
 * we cannot judge whether other OlapScanNodes can close the pre-aggregation).
 * So even for a Duplicate key table, if it is not the left deepest node, it will
 * remain false too.
 *
 * 2. After MaterializedViewSelector selects the materialized view, the
 * updateScanRangeInfoByNewMVSelector()
 * method of OlapScanNode may be called to update this variable.
 * This call will be executed on all ScanNodes in the plan tree. In this step,
 * for the DuplicateKey table, the variable will be set to true.
 * See comment of "isPreAggregation" variable in MaterializedViewSelector for
 * details.
 */
private boolean isPreAggregation = false;
// Accumulated, space-separated reasons passed to setIsPreAggregation(); printed in the
// explain output when pre-aggregation is OFF.
private String reasonOfPreAggregation = null;
// Cleared by closePreAggregation().
private boolean canTurnOnPreAggr = true;
private boolean forceOpenPreAgg = false;
private OlapTable olapTable = null;
// Tablet counters accumulated per selected partition in computeTabletInfo().
private long selectedTabletsNum = 0;
private long totalTabletsNum = 0;
// -1 means no index has been selected yet.
private long selectedIndexId = -1;
private int selectedPartitionNum = 0;
private Collection<Long> selectedPartitionIds = Lists.newArrayList();
// Sum of replica data sizes collected in addScanRangeLocations(); used to derive avgRowSize.
private long totalBytes = 0;
private SortInfo sortInfo = null;
// When scan match sort_info, we can push limit into OlapScanNode.
// It's limit for scanner instead of scanNode so we add a new limit.
private long sortLimit = -1;
private TPushAggOp pushDownAggNoGroupingOp = null;
// List of tablets will be scanned by current olap_scan_node
private ArrayList<Long> scanTabletIds = Lists.newArrayList();
// Tablets chosen by computeSampleTabletIds() plus any added through setSampleTabletIds().
private Set<Long> sampleTabletIds = Sets.newHashSet();
private TableSample tableSample;
// Ids of the backends that host at least one replica selected for scanning.
private HashSet<Long> scanBackendIds = new HashSet<>();
// Tablet id -> position of the tablet in its index's ordered tablet list (the bucket sequence).
private Map<Long, Integer> tabletId2BucketSeq = Maps.newHashMap();
// a bucket seq may map to many tablets, and each tablet has a
// TScanRangeLocations.
public ArrayListMultimap<Integer, TScanRangeLocations> bucketSeq2locations = ArrayListMultimap.create();
/**
 * Creates a scan node over the table referenced by {@code desc}.
 *
 * @param id           plan node id
 * @param desc         tuple descriptor whose table is cast to {@link OlapTable}
 * @param planNodeName display name of this plan node
 */
public OlapScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName) {
    super(id, desc, planNodeName, StatisticalType.OLAP_SCAN_NODE);
    this.olapTable = (OlapTable) desc.getTable();
}
/**
 * Sets the pre-aggregation flag and records why.
 * The reason is appended (space separated) to any reason recorded earlier.
 *
 * @param isPreAggregation new value of the flag
 * @param reason           explanation for the new value
 */
public void setIsPreAggregation(boolean isPreAggregation, String reason) {
    this.isPreAggregation = isPreAggregation;
    if (this.reasonOfPreAggregation == null) {
        this.reasonOfPreAggregation = reason;
    } else {
        this.reasonOfPreAggregation = this.reasonOfPreAggregation + " " + reason;
    }
}
/**
 * Stores the aggregate operation that is pushed down to this scan without grouping.
 *
 * @param pushDownAggNoGroupingOp the operation to remember
 */
public void setPushDownAggNoGrouping(TPushAggOp pushDownAggNoGroupingOp) {
    this.pushDownAggNoGroupingOp = pushDownAggNoGroupingOp;
}
/**
 * @return the current value of the pre-aggregation flag
 */
public boolean isPreAggregation() {
    return this.isPreAggregation;
}
/**
 * @return whether pre-aggregation may still be turned on for this node
 */
public boolean getCanTurnOnPreAggr() {
    return this.canTurnOnPreAggr;
}
/**
 * @return the live (not copied) set of tablet ids selected for sampling
 */
public Set<Long> getSampleTabletIds() {
    return this.sampleTabletIds;
}
/**
 * Adds the given tablet ids to the sample set; existing entries are kept.
 *
 * @param sampleTablets tablet ids to add, a {@code null} list is ignored
 */
public void setSampleTabletIds(List<Long> sampleTablets) {
    if (sampleTablets == null) {
        return;
    }
    this.sampleTabletIds.addAll(sampleTablets);
}
/**
 * Stores the table sample specification read by {@code computeSampleTabletIds()}.
 *
 * @param tSample the sample specification, may be {@code null} to disable sampling
 */
public void setTableSample(TableSample tSample) {
    tableSample = tSample;
}
/**
 * @param canChangePreAggr whether pre-aggregation may still be turned on for this node
 */
public void setCanTurnOnPreAggr(boolean canChangePreAggr) {
    canTurnOnPreAggr = canChangePreAggr;
}
/**
 * Switches pre-aggregation off, records the reason and forbids turning it on again.
 *
 * @param reason why pre-aggregation is closed
 */
public void closePreAggregation(String reason) {
    this.setIsPreAggregation(false, reason);
    this.setCanTurnOnPreAggr(false);
}
/**
 * @return total number of tablets of the selected index over all selected partitions
 */
public long getTotalTabletsNum() {
    return this.totalTabletsNum;
}
/**
 * @return the force-open pre-aggregation flag
 */
public boolean getForceOpenPreAgg() {
    return this.forceOpenPreAgg;
}
/**
 * @param forceOpenPreAgg new value of the force-open pre-aggregation flag
 */
public void setForceOpenPreAgg(boolean forceOpenPreAgg) {
    boolean requested = forceOpenPreAgg;
    this.forceOpenPreAgg = requested;
}
/**
 * @return number of partitions selected for scanning (boxed)
 */
public Integer getSelectedPartitionNum() {
    return this.selectedPartitionNum;
}
/**
 * @return number of tablets selected for scanning (boxed)
 */
public Long getSelectedTabletsNum() {
    return this.selectedTabletsNum;
}
/**
 * @return the sort info attached to this scan, or {@code null} if none was set
 */
public SortInfo getSortInfo() {
    return this.sortInfo;
}
/**
 * @param sortInfo the sort info to attach to this scan
 */
public void setSortInfo(SortInfo sortInfo) {
    SortInfo attached = sortInfo;
    this.sortInfo = attached;
}
/**
 * @param sortLimit the limit applied per scanner when a sort is pushed into this scan
 */
public void setSortLimit(long sortLimit) {
    long scannerLimit = sortLimit;
    this.sortLimit = scannerLimit;
}
/**
 * @return the live (not copied) collection of selected partition ids
 */
public Collection<Long> getSelectedPartitionIds() {
    return this.selectedPartitionIds;
}
/**
 * Replaces the tuple ids of this node.
 *
 * @param tupleIds the new tuple id list (stored by reference)
 */
public void setTupleIds(ArrayList<TupleId> tupleIds) {
    ArrayList<TupleId> replacement = tupleIds;
    this.tupleIds = replacement;
}
/**
 * Replaces the selected partition ids. Only used for UT and Nereids.
 *
 * @param selectedPartitionIds the new partition id collection (stored by reference)
 */
public void setSelectedPartitionIds(Collection<Long> selectedPartitionIds) {
    Collection<Long> replacement = selectedPartitionIds;
    this.selectedPartitionIds = replacement;
}
/**
 * Only used for Nereids to set the rollup or materialized view selection result.
 * Unlike {@code setIsPreAggregation}, the reason overwrites any previously recorded one.
 *
 * @param selectedIndexId        id of the index to scan
 * @param isPreAggregation       new value of the pre-aggregation flag
 * @param reasonOfPreAggregation explanation for the flag value
 */
public void setSelectedIndexInfo(
        long selectedIndexId,
        boolean isPreAggregation,
        String reasonOfPreAggregation) {
    this.reasonOfPreAggregation = reasonOfPreAggregation;
    this.isPreAggregation = isPreAggregation;
    this.selectedIndexId = selectedIndexId;
}
/**
 * Selects the base index of the table as the index to scan, so that this node reads
 * base data rather than materialized view data.
 * <p>
 * Mainly used by update statements: they scan data like a normal query, but their
 * syntax differs from ordinary queries, so the planner cannot reuse the query logic
 * to pick the best index automatically and the base index is chosen manually here.
 */
public void useBaseIndexId() {
    final long baseIndexId = olapTable.getBaseIndexId();
    this.selectedIndexId = baseIndexId;
}
/**
 * @return id of the selected index, or -1 if none has been selected yet
 */
public long getSelectedIndexId() {
    return this.selectedIndexId;
}
/**
 * Reconciles the index selection made by the new materialized view selector with the
 * one already stored on this node.
 * <ul>
 * <li>Situation 1: index id and pre-aggregation flag are both unchanged; nothing happens.</li>
 * <li>Situation 2: the table is a duplicate-key table (or unique-key with merge-on-write),
 * or there is no connection context; the new selection is adopted.</li>
 * <li>Situation 3: the table is aggregated and {@code test_materialized_view} is true;
 * an {@link AnalysisException} is thrown and the query is cancelled.</li>
 * <li>Situation 4: the table is aggregated and {@code test_materialized_view} is false;
 * the old selection is kept and a debug message is logged.</li>
 * </ul>
 *
 * @param selectedIndexId  index id chosen by the new selector
 * @param isPreAggregation pre-aggregation flag chosen by the new selector
 * @param reasonOfDisable  reason reported by the new selector, may be {@code null}
 * @throws UserException if the selections differ while {@code test_materialized_view} is on
 */
public void updateScanRangeInfoByNewMVSelector(long selectedIndexId,
        boolean isPreAggregation, String reasonOfDisable)
        throws UserException {
    boolean sameIndex = selectedIndexId == this.selectedIndexId;
    boolean samePreAgg = isPreAggregation == this.isPreAggregation;
    if (sameIndex && samePreAgg) {
        return;
    }

    // Snapshot of old and new selection, built before any state is modified.
    String scanRangeInfo = "The new selected index id " + selectedIndexId
            + ", pre aggregation tag " + isPreAggregation
            + ", reason " + (reasonOfDisable == null ? "null" : reasonOfDisable)
            + ". The old selected index id " + this.selectedIndexId
            + " pre aggregation tag " + this.isPreAggregation
            + " reason " + (this.reasonOfPreAggregation == null ? "null" : this.reasonOfPreAggregation);

    final String situation;
    final boolean update;
    if (olapTable.getKeysType() == KeysType.DUP_KEYS || (olapTable.getKeysType() == KeysType.UNIQUE_KEYS
            && olapTable.getEnableUniqueKeyMergeOnWrite())) {
        situation = "The key type of table is duplicate, or unique key with merge-on-write.";
        update = true;
    } else if (ConnectContext.get() == null) {
        situation = "Connection context is null";
        update = true;
    } else {
        SessionVariable sessionVariable = ConnectContext.get().getSessionVariable();
        if (sessionVariable.getTestMaterializedView()) {
            throw new AnalysisException("The old scan range info is different from the new one when "
                    + "test_materialized_view is true. "
                    + scanRangeInfo);
        }
        situation = "The key type of table is aggregated.";
        update = false;
    }

    if (!update) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Using the old scan range info instead of the new one. {}, {}",
                    situation, scanRangeInfo);
        }
        return;
    }

    // Adopt the new selection; the helper calls below read the updated selectedIndexId.
    this.selectedIndexId = selectedIndexId;
    updateSlotUniqueId();
    setIsPreAggregation(isPreAggregation, reasonOfDisable);
    updateColumnType();
    if (LOG.isDebugEnabled()) {
        LOG.debug("Using the new scan range info instead of the old one. {}, {}",
                situation, scanRangeInfo);
    }
}
/**
 * Aligns slot column types with the selected materialized view.
 * <p>
 * The type of a column may differ between the base table and a mv, e.g.
 * base table: k1 int, k2 int; mv table: k1 int, k2 bigint sum.
 * When the mv is selected, the slot of {@code k2} must use the mv column (bigint).
 * Currently only the {@code SUM} aggregate type leads to such a change.
 * Nothing is done when the base index is selected.
 */
private void updateColumnType() {
    if (selectedIndexId != olapTable.getBaseIndexId()) {
        MaterializedIndexMeta indexMeta = olapTable.getIndexMetaByIndexId(selectedIndexId);
        for (SlotDescriptor slot : desc.getSlots()) {
            if (slot.isMaterialized()) {
                Column columnInBase = slot.getColumn();
                Preconditions.checkNotNull(columnInBase);
                Column columnInMv = indexMeta.getColumnByName(columnInBase.getName());
                Preconditions.checkNotNull(columnInMv);
                // NOTE(review): this is a reference comparison of the type objects, not equals().
                boolean typeDiffers = columnInMv.getType() != columnInBase.getType();
                if (typeDiffers) {
                    slot.setColumn(columnInMv);
                }
            }
        }
    }
}
/**
 * Rebinds every materialized slot to the column of the selected mv index.
 * <p>
 * The unique id of a column differs between the base index and a mv index, so when the
 * optimizer decides to use a mv (e.g. {@code select count(*) from table} where the table
 * has a mv named mv1) the slots must point to the mv columns.
 * Nothing is done when light schema change is disabled or the base index is selected.
 */
private void updateSlotUniqueId() {
    if (!olapTable.getEnableLightSchemaChange()) {
        return;
    }
    if (selectedIndexId == olapTable.getBaseIndexId()) {
        return;
    }
    MaterializedIndexMeta indexMeta = olapTable.getIndexMetaByIndexId(selectedIndexId);
    for (SlotDescriptor slot : desc.getSlots()) {
        if (slot.isMaterialized()) {
            String columnName = slot.getColumn().getName();
            slot.setColumn(indexMeta.getColumnByName(columnName));
        }
    }
    LOG.debug("updateSlotUniqueId() slots: {}", desc.getSlots());
}
/**
 * @return the OLAP table scanned by this node
 */
public OlapTable getOlapTable() {
    return this.olapTable;
}
/**
 * @return the parent's debug string followed by the name of the scanned table
 */
@Override
protected String debugString() {
    return MoreObjects.toStringHelper(this)
            .addValue(super.debugString())
            .addValue("olapTable=" + olapTable.getName())
            .toString();
}
/**
 * Initializes this node for the legacy planner: runs the parent initialization, then
 * computes column filters, the selected partitions, tuple statistics and sample tablets,
 * and finally a first (inaccurate) cardinality.
 *
 * @param analyzer the analyzer of the current statement
 * @throws UserException if any of the initialization steps fails
 */
@Override
public void init(Analyzer analyzer) throws UserException {
super.init(analyzer);
filterDeletedRows(analyzer);
computeColumnFilter();
computePartitionInfo();
computeTupleState(analyzer);
computeSampleTabletIds();
/**
 * Compute an inaccurate cardinality before the mv selector and tablet pruning.
 * - Accurate statistical information relies on the selection of materialized
 * views and on bucket reduction.
 * - However, both of these processes occur after the reorder algorithm is
 * completed.
 * - When join reorder is turned on, the cardinality must be calculated before
 * the reorder algorithm.
 * - So only an inaccurate cardinality can be calculated here.
 */
mockRowCountInStatistic();
if (analyzer.safeIsEnableJoinReorderBasedCost()) {
computeInaccurateCardinality();
}
}
/**
 * Init OlapScanNode, ONLY used for Nereids. Should NOT use this function in anywhere else.
 * <p>
 * Refreshes the selected partition count from the already assigned partition ids and
 * computes the scan range locations.
 *
 * @throws UserException if the scan range locations cannot be computed; an
 *                       {@link AnalysisException} is rewrapped with the same message and
 *                       kept as the cause
 */
public void init() throws UserException {
    selectedPartitionNum = selectedPartitionIds.size();
    try {
        getScanRangeLocations();
    } catch (AnalysisException e) {
        // Keep the original exception as cause so its stack trace is not lost.
        throw new UserException(e.getMessage(), e);
    }
}
/**
 * Sets {@code cardinality} to the sum of the base index row counts of all selected
 * partitions and publishes that value as mocked table statistics.
 * Remove the method after statistics collection is working properly.
 */
public void mockRowCountInStatistic() {
    final long tableId = desc.getTable().getId();
    cardinality = 0;
    for (long partitionId : selectedPartitionIds) {
        MaterializedIndex baseIndex = olapTable.getPartition(partitionId).getBaseIndex();
        cardinality = cardinality + baseIndex.getRowCount();
    }
    Env.getCurrentEnv()
            .getStatisticsManager()
            .getStatistics()
            .mockTableStatsWithRowCount(tableId, cardinality);
}
/**
 * Finalizes this node for the legacy planner: computes the scan range locations, then the
 * statistics and the number of nodes.
 *
 * @param analyzer the analyzer of the current statement
 * @throws UserException if the scan range locations or the statistics cannot be computed;
 *                       an {@link AnalysisException} is rewrapped with the same message and
 *                       kept as the cause
 */
@Override
public void finalize(Analyzer analyzer) throws UserException {
    LOG.debug("OlapScanNode get scan range locations. Tuple: {}", desc);
    // If join reorder is turned on, the cardinality was already calculated in init(), and
    // that value is not accurate. The logic below calculates it again accurately, so the
    // value has to be reset here.
    if (analyzer.safeIsEnableJoinReorderBasedCost()) {
        cardinality = 0;
    }
    try {
        getScanRangeLocations();
    } catch (AnalysisException e) {
        // Keep the original exception as cause so its stack trace is not lost.
        throw new UserException(e.getMessage(), e);
    }
    // Relatively accurate cardinality according to ScanRange in
    // getScanRangeLocations
    computeStats(analyzer);
    computeNumNodes();
}
/**
 * Calls {@code computeStat()} on the descriptor of every tuple id of this node.
 *
 * @param analyzer the analyzer whose descriptor table resolves the tuple ids
 */
public void computeTupleState(Analyzer analyzer) {
    for (TupleId tupleId : tupleIds) {
        TupleDescriptor tupleDesc = analyzer.getDescTbl().getTupleDesc(tupleId);
        tupleDesc.computeStat();
    }
}
/**
 * Computes the statistics of this node after the scan ranges are known.
 * With a positive cardinality the average row size is derived from the collected byte
 * total and the cardinality is capped at the limit; a cardinality of -1 becomes 0.
 *
 * @param analyzer the analyzer of the current statement
 * @throws UserException if the parent statistics computation fails
 */
@Override
public void computeStats(Analyzer analyzer) throws UserException {
    super.computeStats(analyzer);
    if (cardinality > 0) {
        avgRowSize = totalBytes / (float) cardinality * COMPRESSION_RATIO;
        capCardinalityAtLimit();
    }
    // when node scan has no data, cardinality should be 0 instead of a invalid
    // value after computeStats()
    if (cardinality == -1) {
        cardinality = 0;
    }
    // update statsDeriveResult for real statistics
    // After statistics collection is complete, remove the logic
    if (!analyzer.safeIsEnableJoinReorderBasedCost()) {
        return;
    }
    statsDeriveResult = new StatsDeriveResult(cardinality, statsDeriveResult.getSlotIdToColumnStats());
}
/**
 * Derives the number of nodes from the scanned backends; the result is never below one.
 */
@Override
protected void computeNumNodes() {
    if (cardinality > 0) {
        numNodes = scanBackendIds.size();
    }
    // even current node scan has no data,at least on backend will be assigned when
    // the fragment actually execute
    if (numNodes <= 0) {
        numNodes = 1;
    }
}
/**
 * Runs the recursive statistics derivation on this node and copies the derived row
 * count (truncated to {@code long}) into {@code cardinality}.
 *
 * @throws UserException if the derivation fails
 */
private void computeInaccurateCardinality() throws UserException {
    StatsRecursiveDerive deriver = StatsRecursiveDerive.getStatsRecursiveDerive();
    deriver.statsRecursiveDerive(this);
    cardinality = (long) statsDeriveResult.getRowCount();
}
/**
 * Prunes the partitions of a RANGE or LIST partitioned table.
 * Candidates are the explicitly named partitions when {@code partitionNames} is given,
 * otherwise all non-temporary items of {@code partitionInfo}.
 *
 * @param partitionInfo  partition info of the table; callers pass RANGE or LIST types only
 * @param partitionNames explicitly requested partitions, may be {@code null}
 * @return whatever the selected pruner's {@code prune()} returns
 * @throws AnalysisException if a named partition does not exist or pruning fails
 */
private Collection<Long> partitionPrune(PartitionInfo partitionInfo,
        PartitionNames partitionNames) throws AnalysisException {
    // Step 1: collect the candidate partition items.
    final Map<Long, PartitionItem> candidates;
    if (partitionNames == null) {
        candidates = partitionInfo.getIdToItem(false);
    } else {
        candidates = Maps.newHashMap();
        for (String name : partitionNames.getPartitionNames()) {
            Partition named = olapTable.getPartition(name, partitionNames.isTemp());
            if (named == null) {
                ErrorReport.reportAnalysisException(ErrorCode.ERR_NO_SUCH_PARTITION, name);
            }
            candidates.put(named.getId(), partitionInfo.getItem(named.getId()));
        }
    }

    // Step 2: choose the pruner implementation by partition type and pruner version.
    PartitionPruner pruner = null;
    final PartitionType type = partitionInfo.getType();
    if (type == PartitionType.RANGE) {
        if (analyzer.partitionPruneV2Enabled()) {
            pruner = new RangePartitionPrunerV2(candidates,
                    partitionInfo.getPartitionColumns(), columnNameToRange);
        } else {
            pruner = new RangePartitionPruner(candidates,
                    partitionInfo.getPartitionColumns(), columnFilters);
        }
    } else if (type == PartitionType.LIST) {
        if (analyzer.partitionPruneV2Enabled()) {
            pruner = new ListPartitionPrunerV2(candidates, partitionInfo.getPartitionColumns(),
                    columnNameToRange);
        } else {
            pruner = new ListPartitionPruner(candidates,
                    partitionInfo.getPartitionColumns(), columnFilters);
        }
    }
    return pruner.prune();
}
/**
 * Prunes the tablets of one index by its distribution columns.
 *
 * @param table            the materialized index whose tablets are pruned
 * @param distributionInfo distribution info of the partition
 * @return the tablet ids kept by the hash pruner, or {@code null} when the distribution
 *         is not HASH (callers then scan every tablet)
 * @throws AnalysisException if pruning fails
 */
private Collection<Long> distributionPrune(
        MaterializedIndex table,
        DistributionInfo distributionInfo) throws AnalysisException {
    switch (distributionInfo.getType()) {
        case HASH:
            HashDistributionInfo hashInfo = (HashDistributionInfo) distributionInfo;
            DistributionPruner pruner = new HashDistributionPruner(table.getTabletIdsInOrder(),
                    hashInfo.getDistributionColumns(),
                    columnFilters,
                    hashInfo.getBucketNum());
            return pruner.prune();
        case RANDOM:
        default:
            // No pruning possible.
            return null;
    }
}
/**
 * Builds one TScanRangeLocations per given tablet of the partition and appends it to
 * both "result" and "bucketSeq2locations". Along the way it records the backends used
 * in "scanBackendIds" and adds replica data sizes to "totalBytes".
 *
 * @param partition the partition the tablets belong to; its visible version is scanned
 * @param tablets   the tablets of that partition selected for scanning
 * @throws UserException if a tablet has no queryable replica, or none of its replicas
 *                       lives on a usable backend
 */
private void addScanRangeLocations(Partition partition,
List<Tablet> tablets) throws UserException {
long visibleVersion = partition.getVisibleVersion();
String visibleVersionStr = String.valueOf(visibleVersion);
// Resource tags of the current connection, if any; used below to filter backends.
Set<Tag> allowedTags = Sets.newHashSet();
boolean needCheckTags = false;
if (ConnectContext.get() != null) {
allowedTags = ConnectContext.get().getResourceTags();
needCheckTags = ConnectContext.get().isResourceTagsSet();
}
for (Tablet tablet : tablets) {
long tabletId = tablet.getId();
TScanRangeLocations scanRangeLocations = new TScanRangeLocations();
TPaloScanRange paloRange = new TPaloScanRange();
paloRange.setDbName("");
paloRange.setSchemaHash("0");
paloRange.setVersion(visibleVersionStr);
paloRange.setVersionHash("");
paloRange.setTabletId(tabletId);
// random shuffle List && only collect one copy
List<Replica> replicas = tablet.getQueryableReplicas(visibleVersion);
if (replicas.isEmpty()) {
LOG.error("no queryable replica found in tablet {}. visible version {}",
tabletId, visibleVersion);
if (LOG.isDebugEnabled()) {
for (Replica replica : tablet.getReplicas()) {
LOG.debug("tablet {}, replica: {}", tabletId, replica.toString());
}
}
throw new UserException("Failed to get scan range, no queryable replica found in tablet: " + tabletId);
}
// -1 means "no fixed replica": the replica order is randomized instead.
int useFixReplica = -1;
if (ConnectContext.get() != null) {
useFixReplica = ConnectContext.get().getSessionVariable().useFixReplica;
}
if (useFixReplica == -1) {
Collections.shuffle(replicas);
} else {
LOG.debug("use fix replica, value: {}, replica num: {}", useFixReplica, replicas.size());
// sort by replica id
replicas.sort(Replica.ID_COMPARATOR);
// Pick the replica at the requested position, falling back to the last one when the
// position is beyond the list; only that single replica is kept.
Replica replica = replicas.get(useFixReplica >= replicas.size() ? replicas.size() - 1 : useFixReplica);
replicas.clear();
replicas.add(replica);
}
// Stays true until at least one replica location has been added for this tablet.
boolean tabletIsNull = true;
// Ensures the data size of a tablet is added to totalBytes at most once.
boolean collectedStat = false;
List<String> errs = Lists.newArrayList();
for (Replica replica : replicas) {
Backend backend = Env.getCurrentSystemInfo().getBackend(replica.getBackendId());
if (backend == null || !backend.isAlive()) {
LOG.debug("backend {} not exists or is not alive for replica {}", replica.getBackendId(),
replica.getId());
errs.add(replica.getId() + "'s backend " + replica.getBackendId() + " does not exist or not alive");
continue;
}
// Backends that are not mix nodes are skipped silently (no entry in errs).
if (!backend.isMixNode()) {
continue;
}
if (needCheckTags && !allowedTags.isEmpty() && !allowedTags.contains(backend.getLocationTag())) {
String err = String.format(
"Replica on backend %d with tag %s," + " which is not in user's resource tags: %s",
backend.getId(), backend.getLocationTag(), allowedTags);
if (LOG.isDebugEnabled()) {
LOG.debug(err);
}
errs.add(err);
continue;
}
String ip = backend.getHost();
int port = backend.getBePort();
TScanRangeLocation scanRangeLocation = new TScanRangeLocation(new TNetworkAddress(ip, port));
scanRangeLocation.setBackendId(replica.getBackendId());
scanRangeLocations.addToLocations(scanRangeLocation);
paloRange.addToHosts(new TNetworkAddress(ip, port));
tabletIsNull = false;
// for CBO
if (!collectedStat && replica.getRowCount() != -1) {
totalBytes += replica.getDataSize();
collectedStat = true;
}
scanBackendIds.add(backend.getId());
}
if (tabletIsNull) {
throw new UserException(tabletId + " have no queryable replicas. err: " + Joiner.on(", ").join(errs));
}
TScanRange scanRange = new TScanRange();
scanRange.setPaloScanRange(paloRange);
scanRangeLocations.setScanRange(scanRange);
// Group the locations by the bucket sequence recorded for this tablet in tabletId2BucketSeq.
bucketSeq2locations.put(tabletId2BucketSeq.get(tabletId), scanRangeLocations);
result.add(scanRangeLocations);
}
if (tablets.size() == 0) {
desc.setCardinality(0);
} else {
desc.setCardinality(cardinality);
}
}
/**
 * Computes {@code selectedPartitionIds} and {@code selectedPartitionNum}.
 * RANGE and LIST partitioned tables are pruned first; if there is no pruning result,
 * every partition of the table is a candidate. Partitions without data are dropped.
 *
 * @throws AnalysisException if pruning fails or a selected partition is being restored
 */
private void computePartitionInfo() throws AnalysisException {
    final long begin = System.currentTimeMillis();
    // Step1: compute partition ids
    PartitionNames partitionNames = ((BaseTableRef) desc.getRef()).getPartitionNames();
    PartitionInfo partitionInfo = olapTable.getPartitionInfo();
    boolean prunable = partitionInfo.getType() == PartitionType.RANGE
            || partitionInfo.getType() == PartitionType.LIST;
    selectedPartitionIds = prunable ? partitionPrune(partitionInfo, partitionNames) : null;

    if (selectedPartitionIds != null) {
        // Keep only the pruned partitions that actually hold data.
        selectedPartitionIds = selectedPartitionIds.stream()
                .filter(id -> olapTable.getPartition(id).hasData())
                .collect(Collectors.toList());
    } else {
        // No pruning result: take every partition that holds data.
        selectedPartitionIds = Lists.newArrayList();
        for (Partition candidate : olapTable.getPartitions()) {
            if (candidate.hasData()) {
                selectedPartitionIds.add(candidate.getId());
            }
        }
    }
    selectedPartitionNum = selectedPartitionIds.size();

    for (long partitionId : selectedPartitionIds) {
        Partition selected = olapTable.getPartition(partitionId);
        if (selected.getState() == PartitionState.RESTORE) {
            ErrorReport.reportAnalysisException(ErrorCode.ERR_BAD_PARTITION_STATE,
                    selected.getName(), "RESTORING");
        }
    }
    LOG.debug("partition prune cost: {} ms, partitions: {}",
            (System.currentTimeMillis() - begin), selectedPartitionIds);
}
/**
 * Step2 of scan planning: selects the index to scan with the rollup selector.
 * For duplicate-key tables and unique-key tables with merge-on-write the base index is
 * taken directly, because the real choice is made later by the materialized view selector.
 *
 * @param analyzer the analyzer of the current statement
 * @throws UserException if the rollup selection fails
 */
public void selectBestRollupByRollupSelector(Analyzer analyzer) throws UserException {
    // Step2: select best rollup
    final long begin = System.currentTimeMillis();
    final boolean decidedLaterByMvSelector = olapTable.getKeysType() == KeysType.DUP_KEYS
            || (olapTable.getKeysType() == KeysType.UNIQUE_KEYS
            && olapTable.getEnableUniqueKeyMergeOnWrite());
    if (decidedLaterByMvSelector) {
        // This function is compatible with the INDEX selection logic of ROLLUP,
        // so the Duplicate table here returns base index directly
        // and the selection logic of materialized view is selected in
        // "MaterializedViewSelector"
        selectedIndexId = olapTable.getBaseIndexId();
        LOG.debug("The best index will be selected later in mv selector");
        return;
    }
    RollupSelector selector = new RollupSelector(analyzer, desc, olapTable);
    selectedIndexId = selector.selectBestRollup(selectedPartitionIds, conjuncts, isPreAggregation);
    updateSlotUniqueId();
    LOG.debug("select best roll up cost: {} ms, best index id: {}",
            (System.currentTimeMillis() - begin), selectedIndexId);
}
/**
 * Computes the tablets and scan range locations for the selected index and partitions.
 * With no selected partition the tuple cardinality is set to 0 and nothing else happens.
 *
 * @throws UserException if the tablet info cannot be computed
 */
private void getScanRangeLocations() throws UserException {
    if (selectedPartitionIds.isEmpty()) {
        desc.setCardinality(0);
        return;
    }
    Preconditions.checkState(selectedIndexId != -1);
    // compute tablet info by selected index id and selected partition ids
    final long begin = System.currentTimeMillis();
    computeTabletInfo();
    LOG.debug("distribution prune cost: {} ms", (System.currentTimeMillis() - begin));
}
/**
 * Chooses the tablets to read for a TABLESAMPLE query and adds them to {@code sampleTabletIds}.
 * <p>
 * First, determine how many rows to sample from each partition according to the number of
 * partitions. Then determine the number of tablets to be selected for each partition
 * according to the average number of rows per tablet:
 * {@code sampleTabletNums = sampleRows / partitionNums / (partitionRows / partitionTabletNums)}.
 * If seek is not specified, the tablets are selected pseudo-randomly from each partition;
 * if seek is specified, they are selected sequentially starting from the seek tablet.
 * Tablet ids that were added manually stay in the set alongside the computed ones.
 * <p>
 * The whole sample set is cleared (meaning "scan everything") when the visited partitions
 * hold fewer rows than requested, or when every tablet of them was selected.
 * Does nothing when no table sample is set.
 */
public void computeSampleTabletIds() {
    if (tableSample == null) {
        return;
    }
    // Named differently from the `olapTable` field on purpose, to avoid shadowing it.
    OlapTable sampledTable = (OlapTable) desc.getTable();
    long sampleRows; // The total number of sample rows
    if (tableSample.isPercent()) {
        sampleRows = (long) Math.max(sampledTable.getRowCount() * (tableSample.getSampleValue() / 100.0), 1);
    } else {
        sampleRows = Math.max(tableSample.getSampleValue(), 1);
    }
    long totalRows = 0; // The total number of partition rows hit
    long totalTablet = 0; // The total number of tablets in the hit partition

    // calculate the number of tablets by each partition
    long avgRowsPerPartition = sampleRows / Math.max(sampledTable.getPartitions().size(), 1);
    for (Partition p : sampledTable.getPartitions()) {
        MaterializedIndex baseIndex = p.getBaseIndex();
        List<Long> ids = baseIndex.getTabletIdsInOrder();
        if (ids.isEmpty()) {
            continue;
        }
        long partitionRows = baseIndex.getRowCount();
        // Skip partitions with row count < row count / 2 expected to be sampled per partition.
        // It can be expected to sample a smaller number of partitions to avoid uneven distribution
        // of sampling results.
        if (partitionRows < (avgRowsPerPartition / 2)) {
            continue;
        }
        long avgRowsPerTablet = Math.max(partitionRows / ids.size(), 1);
        // Ceiling division, at least one tablet and at most all tablets of the partition.
        long tabletCounts = Math.max(
                avgRowsPerPartition / avgRowsPerTablet + (avgRowsPerPartition % avgRowsPerTablet != 0 ? 1 : 0), 1);
        tabletCounts = Math.min(tabletCounts, ids.size());
        long seek = tableSample.getSeek() != -1
                ? tableSample.getSeek() : (long) (Math.random() * ids.size());
        for (int i = 0; i < tabletCounts; i++) {
            // Wrap around the end of the tablet list.
            int seekTid = (int) ((i + seek) % ids.size());
            sampleTabletIds.add(ids.get(seekTid));
        }
        totalRows += partitionRows;
        totalTablet += ids.size();
    }

    // Either the sample cannot be filled, or every tablet was hit: fall back to a full scan.
    // TODO add limit
    if (totalRows < sampleRows || sampleTabletIds.size() == totalTablet) {
        sampleTabletIds.clear();
    }
}
/**
 * Determines, for every selected partition, the tablets of the selected index that must
 * be scanned, records them in {@code scanTabletIds} and {@code tabletId2BucketSeq},
 * updates the tablet counters and builds the scan range locations.
 * <p>
 * Tablets are first pruned by distribution; when sample tablets are set, the result is
 * further restricted to those. If distribution pruning is not possible (it returns
 * {@code null}), the sample restriction is applied to all tablets of the index.
 *
 * @throws UserException if distribution pruning fails or no usable replica is found
 */
private void computeTabletInfo() throws UserException {
    // The tablet info could be computed only once.
    // So the scanBackendIds should be empty in the beginning.
    Preconditions.checkState(scanBackendIds.size() == 0);
    Preconditions.checkState(scanTabletIds.size() == 0);
    for (Long partitionId : selectedPartitionIds) {
        final Partition partition = olapTable.getPartition(partitionId);
        final MaterializedIndex selectedTable = partition.getIndex(selectedIndexId);
        final List<Tablet> tablets = Lists.newArrayList();
        final List<Long> allTabletIds = selectedTable.getTabletIdsInOrder();
        Collection<Long> tabletIds = distributionPrune(selectedTable, partition.getDistributionInfo());
        LOG.debug("distribution prune tablets: {}", tabletIds);
        if (sampleTabletIds.size() != 0) {
            if (tabletIds == null) {
                // No distribution pruning result (e.g. random distribution): start from a
                // private copy of all tablets so the sample filter can still be applied.
                tabletIds = new ArrayList<>(allTabletIds);
            }
            tabletIds.retainAll(sampleTabletIds);
            LOG.debug("after sample tablets: {}", tabletIds);
        }
        if (tabletIds != null) {
            for (Long id : tabletIds) {
                tablets.add(selectedTable.getTablet(id));
            }
            scanTabletIds.addAll(tabletIds);
        } else {
            tablets.addAll(selectedTable.getTablets());
            scanTabletIds.addAll(allTabletIds);
        }
        // The position of a tablet in the ordered list is its bucket sequence.
        for (int i = 0; i < allTabletIds.size(); i++) {
            tabletId2BucketSeq.put(allTabletIds.get(i), i);
        }
        totalTabletsNum += selectedTable.getTablets().size();
        selectedTabletsNum += tablets.size();
        addScanRangeLocations(partition, tablets);
    }
}
/**
 * Checks whether the sort of the parent sort node can be pushed down to this scan.
 * <p>
 * All ordering expressions must share one direction, the table must not be z-order
 * sorted, and the ordering expressions must be a prefix of the table columns:
 * with sort columns a.b.c.d, the sort keys (a) (a,b) (a,b,c) (a,b,c,d) are accepted while
 * (a,c) (a,c,d) (a,c,b) (a,c,f) (a,b,c,d,e) are not.
 *
 * @param sortNode the parent sort node
 * @return {@code true} if the sort can be pushed down
 */
public boolean checkPushSort(SortNode sortNode) {
    SortInfo parentSortInfo = sortNode.getSortInfo();
    // Ensure all isAscOrder is same, ande length != 0.
    // Can't be zorder.
    long distinctDirections = parentSortInfo.getIsAscOrder().stream().distinct().count();
    if (distinctDirections != 1 || olapTable.isZOrderSort()) {
        return false;
    }

    // Tablet's order by key only can be the front part of schema,
    // so do a prefix match against the table columns.
    List<Expr> orderingExprs = sortNode.getSortInfo().getMaterializedOrderingExprs();
    if (orderingExprs.size() > olapTable.getDataSortInfo().getColNum()) {
        return false;
    }
    int position = 0;
    for (Expr orderingExpr : orderingExprs) {
        Column tableKey = olapTable.getFullSchema().get(position);
        boolean matchesKey = orderingExpr instanceof SlotRef
                && tableKey.equals(((SlotRef) orderingExpr).getColumn());
        if (!matchesKey) {
            return false;
        }
        position++;
    }
    return true;
}
/**
 * Returns the scan range locations collected in {@code result}.
 * The list is handed back as-is; {@code maxScanRangeLength} is not consulted by
 * this implementation.
 */
@Override
public List<TScanRangeLocations> getScanRangeLocations(long maxScanRangeLength) {
    return this.result;
}
/**
 * Builds the EXPLAIN text of this scan node.
 *
 * <p>For {@code TExplainLevel.BRIEF} only the table/index line, the cardinality, the
 * applied runtime filters and the number of predicates are printed. For every other
 * level the pre-aggregation state, sort information, predicates, runtime filters,
 * partition/tablet counts (with at most three tablet ids) and size statistics follow.
 *
 * @param prefix      string prepended to each printed line
 * @param detailLevel requested explain level
 * @return the explain text
 */
@Override
public String getNodeExplainString(String prefix, TExplainLevel detailLevel) {
    // With no data in the table selectedIndexId stays -1; fall back to the base index
    // id so that the explain result does not show "null" as index name.
    final long explainIndexId = (selectedIndexId == -1) ? olapTable.getBaseIndexId() : selectedIndexId;
    final String indexName = olapTable.getIndexNameById(explainIndexId);

    final StringBuilder explain = new StringBuilder();
    explain.append(prefix).append("TABLE: ").append(olapTable.getQualifiedName())
            .append("(").append(indexName).append(")");

    if (detailLevel == TExplainLevel.BRIEF) {
        explain.append("\n").append(prefix).append(String.format("cardinality=%,d", cardinality));
        if (!runtimeFilters.isEmpty()) {
            explain.append("\n").append(prefix).append("Apply RFs: ")
                    .append(getRuntimeFilterExplainString(false, true));
        }
        if (!conjuncts.isEmpty()) {
            explain.append("\n").append(prefix).append("PREDICATES: ").append(conjuncts.size()).append("\n");
        }
        return explain.toString();
    }

    if (!isPreAggregation) {
        explain.append(", PREAGGREGATION: OFF. Reason: ").append(reasonOfPreAggregation);
    } else {
        explain.append(", PREAGGREGATION: ON");
    }
    explain.append("\n");

    if (sortColumn != null) {
        explain.append(prefix).append("SORT COLUMN: ").append(sortColumn).append("\n");
    }
    if (sortInfo != null) {
        explain.append(prefix).append("SORT INFO:\n");
        for (Expr orderingExpr : sortInfo.getMaterializedOrderingExprs()) {
            explain.append(prefix).append(prefix).append(orderingExpr.toSql()).append("\n");
        }
    }
    if (sortLimit != -1) {
        explain.append(prefix).append("SORT LIMIT: ").append(sortLimit).append("\n");
    }
    if (!conjuncts.isEmpty()) {
        explain.append(prefix).append("PREDICATES: ").append(getExplainString(conjuncts)).append("\n");
    }
    if (!runtimeFilters.isEmpty()) {
        explain.append(prefix).append("runtime filters: ")
                .append(getRuntimeFilterExplainString(false));
    }

    explain.append(prefix).append(String.format("partitions=%s/%s, tablets=%s/%s", selectedPartitionNum,
            olapTable.getPartitions().size(), selectedTabletsNum, totalTabletsNum));

    // Print at most 3 tablet ids; "..." marks that more tablets are scanned.
    if (scanTabletIds.size() <= 3) {
        explain.append(String.format(", tabletList=%s", Joiner.on(",").join(scanTabletIds)));
    } else {
        List<Long> leadingTabletIds = scanTabletIds.subList(0, 3);
        explain.append(String.format(", tabletList=%s ...", Joiner.on(",").join(leadingTabletIds)));
    }
    explain.append("\n");

    explain.append(prefix)
            .append(String.format("cardinality=%s", cardinality))
            .append(String.format(", avgRowSize=%s", avgRowSize))
            .append(String.format(", numNodes=%s", numNodes))
            .append("\n");
    return explain.toString();
}
/**
 * Returns the number of instances, taken as the number of entries in {@code result}.
 */
@Override
public int getNumInstances() {
    return this.result.size();
}
/**
 * Fills {@code msg} with the thrift representation of this olap scan node.
 *
 * <p>When an index is selected ({@code selectedIndexId != -1}) every column of that
 * index is added to the column descriptions, and the names/types of its key columns are
 * collected (hidden key columns only when hidden columns are shown). Optional sort
 * column, sort info, sort limit and pushed-down aggregation are set only when present.
 *
 * @param msg the thrift plan node to populate
 */
@Override
protected void toThrift(TPlanNode msg) {
    final List<String> keyNames = new ArrayList<>();
    final List<TPrimitiveType> keyTypes = new ArrayList<>();
    final List<TColumn> columnDescs = new ArrayList<>();
    if (selectedIndexId != -1) {
        for (Column column : olapTable.getSchemaByIndexId(selectedIndexId, true)) {
            TColumn thriftColumn = column.toThrift();
            column.setIndexFlag(thriftColumn, olapTable);
            columnDescs.add(thriftColumn);
            // A key column is exported when hidden columns are shown or the column is visible.
            boolean exposed = Util.showHiddenColumns() || column.isVisible();
            if (exposed && column.isKey()) {
                keyNames.add(column.getName());
                keyTypes.add(column.getDataType().toThrift());
            }
        }
    }

    msg.node_type = TPlanNodeType.OLAP_SCAN_NODE;
    final TOlapScanNode scanNode = new TOlapScanNode(desc.getId().asInt(), keyNames, keyTypes, isPreAggregation);
    msg.olap_scan_node = scanNode;
    scanNode.setColumnsDesc(columnDescs);

    if (sortColumn != null) {
        scanNode.setSortColumn(sortColumn);
    }
    if (sortInfo != null) {
        TSortInfo thriftSortInfo = new TSortInfo(
                Expr.treesToThrift(sortInfo.getOrderingExprs()),
                sortInfo.getIsAscOrder(),
                sortInfo.getNullsFirst());
        if (sortInfo.getSortTupleSlotExprs() != null) {
            thriftSortInfo.setSortTupleSlotExprs(Expr.treesToThrift(sortInfo.getSortTupleSlotExprs()));
        }
        scanNode.setSortInfo(thriftSortInfo);
    }
    if (sortLimit != -1) {
        scanNode.setSortLimit(sortLimit);
    }

    scanNode.setKeyType(olapTable.getKeysType().toThrift());
    scanNode.setTableName(olapTable.getName());
    scanNode.setEnableUniqueKeyMergeOnWrite(olapTable.getEnableUniqueKeyMergeOnWrite());
    if (pushDownAggNoGroupingOp != null) {
        scanNode.setPushDownAggTypeOpt(pushDownAggNoGroupingOp);
    }
}
/**
 * Creates a scan node over an explicitly given list of scan range locations
 * (pre-aggregation is switched off with the reason "Export job").
 *
 * <p>The node uses the base index of its table, reports one instance, one selected
 * partition and one of one tablets, and its {@code result} receives all entries of
 * {@code locationsList}.
 *
 * @param id            plan node id
 * @param desc          tuple descriptor of the scanned table
 * @param planNodeName  display name of the plan node
 * @param locationsList scan range locations to scan
 * @return the configured scan node
 */
public static OlapScanNode createOlapScanNodeByLocation(
        PlanNodeId id, TupleDescriptor desc, String planNodeName, List<TScanRangeLocations> locationsList) {
    final OlapScanNode exportNode = new OlapScanNode(id, desc, planNodeName);
    exportNode.selectedIndexId = exportNode.olapTable.getBaseIndexId();
    // Fixed counters: a single instance, partition and tablet.
    exportNode.numInstances = 1;
    exportNode.selectedPartitionNum = 1;
    exportNode.totalTabletsNum = 1;
    exportNode.selectedTabletsNum = 1;
    exportNode.setIsPreAggregation(false, "Export job");
    exportNode.result.addAll(locationsList);
    return exportNode;
}
/**
 * Collects the names of columns that carry predicates usable for the prefix index.
 *
 * <p>Step 1 inspects this node's conjuncts: for each usable predicate, the first
 * materialized slot bound by it contributes its column name to
 * {@code equivalenceColumns} (IN / equivalence comparison) or to
 * {@code unequivalenceColumns} (anything else).
 * Step 2 inspects the equal-join conjuncts of this tuple: every materialized slot bound
 * by either side of a usable join predicate is added to {@code equivalenceColumns}.
 *
 * @param analyzer             analyzer providing the equal-join conjuncts
 * @param equivalenceColumns   output set for columns with equivalence predicates
 * @param unequivalenceColumns output set for columns with non-equivalence predicates
 */
public void collectColumns(Analyzer analyzer, Set<String> equivalenceColumns, Set<String> unequivalenceColumns) {
    // 1. Columns that have a predicate on them.
    for (Expr conjunct : conjuncts) {
        if (isPredicateUsedForPrefixIndex(conjunct, false)) {
            for (SlotDescriptor slot : desc.getMaterializedSlots()) {
                if (!conjunct.isBound(slot.getId())) {
                    continue;
                }
                Set<String> target = isEquivalenceExpr(conjunct) ? equivalenceColumns : unequivalenceColumns;
                target.add(slot.getColumn().getName());
                // Only the first bound slot is recorded for a predicate.
                break;
            }
        }
    }

    // 2. Equal join predicates when pushing inner child.
    for (Expr joinConjunct : analyzer.getEqJoinConjuncts(desc.getId())) {
        if (!isPredicateUsedForPrefixIndex(joinConjunct, true)) {
            continue;
        }
        for (SlotDescriptor slot : desc.getMaterializedSlots()) {
            Preconditions.checkState(joinConjunct.getChildren().size() == 2);
            boolean boundBySomeSide = joinConjunct.getChildren().stream()
                    .anyMatch(side -> side.isBound(slot.getId()));
            if (boundBySomeSide) {
                equivalenceColumns.add(slot.getColumn().getName());
            }
        }
    }
}
/**
 * Returns the id of this node's tuple descriptor.
 *
 * @return the tuple id of {@code desc}
 * @throws NullPointerException if {@code desc} is null
 */
public TupleId getTupleId() {
    return Preconditions.checkNotNull(desc).getId();
}
/**
 * Tells whether {@code expr} is an equivalence-style predicate: any
 * {@code InPredicate}, or a {@code BinaryPredicate} whose operator reports
 * {@code isEquivalence()}.
 */
private boolean isEquivalenceExpr(Expr expr) {
    if (expr instanceof BinaryPredicate) {
        return ((BinaryPredicate) expr).getOp().isEquivalence();
    }
    return expr instanceof InPredicate;
}
/**
 * Dispatches to the matching prefix-index check for {@code expr}.
 *
 * @param expr           predicate to examine; only IN and binary predicates can qualify
 * @param isJoinConjunct whether {@code expr} is an equal-join conjunct
 * @return {@code true} if the predicate is usable for the prefix index
 */
private boolean isPredicateUsedForPrefixIndex(Expr expr, boolean isJoinConjunct) {
    if (expr instanceof InPredicate) {
        return isInPredicateUsedForPrefixIndex((InPredicate) expr);
    }
    if (!(expr instanceof BinaryPredicate)) {
        return false;
    }
    final BinaryPredicate binaryPredicate = (BinaryPredicate) expr;
    return isJoinConjunct
            ? isEqualJoinConjunctUsedForPrefixIndex(binaryPredicate)
            : isBinaryPredicateUsedForPrefixIndex(binaryPredicate);
}
/**
 * Checks an equal-join conjunct: it qualifies when it is not an auxiliary expression
 * and at least one of its children is a (possibly cast-wrapped) slot reference bound
 * by one of this tuple's materialized slots.
 *
 * @param expr join predicate; its operator must be an equivalence operator
 */
private boolean isEqualJoinConjunctUsedForPrefixIndex(BinaryPredicate expr) {
    Preconditions.checkArgument(expr.getOp().isEquivalence());
    if (expr.isAuxExpr()) {
        return false;
    }
    return expr.getChildren().stream().anyMatch(operand ->
            desc.getMaterializedSlots().stream().anyMatch(
                    slot -> operand.isBound(slot.getId()) && isSlotRefNested(operand)));
}
/**
 * Checks a binary predicate: it qualifies when it is neither auxiliary nor an
 * unequivalence comparison, and one operand is a (possibly cast-wrapped) slot
 * reference while the other operand is constant.
 */
private boolean isBinaryPredicateUsedForPrefixIndex(BinaryPredicate expr) {
    if (expr.isAuxExpr()) {
        return false;
    }
    if (expr.getOp().isUnequivalence()) {
        return false;
    }
    final Expr left = expr.getChild(0);
    final Expr right = expr.getChild(1);
    if (isSlotRefNested(left) && right.isConstant()) {
        return true;
    }
    return isSlotRefNested(right) && left.isConstant();
}
/**
 * Checks an IN predicate: it qualifies when it is not a NOT IN, its first child is a
 * (possibly cast-wrapped) slot reference and {@code isLiteralChildren()} holds.
 */
private boolean isInPredicateUsedForPrefixIndex(InPredicate expr) {
    return !expr.isNotIn()
            && isSlotRefNested(expr.getChild(0))
            && expr.isLiteralChildren();
}
/**
 * Strips any chain of enclosing {@code CastExpr} nodes (following child 0) and tells
 * whether the innermost expression is a {@code SlotRef}.
 */
private boolean isSlotRefNested(Expr expr) {
    Expr unwrapped = expr;
    while (unwrapped instanceof CastExpr) {
        unwrapped = unwrapped.getChild(0);
    }
    return unwrapped instanceof SlotRef;
}
/**
 * Adds the conjunct {@code <delete sign column> = 0} so that rows marked as deleted
 * are filtered out.
 *
 * <p>Nothing is added when hidden columns are shown, when the table has no delete
 * sign, or when the session variable asks to skip the delete sign. Unless the table
 * has unique-key merge-on-write enabled, pre-aggregation is closed as well.
 *
 * @param analyzer analyzer used to analyze the new slot reference and predicate
 * @throws AnalysisException if analyzing the slot reference or the predicate fails
 */
private void filterDeletedRows(Analyzer analyzer) throws AnalysisException {
    if (Util.showHiddenColumns() || !olapTable.hasDeleteSign()) {
        return;
    }
    if (ConnectContext.get().getSessionVariable().skipDeleteSign()) {
        return;
    }

    final SlotRef deleteSign = new SlotRef(desc.getAliasAsName(), Column.DELETE_SIGN);
    deleteSign.analyze(analyzer);
    deleteSign.getDesc().setIsMaterialized(true);

    final Expr notDeleted = new BinaryPredicate(BinaryPredicate.Operator.EQ, deleteSign, new IntLiteral(0));
    notDeleted.analyze(analyzer);
    conjuncts.add(notDeleted);

    if (!olapTable.getEnableUniqueKeyMergeOnWrite()) {
        closePreAggregation(Column.DELETE_SIGN + " is used as conjuncts.");
    }
}
/*
 * Builds the input data partition of this scan from the table's distribution info.
 *
 * A hash partition over the distribution columns is returned when the table is in a
 * stable colocate group, or is unpartitioned, or has exactly one partition, and its
 * default distribution is a hash distribution. Otherwise RANDOM is returned.
 *
 * Although sometimes the scan range only involves one instance, the data distribution
 * cannot be set to UNPARTITIONED here: @coordinator will not set the scan range for
 * the fragment when the data partition of the fragment is UNPARTITIONED.
 */
public DataPartition constructInputPartitionByDistributionInfo() throws UserException {
    final ColocateTableIndex colocateIndex = Env.getCurrentColocateIndex();
    final boolean stableColocate = colocateIndex.isColocateTable(olapTable.getId())
            && !colocateIndex.isGroupUnstable(colocateIndex.getGroup(olapTable.getId()));
    if (!stableColocate
            && olapTable.getPartitionInfo().getType() != PartitionType.UNPARTITIONED
            && olapTable.getPartitions().size() != 1) {
        return DataPartition.RANDOM;
    }

    final DistributionInfo distribution = olapTable.getDefaultDistributionInfo();
    if (!(distribution instanceof HashDistributionInfo)) {
        return DataPartition.RANDOM;
    }

    // One slot reference per distribution column, qualified by this scan's table ref name.
    final List<Expr> hashExprs = Lists.newArrayList();
    for (Column column : ((HashDistributionInfo) distribution).getDistributionColumns()) {
        hashExprs.add(new SlotRef(desc.getRef().getName(), column.getName()));
    }
    return DataPartition.hashPartitioned(hashExprs);
}
/**
 * Returns the stored {@code reasonOfPreAggregation} text (exposed for tests).
 */
@VisibleForTesting
public String getReasonOfPreAggregation() {
    return this.reasonOfPreAggregation;
}
/**
 * Returns the name the table reports for {@code selectedIndexId} (exposed for tests).
 */
@VisibleForTesting
public String getSelectedIndexName() {
    final String selectedIndexName = olapTable.getIndexNameById(selectedIndexId);
    return selectedIndexName;
}
}