blob: 53fcb5e2b11bf39aeb461a5c8364a3f2fdaba52c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.schema;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSortedSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.github.benmanes.caffeine.cache.CacheLoader;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.db.PartitionPosition;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.RequestExecutionException;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.service.reads.range.RangeCommands;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.NoSpamLogger;
import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
import static org.apache.cassandra.cql3.QueryProcessor.process;
import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis;
/**
* PartitionDenylist uses the system_distributed.partition_denylist table to maintain a list of denylisted partition keys
* for each keyspace/table.
*
* Keys can be entered manually into the partition_denylist table or via the JMX operation StorageProxyMBean.denylistKey
*
* The denylist is stored as one CQL partition per table, and the denylisted keys are column names in that partition. The denylisted
* keys for each table are cached in memory, and reloaded from the partition_denylist table every 10 minutes (default) or when the
* StorageProxyMBean.loadPartitionDenylist is called via JMX.
*
* Concurrency of the cache is provided by the concurrency semantics of the Caffeine LoadingCache. All values (DenylistEntry) are
* immutable collections of keys/tokens which are replaced in whole when the cache refreshes from disk.
*
* The CL for the denylist is used on initial node load as well as on timer instigated cache refreshes. A JMX call by the
* operator to load the denylist cache will warn on CL unavailability but go through with the denylist load. This is to
* allow operators flexibility in the face of degraded cluster state and still grant them the ability to mutate the denylist
* cache and bring it up if there are things they need to block on startup.
*
* Notably, in the current design it's possible for a table *cache expiration instigated* reload to end up violating the
* contract on total denylisted keys allowed in the case where it initially loads with a value less than the DBD
* allowable max per table limit due to global constraint enforcement on initial load. Our load and reload function
* simply enforce the *per table* limit without consideration to what that entails at the global key level. While we
* could track the constrained state and count in DenylistEntry, for now the complexity doesn't seem to justify the
* protection against that edge case. The enforcement should take place on a user-instigated full reload as well as
* error messaging about count violations, so this only applies to situations in which someone adds a key and doesn't
* actively tell the cache to fully reload to take that key into consideration, which one could reasonably expect to be
* an antipattern.
*/
public class PartitionDenylist
{
private static final Logger logger = LoggerFactory.getLogger(PartitionDenylist.class);
private static final NoSpamLogger AVAILABILITY_LOGGER = NoSpamLogger.getLogger(logger, 1, TimeUnit.MINUTES);
private final ExecutorService executor = executorFactory().pooled("DenylistCache", 2);
/** We effectively don't use our initial empty cache to denylist until the {@link #load()} call which will replace it */
private volatile LoadingCache<TableId, DenylistEntry> denylist = buildEmptyCache();
/** Denylist entry is never mutated once constructed, only replaced with a new entry when the cache is refreshed */
/**
 * Immutable snapshot of the denylisted partition keys (and their corresponding tokens) for a single table.
 * Instances are never mutated once constructed; the cache replaces them wholesale on refresh.
 */
private static class DenylistEntry
{
    public final ImmutableSet<ByteBuffer> keys;
    public final ImmutableSortedSet<Token> tokens;

    /** An empty entry: used when a table has no denylisted partitions or a load produced no data. */
    public DenylistEntry()
    {
        this(ImmutableSet.of(), ImmutableSortedSet.of());
    }

