JAVA-3099: Fix failing integration tests (#1701)

should_extract_hosts_using_native_transport_address_port_ssl_from_peers
- use testng style ignore

should_function_with_snappy_compression
- skip for cassandra 4.0.0 and greater (not supported)

should_handle_failing_or_missing_contact_points
- use V4 protocol to avoid initial retry with default V5

should_ignore_and_warn_peers_with_null_entries_by_default
- encode expected "missing native_transport_*" strings when not using v2-style peers

should_init_policy_with_up_contact_points
- since adding SNI support for astra, only contact points will have state INIT

should_throw_proper_read_timeout_exception
- update expected error message string

should_throw_exception_when_frame_exceeds_configured_max
- use V4 protocol for cassandra 4.0+

OPPTokenIntegrationTest/OPPTokenVnodeIntegrationTest
- not applicable for cassandra 4.0+

MailboxServiceSnappyIT
- not applicable for cassandra 4.0+

osgi/BundleOptions
- add missing dependencies for recent library upgrades

override org.ops4j.pax.url.mvn.repositories in osgi tests because https is now required by maven central
diff --git a/driver-core/src/test/java/com/datastax/driver/core/CCMTestsSupport.java b/driver-core/src/test/java/com/datastax/driver/core/CCMTestsSupport.java
index 6bcb65f..ca9f5e4 100644
--- a/driver-core/src/test/java/com/datastax/driver/core/CCMTestsSupport.java
+++ b/driver-core/src/test/java/com/datastax/driver/core/CCMTestsSupport.java
@@ -56,6 +56,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.ITestResult;
+import org.testng.SkipException;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
@@ -1137,4 +1138,13 @@
       return (T) constructor.newInstance(enclosingInstance);
     }
   }
+
+  protected void skipTestWithCassandraVersionOrHigher(String version, String testKind) {
+    if (CCMBridge.getGlobalCassandraVersion().compareTo(VersionNumber.parse(version)) >= 0) {
+      throw new SkipException(
+          String.format(
+              "%s tests not applicable to cassandra version >= %s (configured: %s)",
+              testKind, version, CCMBridge.getGlobalCassandraVersion()));
+    }
+  }
 }
diff --git a/driver-core/src/test/java/com/datastax/driver/core/ClusterInitTest.java b/driver-core/src/test/java/com/datastax/driver/core/ClusterInitTest.java
index dfc5387..ed2bdba 100644
--- a/driver-core/src/test/java/com/datastax/driver/core/ClusterInitTest.java
+++ b/driver-core/src/test/java/com/datastax/driver/core/ClusterInitTest.java
@@ -107,6 +107,8 @@
       cluster =
           Cluster.builder()
               .withPort(scassandra.getBinaryPort())
+              // scassandra supports max V4 protocol
+              .withProtocolVersion(ProtocolVersion.V4)
               .addContactPoints(
                   ipOfNode(1),
                   failingHosts.get(0).address,
diff --git a/driver-core/src/test/java/com/datastax/driver/core/ControlConnectionTest.java b/driver-core/src/test/java/com/datastax/driver/core/ControlConnectionTest.java
index 83f54e7..62decf5 100644
--- a/driver-core/src/test/java/com/datastax/driver/core/ControlConnectionTest.java
+++ b/driver-core/src/test/java/com/datastax/driver/core/ControlConnectionTest.java
@@ -52,7 +52,6 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.log4j.Level;
-import org.junit.Ignore;
 import org.scassandra.http.client.PrimingClient;
 import org.scassandra.http.client.PrimingRequest;
 import org.scassandra.http.client.Result;
@@ -335,11 +334,17 @@
       cluster.init();
 
       InetAddress node2Address = scassandraCluster.address(2).getAddress();
+      String invalidValues =
+          withPeersV2
+              ? columnData
+              : String.format(
+                  "missing native_transport_address, missing native_transport_port, missing native_transport_port_ssl, %s",
+                  columnData);
       String expectedError =
           String.format(
               "Found invalid row in system.peers: [peer=%s, %s]. "
                   + "This is likely a gossip or snitch issue, this host will be ignored.",
-              node2Address, columnData);
+              node2Address, invalidValues);
       String log = logs.get();
       // then: A peer with a null rack should not show up in host metadata, unless allowed via
       // system property.
