/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*    http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied.  See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cassandra.db.lifecycle;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

import junit.framework.Assert;
import org.apache.cassandra.MockSchema;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Memtable;
import org.apache.cassandra.db.commitlog.ReplayPosition;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction.ReaderState;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction.ReaderState.Action;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.concurrent.AbstractTransactionalTest;
import org.apache.cassandra.utils.concurrent.Transactional.AbstractTransactional.State;

import static com.google.common.base.Predicates.in;
import static com.google.common.collect.ImmutableList.copyOf;
import static com.google.common.collect.ImmutableList.of;
import static com.google.common.collect.Iterables.all;
import static com.google.common.collect.Iterables.concat;
import static com.google.common.collect.Iterables.size;
import static org.apache.cassandra.db.lifecycle.Helpers.idIn;
import static org.apache.cassandra.db.lifecycle.Helpers.orIn;
import static org.apache.cassandra.db.lifecycle.Helpers.select;

public class LifecycleTransactionTest extends AbstractTransactionalTest
{
    private boolean incrementalBackups;

    @BeforeClass
    public static void setUp()
    {
        MockSchema.cleanup();
    }

    @Before
    public void disableIncrementalBackup()
    {
        incrementalBackups = DatabaseDescriptor.isIncrementalBackupsEnabled();
        DatabaseDescriptor.setIncrementalBackupsEnabled(false);
    }
    @After
    public void restoreIncrementalBackup()
    {
        DatabaseDescriptor.setIncrementalBackupsEnabled(incrementalBackups);
    }

    @Test
    public void testUpdates() // (including obsoletion)
    {
        ColumnFamilyStore cfs = MockSchema.newCFS();
        Tracker tracker = new Tracker(null, false);
        SSTableReader[] readers = readersArray(0, 3, cfs);
        SSTableReader[] readers2 = readersArray(0, 4, cfs);
        SSTableReader[] readers3 = readersArray(0, 4, cfs);
        tracker.addInitialSSTables(copyOf(readers));
        LifecycleTransaction txn = tracker.tryModify(copyOf(readers), OperationType.UNKNOWN);

        txn.update(readers2[0], true);
        txn.obsolete(readers[1]);

        Assert.assertTrue(txn.isObsolete(readers[1]));
        Assert.assertFalse(txn.isObsolete(readers[0]));

        testBadUpdate(txn, readers2[0], true);  // same reader && instances
        testBadUpdate(txn, readers2[1], true);  // staged obsolete; cannot update
        testBadUpdate(txn, readers3[0], true);  // same reader, diff instances
        testBadUpdate(txn, readers2[2], false); // incorrectly declared original status
        testBadUpdate(txn, readers2[3], true); // incorrectly declared original status

        testBadObsolete(txn, readers[1]);  // staged obsolete; cannot obsolete again
        testBadObsolete(txn, readers2[0]);  // staged update; cannot obsolete

        txn.update(readers2[3], false);

        Assert.assertEquals(3, tracker.getView().compacting.size());
        txn.checkpoint();
        Assert.assertTrue(txn.isObsolete(readers[1]));
        Assert.assertFalse(txn.isObsolete(readers[0]));
        Assert.assertEquals(4, tracker.getView().compacting.size());
        Assert.assertEquals(3, tracker.getView().sstables.size());
        Assert.assertEquals(3, size(txn.current()));
        Assert.assertTrue(all(of(readers2[0], readers[2], readers2[3]), idIn(tracker.getView().sstablesMap)));
        Assert.assertTrue(all(txn.current(), idIn(tracker.getView().sstablesMap)));

        testBadObsolete(txn, readers[1]);  // logged obsolete; cannot obsolete again
        testBadObsolete(txn, readers2[2]);  // never seen instance; cannot obsolete
        testBadObsolete(txn, readers2[3]);  // non-original; cannot obsolete
        testBadUpdate(txn, readers3[1], true);  // logged obsolete; cannot update
        testBadUpdate(txn, readers2[0], true);  // same instance as logged update

        txn.update(readers3[0], true);  // same reader as logged update, different instance
        txn.checkpoint();

        Assert.assertEquals(4, tracker.getView().compacting.size());
        Assert.assertEquals(3, tracker.getView().sstables.size());
        Assert.assertEquals(3, size(txn.current()));
        Assert.assertTrue(all(of(readers3[0], readers[2], readers2[3]), idIn(tracker.getView().sstablesMap)));
        Assert.assertTrue(all(txn.current(), idIn(tracker.getView().sstablesMap)));

        testBadObsolete(txn, readers2[0]); // not current version of sstable

        txn.obsoleteOriginals();
        txn.checkpoint();
        Assert.assertEquals(1, tracker.getView().sstables.size());
        txn.obsoleteOriginals(); // should be no-op
        txn.checkpoint();
        Assert.assertEquals(1, tracker.getView().sstables.size());
        Assert.assertEquals(4, tracker.getView().compacting.size());
    }

