/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.phoenix.iterate;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkPositionIndex;

import java.io.IOException;
import java.sql.SQLException;
import java.util.Comparator;
import java.util.List;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.phoenix.execute.DescVarLengthFastByteComparisons;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.OrderByExpression;
import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.util.ServerUtil;
import org.apache.phoenix.util.SizedUtil;

import com.google.common.base.Function;
import com.google.common.collect.Collections2;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;

/**
 * Result scanner that sorts aggregated rows by columns specified in the ORDER BY clause.
 * <p>
 * Note that currently the sort is entirely done in memory. 
 *  
 * 
 * @since 0.1
 */
public class OrderedResultIterator implements PeekingResultIterator {

    /** A container that holds pointers to a {@link Result} and its sort keys. */
    protected static class ResultEntry {
        protected final ImmutableBytesWritable[] sortKeys;
        protected final Tuple result;

        ResultEntry(ImmutableBytesWritable[] sortKeys, Tuple result) {
            this.sortKeys = sortKeys;
            this.result = result;
        }
        
        ImmutableBytesWritable getSortKey(int index) {
            checkPositionIndex(index, sortKeys.length);
            return sortKeys[index];
        }
        
        Tuple getResult() {
            return result;
        }
    }
    
    /** A function that returns Nth key for a given {@link ResultEntry}. */
    private static class NthKey implements Function<ResultEntry, ImmutableBytesWritable> {
        private final int index;

        NthKey(int index) {
            this.index = index;
        }
        @Override
        public ImmutableBytesWritable apply(ResultEntry entry) {
            return entry.getSortKey(index);
        }
    }

    /** Returns the expression of a given {@link OrderByExpression}. */
    private static final Function<OrderByExpression, Expression> TO_EXPRESSION = new Function<OrderByExpression, Expression>() {
        @Override
        public Expression apply(OrderByExpression column) {
            return column.getExpression();
        }
    };

    private final int thresholdBytes;
    private final Integer limit;
    private final Integer offset;
    private final ResultIterator delegate;
    private final List<OrderByExpression> orderByExpressions;
    private final long estimatedByteSize;
    
    private PeekingResultIterator resultIterator;
    private long byteSize;

    protected ResultIterator getDelegate() {
        return delegate;
    }
    
    public OrderedResultIterator(ResultIterator delegate, List<OrderByExpression> orderByExpressions,
            int thresholdBytes, Integer limit, Integer offset) {
        this(delegate, orderByExpressions, thresholdBytes, limit, offset, 0);
    }

    public OrderedResultIterator(ResultIterator delegate, List<OrderByExpression> orderByExpressions,
            int thresholdBytes) throws SQLException {
        this(delegate, orderByExpressions, thresholdBytes, null, null);
    }

    public OrderedResultIterator(ResultIterator delegate, List<OrderByExpression> orderByExpressions, 
            int thresholdBytes, Integer limit, Integer offset,int estimatedRowSize) {
        checkArgument(!orderByExpressions.isEmpty());
        this.delegate = delegate;
        this.orderByExpressions = orderByExpressions;
        this.thresholdBytes = thresholdBytes;
        this.offset = offset == null ? 0 : offset;
        if (limit != null) {
            this.limit = limit + this.offset;
        } else {
            this.limit = null;
        }
        long estimatedEntrySize =
            // ResultEntry
            SizedUtil.OBJECT_SIZE + 
            // ImmutableBytesWritable[]
            SizedUtil.ARRAY_SIZE + orderByExpressions.size() * SizedUtil.IMMUTABLE_BYTES_WRITABLE_SIZE +
            // Tuple
            SizedUtil.OBJECT_SIZE + estimatedRowSize;

        // Make sure we don't overflow Long, though this is really unlikely to happen.
        assert(limit == null || Long.MAX_VALUE / estimatedEntrySize >= limit + this.offset);

        this.estimatedByteSize = limit == null ? 0 : (limit + this.offset) * estimatedEntrySize;
    }

    public Integer getLimit() {
        return limit;
    }

    public long getEstimatedByteSize() {
        return estimatedByteSize;
    }

    public long getByteSize() {
        return byteSize;
    }
    /**
     * Builds a comparator from the list of columns in ORDER BY clause.
     * @param orderByExpressions the columns in ORDER BY clause.
     * @return the comparator built from the list of columns in ORDER BY clause.
     */
    // ImmutableBytesWritable.Comparator doesn't implement generics
    @SuppressWarnings("unchecked")
    private static Comparator<ResultEntry> buildComparator(List<OrderByExpression> orderByExpressions) {
        Ordering<ResultEntry> ordering = null;
        int pos = 0;
        for (OrderByExpression col : orderByExpressions) {
            Expression e = col.getExpression();
            Comparator<ImmutableBytesWritable> comparator = 
                    e.getSortOrder() == SortOrder.DESC && !e.getDataType().isFixedWidth() 
                    ? buildDescVarLengthComparator() 
                    : new ImmutableBytesWritable.Comparator();
            Ordering<ImmutableBytesWritable> o = Ordering.from(comparator);
            if(!col.isAscending()) o = o.reverse();
            o = col.isNullsLast() ? o.nullsLast() : o.nullsFirst();
            Ordering<ResultEntry> entryOrdering = o.onResultOf(new NthKey(pos++));
            ordering = ordering == null ? entryOrdering : ordering.compound(entryOrdering);
        }
        return ordering;
    }

