/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.cassandra.db.lifecycle;

import java.util.*;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.google.common.base.Predicate;
import com.google.common.collect.*;

import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Memtable;
import org.apache.cassandra.db.PartitionPosition;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.utils.Interval;

import static com.google.common.base.Predicates.equalTo;
import static com.google.common.base.Predicates.in;
import static com.google.common.base.Predicates.not;
import static com.google.common.collect.ImmutableList.copyOf;
import static com.google.common.collect.ImmutableList.of;
import static com.google.common.collect.Iterables.all;
import static com.google.common.collect.Iterables.concat;
import static com.google.common.collect.Iterables.filter;
import static com.google.common.collect.Iterables.transform;
import static org.apache.cassandra.db.lifecycle.Helpers.emptySet;
import static org.apache.cassandra.db.lifecycle.Helpers.filterOut;
import static org.apache.cassandra.db.lifecycle.Helpers.replace;

/**
 * An immutable structure holding the current memtable, the memtables pending
 * flush, the sstables for a column family, and the sstables that are active
 * in compaction (a subset of the sstables).
 *
 * Modifications to instances are all performed via a Function produced by the static methods in this class.
 * These are composed as necessary and provided to the Tracker.apply() methods, which atomically reject or
 * accept and apply the changes to the View.
 *
 */
public class View
{
    /**
     * ordinarily a list of size 1, but when preparing to flush will contain both the memtable we will flush
     * and the new replacement memtable, until all outstanding write operations on the old table complete.
     * The last item in the list is always the "current" memtable.
     */
    public final List<Memtable> liveMemtables;
    /**
     * contains all memtables that are no longer referenced for writing and are queued for / in the process of being
     * flushed. In chronologically ascending order.
     */
    public final List<Memtable> flushingMemtables;
    // key-set views over compactingMap and sstablesMap respectively (see the constructor)
    final Set<SSTableReader> compacting;
    final Set<SSTableReader> sstables;
    // flushed sstables that may not yet be compacted: populated by replaceFlushed(), cleared by
    // permitCompactionOfFlushed(), and rejected by permitCompacting()
    final Set<SSTableReader> premature;
    // we use a Map here so that we can easily perform identity checks as well as equality checks.
    // When marking compacting, we now indicate if we expect the sstables to be present (by default we do),
    // and we then check that not only are they all present in the live set, but that the exact instance present is
    // the one we made our decision to compact against.
    final Map<SSTableReader, SSTableReader> sstablesMap;
    final Map<SSTableReader, SSTableReader> compactingMap;

    final SSTableIntervalTree intervalTree;

    /**
     * Builds a view over the provided collections. The collections are stored as given (no defensive copies),
     * and the {@code sstables} / {@code compacting} sets are the key sets of the corresponding maps.
     *
     * @param liveMemtables     memtables still accepting writes; the last one is the current memtable
     * @param flushingMemtables memtables queued for, or in the process of, being flushed
     * @param sstables          the live sstables, as a map permitting identity lookups
     * @param compacting        the sstables marked compacting, as a map permitting identity lookups
     * @param premature         flushed sstables that are not yet permitted to be compacted
     * @param intervalTree      interval tree used to search the live sstables by partition position
     */
    View(List<Memtable> liveMemtables, List<Memtable> flushingMemtables, Map<SSTableReader, SSTableReader> sstables, Map<SSTableReader, SSTableReader> compacting, Set<SSTableReader> premature, SSTableIntervalTree intervalTree)
    {
        assert liveMemtables != null;
        assert flushingMemtables != null;
        assert sstables != null;
        assert compacting != null;
        assert intervalTree != null;

        this.liveMemtables = liveMemtables;
        this.flushingMemtables = flushingMemtables;

        this.sstablesMap = sstables;
        this.sstables = sstablesMap.keySet();
        this.compactingMap = compacting;
        this.compacting = compactingMap.keySet();
        this.premature = premature;
        this.intervalTree = intervalTree;
    }

    /**
     * @return the last entry of {@link #liveMemtables}, i.e. the memtable currently receiving writes
     */
    public Memtable getCurrentMemtable()
    {
        return liveMemtables.get(liveMemtables.size() - 1);
    }

