/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.analytics;

import java.io.IOException;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import com.google.common.collect.Range;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.api.ICluster;
import org.apache.cassandra.distributed.api.IInstance;
import org.apache.cassandra.distributed.api.IInstanceConfig;
import org.apache.cassandra.distributed.api.Row;
import org.apache.cassandra.distributed.api.SimpleQueryResult;
import org.apache.cassandra.sidecar.common.server.JmxClient;
import org.apache.cassandra.sidecar.testing.QualifiedName;
import org.apache.cassandra.spark.bulkwriter.DecoratedKey;
import org.apache.cassandra.spark.bulkwriter.Tokenizer;
import org.apache.cassandra.spark.common.schema.ColumnType;
import org.apache.cassandra.spark.common.schema.ColumnTypes;
import org.apache.cassandra.testing.ClusterBuilderConfiguration;
import scala.Tuple2;

import static org.apache.cassandra.testing.TestUtils.TEST_KEYSPACE;
import static org.assertj.core.api.Assertions.assertThat;

/**
 * Base class for resiliency tests. Contains helper methods for data generation and validation.
 */
public abstract class ResiliencyTestBase extends SharedClusterSparkIntegrationTestBase
{
public static final String QUERY_ALL_ROWS = "SELECT * FROM %s";
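
/**
 * Returns the expected rows owned by the given token range, each rendered as an
 * {@code id:course:marks} string.
 *
 * @param range    the token range to filter on
 * @param rowCount the total number of rows to generate
 * @return the subset of generated rows whose tokens fall within {@code range}
 */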
public Set<String> getDataForRange(Range<BigInteger> range, int rowCount)
{
// Iterate through all generated entries, keep only those whose token falls within the range, and render them as strings
return generateExpectedData(rowCount).stream()
.filter(t -> range.contains(t._1().getToken()))
.map(t -> t._2()[0] + ":" + t._2()[1] + ":" + t._2()[2])
.collect(Collectors.toSet());
}
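
/**
 * Generates {@code rowCount} rows of test data keyed and tokenized on the {@code id} column.
 * Each element pairs the decorated key with the column values {@code (id, course, marks)};
 * for example, record 3 yields the columns {@code (3, "course3", 3)}.
 */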
public List<Tuple2<DecoratedKey, Object[]>> generateExpectedData(int rowCount)
{
// "create table if not exists %s (id int, course text, marks int, primary key (id));";
List<ColumnType<?>> columnTypes = Collections.singletonList(ColumnTypes.INT);
Tokenizer tokenizer = new Tokenizer(Collections.singletonList(0),
Collections.singletonList("id"),
columnTypes,
true
);
return IntStream.range(0, rowCount).mapToObj(recordNum -> {
Object[] columns = new Object[]
{
recordNum, "course" + recordNum, recordNum
};
return Tuple2.apply(tokenizer.getDecoratedKey(columns), columns);
}).collect(Collectors.toList());
}
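
/**
 * Maps each instance to the set of rows it is expected to own, derived from either its natural
 * token ranges or, when {@code isPending} is {@code true}, its pending ranges.
 */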
public Map<IInstance, Set<String>> getInstanceData(List<? extends IInstance> instances,
boolean isPending, int rowCount)
{
return instances.stream().collect(Collectors.toMap(Function.identity(),
i -> filterTokenRangeData(rangesForInstance(i, isPending), rowCount)));
}
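
/**
 * Collects the expected rows for all of the given token ranges into a single set.
 */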
public Set<String> filterTokenRangeData(List<Range<BigInteger>> ranges, int rowCount)
{
return ranges.stream()
.map((Range<BigInteger> range) -> getDataForRange(range, rowCount))
.flatMap(Collection::stream)
.collect(Collectors.toSet());
}

/**
 * Returns the expected set of rows, as strings, for each instance in the cluster. Expectations
 * for nodes in {@code pendingNodes} are computed from their pending token ranges rather than
 * their natural ones.
 */
public Map<IInstance, Set<String>> generateExpectedInstanceData(ICluster<? extends IInstance> cluster,
List<? extends IInstance> pendingNodes,
int rowCount)
{
List<IInstance> instances = cluster.stream().collect(Collectors.toList());
Map<IInstance, Set<String>> expectedInstanceData = getInstanceData(instances, false, rowCount);
// Use pending ranges to get data for each transitioning instance
Map<IInstance, Set<String>> transitioningInstanceData = getInstanceData(pendingNodes, true, rowCount);
expectedInstanceData.putAll(transitioningInstanceData.entrySet()
.stream()
.filter(e -> !e.getValue().isEmpty())
.collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue)));
return expectedInstanceData;
}
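
/**
 * Reads all rows from {@code table} through the coordinator on node 1 at the given read
 * consistency level and asserts that exactly the {@code rowCount} generated rows come back.
 *
 * <p>A typical subclass flow might look like the following sketch (table creation and the bulk
 * write itself are test-specific and elided):
 * <pre>{@code
 * QualifiedName table = uniqueTestTableFullName(TEST_KEYSPACE,
 *                                               ConsistencyLevel.QUORUM,
 *                                               ConsistencyLevel.QUORUM);
 * // ... create the table and bulk-write rowCount rows shaped like generateExpectedData ...
 * validateData(table, ConsistencyLevel.QUORUM, rowCount);
 * }</pre>
 */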
protected void validateData(QualifiedName table, ConsistencyLevel readCL, int rowCount)
{
String query = String.format(QUERY_ALL_ROWS, table);
Set<String> actualEntries =
Arrays.stream(cluster.coordinator(1)
.execute(query, mapConsistencyLevel(readCL)))
.map((Object[] columns) -> String.format("%s:%s:%s", columns[0], columns[1], columns[2]))
.collect(Collectors.toSet());
assertThat(actualEntries).hasSize(rowCount);
IntStream.range(0, rowCount)
.forEach(i -> actualEntries.remove(i + ":course" + i + ":" + i));
assertThat(actualEntries).isEmpty();
}
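
/**
 * Variant of {@link #uniqueTestTableFullName(String, ConsistencyLevel, ConsistencyLevel)} that
 * reads the consistency levels from a {@link TestConsistencyLevel} passed as the first test
 * argument.
 */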
protected static QualifiedName uniqueTestTableFullName(String keyspace, Object[] arguments)
{
TestConsistencyLevel cl = (TestConsistencyLevel) arguments[0];
return uniqueTestTableFullName(keyspace, cl.readCL, cl.writeCL);
}
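
/**
 * Builds a table name that is unique per read/write consistency level pair, e.g.
 * {@code r_quorum__w_quorum} for QUORUM reads and writes.
 */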
protected static QualifiedName uniqueTestTableFullName(String keyspace, ConsistencyLevel readCL, ConsistencyLevel writeCL)
{
String tableName = String.format("r_%s__w_%s", readCL, writeCL).toLowerCase();
return new QualifiedName(keyspace, tableName);
}
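
/**
 * Convenience overload of {@link #validateNodeSpecificData(QualifiedName, Map, boolean)} that
 * validates with {@code hasNewNodes} set to {@code true}.
 */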
public void validateNodeSpecificData(QualifiedName table,
Map<? extends IInstance, Set<String>> expectedInstanceData)
{
validateNodeSpecificData(table, expectedInstanceData, true);
}
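
/**
 * Queries each instance locally (bypassing the coordinator) and compares the rows it holds
 * against the expected per-instance data. When {@code hasNewNodes} is {@code true} the rows
 * must match exactly; otherwise each instance only needs to contain the expected rows.
 */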
public void validateNodeSpecificData(QualifiedName table,
Map<? extends IInstance, Set<String>> expectedInstanceData,
boolean hasNewNodes)
{
for (IInstance instance : expectedInstanceData.keySet())
{
SimpleQueryResult qr = instance.executeInternalWithResult(String.format(QUERY_ALL_ROWS, table));
Set<String> rows = new HashSet<>();
while (qr.hasNext())
{
Row row = qr.next();
int id = row.getInteger("id");
String course = row.getString("course");
int marks = row.getInteger("marks");
rows.add(id + ":" + course + ":" + marks);
}
if (hasNewNodes)
{
assertThat(rows).containsExactlyInAnyOrderElementsOf(expectedInstanceData.get(instance));
}
else
{
assertThat(rows).containsAll(expectedInstanceData.get(instance));
}
}
}
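
/**
 * Fetches the token ranges owned by (or, when {@code isPending} is {@code true}, pending for)
 * the given instance from the StorageService MBean over JMX.
 */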
private List<Range<BigInteger>> rangesForInstance(IInstance instance, boolean isPending)
{
IInstanceConfig config = instance.config();
Map<List<String>, List<String>> ranges = null;
try (JmxClient client = wrapJmxClient(JmxClient.builder()
.host(config.broadcastAddress().getAddress().getHostAddress())
.port(config.jmxPort())))
{
SSProxy ss = client.proxy(SSProxy.class, "org.apache.cassandra.db:type=StorageService");
ranges = isPending ? ss.getPendingRangeToEndpointWithPortMap(TEST_KEYSPACE)
: ss.getRangeToEndpointWithPortMap(TEST_KEYSPACE);
}
catch (IOException exception)
{
logger.warn("Unable to close JMX client", exception);
}
assertThat(ranges).isNotNull();
// filter ranges that belong to the instance
return ranges.entrySet()
.stream()
.filter(e -> e.getValue().contains(instance.broadcastAddress().getAddress().getHostAddress()
+ ":" + instance.broadcastAddress().getPort()))
.map(e -> unwrapRanges(e.getKey()))
.flatMap(Collection::stream)
.collect(Collectors.toList());
}
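
/**
 * Converts a {@code [start, end]} string pair into Guava ranges. A wrap-around range (where
 * {@code start > end}) is split in two at the edges of the long token space, yielding
 * {@code (start, Long.MAX_VALUE]} and {@code (Long.MIN_VALUE, end]}.
 */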
private List<Range<BigInteger>> unwrapRanges(List<String> range)
{
List<Range<BigInteger>> ranges = new ArrayList<Range<BigInteger>>();
BigInteger start = new BigInteger(range.get(0));
BigInteger end = new BigInteger(range.get(1));
if (start.compareTo(end) > 0)
{
ranges.add(Range.openClosed(start, BigInteger.valueOf(Long.MAX_VALUE)));
ranges.add(Range.openClosed(BigInteger.valueOf(Long.MIN_VALUE), end));
}
else
{
ranges.add(Range.openClosed(start, end));
}
return ranges;
}
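
/**
 * Maps a consistency level to the in-jvm dtest {@code ConsistencyLevel} constant of the same name.
 */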
private org.apache.cassandra.distributed.api.ConsistencyLevel mapConsistencyLevel(ConsistencyLevel cl)
{
return org.apache.cassandra.distributed.api.ConsistencyLevel.valueOf(cl.name());
}
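
/**
 * Returns the default configuration used to build the shared test cluster.
 */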
public static ClusterBuilderConfiguration clusterConfig()
{
return new ClusterBuilderConfiguration();
}

/**
 * An interface that mirrors the token range mapping methods exposed by the Cassandra
 * StorageService MBean, for use as a JMX proxy
 */
public interface SSProxy
{
Map<List<String>, List<String>> getRangeToEndpointWithPortMap(String keyspace);
Map<List<String>, List<String>> getPendingRangeToEndpointWithPortMap(String keyspace);
}
}