blob: 667992c7b1a8e5001b9e3c61047b984c3f34e1db [file] [log] [blame]
/*
*
* * Licensed to the Apache Software Foundation (ASF) under one
* * or more contributor license agreements. See the NOTICE file
* * distributed with this work for additional information
* * regarding copyright ownership. The ASF licenses this file
* * to you under the Apache License, Version 2.0 (the
* * "License"); you may not use this file except in compliance
* * with the License. You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing,
* * software distributed under the License is distributed on an
* * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* * KIND, either express or implied. See the License for the
* * specific language governing permissions and limitations
* * under the License.
*
*/
package org.apache.usergrid.persistence.core.astyanax;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import com.netflix.astyanax.model.Column;
import com.netflix.astyanax.model.ColumnFamily;
import com.netflix.astyanax.model.ColumnList;
import com.netflix.astyanax.model.ConsistencyLevel;
import com.netflix.astyanax.model.Rows;
import com.netflix.astyanax.query.RowSliceQuery;
import com.netflix.astyanax.util.RangeBuilder;
/**
*
*
*/
public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
private static final Logger LOG = LoggerFactory.getLogger( MultiRowColumnIterator.class );
private final int pageSize;
private final ColumnFamily<R, C> cf;
private final ColumnParser<C, T> columnParser;
private final ColumnSearch<T> columnSearch;
private final Comparator<T> comparator;
private final Collection<R> rowKeys;
private final Keyspace keyspace;
private final ConsistencyLevel consistencyLevel;
private T startColumn;
private boolean moreToReturn;
private Iterator<T> currentColumnIterator;
/**
* Remove after finding bug
*/
// private int advanceCount;
//
// private final HashMap<T, SeekPosition> seenResults;
/**
* Complete Remove
*/
/**
* Create the iterator
*/
public MultiRowColumnIterator( final Keyspace keyspace, final ColumnFamily<R, C> cf,
final ConsistencyLevel consistencyLevel, final ColumnParser<C, T> columnParser,
final ColumnSearch<T> columnSearch, final Comparator<T> comparator,
final Collection<R> rowKeys, final int pageSize ) {
this.cf = cf;
this.pageSize = pageSize;
this.columnParser = columnParser;
this.columnSearch = columnSearch;
this.comparator = comparator;
this.rowKeys = rowKeys;
this.keyspace = keyspace;
this.consistencyLevel = consistencyLevel;
this.moreToReturn = true;
// seenResults = new HashMap<>( pageSize * 10 );
}
@Override
public boolean hasNext() {
if ( currentColumnIterator == null || ( !currentColumnIterator.hasNext() && moreToReturn ) ) {
advance();
}
return currentColumnIterator.hasNext();
}
@Override
public T next() {
if ( !hasNext() ) {
throw new NoSuchElementException( "No new element exists" );
}
final T next = currentColumnIterator.next();
return next;
}
@Override
public void remove() {
throw new UnsupportedOperationException( "Remove is unsupported this is a read only iterator" );
}
public void advance() {
/**
* If the edge is present, we need to being seeking from this
*/
final boolean skipFirstColumn = startColumn != null;
//TODO, finalize why this isn't working as expected
final int selectSize = skipFirstColumn ? pageSize + 1 : pageSize;
final RangeBuilder rangeBuilder = new RangeBuilder();
//set the range into the search
if ( startColumn == null ) {
columnSearch.buildRange( rangeBuilder );
}
else {
columnSearch.buildRange( rangeBuilder, startColumn );
}
rangeBuilder.setLimit( selectSize );
/**
* Get our list of slices
*/
final RowSliceQuery<R, C> query =
keyspace.prepareQuery( cf ).setConsistencyLevel( consistencyLevel ).getKeySlice( rowKeys )
.withColumnRange( rangeBuilder.build() );
final Rows<R, C> result;
try {
result = query.execute().getResult();
}
catch ( ConnectionException e ) {
throw new RuntimeException( "Unable to connect to casandra", e );
}
//now aggregate them together
//this is an optimization. It's faster to see if we only have values for one row,
// then return the iterator of those columns than
//do a merge if only one row has data.
final List<T> mergedResults;
if ( containsSingleRowOnly( result ) ) {
mergedResults = singleRowResult( result );
}
else {
mergedResults = mergeResults( result, selectSize );
}
//we've parsed everything truncate to the first pageSize, it's all we can ensure is correct without another
//trip back to cassandra
//discard our first element (maybe)
final int size = mergedResults.size();
moreToReturn = size == selectSize;
//we have a first column to to check
if( size > 0) {
final T firstResult = mergedResults.get( 0 );
//The search has either told us to skip the first element, or it matches our last, therefore we disregard it
if(columnSearch.skipFirst( firstResult ) || (skipFirstColumn && comparator.compare( startColumn, firstResult ) == 0)){
mergedResults.remove( 0 );
}
}
if(moreToReturn && mergedResults.size() > 0){
startColumn = mergedResults.get( mergedResults.size() - 1 );
}
currentColumnIterator = mergedResults.iterator();
LOG.trace( "Finished parsing {} rows for results", rowKeys.size() );
}
/**
* Return true if we have < 2 rows with columns, false otherwise
*/
private boolean containsSingleRowOnly( final Rows<R, C> result ) {
int count = 0;
for ( R key : result.getKeys() ) {
if ( result.getRow( key ).getColumns().size() > 0 ) {
count++;
//we have more than 1 row with values, return them
if ( count > 1 ) {
return false;
}
}
}
return true;
}
/**
* A single row is present, only parse the single row
* @param result
* @return
*/
private List<T> singleRowResult( final Rows<R, C> result ) {
for ( R key : result.getKeys() ) {
final ColumnList<C> columnList = result.getRow( key ).getColumns();
final int size = columnList.size();
if ( size > 0 ) {
final List<T> results = new ArrayList<>(size);
for(Column<C> column: columnList){
results.add(columnParser.parseColumn( column ));
}
return results;
}
}
//we didn't have any results, just return nothing
return Collections.<T>emptyList();
}
/**
* Multiple rows are present, merge them into a single result set
* @param result
* @return
*/
private List<T> mergeResults( final Rows<R, C> result, final int maxSize ) {
final List<T> mergedResults = new ArrayList<>(maxSize);
for ( final R key : result.getKeys() ) {
final ColumnList<C> columns = result.getRow( key ).getColumns();
for (final Column<C> column :columns ) {
final T returnedValue = columnParser.parseColumn( column );
//Use an O(log n) search, same as a tree, but with fast access to indexes for later operations
int searchIndex = Collections.binarySearch( mergedResults, returnedValue, comparator );
/**
* DO NOT remove this section of code. If you're seeing inconsistent results during shard transition,
* you'll
* need to enable this
*/
//
// if ( previous != null && comparator.compare( previous, returnedValue ) == 0 ) {
// throw new RuntimeException( String.format(
// "Cassandra returned 2 unique columns,
// but your comparator marked them as equal. This " +
// "indicates a bug in your comparator. Previous value was %s and
// current value is " +
// "%s",
// previous, returnedValue ) );
// }
//
// previous = returnedValue;
//we've already seen it, no-op
if(searchIndex > -1){
continue;
}
final int insertIndex = (searchIndex+1)*-1;
//it's at the end of the list, don't bother inserting just to remove it
if(insertIndex >= maxSize){
continue;
}
mergedResults.add( insertIndex, returnedValue );
//prune the mergedResults
while ( mergedResults.size() > maxSize ) {
//just remove from our tail until the size falls to the correct value
mergedResults.remove(mergedResults.size()-1);
}
}
LOG.trace( "Candidate result set size is {}", mergedResults.size() );
}
return mergedResults;
}
/**
* Iterator wrapper that parses as it iterates for single row cases
*/
private class SingleRowIterator implements Iterator<T> {
private Iterator<Column<C>> columnIterator;
private SingleRowIterator (final ColumnList<C> columns){
this.columnIterator = columns.iterator();
}
@Override
public boolean hasNext() {
return columnIterator.hasNext();
}
@Override
public T next() {
return columnParser.parseColumn( columnIterator.next() );
}
@Override
public void remove() {
throw new UnsupportedOperationException( "Unable to remove single row" );
}
}
}