// blob: 0a2df4247ae37c73ba66bacd960b530951110511 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cassandra.spark.reader;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.io.sstable.IndexSummary;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.spark.data.SSTable;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.jetbrains.annotations.NotNull;
/**
 * Helper methods for reading the Summary.db SSTable file component
 * and for binary searching its sampled keys by token.
 */
public final class SummaryDbUtils
{
    /**
     * Immutable holder for what {@code readSummary} reads from a Summary.db stream:
     * the deserialized {@link IndexSummary} followed by the first and last keys.
     */
    public static class Summary
    {
        private final IndexSummary indexSummary;
        private final DecoratedKey firstKey;
        private final DecoratedKey lastKey;

        /**
         * @param indexSummary deserialized index summary
         * @param firstKey     first key read from the stream after the index summary
         * @param lastKey      second key read from the stream after the index summary
         */
        Summary(IndexSummary indexSummary,
                DecoratedKey firstKey,
                DecoratedKey lastKey)
        {
            this.indexSummary = indexSummary;
            this.firstKey = firstKey;
            this.lastKey = lastKey;
        }

        /**
         * @return the deserialized index summary
         */
        public IndexSummary summary()
        {
            return indexSummary;
        }

        /**
         * @return the first key stored in the Summary.db file
         */
        public DecoratedKey first()
        {
            return firstKey;
        }

        /**
         * @return the last key stored in the Summary.db file
         */
        public DecoratedKey last()
        {
            return lastKey;
        }
    }

    /**
     * Always throws: this class only exposes static helpers.
     */
    private SummaryDbUtils()
    {
        throw new IllegalStateException(getClass() + " is static utility class and shall not be instantiated");
    }

    /**
     * Open the Summary.db stream of the given SSTable and deserialize it, using the partitioner
     * and min/max index intervals taken from the table metadata. The stream is closed before returning.
     *
     * @param metadata table metadata supplying the partitioner and index interval parameters
     * @param ssTable  SSTable whose summary stream is opened
     * @return Summary object, or {@code null} if the SSTable returned a {@code null} summary stream
     * @throws IOException io exception
     */
    public static Summary readSummary(@NotNull TableMetadata metadata, @NotNull SSTable ssTable) throws IOException
    {
        // try-with-resources tolerates a null resource, so a null stream simply flows through to the overload below
        try (InputStream in = ssTable.openSummaryStream())
        {
            return readSummary(in, metadata.partitioner, metadata.params.minIndexInterval, metadata.params.maxIndexInterval);
        }
    }

    /**
     * Read and deserialize the Summary.db file. The stream is wrapped in a {@link DataInputStream}
     * that is closed on return, which closes {@code summaryStream} as well.
     *
     * @param summaryStream    input stream for Summary.db file, may be {@code null}
     * @param partitioner      token partitioner
     * @param minIndexInterval min index interval
     * @param maxIndexInterval max index interval
     * @return Summary object, or {@code null} if {@code summaryStream} is {@code null}
     * @throws IOException io exception
     */
    static Summary readSummary(InputStream summaryStream,
                               IPartitioner partitioner,
                               int minIndexInterval,
                               int maxIndexInterval) throws IOException
    {
        if (summaryStream == null)
        {
            return null;
        }

        try (DataInputStream is = new DataInputStream(summaryStream))
        {
            // Read order matters: the index summary comes first, then the length-prefixed first and last keys
            IndexSummary indexSummary = IndexSummary.serializer.deserialize(is, partitioner, minIndexInterval, maxIndexInterval);
            // NOTE(review): if IndexSummary owns resources that need releasing, a failure while reading
            // the keys below would leave it unreleased - confirm against the IndexSummary implementation
            DecoratedKey firstKey = partitioner.decorateKey(ByteBufferUtil.readWithLength(is));
            DecoratedKey lastKey = partitioner.decorateKey(ByteBufferUtil.readWithLength(is));
            return new Summary(indexSummary, firstKey, lastKey);
        }
    }

