blob: da69a6807d0ec7a35f5f2123c64edafa49eb73c8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.service;
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.LocalAwareExecutorService;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.EndpointState;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.gms.VersionedValue;
import org.apache.cassandra.net.IAsyncCallbackWithFailure;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.schema.SchemaKeyspace;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.concurrent.WaitQueue;
/**
 * Tracks the schema versions advertised by other live nodes (via gossip) and schedules
 * schema pull requests for any version this node has not yet received, retrying
 * periodically until the local schema converges with the rest of the cluster.
 *
 * Mutable coordinator state ({@link #versionInfo}, {@link #endpointVersions},
 * {@link #ignoredEndpoints}) is guarded by synchronizing on this instance; the
 * per-version endpoint sets are additionally concurrent so snapshots can be taken safely.
 */
public class MigrationCoordinator
{
    private static final Logger logger = LoggerFactory.getLogger(MigrationCoordinator.class);
    private static final RuntimeMXBean runtimeMXBean = ManagementFactory.getRuntimeMXBean();

    /** Sentinel future returned when there is no pull work to do for a version. */
    private static final Future<Void> FINISHED_FUTURE = Futures.immediateFuture(null);

    /** Delay before pulling schema from a node, unless we appear to be bootstrapping or freshly started. */
    private static final int MIGRATION_DELAY_IN_MS = 60000;

    /** Maximum number of in-flight pull requests allowed per schema version. */
    private static final int MAX_OUTSTANDING_VERSION_REQUESTS = 3;

    public static final MigrationCoordinator instance = new MigrationCoordinator();

    /** System property: comma-separated schema version UUIDs that must never be pulled. */
    public static final String IGNORED_VERSIONS_PROP = "cassandra.skip_schema_check_for_versions";

    /** System property: comma-separated endpoints whose schema advertisements are ignored. */
    public static final String IGNORED_ENDPOINTS_PROP = "cassandra.skip_schema_check_for_endpoints";

    /**
     * Parses {@link #IGNORED_VERSIONS_PROP} into a set of schema version UUIDs.
     * Returns the empty set when the property is unset or blank.
     */
    private static ImmutableSet<UUID> getIgnoredVersions()
    {
        String s = System.getProperty(IGNORED_VERSIONS_PROP);
        if (s == null || s.isEmpty())
            return ImmutableSet.of();

        ImmutableSet.Builder<UUID> versions = ImmutableSet.builder();
        for (String version : s.split(","))
        {
            versions.add(UUID.fromString(version));
        }
        return versions.build();
    }

    private static final Set<UUID> IGNORED_VERSIONS = getIgnoredVersions();

    /**
     * Parses {@link #IGNORED_ENDPOINTS_PROP} into a (mutable) set of endpoints.
     * The set is mutable because {@link #removeAndIgnoreEndpoint} adds to it at runtime.
     *
     * @throws RuntimeException wrapping {@link UnknownHostException} if an entry cannot be resolved
     */
    private static Set<InetAddress> getIgnoredEndpoints()
    {
        Set<InetAddress> endpoints = new HashSet<>();
        String s = System.getProperty(IGNORED_ENDPOINTS_PROP);
        if (s == null || s.isEmpty())
            return endpoints;

        for (String endpoint : s.split(","))
        {
            try
            {
                endpoints.add(InetAddress.getByName(endpoint));
            }
            catch (UnknownHostException e)
            {
                throw new RuntimeException(e);
            }
        }
        return endpoints;
    }

    /**
     * Per-schema-version bookkeeping: which endpoints advertise the version, which pull
     * requests are outstanding, and a queue ordering which endpoint to try next.
     * Waiters register on {@link #waitQueue} and are signalled once the schema is
     * received (or the version has no endpoints left).
     */
    static class VersionInfo
    {
        final UUID version;

        final Set<InetAddress> endpoints = Sets.newConcurrentHashSet();
        final Set<InetAddress> outstandingRequests = Sets.newConcurrentHashSet();
        // Endpoints to try next, most-recently-reported first; guarded by the coordinator lock.
        final Deque<InetAddress> requestQueue = new ArrayDeque<>();

        private final WaitQueue waitQueue = new WaitQueue();