    @Test
    public void testCancellation()
    {
        ColumnFamilyStore cfs = MockSchema.newCFS();
        Tracker tracker = new Tracker(null, false);
        List<SSTableReader> readers = readers(0, 3, cfs);
        tracker.addInitialSSTables(readers);
        LifecycleTransaction txn = tracker.tryModify(readers, OperationType.UNKNOWN);

        SSTableReader cancel = readers.get(0);
        SSTableReader update = readers(1, 2, cfs).get(0);
        SSTableReader fresh = readers(3, 4,cfs).get(0);
        SSTableReader notPresent = readers(4, 5, cfs).get(0);

        txn.cancel(cancel);
        txn.update(update, true);
        txn.update(fresh, false);

        testBadCancel(txn, cancel);
        testBadCancel(txn, update);
        testBadCancel(txn, fresh);
        testBadCancel(txn, notPresent);
        Assert.assertEquals(2, txn.originals().size());
        Assert.assertEquals(2, tracker.getView().compacting.size());
        Assert.assertTrue(all(readers.subList(1, 3), idIn(tracker.getView().compacting)));

        txn.checkpoint();

        testBadCancel(txn, cancel);
        testBadCancel(txn, update);
        testBadCancel(txn, fresh);
        testBadCancel(txn, notPresent);
        Assert.assertEquals(2, txn.originals().size());
        Assert.assertEquals(3, tracker.getView().compacting.size());
        Assert.assertEquals(3, size(txn.current()));
        Assert.assertTrue(all(concat(readers.subList(1, 3), of(fresh)), idIn(tracker.getView().compacting)));

        txn.cancel(readers.get(2));
        Assert.assertEquals(1, txn.originals().size());
        Assert.assertEquals(2, tracker.getView().compacting.size());
        Assert.assertEquals(2, size(txn.current()));
        Assert.assertTrue(all(of(readers.get(1), fresh), idIn(tracker.getView().compacting)));
    }

    @Test
    public void testSplit()
    {
        ColumnFamilyStore cfs = MockSchema.newCFS();
        Tracker tracker = new Tracker(null, false);
        List<SSTableReader> readers = readers(0, 4, cfs);
        tracker.addInitialSSTables(readers);
        LifecycleTransaction txn = tracker.tryModify(readers, OperationType.UNKNOWN);
        txn.cancel(readers.get(3));
        LifecycleTransaction txn2 = txn.split(readers.subList(0, 1));
        Assert.assertEquals(2, txn.originals().size());
        Assert.assertTrue(all(readers.subList(1, 3), in(txn.originals())));
        Assert.assertEquals(1, txn2.originals().size());
        Assert.assertTrue(all(readers.subList(0, 1), in(txn2.originals())));
        txn.update(readers(1, 2, cfs).get(0), true);
        boolean failed = false;
        try
        {
            txn.split(readers.subList(2, 3));
        }
        catch (Throwable t)
        {
            failed = true;
        }
        Assert.assertTrue(failed);
    }

    private static void testBadUpdate(LifecycleTransaction txn, SSTableReader update, boolean original)
    {
        boolean failed = false;
        try
        {
            txn.update(update, original);
        }
        catch (Throwable t)
        {
            failed = true;
        }
        Assert.assertTrue(failed);
    }

    private static void testBadObsolete(LifecycleTransaction txn, SSTableReader update)
    {
        boolean failed = false;
        try
        {
            txn.obsolete(update);
        }
        catch (Throwable t)
        {
            failed = true;
        }
        Assert.assertTrue(failed);
    }

    private static void testBadCancel(LifecycleTransaction txn, SSTableReader cancel)
    {
        boolean failed = false;
        try
        {
            txn.cancel(cancel);
        }
        catch (Throwable t)
        {
            failed = true;
        }
        Assert.assertTrue(failed);
    }

    protected TestableTransaction newTest()
    {
        LogTransaction.waitForDeletions();
        SSTableReader.resetTidying();
        return new TxnTest();
    }

    private static final class TxnTest extends TestableTransaction
    {
        final List<SSTableReader> originals;
        final List<SSTableReader> untouchedOriginals;
        final List<SSTableReader> loggedUpdate;
        final List<SSTableReader> loggedObsolete;
        final List<SSTableReader> stagedObsolete;
        final List<SSTableReader> loggedNew;
        final List<SSTableReader> stagedNew;
        final Tracker tracker;
        final LifecycleTransaction txn;

        private static Tracker tracker(ColumnFamilyStore cfs, List<SSTableReader> readers)
        {
            Tracker tracker = new Tracker(new Memtable(new AtomicReference<>(ReplayPosition.NONE), cfs), false);
            tracker.addInitialSSTables(readers);
            return tracker;
        }

        private TxnTest()
        {
            this(MockSchema.newCFS());
        }

        private TxnTest(ColumnFamilyStore cfs)
        {
            this(cfs, readers(0, 8, cfs));
        }

        private TxnTest(ColumnFamilyStore cfs, List<SSTableReader> readers)
        {
            this(tracker(cfs, readers), readers);
        }

        private TxnTest(Tracker tracker, List<SSTableReader> readers)
        {
            this(tracker, readers, tracker.tryModify(readers, OperationType.UNKNOWN));
        }

