blob: 874ff884f9426b7e9b3fd295dcb4a236b2e7e848 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.usergrid.persistence.cassandra;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Stack;
import java.util.UUID;
import org.apache.usergrid.persistence.EntityManager;
import org.apache.usergrid.persistence.Results;
import org.apache.usergrid.persistence.Schema;
import static org.apache.usergrid.persistence.Schema.getDefaultSchema;
import org.apache.usergrid.persistence.entities.User;
import org.apache.usergrid.persistence.exceptions.PersistenceException;
import org.apache.usergrid.persistence.index.exceptions.IndexException;
import org.apache.usergrid.persistence.index.exceptions.NoFullTextIndexException;
import org.apache.usergrid.persistence.index.exceptions.NoIndexException;
import org.apache.usergrid.persistence.index.query.Identifier;
import org.apache.usergrid.persistence.index.query.Query;
import org.apache.usergrid.persistence.index.query.Query.SortDirection;
import org.apache.usergrid.persistence.index.query.Query.SortPredicate;
import org.apache.usergrid.persistence.index.query.tree.AndOperand;
import org.apache.usergrid.persistence.index.query.tree.ContainsOperand;
import org.apache.usergrid.persistence.index.query.tree.Equal;
import org.apache.usergrid.persistence.index.query.tree.EqualityOperand;
import org.apache.usergrid.persistence.index.query.tree.GreaterThan;
import org.apache.usergrid.persistence.index.query.tree.GreaterThanEqual;
import org.apache.usergrid.persistence.index.query.tree.LessThan;
import org.apache.usergrid.persistence.index.query.tree.LessThanEqual;
import org.apache.usergrid.persistence.index.query.tree.Literal;
import org.apache.usergrid.persistence.index.query.tree.NotOperand;
import org.apache.usergrid.persistence.index.query.tree.Operand;
import org.apache.usergrid.persistence.index.query.tree.OrOperand;
import org.apache.usergrid.persistence.index.query.tree.QueryVisitor;
import org.apache.usergrid.persistence.index.query.tree.StringLiteral;
import org.apache.usergrid.persistence.index.query.tree.WithinOperand;
import org.apache.usergrid.persistence.query.ir.AllNode;
import org.apache.usergrid.persistence.query.ir.AndNode;
import org.apache.usergrid.persistence.query.ir.EmailIdentifierNode;
import org.apache.usergrid.persistence.query.ir.NameIdentifierNode;
import org.apache.usergrid.persistence.query.ir.NotNode;
import org.apache.usergrid.persistence.query.ir.OrNode;
import org.apache.usergrid.persistence.query.ir.OrderByNode;
import org.apache.usergrid.persistence.query.ir.QueryNode;
import org.apache.usergrid.persistence.query.ir.QuerySlice;
import org.apache.usergrid.persistence.query.ir.SearchVisitor;
import org.apache.usergrid.persistence.query.ir.SliceNode;
import org.apache.usergrid.persistence.query.ir.UuidIdentifierNode;
import org.apache.usergrid.persistence.query.ir.WithinNode;
import org.apache.usergrid.persistence.query.ir.result.ResultIterator;
import org.apache.usergrid.persistence.query.ir.result.ResultsLoader;
import org.apache.usergrid.persistence.query.ir.result.ResultsLoaderFactory;
import org.apache.usergrid.persistence.query.ir.result.ScanColumn;
import org.apache.usergrid.persistence.schema.CollectionInfo;
import org.elasticsearch.index.query.FilterBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class QueryProcessorImpl implements QueryProcessor {
/** Upper bound of the page size hint computed in {@code process()}. */
public static final int PAGE_SIZE = 1000;
// NOTE(review): the logger is registered under the QueryProcessor interface name, not this class - confirm intended
private static final Logger logger = LoggerFactory.getLogger( QueryProcessor.class );
/** Default schema; consulted to decide whether a property is indexed or full-text indexed. */
private static final Schema SCHEMA = getDefaultSchema();
/** Collection metadata supplied at construction; when null, checkIndexed() skips the schema index check. */
private final CollectionInfo collectionInfo;
/** Entity manager handed to the results loader factory in getResults(). */
private final EntityManager em;
/** Factory that supplies the loader used to turn scan columns into Results. */
private final ResultsLoaderFactory loaderFactory;
/** Root operand of the query, copied in setQuery(); may be null (process() checks for it). */
private Operand rootOperand;
/** Sort predicates of the query, copied in setQuery(). */
private List<SortPredicate> sorts;
/** Cursor cache built from the query's cursor in setQuery(). */
private CursorCache cursorCache;
/** Root of the IR node tree built by process(). */
private QueryNode rootNode;
/** Entity type of the query, copied in setQuery(). */
private String entityType;
/** The query's limit, copied in setQuery(); getResults() collects at most this many scan columns. */
private int size;
/** The query this processor was built for. */
private Query query;
/** Page size hint returned by getPageSizeHint() for every node except the root node. */
private int pageSizeHint;
/**
 * Creates a processor for the given query and immediately translates it into its IR node tree.
 *
 * @param query the query to process; its sorts, cursor, root operand, entity type and limit are captured
 * @param collectionInfo metadata of the collection being queried
 * @param em entity manager later passed to the results loader factory
 * @param loaderFactory factory for the loader that materializes the results
 * @throws PersistenceException propagated from {@code process()}
 */
public QueryProcessorImpl( Query query, CollectionInfo collectionInfo, EntityManager em,
        ResultsLoaderFactory loaderFactory ) throws PersistenceException {
    // query-derived state has to be in place before the tree is processed
    setQuery( query );

    this.loaderFactory = loaderFactory;
    this.em = em;
    this.collectionInfo = collectionInfo;

    process();
}
/** @return the query most recently supplied through the constructor or {@code setQuery} */
public Query getQuery() {
    return this.query;
}
/**
 * Captures the parts of the query this processor works with: sort predicates, cursor, root operand,
 * entity type and limit, plus the query itself.
 *
 * NOTE(review): this does not re-run {@code process()}, so the IR tree is not rebuilt here.
 *
 * @param query the query to read from; must not be null
 */
public void setQuery( final Query query ) {
    sorts = query.getSortPredicates();
    cursorCache = new CursorCache( query.getCursor() );
    rootOperand = query.getRootOperand();
    entityType = query.getEntityType();
    size = query.getLimit();

    // stored last, once every derived value has been read successfully
    this.query = query;
}
/** @return the collection metadata supplied at construction time */
public CollectionInfo getCollectionInfo() {
    return this.collectionInfo;
}
/**
 * Translates the query into the IR node tree rooted at {@code rootNode} and computes {@code pageSizeHint}.
 *
 * <ol>
 * <li>If the query has a root operand, the operand tree is visited and its result becomes the root.</li>
 * <li>If the query has sorts, an order-by node wrapping the current root becomes the new root.</li>
 * <li>If there is still no root, one is derived from the query's identifier or start result.</li>
 * </ol>
 *
 * @throws PersistenceException if visiting the operand tree or generating the sorts fails
 */
private void process() throws PersistenceException {
    int opCount = 0;

    // a filter was specified: visit the operand tree and build the matching IR nodes
    if ( rootOperand != null ) {
        TreeEvaluator visitor = new TreeEvaluator();
        rootOperand.visit( visitor );
        rootNode = visitor.getRootNode();
        opCount = visitor.getSliceCount();
    }

    // see if we have sorts, if so, we can add them all as a single node at the root
    if ( !sorts.isEmpty() ) {
        OrderByNode order = generateSorts( opCount );
        opCount += order.getFirstPredicate().getAllSlices().size();
        rootNode = order;
    }

    // if we still don't have a root node, no query nor order by was specified,
    // just use the all node or the identifiers
    if ( rootNode == null ) {
        rootNode = createDefaultRootNode();
    }

    pageSizeHint = opCount > 1 ? PAGE_SIZE : Math.min( size, PAGE_SIZE );
}

/** Picks the root node for a query that has neither a filter nor a sort: an identifier node or the all node. */
private QueryNode createDefaultRootNode() throws PersistenceException {
    // a name alias or email alias was specified
    if ( query.containsSingleNameOrEmailIdentifier() ) {
        Identifier ident = query.getSingleIdentifier();

        // an email was specified. An edge case that only applies to users. The constant-first
        // comparison keeps a query without an entity type from failing with a NullPointerException.
        if ( User.ENTITY_TYPE.equals( query.getEntityType() ) && ident.isEmail() ) {
            return new EmailIdentifierNode( ident );
        }

        // use the ident with the default alias. could be an email
        return new NameIdentifierNode( ident.getName() );
    }

    // a uuid was specified
    if ( query.containsSingleUuidIdentifier() ) {
        return new UuidIdentifierNode( query.getSingleUuidIdentifier() );
    }

    // nothing was specified, order it by uuid.
    // this is a bit ugly, but it is how the start parameter is handled
    UUID startResult = query.getStartResult();
    boolean startResultSet = startResult != null;

    AllNode allNode = new AllNode( 0, startResultSet );

    if ( startResultSet ) {
        cursorCache.setNextCursor( allNode.getSlice().hashCode(), Serializers.ue.toByteBuffer( startResult ) );
    }

    return allNode;
}
/** @return the root of the IR node tree built when the query was processed */
public QueryNode getFirstNode() {
    return this.rootNode;
}
/**
 * Applies the sort direction and then the cursor position to the given slice. Call this only at evaluation
 * time, once the IR tree has been fully constructed.
 *
 * @param slice the slice to adjust in place
 */
public void applyCursorAndSort( QuerySlice slice ) {
    // the sort goes first: reversing the slice can change its hash code, which keys the cursor lookup
    final SortPredicate matchingSort = getSort( slice.getPropertyName() );

    if ( matchingSort != null ) {
        final boolean wantsDescending = matchingSort.getDirection() == SortDirection.DESCENDING;

        // flip the slice (and with it its params) only when its direction disagrees with the sort
        if ( wantsDescending != slice.isReversed() ) {
            slice.reverse();
        }
    }

    final ByteBuffer storedCursor = cursorCache.getCursorBytes( slice.hashCode() );

    if ( storedCursor == null ) {
        return;
    }

    slice.setCursor( storedCursor );
}
/**
 * Looks up the cursor bytes held in the cursor cache for a node.
 *
 * @param nodeId the id the cursor was stored under
 * @return whatever the cursor cache returns for {@code nodeId}
 */
public ByteBuffer getCursorCache( int nodeId ) {
    final ByteBuffer cursorBytes = cursorCache.getCursorBytes( nodeId );
    return cursorBytes;
}
/**
 * Finds the first sort predicate declared for the given property.
 *
 * @param propertyName the property to look for
 * @return the first matching predicate, or null when the property is not sorted on
 */
private SortPredicate getSort( String propertyName ) {
    SortPredicate match = null;
    final Iterator<SortPredicate> candidates = sorts.iterator();

    while ( match == null && candidates.hasNext() ) {
        final SortPredicate candidate = candidates.next();

        if ( candidate.getPropertyName().equals( propertyName ) ) {
            match = candidate;
        }
    }

    return match;
}
/**
 * Evaluates the IR tree with the given visitor, collects up to {@code size} scan columns from the resulting
 * iterator and loads them through the results loader.
 *
 * @param visitor the visitor used to evaluate the tree; it is also attached to the returned results
 * @return the loaded results with cursor, query, processor and visitor set; null when there is no root node
 *         or the loader returns null
 * @throws Exception propagated from tree evaluation or result loading
 */
public Results getResults( SearchVisitor visitor ) throws Exception {
    // without a root node there is nothing to evaluate
    if ( rootNode == null ) {
        return null;
    }

    rootNode.visit( visitor );
    final ResultIterator pages = visitor.getResults();

    List<ScanColumn> columns = new ArrayList<ScanColumn>( Math.min( size, Query.MAX_LIMIT ) );
    final CursorCache nextCursor = new CursorCache();

    // keep pulling pages until the limit is reached or the iterator runs dry
    while ( columns.size() < size && pages.hasNext() ) {
        columns.addAll( pages.next() );
    }

    if ( !columns.isEmpty() ) {
        // the last page may have pushed us past the limit, so trim back to it
        final int kept = Math.min( columns.size(), size );
        columns = columns.subList( 0, kept );

        // a cursor is only produced when the limit was filled; it is anchored on the last returned column
        if ( kept == size ) {
            pages.finalizeCursor( nextCursor, columns.get( kept - 1 ).getUUID() );
        }
    }

    if ( logger.isDebugEnabled() ) {
        logger.debug( "Getting result for query: [{}], returning entityIds size: {}",
                getQuery(), columns.size() );
    }

    final ResultsLoader loader = loaderFactory.getResultsLoader( em, query, query.getResultsLevel() );
    final Results loaded = loader.getResults( columns, query.getEntityType() );

    if ( loaded != null ) {
        // hand back the cursor from this evaluation together with everything needed to continue
        loaded.setCursor( nextCursor.asString() );
        loaded.setQuery( query );
        loaded.setQueryProcessor( this );
        loaded.setSearchVisitor( visitor );
    }

    return loaded;
}
private class TreeEvaluator implements QueryVisitor {
// stack of the IR nodes built so far while the operand tree is visited;
// getRootNode() returns whatever is on top of it
private CountingStack<QueryNode> nodes = new CountingStack<QueryNode>();
// counter that is pre-incremented and passed to every node this evaluator constructs
// (presumably the node id - confirm against the node constructors); the first node receives 0
private int contextCount = -1;
/**
 * Returns the node currently on top of the stack, which is the root of the tree for runtime evaluation
 * once the operand tree has been fully visited.
 */
public QueryNode getRootNode() {
    final QueryNode top = nodes.peek();
    return top;
}
/**
 * Visits both children of an AND. When they leave two distinct nodes on the stack, those are replaced by a
 * single {@link AndNode}; when both were folded into the same node, the stack is left as it is.
 */
@Override
public void visit( AndOperand op ) throws IndexException {
    op.getLeft().visit( this );
    final QueryNode afterLeft = nodes.peek();

    op.getRight().visit( this );
    final QueryNode afterRight = nodes.peek();

    // same node on top after both visits: the children share one slice node, nothing to combine
    if ( afterLeft != afterRight ) {
        final QueryNode rightNode = nodes.pop();
        final QueryNode leftNode = nodes.pop();

        nodes.push( new AndNode( leftNode, rightNode ) );
    }
}
/**
 * Visits both children of an OR, giving each its own slice node where {@code createNewSlice} requires one,
 * and replaces the two resulting nodes by a single {@link OrNode}.
 */
@Override
public void visit( OrOperand op ) throws IndexException {
    final Operand leftChild = op.getLeft();
    final Operand rightChild = op.getRight();

    // each side gets a fresh slice node first, if its operand type needs one
    createNewSlice( leftChild );
    leftChild.visit( this );

    createNewSlice( rightChild );
    rightChild.visit( this );

    // the right side was pushed last, so it comes off the stack first
    final QueryNode rightNode = nodes.pop();
    final QueryNode leftNode = nodes.pop();

    contextCount++;
    nodes.push( new OrNode( leftNode, rightNode, contextCount ) );
}
/**
 * Visits the child of a NOT in its own slice context and replaces the resulting node by a {@link NotNode}
 * that pairs it with a fresh {@link AllNode}.
 */
@Override
public void visit( NotOperand op ) throws IndexException {
    final Operand negated = op.getOperation();

    // any child of NOT has to be evaluated independently, so it starts a new slice where required
    createNewSlice( negated );
    negated.visit( this );

    final QueryNode childNode = nodes.pop();
    final AllNode everything = new AllNode( ++contextCount, false );

    nodes.push( new NotNode( childNode, everything ) );
}
/**
 * Adds the range of a full-text "contains" to a slice node, bounded by the string literal's value and end
 * value (both inclusive).
 *
 * @throws NoFullTextIndexException if the property is not full-text indexed for the entity type
 */
@Override
public void visit( ContainsOperand op ) throws NoFullTextIndexException {
    final String propertyName = op.getProperty().getValue();

    if ( !SCHEMA.isPropertyFulltextIndexed( entityType, propertyName ) ) {
        throw new NoFullTextIndexException( entityType, propertyName );
    }

    final StringLiteral text = op.getString();
    final String indexName = op.getProperty().getIndexedValue();

    // if the slice node on top already has a slice for this field, a new slice node is required;
    // otherwise the current one can be shared
    final boolean fieldAlreadySliced = !nodes.isEmpty() && nodes.peek() instanceof SliceNode
            && ( ( SliceNode ) nodes.peek() ).getSlice( indexName ) != null;

    final SliceNode target = fieldAlreadySliced ? newSliceNode() : getUnionNode( op );

    target.setStart( indexName, text.getValue(), true );
    target.setFinish( indexName, text.getEndValue(), true );
}
/**
 * Pushes a {@link WithinNode} built from the operand's indexed property name, distance, latitude and
 * longitude.
 */
@Override
public void visit( WithinOperand op ) {
    final WithinNode within = new WithinNode( op.getProperty().getIndexedName(),
            op.getDistance().getFloatValue(),
            op.getLatitude().getFloatValue(),
            op.getLongitude().getFloatValue(),
            ++contextCount );

    nodes.push( within );
}
/**
 * Sets an exclusive upper bound for the property on the current slice node.
 *
 * @throws NoIndexException if the property fails the {@code checkIndexed} test
 */
@Override
public void visit( LessThan op ) throws NoIndexException {
    final String property = op.getProperty().getValue();
    checkIndexed( property );

    final SliceNode target = getUnionNode( op );
    target.setFinish( property, op.getLiteral().getValue(), false );
}
/**
 * Sets an inclusive upper bound for the property on the current slice node.
 *
 * @throws NoIndexException if the property fails the {@code checkIndexed} test
 */
@Override
public void visit( LessThanEqual op ) throws NoIndexException {
    final String property = op.getProperty().getValue();
    checkIndexed( property );

    final SliceNode target = getUnionNode( op );
    target.setFinish( property, op.getLiteral().getValue(), true );
}
/**
 * Sets an inclusive range for an equality on the current slice node. The start is always the literal's
 * value. The finish is the literal's value too, except for string literals, where the literal's end value
 * is used and no finish is set at all when that end value is null.
 *
 * @throws NoIndexException if the property fails the {@code checkIndexed} test
 */
@Override
public void visit( Equal op ) throws NoIndexException {
    final String property = op.getProperty().getValue();
    checkIndexed( property );

    final Literal<?> literal = op.getLiteral();
    final SliceNode target = getUnionNode( op );

    // strings are the one edge case; if more appear this should be pushed down into the literals
    if ( !( literal instanceof StringLiteral ) ) {
        target.setFinish( property, literal.getValue(), true );
    }
    else {
        final String upperBound = ( ( StringLiteral ) literal ).getEndValue();

        if ( upperBound != null ) {
            target.setFinish( property, upperBound, true );
        }
    }

    target.setStart( property, literal.getValue(), true );
}
/**
 * Sets an exclusive lower bound for the property on the current slice node.
 *
 * @throws NoIndexException if the property fails the {@code checkIndexed} test
 */
@Override
public void visit( GreaterThan op ) throws NoIndexException {
    final String property = op.getProperty().getValue();
    checkIndexed( property );

    final SliceNode target = getUnionNode( op );
    target.setStart( property, op.getLiteral().getValue(), false );
}
/**
 * Sets an inclusive lower bound for the property on the current slice node.
 *
 * @throws NoIndexException if the property fails the {@code checkIndexed} test
 */
@Override
public void visit( GreaterThanEqual op ) throws NoIndexException {
    final String property = op.getProperty().getValue();
    checkIndexed( property );

    final SliceNode target = getUnionNode( op );
    target.setStart( property, op.getLiteral().getValue(), true );
}
/**
 * Returns the slice node that ranges should be added to. When the top of the stack is already a slice node
 * it is reused, which lets several AND-ed conditions and ranges collapse into a single node. When the stack
 * is empty or its top is any other kind of node, a new slice node is created and pushed.
 *
 * @param current the operand being visited (not inspected here)
 */
private SliceNode getUnionNode( EqualityOperand current ) {
    final boolean topIsSlice = nodes.size() != 0 && nodes.peek() instanceof SliceNode;

    return topIsSlice ? ( SliceNode ) nodes.peek() : newSliceNode();
}
/** Creates a slice node with the next context count, pushes it onto the stack and returns it. */
private SliceNode newSliceNode() {
    contextCount++;

    final SliceNode created = new SliceNode( contextCount );
    nodes.push( created );

    return created;
}
/**
 * Pushes a fresh slice node when the child is an equality, AND or contains operand; does nothing for any
 * other kind of operand.
 */
private void createNewSlice( Operand child ) {
    final boolean needsOwnSlice = child instanceof EqualityOperand
            || child instanceof AndOperand
            || child instanceof ContainsOperand;

    if ( !needsOwnSlice ) {
        return;
    }

    newSliceNode();
}
/** @return the slice count reported by the node stack */
public int getSliceCount() {
    final int sliceCount = nodes.getSliceCount();
    return sliceCount;
}
/** Not available on this visitor; always throws {@link UnsupportedOperationException}. */
@Override
public QueryBuilder getQueryBuilder() {
    final String message = "Not supported by this vistor implementation.";
    throw new UnsupportedOperationException( message );
}
/** Not available on this visitor; always throws {@link UnsupportedOperationException}. */
@Override
public FilterBuilder getFilterBuilder() {
    final String message = "Not supported by this vistor implementation.";
    throw new UnsupportedOperationException( message );
}
}
/**
 * A stack that keeps track of how many slices have passed through it: the slices of every
 * {@link SliceNode} that is popped are added to a running total.
 */
private static class CountingStack<T> extends Stack<T> {

    private static final long serialVersionUID = 1L;

    /** Slices of all slice nodes popped so far. */
    private int count = 0;


    /**
     * Pops the top entry and, when it is a slice node, adds its slices to the running total.
     *
     * @see java.util.Stack#pop()
     */
    @Override
    public synchronized T pop() {
        T entry = super.pop();

        if ( entry instanceof SliceNode ) {
            count += ( ( SliceNode ) entry ).getAllSlices().size();
        }

        return entry;
    }


    /**
     * Returns the slices of all slice nodes popped so far plus those of the slice nodes still on the stack.
     * The total is computed in a local variable so that the stored count is not modified and repeated calls
     * return the same value instead of counting the remaining entries again.
     */
    public int getSliceCount() {
        int total = count;

        for ( T entry : this ) {
            if ( entry instanceof SliceNode ) {
                total += ( ( SliceNode ) entry ).getAllSlices().size();
            }
        }

        return total;
    }
}
/**
 * Returns the page size a node should use.
 *
 * @param node the node asking for its page size
 * @return the query limit for the root node, the computed page size hint for every other node
 */
public int getPageSizeHint( QueryNode node ) {
    /*****
     * DO NOT REMOVE THIS PIECE OF CODE!!!!!!!!!!!
     * It is crucial that the root iterator only needs the result set size per page
     * otherwise our cursor logic will fail when passing cursor data to the leaf nodes
     *******/
    final boolean isRoot = node == rootNode;

    return isRoot ? size : pageSizeHint;
}
/**
 * Builds the order-by node for the query's sorts. The first sort gets a slice node with an open, inclusive
 * range over its property; the remaining sorts are only checked for an index and passed on as secondary
 * predicates. The current root node is handed to the order-by node as well.
 *
 * @param opCount value passed to the new slice node's constructor
 * @throws NoIndexException if any sort property fails the {@code checkIndexed} test
 */
private OrderByNode generateSorts( int opCount ) throws NoIndexException {
    // the value is irrelevant since there will only ever be one slice node when this is called
    final SliceNode primarySlice = new SliceNode( opCount );

    final String primaryProperty = sorts.get( 0 ).getPropertyName();
    checkIndexed( primaryProperty );

    primarySlice.setStart( primaryProperty, null, true );
    primarySlice.setFinish( primaryProperty, null, true );

    final List<SortPredicate> secondarySorts = sorts.subList( 1, sorts.size() );

    for ( SortPredicate secondary : secondarySorts ) {
        checkIndexed( secondary.getPropertyName() );
    }

    return new OrderByNode( primarySlice, secondarySorts, rootNode );
}
/**
 * Verifies that a property can be used in a slice.
 *
 * @param propertyName the property to check
 * @throws NoIndexException if the name is null or empty, or if the schema reports the property as not
 *         indexed for the entity type while collection info is present
 */
private void checkIndexed( String propertyName ) throws NoIndexException {
    final boolean missingName = propertyName == null || propertyName.isEmpty();

    if ( missingName ) {
        throw new NoIndexException( entityType, propertyName );
    }

    // the schema verdict only counts when collection info is available
    final boolean rejectedBySchema = !SCHEMA.isPropertyIndexed( entityType, propertyName )
            && collectionInfo != null;

    if ( rejectedBySchema ) {
        throw new NoIndexException( entityType, propertyName );
    }
}
/** @return the entity manager supplied at construction time */
public EntityManager getEntityManager() {
    return this.em;
}
}