Merge branch 'cassandra-4.1' into trunk
diff --git a/CHANGES.txt b/CHANGES.txt
index 3342318..626975b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -25,6 +25,7 @@
  * Add guardrail for ALTER TABLE ADD / DROP / REMOVE column operations (CASSANDRA-17495)
  * Rename DisableFlag class to EnableFlag on guardrails (CASSANDRA-17544)
 Merged from 4.1:
+ * Avoid initializing schema via SystemKeyspace.getPreferredIP() with the BulkLoader tool (CASSANDRA-17740)
  * Uncomment prepared_statements_cache_size, key_cache_size, counter_cache_size, index_summary_capacity which were
    commented out by mistake in a previous patch
    Fix breaking change with cache_load_timeout; cache_load_timeout_seconds <=0 and cache_load_timeout=0 are equivalent
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index c8c21b5..4e11c93 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -42,6 +42,7 @@
 import javax.management.openmbean.TabularData;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
@@ -988,6 +989,8 @@
      */
     public static InetAddressAndPort getPreferredIP(InetAddressAndPort ep)
     {
+        Preconditions.checkState(DatabaseDescriptor.isDaemonInitialized()); // Make sure being used as a daemon, not a tool
+
         String req = "SELECT preferred_ip, preferred_port FROM system.%s WHERE peer=? AND peer_port = ?";
         UntypedResultSet result = executeInternal(String.format(req, PEERS_V2), ep.getAddress(), ep.getPort());
         if (!result.isEmpty() && result.one().has("preferred_ip"))
diff --git a/src/java/org/apache/cassandra/streaming/StreamingChannel.java b/src/java/org/apache/cassandra/streaming/StreamingChannel.java
index a638638..6b623b8 100644
--- a/src/java/org/apache/cassandra/streaming/StreamingChannel.java
+++ b/src/java/org/apache/cassandra/streaming/StreamingChannel.java
@@ -58,6 +58,20 @@
             // Implementations can decide whether or not to do something with the preferred address.
             return create(to, messagingVersion, kind);
         }
+
+        /** Provide a way to disable getPreferredIP() for tools without access to the system keyspace
+         *
+         * CASSANDRA-17663 moves calls to SystemKeyspace.getPreferredIP() outside of any threads
+         * that are regularly interrupted.  However, the streaming subsystem is also used
+         * by the bulk loader tool, which does not have direct access to the local tables
+         * and instead uses client metadata/queries to retrieve that information.
+         *
+         * @return true if SystemKeyspace.getPreferredIP() should be used when connecting
+         */
+        default boolean supportsPreferredIp()
+        {
+            return true;
+        }
     }
 
     public enum Kind { CONTROL, FILE }
diff --git a/src/java/org/apache/cassandra/streaming/async/StreamingMultiplexedChannel.java b/src/java/org/apache/cassandra/streaming/async/StreamingMultiplexedChannel.java
index c2e551e..38277e6 100644
--- a/src/java/org/apache/cassandra/streaming/async/StreamingMultiplexedChannel.java
+++ b/src/java/org/apache/cassandra/streaming/async/StreamingMultiplexedChannel.java
@@ -211,7 +211,7 @@
             if (logger.isDebugEnabled())
                 logger.debug("{} Sending {}", createLogTag(session), message);
 
-            InetAddressAndPort connectTo = SystemKeyspace.getPreferredIP(to);
+            InetAddressAndPort connectTo = factory.supportsPreferredIp() ? SystemKeyspace.getPreferredIP(to) : to;
             return fileTransferExecutor.submit(new FileStreamTask((OutgoingStreamMessage) message, connectTo));
         }
 
diff --git a/src/java/org/apache/cassandra/tools/BulkLoadConnectionFactory.java b/src/java/org/apache/cassandra/tools/BulkLoadConnectionFactory.java
index 38a9e31..b282932 100644
--- a/src/java/org/apache/cassandra/tools/BulkLoadConnectionFactory.java
+++ b/src/java/org/apache/cassandra/tools/BulkLoadConnectionFactory.java
@@ -53,7 +53,8 @@
                                    int messagingVersion,
                                    StreamingChannel.Kind kind) throws IOException
     {
-        // Supply a preferred address to the template, which will be overwritten if encryption is configured.
+        // The preferred address is always overwritten in create(). This method override only exists so we can avoid
+        // falling back to the NettyStreamingConnectionFactory implementation.
         OutboundConnectionSettings template = new OutboundConnectionSettings(getByAddress(to), getByAddress(preferred));
         return create(template, messagingVersion, kind);
     }
@@ -70,4 +71,9 @@
 
         return connect(template, messagingVersion, kind);
     }
+    @Override
+    public boolean supportsPreferredIp()
+    {
+        return false; // called in a tool context, do not use getPreferredIP
+    }
 }
diff --git a/test/unit/org/apache/cassandra/tools/ToolRunner.java b/test/unit/org/apache/cassandra/tools/ToolRunner.java
index 6f55160..d3b9f10 100644
--- a/test/unit/org/apache/cassandra/tools/ToolRunner.java
+++ b/test/unit/org/apache/cassandra/tools/ToolRunner.java
@@ -46,9 +46,7 @@
 import org.apache.cassandra.distributed.api.IInstance;
 import org.apache.cassandra.distributed.api.NodeToolResult;
 import org.apache.cassandra.utils.Pair;
-import org.assertj.core.api.Assertions;
 import org.assertj.core.util.Lists;
-import org.assertj.core.util.Strings;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertTrue;
@@ -402,8 +400,11 @@
          */
         public void assertCleanStdErr()
         {
-            assertTrue("Failed because cleaned stdErr wasn't empty: " + getCleanedStderr(),
-                       getCleanedStderr().isEmpty());
+            String raw = getStderr();
+            String cleaned = getCleanedStderr();
+            assertTrue("Failed to clean stderr completely.\nRaw (length=" + raw.length() + "):\n" + raw +
+                       "\nCleaned (length=" + cleaned.length() + "):\n" + cleaned,
+                       cleaned.trim().isEmpty());
         }
 
         public void assertOnExitCode()