blob: b1f576e41fe24be4547ae18554c2553eeef033e2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.cql3.statements;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import com.google.common.collect.ImmutableList;
import org.apache.cassandra.audit.AuditLogContext;
import org.apache.cassandra.audit.AuditLogEntryType;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.*;
import org.apache.cassandra.cql3.functions.FunctionName;
import org.apache.cassandra.db.KeyspaceNotDefinedException;
import org.apache.cassandra.db.marshal.ListType;
import org.apache.cassandra.db.marshal.MapType;
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.db.virtual.VirtualKeyspaceRegistry;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.exceptions.RequestExecutionException;
import org.apache.cassandra.exceptions.RequestValidationException;
import org.apache.cassandra.io.util.DataInputBuffer;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.schema.*;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.service.pager.PagingState;
import org.apache.cassandra.transport.messages.ResultMessage;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.UUIDGen;
import static java.lang.String.format;
import static org.apache.cassandra.cql3.statements.RequestValidations.checkNotEmpty;
import static org.apache.cassandra.cql3.statements.RequestValidations.checkNotNull;
import static org.apache.cassandra.cql3.statements.RequestValidations.checkTrue;
import static org.apache.cassandra.cql3.statements.RequestValidations.invalidRequest;
import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
/**
* The differents <code>DESCRIBE</code> statements parsed from a CQL statement.
*/
public abstract class DescribeStatement<T> extends CQLStatement.Raw implements CQLStatement
{
private static final String KS = "system";
private static final String CF = "describe";
/**
* The columns returned by the describe queries that only list elements names (e.g. DESCRIBE KEYSPACES, DESCRIBE TABLES...)
*/
private static final List<ColumnSpecification> LIST_METADATA =
ImmutableList.of(new ColumnSpecification(KS, CF, new ColumnIdentifier("keyspace_name", true), UTF8Type.instance),
new ColumnSpecification(KS, CF, new ColumnIdentifier("type", true), UTF8Type.instance),
new ColumnSpecification(KS, CF, new ColumnIdentifier("name", true), UTF8Type.instance));
/**
* The columns returned by the describe queries that returns the CREATE STATEMENT for the different elements (e.g. DESCRIBE KEYSPACE, DESCRIBE TABLE ...)
*/
private static final List<ColumnSpecification> ELEMENT_METADATA =
ImmutableList.<ColumnSpecification>builder().addAll(LIST_METADATA)
.add(new ColumnSpecification(KS, CF, new ColumnIdentifier("create_statement", true), UTF8Type.instance))
.build();
/**
* "Magic version" for the paging state.
*/
private static final int PAGING_STATE_VERSION = 0x0001;
static final String SCHEMA_CHANGED_WHILE_PAGING_MESSAGE = "The schema has changed since the previous page of the DESCRIBE statement result. " +
"Please retry the DESCRIBE statement.";
private boolean includeInternalDetails;
public final void withInternalDetails()
{
this.includeInternalDetails = true;
}
@Override
public final CQLStatement prepare(ClientState clientState) throws RequestValidationException
{
return this;
}
public final List<ColumnSpecification> getBindVariables()
{
return Collections.emptyList();
}
@Override
public final void authorize(ClientState state)
{
}
@Override
public final void validate(ClientState state)
{
}
public final AuditLogContext getAuditLogContext()
{
return new AuditLogContext(AuditLogEntryType.DESCRIBE);
}
@Override
public final ResultMessage execute(QueryState state, QueryOptions options, long queryStartNanoTime) throws RequestValidationException, RequestExecutionException
{
return executeLocally(state, options);
}
@Override
public ResultMessage executeLocally(QueryState state, QueryOptions options)
{
Keyspaces keyspaces = Schema.instance.distributedAndLocalKeyspaces();
UUID schemaVersion = Schema.instance.getVersion();
keyspaces = Keyspaces.builder()
.add(keyspaces)
.add(VirtualKeyspaceRegistry.instance.virtualKeyspacesMetadata())
.build();
PagingState pagingState = options.getPagingState();
// The paging implemented here uses some arbitray row number as the partition-key for paging,
// which is used to skip/limit the result from the Java Stream. This works good enough for
// reasonably sized schemas. Even a 'DESCRIBE SCHEMA' for an abnormally schema with 10000 tables
// completes within a few seconds. This seems good enough for now. Once Cassandra actually supports
// more than a few hundred tables, the implementation here should be reconsidered.
//
// Paging is only supported on row-level.
//
// The "partition key" in the paging-state contains a serialized object:
// (short) version, currently 0x0001
// (long) row offset
// (vint bytes) serialized schema hash (currently the result of Keyspaces.hashCode())
//
long offset = getOffset(pagingState, schemaVersion);
int pageSize = options.getPageSize();
Stream<? extends T> stream = describe(state.getClientState(), keyspaces);
if (offset > 0L)
stream = stream.skip(offset);
if (pageSize > 0)
stream = stream.limit(pageSize);
List<List<ByteBuffer>> rows = stream.map(e -> toRow(e, includeInternalDetails))
.collect(Collectors.toList());
ResultSet.ResultMetadata resultMetadata = new ResultSet.ResultMetadata(metadata(state.getClientState()));
ResultSet result = new ResultSet(resultMetadata, rows);
if (pageSize > 0 && rows.size() == pageSize)
{
result.metadata.setHasMorePages(getPagingState(offset + pageSize, schemaVersion));
}
return new ResultMessage.Rows(result);
}
/**
* Returns the columns of the {@code ResultMetadata}
*/
protected abstract List<ColumnSpecification> metadata(ClientState state);
private PagingState getPagingState(long nextPageOffset, UUID schemaVersion)
{
try (DataOutputBuffer out = new DataOutputBuffer())
{
out.writeShort(PAGING_STATE_VERSION);
out.writeUTF(FBUtilities.getReleaseVersionString());
out.write(UUIDGen.decompose(schemaVersion));
out.writeLong(nextPageOffset);
return new PagingState(out.asNewBuffer(),
null,
Integer.MAX_VALUE,
Integer.MAX_VALUE);
}
catch (IOException e)
{
throw new InvalidRequestException("Invalid paging state.", e);
}
}
private long getOffset(PagingState pagingState, UUID schemaVersion)
{
if (pagingState == null)
return 0L;
try (DataInputBuffer in = new DataInputBuffer(pagingState.partitionKey, false))
{
checkTrue(in.readShort() == PAGING_STATE_VERSION, "Incompatible paging state");
final String pagingStateServerVersion = in.readUTF();
final String releaseVersion = FBUtilities.getReleaseVersionString();
checkTrue(pagingStateServerVersion.equals(releaseVersion),
"The server version of the paging state %s is different from the one of the server %s",
pagingStateServerVersion,
releaseVersion);
byte[] bytes = new byte[UUIDGen.UUID_LEN];
in.read(bytes);
UUID version = UUIDGen.getUUID(ByteBuffer.wrap(bytes));
checkTrue(schemaVersion.equals(version), SCHEMA_CHANGED_WHILE_PAGING_MESSAGE);
return in.readLong();
}
catch (IOException e)
{
throw new InvalidRequestException("Invalid paging state.", e);
}
}
protected abstract List<ByteBuffer> toRow(T element, boolean withInternals);
/**
* Returns the schema elements that must be part of the output.
*/
protected abstract Stream<? extends T> describe(ClientState state, Keyspaces keyspaces);
/**
* Returns the metadata for the given keyspace or throws a {@link KeyspaceNotDefinedException} exception.
*/
private static KeyspaceMetadata validateKeyspace(String ks, Keyspaces keyspaces)
{
return keyspaces.get(ks)
.orElseThrow(() -> new KeyspaceNotDefinedException(format("'%s' not found in keyspaces", ks)));
}
/**
* {@code DescribeStatement} implementation used for describe queries that only list elements names.
*/
public static final class Listing extends DescribeStatement<SchemaElement>
{
private final java.util.function.Function<KeyspaceMetadata, Stream<? extends SchemaElement>> elementsProvider;
public Listing(java.util.function.Function<KeyspaceMetadata, Stream<? extends SchemaElement>> elementsProvider)
{
this.elementsProvider = elementsProvider;
}
@Override
protected Stream<? extends SchemaElement> describe(ClientState state, Keyspaces keyspaces)
{
String keyspace = state.getRawKeyspace();
Stream<KeyspaceMetadata> stream = keyspace == null ? keyspaces.stream().sorted(SchemaElement.NAME_COMPARATOR)
: Stream.of(validateKeyspace(keyspace, keyspaces));
return stream.flatMap(k -> elementsProvider.apply(k).sorted(SchemaElement.NAME_COMPARATOR));
}
@Override
protected List<ColumnSpecification> metadata(ClientState state)
{
return LIST_METADATA;
}
@Override
protected List<ByteBuffer> toRow(SchemaElement element, boolean withInternals)
{
return ImmutableList.of(bytes(element.elementKeyspaceQuotedIfNeeded()),
bytes(element.elementType().toString()),
bytes(element.elementNameQuotedIfNeeded()));
}
}
/**
* Creates a {@link DescribeStatement} for {@code DESCRIBE TABLES}.
*/
public static DescribeStatement<SchemaElement> tables()
{
return new Listing(ks -> ks.tables.stream());
}
/**
* Creates a {@link DescribeStatement} for {@code DESCRIBE TYPES}.
*/
public static DescribeStatement<SchemaElement> types()
{
return new Listing(ks -> ks.types.stream());
}
/**
* Creates a {@link DescribeStatement} for {@code DESCRIBE FUNCTIONS}.
*/
public static DescribeStatement<SchemaElement> functions()
{
return new Listing(ks -> ks.functions.udfs());
}
/**
* Creates a {@link DescribeStatement} for {@code DESCRIBE AGGREGATES}.
*/
public static DescribeStatement<SchemaElement> aggregates()
{
return new Listing(ks -> ks.functions.udas());
}
/**
* Creates a {@link DescribeStatement} for {@code DESCRIBE KEYSPACES}.
*/
public static DescribeStatement<SchemaElement> keyspaces()
{
return new DescribeStatement<SchemaElement>()
{
@Override
protected Stream<? extends SchemaElement> describe(ClientState state, Keyspaces keyspaces)
{
return keyspaces.stream().sorted(SchemaElement.NAME_COMPARATOR);
}
@Override
protected List<ColumnSpecification> metadata(ClientState state)
{
return LIST_METADATA;
}
@Override
protected List<ByteBuffer> toRow(SchemaElement element, boolean withInternals)
{
return ImmutableList.of(bytes(element.elementKeyspaceQuotedIfNeeded()),
bytes(element.elementType().toString()),
bytes(element.elementNameQuotedIfNeeded()));
}
};
}
/**
* Creates a {@link DescribeStatement} for {@code DESCRIBE [FULL] SCHEMA}.
*/
public static DescribeStatement<SchemaElement> schema(boolean includeSystemKeyspaces)
{
return new DescribeStatement<SchemaElement>()
{
@Override
protected Stream<? extends SchemaElement> describe(ClientState state, Keyspaces keyspaces)
{
return keyspaces.stream()
.filter(ks -> includeSystemKeyspaces || !SchemaConstants.isSystemKeyspace(ks.name))
.sorted(SchemaElement.NAME_COMPARATOR)
.flatMap(ks -> getKeyspaceElements(ks, false));
}
@Override
protected List<ColumnSpecification> metadata(ClientState state)
{
return ELEMENT_METADATA;
}
@Override
protected List<ByteBuffer> toRow(SchemaElement element, boolean withInternals)
{
return ImmutableList.of(bytes(element.elementKeyspaceQuotedIfNeeded()),
bytes(element.elementType().toString()),
bytes(element.elementNameQuotedIfNeeded()),
bytes(element.toCqlString(withInternals, false)));
}
};
}
/**
* {@code DescribeStatement} implementation used for describe queries for a single schema element.
*/
public static class Element extends DescribeStatement<SchemaElement>
{
/**
* The keyspace name
*/
private final String keyspace;
/**
* The element name
*/
private final String name;
private final BiFunction<KeyspaceMetadata, String, Stream<? extends SchemaElement>> elementsProvider;
public Element(String keyspace, String name, BiFunction<KeyspaceMetadata, String, Stream<? extends SchemaElement>> elementsProvider)
{
this.keyspace = keyspace;
this.name = name;
this.elementsProvider = elementsProvider;
}
@Override
protected Stream<? extends SchemaElement> describe(ClientState state, Keyspaces keyspaces)
{
String ks = keyspace == null ? checkNotNull(state.getRawKeyspace(), "No keyspace specified and no current keyspace")
: keyspace;
return elementsProvider.apply(validateKeyspace(ks, keyspaces), name);
}
@Override
protected List<ColumnSpecification> metadata(ClientState state)
{
return ELEMENT_METADATA;
}
@Override
protected List<ByteBuffer> toRow(SchemaElement element, boolean withInternals)
{
return ImmutableList.of(bytes(element.elementKeyspaceQuotedIfNeeded()),
bytes(element.elementType().toString()),
bytes(element.elementNameQuotedIfNeeded()),
bytes(element.toCqlString(withInternals, false)));
}
}
/**
* Creates a {@link DescribeStatement} for {@code DESCRIBE KEYSPACE}.
*/
public static DescribeStatement<SchemaElement> keyspace(String keyspace, boolean onlyKeyspaceDefinition)
{
return new Element(keyspace, null, (ks, t) -> getKeyspaceElements(ks, onlyKeyspaceDefinition));
}
private static Stream<? extends SchemaElement> getKeyspaceElements(KeyspaceMetadata ks, boolean onlyKeyspace)
{
Stream<? extends SchemaElement> s = Stream.of(ks);
if (!onlyKeyspace)
{
s = Stream.concat(s, ks.types.sortedStream());
s = Stream.concat(s, ks.functions.udfs().sorted(SchemaElement.NAME_COMPARATOR));
s = Stream.concat(s, ks.functions.udas().sorted(SchemaElement.NAME_COMPARATOR));
s = Stream.concat(s, ks.tables.stream().sorted(SchemaElement.NAME_COMPARATOR)
.flatMap(tm -> getTableElements(ks, tm)));
}
return s;
}
private static Stream<? extends SchemaElement> getTableElements(KeyspaceMetadata ks, TableMetadata table)
{
Stream<? extends SchemaElement> s = Stream.of(table);
s = Stream.concat(s, table.indexes.stream()
.map(i -> toDescribable(table, i))
.sorted(SchemaElement.NAME_COMPARATOR));
s = Stream.concat(s, ks.views.stream(table.id)
.sorted(SchemaElement.NAME_COMPARATOR));
return s;
}
/**
* Creates a {@link DescribeStatement} for {@code DESCRIBE TABLE}.
*/
public static DescribeStatement<SchemaElement> table(String keyspace, String name)
{
return new Element(keyspace, name, (ks, t) -> {
TableMetadata table = checkNotNull(ks.getTableOrViewNullable(t),
"Table '%s' not found in keyspace '%s'", t, ks.name);
return Stream.concat(Stream.of(table), table.indexes.stream()
.map(index -> toDescribable(table, index))
.sorted(SchemaElement.NAME_COMPARATOR));
});
}
/**
* Creates a {@link DescribeStatement} for {@code DESCRIBE INDEX}.
*/
public static DescribeStatement<SchemaElement> index(String keyspace, String name)
{
return new Element(keyspace, name, (ks, index) -> {
TableMetadata tm = ks.findIndexedTable(index)
.orElseThrow(() -> invalidRequest("Table for existing index '%s' not found in '%s'",
index,
ks.name));
return tm.indexes.get(index)
.map(i -> toDescribable(tm, i))
.map(Stream::of)
.orElseThrow(() -> invalidRequest("Index '%s' not found in '%s'", index, ks.name));
});
}
/**
* Creates a {@link DescribeStatement} for {@code DESCRIBE MATERIALIZED VIEW}.
*/
public static DescribeStatement<SchemaElement> view(String keyspace, String name)
{
return new Element(keyspace, name, (ks, view) -> {
return ks.views.get(view)
.map(Stream::of)
.orElseThrow(() -> invalidRequest("Materialized view '%s' not found in '%s'", view, ks.name));
});
}
/**
* Creates a {@link DescribeStatement} for {@code DESCRIBE TYPE}.
*/
public static DescribeStatement<SchemaElement> type(String keyspace, String name)
{
return new Element(keyspace, name, (ks, type) -> {
return ks.types.get(ByteBufferUtil.bytes(type))
.map(Stream::of)
.orElseThrow(() -> invalidRequest("User defined type '%s' not found in '%s'",
type,
ks.name));
});
}
/**
* Creates a {@link DescribeStatement} for {@code DESCRIBE FUNCTION}.
*/
public static DescribeStatement<SchemaElement> function(String keyspace, String name)
{
return new Element(keyspace, name, (ks, n) -> {
return checkNotEmpty(ks.functions.getUdfs(new FunctionName(ks.name, n)),
"User defined function '%s' not found in '%s'", n, ks.name).stream()
.sorted(SchemaElement.NAME_COMPARATOR);
});
}
/**
* Creates a {@link DescribeStatement} for {@code DESCRIBE FUNCTION}.
*/
public static DescribeStatement<SchemaElement> aggregate(String keyspace, String name)
{
return new Element(keyspace, name, (ks, n) -> {
return checkNotEmpty(ks.functions.getUdas(new FunctionName(ks.name, n)),
"User defined aggregate '%s' not found in '%s'", n, ks.name).stream()
.sorted(SchemaElement.NAME_COMPARATOR);
});
}
private static SchemaElement toDescribable(TableMetadata table, IndexMetadata index)
{
return new SchemaElement()
{
@Override
public SchemaElementType elementType()
{
return SchemaElementType.INDEX;
}
@Override
public String elementKeyspace()
{
return table.keyspace;
}
@Override
public String elementName()
{
return index.name;
}
@Override
public String toCqlString(boolean withInternals, boolean ifNotExists)
{
return index.toCqlString(table, ifNotExists);
}
};
}
/**
* Creates a {@link DescribeStatement} for the generic {@code DESCRIBE ...}.
*/
public static DescribeStatement<SchemaElement> generic(String keyspace, String name)
{
return new DescribeStatement<SchemaElement>()
{
private DescribeStatement<SchemaElement> delegate;
private DescribeStatement<SchemaElement> resolve(ClientState state, Keyspaces keyspaces)
{
String ks = keyspace;
// from cqlsh help: "keyspace or a table or an index or a materialized view (in this order)."
if (keyspace == null)
{
if (keyspaces.containsKeyspace(name))
return keyspace(name, false);
String rawKeyspace = state.getRawKeyspace();
ks = rawKeyspace == null ? name : rawKeyspace;
}
KeyspaceMetadata keyspaceMetadata = validateKeyspace(ks, keyspaces);
if (keyspaceMetadata.tables.getNullable(name) != null)
return table(ks, name);
Optional<TableMetadata> indexed = keyspaceMetadata.findIndexedTable(name);
if (indexed.isPresent())
{
Optional<IndexMetadata> index = indexed.get().indexes.get(name);
if (index.isPresent())
return index(ks, name);
}
if (keyspaceMetadata.views.getNullable(name) != null)
return view(ks, name);
throw invalidRequest("'%s' not found in keyspace '%s'", name, ks);
}
@Override
protected Stream<? extends SchemaElement> describe(ClientState state, Keyspaces keyspaces)
{
delegate = resolve(state, keyspaces);
return delegate.describe(state, keyspaces);
}
@Override
protected List<ColumnSpecification> metadata(ClientState state)
{
return delegate.metadata(state);
}
@Override
protected List<ByteBuffer> toRow(SchemaElement element, boolean withInternals)
{
return delegate.toRow(element, withInternals);
}
};
}
/**
* Creates a {@link DescribeStatement} for {@code DESCRIBE CLUSTER}.
*/
public static DescribeStatement<List<Object>> cluster()
{
return new DescribeStatement<List<Object>>()
{
/**
* The column index of the cluster name
*/
private static final int CLUSTER_NAME_INDEX = 0;
/**
* The column index of the partitioner name
*/
private static final int PARTITIONER_NAME_INDEX = 1;
/**
* The column index of the snitch class
*/
private static final int SNITCH_CLASS_INDEX = 2;
/**
* The range ownerships index
*/
private static final int RANGE_OWNERSHIPS_INDEX = 3;
@Override
protected Stream<List<Object>> describe(ClientState state, Keyspaces keyspaces)
{
List<Object> list = new ArrayList<Object>();
list.add(DatabaseDescriptor.getClusterName());
list.add(trimIfPresent(DatabaseDescriptor.getPartitionerName(), "org.apache.cassandra.dht."));
list.add(trimIfPresent(DatabaseDescriptor.getEndpointSnitch().getClass().getName(),
"org.apache.cassandra.locator."));
String useKs = state.getRawKeyspace();
if (mustReturnsRangeOwnerships(useKs))
{
list.add(StorageService.instance.getRangeToAddressMap(useKs)
.entrySet()
.stream()
.sorted(Comparator.comparing(Map.Entry::getKey))
.collect(Collectors.toMap(e -> e.getKey().right.toString(),
e -> e.getValue()
.stream()
.map(r -> r.endpoint().toString())
.collect(Collectors.toList()))));
}
return Stream.of(list);
}
private boolean mustReturnsRangeOwnerships(String useKs)
{
return useKs != null && !SchemaConstants.isLocalSystemKeyspace(useKs) && !SchemaConstants.isSystemKeyspace(useKs);
}
@Override
protected List<ColumnSpecification> metadata(ClientState state)
{
ImmutableList.Builder<ColumnSpecification> builder = ImmutableList.builder();
builder.add(new ColumnSpecification(KS, CF, new ColumnIdentifier("cluster", true), UTF8Type.instance),
new ColumnSpecification(KS, CF, new ColumnIdentifier("partitioner", true), UTF8Type.instance),
new ColumnSpecification(KS, CF, new ColumnIdentifier("snitch", true), UTF8Type.instance));
if (mustReturnsRangeOwnerships(state.getRawKeyspace()))
builder.add(new ColumnSpecification(KS, CF, new ColumnIdentifier("range_ownership", true), MapType.getInstance(UTF8Type.instance,
ListType.getInstance(UTF8Type.instance, false), false)));
return builder.build();
}
@Override
protected List<ByteBuffer> toRow(List<Object> elements, boolean withInternals)
{
ImmutableList.Builder<ByteBuffer> builder = ImmutableList.builder();
builder.add(UTF8Type.instance.decompose((String) elements.get(CLUSTER_NAME_INDEX)),
UTF8Type.instance.decompose((String) elements.get(PARTITIONER_NAME_INDEX)),
UTF8Type.instance.decompose((String) elements.get(SNITCH_CLASS_INDEX)));
if (elements.size() > 3)
{
MapType<String, List<String>> rangeOwnershipType = MapType.getInstance(UTF8Type.instance,
ListType.getInstance(UTF8Type.instance, false),
false);
builder.add(rangeOwnershipType.decompose((Map<String, List<String>>) elements.get(RANGE_OWNERSHIPS_INDEX)));
}
return builder.build();
}
private String trimIfPresent(String src, String begin)
{
if (src.startsWith(begin))
return src.substring(begin.length());
return src;
}
};
}
}