| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.cassandra.spark.reader; |
| |
| import java.io.Closeable; |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.util.ArrayList; |
| import java.util.Iterator; |
| import java.util.List; |
| |
| import com.google.common.base.Preconditions; |
| |
| import org.apache.cassandra.db.Clustering; |
| import org.apache.cassandra.db.ClusteringPrefix; |
| import org.apache.cassandra.db.DeletionTime; |
| import org.apache.cassandra.db.marshal.AbstractType; |
| import org.apache.cassandra.db.marshal.ListType; |
| import org.apache.cassandra.db.marshal.MapType; |
| import org.apache.cassandra.db.marshal.SetType; |
| import org.apache.cassandra.db.marshal.UserType; |
| import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; |
| import org.apache.cassandra.db.rows.Cell; |
| import org.apache.cassandra.db.rows.ColumnData; |
| import org.apache.cassandra.db.rows.ComplexColumnData; |
| import org.apache.cassandra.db.rows.RangeTombstoneMarker; |
| import org.apache.cassandra.db.rows.Row; |
| import org.apache.cassandra.db.rows.Unfiltered; |
| import org.apache.cassandra.db.rows.UnfilteredRowIterator; |
| import org.apache.cassandra.dht.Murmur3Partitioner; |
| import org.apache.cassandra.dht.RandomPartitioner; |
| import org.apache.cassandra.schema.ColumnMetadata; |
| import org.apache.cassandra.schema.TableMetadata; |
| import org.apache.cassandra.spark.data.partitioner.Partitioner; |
| import org.apache.cassandra.spark.reader.common.SSTableStreamException; |
| import org.apache.cassandra.spark.utils.TimeProvider; |
| import org.apache.cassandra.utils.ByteBufferUtil; |
| import org.jetbrains.annotations.NotNull; |
| |
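| /** |
| * Base class for scanners that stream the atoms of an SSTable as a flat sequence of |
| * (column name, value) pairs staged in a {@link Rid}. Callers drive the iteration with |
| * {@link #hasNext()}, materialize the current column into the Rid via |
| * {@link #advanceToNextColumn()}, and read it back through {@link #rid()}. |
| */ |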
| public abstract class AbstractStreamScanner implements StreamScanner, Closeable |
| { |
| // All partitions in the SSTable |
| private UnfilteredPartitionIterator allPartitions; |
| // A single partition, containing rows and/or range tombstones |
| private UnfilteredRowIterator partition; |
| // The static row of the current partition, which may be empty |
| @SuppressWarnings("FieldCanBeLocal") |
| private Row staticRow; |
| // Current atom (row or range tombstone) being processed |
| private Unfiltered unfiltered; |
| // If processing a row this holds the state of iterating that row |
| private Iterator<ColumnData> columns; |
| // State of processing data for a single column in a row (which may be multi-celled in the case of complex columns) |
| protected ColumnDataState columnData; |
| |
| @NotNull |
| final TableMetadata metadata; |
| |
| @NotNull |
| protected final TimeProvider timeProvider; |
| |
| protected final Rid rid = new Rid(); |
| |
| AbstractStreamScanner(@NotNull TableMetadata metadata, |
| @NotNull Partitioner partitionerType, |
| @NotNull TimeProvider timeProvider) |
| { |
| this.metadata = metadata.unbuild() |
| .partitioner(partitionerType == Partitioner.Murmur3Partitioner |
| ? new Murmur3Partitioner() |
| : new RandomPartitioner()) |
| .build(); |
| this.timeProvider = timeProvider; |
| |
| // Counter tables are not supported |
| if (metadata.isCounter()) |
| { |
| throw new IllegalArgumentException( |
| String.format("Streaming reads of SSTables from counter tables are not supported, " |
| + "rejecting stream of data from %s.%s", |
| metadata.keyspace, metadata.name)); |
| } |
| } |
| |
| @Override |
| public Rid rid() |
| { |
| return rid; |
| } |
| |
| /* Abstract methods */ |
| |
| abstract UnfilteredPartitionIterator initializePartitions(); |
| |
| @Override |
| public abstract void close() throws IOException; |
| |
| protected abstract void handleRowTombstone(Row row); |
| |
| protected abstract void handlePartitionTombstone(UnfilteredRowIterator partition); |
| |
| protected abstract void handleCellTombstone(); |
| |
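| // Invoked for each tombstoned cell encountered inside an otherwise live complex column |
| // (collection or UDT); see ComplexDataState#consume |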
| protected abstract void handleCellTombstoneInComplex(Cell<?> cell); |
| |
| protected abstract void handleRangeTombstone(RangeTombstoneMarker marker); |
| |
| @Override |
| public void advanceToNextColumn() |
| { |
| columnData.consume(); |
| } |
| |
| // CHECKSTYLE IGNORE: Long method |
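| /** |
| * Advances the internal state (partition, static row, row columns, atoms) until the next unit |
| * of data has been staged for the caller to consume. |
| * |
| * @return true if data is ready to be consumed, false once the SSTable is exhausted |
| * @throws IOException if an I/O error occurs while reading the SSTable |
| */ |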
| @Override |
| public boolean hasNext() throws IOException |
| { |
| if (allPartitions == null) |
| { |
| allPartitions = initializePartitions(); |
| } |
| |
| while (true) |
| { |
| if (partition == null) |
| { |
| try |
| { |
| // Check whether any partitions remain |
| if (allPartitions.hasNext()) |
| { |
| // Advance to next partition |
| partition = allPartitions.next(); |
| |
| if (partition.partitionLevelDeletion().isLive()) |
| { |
| // Reset rid with new partition key |
| rid.setPartitionKeyCopy(partition.partitionKey().getKey(), |
| ReaderUtils.tokenToBigInteger(partition.partitionKey().getToken())); |
| } |
| else |
| { |
| // There's a partition-level delete |
| handlePartitionTombstone(partition); |
| return true; |
| } |
| } |
| else |
| { |
| return false; |
| } |
| } |
| catch (SSTableStreamException exception) |
| { |
| throw exception.getIOException(); |
| } |
| |
| // If the partition has a non-empty static row, grab its columns, |
| // so we process those before moving on to its atoms (the Unfiltered instances) |
| staticRow = partition.staticRow(); |
| if (!staticRow.isEmpty()) |
| { |
| columns = staticRow.iterator(); |
| prepareColumnData(); |
| return true; |
| } |
| } |
| |
| // We may be in the midst of processing some multi-cell column data; |
| // if so, resume that where we left off |
| if (columnData != null && columnData.hasData()) |
| { |
| return true; |
| } |
| |
| // Continue to process columns of the last read row, which may be static |
| if (columns != null && columns.hasNext()) |
| { |
| prepareColumnData(); |
| return true; |
| } |
| |
| // Current row was exhausted (or none were present), so move to the next atom |
| columns = null; |
| try |
| { |
| // Advance to next unfiltered |
| rid.setIsUpdate(false); // Reset isUpdate flag |
| if (partition.hasNext()) |
| { |
| unfiltered = partition.next(); |
| } |
| else |
| { |
| // Current partition is exhausted |
| partition = null; |
| unfiltered = null; |
| |
| // Produce a Spark row if there are range tombstone markers |
| if (rid.hasRangeTombstoneMarkers()) |
| { |
| // The current partition is exhausted, and we are ready to produce a Spark row for the range tombstones |
| rid.setShouldConsumeRangeTombstoneMarkers(true); |
| return true; |
| } |
| } |
| } |
| catch (SSTableStreamException exception) |
| { |
| throw exception.getIOException(); |
| } |
| |
| if (unfiltered != null) |
| { |
| if (unfiltered.isRow()) |
| { |
| Row row = (Row) unfiltered; |
| |
| // There is a CQL row level delete |
| if (!row.deletion().isLive()) |
| { |
| handleRowTombstone(row); |
| return true; |
| } |
| |
| // For non-compact tables, set up a ClusteringColumnDataState to emit a Rid that emulates a |
| // pre-3.0 CQL row marker. This is necessary for backwards compatibility with 2.1 & 2.0 output, |
| // and also for tables with only primary key columns defined. |
| // An empty PKLI is the 3.0 equivalent of having no row marker (e.g. rows modified via |
| // UPDATE rather than INSERT), so we don't emit a fake row marker in that case. |
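| // For illustration (hypothetical table t with PRIMARY KEY (pk, ck) and regular column v): |
| //   INSERT INTO t (pk, ck) VALUES (0, 0);        -- writes a row marker (non-empty PKLI) |
| //   UPDATE t SET v = 1 WHERE pk = 0 AND ck = 0;  -- writes no row marker (empty PKLI) |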
| boolean emptyLiveness = row.primaryKeyLivenessInfo().isEmpty(); |
| rid.setIsUpdate(emptyLiveness); |
| if (!emptyLiveness) |
| { |
| if (TableMetadata.Flag.isCQLTable(metadata.flags)) |
| { |
| columnData = new ClusteringColumnDataState(row.clustering()); |
| } |
| columns = row.iterator(); |
| return true; |
| } |
| |
| // The row's actual columns may be empty, in which case we'll simply skip over them during the |
| // next iteration and move to the next unfiltered; only the row marker and/or row deletion (if |
| // either is present) will then be emitted |
| columns = row.iterator(); |
| } |
| else if (unfiltered.isRangeTombstoneMarker()) |
| { |
| // Range tombstones can get complicated: |
| // - In the simplest case, a DELETE statement with a single clustering key range, we expect |
| //   the UnfilteredRowIterator to contain 2 markers, i.e. the open and close range tombstone |
| //   markers |
| // - In a slightly more complicated case, a DELETE using the IN operator (on prior clustering |
| //   keys), we expect the UnfilteredRowIterator to contain 2 * N markers, where N is the number |
| //   of values specified for IN |
| // - In the most complicated case, a client could compose a complex partition update with a |
| //   BATCH statement; that could produce the following scenarios (only discussing the statements |
| //   applying to the same partition key): |
| //   - Multiple disjoint ranges => we should expect 2 * N markers, where N is the number of |
| //     ranges |
| //   - Overlapping ranges with the same timestamp => we should expect 2 markers, since the |
| //     overlapping ranges are merged into a single one (and the boundary is omitted) |
| //   - Overlapping ranges with different timestamps => we should expect 3 markers, i.e. the |
| //     open bound, the boundary, and the close bound |
| //   - Ranges mixed with INSERTs => the order of the unfiltereds (i.e. Row/RangeTombstoneMarker) |
| //     is determined by comparing the row clustering with the bounds of the ranges; |
| //     see o.a.c.d.r.RowAndDeletionMergeIterator |
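| // For illustration (hypothetical table t with PRIMARY KEY (pk, ck1, ck2)): |
| //   DELETE FROM t WHERE pk = 0 AND ck1 = 1 AND ck2 > 2 AND ck2 < 9  -- 2 markers (open, close) |
| //   DELETE FROM t WHERE pk = 0 AND ck1 IN (1, 2) AND ck2 > 0        -- 2 * 2 = 4 markers |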
| RangeTombstoneMarker rangeTombstoneMarker = (RangeTombstoneMarker) unfiltered; |
| // We encode all ranges of a partition within the same Spark row. Therefore, we need to keep the |
| // markers while iterating through the partition, and only generate a Spark row with the range |
| // tombstone info once the partition / UnfilteredRowIterator is exhausted. |
| handleRangeTombstone(rangeTombstoneMarker); |
| // Continue to consume the next unfiltered row/marker |
| } |
| else |
| { |
| // As of Cassandra 4, the unfiltered kind can only be a row or a range tombstone marker, |
| // see o.a.c.db.rows.Unfiltered.Kind; this else branch exists only for completeness |
| throw new IllegalStateException("Encountered unknown Unfiltered kind"); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Prepares the next columnData to be consumed |
| */ |
| private void prepareColumnData() |
| { |
| ColumnData data = columns.next(); |
| if (data.column().isComplex()) |
| { |
| columnData = new ComplexDataState(data.column().isStatic() ? Clustering.STATIC_CLUSTERING |
| : unfiltered.clustering(), |
| (ComplexColumnData) data); |
| } |
| else |
| { |
| columnData = new SimpleColumnDataState(data.column().isStatic() ? Clustering.STATIC_CLUSTERING |
| : unfiltered.clustering(), |
| data); |
| } |
| } |
| |
| private interface ColumnDataState |
| { |
| /** |
| * Indicates whether the column has more data to be consumed |
| * |
| * @return true if it has data to be consumed |
| */ |
| boolean hasData(); |
| |
| /** |
| * Consumes the next unit of data in the column |
| */ |
| void consume(); |
| } |
| |
| /** |
| * Maps clustering values to column data to emulate the CQL row markers that were removed in |
| * Cassandra 3.0, but for which we must still emit a Rid in order to preserve backwards |
| * compatibility and to handle tables containing only primary key columns |
| */ |
| protected final class ClusteringColumnDataState implements ColumnDataState |
| { |
| private boolean consumed = false; |
| private final ClusteringPrefix clustering; |
| |
| ClusteringColumnDataState(ClusteringPrefix clustering) |
| { |
| this.clustering = clustering; |
| } |
| |
| @Override |
| public boolean hasData() |
| { |
| return !consumed; |
| } |
| |
| @Override |
| public void consume() |
| { |
| if (!consumed) |
| { |
| rid.setColumnNameCopy(ReaderUtils.encodeCellName(metadata, |
| clustering, |
| ByteBufferUtil.EMPTY_BYTE_BUFFER, |
| null)); |
| rid.setValueCopy(ByteBufferUtil.EMPTY_BYTE_BUFFER); |
| consumed = true; |
| } |
| else |
| { |
| throw new UnsupportedOperationException(); |
| } |
| } |
| } |
| |
| /** |
| * Holds current processing state of any simple column data |
| */ |
| private final class SimpleColumnDataState implements ColumnDataState |
| { |
| private ClusteringPrefix clustering; |
| private final Cell<?> cell; |
| |
| private SimpleColumnDataState(ClusteringPrefix clustering, ColumnData data) |
| { |
| Preconditions.checkArgument(data.column().isSimple(), "The type of the ColumnData should be simple"); |
| this.clustering = clustering; |
| this.cell = (Cell<?>) data; |
| } |
| |
| @Override |
| public boolean hasData() |
| { |
| return (clustering != null); |
| } |
| |
| @Override |
| public void consume() |
| { |
| boolean isStatic = cell.column().isStatic(); |
| rid.setColumnNameCopy(ReaderUtils.encodeCellName(metadata, |
| isStatic ? Clustering.STATIC_CLUSTERING : clustering, |
| cell.column().name.bytes, |
| null)); |
| if (cell.isTombstone()) |
| { |
| handleCellTombstone(); |
| } |
| else |
| { |
| rid.setValueCopy(cell.buffer()); |
| } |
| rid.setTimestamp(cell.timestamp()); |
| // Null out clustering so hasData will return false |
| clustering = null; |
| } |
| } |
| |
| /** |
| * Holds current processing state of any complex column data |
| */ |
| private final class ComplexDataState implements ColumnDataState |
| { |
| private final ColumnMetadata column; |
| private ClusteringPrefix clustering; |
| private final Iterator<Cell<?>> cells; |
| private final int cellCount; |
| private final DeletionTime deletionTime; |
| |
| private ComplexDataState(ClusteringPrefix clustering, ComplexColumnData data) |
| { |
| this.clustering = clustering; |
| this.column = data.column(); |
| this.cells = data.iterator(); |
| this.cellCount = data.cellsCount(); |
| this.deletionTime = data.complexDeletion(); |
| } |
| |
| @Override |
| public boolean hasData() |
| { |
| return clustering != null && cells.hasNext(); |
| } |
| |
| @Override |
| public void consume() |
| { |
| rid.setColumnNameCopy(ReaderUtils.encodeCellName(metadata, |
| clustering, |
| column.name.bytes, |
| ByteBufferUtil.EMPTY_BYTE_BUFFER)); |
| // The complex column itself is live, but there could be element deletions inside; we check for those below |
| if (deletionTime.isLive()) |
| { |
| ComplexTypeBuffer buffer = ComplexTypeBuffer.newBuffer(column.type, cellCount); |
| long maxTimestamp = Long.MIN_VALUE; |
| while (cells.hasNext()) |
| { |
| Cell<?> cell = cells.next(); |
| // Note on isLive vs. isTombstone: isLive also accounts for TTL, so a cell whose TTL has |
| // expired is handled as a tombstone |
| if (cell.isLive(timeProvider.nowInTruncatedSeconds())) |
| { |
| buffer.addCell(cell); |
| } |
| else |
| { |
| // handleCellTombstoneInComplex only records the tombstoned cell when running as a CDC job |
| handleCellTombstoneInComplex(cell); |
| } |
| // When a cell is deleted, its deletion time also serves as the cell's timestamp |
| maxTimestamp = Math.max(maxTimestamp, cell.timestamp()); |
| } |
| // In the CDC case, consuming a mutation that contains cell tombstones |
| // results in an empty buffer being built |
| if (rid.hasCellTombstoneInComplex()) |
| { |
| rid.setValueCopy(null); |
| } |
| else |
| { |
| rid.setValueCopy(buffer.build()); |
| } |
| rid.setTimestamp(maxTimestamp); |
| } |
| else |
| { |
| // The entire collection/UDT is deleted |
| handleCellTombstone(); |
| rid.setTimestamp(deletionTime.markedForDeleteAt()); |
| } |
| |
| // Null out clustering to indicate no data |
| clustering = null; |
| } |
| } |
| |
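| /** |
| * Accumulates the cells of a complex column (a collection or UDT) and serializes them into a |
| * single buffer: a 4-byte cell count followed by each element encoded as a 4-byte length plus |
| * the element's bytes (for maps, each entry contributes a key buffer and a value buffer) |
| */ |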
| private abstract static class ComplexTypeBuffer |
| { |
| private final List<ByteBuffer> buffers; |
| private final int cellCount; |
| private int length = 0; |
| |
| ComplexTypeBuffer(int cellCount, int bufferSize) |
| { |
| this.cellCount = cellCount; |
| this.buffers = new ArrayList<>(bufferSize); |
| } |
| |
| static ComplexTypeBuffer newBuffer(AbstractType<?> type, int cellCount) |
| { |
| ComplexTypeBuffer buffer; |
| if (type instanceof SetType) |
| { |
| buffer = new SetBuffer(cellCount); |
| } |
| else if (type instanceof ListType) |
| { |
| buffer = new ListBuffer(cellCount); |
| } |
| else if (type instanceof MapType) |
| { |
| buffer = new MapBuffer(cellCount); |
| } |
| else if (type instanceof UserType) |
| { |
| buffer = new UdtBuffer(cellCount); |
| } |
| else |
| { |
| throw new IllegalStateException("Unexpected type deserializing CQL Collection: " + type); |
| } |
| return buffer; |
| } |
| |
| void addCell(Cell<?> cell) |
| { |
| add(cell.buffer()); // Copy over value |
| } |
| |
| void add(ByteBuffer buffer) |
| { |
| buffers.add(buffer); |
| length += buffer.remaining(); |
| } |
| |
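| // Illustrative example: a list<int> holding [1, 2] builds a 20-byte buffer laid out as |
| // [int cellCount=2][int len=4][int 1][int len=4][int 2] |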
| ByteBuffer build() |
| { |
| ByteBuffer result = ByteBuffer.allocate(4 + (buffers.size() * 4) + length); |
| result.putInt(cellCount); |
| for (ByteBuffer buffer : buffers) |
| { |
| result.putInt(buffer.remaining()); |
| result.put(buffer); |
| } |
| // Cast to ByteBuffer required when compiling with Java 8 |
| return (ByteBuffer) result.flip(); |
| } |
| } |
| |
| private static class SetBuffer extends ComplexTypeBuffer |
| { |
| SetBuffer(int cellCount) |
| { |
| super(cellCount, cellCount); |
| } |
| |
| @Override |
| void addCell(Cell<?> cell) |
| { |
| add(cell.path().get(0)); // Set - copy over key |
| } |
| } |
| |
| private static class ListBuffer extends ComplexTypeBuffer |
| { |
| ListBuffer(int cellCount) |
| { |
| super(cellCount, cellCount); |
| } |
| } |
| |
| private static class MapBuffer extends ComplexTypeBuffer |
| { |
| |
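| // Each map entry contributes two buffers (key and value), hence double the initial capacity |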
| MapBuffer(int cellCount) |
| { |
| super(cellCount, cellCount * 2); |
| } |
| |
| @Override |
| void addCell(Cell<?> cell) |
| { |
| add(cell.path().get(0)); // Map - copy over key and value |
| super.addCell(cell); |
| } |
| } |
| |
| private static class UdtBuffer extends ComplexTypeBuffer |
| { |
| UdtBuffer(int cellCount) |
| { |
| super(cellCount, cellCount); |
| } |
| } |
| } |