| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.cassandra.thrift; |
| |
| import java.util.Collections; |
| import java.util.Iterator; |
| import java.util.NoSuchElementException; |
| |
| import org.apache.cassandra.db.transform.Transformation; |
| import org.apache.cassandra.utils.AbstractIterator; |
| import com.google.common.collect.Iterators; |
| import com.google.common.collect.PeekingIterator; |
| |
| import org.apache.cassandra.config.CFMetaData; |
| import org.apache.cassandra.config.ColumnDefinition; |
| import org.apache.cassandra.db.*; |
| import org.apache.cassandra.db.rows.*; |
| import org.apache.cassandra.db.marshal.AbstractType; |
| import org.apache.cassandra.db.marshal.MapType; |
| import org.apache.cassandra.db.partitions.*; |
| |
/**
 * Given an iterator on a partition of a compact table, this returns an iterator that merges the
 * static row columns with the other results.
 *
 * Compact tables store thrift column_metadata as static columns (see CompactTables for
 * details). When reading for thrift however, we want to merge those static values with other
 * results because:
 *   1) on thrift, all "columns" are sorted together, whether or not they are declared
 *      column_metadata.
 *   2) it's possible that a table adds a value for a "dynamic" column, and later that column
 *      is statically defined. Merging "static" and "dynamic" columns makes sure we don't miss
 *      a value written prior to the column declaration.
 *
 * For example, if a thrift table declares 2 columns "c1" and "c5" and the results from a query
 * are:
 *    Partition: static: { c1: 3, c5: 4 }
 *                  "a" : { value : 2 }
 *                  "c3": { value : 8 }
 *                  "c7": { value : 1 }
 * then this class transforms them into:
 *    Partition:    "a" : { value : 2 }
 *                  "c1": { value : 3 }
 *                  "c3": { value : 8 }
 *                  "c5": { value : 4 }
 *                  "c7": { value : 1 }
 */
