/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.cassandra.db;

import com.google.common.base.Charsets;

import org.apache.cassandra.OrderedJUnit4ClassRunner;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
import org.apache.cassandra.UpdateBuilder;
import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.compaction.Verifier;
import org.apache.cassandra.db.marshal.UUIDType;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.exceptions.WriteTimeoutException;
import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.CorruptSSTableException;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.schema.CompressionParams;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.commons.lang3.StringUtils;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;

import java.io.*;
import java.nio.file.Files;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.zip.CRC32;
import java.util.zip.CheckedInputStream;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

/**
 * Tests for {@link Verifier}: runs simple ({@code verify(false)}) and extended
 * ({@code verify(true)}) verification against freshly flushed sstables, both intact
 * and deliberately damaged (wrong digest, overwritten data bytes, overwritten stats file).
 *
 * Tests run in declaration order (see the {@link RunWith} runner). Several tests share
 * the {@link #CORRUPT_CF2} table, so the order of the test methods must be kept.
 */
@RunWith(OrderedJUnit4ClassRunner.class)
public class VerifyTest
{
    public static final String KEYSPACE = "Keyspace1";
    public static final String CF = "Standard1";
    public static final String CF2 = "Standard2";
    public static final String CF3 = "Standard3";
    public static final String CF4 = "Standard4";
    public static final String COUNTER_CF = "Counter1";
    public static final String COUNTER_CF2 = "Counter2";
    public static final String COUNTER_CF3 = "Counter3";
    public static final String COUNTER_CF4 = "Counter4";
    public static final String CORRUPT_CF = "Corrupt1";
    public static final String CORRUPT_CF2 = "Corrupt2";
    public static final String CORRUPTCOUNTER_CF = "CounterCorrupt1";
    public static final String CORRUPTCOUNTER_CF2 = "CounterCorrupt2";

    public static final String CF_UUID = "UUIDKeys";

    /**
     * Loads the test schema and creates {@link #KEYSPACE} with every table used below.
     * {@link #CF}, {@link #CF2}, {@link #COUNTER_CF} and {@link #COUNTER_CF2} are given
     * snappy compression parameters; the remaining tables are created without explicit
     * compression parameters.
     */
    @BeforeClass
    public static void defineSchema() throws ConfigurationException
    {
        CompressionParams compressionParameters = CompressionParams.snappy(32768);

        SchemaLoader.loadSchema();
        SchemaLoader.createKeyspace(KEYSPACE,
                                    KeyspaceParams.simple(1),
                                    SchemaLoader.standardCFMD(KEYSPACE, CF).compression(compressionParameters),
                                    SchemaLoader.standardCFMD(KEYSPACE, CF2).compression(compressionParameters),
                                    SchemaLoader.standardCFMD(KEYSPACE, CF3),
                                    SchemaLoader.standardCFMD(KEYSPACE, CF4),
                                    SchemaLoader.standardCFMD(KEYSPACE, CORRUPT_CF),
                                    SchemaLoader.standardCFMD(KEYSPACE, CORRUPT_CF2),
                                    SchemaLoader.counterCFMD(KEYSPACE, COUNTER_CF).compression(compressionParameters),
                                    SchemaLoader.counterCFMD(KEYSPACE, COUNTER_CF2).compression(compressionParameters),
                                    SchemaLoader.counterCFMD(KEYSPACE, COUNTER_CF3),
                                    SchemaLoader.counterCFMD(KEYSPACE, COUNTER_CF4),
                                    SchemaLoader.counterCFMD(KEYSPACE, CORRUPTCOUNTER_CF),
                                    SchemaLoader.counterCFMD(KEYSPACE, CORRUPTCOUNTER_CF2),
                                    SchemaLoader.standardCFMD(KEYSPACE, CF_UUID, 0, UUIDType.instance));
    }


    /** Simple verify of an intact sstable of {@link #CF} must not report corruption. */
    @Test
    public void testVerifyCorrect() throws IOException
    {
        ColumnFamilyStore cfs = prepareStore(CF);
        fillCF(cfs, 2);
        verifyExpectingNoCorruption(cfs, false);
    }

