blob: a32dfbc356e359acf032374a409399ba3a5da8b7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.distributed.test.log;
import java.io.IOException;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import org.junit.Test;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.Feature;
import org.apache.cassandra.distributed.api.IInstanceConfig;
import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.distributed.api.TokenSupplier;
import org.apache.cassandra.distributed.shared.NetworkTopology;
import org.apache.cassandra.distributed.shared.WithProperties;
import org.apache.cassandra.distributed.test.TestBaseImpl;
import org.apache.cassandra.io.util.DataInputBuffer;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.tcm.ClusterMetadataService;
import org.apache.cassandra.tcm.MetadataSnapshots;
import org.apache.cassandra.tcm.Transformation;
import org.apache.cassandra.tcm.membership.Location;
import org.apache.cassandra.tcm.membership.NodeAddresses;
import org.apache.cassandra.tcm.membership.NodeId;
import org.apache.cassandra.tcm.membership.NodeVersion;
import org.apache.cassandra.tcm.sequences.LeaveStreams;
import org.apache.cassandra.tcm.sequences.UnbootstrapAndLeave;
import org.apache.cassandra.tcm.serialization.Version;
import org.apache.cassandra.tcm.transformations.PrepareLeave;
import org.apache.cassandra.tcm.transformations.Register;
import org.apache.cassandra.tcm.transformations.TriggerSnapshot;
import org.apache.cassandra.tcm.transformations.Startup;
import org.apache.cassandra.tcm.transformations.Unregister;
import org.apache.cassandra.utils.CassandraVersion;
import static org.apache.cassandra.config.CassandraRelevantProperties.TCM_ALLOW_TRANSFORMATIONS_DURING_UPGRADES;
import static org.junit.Assert.assertEquals;
public class RegisterTest extends TestBaseImpl
{
@Test
public void testRegistrationIdempotence() throws Throwable
{
try (Cluster cluster = builder().withNodes(3)
.withTokenSupplier(TokenSupplier.evenlyDistributedTokens(5))
.withConfig((config) -> config.with(Feature.NETWORK, Feature.GOSSIP))
.withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(5, "dc0", "rack0"))
.createWithoutStarting())
{
// Make sure 2 and 3 do not race for ID
for (int i : new int[]{ 1,3,2 })
cluster.get(i).startup();
for (int i : new int[]{ 3, 2 })
{
cluster.get(i).runOnInstance(() -> {
ClusterMetadataService.instance().commit(new PrepareLeave(ClusterMetadata.current().myNodeId(),
true,
ClusterMetadataService.instance().placementProvider(),
LeaveStreams.Kind.UNBOOTSTRAP));
UnbootstrapAndLeave unbootstrapAndLeave = (UnbootstrapAndLeave) ClusterMetadata.current().inProgressSequences.get(ClusterMetadata.current().myNodeId());
ClusterMetadataService.instance().commit(unbootstrapAndLeave.startLeave);
ClusterMetadataService.instance().commit(unbootstrapAndLeave.midLeave);
ClusterMetadataService.instance().commit(unbootstrapAndLeave.finishLeave);
ClusterMetadataService.instance().commit(new Unregister(ClusterMetadata.current().myNodeId()));
});
cluster.get(1).runOnInstance(() -> {
ClusterMetadataService.instance().commit(TriggerSnapshot.instance);
});
IInstanceConfig config = cluster.newInstanceConfig();
IInvokableInstance newInstance = cluster.bootstrap(config);
newInstance.startup();
}
}
}
@Test
public void serializationVersionCeilingTest() throws Throwable
{
try (Cluster cluster = builder().withNodes(1)
.createWithoutStarting();
WithProperties prop = new WithProperties().set(TCM_ALLOW_TRANSFORMATIONS_DURING_UPGRADES, "true"))
{
final String firstNodeEndpoint = "127.0.0.10";
cluster.get(1).startup();
cluster.get(1).runOnInstance(() -> {
try
{
// Register a ghost node with V0 to fake-force V0 serialization. In a real world cluster we will always be upgrading from a smaller version.
ClusterMetadataService.instance().commit(new Register(new NodeAddresses(InetAddressAndPort.getByName(firstNodeEndpoint)),
ClusterMetadata.current().directory.location(ClusterMetadata.current().myNodeId()),
new NodeVersion(NodeVersion.CURRENT.cassandraVersion, Version.V0)));
NodeId oldNode = ClusterMetadata.current().directory.peerId(InetAddressAndPort.getByName(firstNodeEndpoint));
// Fake an upgrade of this node and assert we continue to serialize so that the one which only
// supports V0 can deserialize. In a real cluster it wouldn't happen exactly in this way (here the
// min serialization version actually goes backwards from CURRENT to V0 when we upgrade, which would
// not happen in a real cluster as we would never register like oldNode, with the current C* version
// but an older metadata version
CassandraVersion currentVersion = NodeVersion.CURRENT.cassandraVersion;
NodeVersion upgraded = new NodeVersion(new CassandraVersion(String.format("%d.%d.%d", currentVersion.major + 1, 0, 0)),
NodeVersion.CURRENT_METADATA_VERSION);
ClusterMetadata metadata = ClusterMetadata.current();
NodeId id = metadata.myNodeId();
Startup startup = new Startup(id, metadata.directory.getNodeAddresses(id), upgraded);
ClusterMetadataService.instance().commit(startup);
// Doesn't matter which specific Transformation we use here, we're testing that the serializer uses
// the correct lower bound
Transformation t = new Register(NodeAddresses.current(), new Location("DC", "RACK"), NodeVersion.CURRENT);
try
{
assertEquals(ClusterMetadata.current().directory.clusterMinVersion.serializationVersion,
Version.V0.asInt());
ByteBuffer bytes = t.kind().toVersionedBytes(t);
try (DataInputBuffer buf = new DataInputBuffer(bytes, true))
{
// Because ClusterMetadata.current().directory still contains oldNode we must serialize at
// the version it supports
assertEquals(Version.V0, Version.fromInt(buf.readUnsignedVInt32()));
}
// If we unregister oldNode, then the ceiling for serialization version will rise
ClusterMetadataService.instance().commit(new Unregister(oldNode));
assertEquals(ClusterMetadata.current().directory.clusterMinVersion.serializationVersion,
NodeVersion.CURRENT_METADATA_VERSION.asInt());
bytes = t.kind().toVersionedBytes(t);
try (DataInputBuffer buf = new DataInputBuffer(bytes, true))
{
assertEquals(NodeVersion.CURRENT_METADATA_VERSION, Version.fromInt(buf.readUnsignedVInt32()));
}
}
catch (IOException e)
{
throw new RuntimeException(e);
}
}
catch (UnknownHostException e)
{
throw new RuntimeException(e);
}
});
}
}
@Test
public void replayLocallyFromV0Snapshot() throws Throwable
{
try (Cluster cluster = builder().withNodes(1)
.createWithoutStarting())
{
cluster.get(1).startup();
cluster.get(1).runOnInstance(() -> {
try
{
// Register a ghost node with V0 to fake-force V0 serialization. In a real world cluster we will always be upgrading from a smaller version.
ClusterMetadataService.instance().commit(new Register(new NodeAddresses(InetAddressAndPort.getByName("127.0.0.10")),
ClusterMetadata.current().directory.location(ClusterMetadata.current().myNodeId()),
new NodeVersion(NodeVersion.CURRENT.cassandraVersion, Version.V0)));
}
catch (UnknownHostException e)
{
throw new RuntimeException(e);
}
ClusterMetadataService.instance().commit(TriggerSnapshot.instance);
ClusterMetadata cm = new MetadataSnapshots.SystemKeyspaceMetadataSnapshots().getSnapshot(ClusterMetadata.current().epoch);
cm.equals(ClusterMetadata.current());
});
}
}
}