/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.iterate;
import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.aggregator.Aggregator;
import org.apache.phoenix.expression.aggregator.Aggregators;
import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.util.PhoenixKeyValueUtil;
import org.apache.phoenix.util.SizedUtil;
import org.apache.phoenix.util.TupleUtil;
/**
*
* This class implements client-side hash aggregation in memory.
* See PHOENIX-4751: https://issues.apache.org/jira/browse/PHOENIX-4751.
*
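* <p>
* A minimal usage sketch (hypothetical wiring; within Phoenix this iterator is
* constructed by the client aggregate plan rather than by user code):
* <pre>
* ResultIterator scan = ...; // upstream iterator producing ungrouped rows
* AggregatingResultIterator agg = new ClientHashAggregatingResultIterator(
*     context, scan, aggregators, groupByExpressions, orderBy);
* for (Tuple tuple = agg.next(); tuple != null; tuple = agg.next()) {
*     // each tuple carries the serialized aggregate values for one group
* }
* agg.close();
* </pre>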
*/
public class ClientHashAggregatingResultIterator
implements AggregatingResultIterator {
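// Initial capacity of the aggregation hash map, and the granularity (in bytes)
// with which the tracked memory chunk is grown while the map is populated.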
private static final int HASH_AGG_INIT_SIZE = 64*1024;
private static final int CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE = 64*1024;
private static final byte[] UNINITIALIZED_KEY_BUFFER = new byte[0];
private final ResultIterator resultIterator;
private final Aggregators aggregators;
private final List<Expression> groupByExpressions;
private final OrderBy orderBy;
private final MemoryChunk memoryChunk;
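// Built lazily on the first call to next(): maps each grouping key to its partial aggregators.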
private HashMap<ImmutableBytesWritable, Aggregator[]> hash;
private List<ImmutableBytesWritable> keyList;
private Iterator<ImmutableBytesWritable> keyIterator;
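/**
 * @param context statement context used to allocate tracked memory for the hash map
 * @param resultIterator source of ungrouped rows; must not be null
 * @param aggregators aggregators to evaluate per group; must not be null
 * @param groupByExpressions expressions that form the grouping key; must not be null
 * @param orderBy the (possibly optimized-away) ORDER BY, used to decide whether to sort keys
 */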
public ClientHashAggregatingResultIterator(StatementContext context, ResultIterator resultIterator,
Aggregators aggregators, List<Expression> groupByExpressions, OrderBy orderBy) {
Objects.requireNonNull(resultIterator);
Objects.requireNonNull(aggregators);
Objects.requireNonNull(groupByExpressions);
this.resultIterator = resultIterator;
this.aggregators = aggregators;
this.groupByExpressions = groupByExpressions;
this.orderBy = orderBy;
memoryChunk = context.getConnection().getQueryServices().getMemoryManager().allocate(CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE);
}
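// On the first call, drains the underlying iterator into the hash map and, if the
// ORDER BY was optimized into row-key order, sorts the grouping keys; each call then
// emits one aggregated row per grouping key until the keys are exhausted.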
@Override
public Tuple next() throws SQLException {
if (keyIterator == null) {
hash = populateHash();
/********
 *
 * Perform a post-aggregation sort only when required. There are three possible scenarios:
 * (1) The query DOES NOT have an ORDER BY -- in this case, we DO NOT perform a sort, and the results are in undefined (hash) order.
 * (2) The query DOES have an ORDER BY, the ORDER BY keys match the GROUP BY keys, and all the ORDER BY keys are ASCENDING
 * -- in this case, we DO perform a sort. The ORDER BY has been optimized away, because the non-hash client aggregation
 * generates results in ascending order of the GROUP BY keys.
 * (3) The query DOES have an ORDER BY, but the ORDER BY keys do not match the GROUP BY keys, or at least one ORDER BY key is DESCENDING
 * -- in this case, we DO NOT perform a sort, because the ORDER BY has not been optimized away and will be performed later by the
 * client aggregation code.
 *
 * Finally, we also handle the reverse-sort case here. This is currently defensive, because reverse sort is not optimized away.
 *
********/
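// Illustrative (hypothetical) queries for the three scenarios above:
// (1) SELECT k, COUNT(*) FROM t GROUP BY k
// (2) SELECT k, COUNT(*) FROM t GROUP BY k ORDER BY k
// (3) SELECT k, COUNT(*) FROM t GROUP BY k ORDER BY COUNT(*)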
if (orderBy == OrderBy.FWD_ROW_KEY_ORDER_BY || orderBy == OrderBy.REV_ROW_KEY_ORDER_BY) {
keyList = sortKeys();
keyIterator = keyList.iterator();
} else {
keyIterator = hash.keySet().iterator();
}
}
if (!keyIterator.hasNext()) {
return null;
}
ImmutableBytesWritable key = keyIterator.next();
Aggregator[] rowAggregators = hash.get(key);
byte[] value = aggregators.toBytes(rowAggregators);
Tuple tuple = wrapKeyValueAsResult(PhoenixKeyValueUtil.newKeyValue(key, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length));
return tuple;
}
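// Releases the hash map and the tracked memory chunk, then closes the underlying iterator.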
@Override
public void close() throws SQLException {
keyIterator = null;
keyList = null;
hash = null;
try {
memoryChunk.close();
} finally {
resultIterator.close();
}
}
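// Re-aggregates a single result row: resets the shared aggregators and applies them to the tuple.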
@Override
public Aggregator[] aggregate(Tuple result) {
Aggregator[] rowAggregators = aggregators.getAggregators();
aggregators.reset(rowAggregators);
aggregators.aggregate(rowAggregators, result);
return rowAggregators;
}
@Override
public void explain(List<String> planSteps) {
resultIterator.explain(planSteps);
}
@Override
public String toString() {
return "ClientHashAggregatingResultIterator [resultIterator="
+ resultIterator + ", aggregators=" + aggregators + ", groupByExpressions="
+ groupByExpressions + "]";
}
// Copied from ClientGroupedAggregatingResultIterator
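// Evaluates the GROUP BY expressions against the tuple and concatenates the results into a single binary key.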
protected ImmutableBytesWritable getGroupingKey(Tuple tuple, ImmutableBytesWritable ptr) throws SQLException {
try {
ImmutableBytesWritable key = TupleUtil.getConcatenatedValue(tuple, groupByExpressions);
ptr.set(key.get(), key.getOffset(), key.getLength());
return ptr;
} catch (IOException e) {
throw new SQLException(e);
}
}
// Copied from ClientGroupedAggregatingResultIterator
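// Wraps the serialized aggregate values in a single-cell tuple for the caller.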
protected Tuple wrapKeyValueAsResult(Cell keyValue) {
return new MultiKeyValueTuple(Collections.<Cell> singletonList(keyValue));
}
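// Drains the source iterator, aggregating each row into its group's entry. Memory use is
// estimated as the map overhead plus accumulated key bytes, and the tracked chunk is grown
// in CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE increments to stay ahead of it.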
private HashMap<ImmutableBytesWritable, Aggregator[]> populateHash() throws SQLException {
hash = new HashMap<ImmutableBytesWritable, Aggregator[]>(HASH_AGG_INIT_SIZE, 0.75f);
final int aggSize = aggregators.getEstimatedByteSize();
long keySize = 0;
for (Tuple result = resultIterator.next(); result != null; result = resultIterator.next()) {
ImmutableBytesWritable key = new ImmutableBytesWritable(UNINITIALIZED_KEY_BUFFER);
key = getGroupingKey(result, key);
Aggregator[] rowAggregators = hash.get(key);
if (rowAggregators == null) {
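// New group: charge its key size and re-estimate the map footprint before inserting.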
keySize += key.getSize();
long hashSize = SizedUtil.sizeOfMap(hash.size() + 1, SizedUtil.IMMUTABLE_BYTES_WRITABLE_SIZE, aggSize) + keySize;
if (hashSize > memoryChunk.getSize() + CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE) {
// This will throw InsufficientMemoryException if necessary
memoryChunk.resize(hashSize + CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE);
}
rowAggregators = aggregators.newAggregators();
hash.put(key, rowAggregators);
}
aggregators.aggregate(rowAggregators, result);
}
return hash;
}
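// Copies the grouping keys into a list and sorts them, reversing the comparator for a
// reverse row-key ORDER BY; the list's memory is charged to the tracked chunk first.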
private List<ImmutableBytesWritable> sortKeys() {
// This will throw InsufficientMemoryException if necessary
memoryChunk.resize(memoryChunk.getSize() + SizedUtil.sizeOfArrayList(hash.size()));
keyList = new ArrayList<ImmutableBytesWritable>(hash.size());
keyList.addAll(hash.keySet());
Comparator<ImmutableBytesWritable> comp = new ImmutableBytesWritable.Comparator();
if (orderBy == OrderBy.REV_ROW_KEY_ORDER_BY) {
comp = Collections.reverseOrder(comp);
}
Collections.sort(keyList, comp);
return keyList;
}
}