CASSJAVA-104: Fix Flaky tests
patch by Jane He; reviewed by
Bret McGuire and Andy Tolbert
diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlPrepareAsyncProcessor.java b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlPrepareAsyncProcessor.java
index a3d11cf..48988ac 100644
--- a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlPrepareAsyncProcessor.java
+++ b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlPrepareAsyncProcessor.java
@@ -34,7 +34,6 @@
 import com.datastax.oss.driver.internal.core.session.RequestProcessor;
 import com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures;
 import com.datastax.oss.driver.internal.core.util.concurrent.RunOrSchedule;
-import com.datastax.oss.driver.shaded.guava.common.base.Functions;
 import com.datastax.oss.driver.shaded.guava.common.cache.Cache;
 import com.datastax.oss.driver.shaded.guava.common.cache.CacheBuilder;
 import com.datastax.oss.driver.shaded.guava.common.collect.Iterables;
@@ -64,14 +63,15 @@
   }
 
   public CqlPrepareAsyncProcessor(@NonNull Optional<? extends DefaultDriverContext> context) {
-    this(context, Functions.identity());
+    // Use weakValues to evict prepared statements from the cache as soon as they are
+    // no longer referenced elsewhere.
+    this(context, CacheBuilder::weakValues);
   }
 
   protected CqlPrepareAsyncProcessor(
       Optional<? extends DefaultDriverContext> context,
       Function<CacheBuilder<Object, Object>, CacheBuilder<Object, Object>> decorator) {
-
-    CacheBuilder<Object, Object> baseCache = CacheBuilder.newBuilder().weakValues();
+    CacheBuilder<Object, Object> baseCache = CacheBuilder.newBuilder();
     this.cache = decorator.apply(baseCache).build();
     context.ifPresent(
         (ctx) -> {
diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementCachingIT.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementCachingIT.java
index 617d489..c2c359b 100644
--- a/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementCachingIT.java
+++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementCachingIT.java
@@ -194,8 +194,8 @@
       Consumer<CqlSession> setupTestSchema, Set<String> expectedChangedTypes) {
     invalidationTestInner(
         setupTestSchema,
-        "select f from test_table_1 where e = ?",
-        "select h from test_table_2 where g = ?",
+        "select f from test_table_caching_1 where e = ?",
+        "select h from test_table_caching_2 where g = ?",
         expectedChangedTypes);
   }
 
@@ -206,8 +206,8 @@
     String condition = isCollection ? "contains ?" : "= ?";
     invalidationTestInner(
         setupTestSchema,
-        String.format("select e from test_table_1 where f %s allow filtering", condition),
-        String.format("select g from test_table_2 where h %s allow filtering", condition),
+        String.format("select e from test_table_caching_1 where f %s allow filtering", condition),
+        String.format("select g from test_table_caching_2 where h %s allow filtering", condition),
         expectedChangedTypes);
   }
 
@@ -263,16 +263,18 @@
                 preparedStmtCacheRemoveLatch.countDown();
               });
 
-      // alter test_type_2 to trigger cache invalidation and above events
-      session.execute("ALTER TYPE test_type_2 add i blob");
+      // alter test_type_caching_2 to trigger cache invalidation and above events
+      session.execute("ALTER TYPE test_type_caching_2 add i blob");
+
+      session.checkSchemaAgreement();
 
       // wait for latches and fail if they don't reach zero before timeout
       assertThat(
               Uninterruptibles.awaitUninterruptibly(
-                  preparedStmtCacheRemoveLatch, 10, TimeUnit.SECONDS))
+                  preparedStmtCacheRemoveLatch, 120, TimeUnit.SECONDS))
           .withFailMessage("preparedStmtCacheRemoveLatch did not trigger before timeout")
           .isTrue();
-      assertThat(Uninterruptibles.awaitUninterruptibly(typeChangeEventLatch, 10, TimeUnit.SECONDS))
+      assertThat(Uninterruptibles.awaitUninterruptibly(typeChangeEventLatch, 20, TimeUnit.SECONDS))
           .withFailMessage("typeChangeEventLatch did not trigger before timeout")
           .isTrue();
 