    /**
     * @return the active memtable and all the memtables that are pending flush.
     */
    public Iterable<Memtable> getAllMemtables()
    {
        return concat(flushingMemtables, liveMemtables);
    }

    // shortcut for all live sstables, so can efficiently use it for size, etc
    public Set<SSTableReader> liveSSTables()
    {
        return sstables;
    }

    /**
     * @return the sstables of the requested {@code sstableSet} (see {@link #select(SSTableSet)}) that also
     * satisfy {@code filter}
     */
    public Iterable<SSTableReader> sstables(SSTableSet sstableSet, Predicate<SSTableReader> filter)
    {
        return filter(select(sstableSet), filter);
    }

    // any sstable known by this tracker in any form; we have a special method here since it's only used for testing/debug
    // (strong leak detection), and it does not follow the normal pattern
    @VisibleForTesting
    public Iterable<SSTableReader> allKnownSSTables()
    {
        return Iterables.concat(sstables, filterOut(compacting, sstables));
    }

    /**
     * Selects one of the sstable sets exposed by this view.
     * <ul>
     *   <li>{@code LIVE}: every live sstable</li>
     *   <li>{@code NONCOMPACTING}: the live sstables that are not marked compacting</li>
     *   <li>{@code CANONICAL}: a freshly built set of the compacting and live sstables that were not opened
     *       {@code EARLY}, preferring the instance held in the compacting set</li>
     * </ul>
     *
     * @throws IllegalStateException if {@code sstableSet} is none of the above
     */
    public Iterable<SSTableReader> select(SSTableSet sstableSet)
    {
        switch (sstableSet)
        {
            case LIVE:
                return sstables;
            case NONCOMPACTING:
                return getUncompacting(sstables);
            case CANONICAL:
                Set<SSTableReader> canonicalSSTables = new HashSet<>();
                for (SSTableReader sstable : compacting)
                    if (sstable.openReason != SSTableReader.OpenReason.EARLY)
                        canonicalSSTables.add(sstable);
                // reason for checking if compacting contains the sstable is that if compacting has an EARLY version
                // of a NORMAL sstable, we still have the canonical version of that sstable in sstables.
                // note that the EARLY version is equal, but not == since it is a different instance of the same sstable.
                for (SSTableReader sstable : sstables)
                    if (!compacting.contains(sstable) && sstable.openReason != SSTableReader.OpenReason.EARLY)
                        canonicalSSTables.add(sstable);

                return canonicalSSTables;
            default:
                throw new IllegalStateException();
        }
    }

    /**
     * @return a lazily filtered view of {@code candidates} that omits every sstable currently marked compacting
     */
    public Iterable<SSTableReader> getUncompacting(Iterable<SSTableReader> candidates)
    {
        return filter(candidates, sstable -> !compacting.contains(sstable));
    }

    /**
     * @return true if there are no live sstables, nothing is flushing, and there is at most one live memtable
     * which (if present) reports zero operations
     */
    public boolean isEmpty()
    {
        return sstables.isEmpty()
               && liveMemtables.size() <= 1
               && flushingMemtables.isEmpty()
               && (liveMemtables.isEmpty() || liveMemtables.get(0).getOperations() == 0);
    }

    @Override
    public String toString()
    {
        // pending_count counts every memtable other than the single current one
        return String.format("View(pending_count=%d, sstables=%s, compacting=%s)", liveMemtables.size() + flushingMemtables.size() - 1, sstables, compacting);
    }

    /**
     * Returns the sstables that have any partition between {@code left} and {@code right}, when both bounds are taken inclusively.
     * The interval formed by {@code left} and {@code right} shouldn't wrap.
     */
    public Iterable<SSTableReader> liveSSTablesInBounds(PartitionPosition left, PartitionPosition right)
    {
        // single implementation shared with the static variant, applied to this view's own interval tree
        return sstablesInBounds(left, right, intervalTree);
    }

