Merge branch 'cassandra-2.1' into cassandra-2.2
diff --git a/CHANGES.txt b/CHANGES.txt
index bca5fb0..787d145 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -18,6 +18,7 @@
* Reduce contention getting instances of CompositeType (CASSANDRA-10433)
* Fix IllegalArgumentException in DataOutputBuffer.reallocate for large buffers (CASSANDRA-10592)
Merged from 2.1:
+ * Fix incremental repair hang when replica is down (CASSANDRA-10288)
* Avoid writing range tombstones after END_OF_ROW marker (CASSANDRA-10791)
* Optimize the way we check if a token is repaired in anticompaction (CASSANDRA-10768)
* Add proper error handling to stream receiver (CASSANDRA-10774)
diff --git a/src/java/org/apache/cassandra/repair/AnticompactionTask.java b/src/java/org/apache/cassandra/repair/AnticompactionTask.java
index 16de071..8ecae23 100644
--- a/src/java/org/apache/cassandra/repair/AnticompactionTask.java
+++ b/src/java/org/apache/cassandra/repair/AnticompactionTask.java
@@ -17,6 +17,7 @@
*/
package org.apache.cassandra.repair;
+import java.io.IOException;
import java.net.InetAddress;
import java.util.Collection;
import java.util.UUID;
@@ -27,6 +28,7 @@
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.net.IAsyncCallbackWithFailure;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessagingService;
@@ -54,17 +56,24 @@
public void run()
{
- AnticompactionRequest acr = new AnticompactionRequest(parentSession, successfulRanges);
- CassandraVersion peerVersion = SystemKeyspace.getReleaseVersion(neighbor);
- if (peerVersion != null && peerVersion.compareTo(VERSION_CHECKER) > 0)
+ if (FailureDetector.instance.isAlive(neighbor))
{
- MessagingService.instance().sendRR(acr.createMessage(), neighbor, new AnticompactionCallback(this), TimeUnit.DAYS.toMillis(1), true);
+ AnticompactionRequest acr = new AnticompactionRequest(parentSession, successfulRanges);
+ CassandraVersion peerVersion = SystemKeyspace.getReleaseVersion(neighbor);
+ if (peerVersion != null && peerVersion.compareTo(VERSION_CHECKER) > 0)
+ {
+ MessagingService.instance().sendRR(acr.createMessage(), neighbor, new AnticompactionCallback(this), TimeUnit.DAYS.toMillis(1), true);
+ }
+ else
+ {
+ MessagingService.instance().sendOneWay(acr.createMessage(), neighbor);
+ // immediately return after sending request
+ set(neighbor);
+ }
}
else
{
- MessagingService.instance().sendOneWay(acr.createMessage(), neighbor);
- // immediately return after sending request
- set(neighbor);
+ setException(new IOException(neighbor + " is down"));
}
}
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 0cb4252..61f4196 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -267,12 +267,21 @@
for (InetAddress neighbour : endpoints)
{
- CassandraVersion peerVersion = SystemKeyspace.getReleaseVersion(neighbour);
- boolean isGlobal = options.isGlobal() && peerVersion != null && peerVersion.compareTo(SUPPORTS_GLOBAL_PREPARE_FLAG_VERSION) >= 0;
- logger.debug("Sending prepare message: options.isGlobal = {}, peerVersion = {}", options.isGlobal(), peerVersion);
- PrepareMessage message = new PrepareMessage(parentRepairSession, cfIds, options.getRanges(), options.isIncremental(), isGlobal);
- MessageOut<RepairMessage> msg = message.createMessage();
- MessagingService.instance().sendRR(msg, neighbour, callback, TimeUnit.HOURS.toMillis(1), true);
+ if (FailureDetector.instance.isAlive(neighbour))
+ {
+ CassandraVersion peerVersion = SystemKeyspace.getReleaseVersion(neighbour);
+ boolean isGlobal = options.isGlobal() && peerVersion != null && peerVersion.compareTo(SUPPORTS_GLOBAL_PREPARE_FLAG_VERSION) >= 0;
+ logger.debug("Sending prepare message: options.isGlobal = {}, peerVersion = {}", options.isGlobal(), peerVersion);
+ PrepareMessage message = new PrepareMessage(parentRepairSession, cfIds, options.getRanges(), options.isIncremental(), isGlobal);
+ MessageOut<RepairMessage> msg = message.createMessage();
+ MessagingService.instance().sendRR(msg, neighbour, callback, TimeUnit.HOURS.toMillis(1), true);
+ }
+ else
+ {
+ status.set(false);
+ failedNodes.add(neighbour.getHostAddress());
+ prepareLatch.countDown();
+ }
}
try
{