blob: a7cf7b463b1d16942d58bb348b39b5f88ed01e07 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
import org.junit.After;
import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.cache.CachingOptions;
import org.apache.cassandra.config.*;
import org.apache.cassandra.cql3.ColumnIdentifier;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.compaction.LeveledCompactionStrategy;
import org.apache.cassandra.db.index.PerRowSecondaryIndexTest;
import org.apache.cassandra.db.index.SecondaryIndex;
import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.io.compress.CompressionParameters;
import org.apache.cassandra.io.compress.SnappyCompressor;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.locator.SimpleStrategy;
import org.apache.cassandra.service.MigrationManager;
import org.apache.cassandra.utils.ByteBufferUtil;
public class SchemaLoader
{
private static Logger logger = LoggerFactory.getLogger(SchemaLoader.class);
@BeforeClass
public static void loadSchema() throws ConfigurationException
{
prepareServer();
// Migrations aren't happy if gossiper is not started. Even if we don't use migrations though,
// some tests now expect us to start gossip for them.
startGossiper();
}
@After
public void leakDetect() throws InterruptedException
{
System.gc();
System.gc();
System.gc();
Thread.sleep(10);
}
public static void prepareServer()
{
// Cleanup first
try
{
cleanupAndLeaveDirs();
}
catch (IOException e)
{
logger.error("Failed to cleanup and recreate directories and files.");
throw new RuntimeException(e);
}
Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler()
{
public void uncaughtException(Thread t, Throwable e)
{
logger.error("Fatal exception in thread " + t, e);
}
});
Keyspace.setInitialized();
}
public static void startGossiper()
{
if (!Gossiper.instance.isEnabled())
Gossiper.instance.start((int) (System.currentTimeMillis() / 1000));
}
public static void schemaDefinition(String testName) throws ConfigurationException
{
List<KSMetaData> schema = new ArrayList<KSMetaData>();
// A whole bucket of shorthand
String ks1 = testName + "Keyspace1";
String ks2 = testName + "Keyspace2";
String ks3 = testName + "Keyspace3";
String ks4 = testName + "Keyspace4";
String ks5 = testName + "Keyspace5";
String ks6 = testName + "Keyspace6";
String ks_kcs = testName + "KeyCacheSpace";
String ks_rcs = testName + "RowCacheSpace";
String ks_ccs = testName + "CounterCacheSpace";
String ks_nocommit = testName + "NoCommitlogSpace";
String ks_prsi = testName + "PerRowSecondaryIndex";
String ks_cql = testName + "cql_keyspace";
Class<? extends AbstractReplicationStrategy> simple = SimpleStrategy.class;
Map<String, String> opts_rf1 = KSMetaData.optsWithRF(1);
Map<String, String> opts_rf2 = KSMetaData.optsWithRF(2);
Map<String, String> opts_rf3 = KSMetaData.optsWithRF(3);
Map<String, String> opts_rf5 = KSMetaData.optsWithRF(5);
AbstractType bytes = BytesType.instance;
AbstractType<?> composite = CompositeType.getInstance(Arrays.asList(new AbstractType<?>[]{BytesType.instance, TimeUUIDType.instance, IntegerType.instance}));
AbstractType<?> compositeMaxMin = CompositeType.getInstance(Arrays.asList(new AbstractType<?>[]{BytesType.instance, IntegerType.instance}));
Map<Byte, AbstractType<?>> aliases = new HashMap<Byte, AbstractType<?>>();
aliases.put((byte)'b', BytesType.instance);
aliases.put((byte)'t', TimeUUIDType.instance);
aliases.put((byte)'B', ReversedType.getInstance(BytesType.instance));
aliases.put((byte)'T', ReversedType.getInstance(TimeUUIDType.instance));
AbstractType<?> dynamicComposite = DynamicCompositeType.getInstance(aliases);
// Make it easy to test compaction
Map<String, String> compactionOptions = new HashMap<String, String>();
compactionOptions.put("tombstone_compaction_interval", "1");
Map<String, String> leveledOptions = new HashMap<String, String>();
leveledOptions.put("sstable_size_in_mb", "1");
// Keyspace 1
schema.add(KSMetaData.testMetadata(ks1,
simple,
opts_rf1,
// Column Families
standardCFMD(ks1, "Standard1").compactionStrategyOptions(compactionOptions),
standardCFMD(ks1, "Standard2"),
standardCFMD(ks1, "Standard3"),
standardCFMD(ks1, "Standard4"),
standardCFMD(ks1, "StandardGCGS0").gcGraceSeconds(0),
standardCFMD(ks1, "StandardLong1"),
standardCFMD(ks1, "StandardLong2"),
CFMetaData.denseCFMetaData(ks1, "ValuesWithQuotes", BytesType.instance).defaultValidator(UTF8Type.instance),
superCFMD(ks1, "Super1", LongType.instance),
superCFMD(ks1, "Super2", LongType.instance),
superCFMD(ks1, "Super3", LongType.instance),
superCFMD(ks1, "Super4", UTF8Type.instance),
superCFMD(ks1, "Super5", bytes),
superCFMD(ks1, "Super6", LexicalUUIDType.instance, UTF8Type.instance),
indexCFMD(ks1, "Indexed1", true),
indexCFMD(ks1, "Indexed2", false),
CFMetaData.denseCFMetaData(ks1, "StandardInteger1", IntegerType.instance),
CFMetaData.denseCFMetaData(ks1, "StandardLong3", IntegerType.instance),
CFMetaData.denseCFMetaData(ks1, "Counter1", bytes).defaultValidator(CounterColumnType.instance),
CFMetaData.denseCFMetaData(ks1, "SuperCounter1", bytes, bytes).defaultValidator(CounterColumnType.instance),
superCFMD(ks1, "SuperDirectGC", BytesType.instance).gcGraceSeconds(0),
jdbcSparseCFMD(ks1, "JdbcInteger", IntegerType.instance).addColumnDefinition(integerColumn(ks1, "JdbcInteger")),
jdbcSparseCFMD(ks1, "JdbcUtf8", UTF8Type.instance).addColumnDefinition(utf8Column(ks1, "JdbcUtf8")),
jdbcCFMD(ks1, "JdbcLong", LongType.instance),
jdbcCFMD(ks1, "JdbcBytes", bytes),
jdbcCFMD(ks1, "JdbcAscii", AsciiType.instance),
CFMetaData.denseCFMetaData(ks1, "StandardComposite", composite),
CFMetaData.denseCFMetaData(ks1, "StandardComposite2", compositeMaxMin),
CFMetaData.denseCFMetaData(ks1, "StandardDynamicComposite", dynamicComposite),
standardCFMD(ks1, "StandardLeveled")
.compactionStrategyClass(LeveledCompactionStrategy.class)
.compactionStrategyOptions(leveledOptions),
standardCFMD(ks1, "legacyleveled")
.compactionStrategyClass(LeveledCompactionStrategy.class)
.compactionStrategyOptions(leveledOptions),
standardCFMD(ks1, "StandardLowIndexInterval").minIndexInterval(8)
.maxIndexInterval(256)
.caching(CachingOptions.NONE),
standardCFMD(ks1, "UUIDKeys").keyValidator(UUIDType.instance),
CFMetaData.denseCFMetaData(ks1, "MixedTypes", LongType.instance).keyValidator(UUIDType.instance).defaultValidator(BooleanType.instance),
CFMetaData.denseCFMetaData(ks1, "MixedTypesComposite", composite).keyValidator(composite).defaultValidator(BooleanType.instance),
standardCFMD(ks1, "AsciiKeys").keyValidator(AsciiType.instance)
));
// Keyspace 2
schema.add(KSMetaData.testMetadata(ks2,
simple,
opts_rf1,
// Column Families
standardCFMD(ks2, "Standard1"),
standardCFMD(ks2, "Standard3"),
superCFMD(ks2, "Super3", bytes),
superCFMD(ks2, "Super4", TimeUUIDType.instance),
indexCFMD(ks2, "Indexed1", true),
compositeIndexCFMD(ks2, "Indexed2", true),
compositeIndexCFMD(ks2, "Indexed3", true).gcGraceSeconds(0)));
// Keyspace 3
schema.add(KSMetaData.testMetadata(ks3,
simple,
opts_rf5,
// Column Families
standardCFMD(ks3, "Standard1"),
indexCFMD(ks3, "Indexed1", true)));
// Keyspace 4
schema.add(KSMetaData.testMetadata(ks4,
simple,
opts_rf3,
// Column Families
standardCFMD(ks4, "Standard1"),
standardCFMD(ks4, "Standard3"),
superCFMD(ks4, "Super3", bytes),
superCFMD(ks4, "Super4", TimeUUIDType.instance),
CFMetaData.denseCFMetaData(ks4, "Super5", TimeUUIDType.instance, bytes)));
// Keyspace 5
schema.add(KSMetaData.testMetadata(ks5,
simple,
opts_rf2,
standardCFMD(ks5, "Standard1"),
standardCFMD(ks5, "Counter1")
.defaultValidator(CounterColumnType.instance)));
// Keyspace 6
schema.add(KSMetaData.testMetadata(ks6,
simple,
opts_rf1,
indexCFMD(ks6, "Indexed1", true)));
// KeyCacheSpace
schema.add(KSMetaData.testMetadata(ks_kcs,
simple,
opts_rf1,
standardCFMD(ks_kcs, "Standard1"),
standardCFMD(ks_kcs, "Standard2"),
standardCFMD(ks_kcs, "Standard3")));
// RowCacheSpace
schema.add(KSMetaData.testMetadata(ks_rcs,
simple,
opts_rf1,
standardCFMD(ks_rcs, "CFWithoutCache").caching(CachingOptions.NONE),
standardCFMD(ks_rcs, "CachedCF").caching(CachingOptions.ALL),
standardCFMD(ks_rcs, "CachedIntCF").
defaultValidator(IntegerType.instance).
caching(new CachingOptions(new CachingOptions.KeyCache(CachingOptions.KeyCache.Type.ALL),
new CachingOptions.RowCache(CachingOptions.RowCache.Type.HEAD, 100)))));
// CounterCacheSpace
schema.add(KSMetaData.testMetadata(ks_ccs,
simple,
opts_rf1,
standardCFMD(ks_ccs, "Counter1").defaultValidator(CounterColumnType.instance),
standardCFMD(ks_ccs, "Counter2").defaultValidator(CounterColumnType.instance)));
schema.add(KSMetaData.testMetadataNotDurable(ks_nocommit,
simple,
opts_rf1,
standardCFMD(ks_nocommit, "Standard1")));
// PerRowSecondaryIndexTest
schema.add(KSMetaData.testMetadata(ks_prsi,
simple,
opts_rf1,
perRowIndexedCFMD(ks_prsi, "Indexed1")));
// CQLKeyspace
schema.add(KSMetaData.testMetadata(ks_cql,
simple,
opts_rf1,
// Column Families
CFMetaData.compile("CREATE TABLE table1 ("
+ "k int PRIMARY KEY,"
+ "v1 text,"
+ "v2 int"
+ ")", ks_cql),
CFMetaData.compile("CREATE TABLE table2 ("
+ "k text,"
+ "c text,"
+ "v text,"
+ "PRIMARY KEY (k, c))", ks_cql),
CFMetaData.compile("CREATE TABLE foo ("
+ "bar text, "
+ "baz text, "
+ "qux text, "
+ "PRIMARY KEY(bar, baz) ) "
+ "WITH COMPACT STORAGE", ks_cql),
CFMetaData.compile("CREATE TABLE foofoo ("
+ "bar text, "
+ "baz text, "
+ "qux text, "
+ "quz text, "
+ "foo text, "
+ "PRIMARY KEY((bar, baz), qux, quz) ) "
+ "WITH COMPACT STORAGE", ks_cql)
));
if (Boolean.parseBoolean(System.getProperty("cassandra.test.compression", "false")))
useCompression(schema);
// if you're messing with low-level sstable stuff, it can be useful to inject the schema directly
// Schema.instance.load(schemaDefinition());
for (KSMetaData ksm : schema)
MigrationManager.announceNewKeyspace(ksm, false);
}
public static void createKeyspace(String keyspaceName,
Class<? extends AbstractReplicationStrategy> strategy,
Map<String, String> options,
CFMetaData... cfmetas) throws ConfigurationException
{
createKeyspace(keyspaceName, true, true, strategy, options, cfmetas);
}
public static void createKeyspace(String keyspaceName,
boolean durable,
boolean announceLocally,
Class<? extends AbstractReplicationStrategy> strategy,
Map<String, String> options,
CFMetaData... cfmetas) throws ConfigurationException
{
KSMetaData ksm = durable ? KSMetaData.testMetadata(keyspaceName, strategy, options, cfmetas)
: KSMetaData.testMetadataNotDurable(keyspaceName, strategy, options, cfmetas);
MigrationManager.announceNewKeyspace(ksm, announceLocally);
}
public static ColumnDefinition integerColumn(String ksName, String cfName)
{
return new ColumnDefinition(ksName,
cfName,
new ColumnIdentifier(IntegerType.instance.fromString("42"), IntegerType.instance),
UTF8Type.instance,
null,
null,
null,
null,
ColumnDefinition.Kind.REGULAR);
}
private static ColumnDefinition utf8Column(String ksName, String cfName)
{
return new ColumnDefinition(ksName,
cfName,
new ColumnIdentifier("fortytwo", true),
UTF8Type.instance,
null,
null,
null,
null,
ColumnDefinition.Kind.REGULAR);
}
public static CFMetaData perRowIndexedCFMD(String ksName, String cfName)
{
final Map<String, String> indexOptions = Collections.singletonMap(
SecondaryIndex.CUSTOM_INDEX_OPTION_NAME,
PerRowSecondaryIndexTest.TestIndex.class.getName());
CFMetaData cfm = CFMetaData.sparseCFMetaData(ksName, cfName, AsciiType.instance).keyValidator(AsciiType.instance);
ByteBuffer cName = ByteBufferUtil.bytes("indexed");
return cfm.addOrReplaceColumnDefinition(ColumnDefinition.regularDef(cfm, cName, AsciiType.instance, null)
.setIndex("indexe1", IndexType.CUSTOM, indexOptions));
}
private static void useCompression(List<KSMetaData> schema)
{
for (KSMetaData ksm : schema)
{
for (CFMetaData cfm : ksm.cfMetaData().values())
{
cfm.compressionParameters(new CompressionParameters(SnappyCompressor.instance));
}
}
}
public static CFMetaData standardCFMD(String ksName, String cfName)
{
return CFMetaData.denseCFMetaData(ksName, cfName, BytesType.instance).compressionParameters(getCompressionParameters());
}
public static CFMetaData standardCFMD(String ksName, String cfName, AbstractType<?> comparator)
{
return CFMetaData.denseCFMetaData(ksName, cfName, comparator).compressionParameters(getCompressionParameters());
}
public static CFMetaData superCFMD(String ksName, String cfName, AbstractType subcc)
{
return superCFMD(ksName, cfName, BytesType.instance, subcc).compressionParameters(getCompressionParameters());
}
public static CFMetaData superCFMD(String ksName, String cfName, AbstractType cc, AbstractType subcc)
{
return CFMetaData.denseCFMetaData(ksName, cfName, cc, subcc).compressionParameters(getCompressionParameters());
}
public static CFMetaData indexCFMD(String ksName, String cfName, final Boolean withIdxType) throws ConfigurationException
{
CFMetaData cfm = CFMetaData.sparseCFMetaData(ksName, cfName, BytesType.instance).keyValidator(AsciiType.instance);
ByteBuffer cName = ByteBufferUtil.bytes("birthdate");
IndexType keys = withIdxType ? IndexType.KEYS : null;
return cfm.addColumnDefinition(ColumnDefinition.regularDef(cfm, cName, LongType.instance, null)
.setIndex(withIdxType ? ByteBufferUtil.bytesToHex(cName) : null, keys, null))
.compressionParameters(getCompressionParameters());
}
public static CFMetaData compositeIndexCFMD(String ksName, String cfName, final Boolean withIdxType) throws ConfigurationException
{
final CompositeType composite = CompositeType.getInstance(Arrays.asList(new AbstractType<?>[]{UTF8Type.instance, UTF8Type.instance}));
CFMetaData cfm = CFMetaData.sparseCFMetaData(ksName, cfName, composite);
ByteBuffer cName = ByteBufferUtil.bytes("col1");
IndexType idxType = withIdxType ? IndexType.COMPOSITES : null;
return cfm.addColumnDefinition(ColumnDefinition.regularDef(cfm, cName, UTF8Type.instance, 1)
.setIndex(withIdxType ? "col1_idx" : null, idxType, Collections.<String, String>emptyMap()))
.compressionParameters(getCompressionParameters());
}
private static CFMetaData jdbcCFMD(String ksName, String cfName, AbstractType comp)
{
return CFMetaData.denseCFMetaData(ksName, cfName, comp).defaultValidator(comp).compressionParameters(getCompressionParameters());
}
public static CFMetaData jdbcSparseCFMD(String ksName, String cfName, AbstractType comp)
{
return CFMetaData.sparseCFMetaData(ksName, cfName, comp).defaultValidator(comp).compressionParameters(getCompressionParameters());
}
public static CompressionParameters getCompressionParameters()
{
return getCompressionParameters(null);
}
public static CompressionParameters getCompressionParameters(Integer chunkSize)
{
if (Boolean.parseBoolean(System.getProperty("cassandra.test.compression", "false")))
return new CompressionParameters(SnappyCompressor.instance, chunkSize, Collections.<String, String>emptyMap());
else
return new CompressionParameters(null);
}
public static void cleanupAndLeaveDirs() throws IOException
{
// We need to stop and unmap all CLS instances prior to cleanup() or we'll get failures on Windows.
CommitLog.instance.stopUnsafe(true);
mkdirs();
cleanup();
mkdirs();
CommitLog.instance.restartUnsafe();
}
public static void cleanup()
{
// clean up commitlog
String[] directoryNames = { DatabaseDescriptor.getCommitLogLocation(), };
for (String dirName : directoryNames)
{
File dir = new File(dirName);
if (!dir.exists())
throw new RuntimeException("No such directory: " + dir.getAbsolutePath());
// Leave the folder around as Windows will complain about directory deletion w/handles open to children files
String[] children = dir.list();
for (String child : children)
FileUtils.deleteRecursive(new File(dir, child));
}
cleanupSavedCaches();
// clean up data directory which are stored as data directory/keyspace/data files
for (String dirName : DatabaseDescriptor.getAllDataFileLocations())
{
File dir = new File(dirName);
if (!dir.exists())
throw new RuntimeException("No such directory: " + dir.getAbsolutePath());
String[] children = dir.list();
for (String child : children)
FileUtils.deleteRecursive(new File(dir, child));
}
}
public static void mkdirs()
{
DatabaseDescriptor.createAllDirectories();
}
public static void insertData(String keyspace, String columnFamily, int offset, int numberOfRows)
{
for (int i = offset; i < offset + numberOfRows; i++)
{
ByteBuffer key = ByteBufferUtil.bytes("key" + i);
Mutation mutation = new Mutation(keyspace, key);
mutation.add(columnFamily, Util.cellname("col" + i), ByteBufferUtil.bytes("val" + i), System.currentTimeMillis());
mutation.applyUnsafe();
}
}
/* usually used to populate the cache */
public static void readData(String keyspace, String columnFamily, int offset, int numberOfRows)
{
ColumnFamilyStore store = Keyspace.open(keyspace).getColumnFamilyStore(columnFamily);
for (int i = offset; i < offset + numberOfRows; i++)
{
DecoratedKey key = Util.dk("key" + i);
store.getColumnFamily(Util.namesQueryFilter(store, key, "col" + i));
}
}
public static void cleanupSavedCaches()
{
File cachesDir = new File(DatabaseDescriptor.getSavedCachesLocation());
if (!cachesDir.exists() || !cachesDir.isDirectory())
return;
FileUtils.delete(cachesDir.listFiles());
}
}