    /** Simple verify of an intact sstable of {@link #COUNTER_CF} must not report corruption. */
    @Test
    public void testVerifyCounterCorrect() throws IOException
    {
        ColumnFamilyStore cfs = prepareStore(COUNTER_CF);
        fillCounterCF(cfs, 2);
        verifyExpectingNoCorruption(cfs, false);
    }

    /** Extended verify of an intact sstable of {@link #CF2} must not report corruption. */
    @Test
    public void testExtendedVerifyCorrect() throws IOException
    {
        ColumnFamilyStore cfs = prepareStore(CF2);
        fillCF(cfs, 2);
        verifyExpectingNoCorruption(cfs, true);
    }

    /** Extended verify of an intact sstable of {@link #COUNTER_CF2} must not report corruption. */
    @Test
    public void testExtendedVerifyCounterCorrect() throws IOException
    {
        ColumnFamilyStore cfs = prepareStore(COUNTER_CF2);
        fillCounterCF(cfs, 2);
        verifyExpectingNoCorruption(cfs, true);
    }

    /** Simple verify of an intact sstable of {@link #CF3} must not report corruption. */
    @Test
    public void testVerifyCorrectUncompressed() throws IOException
    {
        ColumnFamilyStore cfs = prepareStore(CF3);
        fillCF(cfs, 2);
        verifyExpectingNoCorruption(cfs, false);
    }

    /** Simple verify of an intact sstable of {@link #COUNTER_CF3} must not report corruption. */
    @Test
    public void testVerifyCounterCorrectUncompressed() throws IOException
    {
        ColumnFamilyStore cfs = prepareStore(COUNTER_CF3);
        fillCounterCF(cfs, 2);
        verifyExpectingNoCorruption(cfs, false);
    }

    /** Extended verify of an intact sstable of {@link #CF4} must not report corruption. */
    @Test
    public void testExtendedVerifyCorrectUncompressed() throws IOException
    {
        ColumnFamilyStore cfs = prepareStore(CF4);
        fillCF(cfs, 2);
        verifyExpectingNoCorruption(cfs, true);
    }

    /** Extended verify of an intact sstable of {@link #COUNTER_CF4} must not report corruption. */
    @Test
    public void testExtendedVerifyCounterCorrectUncompressed() throws IOException
    {
        ColumnFamilyStore cfs = prepareStore(COUNTER_CF4);
        fillCounterCF(cfs, 2);
        verifyExpectingNoCorruption(cfs, true);
    }


    /**
     * Rewrites the digest file with a checksum that is off by one and expects a simple
     * verify to throw {@link CorruptSSTableException}.
     */
    @Test
    public void testVerifyIncorrectDigest() throws IOException, WriteTimeoutException
    {
        ColumnFamilyStore cfs = prepareStore(CORRUPT_CF);

        fillCF(cfs, 2);

        Util.getAll(Util.cmd(cfs).build());

        SSTableReader sstable = cfs.getLiveSSTables().iterator().next();

        String digestFile = digestPath(sstable);
        long correctChecksum = readDigest(digestFile);
        writeChecksum(correctChecksum + 1, digestFile);

        try (Verifier verifier = new Verifier(cfs, sstable, false))
        {
            verifier.verify(false);
            fail("Expected a CorruptSSTableException to be thrown");
        }
        catch (CorruptSSTableException expected)
        {
            // the mismatching digest is exactly what this test wants detected
        }
    }


