CASSANDRASC-100 Fix flaky SSTableImportHandlerTest
Patch by Saranya Krishnakumar; Reviewed by Francisco Guerrero and Yifan Cai for CASSANDRASC-100
diff --git a/src/main/java/org/apache/cassandra/sidecar/cluster/CassandraAdapterDelegate.java b/src/main/java/org/apache/cassandra/sidecar/cluster/CassandraAdapterDelegate.java
index 3e904fc..7bc3a64 100644
--- a/src/main/java/org/apache/cassandra/sidecar/cluster/CassandraAdapterDelegate.java
+++ b/src/main/java/org/apache/cassandra/sidecar/cluster/CassandraAdapterDelegate.java
@@ -129,8 +129,14 @@
this.versionProvider = versionProvider;
this.cqlSessionProvider = session;
this.jmxClient = jmxClient;
- notificationListener = new JmxNotificationListener();
+ notificationListener = initializeJmxListener();
+ }
+
+ protected JmxNotificationListener initializeJmxListener()
+ {
+ JmxNotificationListener notificationListener = new JmxNotificationListener();
this.jmxClient.registerListener(notificationListener);
+ return notificationListener;
}
private void maybeRegisterHostListener(@NotNull Session session)
diff --git a/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/BaseUploadsHandlerTest.java b/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/BaseUploadsHandlerTest.java
index 86ee719..30bf04b 100644
--- a/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/BaseUploadsHandlerTest.java
+++ b/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/BaseUploadsHandlerTest.java
@@ -49,8 +49,10 @@
import io.vertx.ext.web.client.WebClient;
import io.vertx.junit5.VertxTestContext;
import org.apache.cassandra.sidecar.TestModule;
+import org.apache.cassandra.sidecar.adapters.base.CassandraTableOperations;
import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate;
import org.apache.cassandra.sidecar.cluster.InstancesConfig;
+import org.apache.cassandra.sidecar.common.TableOperations;
import org.apache.cassandra.sidecar.config.SSTableUploadConfiguration;
import org.apache.cassandra.sidecar.config.ServiceConfiguration;
import org.apache.cassandra.sidecar.config.SidecarConfiguration;
@@ -60,6 +62,7 @@
import org.apache.cassandra.sidecar.server.MainModule;
import org.apache.cassandra.sidecar.server.Server;
import org.apache.cassandra.sidecar.snapshots.SnapshotUtils;
+import org.jetbrains.annotations.Nullable;
import static org.apache.cassandra.sidecar.config.yaml.TrafficShapingConfigurationImpl.DEFAULT_CHECK_INTERVAL;
import static org.apache.cassandra.sidecar.config.yaml.TrafficShapingConfigurationImpl.DEFAULT_INBOUND_FILE_GLOBAL_BANDWIDTH_LIMIT;
@@ -68,7 +71,6 @@
import static org.apache.cassandra.sidecar.config.yaml.TrafficShapingConfigurationImpl.DEFAULT_PEAK_OUTBOUND_GLOBAL_BANDWIDTH_LIMIT;
import static org.apache.cassandra.sidecar.snapshots.SnapshotUtils.mockInstancesConfig;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -82,7 +84,7 @@
protected Vertx vertx;
protected Server server;
protected WebClient client;
- protected CassandraAdapterDelegate mockDelegate;
+ protected TestCassandraAdapterDelegate testDelegate;
protected SidecarConfiguration sidecarConfiguration;
@TempDir
protected Path temporaryPath;
@@ -90,13 +92,15 @@
protected SSTableUploadConfiguration mockSSTableUploadConfiguration;
protected TrafficShapingConfiguration trafficShapingConfiguration;
protected SidecarRateLimiter ingressFileRateLimiter;
+ protected CassandraTableOperations mockCFOperations;
+
+
@BeforeEach
void setup() throws InterruptedException, IOException
{
canonicalTemporaryPath = temporaryPath.toFile().getCanonicalPath();
- mockDelegate = mock(CassandraAdapterDelegate.class);
- doNothing().when(mockDelegate).healthCheck();
+ testDelegate = new TestCassandraAdapterDelegate();
TestModule testModule = new TestModule();
mockSSTableUploadConfiguration = mock(SSTableUploadConfiguration.class);
when(mockSSTableUploadConfiguration.concurrentUploadsLimit()).thenReturn(3);
@@ -121,7 +125,7 @@
sidecarConfiguration = SidecarConfigurationImpl.builder()
.serviceConfiguration(serviceConfiguration)
.build();
- TestModuleOverride testModuleOverride = new TestModuleOverride(mockDelegate);
+ TestModuleOverride testModuleOverride = new TestModuleOverride(testDelegate);
Injector injector = Guice.createInjector(Modules.override(new MainModule())
.with(Modules.override(testModule)
.with(testModuleOverride)));
@@ -136,7 +140,10 @@
TableMetadata mockTableMetadata = mock(TableMetadata.class);
when(mockMetadata.getKeyspace("ks")).thenReturn(mockKeyspaceMetadata);
when(mockMetadata.getKeyspace("ks").getTable("tbl")).thenReturn(mockTableMetadata);
- when(mockDelegate.metadata()).thenReturn(mockMetadata);
+ testDelegate.setMetadata(mockMetadata);
+
+ mockCFOperations = mock(CassandraTableOperations.class);
+ testDelegate.setTableOperations(mockCFOperations);
VertxTestContext context = new VertxTestContext();
server.start()
@@ -222,4 +229,53 @@
return sidecarConfiguration;
}
}
+
+ static class TestCassandraAdapterDelegate extends CassandraAdapterDelegate
+ {
+ Metadata metadata;
+ TableOperations tableOperations;
+
+ public TestCassandraAdapterDelegate()
+ {
+ super(Vertx.vertx(), 1, null, null, null, null, null, "localhost", 9043);
+ }
+
+ protected JmxNotificationListener initializeJmxListener()
+ {
+ return null;
+ }
+
+ @Override
+ public void healthCheck()
+ {
+ // do nothing
+ }
+
+ @Override
+ public @Nullable Metadata metadata()
+ {
+ return metadata;
+ }
+
+ public void setMetadata(Metadata metadata)
+ {
+ this.metadata = metadata;
+ }
+
+ @Override
+ public @Nullable TableOperations tableOperations()
+ {
+ return tableOperations;
+ }
+
+ public void setTableOperations(TableOperations tableOperations)
+ {
+ this.tableOperations = tableOperations;
+ }
+
+ @Override
+ public void close()
+ {
+ }
+ }
}
diff --git a/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableImportHandlerTest.java b/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableImportHandlerTest.java
index 4c34a49..c8caf63 100644
--- a/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableImportHandlerTest.java
+++ b/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableImportHandlerTest.java
@@ -40,13 +40,11 @@
import io.vertx.ext.web.client.predicate.ResponsePredicate;
import io.vertx.junit5.VertxExtension;
import io.vertx.junit5.VertxTestContext;
-import org.apache.cassandra.sidecar.common.TableOperations;
import static org.apache.cassandra.sidecar.utils.SSTableImporter.DEFAULT_COPY_DATA;
import static org.apache.cassandra.sidecar.utils.SSTableImporter.DEFAULT_INVALIDATE_CACHES;
import static org.apache.cassandra.sidecar.utils.SSTableImporter.DEFAULT_VERIFY_TOKENS;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -110,9 +108,6 @@
{
UUID uploadId = UUID.randomUUID();
- TableOperations mockCFOperations = mock(TableOperations.class);
- when(mockDelegate.tableOperations()).thenReturn(mockCFOperations);
-
String requestURI = "/api/v1/uploads/" + uploadId + "/keyspaces/ks/tables/table/import";
clientRequest(context, requestURI,
response -> assertThat(response.statusCode())
@@ -124,6 +119,7 @@
{
UUID uploadId = UUID.randomUUID();
createStagedUploadFiles(uploadId);
+ testDelegate.setTableOperations(null);
client.put(server.actualPort(), "localhost", "/api/v1/uploads/"
+ uploadId + "/keyspaces/ks/tables/table/import")
@@ -137,8 +133,6 @@
{
UUID uploadId = UUID.randomUUID();
Path stagedUploadDirectory = createStagedUploadFiles(uploadId);
- TableOperations mockCFOperations = mock(TableOperations.class);
- when(mockDelegate.tableOperations()).thenReturn(mockCFOperations);
String stageDirectoryAbsolutePath = stagedUploadDirectory.toString();
when(mockCFOperations.importNewSSTables("ks", "table", stageDirectoryAbsolutePath,
true, true, true,
@@ -157,8 +151,6 @@
{
UUID uploadId = UUID.randomUUID();
Path stagedUploadDirectory = createStagedUploadFiles(uploadId);
- TableOperations mockCFOperations = mock(TableOperations.class);
- when(mockDelegate.tableOperations()).thenReturn(mockCFOperations);
String stageDirectoryAbsolutePath = stagedUploadDirectory.toString();
when(mockCFOperations.importNewSSTables("ks", "table", stageDirectoryAbsolutePath,
false, true, true,
@@ -182,8 +174,6 @@
{
UUID uploadId = UUID.randomUUID();
Path stagedUploadDirectory = createStagedUploadFiles(uploadId);
- TableOperations mockCFOperations = mock(TableOperations.class);
- when(mockDelegate.tableOperations()).thenReturn(mockCFOperations);
String stageDirectoryAbsolutePath = stagedUploadDirectory.toString();
when(mockCFOperations.importNewSSTables("ks", "table", stageDirectoryAbsolutePath,
false, true, true,
@@ -217,8 +207,6 @@
{
UUID uploadId = UUID.randomUUID();
Path stagedUploadDirectory = createStagedUploadFiles(uploadId);
- TableOperations mockCFOperations = mock(TableOperations.class);
- when(mockDelegate.tableOperations()).thenReturn(mockCFOperations);
String stageDirectoryAbsolutePath = stagedUploadDirectory.toString();
when(mockCFOperations.importNewSSTables("ks", "table", stageDirectoryAbsolutePath,
true, false, true,
@@ -252,8 +240,6 @@
{
UUID uploadId = UUID.randomUUID();
Path stagedUploadDirectory = createStagedUploadFiles(uploadId);
- TableOperations mockCFOperations = mock(TableOperations.class);
- when(mockDelegate.tableOperations()).thenReturn(mockCFOperations);
String stageDirectoryAbsolutePath = stagedUploadDirectory.toString();
when(mockCFOperations.importNewSSTables("ks", "table", stageDirectoryAbsolutePath,
true, true, false,
@@ -287,8 +273,6 @@
{
UUID uploadId = UUID.randomUUID();
Path stagedUploadDirectory = createStagedUploadFiles(uploadId);
- TableOperations mockCFOperations = mock(TableOperations.class);
- when(mockDelegate.tableOperations()).thenReturn(mockCFOperations);
String stageDirectoryAbsolutePath = stagedUploadDirectory.toString();
when(mockCFOperations.importNewSSTables("ks", "table", stageDirectoryAbsolutePath,
true, true, true,
@@ -313,6 +297,7 @@
DEFAULT_COPY_DATA);
context.completeNow();
})));
+
assertThat(context.awaitCompletion(30, TimeUnit.SECONDS)).isTrue();
}