blob: f5218039f8b31b28849dd3e1ae9edf5bd79b2c82 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.coprocessor;
import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
import static org.apache.phoenix.query.QueryServices.GROUPBY_ESTIMATED_DISTINCT_VALUES_ATTRIB;
import static org.apache.phoenix.query.QueryServices.GROUPBY_SPILLABLE_ATTRIB;
import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_GROUPBY_ESTIMATED_DISTINCT_VALUES;
import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_GROUPBY_SPILLABLE;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.WritableUtils;
import org.apache.phoenix.cache.GlobalCache;
import org.apache.phoenix.cache.TenantCache;
import org.apache.phoenix.cache.aggcache.SpillableGroupByCache;
import org.apache.phoenix.execute.TupleProjector;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.ExpressionType;
import org.apache.phoenix.expression.aggregator.Aggregator;
import org.apache.phoenix.expression.aggregator.ServerAggregators;
import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.join.HashJoinInfo;
import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.tuple.EncodedColumnQualiferCellsList;
import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
import org.apache.phoenix.schema.tuple.PositionBasedMultiKeyValueTuple;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.schema.types.PInteger;
import org.apache.phoenix.util.Closeables;
import org.apache.phoenix.util.EncodedColumnsUtil;
import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.LogUtil;
import org.apache.phoenix.util.PhoenixKeyValueUtil;
import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.SizedUtil;
import org.apache.phoenix.util.TupleUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Maps;
/**
* Region observer that aggregates grouped rows (i.e. SQL query with GROUP BY clause)
*
* @since 0.1
*/
public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver implements RegionCoprocessor {
private static final Logger LOGGER = LoggerFactory.getLogger(GroupedAggregateRegionObserver.class);

/**
 * Lower bound used in this class when sizing the unordered group by cache from the
 * client-supplied ESTIMATED_DISTINCT_VALUES hint.
 */
public static final int MIN_DISTINCT_VALUES = 100;

/**
 * Exposes this coprocessor as its own region observer.
 *
 * @return an {@link Optional} that always contains {@code this}
 */
@Override
public Optional<RegionObserver> getRegionObserver() {
    final RegionObserver self = this;
    return Optional.of(self);
}
/**
* Replaces the RegionScanner s with a RegionScanner that groups by the key formed by the list
* of expressions from the scan and returns the aggregated rows of each group. For example,
* given the following original rows in the RegionScanner:
* <pre>
* KEY  COL1
* row1 a
* row2 b
* row3 a
* row4 a
* </pre>
* the following rows will be returned for COUNT(*) grouped by COL1:
* <pre>
* KEY COUNT
* a   3
* b   1
* </pre>
* The client is required to do a sort and a final aggregation, since multiple rows with the
* same key may be returned from different regions.
* <p>
* When the scan carries UNORDERED_GROUP_BY_EXPRESSIONS, every group in the region is collected
* before anything is returned (see scanUnordered). Otherwise KEY_ORDERED_GROUP_BY_EXPRESSIONS
* is used, and a group is emitted as soon as the group key changes (see scanOrdered).
*
* @param c the coprocessor context for the region being scanned
* @param scan the client scan whose attributes describe the aggregation
* @param s the scanner opened for {@code scan}; it becomes the innermost scanner
* @return the aggregating scanner that replaces {@code s}
* @throws IOException propagated from expression deserialization and scanner setup
*/
@Override
protected RegionScanner doPostScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
Scan scan, RegionScanner s) throws IOException {
// The unordered attribute is checked first; when it is absent the key-ordered one is used.
// NOTE(review): assumes one of the two is present (isRegionObserverFor requires it);
// otherwise expressionBytes is still null below.
boolean keyOrdered = false;
byte[] expressionBytes = scan.getAttribute(BaseScannerRegionObserver.UNORDERED_GROUP_BY_EXPRESSIONS);
if (expressionBytes == null) {
expressionBytes = scan.getAttribute(BaseScannerRegionObserver.KEY_ORDERED_GROUP_BY_EXPRESSIONS);
keyOrdered = true;
}
int offset = 0;
boolean useNewValueColumnQualifier = EncodedColumnsUtil.useNewValueColumnQualifier(scan);
if (ScanUtil.isLocalIndex(scan)) {
/*
* For local indexes, we need to set an offset on row key expressions to skip
* the region start key. The offset is the length of the region start key or,
* when the start key is empty, the length of the region end key.
*/
Region region = c.getEnvironment().getRegion();
offset = region.getRegionInfo().getStartKey().length != 0 ? region.getRegionInfo().getStartKey().length :
region.getRegionInfo().getEndKey().length;
ScanUtil.setRowKeyOffset(scan, offset);
}
// NOTE(review): the group by expressions are deserialized with offset 0 even for local
// indexes. The computed offset is instead applied through ScanUtil.setRowKeyOffset above
// and passed to getWrappedScanner below. Confirm this is intentional.
List<Expression> expressions = deserializeGroupByExpressions(expressionBytes, 0);
final TenantCache tenantCache = GlobalCache.getTenantCache(c.getEnvironment(), ScanUtil.getTenantId(scan));
// A zero-byte memory chunk is handed to the aggregators during deserialization;
// try-with-resources closes it when this method exits.
try (MemoryChunk em = tenantCache.getMemoryManager().allocate(0)) {
ServerAggregators aggregators =
ServerAggregators.deserialize(scan
.getAttribute(BaseScannerRegionObserver.AGGREGATORS), c
.getEnvironment().getConfiguration(), em);
RegionScanner innerScanner = s;
// Local index maintainers: prefer the LOCAL_INDEX_BUILD_PROTO attribute and fall back to
// LOCAL_INDEX_BUILD. useProto tells IndexMaintainer.deserialize which encoding was found.
boolean useProto = false;
byte[] localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD_PROTO);
useProto = localIndexBytes != null;
if (localIndexBytes == null) {
localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD);
}
List<IndexMaintainer> indexMaintainers = localIndexBytes == null ? null : IndexMaintainer.deserialize(localIndexBytes, useProto);
TupleProjector tupleProjector = null;
byte[][] viewConstants = null;
ColumnReference[] dataColumns = IndexUtil.deserializeDataTableColumnsToJoin(scan);
final TupleProjector p = TupleProjector.deserializeProjectorFromScan(scan);
final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan);
boolean useQualifierAsIndex = EncodedColumnsUtil.useQualifierAsIndex(EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan));
// Local index scans, and scans with a projector but no hash join, are wrapped by
// getWrappedScanner (presumably to project rows and, for local indexes, join in
// data-table columns -- confirm in BaseScannerRegionObserver).
if (ScanUtil.isLocalIndex(scan) || (j == null && p != null)) {
if (dataColumns != null) {
tupleProjector = IndexUtil.getTupleProjector(scan, dataColumns);
viewConstants = IndexUtil.deserializeViewConstantsFromScan(scan);
}
ImmutableBytesPtr tempPtr = new ImmutableBytesPtr();
innerScanner =
getWrappedScanner(c, innerScanner, offset, scan, dataColumns, tupleProjector,
c.getEnvironment().getRegion(), indexMaintainers == null ? null : indexMaintainers.get(0), viewConstants, p, tempPtr, useQualifierAsIndex);
}
// A hash join wraps whatever scanner has been built so far.
if (j != null) {
innerScanner =
new HashJoinRegionScanner(innerScanner, p, j, ScanUtil.getTenantId(scan),
c.getEnvironment(), useQualifierAsIndex, useNewValueColumnQualifier);
}
// Optional limit on the number of groups, decoded as a PInteger; absent means unlimited.
long limit = Long.MAX_VALUE;
byte[] limitBytes = scan.getAttribute(GROUP_BY_LIMIT);
if (limitBytes != null) {
limit = PInteger.INSTANCE.getCodec().decodeInt(limitBytes, 0, SortOrder.getDefault());
}
if (keyOrdered) { // Optimize by taking advantage that the rows are
// already in the required group by key order
return scanOrdered(c, scan, innerScanner, expressions, aggregators, limit);
} else { // Otherwise, collect them all up in an in memory map
return scanUnordered(c, scan, innerScanner, expressions, aggregators, limit);
}
}
}
/**
 * Estimates the memory needed to hold {@code nRows} distinct groups in the unordered
 * group by map.
 * <p>
 * The estimate comes from {@link SizedUtil#sizeOfMap}, using an ImmutableBytesWritable-sized
 * key and {@code valueSize} per value.
 *
 * @param nRows number of distinct groups
 * @param valueSize estimated size of one group's aggregator state
 * @return the estimated size of the map
 */
