blob: 611523fd0f8cbd82a7653a87b991cc32b9eff614 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.service.pager;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.Clustering;
import org.apache.cassandra.db.LegacyLayout;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.db.rows.Cell;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.io.util.DataInputBuffer;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.io.util.DataOutputBufferFixed;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.transport.Server;
import org.apache.cassandra.transport.ProtocolException;
import org.apache.cassandra.utils.ByteBufferUtil;
public class PagingState
{
public final ByteBuffer partitionKey; // Can be null for single partition queries.
public final RowMark rowMark; // Can be null if not needed.
public final int remaining;
public final int remainingInPartition;
public PagingState(ByteBuffer partitionKey, RowMark rowMark, int remaining, int remainingInPartition)
{
this.partitionKey = partitionKey;
this.rowMark = rowMark;
this.remaining = remaining;
this.remainingInPartition = remainingInPartition;
}
public static PagingState deserialize(ByteBuffer bytes, int protocolVersion)
{
if (bytes == null)
return null;
try (DataInputBuffer in = new DataInputBuffer(bytes, true))
{
ByteBuffer pk;
RowMark mark;
int remaining, remainingInPartition;
if (protocolVersion <= Server.VERSION_3)
{
pk = ByteBufferUtil.readWithShortLength(in);
mark = new RowMark(ByteBufferUtil.readWithShortLength(in), protocolVersion);
remaining = in.readInt();
// Note that while 'in.available()' is theoretically an estimate of how many bytes are available
// without blocking, we know that since we're reading a ByteBuffer it will be exactly how many
// bytes remain to be read. And the reason we want to condition this is for backward compatility
// as we used to not set this.
remainingInPartition = in.available() > 0 ? in.readInt() : Integer.MAX_VALUE;
}
else
{
pk = ByteBufferUtil.readWithVIntLength(in);
mark = new RowMark(ByteBufferUtil.readWithVIntLength(in), protocolVersion);
remaining = (int)in.readUnsignedVInt();
remainingInPartition = (int)in.readUnsignedVInt();
}
return new PagingState(pk.hasRemaining() ? pk : null,
mark.mark.hasRemaining() ? mark : null,
remaining,
remainingInPartition);
}
catch (IOException e)
{
throw new ProtocolException("Invalid value for the paging state");
}
}
public ByteBuffer serialize(int protocolVersion)
{
assert rowMark == null || protocolVersion == rowMark.protocolVersion;
try (DataOutputBuffer out = new DataOutputBufferFixed(serializedSize(protocolVersion)))
{
ByteBuffer pk = partitionKey == null ? ByteBufferUtil.EMPTY_BYTE_BUFFER : partitionKey;
ByteBuffer mark = rowMark == null ? ByteBufferUtil.EMPTY_BYTE_BUFFER : rowMark.mark;
if (protocolVersion <= Server.VERSION_3)
{
ByteBufferUtil.writeWithShortLength(pk, out);
ByteBufferUtil.writeWithShortLength(mark, out);
out.writeInt(remaining);
out.writeInt(remainingInPartition);
}
else
{
ByteBufferUtil.writeWithVIntLength(pk, out);
ByteBufferUtil.writeWithVIntLength(mark, out);
out.writeUnsignedVInt(remaining);
out.writeUnsignedVInt(remainingInPartition);
}
return out.buffer();
}
catch (IOException e)
{
throw new RuntimeException(e);
}
}
public int serializedSize(int protocolVersion)
{
assert rowMark == null || protocolVersion == rowMark.protocolVersion;
ByteBuffer pk = partitionKey == null ? ByteBufferUtil.EMPTY_BYTE_BUFFER : partitionKey;
ByteBuffer mark = rowMark == null ? ByteBufferUtil.EMPTY_BYTE_BUFFER : rowMark.mark;
if (protocolVersion <= Server.VERSION_3)
{
return ByteBufferUtil.serializedSizeWithShortLength(pk)
+ ByteBufferUtil.serializedSizeWithShortLength(mark)
+ 8; // remaining & remainingInPartition
}
else
{
return ByteBufferUtil.serializedSizeWithVIntLength(pk)
+ ByteBufferUtil.serializedSizeWithVIntLength(mark)
+ TypeSizes.sizeofUnsignedVInt(remaining)
+ TypeSizes.sizeofUnsignedVInt(remainingInPartition);
}
}
@Override
public final int hashCode()
{
return Objects.hash(partitionKey, rowMark, remaining, remainingInPartition);
}
@Override
public final boolean equals(Object o)
{
if(!(o instanceof PagingState))
return false;
PagingState that = (PagingState)o;
return Objects.equals(this.partitionKey, that.partitionKey)
&& Objects.equals(this.rowMark, that.rowMark)
&& this.remaining == that.remaining
&& this.remainingInPartition == that.remainingInPartition;
}
@Override
public String toString()
{
return String.format("PagingState(key=%s, cellname=%s, remaining=%d, remainingInPartition=%d",
partitionKey != null ? ByteBufferUtil.bytesToHex(partitionKey) : null,
rowMark,
remaining,
remainingInPartition);
}
/**
* Marks the last row returned by paging, the one from which paging should continue.
* This class essentially holds a row clustering, but due to backward compatibility reasons,
* we need to actually store the cell name for the last cell of the row we're marking when
* the protocol v3 is in use, and this class abstract that complication.
*
* See CASSANDRA-10254 for more details.
*/
public static class RowMark
{
// This can be null for convenience if no row is marked.
private final ByteBuffer mark;
private final int protocolVersion;
private RowMark(ByteBuffer mark, int protocolVersion)
{
this.mark = mark;
this.protocolVersion = protocolVersion;
}
private static List<AbstractType<?>> makeClusteringTypes(CFMetaData metadata)
{
// This is the types that will be used when serializing the clustering in the paging state. We can't really use the actual clustering
// types however because we can't guarantee that there won't be a schema change between when we send the paging state and get it back,
// and said schema change could theoretically change one of the clustering types from a fixed width type to a non-fixed one
// (say timestamp -> blob). So we simply use a list of BytesTypes (for both reading and writting), which may be slightly inefficient
// for fixed-width types, but avoid any risk during schema changes.
int size = metadata.clusteringColumns().size();
List<AbstractType<?>> l = new ArrayList<>(size);
for (int i = 0; i < size; i++)
l.add(BytesType.instance);
return l;
}
public static RowMark create(CFMetaData metadata, Row row, int protocolVersion)
{
ByteBuffer mark;
if (protocolVersion <= Server.VERSION_3)
{
// We need to be backward compatible with 2.1/2.2 nodes paging states. Which means we have to send
// the full cellname of the "last" cell in the row we get (since that's how 2.1/2.2 nodes will start after
// that last row if they get that paging state).
Iterator<Cell> cells = row.cellsInLegacyOrder(metadata, true).iterator();
if (!cells.hasNext())
{
mark = LegacyLayout.encodeClustering(metadata, row.clustering());
}
else
{
Cell cell = cells.next();
mark = LegacyLayout.encodeCellName(metadata, row.clustering(), cell.column().name.bytes, cell.column().isComplex() ? cell.path().get(0) : null);
}
}
else
{
// We froze the serialization version to 3.0 as we need to make this this doesn't change (that is, it has to be
// fix for a given version of the protocol).
mark = Clustering.serializer.serialize(row.clustering(), MessagingService.VERSION_30, makeClusteringTypes(metadata));
}
return new RowMark(mark, protocolVersion);
}
public Clustering clustering(CFMetaData metadata)
{
if (mark == null)
return null;
return protocolVersion <= Server.VERSION_3
? LegacyLayout.decodeClustering(metadata, mark)
: Clustering.serializer.deserialize(mark, MessagingService.VERSION_30, makeClusteringTypes(metadata));
}
@Override
public final int hashCode()
{
return Objects.hash(mark, protocolVersion);
}
@Override
public final boolean equals(Object o)
{
if(!(o instanceof RowMark))
return false;
RowMark that = (RowMark)o;
return Objects.equals(this.mark, that.mark) && this.protocolVersion == that.protocolVersion;
}
@Override
public String toString()
{
return ByteBufferUtil.bytesToHex(mark);
}
}
}