        private TxnTest(Tracker tracker, List<SSTableReader> readers, LifecycleTransaction txn)
        {
            super(txn);
            this.tracker = tracker;
            this.originals = readers;
            this.txn = txn;
            update(txn, loggedUpdate = readers(0, 2, tracker.cfstore), true);
            obsolete(txn, loggedObsolete = readers.subList(2, 4));
            update(txn, loggedNew = readers(8, 10, tracker.cfstore), false);
            txn.checkpoint();
            update(txn, stagedNew = readers(10, 12, tracker.cfstore), false);
            obsolete(txn, stagedObsolete = copyOf(concat(loggedUpdate, originals.subList(4, 6))));
            untouchedOriginals = originals.subList(6, 8);
        }

        private ReaderState state(SSTableReader reader, State state)
        {
            SSTableReader original = select(reader, originals);
            boolean isOriginal = original != null;

            switch (state)
            {
                case ABORTED:
                {
                    return new ReaderState(Action.NONE, Action.NONE, original, original, isOriginal);
                }

                case READY_TO_COMMIT:
                {
                    ReaderState prev = state(reader, State.IN_PROGRESS);
                    Action logged;
                    SSTableReader visible;
                    if (prev.staged == Action.NONE)
                    {
                        logged = prev.logged;
                        visible = prev.currentlyVisible;
                    }
                    else
                    {
                        logged = prev.staged;
                        visible = prev.nextVisible;
                    }
                    return new ReaderState(logged, Action.NONE, visible, visible, isOriginal);
                }

                case IN_PROGRESS:
                {
                    Action logged = Action.get(loggedUpdate.contains(reader) || loggedNew.contains(reader), loggedObsolete.contains(reader));
                    Action staged = Action.get(stagedNew.contains(reader), stagedObsolete.contains(reader));
                    SSTableReader currentlyVisible = ReaderState.visible(reader, in(loggedObsolete), loggedNew, loggedUpdate, originals);
                    SSTableReader nextVisible = ReaderState.visible(reader, orIn(stagedObsolete, loggedObsolete), stagedNew, loggedNew, loggedUpdate, originals);
                    return new ReaderState(logged, staged, currentlyVisible, nextVisible, isOriginal);
                }
            }
            throw new IllegalStateException();
        }

        private List<Pair<SSTableReader, ReaderState>> states(State state)
        {
            List<Pair<SSTableReader, ReaderState>> result = new ArrayList<>();
            for (SSTableReader reader : concat(originals, loggedNew, stagedNew))
                result.add(Pair.create(reader, state(reader, state)));
            return result;
        }

        protected void doAssert(State state)
        {
            for (Pair<SSTableReader, ReaderState> pair : states(state))
            {
                SSTableReader reader = pair.left;
                ReaderState readerState = pair.right;

                Assert.assertEquals(readerState, txn.state(reader));
                Assert.assertEquals(readerState.currentlyVisible, tracker.getView().sstablesMap.get(reader));
                if (readerState.currentlyVisible == null && readerState.nextVisible == null && !readerState.original)
                    Assert.assertTrue(reader.selfRef().globalCount() == 0);
            }
        }

        protected void assertInProgress() throws Exception
        {
            doAssert(State.IN_PROGRESS);
        }

        protected void assertPrepared() throws Exception
        {
            doAssert(State.READY_TO_COMMIT);
        }

        protected void assertAborted() throws Exception
        {
            doAssert(State.ABORTED);
            Assert.assertEquals(0, tracker.getView().compacting.size());
            Assert.assertEquals(8, tracker.getView().sstables.size());
            for (SSTableReader reader : concat(loggedNew, stagedNew))
                Assert.assertTrue(reader.selfRef().globalCount() == 0);
        }

        protected void assertCommitted() throws Exception
        {
            doAssert(State.READY_TO_COMMIT);
            Assert.assertEquals(0, tracker.getView().compacting.size());
            Assert.assertEquals(6, tracker.getView().sstables.size());
            for (SSTableReader reader : concat(loggedObsolete, stagedObsolete))
                Assert.assertTrue(reader.selfRef().globalCount() == 0);
        }

        @Override
        protected boolean commitCanThrow()
        {
            return true;
        }
    }

    private static SSTableReader[] readersArray(int lb, int ub, ColumnFamilyStore cfs)
    {
        return readers(lb, ub, cfs).toArray(new SSTableReader[0]);
    }

    private static List<SSTableReader> readers(int lb, int ub, ColumnFamilyStore cfs)
    {
        List<SSTableReader> readers = new ArrayList<>();
        for (int i = lb ; i < ub ; i++)
            readers.add(MockSchema.sstable(i, i, true, cfs));
        return copyOf(readers);
    }

    private static void update(LifecycleTransaction txn, Iterable<SSTableReader> readers, boolean originals)
    {
        for (SSTableReader reader : readers)
            txn.update(reader, originals);
    }

    private static void obsolete(LifecycleTransaction txn, Iterable<SSTableReader> readers)
    {
        for (SSTableReader reader : readers)
            txn.obsolete(reader);
    }
}
