blob: f4ecb6c71a121678f7a871658805e05ba6d086e6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.sidecar.utils;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import com.google.common.testing.FakeTicker;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import com.github.benmanes.caffeine.cache.Cache;
import io.vertx.core.Future;
import org.apache.cassandra.sidecar.config.CacheConfiguration;
import org.apache.cassandra.sidecar.config.SSTableImportConfiguration;
import org.apache.cassandra.sidecar.config.ServiceConfiguration;
import org.apache.cassandra.sidecar.config.yaml.CacheConfigurationImpl;
import org.apache.cassandra.sidecar.config.yaml.SSTableImportConfigurationImpl;
import org.apache.cassandra.sidecar.config.yaml.ServiceConfigurationImpl;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
/**
 * Unit tests for the {@link CacheFactory} class
 *
 * <p>The factory under test is handed the {@code read} method of a {@link FakeTicker} as its time
 * source; the tests advance that ticker to simulate elapsed time instead of sleeping.
 */
class CacheFactoryTest
{
    /** Expire-after-access duration passed to the SSTable import cache configuration (2 hours, in millis). */
    static final long SSTABLE_IMPORT_EXPIRE_AFTER_ACCESS_MILLIS = TimeUnit.HOURS.toMillis(2);
    /** Maximum size passed to the SSTable import cache configuration. */
    static final long SSTABLE_IMPORT_CACHE_MAX_SIZE = 10L;

    private CacheFactory cacheFactory;
    private FakeTicker fakeTicker;

    /**
     * Builds a fresh {@link CacheFactory} (and a fresh {@link FakeTicker}) before every test so that
     * no cached entries or elapsed time leak from one test into the next.
     */
    @BeforeEach
    void setup()
    {
        fakeTicker = new FakeTicker();
        CacheConfiguration ssTableImportCacheConfiguration =
        new CacheConfigurationImpl(SSTABLE_IMPORT_EXPIRE_AFTER_ACCESS_MILLIS, // 2 hours
                                   SSTABLE_IMPORT_CACHE_MAX_SIZE);
        SSTableImportConfiguration ssTableImportConfiguration =
        new SSTableImportConfigurationImpl(ssTableImportCacheConfiguration);
        ServiceConfiguration serviceConfiguration = new ServiceConfigurationImpl(ssTableImportConfiguration);
        SSTableImporter mockSSTableImporter = mock(SSTableImporter.class);
        cacheFactory = new CacheFactory(serviceConfiguration, mockSSTableImporter, fakeTicker::read);
    }

    /**
     * Verifies that an entry keeps being served while it is accessed within the configured
     * expiration window, and that a new value is loaded once the window has fully elapsed
     * without any access.
     */
    @Test
    void testSSTableImportCacheExpiration() throws ExecutionException, InterruptedException
    {
        Cache<SSTableImporter.ImportOptions, Future<Void>> cache = cacheFactory.ssTableImportCache();

        // Distinct key instances built from identical inputs: lookups below rely on key equality,
        // not on key identity
        SSTableImporter.ImportOptions type1Options1 = buildImportOptions("ks", "tbl", "uuid");
        SSTableImporter.ImportOptions type1Options2 = buildImportOptions("ks", "tbl", "uuid");
        SSTableImporter.ImportOptions type1Options3 = buildImportOptions("ks", "tbl", "uuid");
        SSTableImporter.ImportOptions type1Options4 = buildImportOptions("ks", "tbl", "uuid");
        SSTableImporter.ImportOptions type1Options5 = buildImportOptions("ks", "tbl", "uuid");
        SSTableImporter.ImportOptions type2Options1 = buildImportOptions("ks2", "tbl2", "uuid");

        Void result1 = ssTableImportCacheEntry(cache, type1Options1, mock(Void.class));
        Void type2Result1 = ssTableImportCacheEntry(cache, type2Options1, mock(Void.class));
        assertThat(result1).isNotNull()
                           .isNotSameAs(type2Result1);

        // advance ticker 1 minute
        fakeTicker.advance(1, TimeUnit.MINUTES);

        // should get the same instance, type1Options2.equals(type1Options1)
        Void result2 = ssTableImportCacheEntry(cache, type1Options2, mock(Void.class));
        assertThat(result2).isSameAs(result1); // same instance

        // advance ticker 1 hour
        fakeTicker.advance(1, TimeUnit.HOURS);

        // should still get the same instance
        Void result3 = ssTableImportCacheEntry(cache, type1Options3, mock(Void.class));
        assertThat(result3).isSameAs(result1);

        // advance ticker 1 hour and 59 minutes and 59 seconds
        fakeTicker.advance(1, TimeUnit.HOURS);
        fakeTicker.advance(59, TimeUnit.MINUTES);
        fakeTicker.advance(59, TimeUnit.SECONDS);

        // should still get the same instance
        Void result4 = ssTableImportCacheEntry(cache, type1Options4, mock(Void.class));
        assertThat(result4).isSameAs(result1);

        // advance ticker for 2 hours
        fakeTicker.advance(2, TimeUnit.HOURS);

        // should get a different instance
        Void result5 = ssTableImportCacheEntry(cache, type1Options5, mock(Void.class));
        assertThat(result5).isNotSameAs(result1);
    }