| public class ThriftResultsMerger extends Transformation<UnfilteredRowIterator> |
| { |
| private final int nowInSec; |
| |
| private ThriftResultsMerger(int nowInSec) |
| { |
| this.nowInSec = nowInSec; |
| } |
| |
| public static UnfilteredPartitionIterator maybeWrap(UnfilteredPartitionIterator iterator, CFMetaData metadata, int nowInSec) |
| { |
| if (!metadata.isStaticCompactTable() && !metadata.isSuper()) |
| return iterator; |
| |
| return Transformation.apply(iterator, new ThriftResultsMerger(nowInSec)); |
| } |
| |
| public static UnfilteredRowIterator maybeWrap(UnfilteredRowIterator iterator, int nowInSec) |
| { |
| if (!iterator.metadata().isStaticCompactTable() && !iterator.metadata().isSuper()) |
| return iterator; |
| |
| return iterator.metadata().isSuper() |
| ? Transformation.apply(iterator, new SuperColumnsPartitionMerger(iterator, nowInSec)) |
| : new PartitionMerger(iterator, nowInSec); |
| } |
| |
| @Override |
| public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator iter) |
| { |
| return iter.metadata().isSuper() |
| ? Transformation.apply(iter, new SuperColumnsPartitionMerger(iter, nowInSec)) |
| : new PartitionMerger(iter, nowInSec); |
| } |
| |
| private static class PartitionMerger extends WrappingUnfilteredRowIterator |
| { |
| private final int nowInSec; |
| |
| // We initialize lazily to avoid having this iterator fetch the wrapped iterator before it's actually asked for it. |
| private boolean isInit; |
| |
| private Iterator<Cell> staticCells; |
| |
| private final Row.Builder builder; |
| private Row nextToMerge; |
| private Unfiltered nextFromWrapped; |
| |
| private PartitionMerger(UnfilteredRowIterator results, int nowInSec) |
| { |
| super(results); |
| assert results.metadata().isStaticCompactTable(); |
| this.nowInSec = nowInSec; |
| this.builder = BTreeRow.sortedBuilder(); |
| } |
| |
| private void init() |
| { |
| assert !isInit; |
| Row staticRow = super.staticRow(); |
| assert !staticRow.hasComplex(); |
| |
| staticCells = staticRow.cells().iterator(); |
| updateNextToMerge(); |
| isInit = true; |
| } |
| |
| @Override |
| public Row staticRow() |
| { |
| return Rows.EMPTY_STATIC_ROW; |
| } |
| |
| @Override |
| public boolean hasNext() |
| { |
| if (!isInit) |
| init(); |
| |
| return nextFromWrapped != null || nextToMerge != null || super.hasNext(); |
| } |
| |
| @Override |
| public Unfiltered next() |
| { |
| if (!isInit) |
| init(); |
| |
| if (nextFromWrapped == null && super.hasNext()) |
| nextFromWrapped = super.next(); |
| |
| if (nextFromWrapped == null) |
| { |
| if (nextToMerge == null) |
| throw new NoSuchElementException(); |
| |
| return consumeNextToMerge(); |
| } |
| |
| if (nextToMerge == null) |
| return consumeNextWrapped(); |
| |
| int cmp = metadata().comparator.compare(nextToMerge, nextFromWrapped); |
| if (cmp < 0) |
| return consumeNextToMerge(); |
| if (cmp > 0) |
| return consumeNextWrapped(); |
| |
| // Same row, so merge them |
| assert nextFromWrapped instanceof Row; |
| return Rows.merge((Row)consumeNextWrapped(), consumeNextToMerge(), nowInSec); |
| } |
| |
| private Unfiltered consumeNextWrapped() |
| { |
| Unfiltered toReturn = nextFromWrapped; |
| nextFromWrapped = null; |
| return toReturn; |
| } |
| |
| private Row consumeNextToMerge() |
| { |
| Row toReturn = nextToMerge; |
| updateNextToMerge(); |
| return toReturn; |
| } |
| |
| private void updateNextToMerge() |
| { |
| if (!staticCells.hasNext()) |
| { |
| // Nothing more to merge. |
| nextToMerge = null; |
| return; |
| } |
| |
| Cell cell = staticCells.next(); |
| |
| // Given a static cell, the equivalent row uses the column name as clustering and the value as unique cell value. |
| builder.newRow(Clustering.make(cell.column().name.bytes)); |
| builder.addCell(new BufferCell(metadata().compactValueColumn(), cell.timestamp(), cell.ttl(), cell.localDeletionTime(), cell.value(), cell.path())); |
| nextToMerge = builder.build(); |
| } |
| } |
| |
| private static class SuperColumnsPartitionMerger extends Transformation |
| { |
| private final int nowInSec; |
| private final Row.Builder builder; |
| private final ColumnDefinition superColumnMapColumn; |
| private final AbstractType<?> columnComparator; |
| |
| private SuperColumnsPartitionMerger(UnfilteredRowIterator applyTo, int nowInSec) |
| { |
| assert applyTo.metadata().isSuper(); |
| this.nowInSec = nowInSec; |
| |
| this.superColumnMapColumn = applyTo.metadata().compactValueColumn(); |
| assert superColumnMapColumn != null && superColumnMapColumn.type instanceof MapType; |
| |
| this.builder = BTreeRow.sortedBuilder(); |
| this.columnComparator = ((MapType)superColumnMapColumn.type).nameComparator(); |
| } |
| |
| @Override |
| public Row applyToRow(Row row) |
| { |
| PeekingIterator<Cell> staticCells = Iterators.peekingIterator(simpleCellsIterator(row)); |
| if (!staticCells.hasNext()) |
| return row; |
| |
| builder.newRow(row.clustering()); |
| |
| ComplexColumnData complexData = row.getComplexColumnData(superColumnMapColumn); |
| |
| PeekingIterator<Cell> dynamicCells; |
| if (complexData == null) |
| { |
| dynamicCells = Iterators.peekingIterator(Collections.<Cell>emptyIterator()); |
| } |
| else |
| { |
| dynamicCells = Iterators.peekingIterator(complexData.iterator()); |
| builder.addComplexDeletion(superColumnMapColumn, complexData.complexDeletion()); |
| } |
| |
| while (staticCells.hasNext() && dynamicCells.hasNext()) |
| { |
| Cell staticCell = staticCells.peek(); |
| Cell dynamicCell = dynamicCells.peek(); |
| int cmp = columnComparator.compare(staticCell.column().name.bytes, dynamicCell.path().get(0)); |
| if (cmp < 0) |
| builder.addCell(makeDynamicCell(staticCells.next())); |
| else if (cmp > 0) |
| builder.addCell(dynamicCells.next()); |
| else |
| builder.addCell(Cells.reconcile(makeDynamicCell(staticCells.next()), dynamicCells.next(), nowInSec)); |
| } |
| |
| while (staticCells.hasNext()) |
| builder.addCell(makeDynamicCell(staticCells.next())); |
| while (dynamicCells.hasNext()) |
| builder.addCell(dynamicCells.next()); |
| |
| return builder.build(); |
| } |
| |
| private Cell makeDynamicCell(Cell staticCell) |
| { |
| return new BufferCell(superColumnMapColumn, staticCell.timestamp(), staticCell.ttl(), staticCell.localDeletionTime(), staticCell.value(), CellPath.create(staticCell.column().name.bytes)); |
| } |
| |
| private Iterator<Cell> simpleCellsIterator(Row row) |
| { |
| final Iterator<Cell> cells = row.cells().iterator(); |
| return new AbstractIterator<Cell>() |
| { |
| protected Cell computeNext() |
| { |
| if (cells.hasNext()) |
| { |
| Cell cell = cells.next(); |
| if (cell.column().isSimple()) |
| return cell; |
| } |
| return endOfData(); |
| } |
| }; |
| } |
| } |
| } |
| |