blob: 7d3a164cb4ec7c6319175d3d0e5eb83c0101c242 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.cassandra.auth;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiPredicate;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.IntConsumer;
import java.util.function.IntSupplier;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.github.benmanes.caffeine.cache.Policy;
import org.apache.cassandra.concurrent.ExecutorPlus;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.concurrent.Shutdownable;
import org.apache.cassandra.utils.ExecutorUtils;
import org.apache.cassandra.utils.MBeanWrapper;
import static;
import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
import static org.apache.cassandra.config.CassandraRelevantProperties.AUTH_CACHE_WARMING_MAX_RETRIES;
import static org.apache.cassandra.config.CassandraRelevantProperties.AUTH_CACHE_WARMING_RETRY_INTERVAL_MS;
import static org.apache.cassandra.config.CassandraRelevantProperties.DISABLE_AUTH_CACHES_REMOTE_CONFIGURATION;
public class AuthCache<K, V> implements AuthCacheMBean, Shutdownable
private static final Logger logger = LoggerFactory.getLogger(AuthCache.class);
public static final String MBEAN_NAME_BASE = "org.apache.cassandra.auth:type=";
private volatile ScheduledFuture cacheRefresher = null;
// Keep a handle on created instances so their executors can be terminated cleanly
private static final Set<Shutdownable> REGISTRY = new HashSet<>(4);
public static void shutdownAllAndWait(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
ExecutorUtils.shutdownNowAndWait(timeout, unit, REGISTRY);
* Underlying cache. LoadingCache will call underlying load function on {@link #get} if key is not present
protected volatile LoadingCache<K, V> cache;
private ExecutorPlus cacheRefreshExecutor;
private final String name;
private final IntConsumer setValidityDelegate;
private final IntSupplier getValidityDelegate;
private final IntConsumer setUpdateIntervalDelegate;
private final IntSupplier getUpdateIntervalDelegate;
private final IntConsumer setMaxEntriesDelegate;
private final IntSupplier getMaxEntriesDelegate;
private final Consumer<Boolean> setActiveUpdate;
private final BooleanSupplier getActiveUpdate;
private final Function<K, V> loadFunction;
private final Supplier<Map<K, V>> bulkLoadFunction;
private final BooleanSupplier enableCache;
// Determines whether the presence of a specific value should trigger the invalidation of
// the supplied key. Used by CredentialsCache & CacheRefresher to identify when the
// credentials for a role couldn't be loaded without throwing an exception or serving stale
// values until the natural expiry time.
private final BiPredicate<K, V> invalidateCondition;
* @param name Used for MBean
* @param setValidityDelegate Used to set cache validity period. See {@link Policy#expireAfterWrite()}
* @param getValidityDelegate Getter for validity period
* @param setUpdateIntervalDelegate Used to set cache update interval. See {@link Policy#refreshAfterWrite()}
* @param getUpdateIntervalDelegate Getter for update interval
* @param setMaxEntriesDelegate Used to set max # entries in cache. See {@link com.github.benmanes.caffeine.cache.Policy.Eviction#setMaximum(long)}
* @param getMaxEntriesDelegate Getter for max entries.
* @param setActiveUpdate Method to process config to actively update the auth cache prior to configured cache expiration
* @param getActiveUpdate Getter for active update
* @param loadFunction Function to load the cache. Called on {@link #get(Object)}
* @param cacheEnabledDelegate Used to determine if cache is enabled.
protected AuthCache(String name,
IntConsumer setValidityDelegate,
IntSupplier getValidityDelegate,
IntConsumer setUpdateIntervalDelegate,
IntSupplier getUpdateIntervalDelegate,
IntConsumer setMaxEntriesDelegate,
IntSupplier getMaxEntriesDelegate,
Consumer<Boolean> setActiveUpdate,
BooleanSupplier getActiveUpdate,
Function<K, V> loadFunction,
Supplier<Map<K, V>> bulkLoadFunction,
BooleanSupplier cacheEnabledDelegate)
(k, v) -> false);
* @param name Used for MBean
* @param setValidityDelegate Used to set cache validity period. See {@link Policy#expireAfterWrite()}
* @param getValidityDelegate Getter for validity period
* @param setUpdateIntervalDelegate Used to set cache update interval. See {@link Policy#refreshAfterWrite()}
* @param getUpdateIntervalDelegate Getter for update interval
* @param setMaxEntriesDelegate Used to set max # entries in cache. See {@link com.github.benmanes.caffeine.cache.Policy.Eviction#setMaximum(long)}
* @param getMaxEntriesDelegate Getter for max entries.
* @param setActiveUpdate Actively update the cache before expiry
* @param getActiveUpdate Getter for active update
* @param loadFunction Function to load the cache. Called on {@link #get(Object)}
* @param cacheEnabledDelegate Used to determine if cache is enabled.
* @param invalidationCondition Used during active updates to determine if a refreshed value indicates a missing
* entry in the underlying table. If satisfied, the key will be invalidated.
protected AuthCache(String name,
IntConsumer setValidityDelegate,
IntSupplier getValidityDelegate,
IntConsumer setUpdateIntervalDelegate,
IntSupplier getUpdateIntervalDelegate,
IntConsumer setMaxEntriesDelegate,
IntSupplier getMaxEntriesDelegate,
Consumer<Boolean> setActiveUpdate,
BooleanSupplier getActiveUpdate,
Function<K, V> loadFunction,
Supplier<Map<K, V>> bulkLoadFunction,
BooleanSupplier cacheEnabledDelegate,
BiPredicate<K, V> invalidationCondition)
{ = checkNotNull(name);
this.setValidityDelegate = checkNotNull(setValidityDelegate);
this.getValidityDelegate = checkNotNull(getValidityDelegate);
this.setUpdateIntervalDelegate = checkNotNull(setUpdateIntervalDelegate);
this.getUpdateIntervalDelegate = checkNotNull(getUpdateIntervalDelegate);
this.setMaxEntriesDelegate = checkNotNull(setMaxEntriesDelegate);
this.getMaxEntriesDelegate = checkNotNull(getMaxEntriesDelegate);
this.setActiveUpdate = checkNotNull(setActiveUpdate);
this.getActiveUpdate = checkNotNull(getActiveUpdate);
this.loadFunction = checkNotNull(loadFunction);
this.bulkLoadFunction = checkNotNull(bulkLoadFunction);
this.enableCache = checkNotNull(cacheEnabledDelegate);
this.invalidateCondition = checkNotNull(invalidationCondition);
* Do setup for the cache and MBean.
protected void init()
this.cacheRefreshExecutor = executorFactory().sequential(name + "Refresh");
cache = initCache(null);
MBeanWrapper.instance.registerMBean(this, getObjectName());
protected void unregisterMBean()
MBeanWrapper.instance.unregisterMBean(getObjectName(), MBeanWrapper.OnException.LOG);
protected String getObjectName()
return MBEAN_NAME_BASE + name;
* Retrieve all cached entries. Will call {@link LoadingCache#asMap()} which does not trigger "load".
* @return a map of cached key-value pairs
public Map<K, V> getAll()
if (cache == null)
return Collections.emptyMap();
return Collections.unmodifiableMap(cache.asMap());
* Retrieve a value from the cache. Will call {@link LoadingCache#get(Object)} which will
* "load" the value if it's not present, thus populating the key.
* @param k
* @return The current value of {@code K} if cached or loaded.
* See {@link LoadingCache#get(Object)} for possible exceptions.
public V get(K k)
if (cache == null)
return loadFunction.apply(k);
return cache.get(k);
* Invalidate the entire cache.
public synchronized void invalidate()
cache = initCache(null);
* Invalidate a key.
* @param k key to invalidate
public void invalidate(K k)
if (cache != null)
* Time in milliseconds that a value in the cache will expire after.
* @param validityPeriod in milliseconds
public synchronized void setValidity(int validityPeriod)
throw new UnsupportedOperationException("Remote configuration of auth caches is disabled");
cache = initCache(cache);
public int getValidity()
return getValidityDelegate.getAsInt();
* Time in milliseconds after which an entry in the cache should be refreshed (it's load function called again)
* @param updateInterval in milliseconds
public synchronized void setUpdateInterval(int updateInterval)
throw new UnsupportedOperationException("Remote configuration of auth caches is disabled");
cache = initCache(cache);
public int getUpdateInterval()
return getUpdateIntervalDelegate.getAsInt();
* Set maximum number of entries in the cache.
* @param maxEntries
public synchronized void setMaxEntries(int maxEntries)
throw new UnsupportedOperationException("Remote configuration of auth caches is disabled");
cache = initCache(cache);
public int getMaxEntries()
return getMaxEntriesDelegate.getAsInt();
public boolean getActiveUpdate()
return getActiveUpdate.getAsBoolean();
public synchronized void setActiveUpdate(boolean update)
throw new UnsupportedOperationException("Remote configuration of auth caches is disabled");
cache = initCache(cache);
public long getEstimatedSize()
return cache == null ? 0L : cache.estimatedSize();
* (Re-)initialise the underlying cache. Will update validity, max entries, and update interval if
* any have changed. The underlying {@link LoadingCache} will be initiated based on the provided {@code loadFunction}.
* Note: If you need some unhandled cache setting to be set you should extend {@link AuthCache} and override this method.
* @param existing If not null will only update cache update validity, max entries, and update interval.
* @return New {@link LoadingCache} if existing was null, otherwise the existing {@code cache}
protected LoadingCache<K, V> initCache(LoadingCache<K, V> existing)
if (!enableCache.getAsBoolean())
return null;
if (getValidity() <= 0)
return null;
boolean activeUpdate = getActiveUpdate();"(Re)initializing {} (validity period/update interval/max entries/active update) ({}/{}/{}/{})",
name, getValidity(), getUpdateInterval(), getMaxEntries(), activeUpdate);
LoadingCache<K, V> updatedCache;
if (existing == null)
updatedCache = Caffeine.newBuilder().refreshAfterWrite(activeUpdate ? getValidity() : getUpdateInterval(), TimeUnit.MILLISECONDS)
.expireAfterWrite(getValidity(), TimeUnit.MILLISECONDS)
updatedCache = cache;
// Always set as mandatory
cache.policy().refreshAfterWrite().ifPresent(policy ->
policy.setExpiresAfter(activeUpdate ? getValidity() : getUpdateInterval(), TimeUnit.MILLISECONDS));
cache.policy().expireAfterWrite().ifPresent(policy -> policy.setExpiresAfter(getValidity(), TimeUnit.MILLISECONDS));
cache.policy().eviction().ifPresent(policy -> policy.setMaximum(getMaxEntries()));
if (cacheRefresher != null)
cacheRefresher.cancel(false); // permit the two refreshers to race until the old one dies, should be harmless.
cacheRefresher = null;
if (activeUpdate)
cacheRefresher = ScheduledExecutors.optionalTasks.scheduleAtFixedRate(CacheRefresher.create(name,
return updatedCache;
public boolean isTerminated()
return cacheRefreshExecutor.isTerminated();
public void shutdown()
public Object shutdownNow()
return cacheRefreshExecutor.shutdownNow();
public boolean awaitTermination(long timeout, TimeUnit units) throws InterruptedException
return cacheRefreshExecutor.awaitTermination(timeout, units);
public void warm()
if (cache == null)
{"{} cache not enabled, skipping pre-warming", name);
int retries = AUTH_CACHE_WARMING_MAX_RETRIES.getInt(10);
long retryInterval = AUTH_CACHE_WARMING_RETRY_INTERVAL_MS.getLong(1000);
while (retries-- > 0)
Map<K, V> entries = bulkLoadFunction.get();
catch (Exception e)
Uninterruptibles.sleepUninterruptibly(retryInterval, TimeUnit.MILLISECONDS);
* Implemented when we can provide an efficient way to bulk load all entries for a cache. This isn't a
* @FunctionalInterface due to the default impl, which is for IRoleManager, IAuthorizer, and INetworkAuthorizer.
* They all extend this interface so that implementations only need to provide an override if it's useful.
* IAuthenticator doesn't implement this interface because CredentialsCache is more tightly coupled to
* PasswordAuthenticator, which does expose a bulk loader.
public interface BulkLoader<K, V>
default Supplier<Map<K, V>> bulkLoader()
return Collections::emptyMap;