blob: 2a6da1ac692a11d03c504bf3e8fe6dbbbc162cbf [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.db.virtual;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.CQLTester;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.repair.CommonRange;
import org.apache.cassandra.repair.RepairJobDesc;
import org.apache.cassandra.repair.RepairRunnable;
import org.apache.cassandra.repair.messages.PrepareMessage;
import org.apache.cassandra.repair.messages.RepairOption;
import org.apache.cassandra.repair.state.Completable;
import org.apache.cassandra.repair.state.CoordinatorState;
import org.apache.cassandra.repair.state.JobState;
import org.apache.cassandra.repair.state.ParticipateState;
import org.apache.cassandra.repair.state.SessionState;
import org.apache.cassandra.repair.state.State;
import org.apache.cassandra.repair.state.ValidationState;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.TimeUUID;
public class LocalRepairTablesTest extends CQLTester
{
private static final String KS_NAME = "vts";
private static final ImmutableSet<InetAddressAndPort> ADDRESSES = ImmutableSet.of(address(127, 0, 0, 1));
private static final List<String> ADDRESSES_STR = ADDRESSES.stream().map(Object::toString).collect(Collectors.toList());
private static final CommonRange COMMON_RANGE = new CommonRange(ADDRESSES, Collections.emptySet(), Collections.singleton(range(0, 100)));
private static final String REPAIR_KS = "system";
private static final String REPAIR_TABLE = "peers";
@BeforeClass
public static void before()
{
CQLTester.setUpClass();
VirtualKeyspaceRegistry.instance.register(new VirtualKeyspace(KS_NAME, LocalRepairTables.getAll(KS_NAME)));
}
@Before
public void cleanupRepairs()
{
ActiveRepairService.instance.clearLocalRepairState();
}
@Test
public void repairs() throws Throwable
{
assertEmpty("repairs");
CoordinatorState state = coordinator();
assertInit("repairs", state);
state.phase.setup();
assertState("repairs", state, CoordinatorState.State.SETUP);
List<ColumnFamilyStore> tables = Collections.singletonList(table());
RepairRunnable.NeighborsAndRanges neighbors = neighbors();
state.phase.start(tables, neighbors);
assertState("repairs", state, CoordinatorState.State.START);
List<List<String>> expectedRanges = neighbors.commonRanges.stream().map(a -> a.ranges.stream().map(Object::toString).collect(Collectors.toList())).collect(Collectors.toList());
assertRowsIgnoringOrder(execute(t("SELECT id, completed, participants, table_names, ranges, unfiltered_ranges, participants FROM %s.repairs")),
row(state.id, false, ADDRESSES_STR, tables.stream().map(a -> a.name).collect(Collectors.toList()), expectedRanges, expectedRanges, neighbors.participants.stream().map(Object::toString).collect(Collectors.toList())));
state.phase.prepareStart();
assertState("repairs", state, CoordinatorState.State.PREPARE_START);
state.phase.prepareComplete();
assertState("repairs", state, CoordinatorState.State.PREPARE_COMPLETE);
state.phase.repairSubmitted();
assertState("repairs", state, CoordinatorState.State.REPAIR_START);
state.phase.repairCompleted();
assertState("repairs", state, CoordinatorState.State.REPAIR_COMPLETE);
state.phase.success("testing");
assertSuccess("repairs", state);
// make sure serialization works
execute(t("SELECT * FROM %s.repairs"));
}
@Test
public void sessions() throws Throwable
{
assertEmpty("repair_sessions");
SessionState state = session();
assertInit("repair_sessions", state);
state.phase.start();
assertState("repair_sessions", state, SessionState.State.START);
state.phase.jobsSubmitted();
assertState("repair_sessions", state, SessionState.State.JOBS_START);
state.phase.success("testing");
assertSuccess("repair_sessions", state);
assertRowsIgnoringOrder(execute(t("SELECT participants FROM %s.repair_sessions WHERE id=?"), state.id),
row(ADDRESSES_STR));
// make sure serialization works
execute(t("SELECT * FROM %s.repair_sessions"));
}
@Test
public void jobs() throws Throwable
{
assertEmpty("repair_jobs");
JobState state = job();
assertInit("repair_jobs", state);
state.phase.start();
assertState("repair_jobs", state, JobState.State.START);
state.phase.snapshotsSubmitted();
assertState("repair_jobs", state, JobState.State.SNAPSHOT_START);
state.phase.snapshotsCompleted();
assertState("repair_jobs", state, JobState.State.SNAPSHOT_COMPLETE);
state.phase.validationSubmitted();
assertState("repair_jobs", state, JobState.State.VALIDATION_START);
state.phase.validationCompleted();
assertState("repair_jobs", state, JobState.State.VALIDATION_COMPLETE);
state.phase.streamSubmitted();
assertState("repair_jobs", state, JobState.State.STREAM_START);
state.phase.success("testing");
assertSuccess("repair_jobs", state);
assertRowsIgnoringOrder(execute(t("SELECT participants FROM %s.repair_jobs WHERE id=?"), state.id),
row(ADDRESSES_STR));
// make sure serialization works
execute(t("SELECT * FROM %s.repair_jobs"));
}
@Test
public void participates() throws Throwable
{
assertEmpty("repair_participates");
ParticipateState state = participate();
assertInit("repair_participates", state);
state.phase.success("testing");
assertRowsIgnoringOrder(execute(t("SELECT id, initiator, ranges, failure_cause, success_message, state_init_timestamp, state_success_timestamp, state_failure_timestamp FROM %s.repair_participates WHERE id = ?"), state.id),
row(state.getId(), FBUtilities.getBroadcastAddressAndPort().toString(), Arrays.asList("(0,42]"), null, "testing", new Date(state.getInitializedAtMillis()), new Date(state.getLastUpdatedAtMillis()), null));
state = participate();
assertInit("repair_participates", state);
state.phase.fail("testing");
assertRowsIgnoringOrder(execute(t("SELECT id, completed, initiator, ranges, failure_cause, success_message, state_init_timestamp, state_success_timestamp, state_failure_timestamp FROM %s.repair_participates WHERE id = ?"), state.id),
row(state.getId(), true, FBUtilities.getBroadcastAddressAndPort().toString(), Arrays.asList("(0,42]"), "testing", null, new Date(state.getInitializedAtMillis()), null, new Date(state.getLastUpdatedAtMillis())));
// make sure serialization works
execute(t("SELECT * FROM %s.repair_participates"));
}
@Test
public void validations() throws Throwable
{
assertEmpty("repair_validations");
ValidationState state = validation();
assertInit("repair_validations", state);
// progress is defined by estimated partitions and how many partitions have been processed; disable checking in shared functions
state.phase.start(100, 100);
assertState("repair_validations", state, ValidationState.State.START);
for (int i = 0; i < 10; i++)
{
state.partitionsProcessed += 10;
state.bytesRead += 10;
state.updated();
// min 99% is because >= 100 gets lowered to 99% and the last 1% is when validation is actualy complete
assertRowsIgnoringOrder(execute(t("SELECT id, initiator, status, progress_percentage, estimated_partitions, estimated_total_bytes, partitions_processed, bytes_read, failure_cause, success_message FROM %s.repair_validations")),
row(state.getId(), FBUtilities.getBroadcastAddressAndPort().toString(), "start", Math.min(99.0F, (float) state.partitionsProcessed), 100L, 100L, state.partitionsProcessed, state.bytesRead, null, null));
}
state.phase.sendingTrees();
assertState("repair_validations", state, ValidationState.State.SENDING_TREES);
state.phase.success("testing");
assertSuccess("repair_validations", state);
// make sure serialization works
execute(t("SELECT * FROM %s.repair_validations"));
}
private void assertEmpty(String table) throws Throwable
{
assertRowCount(execute(t("SELECT * FROM %s." + table)), 0);
}
private void assertInit(String table, Completable<?> state) throws Throwable
{
assertRowsIgnoringOrder(execute(t("SELECT id, state_init_timestamp, failure_cause, success_message FROM %s." + table + " WHERE id = ?"), state.getId()),
row(state.getId(), new Date(state.getInitializedAtMillis()), null, null));
}
private void assertInit(String table, State<?, ?> state) throws Throwable
{
assertRowsIgnoringOrder(execute(t("SELECT id, status, state_init_timestamp, failure_cause, success_message FROM %s." + table + " WHERE id = ?"), state.getId()),
row(state.getId(), "init", new Date(state.getInitializedAtMillis()), null, null));
}
private <T extends Enum<T>> void assertState(String table, State<?, ?> state, T expectedState) throws Throwable
{
assertRowsIgnoringOrder(execute(t("SELECT id, completed, status, failure_cause, success_message FROM %s." + table + " WHERE id = ?"), state.getId()),
row(state.getId(), false, expectedState.name().toLowerCase(), null, null));
}
private void assertSuccess(String table, State<?, ?> state) throws Throwable
{
assertRowsIgnoringOrder(execute(t("SELECT id, completed, status, failure_cause, success_message FROM %s." + table + " WHERE id = ?"), state.getId()),
row(state.getId(), true, "success", null, "testing"));
}
private static ColumnFamilyStore table()
{
return table(REPAIR_KS, REPAIR_TABLE);
}
private static ColumnFamilyStore table(String ks, String name)
{
return Schema.instance.getColumnFamilyStoreInstance(Schema.instance.getTableMetadata(ks, name).id);
}
private static RepairRunnable.NeighborsAndRanges neighbors()
{
return new RepairRunnable.NeighborsAndRanges(false, ADDRESSES, ImmutableList.of(COMMON_RANGE));
}
private static Range<Token> range(long a, long b)
{
return new Range<>(new Murmur3Partitioner.LongToken(a), new Murmur3Partitioner.LongToken(b));
}
private static InetAddressAndPort address(int a, int b, int c, int d)
{
try
{
return InetAddressAndPort.getByAddress(new byte[] {(byte) a, (byte) b, (byte) c, (byte) d});
}
catch (UnknownHostException e)
{
throw new RuntimeException(e);
}
}
private static CoordinatorState coordinator()
{
RepairOption options = RepairOption.parse(Collections.emptyMap(), DatabaseDescriptor.getPartitioner());
CoordinatorState state = new CoordinatorState(0, "test", options);
ActiveRepairService.instance.register(state);
return state;
}
private static SessionState session()
{
CoordinatorState parent = coordinator();
SessionState state = new SessionState(parent.id, REPAIR_KS, new String[]{ REPAIR_TABLE }, COMMON_RANGE);
parent.register(state);
return state;
}
private static JobState job()
{
SessionState session = session();
JobState state = new JobState(new RepairJobDesc(session.parentRepairSession, session.id, session.keyspace, session.cfnames[0], session.commonRange.ranges), session.commonRange.endpoints);
session.register(state);
return state;
}
private ValidationState validation()
{
JobState job = job(); // job isn't needed but makes getting the descriptor easier
ParticipateState participate = participate();
ValidationState state = new ValidationState(job.desc, ADDRESSES.stream().findFirst().get());
participate.register(state);
return state;
}
private ParticipateState participate()
{
List<Range<Token>> ranges = Arrays.asList(new Range<>(new Murmur3Partitioner.LongToken(0), new Murmur3Partitioner.LongToken(42)));
ParticipateState state = new ParticipateState(FBUtilities.getBroadcastAddressAndPort(), new PrepareMessage(TimeUUID.Generator.nextTimeUUID(), Collections.emptyList(), ranges, true, 42, true, PreviewKind.ALL));
ActiveRepairService.instance.register(state);
return state;
}
private static String t(String string)
{
return String.format(string, KS_NAME);
}
}