/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.distributed.test;

import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.Test;
import net.bytebuddy.ByteBuddy;
import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
import net.bytebuddy.implementation.MethodDelegation;
import net.bytebuddy.implementation.bind.annotation.SuperCall;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.compaction.ActiveCompactions;
import org.apache.cassandra.db.compaction.CompactionInfo;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.api.ICluster;
import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.distributed.api.LogAction;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.concurrent.CountDownLatch;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static org.apache.cassandra.utils.concurrent.CountDownLatch.newCountDownLatch;

public class UpgradeSSTablesTest extends TestBaseImpl
{
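    // Running `nodetool upgradesstables -a` should interrupt a major compaction that is already in progress:
    // the upgrade succeeds while the interrupted compaction logs "Compaction interrupted".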
@Test
public void upgradeSSTablesInterruptsOngoingCompaction() throws Throwable
{
try (ICluster<IInvokableInstance> cluster = init(builder().withNodes(1).start()))
{
cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v text, PRIMARY KEY (pk, ck));");
cluster.get(1).acceptsOnInstance((String ks) -> {
ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore("tbl");
cfs.disableAutoCompaction();
CompactionManager.instance.setMaximumCompactorThreads(1);
CompactionManager.instance.setCoreCompactorThreads(1);
}).accept(KEYSPACE);
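            // Build a 256-character payload by doubling the 4-character seed six times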
String blob = "blob";
for (int i = 0; i < 6; i++)
blob += blob;
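            // Write and flush five sstables of 100 rows each so both the compaction and the upgrade have work to do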
for (int cnt = 0; cnt < 5; cnt++)
{
for (int i = 0; i < 100; i++)
{
cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (?,?,?)",
ConsistencyLevel.QUORUM, (cnt * 1000) + i, i, blob);
}
cluster.get(1).nodetool("flush", KEYSPACE, "tbl");
}
LogAction logAction = cluster.get(1).logs();
logAction.mark();
Future<?> future = cluster.get(1).asyncAcceptsOnInstance((String ks) -> {
ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore("tbl");
CompactionManager.instance.submitMaximal(cfs, FBUtilities.nowInSeconds(), false, OperationType.COMPACTION);
}).apply(KEYSPACE);
Assert.assertEquals(0, cluster.get(1).nodetool("upgradesstables", "-a", KEYSPACE, "tbl"));
future.get();
Assert.assertFalse(logAction.grep("Compaction interrupted").getResult().isEmpty());
}
}
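
    // The reverse direction: a regular compaction must not cancel upgradesstables.
    // Both operations are expected to finish, and no "Compaction interrupted" message should be logged.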
@Test
public void compactionDoesNotCancelUpgradeSSTables() throws Throwable
{
try (ICluster<IInvokableInstance> cluster = init(builder().withNodes(1).start()))
{
cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v text, PRIMARY KEY (pk, ck));");
cluster.get(1).acceptsOnInstance((String ks) -> {
ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore("tbl");
cfs.disableAutoCompaction();
CompactionManager.instance.setMaximumCompactorThreads(1);
CompactionManager.instance.setCoreCompactorThreads(1);
}).accept(KEYSPACE);
String blob = "blob";
for (int i = 0; i < 6; i++)
blob += blob;
for (int cnt = 0; cnt < 5; cnt++)
{
for (int i = 0; i < 100; i++)
{
cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (?,?,?)",
ConsistencyLevel.QUORUM, (cnt * 1000) + i, i, blob);
}
cluster.get(1).nodetool("flush", KEYSPACE, "tbl");
}
LogAction logAction = cluster.get(1).logs();
logAction.mark();
Assert.assertEquals(0, cluster.get(1).nodetool("upgradesstables", "-a", KEYSPACE, "tbl"));
Assert.assertFalse(logAction.watchFor("Compacting").getResult().isEmpty());
cluster.get(1).acceptsOnInstance((String ks) -> {
ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore("tbl");
FBUtilities.allOf(CompactionManager.instance.submitMaximal(cfs, FBUtilities.nowInSeconds(), false, OperationType.COMPACTION))
.awaitUninterruptibly(1, TimeUnit.MINUTES);
}).accept(KEYSPACE);
Assert.assertTrue(logAction.grep("Compaction interrupted").getResult().isEmpty());
Assert.assertFalse(logAction.grep("Finished Upgrade sstables").getResult().isEmpty());
Assert.assertFalse(logAction.grep("Compacted (.*) 5 sstables to").getResult().isEmpty());
}
}
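
    // A same- or lower-priority operation (here a scrub) must not interrupt an in-progress upgradesstables:
    // the scrub fails, and the paused upgrade finishes successfully once the test releases it.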
@Test
public void cleanupDoesNotInterruptUpgradeSSTables() throws Throwable
{
try (ICluster<IInvokableInstance> cluster = init(builder().withNodes(1).withInstanceInitializer(BB::install).start()))
{
cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v text, PRIMARY KEY (pk, ck));");
cluster.get(1).acceptsOnInstance((String ks) -> {
ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore("tbl");
cfs.disableAutoCompaction();
}).accept(KEYSPACE);
String blob = "blob";
for (int i = 0; i < 6; i++)
blob += blob;
for (int i = 0; i < 10000; i++)
{
cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (?,?,?)",
ConsistencyLevel.QUORUM, i, i, blob);
}
cluster.get(1).nodetool("flush", KEYSPACE, "tbl");
LogAction logAction = cluster.get(1).logs();
logAction.mark();
            // Start upgradesstables - use BB to pause it once inside ActiveCompactions.beginCompaction
Thread upgradeThread = new Thread(() -> {
cluster.get(1).nodetool("upgradesstables", "-a", KEYSPACE, "tbl");
});
upgradeThread.start();
Assert.assertTrue(cluster.get(1).callOnInstance(() -> BB.starting.awaitUninterruptibly(1, TimeUnit.MINUTES)));
            // Start a scrub and make sure that it fails; the log check below confirms that it failed
            // because it could not cancel the active upgrade sstables
Assert.assertNotEquals(0, cluster.get(1).nodetool("scrub", KEYSPACE, "tbl"));
            // Now resume the upgrade sstables so the test can shut down
cluster.get(1).runOnInstance(() -> {
BB.start.decrement();
});
upgradeThread.join();
Assert.assertFalse(logAction.grep("Unable to cancel in-progress compactions, since they're running with higher or same priority: Upgrade sstables").getResult().isEmpty());
Assert.assertFalse(logAction.grep("Starting Scrub for ").getResult().isEmpty());
Assert.assertFalse(logAction.grep("Finished Upgrade sstables for distributed_test_keyspace.tbl successfully").getResult().isEmpty());
}
}
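
    // Truncating the table while upgradesstables is running interrupts the upgrade's compaction
    // ("Compaction interrupted" is logged) without leaving the upgrade hanging.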
@Test
public void truncateWhileUpgrading() throws Throwable
{
try (ICluster<IInvokableInstance> cluster = init(builder().withNodes(1).start()))
{
cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int, ck int, v text, PRIMARY KEY (pk, ck)) "));
cluster.get(1).acceptsOnInstance((String ks) -> {
ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore("tbl");
cfs.disableAutoCompaction();
CompactionManager.instance.setMaximumCompactorThreads(1);
CompactionManager.instance.setCoreCompactorThreads(1);
}).accept(KEYSPACE);
String blob = "blob";
for (int i = 0; i < 10; i++)
blob += blob;
for (int i = 0; i < 500; i++)
{
cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.tbl (pk, ck, v) VALUES (?,?,?)"),
ConsistencyLevel.QUORUM, i, i, blob);
if (i > 0 && i % 100 == 0)
cluster.get(1).nodetool("flush", KEYSPACE, "tbl");
}
LogAction logAction = cluster.get(1).logs();
logAction.mark();
Future<?> upgrade = CompletableFuture.runAsync(() -> {
cluster.get(1).nodetool("upgradesstables", "-a", KEYSPACE, "tbl");
});
cluster.schemaChange(withKeyspace("TRUNCATE %s.tbl"));
upgrade.get();
Assert.assertFalse(logAction.grep("Compaction interrupted").getResult().isEmpty());
}
}
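
    // Exercises `upgradesstables -a -t <max timestamp>` and `recompress_sstables`: after the compression
    // parameters are altered, only the sstables written before the change should be rewritten, which is
    // verified by counting the sstables mentioned in the compaction log line.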
@Test
public void rewriteSSTablesTest() throws Throwable
{
try (ICluster<IInvokableInstance> cluster = builder().withNodes(1).withDataDirCount(1).start())
{
for (String compressionBefore : new String[]{ "{'class' : 'LZ4Compressor', 'chunk_length_in_kb' : 32}", "{'enabled': 'false'}" })
{
for (String command : new String[]{ "upgradesstables", "recompress_sstables" })
{
cluster.schemaChange(withKeyspace("DROP KEYSPACE IF EXISTS %s"));
cluster.schemaChange(withKeyspace("CREATE KEYSPACE %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};"));
cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int, ck int, v text, PRIMARY KEY (pk, ck)) " +
"WITH compression = " + compressionBefore));
cluster.get(1).acceptsOnInstance((String ks) -> {
Keyspace.open(ks).getColumnFamilyStore("tbl").disableAutoCompaction();
}).accept(KEYSPACE);
String blob = "blob";
for (int i = 0; i < 6; i++)
blob += blob;
for (int i = 0; i < 100; i++)
{
cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.tbl (pk, ck, v) VALUES (?,?,?)"),
ConsistencyLevel.QUORUM, i, i, blob);
}
cluster.get(1).nodetool("flush", KEYSPACE, "tbl");
Assert.assertEquals(0, cluster.get(1).nodetool("upgradesstables", "-a", KEYSPACE, "tbl"));
cluster.schemaChange(withKeyspace("ALTER TABLE %s.tbl WITH compression = {'class' : 'LZ4Compressor', 'chunk_length_in_kb' : 128};"));
                    Thread.sleep(2000); // Make sure the timestamps will differ even with 1-second resolution.
long maxSoFar = cluster.get(1).appliesOnInstance((String ks) -> {
long maxTs = -1;
ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore("tbl");
cfs.disableAutoCompaction();
for (SSTableReader tbl : cfs.getLiveSSTables())
{
maxTs = Math.max(maxTs, tbl.getDataCreationTime());
}
return maxTs;
}).apply(KEYSPACE);
for (int i = 100; i < 200; i++)
{
cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.tbl (pk, ck, v) VALUES (?,?,?)"),
ConsistencyLevel.QUORUM, i, i, blob);
}
cluster.get(1).nodetool("flush", KEYSPACE, "tbl");
LogAction logAction = cluster.get(1).logs();
logAction.mark();
long expectedCount = cluster.get(1).appliesOnInstance((String ks, Long maxTs) -> {
long count = 0;
long skipped = 0;
Set<SSTableReader> liveSSTables = Keyspace.open(ks).getColumnFamilyStore("tbl").getLiveSSTables();
                        assert liveSSTables.size() == 2 : String.format("Expected 2 sstables, but got %d", liveSSTables.size());
for (SSTableReader tbl : liveSSTables)
{
if (tbl.getDataCreationTime() <= maxTs)
count++;
else
skipped++;
}
assert skipped > 0;
return count;
}).apply(KEYSPACE, maxSoFar);
if (command.equals("upgradesstables"))
Assert.assertEquals(0, cluster.get(1).nodetool("upgradesstables", "-a", "-t", Long.toString(maxSoFar), KEYSPACE, "tbl"));
else
Assert.assertEquals(0, cluster.get(1).nodetool("recompress_sstables", KEYSPACE, "tbl"));
Assert.assertFalse(logAction.grep(String.format("%d sstables to", expectedCount)).getResult().isEmpty());
}
}
}
}
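
    // ByteBuddy helper installed into the instance: it intercepts ActiveCompactions#beginCompaction so that an
    // UPGRADE_SSTABLES compaction signals `starting` when it begins and then blocks until the test releases `start`.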
public static class BB
{
// Will be initialized in the context of the instance class loader
static CountDownLatch starting = newCountDownLatch(1);
static CountDownLatch start = newCountDownLatch(1);
public static void install(ClassLoader classLoader, Integer num)
{
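        // Rebase ActiveCompactions so calls to beginCompaction are delegated to BB.beginCompaction below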
new ByteBuddy().rebase(ActiveCompactions.class)
.method(named("beginCompaction"))
.intercept(MethodDelegation.to(BB.class))
.make()
.load(classLoader, ClassLoadingStrategy.Default.INJECTION);
}
@SuppressWarnings("unused")
public static void beginCompaction(CompactionInfo.Holder ci, @SuperCall Callable<Void> zuperCall)
{
try
{
zuperCall.call();
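            // Pause only upgrade compactions: tell the test the upgrade has started, then wait to be released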
if (ci.getCompactionInfo().getTaskType() == OperationType.UPGRADE_SSTABLES)
{
starting.decrement();
Assert.assertTrue(start.awaitUninterruptibly(1, TimeUnit.MINUTES));
}
}
catch (Exception e)
{
throw new RuntimeException(e);
}
}
}
}