blob: 8b1577822b834ea222b09b6e468dcae25d56004d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.tcm.transformations;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.tcm.ClusterMetadataService;
import org.apache.cassandra.tcm.Transformation;
import org.apache.cassandra.tcm.membership.NodeId;
import org.apache.cassandra.tcm.membership.NodeState;
import org.apache.cassandra.tcm.ownership.PlacementDeltas;
import org.apache.cassandra.tcm.ownership.PlacementProvider;
import org.apache.cassandra.tcm.ownership.PlacementTransitionPlan;
import org.apache.cassandra.tcm.sequences.BootstrapAndReplace;
import org.apache.cassandra.tcm.sequences.LockedRanges;
import org.apache.cassandra.tcm.serialization.AsymmetricMetadataSerializer;
import org.apache.cassandra.tcm.serialization.Version;
import static org.apache.cassandra.exceptions.ExceptionCode.INVALID;
public class PrepareReplace implements Transformation
{
private static final Logger logger = LoggerFactory.getLogger(PrepareReplace.class);
public static final Serializer serializer = new Serializer();
private final NodeId replaced;
private final NodeId replacement;
private final PlacementProvider placementProvider;
private final boolean joinTokenRing;
private final boolean streamData;
public PrepareReplace(NodeId replaced,
NodeId replacement,
PlacementProvider placementProvider,
boolean joinTokenRing,
boolean streamData)
{
this.replaced = replaced;
this.replacement = replacement;
this.placementProvider = placementProvider;
this.joinTokenRing = joinTokenRing;
this.streamData = streamData;
}
@Override
public Kind kind()
{
return Kind.PREPARE_REPLACE;
}
public NodeId replacement()
{
return replacement;
}
@Override
public Result execute(ClusterMetadata prev)
{
if (prev.directory.peerState(replaced) != NodeState.JOINED)
return new Rejected(INVALID, String.format("Rejecting this plan as the replaced node %s is in state %s", replaced, prev.directory.peerState(replaced)));
if (prev.directory.peerState(replacement) != NodeState.REGISTERED)
return new Rejected(INVALID, String.format("Rejecting this plan as the replacement node %s is in state %s", replacement, prev.directory.peerState(replacement)));
LockedRanges.Key unlockKey = LockedRanges.keyFor(prev.nextEpoch());
LockedRanges lockedRanges = prev.lockedRanges;
PlacementTransitionPlan transitionPlan = placementProvider.planForReplacement(prev,
replaced,
replacement,
prev.schema.getKeyspaces());
LockedRanges.AffectedRanges rangesToLock = transitionPlan.affectedRanges();
LockedRanges.Key alreadyLockedBy = lockedRanges.intersects(rangesToLock);
if (!alreadyLockedBy.equals(LockedRanges.NOT_LOCKED))
{
return new Rejected(INVALID, String.format("Rejecting this plan as it interacts with a range locked by %s (locked: %s, new: %s)",
alreadyLockedBy, lockedRanges, rangesToLock));
}
StartReplace start = new StartReplace(replaced, replacement, transitionPlan.addToWrites(), unlockKey);
MidReplace mid = new MidReplace(replaced, replacement, transitionPlan.moveReads(), unlockKey);
FinishReplace finish = new FinishReplace(replaced, replacement, transitionPlan.removeFromWrites(), unlockKey);
transitionPlan.assertPreExistingWriteReplica(prev.placements);
Set<Token> tokens = new HashSet<>(prev.tokenMap.tokens(replaced));
BootstrapAndReplace plan = BootstrapAndReplace.newSequence(prev.nextEpoch(),
unlockKey,
tokens,
start, mid, finish,
joinTokenRing, streamData);
LockedRanges newLockedRanges = lockedRanges.lock(unlockKey, rangesToLock);
ClusterMetadata.Transformer proposed = prev.transformer()
.with(newLockedRanges)
.with(prev.inProgressSequences.with(replacement(), plan));
logger.info("Node {} is replacing {}, tokens {}", prev.directory.endpoint(replacement), prev.directory.endpoint(replaced), prev.tokenMap.tokens(replaced));
return Transformation.success(proposed, rangesToLock);
}
@Override
public String toString()
{
return "PrepareReplace{" +
"replaced=" + replaced +
", replacement=" + replacement +
", joinTokenRing=" + joinTokenRing +
", streamData=" + streamData +
'}';
}
public static final class Serializer implements AsymmetricMetadataSerializer<Transformation, PrepareReplace>
{
public void serialize(Transformation t, DataOutputPlus out, Version version) throws IOException
{
assert t instanceof PrepareReplace;
PrepareReplace transformation = (PrepareReplace) t;
NodeId.serializer.serialize(transformation.replaced, out, version);
NodeId.serializer.serialize(transformation.replacement, out, version);
out.writeBoolean(transformation.joinTokenRing);
out.writeBoolean(transformation.streamData);
}
public PrepareReplace deserialize(DataInputPlus in, Version version) throws IOException
{
NodeId replaced = NodeId.serializer.deserialize(in, version);
NodeId replacement = NodeId.serializer.deserialize(in, version);
boolean joinTokenRing = in.readBoolean();
boolean streamData = in.readBoolean();
return new PrepareReplace(replaced,
replacement,
ClusterMetadataService.instance().placementProvider(),
joinTokenRing,
streamData);
}
public long serializedSize(Transformation t, Version version)
{
assert t instanceof PrepareReplace;
PrepareReplace transformation = (PrepareReplace) t;
return NodeId.serializer.serializedSize(transformation.replaced, version) +
NodeId.serializer.serializedSize(transformation.replacement, version) +
(TypeSizes.BOOL_SIZE * 2);
}
}
public static abstract class BaseReplaceTransformation extends ApplyPlacementDeltas
{
protected final NodeId replaced;
public BaseReplaceTransformation(NodeId replaced, NodeId replacement, PlacementDeltas delta, LockedRanges.Key unlockKey, boolean unlock)
{
super(replacement, delta, unlockKey, unlock);
this.replaced = replaced;
}
public NodeId replacement()
{
return nodeId;
}
public NodeId replaced()
{
return replaced;
}
}
// Analogous to ApplyPlacementDeltas.SerializerBase. The difference is that classes which extend
// BaseReplaceTransformation have 2 NodeIds, whereas the equivalents from PrepareJoin and PrepareLeave only have a single id
// TODO we can probably make the hierarchy of abstract/concrete transformations and associated serializers a bit less convoluted
public abstract static class SerializerBase<T extends BaseReplaceTransformation> implements AsymmetricMetadataSerializer<Transformation, T>
{
public void serialize(Transformation t, DataOutputPlus out, Version version) throws IOException
{
BaseReplaceTransformation change = (T) t;
NodeId.serializer.serialize(change.replaced(), out, version);
NodeId.serializer.serialize(change.replacement(), out, version);
PlacementDeltas.serializer.serialize(change.delta, out, version);
LockedRanges.Key.serializer.serialize(change.lockKey, out, version);
}
public T deserialize(DataInputPlus in, Version version) throws IOException
{
NodeId replaced = NodeId.serializer.deserialize(in, version);
NodeId replacement = NodeId.serializer.deserialize(in, version);
PlacementDeltas delta = PlacementDeltas.serializer.deserialize(in, version);
LockedRanges.Key lockKey = LockedRanges.Key.serializer.deserialize(in, version);
return construct(replaced, replacement, delta, lockKey);
}
public long serializedSize(Transformation t, Version version)
{
BaseReplaceTransformation change = (T) t;
return NodeId.serializer.serializedSize(change.replaced(), version) +
NodeId.serializer.serializedSize(change.replacement(), version) +
PlacementDeltas.serializer.serializedSize(change.delta, version) +
LockedRanges.Key.serializer.serializedSize(change.lockKey, version);
}
abstract T construct(NodeId replaced, NodeId replacement, PlacementDeltas delta, LockedRanges.Key lockKey);
}
// This is functionally identical to StartJoin. They only exist as distinct transformations for clarity.
public static class StartReplace extends BaseReplaceTransformation
{
public static final Serializer serializer = new Serializer();
public StartReplace(NodeId replaced, NodeId replacement, PlacementDeltas delta, LockedRanges.Key unlockKey)
{
super(replaced, replacement, delta, unlockKey, false);
}
@Override
public Kind kind()
{
return Kind.START_REPLACE;
}
@Override
public ClusterMetadata.Transformer transform(ClusterMetadata prev, ClusterMetadata.Transformer transformer)
{
return transformer.withNodeState(replacement(), NodeState.BOOT_REPLACING)
.with(prev.inProgressSequences.with(nodeId, (BootstrapAndReplace plan) -> plan.advance(prev.nextEpoch())));
}
@Override
public String toString()
{
return "StartReplace{" +
", delta=" + delta +
", lockKey=" + lockKey +
", unlock=" + unlock +
", replaced=" + replaced +
", replacement=" + replacement() +
'}';
}
public static final class Serializer extends PrepareReplace.SerializerBase<StartReplace>
{
@Override
public StartReplace construct(NodeId replaced, NodeId replacement, PlacementDeltas delta, LockedRanges.Key unlockKey)
{
return new StartReplace(replaced, replacement, delta, unlockKey);
}
}
}
public static class MidReplace extends BaseReplaceTransformation
{
public static final Serializer serializer = new Serializer();
public MidReplace(NodeId replaced, NodeId replacement, PlacementDeltas delta, LockedRanges.Key unlockKey)
{
super(replaced, replacement, delta, unlockKey, false);
}
@Override
public Kind kind()
{
return Kind.MID_REPLACE;
}
@Override
public ClusterMetadata.Transformer transform(ClusterMetadata prev, ClusterMetadata.Transformer transformer)
{
return transformer
.with(prev.inProgressSequences.with(nodeId, (plan) -> plan.advance(prev.nextEpoch())));
}
@Override
public String toString()
{
return "MidReplace{" +
", delta=" + delta +
", lockKey=" + lockKey +
", unlock=" + unlock +
", replaced=" + replaced +
", replacement=" + replacement() +
'}';
}
public static final class Serializer extends PrepareReplace.SerializerBase<MidReplace>
{
@Override
MidReplace construct(NodeId replaced, NodeId replacement, PlacementDeltas delta, LockedRanges.Key lockKey)
{
return new MidReplace(replaced, replacement, delta, lockKey);
}
}
}
public static class FinishReplace extends BaseReplaceTransformation
{
public static final Serializer serializer = new Serializer();
public FinishReplace(NodeId replaced,
NodeId replacement,
PlacementDeltas delta,
LockedRanges.Key unlockKey)
{
super(replaced, replacement, delta, unlockKey, true);
}
@Override
public Kind kind()
{
return Kind.FINISH_REPLACE;
}
@Override
public ClusterMetadata.Transformer transform(ClusterMetadata prev, ClusterMetadata.Transformer transformer)
{
return transformer.replaced(replaced, replacement())
.with(prev.inProgressSequences.without(nodeId));
}
@Override
public String toString()
{
return "FinishReplace{" +
"replaced=" + replaced +
", replacement=" + replacement() +
", delta=" + delta +
", unlockKey=" + lockKey +
'}';
}
public static final class Serializer extends PrepareReplace.SerializerBase<FinishReplace>
{
@Override
FinishReplace construct(NodeId replaced, NodeId replacement, PlacementDeltas delta, LockedRanges.Key lockKey)
{
return new FinishReplace(replaced, replacement, delta, lockKey);
}
}
}
}