blob: 7a46935646fb68a0e84d4b9d8852e37fa5697243 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.distributed.action;
import java.io.IOException;
import java.io.Serializable;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.distributed.api.IInstance;
import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.distributed.api.IIsolatedExecutor;
import org.apache.cassandra.distributed.shared.VersionedApplicationState;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.EndpointState;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.gms.VersionedValue;
import org.apache.cassandra.io.util.DataInputBuffer;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
import static org.apache.cassandra.distributed.impl.DistributedTestSnitch.toCassandraInetAddressAndPort;
import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis;
import static org.junit.Assert.assertTrue;
public class GossipHelper
{
public static InstanceAction statusToBlank(IInvokableInstance newNode)
{
return (instance) -> changeGossipState(instance, newNode,Collections.emptyList());
}
public static InstanceAction statusToBootstrap(IInvokableInstance newNode)
{
return (instance) ->
{
changeGossipState(instance,
newNode,
Arrays.asList(tokens(newNode),
statusBootstrapping(newNode),
statusWithPortBootstrapping(newNode)));
};
}
public static InstanceAction statusToDecommission(IInvokableInstance newNode)
{
return (instance) ->
{
changeGossipState(instance,
newNode,
Arrays.asList(tokens(newNode),
statusLeaving(newNode),
statusWithPortLeaving(newNode)));
};
}
public static InstanceAction statusToNormal(IInvokableInstance peer)
{
return (target) ->
{
changeGossipState(target,
peer,
Arrays.asList(tokens(peer),
statusNormal(peer),
releaseVersion(peer),
netVersion(peer),
statusWithPortNormal(peer)));
};
}
/**
* This method is unsafe and should be used _only_ when gossip is not used or available: it creates versioned values on the
* target instance, which means Gossip versioning gets out of sync. Use a safe couterpart at all times when performing _any_
* ring movement operations _or_ if Gossip is used.
*/
public static void unsafeStatusToNormal(IInvokableInstance target, IInstance peer)
{
final int messagingVersion = getOrDefaultMessagingVersion(target, peer);
changeGossipState(target,
peer,
Arrays.asList(unsafeVersionedValue(target,
ApplicationState.TOKENS,
(partitioner, tokens) -> new VersionedValue.VersionedValueFactory(partitioner).tokens(tokens),
peer.config().getString("partitioner"),
peer.config().getString("initial_token")),
unsafeVersionedValue(target,
ApplicationState.STATUS,
(partitioner, tokens) -> new VersionedValue.VersionedValueFactory(partitioner).normal(tokens),
peer.config().getString("partitioner"),
peer.config().getString("initial_token")),
unsafeVersionedValue(target,
ApplicationState.STATUS_WITH_PORT,
(partitioner, tokens) -> new VersionedValue.VersionedValueFactory(partitioner).normal(tokens),
peer.config().getString("partitioner"),
peer.config().getString("initial_token")),
unsafeVersionedValue(target,
ApplicationState.NET_VERSION,
(partitioner) -> new VersionedValue.VersionedValueFactory(partitioner).networkVersion(messagingVersion),
peer.config().getString("partitioner")),
unsafeReleaseVersion(target,
peer.config().getString("partitioner"),
peer.getReleaseVersionString())));
}
public static InstanceAction statusToLeaving(IInvokableInstance newNode)
{
return (instance) -> {
changeGossipState(instance,
newNode,
Arrays.asList(tokens(newNode),
statusLeaving(newNode),
statusWithPortLeaving(newNode)));
};
}
public static InstanceAction bootstrap()
{
return new BootstrapAction();
}
public static InstanceAction bootstrap(boolean joinRing, Duration waitForBootstrap, Duration waitForSchema)
{
return new BootstrapAction(joinRing, waitForBootstrap, waitForSchema);
}
public static InstanceAction disseminateGossipState(IInvokableInstance newNode)
{
return new DisseminateGossipState(newNode);
}
public static InstanceAction pullSchemaFrom(IInvokableInstance pullFrom)
{
return new PullSchemaFrom(pullFrom);
}
private static InstanceAction disableBinary()
{
return (instance) -> instance.nodetoolResult("disablebinary").asserts().success();
}
private static class DisseminateGossipState implements InstanceAction
{
final Map<InetSocketAddress, byte[]> gossipState;
public DisseminateGossipState(IInvokableInstance... from)
{
gossipState = new HashMap<>();
for (IInvokableInstance node : from)
{
byte[] epBytes = node.callsOnInstance(() -> {
EndpointState epState = Gossiper.instance.getEndpointStateForEndpoint(FBUtilities.getBroadcastAddressAndPort());
return toBytes(epState);
}).call();
gossipState.put(node.broadcastAddress(), epBytes);
}
}
public void accept(IInvokableInstance instance)
{
instance.appliesOnInstance((IIsolatedExecutor.SerializableFunction<Map<InetSocketAddress, byte[]>, Void>)
(map) -> {
Map<InetAddressAndPort, EndpointState> newState = new HashMap<>();
for (Map.Entry<InetSocketAddress, byte[]> e : map.entrySet())
newState.put(toCassandraInetAddressAndPort(e.getKey()), fromBytes(e.getValue()));
Gossiper.runInGossipStageBlocking(() -> {
Gossiper.instance.applyStateLocally(newState);
});
return null;
}).apply(gossipState);
}
}
private static byte[] toBytes(EndpointState epState)
{
try (DataOutputBuffer out = new DataOutputBuffer(1024))
{
EndpointState.serializer.serialize(epState, out, MessagingService.current_version);
return out.toByteArray();
}
catch (IOException e)
{
throw new RuntimeException(e);
}
}
private static EndpointState fromBytes(byte[] bytes)
{
try (DataInputBuffer in = new DataInputBuffer(bytes))
{
return EndpointState.serializer.deserialize(in, MessagingService.current_version);
}
catch (Throwable t)
{
throw new RuntimeException(t);
}
}
private static class PullSchemaFrom implements InstanceAction
{
final InetSocketAddress pullFrom;
public PullSchemaFrom(IInvokableInstance pullFrom)
{
this.pullFrom = pullFrom.broadcastAddress();;
}
public void accept(IInvokableInstance pullTo)
{
pullTo.acceptsOnInstance((InetSocketAddress pullFrom) -> {
InetAddressAndPort endpoint = toCassandraInetAddressAndPort(pullFrom);
EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
Gossiper.instance.doOnChangeNotifications(endpoint, ApplicationState.SCHEMA, state.getApplicationState(ApplicationState.SCHEMA));
assertTrue("schema is ready", Schema.instance.waitUntilReady(Duration.ofSeconds(10)));
}).accept(pullFrom);
}
}
private static class BootstrapAction implements InstanceAction, Serializable
{
private final boolean joinRing;
private final Duration waitForBootstrap;
private final Duration waitForSchema;
public BootstrapAction()
{
this(true, Duration.ofMinutes(10), Duration.ofSeconds(10));
}
public BootstrapAction(boolean joinRing, Duration waitForBootstrap, Duration waitForSchema)
{
this.joinRing = joinRing;
this.waitForBootstrap = waitForBootstrap;
this.waitForSchema = waitForSchema;
}
public void accept(IInvokableInstance instance)
{
instance.appliesOnInstance((String partitionerString, String tokenString) -> {
IPartitioner partitioner = FBUtilities.newPartitioner(partitionerString);
List<Token> tokens = Collections.singletonList(partitioner.getTokenFactory().fromString(tokenString));
try
{
Collection<InetAddressAndPort> collisions = StorageService.instance.prepareForBootstrap(waitForSchema.toMillis(), 0);
assert collisions.size() == 0 : String.format("Didn't expect any replacements but got %s", collisions);
boolean isBootstrapSuccessful = StorageService.instance.bootstrap(tokens, waitForBootstrap.toMillis());
assert isBootstrapSuccessful : "Bootstrap did not complete successfully";
StorageService.instance.setUpDistributedSystemKeyspaces();
if (joinRing)
StorageService.instance.finishJoiningRing(true, tokens);
}
catch (Throwable t)
{
throw new RuntimeException(t);
}
return null;
}).apply(instance.config().getString("partitioner"), instance.config().getString("initial_token"));
}
}
public static InstanceAction decommission()
{
return decommission(false);
}
public static InstanceAction decommission(boolean force)
{
return force ? (target) -> target.nodetoolResult("decommission", "--force").asserts().success()
: (target) -> target.nodetoolResult("decommission").asserts().success();
}
public static VersionedApplicationState tokens(IInvokableInstance instance)
{
return versionedToken(instance, ApplicationState.TOKENS, (partitioner, tokens) -> new VersionedValue.VersionedValueFactory(partitioner).tokens(tokens));
}
public static VersionedApplicationState netVersion(IInvokableInstance instance)
{
return versionedToken(instance, ApplicationState.NET_VERSION, (partitioner, tokens) -> new VersionedValue.VersionedValueFactory(partitioner).networkVersion());
}
private static VersionedApplicationState unsafeReleaseVersion(IInvokableInstance instance, String partitionerStr, String releaseVersionStr)
{
return unsafeVersionedValue(instance, ApplicationState.RELEASE_VERSION, (partitioner) -> new VersionedValue.VersionedValueFactory(partitioner).releaseVersion(releaseVersionStr), partitionerStr);
}
public static VersionedApplicationState releaseVersion(IInvokableInstance instance)
{
return unsafeReleaseVersion(instance, instance.config().getString("partitioner"), instance.getReleaseVersionString());
}
public static VersionedApplicationState statusNormal(IInvokableInstance instance)
{
return versionedToken(instance, ApplicationState.STATUS, (partitioner, tokens) -> new VersionedValue.VersionedValueFactory(partitioner).normal(tokens));
}
public static VersionedApplicationState statusWithPortNormal(IInvokableInstance instance)
{
return versionedToken(instance, ApplicationState.STATUS_WITH_PORT, (partitioner, tokens) -> new VersionedValue.VersionedValueFactory(partitioner).normal(tokens));
}
public static VersionedApplicationState statusBootstrapping(IInvokableInstance instance)
{
return versionedToken(instance, ApplicationState.STATUS, (partitioner, tokens) -> new VersionedValue.VersionedValueFactory(partitioner).bootstrapping(tokens));
}
public static VersionedApplicationState statusWithPortBootstrapping(IInvokableInstance instance)
{
return versionedToken(instance, ApplicationState.STATUS_WITH_PORT, (partitioner, tokens) -> new VersionedValue.VersionedValueFactory(partitioner).bootstrapping(tokens));
}
public static VersionedApplicationState statusLeaving(IInvokableInstance instance)
{
return versionedToken(instance, ApplicationState.STATUS, (partitioner, tokens) -> new VersionedValue.VersionedValueFactory(partitioner).leaving(tokens));
}
public static VersionedApplicationState statusLeft(IInvokableInstance instance)
{
return versionedToken(instance, ApplicationState.STATUS, (partitioner, tokens) -> {
return new VersionedValue.VersionedValueFactory(partitioner).left(tokens, currentTimeMillis() + Gossiper.aVeryLongTime);
});
}
public static VersionedApplicationState statusWithPortLeft(IInvokableInstance instance)
{
return versionedToken(instance, ApplicationState.STATUS_WITH_PORT, (partitioner, tokens) -> {
return new VersionedValue.VersionedValueFactory(partitioner).left(tokens, currentTimeMillis() + Gossiper.aVeryLongTime);
});
}
public static VersionedApplicationState statusWithPortLeaving(IInvokableInstance instance)
{
return versionedToken(instance, ApplicationState.STATUS_WITH_PORT, (partitioner, tokens) -> new VersionedValue.VersionedValueFactory(partitioner).leaving(tokens));
}
public static VersionedValue toVersionedValue(VersionedApplicationState vv)
{
return VersionedValue.unsafeMakeVersionedValue(vv.value, vv.version);
}
public static ApplicationState toApplicationState(VersionedApplicationState vv)
{
return ApplicationState.values()[vv.applicationState];
}
private static VersionedApplicationState unsafeVersionedValue(IInvokableInstance instance,
ApplicationState applicationState,
IIsolatedExecutor.SerializableBiFunction<IPartitioner, Collection<Token>, VersionedValue> supplier,
String partitionerStr, String initialTokenStr)
{
return instance.appliesOnInstance((String partitionerString, String tokenString) -> {
IPartitioner partitioner = FBUtilities.newPartitioner(partitionerString);
Collection<Token> tokens = tokenString.contains(",")
? Stream.of(tokenString.split(",")).map(partitioner.getTokenFactory()::fromString).collect(Collectors.toList())
: Collections.singleton(partitioner.getTokenFactory().fromString(tokenString));
VersionedValue versionedValue = supplier.apply(partitioner, tokens);
return new VersionedApplicationState(applicationState.ordinal(), versionedValue.value, versionedValue.version);
}).apply(partitionerStr, initialTokenStr);
}
private static VersionedApplicationState unsafeVersionedValue(IInvokableInstance instance,
ApplicationState applicationState,
IIsolatedExecutor.SerializableFunction<IPartitioner, VersionedValue> supplier,
String partitionerStr)
{
return instance.appliesOnInstance((String partitionerString) -> {
IPartitioner partitioner = FBUtilities.newPartitioner(partitionerString);
VersionedValue versionedValue = supplier.apply(partitioner);
return new VersionedApplicationState(applicationState.ordinal(), versionedValue.value, versionedValue.version);
}).apply(partitionerStr);
}
public static VersionedApplicationState versionedToken(IInvokableInstance instance, ApplicationState applicationState, IIsolatedExecutor.SerializableBiFunction<IPartitioner, Collection<Token>, VersionedValue> supplier)
{
return unsafeVersionedValue(instance, applicationState, supplier, instance.config().getString("partitioner"), instance.config().getString("initial_token"));
}
public static InstanceAction removeFromRing(IInvokableInstance peer)
{
return (target) -> {
InetAddressAndPort endpoint = toCassandraInetAddressAndPort(peer.broadcastAddress());
VersionedApplicationState newState = statusLeft(peer);
target.runOnInstance(() -> {
// state to 'left'
EndpointState currentState = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
ApplicationState as = toApplicationState(newState);
VersionedValue vv = toVersionedValue(newState);
currentState.addApplicationState(as, vv);
StorageService.instance.onChange(endpoint, as, vv);
});
};
}
/**
* Changes gossip state of the `peer` on `target`
*/
public static void changeGossipState(IInvokableInstance target, IInstance peer, List<VersionedApplicationState> newState)
{
InetSocketAddress addr = peer.broadcastAddress();
UUID hostId = peer.config().hostId();
final int netVersion = getOrDefaultMessagingVersion(target, peer);
target.runOnInstance(() -> {
InetAddressAndPort endpoint = toCassandraInetAddressAndPort(addr);
StorageService storageService = StorageService.instance;
Gossiper.runInGossipStageBlocking(() -> {
EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
if (state == null)
{
Gossiper.instance.initializeNodeUnsafe(endpoint, hostId, netVersion, 1);
state = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
if (state.isAlive() && !Gossiper.instance.isDeadState(state))
Gossiper.instance.realMarkAlive(endpoint, state);
}
for (VersionedApplicationState value : newState)
{
ApplicationState as = toApplicationState(value);
VersionedValue vv = toVersionedValue(value);
state.addApplicationState(as, vv);
storageService.onChange(endpoint, as, vv);
}
});
});
}
private static int getOrDefaultMessagingVersion(IInvokableInstance target, IInstance peer)
{
int peerVersion = peer.getMessagingVersion();
final int netVersion = peerVersion == 0 ? target.getMessagingVersion() : peerVersion;
assert netVersion != 0 : "Unable to determine messaging version for peer {}" + peer.config().num();
return netVersion;
}
public static void withProperty(CassandraRelevantProperties prop, boolean value, Runnable r)
{
withProperty(prop, Boolean.toString(value), r);
}
public static void withProperty(CassandraRelevantProperties prop, String value, Runnable r)
{
String prev = prop.setString(value);
try
{
r.run();
}
finally
{
if (prev == null)
prop.clearValue(); // checkstyle: suppress nearby 'clearValueSystemPropertyUsage'
else
prop.setString(prev);
}
}
}