blob: 42391297a0a61af32291a0ba0bab3e5b2b0d19e9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.hdds.utils.db.cache;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import com.google.common.base.Optional;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import static org.junit.Assert.fail;
/**
* Class tests partial table cache.
*/
@RunWith(value = Parameterized.class)
public class TestTableCacheImpl {
private TableCache<CacheKey<String>, CacheValue<String>> tableCache;
private final TableCacheImpl.CacheCleanupPolicy cacheCleanupPolicy;
@Parameterized.Parameters
public static Collection<Object[]> policy() {
Object[][] params = new Object[][] {
{TableCacheImpl.CacheCleanupPolicy.NEVER},
{TableCacheImpl.CacheCleanupPolicy.MANUAL}
};
return Arrays.asList(params);
}
public TestTableCacheImpl(
TableCacheImpl.CacheCleanupPolicy cacheCleanupPolicy) {
this.cacheCleanupPolicy = cacheCleanupPolicy;
}
@Before
public void create() {
tableCache =
new TableCacheImpl<>(cacheCleanupPolicy);
}
@Test
public void testPartialTableCache() {
for (int i = 0; i< 10; i++) {
tableCache.put(new CacheKey<>(Integer.toString(i)),
new CacheValue<>(Optional.of(Integer.toString(i)), i));
}
for (int i=0; i < 10; i++) {
Assert.assertEquals(Integer.toString(i),
tableCache.get(new CacheKey<>(Integer.toString(i))).getCacheValue());
}
// On a full table cache if some one calls cleanup it is a no-op.
tableCache.cleanup(4);
for (int i=5; i < 10; i++) {
Assert.assertEquals(Integer.toString(i),
tableCache.get(new CacheKey<>(Integer.toString(i))).getCacheValue());
}
}
@Test
public void testPartialTableCacheParallel() throws Exception {
int totalCount = 0;
CompletableFuture<Integer> future =
CompletableFuture.supplyAsync(() -> {
try {
return writeToCache(10, 1, 0);
} catch (InterruptedException ex) {
fail("writeToCache got interrupt exception");
}
return 0;
});
int value = future.get();
Assert.assertEquals(10, value);
totalCount += value;
future =
CompletableFuture.supplyAsync(() -> {
try {
return writeToCache(10, 11, 100);
} catch (InterruptedException ex) {
fail("writeToCache got interrupt exception");
}
return 0;
});
// Check we have first 10 entries in cache.
for (int i=1; i <= 10; i++) {
Assert.assertEquals(Integer.toString(i),
tableCache.get(new CacheKey<>(Integer.toString(i))).getCacheValue());
}
value = future.get();
Assert.assertEquals(10, value);
totalCount += value;
if (cacheCleanupPolicy == TableCacheImpl.CacheCleanupPolicy.MANUAL) {
int deleted = 5;
// cleanup first 5 entires
tableCache.cleanup(deleted);
// We should totalCount - deleted entries in cache.
final int tc = totalCount;
GenericTestUtils.waitFor(() -> (tc - deleted == tableCache.size()), 100,
5000);
// Check if we have remaining entries.
for (int i=6; i <= totalCount; i++) {
Assert.assertEquals(Integer.toString(i), tableCache.get(
new CacheKey<>(Integer.toString(i))).getCacheValue());
}
tableCache.cleanup(10);
tableCache.cleanup(totalCount);
// Cleaned up all entries, so cache size should be zero.
GenericTestUtils.waitFor(() -> (0 == tableCache.size()), 100,
5000);
} else {
tableCache.cleanup(totalCount);
Assert.assertEquals(totalCount, tableCache.size());
}
}
private int writeToCache(int count, int startVal, long sleep)
throws InterruptedException {
int counter = 1;
while (counter <= count){
tableCache.put(new CacheKey<>(Integer.toString(startVal)),
new CacheValue<>(Optional.of(Integer.toString(startVal)), startVal));
startVal++;
counter++;
Thread.sleep(sleep);
}
return count;
}
}