| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.phoenix.iterate; |
| |
| import static com.google.common.base.Preconditions.checkArgument; |
| import static com.google.common.base.Preconditions.checkPositionIndex; |
| |
| import java.io.IOException; |
| import java.sql.SQLException; |
| import java.util.Comparator; |
| import java.util.List; |
| |
| import org.apache.hadoop.hbase.client.Result; |
| import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
| import org.apache.phoenix.execute.DescVarLengthFastByteComparisons; |
| import org.apache.phoenix.expression.Expression; |
| import org.apache.phoenix.expression.OrderByExpression; |
| import org.apache.phoenix.schema.SortOrder; |
| import org.apache.phoenix.schema.tuple.Tuple; |
| import org.apache.phoenix.util.ServerUtil; |
| import org.apache.phoenix.util.SizedUtil; |
| |
| import com.google.common.base.Function; |
| import com.google.common.collect.Collections2; |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Ordering; |
| |
| /** |
| * Result scanner that sorts aggregated rows by columns specified in the ORDER BY clause. |
| * <p> |
| * Note that currently the sort is entirely done in memory. |
| * |
| * |
| * @since 0.1 |
| */ |
| public class OrderedResultIterator implements PeekingResultIterator { |
| |
| /** A container that holds pointers to a {@link Result} and its sort keys. */ |
| protected static class ResultEntry { |
| protected final ImmutableBytesWritable[] sortKeys; |
| protected final Tuple result; |
| |
| ResultEntry(ImmutableBytesWritable[] sortKeys, Tuple result) { |
| this.sortKeys = sortKeys; |
| this.result = result; |
| } |
| |
| ImmutableBytesWritable getSortKey(int index) { |
| checkPositionIndex(index, sortKeys.length); |
| return sortKeys[index]; |
| } |
| |
| Tuple getResult() { |
| return result; |
| } |
| } |
| |
| /** A function that returns Nth key for a given {@link ResultEntry}. */ |
| private static class NthKey implements Function<ResultEntry, ImmutableBytesWritable> { |
| private final int index; |
| |
| NthKey(int index) { |
| this.index = index; |
| } |
| @Override |
| public ImmutableBytesWritable apply(ResultEntry entry) { |
| return entry.getSortKey(index); |
| } |
| } |
| |
| /** Returns the expression of a given {@link OrderByExpression}. */ |
| private static final Function<OrderByExpression, Expression> TO_EXPRESSION = new Function<OrderByExpression, Expression>() { |
| @Override |
| public Expression apply(OrderByExpression column) { |
| return column.getExpression(); |
| } |
| }; |
| |
| private final int thresholdBytes; |
| private final Integer limit; |
| private final Integer offset; |
| private final ResultIterator delegate; |
| private final List<OrderByExpression> orderByExpressions; |
| private final long estimatedByteSize; |
| |
| private PeekingResultIterator resultIterator; |
| private long byteSize; |
| |
| protected ResultIterator getDelegate() { |
| return delegate; |
| } |
| |
/**
 * Creates a sorting iterator with an estimated row size of zero.
 *
 * @param delegate           iterator supplying the rows to sort
 * @param orderByExpressions ORDER BY expressions; must not be empty
 * @param thresholdBytes     byte threshold passed to the backing sorted queue
 * @param limit              maximum number of rows to return, or {@code null} for no limit
 * @param offset             number of leading rows to skip, or {@code null} for none
 */
public OrderedResultIterator(
        ResultIterator delegate,
        List<OrderByExpression> orderByExpressions,
        int thresholdBytes,
        Integer limit,
        Integer offset) {
    this(delegate, orderByExpressions, thresholdBytes, limit, offset, 0);
}
| |
/**
 * Creates a sorting iterator with no limit, no offset and an estimated row size of zero.
 *
 * @param delegate           iterator supplying the rows to sort
 * @param orderByExpressions ORDER BY expressions; must not be empty
 * @param thresholdBytes     byte threshold passed to the backing sorted queue
 * @throws SQLException declared for callers; nothing in this constructor raises it directly
 */
public OrderedResultIterator(
        ResultIterator delegate,
        List<OrderByExpression> orderByExpressions,
        int thresholdBytes) throws SQLException {
    // Goes straight to the full constructor with the same values the five-argument
    // overload would supply (null limit, null offset, row-size estimate of zero).
    this(delegate, orderByExpressions, thresholdBytes, null, null, 0);
}
| |
| public OrderedResultIterator(ResultIterator delegate, List<OrderByExpression> orderByExpressions, |
| int thresholdBytes, Integer limit, Integer offset,int estimatedRowSize) { |
| checkArgument(!orderByExpressions.isEmpty()); |
| this.delegate = delegate; |
| this.orderByExpressions = orderByExpressions; |
| this.thresholdBytes = thresholdBytes; |
| this.offset = offset == null ? 0 : offset; |
| if (limit != null) { |
| this.limit = limit + this.offset; |
| } else { |
| this.limit = null; |
| } |
| long estimatedEntrySize = |
| // ResultEntry |
| SizedUtil.OBJECT_SIZE + |
| // ImmutableBytesWritable[] |
| SizedUtil.ARRAY_SIZE + orderByExpressions.size() * SizedUtil.IMMUTABLE_BYTES_WRITABLE_SIZE + |
| // Tuple |
| SizedUtil.OBJECT_SIZE + estimatedRowSize; |
| |
| // Make sure we don't overflow Long, though this is really unlikely to happen. |
| assert(limit == null || Long.MAX_VALUE / estimatedEntrySize >= limit + this.offset); |
| |
| this.estimatedByteSize = limit == null ? 0 : (limit + this.offset) * estimatedEntrySize; |
| } |
| |
| public Integer getLimit() { |
| return limit; |
| } |
| |
| public long getEstimatedByteSize() { |
| return estimatedByteSize; |
| } |
| |
| public long getByteSize() { |
| return byteSize; |
| } |
| /** |
| * Builds a comparator from the list of columns in ORDER BY clause. |
| * @param orderByExpressions the columns in ORDER BY clause. |
| * @return the comparator built from the list of columns in ORDER BY clause. |
| */ |
| // ImmutableBytesWritable.Comparator doesn't implement generics |
| @SuppressWarnings("unchecked") |
| private static Comparator<ResultEntry> buildComparator(List<OrderByExpression> orderByExpressions) { |
| Ordering<ResultEntry> ordering = null; |
| int pos = 0; |
| for (OrderByExpression col : orderByExpressions) { |
| Expression e = col.getExpression(); |
| Comparator<ImmutableBytesWritable> comparator = |
| e.getSortOrder() == SortOrder.DESC && !e.getDataType().isFixedWidth() |
| ? buildDescVarLengthComparator() |
| : new ImmutableBytesWritable.Comparator(); |
| Ordering<ImmutableBytesWritable> o = Ordering.from(comparator); |
| if(!col.isAscending()) o = o.reverse(); |
| o = col.isNullsLast() ? o.nullsLast() : o.nullsFirst(); |
| Ordering<ResultEntry> entryOrdering = o.onResultOf(new NthKey(pos++)); |
| ordering = ordering == null ? entryOrdering : ordering.compound(entryOrdering); |
| } |
| return ordering; |
| } |
| |
| /* |
| * Same as regular comparator, but if all the bytes match and the length is |
| * different, returns the longer length as bigger. |
| */ |
| private static Comparator<ImmutableBytesWritable> buildDescVarLengthComparator() { |
| return new Comparator<ImmutableBytesWritable>() { |
| |
| @Override |
| public int compare(ImmutableBytesWritable o1, ImmutableBytesWritable o2) { |
| return DescVarLengthFastByteComparisons.compareTo( |
| o1.get(), o1.getOffset(), o1.getLength(), |
| o2.get(), o2.getOffset(), o2.getLength()); |
| } |
| |
| }; |
| } |
| |
| @Override |
| public Tuple next() throws SQLException { |
| return getResultIterator().next(); |
| } |
| |
| private PeekingResultIterator getResultIterator() throws SQLException { |
| if (resultIterator != null) { |
| return resultIterator; |
| } |
| |
| final int numSortKeys = orderByExpressions.size(); |
| List<Expression> expressions = Lists.newArrayList(Collections2.transform(orderByExpressions, TO_EXPRESSION)); |
| final Comparator<ResultEntry> comparator = buildComparator(orderByExpressions); |
| try{ |
| final MappedByteBufferSortedQueue queueEntries = new MappedByteBufferSortedQueue(comparator, limit, |
| thresholdBytes); |
| resultIterator = new PeekingResultIterator() { |
| int count = 0; |
| |
| @Override |
| public Tuple next() throws SQLException { |
| ResultEntry entry = queueEntries.poll(); |
| while (entry != null && offset != null && count < offset) { |
| count++; |
| if (entry.getResult() == null) { return null; } |
| entry = queueEntries.poll(); |
| } |
| if (entry == null || (limit != null && count++ > limit)) { |
| resultIterator.close(); |
| resultIterator = PeekingResultIterator.EMPTY_ITERATOR; |
| return null; |
| } |
| return entry.getResult(); |
| } |
| |
| @Override |
| public Tuple peek() throws SQLException { |
| ResultEntry entry = queueEntries.peek(); |
| while (entry != null && offset != null && count < offset) { |
| entry = queueEntries.poll(); |
| count++; |
| if (entry == null) { return null; } |
| } |
| if (limit != null && count > limit) { return null; } |
| entry = queueEntries.peek(); |
| if (entry == null) { return null; } |
| return entry.getResult(); |
| } |
| |
| @Override |
| public void explain(List<String> planSteps) { |
| } |
| |
| @Override |
| public void close() throws SQLException { |
| queueEntries.close(); |
| } |
| }; |
| for (Tuple result = delegate.next(); result != null; result = delegate.next()) { |
| int pos = 0; |
| ImmutableBytesWritable[] sortKeys = new ImmutableBytesWritable[numSortKeys]; |
| for (Expression expression : expressions) { |
| final ImmutableBytesWritable sortKey = new ImmutableBytesWritable(); |
| boolean evaluated = expression.evaluate(result, sortKey); |
| // set the sort key that failed to get evaluated with null |
| sortKeys[pos++] = evaluated && sortKey.getLength() > 0 ? sortKey : null; |
| } |
| queueEntries.add(new ResultEntry(sortKeys, result)); |
| } |
| this.byteSize = queueEntries.getByteSize(); |
| } catch (IOException e) { |
| ServerUtil.createIOException(e.getMessage(), e); |
| } finally { |
| delegate.close(); |
| } |
| |
| return resultIterator; |
| } |
| |
| @Override |
| public Tuple peek() throws SQLException { |
| return getResultIterator().peek(); |
| } |
| |
| @Override |
| public void close() throws SQLException { |
| // Guard against resultIterator being null |
| if (null != resultIterator) { |
| resultIterator.close(); |
| } |
| resultIterator = PeekingResultIterator.EMPTY_ITERATOR; |
| } |
| |
| |
| @Override |
| public void explain(List<String> planSteps) { |
| delegate.explain(planSteps); |
| planSteps.add("CLIENT" + (offset != null ? "" : " OFFSET " + offset) |
| + (limit == null ? "" : " TOP " + limit + " ROW" + (limit == 1 ? "" : "S")) + " SORTED BY " |
| + orderByExpressions.toString()); |
| } |
| |
| @Override |
| public String toString() { |
| return "OrderedResultIterator [thresholdBytes=" + thresholdBytes |
| + ", limit=" + limit + ", offset=" + offset + ", delegate=" + delegate |
| + ", orderByExpressions=" + orderByExpressions |
| + ", estimatedByteSize=" + estimatedByteSize |
| + ", resultIterator=" + resultIterator + ", byteSize=" |
| + byteSize + "]"; |
| } |
| } |