public static long sizeOfUnorderedGroupByMap(int nRows, int valueSize) {
    final int keySize = SizedUtil.IMMUTABLE_BYTES_WRITABLE_SIZE;
    return SizedUtil.sizeOfMap(nRows, keySize, valueSize);
}
/**
 * Serializes group by expressions into the scan attribute {@code attribName}.
 * <p>
 * Each expression is written as its {@link ExpressionType} ordinal (a VInt), followed by the
 * expression's own serialized form. An empty list is encoded as the single byte
 * {@link QueryConstants#TRUE}.
 *
 * @param scan the scan to annotate
 * @param attribName name of the attribute that receives the bytes
 * @param groupByExpressions the expressions to serialize
 */
public static void serializeIntoScan(Scan scan, String attribName,
        List<Expression> groupByExpressions) {
    final int initialCapacity = Math.max(1, groupByExpressions.size() * 10);
    ByteArrayOutputStream buffer = new ByteArrayOutputStream(initialCapacity);
    try {
        if (!groupByExpressions.isEmpty()) {
            DataOutputStream out = new DataOutputStream(buffer);
            for (Expression expr : groupByExpressions) {
                WritableUtils.writeVInt(out, ExpressionType.valueOf(expr).ordinal());
                expr.write(out);
            }
        } else {
            // An empty group by is encoded as one TRUE byte (original FIXME: revisit this encoding)
            buffer.write(QueryConstants.TRUE);
        }
    } catch (IOException e) {
        throw new RuntimeException(e); // Impossible
    } finally {
        try {
            buffer.close();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
    scan.setAttribute(attribName, buffer.toByteArray());
}
/**
 * Reads group by expressions in the format written by serializeIntoScan until the input
 * is exhausted.
 *
 * @param expressionBytes the serialized expressions
 * @param offset when non-zero, applied to each expression through
 *        {@link IndexUtil#setRowKeyExpressionOffset}
 * @return the expressions, in the order they were serialized
 * @throws IOException if reading fails for a reason other than reaching the end of input
 */
private List<Expression> deserializeGroupByExpressions(byte[] expressionBytes, int offset)
        throws IOException {
    final ExpressionType[] types = ExpressionType.values();
    final List<Expression> parsed = new ArrayList<Expression>(3);
    try (ByteArrayInputStream bytesIn = new ByteArrayInputStream(expressionBytes)) {
        final DataInputStream in = new DataInputStream(bytesIn);
        boolean exhausted = false;
        while (!exhausted) {
            try {
                Expression expr = types[WritableUtils.readVInt(in)].newInstance();
                expr.readFields(in);
                if (offset != 0) {
                    IndexUtil.setRowKeyExpressionOffset(expr, offset);
                }
                parsed.add(expr);
            } catch (EOFException endOfInput) {
                // Reaching the end of the input terminates the loop.
                exhausted = true;
            }
        }
    }
    return parsed;
}
/**
 * Cache for distinct values and their aggregations which is completely
 * in-memory (as opposed to spilling to disk). Used when GROUPBY_SPILLABLE_ATTRIB
 * is set to false.
 * <p>
 * Memory usage is tracked at a coarse grain through a {@link MemoryChunk} reserved from
 * the tenant's memory manager. The chunk is resized as the number of groups grows, and
 * the cache will throw and abort if too much is used.
 *
 * @since 3.0.0
 */
private static final class InMemoryGroupByCache implements GroupByCache {
    // Memory reservation covering the estimated size of aggregateMap.
    private final MemoryChunk chunk;
    // Group key -> that group's aggregators.
    private final Map<ImmutableBytesPtr, Aggregator[]> aggregateMap;
    // Used to create per-group aggregators and to serialize their values.
    private final ServerAggregators aggregators;
    private final RegionCoprocessorEnvironment env;
    // Custom annotations attached to debug log messages.
    private final byte[] customAnnotations;
    // Number of distinct groups the chunk is currently sized for.
    private int estDistVals;

    InMemoryGroupByCache(RegionCoprocessorEnvironment env, ImmutableBytesPtr tenantId, byte[] customAnnotations, ServerAggregators aggregators, int estDistVals) {
        int estValueSize = aggregators.getEstimatedByteSize();
        long estSize = sizeOfUnorderedGroupByMap(estDistVals, estValueSize);
        TenantCache tenantCache = GlobalCache.getTenantCache(env, tenantId);
        this.env = env;
        this.estDistVals = estDistVals;
        this.aggregators = aggregators;
        this.aggregateMap = Maps.newHashMapWithExpectedSize(estDistVals);
        this.chunk = tenantCache.getMemoryManager().allocate(estSize);
        this.customAnnotations = customAnnotations;
    }

    /** Releases the memory reserved for this cache. */
    @Override
    public void close() throws IOException {
        this.chunk.close();
    }

    /**
     * Returns the aggregators for {@code cacheKey}.
     * <p>
     * The first time a key is seen, a new set of aggregators is created. When the number of
     * groups exceeds the current estimate, the memory reservation is grown.
     *
     * @param cacheKey the concatenated group by key of the current row
     * @return the aggregators of that group, into which the caller aggregates the row
     */
    @Override
    public Aggregator[] cache(ImmutableBytesPtr cacheKey) {
        // Key the map with a pointer object owned by this cache, not the caller's instance.
        ImmutableBytesPtr key = new ImmutableBytesPtr(cacheKey);
        Aggregator[] rowAggregators = aggregateMap.get(key);
        if (rowAggregators == null) {
            // If Aggregators not found for this distinct value, clone our original one
            // (we need one per distinct value)
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug(LogUtil.addCustomAnnotations("Adding new aggregate bucket for row key "
                        + Bytes.toStringBinary(key.get(), key.getOffset(),
                        key.getLength()), customAnnotations));
            }
            rowAggregators = aggregators.newAggregators(env.getConfiguration());
            aggregateMap.put(key, rowAggregators);
            if (aggregateMap.size() > estDistVals) { // increase allocation
                // Grow by 1.5x, but never to less than the current group count. For an
                // estimate of 0 or 1, the truncating int * float product alone would not grow
                // at all, leaving the reservation permanently under-sized.
                estDistVals = Math.max(aggregateMap.size(), (int) (estDistVals * 1.5f));
                long estSize = sizeOfUnorderedGroupByMap(estDistVals, aggregators.getEstimatedByteSize());
                chunk.resize(estSize);
            }
        }
        return rowAggregators;
    }

    /**
     * Serializes every cached group into a single cell and returns a scanner that replays
     * those cells, one per call to {@code next}.
     * <p>
     * Each cell has the group key as its row, column SINGLE_COLUMN_FAMILY:SINGLE_COLUMN,
     * timestamp AGG_TIMESTAMP, and the serialized aggregators as its value. The memory
     * reservation is first resized to the actual group count. Closing the returned scanner
     * closes {@code s} and then this cache.
     *
     * @param s the underlying scanner, closed together with the returned one
     * @return a scanner over the aggregated cells
     */
    @Override
    public RegionScanner getScanner(final RegionScanner s) {
        // Compute final allocation
        long estSize = sizeOfUnorderedGroupByMap(aggregateMap.size(), aggregators.getEstimatedByteSize());
        chunk.resize(estSize);
        final List<Cell> aggResults = new ArrayList<Cell>(aggregateMap.size());
        for (Map.Entry<ImmutableBytesPtr, Aggregator[]> entry : aggregateMap.entrySet()) {
            ImmutableBytesPtr key = entry.getKey();
            Aggregator[] rowAggregators = entry.getValue();
            // Generate byte array of Aggregators and set as value of row
            byte[] value = aggregators.toBytes(rowAggregators);
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug(LogUtil.addCustomAnnotations("Adding new distinct group: "
                        + Bytes.toStringBinary(key.get(), key.getOffset(), key.getLength())
                        + " with aggregators " + Arrays.asList(rowAggregators).toString()
                        + " value = " + Bytes.toStringBinary(value), customAnnotations));
            }
            Cell keyValue =
                    PhoenixKeyValueUtil.newKeyValue(key.get(), key.getOffset(), key.getLength(),
                            SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0,
                            value.length);
            aggResults.add(keyValue);
        }
        // scanner using the non spillable, memory-only implementation
        return new BaseRegionScanner(s) {
            private int index = 0;

            @Override
            public void close() throws IOException {
                try {
                    s.close();
                } finally {
                    InMemoryGroupByCache.this.close();
                }
            }

            @Override
            public boolean next(List<Cell> results) throws IOException {
                if (index >= aggResults.size()) {
                    return false;
                }
                results.add(aggResults.get(index));
                index++;
                // true while more cells remain after the one just returned
                return index < aggResults.size();
            }
        };
    }

    /** @return the number of distinct groups cached so far */
    @Override
    public long size() {
        return aggregateMap.size();
    }
}
/**
 * Chooses the GroupByCache implementation for an unordered group by.
 * <p>
 * A SpillableGroupByCache is used when GROUPBY_SPILLABLE_ATTRIB is enabled in the coprocessor
 * environment's configuration (default DEFAULT_GROUPBY_SPILLABLE); otherwise an
 * InMemoryGroupByCache is used.
 */
