blob: 4c344ec2bed4869fbde9e30b916887511dde1f7e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cassandra.spark.data;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import com.google.common.base.Ticker;
import com.google.common.cache.Cache;
import com.google.common.collect.ImmutableMap;
import org.junit.Before;
import org.junit.Test;

import org.apache.cassandra.clients.Sidecar;
import org.apache.cassandra.clients.SslConfig;

import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
/**
 * Tests the cache in {@link CassandraDataSourceHelper}.
 *
 * <p>Time is driven by a manually-advanced {@link Ticker} so that expiration can be exercised
 * without sleeping. The assertions below expect a cached entry to survive for 11 hours, 59 minutes
 * and 59 seconds after being written and to be replaced once 12 hours have elapsed.
 */
public class CassandraDataSourceHelperCacheTest
{
    public static final Map<String, String> REQUIRED_CLIENT_CONFIG_OPTIONS =
    ImmutableMap.of("sidecar_instances", "127.0.0.1",
                    "keyspace", "big-data",
                    "table", "customers");

    private CacheTicker cacheTicker;

    /**
     * Installs a fresh, test-controlled ticker before every test.
     * NOTE(review): presumably {@code initCassandraDataSourceCache} also rebuilds the cache so
     * that entries do not leak between tests - confirm against {@link CassandraDataSourceHelper}.
     */
    @Before
    public void setup()
    {
        cacheTicker = new CacheTicker();
        CassandraDataSourceHelper.initCassandraDataSourceCache(cacheTicker);
    }

    /**
     * Verifies that the same instance is served until 12 hours of ticker time have elapsed
     * since the entry was written, and that a new instance is created afterwards.
     */
    @Test
    public void testCassandraDataLayerCacheExpiration() throws ExecutionException
    {
        Map<String, String> options = new HashMap<>(REQUIRED_CLIENT_CONFIG_OPTIONS);
        Cache<Map<String, String>, CassandraDataLayer> cassandraDataLayerCache = CassandraDataSourceHelper.getCassandraDataLayerCache();

        CassandraDataLayer dataLayer = getCassandraDataLayer(cassandraDataLayerCache, options, options);
        assertNotNull(dataLayer);

        // Advance ticker 1 minute
        cacheTicker.advance(1, TimeUnit.MINUTES);
        // Should get the same instance
        CassandraDataLayer dataLayer1 = getCassandraDataLayer(cassandraDataLayerCache, options, options);
        assertSame("Entry must still be cached after 1 minute", dataLayer, dataLayer1);

        // Advance ticker 1 hour
        cacheTicker.advance(1, TimeUnit.HOURS);
        // Should still get the same instance
        CassandraDataLayer dataLayer2 = getCassandraDataLayer(cassandraDataLayerCache, options, options);
        assertSame("Entry must still be cached after 1 hour and 1 minute", dataLayer, dataLayer2);

        // Advance ticker 10 hours and 58 minutes and 59 seconds (total elapsed: 11:59:59)
        cacheTicker.advance(10, TimeUnit.HOURS);
        cacheTicker.advance(58, TimeUnit.MINUTES);
        cacheTicker.advance(59, TimeUnit.SECONDS);
        // Should still get the same instance
        CassandraDataLayer dataLayer3 = getCassandraDataLayer(cassandraDataLayerCache, options, options);
        assertSame("Entry must still be cached one second before the 12 hour mark", dataLayer, dataLayer3);

        // Advance ticker for 1 second (total elapsed: 12:00:00)
        cacheTicker.advance(1, TimeUnit.SECONDS);
        // Should get a different instance
        CassandraDataLayer dataLayer4 = getCassandraDataLayer(cassandraDataLayerCache, options, options);
        assertNotSame("Entry must have been replaced at the 12 hour mark", dataLayer, dataLayer4);
    }

