blob: 30fa873c255df408563f3c20062a6ccfca0f3811 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.lens.cube.parse;
import static org.apache.lens.cube.parse.CandidateTablePruneCause.denormColumnNotFound;
import static org.apache.hadoop.hive.ql.parse.HiveParser.Identifier;
import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_TABLE_OR_COL;
import java.util.*;
import org.apache.lens.cube.error.LensCubeErrorCode;
import org.apache.lens.cube.metadata.*;
import org.apache.lens.cube.metadata.ReferencedDimAttribute.ChainRefCol;
import org.apache.lens.cube.parse.ExpressionResolver.ExprSpecContext;
import org.apache.lens.cube.parse.ExpressionResolver.ExpressionContext;
import org.apache.lens.server.api.error.LensException;
import org.apache.hadoop.hive.ql.parse.ASTNode;
import org.apache.hadoop.hive.ql.parse.HiveParser;
import org.antlr.runtime.CommonToken;
import lombok.Getter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
/**
 * Resolves all the reference (denormalized) columns that are queried.
 *
 * <p>On the first pass it records every queried {@link ReferencedDimAttribute}. Candidate tables that do not have
 * such a column themselves register a reference usage. On a later pass, references that are not reachable (or not
 * available in the queried time ranges) are pruned together with the candidates that depend on them. When
 * candidates are finally picked, the referenced columns in the query ASTs are replaced with the corresponding
 * join-chain columns.
 */
@Slf4j
public class DenormalizationResolver implements ContextRewriter {

  /**
   * A queried {@link ReferencedDimAttribute} together with the table it was queried from.
   *
   * <p>{@code chainRefCols} starts as a copy of the attribute's chain reference columns and is narrowed as
   * unreachable chains are pruned.
   */
  @ToString
  static class ReferencedQueriedColumn {
    ReferencedDimAttribute col;
    AbstractCubeTable srcTable;
    transient List<ChainRefCol> chainRefCols = new ArrayList<>();

    ReferencedQueriedColumn(ReferencedDimAttribute col, AbstractCubeTable srcTable) {
      this.col = col;
      this.srcTable = srcTable;
      chainRefCols.addAll(col.getChainRefColumns());
    }
  }

  /**
   * A chain reference column chosen to answer a denormalized column that was queried through {@code srcAlias}.
   * The reference was picked on behalf of the candidate table named {@code pickedFor}.
   */
  @ToString
  static class PickedReference {
    @Getter
    ChainRefCol chainRef;
    String srcAlias;
    String pickedFor;

    PickedReference(ChainRefCol chainRef, String srcAlias, String pickedFor) {
      this.srcAlias = srcAlias;
      this.chainRef = chainRef;
      this.pickedFor = pickedFor;
    }
  }

  static class DenormalizationContext {
    // map of queried column name to all references through which it could be answered
    @Getter
    private final Map<String, Set<ReferencedQueriedColumn>> referencedCols = new HashMap<>();
    // candidate table name (storage table for facts, table name for dims) to all the reference columns it needs
    @Getter
    private final Map<String, Set<ReferencedQueriedColumn>> tableToRefCols = new HashMap<>();
    // set of all picked references once all candidate tables are picked
    private final Set<PickedReference> pickedRefs = new HashSet<>();
    // index on column name for picked references: column name -> picked references
    private final Map<String, Set<PickedReference>> pickedReferences = new HashMap<>();

    void addReferencedCol(String col, ReferencedQueriedColumn refer) {
      referencedCols.computeIfAbsent(col, k -> new HashSet<>()).add(refer);
    }

    /**
     * Called when a candidate table does not have the field: checks whether the field can be reached through a
     * reference and, if so, records the reference usage.
     *
     * <p>The usage is keyed by the dimension table name for {@link CandidateDim} and by the storage table
     * otherwise. Every chain of the referenced attribute is also registered as an optional dimension table.
     *
     * @return true if {@code col} is a referenced column queried from {@code srcTbl} and the usage was recorded,
     *         false otherwise
     */
    boolean addRefUsage(CubeQueryContext cubeql, CandidateTable table, String col, String srcTbl) throws LensException {
      Set<ReferencedQueriedColumn> references = referencedCols.get(col);
      if (references == null) {
        return false;
      }
      for (ReferencedQueriedColumn refer : references) {
        if (!refer.srcTable.getName().equalsIgnoreCase(srcTbl)) {
          continue;
        }
        // Whether the reference source column is available in the src table is not checked here;
        // join resolution will figure out if there is no path to the source table.
        log.info("Adding denormalized column for column:{} for table:{}", col, table);
        String name = (table instanceof CandidateDim) ? table.getName() : table.getStorageTable();
        tableToRefCols.computeIfAbsent(name, k -> new HashSet<>()).add(refer);
        // Add to optional tables
        for (ChainRefCol refCol : refer.col.getChainRefColumns()) {
          cubeql.addOptionalDimTable(refCol.getChainName(), table, false, refer.col.getName(), true,
            refCol.getRefColumn());
        }
        return true;
      }
      return false;
    }

