/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.cassandra.hints;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.marshal.UUIDType;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.util.DataInputBuffer;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.serializers.MarshalException;
import org.apache.cassandra.utils.FBUtilities;

/**
 * A migrator that goes through the legacy system.hints table and writes all the hints to the new hints storage format.
 */
@SuppressWarnings("deprecation")
public final class LegacyHintsMigrator
{
    private static final Logger logger = LoggerFactory.getLogger(LegacyHintsMigrator.class);

    private final File hintsDirectory;
    private final long maxHintsFileSize;

    private final ColumnFamilyStore legacyHintsTable;
    private final int pageSize;

    public LegacyHintsMigrator(File hintsDirectory, long maxHintsFileSize)
    {
        this.hintsDirectory = hintsDirectory;
        this.maxHintsFileSize = maxHintsFileSize;

        legacyHintsTable = Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.LEGACY_HINTS);
        pageSize = calculatePageSize(legacyHintsTable);
    }

    // read fewer columns (mutations) per page if they are very large
    private static int calculatePageSize(ColumnFamilyStore legacyHintsTable)
    {
        int size = 128;

        int meanCellCount = legacyHintsTable.getMeanColumns();
        double meanPartitionSize = legacyHintsTable.getMeanPartitionSize();

        if (meanCellCount != 0 && meanPartitionSize != 0)
        {
            int avgHintSize = (int) meanPartitionSize / meanCellCount;
            size = Math.max(2, Math.min(size, (512 << 10) / avgHintSize));
        }

        return size;
    }

    public void migrate()
    {
        // nothing to migrate
        if (legacyHintsTable.isEmpty())
            return;
        logger.info("Migrating legacy hints to new storage");

        // major-compact all of the existing sstables to get rid of the tombstones + expired hints
        logger.info("Forcing a major compaction of {}.{} table", SystemKeyspace.NAME, SystemKeyspace.LEGACY_HINTS);
        compactLegacyHints();

        // paginate over legacy hints and write them to the new storage
        logger.info("Writing legacy hints to the new storage");
        migrateLegacyHints();

        // truncate the legacy hints table
        logger.info("Truncating {}.{} table", SystemKeyspace.NAME, SystemKeyspace.LEGACY_HINTS);
        legacyHintsTable.truncateBlocking();
    }

    private void compactLegacyHints()
    {
        Collection<Descriptor> descriptors = new ArrayList<>();
        legacyHintsTable.getTracker().getUncompacting().forEach(sstable -> descriptors.add(sstable.descriptor));
        if (!descriptors.isEmpty())
            forceCompaction(descriptors);
    }

    private void forceCompaction(Collection<Descriptor> descriptors)
    {
        try
        {
            CompactionManager.instance.submitUserDefined(legacyHintsTable, descriptors, FBUtilities.nowInSeconds()).get();
        }
        catch (InterruptedException | ExecutionException e)
        {
            throw new RuntimeException(e);
        }
    }

    private void migrateLegacyHints()
    {
        ByteBuffer buffer = ByteBuffer.allocateDirect(256 * 1024);
        String query = String.format("SELECT DISTINCT target_id FROM %s.%s", SystemKeyspace.NAME, SystemKeyspace.LEGACY_HINTS);
        //noinspection ConstantConditions
        QueryProcessor.executeInternal(query).forEach(row -> migrateLegacyHints(row.getUUID("target_id"), buffer));
        FileUtils.clean(buffer);
    }

    private void migrateLegacyHints(UUID hostId, ByteBuffer buffer)
    {
        String query = String.format("SELECT target_id, hint_id, message_version, mutation, ttl(mutation) AS ttl, writeTime(mutation) AS write_time " +
                                     "FROM %s.%s " +
                                     "WHERE target_id = ?",
                                     SystemKeyspace.NAME,
                                     SystemKeyspace.LEGACY_HINTS);

        // read all the old hints (paged iterator), write them in the new format
        UntypedResultSet rows = QueryProcessor.executeInternalWithPaging(query, pageSize, hostId);
        migrateLegacyHints(hostId, rows, buffer);

        // delete the whole partition in the legacy table; we would truncate the whole table afterwards, but this allows
        // to not lose progress in case of a terminated conversion
        deleteLegacyHintsPartition(hostId);
    }

    private void migrateLegacyHints(UUID hostId, UntypedResultSet rows, ByteBuffer buffer)
    {
        migrateLegacyHints(hostId, rows.iterator(), buffer);
    }

    private void migrateLegacyHints(UUID hostId, Iterator<UntypedResultSet.Row> iterator, ByteBuffer buffer)
    {
        do
        {
            migrateLegacyHintsInternal(hostId, iterator, buffer);
            // if there are hints that didn't fit in the previous file, keep calling the method to write to a new
            // file until we get everything written.
        }
        while (iterator.hasNext());
    }

    private void migrateLegacyHintsInternal(UUID hostId, Iterator<UntypedResultSet.Row> iterator, ByteBuffer buffer)
    {
        HintsDescriptor descriptor = new HintsDescriptor(hostId, System.currentTimeMillis());

        try (HintsWriter writer = HintsWriter.create(hintsDirectory, descriptor))
        {
            try (HintsWriter.Session session = writer.newSession(buffer))
            {
                while (iterator.hasNext())
                {
                    Hint hint = convertLegacyHint(iterator.next());
                    if (hint != null)
                        session.append(hint);

                    if (session.position() >= maxHintsFileSize)
                        break;
                }
            }
        }
        catch (IOException e)
        {
            throw new FSWriteError(e, descriptor.fileName());
        }
    }

    private static Hint convertLegacyHint(UntypedResultSet.Row row)
    {
        Mutation mutation = deserializeLegacyMutation(row);
        if (mutation == null)
            return null;

        long creationTime = row.getLong("write_time"); // milliseconds, not micros, for the hints table
        int expirationTime = FBUtilities.nowInSeconds() + row.getInt("ttl");
        int originalGCGS = expirationTime - (int) TimeUnit.MILLISECONDS.toSeconds(creationTime);

        int gcgs = Math.min(originalGCGS, mutation.smallestGCGS());

        return Hint.create(mutation, creationTime, gcgs);
    }

    private static Mutation deserializeLegacyMutation(UntypedResultSet.Row row)
    {
        try (DataInputBuffer dib = new DataInputBuffer(row.getBlob("mutation"), true))
        {
            Mutation mutation = Mutation.serializer.deserialize(dib,
                                                                row.getInt("message_version"));
            mutation.getPartitionUpdates().forEach(PartitionUpdate::validate);
            return mutation;
        }
        catch (IOException e)
        {
            logger.error("Failed to migrate a hint for {} from legacy {}.{} table: {}",
                         row.getUUID("target_id"),
                         SystemKeyspace.NAME,
                         SystemKeyspace.LEGACY_HINTS,
                         e);
            return null;
        }
        catch (MarshalException e)
        {
            logger.warn("Failed to validate a hint for {} (table id {}) from legacy {}.{} table - skipping: {})",
                        row.getUUID("target_id"),
                        SystemKeyspace.NAME,
                        SystemKeyspace.LEGACY_HINTS,
                        e);
            return null;
        }
    }

    private static void deleteLegacyHintsPartition(UUID hostId)
    {
        // intentionally use millis, like the rest of the legacy implementation did, just in case
        Mutation mutation = new Mutation(PartitionUpdate.fullPartitionDelete(SystemKeyspace.LegacyHints,
                                                                             UUIDType.instance.decompose(hostId),
                                                                             System.currentTimeMillis(),
                                                                             FBUtilities.nowInSeconds()));
        mutation.applyUnsafe();
    }
}