    /**
     * Returns the sstables indexed by {@code intervalTree} that have any partition between {@code left} and
     * {@code right}, when both bounds are taken inclusively. The interval formed by {@code left} and
     * {@code right} shouldn't wrap.
     *
     * @return the matching sstables, or an empty list if the tree is empty
     */
    public static List<SSTableReader> sstablesInBounds(PartitionPosition left, PartitionPosition right, SSTableIntervalTree intervalTree)
    {
        assert !AbstractBounds.strictlyWrapsAround(left, right);

        if (intervalTree.isEmpty())
            return Collections.emptyList();

        // a minimum right bound is replaced by the tree's maximum position, so the search extends to the end of the tree
        PartitionPosition stopInTree = right.isMinimum() ? intervalTree.max() : right;
        return intervalTree.search(Interval.create(left, stopInTree));
    }

    /**
     * @return a function that applies {@link #select(SSTableSet)} with the given set to a view
     */
    public static Function<View, Iterable<SSTableReader>> selectFunction(SSTableSet sstableSet)
    {
        return (view) -> view.select(sstableSet);
    }

    /**
     * @return a function that applies {@link #sstables(SSTableSet, Predicate)} with the given arguments to a view
     */
    public static Function<View, Iterable<SSTableReader>> select(SSTableSet sstableSet, Predicate<SSTableReader> filter)
    {
        return (view) -> view.sstables(sstableSet, filter);
    }

    /**
     * Only {@link SSTableSet#LIVE} is supported; this is asserted.
     *
     * @return a function that searches a view's interval tree for the sstables matching the given
     * @param key
     */
    public static Function<View, Iterable<SSTableReader>> select(SSTableSet sstableSet, DecoratedKey key)
    {
        assert sstableSet == SSTableSet.LIVE;
        return (view) -> view.intervalTree.search(key);
    }

    /**
     * @return a function that selects, from a view's interval tree, the live sstables that may contain
     * rows within @param rowBounds, with both bounds treated as inclusive.
     */
    public static Function<View, Iterable<SSTableReader>> selectLive(AbstractBounds<PartitionPosition> rowBounds)
    {
        // Note that View.sstablesInBounds always includes its bounds while rowBounds may not. This is ok however
        // because the fact we restrict the sstables returned by this function is an optimization in the first
        // place and the returned sstables will (almost) never cover *exactly* rowBounds anyway. It's also
        // *very* unlikely that a sstable is included *just* because we consider one of the bounds inclusively
        // instead of exclusively, so the performance impact is negligible in practice.
        return (view) -> view.liveSSTablesInBounds(rowBounds.left, rowBounds.right);
    }

    // METHODS TO CONSTRUCT FUNCTIONS FOR MODIFYING A VIEW:

    // return a function to un/mark the provided readers compacting in a view
    static Function<View, View> updateCompacting(final Set<SSTableReader> unmark, final Iterable<SSTableReader> mark)
    {
        if (unmark.isEmpty() && Iterables.isEmpty(mark))
            return Functions.identity();
        return view ->
        {
            // every reader we mark must be the exact instance present in the live set
            assert all(mark, Helpers.idIn(view.sstablesMap));
            return new View(view.liveMemtables, view.flushingMemtables, view.sstablesMap,
                            replace(view.compactingMap, unmark, mark),
                            view.premature, view.intervalTree);
        };
    }

    // construct a predicate to reject views that do not permit us to mark these readers compacting;
    // i.e. one of them is either already compacting, has been compacted, has been replaced,
    // or is a flushed sstable not yet permitted to compact (premature)
    static Predicate<View> permitCompacting(final Iterable<SSTableReader> readers)
    {
        return view ->
        {
            for (SSTableReader reader : readers)
                if (view.compacting.contains(reader) || view.sstablesMap.get(reader) != reader || reader.isMarkedCompacted() || view.premature.contains(reader))
                    return false;
            return true;
        };
    }

