/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.io.sstable;

import java.io.File;
import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

import com.google.common.util.concurrent.Uninterruptibles;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;

import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.db.rows.EncodingStats;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.SSTableWriter;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.utils.FBUtilities;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

public class SSTableWriterTestBase extends SchemaLoader
{
    protected static final String KEYSPACE = "SSTableRewriterTest";
    protected static final String CF = "Standard1";
    protected static final String CF_SMALL_MAX_VALUE = "Standard_SmallMaxValue";

    private static Config.DiskAccessMode standardMode;
    private static Config.DiskAccessMode indexMode;
    private static int maxValueSize;
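
    /**
     * Creates the test keyspace and tables. On Windows the disk and index access
     * modes are forced to standard first, since memory-mapped files cannot be
     * deleted on that platform and these tests delete sstable files. The max
     * value size is capped at 1MB so tests can exceed it cheaply.
     */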
    @BeforeClass
    public static void defineSchema() throws ConfigurationException
    {
        if (FBUtilities.isWindows())
        {
            standardMode = DatabaseDescriptor.getDiskAccessMode();
            indexMode = DatabaseDescriptor.getIndexAccessMode();

            DatabaseDescriptor.setDiskAccessMode(Config.DiskAccessMode.standard);
            DatabaseDescriptor.setIndexAccessMode(Config.DiskAccessMode.standard);
        }

        SchemaLoader.prepareServer();
        SchemaLoader.createKeyspace(KEYSPACE,
                                    KeyspaceParams.simple(1),
                                    SchemaLoader.standardCFMD(KEYSPACE, CF),
                                    SchemaLoader.standardCFMD(KEYSPACE, CF_SMALL_MAX_VALUE));

        maxValueSize = DatabaseDescriptor.getMaxValueSize();
        DatabaseDescriptor.setMaxValueSize(1024 * 1024); // set max value size to 1MB
    }
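
    /**
     * Restores the configuration changed by {@link #defineSchema()} so that
     * later test classes see the default settings.
     */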
    @AfterClass
    public static void revertConfiguration()
    {
        DatabaseDescriptor.setMaxValueSize(maxValueSize);

        // The access modes are only saved (and overridden) on Windows; restoring
        // them unconditionally would write back null on other platforms.
        if (FBUtilities.isWindows())
        {
            DatabaseDescriptor.setDiskAccessMode(standardMode);
            DatabaseDescriptor.setIndexAccessMode(indexMode);
        }
    }
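
    /**
     * Truncates the default table after each test and waits for the resulting
     * sstable deletions to complete, so tests do not leak files into each other.
     */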
    @After
    public void truncateCF()
    {
        Keyspace keyspace = Keyspace.open(KEYSPACE);
        ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF);
        store.truncateBlocking();
        LifecycleTransaction.waitForDeletions();
    }
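
    /**
     * Truncates the given table, waits for pending deletions, and asserts that
     * both disk space metrics have dropped to zero before running a full
     * {@link #validateCFS(ColumnFamilyStore)} pass. The short sleep gives the
     * metrics a chance to settle after the deletions complete.
     */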
    public static void truncate(ColumnFamilyStore cfs)
    {
        cfs.truncateBlocking();
        LifecycleTransaction.waitForDeletions();
        Uninterruptibles.sleepUninterruptibly(10L, TimeUnit.MILLISECONDS);
        assertEquals(0, cfs.metric.liveDiskSpaceUsed.getCount());
        assertEquals(0, cfs.metric.totalDiskSpaceUsed.getCount());
        validateCFS(cfs);
    }
    /**
     * Validate the column family store by checking that all live
     * sstables are referenced exactly once and are not marked as
     * compacting. It also checks that the generation of each data
     * file on disk matches that of a live sstable, to ensure that
     * the data files on disk belong to the live sstables. Finally,
     * it checks that the metrics report the correct disk space used,
     * both live and total.
     *
     * Note that if there are live sstables, this method submits a
     * maximal compaction task, in order to verify that at least one
     * such task is produced in that case.
     *
     * This method therefore has side effects and should be called
     * after performing any other checks on previous operations,
     * especially checks involving files on disk.
     *
     * @param cfs - the column family store to validate
     */
    public static void validateCFS(ColumnFamilyStore cfs)
    {
        Set<Integer> liveDescriptors = new HashSet<>();
        long spaceUsed = 0;
        for (SSTableReader sstable : cfs.getLiveSSTables())
        {
            assertFalse(sstable.isMarkedCompacted());
            assertEquals(1, sstable.selfRef().globalCount());
            liveDescriptors.add(sstable.descriptor.generation);
            spaceUsed += sstable.bytesOnDisk();
        }

        // Every data file on disk must belong to one of the live sstables.
        for (File dir : cfs.getDirectories().getCFDirectories())
        {
            for (File f : dir.listFiles())
            {
                if (f.getName().contains("Data"))
                {
                    Descriptor d = Descriptor.fromFilename(f.getAbsolutePath());
                    assertTrue(d.toString(), liveDescriptors.contains(d.generation));
                }
            }
        }

        assertEquals(spaceUsed, cfs.metric.liveDiskSpaceUsed.getCount());
        assertEquals(spaceUsed, cfs.metric.totalDiskSpaceUsed.getCount());
        assertTrue(cfs.getTracker().getCompacting().isEmpty());

        if (cfs.getLiveSSTables().size() > 0)
            assertFalse(CompactionManager.instance.submitMaximal(cfs, cfs.gcBefore((int) (System.currentTimeMillis() / 1000)), false).isEmpty());
    }
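
    /**
     * Creates a writer for a new sstable in the given directory, tied to the
     * given lifecycle transaction and using the table's current metadata with
     * empty encoding stats.
     */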
    public static SSTableWriter getWriter(ColumnFamilyStore cfs, File directory, LifecycleTransaction txn)
    {
        String filename = cfs.getSSTablePath(directory);
        SerializationHeader header = new SerializationHeader(true, cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS);
        return SSTableWriter.create(filename, 0, 0, header, cfs.indexManager.listIndexes(), txn);
    }
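
    /**
     * Returns a buffer of {@code size + 4} random bytes whose first four bytes
     * are overwritten with {@code i}, so callers get distinct, identifiable values.
     */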
    public static ByteBuffer random(int i, int size)
    {
        byte[] bytes = new byte[size + 4];
        ThreadLocalRandom.current().nextBytes(bytes);
        ByteBuffer r = ByteBuffer.wrap(bytes);
        r.putInt(0, i);
        return r;
    }
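
    /**
     * Asserts that the given file names contain no leftover tmplink- or tmp-
     * files (ignoring CRC files) and returns the number of data files found.
     */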
    public static int assertFileCounts(String[] files)
    {
        int tmplinkcount = 0;
        int tmpcount = 0;
        int datacount = 0;
        for (String f : files)
        {
            if (f.endsWith("-CRC.db"))
                continue;
            if (f.contains("tmplink-"))
                tmplinkcount++;
            else if (f.contains("tmp-"))
                tmpcount++;
            else if (f.contains("Data"))
                datacount++;
        }
        assertEquals(0, tmplinkcount);
        assertEquals(0, tmpcount);
        return datacount;
    }
}