    /**
     * Verifies that inserting twice as many distinct keys as the configured maximum does not leave
     * the cache holding more than {@link #SSTABLE_IMPORT_CACHE_MAX_SIZE} entries.
     */
    @Test
    void testSSTableImportCacheLimit() throws ExecutionException, InterruptedException
    {
        Cache<SSTableImporter.ImportOptions, Future<Void>> cache = cacheFactory.ssTableImportCache();
        cache.invalidateAll(); // make sure our cache is emptied out before testing

        int n = (int) SSTABLE_IMPORT_CACHE_MAX_SIZE * 2;
        for (int i = 0; i < n; i++)
        {
            Void mockVoid = mock(Void.class);
            SSTableImporter.ImportOptions importOptions = buildImportOptions("ks" + i, "tbl" + i, "uuid" + i);
            Void result = ssTableImportCacheEntry(cache, importOptions, mockVoid);
            assertThat(result).isNotNull();
        }

        // estimatedSize() is only an estimate and size-based eviction may still be pending;
        // run pending maintenance first so the assertion does not depend on timing
        cache.cleanUp();
        assertThat(cache.estimatedSize()).isLessThanOrEqualTo(SSTABLE_IMPORT_CACHE_MAX_SIZE);
    }

    /**
     * Verifies that many threads looking up the same key at roughly the same time all observe the
     * same cached value, and that the value is replaced after the ticker is advanced by 4 hours.
     */
    @Test
    void testConcurrentThreadsAccessingSameKey() throws InterruptedException, ExecutionException
    {
        Cache<SSTableImporter.ImportOptions, Future<Void>> cache = cacheFactory.ssTableImportCache();
        final int nThreads = 20;
        final ExecutorService pool = Executors.newFixedThreadPool(nThreads);
        final Void[] voidArray = new Void[nThreads];
        final CountDownLatch latch = new CountDownLatch(nThreads);
        // Fully qualified because the simple name Future refers to io.vertx.core.Future in this file
        final java.util.concurrent.Future<?>[] tasks = new java.util.concurrent.Future<?>[nThreads];

        try
        {
            for (int i = 0; i < nThreads; i++)
            {
                final int finalI = i;
                tasks[i] = pool.submit(() -> {
                    try
                    {
                        // Look up the cache entry roughly at the same time in every thread
                        latch.countDown();
                        latch.await();
                        SSTableImporter.ImportOptions importOptions = buildImportOptions("ks", "tbl", "uuid");
                        // The first thread to win creates the object, the rest should get the same instance
                        voidArray[finalI] = ssTableImportCacheEntry(cache, importOptions, mock(Void.class));
                        fakeTicker.advance(1, TimeUnit.MINUTES);
                    }
                    catch (InterruptedException e)
                    {
                        // Preserve the interrupt status for the pool thread before surfacing the failure
                        Thread.currentThread().interrupt();
                        throw new RuntimeException(e);
                    }
                    catch (ExecutionException e)
                    {
                        throw new RuntimeException(e);
                    }
                });
            }

            pool.shutdown();
            assertThat(pool.awaitTermination(1, TimeUnit.MINUTES)).isTrue();
        }
        finally
        {
            // No-op when the pool already terminated; otherwise stops stuck workers so they do not leak
            pool.shutdownNow();
        }

        // Surface any exception or assertion failure raised inside a worker; without this a failed
        // worker would simply leave a null slot behind
        for (java.util.concurrent.Future<?> task : tasks)
        {
            task.get();
        }

        // Guard against a vacuous pass where every slot is null and therefore "the same"
        assertThat(voidArray[0]).isNotNull();
        for (int i = 1; i < nThreads; i++)
        {
            assertThat(voidArray[i]).isSameAs(voidArray[0]);
        }

        // advance ticker for 4 hours
        fakeTicker.advance(4, TimeUnit.HOURS);

        SSTableImporter.ImportOptions importOptions = buildImportOptions("ks", "tbl", "uuid");
        assertThat(ssTableImportCacheEntry(cache, importOptions, mock(Void.class))).isNotSameAs(voidArray[0]);
    }

    /**
     * Looks up {@code key} in {@code cache}, supplying an already-succeeded future holding
     * {@code value} as the mapping function, and returns the result of the future the cache
     * hands back.
     *
     * @param cache the cache to query
     * @param key   the key to look up
     * @param value the value wrapped in a succeeded future by the mapping function
     * @return the result of the future returned by the cache for {@code key}
     * @throws ExecutionException   if the returned future completed exceptionally
     * @throws InterruptedException if the calling thread is interrupted while waiting on the future
     */
    private static Void ssTableImportCacheEntry(Cache<SSTableImporter.ImportOptions, Future<Void>> cache,
                                                SSTableImporter.ImportOptions key, Void value)
    throws ExecutionException, InterruptedException
    {
        Future<Void> voidFuture = cache.get(key, k -> Future.succeededFuture(value));
        assertThat(voidFuture).isNotNull();
        return voidFuture.toCompletionStage().toCompletableFuture().get();
    }

    /**
     * Builds import options for the given keyspace and table. The {@code uuid} is used both as the
     * upload id and as the last segment of the directory ({@code "/tmp/" + uuid}); the host is
     * always {@code "localhost"}.
     *
     * @param keyspace  the keyspace name
     * @param tableName the table name
     * @param uuid      the upload identifier
     * @return the built import options
     */
    private static SSTableImporter.ImportOptions buildImportOptions(String keyspace, String tableName, String uuid)
    {
        return new SSTableImporter.ImportOptions.Builder()
               .keyspace(keyspace)
               .tableName(tableName)
               .directory("/tmp/" + uuid)
               .uploadId(uuid)
               .host("localhost")
               .build();
    }
}