blob: f40a88f9291c7da067100bdb553c7fdef3549085 [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.cassandra.spark.sparksql;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Set;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.spark.reader.Rid;
import org.apache.cassandra.spark.reader.StreamScanner;
import org.apache.cassandra.spark.sparksql.filters.CdcOffsetFilter;
import org.apache.cassandra.spark.sparksql.filters.PartitionKeyFilter;
import org.apache.cassandra.spark.sparksql.filters.PruneColumnFilter;
import org.apache.cassandra.spark.stats.Stats;
import org.apache.cassandra.spark.utils.ArrayUtils;
import org.apache.cassandra.spark.utils.ByteBufferUtils;
import org.apache.cassandra.spark.utils.ColumnTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
/**
 * Iterates through the compaction scanner (CompactionIterator), deserializing ByteBuffers and normalizing
 * each cell into an {@code Object[]} array in column order.
 */
public class SparkCellIterator implements Iterator<Cell>, AutoCloseable
private static final Logger LOGGER = LoggerFactory.getLogger(SparkCellIterator.class);
// Source of the table schema, stats, scanner and token-range membership checks
protected final DataLayer dataLayer;
// Receives timing and skip metrics (per cell, per field deserialization, skipped partitions)
private final Stats stats;
private final CqlTable cqlTable;
// Reusable row buffer in column order: partition keys, clustering keys and static columns sit at their
// field positions; when value columns are read, the current cell value occupies the last slot.
// NOTE(review): this same array instance is passed to every emitted Cell, so consumers presumably
// must copy what they need before the iterator advances — confirm.
private final Object[] values;
private final int numPartitionKeys;
// True when no value column is required, in which case 'values' has no trailing cell-value slot
private final boolean noValueColumns;
// Prune-column filter derived from Spark's required schema; null means every column is read
protected final PruneColumnFilter columnFilter;
// Captured just before the scanner is opened; used to report open and close runtimes
private final long startTimeNanos;
private final StreamScanner scanner;
// Row state exposed by the scanner (partition key, token, column name, value, timestamp, deletion flags)
private final Rid rid;
// Mutable Iterator State
// True while positioned on a partition that is outside this iterator's token range
private boolean skipPartition = false;
// True when the buffered cell starts a new CQL row; reset by next()
private boolean newRow = false;
private boolean closed = false;
// Cell buffered by hasNext() and handed out by next()
private Cell next = null;
// Time of the previous next() call (initially construction time), used for per-cell timing
private long previousTimeNanos;
// Partition id this iterator reads (immutable despite sitting under "Mutable Iterator State")
protected final int partitionId;
/**
 * Creates an iterator over the cells of one partition of the data layer and eagerly opens its scanner.
 *
 * @param partitionId         partition id; passed to {@code openScanner} and used to skip partitions by token
 * @param dataLayer           supplies the table schema, stats and the scanner
 * @param requiredSchema      columns requested by Spark; {@code null} disables column pruning
 * @param partitionKeyFilters partition-key filters forwarded to the scanner
 * @param cdcOffsetFilter     optional CDC offset filter, forwarded to {@code openScanner}
 */
public SparkCellIterator(int partitionId,
@NotNull DataLayer dataLayer,
@Nullable StructType requiredSchema,
@NotNull List<PartitionKeyFilter> partitionKeyFilters,
@Nullable CdcOffsetFilter cdcOffsetFilter)
this.partitionId = partitionId;
this.dataLayer = dataLayer;
stats = dataLayer.stats();
cqlTable = dataLayer.cqlTable();
numPartitionKeys = cqlTable.numPartitionKeys();
// Null when Spark did not supply a required schema (no pruning)
columnFilter = buildColumnFilter(requiredSchema, cqlTable);
if (columnFilter != null)
// NOTE(review): the next line is garbled in this extract (the logging call prefix is missing) — confirm against the full file
{"Adding prune column filter columns='{}'", String.join(",", columnFilter.requiredColumns()));
// If we are reading only partition/clustering keys or static columns, there are no value columns
Set<String> valueColumns = cqlTable.valueColumns().stream().map(CqlField::name).collect(Collectors.toSet());
noValueColumns = columnFilter.requiredColumns().stream().noneMatch(valueColumns::contains);
// NOTE(review): presumably the `else` branch (no pruning). As shown, this is a second assignment to a final field.
noValueColumns = cqlTable.numValueColumns() == 0;
// The value array holds all the partition/clustering/static columns plus
// the single column value for the current cell, which is passed on to the SparkRowIterator
values = new Object[cqlTable.numNonValueColumns() + (noValueColumns ? 0 : 1)];
// Open compaction scanner
startTimeNanos = System.nanoTime();
previousTimeNanos = startTimeNanos;
// openScanner() is protected and runs during construction, so a subclass override executes before that
// subclass's own field initializers; it may rely on columnFilter, which is assigned above
scanner = openScanner(partitionId, partitionKeyFilters, cdcOffsetFilter);
// NOTE(review): the logging call on the next line is garbled in this extract
long openTimeNanos = System.nanoTime() - startTimeNanos;"Opened CompactionScanner runtimeNanos={}", openTimeNanos);
// Row-state handle that the scanner populates as it advances
rid = scanner.rid();
protected StreamScanner openScanner(int partitionId,
@NotNull List<PartitionKeyFilter> partitionKeyFilters,
@Nullable CdcOffsetFilter cdcOffsetFilter)
return dataLayer.openCompactionScanner(partitionId, partitionKeyFilters, columnFilter);
/**
 * Builds the prune-column filter for the columns Spark requires.
 *
 * @param requiredSchema schema of the columns Spark requires, or {@code null} for no pruning
 * @param cqlTable       table schema
 * @return a filter over the required columns, or {@code null} when {@code requiredSchema} is {@code null}
 */
static PruneColumnFilter buildColumnFilter(@Nullable StructType requiredSchema, @NotNull CqlTable cqlTable)
// NOTE(review): the PruneColumnFilter constructor arguments are truncated in this extract. They presumably
// derive the required column names from requiredSchema and cqlTable — confirm.
return requiredSchema != null
? new PruneColumnFilter(
: null;
public boolean noValueColumns()
return noValueColumns;
public boolean hasNext()
return hasNextThrows();
catch (IOException exception)
throw new RuntimeException(exception);
public boolean hasNextThrows() throws IOException
if (next != null || closed)
return !closed;
return getNext();
public Cell next()
Cell result = next;
assert result != null;
next = null;
newRow = false;
long now = System.nanoTime();
stats.nextCell(now - previousTimeNanos);
previousTimeNanos = now;
return result;
/**
 * Advances the scanner until the next cell to emit is found, and buffers it in {@code next}.
 * <p>
 * Depending on the scanner's row state, this produces one of the following:
 * <ul>
 *   <li>a {@code RangeTombstone} (partition key only)</li>
 *   <li>a partition-level {@code Tombstone} (partition key only)</li>
 *   <li>a row-level {@code Tombstone} (primary key only)</li>
 *   <li>a {@code TombstonesInComplex}</li>
 *   <li>a regular {@code Cell} positioned at the field's column</li>
 * </ul>
 * NOTE(review): this extract appears to be missing several statements that the inline comments describe
 * (the maybeRebuildPartition(), maybeRebuildClusteringKeys() and deserializeField() calls, `continue`s,
 * `else` branches and the final close()). The notes below mark where — confirm against the full file.
 *
 * @return {@code true} if a cell was buffered; {@code false} when the scanner is exhausted
 * @throws IOException presumably propagated from the scanner
 */
private boolean getNext() throws IOException
while (scanner.hasNext())
// If hasNext returns true, the partition keys have been loaded into the rid.
// Therefore, try to rebuild the partition:
// deserialize partition keys - if we have moved to a new partition - and update the 'values' Object[] array.
if (rid.shouldConsumeRangeTombstoneMarkers())
List<RangeTombstoneMarker> markers = rid.getRangeTombstoneMarkers();
// Per marker: a boundary contributes the larger of its open/close deletion times, otherwise the open or
// close deletion time. NOTE(review): the stream source and the reduction step before .get() are missing in
// this extract; the name suggests the maximum over all markers.
long maxTimestamp =
.map(marker -> {
if (marker.isBoundary())
return Math.max(marker.openDeletionTime(false), marker.closeDeletionTime(false));
return marker.isOpen(false) ? marker.openDeletionTime(false) : marker.closeDeletionTime(false);
.get(); // Safe to call get as markers is non-empty
// Range tombstones only require the partition key in the Spark row;
// the range tombstones themselves are encoded in the extra column
// NOTE(review): no reset of the rid's range-tombstone state is visible here — confirm it is cleared
int partitionkeyLength = cqlTable.numPartitionKeys();
next = new RangeTombstone(ArrayUtils.retain(values, 0, partitionkeyLength), maxTimestamp, markers);
return true;
if (rid.isPartitionDeletion())
// Special case: a partition deletion only carries the partition key parts of the values array
int partitionkeyLength = cqlTable.numPartitionKeys();
// Keep only the partition-key prefix and drop any other values
next = new Tombstone(ArrayUtils.retain(values, 0, partitionkeyLength), rid.getTimestamp());
rid.setPartitionDeletion(false); // Reset
return true;
// Skip partition e.g. if token is outside of Spark worker token range
// NOTE(review): the statement guarded by this check (presumably `continue`) is not visible in this extract
if (skipPartition)
// Deserialize clustering keys - if moved to new CQL row - and update 'values' Object[] array
// NOTE(review): the maybeRebuildClusteringKeys(columnNameBuf) call is not visible in this extract
ByteBuffer columnNameBuf = Objects.requireNonNull(rid.getColumnName(), "ColumnName buffer in Rid is null, this is unexpected");
// Deserialize CQL field column name: the component after the clustering-key components
ByteBuffer component = ColumnTypes.extractComponent(columnNameBuf, cqlTable.numClusteringKeys());
String columnName = component != null ? ByteBufferUtils.stringThrowRuntime(component) : null;
// An absent or empty column name means there is no regular column value for this entry
if (columnName == null || columnName.length() == 0)
if (noValueColumns)
// Special case where schema consists only of partition keys, clustering keys or static columns, no value columns
next = new Cell(values, 0, newRow, rid.isUpdate(), rid.getTimestamp());
return true;
// SBR job (not CDC) should not expect encountering a row tombstone.
// It would throw IllegalStateException at the beginning of this method (at scanner.hasNext()).
// For a row deletion, the resulting row tombstone does not carry other fields than the primary keys.
if (rid.isRowDeletion())
// Special case: a row deletion only carries the primary key parts of the values array
int primaryKeyLength = cqlTable.numPrimaryKeyColumns();
// Keep only the primary-key prefix and drop any other values
next = new Tombstone(ArrayUtils.retain(values, 0, primaryKeyLength), rid.getTimestamp());
// Reset row deletion flag
// NOTE(review): the reset call this comment refers to is not visible in this extract
return true;
CqlField field = cqlTable.getField(columnName);
// NOTE(review): as shown, an unknown column is only logged; without a `continue`, field.isStaticColumn() below would NPE
if (field == null)
LOGGER.warn("Ignoring unknown column columnName='{}'", columnName);
// Deserialize value field or static column and update 'values' Object[] array
// Static column, so continue reading entire CQL row before returning
if (field.isStaticColumn())
// Cells with tombstones inside a complex (multi-cell) column are emitted as TombstonesInComplex;
// NOTE(review): presumably the regular Cell below is the `else` branch — as shown it would overwrite next
if (rid.hasCellTombstoneInComplex())
next = new TombstonesInComplex(values, field.position(), newRow, rid.getTimestamp(), columnName, rid.getCellTombstonesInComplex());
// Update next Cell
next = new Cell(values, field.position(), newRow, rid.isUpdate(), rid.getTimestamp());
return true;
// Finished so close
// NOTE(review): no close() call is visible here in this extract
next = null;
return false;
/**
 * Marks this iterator closed and logs its total runtime. Repeated calls are no-ops because of the
 * {@code closed} guard.
 * <p>
 * NOTE(review): no {@code scanner.close()} call is visible in this extract, even though the log message says
 * the CompactionScanner was closed. Confirm the scanner is released here; otherwise it leaks.
 *
 * @throws IOException as declared; presumably from closing the underlying scanner
 */
public void close() throws IOException
if (!closed)
closed = true;
// Total time since the scanner was opened in the constructor.
// NOTE(review): the logging call on this line is garbled in this extract
long runtimeNanos = System.nanoTime() - startTimeNanos;"Closed CompactionScanner runtimeNanos={}", runtimeNanos);
/* Iterator Helpers */
/**
 * If the scanner has moved to a new partition, decides whether it can be skipped (e.g. the partition is outside
 * the Spark worker's token range); otherwise rebuilds the partition keys in the 'values' array.
 */
private void maybeRebuildPartition()
// Nothing to do while still inside the same partition.
// NOTE(review): the early `return` for this check is not visible in this extract
if (!rid.isNewPartition())
// A new partition always starts a new CQL row
newRow = true;
for (CqlField field : cqlTable.staticColumns())
// We need to reset static columns between partitions: if a static column is null/not populated
// in the next partition, the previous value might otherwise be carried across
values[field.position()] = null;
// Skip partitions not in the token range for this Spark partition
skipPartition = !dataLayer.isInPartition(partitionId, rid.getToken(), rid.getPartitionKey());
// NOTE(review): presumably the method returns after recording the skip; that `return` is not visible here
if (skipPartition)
stats.skippedPartitionInIterator(rid.getPartitionKey(), rid.getToken());
// Otherwise it is an in-range new partition, so deserialize the partition keys and update the 'values' array
ByteBuffer partitionKey = rid.getPartitionKey();
if (numPartitionKeys == 1)
// Not a composite partition key
CqlField field = cqlTable.partitionKeys().get(0);
values[field.position()] = deserialize(field, partitionKey);
// Split composite partition keys (NOTE(review): presumably the `else` branch of the check above)
ByteBuffer[] partitionKeyBufs = ColumnTypes.split(partitionKey, numPartitionKeys);
int index = 0;
for (CqlField field : cqlTable.partitionKeys())
values[field.position()] = deserialize(field, partitionKeyBufs[index++]);
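/**
 * Deserializes the clustering key components of the column name and updates the 'values' array if they changed.
 * Sets {@code newRow} to true when the scanner has moved to a new CQL row.
 *
 * @param columnNameBuf composite cell name; its first components hold the clustering key values
 */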
private void maybeRebuildClusteringKeys(@NotNull ByteBuffer columnNameBuf)
List<CqlField> clusteringKeys = cqlTable.clusteringKeys();
// NOTE(review): presumably returns early for tables without clustering keys; the `return` is not visible here
if (clusteringKeys.isEmpty())
int index = 0;
for (CqlField field : clusteringKeys)
// Component `index` of the composite column name holds this clustering key's serialized value
Object newObj = deserialize(field, ColumnTypes.extractComponent(columnNameBuf, index++));
Object oldObj = values[field.position()];
// A new CQL row if already flagged, if either value is null, or if the key value changed
if (newRow || oldObj == null || newObj == null || !field.equals(newObj, oldObj))
newRow = true;
values[field.position()] = newObj;
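/**
 * Deserializes the current value for the given field (if the prune-column filter requires it) and stores it in
 * the 'values' array.
 *
 * @param field the value or static column the scanner is positioned on
 */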
private void deserializeField(@NotNull CqlField field)
// Only deserialize when no prune filter is configured or the filter includes this column.
// NOTE(review): the includeColumn( argument list is truncated in this extract — confirm against the full file
if (columnFilter == null || columnFilter.includeColumn(
// Deserialize value
Object value = deserialize(field, rid.getValue());
// Static columns are stored at their own position so they persist for the rest of the partition
if (field.isStaticColumn())
values[field.position()] = value;
// NOTE(review): presumably the `else` branch; as shown this also runs for static columns
values[values.length - 1] = value; // Last index in array always stores the cell value
private Object deserialize(CqlField field, ByteBuffer buffer)
long now = System.nanoTime();
Object value = buffer != null ? field.deserialize(buffer) : null;
stats.fieldDeserialization(field, System.nanoTime() - now);
return value;