    public DenylistEntry(final ImmutableSet<ByteBuffer> keys, final ImmutableSortedSet<Token> tokens)
    {
        this.keys = keys;
        this.tokens = tokens;
    }
}
/** Count of denylist load attempts (successful or not); guarded by synchronized (this). */
private int loadAttempts = 0;
/** Count of denylist loads that completed successfully; guarded by synchronized (this). */
private int loadSuccesses = 0;

/** @return how many full denylist loads have been attempted since startup */
public synchronized int getLoadAttempts()
{
    return loadAttempts;
}

/** @return how many full denylist loads have completed successfully since startup */
public synchronized int getLoadSuccesses()
{
    return loadSuccesses;
}
/**
 * Performs initial load of the partition denylist. Should be called at startup and only loads if the operation
 * is expected to succeed. If it is not possible to load at call time, a timer is set to retry.
 */
public void initialLoad()
{
    if (!DatabaseDescriptor.getPartitionDenylistEnabled())
        return;

    // Count the attempt even when we end up deferring to a scheduled retry below.
    synchronized (this)
    {
        loadAttempts++;
    }

    // Check if there are sufficient nodes to attempt reading all the denylist partitions before issuing the query.
    // The pre-check prevents definite range-slice unavailables being marked and triggering an alert. Nodes may still change
    // state between the check and the query, but it should significantly reduce the alert volume.
    String retryReason = "Insufficient nodes";
    try
    {
        if (checkDenylistNodeAvailability())
        {
            load();
            return;
        }
    }
    catch (Throwable tr)
    {
        logger.error("Failed to load partition denylist", tr);
        retryReason = "Exception";
    }

    // This path will also be taken on other failures other than UnavailableException,
    // but seems like a good idea to retry anyway.
    int retryInSeconds = DatabaseDescriptor.getDenylistInitialLoadRetrySeconds();
    logger.info("{} while loading partition denylist cache. Scheduled retry in {} seconds.", retryReason, retryInSeconds);
    ScheduledExecutors.optionalTasks.schedule(this::initialLoad, retryInSeconds, TimeUnit.SECONDS);
}
/**
 * Verifies that enough replicas are currently alive to read the full denylist table at the configured
 * consistency level, and warns (rate-limited) when they are not so operators know a refresh may be stale.
 *
 * @return true when a full read of the denylist table is expected to succeed at the configured CL
 */
private boolean checkDenylistNodeAvailability()
{
    if (RangeCommands.sufficientLiveNodesForSelectStar(SystemDistributedKeyspace.PartitionDenylistTable,
                                                       DatabaseDescriptor.getDenylistConsistencyLevel()))
        return true;

    AVAILABILITY_LOGGER.warn("Attempting to load denylist and not enough nodes are available for a {} refresh. Reload the denylist when unavailable nodes are recovered to ensure your denylist remains in sync.",
                             DatabaseDescriptor.getDenylistConsistencyLevel());
    return false;
}
/**
 * Builds an empty cache configured from the current DatabaseDescriptor params. Used both for the initial
 * (placeholder) cache at construction and when fully reloading cache contents and params in {@link #load()}.
 */
private LoadingCache<TableId, DenylistEntry> buildEmptyCache()
{
    // We rely on details of .refreshAfterWrite to reload this async in the background when it's hit:
    // https://github.com/ben-manes/caffeine/wiki/Refresh
    return Caffeine.newBuilder()
                   .refreshAfterWrite(DatabaseDescriptor.getDenylistRefreshSeconds(), TimeUnit.SECONDS)
                   .executor(executor)
                   .build(new CacheLoader<TableId, DenylistEntry>()
                   {
                       // Synchronous load invoked on a cache miss (first access for this table id).
                       @Override
                       public DenylistEntry load(final TableId tid)
                       {
                           // We load whether or not the CL required count are available as the alternative is an
                           // empty denylist. This allows operators to intervene in the event they need to deny or
                           // undeny a specific partition key around a node recovery.
                           checkDenylistNodeAvailability();
                           return getDenylistForTableFromCQL(tid);
                       }

                       // The synchronous reload method defaults to being wrapped with a supplyAsync in
                       // CacheLoader.asyncReload, so this runs on the cache's executor after refreshAfterWrite fires.
                       @Override
                       public DenylistEntry reload(final TableId tid, final DenylistEntry oldValue)
                       {
                           // Only process when we can hit the user specified CL for the denylist consistency on a timer prompted reload
                           if (checkDenylistNodeAvailability())
                           {
                               final DenylistEntry newEntry = getDenylistForTableFromCQL(tid);
                               if (newEntry != null)
                                   return newEntry;
                           }
                           // Keep serving the previous entry when the read cannot be performed right now.
                           if (oldValue != null)
                               return oldValue;
                           return new DenylistEntry();
                       }
                   });
}
/**
 * Fully rebuilds the denylist cache from CQL. A brand-new cache is required to accommodate deletions from the
 * denylist and a potentially shrunken max allowable size. Queries are never served out of the replacement cache
 * until it is fully populated, so a partially filled cache can never let denylisted entries through.
 */
public void load()
{
    final long startTimeMillis = currentTimeMillis();

    final Map<TableId, DenylistEntry> freshEntries = getDenylistForAllTablesFromCQL();

    // Populate a fresh cache completely before swapping it in. (On initial load this simply
    // means the original empty placeholder cache becomes garbage.)
    final LoadingCache<TableId, DenylistEntry> replacement = buildEmptyCache();
    replacement.putAll(freshEntries);

    synchronized (this)
    {
        loadSuccesses++;
    }

    denylist = replacement;
    logger.info("Loaded partition denylist cache in {}ms", currentTimeMillis() - startTimeMillis);
}
/**
 * Adds the given partition key to the denylist for keyspace/table. We expect the caller to have confirmed the
 * keyspace and table are valid. Usage is expected to be one-off key by key rather than bulk, so the table's
 * entire cache entry is refreshed after the write.
 *
 * @return true when the key was written and the table's cached entry refreshed; false otherwise
 */
public boolean addKeyToDenylist(final String keyspace, final String table, final ByteBuffer key)
{
    // Never allow denylisting partitions in critical internal keyspaces.
    if (!canDenylistKeyspace(keyspace))
        return false;

    final String keyHex = ByteBufferUtil.bytesToHex(key);
    final String insert = String.format("INSERT INTO system_distributed.partition_denylist (ks_name, table_name, key) VALUES ('%s', '%s', 0x%s)",
                                        keyspace, table, keyHex);

    try
    {
        process(insert, DatabaseDescriptor.getDenylistConsistencyLevel());
        return refreshTableDenylist(keyspace, table);
    }
    catch (final RequestExecutionException e)
    {
        logger.error("Failed to denylist key [{}] in {}/{}", keyHex, keyspace, table, e);
        return false;
    }
}
/**
 * Removes the given partition key from the denylist for keyspace/table. We expect the caller to confirm that we
 * are working with a valid keyspace and table. Note: unlike add, removal is not gated on canDenylistKeyspace —
 * presumably so manually inserted rows for protected keyspaces can still be cleaned up; confirm before changing.
 *
 * @return true when the key was deleted and the table's cached entry refreshed; false otherwise
 */
public boolean removeKeyFromDenylist(final String keyspace, final String table, final ByteBuffer key)
{
    final String keyHex = ByteBufferUtil.bytesToHex(key);
    final String delete = String.format("DELETE FROM system_distributed.partition_denylist WHERE ks_name = '%s' AND table_name = '%s' AND key = 0x%s",
                                        keyspace, table, keyHex);

    try
    {
        process(delete, DatabaseDescriptor.getDenylistConsistencyLevel());
        return refreshTableDenylist(keyspace, table);
    }
    catch (final RequestExecutionException e)
    {
        logger.error("Failed to remove key from denylist: [{}] in {}/{}", keyHex, keyspace, table, e);
        return false;
    }
}
/**
 * Denylisting partitions in certain critical keyspaces is disallowed to prevent users from rendering
 * their clusters inoperable.
 *
 * @return true when partitions in the given keyspace are eligible for denylisting
 */
private boolean canDenylistKeyspace(final String keyspace)
{
    return !(SchemaConstants.DISTRIBUTED_KEYSPACE_NAME.equals(keyspace)
             || SchemaConstants.SYSTEM_KEYSPACE_NAME.equals(keyspace)
             || SchemaConstants.TRACE_KEYSPACE_NAME.equals(keyspace)
             || SchemaConstants.VIRTUAL_SCHEMA.equals(keyspace)
             || SchemaConstants.VIRTUAL_VIEWS.equals(keyspace)
             || SchemaConstants.AUTH_KEYSPACE_NAME.equals(keyspace));
}
/** Convenience overload of {@link #isKeyPermitted(TableId, ByteBuffer)} that resolves the table by name. */
public boolean isKeyPermitted(final String keyspace, final String table, final ByteBuffer key)
{
    final TableId tid = getTableId(keyspace, table);
    return isKeyPermitted(tid, key);
}
/**
 * Checks whether reads/writes against the given key are permitted, i.e. the key is not denylisted.
 * Fails open: any error accessing or populating the cache permits the key.
 *
 * @param tid table id to check against; null is treated as "not denylisted"
 * @param key the partition key in question
 * @return true when the key is permitted (feature disabled, unknown/protected table, or key not denylisted)
 */
public boolean isKeyPermitted(final TableId tid, final ByteBuffer key)
{
    // Hot path: do the cheap state checks first, and null-check tid *before* the schema lookup
    // (previously the metadata lookup ran before the tid null check, defeating the guard).
    if (!DatabaseDescriptor.getPartitionDenylistEnabled() || tid == null)
        return true;

    final TableMetadata tmd = Schema.instance.getTableMetadata(tid);
    // Unknown tables and protected keyspaces are never denylisted.
    if (tmd == null || !canDenylistKeyspace(tmd.keyspace))
        return true;

    try
    {
        // If we don't have an entry for this table id, nothing in it is denylisted.
        final DenylistEntry entry = denylist.get(tid);
        if (entry == null)
            return true;
        return !entry.keys.contains(key);
    }
    catch (final Exception e)
    {
        // In the event of an error accessing or populating the cache, assume it's not denylisted
        logAccessFailure(tid, e);
        return true;
    }
}
/** Logs a debug-level message when the denylist cache could not be consulted for the given table id. */
private void logAccessFailure(final TableId tid, Throwable e)
{
    final TableMetadata tmd = Schema.instance.getTableMetadata(tid);
    if (tmd != null)
        logger.debug("Failed to access partition denylist cache for {}/{}", tmd.keyspace, tmd.name, e);
    else
        logger.debug("Failed to access partition denylist cache for unknown table id {}", tid, e);
}
/**
 * Convenience overload of {@link #getDeniedKeysInRangeCount(TableId, AbstractBounds)} resolving the table by name.
 *
 * @return number of denylisted keys in range
 */
public int getDeniedKeysInRangeCount(final String keyspace, final String table, final AbstractBounds<PartitionPosition> range)
{
    final TableId tid = getTableId(keyspace, table);
    return getDeniedKeysInRangeCount(tid, range);
}
/**
 * Counts the denylisted keys whose tokens fall within the given range, handling ranges that wrap
 * around the ring. Fails open: any error accessing the cache reports zero denied keys.
 *
 * @param tid table id to check against; null reports 0
 * @param range the token range in question
 * @return number of denylisted keys in range
 */
public int getDeniedKeysInRangeCount(final TableId tid, final AbstractBounds<PartitionPosition> range)
{
    // Null-check tid and the feature flag *before* the schema lookup (previously the metadata
    // lookup ran before the tid null check, defeating the guard).
    if (!DatabaseDescriptor.getPartitionDenylistEnabled() || tid == null)
        return 0;

    final TableMetadata tmd = Schema.instance.getTableMetadata(tid);
    if (tmd == null || !canDenylistKeyspace(tmd.keyspace))
        return 0;

    try
    {
        final DenylistEntry denylistEntry = denylist.get(tid);
        if (denylistEntry == null || denylistEntry.tokens.size() == 0)
            return 0;

        final Token startToken = range.left.getToken();
        final Token endToken = range.right.getToken();

        // Normal case: the range does not wrap around the ring.
        if (startToken.compareTo(endToken) <= 0 || endToken.isMinimum())
        {
            NavigableSet<Token> subSet = denylistEntry.tokens.tailSet(startToken, PartitionPosition.Kind.MIN_BOUND == range.left.kind());
            if (!endToken.isMinimum())
                subSet = subSet.headSet(endToken, PartitionPosition.Kind.MAX_BOUND == range.right.kind());
            return subSet.size();
        }

        // Wrap around case: count tokens from start to ring end, plus ring start to end token.
        return denylistEntry.tokens.tailSet(startToken, PartitionPosition.Kind.MIN_BOUND == range.left.kind()).size()
               + denylistEntry.tokens.headSet(endToken, PartitionPosition.Kind.MAX_BOUND == range.right.kind()).size();
    }
    catch (final Exception e)
    {
        logAccessFailure(tid, e);
        return 0;
    }
}
/**
 * Loads up to the configured per-table allowance of denylisted keys for the given table.
 */
private DenylistEntry getDenylistForTableFromCQL(final TableId tid)
{
    final int perTableAllowance = DatabaseDescriptor.getDenylistMaxKeysPerTable();
    return getDenylistForTableFromCQL(tid, perTableAllowance);
}
/**
 * Attempts to reload the DenylistEntry data from CQL for the given TableId, reading at most {@code limit} keys.
 *
 * Note the asymmetric returns, which callers rely on:
 *  - null when the table is unknown to the local schema (lets callers preserve an old value),
 *  - an empty DenylistEntry when there is no data or the read fails (replacing any old value).
 */
private DenylistEntry getDenylistForTableFromCQL(final TableId tid, int limit)
{
    final TableMetadata tmd = Schema.instance.getTableMetadata(tid);
    if (tmd == null)
        return null;

    // We attempt to query just over our allowable max keys in order to check whether we have configured data beyond that limit and alert the user if so
    final String readDenylist = String.format("SELECT * FROM %s.%s WHERE ks_name='%s' AND table_name='%s' LIMIT %d",
                                              SchemaConstants.DISTRIBUTED_KEYSPACE_NAME,
                                              SystemDistributedKeyspace.PARTITION_DENYLIST_TABLE,
                                              tmd.keyspace,
                                              tmd.name,
                                              limit + 1);

    try
    {
        final UntypedResultSet results = process(readDenylist, DatabaseDescriptor.getDenylistConsistencyLevel());
        // If there's no data in CQL we want to return an empty DenylistEntry so we don't continue using the old value in the cache
        if (results == null || results.isEmpty())
            return new DenylistEntry();

        // We queried limit + 1 rows, so more than limit results means keys beyond the allowance exist and will be ignored.
        if (results.size() > limit)
        {
            // If our limit is < the standard per table we know we're at a global violation because we've constrained that request limit already.
            boolean globalLimit = limit != DatabaseDescriptor.getDenylistMaxKeysPerTable();
            String violationType = globalLimit ? "global" : "per-table";
            int errorLimit = globalLimit ? DatabaseDescriptor.getDenylistMaxKeysTotal() : limit;
            logger.error("Partition denylist for {}/{} has exceeded the {} allowance of ({}). Remaining keys were ignored; " +
                         "please reduce the total number of keys denied or increase the denylist_max_keys_per_table param in " +
                         "cassandra.yaml to avoid inconsistency in denied partitions across nodes.",
                         tmd.keyspace,
                         tmd.name,
                         violationType,
                         errorLimit);
        }

        final Set<ByteBuffer> keys = new HashSet<>();
        final NavigableSet<Token> tokens = new TreeSet<>();

        int processed = 0;
        for (final UntypedResultSet.Row row : results)
        {
            final ByteBuffer key = row.getBlob("key");
            keys.add(key);
            // Tokens are precomputed up front so range queries can count denied keys without re-hashing each time.
            tokens.add(StorageService.instance.getTokenMetadata().partitioner.getToken(key));
            processed++;
            // Stop at the limit; the (limit + 1)th row was only fetched to detect the overflow reported above.
            if (processed >= limit)
                break;
        }
        return new DenylistEntry(ImmutableSet.copyOf(keys), ImmutableSortedSet.copyOf(tokens));
    }
    catch (final RequestExecutionException e)
    {
        logger.error("Error reading partition_denylist table for {}/{}. Returning empty denylist.", tmd.keyspace, tmd.name, e);
        return new DenylistEntry();
    }
}
/**
 * Loads the denylist for every table that has entries in the distributed denylist table, relying on
 * {@link #getDenylistForTableFromCQL(TableId, int)} to pull a limited number of keys on a per-table basis.
 * Enforces both the per-table key limit and the global key limit.
 *
 * @return non-null mapping of TableId to DenylistEntry; never contains null keys or values
 */
private Map<TableId, DenylistEntry> getDenylistForAllTablesFromCQL()
{
    // While we warn the user in this case, we continue with the reload anyway.
    checkDenylistNodeAvailability();

    final String allDeniedTables = String.format("SELECT DISTINCT ks_name, table_name FROM %s.%s",
                                                 SchemaConstants.DISTRIBUTED_KEYSPACE_NAME,
                                                 SystemDistributedKeyspace.PARTITION_DENYLIST_TABLE);
    try
    {
        final UntypedResultSet deniedTableResults = process(allDeniedTables, DatabaseDescriptor.getDenylistConsistencyLevel());
        if (deniedTableResults == null || deniedTableResults.isEmpty())
            return Collections.emptyMap();

        int totalProcessed = 0;
        final Map<TableId, DenylistEntry> results = new HashMap<>();
        for (final UntypedResultSet.Row row : deniedTableResults)
        {
            final String ks = row.getString("ks_name");
            final String table = row.getString("table_name");
            final TableId tid = getTableId(ks, table);

            // Denylist rows may reference tables that have since been dropped; a null key (or value below)
            // would make the cache's putAll throw in load() and abort the whole reload, so skip them.
            if (tid == null)
            {
                logger.warn("Found denylist entries for unknown ks/table {}/{}. Skipping. Remove stale entries from system_distributed.{}.",
                            ks, table, SystemDistributedKeyspace.PARTITION_DENYLIST_TABLE);
                continue;
            }

            if (DatabaseDescriptor.getDenylistMaxKeysTotal() - totalProcessed <= 0)
            {
                logger.error("Hit limit on allowable denylisted keys in total. Processed {} total entries. Not adding all entries to denylist for {}/{}." +
                             " Remove denylist entries in system_distributed.{} or increase your denylist_max_keys_total param in cassandra.yaml.",
                             totalProcessed,
                             ks,
                             table,
                             SystemDistributedKeyspace.PARTITION_DENYLIST_TABLE);
                results.put(tid, new DenylistEntry());
            }
            else
            {
                // Determine whether we can get up to table max or we need a subset at edge condition of max overflow.
                final int allowedTableRecords = Math.min(DatabaseDescriptor.getDenylistMaxKeysPerTable(), DatabaseDescriptor.getDenylistMaxKeysTotal() - totalProcessed);
                final DenylistEntry tableDenylist = getDenylistForTableFromCQL(tid, allowedTableRecords);
                // A null entry means the table's metadata vanished mid-load; don't poison the result map with it.
                if (tableDenylist != null)
                {
                    totalProcessed += tableDenylist.keys.size();
                    results.put(tid, tableDenylist);
                }
            }
        }
        return results;
    }
    catch (final RequestExecutionException e)
    {
        // Pass the exception as the final argument so the full stack trace is logged, not just its toString.
        logger.error("Error reading full partition denylist from {}.{}. Partition Denylisting will be compromised.",
                     SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, SystemDistributedKeyspace.PARTITION_DENYLIST_TABLE, e);
        return Collections.emptyMap();
    }
}
/**
 * Re-reads the denylist entry for a single table from CQL and installs it in the cache. Used after a
 * single-key add/remove so the local cache reflects the mutation immediately.
 *
 * @return true when the cache entry was refreshed; false when the table is unknown or its metadata vanished
 */
private boolean refreshTableDenylist(String keyspace, String table)
{
    checkDenylistNodeAvailability();
    final TableId tid = getTableId(keyspace, table);
    if (tid == null)
    {
        logger.warn("Got denylist mutation for unknown ks/cf: {}/{}. Skipping refresh.", keyspace, table);
        return false;
    }

    final DenylistEntry newEntry = getDenylistForTableFromCQL(tid);
    // getDenylistForTableFromCQL returns null when the table's metadata cannot be found (e.g. dropped
    // concurrently); the cache's put rejects null values, so guard rather than throw.
    if (newEntry == null)
    {
        logger.warn("No metadata found for {}/{} while refreshing denylist. Skipping refresh.", keyspace, table);
        return false;
    }

    denylist.put(tid, newEntry);
    return true;
}
/** @return the TableId for the given keyspace/table, or null when the table is not known to the local schema */
private TableId getTableId(final String keyspace, final String table)
{
    final TableMetadata metadata = Schema.instance.getTableMetadata(keyspace, table);
    if (metadata == null)
        return null;
    return metadata.id;
}
}