@@ -614,9 +619,8 @@
    * be selected if the Cluster is created with SSL support (i.e. if {@link
    * Cluster.Builder#withSSL()} is used).
    */
-  @Test(groups = "short")
+  @Test(groups = "short", enabled = false /* Requires SSL support in scassandra */)
   @CCMConfig(createCcm = false)
-  @Ignore("Requires SSL support in scassandra")
   public void should_extract_hosts_using_native_transport_address_port_ssl_from_peers()
       throws UnknownHostException {
 
diff --git a/driver-core/src/test/java/com/datastax/driver/core/DirectCompressionTest.java b/driver-core/src/test/java/com/datastax/driver/core/DirectCompressionTest.java
index 6bfa4a9..0c71326 100644
--- a/driver-core/src/test/java/com/datastax/driver/core/DirectCompressionTest.java
+++ b/driver-core/src/test/java/com/datastax/driver/core/DirectCompressionTest.java
@@ -29,6 +29,7 @@
    */
   @Test(groups = "short")
   public void should_function_with_snappy_compression() throws Exception {
+    skipTestWithCassandraVersionOrHigher("4.0.0", "snappy");
     compressionTest(ProtocolOptions.Compression.SNAPPY);
   }
 
diff --git a/driver-core/src/test/java/com/datastax/driver/core/FrameLengthTest.java b/driver-core/src/test/java/com/datastax/driver/core/FrameLengthTest.java
index 7258f76..7bfaae3 100644
--- a/driver-core/src/test/java/com/datastax/driver/core/FrameLengthTest.java
+++ b/driver-core/src/test/java/com/datastax/driver/core/FrameLengthTest.java
@@ -26,6 +26,7 @@
 import com.datastax.driver.core.querybuilder.Insert;
 import com.datastax.driver.core.schemabuilder.Create;
 import com.datastax.driver.core.schemabuilder.SchemaBuilder;
+import com.datastax.driver.core.utils.CassandraVersion;
 import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Random;
@@ -105,22 +106,42 @@
    */
   @Test(groups = "isolated")
   public void should_throw_exception_when_frame_exceeds_configured_max() {
+    skipTestWithCassandraVersionOrHigher("4.0.0", "frame-size exceeding with default-protocol");
+    run_frame_size_exceeding_queries(session());
+  }
+
+  /**
+   * With cassandra 4.0.0+, V5 protocol is default which breaks requests into segments. Force V4
+   * protocol to allow us to test frame-size limitation code.
+   */
+  @CassandraVersion("4.0.0")
+  @Test(groups = "isolated")
+  public void should_throw_exception_when_frame_exceeds_configured_max_v4_protocol_cassandra4() {
+    Cluster cluster =
+        register(createClusterBuilder().withProtocolVersion(ProtocolVersion.V4).build());
+    Session session = register(cluster.connect());
+    useKeyspace(session, keyspace);
+
+    run_frame_size_exceeding_queries(session);
+  }
+
+  private void run_frame_size_exceeding_queries(Session session) {
     try {
-      session().execute(select().from(tableName).where(eq("k", 0)));
+      session.execute(select().from(tableName).where(eq("k", 0)));
       fail("Exception expected");
     } catch (FrameTooLongException ftle) {
       // Expected.
     }
 
     // Both hosts should remain up.
-    Collection<Host> hosts = session().getState().getConnectedHosts();
+    Collection<Host> hosts = session.getState().getConnectedHosts();
     assertThat(hosts).hasSize(2).extractingResultOf("isUp").containsOnly(true);
 
     // Should be able to make a query that is less than the max frame size.
     // Execute multiple time to exercise all hosts.
     for (int i = 0; i < 10; i++) {
       ResultSet result =
-          session().execute(select().from(tableName).where(eq("k", 0)).and(eq("c", 0)));
+          session.execute(select().from(tableName).where(eq("k", 0)).and(eq("c", 0)));
       assertThat(result.getAvailableWithoutFetching()).isEqualTo(1);
     }
   }
diff --git a/driver-core/src/test/java/com/datastax/driver/core/HeapCompressionTest.java b/driver-core/src/test/java/com/datastax/driver/core/HeapCompressionTest.java
index d27309e..2153639 100644
--- a/driver-core/src/test/java/com/datastax/driver/core/HeapCompressionTest.java
+++ b/driver-core/src/test/java/com/datastax/driver/core/HeapCompressionTest.java
@@ -37,6 +37,7 @@
    */
   @Test(groups = "isolated")
   public void should_function_with_snappy_compression() throws Exception {
+    skipTestWithCassandraVersionOrHigher("4.0.0", "snappy");
     compressionTest(ProtocolOptions.Compression.SNAPPY);
   }
 
diff --git a/driver-core/src/test/java/com/datastax/driver/core/LoadBalancingPolicyBootstrapTest.java b/driver-core/src/test/java/com/datastax/driver/core/LoadBalancingPolicyBootstrapTest.java
index 8cb4d6c..5069d33 100644
--- a/driver-core/src/test/java/com/datastax/driver/core/LoadBalancingPolicyBootstrapTest.java
+++ b/driver-core/src/test/java/com/datastax/driver/core/LoadBalancingPolicyBootstrapTest.java
@@ -57,10 +57,13 @@
     try {
       cluster.init();
 
+      // To support astra, only hosts in Metadata#getContactPoints are passed to init()
+      // TestUtils#configureClusterBuilder only uses the first host as the contact point
+      // Remaining hosts are learned after connection via onAdd()
       assertThat(policy.history)
           .containsOnly(
               entry(INIT, TestUtils.findHost(cluster, 1)),
-              entry(INIT, TestUtils.findHost(cluster, 2)));
+              entry(ADD, TestUtils.findHost(cluster, 2)));
     } finally {
       cluster.close();
     }
diff --git a/driver-core/src/test/java/com/datastax/driver/core/OPPTokenIntegrationTest.java b/driver-core/src/test/java/com/datastax/driver/core/OPPTokenIntegrationTest.java
index ab6f590..680195f 100644
--- a/driver-core/src/test/java/com/datastax/driver/core/OPPTokenIntegrationTest.java
+++ b/driver-core/src/test/java/com/datastax/driver/core/OPPTokenIntegrationTest.java
@@ -28,4 +28,10 @@
   protected Token.Factory tokenFactory() {
     return OPPToken.FACTORY;
   }
+
+  @Override
+  public void beforeTestClass(Object testInstance) throws Exception {
+    skipTestWithCassandraVersionOrHigher("4.0.0", "ByteOrderedPartitioner");
+    super.beforeTestClass(testInstance);
+  }
 }
diff --git a/driver-core/src/test/java/com/datastax/driver/core/OPPTokenVnodeIntegrationTest.java b/driver-core/src/test/java/com/datastax/driver/core/OPPTokenVnodeIntegrationTest.java
index 084f8a7..4a94196 100644
--- a/driver-core/src/test/java/com/datastax/driver/core/OPPTokenVnodeIntegrationTest.java
+++ b/driver-core/src/test/java/com/datastax/driver/core/OPPTokenVnodeIntegrationTest.java
@@ -26,4 +26,10 @@
   protected Token.Factory tokenFactory() {
     return Token.OPPToken.FACTORY;
   }
+
+  @Override
+  public void beforeTestClass(Object testInstance) throws Exception {
+    skipTestWithCassandraVersionOrHigher("4.0.0", "ByteOrderedPartitioner");
+    super.beforeTestClass(testInstance);
+  }
 }