    private void addPickedReference(String col, PickedReference refer) {
      pickedReferences.computeIfAbsent(col, k -> new HashSet<>()).add(refer);
    }

    /**
     * Returns the reference picked for {@code col} when queried through {@code srcAlias} (case-insensitive),
     * or null if none was picked.
     */
    private PickedReference getPickedReference(String col, String srcAlias) {
      for (PickedReference ref : pickedReferences.getOrDefault(col, Collections.emptySet())) {
        if (ref.srcAlias.equalsIgnoreCase(srcAlias)) {
          log.info("Picked reference for {} ref:{}", col, ref);
          return ref;
        }
      }
      return null;
    }

    /**
     * Picks references for the fact storage table and dimension tables and rewrites the query ASTs to use them.
     *
     * @return the dimensions reached through the picked references
     * @throws LensException if a needed referenced column has no reachable reference
     */
    Set<Dimension> rewriteDenormctx(CubeQueryContext cubeql,
      StorageCandidate sc, Map<Dimension, CandidateDim> dimsToQuery, boolean replaceFact) throws LensException {
      Set<Dimension> refTbls = new HashSet<>();
      log.info("Doing denorm changes for fact :{}", sc);
      try {
        if (hasReferences()) {
          pickColumns(cubeql, sc, dimsToQuery);
          // Replace picked reference in all the base trees
          replaceReferencedColumns(cubeql, sc, replaceFact);
          addPickedReferenceTables(cubeql, sc, dimsToQuery, refTbls);
        }
      } finally {
        // picks are per-rewrite state; never let them leak into the next call, even on failure
        clearPickedReferences();
      }
      return refTbls;
    }

    boolean hasReferences() {
      return !tableToRefCols.isEmpty();
    }

    /**
     * Same as {@link #rewriteDenormctx}, but rewrites only the given expression AST.
     *
     * @return the dimensions reached through the picked references
     * @throws LensException if a needed referenced column has no reachable reference
     */
    Set<Dimension> rewriteDenormctxInExpression(CubeQueryContext cubeql, StorageCandidate sc, Map<Dimension,
      CandidateDim> dimsToQuery, ASTNode exprAST) throws LensException {
      Set<Dimension> refTbls = new HashSet<>();
      try {
        if (hasReferences()) {
          pickColumns(cubeql, sc, dimsToQuery);
          // Replace picked reference in expression ast
          resolveClause(exprAST);
          addPickedReferenceTables(cubeql, sc, dimsToQuery, refTbls);
        }
      } finally {
        clearPickedReferences();
      }
      return refTbls;
    }

    // Picks a reference for every denormalized column needed by the fact storage table and the queried dim tables.
    private void pickColumns(CubeQueryContext cubeql, StorageCandidate sc, Map<Dimension, CandidateDim> dimsToQuery)
      throws LensException {
      if (sc != null) {
        pickColumnsForTable(cubeql, sc.getStorageTable());
      }
      if (dimsToQuery != null) {
        for (CandidateDim cdim : dimsToQuery.values()) {
          pickColumnsForTable(cubeql, cdim.getName());
        }
      }
    }

    // Adds the dimension of each reference picked for sc/dimsToQuery to refTbls and marks its column as queried.
    private void addPickedReferenceTables(CubeQueryContext cubeql, StorageCandidate sc,
      Map<Dimension, CandidateDim> dimsToQuery, Set<Dimension> refTbls) throws LensException {
      for (PickedReference picked : pickedRefs) {
        if (isPickedFor(picked, sc, dimsToQuery)) {
          String chainName = picked.getChainRef().getChainName();
          refTbls.add((Dimension) cubeql.getCubeTableForAlias(chainName));
          cubeql.addColumnsQueried(chainName, picked.getChainRef().getRefColumn());
        }
      }
    }

