blob: ea3fa2f3ddbdb25dd3e876ecacfa5d3a0b1dcf84 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.thrift;
import java.util.Collections;
import java.util.Iterator;
import java.util.NoSuchElementException;
import org.apache.cassandra.db.transform.Transformation;
import org.apache.cassandra.utils.AbstractIterator;
import com.google.common.collect.Iterators;
import com.google.common.collect.PeekingIterator;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.MapType;
import org.apache.cassandra.db.partitions.*;
/**
* Given an iterator on a partition of a compact table, this return an iterator that merges the
* static row columns with the other results.
*
* Compact tables stores thrift column_metadata as static columns (see CompactTables for
* details). When reading for thrift however, we want to merge those static values with other
* results because:
* 1) on thrift, all "columns" are sorted together, whether or not they are declared
* column_metadata.
* 2) it's possible that a table add a value for a "dynamic" column, and later that column
* is statically defined. Merging "static" and "dynamic" columns make sure we don't miss
* a value prior to the column declaration.
*
* For example, if a thrift table declare 2 columns "c1" and "c5" and the results from a query
* is:
* Partition: static: { c1: 3, c5: 4 }
* "a" : { value : 2 }
* "c3": { value : 8 }
* "c7": { value : 1 }
* then this class transform it into:
* Partition: "a" : { value : 2 }
* "c1": { value : 3 }
* "c3": { value : 8 }
* "c5": { value : 4 }
* "c7": { value : 1 }
*/
public class ThriftResultsMerger extends Transformation<UnfilteredRowIterator>
{
private final int nowInSec;
private ThriftResultsMerger(int nowInSec)
{
this.nowInSec = nowInSec;
}
public static UnfilteredPartitionIterator maybeWrap(UnfilteredPartitionIterator iterator, CFMetaData metadata, int nowInSec)
{
if (!metadata.isStaticCompactTable() && !metadata.isSuper())
return iterator;
return Transformation.apply(iterator, new ThriftResultsMerger(nowInSec));
}
public static UnfilteredRowIterator maybeWrap(UnfilteredRowIterator iterator, int nowInSec)
{
if (!iterator.metadata().isStaticCompactTable() && !iterator.metadata().isSuper())
return iterator;
return iterator.metadata().isSuper()
? Transformation.apply(iterator, new SuperColumnsPartitionMerger(iterator, nowInSec))
: new PartitionMerger(iterator, nowInSec);
}
@Override
public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator iter)
{
return iter.metadata().isSuper()
? Transformation.apply(iter, new SuperColumnsPartitionMerger(iter, nowInSec))
: new PartitionMerger(iter, nowInSec);
}
private static class PartitionMerger extends WrappingUnfilteredRowIterator
{
private final int nowInSec;
// We initialize lazily to avoid having this iterator fetch the wrapped iterator before it's actually asked for it.
private boolean isInit;
private Iterator<Cell> staticCells;
private final Row.Builder builder;
private Row nextToMerge;
private Unfiltered nextFromWrapped;
private PartitionMerger(UnfilteredRowIterator results, int nowInSec)
{
super(results);
assert results.metadata().isStaticCompactTable();
this.nowInSec = nowInSec;
this.builder = BTreeRow.sortedBuilder();
}
private void init()
{
assert !isInit;
Row staticRow = super.staticRow();
assert !staticRow.hasComplex();
staticCells = staticRow.cells().iterator();
updateNextToMerge();
isInit = true;
}
@Override
public Row staticRow()
{
return Rows.EMPTY_STATIC_ROW;
}
@Override
public boolean hasNext()
{
if (!isInit)
init();
return nextFromWrapped != null || nextToMerge != null || super.hasNext();
}
@Override
public Unfiltered next()
{
if (!isInit)
init();
if (nextFromWrapped == null && super.hasNext())
nextFromWrapped = super.next();
if (nextFromWrapped == null)
{
if (nextToMerge == null)
throw new NoSuchElementException();
return consumeNextToMerge();
}
if (nextToMerge == null)
return consumeNextWrapped();
int cmp = metadata().comparator.compare(nextToMerge, nextFromWrapped);
if (cmp < 0)
return consumeNextToMerge();
if (cmp > 0)
return consumeNextWrapped();
// Same row, so merge them
assert nextFromWrapped instanceof Row;
return Rows.merge((Row)consumeNextWrapped(), consumeNextToMerge(), nowInSec);
}
private Unfiltered consumeNextWrapped()
{
Unfiltered toReturn = nextFromWrapped;
nextFromWrapped = null;
return toReturn;
}
private Row consumeNextToMerge()
{
Row toReturn = nextToMerge;
updateNextToMerge();
return toReturn;
}
private void updateNextToMerge()
{
if (!staticCells.hasNext())
{
// Nothing more to merge.
nextToMerge = null;
return;
}
Cell cell = staticCells.next();
// Given a static cell, the equivalent row uses the column name as clustering and the value as unique cell value.
builder.newRow(new Clustering(cell.column().name.bytes));
builder.addCell(new BufferCell(metadata().compactValueColumn(), cell.timestamp(), cell.ttl(), cell.localDeletionTime(), cell.value(), cell.path()));
nextToMerge = builder.build();
}
}
private static class SuperColumnsPartitionMerger extends Transformation
{
private final int nowInSec;
private final Row.Builder builder;
private final ColumnDefinition superColumnMapColumn;
private final AbstractType<?> columnComparator;
private SuperColumnsPartitionMerger(UnfilteredRowIterator applyTo, int nowInSec)
{
assert applyTo.metadata().isSuper();
this.nowInSec = nowInSec;
this.superColumnMapColumn = applyTo.metadata().compactValueColumn();
assert superColumnMapColumn != null && superColumnMapColumn.type instanceof MapType;
this.builder = BTreeRow.sortedBuilder();
this.columnComparator = ((MapType)superColumnMapColumn.type).nameComparator();
}
@Override
public Row applyToRow(Row row)
{
PeekingIterator<Cell> staticCells = Iterators.peekingIterator(simpleCellsIterator(row));
if (!staticCells.hasNext())
return row;
builder.newRow(row.clustering());
ComplexColumnData complexData = row.getComplexColumnData(superColumnMapColumn);
PeekingIterator<Cell> dynamicCells;
if (complexData == null)
{
dynamicCells = Iterators.peekingIterator(Collections.<Cell>emptyIterator());
}
else
{
dynamicCells = Iterators.peekingIterator(complexData.iterator());
builder.addComplexDeletion(superColumnMapColumn, complexData.complexDeletion());
}
while (staticCells.hasNext() && dynamicCells.hasNext())
{
Cell staticCell = staticCells.peek();
Cell dynamicCell = dynamicCells.peek();
int cmp = columnComparator.compare(staticCell.column().name.bytes, dynamicCell.path().get(0));
if (cmp < 0)
builder.addCell(makeDynamicCell(staticCells.next()));
else if (cmp > 0)
builder.addCell(dynamicCells.next());
else
builder.addCell(Cells.reconcile(makeDynamicCell(staticCells.next()), dynamicCells.next(), nowInSec));
}
while (staticCells.hasNext())
builder.addCell(makeDynamicCell(staticCells.next()));
while (dynamicCells.hasNext())
builder.addCell(dynamicCells.next());
return builder.build();
}
private Cell makeDynamicCell(Cell staticCell)
{
return new BufferCell(superColumnMapColumn, staticCell.timestamp(), staticCell.ttl(), staticCell.localDeletionTime(), staticCell.value(), CellPath.create(staticCell.column().name.bytes));
}
private Iterator<Cell> simpleCellsIterator(Row row)
{
final Iterator<Cell> cells = row.cells().iterator();
return new AbstractIterator<Cell>()
{
protected Cell computeNext()
{
if (cells.hasNext())
{
Cell cell = cells.next();
if (cell.column().isSimple())
return cell;
}
return endOfData();
}
};
}
}
}