@@ -295,17 +297,20 @@
 
   Consumer<CqlSession> setupCacheEntryTestBasic =
       (session) -> {
-        session.execute("CREATE TYPE test_type_1 (a text, b int)");
-        session.execute("CREATE TYPE test_type_2 (c int, d text)");
-        session.execute("CREATE TABLE test_table_1 (e int primary key, f frozen<test_type_1>)");
-        session.execute("CREATE TABLE test_table_2 (g int primary key, h frozen<test_type_2>)");
+        session.execute("CREATE TYPE test_type_caching_1 (a text, b int)");
+        session.execute("CREATE TYPE test_type_caching_2 (c int, d text)");
+        session.execute(
+            "CREATE TABLE test_table_caching_1 (e int primary key, f frozen<test_type_caching_1>)");
+        session.execute(
+            "CREATE TABLE test_table_caching_2 (g int primary key, h frozen<test_type_caching_2>)");
       };
 
   @Test
   public void should_invalidate_cache_entry_on_basic_udt_change_result_set() {
     SchemaChangeSynchronizer.withLock(
         () -> {
-          invalidationResultSetTest(setupCacheEntryTestBasic, ImmutableSet.of("test_type_2"));
+          invalidationResultSetTest(
+              setupCacheEntryTestBasic, ImmutableSet.of("test_type_caching_2"));
         });
   }
 
@@ -314,25 +319,26 @@
     SchemaChangeSynchronizer.withLock(
         () -> {
           invalidationVariableDefsTest(
-              setupCacheEntryTestBasic, false, ImmutableSet.of("test_type_2"));
+              setupCacheEntryTestBasic, false, ImmutableSet.of("test_type_caching_2"));
         });
   }
 
   Consumer<CqlSession> setupCacheEntryTestCollection =
       (session) -> {
-        session.execute("CREATE TYPE test_type_1 (a text, b int)");
-        session.execute("CREATE TYPE test_type_2 (c int, d text)");
+        session.execute("CREATE TYPE test_type_caching_1 (a text, b int)");
+        session.execute("CREATE TYPE test_type_caching_2 (c int, d text)");
         session.execute(
-            "CREATE TABLE test_table_1 (e int primary key, f list<frozen<test_type_1>>)");
+            "CREATE TABLE test_table_caching_1 (e int primary key, f list<frozen<test_type_caching_1>>)");
         session.execute(
-            "CREATE TABLE test_table_2 (g int primary key, h list<frozen<test_type_2>>)");
+            "CREATE TABLE test_table_caching_2 (g int primary key, h list<frozen<test_type_caching_2>>)");
       };
 
   @Test
   public void should_invalidate_cache_entry_on_collection_udt_change_result_set() {
     SchemaChangeSynchronizer.withLock(
         () -> {
-          invalidationResultSetTest(setupCacheEntryTestCollection, ImmutableSet.of("test_type_2"));
+          invalidationResultSetTest(
+              setupCacheEntryTestCollection, ImmutableSet.of("test_type_caching_2"));
         });
   }
 
@@ -341,25 +347,26 @@
     SchemaChangeSynchronizer.withLock(
         () -> {
           invalidationVariableDefsTest(
-              setupCacheEntryTestCollection, true, ImmutableSet.of("test_type_2"));
+              setupCacheEntryTestCollection, true, ImmutableSet.of("test_type_caching_2"));
         });
   }
 
   Consumer<CqlSession> setupCacheEntryTestTuple =
       (session) -> {
-        session.execute("CREATE TYPE test_type_1 (a text, b int)");
-        session.execute("CREATE TYPE test_type_2 (c int, d text)");
+        session.execute("CREATE TYPE test_type_caching_1 (a text, b int)");
+        session.execute("CREATE TYPE test_type_caching_2 (c int, d text)");
         session.execute(
-            "CREATE TABLE test_table_1 (e int primary key, f tuple<int, test_type_1, text>)");
+            "CREATE TABLE test_table_caching_1 (e int primary key, f tuple<int, test_type_caching_1, text>)");
         session.execute(
-            "CREATE TABLE test_table_2 (g int primary key, h tuple<text, test_type_2, int>)");
+            "CREATE TABLE test_table_caching_2 (g int primary key, h tuple<text, test_type_caching_2, int>)");
       };
 
   @Test
   public void should_invalidate_cache_entry_on_tuple_udt_change_result_set() {
     SchemaChangeSynchronizer.withLock(
         () -> {
-          invalidationResultSetTest(setupCacheEntryTestTuple, ImmutableSet.of("test_type_2"));
+          invalidationResultSetTest(
+              setupCacheEntryTestTuple, ImmutableSet.of("test_type_caching_2"));
         });
   }
 
