blob: 6634af3910311e09c31ef4820392f809d788014d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.io.sstable.format;
import java.io.IOException;
import org.apache.cassandra.db.ClusteringPrefix;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.DeletionTime;
import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.db.rows.RangeTombstoneMarker;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.rows.SerializationHelper;
import org.apache.cassandra.db.rows.Unfiltered;
import org.apache.cassandra.db.rows.UnfilteredSerializer;
import org.apache.cassandra.io.util.SequentialWriter;
import org.apache.cassandra.utils.ByteBufferUtil;
import static com.google.common.base.Preconditions.checkState;
public abstract class SortedTablePartitionWriter implements AutoCloseable
{
protected final UnfilteredSerializer unfilteredSerializer;
private final SerializationHeader header;
private final SequentialWriter writer;
private final SerializationHelper helper;
private final int version;
private long previousRowStart;
private long initialPosition;
private long headerLength;
protected long startPosition;
protected int written;
protected ClusteringPrefix<?> firstClustering;
protected ClusteringPrefix<?> lastClustering;
protected DeletionTime openMarker = DeletionTime.LIVE;
protected DeletionTime startOpenMarker = DeletionTime.LIVE;
// Sequence control, also used to add empty static row if `addStaticRow` is not called.
private enum State
{
AWAITING_PARTITION_HEADER,
AWAITING_STATIC_ROW,
AWAITING_ROWS,
COMPLETED
}
State state = State.AWAITING_PARTITION_HEADER;
protected SortedTablePartitionWriter(SerializationHeader header, SequentialWriter writer, Version version)
{
this.header = header;
this.writer = writer;
this.unfilteredSerializer = UnfilteredSerializer.serializer;
this.helper = new SerializationHelper(header);
this.version = version.correspondingMessagingVersion();
}
protected void reset()
{
this.initialPosition = writer.position();
this.startPosition = -1;
this.previousRowStart = 0;
this.written = 0;
this.firstClustering = null;
this.lastClustering = null;
this.openMarker = DeletionTime.LIVE;
this.headerLength = -1;
this.state = State.AWAITING_PARTITION_HEADER;
}
public long getHeaderLength()
{
return headerLength;
}
public void start(DecoratedKey key, DeletionTime partitionLevelDeletion) throws IOException
{
if (state == State.COMPLETED)
reset();
checkState(state == State.AWAITING_PARTITION_HEADER);
ByteBufferUtil.writeWithShortLength(key.getKey(), writer);
DeletionTime.serializer.serialize(partitionLevelDeletion, writer);
if (!header.hasStatic())
{
this.headerLength = writer.position() - initialPosition;
state = State.AWAITING_ROWS;
return;
}
state = State.AWAITING_STATIC_ROW;
}
public void addStaticRow(Row staticRow) throws IOException
{
checkState(state == State.AWAITING_STATIC_ROW);
checkState(staticRow.isStatic());
UnfilteredSerializer.serializer.serializeStaticRow(staticRow, helper, writer, version);
this.headerLength = writer.position() - initialPosition;
state = State.AWAITING_ROWS;
}
public void addUnfiltered(Unfiltered unfiltered) throws IOException
{
checkState(state == State.AWAITING_ROWS);
long pos = currentPosition();
if (firstClustering == null)
{
// Beginning of an index block. Remember the start and position
firstClustering = unfiltered.clustering();
startOpenMarker = openMarker;
startPosition = pos;
}
long unfilteredPosition = writer.position();
unfilteredSerializer.serialize(unfiltered, helper, writer, pos - previousRowStart, version);
lastClustering = unfiltered.clustering();
previousRowStart = pos;
++written;
if (unfiltered.kind() == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER)
{
RangeTombstoneMarker marker = (RangeTombstoneMarker) unfiltered;
openMarker = marker.isOpen(false) ? marker.openDeletionTime(false) : DeletionTime.LIVE;
}
}
protected long finish() throws IOException
{
checkState(state == State.AWAITING_ROWS);
state = State.COMPLETED;
long endPosition = currentPosition();
unfilteredSerializer.writeEndOfPartition(writer);
return endPosition;
}
protected long currentPosition()
{
return writer.position() - initialPosition;
}
public long getInitialPosition()
{
return initialPosition;
}
}