blob: ee4aea0a3d19a615599fbb919b8953c98d14c27c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.usergrid.mq.cassandra.io;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.usergrid.mq.Message;
import org.apache.usergrid.mq.QueryProcessor;
import org.apache.usergrid.mq.QueryProcessor.QuerySlice;
import org.apache.usergrid.mq.QueueQuery;
import org.apache.usergrid.mq.QueueResults;
import com.fasterxml.uuid.UUIDComparator;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.beans.AbstractComposite.ComponentEquality;
import me.prettyprint.hector.api.beans.DynamicComposite;
import me.prettyprint.hector.api.beans.HColumn;
import static me.prettyprint.hector.api.factory.HFactory.createSliceQuery;
import static org.apache.usergrid.mq.Queue.getQueueId;
import static org.apache.usergrid.mq.cassandra.CassandraMQUtils.getConsumerId;
import static org.apache.usergrid.mq.cassandra.QueueManagerImpl.DEFAULT_SEARCH_COUNT;
import static org.apache.usergrid.mq.cassandra.QueueManagerImpl.QUEUE_SHARD_INTERVAL;
import static org.apache.usergrid.mq.cassandra.QueuesCF.PROPERTY_INDEX;
import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.key;
import static org.apache.usergrid.utils.CompositeUtils.setEqualityFlag;
import static org.apache.usergrid.utils.ConversionUtils.bytebuffer;
import static org.apache.usergrid.utils.NumberUtils.roundLong;
import static org.apache.usergrid.utils.UUIDUtils.getTimestampInMillis;
/**
* Searches in the queue without transactions
*
* @author tnine
*/
public class FilterSearch extends NoTransactionSearch
{
private static final Logger logger = LoggerFactory.getLogger( FilterSearch.class );
/**
*
*/
public FilterSearch( Keyspace ko )
{
super( ko );
}
/*
* (non-Javadoc)
*
* @see org.apache.usergrid.mq.cassandra.io.QueueSearch#getResults(java.lang.String,
* org.apache.usergrid.mq.QueueQuery)
*/
@Override
public QueueResults getResults( String queuePath, QueueQuery query )
{
QueryProcessor qp = new QueryProcessor( query );
List<QuerySlice> slices = qp.getSlices();
long limit = query.getLimit();
UUID queueId = getQueueId( queuePath );
UUID consumerId = getConsumerId( queueId, query );
QueueBounds bounds = getQueueBounds( queueId );
UUIDComparator comparator = query.isReversed() ? new ReverseUUIDComparator() : new UUIDComparator();
SortedSet<UUID> merged = null;
for ( QuerySlice slice : slices )
{
SortedSet<UUID> results =
searchQueueRange( ko, queueId, bounds, slice, query.getLastMessageId(), query.isReversed(),
comparator );
if ( merged == null )
{
merged = results;
}
else
{
merged.retainAll( results );
}
}
// now trim. Not efficient, but when indexing is updated, seeking will
// change, so I'm not worried about this.
if ( merged.size() > limit )
{
Iterator<UUID> current = merged.iterator();
UUID max = null;
for ( int i = 0; i <= limit && current.hasNext(); i++ )
{
max = current.next();
}
merged = merged.headSet( max );
}
List<Message> messages = loadMessages( merged, query.isReversed() );
QueueResults results = createResults( messages, queuePath, queueId, consumerId );
return results;
}
public SortedSet<UUID> searchQueueRange( Keyspace ko, UUID queueId, QueueBounds bounds, QuerySlice slice, UUID last,
boolean reversed, UUIDComparator comparator )
{
TreeSet<UUID> uuid_set = new TreeSet<UUID>( comparator );
if ( bounds == null )
{
logger.error( "Necessary queue bounds not found" );
return uuid_set;
}
UUID start_uuid = reversed ? bounds.getNewest() : bounds.getOldest();
UUID finish_uuid = reversed ? bounds.getOldest() : bounds.getNewest();
if ( last != null )
{
start_uuid = last;
}
if ( finish_uuid == null )
{
logger.error( "No last message in queue" );
return uuid_set;
}
long start_ts_shard = roundLong( getTimestampInMillis( start_uuid ), QUEUE_SHARD_INTERVAL );
long finish_ts_shard = roundLong( getTimestampInMillis( finish_uuid ), QUEUE_SHARD_INTERVAL );
long current_ts_shard = start_ts_shard;
if ( reversed )
{
current_ts_shard = finish_ts_shard;
}
ByteBuffer start = null;
if ( slice.getCursor() != null )
{
start = slice.getCursor();
}
else if ( slice.getStart() != null )
{
DynamicComposite s = new DynamicComposite( slice.getStart().getCode(), slice.getStart().getValue() );
if ( !slice.getStart().isInclusive() )
{
setEqualityFlag( s, ComponentEquality.GREATER_THAN_EQUAL );
}
start = s.serialize();
}
ByteBuffer finish = null;
if ( slice.getFinish() != null )
{
DynamicComposite f = new DynamicComposite( slice.getFinish().getCode(), slice.getFinish().getValue() );
if ( slice.getFinish().isInclusive() )
{
setEqualityFlag( f, ComponentEquality.GREATER_THAN_EQUAL );
}
finish = f.serialize();
}
while ( ( current_ts_shard >= start_ts_shard ) && ( current_ts_shard <= finish_ts_shard ) && ( uuid_set.size()
< DEFAULT_SEARCH_COUNT ) )
{
while ( true )
{
List<HColumn<ByteBuffer, ByteBuffer>> results =
createSliceQuery( ko, be, be, be ).setColumnFamily( PROPERTY_INDEX.getColumnFamily() )
.setKey( bytebuffer( key( queueId, current_ts_shard, slice.getPropertyName() ) ) )
.setRange( start, finish, false, DEFAULT_SEARCH_COUNT ).execute().get().getColumns();
for ( HColumn<ByteBuffer, ByteBuffer> column : results )
{
DynamicComposite c = DynamicComposite.fromByteBuffer( column.getName().duplicate() );
UUID uuid = c.get( 2, ue );
uuid_set.add( uuid );
}
if ( results.size() < DEFAULT_SEARCH_COUNT )
{
break;
}
start = results.get( results.size() - 1 ).getName().duplicate();
}
if ( reversed )
{
current_ts_shard -= QUEUE_SHARD_INTERVAL;
}
else
{
current_ts_shard += QUEUE_SHARD_INTERVAL;
}
}
// trim the results
return uuid_set.headSet( finish_uuid ).tailSet( start_uuid );
}
private static class ReverseUUIDComparator extends UUIDComparator
{
/*
* (non-Javadoc)
*
* @see com.fasterxml.uuid.UUIDComparator#compare(java.util.UUID,
* java.util.UUID)
*/
@Override
public int compare( UUID u1, UUID u2 )
{
return super.compare( u1, u2 ) * -1;
}
}
}