blob: 951c867d350a0cdff38b0c015016cd893d708a09 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.cache.query.internal;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Comparator;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.geode.cache.query.Aggregator;
import org.apache.geode.cache.query.AmbiguousNameException;
import org.apache.geode.cache.query.FunctionDomainException;
import org.apache.geode.cache.query.NameResolutionException;
import org.apache.geode.cache.query.QueryInvalidException;
import org.apache.geode.cache.query.QueryInvocationTargetException;
import org.apache.geode.cache.query.SelectResults;
import org.apache.geode.cache.query.Struct;
import org.apache.geode.cache.query.TypeMismatchException;
import org.apache.geode.cache.query.internal.parse.OQLLexerTokenTypes;
import org.apache.geode.cache.query.internal.types.StructTypeImpl;
import org.apache.geode.cache.query.internal.types.TypeUtils;
import org.apache.geode.cache.query.internal.utils.PDXUtils;
import org.apache.geode.cache.query.types.ObjectType;
import org.apache.geode.cache.query.types.StructType;
public class CompiledGroupBySelect extends CompiledSelect {
private final BitSet aggregateColsPos;
private final CompiledAggregateFunction[] aggregateFunctions;
private final boolean isDistinct;
private final List<CompiledSortCriterion> originalOrderByClause;
private final CompiledValue limit;
@Override
public int getType() {
return GROUP_BY_SELECT;
}
public CompiledGroupBySelect(boolean distinct, boolean count, CompiledValue whereClause,
List iterators, List projAttrs, List<CompiledSortCriterion> orderByAttrs, CompiledValue limit,
List<String> hints, List<CompiledValue> groupByClause,
LinkedHashMap<Integer, CompiledAggregateFunction> aggMap) {
super(false, false, whereClause, iterators, projAttrs, null, null, hints, groupByClause);
this.aggregateFunctions = new CompiledAggregateFunction[aggMap != null ? aggMap.size() : 0];
this.aggregateColsPos = new BitSet(this.projAttrs.size());
if (aggMap != null) {
int i = 0;
for (Map.Entry<Integer, CompiledAggregateFunction> entry : aggMap.entrySet()) {
this.aggregateColsPos.set(entry.getKey());
this.aggregateFunctions[i++] = entry.getValue();
}
}
this.originalOrderByClause = orderByAttrs;
this.isDistinct = distinct;
this.limit = limit;
}
@Override
public Set computeDependencies(ExecutionContext context)
throws TypeMismatchException, AmbiguousNameException, NameResolutionException {
if (!this.transformationDone) {
replaceAggregateFunctionInProjection();
}
return super.computeDependencies(context);
}
private void replaceAggregateFunctionInProjection() {
// Extract the parameter compiledValues out of aggregate functions &
// modify the projection
// attributes to have that instead. Empty out the groupByList
// Create orderby attribute out of group by
int bitStart = 0;
for (CompiledAggregateFunction aggFunc : this.aggregateFunctions) {
int index = this.aggregateColsPos.nextSetBit(bitStart);
bitStart = index + 1;
CompiledValue param = aggFunc.getParameter();
if (param == null && aggFunc.getFunctionType() == OQLLexerTokenTypes.COUNT) {
// * case of *, substitue a dummy parameter of compiled literal = 0 to
// satisfy the code
param = new CompiledLiteral(0);
} else if (param == null) {
throw new QueryInvalidException("aggregate function passed invalid parameter");
}
Object[] projAtt = (Object[]) this.projAttrs.get(index);
projAtt[1] = param;
}
}
private void revertAggregateFunctionInProjection() {
// Extract the parameter compiledValues out of aggregate functions &
// modify the projection
// attributes to have that instead. Empty out the groupByList
// Create orderby attribute out of group by
int bitStart = 0;
for (CompiledAggregateFunction aggFunc : this.aggregateFunctions) {
int index = this.aggregateColsPos.nextSetBit(bitStart);
bitStart = index + 1;
Object[] projAtt = (Object[]) this.projAttrs.get(index);
projAtt[1] = aggFunc;
}
}
@Override
protected void doTreeTransformation(ExecutionContext context)
throws AmbiguousNameException, TypeMismatchException, NameResolutionException {
if (!this.transformationDone) {
checkAllProjectedFieldsInGroupBy(context);
this.cachedElementTypeForOrderBy = prepareResultType(context);
if (this.groupBy != null && !this.groupBy.isEmpty()) {
this.modifyGroupByToOrderBy(false, context);
}
if (this.originalOrderByClause != null) {
this.mapOriginalOrderByColumns(context);
}
}
this.transformationDone = true;
}
private void mapOriginalOrderByColumns(ExecutionContext context)
throws AmbiguousNameException, TypeMismatchException, NameResolutionException {
this.revertAggregateFunctionInProjection();
Iterator<CompiledSortCriterion> iter = this.originalOrderByClause.iterator();
while (iter.hasNext()) {
CompiledSortCriterion csc = iter.next();
if (!csc.mapExpressionToProjectionField(this.projAttrs, context)) {
throw new QueryInvalidException(
"Query contains atleast one order by field which is not present in projected fields.");
}
}
this.replaceAggregateFunctionInProjection();
}
@Override
public SelectResults evaluate(ExecutionContext context) throws FunctionDomainException,
TypeMismatchException, NameResolutionException, QueryInvocationTargetException {
SelectResults selectResults = super.evaluate(context);
QueryObserverHolder.getInstance().beforeAggregationsAndGroupBy(selectResults);
return this.applyAggregateAndGroupBy(selectResults, context);
}
public SelectResults applyAggregateAndGroupBy(SelectResults baseResults, ExecutionContext context)
throws FunctionDomainException, TypeMismatchException, NameResolutionException,
QueryInvocationTargetException {
ObjectType elementType = baseResults.getCollectionType().getElementType();
boolean isStruct = elementType != null && elementType.isStructType();
boolean isBucketNodes = context.getBucketList() != null;
boolean createOrderedResultSet = isBucketNodes && this.orderByAttrs != null;
boolean[] objectChangedMarker = new boolean[] {false};
int limitValue = evaluateLimitValue(context, limit);
SelectResults newResults =
createResultSet(context, elementType, isStruct, createOrderedResultSet);
Aggregator[] aggregators = new Aggregator[this.aggregateFunctions.length];
refreshAggregators(aggregators, context);
if (this.orderByAttrs != null) {
applyGroupBy(baseResults, context, isStruct, newResults, aggregators, !createOrderedResultSet,
objectChangedMarker, limitValue);
} else {
Iterator iter = baseResults.iterator();
Object current = null;
boolean unterminated = iter.hasNext();
while (iter.hasNext()) {
current = iter.next();
accumulate(isStruct, aggregators, current, objectChangedMarker);
}
if (unterminated) {
this.terminateAndAddToResults(isStruct, newResults, aggregators, current, context,
!createOrderedResultSet, limitValue);
}
}
return newResults;
}
private SelectResults createResultSet(ExecutionContext context, ObjectType elementType,
boolean isStruct, boolean createOrderedResults) {
elementType = createNewElementType(elementType, isStruct);
SelectResults newResults;
// boolean isBucketNodes = context.getBucketList() != null;
boolean isPrQueryNode = context.getIsPRQueryNode();
// If it is bucket nodes query, we need to return ordered data
if (isStruct) {
if (createOrderedResults) {
newResults = new SortedResultsBag((StructTypeImpl) elementType, true);
} else {
if (this.originalOrderByClause != null) {
Comparator comparator =
new OrderByComparator(this.originalOrderByClause, elementType, context);
newResults = new SortedStructBag(comparator, (StructType) elementType,
!this.originalOrderByClause.get(0).getCriterion());
} else {
newResults =
QueryUtils.createStructCollection(this.isDistinct, (StructType) elementType, context);
}
}
} else {
if (createOrderedResults) {
newResults = new SortedResultsBag(elementType, true);
} else {
if (this.originalOrderByClause != null) {
Comparator comparator =
new OrderByComparator(this.originalOrderByClause, elementType, context);
newResults = new SortedResultsBag(comparator, elementType,
!this.originalOrderByClause.get(0).getCriterion());
} else {
newResults = QueryUtils.createResultCollection(this.isDistinct, elementType, context);
}
}
}
return newResults;
}
private ObjectType createNewElementType(ObjectType elementType, boolean isStruct) {
if (isStruct) {
StructType oldType = (StructType) elementType;
if (this.aggregateFunctions.length > 0) {
ObjectType[] oldFieldTypes = oldType.getFieldTypes();
ObjectType[] newFieldTypes = new ObjectType[oldFieldTypes.length];
int i = 0;
int aggFuncIndex = 0;
for (ObjectType oldFieldType : oldFieldTypes) {
if (this.aggregateColsPos.get(i)) {
newFieldTypes[i] = this.aggregateFunctions[aggFuncIndex++].getObjectType();
} else {
newFieldTypes[i] = oldFieldType;
}
++i;
}
return new StructTypeImpl(oldType.getFieldNames(), newFieldTypes);
} else {
return oldType;
}
} else {
return this.aggregateFunctions.length > 0 ? this.aggregateFunctions[0].getObjectType()
: elementType;
}
}
private void applyGroupBy(SelectResults baseResults, ExecutionContext context, boolean isStruct,
SelectResults newResults, Aggregator[] aggregators, boolean isStructFields,
boolean[] objectChangedMarker, int limitValue) throws FunctionDomainException,
TypeMismatchException, NameResolutionException, QueryInvocationTargetException {
Iterator iter = baseResults.iterator();
Object[] orderByTupleHolderCurrent = null;
Object[] orderByTupleHolderPrev = null;
Object orderByCurrent = null;
Object orderByPrev = null;
boolean isSingleOrderBy = this.orderByAttrs.size() <= 1;
if (!isSingleOrderBy) {
orderByTupleHolderPrev = new Object[orderByAttrs.size()];
orderByTupleHolderCurrent = new Object[orderByAttrs.size()];
}
boolean isFirst = true;
Object prev = null;
boolean unterminated = false;
boolean keepAdding = true;
while (iter.hasNext() && keepAdding) {
Object current = iter.next();
if (isSingleOrderBy) {
orderByCurrent = this.getOrderByEvaluatedTuple(context, isSingleOrderBy, null,
isStruct ? ((Struct) current).getFieldValues() : current, objectChangedMarker);
} else {
orderByTupleHolderCurrent = (Object[]) this.getOrderByEvaluatedTuple(context,
isSingleOrderBy, orderByTupleHolderCurrent,
isStruct ? ((Struct) current).getFieldValues() : current, objectChangedMarker);
}
if (isFirst || areOrderByTupleEqual(isSingleOrderBy, orderByPrev, orderByCurrent,
orderByTupleHolderPrev, orderByTupleHolderCurrent)) {
accumulate(isStruct, aggregators, current, objectChangedMarker);
unterminated = true;
isFirst = false;
} else {
keepAdding = terminateAndAddToResults(isStruct, newResults, aggregators, prev, context,
isStructFields, limitValue);
this.accumulate(isStruct, aggregators, current, objectChangedMarker);
unterminated = true;
}
// swap the holder arrays
Object[] temp = orderByTupleHolderCurrent;
orderByTupleHolderCurrent = orderByTupleHolderPrev;
orderByTupleHolderPrev = temp;
orderByPrev = orderByCurrent;
prev = current;
}
if (unterminated && keepAdding) {
this.terminateAndAddToResults(isStruct, newResults, aggregators, prev, context,
isStructFields, limitValue);
}
if (this.originalOrderByClause != null && limitValue > 0
&& (context.getIsPRQueryNode() || context.getBucketList() == null)) {
((Bag) newResults).applyLimit(limitValue);
}
}
private boolean terminateAndAddToResults(boolean isStruct, SelectResults newResults,
Aggregator[] aggregators, Object prev, ExecutionContext context, boolean isStrucFields,
int limitValue) throws FunctionDomainException, TypeMismatchException,
NameResolutionException, QueryInvocationTargetException {
Object[] newRowArray = isStruct ? copyStruct((Struct) prev) : null;
Object newObject = null;
int bitstart = 0;
if (limitValue == 0) {
return false;
}
for (Aggregator aggregator : aggregators) {
if (isStruct) {
int pos = this.aggregateColsPos.nextSetBit(bitstart);
bitstart = pos + 1;
Object scalarResult = aggregator.terminate();
newRowArray[pos] = scalarResult;
} else {
newObject = aggregator.terminate();
}
}
if (isStruct) {
if (isStrucFields) {
((StructFields) newResults).addFieldValues(newRowArray);
} else {
newResults
.add(new StructImpl((StructTypeImpl) ((Struct) prev).getStructType(), newRowArray));
}
} else {
newResults.add(newObject);
}
boolean keepAdding = true;
if (this.originalOrderByClause == null && limitValue > 0
&& (context.getIsPRQueryNode() || context.getBucketList() == null)
&& newResults.size() == limitValue) {
keepAdding = false;
}
// rfresh the aggregators
refreshAggregators(aggregators, context);
return keepAdding;
}
private void refreshAggregators(Aggregator[] aggregators, ExecutionContext context)
throws FunctionDomainException, TypeMismatchException, NameResolutionException,
QueryInvocationTargetException {
int i = 0;
for (CompiledAggregateFunction aggFunc : this.aggregateFunctions) {
Aggregator agg = (Aggregator) aggFunc.evaluate(context);
aggregators[i++] = agg;
}
}
private Object[] copyStruct(Struct struct) {
Object[] prevValues = struct.getFieldValues();
Object[] newRow = new Object[prevValues.length];
System.arraycopy(prevValues, 0, newRow, 0, prevValues.length);
return newRow;
}
private void accumulate(boolean isStruct, Aggregator[] aggregators, Object current,
boolean[] objectChangedMarker) {
int bitstart = 0;
for (Aggregator aggregator : aggregators) {
if (isStruct) {
int pos = this.aggregateColsPos.nextSetBit(bitstart);
bitstart = pos + 1;
Struct struct = (Struct) current;
Object scalar = PDXUtils.convertPDX(struct.getFieldValues()[pos], false, true, true, true,
objectChangedMarker, isStruct);
aggregator.accumulate(scalar);
} else {
current =
PDXUtils.convertPDX(current, false, true, true, true, objectChangedMarker, isStruct);
aggregator.accumulate(current);
}
}
}
private boolean areOrderByTupleEqual(boolean isSingleOrderBy, Object prev, Object current,
Object[] prevHolder, Object[] currentHolder) {
if (isSingleOrderBy) {
if (prev == null && current == null) {
return true;
} else if (prev != null) {
return prev.equals(current);
} else {
return current.equals(prev);
}
} else {
return Arrays.equals(prevHolder, currentHolder);
}
}
private Object getOrderByEvaluatedTuple(ExecutionContext context, boolean isOrderByTupleSingle,
Object[] holder, Object data, boolean[] objectChangedMarker) {
if (isOrderByTupleSingle) {
return PDXUtils.convertPDX(this.orderByAttrs.get(0).evaluate(data, context), false, true,
true, true, objectChangedMarker, false);
} else {
int i = 0;
for (CompiledSortCriterion csc : this.orderByAttrs) {
holder[i++] = PDXUtils.convertPDX(csc.evaluate(data, context), false, true, true, true,
objectChangedMarker, false);
}
return holder;
}
}
@Override
public boolean isGroupBy() {
return true;
}
private void checkAllProjectedFieldsInGroupBy(ExecutionContext context)
throws AmbiguousNameException, TypeMismatchException, NameResolutionException {
int index = 0;
for (Object o : this.projAttrs) {
Object[] projElem = (Object[]) o;
// if the projection is aggregate expression skip validating
if (!this.aggregateColsPos.get(index)) {
if (!checkProjectionInGroupBy(projElem, context)) {
throw new QueryInvalidException(
"Query contains projected column not present in group by clause");
}
}
++index;
}
// check if all the group by fields are present in projected columns
if (this.groupBy != null) {
int numGroupCols = this.groupBy.size();
int numColsInProj = this.projAttrs.size();
numColsInProj -= this.aggregateFunctions.length;
if (numGroupCols != numColsInProj) {
throw new QueryInvalidException(
"Query contains group by columns not present in projected fields");
}
}
}
private boolean checkProjectionInGroupBy(Object[] projElem, ExecutionContext context)
throws AmbiguousNameException, TypeMismatchException, NameResolutionException {
boolean found = false;
StringBuilder projAttribBuffer = new StringBuilder();
CompiledValue cvProj = (CompiledValue) TypeUtils.checkCast(projElem[1], CompiledValue.class);
cvProj.generateCanonicalizedExpression(projAttribBuffer, context);
String projAttribStr = projAttribBuffer.toString();
if (this.groupBy != null) {
for (CompiledValue grpBy : this.groupBy) {
if (grpBy.getType() == OQLLexerTokenTypes.Identifier) {
if (projElem[0] != null && projElem[0].equals(((CompiledID) grpBy).getId())) {
found = true;
break;
}
}
// the grpup by expr is not an alias check for path
StringBuilder groupByExprBuffer = new StringBuilder();
grpBy.generateCanonicalizedExpression(groupByExprBuffer, context);
final String grpByExprStr = groupByExprBuffer.toString();
if (projAttribStr.equals(grpByExprStr)) {
found = true;
break;
}
}
}
return found;
}
}