blob: ead01fbe241ea7cd1ca0965d240f8da96cb7b292 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.db;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.stream.Stream;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.RateLimiter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.lifecycle.SSTableSet;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.repair.CassandraKeyspaceRepairManager;
import org.apache.cassandra.db.view.ViewManager;
import org.apache.cassandra.exceptions.WriteTimeoutException;
import org.apache.cassandra.index.Index;
import org.apache.cassandra.index.SecondaryIndexManager;
import org.apache.cassandra.index.transactions.UpdateTransaction;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.metrics.KeyspaceMetrics;
import org.apache.cassandra.repair.KeyspaceRepairManager;
import org.apache.cassandra.schema.KeyspaceMetadata;
import org.apache.cassandra.schema.ReplicationParams;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.schema.SchemaProvider;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.schema.TableMetadataRef;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.concurrent.OpOrder;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static org.apache.cassandra.utils.MonotonicClock.approxTime;
/**
* It represents a Keyspace.
*/
public class Keyspace
{
private static final Logger logger = LoggerFactory.getLogger(Keyspace.class);
private static final String TEST_FAIL_WRITES_KS = System.getProperty("cassandra.test.fail_writes_ks", "");
private static final boolean TEST_FAIL_WRITES = !TEST_FAIL_WRITES_KS.isEmpty();
private static int TEST_FAIL_MV_LOCKS_COUNT = Integer.getInteger("cassandra.test.fail_mv_locks_count", 0);
public final KeyspaceMetrics metric;
// It is possible to call Keyspace.open without a running daemon, so it makes sense to ensure
// proper directories here as well as in CassandraDaemon.
static
{
if (DatabaseDescriptor.isDaemonInitialized() || DatabaseDescriptor.isToolInitialized())
DatabaseDescriptor.createAllDirectories();
}
private volatile KeyspaceMetadata metadata;
//OpOrder is defined globally since we need to order writes across
//Keyspaces in the case of Views (batchlog of view mutations)
public static final OpOrder writeOrder = new OpOrder();
/* ColumnFamilyStore per column family */
private final ConcurrentMap<TableId, ColumnFamilyStore> columnFamilyStores = new ConcurrentHashMap<>();
private volatile AbstractReplicationStrategy replicationStrategy;
public final ViewManager viewManager;
private final KeyspaceWriteHandler writeHandler;
private volatile ReplicationParams replicationParams;
private final KeyspaceRepairManager repairManager;
private final SchemaProvider schema;
private static volatile boolean initialized = false;
public static void setInitialized()
{
initialized = true;
}
public static Keyspace open(String keyspaceName)
{
assert initialized || SchemaConstants.isLocalSystemKeyspace(keyspaceName);
return open(keyspaceName, Schema.instance, true);
}
// to only be used by org.apache.cassandra.tools.Standalone* classes
public static Keyspace openWithoutSSTables(String keyspaceName)
{
return open(keyspaceName, Schema.instance, false);
}
@VisibleForTesting
static Keyspace open(String keyspaceName, SchemaProvider schema, boolean loadSSTables)
{
Keyspace keyspaceInstance = schema.getKeyspaceInstance(keyspaceName);
if (keyspaceInstance == null)
{
// Instantiate the Keyspace while holding the Schema lock. This both ensures we only do it once per
// keyspace, and also ensures that Keyspace construction sees a consistent view of the schema.
synchronized (schema)
{
keyspaceInstance = schema.getKeyspaceInstance(keyspaceName);
if (keyspaceInstance == null)
{
// open and store the keyspace
keyspaceInstance = new Keyspace(keyspaceName, schema, loadSSTables);
schema.storeKeyspaceInstance(keyspaceInstance);
}
}
}
return keyspaceInstance;
}
public static Keyspace clear(String keyspaceName)
{
return clear(keyspaceName, Schema.instance);
}
public static Keyspace clear(String keyspaceName, Schema schema)
{
synchronized (schema)
{
Keyspace t = schema.removeKeyspaceInstance(keyspaceName);
if (t != null)
{
for (ColumnFamilyStore cfs : t.getColumnFamilyStores())
t.unloadCf(cfs);
t.metric.release();
}
return t;
}
}
public static ColumnFamilyStore openAndGetStore(TableMetadataRef tableRef)
{
return open(tableRef.keyspace).getColumnFamilyStore(tableRef.id);
}
public static ColumnFamilyStore openAndGetStore(TableMetadata table)
{
return open(table.keyspace).getColumnFamilyStore(table.id);
}
/**
* Removes every SSTable in the directory from the appropriate Tracker's view.
* @param directory the unreadable directory, possibly with SSTables in it, but not necessarily.
*/
public static void removeUnreadableSSTables(File directory)
{
for (Keyspace keyspace : Keyspace.all())
{
for (ColumnFamilyStore baseCfs : keyspace.getColumnFamilyStores())
{
for (ColumnFamilyStore cfs : baseCfs.concatWithIndexes())
cfs.maybeRemoveUnreadableSSTables(directory);
}
}
}
public void setMetadata(KeyspaceMetadata metadata)
{
this.metadata = metadata;
createReplicationStrategy(metadata);
}
public KeyspaceMetadata getMetadata()
{
return metadata;
}
public Collection<ColumnFamilyStore> getColumnFamilyStores()
{
return Collections.unmodifiableCollection(columnFamilyStores.values());
}
public ColumnFamilyStore getColumnFamilyStore(String cfName)
{
TableMetadata table = schema.getTableMetadata(getName(), cfName);
if (table == null)
throw new IllegalArgumentException(String.format("Unknown keyspace/cf pair (%s.%s)", getName(), cfName));
return getColumnFamilyStore(table.id);
}
public ColumnFamilyStore getColumnFamilyStore(TableId id)
{
ColumnFamilyStore cfs = columnFamilyStores.get(id);
if (cfs == null)
throw new IllegalArgumentException("Unknown CF " + id);
return cfs;
}
public boolean hasColumnFamilyStore(TableId id)
{
return columnFamilyStores.containsKey(id);
}
/**
* Take a snapshot of the specific column family, or the entire set of column families
* if columnFamily is null with a given timestamp
*
* @param snapshotName the tag associated with the name of the snapshot. This value may not be null
* @param columnFamilyName the column family to snapshot or all on null
* @param skipFlush Skip blocking flush of memtable
* @param rateLimiter Rate limiter for hardlinks-per-second
* @throws IOException if the column family doesn't exist
*/
public void snapshot(String snapshotName, String columnFamilyName, boolean skipFlush, RateLimiter rateLimiter) throws IOException
{
assert snapshotName != null;
boolean tookSnapShot = false;
for (ColumnFamilyStore cfStore : columnFamilyStores.values())
{
if (columnFamilyName == null || cfStore.name.equals(columnFamilyName))
{
tookSnapShot = true;
cfStore.snapshot(snapshotName, skipFlush, rateLimiter);
}
}
if ((columnFamilyName != null) && !tookSnapShot)
throw new IOException("Failed taking snapshot. Table " + columnFamilyName + " does not exist.");
}
/**
* Take a snapshot of the specific column family, or the entire set of column families
* if columnFamily is null with a given timestamp
*
* @param snapshotName the tag associated with the name of the snapshot. This value may not be null
* @param columnFamilyName the column family to snapshot or all on null
* @throws IOException if the column family doesn't exist
*/
public void snapshot(String snapshotName, String columnFamilyName) throws IOException
{
snapshot(snapshotName, columnFamilyName, false, null);
}
/**
* @param clientSuppliedName may be null.
* @return the name of the snapshot
*/
public static String getTimestampedSnapshotName(String clientSuppliedName)
{
String snapshotName = Long.toString(System.currentTimeMillis());
if (clientSuppliedName != null && !clientSuppliedName.equals(""))
{
snapshotName = snapshotName + "-" + clientSuppliedName;
}
return snapshotName;
}
public static String getTimestampedSnapshotNameWithPrefix(String clientSuppliedName, String prefix)
{
return prefix + "-" + getTimestampedSnapshotName(clientSuppliedName);
}
/**
* Check whether snapshots already exists for a given name.
*
* @param snapshotName the user supplied snapshot name
* @return true if the snapshot exists
*/
public boolean snapshotExists(String snapshotName)
{
assert snapshotName != null;
for (ColumnFamilyStore cfStore : columnFamilyStores.values())
{
if (cfStore.snapshotExists(snapshotName))
return true;
}
return false;
}
/**
* Clear all the snapshots for a given keyspace.
*
* @param snapshotName the user supplied snapshot name. It empty or null,
* all the snapshots will be cleaned
*/
public static void clearSnapshot(String snapshotName, String keyspace)
{
RateLimiter clearSnapshotRateLimiter = DatabaseDescriptor.getSnapshotRateLimiter();
List<File> snapshotDirs = Directories.getKSChildDirectories(keyspace);
Directories.clearSnapshot(snapshotName, snapshotDirs, clearSnapshotRateLimiter);
}
/**
* @return A list of open SSTableReaders
*/
public List<SSTableReader> getAllSSTables(SSTableSet sstableSet)
{
List<SSTableReader> list = new ArrayList<>(columnFamilyStores.size());
for (ColumnFamilyStore cfStore : columnFamilyStores.values())
Iterables.addAll(list, cfStore.getSSTables(sstableSet));
return list;
}
private Keyspace(String keyspaceName, SchemaProvider schema, boolean loadSSTables)
{
this.schema = schema;
metadata = schema.getKeyspaceMetadata(keyspaceName);
assert metadata != null : "Unknown keyspace " + keyspaceName;
if (metadata.isVirtual())
throw new IllegalStateException("Cannot initialize Keyspace with virtual metadata " + keyspaceName);
createReplicationStrategy(metadata);
this.metric = new KeyspaceMetrics(this);
this.viewManager = new ViewManager(this);
for (TableMetadata cfm : metadata.tablesAndViews())
{
logger.trace("Initializing {}.{}", getName(), cfm.name);
initCf(schema.getTableMetadataRef(cfm.id), loadSSTables);
}
this.viewManager.reload(false);
this.repairManager = new CassandraKeyspaceRepairManager(this);
this.writeHandler = new CassandraKeyspaceWriteHandler(this);
}
private Keyspace(KeyspaceMetadata metadata)
{
this.schema = Schema.instance;
this.metadata = metadata;
createReplicationStrategy(metadata);
this.metric = new KeyspaceMetrics(this);
this.viewManager = new ViewManager(this);
this.repairManager = new CassandraKeyspaceRepairManager(this);
this.writeHandler = new CassandraKeyspaceWriteHandler(this);
}
public KeyspaceRepairManager getRepairManager()
{
return repairManager;
}
public static Keyspace mockKS(KeyspaceMetadata metadata)
{
return new Keyspace(metadata);
}
private void createReplicationStrategy(KeyspaceMetadata ksm)
{
logger.info("Creating replication strategy " + ksm.name + " params " + ksm.params);
replicationStrategy = ksm.createReplicationStrategy();
if (!ksm.params.replication.equals(replicationParams))
{
logger.debug("New replication settings for keyspace {} - invalidating disk boundary caches", ksm.name);
columnFamilyStores.values().forEach(ColumnFamilyStore::invalidateDiskBoundaries);
}
replicationParams = ksm.params.replication;
}
// best invoked on the compaction mananger.
public void dropCf(TableId tableId)
{
assert columnFamilyStores.containsKey(tableId);
ColumnFamilyStore cfs = columnFamilyStores.remove(tableId);
if (cfs == null)
return;
cfs.getCompactionStrategyManager().shutdown();
CompactionManager.instance.interruptCompactionForCFs(cfs.concatWithIndexes(), (sstable) -> true, true);
// wait for any outstanding reads/writes that might affect the CFS
cfs.keyspace.writeOrder.awaitNewBarrier();
cfs.readOrdering.awaitNewBarrier();
unloadCf(cfs);
}
// disassociate a cfs from this keyspace instance.
private void unloadCf(ColumnFamilyStore cfs)
{
cfs.forceBlockingFlush();
cfs.invalidate();
}
/**
* Registers a custom cf instance with this keyspace.
* This is required for offline tools what use non-standard directories.
*/
public void initCfCustom(ColumnFamilyStore newCfs)
{
ColumnFamilyStore cfs = columnFamilyStores.get(newCfs.metadata.id);
if (cfs == null)
{
// CFS being created for the first time, either on server startup or new CF being added.
// We don't worry about races here; startup is safe, and adding multiple idential CFs
// simultaneously is a "don't do that" scenario.
ColumnFamilyStore oldCfs = columnFamilyStores.putIfAbsent(newCfs.metadata.id, newCfs);
// CFS mbean instantiation will error out before we hit this, but in case that changes...
if (oldCfs != null)
throw new IllegalStateException("added multiple mappings for cf id " + newCfs.metadata.id);
}
else
{
throw new IllegalStateException("CFS is already initialized: " + cfs.name);
}
}
public KeyspaceWriteHandler getWriteHandler()
{
return writeHandler;
}
/**
* adds a cf to internal structures, ends up creating disk files).
*/
public void initCf(TableMetadataRef metadata, boolean loadSSTables)
{
ColumnFamilyStore cfs = columnFamilyStores.get(metadata.id);
if (cfs == null)
{
// CFS being created for the first time, either on server startup or new CF being added.
// We don't worry about races here; startup is safe, and adding multiple idential CFs
// simultaneously is a "don't do that" scenario.
ColumnFamilyStore oldCfs = columnFamilyStores.putIfAbsent(metadata.id, ColumnFamilyStore.createColumnFamilyStore(this, metadata, loadSSTables));
// CFS mbean instantiation will error out before we hit this, but in case that changes...
if (oldCfs != null)
throw new IllegalStateException("added multiple mappings for cf id " + metadata.id);
}
else
{
// re-initializing an existing CF. This will happen if you cleared the schema
// on this node and it's getting repopulated from the rest of the cluster.
assert cfs.name.equals(metadata.name);
cfs.reload();
}
}
public CompletableFuture<?> applyFuture(Mutation mutation, boolean writeCommitLog, boolean updateIndexes)
{
return applyInternal(mutation, writeCommitLog, updateIndexes, true, true, new CompletableFuture<>());
}
public CompletableFuture<?> applyFuture(Mutation mutation, boolean writeCommitLog, boolean updateIndexes, boolean isDroppable,
boolean isDeferrable)
{
return applyInternal(mutation, writeCommitLog, updateIndexes, isDroppable, isDeferrable, new CompletableFuture<>());
}
public void apply(Mutation mutation, boolean writeCommitLog, boolean updateIndexes)
{
apply(mutation, writeCommitLog, updateIndexes, true);
}
public void apply(final Mutation mutation,
final boolean writeCommitLog)
{
apply(mutation, writeCommitLog, true, true);
}
/**
* If apply is blocking, apply must not be deferred
* Otherwise there is a race condition where ALL mutation workers are beeing blocked ending
* in a complete deadlock of the mutation stage. See CASSANDRA-12689.
*
* @param mutation the row to write. Must not be modified after calling apply, since commitlog append
* may happen concurrently, depending on the CL Executor type.
* @param makeDurable if true, don't return unless write has been made durable
* @param updateIndexes false to disable index updates (used by CollationController "defragmenting")
* @param isDroppable true if this should throw WriteTimeoutException if it does not acquire lock within write_request_timeout_in_ms
*/
public void apply(final Mutation mutation,
final boolean makeDurable,
boolean updateIndexes,
boolean isDroppable)
{
applyInternal(mutation, makeDurable, updateIndexes, isDroppable, false, null);
}
/**
* This method appends a row to the global CommitLog, then updates memtables and indexes.
*
* @param mutation the row to write. Must not be modified after calling apply, since commitlog append
* may happen concurrently, depending on the CL Executor type.
* @param makeDurable if true, don't return unless write has been made durable
* @param updateIndexes false to disable index updates (used by CollationController "defragmenting")
* @param isDroppable true if this should throw WriteTimeoutException if it does not acquire lock within write_request_timeout_in_ms
* @param isDeferrable true if caller is not waiting for future to complete, so that future may be deferred
*/
private CompletableFuture<?> applyInternal(final Mutation mutation,
final boolean makeDurable,
boolean updateIndexes,
boolean isDroppable,
boolean isDeferrable,
CompletableFuture<?> future)
{
if (TEST_FAIL_WRITES && metadata.name.equals(TEST_FAIL_WRITES_KS))
throw new RuntimeException("Testing write failures");
Lock[] locks = null;
boolean requiresViewUpdate = updateIndexes && viewManager.updatesAffectView(Collections.singleton(mutation), false);
if (requiresViewUpdate)
{
mutation.viewLockAcquireStart.compareAndSet(0L, System.currentTimeMillis());
// the order of lock acquisition doesn't matter (from a deadlock perspective) because we only use tryLock()
Collection<TableId> tableIds = mutation.getTableIds();
Iterator<TableId> idIterator = tableIds.iterator();
locks = new Lock[tableIds.size()];
for (int i = 0; i < tableIds.size(); i++)
{
TableId tableId = idIterator.next();
int lockKey = Objects.hash(mutation.key().getKey(), tableId);
while (true)
{
Lock lock = null;
if (TEST_FAIL_MV_LOCKS_COUNT == 0)
lock = ViewManager.acquireLockFor(lockKey);
else
TEST_FAIL_MV_LOCKS_COUNT--;
if (lock == null)
{
//throw WTE only if request is droppable
if (isDroppable && (approxTime.isAfter(mutation.approxCreatedAtNanos + DatabaseDescriptor.getWriteRpcTimeout(NANOSECONDS))))
{
for (int j = 0; j < i; j++)
locks[j].unlock();
if (logger.isTraceEnabled())
logger.trace("Could not acquire lock for {} and table {}", ByteBufferUtil.bytesToHex(mutation.key().getKey()), columnFamilyStores.get(tableId).name);
Tracing.trace("Could not acquire MV lock");
if (future != null)
{
future.completeExceptionally(new WriteTimeoutException(WriteType.VIEW, ConsistencyLevel.LOCAL_ONE, 0, 1));
return future;
}
else
throw new WriteTimeoutException(WriteType.VIEW, ConsistencyLevel.LOCAL_ONE, 0, 1);
}
else if (isDeferrable)
{
for (int j = 0; j < i; j++)
locks[j].unlock();
// This view update can't happen right now. so rather than keep this thread busy
// we will re-apply ourself to the queue and try again later
final CompletableFuture<?> mark = future;
Stage.MUTATION.execute(() ->
applyInternal(mutation, makeDurable, true, isDroppable, true, mark)
);
return future;
}
else
{
// Retry lock on same thread, if mutation is not deferrable.
// Mutation is not deferrable, if applied from MutationStage and caller is waiting for future to finish
// If blocking caller defers future, this may lead to deadlock situation with all MutationStage workers
// being blocked by waiting for futures which will never be processed as all workers are blocked
try
{
// Wait a little bit before retrying to lock
Thread.sleep(10);
}
catch (InterruptedException e)
{
// Just continue
}
continue;
}
}
else
{
locks[i] = lock;
}
break;
}
}
long acquireTime = System.currentTimeMillis() - mutation.viewLockAcquireStart.get();
// Metrics are only collected for droppable write operations
// Bulk non-droppable operations (e.g. commitlog replay, hint delivery) are not measured
if (isDroppable)
{
for(TableId tableId : tableIds)
columnFamilyStores.get(tableId).metric.viewLockAcquireTime.update(acquireTime, MILLISECONDS);
}
}
int nowInSec = FBUtilities.nowInSeconds();
try (WriteContext ctx = getWriteHandler().beginWrite(mutation, makeDurable))
{
for (PartitionUpdate upd : mutation.getPartitionUpdates())
{
ColumnFamilyStore cfs = columnFamilyStores.get(upd.metadata().id);
if (cfs == null)
{
logger.error("Attempting to mutate non-existant table {} ({}.{})", upd.metadata().id, upd.metadata().keyspace, upd.metadata().name);
continue;
}
AtomicLong baseComplete = new AtomicLong(Long.MAX_VALUE);
if (requiresViewUpdate)
{
try
{
Tracing.trace("Creating materialized view mutations from base table replica");
viewManager.forTable(upd.metadata().id).pushViewReplicaUpdates(upd, makeDurable, baseComplete);
}
catch (Throwable t)
{
JVMStabilityInspector.inspectThrowable(t);
logger.error(String.format("Unknown exception caught while attempting to update MaterializedView! %s",
upd.metadata().toString()), t);
throw t;
}
}
UpdateTransaction indexTransaction = updateIndexes
? cfs.indexManager.newUpdateTransaction(upd, ctx, nowInSec)
: UpdateTransaction.NO_OP;
cfs.getWriteHandler().write(upd, ctx, indexTransaction);
if (requiresViewUpdate)
baseComplete.set(System.currentTimeMillis());
}
if (future != null) {
future.complete(null);
}
return future;
}
finally
{
if (locks != null)
{
for (Lock lock : locks)
if (lock != null)
lock.unlock();
}
}
}
public AbstractReplicationStrategy getReplicationStrategy()
{
return replicationStrategy;
}
public List<Future<?>> flush()
{
List<Future<?>> futures = new ArrayList<>(columnFamilyStores.size());
for (ColumnFamilyStore cfs : columnFamilyStores.values())
futures.add(cfs.forceFlush());
return futures;
}
public Iterable<ColumnFamilyStore> getValidColumnFamilies(boolean allowIndexes,
boolean autoAddIndexes,
String... cfNames) throws IOException
{
Set<ColumnFamilyStore> valid = new HashSet<>();
if (cfNames.length == 0)
{
// all stores are interesting
for (ColumnFamilyStore cfStore : getColumnFamilyStores())
{
valid.add(cfStore);
if (autoAddIndexes)
valid.addAll(getIndexColumnFamilyStores(cfStore));
}
return valid;
}
// include the specified stores and possibly the stores of any of their indexes
for (String cfName : cfNames)
{
if (SecondaryIndexManager.isIndexColumnFamily(cfName))
{
if (!allowIndexes)
{
logger.warn("Operation not allowed on secondary Index table ({})", cfName);
continue;
}
String baseName = SecondaryIndexManager.getParentCfsName(cfName);
String indexName = SecondaryIndexManager.getIndexName(cfName);
ColumnFamilyStore baseCfs = getColumnFamilyStore(baseName);
Index index = baseCfs.indexManager.getIndexByName(indexName);
if (index == null)
throw new IllegalArgumentException(String.format("Invalid index specified: %s/%s.",
baseCfs.metadata.name,
indexName));
if (index.getBackingTable().isPresent())
valid.add(index.getBackingTable().get());
}
else
{
ColumnFamilyStore cfStore = getColumnFamilyStore(cfName);
valid.add(cfStore);
if (autoAddIndexes)
valid.addAll(getIndexColumnFamilyStores(cfStore));
}
}
return valid;
}
private Set<ColumnFamilyStore> getIndexColumnFamilyStores(ColumnFamilyStore baseCfs)
{
Set<ColumnFamilyStore> stores = new HashSet<>();
for (ColumnFamilyStore indexCfs : baseCfs.indexManager.getAllIndexColumnFamilyStores())
{
logger.info("adding secondary index table {} to operation", indexCfs.metadata.name);
stores.add(indexCfs);
}
return stores;
}
public static Iterable<Keyspace> all()
{
return Iterables.transform(Schema.instance.getKeyspaces(), Keyspace::open);
}
/**
* @return a {@link Stream} of all existing/open {@link Keyspace} instances
*/
public static Stream<Keyspace> allExisting()
{
return Schema.instance.getKeyspaces().stream().map(Schema.instance::getKeyspaceInstance).filter(Objects::nonNull);
}
public static Iterable<Keyspace> nonSystem()
{
return Iterables.transform(Schema.instance.getNonSystemKeyspaces(), Keyspace::open);
}
public static Iterable<Keyspace> nonLocalStrategy()
{
return Iterables.transform(Schema.instance.getNonLocalStrategyKeyspaces(), Keyspace::open);
}
public static Iterable<Keyspace> system()
{
return Iterables.transform(SchemaConstants.LOCAL_SYSTEM_KEYSPACE_NAMES, Keyspace::open);
}
@Override
public String toString()
{
return getClass().getSimpleName() + "(name='" + getName() + "')";
}
public String getName()
{
return metadata.name;
}
}