    // Forgets all picks so the next rewrite starts from a clean slate.
    private void clearPickedReferences() {
      pickedReferences.clear();
      pickedRefs.clear();
    }

    // checks if the reference is picked for the fact or any of the dimsToQuery passed
    private boolean isPickedFor(PickedReference picked, StorageCandidate sc, Map<Dimension, CandidateDim> dimsToQuery) {
      if (sc != null && picked.pickedFor.equalsIgnoreCase(sc.getStorageTable())) {
        return true;
      }
      if (dimsToQuery != null) {
        for (CandidateDim cdim : dimsToQuery.values()) {
          if (picked.pickedFor.equalsIgnoreCase(cdim.getName())) {
            return true;
          }
        }
      }
      return false;
    }

    /**
     * A chain reference is reachable only if an auto-join context exists and it can reach the chain's dimension.
     * A missing join context is treated as "unreachable" everywhere, rather than causing an NPE in some paths.
     */
    private static boolean isReachable(CubeQueryContext cubeql, ChainRefCol reference) {
      return cubeql.getAutoJoinCtx() != null && cubeql.getAutoJoinCtx().isReachableDim(
        (Dimension) cubeql.getCubeTableForAlias(reference.getChainName()), reference.getChainName());
    }

    /**
     * For each referenced column needed by {@code tbl}, drops unreachable chains and picks the first remaining one.
     *
     * @throws LensException if some referenced column has no reachable chain left
     */
    private void pickColumnsForTable(CubeQueryContext cubeql, String tbl) throws LensException {
      Set<ReferencedQueriedColumn> neededRefs = tableToRefCols.get(tbl);
      if (neededRefs == null) {
        return;
      }
      for (ReferencedQueriedColumn referenced : neededRefs) {
        // remove unreachable references
        referenced.chainRefCols.removeIf(reference -> !isReachable(cubeql, reference));
        if (referenced.chainRefCols.isEmpty()) {
          throw new LensException(LensCubeErrorCode.NO_REF_COL_AVAILABLE.getLensErrorInfo(),
            referenced.col.getName());
        }
        PickedReference picked = new PickedReference(referenced.chainRefCols.get(0),
          cubeql.getAliasForTableName(referenced.srcTable.getName()), tbl);
        addPickedReference(referenced.col.getName(), picked);
        pickedRefs.add(picked);
      }
    }

    /**
     * Removes unreachable chains from every referenced column. A referenced column is dropped entirely when no
     * chain is left, or when the column is not available in one of the queried time ranges.
     */
    void pruneReferences(CubeQueryContext cubeql) {
      for (Set<ReferencedQueriedColumn> referencedQueriedColumns : referencedCols.values()) {
        for (Iterator<ReferencedQueriedColumn> iterator = referencedQueriedColumns.iterator(); iterator.hasNext();) {
          ReferencedQueriedColumn rqc = iterator.next();
          for (Iterator<ChainRefCol> iter = rqc.chainRefCols.iterator(); iter.hasNext();) {
            // remove unreachable references
            ChainRefCol reference = iter.next();
            if (!isReachable(cubeql, reference)) {
              log.info("{} is not reachable", reference.getChainName());
              iter.remove();
            }
          }
          if (rqc.chainRefCols.isEmpty()) {
            log.info("The referenced column: {} is not reachable", rqc.col.getName());
            iterator.remove();
            continue;
          }
          // do column life validation
          for (TimeRange range : cubeql.getTimeRanges()) {
            if (!rqc.col.isColumnAvailableInTimeRange(range)) {
              log.info("The referenced column: {} is not in the range queried", rqc.col.getName());
              iterator.remove();
              break;
            }
          }
        }
      }
    }

    /**
     * Rewrites the select, where, group by, having and order by clauses to use the picked references.
     *
     * <p>When the fact storage table itself needs references, the where clause is always taken from the storage
     * candidate's AST. The other clauses come from it only when {@code replaceFact} is set.
     */
    private void replaceReferencedColumns(CubeQueryContext cubeql, StorageCandidate sc, boolean replaceFact)
      throws LensException {
      QueryAST ast = cubeql;
      boolean factRefExists = sc != null && tableToRefCols.get(sc.getStorageTable()) != null
        && !tableToRefCols.get(sc.getStorageTable()).isEmpty();
      if (replaceFact && factRefExists) {
        ast = sc.getQueryAst();
      }
      resolveClause(ast.getSelectAST());
      if (factRefExists) {
        resolveClause(sc.getQueryAst().getWhereAST());
      } else {
        resolveClause(ast.getWhereAST());
      }
      resolveClause(ast.getGroupByAST());
      resolveClause(ast.getHavingAST());
      resolveClause(ast.getOrderByAST());
    }

