blob: 7d3a164cb4ec7c6319175d3d0e5eb83c0101c242 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.auth;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiPredicate;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.IntConsumer;
import java.util.function.IntSupplier;
import java.util.function.Supplier;
import com.google.common.util.concurrent.Uninterruptibles;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.github.benmanes.caffeine.cache.Policy;
import org.apache.cassandra.concurrent.ExecutorPlus;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.concurrent.Shutdownable;
import org.apache.cassandra.utils.ExecutorUtils;
import org.apache.cassandra.utils.MBeanWrapper;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
import static org.apache.cassandra.config.CassandraRelevantProperties.AUTH_CACHE_WARMING_MAX_RETRIES;
import static org.apache.cassandra.config.CassandraRelevantProperties.AUTH_CACHE_WARMING_RETRY_INTERVAL_MS;
import static org.apache.cassandra.config.CassandraRelevantProperties.DISABLE_AUTH_CACHES_REMOTE_CONFIGURATION;
public class AuthCache<K, V> implements AuthCacheMBean, Shutdownable
{
private static final Logger logger = LoggerFactory.getLogger(AuthCache.class);
public static final String MBEAN_NAME_BASE = "org.apache.cassandra.auth:type=";
private volatile ScheduledFuture cacheRefresher = null;
// Keep a handle on created instances so their executors can be terminated cleanly
private static final Set<Shutdownable> REGISTRY = new HashSet<>(4);
public static void shutdownAllAndWait(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
{
ExecutorUtils.shutdownNowAndWait(timeout, unit, REGISTRY);
}
/**
* Underlying cache. LoadingCache will call underlying load function on {@link #get} if key is not present
*/
protected volatile LoadingCache<K, V> cache;
private ExecutorPlus cacheRefreshExecutor;
private final String name;
private final IntConsumer setValidityDelegate;
private final IntSupplier getValidityDelegate;
private final IntConsumer setUpdateIntervalDelegate;
private final IntSupplier getUpdateIntervalDelegate;
private final IntConsumer setMaxEntriesDelegate;
private final IntSupplier getMaxEntriesDelegate;
private final Consumer<Boolean> setActiveUpdate;
private final BooleanSupplier getActiveUpdate;
private final Function<K, V> loadFunction;
private final Supplier<Map<K, V>> bulkLoadFunction;
private final BooleanSupplier enableCache;
// Determines whether the presence of a specific value should trigger the invalidation of
// the supplied key. Used by CredentialsCache & CacheRefresher to identify when the
// credentials for a role couldn't be loaded without throwing an exception or serving stale
// values until the natural expiry time.
private final BiPredicate<K, V> invalidateCondition;
/**
* @param name Used for MBean
* @param setValidityDelegate Used to set cache validity period. See {@link Policy#expireAfterWrite()}
* @param getValidityDelegate Getter for validity period
* @param setUpdateIntervalDelegate Used to set cache update interval. See {@link Policy#refreshAfterWrite()}
* @param getUpdateIntervalDelegate Getter for update interval
* @param setMaxEntriesDelegate Used to set max # entries in cache. See {@link com.github.benmanes.caffeine.cache.Policy.Eviction#setMaximum(long)}
* @param getMaxEntriesDelegate Getter for max entries.
* @param setActiveUpdate Method to process config to actively update the auth cache prior to configured cache expiration
* @param getActiveUpdate Getter for active update
* @param loadFunction Function to load the cache. Called on {@link #get(Object)}
* @param cacheEnabledDelegate Used to determine if cache is enabled.
*/
protected AuthCache(String name,
IntConsumer setValidityDelegate,
IntSupplier getValidityDelegate,
IntConsumer setUpdateIntervalDelegate,
IntSupplier getUpdateIntervalDelegate,
IntConsumer setMaxEntriesDelegate,
IntSupplier getMaxEntriesDelegate,
Consumer<Boolean> setActiveUpdate,
BooleanSupplier getActiveUpdate,
Function<K, V> loadFunction,
Supplier<Map<K, V>> bulkLoadFunction,
BooleanSupplier cacheEnabledDelegate)
{
this(name,
setValidityDelegate,
getValidityDelegate,
setUpdateIntervalDelegate,
getUpdateIntervalDelegate,
setMaxEntriesDelegate,
getMaxEntriesDelegate,
setActiveUpdate,
getActiveUpdate,
loadFunction,
bulkLoadFunction,
cacheEnabledDelegate,
(k, v) -> false);
}
/**
* @param name Used for MBean
* @param setValidityDelegate Used to set cache validity period. See {@link Policy#expireAfterWrite()}
* @param getValidityDelegate Getter for validity period
* @param setUpdateIntervalDelegate Used to set cache update interval. See {@link Policy#refreshAfterWrite()}
* @param getUpdateIntervalDelegate Getter for update interval
* @param setMaxEntriesDelegate Used to set max # entries in cache. See {@link com.github.benmanes.caffeine.cache.Policy.Eviction#setMaximum(long)}
* @param getMaxEntriesDelegate Getter for max entries.
* @param setActiveUpdate Actively update the cache before expiry
* @param getActiveUpdate Getter for active update
* @param loadFunction Function to load the cache. Called on {@link #get(Object)}
* @param cacheEnabledDelegate Used to determine if cache is enabled.
* @param invalidationCondition Used during active updates to determine if a refreshed value indicates a missing
* entry in the underlying table. If satisfied, the key will be invalidated.
*/
protected AuthCache(String name,
IntConsumer setValidityDelegate,
IntSupplier getValidityDelegate,
IntConsumer setUpdateIntervalDelegate,
IntSupplier getUpdateIntervalDelegate,
IntConsumer setMaxEntriesDelegate,
IntSupplier getMaxEntriesDelegate,
Consumer<Boolean> setActiveUpdate,
BooleanSupplier getActiveUpdate,
Function<K, V> loadFunction,
Supplier<Map<K, V>> bulkLoadFunction,
BooleanSupplier cacheEnabledDelegate,
BiPredicate<K, V> invalidationCondition)
{
this.name = checkNotNull(name);
this.setValidityDelegate = checkNotNull(setValidityDelegate);
this.getValidityDelegate = checkNotNull(getValidityDelegate);
this.setUpdateIntervalDelegate = checkNotNull(setUpdateIntervalDelegate);
this.getUpdateIntervalDelegate = checkNotNull(getUpdateIntervalDelegate);
this.setMaxEntriesDelegate = checkNotNull(setMaxEntriesDelegate);
this.getMaxEntriesDelegate = checkNotNull(getMaxEntriesDelegate);
this.setActiveUpdate = checkNotNull(setActiveUpdate);
this.getActiveUpdate = checkNotNull(getActiveUpdate);
this.loadFunction = checkNotNull(loadFunction);
this.bulkLoadFunction = checkNotNull(bulkLoadFunction);
this.enableCache = checkNotNull(cacheEnabledDelegate);
this.invalidateCondition = checkNotNull(invalidationCondition);
init();
}
/**
* Do setup for the cache and MBean.
*/
protected void init()
{
this.cacheRefreshExecutor = executorFactory().sequential(name + "Refresh");
cache = initCache(null);
MBeanWrapper.instance.registerMBean(this, getObjectName());
REGISTRY.add(this);
}
protected void unregisterMBean()
{
MBeanWrapper.instance.unregisterMBean(getObjectName(), MBeanWrapper.OnException.LOG);
}
protected String getObjectName()
{
return MBEAN_NAME_BASE + name;
}
/**
* Retrieve all cached entries. Will call {@link LoadingCache#asMap()} which does not trigger "load".
* @return a map of cached key-value pairs
*/
public Map<K, V> getAll()
{
if (cache == null)
return Collections.emptyMap();
return Collections.unmodifiableMap(cache.asMap());
}
/**
* Retrieve a value from the cache. Will call {@link LoadingCache#get(Object)} which will
* "load" the value if it's not present, thus populating the key.
* @param k
* @return The current value of {@code K} if cached or loaded.
*
* See {@link LoadingCache#get(Object)} for possible exceptions.
*/
public V get(K k)
{
if (cache == null)
return loadFunction.apply(k);
return cache.get(k);
}
/**
* Invalidate the entire cache.
*/
public synchronized void invalidate()
{
cache = initCache(null);
}
/**
* Invalidate a key.
* @param k key to invalidate
*/
public void invalidate(K k)
{
if (cache != null)
cache.invalidate(k);
}
/**
* Time in milliseconds that a value in the cache will expire after.
* @param validityPeriod in milliseconds
*/
public synchronized void setValidity(int validityPeriod)
{
if (DISABLE_AUTH_CACHES_REMOTE_CONFIGURATION.getBoolean())
throw new UnsupportedOperationException("Remote configuration of auth caches is disabled");
setValidityDelegate.accept(validityPeriod);
cache = initCache(cache);
}
public int getValidity()
{
return getValidityDelegate.getAsInt();
}
/**
* Time in milliseconds after which an entry in the cache should be refreshed (it's load function called again)
* @param updateInterval in milliseconds
*/
public synchronized void setUpdateInterval(int updateInterval)
{
if (DISABLE_AUTH_CACHES_REMOTE_CONFIGURATION.getBoolean())
throw new UnsupportedOperationException("Remote configuration of auth caches is disabled");
setUpdateIntervalDelegate.accept(updateInterval);
cache = initCache(cache);
}
public int getUpdateInterval()
{
return getUpdateIntervalDelegate.getAsInt();
}
/**
* Set maximum number of entries in the cache.
* @param maxEntries
*/
public synchronized void setMaxEntries(int maxEntries)
{
if (DISABLE_AUTH_CACHES_REMOTE_CONFIGURATION.getBoolean())
throw new UnsupportedOperationException("Remote configuration of auth caches is disabled");
setMaxEntriesDelegate.accept(maxEntries);
cache = initCache(cache);
}
public int getMaxEntries()
{
return getMaxEntriesDelegate.getAsInt();
}
public boolean getActiveUpdate()
{
return getActiveUpdate.getAsBoolean();
}
public synchronized void setActiveUpdate(boolean update)
{
if (DISABLE_AUTH_CACHES_REMOTE_CONFIGURATION.getBoolean())
throw new UnsupportedOperationException("Remote configuration of auth caches is disabled");
setActiveUpdate.accept(update);
cache = initCache(cache);
}
public long getEstimatedSize()
{
return cache == null ? 0L : cache.estimatedSize();
}
/**
* (Re-)initialise the underlying cache. Will update validity, max entries, and update interval if
* any have changed. The underlying {@link LoadingCache} will be initiated based on the provided {@code loadFunction}.
* Note: If you need some unhandled cache setting to be set you should extend {@link AuthCache} and override this method.
* @param existing If not null will only update cache update validity, max entries, and update interval.
* @return New {@link LoadingCache} if existing was null, otherwise the existing {@code cache}
*/
protected LoadingCache<K, V> initCache(LoadingCache<K, V> existing)
{
if (!enableCache.getAsBoolean())
return null;
if (getValidity() <= 0)
return null;
boolean activeUpdate = getActiveUpdate();
logger.info("(Re)initializing {} (validity period/update interval/max entries/active update) ({}/{}/{}/{})",
name, getValidity(), getUpdateInterval(), getMaxEntries(), activeUpdate);
LoadingCache<K, V> updatedCache;
if (existing == null)
{
updatedCache = Caffeine.newBuilder().refreshAfterWrite(activeUpdate ? getValidity() : getUpdateInterval(), TimeUnit.MILLISECONDS)
.expireAfterWrite(getValidity(), TimeUnit.MILLISECONDS)
.maximumSize(getMaxEntries())
.executor(cacheRefreshExecutor)
.build(loadFunction::apply);
}
else
{
updatedCache = cache;
// Always set as mandatory
cache.policy().refreshAfterWrite().ifPresent(policy ->
policy.setExpiresAfter(activeUpdate ? getValidity() : getUpdateInterval(), TimeUnit.MILLISECONDS));
cache.policy().expireAfterWrite().ifPresent(policy -> policy.setExpiresAfter(getValidity(), TimeUnit.MILLISECONDS));
cache.policy().eviction().ifPresent(policy -> policy.setMaximum(getMaxEntries()));
}
if (cacheRefresher != null)
{
cacheRefresher.cancel(false); // permit the two refreshers to race until the old one dies, should be harmless.
cacheRefresher = null;
}
if (activeUpdate)
{
cacheRefresher = ScheduledExecutors.optionalTasks.scheduleAtFixedRate(CacheRefresher.create(name,
updatedCache,
invalidateCondition),
getUpdateInterval(),
getUpdateInterval(),
TimeUnit.MILLISECONDS);
}
return updatedCache;
}
@Override
public boolean isTerminated()
{
return cacheRefreshExecutor.isTerminated();
}
@Override
public void shutdown()
{
cacheRefreshExecutor.shutdown();
}
@Override
public Object shutdownNow()
{
return cacheRefreshExecutor.shutdownNow();
}
@Override
public boolean awaitTermination(long timeout, TimeUnit units) throws InterruptedException
{
return cacheRefreshExecutor.awaitTermination(timeout, units);
}
public void warm()
{
if (cache == null)
{
logger.info("{} cache not enabled, skipping pre-warming", name);
return;
}
int retries = AUTH_CACHE_WARMING_MAX_RETRIES.getInt(10);
long retryInterval = AUTH_CACHE_WARMING_RETRY_INTERVAL_MS.getLong(1000);
while (retries-- > 0)
{
try
{
Map<K, V> entries = bulkLoadFunction.get();
cache.putAll(entries);
break;
}
catch (Exception e)
{
Uninterruptibles.sleepUninterruptibly(retryInterval, TimeUnit.MILLISECONDS);
}
}
}
/*
* Implemented when we can provide an efficient way to bulk load all entries for a cache. This isn't a
* @FunctionalInterface due to the default impl, which is for IRoleManager, IAuthorizer, and INetworkAuthorizer.
* They all extend this interface so that implementations only need to provide an override if it's useful.
* IAuthenticator doesn't implement this interface because CredentialsCache is more tightly coupled to
* PasswordAuthenticator, which does expose a bulk loader.
*/
public interface BulkLoader<K, V>
{
default Supplier<Map<K, V>> bulkLoader()
{
return Collections::emptyMap;
}
}
}