blob: a837b0773dda2262884054624adba15ee7a23983 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.schema;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import com.google.common.base.Preconditions;
import org.apache.cassandra.auth.AuthKeyspace;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.tcm.Epoch;
import org.apache.cassandra.tcm.MetadataValue;
import org.apache.cassandra.tcm.serialization.MetadataSerializer;
import org.apache.cassandra.tcm.serialization.Version;
import org.apache.cassandra.tracing.TraceKeyspace;
import org.apache.cassandra.utils.FBUtilities;
import static org.apache.cassandra.db.TypeSizes.sizeof;
/**
* Immutable snapshot of the current schema along with its version.
*/
public class DistributedSchema implements MetadataValue<DistributedSchema>
{
public static final Serializer serializer = new Serializer();
public static final DistributedSchema empty()
{
return new DistributedSchema(Keyspaces.none(), Epoch.EMPTY);
}
public static DistributedSchema first(Set<String> knownDatacenters)
{
if (knownDatacenters.isEmpty())
{
if (DatabaseDescriptor.getLocalDataCenter() != null)
knownDatacenters = Collections.singleton(DatabaseDescriptor.getLocalDataCenter());
else
knownDatacenters = Collections.singleton("DC1");
}
return new DistributedSchema(Keyspaces.of(DistributedMetadataLogKeyspace.initialMetadata(knownDatacenters)), Epoch.FIRST);
}
private final Keyspaces keyspaces;
private final Epoch epoch;
private final UUID version;
private final Map<String, Keyspace> keyspaceInstances = new HashMap<>();
public DistributedSchema(Keyspaces keyspaces)
{
this(keyspaces, Epoch.EMPTY);
}
public DistributedSchema(Keyspaces keyspaces, Epoch epoch)
{
Objects.requireNonNull(keyspaces);
this.keyspaces = keyspaces;
this.epoch = epoch;
this.version = new UUID(0, epoch.getEpoch());
validate();
}
@Override
public DistributedSchema withLastModified(Epoch epoch)
{
return new DistributedSchema(keyspaces, epoch);
}
@Override
public Epoch lastModified()
{
return epoch;
}
public Keyspace getKeyspace(String keyspace)
{
return keyspaceInstances.get(keyspace);
}
public KeyspaceMetadata getKeyspaceMetadata(String keyspace)
{
return keyspaces.get(keyspace).get();
}
public static DistributedSchema fromSystemTables(Keyspaces keyspaces, Set<String> knownDatacenters)
{
if (!keyspaces.containsKeyspace(SchemaConstants.METADATA_KEYSPACE_NAME))
keyspaces = keyspaces.withAddedOrReplaced(Keyspaces.of(DistributedMetadataLogKeyspace.initialMetadata(knownDatacenters),
TraceKeyspace.metadata(),
SystemDistributedKeyspace.metadata(),
AuthKeyspace.metadata()));
return new DistributedSchema(keyspaces, Epoch.UPGRADE_GOSSIP);
}
public void initializeKeyspaceInstances(DistributedSchema prev)
{
initializeKeyspaceInstances(prev, true);
}
public void initializeKeyspaceInstances(DistributedSchema prev, boolean loadSSTables)
{
keyspaceInstances.putAll(prev.keyspaceInstances);
// If there are keyspaces in schema, but none of them are initialised, we're in first boot. Initialise all.
if (!prev.isEmpty() && prev.keyspaceInstances.isEmpty())
prev = DistributedSchema.empty();
Keyspaces.KeyspacesDiff ksDiff = Keyspaces.diff(prev.getKeyspaces(), getKeyspaces());
SchemaChangeNotifier schemaChangeNotifier = Schema.instance.schemaChangeNotifier();
schemaChangeNotifier.notifyPreChanges(new SchemaTransformation.SchemaTransformationResult(prev, this, ksDiff));
ksDiff.dropped.forEach(metadata -> {
schemaChangeNotifier.notifyKeyspaceDropped(metadata, loadSSTables);
dropKeyspace(metadata, true);
});
ksDiff.created.forEach(metadata -> {
schemaChangeNotifier.notifyKeyspaceCreated(metadata);
keyspaceInstances.put(metadata.name, new Keyspace(Schema.instance, metadata, loadSSTables));
});
ksDiff.altered.forEach(delta -> {
boolean initialized = Keyspace.isInitialized();
Keyspace keyspace = initialized ? keyspaceInstances.get(delta.before.name) : null;
if (initialized)
{
assert keyspace != null : String.format("Keyspace %s is not initialized. Initialized keyspaces: %s.", delta.before.name, keyspaceInstances.keySet());
assert delta.before.name.equals(delta.after.name);
// drop tables and views
delta.views.dropped.forEach(v -> dropView(keyspace, v, loadSSTables));
delta.tables.dropped.forEach(t -> dropTable(keyspace, t, loadSSTables));
// add tables and views
delta.tables.created.forEach(t -> createTable(keyspace, t, loadSSTables));
delta.views.created.forEach(v -> createView(keyspace, v));
// update tables and views
delta.tables.altered.forEach(diff -> alterTable(keyspace, diff.after));
delta.views.altered.forEach(diff -> alterView(keyspace, diff.after));
schemaChangeNotifier.notifyKeyspaceAltered(delta, loadSSTables);
// deal with all added, and altered views
keyspace.viewManager.reload(keyspaces.get(keyspace.getName()).get());
}
SchemaDiagnostics.keyspaceAltered(Schema.instance, delta);
});
// Avoid system table side effects during initialization
if (epoch.isEqualOrAfter(Epoch.FIRST))
{
Collection<Mutation> mutations = SchemaKeyspace.convertSchemaDiffToMutations(ksDiff, FBUtilities.timestampMicros());
SchemaKeyspace.applyChanges(mutations);
}
}
public static void maybeRebuildViews(DistributedSchema prev, DistributedSchema current)
{
Keyspaces.KeyspacesDiff ksDiff = Keyspaces.diff(prev.getKeyspaces(), current.getKeyspaces());
if (ksDiff.isEmpty() || ksDiff.altered.isEmpty())
return;
ksDiff.altered.forEach(delta -> {
if (delta.views.isEmpty())
return;
boolean initialized = Keyspace.isInitialized();
Keyspace keyspace = initialized ? current.keyspaceInstances.get(delta.after.name) : null;
if (keyspace != null)
keyspace.viewManager.buildViews();
});
}
private void dropView(Keyspace keyspace, ViewMetadata metadata, boolean dropData)
{
keyspace.viewManager.dropView(metadata.name());
dropTable(keyspace, metadata.metadata, dropData);
}
// TODO: handle drops after snapshots
private void dropKeyspace(KeyspaceMetadata keyspaceMetadata, boolean dropData)
{
SchemaDiagnostics.keyspaceDropping(Schema.instance, keyspaceMetadata);
boolean initialized = Keyspace.isInitialized();
Keyspace keyspace = initialized ? Keyspace.open(keyspaceMetadata.name) : null;
if (initialized)
{
if (keyspace == null)
return;
keyspaceMetadata.views.forEach(v -> dropView(keyspace, v, dropData));
keyspaceMetadata.tables.forEach(t -> dropTable(keyspace, t, dropData));
// remove the keyspace from the static instances
Keyspace unloadedKeyspace = keyspaceInstances.remove(keyspaceMetadata.name);
unloadedKeyspace.unload(true);
SchemaDiagnostics.metadataRemoved(Schema.instance, keyspaceMetadata);
assert unloadedKeyspace == keyspace;
Keyspace.writeOrder.awaitNewBarrier();
}
else
{
keyspace.unload(true);
SchemaDiagnostics.metadataRemoved(Schema.instance, keyspaceMetadata);
}
SchemaDiagnostics.keyspaceDropped(Schema.instance, keyspaceMetadata);
}
/**
*
* @param keyspace
* @param metadata
*/
private void dropTable(Keyspace keyspace, TableMetadata metadata, boolean dropData)
{
SchemaDiagnostics.tableDropping(Schema.instance, metadata);
keyspace.dropCf(metadata.id, dropData);
SchemaDiagnostics.tableDropped(Schema.instance, metadata);
}
private void createTable(Keyspace keyspace, TableMetadata table, boolean loadSSTables)
{
SchemaDiagnostics.tableCreating(Schema.instance, table);
keyspace.initCf(table, loadSSTables);
SchemaDiagnostics.tableCreated(Schema.instance, table);
}
private void createView(Keyspace keyspace, ViewMetadata view)
{
SchemaDiagnostics.tableCreating(Schema.instance, view.metadata);
keyspace.initCf(view.metadata, true);
SchemaDiagnostics.tableCreated(Schema.instance, view.metadata);
}
private void alterTable(Keyspace keyspace, TableMetadata updated)
{
SchemaDiagnostics.tableAltering(Schema.instance, updated);
keyspace.getColumnFamilyStore(updated.name).reload(updated);
SchemaDiagnostics.tableAltered(Schema.instance, updated);
}
private void alterView(Keyspace keyspace, ViewMetadata updated)
{
SchemaDiagnostics.tableAltering(Schema.instance, updated.metadata);
keyspace.getColumnFamilyStore(updated.name()).reload(updated.metadata);
SchemaDiagnostics.tableAltered(Schema.instance, updated.metadata);
}
public Keyspaces getKeyspaces()
{
return keyspaces;
}
public boolean isEmpty()
{
return epoch.is(Epoch.EMPTY);
}
public UUID getVersion()
{
return version;
}
/**
* Converts the given schema version to a string. Returns {@code unknown}, if {@code version} is {@code null}
* or {@code "(empty)"}, if {@code version} refers to an {@link SchemaConstants#emptyVersion} schema.
*/
public static String schemaVersionToString(UUID version)
{
return version == null
? "unknown"
: SchemaConstants.emptyVersion.equals(version)
? "(empty)"
: version.toString();
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
DistributedSchema schema = (DistributedSchema) o;
return keyspaces.equals(schema.keyspaces) && version.equals(schema.version);
}
@Override
public int hashCode()
{
return Objects.hash(keyspaces, version);
}
private void validate()
{
keyspaces.forEach(ksm -> {
ksm.tables.forEach(tm -> Preconditions.checkArgument(tm.keyspace.equals(ksm.name), "Table %s metadata points to keyspace %s while defined in keyspace %s", tm.name, tm.keyspace, ksm.name));
ksm.views.forEach(vm -> Preconditions.checkArgument(vm.keyspace().equals(ksm.name), "View %s metadata points to keyspace %s while defined in keyspace %s", vm.name(), vm.keyspace(), ksm.name));
ksm.types.forEach(ut -> Preconditions.checkArgument(ut.keyspace.equals(ksm.name), "Type %s points to keyspace %s while defined in keyspace %s", ut.name, ut.keyspace, ksm.name));
ksm.userFunctions.forEach(f -> Preconditions.checkArgument(f.name().keyspace.equals(ksm.name), "Function %s points to keyspace %s while defined in keyspace %s", f.name().name, f.name().keyspace, ksm.name));
});
}
public static class Serializer implements MetadataSerializer<DistributedSchema>
{
public void serialize(DistributedSchema t, DataOutputPlus out, Version version) throws IOException
{
Epoch.serializer.serialize(t.epoch, out, version);
out.writeInt(t.keyspaces.size());
for (KeyspaceMetadata ksm : t.keyspaces)
KeyspaceMetadata.serializer.serialize(ksm, out, version);
}
public DistributedSchema deserialize(DataInputPlus in, Version version) throws IOException
{
Epoch basedOnEpoch = Epoch.serializer.deserialize(in, version);
int ksCount = in.readInt();
List<KeyspaceMetadata> ksms = new ArrayList<>(ksCount);
for (int i = 0; i < ksCount; i++)
ksms.add(KeyspaceMetadata.serializer.deserialize(in, version));
return new DistributedSchema(Keyspaces.of(ksms.toArray(new KeyspaceMetadata[ksCount])), basedOnEpoch);
}
public long serializedSize(DistributedSchema t, Version version)
{
long size = Epoch.serializer.serializedSize(t.epoch, version);
size += sizeof(t.keyspaces.size());
for (KeyspaceMetadata ksm : t.keyspaces)
size += KeyspaceMetadata.serializer.serializedSize(ksm, version);
return size;
}
}
@Override
public String toString()
{
return "DistributedSchema{" +
"keyspaces=" + keyspaces +
", epoch=" + epoch +
", version=" + version +
", keyspaceInstances=" + keyspaceInstances +
'}';
}
}