blob: 8afdd3ed36b23c9be4f250c6ba8cf4d499489d5a [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.cassandra.simulator;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.BufferDecoratedKey;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.IIsolatedExecutor.TriFunction;
import org.apache.cassandra.gms.EndpointState;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.ReplicaLayout;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
import static java.util.function.Function.identity;
import static org.apache.cassandra.simulator.Action.Modifier.INFO;
import static org.apache.cassandra.simulator.Action.Modifier.WAKEUP;
import static org.apache.cassandra.simulator.ActionListener.runAfter;
import static org.apache.cassandra.simulator.ActionListener.runAfterAndTransitivelyAfter;
import static org.apache.cassandra.simulator.ActionListener.recursive;
import static org.apache.cassandra.simulator.Debug.EventType.*;
import static org.apache.cassandra.simulator.Debug.Info.LOG;
import static org.apache.cassandra.simulator.Debug.Level.*;
import static org.apache.cassandra.simulator.paxos.Ballots.paxosDebugInfo;
// TODO (feature): move logging to a depth parameter
// TODO (feature): log only deltas for schema/cluster data
public class Debug
private static final Logger logger = LoggerFactory.getLogger(Debug.class);
public enum EventType { PARTITION, CLUSTER }
public enum Level
private static final Level[] LEVELS = values();
public enum Info
public final EventType[] defaultEventTypes;
Info(EventType ... defaultEventTypes)
this.defaultEventTypes = defaultEventTypes;
public static class Levels
private final EnumMap<EventType, Level> levels;
public Levels(EnumMap<EventType, Level> levels)
this.levels = levels;
public Levels(Level level, EventType ... types)
this.levels = new EnumMap<>(EventType.class);
for (EventType type : types)
this.levels.put(type, level);
public Levels(int partition, int cluster)
this.levels = new EnumMap<>(EventType.class);
if (partition > 0) this.levels.put(PARTITION, Level.LEVELS[partition - 1]);
if (cluster > 0) this.levels.put(CLUSTER, Level.LEVELS[cluster - 1]);
Level get(EventType type)
return levels.get(type);
boolean anyMatch(Predicate<Level> test)
return levels.values().stream().anyMatch(test);
private final EnumMap<Info, Levels> levels;
public final int[] primaryKeys;
public Debug()
this(new EnumMap<>(Info.class), null);
public Debug(Map<Info, Levels> levels, int[] primaryKeys)
this.levels = new EnumMap<>(levels);
this.primaryKeys = primaryKeys;
public ActionListener debug(EventType type, SimulatedTime time, Cluster cluster, String keyspace, Integer primaryKey)
List<ActionListener> listeners = new ArrayList<>();
for (Map.Entry<Info, Levels> e : levels.entrySet())
Info info = e.getKey();
Level level = e.getValue().get(type);
if (level == null) continue;
ActionListener listener;
if (info == LOG)
Function<ActionListener, ActionListener> adapt = type == CLUSTER ? LogTermination::new : identity();
switch (level)
default: throw new AssertionError();
case PLANNED: listener = adapt.apply(new LogOne(time, false)); break;
case CONSEQUENCES: case ALL: listener = adapt.apply(recursive(new LogOne(time, true))); break;
else if (keyspace != null)
Consumer<Action> debug;
switch (info)
default: throw new AssertionError();
case GOSSIP: debug = debugGossip(cluster); break;
case RF: debug = debugRf(cluster, keyspace); break;
case RING: debug = debugRing(cluster, keyspace); break;
case PAXOS: debug = forKeys(cluster, keyspace, primaryKey, Debug::debugPaxos); break;
case OWNERSHIP: debug = forKeys(cluster, keyspace, primaryKey, Debug::debugOwnership); break;
switch (level)
default: throw new AssertionError();
case PLANNED: listener = type == CLUSTER ? runAfterAndTransitivelyAfter(debug) : runAfter(debug); break;
case CONSEQUENCES: listener = recursive(runAfter(ignoreWakeupAndLogEvents(debug))); break;
case ALL: listener = recursive(runAfter(ignoreLogEvents(debug))); break;
else continue;
if (listeners.isEmpty())
return null;
return new ActionListener.Combined(listeners);
public boolean isOn(Info info)
return isOn(info, PLANNED);
public boolean isOn(Info info, Level level)
Levels levels = this.levels.get(info);
if (levels == null) return false;
return levels.anyMatch(test -> level.compareTo(test) >= 0);
private static class LogOne implements ActionListener
final SimulatedTime time;
final boolean logConsequences;
private LogOne(SimulatedTime time, boolean logConsequences)
this.time = time;
this.logConsequences = logConsequences;
public void before(Action action, Before before)
if (logger.isWarnEnabled()) // invoke toString() eagerly to ensure we have the task's descriptin
logger.warn(String.format("%6ds %s %s", TimeUnit.NANOSECONDS.toSeconds(time.nanoTime()), before, action));
public void consequences(ActionList consequences)
if (logConsequences && !consequences.isEmpty() && logger.isWarnEnabled())
logger.warn(String.format("%6ds Next: %s", TimeUnit.NANOSECONDS.toSeconds(time.nanoTime()), consequences));
private static class LogTermination extends ActionListener.Wrapped
public LogTermination(ActionListener wrap)
public void transitivelyAfter(Action finished)
logger.warn("Terminated {}", finished);
private static Consumer<Action> ignoreWakeupAndLogEvents(Consumer<Action> consumer)
return action -> {
if (! && !
private static Consumer<Action> ignoreLogEvents(Consumer<Action> consumer)
return action -> {
if (!
private Consumer<Action> debugGossip(Cluster cluster)
return ignore -> {
cluster.forEach(i -> i.unsafeRunOnThisThread(() -> {
for (InetAddressAndPort ep : Gossiper.instance.getLiveMembers())
EndpointState epState = Gossiper.instance.getEndpointStateForEndpoint(ep);
logger.warn("Gossip {}: {} {}", ep, epState.isAlive(), epState.states().stream()
.map(e -> e.getKey().toString() + "=(" + e.getValue().value + ',' + e.getValue().version + ')')
.collect(Collectors.joining(", ", "[", "]")));
private Consumer<Action> forKeys(Cluster cluster, String keyspace, @Nullable Integer specificPrimaryKey, TriFunction<Cluster, String, Integer, Consumer<Action>> factory)
if (specificPrimaryKey != null) return factory.apply(cluster, keyspace, specificPrimaryKey);
else return forEachKey(cluster, keyspace, primaryKeys, Debug::debugPaxos);
public static Consumer<Action> forEachKey(Cluster cluster, String keyspace, int[] primaryKeys, TriFunction<Cluster, String, Integer, Consumer<Action>> factory)
Consumer<Action>[] eachKey = new Consumer[primaryKeys.length];
for (int i = 0 ; i < primaryKeys.length ; ++i)
eachKey[i] = factory.apply(cluster, keyspace, primaryKeys[i]);
return action -> {
for (Consumer<Action> run : eachKey)
public static Consumer<Action> debugPaxos(Cluster cluster, String keyspace, int primaryKey)
return ignore -> {
for (int node = 1 ; node <= cluster.size() ; ++node)
cluster.get(node).unsafeAcceptOnThisThread((num, pkint) -> {
TableMetadata metadata ="tbl").metadata.get();
ByteBuffer pkbb = Int32Type.instance.decompose(pkint);
DecoratedKey key = new BufferDecoratedKey(DatabaseDescriptor.getPartitioner().getToken(pkbb), pkbb);
logger.warn("node{}({}): {}", num, primaryKey, paxosDebugInfo(key, metadata, FBUtilities.nowInSeconds()));
catch (Throwable t)
logger.warn("node{}({})", num, primaryKey, t);
}, node, primaryKey);
public static Consumer<Action> debugRf(Cluster cluster, String keyspace)
return ignore -> {
cluster.forEach(i -> i.unsafeRunOnThisThread(() -> {
logger.warn("{} {}",
Schema.instance.getKeyspaceMetadata(keyspace) == null ? "" : Schema.instance.getKeyspaceMetadata(keyspace).params.replication.toString(),
Schema.instance.getKeyspaceMetadata(keyspace) == null ? "" :;
public static Consumer<Action> debugOwnership(Cluster cluster, String keyspace, int primaryKey)
return ignore -> {
for (int node = 1 ; node <= cluster.size() ; ++node)
logger.warn("node{}({}): {}", node, primaryKey, cluster.get(node).unsafeApplyOnThisThread(v -> {
return ReplicaLayout.forTokenWriteLiveAndDown(, Murmur3Partitioner.instance.getToken(Int32Type.instance.decompose(v))).all().endpointList().toString();
catch (Throwable t)
return "Error";
}, primaryKey));
public static Consumer<Action> debugRing(Cluster cluster, String keyspace)
return ignore -> cluster.forEach(i -> i.unsafeRunOnThisThread(() -> {
if (Schema.instance.getKeyspaceMetadata(keyspace) != null)
logger.warn("{}", StorageService.instance.getTokenMetadata().toString());