Merge branch 'cassandra-2.1' into cassandra-2.2
diff --git a/CHANGES.txt b/CHANGES.txt
index bca5fb0..787d145 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -18,6 +18,7 @@
  * Reduce contention getting instances of CompositeType (CASSANDRA-10433)
  * Fix IllegalArgumentException in DataOutputBuffer.reallocate for large buffers (CASSANDRA-10592)
 Merged from 2.1:
+ * Fix incremental repair hang when replica is down (CASSANDRA-10288)
  * Avoid writing range tombstones after END_OF_ROW marker (CASSANDRA-10791)
  * Optimize the way we check if a token is repaired in anticompaction (CASSANDRA-10768)
  * Add proper error handling to stream receiver (CASSANDRA-10774)
diff --git a/src/java/org/apache/cassandra/repair/AnticompactionTask.java b/src/java/org/apache/cassandra/repair/AnticompactionTask.java
index 16de071..8ecae23 100644
--- a/src/java/org/apache/cassandra/repair/AnticompactionTask.java
+++ b/src/java/org/apache/cassandra/repair/AnticompactionTask.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.repair;
 
+import java.io.IOException;
 import java.net.InetAddress;
 import java.util.Collection;
 import java.util.UUID;
@@ -27,6 +28,7 @@
 import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.net.IAsyncCallbackWithFailure;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessagingService;
@@ -54,17 +56,24 @@
 
     public void run()
     {
-        AnticompactionRequest acr = new AnticompactionRequest(parentSession, successfulRanges);
-        CassandraVersion peerVersion = SystemKeyspace.getReleaseVersion(neighbor);
-        if (peerVersion != null && peerVersion.compareTo(VERSION_CHECKER) > 0)
+        if (FailureDetector.instance.isAlive(neighbor))
         {
-            MessagingService.instance().sendRR(acr.createMessage(), neighbor, new AnticompactionCallback(this), TimeUnit.DAYS.toMillis(1), true);
+            AnticompactionRequest acr = new AnticompactionRequest(parentSession, successfulRanges);
+            CassandraVersion peerVersion = SystemKeyspace.getReleaseVersion(neighbor);
+            if (peerVersion != null && peerVersion.compareTo(VERSION_CHECKER) > 0)
+            {
+                MessagingService.instance().sendRR(acr.createMessage(), neighbor, new AnticompactionCallback(this), TimeUnit.DAYS.toMillis(1), true);
+            }
+            else
+            {
+                MessagingService.instance().sendOneWay(acr.createMessage(), neighbor);
+                // one-way send registers no callback, so complete this task right after sending
+                set(neighbor);
+            }
         }
         else
         {
-            MessagingService.instance().sendOneWay(acr.createMessage(), neighbor);
-            // immediately return after sending request
-            set(neighbor);
+            setException(new IOException(neighbor + " is down"));
         }
     }
 
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 0cb4252..61f4196 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -267,12 +267,21 @@
 
         for (InetAddress neighbour : endpoints)
         {
-            CassandraVersion peerVersion = SystemKeyspace.getReleaseVersion(neighbour);
-            boolean isGlobal = options.isGlobal() && peerVersion != null && peerVersion.compareTo(SUPPORTS_GLOBAL_PREPARE_FLAG_VERSION) >= 0;
-            logger.debug("Sending prepare message: options.isGlobal = {}, peerVersion = {}", options.isGlobal(), peerVersion);
-            PrepareMessage message = new PrepareMessage(parentRepairSession, cfIds, options.getRanges(), options.isIncremental(), isGlobal);
-            MessageOut<RepairMessage> msg = message.createMessage();
-            MessagingService.instance().sendRR(msg, neighbour, callback, TimeUnit.HOURS.toMillis(1), true);
+            if (FailureDetector.instance.isAlive(neighbour))
+            {
+                CassandraVersion peerVersion = SystemKeyspace.getReleaseVersion(neighbour);
+                boolean isGlobal = options.isGlobal() && peerVersion != null && peerVersion.compareTo(SUPPORTS_GLOBAL_PREPARE_FLAG_VERSION) >= 0;
+                logger.debug("Sending prepare message: options.isGlobal = {}, peerVersion = {}", options.isGlobal(), peerVersion);
+                PrepareMessage message = new PrepareMessage(parentRepairSession, cfIds, options.getRanges(), options.isIncremental(), isGlobal);
+                MessageOut<RepairMessage> msg = message.createMessage();
+                MessagingService.instance().sendRR(msg, neighbour, callback, TimeUnit.HOURS.toMillis(1), true);
+            }
+            else
+            {
+                status.set(false);
+                failedNodes.add(neighbour.getHostAddress());
+                prepareLatch.countDown();
+            }
         }
         try
         {