private static final class GroupByCacheFactory {
    public static final GroupByCacheFactory INSTANCE = new GroupByCacheFactory();

    private GroupByCacheFactory() {
        // singleton; use INSTANCE
    }

    GroupByCache newCache(RegionCoprocessorEnvironment env, ImmutableBytesPtr tenantId, byte[] customAnnotations, ServerAggregators aggregators, int estDistVals) {
        final boolean spill = env.getConfiguration()
                .getBoolean(GROUPBY_SPILLABLE_ATTRIB, DEFAULT_GROUPBY_SPILLABLE);
        if (!spill) {
            return new InMemoryGroupByCache(env, tenantId, customAnnotations, aggregators, estDistVals);
        }
        // SpillableGroupByCache does not take the custom annotations.
        return new SpillableGroupByCache(env, tenantId, aggregators, estDistVals);
    }
}
/**
 * Used for an aggregate query in which the key order does not necessarily match the group by
 * key order. In this case, we must collect all distinct groups within a region into a map,
 * aggregating as we go.
 *
 * @param c the coprocessor context; supplies the region and its configuration
 * @param scan the client scan; supplies the distinct-value estimate, column-encoding metadata
 *        and log annotations
 * @param scanner the (possibly wrapped) scanner whose rows are aggregated
 * @param expressions group by expressions whose concatenated value forms the group key
 * @param aggregators the server-side aggregators applied to each row of a group
 * @param limit stop reading rows once the cache holds at least this many distinct groups
 * @return a scanner over the aggregated rows, one per distinct group (the rows are not sorted)
 * @throws IOException if reading from {@code scanner} fails
 */