    /**
     * Recursively replaces {@code <alias>.<column>} DOT nodes whose column has a picked reference for that alias
     * with {@code <chainName>.<refColumn>}. The nodes are modified in place.
     */
    private void resolveClause(ASTNode node) throws LensException {
      if (node == null) {
        return;
      }
      if (node.getToken().getType() != HiveParser.DOT) {
        // recurse down
        for (int i = 0; i < node.getChildCount(); i++) {
          resolveClause((ASTNode) node.getChild(i));
        }
        return;
      }
      String colName = HQLParser.getColName(node).toLowerCase();
      if (!pickedReferences.containsKey(colName)) {
        return;
      }
      ASTNode tabident = HQLParser.findNodeByPath(node, TOK_TABLE_OR_COL, Identifier);
      if (tabident == null) {
        // No TOK_TABLE_OR_COL/Identifier under this DOT, so there is no table alias to match a picked reference
        // against; leave the node untouched instead of relying on a (possibly disabled) assert.
        log.debug("No table identifier found for column {}, skipping denorm replacement", colName);
        return;
      }
      PickedReference refered = getPickedReference(colName, tabident.getText().toLowerCase());
      if (refered == null) {
        return;
      }
      // No need to create a new DOT node; replace the table name identifier and column name identifier
      ASTNode tableNode = (ASTNode) node.getChild(0);
      tableNode.setChild(0,
        new ASTNode(new CommonToken(HiveParser.Identifier, refered.getChainRef().getChainName())));
      node.setChild(1,
        new ASTNode(new CommonToken(HiveParser.Identifier, refered.getChainRef().getRefColumn())));
    }

    /**
     * Returns the names of the referenced columns needed by {@code table} whose reference was pruned by
     * {@link #pruneReferences}.
     */
    Set<String> getNonReachableReferenceFields(String table) {
      Set<String> nonReachableFields = new HashSet<>();
      Set<ReferencedQueriedColumn> neededRefs = tableToRefCols.get(table);
      if (neededRefs == null) {
        return nonReachableFields;
      }
      for (ReferencedQueriedColumn refcol : neededRefs) {
        // The exact reference this table relies on must have survived pruning. Another surviving reference with
        // the same column name (e.g. queried from a different source table) does not make this one usable.
        Set<ReferencedQueriedColumn> surviving =
          getReferencedCols().getOrDefault(refcol.col.getName(), Collections.emptySet());
        if (!surviving.contains(refcol)) {
          log.info("For table:{}, the column {} is not available", table, refcol.col);
          nonReachableFields.add(refcol.col.getName());
        }
      }
      return nonReachableFields;
    }
  }

  /**
   * Registers every {@link ReferencedDimAttribute} queried through a real table alias as a denormalized column.
   * The default alias and join chain aliases are skipped.
   */
  private void addRefColsQueried(CubeQueryContext cubeql, TrackQueriedColumns tqc, DenormalizationContext denormCtx) {
    for (Map.Entry<String, Set<String>> entry : tqc.getTblAliasToColumns().entrySet()) {
      String alias = entry.getKey();
      // skip default alias and join chain aliases
      if (Objects.equals(alias, CubeQueryContext.DEFAULT_TABLE)
        || cubeql.getJoinchains().containsKey(alias.toLowerCase())) {
        continue;
      }
      AbstractCubeTable tbl = cubeql.getCubeTableForAlias(alias);
      for (String column : entry.getValue()) {
        CubeColumn col;
        if (tbl instanceof CubeInterface) {
          col = ((CubeInterface) tbl).getColumnByName(column);
        } else {
          col = ((Dimension) tbl).getColumnByName(column);
        }
        if (col instanceof ReferencedDimAttribute) {
          // considering all referenced dimensions to be denormalized columns
          denormCtx.addReferencedCol(column, new ReferencedQueriedColumn((ReferencedDimAttribute) col, tbl));
        }
      }
    }
  }

  private static DenormalizationContext getOrCreateDeNormCtx(TrackDenormContext tdc) {
    DenormalizationContext denormCtx = tdc.getDeNormCtx();
    if (denormCtx == null) {
      denormCtx = new DenormalizationContext();
      tdc.setDeNormCtx(denormCtx);
    }
    return denormCtx;
  }