    /**
     * Overwrites the first bytes of the earliest partition in the data file, then rewrites
     * the digest so it matches the damaged file. A simple verify must pass, whereas an
     * extended verify must throw {@link CorruptSSTableException}.
     */
    @Test
    public void testVerifyCorruptRowCorrectDigest() throws IOException, WriteTimeoutException
    {
        ColumnFamilyStore cfs = prepareStore(CORRUPT_CF2);

        fillCF(cfs, 2);

        Util.getAll(Util.cmd(cfs).build());

        SSTableReader sstable = cfs.getLiveSSTables().iterator().next();

        // overwrite the start of whichever of the two partitions comes first in the file
        long row0Start = sstable.getPosition(PartitionPosition.ForKey.get(ByteBufferUtil.bytes("0"), cfs.getPartitioner()), SSTableReader.Operator.EQ).position;
        long row1Start = sstable.getPosition(PartitionPosition.ForKey.get(ByteBufferUtil.bytes("1"), cfs.getPartitioner()), SSTableReader.Operator.EQ).position;
        overwriteWithGarbage(sstable.getFilename(), Math.min(row0Start, row1Start));

        // Update the Digest to have the right Checksum
        writeChecksum(simpleFullChecksum(sstable.getFilename()), digestPath(sstable));

        try (Verifier verifier = new Verifier(cfs, sstable, false))
        {
            // First a simple verify checking digest, which should succeed
            try
            {
                verifier.verify(false);
            }
            catch (CorruptSSTableException err)
            {
                throw new AssertionError("Simple verify should have succeeded as digest matched", err);
            }

            // Now try extended verify
            try
            {
                verifier.verify(true);
                fail("Expected a CorruptSSTableException to be thrown");
            }
            catch (CorruptSSTableException expected)
            {
                // the extended verify is expected to notice the overwritten bytes
            }
        }
    }

    /**
     * Overwrites the first two bytes of the {@link Component#STATS} file and expects a
     * simple verify to throw {@link CorruptSSTableException}.
     */
    @Test(expected = CorruptSSTableException.class)
    public void testVerifyBrokenSSTableMetadata() throws IOException, WriteTimeoutException
    {
        ColumnFamilyStore cfs = prepareStore(CORRUPT_CF2);

        fillCF(cfs, 2);

        Util.getAll(Util.cmd(cfs).build());

        SSTableReader sstable = cfs.getLiveSSTables().iterator().next();

        overwriteWithGarbage(sstable.descriptor.filenameFor(Component.STATS), 0);

        try (Verifier verifier = new Verifier(cfs, sstable, false))
        {
            verifier.verify(false);
        }
    }

    /**
     * Marks an sstable as repaired, major-compacts, then breaks the digest of the resulting
     * sstable. The failing verify is expected to leave that sstable no longer marked repaired.
     */
    @Test
    public void testMutateRepair() throws IOException, ExecutionException, InterruptedException
    {
        ColumnFamilyStore cfs = prepareStore(CORRUPT_CF2);

        fillCF(cfs, 2);

        SSTableReader sstable = cfs.getLiveSSTables().iterator().next();
        sstable.descriptor.getMetadataSerializer().mutateRepairedAt(sstable.descriptor, 1);
        sstable.reloadSSTableMetadata();
        cfs.getTracker().notifySSTableRepairedStatusChanged(Collections.singleton(sstable));
        assertTrue(sstable.isRepaired());
        cfs.forceMajorCompaction();

        sstable = cfs.getLiveSSTables().iterator().next();
        String digestFile = digestPath(sstable);
        long correctChecksum = readDigest(digestFile);
        writeChecksum(correctChecksum + 1, digestFile);
        try (Verifier verifier = new Verifier(cfs, sstable, false))
        {
            verifier.verify(false);
            fail("should be corrupt");
        }
        catch (CorruptSSTableException expected)
        {
            // the broken digest must be reported; the repaired flag is checked below
        }
        assertFalse(sstable.isRepaired());
    }


    /**
     * Writes {@code partitionsPerSSTable} partitions (keys "0", "1", ...), each with rows
     * "c1" and "c2", then flushes the store.
     *
     * @param cfs                  store to write to
     * @param partitionsPerSSTable number of partitions to write before flushing
     */
    protected void fillCF(ColumnFamilyStore cfs, int partitionsPerSSTable)
    {
        for (int i = 0; i < partitionsPerSSTable; i++)
        {
            UpdateBuilder.create(cfs.metadata, String.valueOf(i))
                         .newRow("c1").add("val", "1")
                         .newRow("c2").add("val", "2")
                         .apply();
        }

        cfs.forceBlockingFlush();
    }