private RegionScanner scanUnordered(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan,
        final RegionScanner scanner, final List<Expression> expressions,
        final ServerAggregators aggregators, long limit) throws IOException {
    if (LOGGER.isDebugEnabled()) {
        LOGGER.debug(LogUtil.addCustomAnnotations(
                "Grouped aggregation over unordered rows with scan " + scan
                        + ", group by " + expressions + ", aggregators " + aggregators,
                ScanUtil.getCustomAnnotations(scan)));
    }
    RegionCoprocessorEnvironment env = c.getEnvironment();
    Configuration conf = env.getConfiguration();
    int estDistVals = conf.getInt(GROUPBY_ESTIMATED_DISTINCT_VALUES_ATTRIB, DEFAULT_GROUPBY_ESTIMATED_DISTINCT_VALUES);
    byte[] estDistValsBytes = scan.getAttribute(BaseScannerRegionObserver.ESTIMATED_DISTINCT_VALUES);
    if (estDistValsBytes != null) {
        // Allocate 1.5x the client's estimate, but never less than MIN_DISTINCT_VALUES
        estDistVals = Math.max(MIN_DISTINCT_VALUES,
                (int) (Bytes.toInt(estDistValsBytes) * 1.5f));
    }
    // Read the encoded-qualifier range from the scan once and derive the cell-list layout
    // from it, as scanOrdered does.
    final Pair<Integer, Integer> minMaxQualifiers = EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan);
    final boolean useQualifierAsIndex = EncodedColumnsUtil.useQualifierAsIndex(minMaxQualifiers);
    final boolean spillableEnabled =
            conf.getBoolean(GROUPBY_SPILLABLE_ATTRIB, DEFAULT_GROUPBY_SPILLABLE);
    final PTable.QualifierEncodingScheme encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan);

    GroupByCache groupByCache =
            GroupByCacheFactory.INSTANCE.newCache(
                    env, ScanUtil.getTenantId(scan), ScanUtil.getCustomAnnotations(scan),
                    aggregators, estDistVals);
    // The cache is closed here only on failure; on success it is left open for the
    // returned scanner.
    boolean success = false;
    try {
        boolean hasMore;
        Tuple result = useQualifierAsIndex ? new PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple();
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(LogUtil.addCustomAnnotations(
                    "Spillable groupby enabled: " + spillableEnabled,
                    ScanUtil.getCustomAnnotations(scan)));
        }
        Region region = c.getEnvironment().getRegion();
        boolean acquiredLock = false;
        try {
            region.startRegionOperation();
            acquiredLock = true;
            synchronized (scanner) {
                do {
                    List<Cell> results = useQualifierAsIndex
                            ? new EncodedColumnQualiferCellsList(minMaxQualifiers.getFirst(),
                                    minMaxQualifiers.getSecond(), encodingScheme)
                            : new ArrayList<Cell>();
                    // Results are potentially returned even when the return value of
                    // nextRaw is false, since that value only indicates whether or not
                    // there are more values after the ones returned.
                    hasMore = scanner.nextRaw(results);
                    if (!results.isEmpty()) {
                        result.setKeyValues(results);
                        ImmutableBytesPtr key =
                                TupleUtil.getConcatenatedValue(result, expressions);
                        Aggregator[] rowAggregators = groupByCache.cache(key);
                        // Aggregate values here
                        aggregators.aggregate(rowAggregators, result);
                    }
                } while (hasMore && groupByCache.size() < limit);
            }
        } finally {
            if (acquiredLock) {
                region.closeRegionOperation();
            }
        }

        RegionScanner regionScanner = groupByCache.getScanner(scanner);

        // Do not sort here, but sort back on the client instead
        // The reason is that if the scan ever extends beyond a region
        // (which can happen if we're basing our parallelization split
        // points on old metadata), we'll get incorrect query results.
        success = true;
        return regionScanner;
    } finally {
        if (!success) {
            Closeables.closeQuietly(groupByCache);
        }
    }
}
/**
* Used for an aggregate query in which the key order matches the group by key order. In this
* case, we can do the aggregation as we scan, by detecting when the group by key changes.
* Each call to {@code next} on the returned scanner adds at most one aggregated cell (one
* group) to its result list.
*
* @param c the coprocessor context; its region is held in a region operation
*        (startRegionOperation/closeRegionOperation) while rows are read
* @param scan the client scan; supplies column-encoding metadata and log annotations
* @param scanner the (possibly wrapped) scanner whose rows arrive in group key order
* @param expressions group by expressions whose concatenated value forms the group key
* @param aggregators the server-side aggregators. NOTE(review): getAggregators() is assumed
*        to return the same instances on every call, because the running state of the current
*        group is carried across calls to {@code next}.
* @param limit maximum number of groups to return
* @return a scanner that emits one aggregated row per group
* @throws IOException as declared; read failures surface from the returned scanner's
*         {@code next}
*/
private RegionScanner scanOrdered(final ObserverContext<RegionCoprocessorEnvironment> c,
final Scan scan, final RegionScanner scanner, final List<Expression> expressions,
final ServerAggregators aggregators, final long limit) throws IOException {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(LogUtil.addCustomAnnotations(
"Grouped aggregation over ordered rows with scan " + scan + ", group by "
+ expressions + ", aggregators " + aggregators,
ScanUtil.getCustomAnnotations(scan)));
}
final Pair<Integer, Integer> minMaxQualifiers = EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan);
final boolean useQualifierAsIndex = EncodedColumnsUtil.useQualifierAsIndex(minMaxQualifiers);
final PTable.QualifierEncodingScheme encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan);
return new BaseRegionScanner(scanner) {
// Number of groups completed at an aggregation boundary so far.
private long rowCount = 0;
// Key of the group currently being aggregated; null before the first row, and reset to
// null when next() returns false.
private ImmutableBytesPtr currentKey = null;
@Override
public boolean next(List<Cell> results) throws IOException {
boolean hasMore;
boolean atLimit;
// Set when the row just read starts a new group. That row is not aggregated in the
// loop; it is aggregated below, after the current group has been emitted.
boolean aggBoundary = false;
Tuple result = useQualifierAsIndex ? new PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple();
ImmutableBytesPtr key = null;
Aggregator[] rowAggregators = aggregators.getAggregators();
// If we're calculating no aggregate functions (e.g. a DISTINCT), the group being read
// can be returned as soon as its first row is seen, so the limit check counts it
// already (countOffset = 1). Otherwise, we have to wait until an aggregation boundary,
// where rowCount is incremented.
int countOffset = rowAggregators.length == 0 ? 1 : 0;
Region region = c.getEnvironment().getRegion();
boolean acquiredLock = false;
try {
region.startRegionOperation();
acquiredLock = true;
synchronized (scanner) {
// Read rows until the group key changes, the scanner is exhausted, or the
// limit is reached.
do {
List<Cell> kvs = useQualifierAsIndex ? new EncodedColumnQualiferCellsList(minMaxQualifiers.getFirst(), minMaxQualifiers.getSecond(), encodingScheme) : new ArrayList<Cell>();
// Results are potentially returned even when the return value of
// nextRaw is false, since that value only indicates whether or not
// there are more values after the ones returned.
hasMore = scanner.nextRaw(kvs);
if (!kvs.isEmpty()) {
result.setKeyValues(kvs);
key = TupleUtil.getConcatenatedValue(result, expressions);
aggBoundary = currentKey != null && currentKey.compareTo(key) != 0;
if (!aggBoundary) {
aggregators.aggregate(rowAggregators, result);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(LogUtil.addCustomAnnotations(
"Row passed filters: " + kvs
+ ", aggregated values: "
+ Arrays.asList(rowAggregators),
ScanUtil.getCustomAnnotations(scan)));
}
currentKey = key;
}
}
atLimit = rowCount + countOffset >= limit;
// Do rowCount + 1 b/c we don't have to wait for a complete
// row in the case of a DISTINCT with a LIMIT
} while (hasMore && !aggBoundary && !atLimit);
}
} finally {
if (acquiredLock) region.closeRegionOperation();
}
// Emit the group accumulated so far (if any) as a single cell keyed by the group key.
if (currentKey != null) {
byte[] value = aggregators.toBytes(rowAggregators);
Cell keyValue =
PhoenixKeyValueUtil.newKeyValue(currentKey.get(), currentKey.getOffset(),
currentKey.getLength(), SINGLE_COLUMN_FAMILY, SINGLE_COLUMN,
AGG_TIMESTAMP, value, 0, value.length);
results.add(keyValue);
// If we're at an aggregation boundary, reset the aggregators and aggregate
// the row that started the new group (it is not part of the returned result).
// That row's key becomes the current key and the completed group is counted.
if (aggBoundary) {
aggregators.reset(rowAggregators);
aggregators.aggregate(rowAggregators, result);
currentKey = key;
rowCount++;
atLimit |= rowCount >= limit;
}
}
// Continue if there are more rows, or a started group still has to be emitted.
if (!atLimit && (hasMore || aggBoundary)) {
return true;
}
currentKey = null;
return false;
}
};
}
/**
 * Decides whether this observer handles the scan.
 *
 * @param scan the scan being opened
 * @return true if the scan carries either the unordered or the key-ordered group by
 *         expressions attribute
 */
@Override
protected boolean isRegionObserverFor(Scan scan) {
    final boolean hasUnordered =
            scan.getAttribute(BaseScannerRegionObserver.UNORDERED_GROUP_BY_EXPRESSIONS) != null;
    if (hasUnordered) {
        return true;
    }
    return scan.getAttribute(BaseScannerRegionObserver.KEY_ORDERED_GROUP_BY_EXPRESSIONS) != null;
}
}