blob: 7e9b716a15e28e803ada0655b92b13136fbe99a7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.cql3.selection;
import java.nio.ByteBuffer;
import java.util.*;
import com.google.common.base.MoreObjects;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.cql3.*;
import org.apache.cassandra.cql3.functions.Function;
import org.apache.cassandra.db.Clustering;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.aggregation.AggregationSpecification;
import org.apache.cassandra.db.aggregation.GroupMaker;
import org.apache.cassandra.db.context.CounterContext;
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.db.rows.Cell;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.transport.ProtocolVersion;
import org.apache.cassandra.utils.ByteBufferUtil;
public abstract class Selection
{
/**
* A predicate that returns <code>true</code> for static columns.
*/
private static final Predicate<ColumnDefinition> STATIC_COLUMN_FILTER = new Predicate<ColumnDefinition>()
{
public boolean apply(ColumnDefinition def)
{
return def.isStatic();
}
};
private final CFMetaData cfm;
private final List<ColumnDefinition> columns;
private final SelectionColumnMapping columnMapping;
private final ResultSet.ResultMetadata metadata;
private final boolean collectTimestamps;
private final boolean collectTTLs;
// Columns used to order the result set for multi-partition queries
private Map<ColumnDefinition, Integer> orderingIndex;
protected Selection(CFMetaData cfm,
List<ColumnDefinition> columns,
SelectionColumnMapping columnMapping,
boolean collectTimestamps,
boolean collectTTLs)
{
this.cfm = cfm;
this.columns = columns;
this.columnMapping = columnMapping;
this.metadata = new ResultSet.ResultMetadata(columnMapping.getColumnSpecifications());
this.collectTimestamps = collectTimestamps;
this.collectTTLs = collectTTLs;
}
// Overriden by SimpleSelection when appropriate.
public boolean isWildcard()
{
return false;
}
/**
* Checks if this selection contains static columns.
* @return <code>true</code> if this selection contains static columns, <code>false</code> otherwise;
*/
public boolean containsStaticColumns()
{
if (cfm.isStaticCompactTable() || !cfm.hasStaticColumns())
return false;
if (isWildcard())
return true;
return !Iterables.isEmpty(Iterables.filter(columns, STATIC_COLUMN_FILTER));
}
/**
* Checks if this selection contains only static columns.
* @return <code>true</code> if this selection contains only static columns, <code>false</code> otherwise;
*/
public boolean containsOnlyStaticColumns()
{
if (!containsStaticColumns())
return false;
if (isWildcard())
return false;
for (ColumnDefinition def : getColumns())
{
if (!def.isPartitionKey() && !def.isStatic())
return false;
}
return true;
}
/**
* Checks if this selection contains a complex column.
*
* @return <code>true</code> if this selection contains a multicell collection or UDT, <code>false</code> otherwise.
*/
public boolean containsAComplexColumn()
{
for (ColumnDefinition def : getColumns())
if (def.isComplex())
return true;
return false;
}
public Map<ColumnDefinition, Integer> getOrderingIndex(boolean isJson)
{
if (!isJson)
return orderingIndex;
// If we order post-query in json, the first and only column that we ship to the client is the json column.
// In that case, we should keep ordering columns around to perform the ordering, then these columns will
// be placed after the json column. As a consequence of where the colums are placed, we should give the
// ordering index a value based on their position in the json encoding and discard the original index.
// (CASSANDRA-14286)
int columnIndex = 1;
Map<ColumnDefinition, Integer> jsonOrderingIndex = new LinkedHashMap<>(orderingIndex.size());
for (ColumnDefinition column : orderingIndex.keySet())
jsonOrderingIndex.put(column, columnIndex++);
return jsonOrderingIndex;
}
public ResultSet.ResultMetadata getResultMetadata(boolean isJson)
{
if (!isJson)
return metadata;
ColumnSpecification firstColumn = metadata.names.get(0);
ColumnSpecification jsonSpec = new ColumnSpecification(firstColumn.ksName, firstColumn.cfName, Json.JSON_COLUMN_ID, UTF8Type.instance);
ResultSet.ResultMetadata resultMetadata = new ResultSet.ResultMetadata(Lists.newArrayList(jsonSpec));
if (orderingIndex != null)
{
for (ColumnDefinition orderingColumn : orderingIndex.keySet())
resultMetadata.addNonSerializedColumn(orderingColumn);
}
return resultMetadata;
}
public static Selection wildcard(CFMetaData cfm)
{
List<ColumnDefinition> all = new ArrayList<>(cfm.allColumns().size());
Iterators.addAll(all, cfm.allColumnsInSelectOrder());
return new SimpleSelection(cfm, all, true);
}
public static Selection wildcardWithGroupBy(CFMetaData cfm, VariableSpecifications boundNames)
{
List<RawSelector> rawSelectors = new ArrayList<>(cfm.allColumns().size());
Iterator<ColumnDefinition> iter = cfm.allColumnsInSelectOrder();
while (iter.hasNext())
{
ColumnDefinition.Raw raw = ColumnDefinition.Raw.forColumn(iter.next());
rawSelectors.add(new RawSelector(raw, null));
}
return fromSelectors(cfm, rawSelectors, boundNames, true);
}
public static Selection forColumns(CFMetaData cfm, List<ColumnDefinition> columns)
{
return new SimpleSelection(cfm, columns, false);
}
public void addColumnForOrdering(ColumnDefinition c)
{
if (orderingIndex == null)
orderingIndex = new LinkedHashMap<>();
int index = getResultSetIndex(c);
if (index < 0)
index = addOrderingColumn(c);
orderingIndex.put(c, index);
}
protected int addOrderingColumn(ColumnDefinition c)
{
columns.add(c);
metadata.addNonSerializedColumn(c);
return columns.size() - 1;
}
public void addFunctionsTo(List<Function> functions)
{
}
private static boolean processesSelection(List<RawSelector> rawSelectors)
{
for (RawSelector rawSelector : rawSelectors)
{
if (rawSelector.processesSelection())
return true;
}
return false;
}
public static Selection fromSelectors(CFMetaData cfm, List<RawSelector> rawSelectors, VariableSpecifications boundNames, boolean hasGroupBy)
{
List<ColumnDefinition> defs = new ArrayList<>();
SelectorFactories factories =
SelectorFactories.createFactoriesAndCollectColumnDefinitions(RawSelector.toSelectables(rawSelectors, cfm), null, cfm, defs, boundNames);
SelectionColumnMapping mapping = collectColumnMappings(cfm, rawSelectors, factories);
return (processesSelection(rawSelectors) || rawSelectors.size() != defs.size() || hasGroupBy)
? new SelectionWithProcessing(cfm, defs, mapping, factories)
: new SimpleSelection(cfm, defs, mapping, false);
}
/**
* Returns the index of the specified column within the resultset
* @param c the column
* @return the index of the specified column within the resultset or -1
*/
public int getResultSetIndex(ColumnDefinition c)
{
return getColumnIndex(c);
}
/**
* Returns the index of the specified column
* @param c the column
* @return the index of the specified column or -1
*/
protected final int getColumnIndex(ColumnDefinition c)
{
for (int i = 0, m = columns.size(); i < m; i++)
if (columns.get(i).name.equals(c.name))
return i;
return -1;
}
private static SelectionColumnMapping collectColumnMappings(CFMetaData cfm,
List<RawSelector> rawSelectors,
SelectorFactories factories)
{
SelectionColumnMapping selectionColumns = SelectionColumnMapping.newMapping();
Iterator<RawSelector> iter = rawSelectors.iterator();
for (Selector.Factory factory : factories)
{
ColumnSpecification colSpec = factory.getColumnSpecification(cfm);
ColumnIdentifier alias = iter.next().alias;
factory.addColumnMapping(selectionColumns,
alias == null ? colSpec : colSpec.withAlias(alias));
}
return selectionColumns;
}
protected abstract Selectors newSelectors(QueryOptions options) throws InvalidRequestException;
/**
* @return the list of CQL3 columns value this SelectionClause needs.
*/
public List<ColumnDefinition> getColumns()
{
return columns;
}
/**
* @return the mappings between resultset columns and the underlying columns
*/
public SelectionColumns getColumnMapping()
{
return columnMapping;
}
public ResultSetBuilder resultSetBuilder(QueryOptions options, boolean isJson)
{
return new ResultSetBuilder(options, isJson);
}
public ResultSetBuilder resultSetBuilder(QueryOptions options, boolean isJson, AggregationSpecification aggregationSpec)
{
return aggregationSpec == null ? new ResultSetBuilder(options, isJson)
: new ResultSetBuilder(options, isJson, aggregationSpec.newGroupMaker());
}
public abstract boolean isAggregate();
@Override
public String toString()
{
return MoreObjects.toStringHelper(this)
.add("columns", columns)
.add("columnMapping", columnMapping)
.add("metadata", metadata)
.add("collectTimestamps", collectTimestamps)
.add("collectTTLs", collectTTLs)
.toString();
}
public static List<ByteBuffer> rowToJson(List<ByteBuffer> row, ProtocolVersion protocolVersion, ResultSet.ResultMetadata metadata)
{
StringBuilder sb = new StringBuilder("{");
for (int i = 0; i < metadata.getColumnCount(); i++)
{
if (i > 0)
sb.append(", ");
ColumnSpecification spec = metadata.names.get(i);
String columnName = spec.name.toString();
if (!columnName.equals(columnName.toLowerCase(Locale.US)))
columnName = "\"" + columnName + "\"";
ByteBuffer buffer = row.get(i);
sb.append('"');
sb.append(Json.quoteAsJsonString(columnName));
sb.append("\": ");
if (buffer == null)
sb.append("null");
else
sb.append(spec.type.toJSONString(buffer, protocolVersion));
}
sb.append("}");
List<ByteBuffer> jsonRow = new ArrayList<>();
jsonRow.add(UTF8Type.instance.getSerializer().serialize(sb.toString()));
return jsonRow;
}
public class ResultSetBuilder
{
private final ResultSet resultSet;
private final ProtocolVersion protocolVersion;
/**
* As multiple thread can access a <code>Selection</code> instance each <code>ResultSetBuilder</code> will use
* its own <code>Selectors</code> instance.
*/
private final Selectors selectors;
/**
* The <code>GroupMaker</code> used to build the aggregates.
*/
private final GroupMaker groupMaker;
/*
* We'll build CQL3 row one by one.
* The currentRow is the values for the (CQL3) columns we've fetched.
* We also collect timestamps and ttls for the case where the writetime and
* ttl functions are used. Note that we might collect timestamp and/or ttls
* we don't care about, but since the array below are allocated just once,
* it doesn't matter performance wise.
*/
List<ByteBuffer> current;
final long[] timestamps;
final int[] ttls;
private final boolean isJson;
private ResultSetBuilder(QueryOptions options, boolean isJson)
{
this(options, isJson, null);
}
private ResultSetBuilder(QueryOptions options, boolean isJson, GroupMaker groupMaker)
{
this.resultSet = new ResultSet(getResultMetadata(isJson).copy(), new ArrayList<List<ByteBuffer>>());
this.protocolVersion = options.getProtocolVersion();
this.selectors = newSelectors(options);
this.groupMaker = groupMaker;
this.timestamps = collectTimestamps ? new long[columns.size()] : null;
this.ttls = collectTTLs ? new int[columns.size()] : null;
this.isJson = isJson;
// We use MIN_VALUE to indicate no timestamp and -1 for no ttl
if (timestamps != null)
Arrays.fill(timestamps, Long.MIN_VALUE);
if (ttls != null)
Arrays.fill(ttls, -1);
}
public void add(ByteBuffer v)
{
current.add(v);
}
public void add(Cell c, int nowInSec)
{
if (c == null)
{
current.add(null);
return;
}
current.add(value(c));
if (timestamps != null)
timestamps[current.size() - 1] = c.timestamp();
if (ttls != null)
ttls[current.size() - 1] = remainingTTL(c, nowInSec);
}
private int remainingTTL(Cell c, int nowInSec)
{
if (!c.isExpiring())
return -1;
int remaining = c.localDeletionTime() - nowInSec;
return remaining >= 0 ? remaining : -1;
}
private ByteBuffer value(Cell c)
{
return c.isCounterCell()
? ByteBufferUtil.bytes(CounterContext.instance().total(c.value()))
: c.value();
}
/**
* Notifies this <code>Builder</code> that a new row is being processed.
*
* @param partitionKey the partition key of the new row
* @param clustering the clustering of the new row
*/
public void newRow(DecoratedKey partitionKey, Clustering clustering)
{
// The groupMaker needs to be called for each row
boolean isNewAggregate = groupMaker == null || groupMaker.isNewGroup(partitionKey, clustering);
if (current != null)
{
selectors.addInputRow(protocolVersion, this);
if (isNewAggregate)
{
resultSet.addRow(getOutputRow());
selectors.reset();
}
}
current = new ArrayList<>(columns.size());
// Timestamps and TTLs are arrays per row, we must null them out between rows
if (timestamps != null)
Arrays.fill(timestamps, Long.MIN_VALUE);
if (ttls != null)
Arrays.fill(ttls, -1);
}
/**
* Builds the <code>ResultSet</code>
*/
public ResultSet build()
{
if (current != null)
{
selectors.addInputRow(protocolVersion, this);
resultSet.addRow(getOutputRow());
selectors.reset();
current = null;
}
// For aggregates we need to return a row even it no records have been found
if (resultSet.isEmpty() && groupMaker != null && groupMaker.returnAtLeastOneRow())
resultSet.addRow(getOutputRow());
return resultSet;
}
private List<ByteBuffer> getOutputRow()
{
List<ByteBuffer> outputRow = selectors.getOutputRow(protocolVersion);
if (isJson)
{
// Keep all columns around for possible post-query ordering. (CASSANDRA-14286)
List<ByteBuffer> jsonRow = rowToJson(outputRow, protocolVersion, metadata);
// Keep ordering columns around for possible post-query ordering. (CASSANDRA-14286)
if (orderingIndex != null)
{
for (Integer orderingColumnIndex : orderingIndex.values())
jsonRow.add(outputRow.get(orderingColumnIndex));
}
outputRow = jsonRow;
}
return outputRow;
}
}
private static interface Selectors
{
public boolean isAggregate();
/**
* Adds the current row of the specified <code>ResultSetBuilder</code>.
*
* @param protocolVersion
* @param rs the <code>ResultSetBuilder</code>
* @throws InvalidRequestException
*/
public void addInputRow(ProtocolVersion protocolVersion, ResultSetBuilder rs) throws InvalidRequestException;
public List<ByteBuffer> getOutputRow(ProtocolVersion protocolVersion) throws InvalidRequestException;
public void reset();
}
// Special cased selection for when only columns are selected.
private static class SimpleSelection extends Selection
{
private final boolean isWildcard;
public SimpleSelection(CFMetaData cfm, List<ColumnDefinition> columns, boolean isWildcard)
{
this(cfm, columns, SelectionColumnMapping.simpleMapping(columns), isWildcard);
}
public SimpleSelection(CFMetaData cfm,
List<ColumnDefinition> columns,
SelectionColumnMapping metadata,
boolean isWildcard)
{
/*
* In theory, even a simple selection could have multiple time the same column, so we
* could filter those duplicate out of columns. But since we're very unlikely to
* get much duplicate in practice, it's more efficient not to bother.
*/
super(cfm, columns, metadata, false, false);
this.isWildcard = isWildcard;
}
@Override
public boolean isWildcard()
{
return isWildcard;
}
public boolean isAggregate()
{
return false;
}
protected Selectors newSelectors(QueryOptions options)
{
return new Selectors()
{
private List<ByteBuffer> current;
public void reset()
{
current = null;
}
public List<ByteBuffer> getOutputRow(ProtocolVersion protocolVersion)
{
return current;
}
public void addInputRow(ProtocolVersion protocolVersion, ResultSetBuilder rs) throws InvalidRequestException
{
current = rs.current;
}
public boolean isAggregate()
{
return false;
}
};
}
}
private static class SelectionWithProcessing extends Selection
{
private final SelectorFactories factories;
public SelectionWithProcessing(CFMetaData cfm,
List<ColumnDefinition> columns,
SelectionColumnMapping metadata,
SelectorFactories factories) throws InvalidRequestException
{
super(cfm,
columns,
metadata,
factories.containsWritetimeSelectorFactory(),
factories.containsTTLSelectorFactory());
this.factories = factories;
}
@Override
public void addFunctionsTo(List<Function> functions)
{
factories.addFunctionsTo(functions);
}
@Override
public int getResultSetIndex(ColumnDefinition c)
{
int index = getColumnIndex(c);
if (index < 0)
return -1;
for (int i = 0, m = factories.size(); i < m; i++)
if (factories.get(i).isSimpleSelectorFactory(index))
return i;
return -1;
}
@Override
protected int addOrderingColumn(ColumnDefinition c)
{
int index = super.addOrderingColumn(c);
factories.addSelectorForOrdering(c, index);
return factories.size() - 1;
}
public boolean isAggregate()
{
return factories.doesAggregation();
}
protected Selectors newSelectors(final QueryOptions options) throws InvalidRequestException
{
return new Selectors()
{
private final List<Selector> selectors = factories.newInstances(options);
public void reset()
{
for (Selector selector : selectors)
selector.reset();
}
public boolean isAggregate()
{
return factories.doesAggregation();
}
public List<ByteBuffer> getOutputRow(ProtocolVersion protocolVersion) throws InvalidRequestException
{
List<ByteBuffer> outputRow = new ArrayList<>(selectors.size());
for (Selector selector: selectors)
outputRow.add(selector.getOutput(protocolVersion));
return outputRow;
}
public void addInputRow(ProtocolVersion protocolVersion, ResultSetBuilder rs) throws InvalidRequestException
{
for (Selector selector : selectors)
selector.addInput(protocolVersion, rs);
}
};
}
}
}