@@ -368,18 +375,20 @@
     SchemaChangeSynchronizer.withLock(
         () -> {
           invalidationVariableDefsTest(
-              setupCacheEntryTestTuple, false, ImmutableSet.of("test_type_2"));
+              setupCacheEntryTestTuple, false, ImmutableSet.of("test_type_caching_2"));
         });
   }
 
   Consumer<CqlSession> setupCacheEntryTestNested =
       (session) -> {
-        session.execute("CREATE TYPE test_type_1 (a text, b int)");
-        session.execute("CREATE TYPE test_type_2 (c int, d text)");
-        session.execute("CREATE TYPE test_type_3 (e frozen<test_type_1>, f int)");
-        session.execute("CREATE TYPE test_type_4 (g int, h frozen<test_type_2>)");
-        session.execute("CREATE TABLE test_table_1 (e int primary key, f frozen<test_type_3>)");
-        session.execute("CREATE TABLE test_table_2 (g int primary key, h frozen<test_type_4>)");
+        session.execute("CREATE TYPE test_type_caching_1 (a text, b int)");
+        session.execute("CREATE TYPE test_type_caching_2 (c int, d text)");
+        session.execute("CREATE TYPE test_type_caching_3 (e frozen<test_type_caching_1>, f int)");
+        session.execute("CREATE TYPE test_type_caching_4 (g int, h frozen<test_type_caching_2>)");
+        session.execute(
+            "CREATE TABLE test_table_caching_1 (e int primary key, f frozen<test_type_caching_3>)");
+        session.execute(
+            "CREATE TABLE test_table_caching_2 (g int primary key, h frozen<test_type_caching_4>)");
       };
 
   @Test
@@ -387,7 +396,8 @@
     SchemaChangeSynchronizer.withLock(
         () -> {
           invalidationResultSetTest(
-              setupCacheEntryTestNested, ImmutableSet.of("test_type_2", "test_type_4"));
+              setupCacheEntryTestNested,
+              ImmutableSet.of("test_type_caching_2", "test_type_caching_4"));
         });
   }
 
@@ -396,7 +406,9 @@
     SchemaChangeSynchronizer.withLock(
         () -> {
           invalidationVariableDefsTest(
-              setupCacheEntryTestNested, false, ImmutableSet.of("test_type_2", "test_type_4"));
+              setupCacheEntryTestNested,
+              false,
+              ImmutableSet.of("test_type_caching_2", "test_type_caching_4"));
         });
   }
 
diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementCancellationIT.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementCancellationIT.java
index d7e581e..6eb1a7d 100644
--- a/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementCancellationIT.java
+++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementCancellationIT.java
@@ -21,20 +21,34 @@
 import static org.junit.Assert.fail;
 
 import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
+import com.datastax.oss.driver.api.core.context.DriverContext;
 import com.datastax.oss.driver.api.core.cql.PrepareRequest;
 import com.datastax.oss.driver.api.core.cql.PreparedStatement;
+import com.datastax.oss.driver.api.core.session.ProgrammaticArguments;
+import com.datastax.oss.driver.api.core.session.SessionBuilder;
 import com.datastax.oss.driver.api.testinfra.ccm.CustomCcmRule;
 import com.datastax.oss.driver.api.testinfra.session.SessionRule;
 import com.datastax.oss.driver.api.testinfra.session.SessionUtils;
 import com.datastax.oss.driver.categories.IsolatedTests;
 import com.datastax.oss.driver.internal.core.context.DefaultDriverContext;
 import com.datastax.oss.driver.internal.core.cql.CqlPrepareAsyncProcessor;