        volatile boolean receivedSchema;

        VersionInfo(UUID version)
        {
            this.version = version;
        }

        WaitQueue.Signal register()
        {
            return waitQueue.register();
        }

        /** Marks this version as received and wakes all waiters; idempotent. */
        void markReceived()
        {
            if (receivedSchema)
                return;

            receivedSchema = true;
            waitQueue.signalAll();
        }

        boolean wasReceived()
        {
            return receivedSchema;
        }
    }

    // All three maps/sets below are guarded by synchronizing on this instance.
    private final Map<UUID, VersionInfo> versionInfo = new HashMap<>();
    private final Map<InetAddress, UUID> endpointVersions = new HashMap<>();
    private final Set<InetAddress> ignoredEndpoints = getIgnoredEndpoints();

    /** Starts the periodic task that retries pulls for versions not yet received. */
    public void start()
    {
        ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(this::pullUnreceivedSchemaVersions, 1, 1, TimeUnit.MINUTES);
    }

    /** Forgets all tracked schema versions. Endpoint version reports will rebuild the state. */
    public synchronized void reset()
    {
        versionInfo.clear();
    }

    /**
     * Attempts a pull for every tracked version that has not been received and has no
     * request currently in flight.
     *
     * @return the futures of any pulls that were actually scheduled (for tests)
     */
    synchronized List<Future<Void>> pullUnreceivedSchemaVersions()
    {
        List<Future<Void>> futures = new ArrayList<>();
        for (VersionInfo info : versionInfo.values())
        {
            if (info.wasReceived() || info.outstandingRequests.size() > 0)
                continue;

            Future<Void> future = maybePullSchema(info);
            if (future != null && future != FINISHED_FUTURE)
                futures.add(future);
        }
        return futures;
    }

    /**
     * Schedules a schema pull for {@code info} from the first suitable queued endpoint.
     *
     * @return the pull future; {@link #FINISHED_FUTURE} if nothing needs to be done;
     *         {@code null} if no endpoint was currently usable (the periodic task retries)
     */
    synchronized Future<Void> maybePullSchema(VersionInfo info)
    {
        if (info.endpoints.isEmpty() || info.wasReceived() || !shouldPullSchema(info.version))
            return FINISHED_FUTURE;

        if (info.outstandingRequests.size() >= getMaxOutstandingVersionRequests())
            return FINISHED_FUTURE;

        // Rotate through the queue at most once; unusable endpoints are re-queued at the tail.
        for (int i=0, isize=info.requestQueue.size(); i<isize; i++)
        {
            InetAddress endpoint = info.requestQueue.remove();
            if (!info.endpoints.contains(endpoint))
                continue; // endpoint has since moved to a different schema version

            if (shouldPullFromEndpoint(endpoint) && info.outstandingRequests.add(endpoint))
            {
                return scheduleSchemaPull(endpoint, info);
            }
            else
            {
                // return to queue
                info.requestQueue.offer(endpoint);
            }
        }

        // no suitable endpoints were found, check again in a minute, the periodic task will pick it up
        return null;
    }

    /** @return a snapshot of all not-yet-received versions and the endpoints advertising them */
    public synchronized Map<UUID, Set<InetAddress>> outstandingVersions()
    {
        HashMap<UUID, Set<InetAddress>> map = new HashMap<>();
        for (VersionInfo info : versionInfo.values())
            if (!info.wasReceived())
                map.put(info.version, ImmutableSet.copyOf(info.endpoints));
        return map;
    }

    @VisibleForTesting
    protected VersionInfo getVersionInfoUnsafe(UUID version)
    {
        return versionInfo.get(version);
    }

    @VisibleForTesting
    protected int getMaxOutstandingVersionRequests()
    {
        return MAX_OUTSTANDING_VERSION_REQUESTS;
    }

    @VisibleForTesting
    protected boolean isAlive(InetAddress endpoint)
    {
        return FailureDetector.instance.isAlive(endpoint);
    }