diff --git a/driver-core/src/test/java/com/datastax/driver/core/exceptions/ExceptionsScassandraTest.java b/driver-core/src/test/java/com/datastax/driver/core/exceptions/ExceptionsScassandraTest.java
index 1e9b6cb..54c4036 100644
--- a/driver-core/src/test/java/com/datastax/driver/core/exceptions/ExceptionsScassandraTest.java
+++ b/driver-core/src/test/java/com/datastax/driver/core/exceptions/ExceptionsScassandraTest.java
@@ -78,7 +78,7 @@
     } catch (ReadTimeoutException e) {
       assertThat(e.getMessage())
           .isEqualTo(
-              "Cassandra timeout during read query at consistency LOCAL_ONE (1 responses were required but only 0 replica responded)");
+              "Cassandra timeout during read query at consistency LOCAL_ONE (1 responses were required but only 0 replica responded). In case this was generated during read repair, the consistency level is not representative of the actual consistency.");
       assertThat(e.getConsistencyLevel()).isEqualTo(LOCAL_ONE);
       assertThat(e.getReceivedAcknowledgements()).isEqualTo(0);
       assertThat(e.getRequiredAcknowledgements()).isEqualTo(1);
diff --git a/driver-tests/osgi/pom.xml b/driver-tests/osgi/pom.xml
index f30355b..f47d7f0 100644
--- a/driver-tests/osgi/pom.xml
+++ b/driver-tests/osgi/pom.xml
@@ -180,6 +180,9 @@
                         <jackson.version>${jackson.version}</jackson.version>
                         <jackson-databind.version>${jackson-databind.version}</jackson-databind.version>
                         <ipprefix>${ipprefix}</ipprefix>
+                        <org.ops4j.pax.url.mvn.useFallbackRepositories>false</org.ops4j.pax.url.mvn.useFallbackRepositories>
+                        <!-- repo1.maven.org requires https but the old version of pax-exam we use hard-codes http -->
+                        <org.ops4j.pax.url.mvn.repositories>https://repo1.maven.org/maven2@id=central</org.ops4j.pax.url.mvn.repositories>
                     </systemPropertyVariables>
                 </configuration>
             </plugin>