+import com.datastax.oss.driver.internal.core.cql.CqlPrepareSyncProcessor;
+import com.datastax.oss.driver.internal.core.session.BuiltInRequestProcessors;
+import com.datastax.oss.driver.internal.core.session.RequestProcessor;
+import com.datastax.oss.driver.internal.core.session.RequestProcessorRegistry;
 import com.datastax.oss.driver.shaded.guava.common.base.Predicates;
 import com.datastax.oss.driver.shaded.guava.common.cache.Cache;
 import com.datastax.oss.driver.shaded.guava.common.collect.Iterables;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
 import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -50,6 +64,69 @@
 
   @Rule public TestRule chain = RuleChain.outerRule(ccmRule).around(sessionRule);
 
+  private static class TestCqlPrepareAsyncProcessor extends CqlPrepareAsyncProcessor {
+
+    public TestCqlPrepareAsyncProcessor(@NonNull Optional<DefaultDriverContext> context) {
+      // Default CqlPrepareAsyncProcessor uses weak values here as well.  We avoid doing so
+      // to prevent cache entries from unexpectedly disappearing mid-test.
+      super(context, Function.identity());
+    }
+  }
+
+  private static class TestDefaultDriverContext extends DefaultDriverContext {
+    public TestDefaultDriverContext(
+        DriverConfigLoader configLoader, ProgrammaticArguments programmaticArguments) {
+      super(configLoader, programmaticArguments);
+    }
+
+    @Override
+    protected RequestProcessorRegistry buildRequestProcessorRegistry() {
+      // Re-create the processor cache to insert the TestCqlPrepareAsyncProcessor with its strong
+      // prepared statement cache, see JAVA-3062
+      List<RequestProcessor<?, ?>> processors =
+          BuiltInRequestProcessors.createDefaultProcessors(this);
+      processors.removeIf((processor) -> processor instanceof CqlPrepareAsyncProcessor);
+      processors.removeIf((processor) -> processor instanceof CqlPrepareSyncProcessor);
+      CqlPrepareAsyncProcessor asyncProcessor =
+          new PreparedStatementCancellationIT.TestCqlPrepareAsyncProcessor(Optional.of(this));
+      processors.add(2, asyncProcessor);
+      processors.add(3, new CqlPrepareSyncProcessor(asyncProcessor));
+      return new RequestProcessorRegistry(
+          getSessionName(), processors.toArray(new RequestProcessor[0]));
+    }
+  }
+
+  private static class TestSessionBuilder extends SessionBuilder {
+
+    @Override
+    protected Object wrap(@NonNull CqlSession defaultSession) {
+      return defaultSession;
+    }
+
+    @Override
+    protected DriverContext buildContext(
+        DriverConfigLoader configLoader, ProgrammaticArguments programmaticArguments) {
+      return new PreparedStatementCancellationIT.TestDefaultDriverContext(
+          configLoader, programmaticArguments);
+    }
+  }
+
+  @BeforeClass
+  public static void setupBeforeClass() {
+    System.setProperty(
+        SessionUtils.SESSION_BUILDER_CLASS_PROPERTY,
+        PreparedStatementCancellationIT.class.getName());
+  }
+
+  @AfterClass
+  public static void teardownAfterClass() {
+    System.clearProperty(SessionUtils.SESSION_BUILDER_CLASS_PROPERTY);
+  }
+
+  public static SessionBuilder builder() {
+    return new PreparedStatementCancellationIT.TestSessionBuilder();
+  }
+
   @Before
   public void setup() {
 
diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/reactive/DefaultReactiveResultSetIT.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/reactive/DefaultReactiveResultSetIT.java
index c00cf06..77a449d 100644
--- a/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/reactive/DefaultReactiveResultSetIT.java
+++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/reactive/DefaultReactiveResultSetIT.java
@@ -31,6 +31,7 @@
 import com.datastax.oss.driver.api.core.cql.ExecutionInfo;
 import com.datastax.oss.driver.api.core.cql.PreparedStatement;
 import com.datastax.oss.driver.api.core.cql.SimpleStatement;
+import com.datastax.oss.driver.api.core.cql.Statement;
 import com.datastax.oss.driver.api.testinfra.ccm.CcmRule;
 import com.datastax.oss.driver.api.testinfra.ccm.SchemaChangeSynchronizer;
 import com.datastax.oss.driver.api.testinfra.session.SessionRule;
@@ -67,19 +68,15 @@
     CqlSession session = sessionRule.session();
     SchemaChangeSynchronizer.withLock(
         () -> {
-          session.execute("DROP TABLE IF EXISTS test_reactive_read");
-          session.execute("DROP TABLE IF EXISTS test_reactive_write");
+          session.execute(createSlowStatement("DROP TABLE IF EXISTS test_reactive_read"));
+          session.execute(createSlowStatement("DROP TABLE IF EXISTS test_reactive_write"));
           session.checkSchemaAgreement();
           session.execute(
-              SimpleStatement.builder(
-                      "CREATE TABLE test_reactive_read (pk int, cc int, v int, PRIMARY KEY ((pk), cc))")
-                  .setExecutionProfile(sessionRule.slowProfile())
-                  .build());
+              createSlowStatement(
+                  "CREATE TABLE test_reactive_read (pk int, cc int, v int, PRIMARY KEY ((pk), cc))"));
           session.execute(
-              SimpleStatement.builder(
-                      "CREATE TABLE test_reactive_write (pk int, cc int, v int, PRIMARY KEY ((pk), cc))")
-                  .setExecutionProfile(sessionRule.slowProfile())
-                  .build());
+              createSlowStatement(
+                  "CREATE TABLE test_reactive_write (pk int, cc int, v int, PRIMARY KEY ((pk), cc))"));
           session.checkSchemaAgreement();
         });
     for (int i = 0; i < 1000; i++) {
@@ -92,6 +89,12 @@
     }
   }
 