    /**
     * @return true if {@code version} differs from our own (known) schema version and
     *         is therefore worth pulling
     */
    @VisibleForTesting
    protected boolean shouldPullSchema(UUID version)
    {
        if (Schema.instance.getVersion() == null)
        {
            logger.debug("Not pulling schema for version {}, because local schema version is not known yet", version);
            return false;
        }

        if (Schema.instance.getVersion().equals(version))
        {
            logger.debug("Not pulling schema for version {}, because schema versions match: " +
                         "local={}, remote={}",
                         version, Schema.instance.getVersion(),
                         version);
            return false;
        }

        return true;
    }

    // Since 3.0.14 protocol contains only a CASSANDRA-13004 bugfix, it is safe to accept schema changes
    // from both 3.0 and 3.0.14.
    private static boolean is30Compatible(int version)
    {
        return version == MessagingService.current_version || version == MessagingService.VERSION_3014;
    }

    /**
     * Decides whether {@code endpoint} is a valid source for a schema pull: it must be a
     * remote, gossip-known, full member on the same major release with a compatible
     * messaging/schema format.
     */
    @VisibleForTesting
    protected boolean shouldPullFromEndpoint(InetAddress endpoint)
    {
        if (endpoint.equals(FBUtilities.getBroadcastAddress()))
            return false;

        EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
        if (state == null)
            return false;

        // The RELEASE_VERSION application state may not have been gossiped yet; guard
        // against an NPE instead of failing the scheduled pull task.
        VersionedValue releaseVersionValue = state.getApplicationState(ApplicationState.RELEASE_VERSION);
        if (releaseVersionValue == null)
        {
            logger.debug("Not pulling schema from {} because its release version is not known yet", endpoint);
            return false;
        }

        final String releaseVersion = releaseVersionValue.value;
        final String ourMajorVersion = FBUtilities.getReleaseVersionMajor();
        if (!releaseVersion.startsWith(ourMajorVersion))
        {
            logger.debug("Not pulling schema from {} because release version in Gossip is not major version {}, it is {}",
                         endpoint, ourMajorVersion, releaseVersion);
            return false;
        }

        if (!MessagingService.instance().knowsVersion(endpoint))
        {
            logger.debug("Not pulling schema from {} because their messaging version is unknown", endpoint);
            return false;
        }

        if (!is30Compatible(MessagingService.instance().getRawVersion(endpoint)))
        {
            logger.debug("Not pulling schema from {} because their schema format is incompatible", endpoint);
            return false;
        }

        if (Gossiper.instance.isGossipOnlyMember(endpoint))
        {
            logger.debug("Not pulling schema from {} because it's a gossip only member", endpoint);
            return false;
        }

        return true;
    }

    /**
     * @return true if the pull should bypass {@link #MIGRATION_DELAY_IN_MS}, i.e. when the
     *         local schema is empty (likely bootstrapping) or the JVM started recently
     */
    @VisibleForTesting
    protected boolean shouldPullImmediately(InetAddress endpoint, UUID version)
    {
        if (Schema.instance.isEmpty() || runtimeMXBean.getUptime() < MIGRATION_DELAY_IN_MS)
        {
            // If we think we may be bootstrapping or have recently started, submit MigrationTask immediately
            logger.debug("Immediately submitting migration task for {}, " +
                         "schema versions: local={}, remote={}",
                         endpoint,
                         Schema.instance.getVersion(),
                         version);
            return true;
        }
        return false;
    }

    @VisibleForTesting
    protected boolean isLocalVersion(UUID version)
    {
        return Schema.instance.getVersion().equals(version);
    }

    /**
     * If a previous schema update brought our version the same as the incoming schema, don't apply it
     */
    synchronized boolean shouldApplySchemaFor(VersionInfo info)
    {
        if (info.wasReceived())
            return false;
        return !isLocalVersion(info.version);
    }

