| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.cassandra.repair.consistent; |
| |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.UUID; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.atomic.AtomicReference; |
| import java.util.function.BooleanSupplier; |
| import java.util.function.Consumer; |
| |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Sets; |
| import com.google.common.util.concurrent.ListenableFuture; |
| import com.google.common.util.concurrent.SettableFuture; |
| |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.BeforeClass; |
| import org.junit.Test; |
| |
| import org.apache.cassandra.SchemaLoader; |
| import org.apache.cassandra.cql3.QueryProcessor; |
| import org.apache.cassandra.cql3.UntypedResultSet; |
| import org.apache.cassandra.cql3.statements.schema.CreateTableStatement; |
| import org.apache.cassandra.locator.RangesAtEndpoint; |
| import org.apache.cassandra.net.Message; |
| import org.apache.cassandra.repair.AbstractRepairTest; |
| import org.apache.cassandra.locator.InetAddressAndPort; |
| import org.apache.cassandra.repair.KeyspaceRepairManager; |
| import org.apache.cassandra.schema.TableMetadata; |
| import org.apache.cassandra.schema.Schema; |
| import org.apache.cassandra.schema.SchemaConstants; |
| import org.apache.cassandra.db.ColumnFamilyStore; |
| import org.apache.cassandra.db.Keyspace; |
| import org.apache.cassandra.db.SystemKeyspace; |
| import org.apache.cassandra.repair.messages.FailSession; |
| import org.apache.cassandra.repair.messages.FinalizeCommit; |
| import org.apache.cassandra.repair.messages.FinalizePromise; |
| import org.apache.cassandra.repair.messages.FinalizePropose; |
| import org.apache.cassandra.repair.messages.PrepareConsistentRequest; |
| import org.apache.cassandra.repair.messages.PrepareConsistentResponse; |
| import org.apache.cassandra.repair.messages.RepairMessage; |
| import org.apache.cassandra.repair.messages.StatusRequest; |
| import org.apache.cassandra.repair.messages.StatusResponse; |
| import org.apache.cassandra.schema.KeyspaceParams; |
| import org.apache.cassandra.service.ActiveRepairService; |
| import org.apache.cassandra.utils.FBUtilities; |
| import org.apache.cassandra.utils.UUIDGen; |
| |
| import static org.apache.cassandra.repair.consistent.ConsistentSession.State.*; |
| import static org.psjava.util.AssertStatus.assertTrue; |
| |
| public class LocalSessionTest extends AbstractRepairTest |
| { |
| private static final UUID TID1 = UUIDGen.getTimeUUID(); |
| private static final UUID TID2 = UUIDGen.getTimeUUID(); |
| |
| static LocalSession.Builder createBuilder() |
| { |
| LocalSession.Builder builder = LocalSession.builder(); |
| builder.withState(PREPARING); |
| builder.withSessionID(UUIDGen.getTimeUUID()); |
| builder.withCoordinator(COORDINATOR); |
| builder.withUUIDTableIds(Sets.newHashSet(TID1, TID2)); |
| builder.withRepairedAt(System.currentTimeMillis()); |
| builder.withRanges(Sets.newHashSet(RANGE1, RANGE2, RANGE3)); |
| builder.withParticipants(Sets.newHashSet(PARTICIPANT1, PARTICIPANT2, PARTICIPANT3)); |
| |
| int now = FBUtilities.nowInSeconds(); |
| builder.withStartedAt(now); |
| builder.withLastUpdate(now); |
| |
| return builder; |
| } |
| |
| static LocalSession createSession() |
| { |
| return createBuilder().build(); |
| } |
| |
| private static void assertValidationFailure(Consumer<LocalSession.Builder> consumer) |
| { |
| try |
| { |
| LocalSession.Builder builder = createBuilder(); |
| consumer.accept(builder); |
| builder.build(); |
| Assert.fail("Expected assertion error"); |
| } |
| catch (IllegalArgumentException e) |
| { |
| // expected |
| } |
| } |
| |
| private static void assertNoMessagesSent(InstrumentedLocalSessions sessions, InetAddressAndPort to) |
| { |
| Assert.assertNull(sessions.sentMessages.get(to)); |
| } |
| |
| private static void assertMessagesSent(InstrumentedLocalSessions sessions, InetAddressAndPort to, RepairMessage... expected) |
| { |
| Assert.assertEquals(Lists.newArrayList(expected), sessions.sentMessages.get(to)); |
| } |
| |
| static class InstrumentedLocalSessions extends LocalSessions |
| { |
| Map<InetAddressAndPort, List<RepairMessage>> sentMessages = new HashMap<>(); |
| |
| protected void sendMessage(InetAddressAndPort destination, Message<? extends RepairMessage> message) |
| { |
| if (!sentMessages.containsKey(destination)) |
| { |
| sentMessages.put(destination, new ArrayList<>()); |
| } |
| sentMessages.get(destination).add(message.payload); |
| } |
| |
| SettableFuture<Object> prepareSessionFuture = null; |
| boolean prepareSessionCalled = false; |
| |
| @Override |
| ListenableFuture prepareSession(KeyspaceRepairManager repairManager, |
| UUID sessionID, |
| Collection<ColumnFamilyStore> tables, |
| RangesAtEndpoint ranges, |
| ExecutorService executor, |
| BooleanSupplier isCancelled) |
| { |
| prepareSessionCalled = true; |
| if (prepareSessionFuture != null) |
| { |
| return prepareSessionFuture; |
| } |
| else |
| { |
| return super.prepareSession(repairManager, sessionID, tables, ranges, executor, isCancelled); |
| } |
| } |
| |
| boolean failSessionCalled = false; |
| public void failSession(UUID sessionID, boolean sendMessage) |
| { |
| failSessionCalled = true; |
| super.failSession(sessionID, sendMessage); |
| } |
| |
| public LocalSession prepareForTest(UUID sessionID) |
| { |
| prepareSessionFuture = SettableFuture.create(); |
| handlePrepareMessage(PARTICIPANT1, new PrepareConsistentRequest(sessionID, COORDINATOR, PARTICIPANTS)); |
| prepareSessionFuture.set(new Object()); |
| sentMessages.clear(); |
| return getSession(sessionID); |
| } |
| |
| @Override |
| protected InetAddressAndPort getBroadcastAddressAndPort() |
| { |
| return PARTICIPANT1; |
| } |
| |
| protected boolean isAlive(InetAddressAndPort address) |
| { |
| return true; |
| } |
| |
| protected boolean isNodeInitialized() |
| { |
| return true; |
| } |
| |
| public Map<UUID, Integer> completedSessions = new HashMap<>(); |
| |
| protected void sessionCompleted(LocalSession session) |
| { |
| UUID sessionID = session.sessionID; |
| int calls = completedSessions.getOrDefault(sessionID, 0); |
| completedSessions.put(sessionID, calls + 1); |
| } |
| |
| boolean sessionHasData = false; |
| protected boolean sessionHasData(LocalSession session) |
| { |
| return sessionHasData; |
| } |
| } |
| |
| private static TableMetadata cfm; |
| private static ColumnFamilyStore cfs; |
| |
| @BeforeClass |
| public static void setupClass() |
| { |
| SchemaLoader.prepareServer(); |
| cfm = CreateTableStatement.parse("CREATE TABLE tbl (k INT PRIMARY KEY, v INT)", "localsessiontest").build(); |
| SchemaLoader.createKeyspace("localsessiontest", KeyspaceParams.simple(1), cfm); |
| cfs = Schema.instance.getColumnFamilyStoreInstance(cfm.id); |
| } |
| |
| @Before |
| public void setup() |
| { |
| // clear out any data from previous test runs |
| ColumnFamilyStore repairCfs = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME).getColumnFamilyStore(SystemKeyspace.REPAIRS); |
| repairCfs.truncateBlocking(); |
| } |
| |
| private static UUID registerSession() |
| { |
| return registerSession(cfs, true, true); |
| } |
| |
| @Test |
| public void validation() |
| { |
| assertValidationFailure(b -> b.withState(null)); |
| assertValidationFailure(b -> b.withSessionID(null)); |
| assertValidationFailure(b -> b.withCoordinator(null)); |
| assertValidationFailure(b -> b.withTableIds(null)); |
| assertValidationFailure(b -> b.withTableIds(new HashSet<>())); |
| assertValidationFailure(b -> b.withRepairedAt(-1)); |
| assertValidationFailure(b -> b.withRanges(null)); |
| assertValidationFailure(b -> b.withRanges(new HashSet<>())); |
| assertValidationFailure(b -> b.withParticipants(null)); |
| assertValidationFailure(b -> b.withParticipants(new HashSet<>())); |
| assertValidationFailure(b -> b.withStartedAt(0)); |
| assertValidationFailure(b -> b.withLastUpdate(0)); |
| } |
| |
| /** |
| * Test that sessions are loaded and saved properly |
| */ |
| @Test |
| public void persistence() |
| { |
| LocalSessions sessions = new LocalSessions(); |
| LocalSession expected = createSession(); |
| sessions.save(expected); |
| LocalSession actual = sessions.loadUnsafe(expected.sessionID); |
| Assert.assertEquals(expected, actual); |
| } |
| |
| @Test |
| public void prepareSuccessCase() |
| { |
| UUID sessionID = registerSession(); |
| InstrumentedLocalSessions sessions = new InstrumentedLocalSessions(); |
| sessions.start(); |
| |
| // replacing future so we can inspect state before and after anti compaction callback |
| sessions.prepareSessionFuture = SettableFuture.create(); |
| Assert.assertFalse(sessions.prepareSessionCalled); |
| sessions.handlePrepareMessage(PARTICIPANT1, new PrepareConsistentRequest(sessionID, COORDINATOR, PARTICIPANTS)); |
| Assert.assertTrue(sessions.prepareSessionCalled); |
| Assert.assertTrue(sessions.sentMessages.isEmpty()); |
| |
| // anti compaction hasn't finished yet, so state in memory and on disk should be PREPARING |
| LocalSession session = sessions.getSession(sessionID); |
| Assert.assertNotNull(session); |
| Assert.assertEquals(PREPARING, session.getState()); |
| Assert.assertEquals(session, sessions.loadUnsafe(sessionID)); |
| |
| // anti compaction has now finished, so state in memory and on disk should be PREPARED |
| sessions.prepareSessionFuture.set(new Object()); |
| session = sessions.getSession(sessionID); |
| Assert.assertNotNull(session); |
| Assert.assertEquals(PREPARED, session.getState()); |
| Assert.assertEquals(session, sessions.loadUnsafe(sessionID)); |
| |
| // ...and we should have sent a success message back to the coordinator |
| assertMessagesSent(sessions, COORDINATOR, new PrepareConsistentResponse(sessionID, PARTICIPANT1, true)); |
| } |
| |
| /** |
| * If anti compactionn fails, we should fail the session locally, |
| * and send a failure message back to the coordinator |
| */ |
| @Test |
| public void prepareAntiCompactFailure() |
| { |
| UUID sessionID = registerSession(); |
| InstrumentedLocalSessions sessions = new InstrumentedLocalSessions(); |
| sessions.start(); |
| |
| // replacing future so we can inspect state before and after anti compaction callback |
| sessions.prepareSessionFuture = SettableFuture.create(); |
| Assert.assertFalse(sessions.prepareSessionCalled); |
| sessions.handlePrepareMessage(PARTICIPANT1, new PrepareConsistentRequest(sessionID, COORDINATOR, PARTICIPANTS)); |
| Assert.assertTrue(sessions.prepareSessionCalled); |
| Assert.assertTrue(sessions.sentMessages.isEmpty()); |
| |
| // anti compaction hasn't finished yet, so state in memory and on disk should be PREPARING |
| LocalSession session = sessions.getSession(sessionID); |
| Assert.assertNotNull(session); |
| Assert.assertEquals(PREPARING, session.getState()); |
| Assert.assertEquals(session, sessions.loadUnsafe(sessionID)); |
| |
| // anti compaction has now finished, so state in memory and on disk should be PREPARED |
| sessions.prepareSessionFuture.setException(new RuntimeException()); |
| session = sessions.getSession(sessionID); |
| Assert.assertNotNull(session); |
| Assert.assertEquals(FAILED, session.getState()); |
| Assert.assertEquals(session, sessions.loadUnsafe(sessionID)); |
| |
| // ...and we should have sent a success message back to the coordinator |
| assertMessagesSent(sessions, COORDINATOR, new PrepareConsistentResponse(sessionID, PARTICIPANT1, false)); |
| |
| } |
| |
| /** |
| * If a ParentRepairSession wasn't previously created, we shouldn't |
| * create a session locally, but we should send a failure message to |
| * the coordinator. |
| */ |
| @Test |
| public void prepareWithNonExistantParentSession() |
| { |
| UUID sessionID = UUIDGen.getTimeUUID(); |
| InstrumentedLocalSessions sessions = new InstrumentedLocalSessions(); |
| sessions.handlePrepareMessage(PARTICIPANT1, new PrepareConsistentRequest(sessionID, COORDINATOR, PARTICIPANTS)); |
| Assert.assertNull(sessions.getSession(sessionID)); |
| assertMessagesSent(sessions, COORDINATOR, new PrepareConsistentResponse(sessionID, PARTICIPANT1, false)); |
| } |
| |
| /** |
| * If the session is cancelled mid-prepare, the isCancelled boolean supplier should start returning true |
| */ |
| @Test |
| public void prepareCancellation() |
| { |
| UUID sessionID = registerSession(); |
| AtomicReference<BooleanSupplier> isCancelledRef = new AtomicReference<>(); |
| SettableFuture future = SettableFuture.create(); |
| |
| InstrumentedLocalSessions sessions = new InstrumentedLocalSessions() { |
| ListenableFuture prepareSession(KeyspaceRepairManager repairManager, UUID sessionID, Collection<ColumnFamilyStore> tables, RangesAtEndpoint ranges, ExecutorService executor, BooleanSupplier isCancelled) |
| { |
| isCancelledRef.set(isCancelled); |
| return future; |
| } |
| }; |
| sessions.start(); |
| |
| sessions.handlePrepareMessage(PARTICIPANT1, new PrepareConsistentRequest(sessionID, COORDINATOR, PARTICIPANTS)); |
| |
| BooleanSupplier isCancelled = isCancelledRef.get(); |
| Assert.assertNotNull(isCancelled); |
| Assert.assertFalse(isCancelled.getAsBoolean()); |
| Assert.assertTrue(sessions.sentMessages.isEmpty()); |
| |
| sessions.failSession(sessionID, false); |
| Assert.assertTrue(isCancelled.getAsBoolean()); |
| |
| // now that the session has failed, it send a negative response to the coordinator (even if the anti-compaction completed successfully) |
| future.set(new Object()); |
| assertMessagesSent(sessions, COORDINATOR, new PrepareConsistentResponse(sessionID, PARTICIPANT1, false)); |
| } |
| |
| @Test |
| public void maybeSetRepairing() |
| { |
| UUID sessionID = registerSession(); |
| InstrumentedLocalSessions sessions = new InstrumentedLocalSessions(); |
| sessions.start(); |
| |
| LocalSession session = sessions.prepareForTest(sessionID); |
| Assert.assertEquals(PREPARED, session.getState()); |
| |
| sessions.sentMessages.clear(); |
| sessions.maybeSetRepairing(sessionID); |
| Assert.assertEquals(REPAIRING, session.getState()); |
| Assert.assertEquals(session, sessions.loadUnsafe(sessionID)); |
| Assert.assertTrue(sessions.sentMessages.isEmpty()); |
| } |
| |
| /** |
| * Multiple calls to maybeSetRepairing shouldn't cause any problems |
| */ |
| @Test |
| public void maybeSetRepairingDuplicates() |
| { |
| |
| UUID sessionID = registerSession(); |
| InstrumentedLocalSessions sessions = new InstrumentedLocalSessions(); |
| sessions.start(); |
| |
| LocalSession session = sessions.prepareForTest(sessionID); |
| Assert.assertEquals(PREPARED, session.getState()); |
| |
| // initial set |
| sessions.sentMessages.clear(); |
| sessions.maybeSetRepairing(sessionID); |
| Assert.assertEquals(REPAIRING, session.getState()); |
| Assert.assertEquals(session, sessions.loadUnsafe(sessionID)); |
| Assert.assertTrue(sessions.sentMessages.isEmpty()); |
| |
| // repeated call 1 |
| sessions.maybeSetRepairing(sessionID); |
| Assert.assertEquals(REPAIRING, session.getState()); |
| Assert.assertEquals(session, sessions.loadUnsafe(sessionID)); |
| Assert.assertTrue(sessions.sentMessages.isEmpty()); |
| |
| // repeated call 2 |
| sessions.maybeSetRepairing(sessionID); |
| Assert.assertEquals(REPAIRING, session.getState()); |
| Assert.assertEquals(session, sessions.loadUnsafe(sessionID)); |
| Assert.assertTrue(sessions.sentMessages.isEmpty()); |
| } |
| |
| /** |
| * We shouldn't fail if we don't have a session for the given session id |
| */ |
| @Test |
| public void maybeSetRepairingNonExistantSession() |
| { |
| InstrumentedLocalSessions sessions = new InstrumentedLocalSessions(); |
| UUID fakeID = UUIDGen.getTimeUUID(); |
| sessions.maybeSetRepairing(fakeID); |
| Assert.assertTrue(sessions.sentMessages.isEmpty()); |
| } |
| |
| /** |
| * In the success case, session state should be set to FINALIZE_PROMISED and |
| * persisted, and a FinalizePromise message should be sent back to the coordinator |
| */ |
| @Test |
| public void finalizeProposeSuccessCase() |
| { |
| UUID sessionID = registerSession(); |
| InstrumentedLocalSessions sessions = new InstrumentedLocalSessions(); |
| sessions.start(); |
| |
| // create session and move to preparing |
| LocalSession session = sessions.prepareForTest(sessionID); |
| sessions.maybeSetRepairing(sessionID); |
| |
| // |
| Assert.assertEquals(REPAIRING, session.getState()); |
| |
| // should send a promised message to coordinator and set session state accordingly |
| sessions.sentMessages.clear(); |
| sessions.handleFinalizeProposeMessage(COORDINATOR, new FinalizePropose(sessionID)); |
| Assert.assertEquals(FINALIZE_PROMISED, session.getState()); |
| Assert.assertEquals(session, sessions.loadUnsafe(sessionID)); |
| assertMessagesSent(sessions, COORDINATOR, new FinalizePromise(sessionID, PARTICIPANT1, true)); |
| } |
| |
| /** |
| * Trying to propose finalization when the session isn't in the repaired |
| * state should fail the session and send a failure message to the proposer |
| */ |
| @Test |
| public void finalizeProposeInvalidStateFailure() |
| { |
| UUID sessionID = registerSession(); |
| InstrumentedLocalSessions sessions = new InstrumentedLocalSessions(); |
| sessions.start(); |
| |
| LocalSession session = sessions.prepareForTest(sessionID); |
| Assert.assertEquals(PREPARED, session.getState()); |
| |
| // should fail the session and send a failure message to the coordinator |
| sessions.sentMessages.clear(); |
| sessions.handleFinalizeProposeMessage(COORDINATOR, new FinalizePropose(sessionID)); |
| Assert.assertEquals(FAILED, session.getState()); |
| Assert.assertEquals(session, sessions.loadUnsafe(sessionID)); |
| assertMessagesSent(sessions, COORDINATOR, new FailSession(sessionID)); |
| } |
| |
| @Test |
| public void finalizeProposeNonExistantSessionFailure() |
| { |
| InstrumentedLocalSessions sessions = new InstrumentedLocalSessions(); |
| UUID fakeID = UUIDGen.getTimeUUID(); |
| sessions.handleFinalizeProposeMessage(COORDINATOR, new FinalizePropose(fakeID)); |
| Assert.assertNull(sessions.getSession(fakeID)); |
| assertMessagesSent(sessions, COORDINATOR, new FailSession(fakeID)); |
| } |
| |
| /** |
| * Session state should be set to finalized, sstables should be promoted |
| * to repaired. No messages should be sent to the coordinator |
| */ |
| @Test |
| public void finalizeCommitSuccessCase() |
| { |
| UUID sessionID = registerSession(); |
| InstrumentedLocalSessions sessions = new InstrumentedLocalSessions(); |
| sessions.start(); |
| |
| // create session and move to finalized promised |
| sessions.prepareForTest(sessionID); |
| sessions.maybeSetRepairing(sessionID); |
| sessions.handleFinalizeProposeMessage(COORDINATOR, new FinalizePropose(sessionID)); |
| |
| Assert.assertEquals(0, (int) sessions.completedSessions.getOrDefault(sessionID, 0)); |
| sessions.sentMessages.clear(); |
| LocalSession session = sessions.getSession(sessionID); |
| sessions.handleFinalizeCommitMessage(PARTICIPANT1, new FinalizeCommit(sessionID)); |
| |
| Assert.assertEquals(FINALIZED, session.getState()); |
| Assert.assertEquals(session, sessions.loadUnsafe(sessionID)); |
| Assert.assertTrue(sessions.sentMessages.isEmpty()); |
| Assert.assertEquals(1, (int) sessions.completedSessions.getOrDefault(sessionID, 0)); |
| } |
| |
| @Test |
| public void finalizeCommitNonExistantSession() |
| { |
| InstrumentedLocalSessions sessions = new InstrumentedLocalSessions(); |
| sessions.start(); |
| UUID fakeID = UUIDGen.getTimeUUID(); |
| sessions.handleFinalizeCommitMessage(PARTICIPANT1, new FinalizeCommit(fakeID)); |
| Assert.assertNull(sessions.getSession(fakeID)); |
| Assert.assertTrue(sessions.sentMessages.isEmpty()); |
| } |
| |
| @Test |
| public void failSession() |
| { |
| UUID sessionID = registerSession(); |
| InstrumentedLocalSessions sessions = new InstrumentedLocalSessions(); |
| sessions.start(); |
| |
| LocalSession session = sessions.prepareForTest(sessionID); |
| Assert.assertEquals(PREPARED, session.getState()); |
| sessions.sentMessages.clear(); |
| |
| // fail session |
| Assert.assertEquals(0, (int) sessions.completedSessions.getOrDefault(sessionID, 0)); |
| sessions.failSession(sessionID); |
| Assert.assertEquals(FAILED, session.getState()); |
| assertMessagesSent(sessions, COORDINATOR, new FailSession(sessionID)); |
| Assert.assertEquals(1, (int) sessions.completedSessions.getOrDefault(sessionID, 0)); |
| } |
| |
| /** |
| * Session should be failed, but no messages should be sent |
| */ |
| @Test |
| public void handleFailMessage() |
| { |
| UUID sessionID = registerSession(); |
| InstrumentedLocalSessions sessions = new InstrumentedLocalSessions(); |
| sessions.start(); |
| |
| LocalSession session = sessions.prepareForTest(sessionID); |
| Assert.assertEquals(PREPARED, session.getState()); |
| sessions.sentMessages.clear(); |
| |
| sessions.handleFailSessionMessage(PARTICIPANT1, new FailSession(sessionID)); |
| Assert.assertEquals(FAILED, session.getState()); |
| Assert.assertTrue(sessions.sentMessages.isEmpty()); |
| } |
| |
| @Test |
| public void sendStatusRequest() throws Exception |
| { |
| UUID sessionID = registerSession(); |
| InstrumentedLocalSessions sessions = new InstrumentedLocalSessions(); |
| sessions.start(); |
| LocalSession session = sessions.prepareForTest(sessionID); |
| |
| sessions.sentMessages.clear(); |
| sessions.sendStatusRequest(session); |
| |
| assertNoMessagesSent(sessions, PARTICIPANT1); |
| StatusRequest expected = new StatusRequest(sessionID); |
| assertMessagesSent(sessions, PARTICIPANT2, expected); |
| assertMessagesSent(sessions, PARTICIPANT3, expected); |
| } |
| |
| @Test |
| public void handleStatusRequest() throws Exception |
| { |
| UUID sessionID = registerSession(); |
| InstrumentedLocalSessions sessions = new InstrumentedLocalSessions(); |
| sessions.start(); |
| LocalSession session = sessions.prepareForTest(sessionID); |
| Assert.assertEquals(PREPARED, session.getState()); |
| |
| sessions.sentMessages.clear(); |
| sessions.handleStatusRequest(PARTICIPANT2, new StatusRequest(sessionID)); |
| assertNoMessagesSent(sessions, PARTICIPANT1); |
| assertMessagesSent(sessions, PARTICIPANT2, new StatusResponse(sessionID, PREPARED)); |
| assertNoMessagesSent(sessions, PARTICIPANT3); |
| } |
| |
| @Test |
| public void handleStatusRequestNoSession() throws Exception |
| { |
| InstrumentedLocalSessions sessions = new InstrumentedLocalSessions(); |
| sessions.start(); |
| |
| sessions.sentMessages.clear(); |
| UUID sessionID = UUIDGen.getTimeUUID(); |
| sessions.handleStatusRequest(PARTICIPANT2, new StatusRequest(sessionID)); |
| assertNoMessagesSent(sessions, PARTICIPANT1); |
| assertMessagesSent(sessions, PARTICIPANT2, new StatusResponse(sessionID, FAILED)); |
| assertNoMessagesSent(sessions, PARTICIPANT3); |
| } |
| |
| @Test |
| public void handleStatusResponseFinalized() throws Exception |
| { |
| UUID sessionID = registerSession(); |
| InstrumentedLocalSessions sessions = new InstrumentedLocalSessions(); |
| sessions.start(); |
| LocalSession session = sessions.prepareForTest(sessionID); |
| session.setState(FINALIZE_PROMISED); |
| |
| sessions.handleStatusResponse(PARTICIPANT1, new StatusResponse(sessionID, FINALIZED)); |
| Assert.assertEquals(FINALIZED, session.getState()); |
| } |
| |
| @Test |
| public void handleStatusResponseFinalizedRedundant() throws Exception |
| { |
| UUID sessionID = registerSession(); |
| InstrumentedLocalSessions sessions = new InstrumentedLocalSessions(); |
| sessions.start(); |
| LocalSession session = sessions.prepareForTest(sessionID); |
| session.setState(FINALIZED); |
| |
| sessions.handleStatusResponse(PARTICIPANT1, new StatusResponse(sessionID, FINALIZED)); |
| Assert.assertEquals(FINALIZED, session.getState()); |
| } |
| |
| @Test |
| public void handleStatusResponseFailed() throws Exception |
| { |
| UUID sessionID = registerSession(); |
| InstrumentedLocalSessions sessions = new InstrumentedLocalSessions(); |
| sessions.start(); |
| LocalSession session = sessions.prepareForTest(sessionID); |
| session.setState(FINALIZE_PROMISED); |
| |
| sessions.handleStatusResponse(PARTICIPANT1, new StatusResponse(sessionID, FAILED)); |
| Assert.assertEquals(FAILED, session.getState()); |
| } |
| |
| @Test |
| public void handleStatusResponseFailedRedundant() throws Exception |
| { |
| UUID sessionID = registerSession(); |
| InstrumentedLocalSessions sessions = new InstrumentedLocalSessions(); |
| sessions.start(); |
| LocalSession session = sessions.prepareForTest(sessionID); |
| session.setState(FAILED); |
| |
| sessions.handleStatusResponse(PARTICIPANT1, new StatusResponse(sessionID, FAILED)); |
| Assert.assertEquals(FAILED, session.getState()); |
| } |
| |
| @Test |
| public void handleStatusResponseNoop() throws Exception |
| { |
| UUID sessionID = registerSession(); |
| InstrumentedLocalSessions sessions = new InstrumentedLocalSessions(); |
| sessions.start(); |
| LocalSession session = sessions.prepareForTest(sessionID); |
| session.setState(REPAIRING); |
| |
| sessions.handleStatusResponse(PARTICIPANT1, new StatusResponse(sessionID, FINALIZE_PROMISED)); |
| Assert.assertEquals(REPAIRING, session.getState()); |
| } |
| |
| @Test |
| public void handleStatusResponseNoSession() throws Exception |
| { |
| UUID sessionID = UUIDGen.getTimeUUID(); |
| InstrumentedLocalSessions sessions = new InstrumentedLocalSessions(); |
| sessions.start(); |
| |
| sessions.handleStatusResponse(PARTICIPANT1, new StatusResponse(sessionID, FINALIZE_PROMISED)); |
| Assert.assertNull(sessions.getSession(sessionID)); |
| } |
| |
| /** |
| * Check all states (except failed) |
| */ |
| @Test |
| public void isSessionInProgress() |
| { |
| UUID sessionID = registerSession(); |
| InstrumentedLocalSessions sessions = new InstrumentedLocalSessions(); |
| sessions.start(); |
| sessions.prepareSessionFuture = SettableFuture.create(); // prevent moving to prepared |
| sessions.handlePrepareMessage(PARTICIPANT1, new PrepareConsistentRequest(sessionID, COORDINATOR, PARTICIPANTS)); |
| |
| LocalSession session = sessions.getSession(sessionID); |
| Assert.assertNotNull(session); |
| Assert.assertEquals(PREPARING, session.getState()); |
| Assert.assertTrue(sessions.isSessionInProgress(sessionID)); |
| |
| session.setState(PREPARED); |
| Assert.assertTrue(sessions.isSessionInProgress(sessionID)); |
| |
| session.setState(REPAIRING); |
| Assert.assertTrue(sessions.isSessionInProgress(sessionID)); |
| |
| session.setState(FINALIZE_PROMISED); |
| Assert.assertTrue(sessions.isSessionInProgress(sessionID)); |
| |
| session.setState(FINALIZED); |
| Assert.assertFalse(sessions.isSessionInProgress(sessionID)); |
| } |
| |
| @Test |
| public void isSessionInProgressFailed() |
| { |
| UUID sessionID = registerSession(); |
| InstrumentedLocalSessions sessions = new InstrumentedLocalSessions(); |
| sessions.start(); |
| sessions.prepareSessionFuture = SettableFuture.create(); |
| sessions.handlePrepareMessage(PARTICIPANT1, new PrepareConsistentRequest(sessionID, COORDINATOR, PARTICIPANTS)); |
| sessions.prepareSessionFuture.set(new Object()); |
| |
| Assert.assertTrue(sessions.isSessionInProgress(sessionID)); |
| sessions.failSession(sessionID); |
| Assert.assertFalse(sessions.isSessionInProgress(sessionID)); |
| } |
| |
| @Test |
| public void isSessionInProgressNonExistantSession() |
| { |
| UUID fakeID = UUIDGen.getTimeUUID(); |
| InstrumentedLocalSessions sessions = new InstrumentedLocalSessions(); |
| sessions.start(); |
| Assert.assertFalse(sessions.isSessionInProgress(fakeID)); |
| } |
| |
| @Test |
| public void finalRepairedAtFinalized() |
| { |
| UUID sessionID = registerSession(); |
| InstrumentedLocalSessions sessions = new InstrumentedLocalSessions(); |
| sessions.start(); |
| |
| sessions.prepareForTest(sessionID); |
| sessions.maybeSetRepairing(sessionID); |
| sessions.handleFinalizeProposeMessage(COORDINATOR, new FinalizePropose(sessionID)); |
| sessions.handleFinalizeCommitMessage(PARTICIPANT1, new FinalizeCommit(sessionID)); |
| |
| LocalSession session = sessions.getSession(sessionID); |
| Assert.assertTrue(session.repairedAt != ActiveRepairService.UNREPAIRED_SSTABLE); |
| Assert.assertEquals(session.repairedAt, sessions.getFinalSessionRepairedAt(sessionID)); |
| } |
| |
| @Test |
| public void finalRepairedAtFailed() |
| { |
| UUID sessionID = registerSession(); |
| InstrumentedLocalSessions sessions = new InstrumentedLocalSessions(); |
| sessions.start(); |
| |
| sessions.prepareForTest(sessionID); |
| sessions.failSession(sessionID); |
| |
| LocalSession session = sessions.getSession(sessionID); |
| Assert.assertTrue(session.repairedAt != ActiveRepairService.UNREPAIRED_SSTABLE); |
| long repairedAt = sessions.getFinalSessionRepairedAt(sessionID); |
| Assert.assertEquals(ActiveRepairService.UNREPAIRED_SSTABLE, repairedAt); |
| } |
| |
| @Test |
| public void finalRepairedAtNoSession() |
| { |
| UUID fakeID = registerSession(); |
| InstrumentedLocalSessions sessions = new InstrumentedLocalSessions(); |
| sessions.start(); |
| long repairedAt = sessions.getFinalSessionRepairedAt(fakeID); |
| Assert.assertEquals(ActiveRepairService.UNREPAIRED_SSTABLE, repairedAt); |
| } |
| |
| @Test(expected = IllegalStateException.class) |
| public void finalRepairedAtInProgress() |
| { |
| UUID sessionID = registerSession(); |
| InstrumentedLocalSessions sessions = new InstrumentedLocalSessions(); |
| sessions.start(); |
| sessions.prepareForTest(sessionID); |
| |
| sessions.getFinalSessionRepairedAt(sessionID); |
| } |
| |
| /** |
| * Startup happy path |
| */ |
| @Test |
| public void startup() throws Exception |
| { |
| InstrumentedLocalSessions initialSessions = new InstrumentedLocalSessions(); |
| initialSessions.start(); |
| Assert.assertEquals(0, initialSessions.getNumSessions()); |
| UUID id1 = registerSession(); |
| UUID id2 = registerSession(); |
| UUID id3 = registerSession(); |
| |
| initialSessions.prepareForTest(id1); |
| initialSessions.prepareForTest(id2); |
| initialSessions.prepareForTest(id3); |
| |
| Assert.assertEquals(3, initialSessions.getNumSessions()); |
| LocalSession session1 = initialSessions.getSession(id1); |
| LocalSession session2 = initialSessions.getSession(id2); |
| LocalSession session3 = initialSessions.getSession(id3); |
| initialSessions.setStateAndSave(session2, PREPARED); |
| initialSessions.setStateAndSave(session2, REPAIRING); |
| initialSessions.setStateAndSave(session2, FINALIZE_PROMISED); |
| initialSessions.setStateAndSave(session3, PREPARED); |
| initialSessions.setStateAndSave(session3, REPAIRING); |
| initialSessions.setStateAndSave(session3, FINALIZE_PROMISED); |
| initialSessions.setStateAndSave(session3, FINALIZED); |
| Assert.assertEquals(3, initialSessions.getNumSessions()); |
| |
| // subsequent startups should load persisted sessions |
| InstrumentedLocalSessions nextSessions = new InstrumentedLocalSessions(); |
| Assert.assertEquals(0, nextSessions.getNumSessions()); |
| nextSessions.start(); |
| Assert.assertEquals(3, nextSessions.getNumSessions()); |
| |
| LocalSession session1next = nextSessions.getSession(id1); |
| LocalSession session2next = nextSessions.getSession(id2); |
| LocalSession session3next = nextSessions.getSession(id3); |
| |
| // non-finalized sessions should fail & notify coordinator after startup |
| assertMessagesSent(nextSessions, session1next.coordinator, new FailSession(session1next.sessionID)); |
| Assert.assertEquals(session1.sessionID, session1next.sessionID); |
| Assert.assertEquals(FAILED, session1next.getState()); |
| |
| Assert.assertEquals(session2, session2next); |
| Assert.assertEquals(session3, session3next); |
| |
| } |
| |
| /** |
| * Stop happy path |
| */ |
| @Test |
| public void stop() throws Exception |
| { |
| InstrumentedLocalSessions initialSessions = new InstrumentedLocalSessions(); |
| initialSessions.start(); |
| Assert.assertEquals(0, initialSessions.getNumSessions()); |
| UUID id1 = registerSession(); |
| UUID id2 = registerSession(); |
| UUID id3 = registerSession(); |
| |
| initialSessions.prepareForTest(id1); |
| initialSessions.prepareForTest(id2); |
| initialSessions.prepareForTest(id3); |
| |
| Assert.assertEquals(3, initialSessions.getNumSessions()); |
| LocalSession session1 = initialSessions.getSession(id1); |
| LocalSession session2 = initialSessions.getSession(id2); |
| LocalSession session3 = initialSessions.getSession(id3); |
| initialSessions.setStateAndSave(session2, PREPARED); |
| initialSessions.setStateAndSave(session2, REPAIRING); |
| initialSessions.setStateAndSave(session2, FINALIZE_PROMISED); |
| initialSessions.setStateAndSave(session3, PREPARED); |
| initialSessions.setStateAndSave(session3, REPAIRING); |
| initialSessions.setStateAndSave(session3, FINALIZE_PROMISED); |
| initialSessions.setStateAndSave(session3, FINALIZED); |
| |
| initialSessions.stop(); |
| // clean shutdown should fail session1 & notify coordinator |
| assertMessagesSent(initialSessions, session1.coordinator, new FailSession(session1.sessionID)); |
| |
| // subsequent startups should load persisted sessions |
| InstrumentedLocalSessions nextSessions = new InstrumentedLocalSessions(); |
| Assert.assertEquals(0, nextSessions.getNumSessions()); |
| nextSessions.start(); |
| Assert.assertEquals(3, nextSessions.getNumSessions()); |
| |
| LocalSession session1next = nextSessions.getSession(id1); |
| LocalSession session2next = nextSessions.getSession(id2); |
| LocalSession session3next = nextSessions.getSession(id3); |
| |
| Assert.assertEquals(session1, session1next); |
| Assert.assertEquals(session2, session2next); |
| Assert.assertEquals(session3, session3next); |
| // clean shutdown above should make startup send no messages; |
| assertNoMessagesSent(nextSessions, session1next.coordinator); |
| assertNoMessagesSent(nextSessions, session2next.coordinator); |
| assertNoMessagesSent(nextSessions, session3next.coordinator); |
| } |
| |
| /** |
| * If LocalSessions.start is called more than |
| * once, an exception should be thrown |
| */ |
| @Test (expected = IllegalArgumentException.class) |
| public void multipleStartupFailure() throws Exception |
| { |
| InstrumentedLocalSessions initialSessions = new InstrumentedLocalSessions(); |
| initialSessions.start(); |
| initialSessions.start(); |
| } |
| |
| /** |
| * If there are problems with the rows we're reading out of the repair table, we should |
| * do the best we can to repair them, but not refuse to startup. |
| */ |
| @Test |
| public void loadCorruptRow() throws Exception |
| { |
| LocalSessions sessions = new LocalSessions(); |
| LocalSession session = createSession(); |
| sessions.save(session); |
| |
| sessions = new LocalSessions(); |
| sessions.start(); |
| Assert.assertNotNull(sessions.getSession(session.sessionID)); |
| |
| QueryProcessor.instance.executeInternal("DELETE participants, participants_wp FROM system.repairs WHERE parent_id=?", session.sessionID); |
| |
| sessions = new LocalSessions(); |
| sessions.start(); |
| Assert.assertNull(sessions.getSession(session.sessionID)); |
| UntypedResultSet res = QueryProcessor.executeInternal("SELECT * FROM system.repairs WHERE parent_id=?", session.sessionID); |
| assertTrue(res.isEmpty()); |
| } |
| |
| private static LocalSession sessionWithTime(int started, int updated) |
| { |
| LocalSession.Builder builder = createBuilder(); |
| builder.withStartedAt(started); |
| builder.withRepairedAt(started); |
| builder.withLastUpdate(updated); |
| return builder.build(); |
| } |
| |
| /** |
| * Sessions that shouldn't be failed or deleted are left alone |
| */ |
| @Test |
| public void cleanupNoOp() throws Exception |
| { |
| LocalSessions sessions = new LocalSessions(); |
| sessions.start(); |
| |
| int time = FBUtilities.nowInSeconds() - LocalSessions.AUTO_FAIL_TIMEOUT + 60; |
| LocalSession session = sessionWithTime(time - 1, time); |
| |
| sessions.putSessionUnsafe(session); |
| Assert.assertNotNull(sessions.getSession(session.sessionID)); |
| |
| sessions.cleanup(); |
| |
| Assert.assertNotNull(sessions.getSession(session.sessionID)); |
| } |
| |
| /** |
| * Sessions past the auto fail cutoff should be failed |
| */ |
| @Test |
| public void cleanupFail() throws Exception |
| { |
| LocalSessions sessions = new InstrumentedLocalSessions(); |
| sessions.start(); |
| |
| int time = FBUtilities.nowInSeconds() - LocalSessions.AUTO_FAIL_TIMEOUT - 1; |
| LocalSession session = sessionWithTime(time - 1, time); |
| session.setState(REPAIRING); |
| |
| sessions.putSessionUnsafe(session); |
| Assert.assertNotNull(sessions.getSession(session.sessionID)); |
| |
| sessions.cleanup(); |
| |
| Assert.assertNotNull(sessions.getSession(session.sessionID)); |
| Assert.assertEquals(FAILED, session.getState()); |
| Assert.assertEquals(session, sessions.loadUnsafe(session.sessionID)); |
| } |
| |
| /** |
| * Sessions past the auto delete cutoff with no sstables should be deleted |
| */ |
| @Test |
| public void cleanupDeleteNoSSTables() throws Exception |
| { |
| LocalSessions sessions = new InstrumentedLocalSessions(); |
| sessions.start(); |
| |
| int time = FBUtilities.nowInSeconds() - LocalSessions.AUTO_FAIL_TIMEOUT - 1; |
| LocalSession failed = sessionWithTime(time - 1, time); |
| failed.setState(FAILED); |
| |
| LocalSession finalized = sessionWithTime(time - 1, time); |
| finalized.setState(FINALIZED); |
| |
| sessions.putSessionUnsafe(failed); |
| sessions.putSessionUnsafe(finalized); |
| Assert.assertNotNull(sessions.getSession(failed.sessionID)); |
| Assert.assertNotNull(sessions.getSession(finalized.sessionID)); |
| |
| sessions.cleanup(); |
| |
| // failed session should be gone, but finalized should not, since it hasn't been superseded |
| Assert.assertNull(sessions.getSession(failed.sessionID)); |
| Assert.assertNotNull(sessions.getSession(finalized.sessionID)); |
| |
| Assert.assertNull(sessions.loadUnsafe(failed.sessionID)); |
| Assert.assertNotNull(sessions.loadUnsafe(finalized.sessionID)); |
| |
| // add a finalized superseding session |
| LocalSession superseding = sessionWithTime(time, time + 1); |
| superseding.setState(FINALIZED); |
| sessions.putSessionUnsafe(superseding); |
| |
| sessions.cleanup(); |
| |
| // old finalized should be removed, superseding should still be there |
| Assert.assertNull(sessions.getSession(finalized.sessionID)); |
| Assert.assertNotNull(sessions.getSession(superseding.sessionID)); |
| |
| Assert.assertNull(sessions.loadUnsafe(finalized.sessionID)); |
| Assert.assertNotNull(sessions.loadUnsafe(superseding.sessionID)); |
| } |
| |
| /** |
| * Sessions past the auto delete cutoff with no sstables should be deleted |
| */ |
| @Test |
| public void cleanupDeleteSSTablesRemaining() throws Exception |
| { |
| InstrumentedLocalSessions sessions = new InstrumentedLocalSessions(); |
| sessions.start(); |
| |
| int time = FBUtilities.nowInSeconds() - LocalSessions.AUTO_FAIL_TIMEOUT - 1; |
| LocalSession failed = sessionWithTime(time - 1, time); |
| failed.setState(FAILED); |
| |
| LocalSession finalized = sessionWithTime(time - 1, time); |
| finalized.setState(FINALIZED); |
| |
| sessions.putSessionUnsafe(failed); |
| sessions.putSessionUnsafe(finalized); |
| Assert.assertNotNull(sessions.getSession(failed.sessionID)); |
| Assert.assertNotNull(sessions.getSession(finalized.sessionID)); |
| |
| sessions.sessionHasData = true; |
| sessions.cleanup(); |
| |
| Assert.assertNotNull(sessions.getSession(failed.sessionID)); |
| Assert.assertNotNull(sessions.getSession(finalized.sessionID)); |
| |
| Assert.assertNotNull(sessions.loadUnsafe(failed.sessionID)); |
| Assert.assertNotNull(sessions.loadUnsafe(finalized.sessionID)); |
| } |
| |
| /** |
| * Sessions should start checking the status of their participants if |
| * there hasn't been activity for the CHECK_STATUS_TIMEOUT period |
| */ |
| @Test |
| public void cleanupStatusRequest() throws Exception |
| { |
| AtomicReference<LocalSession> checkedSession = new AtomicReference<>(); |
| InstrumentedLocalSessions sessions = new InstrumentedLocalSessions() { |
| public void sendStatusRequest(LocalSession session) |
| { |
| Assert.assertTrue(checkedSession.compareAndSet(null, session)); |
| } |
| }; |
| sessions.start(); |
| |
| int time = FBUtilities.nowInSeconds() - LocalSessions.CHECK_STATUS_TIMEOUT - 1; |
| LocalSession session = sessionWithTime(time - 1, time); |
| session.setState(REPAIRING); |
| |
| sessions.putSessionUnsafe(session); |
| Assert.assertNotNull(sessions.getSession(session.sessionID)); |
| |
| sessions.cleanup(); |
| |
| Assert.assertEquals(session, checkedSession.get()); |
| } |
| } |
| |