    /**
     * Writes {@code partitionsPerSSTable} partitions (keys "0", "1", ...), each with a single
     * row "c1" whose "val" is {@code 100L}, then flushes the store.
     *
     * @param cfs                  store to write to
     * @param partitionsPerSSTable number of partitions to write before flushing
     */
    protected void fillCounterCF(ColumnFamilyStore cfs, int partitionsPerSSTable) throws WriteTimeoutException
    {
        for (int i = 0; i < partitionsPerSSTable; i++)
        {
            UpdateBuilder.create(cfs.metadata, String.valueOf(i))
                         .newRow("c1").add("val", 100L)
                         .apply();
        }

        cfs.forceBlockingFlush();
    }

    /**
     * Computes the CRC32 of the entire content of a file.
     *
     * @param filename path of the file to read
     * @return the CRC32 value over all bytes of the file
     * @throws IOException if the file cannot be opened or read
     */
    protected long simpleFullChecksum(String filename) throws IOException
    {
        // try-with-resources so the file handle is released even when reading fails
        try (FileInputStream inputStream = new FileInputStream(filename);
             CheckedInputStream cinStream = new CheckedInputStream(inputStream, new CRC32()))
        {
            byte[] b = new byte[128];
            while (cinStream.read(b) >= 0)
            {
                // reading is enough: the stream updates the checksum as bytes pass through
            }
            return cinStream.getChecksum().getValue();
        }
    }

    /**
     * Replaces the content of {@code filePath} with the decimal text of {@code checksum}.
     *
     * @param checksum value to write
     * @param filePath file to overwrite
     * @throws FSWriteError if writing or closing the file fails
     */
    protected void writeChecksum(long checksum, String filePath)
    {
        File outFile = new File(filePath);
        try (BufferedWriter out = Files.newBufferedWriter(outFile.toPath(), Charsets.UTF_8))
        {
            out.write(String.valueOf(checksum));
            out.flush();
        }
        catch (IOException e)
        {
            throw new FSWriteError(e, outFile);
        }
    }

    /** Disables auto compaction and returns the store for {@code cfName} in {@link #KEYSPACE}. */
    private static ColumnFamilyStore prepareStore(String cfName)
    {
        CompactionManager.instance.disableAutoCompaction();
        Keyspace keyspace = Keyspace.open(KEYSPACE);
        return keyspace.getColumnFamilyStore(cfName);
    }

    /**
     * Verifies the first live sstable of {@code cfs} and fails the test, keeping the cause,
     * if the verifier reports corruption.
     */
    private static void verifyExpectingNoCorruption(ColumnFamilyStore cfs, boolean extended) throws IOException
    {
        SSTableReader sstable = cfs.getLiveSSTables().iterator().next();

        try (Verifier verifier = new Verifier(cfs, sstable, false))
        {
            verifier.verify(extended);
        }
        catch (CorruptSSTableException err)
        {
            throw new AssertionError("Unexpected CorruptSSTableException", err);
        }
    }

    /** Returns the path of the digest component file of {@code sstable}. */
    private static String digestPath(SSTableReader sstable)
    {
        return sstable.descriptor.filenameFor(sstable.descriptor.digestComponent);
    }

    /** Parses the first line of the given digest file as a long checksum. */
    private static long readDigest(String digestFile) throws IOException
    {
        try (RandomAccessFile file = new RandomAccessFile(digestFile, "rw"))
        {
            return Long.parseLong(file.readLine());
        }
    }

    /** Overwrites two bytes of {@code filename}, starting at {@code position}, with 'z'. */
    private static void overwriteWithGarbage(String filename, long position) throws IOException
    {
        try (RandomAccessFile file = new RandomAccessFile(filename, "rw"))
        {
            file.seek(position);
            file.writeBytes(StringUtils.repeat('z', 2));
        }
    }

}
