/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.lens.cube.parse;
import static org.apache.lens.cube.parse.CandidateTablePruneCause.CandidateTablePruneCode;
import static org.apache.lens.cube.parse.CandidateTablePruneCause.SkipUpdatePeriodCode;
import static org.apache.lens.cube.parse.CandidateTablePruneCause.timeDimNotSupported;
import static org.apache.lens.cube.parse.StorageUtil.getFallbackRange;
import static org.apache.lens.cube.parse.StorageUtil.joinWithAnd;
import static org.apache.lens.cube.parse.StorageUtil.processCubeColForDataCompleteness;
import static org.apache.lens.cube.parse.StorageUtil.processExpressionsForCompleteness;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.*;
import org.apache.lens.cube.metadata.AbstractCubeTable;
import org.apache.lens.cube.metadata.CubeFactTable;
import org.apache.lens.cube.metadata.CubeInterface;
import org.apache.lens.cube.metadata.CubeMetastoreClient;
import org.apache.lens.cube.metadata.DateUtil;
import org.apache.lens.cube.metadata.Dimension;
import org.apache.lens.cube.metadata.FactPartition;
import org.apache.lens.cube.metadata.MetastoreUtil;
import org.apache.lens.cube.metadata.TimeRange;
import org.apache.lens.cube.metadata.UpdatePeriod;
import org.apache.lens.server.api.error.LensException;
import org.apache.lens.server.api.metastore.DataCompletenessChecker;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.ASTNode;
import org.apache.hadoop.hive.ql.parse.HiveParser;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.antlr.runtime.CommonToken;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
/**
* Represents a fact on a storage table and the dimensions it needs to be joined with to answer the query
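*
* A rough usage sketch (simplified, and assuming cube, fact, storageName, cubeql, timeRange, failOnPartialData and
* queriedDims have already been resolved by the query rewriter; the real call sequence is driven by the candidate
* and storage resolvers):
* <pre>{@code
* StorageCandidate sc = new StorageCandidate(cube, fact, storageName, cubeql);
* sc.addValidUpdatePeriod(UpdatePeriod.DAILY);
* if (sc.evaluateCompleteness(timeRange, timeRange, failOnPartialData)) {
*   for (StorageCandidate split : sc.splitAtUpdatePeriodLevelIfReq()) {
*     String hql = split.toHQL(queriedDims);
*   }
* }
* }</pre>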
*/
@Slf4j
public class StorageCandidate implements Candidate, CandidateTable {
// TODO union : Put comments on member variables.
@Getter
private final CubeQueryContext cubeql;
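/**
* Process time partition column (from CubeQueryConfUtil.PROCESS_TIME_PART_COL), used while looking for
* look-ahead process time partitions.
*/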
private final String processTimePartCol;
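/**
* Metastore client used to look up storage tables, partitions, start/end dates and the completeness checker.
*/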
private final CubeMetastoreClient client;
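/**
* Partition column on which data completeness is checked (from CubeQueryConfUtil.COMPLETENESS_CHECK_PART_COL).
*/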
private final String completenessPartCol;
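/**
* Completeness threshold; measure tags whose completeness is below this value are considered incomplete.
*/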
private final float completenessThreshold;
/**
* Name of this storage candidate = storageName_factName
*/
@Getter
@Setter
private String name;
/**
* This is the storage table specific name. It is used while generating the query from this candidate
*/
@Setter
private String resolvedName;
/**
* Valid update periods populated by Phase 1.
*/
@Getter
private TreeSet<UpdatePeriod> validUpdatePeriods = new TreeSet<>();
/**
* These are the update periods that finally participate in partitions.
* @see #getParticipatingPartitions()
*/
@Getter
private TreeSet<UpdatePeriod> participatingUpdatePeriods = new TreeSet<>();
@Getter
@Setter
Map<String, SkipUpdatePeriodCode> updatePeriodRejectionCause;
private Configuration conf = null;
/**
* This map holds tags (a tag refers to one or more measures) that have incomplete data (below the configured
* threshold). The value is a map from date string to % completeness.
*/
@Getter
private Map<String, Map<String, Float>> dataCompletenessMap = new HashMap<>();
private SimpleDateFormat partWhereClauseFormat = null;
/**
* Participating fact, storage and dimensions for this StorageCandidate
*/
@Getter
private CubeFactTable fact;
@Getter
private String storageName;
@Getter
private String storageTable;
@Getter
@Setter
private QueryAST queryAst;
@Getter
private Map<TimeRange, Set<FactPartition>> rangeToPartitions = new LinkedHashMap<>();
@Getter
private Map<TimeRange, String> rangeToExtraWhereFallBack = new LinkedHashMap<>();
@Getter
@Setter
private String whereString;
@Getter
private Set<Integer> answerableMeasurePhraseIndices = Sets.newHashSet();
@Getter
@Setter
private String fromString;
@Getter
private CubeInterface cube;
@Getter
private Map<Dimension, CandidateDim> dimsToQuery;
@Getter
private Date startTime;
@Getter
private Date endTime;
/**
* Cached fact columns
*/
private Collection<String> factColumns;
/**
* Non existing partitions
*/
@Getter
private Set<String> nonExistingPartitions = new HashSet<>();
@Getter
private int numQueriedParts = 0;
/**
* This will be true if this storage candidate has multiple storage tables (one per update period)
* https://issues.apache.org/jira/browse/LENS-1386
*/
@Getter
private boolean isStorageTblsAtUpdatePeriodLevel;
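/**
* Copy constructor. Copies the valid update periods, where/from strings, ASTs, answerable measure indices and
* resolved partitions of the given storage candidate.
*/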
public StorageCandidate(StorageCandidate sc) throws LensException {
this(sc.getCube(), sc.getFact(), sc.getStorageName(), sc.getCubeql());
this.validUpdatePeriods.addAll(sc.getValidUpdatePeriods());
this.whereString = sc.whereString;
this.fromString = sc.fromString;
this.dimsToQuery = sc.dimsToQuery;
this.factColumns = sc.factColumns;
this.answerableMeasurePhraseIndices.addAll(sc.answerableMeasurePhraseIndices);
if (sc.getQueryAst() != null) {
this.queryAst = new DefaultQueryAST();
CandidateUtil.copyASTs(sc.getQueryAst(), this.queryAst);
}
for (Map.Entry<TimeRange, Set<FactPartition>> entry : sc.getRangeToPartitions().entrySet()) {
rangeToPartitions.put(entry.getKey(), new LinkedHashSet<>(entry.getValue()));
}
this.rangeToExtraWhereFallBack = sc.rangeToExtraWhereFallBack;
this.answerableMeasurePhraseIndices = sc.answerableMeasurePhraseIndices;
}
public StorageCandidate(CubeInterface cube, CubeFactTable fact, String storageName, CubeQueryContext cubeql)
throws LensException {
if ((cube == null) || (fact == null) || (storageName == null)) {
throw new IllegalArgumentException("Cube,fact and storageName should be non null");
}
this.cube = cube;
this.fact = fact;
this.cubeql = cubeql;
this.storageName = storageName;
this.storageTable = MetastoreUtil.getFactOrDimtableStorageTableName(fact.getName(), storageName);
this.conf = cubeql.getConf();
this.name = fact.getName();
this.processTimePartCol = conf.get(CubeQueryConfUtil.PROCESS_TIME_PART_COL);
String formatStr = conf.get(CubeQueryConfUtil.PART_WHERE_CLAUSE_DATE_FORMAT);
if (formatStr != null) {
this.partWhereClauseFormat = new SimpleDateFormat(formatStr);
}
completenessPartCol = conf.get(CubeQueryConfUtil.COMPLETENESS_CHECK_PART_COL);
completenessThreshold = conf
.getFloat(CubeQueryConfUtil.COMPLETENESS_THRESHOLD, CubeQueryConfUtil.DEFAULT_COMPLETENESS_THRESHOLD);
client = cubeql.getMetastoreClient();
Set<String> storageTblNames = client.getStorageTables(fact.getName(), storageName);
if (storageTblNames.size() > 1) {
isStorageTblsAtUpdatePeriodLevel = true;
} else {
//if the single storage table's name equals this.storageTable, storage tables are not at update period level
isStorageTblsAtUpdatePeriodLevel = !storageTblNames.iterator().next().equalsIgnoreCase(storageTable);
}
setStorageStartAndEndDate();
}
/**
* Sets the storage candidate's start and end time based on the underlying storage tables
*
* CASE 1
* If the storage has a single storage table
* Storage start time = max(storage start time, fact start time)
* Storage end time = min(storage end time, fact end time)
*
* CASE 2
* If the storage has multiple storage tables (one per update period)
* Update period start time = max(update period start time, fact start time)
* Update period end time = min(update period end time, fact end time)
* Storage start and end times are derived from the underlying update period start and end times:
* Storage start time = min(update1 start time, ...., updateN start time)
* Storage end time = max(update1 end time, ...., updateN end time)
*
* Note that in Case 2 it is assumed that the time ranges supported by the different update periods either
* overlap (Example 2) or form a non-overlapping but continuous chain (Example 1), as illustrated
* in the examples below
*
* Example 1
* A storage has 2 non-overlapping but continuous update periods:
* MONTHLY with start time as now.month -13 months and end time as now.month -2 months, and
* DAILY with start time as now.month -2 months and end time as now.day
* Then this storage will have an implied start time of now.month -13 months and end time of now.day
*
* Example 2
* A storage has 2 overlapping update periods:
* MONTHLY with start time as now.month -13 months and end time as now.month -1 month, and
* DAILY with start time as now.month -2 months and end time as now.day
* Then this storage will have an implied start time of now.month -13 months and end time of now.day
*
* @throws LensException
*/
public void setStorageStartAndEndDate() throws LensException {
if (this.startTime != null && !this.isStorageTblsAtUpdatePeriodLevel) {
//If the times are already set and are not dependent on update period, there is no point setting them again.
return;
}
List<Date> startDates = new ArrayList<>();
List<Date> endDates = new ArrayList<>();
for (String storageTablePrefix : getValidStorageTableNames()) {
startDates.add(client.getStorageTableStartDate(storageTablePrefix, fact.getName()));
endDates.add(client.getStorageTableEndDate(storageTablePrefix, fact.getName()));
}
this.startTime = Collections.min(startDates);
this.endTime = Collections.max(endDates);
}
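/**
* Returns the storage table names to consider: tables of the valid update periods if any have been marked valid,
* otherwise all storage tables of this fact on this storage.
*/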
private Set<String> getValidStorageTableNames() throws LensException {
if (!validUpdatePeriods.isEmpty()) {
// In this case skip invalid update periods and get storage tables only for valid ones.
Set<String> uniqueStorageTables = new HashSet<>();
for (UpdatePeriod updatePeriod : validUpdatePeriods) {
uniqueStorageTables.add(client.getStorageTableName(fact.getName(), storageName, updatePeriod));
}
return uniqueStorageTables;
} else {
//Get all storage tables.
return client.getStorageTables(fact.getName(), storageName);
}
}
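/**
* Sets the from string and completes the where string for this candidate by appending the where clauses of the
* queried dimensions (and the post selection where clause when time dimensions are replaced with partition
* columns). Also copies the having AST from the cube query context, if present.
*/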
private void setMissingExpressions(Set<Dimension> queriedDims) throws LensException {
setFromString(String.format("%s", getFromTable()));
setWhereString(joinWithAnd(
genWhereClauseWithDimPartitions(whereString, queriedDims), cubeql.getConf().getBoolean(
CubeQueryConfUtil.REPLACE_TIMEDIM_WITH_PART_COL, CubeQueryConfUtil.DEFAULT_REPLACE_TIMEDIM_WITH_PART_COL)
? getPostSelectionWhereClause() : null));
if (cubeql.getHavingAST() != null) {
queryAst.setHavingAST(MetastoreUtil.copyAST(cubeql.getHavingAST()));
}
}
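/**
* Appends the storage where clauses of the queried dimensions to the original where clause, skipping dimensions
* whose where clause has already been added. Returns null when the resulting clause is empty.
*/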
private String genWhereClauseWithDimPartitions(String originalWhere, Set<Dimension> queriedDims) {
StringBuilder whereBuf;
if (originalWhere != null) {
whereBuf = new StringBuilder(originalWhere);
} else {
whereBuf = new StringBuilder();
}
// add where clause for all dimensions
if (cubeql != null) {
boolean added = (originalWhere != null);
for (Dimension dim : queriedDims) {
CandidateDim cdim = dimsToQuery.get(dim);
String alias = cubeql.getAliasForTableName(dim.getName());
if (!cdim.isWhereClauseAdded() && !StringUtils.isBlank(cdim.getWhereClause())) {
appendWhereClause(whereBuf, StorageUtil.getWhereClause(cdim, alias), added);
added = true;
}
}
}
if (whereBuf.length() == 0) {
return null;
}
return whereBuf.toString();
}
private static void appendWhereClause(StringBuilder filterCondition, String whereClause, boolean hasMore) {
// Make sure we add AND only when there are already some conditions in where
// clause
if (hasMore && !filterCondition.toString().isEmpty() && !StringUtils.isBlank(whereClause)) {
filterCondition.append(" AND ");
}
if (!StringUtils.isBlank(whereClause)) {
filterCondition.append("(");
filterCondition.append(whereClause);
filterCondition.append(")");
}
}
private String getPostSelectionWhereClause() throws LensException {
return null;
}
void setAnswerableMeasurePhraseIndices(int index) {
answerableMeasurePhraseIndices.add(index);
}
public String toHQL(Set<Dimension> queriedDims) throws LensException {
setMissingExpressions(queriedDims);
// Check if the picked candidate is a StorageCandidate and in that case
// update the selectAST with final alias.
if (this == cubeql.getPickedCandidate()) {
CandidateUtil.updateFinalAlias(queryAst.getSelectAST(), cubeql);
updateOrderByWithFinalAlias(queryAst.getOrderByAST(), queryAst.getSelectAST());
} else {
queryAst.setHavingAST(null);
}
return CandidateUtil
.buildHQLString(queryAst.getSelectString(), fromString, whereString, queryAst.getGroupByString(),
queryAst.getOrderByString(), queryAst.getHavingString(), queryAst.getLimitValue());
}
/**
* Update Orderby children with final alias used in select
*
* @param orderby Order by AST
* @param select Select AST
*/
private void updateOrderByWithFinalAlias(ASTNode orderby, ASTNode select) {
if (orderby == null) {
return;
}
for (Node orderbyNode : orderby.getChildren()) {
ASTNode orderBychild = (ASTNode) orderbyNode;
for (Node selectNode : select.getChildren()) {
ASTNode selectChild = (ASTNode) selectNode;
if (selectChild.getChildCount() == 2) {
if (HQLParser.getString((ASTNode) selectChild.getChild(0))
.equals(HQLParser.getString((ASTNode) orderBychild.getChild(0)))) {
ASTNode alias = new ASTNode((ASTNode) selectChild.getChild(1));
orderBychild.replaceChildren(0, 0, alias);
break;
}
}
}
}
}
@Override
public String getStorageString(String alias) {
return storageName + " " + alias;
}
@Override
public AbstractCubeTable getTable() {
return fact;
}
@Override
public AbstractCubeTable getBaseTable() {
return (AbstractCubeTable) cube;
}
@Override
public Collection<String> getColumns() {
if (factColumns == null) {
factColumns = fact.getValidColumns();
if (factColumns == null) {
factColumns = fact.getAllFieldNames();
}
}
return factColumns;
}
@Override
public double getCost() {
return fact.weight();
}
@Override
public boolean contains(Candidate candidate) {
return this.equals(candidate);
}
@Override
public Collection<Candidate> getChildren() {
return null;
}
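/**
* Checks whether the given fact partition exists in this candidate's storage table and, if it does, records the
* storage table on the partition and marks it as found.
*/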
private void updatePartitionStorage(FactPartition part) throws LensException {
try {
if (client.factPartitionExists(fact, part, storageTable)) {
part.getStorageTables().add(storageTable);
part.setFound(true);
}
} catch (HiveException e) {
log.warn("Hive exception while getting storage table partition", e);
}
}
/**
* Gets FactPartitions for the given fact using the following logic
*
* 1. Find the max update interval that will be used for the query. Let's assume the time
* range is 15 Sep to 15 Dec and the fact has a storage with update periods MONTHLY, DAILY and HOURLY.
* In this case the data for [15 Sep - 1 Oct) U [1 Dec - 15 Dec) will be answered by DAILY partitions
* and [1 Oct - 1 Dec) will be answered by MONTHLY partitions. The max interval for this query will be MONTHLY.
*
* 2. Prune storage tables that do not fall in the query's time range.
* {@link CubeMetastoreClient#isStorageTableCandidateForRange(String, Date, Date)}
*
* 3. Iterate over the max interval. In our case it will give two months, Oct and Nov. Find partitions for
* these two months. Check validity of FactPartitions for Oct and Nov
* via {@link #updatePartitionStorage(FactPartition)}.
* If a partition is missing, try getting partitions for its time range from other update periods (DAILY, HOURLY).
* This is achieved by calling getPartitions() recursively but passing only the remaining update periods
* (DAILY, HOURLY).
*
* 4. If the monthly partitions are found, check for look-ahead partitions and call getPartitions recursively for the
* remaining time intervals i.e, [15 Sep - 1 Oct) and [1 Dec - 15 Dec)
*
* TODO union : Move this into util.
*/
private boolean getPartitions(Date fromDate, Date toDate, String partCol, Set<FactPartition> partitions,
TreeSet<UpdatePeriod> updatePeriods, boolean addNonExistingParts, boolean failOnPartialData,
PartitionRangesForPartitionColumns missingPartitions) throws LensException {
if (fromDate.equals(toDate) || fromDate.after(toDate)) {
return true;
}
if (updatePeriods == null || updatePeriods.isEmpty()) {
return false;
}
UpdatePeriod maxInterval = CubeFactTable.maxIntervalInRange(fromDate, toDate, updatePeriods);
if (maxInterval == null) {
log.info("No max interval for range: {} to {}", fromDate, toDate);
return false;
}
if (maxInterval == UpdatePeriod.CONTINUOUS
&& cubeql.getRangeWriter().getClass().equals(BetweenTimeRangeWriter.class)) {
FactPartition part = new FactPartition(partCol, fromDate, maxInterval, null, partWhereClauseFormat);
partitions.add(part);
part.getStorageTables().add(storageName);
part = new FactPartition(partCol, toDate, maxInterval, null, partWhereClauseFormat);
partitions.add(part);
part.getStorageTables().add(storageName);
this.participatingUpdatePeriods.add(maxInterval);
log.info("Added continuous fact partition for storage table {}", storageName);
return true;
}
if (!client.partColExists(this.getFact().getName(), storageName, partCol)) {
log.info("{} does not exist in {}", partCol, storageTable);
return false;
}
Date maxIntervalStorageTblStartDate = getStorageTableStartDate(maxInterval);
Date maxIntervalStorageTblEndDate = getStorageTableEndDate(maxInterval);
TreeSet<UpdatePeriod> remainingIntervals = new TreeSet<>(updatePeriods);
remainingIntervals.remove(maxInterval);
if (!CandidateUtil.isCandidatePartiallyValidForTimeRange(
maxIntervalStorageTblStartDate, maxIntervalStorageTblEndDate, fromDate, toDate)) {
//Check the time range in remainingIntervals as maxInterval is not useful
return getPartitions(fromDate, toDate, partCol, partitions, remainingIntervals,
addNonExistingParts, failOnPartialData, missingPartitions);
}
Date ceilFromDate = DateUtil.getCeilDate(fromDate.after(maxIntervalStorageTblStartDate)
? fromDate : maxIntervalStorageTblStartDate, maxInterval);
Date floorToDate = DateUtil.getFloorDate(toDate.before(maxIntervalStorageTblEndDate)
? toDate : maxIntervalStorageTblEndDate, maxInterval);
if (ceilFromDate.equals(floorToDate) || floorToDate.before(ceilFromDate)) {
return getPartitions(fromDate, toDate, partCol, partitions, remainingIntervals,
addNonExistingParts, failOnPartialData, missingPartitions);
}
int lookAheadNumParts = conf
.getInt(CubeQueryConfUtil.getLookAheadPTPartsKey(maxInterval), CubeQueryConfUtil.DEFAULT_LOOK_AHEAD_PT_PARTS);
TimeRange.Iterable.Iterator iter = TimeRange.iterable(ceilFromDate, floorToDate, maxInterval, 1).iterator();
// add partitions from ceilFrom to floorTo
while (iter.hasNext()) {
Date dt = iter.next();
Date nextDt = iter.peekNext();
FactPartition part = new FactPartition(partCol, dt, maxInterval, null, partWhereClauseFormat);
updatePartitionStorage(part);
log.debug("Storage tables containing Partition {} are: {}", part, part.getStorageTables());
if (part.isFound()) {
log.debug("Adding existing partition {}", part);
partitions.add(part);
this.participatingUpdatePeriods.add(maxInterval);
log.debug("Looking for look ahead process time partitions for {}", part);
if (processTimePartCol == null) {
log.debug("processTimePartCol is null");
} else if (partCol.equals(processTimePartCol)) {
log.debug("part column is process time col");
} else if (updatePeriods.first().equals(maxInterval)) {
log.debug("Update period is the least update period");
} else if ((iter.getNumIters() - iter.getCounter()) > lookAheadNumParts) {
// see if this is part of the last-n look-ahead partitions
log.debug("Not a look ahead partition");
} else {
log.debug("Looking for look ahead process time partitions for {}", part);
// check if finer partitions are required
// finer partitions are required if no partitions from look-ahead process time are present
TimeRange.Iterable.Iterator processTimeIter = TimeRange.iterable(nextDt, lookAheadNumParts, maxInterval, 1)
.iterator();
while (processTimeIter.hasNext()) {
Date pdt = processTimeIter.next();
Date nextPdt = processTimeIter.peekNext();
FactPartition processTimePartition = new FactPartition(processTimePartCol, pdt, maxInterval, null,
partWhereClauseFormat);
updatePartitionStorage(processTimePartition);
if (processTimePartition.isFound()) {
log.debug("Finer parts not required for look-ahead partition :{}", part);
} else {
log.debug("Looked ahead process time partition {} is not found", processTimePartition);
TreeSet<UpdatePeriod> newset = new TreeSet<UpdatePeriod>();
newset.addAll(updatePeriods);
newset.remove(maxInterval);
log.debug("newset of update periods:{}", newset);
if (!newset.isEmpty()) {
// Get partitions for look ahead process time
log.debug("Looking for process time partitions between {} and {}", pdt, nextPdt);
Set<FactPartition> processTimeParts = getPartitions(
TimeRange.getBuilder().fromDate(pdt).toDate(nextPdt).partitionColumn(processTimePartCol).build(),
newset, true, failOnPartialData, missingPartitions);
log.debug("Look ahead partitions: {}", processTimeParts);
TimeRange timeRange = TimeRange.getBuilder().fromDate(dt).toDate(nextDt).build();
for (FactPartition pPart : processTimeParts) {
log.debug("Looking for finer partitions in pPart: {}", pPart);
for (Date date : timeRange.iterable(pPart.getPeriod(), 1)) {
FactPartition innerPart = new FactPartition(partCol, date, pPart.getPeriod(), pPart,
partWhereClauseFormat);
updatePartitionStorage(innerPart);
innerPart.setFound(pPart.isFound());
if (innerPart.isFound()) {
partitions.add(innerPart);
}
}
log.debug("added all sub partitions blindly in pPart: {}", pPart);
}
}
}
}
}
} else {
log.info("Partition:{} does not exist in any storage table", part);
if (!getPartitions(dt, nextDt, partCol, partitions, remainingIntervals, false, failOnPartialData,
missingPartitions)) {
log.debug("Adding non existing partition {}", part);
if (addNonExistingParts) {
// Add non-existing partitions in all cases, whether we populate all non-existing ones or not.
this.participatingUpdatePeriods.add(maxInterval);
missingPartitions.add(part);
if (!failOnPartialData) {
partitions.add(part);
part.getStorageTables().add(storageName);
}
} else {
log.info("No finer granualar partitions exist for {}", part);
return false;
}
} else {
log.debug("Finer granualar partitions added for {}", part);
}
}
}
return getPartitions(fromDate, ceilFromDate, partCol, partitions, remainingIntervals,
addNonExistingParts, failOnPartialData, missingPartitions)
&& getPartitions(floorToDate, toDate, partCol, partitions, remainingIntervals,
addNonExistingParts, failOnPartialData, missingPartitions);
}
@Override
public boolean evaluateCompleteness(TimeRange timeRange, TimeRange queriedTimeRange, boolean failOnPartialData)
throws LensException {
// Check the measure tags.
if (!evaluateMeasuresCompleteness(timeRange)) {
log.info("Storage candidate:{} has partitions with incomplete data: {} for given ranges: {}", this,
dataCompletenessMap, cubeql.getTimeRanges());
if (failOnPartialData) {
return false;
}
}
PartitionRangesForPartitionColumns missingParts = new PartitionRangesForPartitionColumns();
PruneCauses<StorageCandidate> storagePruningMsgs = cubeql.getStoragePruningMsgs();
Set<String> unsupportedTimeDims = Sets.newHashSet();
Set<String> partColsQueried = Sets.newHashSet();
partColsQueried.add(timeRange.getPartitionColumn());
StringBuilder extraWhereClauseFallback = new StringBuilder();
Set<FactPartition> rangeParts = getPartitions(timeRange, validUpdatePeriods, true, failOnPartialData, missingParts);
String partCol = timeRange.getPartitionColumn();
boolean partColNotSupported = rangeParts.isEmpty();
String storageTableName = getStorageTable();
if (storagePruningMsgs.containsKey(this)) {
List<CandidateTablePruneCause> causes = storagePruningMsgs.get(this);
// Find the PART_COL_DOES_NOT_EXIST cause
for (CandidateTablePruneCause cause : causes) {
if (cause.getCause().equals(CandidateTablePruneCode.PART_COL_DOES_NOT_EXIST)) {
partColNotSupported &= cause.getNonExistantPartCols().contains(partCol);
}
}
} else {
partColNotSupported = false;
}
TimeRange prevRange = timeRange;
String sep = "";
while (rangeParts.isEmpty()) {
String timeDim = cubeql.getBaseCube().getTimeDimOfPartitionColumn(partCol);
if (partColNotSupported && !CandidateUtil.factHasColumn(getFact(), timeDim)) {
unsupportedTimeDims.add(cubeql.getBaseCube().getTimeDimOfPartitionColumn(timeRange.getPartitionColumn()));
break;
}
TimeRange fallBackRange = getFallbackRange(prevRange, this.getFact().getName(), cubeql);
log.info("No partitions for range:{}. fallback range: {}", timeRange, fallBackRange);
if (fallBackRange == null) {
break;
}
partColsQueried.add(fallBackRange.getPartitionColumn());
rangeParts = getPartitions(fallBackRange, validUpdatePeriods, true, failOnPartialData, missingParts);
extraWhereClauseFallback.append(sep)
.append(prevRange.toTimeDimWhereClause(cubeql.getAliasForTableName(cubeql.getCube()), timeDim));
sep = " AND ";
prevRange = fallBackRange;
partCol = prevRange.getPartitionColumn();
if (!rangeParts.isEmpty()) {
break;
}
}
// Add all the partitions. participatingPartitions contains all the partitions for previous time ranges also.
rangeToPartitions.put(queriedTimeRange, rangeParts);
numQueriedParts += rangeParts.size();
if (!unsupportedTimeDims.isEmpty()) {
log.info("Not considering storage candidate:{} as it doesn't support time dimensions: {}", this,
unsupportedTimeDims);
cubeql.addStoragePruningMsg(this, timeDimNotSupported(unsupportedTimeDims));
return false;
}
Set<String> nonExistingParts = missingParts.toSet(partColsQueried);
// TODO union : Relook at this.
nonExistingPartitions.addAll(nonExistingParts);
if (rangeParts.size() == 0 || (failOnPartialData && !nonExistingParts.isEmpty())) {
log.info("Not considering storage candidate:{} as no partitions for fallback range:{}", this, timeRange);
return false;
}
String extraWhere = extraWhereClauseFallback.toString();
if (!StringUtils.isEmpty(extraWhere)) {
rangeToExtraWhereFallBack.put(queriedTimeRange, extraWhere);
}
return true;
}
@Override
public Set<FactPartition> getParticipatingPartitions() {
Set<FactPartition> allPartitions = new HashSet<>(numQueriedParts);
for (Set<FactPartition> rangePartitions : rangeToPartitions.values()) {
allPartitions.addAll(rangePartitions);
}
return allPartitions;
}
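/**
* Checks data completeness of the queried measure tags for the given time range using the
* {@link DataCompletenessChecker}. Tags whose completeness is below the configured threshold are recorded in
* {@link #dataCompletenessMap}. Returns true when the check is not applicable or all tags are complete.
*/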
private boolean evaluateMeasuresCompleteness(TimeRange timeRange) throws LensException {
String factDataCompletenessTag = fact.getDataCompletenessTag();
if (factDataCompletenessTag == null) {
log.info("Not checking completeness for the fact table:{} as the dataCompletenessTag is not set", fact);
return true;
}
Set<String> measureTag = new HashSet<>();
Map<String, String> tagToMeasureOrExprMap = new HashMap<>();
processExpressionsForCompleteness(cubeql, measureTag, tagToMeasureOrExprMap);
Set<String> measures = cubeql.getQueriedMsrs();
if (measures == null) {
measures = new HashSet<>();
}
for (String measure : measures) {
processCubeColForDataCompleteness(cubeql, measure, measure, measureTag, tagToMeasureOrExprMap);
}
//Check whether any of the queried measures/expressions carry a dataCompletenessTag
if (measureTag.isEmpty()) {
log.info("No Queried measures with the dataCompletenessTag, hence skipping the availability check");
return true;
}
boolean isDataComplete = true;
DataCompletenessChecker completenessChecker = client.getCompletenessChecker();
DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
formatter.setTimeZone(TimeZone.getTimeZone("UTC"));
if (!timeRange.getPartitionColumn().equals(completenessPartCol)) {
log.info("Completeness check not available for partCol:{}", timeRange.getPartitionColumn());
return true;
}
Date from = timeRange.getFromDate();
Date to = timeRange.getToDate();
Map<String, Map<Date, Float>> completenessMap = completenessChecker
.getCompleteness(factDataCompletenessTag, from, to, measureTag);
if (completenessMap != null && !completenessMap.isEmpty()) {
for (Map.Entry<String, Map<Date, Float>> measureCompleteness : completenessMap.entrySet()) {
String tag = measureCompleteness.getKey();
for (Map.Entry<Date, Float> completenessResult : measureCompleteness.getValue().entrySet()) {
if (completenessResult.getValue() < completenessThreshold) {
log.info("Completeness for the measure_tag {} is {}, threshold: {}, for the hour {}", tag,
completenessResult.getValue(), completenessThreshold, formatter.format(completenessResult.getKey()));
String measureorExprFromTag = tagToMeasureOrExprMap.get(tag);
dataCompletenessMap.computeIfAbsent(measureorExprFromTag, k -> new HashMap<>())
.put(formatter.format(completenessResult.getKey()), completenessResult.getValue());
isDataComplete = false;
}
}
}
}
return isDataComplete;
}
private Set<FactPartition> getPartitions(TimeRange timeRange, TreeSet<UpdatePeriod> updatePeriods,
boolean addNonExistingParts, boolean failOnPartialData, PartitionRangesForPartitionColumns missingParts)
throws LensException {
Set<FactPartition> partitions = new TreeSet<>();
if (timeRange != null && timeRange.isCoverableBy(updatePeriods)) {
getPartitions(timeRange.getFromDate(), timeRange.getToDate(), timeRange.getPartitionColumn(),
partitions, updatePeriods, addNonExistingParts, failOnPartialData, missingParts);
}
return partitions;
}
@Override
public boolean isExpressionEvaluable(ExpressionResolver.ExpressionContext expr) {
return expr.isEvaluable(this);
}
/**
* Update selectAST for StorageCandidate
* 1. Delete projected select expression if it's not answerable by StorageCandidate.
* 2. Replace the queried alias with select alias if both are different in a select expr.
*
* @param cubeql
* @throws LensException
*/
public void updateAnswerableSelectColumns(CubeQueryContext cubeql) throws LensException {
// update select AST with selected fields
int currentChild = 0;
for (int i = 0; i < cubeql.getSelectAST().getChildCount(); i++) {
ASTNode selectExpr = (ASTNode) queryAst.getSelectAST().getChild(currentChild);
Set<String> exprCols = HQLParser.getColsInExpr(cubeql.getAliasForTableName(cubeql.getCube()), selectExpr);
if (getColumns().containsAll(exprCols)) {
ASTNode aliasNode = HQLParser.findNodeByPath(selectExpr, HiveParser.Identifier);
String alias = cubeql.getSelectPhrases().get(i).getSelectAlias();
if (aliasNode != null) {
String queryAlias = aliasNode.getText();
if (!queryAlias.equals(alias)) {
// replace the alias node
ASTNode newAliasNode = new ASTNode(new CommonToken(HiveParser.Identifier, alias));
queryAst.getSelectAST().getChild(currentChild)
.replaceChildren(selectExpr.getChildCount() - 1, selectExpr.getChildCount() - 1, newAliasNode);
}
} else {
// add column alias
ASTNode newAliasNode = new ASTNode(new CommonToken(HiveParser.Identifier, alias));
queryAst.getSelectAST().getChild(currentChild).addChild(newAliasNode);
}
} else {
queryAst.getSelectAST().deleteChild(currentChild);
currentChild--;
}
currentChild++;
}
}
@Override
public boolean equals(Object obj) {
if (super.equals(obj)) {
return true;
}
if (obj == null || !(obj instanceof StorageCandidate)) {
return false;
}
StorageCandidate storageCandidateObj = (StorageCandidate) obj;
//Assuming that the same instances of cube and fact will be used across StorageCandidates and hence relying
//directly on == checks for these.
return (this.cube == storageCandidateObj.cube && this.fact == storageCandidateObj.fact && this.storageTable
.equals(storageCandidateObj.storageTable));
}
@Override
public int hashCode() {
return this.storageTable.hashCode();
}
@Override
public String toString() {
return getResolvedName();
}
void addValidUpdatePeriod(UpdatePeriod updatePeriod) {
this.validUpdatePeriods.add(updatePeriod);
}
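/**
* Stores the dimension tables to be queried and builds the from string for this candidate, resolving the
* automatic join chain when auto join is enabled.
*/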
void updateFromString(CubeQueryContext query, Set<Dimension> queryDims,
Map<Dimension, CandidateDim> dimsToQuery) throws LensException {
this.dimsToQuery = dimsToQuery;
String alias = cubeql.getAliasForTableName(cubeql.getCube().getName());
fromString = getAliasForTable(alias);
if (query.isAutoJoinResolved()) {
fromString = query.getAutoJoinCtx().getFromString(fromString, this, queryDims, dimsToQuery, query, cubeql);
}
}
private String getFromTable() throws LensException {
if (cubeql.isAutoJoinResolved()) {
return fromString;
} else {
return cubeql.getQBFromString(this, getDimsToQuery());
}
}
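/**
* Returns the resolved storage table name with the given alias appended, qualified with the current database
* when it is not the default database.
*/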
public String getAliasForTable(String alias) {
String database = SessionState.get().getCurrentDatabase();
String ret;
if (alias == null || alias.isEmpty()) {
ret = getResolvedName();
} else {
ret = getResolvedName() + " " + alias;
}
if (StringUtils.isNotBlank(database) && !"default".equalsIgnoreCase(database)) {
ret = database + "." + ret;
}
return ret;
}
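/**
* Returns true if the given update period is useful for answering at least one of the queried time ranges.
*/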
boolean isUpdatePeriodUseful(UpdatePeriod updatePeriod) {
return cubeql.getTimeRanges().stream().anyMatch(timeRange -> isUpdatePeriodUseful(timeRange, updatePeriod));
}
/**
* Is the update period useful for this time range. e.g. for a time range of hours and days, monthly
* and yearly update periods are useless. DAILY and HOURLY are useful. It further checks if the update
* period answers the range at least partially based on start and end times configured at update period
* level or at storage or fact level.
* @param timeRange The time range
* @param updatePeriod Update period
* @return Whether it's useful
*/
private boolean isUpdatePeriodUseful(TimeRange timeRange, UpdatePeriod updatePeriod) {
try {
if (!CandidateUtil.isCandidatePartiallyValidForTimeRange(getStorageTableStartDate(updatePeriod),
getStorageTableEndDate(updatePeriod), timeRange.getFromDate(), timeRange.getToDate())) {
return false;
}
Date storageTblStartDate = getStorageTableStartDate(updatePeriod);
Date storageTblEndDate = getStorageTableEndDate(updatePeriod);
TimeRange.getBuilder() //TODO date calculation to be moved to a util method and reused
.fromDate(timeRange.getFromDate().after(storageTblStartDate) ? timeRange.getFromDate() : storageTblStartDate)
.toDate(timeRange.getToDate().before(storageTblEndDate) ? timeRange.getToDate() : storageTblEndDate)
.partitionColumn(timeRange.getPartitionColumn())
.build()
.truncate(updatePeriod);
return true;
} catch (LensException e) {
return false;
}
}
/**
* Is time range coverable based on valid update periods of this storage candidate
*
* @param timeRange time range to check
* @return true if the time range is coverable by the valid update periods of this candidate
* @throws LensException
*/
public boolean isTimeRangeCoverable(TimeRange timeRange) throws LensException {
return isTimeRangeCoverable(timeRange.getFromDate(), timeRange.getToDate(), validUpdatePeriods);
}
/**
* Is the time range coverable by given update periods.
* Extracts the max update period, then extracts maximum amount of range from the middle that this update
* period can cover. Then recurses on the remaining ranges on the left and right side of the extracted chunk
* using one less update period.
*
* @param timeRangeStart start of the time range
* @param timeRangeEnd end of the time range
* @param intervals Update periods to check
* @return Whether time range is coverable by provided update periods or not.
*/
private boolean isTimeRangeCoverable(Date timeRangeStart, Date timeRangeEnd,
Set<UpdatePeriod> intervals) throws LensException {
if (timeRangeStart.equals(timeRangeEnd) || timeRangeStart.after(timeRangeEnd)) {
return true;
}
if (intervals == null || intervals.isEmpty()) {
return false;
}
UpdatePeriod maxInterval = CubeFactTable.maxIntervalInRange(timeRangeStart, timeRangeEnd, intervals);
if (maxInterval == null) {
return false;
}
if (maxInterval == UpdatePeriod.CONTINUOUS
&& cubeql.getRangeWriter().getClass().equals(BetweenTimeRangeWriter.class)) {
return true;
}
Date maxIntervalStorageTableStartDate = getStorageTableStartDate(maxInterval);
Date maxIntervalStorageTableEndDate = getStorageTableEndDate(maxInterval);
Set<UpdatePeriod> remainingIntervals = Sets.difference(intervals, Sets.newHashSet(maxInterval));
if (!CandidateUtil.isCandidatePartiallyValidForTimeRange(
maxIntervalStorageTableStartDate, maxIntervalStorageTableEndDate, timeRangeStart, timeRangeEnd)) {
//Check the time range in remainingIntervals as maxInterval is not useful
return isTimeRangeCoverable(timeRangeStart, timeRangeEnd, remainingIntervals);
}
Date ceilFromDate = DateUtil.getCeilDate(timeRangeStart.after(maxIntervalStorageTableStartDate)
? timeRangeStart : maxIntervalStorageTableStartDate, maxInterval);
Date floorToDate = DateUtil.getFloorDate(timeRangeEnd.before(maxIntervalStorageTableEndDate)
? timeRangeEnd : maxIntervalStorageTableEndDate, maxInterval);
if (ceilFromDate.equals(floorToDate) || floorToDate.before(ceilFromDate)) {
return isTimeRangeCoverable(timeRangeStart, timeRangeEnd, remainingIntervals);
}
//ceilFromDate to floorToDate time range is covered by maxInterval (though there may be holes.. but that's ok)
//Check the remaining part of time range in remainingIntervals
return isTimeRangeCoverable(timeRangeStart, ceilFromDate, remainingIntervals)
&& isTimeRangeCoverable(floorToDate, timeRangeEnd, remainingIntervals);
}
private Date getStorageTableStartDate(UpdatePeriod interval) throws LensException {
if (!isStorageTblsAtUpdatePeriodLevel) {
//In this case the start and end times are at storage level and will be the same for all update periods.
return this.startTime;
}
return client.getStorageTableStartDate(
client.getStorageTableName(fact.getName(), storageName, interval), fact.getName());
}
private Date getStorageTableEndDate(UpdatePeriod interval) throws LensException {
if (!isStorageTblsAtUpdatePeriodLevel) {
//In this case the start and end times are at storage level and will be the same for all update periods.
return this.endTime;
}
return client.getStorageTableEndDate(
client.getStorageTableName(fact.getName(), storageName, interval), fact.getName());
}
public String getResolvedName() {
if (resolvedName == null) {
return storageTable;
}
return resolvedName;
}
/**
* Splits this storage candidate into multiple storage candidates if it has multiple
* storage tables (one per update period)
*
* @return period specific storage candidates, or a singleton collection of this candidate if no split is required
* @throws LensException
*/
public Collection<StorageCandidate> splitAtUpdatePeriodLevelIfReq() throws LensException {
if (!isStorageTblsAtUpdatePeriodLevel) {
return Lists.newArrayList(this); // No need to explode in this case
}
return getPeriodSpecificStorageCandidates();
}
private Collection<StorageCandidate> getPeriodSpecificStorageCandidates() throws LensException {
List<StorageCandidate> periodSpecificScList = new ArrayList<>(participatingUpdatePeriods.size());
StorageCandidate updatePeriodSpecificSc;
for (UpdatePeriod period : participatingUpdatePeriods) {
updatePeriodSpecificSc = new StorageCandidate(this);
updatePeriodSpecificSc.truncatePartitions(period);
updatePeriodSpecificSc.setResolvedName(client.getStorageTableName(fact.getName(),
storageName, period));
periodSpecificScList.add(updatePeriodSpecificSc);
}
return periodSpecificScList;
}
/**
* Truncates partitions in {@link #rangeToPartitions} such that only partitions belonging to
* the passed updatePeriod are retained.
* @param updatePeriod
*/
private void truncatePartitions(UpdatePeriod updatePeriod) {
Iterator<Map.Entry<TimeRange, Set<FactPartition>>> rangeItr = rangeToPartitions.entrySet().iterator();
while (rangeItr.hasNext()) {
Map.Entry<TimeRange, Set<FactPartition>> rangeEntry = rangeItr.next();
Iterator<FactPartition> partitionItr = rangeEntry.getValue().iterator();
while (partitionItr.hasNext()) {
if (!partitionItr.next().getPeriod().equals(updatePeriod)) {
partitionItr.remove();
}
}
if (rangeEntry.getValue().isEmpty()) {
rangeItr.remove();
}
}
}
}