blob: e8f761a8ef7280441ace7a45cd3c8732a41999bd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.execute;
import static org.apache.phoenix.monitoring.TaskExecutionMetricsHolder.NO_OP_INSTANCE;
import static org.apache.phoenix.util.LogUtil.addCustomAnnotations;
import static org.apache.phoenix.util.NumberUtil.add;
import static org.apache.phoenix.util.NumberUtil.getMin;
import java.io.IOException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.sql.SQLException;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.codec.binary.Hex;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
import org.apache.phoenix.compile.ColumnProjector;
import org.apache.phoenix.compile.ExplainPlan;
import org.apache.phoenix.compile.FromCompiler;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.compile.RowProjector;
import org.apache.phoenix.compile.ScanRanges;
import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.compile.WhereCompiler;
import org.apache.phoenix.coprocessor.HashJoinCacheNotFoundException;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.execute.visitor.AvgRowWidthVisitor;
import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
import org.apache.phoenix.execute.visitor.RowCountVisitor;
import org.apache.phoenix.expression.Determinism;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.InListExpression;
import org.apache.phoenix.expression.LiteralExpression;
import org.apache.phoenix.expression.RowValueConstructorExpression;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.iterate.*;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.job.JobManager.JobCallable;
import org.apache.phoenix.join.HashCacheClient;
import org.apache.phoenix.join.HashJoinInfo;
import org.apache.phoenix.monitoring.TaskExecutionMetricsHolder;
import org.apache.phoenix.optimize.Cost;
import org.apache.phoenix.parse.FilterableStatement;
import org.apache.phoenix.parse.ParseNode;
import org.apache.phoenix.parse.SQLParser;
import org.apache.phoenix.parse.SelectStatement;
import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.schema.types.PArrayDataType;
import org.apache.phoenix.schema.types.PBoolean;
import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.schema.types.PVarbinary;
import org.apache.phoenix.util.CostUtil;
import org.apache.phoenix.util.SQLCloseables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.phoenix.util.ServerUtil;
public class HashJoinPlan extends DelegateQueryPlan {
private static final Logger LOGGER = LoggerFactory.getLogger(HashJoinPlan.class);
private static final Random RANDOM = new Random();
private final SelectStatement statement;
private final HashJoinInfo joinInfo;
private final SubPlan[] subPlans;
private final boolean recompileWhereClause;
private final Set<TableRef> tableRefs;
private final int maxServerCacheTimeToLive;
private final long serverCacheLimit;
private final Map<ImmutableBytesPtr,ServerCache> dependencies = Maps.newHashMap();
private HashCacheClient hashClient;
private AtomicLong firstJobEndTime;
private List<Expression> keyRangeExpressions;
private Long estimatedRows;
private Long estimatedBytes;
private Long estimateInfoTs;
private boolean getEstimatesCalled;
private boolean hasSubPlansWithPersistentCache;
public static HashJoinPlan create(SelectStatement statement,
QueryPlan plan, HashJoinInfo joinInfo, SubPlan[] subPlans) throws SQLException {
if (!(plan instanceof HashJoinPlan))
return new HashJoinPlan(statement, plan, joinInfo, subPlans, joinInfo == null, Collections.<ImmutableBytesPtr,ServerCache>emptyMap());
HashJoinPlan hashJoinPlan = (HashJoinPlan) plan;
assert (hashJoinPlan.joinInfo == null && hashJoinPlan.delegate instanceof BaseQueryPlan);
SubPlan[] mergedSubPlans = new SubPlan[hashJoinPlan.subPlans.length + subPlans.length];
int i = 0;
for (SubPlan subPlan : hashJoinPlan.subPlans) {
mergedSubPlans[i++] = subPlan;
}
for (SubPlan subPlan : subPlans) {
mergedSubPlans[i++] = subPlan;
}
return new HashJoinPlan(statement, hashJoinPlan.delegate, joinInfo, mergedSubPlans, true, hashJoinPlan.dependencies);
}
private HashJoinPlan(SelectStatement statement,
QueryPlan plan, HashJoinInfo joinInfo, SubPlan[] subPlans, boolean recompileWhereClause, Map<ImmutableBytesPtr,ServerCache> dependencies) throws SQLException {
super(plan);
this.dependencies.putAll(dependencies);
this.statement = statement;
this.joinInfo = joinInfo;
this.subPlans = subPlans;
this.recompileWhereClause = recompileWhereClause;
this.tableRefs = Sets.newHashSetWithExpectedSize(subPlans.length + plan.getSourceRefs().size());
this.tableRefs.addAll(plan.getSourceRefs());
this.hasSubPlansWithPersistentCache = false;
for (SubPlan subPlan : subPlans) {
tableRefs.addAll(subPlan.getInnerPlan().getSourceRefs());
if (subPlan instanceof HashSubPlan && ((HashSubPlan)subPlan).usePersistentCache) {
this.hasSubPlansWithPersistentCache = true;
}
}
QueryServices services = plan.getContext().getConnection().getQueryServices();
this.maxServerCacheTimeToLive = services.getProps().getInt(
QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB, QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS);
this.serverCacheLimit = services.getProps().getLong(
QueryServices.MAX_SERVER_CACHE_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_SIZE);
}
@Override
public Set<TableRef> getSourceRefs() {
return tableRefs;
}
@Override
public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException {
if (scan == null) {
scan = delegate.getContext().getScan();
}
int count = subPlans.length;
PhoenixConnection connection = getContext().getConnection();
ConnectionQueryServices services = connection.getQueryServices();
ExecutorService executor = services.getExecutor();
List<Future<ServerCache>> futures = Lists.newArrayListWithExpectedSize(count);
if (joinInfo != null) {
hashClient = hashClient != null ?
hashClient
: new HashCacheClient(delegate.getContext().getConnection());
firstJobEndTime = new AtomicLong(0);
keyRangeExpressions = new CopyOnWriteArrayList<Expression>();
}
for (int i = 0; i < count; i++) {
final int index = i;
futures.add(executor.submit(new JobCallable<ServerCache>() {
@Override
public ServerCache call() throws Exception {
ServerCache cache = subPlans[index].execute(HashJoinPlan.this);
return cache;
}
@Override
public Object getJobId() {
return HashJoinPlan.this;
}
@Override
public TaskExecutionMetricsHolder getTaskExecutionMetric() {
return NO_OP_INSTANCE;
}
}));
}
SQLException firstException = null;
for (int i = 0; i < count; i++) {
try {
ServerCache result = futures.get(i).get();
if (result != null) {
dependencies.put(new ImmutableBytesPtr(result.getId()),result);
}
subPlans[i].postProcess(result, this);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
if (firstException == null) {
firstException = new SQLExceptionInfo.Builder(SQLExceptionCode.INTERRUPTED_EXCEPTION).setRootCause(e).setMessage("Sub plan [" + i + "] execution interrupted.").build().buildException();
}
} catch (ExecutionException e) {
if (firstException == null) {
firstException = new SQLException("Encountered exception in sub plan [" + i + "] execution.",
e.getCause());
}
}
}
if (firstException != null) {
SQLCloseables.closeAllQuietly(dependencies.values());
throw firstException;
}
Expression postFilter = null;
boolean hasKeyRangeExpressions = keyRangeExpressions != null && !keyRangeExpressions.isEmpty();
if (recompileWhereClause || hasKeyRangeExpressions) {
StatementContext context = delegate.getContext();
// Since we are going to compile the WHERE conditions all over again, we will clear
// the old filter, otherwise there would be conflicts and would cause PHOENIX-4692.
context.getScan().setFilter(null);
PTable table = context.getCurrentTable().getTable();
ParseNode viewWhere = table.getViewStatement() == null ? null : new SQLParser(table.getViewStatement()).parseQuery().getWhere();
context.setResolver(FromCompiler.getResolverForQuery((SelectStatement) (delegate.getStatement()), delegate.getContext().getConnection()));
if (recompileWhereClause) {
postFilter = WhereCompiler.compile(delegate.getContext(), delegate.getStatement(), viewWhere, null);
}
if (hasKeyRangeExpressions) {
WhereCompiler.compile(delegate.getContext(), delegate.getStatement(), viewWhere, keyRangeExpressions, null);
}
}
if (joinInfo != null) {
HashJoinInfo.serializeHashJoinIntoScan(scan, joinInfo);
}
ResultIterator iterator = joinInfo == null ? delegate.iterator(scanGrouper, scan) : ((BaseQueryPlan) delegate).iterator(dependencies, scanGrouper, scan);
if (statement.getInnerSelectStatement() != null && postFilter != null) {
iterator = new FilterResultIterator(iterator, postFilter);
}
if (hasSubPlansWithPersistentCache) {
return peekForPersistentCache(iterator, scanGrouper, scan);
} else {
return iterator;
}
}
private ResultIterator peekForPersistentCache(ResultIterator iterator, ParallelScanGrouper scanGrouper, Scan scan) throws SQLException {
// The persistent subquery is optimistic and assumes caches are present on region
// servers. We verify that this is the case by peeking at one result. If there is
// a cache missing exception, we retry the query with the persistent cache disabled
// for that specific cache ID.
PeekingResultIterator peeking = LookAheadResultIterator.wrap(iterator);
try {
peeking.peek();
} catch (Exception e) {
try {
throw ServerUtil.parseServerException(e);
} catch (HashJoinCacheNotFoundException e2) {
Long cacheId = e2.getCacheId();
if (delegate.getContext().getRetryingPersistentCache(cacheId)) {
throw e2;
}
delegate.getContext().setRetryingPersistentCache(cacheId);
return iterator(scanGrouper, scan);
}
}
return peeking;
}
private Expression createKeyRangeExpression(Expression lhsExpression,
Expression rhsExpression, List<Expression> rhsValues,
ImmutableBytesWritable ptr, boolean rowKeyOrderOptimizable) throws SQLException {
if (rhsValues.isEmpty())
return LiteralExpression.newConstant(false, PBoolean.INSTANCE, Determinism.ALWAYS);
rhsValues.add(0, lhsExpression);
return InListExpression.create(rhsValues, false, ptr, rowKeyOrderOptimizable);
}
@Override
public ExplainPlan getExplainPlan() throws SQLException {
List<String> planSteps = Lists.newArrayList(delegate.getExplainPlan().getPlanSteps());
int count = subPlans.length;
for (int i = 0; i < count; i++) {
planSteps.addAll(subPlans[i].getPreSteps(this));
}
for (int i = 0; i < count; i++) {
planSteps.addAll(subPlans[i].getPostSteps(this));
}
if (joinInfo != null && joinInfo.getPostJoinFilterExpression() != null) {
planSteps.add(" AFTER-JOIN SERVER FILTER BY " + joinInfo.getPostJoinFilterExpression().toString());
}
if (joinInfo != null && joinInfo.getLimit() != null) {
planSteps.add(" JOIN-SCANNER " + joinInfo.getLimit() + " ROW LIMIT");
}
return new ExplainPlan(planSteps);
}
@Override
public FilterableStatement getStatement() {
return statement;
}
public HashJoinInfo getJoinInfo() {
return joinInfo;
}
public SubPlan[] getSubPlans() {
return subPlans;
}
@Override
public <T> T accept(QueryPlanVisitor<T> visitor) {
return visitor.visit(this);
}
@Override
public Cost getCost() {
try {
Long r = delegate.getEstimatedRowsToScan();
Double w = delegate.accept(new AvgRowWidthVisitor());
if (r == null || w == null) {
return Cost.UNKNOWN;
}
int parallelLevel = CostUtil.estimateParallelLevel(
true, getContext().getConnection().getQueryServices());
double rowWidth = w;
double rows = RowCountVisitor.filter(
r.doubleValue(),
RowCountVisitor.stripSkipScanFilter(
delegate.getContext().getScan().getFilter()));
double bytes = rowWidth * rows;
Cost cost = Cost.ZERO;
double rhsByteSum = 0.0;
for (int i = 0; i < subPlans.length; i++) {
double lhsBytes = bytes;
Double rhsRows = subPlans[i].getInnerPlan().accept(new RowCountVisitor());
Double rhsWidth = subPlans[i].getInnerPlan().accept(new AvgRowWidthVisitor());
if (rhsRows == null || rhsWidth == null) {
return Cost.UNKNOWN;
}
double rhsBytes = rhsWidth * rhsRows;
rows = RowCountVisitor.join(rows, rhsRows, joinInfo.getJoinTypes()[i]);
rowWidth = AvgRowWidthVisitor.join(rowWidth, rhsWidth, joinInfo.getJoinTypes()[i]);
bytes = rowWidth * rows;
cost = cost.plus(CostUtil.estimateHashJoinCost(
lhsBytes, rhsBytes, bytes, subPlans[i].hasKeyRangeExpression(), parallelLevel));
rhsByteSum += rhsBytes;
}
if (rhsByteSum > serverCacheLimit) {
return Cost.UNKNOWN;
}
// Calculate the cost of aggregation and ordering that is performed with the HashJoinPlan
if (delegate instanceof AggregatePlan) {
AggregatePlan aggPlan = (AggregatePlan) delegate;
double rowsBeforeHaving = RowCountVisitor.aggregate(rows, aggPlan.getGroupBy());
double rowsAfterHaving = RowCountVisitor.filter(rowsBeforeHaving, aggPlan.getHaving());
double bytesBeforeHaving = rowWidth * rowsBeforeHaving;
double bytesAfterHaving = rowWidth * rowsAfterHaving;
Cost aggCost = CostUtil.estimateAggregateCost(
bytes, bytesBeforeHaving, aggPlan.getGroupBy(), parallelLevel);
cost = cost.plus(aggCost);
rows = rowsAfterHaving;
bytes = bytesAfterHaving;
}
double outputRows = RowCountVisitor.limit(rows, delegate.getLimit());
double outputBytes = rowWidth * outputRows;
if (!delegate.getOrderBy().getOrderByExpressions().isEmpty()) {
int parallelLevel2 = CostUtil.estimateParallelLevel(
delegate instanceof ScanPlan, getContext().getConnection().getQueryServices());
Cost orderByCost = CostUtil.estimateOrderByCost(
bytes, outputBytes, parallelLevel);
cost = cost.plus(orderByCost);
}
// Calculate the cost of child nodes
Cost lhsCost = new Cost(0, 0, r.doubleValue() * w);
Cost rhsCost = Cost.ZERO;
for (SubPlan subPlan : subPlans) {
rhsCost = rhsCost.plus(subPlan.getInnerPlan().getCost());
}
return cost.plus(lhsCost).plus(rhsCost);
} catch (SQLException e) {
}
return Cost.UNKNOWN;
}
public interface SubPlan {
public ServerCache execute(HashJoinPlan parent) throws SQLException;
public void postProcess(ServerCache result, HashJoinPlan parent) throws SQLException;
public List<String> getPreSteps(HashJoinPlan parent) throws SQLException;
public List<String> getPostSteps(HashJoinPlan parent) throws SQLException;
public QueryPlan getInnerPlan();
public boolean hasKeyRangeExpression();
}
public static class WhereClauseSubPlan implements SubPlan {
private final QueryPlan plan;
private final SelectStatement select;
private final boolean expectSingleRow;
public WhereClauseSubPlan(QueryPlan plan, SelectStatement select, boolean expectSingleRow) {
this.plan = plan;
this.select = select;
this.expectSingleRow = expectSingleRow;
}
@Override
public ServerCache execute(HashJoinPlan parent) throws SQLException {
List<Object> values = Lists.<Object> newArrayList();
ResultIterator iterator = plan.iterator();
try {
RowProjector projector = plan.getProjector();
ImmutableBytesWritable ptr = new ImmutableBytesWritable();
int columnCount = projector.getColumnCount();
int rowCount = 0;
PDataType baseType = PVarbinary.INSTANCE;
for (Tuple tuple = iterator.next(); tuple != null; tuple = iterator.next()) {
if (expectSingleRow && rowCount >= 1)
throw new SQLExceptionInfo.Builder(SQLExceptionCode.SINGLE_ROW_SUBQUERY_RETURNS_MULTIPLE_ROWS).build().buildException();
if (columnCount == 1) {
ColumnProjector columnProjector = projector.getColumnProjector(0);
baseType = columnProjector.getExpression().getDataType();
Object value = columnProjector.getValue(tuple, baseType, ptr);
values.add(value);
} else {
List<Expression> expressions = Lists.<Expression>newArrayListWithExpectedSize(columnCount);
for (int i = 0; i < columnCount; i++) {
ColumnProjector columnProjector = projector.getColumnProjector(i);
PDataType type = columnProjector.getExpression().getDataType();
Object value = columnProjector.getValue(tuple, type, ptr);
expressions.add(LiteralExpression.newConstant(value, type));
}
Expression expression = new RowValueConstructorExpression(expressions, true);
baseType = expression.getDataType();
expression.evaluate(null, ptr);
values.add(baseType.toObject(ptr));
}
rowCount++;
}
Object result = expectSingleRow ? (values.isEmpty() ? null : values.get(0)) : PArrayDataType.instantiatePhoenixArray(baseType, values.toArray());
parent.getContext().setSubqueryResult(select, result);
return null;
} finally {
iterator.close();
}
}
@Override
public void postProcess(ServerCache result, HashJoinPlan parent) throws SQLException {
}
@Override
public List<String> getPreSteps(HashJoinPlan parent) throws SQLException {
List<String> steps = Lists.newArrayList();
steps.add(" EXECUTE " + (expectSingleRow ? "SINGLE" : "MULTIPLE") + "-ROW SUBQUERY");
for (String step : plan.getExplainPlan().getPlanSteps()) {
steps.add(" " + step);
}
return steps;
}
@Override
public List<String> getPostSteps(HashJoinPlan parent) throws SQLException {
return Collections.<String>emptyList();
}
@Override
public QueryPlan getInnerPlan() {
return plan;
}
@Override
public boolean hasKeyRangeExpression() {
return false;
}
}
public static class HashSubPlan implements SubPlan {
private final int index;
private final QueryPlan plan;
private final List<Expression> hashExpressions;
private final boolean singleValueOnly;
private final boolean usePersistentCache;
private final Expression keyRangeLhsExpression;
private final Expression keyRangeRhsExpression;
private final MessageDigest digest;
public HashSubPlan(int index, QueryPlan subPlan,
List<Expression> hashExpressions,
boolean singleValueOnly,
boolean usePersistentCache,
Expression keyRangeLhsExpression,
Expression keyRangeRhsExpression) {
this.index = index;
this.plan = subPlan;
this.hashExpressions = hashExpressions;
this.singleValueOnly = singleValueOnly;
this.usePersistentCache = usePersistentCache;
this.keyRangeLhsExpression = keyRangeLhsExpression;
this.keyRangeRhsExpression = keyRangeRhsExpression;
try {
this.digest = MessageDigest.getInstance("SHA-256");
} catch (NoSuchAlgorithmException e) {
throw new RuntimeException(e);
}
}
@Override
public ServerCache execute(HashJoinPlan parent) throws SQLException {
ScanRanges ranges = parent.delegate.getContext().getScanRanges();
List<Expression> keyRangeRhsValues = null;
if (keyRangeRhsExpression != null) {
keyRangeRhsValues = Lists.<Expression>newArrayList();
}
ServerCache cache = null;
if (hashExpressions != null) {
ResultIterator iterator = plan.iterator();
try {
final byte[] cacheId;
String queryString = plan.getStatement().toString().replaceAll("\\$[0-9]+", "\\$");
if (usePersistentCache) {
cacheId = Arrays.copyOfRange(digest.digest(queryString.getBytes()), 0, 8);
boolean retrying = parent.delegate.getContext().getRetryingPersistentCache(Bytes.toLong(cacheId));
if (!retrying) {
try {
cache = parent.hashClient.createServerCache(cacheId, parent.delegate);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
} else {
cacheId = Bytes.toBytes(RANDOM.nextLong());
}
LOGGER.debug("Using cache ID " + Hex.encodeHexString(cacheId) +
" for " + queryString);
if (cache == null) {
LOGGER.debug("Making RPC to add cache " + Hex.encodeHexString(cacheId));
cache = parent.hashClient.addHashCache(ranges, cacheId, iterator,
plan.getEstimatedSize(), hashExpressions, singleValueOnly, usePersistentCache,
parent.delegate.getTableRef().getTable(), keyRangeRhsExpression,
keyRangeRhsValues);
long endTime = System.currentTimeMillis();
boolean isSet = parent.firstJobEndTime.compareAndSet(0, endTime);
if (!isSet && (endTime
- parent.firstJobEndTime.get()) > parent.maxServerCacheTimeToLive) {
LOGGER.warn(addCustomAnnotations(
"Hash plan [" + index
+ "] execution seems too slow. Earlier" +
" hash cache(s) might have expired on servers.",
parent.delegate.getContext().getConnection()));
}
}
} finally {
iterator.close();
}
} else {
assert (keyRangeRhsExpression != null);
ResultIterator iterator = plan.iterator();
try {
for (Tuple result = iterator.next(); result != null; result = iterator.next()) {
// Evaluate key expressions for hash join key range optimization.
keyRangeRhsValues.add(HashCacheClient.evaluateKeyExpression(
keyRangeRhsExpression, result, plan.getContext().getTempPtr()));
}
} finally {
iterator.close();
}
}
if (keyRangeRhsValues != null) {
parent.keyRangeExpressions.add(parent.createKeyRangeExpression(keyRangeLhsExpression, keyRangeRhsExpression, keyRangeRhsValues, plan.getContext().getTempPtr(), plan.getContext().getCurrentTable().getTable().rowKeyOrderOptimizable()));
}
return cache;
}
@Override
public void postProcess(ServerCache result, HashJoinPlan parent)
throws SQLException {
ServerCache cache = result;
if (cache != null) {
parent.joinInfo.getJoinIds()[index].set(cache.getId());
}
}
@Override
public List<String> getPreSteps(HashJoinPlan parent) throws SQLException {
List<String> steps = Lists.newArrayList();
boolean earlyEvaluation = parent.joinInfo.earlyEvaluation()[index];
boolean skipMerge = parent.joinInfo.getSchemas()[index].getFieldCount() == 0;
if (hashExpressions != null) {
steps.add(" PARALLEL " + parent.joinInfo.getJoinTypes()[index].toString().toUpperCase()
+ "-JOIN TABLE " + index + (earlyEvaluation ? "" : "(DELAYED EVALUATION)") + (skipMerge ? " (SKIP MERGE)" : ""));
}
else {
steps.add(" SKIP-SCAN-JOIN TABLE " + index);
}
for (String step : plan.getExplainPlan().getPlanSteps()) {
steps.add(" " + step);
}
return steps;
}
@Override
public List<String> getPostSteps(HashJoinPlan parent) throws SQLException {
if (keyRangeLhsExpression == null)
return Collections.<String> emptyList();
String step = " DYNAMIC SERVER FILTER BY " + keyRangeLhsExpression.toString()
+ " IN (" + keyRangeRhsExpression.toString() + ")";
return Collections.<String> singletonList(step);
}
@Override
public QueryPlan getInnerPlan() {
return plan;
}
@Override
public boolean hasKeyRangeExpression() {
return keyRangeLhsExpression != null;
}
}
@Override
public Long getEstimatedRowsToScan() throws SQLException {
if (!getEstimatesCalled) {
getEstimates();
}
return estimatedRows;
}
@Override
public Long getEstimatedBytesToScan() throws SQLException {
if (!getEstimatesCalled) {
getEstimates();
}
return estimatedBytes;
}
@Override
public Long getEstimateInfoTimestamp() throws SQLException {
if (!getEstimatesCalled) {
getEstimates();
}
return estimateInfoTs;
}
private void getEstimates() throws SQLException {
getEstimatesCalled = true;
for (SubPlan subPlan : subPlans) {
if (subPlan.getInnerPlan().getEstimatedBytesToScan() == null
|| subPlan.getInnerPlan().getEstimatedRowsToScan() == null
|| subPlan.getInnerPlan().getEstimateInfoTimestamp() == null) {
/*
* If any of the sub plans doesn't have the estimate info available, then we don't
* provide estimate for the overall plan
*/
estimatedBytes = null;
estimatedRows = null;
estimateInfoTs = null;
break;
} else {
estimatedBytes =
add(estimatedBytes, subPlan.getInnerPlan().getEstimatedBytesToScan());
estimatedRows = add(estimatedRows, subPlan.getInnerPlan().getEstimatedRowsToScan());
estimateInfoTs =
getMin(estimateInfoTs, subPlan.getInnerPlan().getEstimateInfoTimestamp());
}
}
}
}