blob: 97a6a420dea6279c2849615c83b931d333275fdc [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.core.plan;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.request.context.FilterContext;
import org.apache.pinot.common.request.context.FunctionContext;
import org.apache.pinot.common.request.context.predicate.JsonMatchPredicate;
import org.apache.pinot.common.request.context.predicate.Predicate;
import org.apache.pinot.common.request.context.predicate.RegexpLikePredicate;
import org.apache.pinot.common.request.context.predicate.TextContainsPredicate;
import org.apache.pinot.common.request.context.predicate.TextMatchPredicate;
import org.apache.pinot.core.geospatial.transform.function.StDistanceFunction;
import org.apache.pinot.core.operator.filter.BaseFilterOperator;
import org.apache.pinot.core.operator.filter.BitmapBasedFilterOperator;
import org.apache.pinot.core.operator.filter.EmptyFilterOperator;
import org.apache.pinot.core.operator.filter.ExpressionFilterOperator;
import org.apache.pinot.core.operator.filter.FilterOperatorUtils;
import org.apache.pinot.core.operator.filter.H3InclusionIndexFilterOperator;
import org.apache.pinot.core.operator.filter.H3IndexFilterOperator;
import org.apache.pinot.core.operator.filter.JsonMatchFilterOperator;
import org.apache.pinot.core.operator.filter.MatchAllFilterOperator;
import org.apache.pinot.core.operator.filter.TextContainsFilterOperator;
import org.apache.pinot.core.operator.filter.TextMatchFilterOperator;
import org.apache.pinot.core.operator.filter.predicate.FSTBasedRegexpPredicateEvaluatorFactory;
import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluatorProvider;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.segment.local.realtime.impl.invertedindex.NativeMutableTextIndex;
import org.apache.pinot.segment.local.segment.index.readers.text.NativeTextIndexReader;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.datasource.DataSource;
import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
import org.apache.pinot.segment.spi.index.reader.JsonIndexReader;
import org.apache.pinot.segment.spi.index.reader.NullValueVectorReader;
import org.apache.pinot.segment.spi.index.reader.TextIndexReader;
import org.apache.pinot.spi.exception.BadQueryRequestException;
import org.roaringbitmap.buffer.MutableRoaringBitmap;
public class FilterPlanNode implements PlanNode {
private final IndexSegment _indexSegment;
private final QueryContext _queryContext;
private final FilterContext _filter;
// Cache the predicate evaluators
private final List<Pair<Predicate, PredicateEvaluator>> _predicateEvaluators = new ArrayList<>(4);
public FilterPlanNode(IndexSegment indexSegment, QueryContext queryContext) {
this(indexSegment, queryContext, null);
}
public FilterPlanNode(IndexSegment indexSegment, QueryContext queryContext, @Nullable FilterContext filter) {
_indexSegment = indexSegment;
_queryContext = queryContext;
_filter = filter;
}
@Override
public BaseFilterOperator run() {
// NOTE: Snapshot the validDocIds before reading the numDocs to prevent the latest updates getting lost
ThreadSafeMutableRoaringBitmap validDocIds = _indexSegment.getValidDocIds();
MutableRoaringBitmap validDocIdsSnapshot =
validDocIds != null && !_queryContext.isSkipUpsert() ? validDocIds.getMutableRoaringBitmap() : null;
int numDocs = _indexSegment.getSegmentMetadata().getTotalDocs();
FilterContext filter = _filter != null ? _filter : _queryContext.getFilter();
if (filter != null) {
BaseFilterOperator filterOperator = constructPhysicalOperator(filter, numDocs);
if (validDocIdsSnapshot != null) {
BaseFilterOperator validDocFilter = new BitmapBasedFilterOperator(validDocIdsSnapshot, false, numDocs);
return FilterOperatorUtils.getAndFilterOperator(_queryContext, Arrays.asList(filterOperator, validDocFilter),
numDocs);
} else {
return filterOperator;
}
} else if (validDocIdsSnapshot != null) {
return new BitmapBasedFilterOperator(validDocIdsSnapshot, false, numDocs);
} else {
return new MatchAllFilterOperator(numDocs);
}
}
/**
* Returns a mapping from predicates to their evaluators.
*/
public List<Pair<Predicate, PredicateEvaluator>> getPredicateEvaluators() {
return _predicateEvaluators;
}
/**
* H3 index can be applied on ST_Distance iff:
* <ul>
* <li>Predicate is of type RANGE</li>
* <li>Left-hand-side of the predicate is an ST_Distance function</li>
* <li>One argument of the ST_Distance function is an identifier, the other argument is an literal</li>
* <li>The identifier column has H3 index</li>
* </ul>
*/
private boolean canApplyH3IndexForDistanceCheck(Predicate predicate, FunctionContext function) {
if (predicate.getType() != Predicate.Type.RANGE) {
return false;
}
String functionName = function.getFunctionName();
if (!functionName.equals("st_distance") && !functionName.equals("stdistance")) {
return false;
}
List<ExpressionContext> arguments = function.getArguments();
if (arguments.size() != 2) {
throw new BadQueryRequestException("Expect 2 arguments for function: " + StDistanceFunction.FUNCTION_NAME);
}
// TODO: handle nested geography/geometry conversion functions
String columnName = null;
boolean findLiteral = false;
for (ExpressionContext argument : arguments) {
if (argument.getType() == ExpressionContext.Type.IDENTIFIER) {
columnName = argument.getIdentifier();
} else if (argument.getType() == ExpressionContext.Type.LITERAL) {
findLiteral = true;
}
}
return columnName != null && _indexSegment.getDataSource(columnName).getH3Index() != null && findLiteral;
}
/**
* H3 index can be applied for inclusion check iff:
* <ul>
* <li>Predicate is of type EQ</li>
* <li>Left-hand-side of the predicate is an ST_Within or ST_Contains function</li>
* <li>For ST_Within, the first argument is an identifier, the second argument is literal</li>
* <li>For ST_Contains function the first argument is literal, the second argument is an identifier</li>
* <li>The identifier column has H3 index</li>
* </ul>
*/
private boolean canApplyH3IndexForInclusionCheck(Predicate predicate, FunctionContext function) {
if (predicate.getType() != Predicate.Type.EQ) {
return false;
}
String functionName = function.getFunctionName();
if (!functionName.equals("stwithin") && !functionName.equals("stcontains")) {
return false;
}
List<ExpressionContext> arguments = function.getArguments();
if (arguments.size() != 2) {
throw new BadQueryRequestException("Expect 2 arguments for function: " + functionName);
}
// TODO: handle nested geography/geometry conversion functions
if (functionName.equals("stwithin")) {
if (arguments.get(0).getType() == ExpressionContext.Type.IDENTIFIER
&& arguments.get(1).getType() == ExpressionContext.Type.LITERAL) {
String columnName = arguments.get(0).getIdentifier();
return _indexSegment.getDataSource(columnName).getH3Index() != null;
}
return false;
} else {
if (arguments.get(1).getType() == ExpressionContext.Type.IDENTIFIER
&& arguments.get(0).getType() == ExpressionContext.Type.LITERAL) {
String columnName = arguments.get(1).getIdentifier();
return _indexSegment.getDataSource(columnName).getH3Index() != null;
}
return false;
}
}
/**
* Helper method to build the operator tree from the filter.
*/
private BaseFilterOperator constructPhysicalOperator(FilterContext filter, int numDocs) {
switch (filter.getType()) {
case AND:
List<FilterContext> childFilters = filter.getChildren();
List<BaseFilterOperator> childFilterOperators = new ArrayList<>(childFilters.size());
for (FilterContext childFilter : childFilters) {
BaseFilterOperator childFilterOperator = constructPhysicalOperator(childFilter, numDocs);
if (childFilterOperator.isResultEmpty()) {
// Return empty filter operator if any of the child filter operator's result is empty
return EmptyFilterOperator.getInstance();
} else if (!childFilterOperator.isResultMatchingAll()) {
// Remove child filter operators that match all records
childFilterOperators.add(childFilterOperator);
}
}
return FilterOperatorUtils.getAndFilterOperator(_queryContext, childFilterOperators, numDocs);
case OR:
childFilters = filter.getChildren();
childFilterOperators = new ArrayList<>(childFilters.size());
for (FilterContext childFilter : childFilters) {
BaseFilterOperator childFilterOperator = constructPhysicalOperator(childFilter, numDocs);
if (childFilterOperator.isResultMatchingAll()) {
// Return match all filter operator if any of the child filter operator matches all records
return new MatchAllFilterOperator(numDocs);
} else if (!childFilterOperator.isResultEmpty()) {
// Remove child filter operators whose result is empty
childFilterOperators.add(childFilterOperator);
}
}
return FilterOperatorUtils.getOrFilterOperator(_queryContext, childFilterOperators, numDocs);
case NOT:
childFilters = filter.getChildren();
assert childFilters.size() == 1;
BaseFilterOperator childFilterOperator = constructPhysicalOperator(childFilters.get(0), numDocs);
return FilterOperatorUtils.getNotFilterOperator(_queryContext, childFilterOperator, numDocs);
case PREDICATE:
Predicate predicate = filter.getPredicate();
ExpressionContext lhs = predicate.getLhs();
if (lhs.getType() == ExpressionContext.Type.FUNCTION) {
if (canApplyH3IndexForDistanceCheck(predicate, lhs.getFunction())) {
return new H3IndexFilterOperator(_indexSegment, predicate, numDocs);
} else if (canApplyH3IndexForInclusionCheck(predicate, lhs.getFunction())) {
return new H3InclusionIndexFilterOperator(_indexSegment, predicate, _queryContext, numDocs);
} else {
// TODO: ExpressionFilterOperator does not support predicate types without PredicateEvaluator (IS_NULL,
// IS_NOT_NULL, TEXT_MATCH)
return new ExpressionFilterOperator(_indexSegment, predicate, numDocs);
}
} else {
String column = lhs.getIdentifier();
DataSource dataSource = _indexSegment.getDataSource(column);
PredicateEvaluator predicateEvaluator;
switch (predicate.getType()) {
case TEXT_CONTAINS:
TextIndexReader textIndexReader = dataSource.getTextIndex();
if (!(textIndexReader instanceof NativeTextIndexReader)
&& !(textIndexReader instanceof NativeMutableTextIndex)) {
throw new UnsupportedOperationException("TEXT_CONTAINS is supported only on native text index");
}
return new TextContainsFilterOperator(textIndexReader, (TextContainsPredicate) predicate, numDocs);
case TEXT_MATCH:
textIndexReader = dataSource.getTextIndex();
Preconditions
.checkState(textIndexReader != null, "Cannot apply TEXT_MATCH on column: %s without text index",
column);
// We could check for real time and segment Lucene reader, but easier to check the other way round
if (textIndexReader instanceof NativeTextIndexReader
|| textIndexReader instanceof NativeMutableTextIndex) {
throw new UnsupportedOperationException("TEXT_MATCH is not supported on native text index");
}
return new TextMatchFilterOperator(textIndexReader, (TextMatchPredicate) predicate, numDocs);
case REGEXP_LIKE:
// FST Index is available only for rolled out segments. So, we use different evaluator for rolled out and
// consuming segments.
//
// Rolled out segments (immutable): FST Index reader is available use FSTBasedEvaluator
// else use regular flow of getting predicate evaluator.
//
// Consuming segments: When FST is enabled, use AutomatonBasedEvaluator so that regexp matching logic is
// similar to that of FSTBasedEvaluator, else use regular flow of getting predicate evaluator.
if (dataSource.getFSTIndex() != null) {
predicateEvaluator =
FSTBasedRegexpPredicateEvaluatorFactory.newFSTBasedEvaluator((RegexpLikePredicate) predicate,
dataSource.getFSTIndex(), dataSource.getDictionary());
} else {
predicateEvaluator =
PredicateEvaluatorProvider.getPredicateEvaluator(predicate, dataSource.getDictionary(),
dataSource.getDataSourceMetadata().getDataType());
}
_predicateEvaluators.add(Pair.of(predicate, predicateEvaluator));
return FilterOperatorUtils.getLeafFilterOperator(predicateEvaluator, dataSource, numDocs,
_queryContext.isNullHandlingEnabled());
case JSON_MATCH:
JsonIndexReader jsonIndex = dataSource.getJsonIndex();
Preconditions.checkState(jsonIndex != null, "Cannot apply JSON_MATCH on column: %s without json index",
column);
return new JsonMatchFilterOperator(jsonIndex, (JsonMatchPredicate) predicate, numDocs);
case IS_NULL:
NullValueVectorReader nullValueVector = dataSource.getNullValueVector();
if (nullValueVector != null) {
return new BitmapBasedFilterOperator(nullValueVector.getNullBitmap(), false, numDocs);
} else {
return EmptyFilterOperator.getInstance();
}
case IS_NOT_NULL:
nullValueVector = dataSource.getNullValueVector();
if (nullValueVector != null) {
return new BitmapBasedFilterOperator(nullValueVector.getNullBitmap(), true, numDocs);
} else {
return new MatchAllFilterOperator(numDocs);
}
default:
predicateEvaluator =
PredicateEvaluatorProvider.getPredicateEvaluator(predicate, dataSource.getDictionary(),
dataSource.getDataSourceMetadata().getDataType());
_predicateEvaluators.add(Pair.of(predicate, predicateEvaluator));
return FilterOperatorUtils.getLeafFilterOperator(predicateEvaluator, dataSource, numDocs,
_queryContext.isNullHandlingEnabled());
}
}
default:
throw new IllegalStateException();
}
}
}