blob: 7cf4c167f75da938fa0eb81602190a5d388cf32c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.distributed.impl;
import java.io.Serializable;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.UUID;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.distributed.api.IInstance;
import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.distributed.api.IIsolatedExecutor;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.EndpointState;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.gms.VersionedValue;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.PendingRangeCalculatorService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
import static com.google.common.collect.Iterables.getOnlyElement;
import static java.util.Collections.singleton;
import static org.apache.cassandra.distributed.impl.DistributedTestSnitch.toCassandraInetAddressAndPort;
import static org.apache.cassandra.locator.InetAddressAndPort.getByAddress;
public class UnsafeGossipHelper
{
public static class HostInfo implements Serializable
{
final InetSocketAddress address;
final UUID hostId;
final String tokenString;
final int messagingVersion;
final boolean isShutdown;
private HostInfo(InetSocketAddress address, UUID hostId, String tokenString, int messagingVersion, boolean isShutdown)
{
this.address = address;
this.hostId = hostId;
this.tokenString = tokenString;
this.messagingVersion = messagingVersion;
this.isShutdown = isShutdown;
}
private HostInfo(IInstance instance)
{
this(instance, instance.config().hostId(), instance.config().getString("initial_token"));
}
private HostInfo(IInstance instance, UUID hostId, String tokenString)
{
this(instance.broadcastAddress(), hostId, tokenString, instance.getMessagingVersion(), instance.isShutdown());
}
}
public static IIsolatedExecutor.SerializableRunnable addToRingRunner(IIsolatedExecutor.SerializableBiFunction<VersionedValue.VersionedValueFactory, Collection<Token>, VersionedValue> statusFactory, InetSocketAddress address, UUID hostId, String tokenString, int messagingVersion, boolean isShutdown)
{
return () -> {
try
{
IPartitioner partitioner = DatabaseDescriptor.getPartitioner();
InetAddressAndPort addressAndPort = getByAddress(address);
Token token;
if (FBUtilities.getBroadcastAddressAndPort().equals(addressAndPort))
{
// try grabbing saved tokens so that - if we're leaving - we get the ones we may have adopted as part of a range movement
// if that fails, grab them from config (as we're probably joining and should just use the default token)
Token.TokenFactory tokenFactory = DatabaseDescriptor.getPartitioner().getTokenFactory();
if (tokenString == null)
{
Token tmp;
try
{
tmp = getOnlyElement(SystemKeyspace.getSavedTokens());
}
catch (Throwable t)
{
tmp = tokenFactory.fromString(getOnlyElement(DatabaseDescriptor.getInitialTokens()));
}
token = tmp;
}
else
{
token = tokenFactory.fromString(tokenString);
}
SystemKeyspace.setLocalHostId(hostId);
SystemKeyspace.updateTokens(singleton(token));
}
else
{
if (tokenString == null)
throw new IllegalArgumentException();
token = DatabaseDescriptor.getPartitioner().getTokenFactory().fromString(tokenString);
}
Gossiper.runInGossipStageBlocking(() -> {
EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(addressAndPort);
if (state == null)
{
Gossiper.instance.initializeNodeUnsafe(addressAndPort, hostId, 1);
state = Gossiper.instance.getEndpointStateForEndpoint(addressAndPort);
Gossiper.instance.realMarkAlive(addressAndPort, state);
}
state.addApplicationState(ApplicationState.TOKENS, new VersionedValue.VersionedValueFactory(partitioner).tokens(singleton(token)));
VersionedValue status = statusFactory.apply(new VersionedValue.VersionedValueFactory(partitioner), singleton(token));
state.addApplicationState(ApplicationState.STATUS_WITH_PORT, status);
StorageService.instance.onChange(addressAndPort, ApplicationState.STATUS_WITH_PORT, status);
});
int setMessagingVersion = isShutdown
? MessagingService.current_version
: Math.min(MessagingService.current_version, messagingVersion);
MessagingService.instance().versions.set(addressAndPort, setMessagingVersion);
PendingRangeCalculatorService.instance.blockUntilFinished();
}
catch (Throwable e) // UnknownHostException
{
throw new RuntimeException(e);
}
};
}
public static IIsolatedExecutor.SerializableRunnable addToRingNormalRunner(IInstance peer)
{
return addToRingNormalRunner(new HostInfo(peer));
}
public static IIsolatedExecutor.SerializableRunnable addToRingNormalRunner(IInstance peer, UUID hostId, String tokenString)
{
return addToRingNormalRunner(new HostInfo(peer, hostId, tokenString));
}
public static IIsolatedExecutor.SerializableRunnable addToRingNormalRunner(HostInfo info)
{
return addToRingNormalRunner(info.address, info.hostId, info.tokenString, info.messagingVersion, info.isShutdown);
}
public static IIsolatedExecutor.SerializableRunnable addToRingNormalRunner(InetSocketAddress address, UUID hostId, String tokenString, int messagingVersion, boolean isShutdown)
{
return addToRingRunner(VersionedValue.VersionedValueFactory::normal, address, hostId, tokenString, messagingVersion, isShutdown);
}
public static IIsolatedExecutor.SerializableRunnable addToRingRunner(IIsolatedExecutor.SerializableBiFunction<VersionedValue.VersionedValueFactory, Collection<Token>, VersionedValue> statusFactory, HostInfo info)
{
return addToRingRunner(statusFactory, info.address, info.hostId, info.tokenString, info.messagingVersion, info.isShutdown);
}
// reset gossip state so we know of the node being alive only
public static IIsolatedExecutor.SerializableRunnable removeFromRingRunner(IInstance instance)
{
return removeFromRingRunner(new HostInfo(instance));
}
// reset gossip state so we know of the node being alive only
public static IIsolatedExecutor.SerializableRunnable removeFromRingRunner(HostInfo info)
{
return removeFromRingRunner(info.address, info.hostId, info.tokenString);
}
public static IIsolatedExecutor.SerializableRunnable removeFromRingRunner(InetSocketAddress address, UUID hostId, String tokenString)
{
return () -> {
try
{
IPartitioner partitioner = DatabaseDescriptor.getPartitioner();
Token token = partitioner.getTokenFactory().fromString(tokenString);
InetAddressAndPort addressAndPort = toCassandraInetAddressAndPort(address);
Gossiper.runInGossipStageBlocking(() -> {
StorageService.instance.onChange(addressAndPort,
ApplicationState.STATUS,
new VersionedValue.VersionedValueFactory(partitioner).left(singleton(token), 0L));
Gossiper.instance.unsafeAnnulEndpoint(addressAndPort);
Gossiper.instance.initializeNodeUnsafe(addressAndPort, hostId, 1);
Gossiper.instance.realMarkAlive(addressAndPort, Gossiper.instance.getEndpointStateForEndpoint(addressAndPort));
});
PendingRangeCalculatorService.instance.blockUntilFinished();
}
catch (Throwable e) // UnknownHostException
{
throw new RuntimeException(e);
}
};
}
public static IIsolatedExecutor.SerializableRunnable addToRingBootstrappingRunner(IInstance peer)
{
return addToRingRunner(VersionedValue.VersionedValueFactory::bootstrapping, new HostInfo(peer));
}
public static IIsolatedExecutor.SerializableRunnable addToRingBootstrapReplacingRunner(IInstance peer, IInvokableInstance replacing, UUID hostId, String tokenString)
{
return addToRingBootstrapReplacingRunner(peer, replacing.broadcastAddress(), hostId, tokenString);
}
public static IIsolatedExecutor.SerializableRunnable addToRingBootstrapReplacingRunner(IInstance peer, InetSocketAddress replacingAddress, UUID hostId, String tokenString)
{
return addToRingRunner((factory, ignore) -> factory.bootReplacingWithPort(getByAddress(replacingAddress)), new HostInfo(peer, hostId, tokenString));
}
public static IIsolatedExecutor.SerializableRunnable addToRingNormalReplacedRunner(IInstance peer, IInstance replaced)
{
return addToRingNormalReplacedRunner(peer, replaced.broadcastAddress());
}
public static IIsolatedExecutor.SerializableRunnable addToRingNormalReplacedRunner(IInstance peer, InetSocketAddress replacedAddress)
{
return addToRingRunner((factory, ignore) -> factory.bootReplacingWithPort(getByAddress(replacedAddress)), new HostInfo(peer, null, null));
}
public static IIsolatedExecutor.SerializableRunnable addToRingLeavingRunner(IInstance peer)
{
return addToRingRunner(VersionedValue.VersionedValueFactory::leaving, new HostInfo(peer, null, null));
}
public static IIsolatedExecutor.SerializableRunnable addToRingLeftRunner(IInstance peer)
{
return addToRingRunner((factory, tokens) -> factory.left(tokens, Long.MAX_VALUE), new HostInfo(peer, null, null));
}
public static void removeFromRing(IInstance peer)
{
removeFromRingRunner(peer).run();
}
public static void addToRingNormal(IInstance peer)
{
addToRingNormalRunner(peer).run();
assert StorageService.instance.getTokenMetadata().isMember(toCassandraInetAddressAndPort(peer.broadcastAddress()));
}
public static void addToRingBootstrapping(IInstance peer)
{
addToRingBootstrappingRunner(peer).run();
}
public static IIsolatedExecutor.SerializableRunnable markShutdownRunner(InetSocketAddress address)
{
return () -> {
IPartitioner partitioner = DatabaseDescriptor.getPartitioner();
Gossiper.runInGossipStageBlocking(() -> {
EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(getByAddress(address));
VersionedValue status = new VersionedValue.VersionedValueFactory(partitioner).shutdown(true);
state.addApplicationState(ApplicationState.STATUS, status);
state.getHeartBeatState().forceHighestPossibleVersionUnsafe();
StorageService.instance.onChange(getByAddress(address), ApplicationState.STATUS, status);
});
};
}
}