blob: b5c93de5b10372a14a4dc1dc6a9c7746c2d66ffb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.sidecar.db;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Uninterruptibles;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.ExecutionInfo;
import com.datastax.driver.core.KeyspaceMetadata;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Provides;
import com.google.inject.Singleton;
import com.google.inject.util.Modules;
import io.vertx.core.Vertx;
import io.vertx.junit5.VertxExtension;
import io.vertx.junit5.VertxTestContext;
import org.apache.cassandra.sidecar.TestModule;
import org.apache.cassandra.sidecar.cluster.InstancesConfig;
import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
import org.apache.cassandra.sidecar.common.CQLSessionProvider;
import org.apache.cassandra.sidecar.db.schema.SidecarSchema;
import org.apache.cassandra.sidecar.server.MainModule;
import org.apache.cassandra.sidecar.server.Server;
import org.mockito.stubbing.Answer;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
* Unit tests for {@link SidecarSchema} setup.
*/
@ExtendWith(VertxExtension.class)
public class SidecarSchemaTest
{
private static final Logger logger = LoggerFactory.getLogger(SidecarSchemaTest.class);
public static final String DEFAULT_SIDECAR_SCHEMA_KEYSPACE_NAME = "sidecar_internal";
private static List<String> interceptedExecStmts = new ArrayList<>();
private static List<String> interceptedPrepStmts = new ArrayList<>();
private Vertx vertx;
private SidecarSchema sidecarSchema;
Server server;
@BeforeEach
public void setUp() throws InterruptedException
{
Injector injector = Guice.createInjector(Modules.override(new MainModule())
.with(Modules.override(new TestModule())
.with(new SidecarSchemaTestModule())));
this.vertx = injector.getInstance(Vertx.class);
server = injector.getInstance(Server.class);
sidecarSchema = injector.getInstance(SidecarSchema.class);
VertxTestContext context = new VertxTestContext();
server.start()
.onSuccess(s -> context.completeNow())
.onFailure(context::failNow);
context.awaitCompletion(5, TimeUnit.SECONDS);
}
@AfterEach
public void tearDown() throws InterruptedException
{
interceptedExecStmts.clear();
interceptedPrepStmts.clear();
CountDownLatch closeLatch = new CountDownLatch(1);
vertx.close(result -> closeLatch.countDown());
if (closeLatch.await(60, TimeUnit.SECONDS))
logger.info("Close event received before timeout.");
else
logger.error("Close event timed out.");
}
@Test
public void testSchemaInitOnStartup(VertxTestContext context)
{
sidecarSchema.startSidecarSchemaInitializer();
context.verify(() -> {
int maxWaitTime = 20; // about 10 seconds
while (interceptedPrepStmts.size() < 10
|| interceptedExecStmts.size() < 3
|| !sidecarSchema.isInitialized())
{
if (maxWaitTime-- <= 0)
{
context.failNow("test timeout");
break;
}
Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
}
assertEquals(3, interceptedExecStmts.size());
assertTrue(interceptedExecStmts.get(0).contains("CREATE KEYSPACE IF NOT EXISTS sidecar_internal"),
"Create keyspace should be executed the first");
assertTrue(hasElementContains(interceptedExecStmts,
"CREATE TABLE IF NOT EXISTS sidecar_internal.restore_job_v2"),
"Create table should be executed the next for job table");
assertTrue(hasElementContains(interceptedExecStmts,
"CREATE TABLE IF NOT EXISTS sidecar_internal.restore_slice_v2"),
"Create table should be executed the next for slice table");
List<String> expectedPrepStatements = Arrays.asList(
"INSERT INTO sidecar_internal.restore_job_v2 ( created_at, job_id, keyspace_name, table_name, " +
"job_agent, status, blob_secrets, import_options, consistency_level, expire_at) " +
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
"INSERT INTO sidecar_internal.restore_job_v2 ( created_at, job_id, blob_secrets) VALUES (?, ? ,?)",
"INSERT INTO sidecar_internal.restore_job_v2 ( created_at, job_id, status) VALUES (?, ?, ?)",
"INSERT INTO sidecar_internal.restore_job_v2 ( created_at, job_id, job_agent) VALUES (?, ?, ?)",
"INSERT INTO sidecar_internal.restore_job_v2 ( created_at, job_id, expire_at) VALUES (?, ?, ?)",
"SELECT created_at, job_id, keyspace_name, table_name, job_agent, status, blob_secrets, import_options, " +
"consistency_level, expire_at FROM sidecar_internal.restore_job_v2 WHERE created_at = ? AND job_id = ?",
"SELECT created_at, job_id, keyspace_name, table_name, job_agent, status, blob_secrets, import_options, " +
"consistency_level, expire_at FROM sidecar_internal.restore_job_v2 WHERE created_at = ?",
"INSERT INTO sidecar_internal.restore_slice_v2 ( job_id, bucket_id, slice_id, bucket, key, " +
"checksum, start_token, end_token, compressed_size, uncompressed_size, status_by_replica, " +
"all_replicas) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
"SELECT job_id, bucket_id, slice_id, bucket, key, checksum, start_token, end_token, compressed_size, " +
"uncompressed_size, status_by_replica, all_replicas FROM sidecar_internal.restore_slice_v2 " +
"WHERE job_id = ? AND bucket_id = ? AND end_token >= ? AND start_token < ? ALLOW FILTERING",
"UPDATE sidecar_internal.restore_slice_v2 SET status_by_replica = status_by_replica + ?, " +
"all_replicas = all_replicas + ? WHERE job_id = ? AND bucket_id = ? AND start_token = ? AND slice_id = ?"
);
Set<String> expected = new HashSet<>(expectedPrepStatements);
Set<String> actual = new HashSet<>(interceptedPrepStmts);
Set<String> notInExpected = Sets.difference(actual, expected);
assertEquals(expected.size(), actual.size(), "Number of prepared statements should match");
assertTrue(notInExpected.isEmpty(),
"Found the following statements that not in expected: " + notInExpected);
assertTrue(sidecarSchema.isInitialized());
context.completeNow();
});
}
/**
* Test module override for {@link SidecarSchemaTest}
*/
public static class SidecarSchemaTestModule extends AbstractModule
{
public final boolean intercept;
public SidecarSchemaTestModule()
{
this(true);
}
public SidecarSchemaTestModule(boolean intercept)
{
this.intercept = intercept;
}
@Provides
@Singleton
public CQLSessionProvider cqlSessionProvider()
{
CQLSessionProvider cqlSession = mock(CQLSessionProvider.class);
Session session = mock(Session.class, RETURNS_DEEP_STUBS);
KeyspaceMetadata ks = mock(KeyspaceMetadata.class);
when(ks.getTable(anyString())).thenReturn(null);
when(session.getCluster()
.getMetadata()
.getKeyspace(anyString())).thenAnswer((Answer<KeyspaceMetadata>) invocation -> {
if (DEFAULT_SIDECAR_SCHEMA_KEYSPACE_NAME.equals(invocation.getArgument(0)))
{
return null;
}
return ks;
});
when(session.execute(any(String.class))).then(invocation -> {
if (intercept)
{
interceptedExecStmts.add(invocation.getArgument(0));
}
ResultSet rs = mock(ResultSet.class);
ExecutionInfo ei = mock(ExecutionInfo.class);
when(ei.isSchemaInAgreement()).thenReturn(true);
when(rs.getExecutionInfo()).thenReturn(ei);
return rs;
});
when(session.prepare(any(String.class))).then(invocation -> {
if (intercept)
{
interceptedPrepStmts.add(invocation.getArgument(0));
}
PreparedStatement ps = mock(PreparedStatement.class);
BoundStatement stmt = mock(BoundStatement.class);
when(ps.bind(any(Object[].class))).thenReturn(stmt);
return ps;
});
when(cqlSession.get()).thenReturn(session);
return cqlSession;
}
@Provides
@Singleton
public InstancesConfig instancesConfig()
{
InstanceMetadata instanceMeta = mock(InstanceMetadata.class);
when(instanceMeta.stagingDir()).thenReturn("/tmp/staging"); // not an actual file
InstancesConfig instancesConfig = mock(InstancesConfig.class);
when(instancesConfig.instances()).thenReturn(Collections.singletonList(instanceMeta));
when(instancesConfig.instanceFromHost(any())).thenReturn(instanceMeta);
when(instancesConfig.instanceFromId(anyInt())).thenReturn(instanceMeta);
return instancesConfig;
}
}
private boolean hasElementContains(List<String> list, String substring)
{
for (String s : list)
{
if (s.contains(substring))
{
return true;
}
}
return false;
}
}