/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.cassandra.io.sstable;

import java.io.*;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Random;

import com.google.common.io.Files;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import junit.framework.Assert;

import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.dht.ByteOrderedPartitioner;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.io.util.MmappedSegmentedFile;
import org.apache.cassandra.io.util.MmappedSegmentedFile.Builder.Boundaries;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
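
/**
 * Long-running test for MmappedSegmentedFile.Builder.Boundaries: testRandomBoundaries
 * cross-checks the builder's segment boundaries against a brute-force reconstruction,
 * and the testBoundariesAndRepair* cases verify that the boundaries serialized alongside
 * the index summary are validated and rebuilt when the sstable is reopened.
 */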
public class LongSegmentedFileBoundaryTest
{
@BeforeClass
public static void setup() throws Exception
{
SchemaLoader.cleanupAndLeaveDirs();
Keyspace.setInitialized();
StorageService.instance.initServer();
}

@AfterClass
public static void tearDown()
{
Config.setClientMode(false);
}

@Test
public void testRandomBoundaries()
{
long[] candidates = new long[1 + (1 << 16)];
int[] indexesToCheck = new int[1 << 8];
Random random = new Random();
for (int run = 0; run < 100; run++)
{
long seed = random.nextLong();
random.setSeed(seed);
System.out.println("Seed: " + seed);
// at least 1Ki, and as many as 64Ki, boundaries (candidates.length >> 10 == 64)
int candidateCount = (1 + random.nextInt(candidates.length >> 10)) << 10;
generateBoundaries(random, candidateCount, candidates, indexesToCheck);
Boundaries builder = new Boundaries();
int nextIndexToCheck = indexesToCheck[0];
int checkCount = 0;
System.out.printf("[0..%d)", candidateCount);
for (int i = 1; i < candidateCount - 1; i++)
{
if (i == nextIndexToCheck)
{
if (checkCount % 20 == 0)
System.out.printf(" %d", i);
// grow number of samples logarithmically; work will still increase superlinearly, as size of dataset grows linearly
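// (1 << (31 - numberOfLeadingZeros(n)) is Integer.highestOneBit(n): 1, 2, 2, 4, 4, 4, 4, 8, ...)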
int sampleCount = 1 << (31 - Integer.numberOfLeadingZeros(++checkCount));
checkBoundarySample(random, candidates, i, sampleCount, builder);
// select out next index to check (there may be dups, so skip them)
while ((nextIndexToCheck = checkCount == indexesToCheck.length ? candidateCount : indexesToCheck[checkCount]) == i)
checkCount++;
}
builder.addCandidate(candidates[i]);
}
System.out.println();
checkBoundaries(candidates, candidateCount - 1, builder, candidates[candidateCount - 1]);
Assert.assertEquals(candidateCount, nextIndexToCheck);
}
}

private static void generateBoundaries(Random random, int candidateCount, long[] candidates, int[] indexesToCheck)
{
// averageBoundarySize is a uniform random multiple of 4KiB in [0, ~4GiB), i.e. ~2GiB on average
long averageBoundarySize = (4L << 10) * random.nextInt(1 << 20);
long prev = 0;
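// each gap is Gaussian with mean and stddev averageBoundarySize, clamped to at least 1 byte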
for (int i = 1 ; i < candidateCount ; i++)
candidates[i] = prev += Math.max(1, averageBoundarySize + (random.nextGaussian() * averageBoundarySize));
// generate indexes we will corroborate our behaviour on
for (int i = 0 ; i < indexesToCheck.length ; i++)
indexesToCheck[i] = 1 + random.nextInt(candidateCount - 2);
Arrays.sort(indexesToCheck);
}

private static void checkBoundarySample(Random random, long[] candidates, int candidateCount, int sampleCount, Boundaries builder)
{
for (int i = 0 ; i < sampleCount ; i++)
{
// pick a number exponentially less likely to be near the beginning, since we test that area earlier
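// (lowestOneBit(nextInt()) is 2^k with probability 2^-(k+1), so position clusters near
// candidateCount; the <= 0 loop below retries the rare overly-large draws)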
int position = 0 ;
while (position <= 0)
position = candidateCount / (Integer.lowestOneBit(random.nextInt()));
long upperBound = candidates[position];
long lowerBound = random.nextBoolean() ? (rand(random, 0, upperBound) / (Integer.lowestOneBit(random.nextInt())))
: candidates[Math.max(0, position - random.nextInt(64))];
long length = rand(random, lowerBound, upperBound);
checkBoundaries(candidates, candidateCount, builder, length);
}
checkBoundaries(candidates, candidateCount, builder, candidates[candidateCount]);
}
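
// uniform random long in [lowerBound, upperBound); the slight modulo bias is irrelevant for a test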
private static long rand(Random random, long lowerBound, long upperBound)
{
if (upperBound == lowerBound)
return upperBound;
return lowerBound + ((random.nextLong() & Long.MAX_VALUE) % (upperBound - lowerBound));
}

private static void checkBoundaries(long[] candidates, int candidateCount, Boundaries builder, long length)
{
if (length == 0)
return;
long[] boundaries = new long[(int) (10 + 2 * (length / Integer.MAX_VALUE))];
int count = 1;
int prev = 0;
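// brute-force canon: repeatedly take the furthest candidate within Integer.MAX_VALUE of the
// previous boundary (or the very next candidate, if even that one is out of mmap range)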
while (true)
{
int p = candidates[prev + 1] - boundaries[count - 1] >= Integer.MAX_VALUE
? prev + 1
: Arrays.binarySearch(candidates, prev, candidateCount, boundaries[count - 1] + Integer.MAX_VALUE);
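// on a miss, binarySearch returns -(insertionPoint) - 1, so -2 - p is the last index <= the probe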
if (p < 0) p = -2 - p;
if (p >= candidateCount - 1 || candidates[p] >= length)
break;
boundaries[count++] = candidates[p];
if (candidates[p + 1] >= length)
break;
prev = p;
}
if (candidates[candidateCount - 1] < length && length - boundaries[count - 1] >= Integer.MAX_VALUE)
boundaries[count++] = candidates[candidateCount - 1];
boundaries[count++] = length;
final long[] canon = Arrays.copyOf(boundaries, count);
final long[] check = builder.finish(length, false);
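// construct the (potentially enormous) failure message only when the arrays actually differ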
if (!Arrays.equals(canon, check))
Assert.assertTrue("\n" + Arrays.toString(canon) + "\n" + Arrays.toString(check), Arrays.equals(canon, check));
}

@Test
public void testBoundariesAndRepairSmall() throws InvalidRequestException, IOException
{
testBoundariesAndRepair(1, 1 << 16);
}

@Test
public void testBoundariesAndRepairMedium() throws InvalidRequestException, IOException
{
testBoundariesAndRepair(1, 1 << 20);
}

@Test
public void testBoundariesAndRepairLarge() throws InvalidRequestException, IOException
{
testBoundariesAndRepair(1, 100 << 20);
}

@Test
public void testBoundariesAndRepairHuge() throws InvalidRequestException, IOException
{
testBoundariesAndRepair(1, Integer.MAX_VALUE - 1024);
}

@Test
public void testBoundariesAndRepairTooHuge() throws InvalidRequestException, IOException
{
testBoundariesAndRepair(1, Integer.MAX_VALUE);
}

@Test
public void testBoundariesAndRepairHugeIndex() throws InvalidRequestException, IOException
{
testBoundariesAndRepair(1 << 7, 1 << 15);
}

@Test
public void testBoundariesAndRepairReallyHugeIndex() throws InvalidRequestException, IOException
{
testBoundariesAndRepair(1 << 14, 1 << 15);
}
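
// rows = clustering rows per partition, rowSize = approximate bytes per clustering row;
// a partition of rows * rowSize >= Integer.MAX_VALUE bytes makes the data file non-mmappable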
private void testBoundariesAndRepair(int rows, int rowSize) throws InvalidRequestException, IOException
{
String KS = "cql_keyspace";
String TABLE = "table1";
File tempdir = Files.createTempDir();
try
{
Assert.assertTrue(DatabaseDescriptor.getColumnIndexSize() < rowSize);
Assert.assertTrue(DatabaseDescriptor.getDiskAccessMode() == Config.DiskAccessMode.mmap);
Assert.assertTrue(DatabaseDescriptor.getIndexAccessMode() == Config.DiskAccessMode.mmap);
Assert.assertTrue(StorageService.getPartitioner() instanceof ByteOrderedPartitioner);
File dataDir = new File(tempdir.getAbsolutePath() + File.separator + KS + File.separator + TABLE);
Assert.assertTrue(dataDir.mkdirs());
String schema = "CREATE TABLE cql_keyspace.table" + (rows > 1 ? "2" : "1") + " (k bigint, v1 blob, v2 blob, v3 blob, v4 blob, v5 blob, PRIMARY KEY (k" + (rows > 1 ? ", v1" : "") + ")) WITH compression = { 'sstable_compression':'' };";
String insert = "INSERT INTO cql_keyspace.table" + (rows > 1 ? "2" : "1") + " (k, v1, v2, v3, v4, v5) VALUES (?, ?, ?, ?, ?, ?)";
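// rows == 1 targets table1 with PRIMARY KEY (k); rows > 1 targets table2 with PRIMARY KEY (k, v1).
// compression is disabled so the data file is plain and its mmap boundaries are exercised directly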
CQLSSTableWriter.Builder builder = CQLSSTableWriter.builder()
.inDirectory(dataDir)
.forTable(schema)
.withPartitioner(StorageService.getPartitioner())
.using(insert)
.sorted();
CQLSSTableWriter writer = builder.build();
// write 8GiB of decorated keys
ByteBuffer[] value = new ByteBuffer[rows];
for (int row = 0 ; row < rows ; row++)
{
// if we're using clustering columns, the clustering key is replicated across every other column
value[row] = ByteBuffer.allocate(rowSize / (rows > 1 ? 8 : 5));
value[row].putInt(0, row);
}
long targetSize = 8L << 30;
long dk = 0;
long size = 0;
long dkSize = (long) rowSize * rows;
while (size < targetSize)
{
for (int row = 0 ; row < rows ; row++)
writer.addRow(dk, value[row], value[row], value[row], value[row], value[row]);
size += dkSize;
dk++;
}
Descriptor descriptor = writer.getCurrentDescriptor().asType(Descriptor.Type.FINAL);
writer.close();
// open (and close) the reader so that the summary file is created
SSTableReader reader = SSTableReader.open(descriptor);
reader.selfRef().release();
// then check the boundaries are reasonable, and corrupt them
checkThenCorruptBoundaries(descriptor, rows * rowSize < Integer.MAX_VALUE);
// then check that reopening corrects the corruption
reader = SSTableReader.open(descriptor);
reader.selfRef().release();
checkThenCorruptBoundaries(descriptor, rows * rowSize < Integer.MAX_VALUE);
}
finally
{
FileUtils.deleteRecursive(tempdir);
}
}

private static void checkThenCorruptBoundaries(Descriptor descriptor, boolean expectDataMmappable) throws IOException
{
File summaryFile = new File(descriptor.filenameFor(Component.SUMMARY));
DataInputStream iStream = new DataInputStream(new FileInputStream(summaryFile));
IndexSummary indexSummary = IndexSummary.serializer.deserialize(iStream, StorageService.getPartitioner(), true, CFMetaData.DEFAULT_MIN_INDEX_INTERVAL, CFMetaData.DEFAULT_MAX_INDEX_INTERVAL);
ByteBuffer first = ByteBufferUtil.readWithLength(iStream);
ByteBuffer last = ByteBufferUtil.readWithLength(iStream);
MmappedSegmentedFile.Builder ibuilder = new MmappedSegmentedFile.Builder();
MmappedSegmentedFile.Builder dbuilder = new MmappedSegmentedFile.Builder();
ibuilder.deserializeBounds(iStream);
dbuilder.deserializeBounds(iStream);
iStream.close();
// the index file should always be mmappable, as index entries cannot be larger than MAX_SEGMENT_SIZE (due to promotedSize being encoded as an int)
assertBoundaries(descriptor.filenameFor(Component.PRIMARY_INDEX), true, ibuilder.boundaries());
assertBoundaries(descriptor.filenameFor(Component.DATA), expectDataMmappable, dbuilder.boundaries());
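// now corrupt the summary: rewrite it, replacing the serialized bounds with bogus
// whole-file values; reopening the sstable is expected to repair this (see caller)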
DataOutputStreamPlus oStream = new DataOutputStreamPlus(new FileOutputStream(summaryFile));
IndexSummary.serializer.serialize(indexSummary, oStream, true);
ByteBufferUtil.writeWithLength(first, oStream);
ByteBufferUtil.writeWithLength(last, oStream);
oStream.writeInt(1);
oStream.writeLong(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)).length());
oStream.writeLong(new File(descriptor.filenameFor(Component.DATA)).length());
oStream.close();
}

private static void assertBoundaries(String path, boolean expectMmappable, long[] boundaries)
{
long length = new File(path).length();
long prev = boundaries[0];
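// each segment [prev, boundary) must fit in an int exactly when the file is expected to be mmappable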
for (int i = 1 ; i <= boundaries.length && prev < length ; i++)
{
long boundary = i == boundaries.length ? length : boundaries[i];
Assert.assertEquals(String.format("[%d, %d), %d of %d", prev, boundary, i, boundaries.length),
expectMmappable, boundary - prev <= Integer.MAX_VALUE);
prev = boundary;
}
}
}