Enable EpochAwareDebounce to cancel in-flight RPC requests
Patch by Sam Tunnicliffe; reviewed by Alex Petrov and Marcus Eriksson
for CASSANDRA-19514
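
The core change: EpochAwareDebounce.getAsync() now takes a Function<Promise<LogState>, ClusterMetadata>
instead of a Callable<ClusterMetadata>. The promise backing each remote fetch is tracked in an in-flight
list, so shutdownAndWait() can cancel requests that are still outstanding. A rough caller sketch using the
names introduced in this patch (illustrative only, not part of the diff; `remote`, `target` and
`timeoutNanos` are assumed to be in scope):

    // Debounced catch-up to epoch `target`: the fetch function blocks on the
    // supplied promise, which EpochAwareDebounce can cancel at shutdown.
    Future<ClusterMetadata> future = EpochAwareDebounce.instance.getAsync(
        promise -> fetchLogEntriesAndWaitInternal(promise, remote, target),
        target);
    ClusterMetadata metadata = future.get(timeoutNanos, TimeUnit.NANOSECONDS);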
diff --git a/src/java/org/apache/cassandra/tcm/EpochAwareDebounce.java b/src/java/org/apache/cassandra/tcm/EpochAwareDebounce.java
index 5621845..f65c03d 100644
--- a/src/java/org/apache/cassandra/tcm/EpochAwareDebounce.java
+++ b/src/java/org/apache/cassandra/tcm/EpochAwareDebounce.java
@@ -18,13 +18,23 @@
package org.apache.cassandra.tcm;
-import java.util.concurrent.Callable;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.ExecutorFactory;
import org.apache.cassandra.concurrent.ExecutorPlus;
+import org.apache.cassandra.tcm.log.LogState;
+import org.apache.cassandra.utils.ExecutorUtils;
import org.apache.cassandra.utils.concurrent.AsyncPromise;
import org.apache.cassandra.utils.concurrent.Future;
+import org.apache.cassandra.utils.concurrent.Promise;
/**
* When debouncing from a replica we know exactly which epoch we need, so to avoid retries we
@@ -32,12 +42,13 @@
* comes in, we create a new future. If a request for a newer epoch comes in, we simply
* swap out the current future reference for a new one which is requesting the newer epoch.
*/
-public class EpochAwareDebounce<T>
+public class EpochAwareDebounce
{
- public static final EpochAwareDebounce<ClusterMetadata> instance = new EpochAwareDebounce<>();
-
- private final AtomicReference<EpochAwareAsyncPromise<T>> currentFuture = new AtomicReference<>();
+ private static final Logger logger = LoggerFactory.getLogger(EpochAwareDebounce.class);
+ public static final EpochAwareDebounce instance = new EpochAwareDebounce();
+ private final AtomicReference<EpochAwareAsyncPromise> currentFuture = new AtomicReference<>();
private final ExecutorPlus executor;
+ private final List<Promise<LogState>> inflightRequests = new CopyOnWriteArrayList<>();
private EpochAwareDebounce()
{
@@ -45,24 +56,50 @@
this.executor = ExecutorFactory.Global.executorFactory().pooled("debounce", 2);
}
- public Future<T> getAsync(Callable<T> get, Epoch epoch)
+ /**
+ * Deduplicate requests to catch up log state based on the desired epoch. Callers supply a target epoch and
+ * a function to obtain the ClusterMetadata that corresponds to it. It is expected that this function will make
+ * RPC calls to peers, retrieving a LogState which can be applied locally to produce the necessary {@code
+ * ClusterMetadata}. The function takes a {@code Promise<LogState>} as input, with the expectation that this
+ * specific instance will be used to provide blocking behaviour when making the RPC calls that fetch the {@code
+ * LogState}. These promises are memoized so that they can be cancelled when {@link #shutdownAndWait(long, TimeUnit)}
+ * is called, which causes the fetch function to stop waiting on any in-flight {@code LogState} requests and
+ * prevents shutdown from being blocked.
+ *
+ * @param fetchFunction executes the request for LogState. It's expected that this populates fetchResult with the
+ * successful result.
+ * @param epoch the desired epoch
+ * @return a future which completes with the {@code ClusterMetadata} at or after the desired epoch
+ */
+ public Future<ClusterMetadata> getAsync(Function<Promise<LogState>, ClusterMetadata> fetchFunction,
+ Epoch epoch)
{
while (true)
{
- EpochAwareAsyncPromise<T> running = currentFuture.get();
+ EpochAwareAsyncPromise running = currentFuture.get();
if (running != null && !running.isDone() && running.epoch.isEqualOrAfter(epoch))
return running;
- EpochAwareAsyncPromise<T> promise = new EpochAwareAsyncPromise<>(epoch);
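+ // this promise is handed to the fetch function and tracked in inflightRequests so shutdown can cancel it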
+ Promise<LogState> fetchResult = new AsyncPromise<>();
+
+ EpochAwareAsyncPromise promise = new EpochAwareAsyncPromise(epoch);
if (currentFuture.compareAndSet(running, promise))
{
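+ // once the fetch completes (or fails), drop its promise from the in-flight list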
+ fetchResult.addCallback((logState, error) -> {
+ logger.debug("Removing future remotely requesting epoch {} from in flight list", epoch);
+ inflightRequests.remove(fetchResult);
+ });
+ inflightRequests.add(fetchResult);
+
executor.submit(() -> {
try
{
- promise.setSuccess(get.call());
+ promise.setSuccess(fetchFunction.apply(fetchResult));
}
catch (Throwable t)
{
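+ // cancel the outstanding LogState request and stop tracking it before failing the debounced future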
+ fetchResult.cancel(true);
+ inflightRequests.remove(fetchResult);
promise.setFailure(t);
}
});
@@ -71,7 +108,7 @@
}
}
- private static class EpochAwareAsyncPromise<T> extends AsyncPromise<T>
+ private static class EpochAwareAsyncPromise extends AsyncPromise<ClusterMetadata>
{
private final Epoch epoch;
public EpochAwareAsyncPromise(Epoch epoch)
@@ -79,4 +116,12 @@
this.epoch = epoch;
}
}
+
+ public void shutdownAndWait(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
+ {
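+ // cancel outstanding fetches first so no task is left blocking when the executor shuts down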
+ logger.info("Cancelling {} in flight log fetch requests", inflightRequests.size());
+ for (Promise<LogState> toCancel : inflightRequests)
+ toCancel.cancel(true);
+ ExecutorUtils.shutdownAndWait(timeout, unit, executor);
+ }
}
diff --git a/src/java/org/apache/cassandra/tcm/PeerLogFetcher.java b/src/java/org/apache/cassandra/tcm/PeerLogFetcher.java
index 29c7e6b..61cbc63 100644
--- a/src/java/org/apache/cassandra/tcm/PeerLogFetcher.java
+++ b/src/java/org/apache/cassandra/tcm/PeerLogFetcher.java
@@ -22,6 +22,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,6 +36,7 @@
import org.apache.cassandra.tcm.log.LogState;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.concurrent.Future;
+import org.apache.cassandra.utils.concurrent.Promise;
public class PeerLogFetcher
{
@@ -72,10 +74,11 @@
public Future<ClusterMetadata> asyncFetchLog(InetAddressAndPort remote, Epoch awaitAtleast)
{
- return EpochAwareDebounce.instance.getAsync(() -> fetchLogEntriesAndWaitInternal(remote, awaitAtleast), awaitAtleast);
+ Function<Promise<LogState>, ClusterMetadata> fn = promise -> fetchLogEntriesAndWaitInternal(promise, remote, awaitAtleast);
+ return EpochAwareDebounce.instance.getAsync(fn, awaitAtleast);
}
- private ClusterMetadata fetchLogEntriesAndWaitInternal(InetAddressAndPort remote, Epoch awaitAtleast)
+ private ClusterMetadata fetchLogEntriesAndWaitInternal(Promise<LogState> remoteRequest, InetAddressAndPort remote, Epoch awaitAtleast)
{
Epoch before = ClusterMetadata.current().epoch;
if (before.isEqualOrAfter(awaitAtleast))
@@ -85,11 +88,13 @@
try (Timer.Context ctx = TCMMetrics.instance.fetchPeerLogLatency.time())
{
- LogState logState = RemoteProcessor.sendWithCallback(Verb.TCM_FETCH_PEER_LOG_REQ,
- new FetchPeerLog(before),
- new RemoteProcessor.CandidateIterator(Collections.singletonList(remote)),
- Retry.Deadline.after(DatabaseDescriptor.getCmsAwaitTimeout().to(TimeUnit.NANOSECONDS),
- new Retry.Jitter(TCMMetrics.instance.fetchLogRetries)));
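+ // send asynchronously and block on the promise instead, so the wait can be cancelled at shutdown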
+ RemoteProcessor.sendWithCallbackAsync(remoteRequest,
+ Verb.TCM_FETCH_PEER_LOG_REQ,
+ new FetchPeerLog(before),
+ new RemoteProcessor.CandidateIterator(Collections.singletonList(remote)),
+ Retry.Deadline.after(DatabaseDescriptor.getCmsAwaitTimeout().to(TimeUnit.NANOSECONDS),
+ new Retry.Jitter(TCMMetrics.instance.fetchLogRetries)));
+ LogState logState = remoteRequest.awaitUninterruptibly().get();
log.append(logState);
ClusterMetadata fetched = log.waitForHighestConsecutive();
if (fetched.epoch.isEqualOrAfter(awaitAtleast))
diff --git a/src/java/org/apache/cassandra/tcm/RemoteProcessor.java b/src/java/org/apache/cassandra/tcm/RemoteProcessor.java
index 260d151..a647d4d 100644
--- a/src/java/org/apache/cassandra/tcm/RemoteProcessor.java
+++ b/src/java/org/apache/cassandra/tcm/RemoteProcessor.java
@@ -27,6 +27,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
import java.util.function.Supplier;
import org.slf4j.Logger;
@@ -49,6 +50,7 @@
import org.apache.cassandra.utils.AbstractIterator;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.concurrent.AsyncPromise;
+import org.apache.cassandra.utils.concurrent.Future;
import org.apache.cassandra.utils.concurrent.Promise;
import static org.apache.cassandra.exceptions.ExceptionCode.SERVER_ERROR;
@@ -124,13 +126,19 @@
@Override
public ClusterMetadata fetchLogAndWait(Epoch waitFor, Retry.Deadline retryPolicy)
{
- // Synchonous, non-debounced call if we are waiting for the highest epoch. Should be used sparingly.
+ Function<Promise<LogState>, ClusterMetadata> fetchFunction =
+ promise -> fetchLogAndWaitInternal(promise,
+ new CandidateIterator(candidates(true), false),
+ log);
+ // Synchronous, non-debounced call if we are waiting for the highest epoch (without knowing/caring what it is).
+ // Should be used sparingly.
if (waitFor == null)
- return fetchLogAndWaitInternal();
+ return fetchFunction.apply(new AsyncPromise<>());
try
{
- return EpochAwareDebounce.instance.getAsync(this::fetchLogAndWaitInternal, waitFor).get(retryPolicy.remainingNanos(), TimeUnit.NANOSECONDS);
+ Future<ClusterMetadata> cmFuture = EpochAwareDebounce.instance.getAsync(fetchFunction, waitFor);
+ return cmFuture.get(retryPolicy.remainingNanos(), TimeUnit.NANOSECONDS);
}
catch (InterruptedException e)
{
@@ -142,29 +150,37 @@
}
}
- private ClusterMetadata fetchLogAndWaitInternal()
+ public static ClusterMetadata fetchLogAndWait(CandidateIterator candidateIterator, LocalLog log)
{
- return fetchLogAndWait(new CandidateIterator(candidates(true), false), log);
+ Promise<LogState> remoteRequest = new AsyncPromise<>();
+ return fetchLogAndWaitInternal(remoteRequest, candidateIterator, log);
}
- public static ClusterMetadata fetchLogAndWait(CandidateIterator candidateIterator, LocalLog log)
+ private static ClusterMetadata fetchLogAndWaitInternal(Promise<LogState> remoteRequest,
+ CandidateIterator candidates,
+ LocalLog log)
{
try (Timer.Context ctx = TCMMetrics.instance.fetchCMSLogLatency.time())
{
Epoch currentEpoch = log.metadata().epoch;
- LogState replay = sendWithCallback(Verb.TCM_FETCH_CMS_LOG_REQ,
- new FetchCMSLog(currentEpoch, ClusterMetadataService.state() == REMOTE),
- candidateIterator,
- new Retry.Backoff(TCMMetrics.instance.fetchLogRetries));
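+ // as in PeerLogFetcher, populate the cancellable promise asynchronously and block on it here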
+ sendWithCallbackAsync(remoteRequest,
+ Verb.TCM_FETCH_CMS_LOG_REQ,
+ new FetchCMSLog(currentEpoch, ClusterMetadataService.state() == REMOTE),
+ candidates,
+ new Retry.Backoff(TCMMetrics.instance.fetchLogRetries));
+ LogState replay = remoteRequest.awaitUninterruptibly().get();
if (!replay.isEmpty())
{
logger.info("Replay request returned replay data: {}", replay);
log.append(replay);
TCMMetrics.instance.cmsLogEntriesFetched(currentEpoch, replay.latestEpoch());
}
-
return log.waitForHighestConsecutive();
}
+ catch (InterruptedException | ExecutionException e)
+ {
+ throw new RuntimeException(e);
+ }
}
// todo rename to send with retries or something
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index 25b2cb8..a2b225c 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -131,12 +131,14 @@
import org.apache.cassandra.service.paxos.PaxosState;
import org.apache.cassandra.service.paxos.uncommitted.UncommittedTableData;
import org.apache.cassandra.service.reads.thresholds.CoordinatorWarnings;
+import org.apache.cassandra.service.snapshot.SnapshotManager;
import org.apache.cassandra.streaming.StreamManager;
import org.apache.cassandra.streaming.StreamReceiveTask;
import org.apache.cassandra.streaming.StreamTransferTask;
import org.apache.cassandra.streaming.async.NettyStreamingChannel;
import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.tcm.EpochAwareDebounce;
import org.apache.cassandra.tcm.Startup;
import org.apache.cassandra.tcm.membership.NodeId;
import org.apache.cassandra.tcm.membership.NodeState;
@@ -925,7 +927,8 @@
() -> SSTableReader.shutdownBlocking(1L, MINUTES),
() -> shutdownAndWait(Collections.singletonList(ActiveRepairService.repairCommandExecutor())),
() -> ActiveRepairService.instance().shutdownNowAndWait(1L, MINUTES),
- () -> org.apache.cassandra.service.snapshot.SnapshotManager.shutdownAndWait(1L, MINUTES)
+ () -> SnapshotManager.shutdownAndWait(1L, MINUTES),
+ () -> EpochAwareDebounce.instance.shutdownAndWait(1L, MINUTES)
);
internodeMessagingStarted = false;
diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/CMSCatchupTest.java b/test/distributed/org/apache/cassandra/distributed/test/log/CMSCatchupTest.java
new file mode 100644
index 0000000..3e8cb9c
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/log/CMSCatchupTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.log;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.net.Verb;
+import org.apache.cassandra.tcm.sequences.AddToCMS;
+
+import static org.junit.Assert.assertTrue;
+
+public class CMSCatchupTest extends TestBaseImpl
+{
+ @Test
+ public void testCMSCatchup() throws Exception
+ {
+ try (Cluster cluster = init(builder().withNodes(4)
+ .withConfig(c -> c.with(Feature.NETWORK, Feature.GOSSIP)) // needed for AddToCMS below
+ .start()))
+ {
+ cluster.schemaChange(withKeyspace("create table %s.tbl (id int primary key)"));
+ cluster.get(2).runOnInstance(() -> AddToCMS.initiate());
+ cluster.get(3).runOnInstance(() -> AddToCMS.initiate());
+ // isolate node2 from the other CMS members to ensure it's behind
+ cluster.filters().inbound().from(1).to(2).drop();
+ cluster.filters().inbound().from(3).to(2).drop();
+ AtomicInteger fetchedFromPeer = new AtomicInteger();
+ cluster.filters().inbound().from(2).to(4).messagesMatching((from, to, msg) -> {
+ if (msg.verb() == Verb.TCM_FETCH_PEER_LOG_REQ.id)
+ fetchedFromPeer.getAndIncrement();
+ return false;
+ }).drop().on();
+
+ long mark = cluster.get(4).logs().mark();
+ cluster.coordinator(1).execute(withKeyspace("alter table %s.tbl with comment='test 123'"), ConsistencyLevel.ONE);
+ cluster.get(4).logs().watchFor(mark, "AlterOptions");
+
+ mark = cluster.get(2).logs().mark();
+ cluster.get(1).shutdown().get();
+ cluster.get(2).logs().watchFor(mark, "/127.0.0.1:7012 state jump to shutdown");
+ // node2, a CMS member, is now behind and node1 is shut down.
+ // Try reading at QUORUM from node4, node2 should detect it's behind and catch up from node4
+ int before = fetchedFromPeer.get();
+ cluster.coordinator(4).execute(withKeyspace("select * from %s.tbl where id = 55"), ConsistencyLevel.QUORUM);
+ assertTrue(fetchedFromPeer.get() > before);
+ }
+ }
+
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/FetchLogFromPeers2Test.java b/test/distributed/org/apache/cassandra/distributed/test/log/FetchLogFromPeers2Test.java
new file mode 100644
index 0000000..d3b549d
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/log/FetchLogFromPeers2Test.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.log;
+
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.distributed.test.log.FetchLogFromPeersTest.ClusterState;
+import org.apache.cassandra.metrics.TCMMetrics;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.tcm.Epoch;
+
+import static org.apache.cassandra.distributed.test.log.FetchLogFromPeersTest.*;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class FetchLogFromPeers2Test extends TestBaseImpl
+{
+ @Test
+ public void testSchema() throws Exception
+ {
+ try (Cluster cluster = init(builder().withNodes(3)
+ .start()))
+ {
+ cluster.schemaChange(withKeyspace("alter keyspace %s with replication = {'class':'SimpleStrategy', 'replication_factor':3}"));
+ cluster.schemaChange(withKeyspace("create table %s.tbl (id int primary key)"));
+ cluster.schemaChange(withKeyspace("create table %s.tbl2 (id int primary key)"));
+
+ for (ClusterState clusterState : ClusterState.values())
+ for (Operation operation : Operation.values())
+ {
+ setupSchemaBehind(cluster);
+ runQuery(cluster, clusterState, operation);
+ }
+ }
+ }
+
+ public void runQuery(Cluster cluster, ClusterState clusterState, Operation operation) throws ExecutionException, InterruptedException
+ {
+ cluster.get(1).shutdown().get();
+
+ // node2 is behind
+ String query;
+ switch (operation)
+ {
+ case READ:
+ query = "select * from %s.tbl where id = 5";
+ break;
+ case WRITE:
+ query = "insert into %s.tbl (id) values (5)";
+ break;
+ default:
+ throw new IllegalStateException();
+ }
+ int coordinator = coordinator(clusterState);
+ long mark = cluster.get(2).logs().mark();
+ long metricsBefore = cluster.get(2).callOnInstance(() -> TCMMetrics.instance.fetchedPeerLogEntries.getCount());
+ if (clusterState == ClusterState.COORDINATOR_BEHIND)
+ {
+ long [] coordinatorBehindMetricsBefore = new long[cluster.size()];
+ try
+ {
+ for (int i = 1; i <= cluster.size(); i++)
+ if (!cluster.get(i).isShutdown())
+ coordinatorBehindMetricsBefore[i - 1] = cluster.get(i).callOnInstance(() -> TCMMetrics.instance.coordinatorBehindSchema.getCount());
+ cluster.coordinator(coordinator).execute(withKeyspace(query), ConsistencyLevel.QUORUM);
+ fail("should fail");
+ }
+ catch (Exception ignored) {}
+
+ boolean metricBumped = false;
+ for (int i = 1; i <= cluster.size(); i++)
+ {
+ if (i == coordinator || cluster.get(i).isShutdown())
+ continue;
+ long metricAfter = cluster.get(i).callOnInstance(() -> TCMMetrics.instance.coordinatorBehindSchema.getCount());
+ if (metricAfter - coordinatorBehindMetricsBefore[i - 1] > 0)
+ {
+ metricBumped = true;
+ break;
+ }
+ }
+ assertTrue("Metric CoordinatorBehindSchema should have been bumped for at least one replica", metricBumped);
+
+ }
+ cluster.coordinator(coordinator).execute(withKeyspace(query), ConsistencyLevel.QUORUM);
+ assertTrue(cluster.get(2).logs().grep(mark, "Fetching log from /127.0.0.3:7012").getResult().size() > 0);
+ long metricsAfter = cluster.get(2).callOnInstance(() -> TCMMetrics.instance.fetchedPeerLogEntries.getCount());
+ assertTrue(metricsAfter > metricsBefore);
+
+ cluster.get(1).startup();
+ }
+
+ public void setupSchemaBehind(Cluster cluster)
+ {
+ cluster.filters().reset();
+ cluster.filters().inbound().from(1).to(2).drop();
+ long epochBefore = cluster.get(3).callOnInstance(() -> ClusterMetadata.current().epoch.getEpoch());
+ cluster.coordinator(1).execute(withKeyspace("alter table %s.tbl with comment='test " + UUID.randomUUID() + "'"), ConsistencyLevel.ONE);
+ cluster.get(3).runOnInstance(() -> {
+ try
+ {
+ ClusterMetadataService.instance().awaitAtLeast(Epoch.create(epochBefore).nextEpoch());
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ });
+ cluster.filters().reset();
+ }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/FetchLogFromPeersTest.java b/test/distributed/org/apache/cassandra/distributed/test/log/FetchLogFromPeersTest.java
index fbcd080..6996f0c 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/log/FetchLogFromPeersTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/log/FetchLogFromPeersTest.java
@@ -19,7 +19,6 @@
package org.apache.cassandra.distributed.test.log;
import java.util.UUID;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -28,7 +27,6 @@
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
-import org.apache.cassandra.distributed.api.Feature;
import org.apache.cassandra.distributed.api.IInstanceConfig;
import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.distributed.api.IMessageFilters;
@@ -38,14 +36,12 @@
import org.apache.cassandra.distributed.shared.NetworkTopology;
import org.apache.cassandra.distributed.test.TestBaseImpl;
import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.metrics.TCMMetrics;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.Verb;
import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.tcm.ClusterMetadataService;
import org.apache.cassandra.tcm.Epoch;
import org.apache.cassandra.tcm.log.LogState;
-import org.apache.cassandra.tcm.sequences.AddToCMS;
import org.apache.cassandra.tcm.transformations.TriggerSnapshot;
import static org.junit.Assert.assertTrue;
@@ -53,29 +49,10 @@
public class FetchLogFromPeersTest extends TestBaseImpl
{
- enum ClusterState { COORDINATOR_BEHIND, REPLICA_BEHIND }
- enum Operation { READ, WRITE }
+ public enum ClusterState { COORDINATOR_BEHIND, REPLICA_BEHIND }
+ public enum Operation { READ, WRITE }
- @Test
- public void testSchema() throws Exception
- {
- try (Cluster cluster = init(builder().withNodes(3)
- .start()))
- {
- cluster.schemaChange(withKeyspace("alter keyspace %s with replication = {'class':'SimpleStrategy', 'replication_factor':3}"));
- cluster.schemaChange(withKeyspace("create table %s.tbl (id int primary key)"));
- cluster.schemaChange(withKeyspace("create table %s.tbl2 (id int primary key)"));
-
- for (ClusterState clusterState : ClusterState.values())
- for (Operation operation : Operation.values())
- {
- setupSchemaBehind(cluster);
- runQuery(cluster, clusterState, operation);
- }
- }
- }
-
- public int coordinator(ClusterState clusterState)
+ public static int coordinator(ClusterState clusterState)
{
switch (clusterState)
{
@@ -87,81 +64,6 @@
throw new IllegalStateException();
}
- public void runQuery(Cluster cluster, ClusterState clusterState, Operation operation) throws ExecutionException, InterruptedException
- {
- cluster.get(1).shutdown().get();
-
- // node2 is behind
- String query;
- switch (operation)
- {
- case READ:
- query = "select * from %s.tbl where id = 5";
- break;
- case WRITE:
- query = "insert into %s.tbl (id) values (5)";
- break;
- default:
- throw new IllegalStateException();
- }
- int coordinator = coordinator(clusterState);
- long mark = cluster.get(2).logs().mark();
- long metricsBefore = cluster.get(2).callOnInstance(() -> TCMMetrics.instance.fetchedPeerLogEntries.getCount());
- if (clusterState == ClusterState.COORDINATOR_BEHIND)
- {
- long [] coordinatorBehindMetricsBefore = new long[cluster.size()];
- try
- {
- for (int i = 1; i <= cluster.size(); i++)
- if (!cluster.get(i).isShutdown())
- coordinatorBehindMetricsBefore[i - 1] = cluster.get(i).callOnInstance(() -> TCMMetrics.instance.coordinatorBehindSchema.getCount());
- cluster.coordinator(coordinator).execute(withKeyspace(query), ConsistencyLevel.QUORUM);
- fail("should fail");
- }
- catch (Exception ignored) {}
-
- boolean metricBumped = false;
- for (int i = 1; i <= cluster.size(); i++)
- {
- if (i == coordinator || cluster.get(i).isShutdown())
- continue;
- long metricAfter = cluster.get(i).callOnInstance(() -> TCMMetrics.instance.coordinatorBehindSchema.getCount());
- if (metricAfter - coordinatorBehindMetricsBefore[i - 1] > 0)
- {
- metricBumped = true;
- break;
- }
- }
- assertTrue("Metric CoordinatorBehindSchema should have been bumped for at least one replica", metricBumped);
-
- }
- cluster.coordinator(coordinator).execute(withKeyspace(query), ConsistencyLevel.QUORUM);
- assertTrue(cluster.get(2).logs().grep(mark, "Fetching log from /127.0.0.3:7012").getResult().size() > 0);
- long metricsAfter = cluster.get(2).callOnInstance(() -> TCMMetrics.instance.fetchedPeerLogEntries.getCount());
- assertTrue(metricsAfter > metricsBefore);
-
- cluster.get(1).startup();
- }
-
- public void setupSchemaBehind(Cluster cluster)
- {
- cluster.filters().reset();
- cluster.filters().inbound().from(1).to(2).drop();
- long epochBefore = cluster.get(3).callOnInstance(() -> ClusterMetadata.current().epoch.getEpoch());
- cluster.coordinator(1).execute(withKeyspace("alter table %s.tbl with comment='test " + UUID.randomUUID() + "'"), ConsistencyLevel.ONE);
- cluster.get(3).runOnInstance(() -> {
- try
- {
- ClusterMetadataService.instance().awaitAtLeast(Epoch.create(epochBefore).nextEpoch());
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
- });
- cluster.filters().reset();
- }
-
@Test
public void catchupCoordinatorBehindTestPlacements() throws Exception
{
@@ -263,40 +165,6 @@
}
}
- @Test
- public void testCMSCatchupTest() throws Exception
- {
- try (Cluster cluster = init(builder().withNodes(4)
- .withConfig(c -> c.with(Feature.NETWORK, Feature.GOSSIP)) // needed for addtocms below
- .start()))
- {
- cluster.schemaChange(withKeyspace("create table %s.tbl (id int primary key)"));
- cluster.get(2).runOnInstance(() -> AddToCMS.initiate());
- cluster.get(3).runOnInstance(() -> AddToCMS.initiate());
- // isolate node2 from the other CMS members to ensure it's behind
- cluster.filters().inbound().from(1).to(2).drop();
- cluster.filters().inbound().from(3).to(2).drop();
- AtomicInteger fetchedFromPeer = new AtomicInteger();
- cluster.filters().inbound().from(2).to(4).messagesMatching((from, to, msg) -> {
- if (msg.verb() == Verb.TCM_FETCH_PEER_LOG_REQ.id)
- fetchedFromPeer.getAndIncrement();
- return false;
- }).drop().on();
-
- long mark = cluster.get(4).logs().mark();
- cluster.coordinator(1).execute(withKeyspace("alter table %s.tbl with comment='test 123'"), ConsistencyLevel.ONE);
- cluster.get(4).logs().watchFor(mark, "AlterOptions");
-
- mark = cluster.get(2).logs().mark();
- cluster.get(1).shutdown().get();
- cluster.get(2).logs().watchFor(mark, "/127.0.0.1:7012 state jump to shutdown");
- // node2, a CMS member, is now behind and node1 is shut down.
- // Try reading at QUORUM from node4, node2 should detect it's behind and catch up from node4
- int before = fetchedFromPeer.get();
- cluster.coordinator(4).execute(withKeyspace("select * from %s.tbl where id = 55"), ConsistencyLevel.QUORUM);
- assertTrue(fetchedFromPeer.get() > before);
- }
- }
@Test
public void catchupWithSnapshot() throws Exception
@@ -370,7 +238,6 @@
}
}
-
private static void executeAlters(Cluster cluster)
{
for (int i = 0; i < 10; i++)