blob: b8a27af3bab545d6616f450c55b60b614a569193 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.cql3.statements.schema;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.cassandra.audit.AuditLogContext;
import org.apache.cassandra.audit.AuditLogEntryType;
import org.apache.cassandra.auth.Permission;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.CQLStatement;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.guardrails.Guardrails;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.LocalStrategy;
import org.apache.cassandra.locator.ReplicationFactor;
import org.apache.cassandra.locator.SimpleStrategy;
import org.apache.cassandra.schema.KeyspaceMetadata;
import org.apache.cassandra.schema.KeyspaceMetadata.KeyspaceDiff;
import org.apache.cassandra.schema.Keyspaces;
import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.transport.Event.SchemaChange;
import org.apache.cassandra.transport.Event.SchemaChange.Change;
import org.apache.cassandra.utils.FBUtilities;
import static org.apache.cassandra.config.CassandraRelevantProperties.ALLOW_ALTER_RF_DURING_RANGE_MOVEMENT;
import static org.apache.cassandra.config.CassandraRelevantProperties.ALLOW_UNSAFE_TRANSIENT_CHANGES;
public final class AlterKeyspaceStatement extends AlterSchemaStatement
{
private static final boolean allow_alter_rf_during_range_movement = ALLOW_ALTER_RF_DURING_RANGE_MOVEMENT.getBoolean();
private static final boolean allow_unsafe_transient_changes = ALLOW_UNSAFE_TRANSIENT_CHANGES.getBoolean();
private final HashSet<String> clientWarnings = new HashSet<>();
private final KeyspaceAttributes attrs;
private final boolean ifExists;
public AlterKeyspaceStatement(String keyspaceName, KeyspaceAttributes attrs, boolean ifExists)
{
super(keyspaceName);
this.attrs = attrs;
this.ifExists = ifExists;
}
public Keyspaces apply(Keyspaces schema)
{
attrs.validate();
KeyspaceMetadata keyspace = schema.getNullable(keyspaceName);
if (null == keyspace)
{
if (!ifExists)
throw ire("Keyspace '%s' doesn't exist", keyspaceName);
return schema;
}
KeyspaceMetadata newKeyspace = keyspace.withSwapped(attrs.asAlteredKeyspaceParams(keyspace.params));
if (attrs.getReplicationStrategyClass() != null && attrs.getReplicationStrategyClass().equals(SimpleStrategy.class.getSimpleName()))
Guardrails.simpleStrategyEnabled.ensureEnabled(state);
if (newKeyspace.params.replication.klass.equals(LocalStrategy.class))
throw ire("Unable to use given strategy class: LocalStrategy is reserved for internal use.");
newKeyspace.params.validate(keyspaceName, state);
validateNoRangeMovements();
validateTransientReplication(keyspace.createReplicationStrategy(), newKeyspace.createReplicationStrategy());
Keyspaces res = schema.withAddedOrUpdated(newKeyspace);
return res;
}
SchemaChange schemaChangeEvent(KeyspacesDiff diff)
{
return new SchemaChange(Change.UPDATED, keyspaceName);
}
public void authorize(ClientState client)
{
client.ensureKeyspacePermission(keyspaceName, Permission.ALTER);
}
@Override
Set<String> clientWarnings(KeyspacesDiff diff)
{
if (diff.isEmpty())
return clientWarnings;
KeyspaceDiff keyspaceDiff = diff.altered.get(0);
AbstractReplicationStrategy before = keyspaceDiff.before.createReplicationStrategy();
AbstractReplicationStrategy after = keyspaceDiff.after.createReplicationStrategy();
if (before.getReplicationFactor().fullReplicas < after.getReplicationFactor().fullReplicas)
clientWarnings.add("When increasing replication factor you need to run a full (-full) repair to distribute the data.");
return clientWarnings;
}
private void validateNoRangeMovements()
{
if (allow_alter_rf_during_range_movement)
return;
Stream<InetAddressAndPort> unreachableNotAdministrativelyInactive =
Gossiper.instance.getUnreachableMembers().stream().filter(endpoint -> !FBUtilities.getBroadcastAddressAndPort().equals(endpoint) &&
!Gossiper.instance.isAdministrativelyInactiveState(endpoint));
Stream<InetAddressAndPort> endpoints = Stream.concat(Gossiper.instance.getLiveMembers().stream(),
unreachableNotAdministrativelyInactive);
List<InetAddressAndPort> notNormalEndpoints = endpoints.filter(endpoint -> !FBUtilities.getBroadcastAddressAndPort().equals(endpoint) &&
!Gossiper.instance.getEndpointStateForEndpoint(endpoint).isNormalState())
.collect(Collectors.toList());
if (!notNormalEndpoints.isEmpty())
{
throw new ConfigurationException("Cannot alter RF while some endpoints are not in normal state (no range movements): " + notNormalEndpoints);
}
}
private void validateTransientReplication(AbstractReplicationStrategy oldStrategy, AbstractReplicationStrategy newStrategy)
{
//If there is no read traffic there are some extra alterations you can safely make, but this is so atypical
//that a good default is to not allow unsafe changes
if (allow_unsafe_transient_changes)
return;
ReplicationFactor oldRF = oldStrategy.getReplicationFactor();
ReplicationFactor newRF = newStrategy.getReplicationFactor();
int oldTrans = oldRF.transientReplicas();
int oldFull = oldRF.fullReplicas;
int newTrans = newRF.transientReplicas();
int newFull = newRF.fullReplicas;
if (newTrans > 0)
{
if (DatabaseDescriptor.getNumTokens() > 1)
throw new ConfigurationException(String.format("Transient replication is not supported with vnodes yet"));
Keyspace ks = Keyspace.open(keyspaceName);
for (ColumnFamilyStore cfs : ks.getColumnFamilyStores())
{
if (cfs.viewManager.hasViews())
{
throw new ConfigurationException("Cannot use transient replication on keyspaces using materialized views");
}
if (cfs.indexManager.hasIndexes())
{
throw new ConfigurationException("Cannot use transient replication on keyspaces using secondary indexes");
}
}
}
//This is true right now because the transition from transient -> full lacks the pending state
//necessary for correctness. What would happen if we allowed this is that we would attempt
//to read from a transient replica as if it were a full replica.
if (oldFull > newFull && oldTrans > 0)
throw new ConfigurationException("Can't add full replicas if there are any transient replicas. You must first remove all transient replicas, then change the # of full replicas, then add back the transient replicas");
//Don't increase transient replication factor by more than one at a time if changing number of replicas
//Just like with changing full replicas it's not safe to do this as you could read from too many replicas
//that don't have the necessary data. W/O transient replication this alteration was allowed and it's not clear
//if it should be.
//This is structured so you can convert as many full replicas to transient replicas as you want.
boolean numReplicasChanged = oldTrans + oldFull != newTrans + newFull;
if (numReplicasChanged && (newTrans > oldTrans && newTrans != oldTrans + 1))
throw new ConfigurationException("Can only safely increase number of transients one at a time with incremental repair run in between each time");
}
@Override
public AuditLogContext getAuditLogContext()
{
return new AuditLogContext(AuditLogEntryType.ALTER_KEYSPACE, keyspaceName);
}
public String toString()
{
return String.format("%s (%s)", getClass().getSimpleName(), keyspaceName);
}
public static final class Raw extends CQLStatement.Raw
{
private final String keyspaceName;
private final KeyspaceAttributes attrs;
private final boolean ifExists;
public Raw(String keyspaceName, KeyspaceAttributes attrs, boolean ifExists)
{
this.keyspaceName = keyspaceName;
this.attrs = attrs;
this.ifExists = ifExists;
}
public AlterKeyspaceStatement prepare(ClientState state)
{
return new AlterKeyspaceStatement(keyspaceName, attrs, ifExists);
}
}
}