blob: 55ab20a2192db7c0da923dd481bc835cab078f2c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.simulator.cluster;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.IntStream;
import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.dht.Murmur3Partitioner.LongToken;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.locator.EndpointsForToken;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.NetworkTopologyStrategy;
import org.apache.cassandra.locator.PendingRangeMaps;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.simulator.Action;
import org.apache.cassandra.simulator.ActionList;
import org.apache.cassandra.simulator.ActionListener;
import org.apache.cassandra.simulator.ActionPlan;
import org.apache.cassandra.simulator.Actions;
import org.apache.cassandra.simulator.Debug;
import org.apache.cassandra.simulator.OrderOn.StrictSequential;
import org.apache.cassandra.simulator.systems.SimulatedSystems;
import static java.util.Collections.singleton;
import static java.util.Collections.singletonList;
import static org.apache.cassandra.distributed.api.ConsistencyLevel.LOCAL_SERIAL;
import static org.apache.cassandra.simulator.Debug.EventType.CLUSTER;
import static org.apache.cassandra.simulator.cluster.ClusterActions.TopologyChange.CHANGE_RF;
import static org.apache.cassandra.simulator.cluster.ClusterActions.TopologyChange.JOIN;
import static org.apache.cassandra.simulator.cluster.ClusterActions.TopologyChange.LEAVE;
import static org.apache.cassandra.simulator.cluster.ClusterReliableQueryAction.schemaChange;
public class KeyspaceActions extends ClusterActions
{
final String keyspace;
final String table;
final String createTableCql;
final ConsistencyLevel serialConsistency;
final int[] primaryKeys;
final EnumSet<TopologyChange> ops = EnumSet.noneOf(TopologyChange.class);
final NodeLookup nodeLookup;
final int[] minRf, initialRf, maxRf;
final int[] membersOfQuorumDcs;
// working state
final NodesByDc all;
final NodesByDc prejoin;
final NodesByDc joined;
final NodesByDc left;
final int[] currentRf;
final TokenMetadata tokenMetadata = new TokenMetadata(snitch.get());
Topology topology;
boolean haveChangedVariant;
int topologyChangeCount = 0;
public KeyspaceActions(SimulatedSystems simulated,
String keyspace, String table, String createTableCql,
Cluster cluster,
Options options,
ConsistencyLevel serialConsistency,
ClusterActionListener listener,
int[] primaryKeys,
Debug debug)
{
super(simulated, cluster, options, listener, debug);
this.keyspace = keyspace;
this.table = table;
this.createTableCql = createTableCql;
this.primaryKeys = primaryKeys;
this.serialConsistency = serialConsistency;
this.nodeLookup = simulated.snitch;
int[] dcSizes = new int[options.initialRf.length];
for (int dc : nodeLookup.nodeToDc)
++dcSizes[dc];
this.all = new NodesByDc(nodeLookup, dcSizes);
this.prejoin = new NodesByDc(nodeLookup, dcSizes);
this.joined = new NodesByDc(nodeLookup, dcSizes);
this.left = new NodesByDc(nodeLookup, dcSizes);
for (int i = 1 ; i <= nodeLookup.nodeToDc.length ; ++i)
{
this.prejoin.add(i);
this.all.add(i);
}
minRf = options.minRf;
initialRf = options.initialRf;
maxRf = options.maxRf;
currentRf = initialRf.clone();
membersOfQuorumDcs = serialConsistency == LOCAL_SERIAL ? all.dcs[0] : all.toArray();
ops.addAll(Arrays.asList(options.allChoices.options));
}
public ActionPlan plan()
{
ActionList pre = ActionList.of(pre(createKeyspaceCql(keyspace), createTableCql));
ActionList interleave = stream();
ActionList post = ActionList.empty();
return new ActionPlan(pre, singletonList(interleave), post);
}
@SuppressWarnings("StringConcatenationInLoop")
private String createKeyspaceCql(String keyspace)
{
String createKeyspaceCql = "CREATE KEYSPACE " + keyspace + " WITH replication = {'class': 'NetworkTopologyStrategy'";
for (int i = 0 ; i < options.initialRf.length ; ++i)
createKeyspaceCql += ", '" + snitch.nameOfDc(i) + "': " + options.initialRf[i];
createKeyspaceCql += "};";
return createKeyspaceCql;
}
private Action pre(String createKeyspaceCql, String createTableCql)
{
// randomise initial cluster, and return action to initialise it
for (int dc = 0 ; dc < options.initialRf.length ; ++dc)
{
for (int i = 0 ; i < options.initialRf[dc] ; ++i)
{
int join = prejoin.removeRandom(random, dc);
joined.add(join);
tokenMetadata.updateNormalToken(tokenOf(join), inet(join));
}
}
updateTopology(recomputeTopology());
int[] joined = this.joined.toArray();
int[] prejoin = this.prejoin.toArray();
return Actions.StrictAction.of("Initialize", () -> {
return ActionList.of(initializeCluster(joined, prejoin),
schemaChange("Create Keyspace", KeyspaceActions.this, 1, createKeyspaceCql),
schemaChange("Create Table", KeyspaceActions.this, 1, createTableCql));
});
}
@SuppressWarnings("StatementWithEmptyBody")
private ActionList stream()
{
ActionListener listener = debug.debug(CLUSTER, time, cluster, keyspace, null);
if (listener == null)
return ActionList.of(Actions.stream(new StrictSequential("Cluster Actions"), this::next));
return ActionList.of(Actions.stream(new StrictSequential("Cluster Actions"), () -> {
Action action = next();
if (action != null)
action.register(listener);
return action;
}));
}
private Action next()
{
if (options.topologyChangeLimit >= 0 && topologyChangeCount++ > options.topologyChangeLimit)
return null;
while (!ops.isEmpty() && (!prejoin.isEmpty() || joined.size() > sum(minRf)))
{
if (options.changePaxosVariantTo != null && !haveChangedVariant && random.decide(1f / (1 + prejoin.size())))
{
haveChangedVariant = true;
return schedule(new OnClusterSetPaxosVariant(KeyspaceActions.this, options.changePaxosVariantTo));
}
// pick a dc
int dc = random.uniform(0, currentRf.length);
// try to pick an action (and simply loop again if we cannot for this dc)
TopologyChange next;
if (prejoin.size(dc) > 0 && joined.size(dc) > currentRf[dc]) next = options.allChoices.choose(random);
else if (prejoin.size(dc) > 0 && ops.contains(JOIN)) next = options.choicesNoLeave.choose(random);
else if (joined.size(dc) > currentRf[dc] && ops.contains(LEAVE)) next = options.choicesNoJoin.choose(random);
else if (joined.size(dc) > minRf[dc]) next = CHANGE_RF;
else continue;
// TODO (feature): introduce some time period between cluster actions
switch (next)
{
case REPLACE:
{
Topology before = topology;
int join = prejoin.removeRandom(random, dc);
int leave = joined.selectRandom(random, dc);
joined.add(join);
joined.remove(leave);
left.add(leave);
nodeLookup.setTokenOf(join, nodeLookup.tokenOf(leave));
Collection<Token> token = singleton(tokenOf(leave));
tokenMetadata.addReplaceTokens(token, inet(join), inet(leave));
tokenMetadata.unsafeCalculatePendingRanges(strategy(), keyspace);
Topology during = recomputeTopology();
updateTopology(during);
tokenMetadata.updateNormalTokens(token, inet(join));
tokenMetadata.unsafeCalculatePendingRanges(strategy(), keyspace);
Topology after = recomputeTopology();
Action action = new OnClusterReplace(KeyspaceActions.this, before, during, after, leave, join);
return scheduleAndUpdateTopologyOnCompletion(action, after);
// if replication factor is 2, cannot perform safe replacements
// however can have operations that began earlier during RF=2
// so need to introduce some concept of barriers/ordering/sync points
}
case JOIN:
{
Topology before = topology;
int join = prejoin.removeRandom(random, dc);
joined.add(join);
Collection<Token> token = singleton(tokenOf(join));
tokenMetadata.addBootstrapTokens(token, inet(join));
tokenMetadata.unsafeCalculatePendingRanges(strategy(), keyspace);
Topology during = recomputeTopology();
updateTopology(during);
tokenMetadata.updateNormalTokens(token, inet(join));
tokenMetadata.unsafeCalculatePendingRanges(strategy(), keyspace);
Topology after = recomputeTopology();
Action action = new OnClusterJoin(KeyspaceActions.this, before, during, after, join);
return scheduleAndUpdateTopologyOnCompletion(action, after);
}
case LEAVE:
{
Topology before = topology;
int leave = joined.removeRandom(random, dc);
left.add(leave);
tokenMetadata.addLeavingEndpoint(inet(leave));
tokenMetadata.unsafeCalculatePendingRanges(strategy(), keyspace);
Topology during = recomputeTopology();
updateTopology(during);
tokenMetadata.removeEndpoint(inet(leave));
tokenMetadata.unsafeCalculatePendingRanges(strategy(), keyspace);
Topology after = recomputeTopology();
Action action = new OnClusterLeave(KeyspaceActions.this, before, during, after, leave);
return scheduleAndUpdateTopologyOnCompletion(action, after);
}
case CHANGE_RF:
if (maxRf[dc] == minRf[dc]) {} // cannot perform RF changes at all
if (currentRf[dc] == minRf[dc] && joined.size(dc) == currentRf[dc]) {} // can do nothing until joined grows
else
{
boolean increase;
if (currentRf[dc] == minRf[dc]) // can only grow
{ ++currentRf[dc]; increase = true;}
else if (currentRf[dc] == joined.size(dc) || currentRf[dc] == maxRf[dc]) // can only shrink, and we know currentRf > minRf
{ --currentRf[dc]; increase = false; }
else if (random.decide(0.5f)) // can do either
{ --currentRf[dc]; increase = false; }
else
{ ++currentRf[dc]; increase = true; }
// this isn't used on 4.0+ nodes, but no harm in supplying it anyway
long timestamp = time.nextGlobalMonotonicMicros();
int coordinator = joined.selectRandom(random, dc);
Topology before = topology;
Topology after = recomputeTopology();
return scheduleAndUpdateTopologyOnCompletion(new OnClusterChangeRf(KeyspaceActions.this, timestamp, coordinator, before, after, increase), after);
}
}
}
if (options.changePaxosVariantTo != null && !haveChangedVariant)
{
haveChangedVariant = true;
return schedule(new OnClusterSetPaxosVariant(KeyspaceActions.this, options.changePaxosVariantTo));
}
return null;
}
private Action schedule(Action action)
{
action.setDeadline(time, time.nanoTime() + options.topologyChangeInterval.get(random));
return action;
}
private Action scheduleAndUpdateTopologyOnCompletion(Action action, Topology newTopology)
{
action.register(new ActionListener()
{
@Override
public void before(Action action, Before before)
{
if (before == Before.EXECUTE)
time.forbidDiscontinuities();
}
@Override
public void transitivelyAfter(Action finished)
{
updateTopology(newTopology);
time.permitDiscontinuities();
}
});
return schedule(action);
}
void updateTopology(Topology newTopology)
{
topology = newTopology;
announce(topology);
}
private Topology recomputeTopology()
{
AbstractReplicationStrategy strategy = strategy();
Map<InetSocketAddress, Integer> lookup = Cluster.getUniqueAddressLookup(cluster, i -> i.config().num());
int[][] replicasForKey = new int[primaryKeys.length][];
int[][] pendingReplicasForKey = new int[primaryKeys.length][];
for (int i = 0 ; i < primaryKeys.length ; ++i)
{
int primaryKey = primaryKeys[i];
Token token = new Murmur3Partitioner().getToken(Int32Type.instance.decompose(primaryKey));
replicasForKey[i] = strategy.calculateNaturalReplicas(token, tokenMetadata)
.endpointList().stream().mapToInt(lookup::get).toArray();
PendingRangeMaps pendingRanges = tokenMetadata.getPendingRanges(keyspace);
EndpointsForToken pendingEndpoints = pendingRanges == null ? null : pendingRanges.pendingEndpointsFor(token);
if (pendingEndpoints == null) pendingReplicasForKey[i] = new int[0];
else pendingReplicasForKey[i] = pendingEndpoints.endpointList().stream().mapToInt(lookup::get).toArray();
}
int[] membersOfRing = joined.toArray();
long[] membersOfRingTokens = IntStream.of(membersOfRing).mapToLong(nodeLookup::tokenOf).toArray();
return new Topology(primaryKeys, membersOfRing, membersOfRingTokens, membersOfQuorum(), currentRf.clone(),
quorumRf(), replicasForKey, pendingReplicasForKey);
}
private int quorumRf()
{
if (serialConsistency == LOCAL_SERIAL)
return currentRf[0];
return sum(currentRf);
}
private int[] membersOfQuorum()
{
if (serialConsistency == LOCAL_SERIAL)
return joined.toArray(0);
return joined.toArray();
}
private static int sum(int[] vs)
{
int sum = 0;
for (int v : vs)
sum += v;
return sum;
}
private InetAddressAndPort inet(int node)
{
return InetAddressAndPort.getByAddress(cluster.get(node).config().broadcastAddress());
}
AbstractReplicationStrategy strategy()
{
Map<String, String> rf = new HashMap<>();
for (int i = 0 ; i < snitch.dcCount() ; ++i)
rf.put(snitch.nameOfDc(i), Integer.toString(currentRf[i]));
return new NetworkTopologyStrategy(keyspace, tokenMetadata, snitch.get(), rf);
}
private Token tokenOf(int node)
{
return new LongToken(Long.parseLong(cluster.get(nodeLookup.tokenOf(node)).config().getString("initial_token")));
}
}