blob: 5b9629a89160b40780713001ad94333d9c42ce6a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.distributed.test;
import java.net.InetAddress;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.junit.Assert;
import org.junit.Test;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.repair.SystemDistributedKeyspace;
import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
import static org.apache.cassandra.distributed.api.Feature.NETWORK;
// TODO: this test should be removed after running in-jvm dtests is set up via the shared API repository
public class GossipSettlesTest extends TestBaseImpl
{
@Test
public void testGossipSettles() throws Throwable
{
/* Use withSubnet(1) to prove seed provider is set correctly - without the fix to pass a seed provider, this test fails */
try (Cluster cluster = builder().withNodes(3)
.withConfig(config -> config.with(GOSSIP).with(NETWORK))
.withSubnet(1)
.start())
{
// Verify the 4.0 WithPort versions of status reporting methods match their InetAddress
// counterparts. Disable Gossip first to prevent any bump in heartbeats that would
// invalidate the comparison. Compare the newer WithPort versions by adding the
// storage port to IP addresses in keys/values/strings as appropriate.
cluster.forEach(i -> i.runOnInstance(() -> { Gossiper.instance.stop(); }));
cluster.get(1).runOnInstance(() -> {
// First prove that the storage port is added
Assert.assertEquals("stuff 127.0.0.1:7012 morestuff 127.0.0.2:7012", addStoragePortToIP("stuff 127.0.0.1 morestuff 127.0.0.2"));
FailureDetector fd = ((FailureDetector) FailureDetector.instance);
Assert.assertEquals(addStoragePortToInstanceName(fd.getAllEndpointStates(false)),
fd.getAllEndpointStates(true));
Assert.assertEquals(addPortToKeys(fd.getSimpleStates()), fd.getSimpleStatesWithPort());
StorageProxy sp = StorageProxy.instance;
Assert.assertEquals(addPortToSchemaVersions(sp.getSchemaVersions()), sp.getSchemaVersionsWithPort());
StorageService ss = StorageService.instance;
Assert.assertEquals(addPortToValues(ss.getTokenToEndpointMap()), ss.getTokenToEndpointWithPortMap());
Assert.assertEquals(addPortToKeys(ss.getEndpointToHostId()), ss.getEndpointWithPortToHostId());
Assert.assertEquals(addPortToValues(ss.getHostIdToEndpoint()), ss.getHostIdToEndpointWithPort());
Assert.assertEquals(addPortToKeys(ss.getLoadMap()), ss.getLoadMapWithPort());
Assert.assertEquals(addPortToList(ss.getLiveNodes()), ss.getLiveNodesWithPort());
List<String> naturalEndpointsAddedPort = ss.getNaturalEndpoints(SchemaConstants.DISTRIBUTED_KEYSPACE_NAME,
SystemDistributedKeyspace.VIEW_BUILD_STATUS, "dummy").stream()
.map(e -> addStoragePortToIP(e.getHostAddress())).collect(Collectors.toList());
Assert.assertEquals(naturalEndpointsAddedPort,
ss.getNaturalEndpointsWithPort(SchemaConstants.DISTRIBUTED_KEYSPACE_NAME,
SystemDistributedKeyspace.VIEW_BUILD_STATUS, "dummy"));
naturalEndpointsAddedPort = ss.getNaturalEndpoints(SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, ByteBufferUtil.EMPTY_BYTE_BUFFER).stream()
.map(e -> addStoragePortToIP(e.getHostAddress())).collect(Collectors.toList());
Assert.assertEquals(naturalEndpointsAddedPort,
ss.getNaturalEndpointsWithPort(SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, ByteBufferUtil.EMPTY_BYTE_BUFFER));
// Difference in key type... convert to String and add the port to the older format
Map<String, Float> getOwnershipKeyAddedPort = ss.getOwnership().entrySet().stream()
.collect(Collectors.<Map.Entry<InetAddress, Float>, String, Float>toMap(
e -> addStoragePortToIP(e.getKey().toString()),
Map.Entry::getValue));
Assert.assertEquals(getOwnershipKeyAddedPort, ss.getOwnershipWithPort());
MessagingService ms = MessagingService.instance();
Assert.assertEquals(addPortToKeys(ms.getTimeoutsPerHost()), ms.getTimeoutsPerHostWithPort());
Assert.assertEquals(addPortToKeys(ms.getLargeMessagePendingTasks()), ms.getLargeMessagePendingTasksWithPort());
Assert.assertEquals(addPortToKeys(ms.getLargeMessageCompletedTasks()), ms.getLargeMessageCompletedTasksWithPort());
Assert.assertEquals(addPortToKeys(ms.getLargeMessageDroppedTasks()), ms.getLargeMessageDroppedTasksWithPort());
Assert.assertEquals(addPortToKeys(ms.getSmallMessagePendingTasks()), ms.getSmallMessagePendingTasksWithPort());
Assert.assertEquals(addPortToKeys(ms.getSmallMessageCompletedTasks()), ms.getSmallMessageCompletedTasksWithPort());
Assert.assertEquals(addPortToKeys(ms.getSmallMessageDroppedTasks()), ms.getSmallMessageDroppedTasksWithPort());
Assert.assertEquals(addPortToKeys(ms.getGossipMessagePendingTasks()), ms.getGossipMessagePendingTasksWithPort());
Assert.assertEquals(addPortToKeys(ms.getGossipMessageCompletedTasks()), ms.getGossipMessageCompletedTasksWithPort());
Assert.assertEquals(addPortToKeys(ms.getGossipMessageDroppedTasks()), ms.getGossipMessageDroppedTasksWithPort());
});
}
}
final static Pattern IP4_ADDRESS = Pattern.compile("(127\\.0\\.\\d{1,3}\\.\\d{1,3})");
static String addStoragePortToIP(String s)
{
return IP4_ADDRESS.matcher(s).replaceAll("$1:" + DatabaseDescriptor.getStoragePort());
}
static String addStoragePortToInstanceName(String s)
{
return Arrays.stream(s.split("\n")).map(line -> {
if (line.startsWith(" "))
{
return line;
}
else // Host header line
{
return addStoragePortToIP(line);
}
}).collect(Collectors.joining("\n", "", "\n")); // to match final blank line
}
static <V> Map<String, V> addPortToKeys(Map<String, V> source)
{
return source.entrySet().stream().collect(Collectors.toMap(entry -> addStoragePortToIP(entry.getKey()),
Map.Entry::getValue));
}
static <K> Map<K, String> addPortToValues(Map<K, String> source)
{
return source.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
entry -> addStoragePortToIP(entry.getValue())));
}
static List<String> addPortToList(List<String> list)
{
return list.stream().map(GossipSettlesTest::addStoragePortToIP).collect(Collectors.toList());
}
static Map<String, List<String>> addPortToSchemaVersions(Map<String, List<String>> source)
{
return source.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey,
hostAndPortEntry -> addPortToList(hostAndPortEntry.getValue())));
}
}