blob: ada44d3e41a67342239b6afdb2e0e0bb55fc95f9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cassandra.spark.bulkwriter;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Range;
import org.junit.jupiter.api.Test;
import org.apache.cassandra.sidecar.common.data.RingEntry;
import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel;
import org.apache.cassandra.spark.bulkwriter.token.ReplicaAwareFailureHandler;
import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping;
import org.apache.cassandra.spark.common.model.CassandraInstance;
import org.apache.cassandra.spark.data.ReplicationFactor;
import org.apache.cassandra.spark.data.partitioner.Partitioner;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
public class RingInstanceTest
{
public static final String DATACENTER_1 = "DATACENTER1";
public static final String KEYSPACE = "KEYSPACE";
static List<RingInstance> getInstances(BigInteger[] tokens, String datacenter)
{
List<RingInstance> instances = new ArrayList<>();
for (int token = 0; token < tokens.length; token++)
{
instances.add(instance(tokens[token], "node-" + token, datacenter, "host"));
}
return instances;
}
static RingInstance instance(BigInteger token, String nodeName, String datacenter, String hostName)
{
return new RingInstance(new RingEntry.Builder()
.datacenter(datacenter)
.port(7000)
.address(nodeName)
.status("UP")
.state("NORMAL")
.token(token.toString())
.fqdn(nodeName)
.rack("rack")
.owns("")
.load("")
.hostId("")
.build());
}
static BigInteger[] getTokens(Partitioner partitioner, int nodes)
{
BigInteger[] tokens = new BigInteger[nodes];
for (int node = 0; node < nodes; node++)
{
tokens[node] = partitioner == Partitioner.Murmur3Partitioner
? getMurmur3Token(nodes, node)
: getRandomToken(nodes, node);
}
return tokens;
}
static BigInteger getRandomToken(int nodes, int index)
{
// ((2^127 / nodes) * i)
return ((BigInteger.valueOf(2).pow(127)).divide(BigInteger.valueOf(nodes))).multiply(BigInteger.valueOf(index));
}
static BigInteger getMurmur3Token(int nodes, int index)
{
// (((2^64 / n) * i) - 2^63)
return (((BigInteger.valueOf(2).pow(64)).divide(BigInteger.valueOf(nodes)))
.multiply(BigInteger.valueOf(index))).subtract(BigInteger.valueOf(2).pow(63));
}
private static ReplicaAwareFailureHandler<CassandraInstance> ntsStrategyHandler(Partitioner partitioner)
{
return new ReplicaAwareFailureHandler<>(partitioner);
}
private static Map<String, Integer> ntsOptions(String[] names, int[] values)
{
assert names.length == values.length : "Invalid replication options - array lengths do not match";
Map<String, Integer> options = Maps.newHashMap();
for (int name = 0; name < names.length; name++)
{
options.put(names[name], values[name]);
}
return options;
}
@Test
public void testEquals()
{
RingInstance instance1 = TokenRangeMappingUtils.getInstances(0, ImmutableMap.of("DATACENTER1", 1), 1).get(0);
RingInstance instance2 = TokenRangeMappingUtils.getInstances(0, ImmutableMap.of("DATACENTER1", 1), 1).get(0);
assertEquals(instance1, instance2);
}
@Test
public void testHashCode()
{
RingInstance instance1 = TokenRangeMappingUtils.getInstances(0, ImmutableMap.of("DATACENTER1", 1), 1).get(0);
RingInstance instance2 = TokenRangeMappingUtils.getInstances(0, ImmutableMap.of("DATACENTER1", 1), 1).get(0);
assertEquals(instance1.hashCode(), instance2.hashCode());
}
@Test
public void testEqualsAndHashcodeIgnoreHost()
{
RingInstance realInstance = new RingInstance(new RingEntry.Builder()
.datacenter("DATACENTER1")
.address("127.0.0.1")
.port(7000)
.rack("Rack")
.status("UP")
.state("NORMAL")
.load("0")
.token("0")
.fqdn("fqdn")
.hostId("")
.owns("")
.build());
RingInstance questionInstance = new RingInstance(new RingEntry.Builder()
.datacenter("DATACENTER1")
.address("127.0.0.1")
.port(7000)
.rack("Rack")
.status("UP")
.state("NORMAL")
.load("0")
.token("0")
.fqdn("fqdn")
.hostId("")
.owns("")
.build());
assertEquals(realInstance, questionInstance);
assertEquals(realInstance.hashCode(), questionInstance.hashCode());
}
@Test
public void multiMapWorksWithRingInstances()
{
RingInstance instance1 = TokenRangeMappingUtils.getInstances(0, ImmutableMap.of("DATACENTER1", 1), 1).get(0);
RingInstance instance2 = TokenRangeMappingUtils.getInstances(0, ImmutableMap.of("DATACENTER1", 1), 1).get(0);
byte[] buffer;
try
{
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream out = new ObjectOutputStream(baos);
out.writeObject(instance2);
out.close();
buffer = baos.toByteArray();
ByteArrayInputStream bais = new ByteArrayInputStream(buffer);
ObjectInputStream in = new ObjectInputStream(bais);
instance2 = (RingInstance) in.readObject();
in.close();
}
catch (final IOException | ClassNotFoundException exception)
{
throw new RuntimeException(exception);
}
Multimap<RingInstance, String> initialErrorMap = ArrayListMultimap.create();
initialErrorMap.put(instance1, "Failure 1");
Multimap<RingInstance, String> newErrorMap = ArrayListMultimap.create(initialErrorMap);
newErrorMap.put(instance2, "Failure2");
assertEquals(1, newErrorMap.keySet().size());
}
@Test
public void testMultipleFailuresSingleInstanceSucceedRF3()
{
Partitioner partitioner = Partitioner.Murmur3Partitioner;
BigInteger[] tokens = getTokens(partitioner, 5);
List<RingInstance> instances = getInstances(tokens, DATACENTER_1);
CassandraInstance instance1 = instances.get(0);
CassandraInstance instance2 = instance(tokens[0], instance1.nodeName(), instance1.datacenter(), "?");
ReplicaAwareFailureHandler<CassandraInstance> replicationFactor3 = ntsStrategyHandler(partitioner);
ReplicationFactor repFactor = new ReplicationFactor(ReplicationFactor.ReplicationStrategy.NetworkTopologyStrategy,
ntsOptions(new String[]{DATACENTER_1 }, new int[]{3 }));
Map<String, Set<String>> writeReplicas = instances.stream()
.collect(Collectors.groupingBy(CassandraInstance::datacenter,
Collectors.mapping(CassandraInstance::nodeName,
Collectors.toSet())));
Multimap<RingInstance, Range<BigInteger>> tokenRanges = TokenRangeMappingUtils.setupTokenRangeMap(partitioner, repFactor, instances);
TokenRangeMapping<RingInstance> tokenRange = new TokenRangeMapping<>(partitioner,
repFactor,
writeReplicas,
Collections.emptyMap(),
tokenRanges,
Collections.emptyList(),
Collections.emptySet(),
Collections.emptySet());
// This test proves that for any RF3 keyspace
replicationFactor3.addFailure(Range.openClosed(tokens[0], tokens[1]), instance1, "Complete Failure");
replicationFactor3.addFailure(Range.openClosed(tokens[0], tokens[0].add(BigInteger.ONE)), instance2, "Failure 1");
replicationFactor3.addFailure(Range.openClosed(tokens[0].add(BigInteger.ONE),
tokens[0].add(BigInteger.valueOf(2L))), instance2, "Failure 2");
replicationFactor3.getFailedEntries(tokenRange, ConsistencyLevel.CL.LOCAL_QUORUM, DATACENTER_1);
assertFalse(replicationFactor3.hasFailed(tokenRange, ConsistencyLevel.CL.LOCAL_QUORUM, DATACENTER_1));
}
}