/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.cassandra.db.rows;

import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;

import com.google.common.annotations.VisibleForTesting;

import org.apache.cassandra.db.DeletionTime;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
import org.apache.cassandra.utils.AbstractIterator;
import org.apache.cassandra.utils.CloseableIterator;

/**
 * A utility class to split the given {@link#UnfilteredRowIterator} into smaller chunks each
 * having at most {@link #throttle} + 1 unfiltereds.
 *
 * Only the first output contains partition level info: {@link UnfilteredRowIterator#partitionLevelDeletion}
 * and {@link UnfilteredRowIterator#staticRow}.
 *
 * Besides splitting, this iterator will also ensure each chunk does not finish with an open tombstone marker,
 * by closing any opened tombstone markers and re-opening on the next chunk.
 *
 * The lifecycle of outputed {{@link UnfilteredRowIterator} only last till next call to {@link #next()}.
 *
 * A subsequent {@link #next} call will exhaust the previously returned iterator before computing the next,
 * effectively skipping unfiltereds up to the throttle size.
 *
 * Closing this iterator will close the underlying iterator.
 *
 */
public class ThrottledUnfilteredIterator extends AbstractIterator<UnfilteredRowIterator> implements CloseableIterator<UnfilteredRowIterator>
{
    private final UnfilteredRowIterator origin;
    private final int throttle;

    // internal mutable state
    private UnfilteredRowIterator throttledItr;

    // extra unfiltereds from previous iteration
    private Iterator<Unfiltered> overflowed = Collections.emptyIterator();

    @VisibleForTesting
    ThrottledUnfilteredIterator(UnfilteredRowIterator origin, int throttle)
    {
        assert origin != null;
        assert throttle > 1 : "Throttle size must be higher than 1 to properly support open and close tombstone boundaries.";
        this.origin = origin;
        this.throttle = throttle;
        this.throttledItr = null;
    }

    @Override
    protected UnfilteredRowIterator computeNext()
    {
        // exhaust previous throttled iterator
        while (throttledItr != null && throttledItr.hasNext())
            throttledItr.next();

        // The original UnfilteredRowIterator may have only partition deletion or static column but without unfiltereds.
        // Return the original UnfilteredRowIterator
        if (!origin.hasNext())
        {
            if (throttledItr != null)
                return endOfData();
            return throttledItr = origin;
        }

        throttledItr = new WrappingUnfilteredRowIterator()
        {
            private int count = 0;
            private boolean isFirst = throttledItr == null;

            // current batch's openMarker. if it's generated in previous batch,
            // it must be consumed as first element of current batch
            private RangeTombstoneMarker openMarker;

            // current batch's closeMarker.
            // it must be consumed as last element of current batch
            private RangeTombstoneMarker closeMarker = null;

            @Override
            public UnfilteredRowIterator wrapped()
            {
                return origin;
            }

            @Override
            public boolean hasNext()
            {
                return (withinLimit() && origin.hasNext()) || closeMarker != null;
            }

            @Override
            public Unfiltered next()
            {
                if (closeMarker != null)
                {
                    assert count == throttle;
                    Unfiltered toReturn = closeMarker;
                    closeMarker = null;
                    return toReturn;
                }

                Unfiltered next;
                assert withinLimit();
                // in the beginning of the batch, there might be remaining unfiltereds from previous iteration
                if (overflowed.hasNext())
                    next = overflowed.next();
                else
                    next = origin.next();
                recordNext(next);
                return next;
            }

            private void recordNext(Unfiltered unfiltered)
            {
                count++;
                if (unfiltered.isRangeTombstoneMarker())
                    updateMarker((RangeTombstoneMarker) unfiltered);
                // when reach throttle with a remaining openMarker, we need to create corresponding closeMarker.
                if (count == throttle && openMarker != null)
                {
                    assert origin.hasNext();
                    closeOpenMarker(origin.next());
                }
            }

            private boolean withinLimit()
            {
                return count < throttle;
            }

            private void updateMarker(RangeTombstoneMarker marker)
            {
                openMarker = marker.isOpen(isReverseOrder()) ? marker : null;
            }

            /**
             * There 3 cases for next, 1. if it's boundaryMarker, we split it as closeMarker for current batch, next
             * openMarker for next batch 2. if it's boundMakrer, it must be closeMarker. 3. if it's Row, create
             * corresponding closeMarker for current batch, and create next openMarker for next batch including current
             * Row.
             */
            private void closeOpenMarker(Unfiltered next)
            {
                assert openMarker != null;

                if (next.isRangeTombstoneMarker())
                {
                    RangeTombstoneMarker marker = (RangeTombstoneMarker) next;
                    // if it's boundary, create closeMarker for current batch and openMarker for next batch
                    if (marker.isBoundary())
                    {
                        RangeTombstoneBoundaryMarker boundary = (RangeTombstoneBoundaryMarker) marker;
                        closeMarker = boundary.createCorrespondingCloseMarker(isReverseOrder());
                        overflowed = Collections.singleton((Unfiltered)boundary.createCorrespondingOpenMarker(isReverseOrder())).iterator();
                    }
                    else
                    {
                        // if it's bound, it must be closeMarker.
                        assert marker.isClose(isReverseOrder());
                        updateMarker(marker);
                        closeMarker = marker;
                    }
                }
                else
                {
                    // it's Row, need to create closeMarker for current batch and openMarker for next batch
                    DeletionTime openDeletion = openMarker.openDeletionTime(isReverseOrder());
                    closeMarker = RangeTombstoneBoundMarker.exclusiveClose(isReverseOrder(), next.clustering(), openDeletion);

                    // for next batch
                    overflowed = Arrays.asList(RangeTombstoneBoundMarker.inclusiveOpen(isReverseOrder(),
                                                                                       next.clustering(),
                                                                                       openDeletion), next).iterator();
                }
            }

            @Override
            public DeletionTime partitionLevelDeletion()
            {
                return isFirst ? origin.partitionLevelDeletion() : DeletionTime.LIVE;
            }

            @Override
            public Row staticRow()
            {
                return isFirst ? origin.staticRow() : Rows.EMPTY_STATIC_ROW;
            }

            @Override
            public void close()
            {
                // no op
            }
        };
        return throttledItr;
    }

    public void close()
    {
        if (origin != null)
            origin.close();
    }

    /**
     * Splits a {@link UnfilteredPartitionIterator} in {@link UnfilteredRowIterator} batches with size no higher than
     * <b>maxBatchSize</b>
     *
     * @param partitionIterator
     * @param maxBatchSize max number of unfiltereds in the UnfilteredRowIterator. if 0 is given, it means no throttle.
     * @return
     */
    public static CloseableIterator<UnfilteredRowIterator> throttle(UnfilteredPartitionIterator partitionIterator, int maxBatchSize)
    {
        if (maxBatchSize == 0) // opt out
            return partitionIterator;

        return new AbstractIterator<UnfilteredRowIterator>()
        {
            ThrottledUnfilteredIterator current = null;

            protected UnfilteredRowIterator computeNext()
            {
                if (current != null && !current.hasNext())
                {
                    current.close();
                    current = null;
                }

                if (current == null && partitionIterator.hasNext())
                {
                    current = new ThrottledUnfilteredIterator(partitionIterator.next(), maxBatchSize);
                }

                if (current != null && current.hasNext())
                    return current.next();

                return endOfData();
            }

            public void close()
            {
                if (current != null)
                    current.close();
            }
        };
    }
}