diff --git a/driver-tests/osgi/src/main/java/com/datastax/driver/osgi/impl/Activator.java b/driver-tests/osgi/src/main/java/com/datastax/driver/osgi/impl/Activator.java
index 46f6751..6c52a0f 100644
--- a/driver-tests/osgi/src/main/java/com/datastax/driver/osgi/impl/Activator.java
+++ b/driver-tests/osgi/src/main/java/com/datastax/driver/osgi/impl/Activator.java
@@ -16,6 +16,7 @@
 package com.datastax.driver.osgi.impl;
 
 import static com.datastax.driver.core.ProtocolOptions.Compression.LZ4;
+import static com.datastax.driver.core.ProtocolOptions.Compression.SNAPPY;
 import static com.datastax.driver.osgi.api.MailboxMessage.TABLE;
 
 import com.datastax.driver.core.Cluster;
@@ -69,7 +70,10 @@
     String compression = context.getProperty("cassandra.compression");
     if (compression != null) {
       if (ver.getMajor() < 2 && compression.equals(LZ4.name())) {
-        LOGGER.warn("Requested LZ4 compression but C* version is not compatible, disabling");
+        LOGGER.warn("Requested LZ4 compression but C* version < 2.0 is not compatible, disabling");
+      } else if (ver.getMajor() >= 4 && compression.equals(SNAPPY.name())) {
+        LOGGER.warn(
+            "Requested snappy compression but C* version >= 4.0 is not compatible, disabling");
       } else {
         LOGGER.info("Compression: {}", compression);
         builder.withCompression(ProtocolOptions.Compression.valueOf(compression));
diff --git a/driver-tests/osgi/src/test/java/com/datastax/driver/osgi/BundleOptions.java b/driver-tests/osgi/src/test/java/com/datastax/driver/osgi/BundleOptions.java
index f3c4dbe..7a58b16 100644
--- a/driver-tests/osgi/src/test/java/com/datastax/driver/osgi/BundleOptions.java
+++ b/driver-tests/osgi/src/test/java/com/datastax/driver/osgi/BundleOptions.java
@@ -136,7 +136,9 @@
             mavenBundle("io.netty", "netty-codec", nettyVersion),
             mavenBundle("io.netty", "netty-common", nettyVersion),
             mavenBundle("io.netty", "netty-handler", nettyVersion),
-            mavenBundle("io.netty", "netty-transport", nettyVersion));
+            mavenBundle("io.netty", "netty-transport", nettyVersion),
+            mavenBundle("io.netty", "netty-transport-native-unix-common", nettyVersion),
+            mavenBundle("io.netty", "netty-resolver", nettyVersion));
       }
     };
   }
diff --git a/driver-tests/osgi/src/test/java/com/datastax/driver/osgi/MailboxServiceSnappyIT.java b/driver-tests/osgi/src/test/java/com/datastax/driver/osgi/MailboxServiceSnappyIT.java
index 8d8ae4a..6c3304d 100644
--- a/driver-tests/osgi/src/test/java/com/datastax/driver/osgi/MailboxServiceSnappyIT.java
+++ b/driver-tests/osgi/src/test/java/com/datastax/driver/osgi/MailboxServiceSnappyIT.java
@@ -25,10 +25,12 @@
 import static com.datastax.driver.osgi.BundleOptions.snappyBundle;
 import static org.ops4j.pax.exam.CoreOptions.options;
 
+import com.datastax.driver.core.VersionNumber;
 import com.datastax.driver.osgi.api.MailboxException;
 import org.ops4j.pax.exam.Configuration;
 import org.ops4j.pax.exam.Option;
 import org.ops4j.pax.exam.testng.listener.PaxExam;
+import org.testng.SkipException;
 import org.testng.annotations.Listeners;
 import org.testng.annotations.Test;
 
@@ -59,6 +61,10 @@
    */
   @Test(groups = "short")
   public void test_snappy() throws MailboxException {
+    VersionNumber ver = VersionNumber.parse(bundleContext.getProperty("cassandra.version"));
+    if (ver.getMajor() >= 4) {
+      throw new SkipException("Snappy not supported with cassandra 4.0+");
+    }
     checkService();
   }
 }
diff --git a/pom.xml b/pom.xml
index 46f1d82..2a560b8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1141,6 +1141,7 @@
                 <netty-tcnative.artifact>netty-tcnative-boringssl-static</netty-tcnative.artifact>
                 <!-- https://github.com/jnr/jffi/pull/116 apple-silicon requires signed jffi binaries, added in 1.3.8 (jnr-ffi 2.2.10+) -->
                 <jnr-ffi.version>2.2.10</jnr-ffi.version>
+                <snappy.version>1.1.10.1</snappy.version>
             </properties>
 
         </profile>