+  static Statement<?> createSlowStatement(String statement) {
+    return SimpleStatement.builder(statement)
+        .setExecutionProfile(sessionRule.slowProfile())
+        .build();
+  }
+
   @Before
   public void truncateTables() throws Exception {
     CqlSession session = sessionRule.session();
diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/metadata/SchemaIT.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/metadata/SchemaIT.java
index df55719..728bd3c 100644
--- a/integration-tests/src/test/java/com/datastax/oss/driver/core/metadata/SchemaIT.java
+++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/metadata/SchemaIT.java
@@ -151,11 +151,11 @@
     sessionRule
         .session()
         .execute(
-            SimpleStatement.builder("CREATE TABLE foo(k int primary key)")
+            SimpleStatement.builder("CREATE TABLE foo_schema_it(k int primary key)")
                 .setExecutionProfile(slowProfile)
                 .build());
     assertThat(session.getMetadata().getKeyspace(sessionRule.keyspace()).get().getTables())
-        .doesNotContainKey(CqlIdentifier.fromInternal("foo"));
+        .doesNotContainKey(CqlIdentifier.fromInternal("foo_schema_it"));
 
     // Reset to config value (true), should refresh and load the new table
     session.setSchemaMetadataEnabled(null);
@@ -167,7 +167,7 @@
             () ->
                 assertThat(
                         session.getMetadata().getKeyspace(sessionRule.keyspace()).get().getTables())
-                    .containsKey(CqlIdentifier.fromInternal("foo")));
+                    .containsKey(CqlIdentifier.fromInternal("foo_schema_it")));
   }
 
   @Test
diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/metrics/MetricsITBase.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/metrics/MetricsITBase.java
index e612121..716dc1b 100644
--- a/integration-tests/src/test/java/com/datastax/oss/driver/core/metrics/MetricsITBase.java
+++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/metrics/MetricsITBase.java
@@ -174,11 +174,13 @@
       // trigger node1 UP -> DOWN
       eventBus.fire(NodeStateEvent.changed(NodeState.UP, NodeState.DOWN, node1));
 
