blob: 725833e5326b29e3748b71ecfc100fa606f3a747 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.repair.consistent;
import java.util.Set;
import com.google.common.collect.Sets;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
import org.apache.cassandra.repair.AbstractRepairTest;
import org.apache.cassandra.repair.NoSuchRepairSessionException;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.repair.messages.FailSession;
import org.apache.cassandra.repair.messages.FinalizePromise;
import org.apache.cassandra.repair.messages.PrepareConsistentResponse;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.utils.TimeUUID;
import static org.apache.cassandra.utils.TimeUUID.Generator.nextTimeUUID;
public class CoordinatorSessionsTest extends AbstractRepairTest
{
private static TableMetadata cfm;
private static ColumnFamilyStore cfs;
// to check CoordinatorSessions is passing the messages to the coordinator session correctly
private static class InstrumentedCoordinatorSession extends CoordinatorSession
{
public InstrumentedCoordinatorSession(Builder builder)
{
super(builder);
}
int prepareResponseCalls = 0;
InetAddressAndPort preparePeer = null;
boolean prepareSuccess = false;
public synchronized void handlePrepareResponse(InetAddressAndPort participant, boolean success)
{
prepareResponseCalls++;
preparePeer = participant;
prepareSuccess = success;
}
int finalizePromiseCalls = 0;
InetAddressAndPort promisePeer = null;
boolean promiseSuccess = false;
public synchronized void handleFinalizePromise(InetAddressAndPort participant, boolean success)
{
finalizePromiseCalls++;
promisePeer = participant;
promiseSuccess = success;
}
int failCalls = 0;
public synchronized void fail()
{
failCalls++;
}
}
private static class InstrumentedCoordinatorSessions extends CoordinatorSessions
{
protected CoordinatorSession buildSession(CoordinatorSession.Builder builder)
{
return new InstrumentedCoordinatorSession(builder);
}
public InstrumentedCoordinatorSession getSession(TimeUUID sessionId)
{
return (InstrumentedCoordinatorSession) super.getSession(sessionId);
}
public InstrumentedCoordinatorSession registerSession(TimeUUID sessionId, Set<InetAddressAndPort> peers, boolean isForced) throws NoSuchRepairSessionException
{
return (InstrumentedCoordinatorSession) super.registerSession(sessionId, peers, isForced);
}
}
@BeforeClass
public static void setupClass()
{
SchemaLoader.prepareServer();
cfm = CreateTableStatement.parse("CREATE TABLE tbl (k INT PRIMARY KEY, v INT)", "coordinatorsessiontest").build();
SchemaLoader.createKeyspace("coordinatorsessiontest", KeyspaceParams.simple(1), cfm);
cfs = Schema.instance.getColumnFamilyStoreInstance(cfm.id);
}
private static TimeUUID registerSession()
{
return registerSession(cfs, true, true);
}
@Test
public void registerSessionTest() throws NoSuchRepairSessionException
{
CoordinatorSessions sessions = new CoordinatorSessions();
TimeUUID sessionID = registerSession();
CoordinatorSession session = sessions.registerSession(sessionID, PARTICIPANTS, false);
Assert.assertEquals(ConsistentSession.State.PREPARING, session.getState());
Assert.assertEquals(sessionID, session.sessionID);
Assert.assertEquals(COORDINATOR, session.coordinator);
Assert.assertEquals(Sets.newHashSet(cfm.id), session.tableIds);
ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(sessionID);
Assert.assertEquals(prs.repairedAt, session.repairedAt);
Assert.assertEquals(prs.getRanges(), session.ranges);
Assert.assertEquals(PARTICIPANTS, session.participants);
Assert.assertSame(session, sessions.getSession(sessionID));
}
@Test
public void handlePrepareResponse() throws NoSuchRepairSessionException
{
InstrumentedCoordinatorSessions sessions = new InstrumentedCoordinatorSessions();
TimeUUID sessionID = registerSession();
InstrumentedCoordinatorSession session = sessions.registerSession(sessionID, PARTICIPANTS, false);
Assert.assertEquals(0, session.prepareResponseCalls);
sessions.handlePrepareResponse(new PrepareConsistentResponse(sessionID, PARTICIPANT1, true));
Assert.assertEquals(1, session.prepareResponseCalls);
Assert.assertEquals(PARTICIPANT1, session.preparePeer);
Assert.assertTrue(session.prepareSuccess);
}
@Test
public void handlePrepareResponseNoSession()
{
InstrumentedCoordinatorSessions sessions = new InstrumentedCoordinatorSessions();
TimeUUID fakeID = nextTimeUUID();
sessions.handlePrepareResponse(new PrepareConsistentResponse(fakeID, PARTICIPANT1, true));
Assert.assertNull(sessions.getSession(fakeID));
}
@Test
public void handlePromiseResponse() throws NoSuchRepairSessionException
{
InstrumentedCoordinatorSessions sessions = new InstrumentedCoordinatorSessions();
TimeUUID sessionID = registerSession();
InstrumentedCoordinatorSession session = sessions.registerSession(sessionID, PARTICIPANTS, false);
Assert.assertEquals(0, session.finalizePromiseCalls);
sessions.handleFinalizePromiseMessage(new FinalizePromise(sessionID, PARTICIPANT1, true));
Assert.assertEquals(1, session.finalizePromiseCalls);
Assert.assertEquals(PARTICIPANT1, session.promisePeer);
Assert.assertTrue(session.promiseSuccess);
}
@Test
public void handlePromiseResponseNoSession()
{
InstrumentedCoordinatorSessions sessions = new InstrumentedCoordinatorSessions();
TimeUUID fakeID = nextTimeUUID();
sessions.handleFinalizePromiseMessage(new FinalizePromise(fakeID, PARTICIPANT1, true));
Assert.assertNull(sessions.getSession(fakeID));
}
@Test
public void handleFailureMessage() throws NoSuchRepairSessionException
{
InstrumentedCoordinatorSessions sessions = new InstrumentedCoordinatorSessions();
TimeUUID sessionID = registerSession();
InstrumentedCoordinatorSession session = sessions.registerSession(sessionID, PARTICIPANTS, false);
Assert.assertEquals(0, session.failCalls);
sessions.handleFailSessionMessage(new FailSession(sessionID));
Assert.assertEquals(1, session.failCalls);
}
@Test
public void handleFailureMessageNoSession()
{
InstrumentedCoordinatorSessions sessions = new InstrumentedCoordinatorSessions();
TimeUUID fakeID = nextTimeUUID();
sessions.handleFailSessionMessage(new FailSession(fakeID));
Assert.assertNull(sessions.getSession(fakeID));
}
}