blob: 519625e76194a01c54934d9f7bd747265c3f2254 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.distributed.upgrade;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import com.vdurmont.semver4j.Semver;
import org.apache.cassandra.distributed.UpgradeableCluster;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.api.IUpgradeableInstance;
import static java.lang.String.format;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.cassandra.distributed.api.ConsistencyLevel.ALL;
import static org.apache.cassandra.distributed.api.ConsistencyLevel.ONE;
import static org.apache.cassandra.distributed.api.ConsistencyLevel.QUORUM;
import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
import static org.apache.cassandra.distributed.shared.AssertUtils.row;
/**
 * Base class for upgrade tests that read previously written rows at several
 * consistency levels after a node of a three-node cluster has been upgraded.
 */
public class MixedModeConsistencyTestBase extends UpgradeTestBase
{
    /**
     * Runs the consistency scenario upgrading from {@code initial} to the current version.
     *
     * During setup the table is created and every {@link Tester} writes its rows; after
     * each node upgrade every tester reads its rows back through each coordinator.
     *
     * @param initial the version the cluster starts from
     * @throws Throwable whatever the underlying test case run propagates
     */
    protected static void testConsistency(Semver initial) throws Throwable
    {
        // Testers are built in the same order as their (replicas, consistency level) pairs are listed.
        List<Tester> testers = Stream.of(Tester.create(1, ALL),
                                         Tester.create(2, ALL, QUORUM),
                                         Tester.create(3, ALL, QUORUM, ONE))
                                     .flatMap(List::stream)
                                     .collect(Collectors.toList());

        long requestTimeoutMillis = SECONDS.toMillis(30);

        new TestCase()
        .nodes(3)
        .nodesToUpgrade(1)
        .upgradesToCurrentFrom(initial)
        .withConfig(config -> config.set("read_request_timeout_in_ms", requestTimeoutMillis)
                                    .set("write_request_timeout_in_ms", requestTimeoutMillis))
        .setup(cluster -> {
            Tester.createTable(cluster);
            testers.forEach(tester -> tester.writeRows(cluster));
        })
        .runAfterNodeUpgrade((cluster, node) -> {
            testers.forEach(tester -> tester.readRows(cluster));
        })
        .run();
    }

    /**
     * One scenario: a partition written through {@code executeInternal} on the first
     * {@code numWrittenReplicas} nodes, later read at {@code readConsistencyLevel}.
     * Each instance uses its own random partition key.
     */
    private static class Tester
    {
        private final int numWrittenReplicas;
        private final ConsistencyLevel readConsistencyLevel;
        private final UUID partitionKey;

        private Tester(int numWrittenReplicas, ConsistencyLevel readConsistencyLevel)
        {
            this.numWrittenReplicas = numWrittenReplicas;
            this.readConsistencyLevel = readConsistencyLevel;
            this.partitionKey = UUID.randomUUID();
        }

        /**
         * Builds one tester per given read consistency level, all sharing the same
         * number of written replicas.
         *
         * @param numWrittenReplicas how many nodes (starting at node 1) receive the writes
         * @param readConsistencyLevels the consistency levels to read with
         * @return the testers, in the order of {@code readConsistencyLevels}
         */
        private static List<Tester> create(int numWrittenReplicas, ConsistencyLevel... readConsistencyLevels)
        {
            List<Tester> created = new ArrayList<>();
            for (ConsistencyLevel level : readConsistencyLevels)
            {
                created.add(new Tester(numWrittenReplicas, level));
            }
            return created;
        }

        /**
         * Creates the table {@code t} shared by all testers.
         *
         * @param cluster the cluster on which the schema change is issued
         */
        private static void createTable(UpgradeableCluster cluster)
        {
            String ddl = withKeyspace("CREATE TABLE %s.t (k uuid, c int, v int, PRIMARY KEY (k, c))");
            cluster.schemaChange(ddl);
        }

        /**
         * Inserts the rows (1, 10), (2, 20) and (3, 30) for this tester's partition
         * on each of the nodes 1..{@code numWrittenReplicas}.
         *
         * @param cluster the cluster whose nodes receive the inserts
         */
        private void writeRows(UpgradeableCluster cluster)
        {
            String insert = withKeyspace("INSERT INTO %s.t (k, c, v) VALUES (?, ?, ?)");
            for (int nodeIndex = 1; nodeIndex <= numWrittenReplicas; nodeIndex++)
            {
                IUpgradeableInstance target = cluster.get(nodeIndex);
                // clustering values 1..3, each paired with ten times its value
                for (int clustering = 1; clustering <= 3; clustering++)
                {
                    target.executeInternal(insert, partitionKey, clustering, clustering * 10);
                }
            }
        }

        /**
         * Reads this tester's partition through every coordinator of the cluster at
         * {@code readConsistencyLevel} and asserts that all three rows are returned.
         *
         * @param cluster the cluster to read from
         * @throws AssertionError wrapping any failure, with the number of written
         *         replicas, the consistency level and the coordinator in the message
         */
        private void readRows(UpgradeableCluster cluster)
        {
            String select = withKeyspace("SELECT * FROM %s.t WHERE k = ?");
            // Kept outside the try so the catch block can report which coordinator failed.
            int coordinator = 1;
            try
            {
                while (coordinator <= cluster.size())
                {
                    assertRows(cluster.coordinator(coordinator).execute(select, readConsistencyLevel, partitionKey),
                               row(partitionKey, 1, 10),
                               row(partitionKey, 2, 20),
                               row(partitionKey, 3, 30));
                    coordinator++;
                }
            }
            catch (Throwable cause)
            {
                String template = "Unexpected error reading rows with %d written replicas, CL=%s and coordinator=%s";
                String message = format(template, numWrittenReplicas, readConsistencyLevel, coordinator);
                throw new AssertionError(message, cause);
            }
        }
    }
}