    /**
     * Records that {@code endpoint} now advertises schema {@code version}, migrates the
     * endpoint between version buckets, and kicks off a pull if the version is new to us.
     *
     * @return a pull future, {@link #FINISHED_FUTURE} when no pull is needed, or
     *         {@code null} when no endpoint was currently usable
     */
    synchronized Future<Void> reportEndpointVersion(InetAddress endpoint, UUID version)
    {
        if (ignoredEndpoints.contains(endpoint) || IGNORED_VERSIONS.contains(version))
        {
            endpointVersions.remove(endpoint);
            removeEndpointFromVersion(endpoint, null);
            return FINISHED_FUTURE;
        }

        UUID current = endpointVersions.put(endpoint, version);
        if (current != null && current.equals(version))
            return FINISHED_FUTURE; // no change

        VersionInfo info = versionInfo.computeIfAbsent(version, VersionInfo::new);
        if (isLocalVersion(version))
            info.markReceived();
        info.endpoints.add(endpoint);
        // most recently reported endpoints are tried first
        info.requestQueue.addFirst(endpoint);

        // disassociate this endpoint from its (now) previous schema version
        removeEndpointFromVersion(endpoint, current);

        return maybePullSchema(info);
    }

    /**
     * Convenience overload extracting the SCHEMA application state from gossip; a no-op
     * when the state or its schema value is absent.
     */
    Future<Void> reportEndpointVersion(InetAddress endpoint, EndpointState state)
    {
        if (state == null)
            return FINISHED_FUTURE;

        VersionedValue version = state.getApplicationState(ApplicationState.SCHEMA);

        if (version == null)
            return FINISHED_FUTURE;

        return reportEndpointVersion(endpoint, UUID.fromString(version.value));
    }

    /**
     * Removes {@code endpoint} from the given version's bucket; when the bucket becomes
     * empty the version is dropped and any waiters are released (nothing left to pull from).
     */
    private synchronized void removeEndpointFromVersion(InetAddress endpoint, UUID version)
    {
        if (version == null)
            return;

        VersionInfo info = versionInfo.get(version);

        if (info == null)
            return;

        info.endpoints.remove(endpoint);
        if (info.endpoints.isEmpty())
        {
            info.waitQueue.signalAll();
            versionInfo.remove(version);
        }
    }

    /**
     * Permanently ignores schema advertisements from {@code endpoint} and detaches it
     * from every tracked version.
     *
     * NOTE(review): the endpoint's entry in {@link #endpointVersions} is not removed here;
     * it is cleaned up on the next {@link #reportEndpointVersion} for that endpoint — confirm
     * this is intended.
     */
    public synchronized void removeAndIgnoreEndpoint(InetAddress endpoint)
    {
        Preconditions.checkArgument(endpoint != null);
        ignoredEndpoints.add(endpoint);
        Set<UUID> versions = ImmutableSet.copyOf(versionInfo.keySet());
        for (UUID version : versions)
        {
            removeEndpointFromVersion(endpoint, version);
        }
    }

    /**
     * Wraps a pull in a task submitted either immediately (bootstrap/fresh start) or after
     * {@link #MIGRATION_DELAY_IN_MS}, giving a flapping node time to settle.
     */
    Future<Void> scheduleSchemaPull(InetAddress endpoint, VersionInfo info)
    {
        FutureTask<Void> task = new FutureTask<>(() -> pullSchema(new Callback(endpoint, info)), null);
        if (shouldPullImmediately(endpoint, info.version))
        {
            submitToMigrationIfNotShutdown(task);
        }
        else
        {
            ScheduledExecutors.nonPeriodicTasks.schedule(() -> submitToMigrationIfNotShutdown(task), MIGRATION_DELAY_IN_MS, TimeUnit.MILLISECONDS);
        }
        return task;
    }

    /** Submits to the MIGRATION stage unless it is shutting down (e.g. during drain/stop). */
    private static Future<?> submitToMigrationIfNotShutdown(Runnable task)
    {
        LocalAwareExecutorService stage = StageManager.getStage(Stage.MIGRATION);
        if (stage.isShutdown() || stage.isTerminated())
        {
            logger.info("Skipped scheduled pulling schema from other nodes: the MIGRATION executor service has been shutdown.");
            return null;
        }
        else
            return stage.submit(task);
    }

    @VisibleForTesting
    protected void mergeSchemaFrom(InetAddress endpoint, Collection<Mutation> mutations)
    {
        SchemaKeyspace.mergeSchemaAndAnnounceVersion(mutations);
    }

    /**
     * Response handler for a single MIGRATION_REQUEST: merges the returned schema mutations
     * (unless a concurrent pull already converged us) and reports completion either way so
     * the coordinator can retry or release waiters.
     */
    class Callback implements IAsyncCallbackWithFailure<Collection<Mutation>>
    {
        final InetAddress endpoint;
        final VersionInfo info;

