/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.cassandra.spark.sparksql;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.spark.data.CqlField;
import org.apache.cassandra.spark.data.CqlTable;
import org.apache.cassandra.spark.data.DataLayer;
import org.apache.cassandra.spark.reader.Rid;
import org.apache.cassandra.spark.reader.StreamScanner;
import org.apache.cassandra.spark.sparksql.filters.CdcOffsetFilter;
import org.apache.cassandra.spark.sparksql.filters.PartitionKeyFilter;
import org.apache.cassandra.spark.sparksql.filters.PruneColumnFilter;
import org.apache.cassandra.spark.stats.Stats;
import org.apache.cassandra.spark.utils.ArrayUtils;
import org.apache.cassandra.spark.utils.ByteBufferUtils;
import org.apache.cassandra.spark.utils.ColumnTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
/**
 * Iterates through the compaction scanner, deserializing ByteBuffers and normalizing them into an Object[] array in column order.
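 * <p>
 * A minimal usage sketch (assuming a concrete {@code DataLayer} implementation, here called {@code dataLayer};
 * names are illustrative and error handling is elided):
 * <pre>{@code
 * try (SparkCellIterator it = new SparkCellIterator(0, dataLayer, null, Collections.emptyList(), null))
 * {
 *     while (it.hasNext())
 *     {
 *         Cell cell = it.next();
 *         // cell carries the normalized column values, column position and timestamp
 *     }
 * }
 * }</pre>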
 */
public class SparkCellIterator implements Iterator<Cell>, AutoCloseable
{
    private static final Logger LOGGER = LoggerFactory.getLogger(SparkCellIterator.class);

    protected final DataLayer dataLayer;
    private final Stats stats;
    private final CqlTable cqlTable;
    private final Object[] values;
    private final int numPartitionKeys;
    private final boolean noValueColumns;
    @Nullable
    protected final PruneColumnFilter columnFilter;
    private final long startTimeNanos;
    @NotNull
    private final StreamScanner scanner;
    @NotNull
    private final Rid rid;

    // Mutable Iterator State
    private boolean skipPartition = false;
    private boolean newRow = false;
    private boolean closed = false;
    private Cell next = null;
    private long previousTimeNanos;

    protected final int partitionId;

    public SparkCellIterator(int partitionId,
                             @NotNull DataLayer dataLayer,
                             @Nullable StructType requiredSchema,
                             @NotNull List<PartitionKeyFilter> partitionKeyFilters,
                             @Nullable CdcOffsetFilter cdcOffsetFilter)
    {
        this.partitionId = partitionId;
        this.dataLayer = dataLayer;
        stats = dataLayer.stats();
        cqlTable = dataLayer.cqlTable();
        numPartitionKeys = cqlTable.numPartitionKeys();
        columnFilter = buildColumnFilter(requiredSchema, cqlTable);
        if (columnFilter != null)
        {
            LOGGER.info("Adding prune column filter columns='{}'", String.join(",", columnFilter.requiredColumns()));

            // If we are reading only partition/clustering keys or static columns, there are no value columns
            Set<String> valueColumns = cqlTable.valueColumns().stream().map(CqlField::name).collect(Collectors.toSet());
            noValueColumns = columnFilter.requiredColumns().stream().noneMatch(valueColumns::contains);
        }
        else
        {
            noValueColumns = cqlTable.numValueColumns() == 0;
        }
        // The values array carries all the partition/clustering/static columns
        // plus the single column value for this cell across to the SparkRowIterator
        values = new Object[cqlTable.numNonValueColumns() + (noValueColumns ? 0 : 1)];
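        // For example, with an illustrative schema of partition key 'a', clustering key 'b', static column 's'
        // and value columns 'x' and 'y', values = [a, b, s, <cell value>]: the last slot is re-used
        // for each value column cell in turn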

        // Open compaction scanner
        startTimeNanos = System.nanoTime();
        previousTimeNanos = startTimeNanos;
        scanner = openScanner(partitionId, partitionKeyFilters, cdcOffsetFilter);
        long openTimeNanos = System.nanoTime() - startTimeNanos;
        LOGGER.info("Opened CompactionScanner runtimeNanos={}", openTimeNanos);
        stats.openedCompactionScanner(openTimeNanos);
        rid = scanner.rid();
        stats.openedSparkCellIterator();
    }

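    /**
     * Opens the underlying StreamScanner. The base implementation opens a compaction scanner and does not use
     * the CDC offset filter; subclasses can override this, e.g. to open a CDC scanner instead.
     */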
    protected StreamScanner openScanner(int partitionId,
                                        @NotNull List<PartitionKeyFilter> partitionKeyFilters,
                                        @Nullable CdcOffsetFilter cdcOffsetFilter)
    {
        return dataLayer.openCompactionScanner(partitionId, partitionKeyFilters, columnFilter);
    }

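    /**
     * @return a PruneColumnFilter over the required-schema fields that exist in the CQL table,
     *         or null if no required schema was supplied
     */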
    @Nullable
    static PruneColumnFilter buildColumnFilter(@Nullable StructType requiredSchema, @NotNull CqlTable cqlTable)
    {
        return requiredSchema != null
               ? new PruneColumnFilter(Arrays.stream(requiredSchema.fields())
                                             .map(StructField::name)
                                             .filter(cqlTable::has)
                                             .collect(Collectors.toSet()))
               : null;
    }

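    /**
     * @return true if no value columns are being read, i.e. only partition/clustering keys and/or static columns
     */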
    public boolean noValueColumns()
    {
        return noValueColumns;
    }

    @Override
    public boolean hasNext()
    {
        try
        {
            return hasNextThrows();
        }
        catch (IOException exception)
        {
            throw new RuntimeException(exception);
        }
    }

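    /**
     * Checked variant of {@link #hasNext()}: returns whether another cell is available,
     * propagating any IOException from the underlying scanner rather than wrapping it.
     */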
    public boolean hasNextThrows() throws IOException
    {
        if (next != null || closed)
        {
            return !closed;
        }
        return getNext();
    }

    @Override
    public Cell next()
    {
        Cell result = next;
        assert result != null;
        next = null;
        newRow = false;
        long now = System.nanoTime();
        stats.nextCell(now - previousTimeNanos);
        previousTimeNanos = now;
        return result;
    }

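    /**
     * Advances the scanner until the next Cell (or tombstone) has been materialized into 'next',
     * closing the iterator when the scanner is exhausted.
     *
     * @return true if a next cell is available
     */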
    private boolean getNext() throws IOException
    {
        while (scanner.hasNext())
        {
            // If hasNext returns true, the partition keys have been loaded into the rid, so try to rebuild the partition:
            // deserialize the partition keys - if we have moved to a new partition - and update the 'values' Object[] array
            maybeRebuildPartition();

            if (rid.shouldConsumeRangeTombstoneMarkers())
            {
                List<RangeTombstoneMarker> markers = rid.getRangeTombstoneMarkers();
                long maxTimestamp = markers.stream()
                                           .mapToLong(marker -> {
                                               if (marker.isBoundary())
                                               {
                                                   return Math.max(marker.openDeletionTime(false), marker.closeDeletionTime(false));
                                               }
                                               return marker.isOpen(false) ? marker.openDeletionTime(false)
                                                                           : marker.closeDeletionTime(false);
                                           })
                                           .max()
                                           .getAsLong(); // Safe because markers is non-empty
                // A range tombstone only needs the partition keys in the Spark row;
                // the markers themselves are encoded in the extra column
                int partitionKeyLength = cqlTable.numPartitionKeys();
                next = new RangeTombstone(ArrayUtils.retain(values, 0, partitionKeyLength), maxTimestamp, markers);
                rid.resetRangeTombstoneMarkers();
                return true;
            }

            if (rid.isPartitionDeletion())
            {
                // Special case: a partition deletion only has the partition key parts present in the values array
                int partitionKeyLength = cqlTable.numPartitionKeys();
                // Strip out any values other than the partition keys
                next = new Tombstone(ArrayUtils.retain(values, 0, partitionKeyLength), rid.getTimestamp());
                rid.setPartitionDeletion(false); // Reset
                return true;
            }

            scanner.advanceToNextColumn();

            // Skip the partition, e.g. if its token is outside this Spark worker's token range
            if (skipPartition)
            {
                continue;
            }

            // Deserialize clustering keys - if moved to new CQL row - and update 'values' Object[] array
            ByteBuffer columnNameBuf = Objects.requireNonNull(rid.getColumnName(), "ColumnName buffer in Rid is null, this is unexpected");
            maybeRebuildClusteringKeys(columnNameBuf);

            // Deserialize CQL field column name
            ByteBuffer component = ColumnTypes.extractComponent(columnNameBuf, cqlTable.numClusteringKeys());
            String columnName = component != null ? ByteBufferUtils.stringThrowRuntime(component) : null;
            if (columnName == null || columnName.isEmpty())
            {
                if (noValueColumns)
                {
                    // Special case where the schema consists only of partition keys, clustering keys or static columns, with no value columns
                    next = new Cell(values, 0, newRow, rid.isUpdate(), rid.getTimestamp());
                    return true;
                }

                // An SBR job (not CDC) should never encounter a row tombstone;
                // it would throw IllegalStateException at the beginning of this method (at scanner.hasNext()).
                // For a row deletion, the resulting row tombstone carries no fields other than the primary keys.
                if (rid.isRowDeletion())
                {
                    // Special case: a row deletion only has the primary key parts present in the values array
                    int primaryKeyLength = cqlTable.numPrimaryKeyColumns();
                    // Strip out any values other than the primary keys
                    next = new Tombstone(ArrayUtils.retain(values, 0, primaryKeyLength), rid.getTimestamp());
                    // Reset the row deletion flag
                    rid.setRowDeletion(false);
                    return true;
                }

                continue;
            }

            CqlField field = cqlTable.getField(columnName);
            if (field == null)
            {
                LOGGER.warn("Ignoring unknown column columnName='{}'", columnName);
                continue;
            }

            // Deserialize value field or static column and update 'values' Object[] array
            deserializeField(field);

            // Static column, so continue reading the entire CQL row before returning
            if (field.isStaticColumn())
            {
                continue;
            }

            if (rid.hasCellTombstoneInComplex())
            {
                next = new TombstonesInComplex(values, field.position(), newRow, rid.getTimestamp(), columnName, rid.getCellTombstonesInComplex());
                rid.resetCellTombstonesInComplex();
            }
            else
            {
                // Update next Cell
                next = new Cell(values, field.position(), newRow, rid.isUpdate(), rid.getTimestamp());
            }

            return true;
        }

        // Finished so close
        next = null;
        close();
        return false;
    }

    @Override
    public void close() throws IOException
    {
        if (!closed)
        {
            scanner.close();
            closed = true;
            long runtimeNanos = System.nanoTime() - startTimeNanos;
            LOGGER.info("Closed CompactionScanner runtimeNanos={}", runtimeNanos);
            stats.closedSparkCellIterator(runtimeNanos);
        }
    }

    /* Iterator Helpers */

    /**
     * If this is a new partition, see if we can skip it (e.g. if the partition is outside the Spark worker's
     * token range); otherwise rebuild the partition keys
     */
    private void maybeRebuildPartition()
    {
        if (!rid.isNewPartition())
        {
            return;
        }

        // New partition, so a new CQL row begins
        newRow = true;

        for (CqlField field : cqlTable.staticColumns())
        {
            // We need to reset static columns between partitions; if a static column is null/not populated
            // in the next partition, the previous value might otherwise be carried across
            values[field.position()] = null;
        }

        // Skip partitions not in the token range for this Spark partition
        skipPartition = !dataLayer.isInPartition(partitionId, rid.getToken(), rid.getPartitionKey());
        if (skipPartition)
        {
            stats.skippedPartitionInIterator(rid.getPartitionKey(), rid.getToken());
            return;
        }

        // Not skipped, so deserialize the partition keys and update the 'values' array
        ByteBuffer partitionKey = rid.getPartitionKey();
        if (numPartitionKeys == 1)
        {
            // Not a composite partition key
            CqlField field = cqlTable.partitionKeys().get(0);
            values[field.position()] = deserialize(field, partitionKey);
        }
        else
        {
            // Split composite partition keys
            ByteBuffer[] partitionKeyBufs = ColumnTypes.split(partitionKey, numPartitionKeys);
            int index = 0;
            for (CqlField field : cqlTable.partitionKeys())
            {
                values[field.position()] = deserialize(field, partitionKeyBufs[index++]);
            }
        }
    }

    /**
     * Deserialize the clustering key components and update the 'values' array if changed.
     * Sets newRow to true if we have moved to a new CQL row.
     */
    private void maybeRebuildClusteringKeys(@NotNull ByteBuffer columnNameBuf)
    {
        List<CqlField> clusteringKeys = cqlTable.clusteringKeys();
        if (clusteringKeys.isEmpty())
        {
            return;
        }

        int index = 0;
        for (CqlField field : clusteringKeys)
        {
            Object newObj = deserialize(field, ColumnTypes.extractComponent(columnNameBuf, index++));
            Object oldObj = values[field.position()];
            if (newRow || oldObj == null || newObj == null || !field.equals(newObj, oldObj))
            {
                newRow = true;
                values[field.position()] = newObj;
            }
        }
    }

    /**
     * Deserialize the value field, if not excluded by the column filter, and update the 'values' array
     */
    private void deserializeField(@NotNull CqlField field)
    {
        if (columnFilter == null || columnFilter.includeColumn(field.name()))
        {
            // Deserialize value
            Object value = deserialize(field, rid.getValue());

            if (field.isStaticColumn())
            {
                values[field.position()] = value;
                return;
            }

            values[values.length - 1] = value; // Last index in array always stores the cell value
        }
    }

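    /**
     * Deserializes the buffer using the field's CQL type, recording the deserialization time in stats.
     * Returns null for a null buffer.
     */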
    private Object deserialize(CqlField field, ByteBuffer buffer)
    {
        long now = System.nanoTime();
        Object value = buffer != null ? field.deserialize(buffer) : null;
        stats.fieldDeserialization(field, System.nanoTime() - now);
        return value;
    }
}