    // construct a function to change the liveset in a Snapshot
    static Function<View, View> updateLiveSet(final Set<SSTableReader> remove, final Iterable<SSTableReader> add)
    {
        if (remove.isEmpty() && Iterables.isEmpty(add))
            return Functions.identity();
        return view ->
        {
            Map<SSTableReader, SSTableReader> sstableMap = replace(view.sstablesMap, remove, add);
            // the live set changed, so the interval tree is rebuilt from the new key set
            return new View(view.liveMemtables, view.flushingMemtables, sstableMap, view.compactingMap, view.premature,
                            SSTableIntervalTree.build(sstableMap.keySet()));
        };
    }

    // called prior to initiating flush: add newMemtable to liveMemtables, making it the latest memtable
    static Function<View, View> switchMemtable(final Memtable newMemtable)
    {
        return view ->
        {
            List<Memtable> newLive = ImmutableList.<Memtable>builder().addAll(view.liveMemtables).add(newMemtable).build();
            assert newLive.size() == view.liveMemtables.size() + 1;
            return new View(newLive, view.flushingMemtables, view.sstablesMap, view.compactingMap, view.premature, view.intervalTree);
        };
    }

    // called before flush: move toFlush from liveMemtables to flushingMemtables
    static Function<View, View> markFlushing(final Memtable toFlush)
    {
        return view ->
        {
            List<Memtable> live = view.liveMemtables, flushing = view.flushingMemtables;
            List<Memtable> newLive = copyOf(filter(live, not(equalTo(toFlush))));
            // insert toFlush at its position according to the memtables' natural ordering:
            // everything that compares less than it, then toFlush, then the remainder
            List<Memtable> newFlushing = copyOf(concat(filter(flushing, lessThan(toFlush)),
                                                       of(toFlush),
                                                       filter(flushing, not(lessThan(toFlush)))));
            // exactly one memtable must have moved from live to flushing
            assert newLive.size() == live.size() - 1;
            assert newFlushing.size() == flushing.size() + 1;
            return new View(newLive, newFlushing, view.sstablesMap, view.compactingMap, view.premature, view.intervalTree);
        };
    }

    // called after flush: removes memtable from flushingMemtables, and inserts flushed into the live sstable set
    static Function<View, View> replaceFlushed(final Memtable memtable, final Iterable<SSTableReader> flushed)
    {
        return view ->
        {
            List<Memtable> flushingMemtables = copyOf(filter(view.flushingMemtables, not(equalTo(memtable))));
            assert flushingMemtables.size() == view.flushingMemtables.size() - 1;

            // nothing was written by the flush: only the memtable bookkeeping changes
            if (flushed == null || Iterables.isEmpty(flushed))
                return new View(view.liveMemtables, flushingMemtables, view.sstablesMap,
                                view.compactingMap, view.premature, view.intervalTree);

            // the flushed sstables are added to the live set and are also recorded as compacting and premature;
            // permitCompactionOfFlushed() later removes them from both of the latter
            Map<SSTableReader, SSTableReader> sstableMap = replace(view.sstablesMap, emptySet(), flushed);
            Map<SSTableReader, SSTableReader> compactingMap = replace(view.compactingMap, emptySet(), flushed);
            Set<SSTableReader> premature = replace(view.premature, emptySet(), flushed);
            return new View(view.liveMemtables, flushingMemtables, sstableMap, compactingMap, premature,
                            SSTableIntervalTree.build(sstableMap.keySet()));
        };
    }

    // removes the provided (previously flushed) readers from both the premature and the compacting sets
    static Function<View, View> permitCompactionOfFlushed(final Collection<SSTableReader> readers)
    {
        // snapshot the readers now, so later changes to the caller's collection do not affect the function
        Set<SSTableReader> expectAndRemove = ImmutableSet.copyOf(readers);
        return view ->
        {
            Set<SSTableReader> premature = replace(view.premature, expectAndRemove, emptySet());
            Map<SSTableReader, SSTableReader> compactingMap = replace(view.compactingMap, expectAndRemove, emptySet());
            return new View(view.liveMemtables, view.flushingMemtables, view.sstablesMap, compactingMap, premature, view.intervalTree);
        };
    }


    // predicate matching every element that compares strictly less than the provided bound
    private static <T extends Comparable<T>> Predicate<T> lessThan(final T bound)
    {
        return t -> t.compareTo(bound) < 0;
    }
}