  /**
   * Finds all de-normalized columns. If these columns are not directly available in candidate tables, the query
   * will be rewritten to use the corresponding table reference.
   *
   * <p>The first invocation (no denormalization context yet) collects the referenced columns, both those queried
   * directly and those used in expressions. A later invocation prunes unreachable references. It then drops the
   * fact and dimension candidates that depend on the pruned references.
   *
   * @throws LensException if pruning leaves no fact candidate, or no candidate table for a queried dimension
   */
  @Override
  public void rewriteContext(CubeQueryContext cubeql) throws LensException {
    DenormalizationContext denormCtx = cubeql.getDeNormCtx();
    if (denormCtx == null) {
      collectReferencedColumns(cubeql);
    } else if (denormCtx.hasReferences()) {
      denormCtx.pruneReferences(cubeql);
      // In the second iteration of denorm resolver, candidate tables which require denorm fields
      // whose references are no longer valid are pruned
      pruneFactCandidates(cubeql, denormCtx);
      pruneDimCandidates(cubeql, denormCtx);
    }
  }

  // Adds all the reference dimensions queried (directly or through expressions) as eligible for denorm fields.
  private void collectReferencedColumns(CubeQueryContext cubeql) throws LensException {
    DenormalizationContext ctx = getOrCreateDeNormCtx(cubeql);
    // add ref columns in cube
    addRefColsQueried(cubeql, cubeql, ctx);
    // add ref columns from expressions
    for (Set<ExpressionContext> ecSet : cubeql.getExprCtx().getAllExprsQueried().values()) {
      for (ExpressionContext ec : ecSet) {
        for (ExprSpecContext esc : ec.getAllExprs()) {
          addRefColsQueried(cubeql, esc, getOrCreateDeNormCtx(esc));
        }
      }
    }
  }

  // Prunes storage candidates that need a denormalized column whose reference is no longer reachable.
  private void pruneFactCandidates(CubeQueryContext cubeql, DenormalizationContext denormCtx) throws LensException {
    if (cubeql.getCube() == null || cubeql.getCandidates().isEmpty()) {
      return;
    }
    // NOTE(review): i.remove() only prunes cubeql's candidates if CandidateUtil.getStorageCandidates returns a
    // live view rather than a fresh collection -- confirm against CandidateUtil.
    for (Iterator<StorageCandidate> i =
      CandidateUtil.getStorageCandidates(cubeql.getCandidates()).iterator(); i.hasNext();) {
      StorageCandidate candidate = i.next();
      Set<String> nonReachableFields = denormCtx.getNonReachableReferenceFields(candidate.getStorageTable());
      if (!nonReachableFields.isEmpty()) {
        log.info("Not considering fact table:{} as columns {} are not available", candidate, nonReachableFields);
        cubeql.addCandidatePruningMsg(candidate, denormColumnNotFound(nonReachableFields));
        i.remove();
      }
    }
    if (cubeql.getCandidates().isEmpty()) {
      throw new LensException(LensCubeErrorCode.NO_FACT_HAS_COLUMN.getLensErrorInfo(),
        cubeql.getColumnsQueriedForTable(cubeql.getCube().getName()).toString());
    }
  }

  // Prunes candidate dimension tables that need a denormalized column whose reference is no longer reachable.
  private void pruneDimCandidates(CubeQueryContext cubeql, DenormalizationContext denormCtx) throws LensException {
    if (cubeql.getDimensions() == null || cubeql.getDimensions().isEmpty()) {
      return;
    }
    for (Dimension dim : cubeql.getDimensions()) {
      for (Iterator<CandidateDim> i = cubeql.getCandidateDimTables().get(dim).iterator(); i.hasNext();) {
        CandidateDim cdim = i.next();
        Set<String> nonReachableFields = denormCtx.getNonReachableReferenceFields(cdim.getName());
        if (!nonReachableFields.isEmpty()) {
          log.info("Not considering dim table:{} as column {} is not available", cdim, nonReachableFields);
          cubeql.addDimPruningMsgs(dim, cdim.dimtable, denormColumnNotFound(nonReachableFields));
          i.remove();
        }
      }
      if (cubeql.getCandidateDimTables().get(dim).isEmpty()) {
        throw new LensException(LensCubeErrorCode.NO_DIM_HAS_COLUMN.getLensErrorInfo(),
          dim.toString(), cubeql.getColumnsQueriedForTable(dim.getName()).toString());
      }
    }
  }
}