blob: 37833205665291130963c22447acb3de54a39f43 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.schema;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.ImmediateExecutor;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.EndpointState;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.gms.VersionedValue;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.metrics.MessagingMetrics;
import org.apache.cassandra.net.EndpointMessagingVersions;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.RequestCallback;
import org.apache.cassandra.net.Verb;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.concurrent.WaitQueue;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.internal.creation.MockSettingsImpl;
import static com.google.common.util.concurrent.Futures.getUnchecked;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class MigrationCoordinatorTest
{
private static final Logger logger = LoggerFactory.getLogger(MigrationCoordinatorTest.class);

// Schema versions used throughout the tests; random, so they never collide with each other.
private static final UUID LOCAL_VERSION = UUID.randomUUID();
private static final UUID V1 = UUID.randomUUID();
private static final UUID V2 = UUID.randomUUID();

// Gossip state stub whose RELEASE_VERSION is stubbed (below) to this build's release version.
private static final EndpointState validEndpointState = mock(EndpointState.class);

// Three peers that the tests report schema versions for.
private static final InetAddressAndPort EP1 = endpointOf("10.0.0.1");
private static final InetAddressAndPort EP2 = endpointOf("10.0.0.2");
private static final InetAddressAndPort EP3 = endpointOf("10.0.0.3");

static
{
    DatabaseDescriptor.daemonInitialization();
    VersionedValue ownRelease = VersionedValue.unsafeMakeVersionedValue(FBUtilities.getReleaseVersionString(), 0);
    when(validEndpointState.getApplicationState(ApplicationState.RELEASE_VERSION)).thenReturn(ownRelease);
}

/**
 * Resolves {@code name} to an endpoint, converting a resolution failure into an
 * {@link AssertionError} so it can be used from a static field initializer.
 */
private static InetAddressAndPort endpointOf(String name)
{
    try
    {
        return InetAddressAndPort.getByName(name);
    }
    catch (UnknownHostException e)
    {
        throw new AssertionError(e);
    }
}
/**
 * Test harness that builds a {@link MigrationCoordinator} on top of mocked collaborators
 * and records what the coordinator does with them.
 */
private static class Wrapper
{
    /** Every (endpoint, callback) pair passed to the mocked {@code sendWithCallback}, in call order. */
    final Queue<Pair<InetAddressAndPort, RequestCallback<Collection<Mutation>>>> requests = new LinkedList<>();
    final ScheduledExecutorService oneTimeExecutor = mock(ScheduledExecutorService.class);
    final Gossiper gossiper = mock(Gossiper.class);
    /** Endpoints handed to the merge callback given to the coordinator. */
    final Set<InetAddressAndPort> mergedSchemasFrom = new HashSet<>();
    final EndpointMessagingVersions versions = mock(EndpointMessagingVersions.class);
    /** Constructor-built mock; any call that is not explicitly stubbed throws UnsupportedOperationException. */
    final MessagingService messagingService = mock(MessagingService.class,
                                                   new MockSettingsImpl<>().defaultAnswer(invocation -> {
                                                       throw new UnsupportedOperationException();
                                                   }).useConstructor(true, versions, mock(MessagingMetrics.class)));
    /** Value returned by the local schema version supplier given to the coordinator; tests may reassign it. */
    UUID localSchemaVersion = LOCAL_VERSION;
    final MigrationCoordinator coordinator;

    /** Builds a harness that passes 3 as the coordinator's max outstanding requests. */
    private Wrapper()
    {
        this(3);
    }

    /**
     * Stubs the collaborators with permissive defaults and creates the coordinator.
     *
     * @param maxOutstandingRequests forwarded unchanged to the coordinator constructor
     */
    private Wrapper(int maxOutstandingRequests)
    {
        // Anything scheduled with a fixed delay is run once, synchronously, at scheduling time.
        when(oneTimeExecutor.scheduleWithFixedDelay(any(), anyLong(), anyLong(), any())).thenAnswer(invocation -> {
            Runnable task = invocation.getArgument(0, Runnable.class);
            task.run();
            return mock(ScheduledFuture.class);
        });

        // By default every endpoint has a valid gossip state and is alive.
        when(gossiper.getEndpointStateForEndpoint(any())).thenReturn(validEndpointState);
        when(gossiper.isAlive(any())).thenReturn(true);

        // Capture outgoing callback requests instead of sending them.
        doAnswer(invocation -> {
            InetAddressAndPort target = invocation.getArgument(1, InetAddressAndPort.class);
            RequestCallback<Collection<Mutation>> callback = invocation.getArgument(2, RequestCallback.class);
            return requests.add(Pair.create(target, callback));
        }).when(messagingService).sendWithCallback(any(), any(), any());

        // By default every endpoint's messaging version is known and equal to the current one.
        when(versions.knows(any())).thenReturn(true);
        when(versions.getRaw(any())).thenReturn(MessagingService.current_version);

        this.coordinator = new MigrationCoordinator(messagingService,
                                                    ImmediateExecutor.INSTANCE,
                                                    oneTimeExecutor,
                                                    maxOutstandingRequests,
                                                    gossiper,
                                                    () -> localSchemaVersion,
                                                    (from, ignored) -> mergedSchemasFrom.add(from));
    }

    /**
     * Resolves {@code endpointName} and configures the mocks for it.
     *
     * @see #configureMocksForEndpoint(InetAddressAndPort, EndpointState, Integer, boolean)
     */
    private InetAddressAndPort configureMocksForEndpoint(String endpointName, EndpointState es, Integer msgVersion, boolean gossipOnlyMember) throws UnknownHostException
    {
        return configureMocksForEndpoint(InetAddressAndPort.getByName(endpointName), es, msgVersion, gossipOnlyMember);
    }

    /**
     * Overrides the default stubs for one specific endpoint.
     *
     * @param endpoint         the endpoint to configure
     * @param es               gossip state to return for the endpoint (may be null)
     * @param msgVersion       messaging version to report; null marks the version as unknown,
     *                         in which case {@code getRaw} throws IllegalArgumentException
     * @param gossipOnlyMember value returned by {@code isGossipOnlyMember} for the endpoint
     * @return the same {@code endpoint}, for convenience
     */
    private InetAddressAndPort configureMocksForEndpoint(InetAddressAndPort endpoint, EndpointState es, Integer msgVersion, boolean gossipOnlyMember) throws UnknownHostException
    {
        when(gossiper.getEndpointStateForEndpoint(endpoint)).thenReturn(es);

        boolean versionKnown = msgVersion != null;
        when(versions.knows(endpoint)).thenReturn(versionKnown);
        if (versionKnown)
            when(versions.getRaw(endpoint)).thenReturn(msgVersion);
        else
            when(versions.getRaw(endpoint)).thenThrow(new IllegalArgumentException());

        when(gossiper.isGossipOnlyMember(endpoint)).thenReturn(gossipOnlyMember);
        when(gossiper.isAlive(endpoint)).thenReturn(true);
        return endpoint;
    }
}
/**
 * Drives a single schema version through failure and success responses using a
 * harness whose coordinator is built with a max-outstanding-requests value of 1.
 */
@Test
public void requestResponseCycle() throws InterruptedException
{
    Wrapper harness = new Wrapper(1);
    MigrationCoordinator migrations = harness.coordinator;
    Queue<Pair<InetAddressAndPort, RequestCallback<Collection<Mutation>>>> pending = harness.requests;
    Assert.assertTrue(pending.isEmpty());

    // first schema report should send a migration request
    getUnchecked(migrations.reportEndpointVersion(EP1, V1));
    Assert.assertEquals(1, pending.size());
    Assert.assertFalse(migrations.awaitSchemaRequests(1));

    // second should not
    getUnchecked(migrations.reportEndpointVersion(EP2, V1));
    Assert.assertEquals(1, pending.size());
    Assert.assertFalse(migrations.awaitSchemaRequests(1));

    // until the first request fails, then the second endpoint should be contacted
    Pair<InetAddressAndPort, RequestCallback<Collection<Mutation>>> firstPull = pending.poll();
    Assert.assertEquals(EP1, firstPull.left);
    firstPull.right.onFailure(null, null);
    Assert.assertTrue(harness.mergedSchemasFrom.isEmpty());
    Assert.assertFalse(migrations.awaitSchemaRequests(1));

    // ... then the second endpoint should be contacted
    Assert.assertEquals(1, pending.size());
    Pair<InetAddressAndPort, RequestCallback<Collection<Mutation>>> secondPull = pending.poll();
    Assert.assertEquals(EP2, secondPull.left);
    Assert.assertFalse(migrations.awaitSchemaRequests(1));

    // a successful (empty) response from EP2 is merged and completes the wait
    secondPull.right.onResponse(Message.remoteResponse(secondPull.left, Verb.SCHEMA_PULL_RSP, Collections.emptyList()));
    Assert.assertEquals(EP2, Iterables.getOnlyElement(harness.mergedSchemasFrom));
    Assert.assertTrue(migrations.awaitSchemaRequests(1));

    // and migration tasks should not be sent out for subsequent version reports
    getUnchecked(migrations.reportEndpointVersion(EP3, V1));
    Assert.assertTrue(pending.isEmpty());
}
/**
 * When no request was sent for a version and every endpoint associated with
 * that version moves on to another one, waiters registered on the version
 * must be signalled and the version info dropped.
 */
@Test
public void versionsAreSignaledWhenDeleted()
{
    MigrationCoordinator migrations = new Wrapper().coordinator;

    migrations.reportEndpointVersion(EP1, V1);
    WaitQueue.Signal waiter = migrations.getVersionInfoUnsafe(V1).register();
    Assert.assertFalse(waiter.isSignalled());

    // EP1 was the only reporter of V1; switching it to V2 leaves V1 with no endpoints
    migrations.reportEndpointVersion(EP1, V2);
    Assert.assertNull(migrations.getVersionInfoUnsafe(V1));
    Assert.assertTrue(waiter.isSignalled());
}
/**
 * When an endpoint is removed and nobody else reports its schema version,
 * the version info must be dropped and waiters registered on it signalled.
 */
@Test
public void versionsAreSignaledWhenEndpointsRemoved()
{
    MigrationCoordinator migrations = new Wrapper().coordinator;

    migrations.reportEndpointVersion(EP1, V1);
    WaitQueue.Signal waiter = migrations.getVersionInfoUnsafe(V1).register();
    Assert.assertFalse(waiter.isSignalled());

    // EP1 was the only reporter of V1
    migrations.removeAndIgnoreEndpoint(EP1);
    Assert.assertNull(migrations.getVersionInfoUnsafe(V1));
    Assert.assertTrue(waiter.isSignalled());
}
/**
 * Reports {@code version} for {@code endpoint} to the wrapper's coordinator and asserts
 * that no request is captured either before or after the report.
 *
 * @param wrapper                  harness holding the coordinator and the captured requests
 * @param endpoint                 endpoint whose schema version is reported
 * @param version                  schema version reported for {@code endpoint}
 * @param startupShouldBeUnblocked expected result of {@code awaitSchemaRequests(1)} after the report
 */
private static void assertNoContact(Wrapper wrapper, InetAddressAndPort endpoint, UUID version, boolean startupShouldBeUnblocked)
{
    Assert.assertTrue(wrapper.requests.isEmpty());
    // Use the arguments supplied by the caller; previously EP1/V1 were hard-coded here,
    // silently ignoring the endpoint and version parameters.
    Future<Void> future = wrapper.coordinator.reportEndpointVersion(endpoint, version);
    if (future != null)
        getUnchecked(future);
    Assert.assertTrue(wrapper.requests.isEmpty());
    Assert.assertEquals(startupShouldBeUnblocked, wrapper.coordinator.awaitSchemaRequests(1));
}

/**
 * Convenience overload of {@link #assertNoContact(Wrapper, InetAddressAndPort, UUID, boolean)}
 * that reports {@code V1} for {@code EP1}.
 */
private static void assertNoContact(Wrapper wrapper, boolean startupShouldBeUnblocked)
{
    assertNoContact(wrapper, EP1, V1, startupShouldBeUnblocked);
}
/**
 * An endpoint reporting the same version as the local schema must not be contacted,
 * and waiting for schema requests should succeed.
 */
@Test
public void dontContactNodesWithSameSchema()
{
    Wrapper harness = new Wrapper();
    // make the local version equal to the one assertNoContact reports for EP1
    harness.localSchemaVersion = V1;
    assertNoContact(harness, true);
}
/**
 * An endpoint for which gossip has no state must not be contacted.
 */
@Test
public void dontContactIncompatibleNodes()
{
    Wrapper harness = new Wrapper();
    Gossiper gossip = harness.gossiper;
    // shouldPullFromEndpoint should return false in this case
    when(gossip.getEndpointStateForEndpoint(any())).thenReturn(null);
    assertNoContact(harness, false);
}
/**
 * An endpoint that gossip reports as not alive must not be contacted.
 */
@Test
public void dontContactDeadNodes()
{
    Wrapper harness = new Wrapper();
    Gossiper gossip = harness.gossiper;
    // override the harness default (everyone alive) for EP1 only
    when(gossip.isAlive(ArgumentMatchers.eq(EP1))).thenReturn(false);
    assertNoContact(harness, EP1, V1, false);
}
/**
 * A node may become incompatible between the moment the pull task is scheduled
 * and the moment it runs; that must be detected and the task failed.
 */
@Test
public void testGossipRace()
{
    Wrapper harness = new Wrapper();
    // first lookup sees a valid state, every later lookup sees none
    when(harness.gossiper.getEndpointStateForEndpoint(any()))
        .thenReturn(validEndpointState)
        .thenReturn(null);
    assertNoContact(harness, EP1, V1, false);
}
/**
 * While every pull for a version keeps failing, the coordinator must keep issuing new
 * requests, alternating between the endpoints that report that version; a single
 * successful response must then unblock {@code awaitSchemaRequests}.
 */
@Test
public void testWeKeepSendingRequests() throws Exception
{
    Wrapper wrapper = new Wrapper();

    // EP3 reports V2; answer the resulting request successfully before the V1 reports arrive
    getUnchecked(wrapper.coordinator.reportEndpointVersion(EP3, V2));
    Pair<InetAddressAndPort, RequestCallback<Collection<Mutation>>> cb = wrapper.requests.remove();
    cb.right.onResponse(Message.remoteResponse(cb.left, Verb.SCHEMA_PULL_RSP, Collections.emptyList()));

    getUnchecked(wrapper.coordinator.reportEndpointVersion(EP1, V1));
    getUnchecked(wrapper.coordinator.reportEndpointVersion(EP2, V1));

    Pair<InetAddressAndPort, RequestCallback<Collection<Mutation>>> prev = null;
    Set<InetAddressAndPort> EPs = Sets.newHashSet(EP1, EP2);
    int ep1requests = 0;
    int ep2requests = 0;
    for (int i = 0; i < 10; i++)
    {
        Assert.assertEquals(String.format("%s", i), 2, wrapper.requests.size());
        Pair<InetAddressAndPort, RequestCallback<Collection<Mutation>>> next = wrapper.requests.remove();

        // we should be contacting endpoints in a round robin fashion
        Assert.assertTrue(EPs.contains(next.left));
        if (prev != null && prev.left.equals(next.left))
            Assert.fail(String.format("Not expecting prev %s to be equal to next %s", prev.left, next.left));

        // Tally the request per endpoint; these counters were previously never updated,
        // so the summary logged after the loop always reported zero.
        if (EP1.equals(next.left))
            ep1requests++;
        else
            ep2requests++;

        // should send a new request
        next.right.onFailure(null, null);
        prev = next;
        Assert.assertFalse(wrapper.coordinator.awaitSchemaRequests(1));
        Assert.assertEquals(2, wrapper.requests.size());
    }
    logger.info("{} -> {}", EP1, ep1requests);
    logger.info("{} -> {}", EP2, ep2requests);

    // a single success should unblock startup though
    cb = wrapper.requests.remove();
    cb.right.onResponse(Message.remoteResponse(cb.left, Verb.SCHEMA_PULL_RSP, Collections.emptyList()));
    Assert.assertTrue(wrapper.coordinator.awaitSchemaRequests(1));
}
/**
 * Pulling unreceived schemas should find every schema that is still marked
 * unreceived with no outstanding request, and send a request for it.
 */
@Test
public void pullUnreceived()
{
    Wrapper harness = new Wrapper();
    Gossiper gossip = harness.gossiper;

    // shouldPullFromEndpoint should return false in this case
    when(gossip.getEndpointStateForEndpoint(any())).thenReturn(null);
    assertNoContact(harness, false);

    // make the endpoint valid again; nothing is sent until the coordinator is started
    when(gossip.getEndpointStateForEndpoint(any())).thenReturn(validEndpointState);
    Assert.assertEquals(0, harness.requests.size());

    harness.coordinator.start();
    Assert.assertEquals(1, harness.requests.size());
}
/**
 * Of four live members (the local node, a node with an unknown messaging version, a node
 * on messaging VERSION_30 and a regular node), only the regular node is expected to be
 * sent the schema push.
 */
@Test
public void pushSchemaMutationsOnlyToViableNodes() throws UnknownHostException
{
    Wrapper harness = new Wrapper();
    Collection<Mutation> mutations = Arrays.asList(mock(Mutation.class));
    EndpointState state = mock(EndpointState.class);

    InetAddressAndPort self = harness.configureMocksForEndpoint(FBUtilities.getBroadcastAddressAndPort(), state, MessagingService.current_version, false);
    InetAddressAndPort unknownVersionPeer = harness.configureMocksForEndpoint("10.0.0.1:8000", state, null, false);
    InetAddressAndPort oldVersionPeer = harness.configureMocksForEndpoint("10.0.0.2:8000", state, MessagingService.VERSION_30, false);
    InetAddressAndPort regularPeer = harness.configureMocksForEndpoint("10.0.0.3:8000", state, MessagingService.current_version, false);
    when(harness.gossiper.getLiveMembers()).thenReturn(Sets.newHashSet(self, unknownVersionPeer, oldVersionPeer, regularPeer));

    // capture whatever is sent through the one-way send path
    ArgumentCaptor<Message> sentMessage = ArgumentCaptor.forClass(Message.class);
    ArgumentCaptor<InetAddressAndPort> sentTo = ArgumentCaptor.forClass(InetAddressAndPort.class);
    doNothing().when(harness.messagingService).send(sentMessage.capture(), sentTo.capture());

    Pair<Set<InetAddressAndPort>, Set<InetAddressAndPort>> outcome = harness.coordinator.pushSchemaMutations(mutations);

    // left holds only the regular peer, right holds the three other nodes
    assertThat(outcome.left()).containsExactlyInAnyOrder(regularPeer);
    assertThat(outcome.right()).containsExactlyInAnyOrder(self, unknownVersionPeer, oldVersionPeer);

    // exactly one push, carrying the mutations, addressed to the regular peer
    assertThat(sentMessage.getAllValues()).hasSize(1);
    assertThat(sentMessage.getValue().payload).isEqualTo(mutations);
    assertThat(sentMessage.getValue().verb()).isEqualTo(Verb.SCHEMA_PUSH_REQ);
    assertThat(sentTo.getValue()).isEqualTo(regularPeer);
}
/**
 * After {@code reset()}, the only schema pull request is expected to go to
 * {@code regularNode1} (the send stub asserts this), and exactly one schema is merged.
 * The live members also include the local node and one node for each way this test
 * varies the mocks: no gossip state, a "3.0" release version, an unknown messaging
 * version, messaging VERSION_30, and a gossip-only member.
 */
@Test
public void reset() throws UnknownHostException
{
    Collection<Mutation> mutations = Arrays.asList(mock(Mutation.class));
    Wrapper wrapper = new Wrapper();
    wrapper.localSchemaVersion = SchemaConstants.emptyVersion;

    // release version "3.0", schema V1
    EndpointState invalidVersionState = mock(EndpointState.class);
    when(invalidVersionState.getApplicationState(ApplicationState.RELEASE_VERSION)).thenReturn(VersionedValue.unsafeMakeVersionedValue("3.0", 0));
    when(invalidVersionState.getSchemaVersion()).thenReturn(V1);

    // this build's release version, schema V2
    EndpointState validVersionState = mock(EndpointState.class);
    when(validVersionState.getApplicationState(ApplicationState.RELEASE_VERSION)).thenReturn(VersionedValue.unsafeMakeVersionedValue(FBUtilities.getReleaseVersionString(), 0));
    when(validVersionState.getSchemaVersion()).thenReturn(V2);

    // this build's release version, empty schema (used for the local node)
    EndpointState localVersionState = mock(EndpointState.class);
    when(localVersionState.getApplicationState(ApplicationState.RELEASE_VERSION)).thenReturn(VersionedValue.unsafeMakeVersionedValue(FBUtilities.getReleaseVersionString(), 0));
    when(localVersionState.getSchemaVersion()).thenReturn(SchemaConstants.emptyVersion);

    // some nodes
    // Each node needs its own address: configuring two nodes on the same address makes the
    // second call overwrite the first one's stubs (previously diffMajorVersionNode and the
    // unknown-version node shared 10.0.0.2, so the different-major-version case was never exercised).
    InetAddressAndPort thisNode = wrapper.configureMocksForEndpoint(FBUtilities.getBroadcastAddressAndPort(), localVersionState, MessagingService.current_version, false);
    InetAddressAndPort noStateNode = wrapper.configureMocksForEndpoint("10.0.0.1:8000", null, MessagingService.current_version, false);
    InetAddressAndPort diffMajorVersionNode = wrapper.configureMocksForEndpoint("10.0.0.2:8000", invalidVersionState, MessagingService.current_version, false);
    InetAddressAndPort unknownNode = wrapper.configureMocksForEndpoint("10.0.0.7:8000", validVersionState, null, false);
    InetAddressAndPort invalidMessagingVersionNode = wrapper.configureMocksForEndpoint("10.0.0.3:8000", validVersionState, MessagingService.VERSION_30, false);
    InetAddressAndPort gossipOnlyMemberNode = wrapper.configureMocksForEndpoint("10.0.0.4:8000", validVersionState, MessagingService.current_version, true);
    InetAddressAndPort regularNode1 = wrapper.configureMocksForEndpoint("10.0.0.5:8000", validVersionState, MessagingService.current_version, false);
    InetAddressAndPort regularNode2 = wrapper.configureMocksForEndpoint("10.0.0.6:8000", validVersionState, MessagingService.current_version, false);

    // LinkedHashSet keeps the insertion order stable when the live members are iterated
    Set<InetAddressAndPort> nodes = new LinkedHashSet<>(Arrays.asList(thisNode, noStateNode, diffMajorVersionNode, unknownNode, invalidMessagingVersionNode, gossipOnlyMemberNode, regularNode1, regularNode2));
    when(wrapper.gossiper.getLiveMembers()).thenReturn(nodes);

    // Replace the harness's capturing stub: check the request and answer it immediately.
    doAnswer(a -> {
        Message<?> msg = a.getArgument(0, Message.class);
        InetAddressAndPort endpoint = a.getArgument(1, InetAddressAndPort.class);
        RequestCallback<Collection<Mutation>> callback = a.getArgument(2, RequestCallback.class);
        assertThat(msg.verb()).isEqualTo(Verb.SCHEMA_PULL_REQ);
        assertThat(endpoint).isEqualTo(regularNode1);
        callback.onResponse(Message.remoteResponse(regularNode1, Verb.SCHEMA_PULL_RSP, mutations));
        return null;
    }).when(wrapper.messagingService).sendWithCallback(any(Message.class), any(InetAddressAndPort.class), any(RequestCallback.class));

    wrapper.coordinator.reset();

    assertThat(wrapper.mergedSchemasFrom).anyMatch(ep -> regularNode1.equals(ep) || regularNode2.equals(ep));
    assertThat(wrapper.mergedSchemasFrom).hasSize(1);
}
}