blob: ec5d1a601a0f510e46c344181dded2713afd0d53 [file] [log] [blame]
/*******************************************************************************
* Copyright 2012 Apigee Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
******************************************************************************/
package org.usergrid.persistence.query.ir.result;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import org.usergrid.persistence.cassandra.CursorCache;
import org.usergrid.utils.UUIDUtils;
/**
* Simple iterator to perform Unions
*
* @author tnine
*/
public class UnionIterator extends MultiIterator {
private static final ScanColumnComparator COMP = new ScanColumnComparator();
private SortedColumnList list;
/**
* @param pageSize
*/
public UnionIterator( int pageSize ) {
super( pageSize );
list = new SortedColumnList( pageSize );
}
/*
* (non-Javadoc)
*
* @see org.usergrid.persistence.query.ir.result.MergeIterator#advance()
*/
@Override
protected Set<ScanColumn> advance() {
int size = iterators.size();
if ( size == 0 ) {
return null;
}
list.clear();
for ( ResultIterator itr : iterators ) {
while ( itr.hasNext() ) {
list.addAll( itr.next() );
}
itr.reset();
}
//mark us for the next page
list.mark();
return list.asSet();
}
/*
* (non-Javadoc)
*
* @see
* org.usergrid.persistence.query.ir.result.ResultIterator#finalizeCursor(
* org.usergrid.persistence.cassandra.CursorCache)
*/
@Override
public void finalizeCursor( CursorCache cache, UUID lastLoaded ) {
//get our scan column and put them in the cache
//we finalize the cursor of the min
}
/**
* A Sorted Set with a max size. When a new entry is added, the max is removed. You can mark the next
* "min" by calling the mark method. Values > min are accepted. Values > min and that are over size are
* discarded
*/
public static final class SortedColumnList {
private static final ScanColumnComparator COMP = new ScanColumnComparator();
private final int maxSize;
private final List<ScanColumn> list;
private ScanColumn min;
public SortedColumnList( int maxSize ) {
//we need to allocate the extra space if required
this.list = new ArrayList<ScanColumn>(maxSize);
this.maxSize = maxSize;
}
/**
* Add the column to this list
* @param col
*/
public void add( ScanColumn col ) {
//less than our min, don't add
if(COMP.compare( min, col ) >= 0){
return;
}
int index = Collections.binarySearch( this.list, col, COMP );
//already present
if(index > -1){
return ;
}
index = (index * -1) - 1;
//outside the renage
if(index >= maxSize){
return;
}
this.list.add( index, col );
final int size = this.list.size();
if(size > maxSize){
this.list.subList( maxSize, size ).clear();
}
}
/**
* Add all the elements to this list
* @param cols
*/
public void addAll( final Collection<? extends ScanColumn> cols ) {
for(ScanColumn col:cols){
add(col);
}
}
/**
* Returns a new list. If no elements are present, returns null
* @return
*/
public Set<ScanColumn> asSet(){
if(this.list.size() == 0){
return null;
}
return new LinkedHashSet<ScanColumn>(this.list);
}
/**
* Mark our last element in the tree as the max
*/
public void mark() {
final int size = this.list.size();
//we don't have any elements in the list, and we've never set a min
if(size == 0){
return;
}
min = this.list.get( size -1);
}
/**
* Clear the list
*/
public void clear(){
this.list.clear();
}
}
/**
* Simple comparator for comparing scan columns. Orders them by time uuid
*/
private static class ScanColumnComparator implements Comparator<ScanColumn> {
@Override
public int compare( final ScanColumn o1, final ScanColumn o2 ) {
if ( o1 == null ) {
if ( o2 == null ) {
return 0;
}
return -1;
}
else if ( o2 == null ) {
return 1;
}
return UUIDUtils.compare( o1.getUUID(), o2.getUUID() );
}
}
}