/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.distributed.test;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import org.junit.Test;

import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.Feature;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SequenceBasedSSTableId;

import static org.apache.cassandra.Util.getBackups;
import static org.apache.cassandra.distributed.Cluster.build;
import static org.apache.cassandra.distributed.api.ConsistencyLevel.ALL;
import static org.apache.cassandra.distributed.test.ExecUtil.rethrow;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
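
/**
 * In-JVM dtests for the table-level {@code incremental_backups} option: with node-level
 * incremental backups enabled, flushed SSTables should be copied into the backups directory
 * only while the option is enabled on the table.
 */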
public class TableLevelIncrementalBackupsTest extends TestBaseImpl
{
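    // CREATE TABLE ... WITH incremental_backups = true should back up every flushed SSTable,
    // while a table created with the option disabled should produce no backups.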
    @Test
    public void testIncrementalBackupEnabledCreateTable() throws Exception
    {
        try (Cluster c = getCluster(true))
        {
            c.schemaChange(withKeyspace("CREATE TABLE %s.test_table (a text primary key, b int) WITH incremental_backups = true"));
            disableCompaction(c, KEYSPACE, "test_table");
            c.coordinator(1).execute(withKeyspace("INSERT INTO %s.test_table (a, b) VALUES ('a', 1)"), ALL);
            flush(c, KEYSPACE);
            assertBackupSSTablesCount(c, 1, true, KEYSPACE, "test_table");
            c.coordinator(1).execute(withKeyspace("INSERT INTO %s.test_table (a, b) VALUES ('a', 1)"), ALL);
            flush(c, KEYSPACE);
            assertBackupSSTablesCount(c, 2, true, KEYSPACE, "test_table");
            c.schemaChange(withKeyspace("DROP TABLE %s.test_table"));

            c.schemaChange(withKeyspace("CREATE TABLE %s.test_table2 (a text primary key, b int) WITH incremental_backups = false"));
            disableCompaction(c, KEYSPACE, "test_table2");
            c.coordinator(1).execute(withKeyspace("INSERT INTO %s.test_table2 (a, b) VALUES ('a', 1)"), ALL);
            flush(c, KEYSPACE);
            assertBackupSSTablesCount(c, 0, false, KEYSPACE, "test_table2");
            c.schemaChange(withKeyspace("DROP TABLE %s.test_table2"));
        }
    }
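
    // Toggling incremental_backups via ALTER TABLE starts or stops backups for subsequently
    // flushed SSTables; SSTables that were already backed up are left in place.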
    @Test
    public void testIncrementalBackupEnabledAlterTable() throws Exception
    {
        try (Cluster c = getCluster(true))
        {
            c.schemaChange(withKeyspace("CREATE TABLE %s.test_table (a text primary key, b int) WITH incremental_backups = false"));
            disableCompaction(c, KEYSPACE, "test_table");
            c.coordinator(1).execute(withKeyspace("INSERT INTO %s.test_table (a, b) VALUES ('a', 1)"), ALL);
            flush(c, KEYSPACE);
            assertBackupSSTablesCount(c, 0, false, KEYSPACE, "test_table");
            c.schemaChange(withKeyspace("ALTER TABLE %s.test_table WITH incremental_backups = true"));
            c.coordinator(1).execute(withKeyspace("INSERT INTO %s.test_table (a, b) VALUES ('a', 1)"), ALL);
            flush(c, KEYSPACE);
            assertBackupSSTablesCount(c, 1, true, KEYSPACE, "test_table");
            c.schemaChange(withKeyspace("DROP TABLE %s.test_table"));

            c.schemaChange(withKeyspace("CREATE TABLE %s.test_table2 (a text primary key, b int) WITH incremental_backups = true"));
            disableCompaction(c, KEYSPACE, "test_table2");
            c.coordinator(1).execute(withKeyspace("INSERT INTO %s.test_table2 (a, b) VALUES ('a', 1)"), ALL);
            flush(c, KEYSPACE);
            assertBackupSSTablesCount(c, 1, true, KEYSPACE, "test_table2");
            c.schemaChange(withKeyspace("ALTER TABLE %s.test_table2 WITH incremental_backups = false"));
            c.coordinator(1).execute(withKeyspace("INSERT INTO %s.test_table2 (a, b) VALUES ('a', 1)"), ALL);
            flush(c, KEYSPACE);
            assertBackupSSTablesCount(c, 1, false, KEYSPACE, "test_table2");
            c.schemaChange(withKeyspace("DROP TABLE %s.test_table2"));
        }
    }

    @Test
    public void testIncrementalBackupWhenCreateTableByDefault() throws Exception
    {
        try (Cluster c = getCluster(true))
        {
            // incremental_backups defaults to true when it is not specified in CREATE TABLE
            c.schemaChange(withKeyspace("CREATE TABLE %s.test_table (a text primary key, b int)"));
            disableCompaction(c, KEYSPACE, "test_table");
            c.coordinator(1).execute(withKeyspace("INSERT INTO %s.test_table (a, b) VALUES ('a', 1)"), ALL);
            flush(c, KEYSPACE);
            assertBackupSSTablesCount(c, 1, true, KEYSPACE, "test_table");

            c.schemaChange(withKeyspace("ALTER TABLE %s.test_table WITH incremental_backups = false"));
            c.coordinator(1).execute(withKeyspace("INSERT INTO %s.test_table (a, b) VALUES ('a', 1)"), ALL);
            flush(c, KEYSPACE);
            assertBackupSSTablesCount(c, 1, false, KEYSPACE, "test_table");

            c.schemaChange(withKeyspace("ALTER TABLE %s.test_table WITH incremental_backups = true"));
            c.coordinator(1).execute(withKeyspace("INSERT INTO %s.test_table (a, b) VALUES ('a', 1)"), ALL);
            flush(c, KEYSPACE);
            assertBackupSSTablesCount(c, 2, true, KEYSPACE, "test_table");
            c.schemaChange(withKeyspace("DROP TABLE %s.test_table"));
        }
    }
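
    // Two-node cluster with a single data directory per node and the node-level
    // incremental_backups setting configured as requested.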
    private Cluster getCluster(boolean incrementalBackups) throws IOException
    {
        return init(build(2).withDataDirCount(1)
                            .withConfig(c -> c.with(Feature.GOSSIP).set("incremental_backups", incrementalBackups))
                            .start());
    }
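
    // Flush memtables on every node so that the preceding writes are persisted as SSTables.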
    private void flush(Cluster cluster, String keyspace)
    {
        for (int i = 1; i < cluster.size() + 1; i++)
            cluster.get(i).flush(keyspace);
    }
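
    // Disable auto-compaction so the number of SSTables (and therefore backups) stays predictable.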
    private void disableCompaction(Cluster cluster, String keyspace, String table)
    {
        for (int i = 1; i < cluster.size() + 1; i++)
            cluster.get(i).nodetool("disableautocompaction", keyspace, table);
    }
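
    // Assert on every node that the table metadata reflects the expected incremental_backups value
    // and that the backups directory holds the expected number of sequence-id SSTables.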
    private static void assertBackupSSTablesCount(Cluster cluster, int expectedSeqGenIds, boolean enable, String ks, String... tableNames)
    {
        for (int i = 1; i < cluster.size() + 1; i++)
        {
            cluster.get(i).runOnInstance(rethrow(() -> Arrays.stream(tableNames).forEach(tableName -> assertTableMetaIncrementalBackupEnable(ks, tableName, enable))));
            cluster.get(i).runOnInstance(rethrow(() -> Arrays.stream(tableNames).forEach(tableName -> assertSSTablesCount(getBackups(ks, tableName), tableName, expectedSeqGenIds))));
        }
    }
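
    // Count only the backed-up SSTables whose generation id is sequence based.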
    private static void assertSSTablesCount(Set<Descriptor> descs, String tableName, int expectedSeqGenIds)
    {
        List<String> seqSSTables = descs.stream()
                                        .filter(desc -> desc.id instanceof SequenceBasedSSTableId)
                                        .map(descriptor -> descriptor.baseFile().toString())
                                        .sorted()
                                        .collect(Collectors.toList());
        assertThat(seqSSTables).describedAs("SSTables of %s with sequence based id", tableName).hasSize(expectedSeqGenIds);
    }
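
    // Check the table-level incremental_backups flag as seen by the live ColumnFamilyStore.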
    private static void assertTableMetaIncrementalBackupEnable(String ks, String tableName, boolean enable)
    {
        ColumnFamilyStore columnFamilyStore = ColumnFamilyStore.getIfExists(ks, tableName);
        if (enable)
            assertTrue(columnFamilyStore.isTableIncrementalBackupsEnabled());
        else
            assertFalse(columnFamilyStore.isTableIncrementalBackupsEnabled());
    }
}