    /**
     * Verifies that many threads racing on the same key all observe a single instance, and that
     * the instance is replaced after 12 hours of ticker time.
     *
     * <p>Each worker is submitted as a {@code Callable} and its result is read back through the
     * returned {@link Future}. This way any exception raised inside a worker is rethrown here as
     * an {@link ExecutionException} instead of being silently dropped, which would otherwise let
     * the identity checks pass vacuously on {@code null} results.
     */
    @Test
    public void testMultipleThreadsAccessingTheSameKey() throws InterruptedException, ExecutionException
    {
        Map<String, String> options = new HashMap<>(REQUIRED_CLIENT_CONFIG_OPTIONS);
        Cache<Map<String, String>, CassandraDataLayer> cassandraDataLayerCache = CassandraDataSourceHelper.getCassandraDataLayerCache();

        int threads = 20;
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        List<Future<CassandraDataLayer>> futures = new ArrayList<>(threads);
        CountDownLatch latch = new CountDownLatch(threads);
        try
        {
            for (int thread = 0; thread < threads; thread++)
            {
                futures.add(pool.submit(() -> {
                    // Invoke getCassandraDataLayer roughly at the same time
                    latch.countDown();
                    latch.await();
                    // The first thread to win creates the object, the rest should get the same instance
                    return getCassandraDataLayer(cassandraDataLayerCache, options, options);
                }));
            }
            pool.shutdown();
            assertTrue("Workers did not finish within 1 minute", pool.awaitTermination(1, TimeUnit.MINUTES));
        }
        finally
        {
            // No-op after a clean termination; otherwise interrupts workers still parked on the latch
            pool.shutdownNow();
        }

        // Future.get() rethrows any failure that happened inside the corresponding worker
        CassandraDataLayer shared = futures.get(0).get();
        assertNotNull("Workers must produce a data layer instance", shared);
        for (int thread = 1; thread < threads; thread++)
        {
            assertSame("Worker " + thread + " observed a different instance", shared, futures.get(thread).get());
        }

        // Advance ticker for 12 hours
        cacheTicker.advance(12, TimeUnit.HOURS);
        assertNotSame(shared, getCassandraDataLayer(cassandraDataLayerCache, options, options));
    }

    /**
     * Looks up {@code key} in the given cache, creating a {@link CassandraDataLayer} from
     * {@code options} when the cache asks the loader for a value.
     *
     * @param cassandraDataLayerCache the cache under test
     * @param key                     the cache key
     * @param options                 the options used to build the client, sidecar and SSL configuration
     * @return the cached or newly created instance
     * @throws ExecutionException if the loader throws a checked exception
     */
    private CassandraDataLayer getCassandraDataLayer(Cache<Map<String, String>, CassandraDataLayer> cassandraDataLayerCache,
                                                     Map<String, String> key,
                                                     Map<String, String> options) throws ExecutionException
    {
        return cassandraDataLayerCache.get(key, () -> {
            CassandraDataLayer.ClientConfig config = CassandraDataLayer.ClientConfig.create(options);
            return new CassandraDataLayer(config, Sidecar.ClientConfig.create(options), SslConfig.create(options));
        });
    }

    /**
     * A {@link Ticker} whose time only moves when {@link #advance(long, TimeUnit)} is called.
     * The elapsed time is held in an {@link AtomicLong}, so it may be read from worker threads
     * while the test thread advances it.
     */
    private static class CacheTicker extends Ticker
    {
        private final AtomicLong nanos = new AtomicLong(0);

        /**
         * Returns the number of nanoseconds elapsed since this ticker's fixed point of reference
         */
        @Override
        public long read()
        {
            return nanos.get();
        }

        /**
         * Artificially advance time for a given {@code value} in the given {@link TimeUnit}
         *
         * @param value the value to advance
         * @param unit  the {@link TimeUnit} for the given {@code value}
         */
        public void advance(long value, TimeUnit unit)
        {
            nanos.addAndGet(unit.toNanos(value));
        }
    }
}