    /**
     * Minimal random-access view over a list of tokens, as required by
     * {@link #binarySearchSummary(TokenList, BigInteger)}.
     */
    public interface TokenList
    {
        /**
         * @return number of tokens in the list
         */
        int size();

        /**
         * @param index position in the list
         * @return token at the given position
         */
        BigInteger tokenAt(int index);
    }

    /**
     * Binary search Summary.db to find nearest offset in Index.db that precedes the token we are looking for
     *
     * @param summary     IndexSummary from Summary.db file
     * @param partitioner Cassandra partitioner to hash partition keys to token
     * @param token       the token we are trying to find
     * @return offset into the Index.db file for the closest to partition in the Summary.db file that precedes the token we are looking for
     */
    public static long findIndexOffsetInSummary(IndexSummary summary, IPartitioner partitioner, BigInteger token)
    {
        return summary.getPosition(binarySearchSummary(summary, partitioner, token));
    }

    /**
     * {@link TokenList} backed by an {@link IndexSummary}: each key in the summary is decorated
     * with the partitioner on demand and its token is converted to a {@link BigInteger}.
     */
    public static class IndexSummaryTokenList implements TokenList
    {
        final IPartitioner partitioner;
        final IndexSummary summary;

        /**
         * @param partitioner partitioner used to turn summary keys into tokens
         * @param summary     index summary supplying the keys
         */
        IndexSummaryTokenList(IPartitioner partitioner,
                              IndexSummary summary)
        {
            this.partitioner = partitioner;
            this.summary = summary;
        }

        /**
         * @return number of entries in the underlying index summary
         */
        @Override
        public int size()
        {
            return summary.size();
        }

        /**
         * @param index position of the key in the underlying index summary
         * @return token of the key at that position
         */
        @Override
        public BigInteger tokenAt(int index)
        {
            return ReaderUtils.tokenToBigInteger(partitioner.decorateKey(ByteBuffer.wrap(summary.getKey(index))).getToken());
        }
    }

    /**
     * Binary search the keys of an IndexSummary by token.
     *
     * @param summary     IndexSummary from Summary.db file
     * @param partitioner Cassandra partitioner to hash partition keys to token
     * @param token       token to find
     * @return index of the summary entry to start reading from, see {@link #binarySearchSummary(TokenList, BigInteger)}
     */
    public static int binarySearchSummary(IndexSummary summary, IPartitioner partitioner, BigInteger token)
    {
        return binarySearchSummary(new IndexSummaryTokenList(partitioner, summary), token);
    }

    /**
     * Binary search the Summary.db file to find nearest index offset in Index.db for a given token.
     * Method lifted from org.apache.cassandra.io.sstable.IndexSummary.binarySearch(PartitionPosition key) and reworked for tokens.
     * <p>
     * The list must be in ascending token order for the result to be meaningful.
     *
     * @param tokenList list of tokens to binary search
     * @param token     token to find
     * @return index into {@code tokenList} of the entry to start from: the last entry whose token is strictly
     *         less than {@code token}, or 0 when there is no such entry (including when the list is empty)
     */
    public static int binarySearchSummary(TokenList tokenList, BigInteger token)
    {
        int low = 0;
        // Seeding mid with size() makes an empty list fall through to Math.max(0, -1) == 0 below
        int mid = tokenList.size();
        int high = mid - 1;
        int result = -1;

        while (low <= high)
        {
            // Unsigned shift keeps the midpoint correct even if low + high overflows int
            mid = (low + high) >>> 1;
            result = token.compareTo(tokenList.tokenAt(mid));
            if (result > 0)
            {
                low = mid + 1;
            }
            else if (result < 0)
            {
                high = mid - 1;
            }
            else
            {
                break;  // Exact match
            }
        }

        // If:
        //  1) result < 0: the token is less than nearest sampled token found at mid, so we need to start from mid - 1.
        //  2) result == 0: we found an exact match for the token in the sample,
        //     but there may be token collisions in Data.db so start from mid - 1 to be safe.
        //  3) result > 0: the nearest sample token at mid is less than the token so we can start from that position.
        return result <= 0 ? Math.max(0, mid - 1) : mid;
    }
}