-      Thread.sleep(expireAfter.toMillis());
+      Thread.sleep(expireAfter.toMillis() + 100);
 
       // then node-level metrics should be evicted from node1, but
       // node2 and node3 metrics should not have been evicted
-      await().untilAsserted(() -> assertNodeMetricsEvicted(session, node1));
+      await()
+          .atMost(Duration.ofMinutes(2))
+          .untilAsserted(() -> assertNodeMetricsEvicted(session, node1));
       assertNodeMetricsNotEvicted(session, node2);
       assertNodeMetricsNotEvicted(session, node3);
 
@@ -226,7 +228,8 @@
       eventBus.fire(NodeStateEvent.changed(NodeState.FORCED_DOWN, NodeState.UP, node2));
       eventBus.fire(NodeStateEvent.added(node3));
 
-      Thread.sleep(expireAfter.toMillis());
+      // Add a small buffer to ensure the timeout would have fired if it wasn't cancelled
+      Thread.sleep(expireAfter.toMillis() + 100);
 
       // then no node-level metrics should be evicted
       assertNodeMetricsNotEvicted(session, node1);
diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/internal/core/type/codec/UdtCodecIT.java b/integration-tests/src/test/java/com/datastax/oss/driver/internal/core/type/codec/UdtCodecIT.java
index 804a078..ff6f3a9 100644
--- a/integration-tests/src/test/java/com/datastax/oss/driver/internal/core/type/codec/UdtCodecIT.java
+++ b/integration-tests/src/test/java/com/datastax/oss/driver/internal/core/type/codec/UdtCodecIT.java
@@ -22,12 +22,14 @@
 
 import com.datastax.oss.driver.api.core.CqlSession;
 import com.datastax.oss.driver.api.core.cql.Row;
+import com.datastax.oss.driver.api.core.cql.SimpleStatement;
 import com.datastax.oss.driver.api.core.data.UdtValue;
 import com.datastax.oss.driver.api.core.type.UserDefinedType;
 import com.datastax.oss.driver.api.core.type.codec.TypeCodec;
 import com.datastax.oss.driver.api.testinfra.ccm.CcmRule;
 import com.datastax.oss.driver.api.testinfra.session.SessionRule;
 import com.datastax.oss.driver.categories.ParallelizableTests;
+import java.time.Duration;
 import java.util.Objects;
 import org.junit.Rule;
 import org.junit.Test;