    /*
     * Same as regular comparator, but if all the bytes match and the length is
     * different, returns the longer length as bigger.
     */
    private static Comparator<ImmutableBytesWritable> buildDescVarLengthComparator() {
        return new Comparator<ImmutableBytesWritable>() {

            @Override
            public int compare(ImmutableBytesWritable o1, ImmutableBytesWritable o2) {
                return DescVarLengthFastByteComparisons.compareTo(
                        o1.get(), o1.getOffset(), o1.getLength(),
                        o2.get(), o2.getOffset(), o2.getLength());
            }
            
        };
    }
    
    @Override
    public Tuple next() throws SQLException {
        return getResultIterator().next();
    }
    
    private PeekingResultIterator getResultIterator() throws SQLException {
        if (resultIterator != null) {
            return resultIterator;
        }
        
        final int numSortKeys = orderByExpressions.size();
        List<Expression> expressions = Lists.newArrayList(Collections2.transform(orderByExpressions, TO_EXPRESSION));
        final Comparator<ResultEntry> comparator = buildComparator(orderByExpressions);
        try{
            final MappedByteBufferSortedQueue queueEntries = new MappedByteBufferSortedQueue(comparator, limit,
                    thresholdBytes);
            resultIterator = new PeekingResultIterator() {
                int count = 0;

                @Override
                public Tuple next() throws SQLException {
                    ResultEntry entry = queueEntries.poll();
                    while (entry != null && offset != null && count < offset) {
                        count++;
                        if (entry.getResult() == null) { return null; }
                        entry = queueEntries.poll();
                    }
                    if (entry == null || (limit != null && count++ > limit)) {
                        resultIterator.close();
                        resultIterator = PeekingResultIterator.EMPTY_ITERATOR;
                        return null;
                    }
                    return entry.getResult();
                }
                
                @Override
                public Tuple peek() throws SQLException {
                    ResultEntry entry = queueEntries.peek();
                    while (entry != null && offset != null && count < offset) {
                        entry = queueEntries.poll();
                        count++;
                        if (entry == null) { return null; }
                    }
                    if (limit != null && count > limit) { return null; }
                    entry = queueEntries.peek();
                    if (entry == null) { return null; }
                    return entry.getResult();
                }

                @Override
                public void explain(List<String> planSteps) {
                }
                
                @Override
                public void close() throws SQLException {
                    queueEntries.close();
                }
            };
            for (Tuple result = delegate.next(); result != null; result = delegate.next()) {
                int pos = 0;
                ImmutableBytesWritable[] sortKeys = new ImmutableBytesWritable[numSortKeys];
                for (Expression expression : expressions) {
                    final ImmutableBytesWritable sortKey = new ImmutableBytesWritable();
                    boolean evaluated = expression.evaluate(result, sortKey);
                    // set the sort key that failed to get evaluated with null
                    sortKeys[pos++] = evaluated && sortKey.getLength() > 0 ? sortKey : null;
                }
                queueEntries.add(new ResultEntry(sortKeys, result));
            }
            this.byteSize = queueEntries.getByteSize();
        } catch (IOException e) {
            ServerUtil.createIOException(e.getMessage(), e);
        } finally {
            delegate.close();
        }
        
        return resultIterator;
    }

    @Override
    public Tuple peek() throws SQLException {
        return getResultIterator().peek();
    }

    @Override
    public void close() throws SQLException {
        // Guard against resultIterator being null
        if (null != resultIterator) {
            resultIterator.close();
        }
        resultIterator = PeekingResultIterator.EMPTY_ITERATOR;
    }


    @Override
    public void explain(List<String> planSteps) {
        delegate.explain(planSteps);
        planSteps.add("CLIENT" + (offset != null ? "" : " OFFSET " + offset)
                + (limit == null ? "" : " TOP " + limit + " ROW" + (limit == 1 ? "" : "S")) + " SORTED BY "
                + orderByExpressions.toString());
    }

    @Override
    public String toString() {
        return "OrderedResultIterator [thresholdBytes=" + thresholdBytes
                + ", limit=" + limit + ", offset=" + offset + ", delegate=" + delegate
                + ", orderByExpressions=" + orderByExpressions
                + ", estimatedByteSize=" + estimatedByteSize
                + ", resultIterator=" + resultIterator + ", byteSize="
                + byteSize + "]";
    }
}
