Merge branch 'cassandra-4.1' into trunk
diff --git a/CHANGES.txt b/CHANGES.txt
index 3342318..626975b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -25,6 +25,7 @@
* Add guardrail for ALTER TABLE ADD / DROP / REMOVE column operations (CASSANDRA-17495)
* Rename DisableFlag class to EnableFlag on guardrails (CASSANDRA-17544)
Merged from 4.1:
+ * Avoid initializing schema via SystemKeyspace.getPreferredIP() with the BulkLoader tool (CASSANDRA-17740)
* Uncomment prepared_statements_cache_size, key_cache_size, counter_cache_size, index_summary_capacity which were
commented out by mistake in a previous patch
Fix breaking change with cache_load_timeout; cache_load_timeout_seconds <=0 and cache_load_timeout=0 are equivalent
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index c8c21b5..4e11c93 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -42,6 +42,7 @@
import javax.management.openmbean.TabularData;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
@@ -988,6 +989,8 @@
*/
public static InetAddressAndPort getPreferredIP(InetAddressAndPort ep)
{
+ Preconditions.checkState(DatabaseDescriptor.isDaemonInitialized()); // Make sure being used as a daemon, not a tool
+
String req = "SELECT preferred_ip, preferred_port FROM system.%s WHERE peer=? AND peer_port = ?";
UntypedResultSet result = executeInternal(String.format(req, PEERS_V2), ep.getAddress(), ep.getPort());
if (!result.isEmpty() && result.one().has("preferred_ip"))
diff --git a/src/java/org/apache/cassandra/streaming/StreamingChannel.java b/src/java/org/apache/cassandra/streaming/StreamingChannel.java
index a638638..6b623b8 100644
--- a/src/java/org/apache/cassandra/streaming/StreamingChannel.java
+++ b/src/java/org/apache/cassandra/streaming/StreamingChannel.java
@@ -58,6 +58,20 @@
// Implementations can decide whether or not to do something with the preferred address.
return create(to, messagingVersion, kind);
}
+
+ /** Provide a way to disable getPreferredIP() for tools without access to the system keyspace
+ *
+ * CASSANDRA-17663 moves calls to SystemKeyspace.getPreferredIP() outside of any threads
+ * that are regularly interrupted. However, the streaming subsystem is also used
+ * by the bulk loader tool, which does not have direct access to the local tables
+ * and instead uses client metadata/queries to retrieve that information.
+ *
+ * @return true if SystemKeyspace.getPreferredIP() should be used when connecting
+ */
+ default boolean supportsPreferredIp()
+ {
+ return true;
+ }
}
public enum Kind { CONTROL, FILE }
diff --git a/src/java/org/apache/cassandra/streaming/async/StreamingMultiplexedChannel.java b/src/java/org/apache/cassandra/streaming/async/StreamingMultiplexedChannel.java
index c2e551e..38277e6 100644
--- a/src/java/org/apache/cassandra/streaming/async/StreamingMultiplexedChannel.java
+++ b/src/java/org/apache/cassandra/streaming/async/StreamingMultiplexedChannel.java
@@ -211,7 +211,7 @@
if (logger.isDebugEnabled())
logger.debug("{} Sending {}", createLogTag(session), message);
- InetAddressAndPort connectTo = SystemKeyspace.getPreferredIP(to);
+ InetAddressAndPort connectTo = factory.supportsPreferredIp() ? SystemKeyspace.getPreferredIP(to) : to;
return fileTransferExecutor.submit(new FileStreamTask((OutgoingStreamMessage) message, connectTo));
}
diff --git a/src/java/org/apache/cassandra/tools/BulkLoadConnectionFactory.java b/src/java/org/apache/cassandra/tools/BulkLoadConnectionFactory.java
index 38a9e31..b282932 100644
--- a/src/java/org/apache/cassandra/tools/BulkLoadConnectionFactory.java
+++ b/src/java/org/apache/cassandra/tools/BulkLoadConnectionFactory.java
@@ -53,7 +53,8 @@
int messagingVersion,
StreamingChannel.Kind kind) throws IOException
{
- // Supply a preferred address to the template, which will be overwritten if encryption is configured.
+ // The preferred address is always overwritten in create(). This method override only exists so we can avoid
+ // falling back to the NettyStreamingConnectionFactory implementation.
OutboundConnectionSettings template = new OutboundConnectionSettings(getByAddress(to), getByAddress(preferred));
return create(template, messagingVersion, kind);
}
@@ -70,4 +71,9 @@
return connect(template, messagingVersion, kind);
}
+ @Override
+ public boolean supportsPreferredIp()
+ {
+ return false; // called in a tool context, do not use getPreferredIP
+ }
}
diff --git a/test/unit/org/apache/cassandra/tools/ToolRunner.java b/test/unit/org/apache/cassandra/tools/ToolRunner.java
index 6f55160..d3b9f10 100644
--- a/test/unit/org/apache/cassandra/tools/ToolRunner.java
+++ b/test/unit/org/apache/cassandra/tools/ToolRunner.java
@@ -46,9 +46,7 @@
import org.apache.cassandra.distributed.api.IInstance;
import org.apache.cassandra.distributed.api.NodeToolResult;
import org.apache.cassandra.utils.Pair;
-import org.assertj.core.api.Assertions;
import org.assertj.core.util.Lists;
-import org.assertj.core.util.Strings;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertTrue;
@@ -402,8 +400,11 @@
*/
public void assertCleanStdErr()
{
- assertTrue("Failed because cleaned stdErr wasn't empty: " + getCleanedStderr(),
- getCleanedStderr().isEmpty());
+ String raw = getStderr();
+ String cleaned = getCleanedStderr();
+ assertTrue("Failed to clean stderr completely.\nRaw (length=" + raw.length() + "):\n" + raw +
+ "\nCleaned (length=" + cleaned.length() + "):\n" + cleaned,
+ cleaned.trim().isEmpty());
}
public void assertOnExitCode()