        public Callback(InetAddress endpoint, VersionInfo info)
        {
            this.endpoint = endpoint;
            this.info = info;
        }

        public void onFailure(InetAddress from)
        {
            fail();
        }

        Future<Void> fail()
        {
            return pullComplete(endpoint, info, false);
        }

        public void response(MessageIn<Collection<Mutation>> message)
        {
            response(message.payload);
        }

        Future<Void> response(Collection<Mutation> mutations)
        {
            // Serialize merges per version so concurrent responses don't both apply.
            synchronized (info)
            {
                if (shouldApplySchemaFor(info))
                {
                    try
                    {
                        mergeSchemaFrom(endpoint, mutations);
                    }
                    catch (Exception e)
                    {
                        logger.error(String.format("Unable to merge schema from %s", endpoint), e);
                        return fail();
                    }
                }
                return pullComplete(endpoint, info, true);
            }
        }

        public boolean isLatencyForSnitch()
        {
            return false;
        }
    }

    /**
     * Sends the pull request, re-validating liveness and endpoint suitability first since
     * significant time may have passed between scheduling and execution.
     */
    private void pullSchema(Callback callback)
    {
        if (!isAlive(callback.endpoint))
        {
            logger.warn("Can't send schema pull request: node {} is down.", callback.endpoint);
            callback.fail();
            return;
        }

        // There is a chance that quite some time could have passed between now and the MM#maybeScheduleSchemaPull(),
        // potentially enough for the endpoint node to restart - which is an issue if it does restart upgraded, with
        // a higher major.
        if (!shouldPullFromEndpoint(callback.endpoint))
        {
            logger.info("Skipped sending a migration request: node {} has a higher major version now.", callback.endpoint);
            callback.fail();
            return;
        }

        logger.debug("Requesting schema from {}", callback.endpoint);
        sendMigrationMessage(callback);
    }

    protected void sendMigrationMessage(Callback callback)
    {
        MessageOut<?> message = new MessageOut<>(MessagingService.Verb.MIGRATION_REQUEST, null, MigrationManager.MigrationsSerializer.instance);
        logger.info("Sending schema pull request to {} at {} with timeout {}", callback.endpoint, System.currentTimeMillis(), message.getTimeout());
        MessagingService.instance().sendRR(message, callback.endpoint, callback, message.getTimeout(), true);
    }

    /**
     * Records the outcome of a pull attempt, re-queues the endpoint for future attempts,
     * and immediately tries the next pull if one is still needed.
     */
    private synchronized Future<Void> pullComplete(InetAddress endpoint, VersionInfo info, boolean wasSuccessful)
    {
        if (wasSuccessful)
            info.markReceived();

        info.outstandingRequests.remove(endpoint);
        info.requestQueue.add(endpoint);
        return maybePullSchema(info);
    }

    /**
     * Wait until we've received schema responses for all versions we're aware of
     * @param waitMillis
     * @return true if response for all schemas were received, false if we timed out waiting
     */
    public boolean awaitSchemaRequests(long waitMillis)
    {
        if (!FBUtilities.getBroadcastAddress().equals(InetAddress.getLoopbackAddress()))
            CassandraDaemon.waitForGossipToSettle();

        WaitQueue.Signal signal = null;
        try
        {
            // Register for all outstanding versions under the lock, then wait outside it.
            synchronized (this)
            {
                List<WaitQueue.Signal> signalList = new ArrayList<>(versionInfo.size());
                for (VersionInfo version : versionInfo.values())
                {
                    if (version.wasReceived())
                        continue;

                    signalList.add(version.register());
                }

                if (signalList.isEmpty())
                    return true;

                WaitQueue.Signal[] signals = new WaitQueue.Signal[signalList.size()];
                signalList.toArray(signals);
                signal = WaitQueue.all(signals);
            }

            return signal.awaitUntil(System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(waitMillis));
        }
        catch (InterruptedException e)
        {
            throw new RuntimeException(e);
        }
        finally
        {
            if (signal != null)
                signal.cancel();
        }
    }
}