blob: 1ab96fac1d511b3499e245a6cec5ec01a634f163 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.db;
import java.io.IOException;
import java.io.IOError;
import java.util.*;
import com.google.common.collect.Iterables;
import com.google.common.collect.PeekingIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.FileDataInput;
import org.apache.cassandra.net.MessagingService;
/**
* Helper class to deserialize Unfiltered object from disk efficiently.
*
* More precisely, this class is used by the low-level reader to ensure
* we don't do more work than necessary (i.e. we don't allocate/deserialize
* objects for things we don't care about).
*/
public abstract class UnfilteredDeserializer
{
private static final Logger logger = LoggerFactory.getLogger(UnfilteredDeserializer.class);
protected final CFMetaData metadata;
protected final DataInputPlus in;
protected final SerializationHelper helper;
protected UnfilteredDeserializer(CFMetaData metadata,
DataInputPlus in,
SerializationHelper helper)
{
this.metadata = metadata;
this.in = in;
this.helper = helper;
}
public static UnfilteredDeserializer create(CFMetaData metadata,
DataInputPlus in,
SerializationHeader header,
SerializationHelper helper,
DeletionTime partitionDeletion,
boolean readAllAsDynamic)
{
if (helper.version >= MessagingService.VERSION_30)
return new CurrentDeserializer(metadata, in, header, helper);
else
return new OldFormatDeserializer(metadata, in, helper, partitionDeletion, readAllAsDynamic);
}
/**
* Whether or not there is more atom to read.
*/
public abstract boolean hasNext() throws IOException;
/**
* Compare the provided bound to the next atom to read on disk.
*
* This will not read/deserialize the whole atom but only what is necessary for the
* comparison. Whenever we know what to do with this atom (read it or skip it),
* readNext or skipNext should be called.
*/
public abstract int compareNextTo(ClusteringBound bound) throws IOException;
/**
* Returns whether the next atom is a row or not.
*/
public abstract boolean nextIsRow() throws IOException;
/**
* Returns whether the next atom is the static row or not.
*/
public abstract boolean nextIsStatic() throws IOException;
/**
* Returns the next atom.
*/
public abstract Unfiltered readNext() throws IOException;
/**
* Clears any state in this deserializer.
*/
public abstract void clearState() throws IOException;
/**
* Skips the next atom.
*/
public abstract void skipNext() throws IOException;
/**
* For the legacy layout deserializer, we have to deal with the fact that a row can span multiple index blocks and that
* the call to hasNext() reads the next element upfront. We must take that into account when we check in AbstractSSTableIterator if
* we're past the end of an index block boundary as that check expect to account for only consumed data (that is, if hasNext has
* been called and made us cross an index boundary but neither readNext() or skipNext() as yet been called, we shouldn't consider
* the index block boundary crossed yet).
*
* TODO: we don't care about this for the current file format because a row can never span multiple index blocks (further, hasNext()
* only just basically read 2 bytes from disk in that case). So once we drop backward compatibility with pre-3.0 sstable, we should
* remove this.
*/
public abstract long bytesReadForUnconsumedData();
private static class CurrentDeserializer extends UnfilteredDeserializer
{
private final ClusteringPrefix.Deserializer clusteringDeserializer;
private final SerializationHeader header;
private int nextFlags;
private int nextExtendedFlags;
private boolean isReady;
private boolean isDone;
private final Row.Builder builder;
private CurrentDeserializer(CFMetaData metadata,
DataInputPlus in,
SerializationHeader header,
SerializationHelper helper)
{
super(metadata, in, helper);
this.header = header;
this.clusteringDeserializer = new ClusteringPrefix.Deserializer(metadata.comparator, in, header);
this.builder = BTreeRow.sortedBuilder();
}
public boolean hasNext() throws IOException
{
if (isReady)
return true;
prepareNext();
return !isDone;
}
private void prepareNext() throws IOException
{
if (isDone)
return;
nextFlags = in.readUnsignedByte();
if (UnfilteredSerializer.isEndOfPartition(nextFlags))
{
isDone = true;
isReady = false;
return;
}
nextExtendedFlags = UnfilteredSerializer.readExtendedFlags(in, nextFlags);
clusteringDeserializer.prepare(nextFlags, nextExtendedFlags);
isReady = true;
}
public int compareNextTo(ClusteringBound bound) throws IOException
{
if (!isReady)
prepareNext();
assert !isDone;
return clusteringDeserializer.compareNextTo(bound);
}
public boolean nextIsRow() throws IOException
{
if (!isReady)
prepareNext();
return UnfilteredSerializer.kind(nextFlags) == Unfiltered.Kind.ROW;
}
public boolean nextIsStatic() throws IOException
{
// This exists only for the sake of the OldFormatDeserializer
throw new UnsupportedOperationException();
}
public Unfiltered readNext() throws IOException
{
isReady = false;
if (UnfilteredSerializer.kind(nextFlags) == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER)
{
ClusteringBoundOrBoundary bound = clusteringDeserializer.deserializeNextBound();
return UnfilteredSerializer.serializer.deserializeMarkerBody(in, header, bound);
}
else
{
builder.newRow(clusteringDeserializer.deserializeNextClustering());
return UnfilteredSerializer.serializer.deserializeRowBody(in, header, helper, nextFlags, nextExtendedFlags, builder);
}
}
public void skipNext() throws IOException
{
isReady = false;
clusteringDeserializer.skipNext();
if (UnfilteredSerializer.kind(nextFlags) == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER)
{
UnfilteredSerializer.serializer.skipMarkerBody(in);
}
else
{
UnfilteredSerializer.serializer.skipRowBody(in);
}
}
public void clearState()
{
isReady = false;
isDone = false;
}
public long bytesReadForUnconsumedData()
{
// In theory, hasNext() does consume 2-3 bytes, but we don't care about this for the current file format so returning
// 0 to mean "do nothing".
return 0;
}
}
public static class OldFormatDeserializer extends UnfilteredDeserializer
{
private final boolean readAllAsDynamic;
private boolean skipStatic;
// The next Unfiltered to return, computed by hasNext()
private Unfiltered next;
// A temporary storage for an unfiltered that isn't returned next but should be looked at just afterwards
private Unfiltered saved;
private boolean isFirst = true;
// The Unfiltered as read from the old format input
private final UnfilteredIterator iterator;
// The position in the input after the last data consumption (readNext/skipNext).
private long lastConsumedPosition;
private OldFormatDeserializer(CFMetaData metadata,
DataInputPlus in,
SerializationHelper helper,
DeletionTime partitionDeletion,
boolean readAllAsDynamic)
{
super(metadata, in, helper);
this.iterator = new UnfilteredIterator(partitionDeletion);
this.readAllAsDynamic = readAllAsDynamic;
this.lastConsumedPosition = currentPosition();
}
public void setSkipStatic()
{
this.skipStatic = true;
}
private boolean isStatic(Unfiltered unfiltered)
{
return unfiltered.isRow() && ((Row)unfiltered).isStatic();
}
public boolean hasNext() throws IOException
{
try
{
while (next == null)
{
if (saved == null && !iterator.hasNext())
return false;
next = saved == null ? iterator.next() : saved;
saved = null;
// The sstable iterators assume that if there is one, the static row is the first thing this deserializer will return.
// However, in the old format, a range tombstone with an empty start would sort before any static cell. So we should
// detect that case and return the static parts first if necessary.
if (isFirst && iterator.hasNext() && isStatic(iterator.peek()))
{
saved = next;
next = iterator.next();
}
isFirst = false;
// When reading old tables, we sometimes want to skip static data (due to how staticly defined column of compact
// tables are handled).
if (skipStatic && isStatic(next))
next = null;
}
return true;
}
catch (IOError e)
{
if (e.getCause() != null && e.getCause() instanceof IOException)
throw (IOException)e.getCause();
throw e;
}
}
private boolean isRow(LegacyLayout.LegacyAtom atom)
{
if (atom.isCell())
return true;
LegacyLayout.LegacyRangeTombstone tombstone = atom.asRangeTombstone();
return tombstone.isCollectionTombstone() || tombstone.isRowDeletion(metadata);
}
public int compareNextTo(ClusteringBound bound) throws IOException
{
if (!hasNext())
throw new IllegalStateException();
return metadata.comparator.compare(next.clustering(), bound);
}
public boolean nextIsRow() throws IOException
{
if (!hasNext())
throw new IllegalStateException();
return next.isRow();
}
public boolean nextIsStatic() throws IOException
{
return nextIsRow() && ((Row)next).isStatic();
}
private long currentPosition()
{
// We return a bogus value if the input is not file based, but check we never rely
// on that value in that case in bytesReadForUnconsumedData
return in instanceof FileDataInput ? ((FileDataInput)in).getFilePointer() : 0;
}
public Unfiltered readNext() throws IOException
{
if (!hasNext())
throw new IllegalStateException();
Unfiltered toReturn = next;
next = null;
lastConsumedPosition = currentPosition();
return toReturn;
}
public void skipNext() throws IOException
{
if (!hasNext())
throw new UnsupportedOperationException();
next = null;
lastConsumedPosition = currentPosition();
}
public long bytesReadForUnconsumedData()
{
if (!(in instanceof FileDataInput))
throw new AssertionError();
return currentPosition() - lastConsumedPosition;
}
public void clearState()
{
next = null;
saved = null;
iterator.clearState();
lastConsumedPosition = currentPosition();
}
// Groups atoms from the input into proper Unfiltered.
// Note: this could use guava AbstractIterator except that we want to be able to clear
// the internal state of the iterator so it's cleaner to do it ourselves.
private class UnfilteredIterator implements PeekingIterator<Unfiltered>
{
private final AtomIterator atoms;
private final LegacyLayout.CellGrouper grouper;
private final TombstoneTracker tombstoneTracker;
private Unfiltered next;
private UnfilteredIterator(DeletionTime partitionDeletion)
{
this.grouper = new LegacyLayout.CellGrouper(metadata, helper);
this.tombstoneTracker = new TombstoneTracker(partitionDeletion);
this.atoms = new AtomIterator();
}
public boolean hasNext()
{
// Note that we loop on next == null because TombstoneTracker.openNew() could return null below or the atom might be shadowed.
while (next == null)
{
if (atoms.hasNext())
{
// If a range tombstone closes strictly before the next row/RT, we need to return that close (or boundary) marker first.
if (tombstoneTracker.hasClosingMarkerBefore(atoms.peek()))
{
next = tombstoneTracker.popClosingMarker();
}
else
{
LegacyLayout.LegacyAtom atom = atoms.next();
if (!tombstoneTracker.isShadowed(atom))
next = isRow(atom) ? readRow(atom) : tombstoneTracker.openNew(atom.asRangeTombstone());
}
}
else if (tombstoneTracker.hasOpenTombstones())
{
next = tombstoneTracker.popClosingMarker();
}
else
{
return false;
}
}
return true;
}
private Unfiltered readRow(LegacyLayout.LegacyAtom first)
{
LegacyLayout.CellGrouper grouper = first.isStatic()
? LegacyLayout.CellGrouper.staticGrouper(metadata, helper)
: this.grouper;
grouper.reset();
grouper.addAtom(first);
// As long as atoms are part of the same row, consume them. Note that the call to addAtom() uses
// atoms.peek() so that the atom is only consumed (by next) if it's part of the row (addAtom returns true)
while (atoms.hasNext() && grouper.addAtom(atoms.peek()))
{
atoms.next();
}
return grouper.getRow();
}
public Unfiltered next()
{
if (!hasNext())
throw new UnsupportedOperationException();
Unfiltered toReturn = next;
next = null;
return toReturn;
}
public Unfiltered peek()
{
if (!hasNext())
throw new UnsupportedOperationException();
return next;
}
public void clearState()
{
atoms.clearState();
tombstoneTracker.clearState();
next = null;
}
public void remove()
{
throw new UnsupportedOperationException();
}
}
// Wraps the input of the deserializer to provide an iterator (and skip shadowed atoms).
// Note: this could use guava AbstractIterator except that we want to be able to clear
// the internal state of the iterator so it's cleaner to do it ourselves.
private class AtomIterator implements PeekingIterator<LegacyLayout.LegacyAtom>
{
private boolean isDone;
private LegacyLayout.LegacyAtom next;
private AtomIterator()
{
}
public boolean hasNext()
{
if (isDone)
return false;
if (next == null)
{
next = readAtom();
if (next == null)
{
isDone = true;
return false;
}
}
return true;
}
private LegacyLayout.LegacyAtom readAtom()
{
try
{
return LegacyLayout.readLegacyAtom(metadata, in, readAllAsDynamic);
}
catch (IOException e)
{
throw new IOError(e);
}
}
public LegacyLayout.LegacyAtom next()
{
if (!hasNext())
throw new UnsupportedOperationException();
LegacyLayout.LegacyAtom toReturn = next;
next = null;
return toReturn;
}
public LegacyLayout.LegacyAtom peek()
{
if (!hasNext())
throw new UnsupportedOperationException();
return next;
}
public void clearState()
{
this.next = null;
this.isDone = false;
}
public void remove()
{
throw new UnsupportedOperationException();
}
}
/**
* Tracks which range tombstones are open when deserializing the old format.
*/
private class TombstoneTracker
{
private final DeletionTime partitionDeletion;
// Open tombstones sorted by their closing bound (i.e. first tombstone is the first to close).
// As we only track non-fully-shadowed ranges, the first range is necessarily the currently
// open tombstone (the one with the higher timestamp).
private final SortedSet<LegacyLayout.LegacyRangeTombstone> openTombstones;
public TombstoneTracker(DeletionTime partitionDeletion)
{
this.partitionDeletion = partitionDeletion;
this.openTombstones = new TreeSet<>((rt1, rt2) -> metadata.comparator.compare(rt1.stop.bound, rt2.stop.bound));
}
/**
* Checks if the provided atom is fully shadowed by the open tombstones of this tracker (or the partition deletion).
*/
public boolean isShadowed(LegacyLayout.LegacyAtom atom)
{
assert !hasClosingMarkerBefore(atom);
long timestamp = atom.isCell() ? atom.asCell().timestamp : atom.asRangeTombstone().deletionTime.markedForDeleteAt();
if (partitionDeletion.deletes(timestamp))
return true;
SortedSet<LegacyLayout.LegacyRangeTombstone> coveringTombstones = isRow(atom) ? openTombstones : openTombstones.tailSet(atom.asRangeTombstone());
return Iterables.any(coveringTombstones, tombstone -> tombstone.deletionTime.deletes(timestamp));
}
/**
* Whether the currently open marker closes stricly before the provided row/RT.
*/
public boolean hasClosingMarkerBefore(LegacyLayout.LegacyAtom atom)
{
return !openTombstones.isEmpty()
&& metadata.comparator.compare(openTombstones.first().stop.bound, atom.clustering()) < 0;
}
/**
* Returns the unfiltered corresponding to closing the currently open marker (and update the tracker accordingly).
*/
public Unfiltered popClosingMarker()
{
assert !openTombstones.isEmpty();
Iterator<LegacyLayout.LegacyRangeTombstone> iter = openTombstones.iterator();
LegacyLayout.LegacyRangeTombstone first = iter.next();
iter.remove();
// If that was the last open tombstone, we just want to close it. Otherwise, we have a boundary with the
// next tombstone
if (!iter.hasNext())
return new RangeTombstoneBoundMarker(first.stop.bound, first.deletionTime);
LegacyLayout.LegacyRangeTombstone next = iter.next();
return RangeTombstoneBoundaryMarker.makeBoundary(false, first.stop.bound, first.stop.bound.invert(), first.deletionTime, next.deletionTime);
}
/**
* Update the tracker given the provided newly open tombstone. This return the Unfiltered corresponding to the opening
* of said tombstone: this can be a simple open mark, a boundary (if there was an open tombstone superseded by this new one)
* or even null (if the new tombston start is supersedes by the currently open tombstone).
*
* Note that this method assume the added tombstone is not fully shadowed, i.e. that !isShadowed(tombstone). It also
* assumes no opened tombstone closes before that tombstone (so !hasClosingMarkerBefore(tombstone)).
*/
public Unfiltered openNew(LegacyLayout.LegacyRangeTombstone tombstone)
{
if (openTombstones.isEmpty())
{
openTombstones.add(tombstone);
return new RangeTombstoneBoundMarker(tombstone.start.bound, tombstone.deletionTime);
}
Iterator<LegacyLayout.LegacyRangeTombstone> iter = openTombstones.iterator();
LegacyLayout.LegacyRangeTombstone first = iter.next();
if (tombstone.deletionTime.supersedes(first.deletionTime))
{
// We're supperseding the currently open tombstone, so we should produce a boundary that close the currently open
// one and open the new one. We should also add the tombstone, but if it stop after the first one, we should
// also remove that first tombstone as it won't be useful anymore.
if (metadata.comparator.compare(tombstone.stop.bound, first.stop.bound) >= 0)
iter.remove();
openTombstones.add(tombstone);
return RangeTombstoneBoundaryMarker.makeBoundary(false, tombstone.start.bound.invert(), tombstone.start.bound, first.deletionTime, tombstone.deletionTime);
}
else
{
// If the new tombstone don't supersedes the currently open tombstone, we don't have anything to return, we
// just add the new tombstone (because we know tombstone is not fully shadowed, this imply the new tombstone
// simply extend after the first one and we'll deal with it later)
assert metadata.comparator.compare(tombstone.start.bound, first.stop.bound) > 0;
openTombstones.add(tombstone);
return null;
}
}
public boolean hasOpenTombstones()
{
return !openTombstones.isEmpty();
}
private boolean formBoundary(LegacyLayout.LegacyRangeTombstone close, LegacyLayout.LegacyRangeTombstone open)
{
return metadata.comparator.compare(close.stop.bound, open.start.bound) == 0;
}
public void clearState()
{
openTombstones.clear();
}
}
}
}