blob: cae6eefef807ee4024c4eda5fa4da72ae01b4ed9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.distributed.test;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.IInstance;
import org.apache.cassandra.distributed.api.IInstanceConfig;
import org.apache.cassandra.distributed.api.IMessageFilters;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.EndpointState;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.gms.HeartBeatState;
import org.apache.cassandra.gms.VersionedValue;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.Verb;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.PendingRangeCalculatorService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.service.paxos.PaxosRepair;
import org.apache.cassandra.utils.FBUtilities;
import static org.apache.cassandra.db.ConsistencyLevel.SERIAL;
import static org.apache.cassandra.net.Verb.PAXOS2_PREPARE_REQ;
import static org.apache.cassandra.net.Verb.PAXOS2_PROPOSE_REQ;
import static org.apache.cassandra.net.Verb.PAXOS2_REPAIR_REQ;
import static org.apache.cassandra.net.Verb.PAXOS_COMMIT_REQ;
import static org.apache.cassandra.net.Verb.PAXOS_PREPARE_REQ;
import static org.apache.cassandra.net.Verb.PAXOS_PROPOSE_REQ;
import static org.apache.cassandra.net.Verb.READ_REQ;
public abstract class CASTestBase extends TestBaseImpl
{
private static final Logger logger = LoggerFactory.getLogger(CASTestBase.class);
static final AtomicInteger TABLE_COUNTER = new AtomicInteger(0);
static String tableName()
{
return tableName("tbl");
}
static String tableName(String prefix)
{
return prefix + TABLE_COUNTER.getAndIncrement();
}
static void repair(Cluster cluster, String tableName, int pk, int repairWith, int repairWithout)
{
IMessageFilters.Filter filter = cluster.filters().verbs(
PAXOS2_REPAIR_REQ.id,
PAXOS2_PREPARE_REQ.id, PAXOS_PREPARE_REQ.id, READ_REQ.id).from(repairWith).to(repairWithout).drop();
cluster.get(repairWith).runOnInstance(() -> {
TableMetadata schema = Keyspace.open(KEYSPACE).getColumnFamilyStore(tableName).metadata.get();
DecoratedKey key = schema.partitioner.decorateKey(Int32Type.instance.decompose(pk));
try
{
PaxosRepair.create(SERIAL, key, null, schema).start().await();
}
catch (Throwable t)
{
throw new RuntimeException(t);
}
});
filter.off();
}
static int pk(Cluster cluster, int lb, int ub)
{
return pk(cluster.get(lb), cluster.get(ub));
}
static int pk(IInstance lb, IInstance ub)
{
return pk(Murmur3Partitioner.instance.getTokenFactory().fromString(lb.config().getString("initial_token")),
Murmur3Partitioner.instance.getTokenFactory().fromString(ub.config().getString("initial_token")));
}
static int pk(Token lb, Token ub)
{
int pk = 0;
Token pkt;
while (lb.compareTo(pkt = Murmur3Partitioner.instance.getToken(Int32Type.instance.decompose(pk))) >= 0 || ub.compareTo(pkt) < 0)
++pk;
return pk;
}
int[] to(int ... nodes)
{
return nodes;
}
private static final IMessageFilters.Matcher LOG_DROPPED = (from, to, message) -> { logger.info("Dropping {} from {} to {}", Verb.fromId(message.verb()), from, to); return true; };
AutoCloseable drop(Cluster cluster, int from, int[] toPrepareAndRead, int[] toPropose, int[] toCommit)
{
IMessageFilters.Filter filter1 = cluster.filters().verbs(PAXOS2_PREPARE_REQ.id, PAXOS_PREPARE_REQ.id, READ_REQ.id).from(from).to(toPrepareAndRead).messagesMatching(LOG_DROPPED).drop();
IMessageFilters.Filter filter2 = cluster.filters().verbs(PAXOS2_PROPOSE_REQ.id, PAXOS_PROPOSE_REQ.id).from(from).to(toPropose).messagesMatching(LOG_DROPPED).drop();
IMessageFilters.Filter filter3 = cluster.filters().verbs(PAXOS_COMMIT_REQ.id).from(from).to(toCommit).messagesMatching(LOG_DROPPED).drop();
return () -> {
filter1.off();
filter2.off();
filter3.off();
};
}
AutoCloseable drop(Cluster cluster, int from, int[] toPrepare, int[] toRead, int[] toPropose, int[] toCommit)
{
IMessageFilters.Filter filter1 = cluster.filters().verbs(PAXOS2_PREPARE_REQ.id, PAXOS_PREPARE_REQ.id).from(from).to(toPrepare).drop();
IMessageFilters.Filter filter2 = cluster.filters().verbs(READ_REQ.id).from(from).to(toRead).drop();
IMessageFilters.Filter filter3 = cluster.filters().verbs(PAXOS2_PROPOSE_REQ.id, PAXOS_PROPOSE_REQ.id).from(from).to(toPropose).drop();
IMessageFilters.Filter filter4 = cluster.filters().verbs(PAXOS_COMMIT_REQ.id).from(from).to(toCommit).drop();
return () -> {
filter1.off();
filter2.off();
filter3.off();
filter4.off();
};
}
public static void addToRing(boolean bootstrapping, IInstance peer)
{
try
{
IInstanceConfig config = peer.config();
IPartitioner partitioner = FBUtilities.newPartitioner(config.getString("partitioner"));
Token token = partitioner.getTokenFactory().fromString(config.getString("initial_token"));
InetAddressAndPort address = InetAddressAndPort.getByAddress(peer.broadcastAddress());
UUID hostId = config.hostId();
Gossiper.runInGossipStageBlocking(() -> {
Gossiper.instance.initializeNodeUnsafe(address, hostId, 1);
Gossiper.instance.injectApplicationState(address,
ApplicationState.TOKENS,
new VersionedValue.VersionedValueFactory(partitioner).tokens(Collections.singleton(token)));
VersionedValue status = bootstrapping
? new VersionedValue.VersionedValueFactory(partitioner).bootstrapping(Collections.singleton(token))
: new VersionedValue.VersionedValueFactory(partitioner).normal(Collections.singleton(token));
Gossiper.instance.injectApplicationState(address, ApplicationState.STATUS, status);
StorageService.instance.onChange(address, ApplicationState.STATUS, status);
Gossiper.instance.realMarkAlive(address, Gossiper.instance.getEndpointStateForEndpoint(address));
});
int version = Math.min(MessagingService.current_version, peer.getMessagingVersion());
MessagingService.instance().versions.set(address, version);
if (!bootstrapping)
assert StorageService.instance.getTokenMetadata().isMember(address);
PendingRangeCalculatorService.instance.blockUntilFinished();
}
catch (Throwable e) // UnknownHostException
{
throw new RuntimeException(e);
}
}
public static void assertVisibleInRing(IInstance peer)
{
InetAddressAndPort endpoint = InetAddressAndPort.getByAddress(peer.broadcastAddress());
Assert.assertTrue(Gossiper.instance.isAlive(endpoint));
}
// reset gossip state so we know of the node being alive only
public static void removeFromRing(IInstance peer)
{
try
{
IInstanceConfig config = peer.config();
IPartitioner partitioner = FBUtilities.newPartitioner(config.getString("partitioner"));
Token token = partitioner.getTokenFactory().fromString(config.getString("initial_token"));
InetAddressAndPort address = InetAddressAndPort.getByAddress(config.broadcastAddress());
Gossiper.runInGossipStageBlocking(() -> {
StorageService.instance.onChange(address,
ApplicationState.STATUS,
new VersionedValue.VersionedValueFactory(partitioner).left(Collections.singleton(token), 0L, 0));
Gossiper.instance.unsafeAnnulEndpoint(address);
Gossiper.instance.realMarkAlive(address, new EndpointState(new HeartBeatState(0, 0)));
});
PendingRangeCalculatorService.instance.blockUntilFinished();
}
catch (Throwable e) // UnknownHostException
{
throw new RuntimeException(e);
}
}
public static void assertNotVisibleInRing(IInstance peer)
{
InetAddressAndPort endpoint = InetAddressAndPort.getByAddress(peer.broadcastAddress());
Assert.assertFalse(Gossiper.instance.isAlive(endpoint));
}
public static void addToRingNormal(IInstance peer)
{
addToRing(false, peer);
assert StorageService.instance.getTokenMetadata().isMember(InetAddressAndPort.getByAddress(peer.broadcastAddress()));
}
public static void addToRingBootstrapping(IInstance peer)
{
addToRing(true, peer);
}
}