blob: ed7a9a11907a8a7887a54d30644ef6d824cd8c85 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.cql3.statements.schema;
import java.util.*;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import org.apache.cassandra.audit.AuditLogContext;
import org.apache.cassandra.audit.AuditLogEntryType;
import org.apache.cassandra.auth.Permission;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.*;
import org.apache.cassandra.cql3.restrictions.StatementRestrictions;
import org.apache.cassandra.cql3.selection.RawSelector;
import org.apache.cassandra.cql3.selection.Selectable;
import org.apache.cassandra.cql3.statements.StatementType;
import org.apache.cassandra.db.guardrails.Guardrails;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.ReversedType;
import org.apache.cassandra.db.view.View;
import org.apache.cassandra.exceptions.AlreadyExistsException;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.schema.*;
import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.transport.Event.SchemaChange;
import org.apache.cassandra.transport.Event.SchemaChange.Change;
import org.apache.cassandra.transport.Event.SchemaChange.Target;
import static java.lang.String.join;
import static com.google.common.collect.Iterables.concat;
import static com.google.common.collect.Iterables.filter;
import static com.google.common.collect.Iterables.transform;
import static org.apache.cassandra.config.CassandraRelevantProperties.MV_ALLOW_FILTERING_NONKEY_COLUMNS_UNSAFE;
public final class CreateViewStatement extends AlterSchemaStatement
{
private final String tableName;
private final String viewName;
private final List<RawSelector> rawColumns;
private final List<ColumnIdentifier> partitionKeyColumns;
private final List<ColumnIdentifier> clusteringColumns;
private final WhereClause whereClause;
private final LinkedHashMap<ColumnIdentifier, Boolean> clusteringOrder;
private final TableAttributes attrs;
private final boolean ifNotExists;
private ClientState state;
public CreateViewStatement(String keyspaceName,
String tableName,
String viewName,
List<RawSelector> rawColumns,
List<ColumnIdentifier> partitionKeyColumns,
List<ColumnIdentifier> clusteringColumns,
WhereClause whereClause,
LinkedHashMap<ColumnIdentifier, Boolean> clusteringOrder,
TableAttributes attrs,
boolean ifNotExists)
{
super(keyspaceName);
this.tableName = tableName;
this.viewName = viewName;
this.rawColumns = rawColumns;
this.partitionKeyColumns = partitionKeyColumns;
this.clusteringColumns = clusteringColumns;
this.whereClause = whereClause;
this.clusteringOrder = clusteringOrder;
this.attrs = attrs;
this.ifNotExists = ifNotExists;
}
@Override
public void validate(ClientState state)
{
super.validate(state);
// save the query state to use it for guardrails validation in #apply
this.state = state;
}
public Keyspaces apply(Keyspaces schema)
{
if (!DatabaseDescriptor.getMaterializedViewsEnabled())
throw ire("Materialized views are disabled. Enable in cassandra.yaml to use.");
/*
* Basic dependency validations
*/
KeyspaceMetadata keyspace = schema.getNullable(keyspaceName);
if (null == keyspace)
throw ire("Keyspace '%s' doesn't exist", keyspaceName);
if (keyspace.createReplicationStrategy().hasTransientReplicas())
throw new InvalidRequestException("Materialized views are not supported on transiently replicated keyspaces");
TableMetadata table = keyspace.tables.getNullable(tableName);
if (null == table)
throw ire("Base table '%s' doesn't exist", tableName);
if (keyspace.hasTable(viewName))
throw ire("Cannot create materialized view '%s' - a table with the same name already exists", viewName);
if (keyspace.hasView(viewName))
{
if (ifNotExists)
return schema;
throw new AlreadyExistsException(keyspaceName, viewName);
}
/*
* Base table validation
*/
if (table.isCounter())
throw ire("Materialized views are not supported on counter tables");
if (table.isView())
throw ire("Materialized views cannot be created against other materialized views");
// Guardrails on table properties
Guardrails.tableProperties.guard(attrs.updatedProperties(), attrs::removeProperty, state);
// Guardrail to limit number of mvs per table
Iterable<ViewMetadata> tableViews = keyspace.views.forTable(table.id);
Guardrails.materializedViewsPerTable.guard(Iterables.size(tableViews) + 1,
String.format("%s on table %s", viewName, table.name),
false,
state);
if (table.params.gcGraceSeconds == 0)
{
throw ire("Cannot create materialized view '%s' for base table " +
"'%s' with gc_grace_seconds of 0, since this value is " +
"used to TTL undelivered updates. Setting gc_grace_seconds" +
" too low might cause undelivered updates to expire " +
"before being replayed.",
viewName, tableName);
}
/*
* Process SELECT clause
*/
Set<ColumnIdentifier> selectedColumns = new HashSet<>();
if (rawColumns.isEmpty()) // SELECT *
table.columns().forEach(c -> selectedColumns.add(c.name));
rawColumns.forEach(selector ->
{
if (null != selector.alias)
throw ire("Cannot use aliases when defining a materialized view (got %s)", selector);
if (!(selector.selectable instanceof Selectable.RawIdentifier))
throw ire("Can only select columns by name when defining a materialized view (got %s)", selector.selectable);
// will throw IRE if the column doesn't exist in the base table
Selectable.RawIdentifier rawIdentifier = (Selectable.RawIdentifier) selector.selectable;
ColumnMetadata column = rawIdentifier.columnMetadata(table);
selectedColumns.add(column.name);
});
selectedColumns.stream()
.map(table::getColumn)
.filter(ColumnMetadata::isStatic)
.findAny()
.ifPresent(c -> { throw ire("Cannot include static column '%s' in materialized view '%s'", c, viewName); });
/*
* Process PRIMARY KEY columns and CLUSTERING ORDER BY clause
*/
if (partitionKeyColumns.isEmpty())
throw ire("Must provide at least one partition key column for materialized view '%s'", viewName);
HashSet<ColumnIdentifier> primaryKeyColumns = new HashSet<>();
concat(partitionKeyColumns, clusteringColumns).forEach(name ->
{
ColumnMetadata column = table.getColumn(name);
if (null == column || !selectedColumns.contains(name))
throw ire("Unknown column '%s' referenced in PRIMARY KEY for materialized view '%s'", name, viewName);
if (!primaryKeyColumns.add(name))
throw ire("Duplicate column '%s' in PRIMARY KEY clause for materialized view '%s'", name, viewName);
AbstractType<?> type = column.type;
if (type.isMultiCell())
{
if (type.isCollection())
throw ire("Invalid non-frozen collection type '%s' for PRIMARY KEY column '%s'", type, name);
else
throw ire("Invalid non-frozen user-defined type '%s' for PRIMARY KEY column '%s'", type, name);
}
if (type.isCounter())
throw ire("counter type is not supported for PRIMARY KEY column '%s'", name);
if (type.referencesDuration())
throw ire("duration type is not supported for PRIMARY KEY column '%s'", name);
});
// If we give a clustering order, we must explicitly do so for all aliases and in the order of the PK
if (!clusteringOrder.isEmpty() && !clusteringColumns.equals(new ArrayList<>(clusteringOrder.keySet())))
throw ire("Clustering key columns must exactly match columns in CLUSTERING ORDER BY directive");
/*
* We need to include all of the primary key columns from the base table in order to make sure that we do not
* overwrite values in the view. We cannot support "collapsing" the base table into a smaller number of rows in
* the view because if we need to generate a tombstone, we have no way of knowing which value is currently being
* used in the view and whether or not to generate a tombstone. In order to not surprise our users, we require
* that they include all of the columns. We provide them with a list of all of the columns left to include.
*/
List<ColumnIdentifier> missingPrimaryKeyColumns =
Lists.newArrayList(filter(transform(table.primaryKeyColumns(), c -> c.name), c -> !primaryKeyColumns.contains(c)));
if (!missingPrimaryKeyColumns.isEmpty())
{
throw ire("Cannot create materialized view '%s' without primary key columns %s from base table '%s'",
viewName, join(", ", transform(missingPrimaryKeyColumns, ColumnIdentifier::toString)), tableName);
}
Set<ColumnIdentifier> regularBaseTableColumnsInViewPrimaryKey = new HashSet<>(primaryKeyColumns);
transform(table.primaryKeyColumns(), c -> c.name).forEach(regularBaseTableColumnsInViewPrimaryKey::remove);
if (regularBaseTableColumnsInViewPrimaryKey.size() > 1)
{
throw ire("Cannot include more than one non-primary key column in materialized view primary key (got %s)",
join(", ", transform(regularBaseTableColumnsInViewPrimaryKey, ColumnIdentifier::toString)));
}
/*
* Process WHERE clause
*/
if (whereClause.containsTokenRelations())
throw new InvalidRequestException("Cannot use token relation when defining a materialized view");
if (whereClause.containsCustomExpressions())
throw ire("WHERE clause for materialized view '%s' cannot contain custom index expressions", viewName);
StatementRestrictions restrictions =
new StatementRestrictions(state,
StatementType.SELECT,
table,
whereClause,
VariableSpecifications.empty(),
false,
false,
true,
true);
List<ColumnIdentifier> nonRestrictedPrimaryKeyColumns =
Lists.newArrayList(filter(primaryKeyColumns, name -> !restrictions.isRestricted(table.getColumn(name))));
if (!nonRestrictedPrimaryKeyColumns.isEmpty())
{
throw ire("Primary key columns %s must be restricted with 'IS NOT NULL' or otherwise",
join(", ", transform(nonRestrictedPrimaryKeyColumns, ColumnIdentifier::toString)));
}
// See CASSANDRA-13798
Set<ColumnMetadata> restrictedNonPrimaryKeyColumns = restrictions.nonPKRestrictedColumns(false);
if (!restrictedNonPrimaryKeyColumns.isEmpty() && !MV_ALLOW_FILTERING_NONKEY_COLUMNS_UNSAFE.getBoolean())
{
throw ire("Non-primary key columns can only be restricted with 'IS NOT NULL' (got: %s restricted illegally)",
join(",", transform(restrictedNonPrimaryKeyColumns, ColumnMetadata::toString)));
}
/*
* Validate WITH params
*/
attrs.validate();
if (attrs.hasOption(TableParams.Option.DEFAULT_TIME_TO_LIVE)
&& attrs.getInt(TableParams.Option.DEFAULT_TIME_TO_LIVE.toString(), 0) != 0)
{
throw ire("Cannot set default_time_to_live for a materialized view. " +
"Data in a materialized view always expire at the same time than " +
"the corresponding data in the parent table.");
}
/*
* Build the thing
*/
TableMetadata.Builder builder = TableMetadata.builder(keyspaceName, viewName);
if (attrs.hasProperty(TableAttributes.ID))
builder.id(attrs.getId());
builder.params(attrs.asNewTableParams())
.kind(TableMetadata.Kind.VIEW);
partitionKeyColumns.stream()
.map(table::getColumn)
.forEach(column -> builder.addPartitionKeyColumn(column.name, getType(column), column.getMask()));
clusteringColumns.stream()
.map(table::getColumn)
.forEach(column -> builder.addClusteringColumn(column.name, getType(column), column.getMask()));
selectedColumns.stream()
.filter(name -> !primaryKeyColumns.contains(name))
.map(table::getColumn)
.forEach(column -> builder.addRegularColumn(column.name, getType(column), column.getMask()));
ViewMetadata view = new ViewMetadata(table.id, table.name, rawColumns.isEmpty(), whereClause, builder.build());
view.metadata.validate();
return schema.withAddedOrUpdated(keyspace.withSwapped(keyspace.views.with(view)));
}
SchemaChange schemaChangeEvent(KeyspacesDiff diff)
{
return new SchemaChange(Change.CREATED, Target.TABLE, keyspaceName, viewName);
}
public void authorize(ClientState client)
{
client.ensureTablePermission(keyspaceName, tableName, Permission.ALTER);
}
private AbstractType<?> getType(ColumnMetadata column)
{
AbstractType<?> type = column.type;
if (clusteringOrder.containsKey(column.name))
{
boolean reverse = !clusteringOrder.get(column.name);
if (type.isReversed() && !reverse)
return ((ReversedType<?>) type).baseType;
if (!type.isReversed() && reverse)
return ReversedType.getInstance(type);
}
return type;
}
@Override
Set<String> clientWarnings(KeyspacesDiff diff)
{
return ImmutableSet.of(View.USAGE_WARNING);
}
@Override
public AuditLogContext getAuditLogContext()
{
return new AuditLogContext(AuditLogEntryType.CREATE_VIEW, keyspaceName, viewName);
}
public String toString()
{
return String.format("%s (%s, %s)", getClass().getSimpleName(), keyspaceName, viewName);
}
public final static class Raw extends CQLStatement.Raw
{
private final QualifiedName tableName;
private final QualifiedName viewName;
private final boolean ifNotExists;
private final List<RawSelector> rawColumns;
private final List<ColumnIdentifier> clusteringColumns = new ArrayList<>();
private List<ColumnIdentifier> partitionKeyColumns;
private final WhereClause whereClause;
private final LinkedHashMap<ColumnIdentifier, Boolean> clusteringOrder = new LinkedHashMap<>();
public final TableAttributes attrs = new TableAttributes();
public Raw(QualifiedName tableName, QualifiedName viewName, List<RawSelector> rawColumns, WhereClause whereClause, boolean ifNotExists)
{
this.tableName = tableName;
this.viewName = viewName;
this.rawColumns = rawColumns;
this.whereClause = whereClause;
this.ifNotExists = ifNotExists;
}
public CreateViewStatement prepare(ClientState state)
{
String keyspaceName = viewName.hasKeyspace() ? viewName.getKeyspace() : state.getKeyspace();
if (tableName.hasKeyspace() && !keyspaceName.equals(tableName.getKeyspace()))
throw ire("Cannot create a materialized view on a table in a different keyspace");
if (!bindVariables.isEmpty())
throw ire("Bind variables are not allowed in CREATE MATERIALIZED VIEW statements");
if (null == partitionKeyColumns)
throw ire("No PRIMARY KEY specifed for view '%s' (exactly one required)", viewName);
return new CreateViewStatement(keyspaceName,
tableName.getName(),
viewName.getName(),
rawColumns,
partitionKeyColumns,
clusteringColumns,
whereClause,
clusteringOrder,
attrs,
ifNotExists);
}
public void setPartitionKeyColumns(List<ColumnIdentifier> columns)
{
partitionKeyColumns = columns;
}
public void markClusteringColumn(ColumnIdentifier column)
{
clusteringColumns.add(column);
}
public void extendClusteringOrder(ColumnIdentifier column, boolean ascending)
{
if (null != clusteringOrder.put(column, ascending))
throw ire("Duplicate column '%s' in CLUSTERING ORDER BY clause for view '%s'", column, viewName);
}
}
}