blob: bcf75827b08d8b910e02adb389cef92e5b1092ae [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.db.repair;
import java.util.Iterator;
import java.util.Set;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
import org.apache.cassandra.repair.state.ValidationState;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.repair.RepairJobDesc;
import org.apache.cassandra.repair.Validator;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.TimeUUID;
import static java.util.Collections.singleton;
import static org.apache.cassandra.db.repair.CassandraValidationIterator.getSSTablesToValidate;
import static org.apache.cassandra.utils.TimeUUID.Generator.nextTimeUUID;
/**
* Tests correct sstables are returned from CompactionManager.getSSTablesForValidation
* for consistent, legacy incremental, and full repairs
*/
public class CompactionManagerGetSSTablesForValidationTest
{
private String ks;
private static final String tbl = "tbl";
private ColumnFamilyStore cfs;
private static InetAddressAndPort coordinator;
private static Token MT;
private SSTableReader repaired;
private SSTableReader unrepaired;
private SSTableReader pendingRepair;
private TimeUUID sessionID;
private RepairJobDesc desc;
@BeforeClass
public static void setupClass() throws Exception
{
SchemaLoader.prepareServer();
coordinator = InetAddressAndPort.getByName("10.0.0.1");
MT = DatabaseDescriptor.getPartitioner().getMinimumToken();
}
@Before
public void setup() throws Exception
{
ks = "ks_" + System.currentTimeMillis();
TableMetadata cfm = CreateTableStatement.parse(String.format("CREATE TABLE %s.%s (k INT PRIMARY KEY, v INT)", ks, tbl), ks).build();
SchemaLoader.createKeyspace(ks, KeyspaceParams.simple(1), cfm);
cfs = Schema.instance.getColumnFamilyStoreInstance(cfm.id);
}
private void makeSSTables()
{
for (int i=0; i<3; i++)
{
QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES(?, ?)", ks, tbl), i, i);
Util.flush(cfs);
}
Assert.assertEquals(3, cfs.getLiveSSTables().size());
}
private void registerRepair(boolean incremental) throws Exception
{
sessionID = nextTimeUUID();
Range<Token> range = new Range<>(MT, MT);
ActiveRepairService.instance.registerParentRepairSession(sessionID,
coordinator,
Lists.newArrayList(cfs),
Sets.newHashSet(range),
incremental,
incremental ? System.currentTimeMillis() : ActiveRepairService.UNREPAIRED_SSTABLE,
true,
PreviewKind.NONE);
desc = new RepairJobDesc(sessionID, nextTimeUUID(), ks, tbl, singleton(range));
}
private void modifySSTables() throws Exception
{
Iterator<SSTableReader> iter = cfs.getLiveSSTables().iterator();
repaired = iter.next();
repaired.descriptor.getMetadataSerializer().mutateRepairMetadata(repaired.descriptor, System.currentTimeMillis(), null, false);
repaired.reloadSSTableMetadata();
pendingRepair = iter.next();
pendingRepair.descriptor.getMetadataSerializer().mutateRepairMetadata(pendingRepair.descriptor, ActiveRepairService.UNREPAIRED_SSTABLE, sessionID, false);
pendingRepair.reloadSSTableMetadata();
unrepaired = iter.next();
Assert.assertFalse(iter.hasNext());
}
@Test
public void consistentRepair() throws Exception
{
makeSSTables();
registerRepair(true);
modifySSTables();
// get sstables for repair
Validator validator = new Validator(new ValidationState(desc, coordinator), FBUtilities.nowInSeconds(), true, PreviewKind.NONE);
Set<SSTableReader> sstables = Sets.newHashSet(getSSTablesToValidate(cfs, validator.desc.ranges, validator.desc.parentSessionId, validator.isIncremental));
Assert.assertNotNull(sstables);
Assert.assertEquals(1, sstables.size());
Assert.assertTrue(sstables.contains(pendingRepair));
}
@Test
public void legacyIncrementalRepair() throws Exception
{
makeSSTables();
registerRepair(true);
modifySSTables();
// get sstables for repair
Validator validator = new Validator(new ValidationState(desc, coordinator), FBUtilities.nowInSeconds(), false, PreviewKind.NONE);
Set<SSTableReader> sstables = Sets.newHashSet(getSSTablesToValidate(cfs, validator.desc.ranges, validator.desc.parentSessionId, validator.isIncremental));
Assert.assertNotNull(sstables);
Assert.assertEquals(2, sstables.size());
Assert.assertTrue(sstables.contains(pendingRepair));
Assert.assertTrue(sstables.contains(unrepaired));
}
@Test
public void fullRepair() throws Exception
{
makeSSTables();
registerRepair(false);
modifySSTables();
// get sstables for repair
Validator validator = new Validator(new ValidationState(desc, coordinator), FBUtilities.nowInSeconds(), false, PreviewKind.NONE);
Set<SSTableReader> sstables = Sets.newHashSet(getSSTablesToValidate(cfs, validator.desc.ranges, validator.desc.parentSessionId, validator.isIncremental));
Assert.assertNotNull(sstables);
Assert.assertEquals(3, sstables.size());
Assert.assertTrue(sstables.contains(pendingRepair));
Assert.assertTrue(sstables.contains(unrepaired));
Assert.assertTrue(sstables.contains(repaired));
}
}