| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.cassandra.cql3.selection; |
| |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.util.Arrays; |
| import java.util.List; |
| |
| import org.apache.cassandra.schema.CQLTypeParser; |
| import org.apache.cassandra.schema.KeyspaceMetadata; |
| import org.apache.cassandra.schema.Schema; |
| import org.apache.cassandra.schema.TableMetadata; |
| import org.apache.cassandra.cql3.ColumnIdentifier; |
| import org.apache.cassandra.cql3.ColumnSpecification; |
| import org.apache.cassandra.cql3.QueryOptions; |
| import org.apache.cassandra.cql3.functions.Function; |
| import org.apache.cassandra.db.TypeSizes; |
| import org.apache.cassandra.db.context.CounterContext; |
| import org.apache.cassandra.db.filter.ColumnFilter; |
| import org.apache.cassandra.db.marshal.AbstractType; |
| import org.apache.cassandra.db.rows.Cell; |
| import org.apache.cassandra.exceptions.InvalidRequestException; |
| import org.apache.cassandra.io.util.DataInputPlus; |
| import org.apache.cassandra.io.util.DataOutputPlus; |
| import org.apache.cassandra.transport.ProtocolVersion; |
| import org.apache.cassandra.utils.ByteBufferUtil; |
| |
| import static org.apache.cassandra.cql3.statements.RequestValidations.invalidRequest; |
| |
| /** |
| * A <code>Selector</code> is used to convert the data returned by the storage engine into the data requested by the |
| * user. They correspond to the <selector> elements from the select clause. |
| * <p>Since the introduction of aggregation, <code>Selector</code>s cannot be called anymore by multiple threads |
| * as they have an internal state.</p> |
| */ |
| public abstract class Selector |
| { |
| protected static abstract class SelectorDeserializer |
| { |
| protected abstract Selector deserialize(DataInputPlus in, int version, TableMetadata metadata) throws IOException; |
| |
| protected final AbstractType<?> readType(TableMetadata metadata, DataInputPlus in) throws IOException |
| { |
| KeyspaceMetadata keyspace = Schema.instance.getKeyspaceMetadata(metadata.keyspace); |
| return readType(keyspace, in); |
| } |
| |
| protected final AbstractType<?> readType(KeyspaceMetadata keyspace, DataInputPlus in) throws IOException |
| { |
| String cqlType = in.readUTF(); |
| return CQLTypeParser.parse(keyspace.name, cqlType, keyspace.types); |
| } |
| } |
| |
| /** |
| * The <code>Selector</code> kinds. |
| */ |
| public static enum Kind |
| { |
| SIMPLE_SELECTOR(SimpleSelector.deserializer), |
| TERM_SELECTOR(TermSelector.deserializer), |
| WRITETIME_OR_TTL_SELECTOR(WritetimeOrTTLSelector.deserializer), |
| LIST_SELECTOR(ListSelector.deserializer), |
| SET_SELECTOR(SetSelector.deserializer), |
| MAP_SELECTOR(MapSelector.deserializer), |
| TUPLE_SELECTOR(TupleSelector.deserializer), |
| USER_TYPE_SELECTOR(UserTypeSelector.deserializer), |
| FIELD_SELECTOR(FieldSelector.deserializer), |
| SCALAR_FUNCTION_SELECTOR(ScalarFunctionSelector.deserializer), |
| AGGREGATE_FUNCTION_SELECTOR(AggregateFunctionSelector.deserializer), |
| ELEMENT_SELECTOR(ElementsSelector.ElementSelector.deserializer), |
| SLICE_SELECTOR(ElementsSelector.SliceSelector.deserializer); |
| |
| private final SelectorDeserializer deserializer; |
| |
| Kind(SelectorDeserializer deserializer) |
| { |
| this.deserializer = deserializer; |
| } |
| } |
| |
| /** |
| * A factory for <code>Selector</code> instances. |
| */ |
| public static abstract class Factory |
| { |
| public void addFunctionsTo(List<Function> functions) |
| { |
| } |
| |
| /** |
| * Returns the column specification corresponding to the output value of the selector instances created by |
| * this factory. |
| * |
| * @param table the table meta data |
| * @return a column specification |
| */ |
| public ColumnSpecification getColumnSpecification(TableMetadata table) |
| { |
| return new ColumnSpecification(table.keyspace, |
| table.name, |
| new ColumnIdentifier(getColumnName(), true), // note that the name is not necessarily |
| // a true column name so we shouldn't intern it |
| getReturnType()); |
| } |
| |
| /** |
| * Creates a new <code>Selector</code> instance. |
| * |
| * @param options the options of the query for which the instance is created (some selector |
| * depends on the bound values in particular). |
| * @return a new <code>Selector</code> instance |
| */ |
| public abstract Selector newInstance(QueryOptions options); |
| |
| /** |
| * Checks if this factory creates selectors instances that creates aggregates. |
| * |
| * @return <code>true</code> if this factory creates selectors instances that creates aggregates, |
| * <code>false</code> otherwise |
| */ |
| public boolean isAggregateSelectorFactory() |
| { |
| return false; |
| } |
| |
| /** |
| * Checks if this factory creates <code>writetime</code> selectors instances. |
| * |
| * @return <code>true</code> if this factory creates <code>writetime</code> selectors instances, |
| * <code>false</code> otherwise |
| */ |
| public boolean isWritetimeSelectorFactory() |
| { |
| return false; |
| } |
| |
| /** |
| * Checks if this factory creates <code>TTL</code> selectors instances. |
| * |
| * @return <code>true</code> if this factory creates <code>TTL</code> selectors instances, |
| * <code>false</code> otherwise |
| */ |
| public boolean isTTLSelectorFactory() |
| { |
| return false; |
| } |
| |
| /** |
| * Checks if this factory creates <code>Selector</code>s that simply return a column value. |
| * |
| * @param index the column index |
| * @return <code>true</code> if this factory creates <code>Selector</code>s that simply return a column value, |
| * <code>false</code> otherwise. |
| */ |
| public boolean isSimpleSelectorFactory() |
| { |
| return false; |
| } |
| |
| /** |
| * Checks if this factory creates <code>Selector</code>s that simply return the specified column. |
| * |
| * @param index the column index |
| * @return <code>true</code> if this factory creates <code>Selector</code>s that simply return |
| * the specified column, <code>false</code> otherwise. |
| */ |
| public boolean isSimpleSelectorFactoryFor(int index) |
| { |
| return false; |
| } |
| |
| /** |
| * Returns the name of the column corresponding to the output value of the selector instances created by |
| * this factory. |
| * |
| * @return a column name |
| */ |
| protected abstract String getColumnName(); |
| |
| /** |
| * Returns the type of the values returned by the selector instances created by this factory. |
| * |
| * @return the selector output type |
| */ |
| protected abstract AbstractType<?> getReturnType(); |
| |
| /** |
| * Record a mapping between the ColumnDefinitions that are used by the selector |
| * instances created by this factory and a column in the ResultSet.Metadata |
| * returned with a query. In most cases, this is likely to be a 1:1 mapping, |
| * but some selector instances may utilise multiple columns (or none at all) |
| * to produce a value (i.e. functions). |
| * |
| * @param mapping the instance of the column mapping belonging to the current query's Selection |
| * @param resultsColumn the column in the ResultSet.Metadata to which the ColumnDefinitions used |
| * by the Selector are to be mapped |
| */ |
| protected abstract void addColumnMapping(SelectionColumnMapping mapping, ColumnSpecification resultsColumn); |
| |
| /** |
| * Checks if all the columns fetched by the selector created by this factory are known |
| * @return {@code true} if all the columns fetched by the selector created by this factory are known, |
| * {@code false} otherwise. |
| */ |
| abstract boolean areAllFetchedColumnsKnown(); |
| |
| /** |
| * Adds the columns fetched by the selector created by this factory to the provided builder, assuming the |
| * factory is terminal (i.e. that {@code isTerminal() == true}). |
| * |
| * @param builder the column builder to add fetched columns (and potential subselection) to. |
| * @throws AssertionError if the method is called on a factory where {@code isTerminal()} returns {@code false}. |
| */ |
| abstract void addFetchedColumns(ColumnFilter.Builder builder); |
| } |
| |
| public static class Serializer |
| { |
| public void serialize(Selector selector, DataOutputPlus out, int version) throws IOException |
| { |
| out.writeByte(selector.kind().ordinal()); |
| selector.serialize(out, version); |
| } |
| |
| public Selector deserialize(DataInputPlus in, int version, TableMetadata metadata) throws IOException |
| { |
| Kind kind = Kind.values()[in.readUnsignedByte()]; |
| return kind.deserializer.deserialize(in, version, metadata); |
| } |
| |
| public int serializedSize(Selector selector, int version) |
| { |
| return TypeSizes.sizeof((byte) selector.kind().ordinal()) + selector.serializedSize(version); |
| } |
| } |
| |
| /** |
| * The {@code Selector} serializer. |
| */ |
| public static final Serializer serializer = new Serializer(); |
| |
| /** |
| * The {@code Selector} kind. |
| */ |
| private final Kind kind; |
| |
| /** |
| * Returns the {@code Selector} kind. |
| * @return the {@code Selector} kind |
| */ |
| public final Kind kind() |
| { |
| return kind; |
| } |
| |
| protected Selector(Kind kind) |
| { |
| this.kind = kind; |
| } |
| |
| /** |
| * Add to the provided builder the column (and potential subselections) to fetch for this |
| * selection. |
| * |
| * @param builder the builder to add columns and subselections to. |
| */ |
| public abstract void addFetchedColumns(ColumnFilter.Builder builder); |
| |
| /** |
| * A row of data that need to be processed by a {@code Selector} |
| */ |
| public static final class InputRow |
| { |
| private ByteBuffer[] values; |
| private final long[] timestamps; |
| private final int[] ttls; |
| private int index; |
| |
| public InputRow(int size, boolean collectTimestamps, boolean collectTTLs) |
| { |
| this.values = new ByteBuffer[size]; |
| |
| if (collectTimestamps) |
| { |
| this.timestamps = new long[size]; |
| // We use MIN_VALUE to indicate no timestamp |
| Arrays.fill(timestamps, Long.MIN_VALUE); |
| } |
| else |
| { |
| timestamps = null; |
| } |
| |
| if (collectTTLs) |
| { |
| this.ttls = new int[size]; |
| // We use -1 to indicate no ttl |
| Arrays.fill(ttls, -1); |
| } |
| else |
| { |
| ttls = null; |
| } |
| } |
| |
| public void add(ByteBuffer v) |
| { |
| values[index] = v; |
| |
| if (timestamps != null) |
| timestamps[index] = Long.MIN_VALUE; |
| |
| if (ttls != null) |
| ttls[index] = -1; |
| |
| index++; |
| } |
| |
| public void add(Cell<?> c, int nowInSec) |
| { |
| if (c == null) |
| { |
| add(null); |
| return; |
| } |
| |
| values[index] = value(c); |
| |
| if (timestamps != null) |
| timestamps[index] = c.timestamp(); |
| |
| if (ttls != null) |
| ttls[index] = remainingTTL(c, nowInSec); |
| |
| index++; |
| } |
| |
| private int remainingTTL(Cell<?> c, int nowInSec) |
| { |
| if (!c.isExpiring()) |
| return -1; |
| |
| int remaining = c.localDeletionTime() - nowInSec; |
| return remaining >= 0 ? remaining : -1; |
| } |
| |
| private <V> ByteBuffer value(Cell<V> c) |
| { |
| return c.isCounterCell() |
| ? ByteBufferUtil.bytes(CounterContext.instance().total(c.value(), c.accessor())) |
| : c.buffer(); |
| } |
| |
| /** |
| * Return the value of the column with the specified index. |
| * |
| * @param index the column index |
| * @return the value of the column with the specified index |
| */ |
| public ByteBuffer getValue(int index) |
| { |
| return values[index]; |
| } |
| |
| /** |
| * Reset the row internal state. |
| * <p>If the reset is not a deep one only the index will be reset. If the reset is a deep one a new |
| * array will be created to store the column values. This allow to reduce object creation when it is not |
| * necessary.</p> |
| * |
| * @param deep {@code true} if the reset must be a deep one. |
| */ |
| public void reset(boolean deep) |
| { |
| index = 0; |
| if (deep) |
| values = new ByteBuffer[values.length]; |
| } |
| |
| /** |
| * Return the timestamp of the column with the specified index. |
| * |
| * @param index the column index |
| * @return the timestamp of the column with the specified index |
| */ |
| public long getTimestamp(int index) |
| { |
| return timestamps[index]; |
| } |
| |
| /** |
| * Return the ttl of the column with the specified index. |
| * |
| * @param index the column index |
| * @return the ttl of the column with the specified index |
| */ |
| public int getTtl(int index) |
| { |
| return ttls[index]; |
| } |
| |
| /** |
| * Returns the column values as list. |
| * <p>This content of the list will be shared with the {@code InputRow} unless a deep reset has been done.</p> |
| * @return the column values as list. |
| */ |
| public List<ByteBuffer> getValues() |
| { |
| return Arrays.asList(values); |
| } |
| } |
| |
| /** |
| * Add the current value from the specified <code>ResultSetBuilder</code>. |
| * |
| * @param protocolVersion protocol version used for serialization |
| * @param input the input row |
| * @throws InvalidRequestException if a problem occurs while adding the input row |
| */ |
| public abstract void addInput(ProtocolVersion protocolVersion, InputRow input); |
| |
| /** |
| * Returns the selector output. |
| * |
| * @param protocolVersion protocol version used for serialization |
| * @return the selector output |
| * @throws InvalidRequestException if a problem occurs while computing the output value |
| */ |
| public abstract ByteBuffer getOutput(ProtocolVersion protocolVersion) throws InvalidRequestException; |
| |
| /** |
| * Returns the <code>Selector</code> output type. |
| * |
| * @return the <code>Selector</code> output type. |
| */ |
| public abstract AbstractType<?> getType(); |
| |
| /** |
| * Reset the internal state of this <code>Selector</code>. |
| */ |
| public abstract void reset(); |
| |
| /** |
| * A selector is terminal if it doesn't require any input for it's output to be computed, i.e. if {@link #getOutput} |
| * result doesn't depend of {@link #addInput}. This is typically the case of a constant value or functions on constant |
| * values. |
| */ |
| public boolean isTerminal() |
| { |
| return false; |
| } |
| |
| /** |
| * Checks that this selector is valid for GROUP BY clause. |
| */ |
| public void validateForGroupBy() |
| { |
| throw invalidRequest("Only column names and monotonic scalar functions are supported in the GROUP BY clause."); |
| } |
| |
| protected abstract int serializedSize(int version); |
| |
| protected abstract void serialize(DataOutputPlus out, int version) throws IOException; |
| |
| protected static void writeType(DataOutputPlus out, AbstractType<?> type) throws IOException |
| { |
| out.writeUTF(type.asCQL3Type().toString()); |
| } |
| |
| protected static int sizeOf(AbstractType<?> type) |
| { |
| return TypeSizes.sizeof(type.asCQL3Type().toString()); |
| } |
| } |