@@ -47,23 +49,31 @@
   @Test
   public void should_decoding_udt_be_backward_compatible() {
     CqlSession session = sessionRule.session();
-    session.execute("CREATE TYPE test_type_1 (a text, b int)");
-    session.execute("CREATE TABLE test_table_1 (e int primary key, f frozen<test_type_1>)");
+    session.execute(
+        SimpleStatement.newInstance("CREATE TYPE test_type_udt_1 (a text, b int)")
+            .setTimeout(Duration.ofSeconds(20)));
+    session.execute(
+        SimpleStatement.newInstance(
+                "CREATE TABLE test_table_udt_1 (e int primary key, f frozen<test_type_udt_1>)")
+            .setTimeout(Duration.ofSeconds(20)));
     // insert a row using version 1 of the UDT schema
-    session.execute("INSERT INTO test_table_1(e, f) VALUES(1, {a: 'a', b: 1})");
+    session.execute("INSERT INTO test_table_udt_1(e, f) VALUES(1, {a: 'a', b: 1})");
     UserDefinedType udt =
         session
             .getMetadata()
             .getKeyspace(sessionRule.keyspace())
-            .flatMap(ks -> ks.getUserDefinedType("test_type_1"))
+            .flatMap(ks -> ks.getUserDefinedType("test_type_udt_1"))
             .orElseThrow(IllegalStateException::new);
     TypeCodec<?> oldCodec = session.getContext().getCodecRegistry().codecFor(udt);
     // update UDT schema
-    session.execute("ALTER TYPE test_type_1 add i text");
+    session.execute(
+        SimpleStatement.newInstance("ALTER TYPE test_type_udt_1 add i text")
+            .setTimeout(Duration.ofSeconds(20)));
     // insert a row using version 2 of the UDT schema
-    session.execute("INSERT INTO test_table_1(e, f) VALUES(2, {a: 'b', b: 2, i: 'b'})");
+    session.execute("INSERT INTO test_table_udt_1(e, f) VALUES(2, {a: 'b', b: 2, i: 'b'})");
     Row row =
-        Objects.requireNonNull(session.execute("SELECT f FROM test_table_1 WHERE e = ?", 2).one());
+        Objects.requireNonNull(
+            session.execute("SELECT f FROM test_table_udt_1 WHERE e = ?", 2).one());
     // Try to read new row with old codec. Using row.getUdtValue() would not cause any issues,
     // because new codec will be automatically registered (using all 3 attributes).
     // If application leverages generic row.get(String, Codec) method, data reading with old codec
diff --git a/osgi-tests/src/test/java/com/datastax/oss/driver/internal/osgi/support/BundleOptions.java b/osgi-tests/src/test/java/com/datastax/oss/driver/internal/osgi/support/BundleOptions.java
index cb7e86b..f152072 100644
--- a/osgi-tests/src/test/java/com/datastax/oss/driver/internal/osgi/support/BundleOptions.java
+++ b/osgi-tests/src/test/java/com/datastax/oss/driver/internal/osgi/support/BundleOptions.java
@@ -35,12 +35,14 @@
   public static CompositeOption commonBundles() {
     return () ->
         options(
-            mavenBundle("org.apache.cassandra", "java-driver-guava-shaded").versionAsInProject(),
-            mavenBundle("io.dropwizard.metrics", "metrics-core").versionAsInProject(),
-            mavenBundle("org.slf4j", "slf4j-api").versionAsInProject(),
-            mavenBundle("org.hdrhistogram", "HdrHistogram").versionAsInProject(),
-            mavenBundle("com.typesafe", "config").versionAsInProject(),
-            mavenBundle("com.datastax.oss", "native-protocol").versionAsInProject(),
+            mavenBundle("org.apache.cassandra", "java-driver-guava-shaded")
+                .versionAsInProject()
+                .startLevel(1),
+            mavenBundle("io.dropwizard.metrics", "metrics-core").versionAsInProject().startLevel(1),
+            mavenBundle("org.slf4j", "slf4j-api").versionAsInProject().startLevel(1),
+            mavenBundle("org.hdrhistogram", "HdrHistogram").versionAsInProject().startLevel(1),
+            mavenBundle("com.typesafe", "config").versionAsInProject().startLevel(1),
+            mavenBundle("com.datastax.oss", "native-protocol").versionAsInProject().startLevel(1),
             logbackBundles(),
             debugOptions());
   }
@@ -51,7 +53,7 @@
             systemProperty("cassandra.contactpoints").value("127.0.0.1"),
             systemProperty("cassandra.port").value("9042"),
             systemProperty("cassandra.keyspace").value("test_osgi"),
-            bundle("reference:file:target/classes"));
+            bundle("reference:file:target/classes").startLevel(3));
   }
 
   public static UrlProvisionOption driverCoreBundle() {
@@ -59,15 +61,15 @@
   }
 
   public static UrlProvisionOption driverCoreShadedBundle() {
-    return bundle("reference:file:../core-shaded/target/classes");
+    return bundle("reference:file:../core-shaded/target/classes").startLevel(1);
   }
 
   public static UrlProvisionOption driverQueryBuilderBundle() {
-    return bundle("reference:file:../query-builder/target/classes");
+    return bundle("reference:file:../query-builder/target/classes").startLevel(2);
   }
 
   public static UrlProvisionOption driverMapperRuntimeBundle() {
-    return bundle("reference:file:../mapper-runtime/target/classes");
+    return bundle("reference:file:../mapper-runtime/target/classes").startLevel(2);
   }
 
   public static UrlProvisionOption driverTestInfraBundle() {