blob: 9125f4f7f6ea5d0c46fdfde07e94cc9b01c68423 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.lens.cube.parse;
import static org.apache.lens.cube.parse.CandidateUtil.getColumns;
import java.util.*;
import org.apache.lens.cube.error.LensCubeErrorCode;
import org.apache.lens.cube.metadata.TimeRange;
import org.apache.lens.server.api.error.LensException;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class CandidateCoveringSetsResolver implements ContextRewriter {
/**
 * Replaces the candidates held by the query context with candidates that cover every queried time range
 * and, when the query contains measures, every queried measure.
 *
 * <p>Queries without a cube (dimension queries) are left untouched. When no phrase has measures, the
 * time-range covering candidates become the final candidates; otherwise measure-covering sets are
 * resolved from them and written back through {@code updateFinalCandidates}.
 *
 * @param cubeql query context whose candidate collection is rewritten in place
 * @throws LensException with {@code NO_UNION_CANDIDATE_AVAILABLE} when no time-range covering candidate is
 *                       found, or with {@code NO_JOIN_CANDIDATE_AVAILABLE} when no set of candidates
 *                       covers the queried measures
 */
@Override
public void rewriteContext(CubeQueryContext cubeql) throws LensException {
  if (!cubeql.hasCubeInQuery()) {
    // Dimension query
    return;
  }
  if (cubeql.getCandidates().isEmpty()) {
    // presumably always throws; control is not expected to continue past this point
    cubeql.throwNoCandidateFactException();
  }

  // Separate out the phrases that reference measures; only those drive the join resolution below.
  List<QueriedPhraseContext> queriedPhrases = cubeql.getQueriedPhrases();
  Set<QueriedPhraseContext> measurePhrases = new HashSet<>();
  for (QueriedPhraseContext phrase : queriedPhrases) {
    if (phrase.hasMeasures(cubeql)) {
      measurePhrases.add(phrase);
    }
  }

  List<Candidate> timeCovering = resolveTimeRangeCoveringFactSet(cubeql, measurePhrases, queriedPhrases);
  if (timeCovering.isEmpty()) {
    throw new LensException(LensCubeErrorCode.NO_UNION_CANDIDATE_AVAILABLE.getLensErrorInfo(),
      cubeql.getCube().getName(), cubeql.getTimeRanges().toString(), getColumns(measurePhrases).toString());
  }
  log.info("Time covering candidates :{}", timeCovering);

  if (measurePhrases.isEmpty()) {
    // No measures queried: the time covering candidates are final.
    cubeql.getCandidates().clear();
    cubeql.getCandidates().addAll(timeCovering);
  } else {
    // timeCovering is known to be non-empty here because the empty case threw above.
    List<List<Candidate>> measureCovering = resolveJoinCandidates(timeCovering, measurePhrases);
    if (measureCovering.isEmpty()) {
      throw new LensException(LensCubeErrorCode.NO_JOIN_CANDIDATE_AVAILABLE.getLensErrorInfo(),
        cubeql.getCube().getName(), getColumns(measurePhrases).toString());
    }
    updateFinalCandidates(measureCovering, cubeql);
  }
  log.info("Final Time and Measure covering candidates :{}", cubeql.getCandidates());
}
/**
 * Folds a list of candidates into a single left-nested chain of {@link JoinCandidate}s:
 * {@code join(join(join(c0, c1), c2), c3)...}.
 *
 * @param childCandidates candidates to join; must contain at least two elements, otherwise the
 *                        element lookups fail with {@link IndexOutOfBoundsException}
 * @param cubeql          query context handed to every created {@link JoinCandidate}
 * @return the outermost join candidate
 */
private Candidate createJoinCandidate(List<Candidate> childCandidates, CubeQueryContext cubeql) {
  Candidate joined = new JoinCandidate(childCandidates.get(0), childCandidates.get(1), cubeql);
  // Every further child is joined onto the result accumulated so far.
  for (Candidate next : childCandidates.subList(2, childCandidates.size())) {
    joined = new JoinCandidate(joined, next, cubeql);
  }
  return joined;
}
/**
 * Turns each covering set into one candidate and installs the results as the query context's candidates.
 * A set with exactly one member is used as-is; any other set is combined through
 * {@code createJoinCandidate}.
 *
 * @param joinCandidates covering sets, one inner list per final candidate
 * @param cubeql         query context whose candidate collection is cleared and refilled
 */
private void updateFinalCandidates(List<List<Candidate>> joinCandidates, CubeQueryContext cubeql) {
  List<Candidate> resolved = new ArrayList<>(joinCandidates.size());
  for (List<Candidate> members : joinCandidates) {
    Candidate finalCandidate = members.size() == 1
      ? members.get(0)
      : createJoinCandidate(members, cubeql);
    resolved.add(finalCandidate);
  }
  cubeql.getCandidates().clear();
  cubeql.getCandidates().addAll(resolved);
}
/**
 * Checks whether the children of the given union candidate cover every one of the given time ranges,
 * as decided by {@code CandidateUtil.isTimeRangeCovered} on each range's from/to dates.
 *
 * @param uc     union candidate whose children are checked
 * @param ranges time ranges that all have to be covered
 * @return {@code true} if every range is covered (trivially {@code true} for an empty list);
 *         {@code false} as soon as one range is found uncovered
 */
private boolean isCandidateCoveringTimeRanges(UnionCandidate uc, List<TimeRange> ranges) {
  return ranges.stream().allMatch(
    range -> CandidateUtil.isTimeRangeCovered(uc.getChildren(), range.getFromDate(), range.getToDate()));
}
/**
 * Builds the list of candidates that cover all queried time ranges.
 *
 * <p>Candidates that are completely valid for the time ranges are taken (as copies) directly. Candidates
 * that are only partially valid are combined into union candidates that cover the ranges together.
 * Candidates that are neither get a pruning message recorded on the query context.
 *
 * @param cubeql      query context supplying the candidates and time ranges
 * @param queriedMsrs queried phrases that contain measures; when non-empty, unions answering none of
 *                    them are dropped
 * @param qpcList     all queried phrases, used to update the queriable phrase indices of the result
 * @return completely valid candidates followed by the surviving union candidates (unions ordered by
 *         children count)
 * @throws LensException propagated from the helpers invoked here
 */
private List<Candidate> resolveTimeRangeCoveringFactSet(CubeQueryContext cubeql,
  Set<QueriedPhraseContext> queriedMsrs, List<QueriedPhraseContext> qpcList) throws LensException {
  List<TimeRange> timeRanges = cubeql.getTimeRanges();
  List<Candidate> fullyCovering = new ArrayList<>();
  List<Candidate> partiallyCovering = new ArrayList<>();

  // Work on a snapshot of the context's candidates.
  List<Candidate> snapshot = new ArrayList<>(cubeql.getCandidates());
  for (Candidate candidate : snapshot) {
    if (candidate.isCompletelyValidForTimeRanges(timeRanges)) {
      fullyCovering.add(candidate.copy());
      continue;
    }
    if (candidate.isPartiallyValidForTimeRanges(timeRanges)) {
      partiallyCovering.add(candidate.copy());
      continue;
    }
    cubeql.addCandidatePruningMsg(candidate,
      CandidateTablePruneCause.storageNotAvailableInRange(timeRanges));
  }

  // Get all covering fact sets built from the partially valid candidates.
  List<UnionCandidate> unions = getCombinationTailIterative(partiallyCovering, cubeql);
  pruneRedundantUnionCoveringSets(unions);
  // Order the unions by their number of children.
  unions.sort(Comparator.comparing(Candidate::getChildrenCount));
  // Prune unions which cannot answer any of the queried measures.
  if (!queriedMsrs.isEmpty()) {
    pruneUnionCoveringSetWithoutAnyCommonMeasure(unions, queriedMsrs);
  }

  // Pruning is done; the surviving unions join the completely valid candidates.
  fullyCovering.addAll(unions);
  updateQueriableMeasures(fullyCovering, qpcList);
  return fullyCovering;
}
/**
 * Removes, in place, every union candidate that cannot answer at least one of the queried measure
 * phrases.
 *
 * @param ucs         union candidates to filter; modified in place
 * @param queriedMsrs queried phrases containing measures; if empty, every union is removed
 * @throws LensException declared for the answerability check
 */
private void pruneUnionCoveringSetWithoutAnyCommonMeasure(List<UnionCandidate> ucs,
  Set<QueriedPhraseContext> queriedMsrs) throws LensException {
  Iterator<UnionCandidate> unions = ucs.iterator();
  while (unions.hasNext()) {
    UnionCandidate union = unions.next();
    boolean answersAny = false;
    for (QueriedPhraseContext measure : queriedMsrs) {
      if (union.isPhraseAnswerable(measure)) {
        // One answerable measure is enough to keep the union.
        answersAny = true;
        break;
      }
    }
    if (!answersAny) {
      unions.remove();
    }
  }
}
/**
 * Removes, in place, union candidates that are redundant with respect to an earlier one: for each
 * candidate in list order, every later candidate whose children contain all of its children is
 * dropped.
 *
 * <p>The comparison only looks forward, so a superset positioned before its subset is not removed.
 *
 * @param candidates union candidates to prune; modified in place
 */
private void pruneRedundantUnionCoveringSets(List<UnionCandidate> candidates) {
  // The size is re-read on every pass because the tail shrinks as elements are removed.
  for (int i = 0; i < candidates.size(); i++) {
    UnionCandidate current = candidates.get(i);
    candidates.subList(i + 1, candidates.size())
      .removeIf(later -> later.getChildren().containsAll(current.getChildren()));
  }
}
/**
 * Enumerates every non-empty subset of the given candidates via an integer bitmask and returns a
 * union candidate for each subset that covers all of the query's time ranges.
 *
 * <p>Work grows as 2^n in the number of candidates, and the int bitmask limits how many candidates
 * can be enumerated correctly.
 *
 * @param candidates candidates to combine
 * @param cubeql     query context supplying the time ranges and handed to each union candidate
 * @return union candidates covering the time ranges, in bitmask order
 * @throws LensException declared by this method; the source is not visible in this block
 * @deprecated this file uses {@code getCombinationTailIterative} instead
 */
@Deprecated
private List<UnionCandidate> getCombinations(final List<Candidate> candidates, CubeQueryContext cubeql)
  throws LensException {
  final int total = candidates.size();
  // 2^total - 1; the double-to-int conversion saturates for large totals.
  final int lastMask = (int) Math.pow(2, total) - 1;
  List<UnionCandidate> covering = new LinkedList<>();
  for (int mask = 1; mask <= lastMask; ++mask) {
    // The lowest bit of the mask maps to the last candidate; prepending keeps list order.
    LinkedList<Candidate> chosen = new LinkedList<>();
    for (int idx = total - 1, bits = mask; idx >= 0; --idx, bits >>>= 1) {
      if ((bits & 1) != 0) {
        chosen.addFirst(candidates.get(idx));
      }
    }
    UnionCandidate union = new UnionCandidate(chosen, cubeql);
    if (isCandidateCoveringTimeRanges(union, cubeql.getTimeRanges())) {
      union.cloneChildren();
      covering.add(union);
    }
  }
  return covering;
}
/**
 * Builds the union candidates that cover all of the query's time ranges, growing combinations one
 * candidate at a time.
 *
 * <p>This is the iterative form of the following tail-recursive scheme (ignoring cubeql for clarity):
 * <pre>
 * getCombinations(candidates) = getCombinationsTailRecursive(emptyList(), candidates)
 *
 * getCombinationsTailRecursive(incompleteCombinations: List[List[Candidate]], candidates: List[Candidate]) =
 *   head, tail = head and tail of linked List candidates
 *   add head to all elements of incompleteCombinations.
 *   complete = remove now complete combinations from incompleteCombinations
 *   return complete ++ getCombinationsTailRecursive(incompleteCombinations, tail)
 * </pre>
 *
 * <p>A combination that covers the time ranges is emitted and never extended further. One that does
 * not is kept so that later candidates can be appended to it.
 *
 * @param candidates candidates to combine; the list itself is not modified
 * @param cubeql     query context supplying the time ranges and handed to each union candidate
 * @return union candidates covering the time ranges, in the order they were completed
 * @throws LensException declared by this method; the source is not visible in this block
 */
private List<UnionCandidate> getCombinationTailIterative(List<Candidate> candidates, CubeQueryContext cubeql)
  throws LensException {
  List<UnionCandidate> covering = new ArrayList<>();
  // Seeded with the empty combination so that each candidate also starts a combination of its own.
  List<List<Candidate>> pending = new ArrayList<>();
  pending.add(new ArrayList<Candidate>());

  for (Candidate head : candidates) {
    // Collected separately so that 'pending' is not modified while it is being iterated.
    List<List<Candidate>> stillPending = new ArrayList<>();
    for (List<Candidate> prefix : pending) {
      List<Candidate> extended = new ArrayList<>(prefix);
      extended.add(head);
      UnionCandidate union = new UnionCandidate(extended, cubeql);
      if (!isCandidateCoveringTimeRanges(union, cubeql.getTimeRanges())) {
        stillPending.add(extended);
        continue;
      }
      union.cloneChildren();
      covering.add(union);
    }
    pending.addAll(stillPending);
  }
  return covering;
}
/**
 * Finds sets of candidates that together answer all of the given measure phrases.
 *
 * <p>First pass: a candidate answering every phrase forms a covering set on its own, and a candidate
 * answering none is discarded; both kinds are taken out of further consideration. Second pass: each
 * remaining candidate is paired with covering sets found recursively, among the candidates after it,
 * for the phrases it does not cover.
 *
 * <p>If {@code msrs} is empty, every candidate trivially answers "all" phrases and is returned as a
 * singleton covering set.
 *
 * @param candidates candidates to choose from; the list itself is not modified
 * @param msrs       measure phrases that each returned set has to cover
 * @return covering sets, one inner list per set; empty if the phrases cannot be covered
 * @throws LensException declared for the answerability checks and the recursive resolution
 */
private List<List<Candidate>> resolveJoinCandidates(List<Candidate> candidates,
  Set<QueriedPhraseContext> msrs) throws LensException {
  List<List<Candidate>> msrCoveringSets = new ArrayList<>();
  List<Candidate> ucSet = new ArrayList<>(candidates);
  // Check if a single set can answer all the measures and exprsWithMeasures
  for (Iterator<Candidate> i = ucSet.iterator(); i.hasNext();) {
    boolean allEvaluable = true;
    boolean anyEvaluable = false;
    Candidate uc = i.next();
    for (QueriedPhraseContext msr : msrs) {
      boolean evaluable = uc.isPhraseAnswerable(msr);
      allEvaluable &= evaluable;
      anyEvaluable |= evaluable;
    }
    if (allEvaluable) {
      // single set can answer all the measures as an UnionCandidate
      List<Candidate> one = new ArrayList<>();
      one.add(uc);
      msrCoveringSets.add(one);
      i.remove();
    } else if (!anyEvaluable) {
      // None evaluable. This must be an 'else': with an empty msrs both flags keep their initial
      // values, and removing twice for one next() throws IllegalStateException.
      i.remove();
    }
  }
  // sorting will make sure storage candidates come before complex candidates.
  // ensuring maximum columns get selected from simpler candidates.
  ucSet.sort(Comparator.comparing(Candidate::getChildrenCount));
  // Sets that contain all measures or no measures are removed from iteration.
  // find other facts
  for (Iterator<Candidate> i = ucSet.iterator(); i.hasNext();) {
    Candidate candidate = i.next();
    // Removed up front so that the recursive call only sees the candidates after this one.
    i.remove();
    // find the remaining measures in other facts
    if (i.hasNext()) {
      Set<QueriedPhraseContext> remainingMsrs = new HashSet<>(msrs);
      Set<QueriedPhraseContext> coveredMsrs = candidate.coveredPhrases(msrs);
      remainingMsrs.removeAll(coveredMsrs);
      // The recursive call copies ucSet, so the iterator of this loop stays valid.
      List<List<Candidate>> coveringSets = resolveJoinCandidates(ucSet, remainingMsrs);
      if (!coveringSets.isEmpty()) {
        for (List<Candidate> candSet : coveringSets) {
          candSet.add(candidate);
          msrCoveringSets.add(candSet);
        }
      } else {
        log.info("Couldnt find any set containing remaining measures:{} in {}", remainingMsrs,
          ucSet);
      }
    }
  }
  // ucSet has been emptied by the loop above, so log the candidates this call was given.
  log.info("Covering set {} for measures {} with factsPassed {}", msrCoveringSets, msrs, candidates);
  return msrCoveringSets;
}
/**
 * Asks every candidate to update its storage candidates' queriable phrase indices for the given
 * queried phrases.
 *
 * @param cands   candidates to update
 * @param qpcList all queried phrases of the query
 * @throws LensException declared for the per-candidate update
 */
private void updateQueriableMeasures(List<Candidate> cands,
  List<QueriedPhraseContext> qpcList) throws LensException {
  Iterator<Candidate> remaining = cands.iterator();
  while (remaining.hasNext()) {
    remaining.next().updateStorageCandidateQueriablePhraseIndices(qpcList);
  }
}
}