/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.index;

import static java.util.concurrent.CompletableFuture.failedFuture;
import static java.util.concurrent.CompletableFuture.runAsync;
import static org.apache.ignite.internal.catalog.events.CatalogEvent.INDEX_CREATE;
import static org.apache.ignite.internal.catalog.events.CatalogEvent.INDEX_REMOVED;
import static org.apache.ignite.internal.event.EventListener.fromConsumer;
import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestampToLong;
import static org.apache.ignite.internal.lowwatermark.event.LowWatermarkEvent.LOW_WATERMARK_CHANGED;
import static org.apache.ignite.internal.table.distributed.index.IndexUtils.registerIndexToTable;
import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongFunction;
import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.catalog.events.CatalogEvent;
import org.apache.ignite.internal.catalog.events.CreateIndexEventParameters;
import org.apache.ignite.internal.catalog.events.RemoveIndexEventParameters;
import org.apache.ignite.internal.causality.IncrementalVersionedValue;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.lowwatermark.LowWatermark;
import org.apache.ignite.internal.lowwatermark.event.ChangeLowWatermarkEventParameters;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.schema.SchemaManager;
import org.apache.ignite.internal.schema.SchemaRegistry;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.storage.index.IndexStorage;
import org.apache.ignite.internal.table.LongPriorityQueue;
import org.apache.ignite.internal.table.TableViewInternal;
import org.apache.ignite.internal.table.distributed.PartitionSet;
import org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.jetbrains.annotations.Nullable;

/**
 * An Ignite component that is responsible for handling index-related commands like CREATE or DROP
 * as well as managing indexes' lifecycle.
 *
 * <p>To avoid errors when using indexes while applying replication log during node recovery, the registration of indexes was moved to the
 * start of the tables.</p>
 */
// TODO: IGNITE-19082 Delete this class
public class IndexManager implements IgniteComponent {
    private static final IgniteLogger LOG = Loggers.forClass(IndexManager.class);

    /** Schema manager. */
    private final SchemaManager schemaManager;

    /** Table manager. */
    private final TableManager tableManager;

    /** Catalog service. */
    private final CatalogService catalogService;

    /** Separate executor for IO operations like storage initialization. */
    private final ExecutorService ioExecutor;

    /** Busy lock to stop synchronously. */
    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();

    /** Prevents double stopping of the component. */
    private final AtomicBoolean stopGuard = new AtomicBoolean();

    /** Versioned value to prevent races when registering/unregistering indexes when processing metastore or catalog events. */
    private final IncrementalVersionedValue<Void> handleMetastoreEventVv;

    /** Low watermark. */
    private final LowWatermark lowWatermark;

    /** A queue for deferred index destruction events. */
    private final LongPriorityQueue<DestroyIndexEvent> destructionEventsQueue =
            new LongPriorityQueue<>(DestroyIndexEvent::catalogVersion);

    /**
     * Constructor.
     *
     * @param schemaManager Schema manager.
     * @param tableManager Table manager.
     * @param catalogService Catalog service.
     * @param ioExecutor Separate executor for IO operations like storage initialization.
     */
    public IndexManager(
            SchemaManager schemaManager,
            TableManager tableManager,
            CatalogService catalogService,
            ExecutorService ioExecutor,
            Consumer<LongFunction<CompletableFuture<?>>> registry,
            LowWatermark lowWatermark
    ) {
        this.schemaManager = schemaManager;
        this.tableManager = tableManager;
        this.catalogService = catalogService;
        this.ioExecutor = ioExecutor;
        this.lowWatermark = lowWatermark;

        handleMetastoreEventVv = new IncrementalVersionedValue<>(registry);
    }

    @Override
    public CompletableFuture<Void> startAsync() {
        LOG.debug("Index manager is about to start");

        recoverDestructionQueue();

        catalogService.listen(INDEX_CREATE, (CreateIndexEventParameters parameters) -> onIndexCreate(parameters));
        catalogService.listen(INDEX_REMOVED, fromConsumer(this::onIndexRemoved));
        lowWatermark.listen(LOW_WATERMARK_CHANGED, parameters -> onLwmChanged((ChangeLowWatermarkEventParameters) parameters));

        LOG.info("Index manager started");

        return nullCompletedFuture();
    }

    @Override
    public CompletableFuture<Void> stopAsync() {
        LOG.debug("Index manager is about to stop");

        if (!stopGuard.compareAndSet(false, true)) {
            LOG.debug("Index manager already stopped");

            return nullCompletedFuture();
        }

        busyLock.block();

        LOG.info("Index manager stopped");

        return nullCompletedFuture();
    }

    /**
     * Returns a multi-version table storage with created index storages by passed parameters.
     *
     * <p>Example: when we start building an index, we will need {@link IndexStorage} (as well as storage {@link MvPartitionStorage}) to
     * build it and we can get them in {@link CatalogEvent#INDEX_CREATE} using this method.</p>
     *
     * <p>During recovery, it is important to wait until the local node becomes a primary replica so that all index building commands are
     * applied from the replication log.</p>
     *
     * @param causalityToken Causality token.
     * @param tableId Table ID.
     * @return Future with multi-version table storage, completes with {@code null} if the table does not exist according to the passed
     *      parameters.
     */
    CompletableFuture<@Nullable MvTableStorage> getMvTableStorage(long causalityToken, int tableId) {
        return tableManager.tableAsync(causalityToken, tableId).thenApply(table -> table == null ? null : table.internalTable().storage());
    }

    private CompletableFuture<Boolean> onIndexCreate(CreateIndexEventParameters parameters) {
        return inBusyLockAsync(busyLock, () -> {
            CatalogIndexDescriptor index = parameters.indexDescriptor();

            int indexId = index.id();
            int tableId = index.tableId();

            long causalityToken = parameters.causalityToken();
            int catalogVersion = parameters.catalogVersion();

            CatalogTableDescriptor table = catalogService.table(tableId, catalogVersion);

            assert table != null : "tableId=" + tableId + ", indexId=" + indexId;

            if (LOG.isInfoEnabled()) {
                LOG.info(
                        "Creating local index: name={}, id={}, tableId={}, token={}",
                        index.name(), indexId, tableId, causalityToken
                );
            }

            return startIndexAsync(table, index, causalityToken).thenApply(unused -> false);
        });
    }

    private void onIndexRemoved(RemoveIndexEventParameters parameters) {
        inBusyLock(busyLock, () -> {
            int indexId = parameters.indexId();
            int catalogVersion = parameters.catalogVersion();
            int previousCatalogVersion = catalogVersion - 1;

            // Retrieve descriptor during synchronous call, before the previous catalog version could be concurrently compacted.
            CatalogIndexDescriptor indexDescriptor = catalogService.index(indexId, previousCatalogVersion);
            assert indexDescriptor != null : "indexId=" + indexId + ", catalogVersion=" + previousCatalogVersion;

            int tableId = indexDescriptor.tableId();

            if (catalogService.table(tableId, catalogVersion) == null) {
                // Nothing to do. Index will be destroyed along with the table.
                return;
            }

            destructionEventsQueue.enqueue(new DestroyIndexEvent(catalogVersion, indexId, tableId));
        });
    }

    private CompletableFuture<Boolean> onLwmChanged(ChangeLowWatermarkEventParameters parameters) {
        return inBusyLockAsync(busyLock, () -> {
            int newEarliestCatalogVersion = catalogService.activeCatalogVersion(parameters.newLowWatermark().longValue());

            List<DestroyIndexEvent> events = destructionEventsQueue.drainUpTo(newEarliestCatalogVersion);

            return runAsync(
                    () -> events.forEach(event -> destroyIndex(event.indexId(), event.tableId())),
                    ioExecutor
            ).thenApply(unused -> false);
        });
    }

    private void destroyIndex(int indexId, int tableId) {
        TableViewInternal table = tableManager.cachedTable(tableId);

        if (table != null) {
            // In case of DROP TABLE the table will be removed with all its indexes.
            table.unregisterIndex(indexId);
        }
    }

    private CompletableFuture<?> startIndexAsync(
            CatalogTableDescriptor table,
            CatalogIndexDescriptor index,
            long causalityToken
    ) {
        int tableId = index.tableId();

        CompletableFuture<PartitionSet> tablePartitionFuture = tableManager.localPartitionSetAsync(causalityToken, tableId);

        CompletableFuture<SchemaRegistry> schemaRegistryFuture = schemaManager.schemaRegistry(causalityToken, tableId);

        return handleMetastoreEventVv.update(
                causalityToken,
                updater(mvTableStorageById -> tablePartitionFuture.thenCombineAsync(schemaRegistryFuture,
                        (partitionSet, schemaRegistry) -> inBusyLock(busyLock, () -> {
                            registerIndex(table, index, partitionSet, schemaRegistry);

                            return null;
                        }), ioExecutor))
        );
    }

    private void registerIndex(
            CatalogTableDescriptor table,
            CatalogIndexDescriptor index,
            PartitionSet partitionSet,
            SchemaRegistry schemaRegistry
    ) {
        TableViewInternal tableView = getTableViewStrict(table.id());

        registerIndexToTable(tableView, table, index, partitionSet, schemaRegistry);
    }

    private TableViewInternal getTableViewStrict(int tableId) {
        TableViewInternal table = tableManager.cachedTable(tableId);

        assert table != null : tableId;

        return table;
    }

    private static <T> BiFunction<T, Throwable, CompletableFuture<T>> updater(Function<T, CompletableFuture<T>> updateFunction) {
        return (t, throwable) -> {
            if (throwable != null) {
                return failedFuture(throwable);
            }

            return updateFunction.apply(t);
        };
    }

    /** Recover deferred destroy events. */
    private void recoverDestructionQueue() {
        // LWM starts updating only after the node is restored.
        HybridTimestamp lwm = lowWatermark.getLowWatermark();

        int earliestCatalogVersion = catalogService.activeCatalogVersion(hybridTimestampToLong(lwm));
        int latestCatalogVersion = catalogService.latestCatalogVersion();

        for (int catalogVersion = latestCatalogVersion - 1; catalogVersion >= earliestCatalogVersion; catalogVersion--) {
            int nextVersion = catalogVersion + 1;
            catalogService.indexes(catalogVersion).stream()
                    .filter(idx -> catalogService.index(idx.id(), nextVersion) == null)
                    .forEach(idx -> destructionEventsQueue.enqueue(new DestroyIndexEvent(nextVersion, idx.id(), idx.tableId())));
        }
    }

    /** Internal event. */
    private static class DestroyIndexEvent {
        final int catalogVersion;
        final int indexId;
        final int tableId;

        DestroyIndexEvent(int catalogVersion, int indexId, int tableId) {
            this.catalogVersion = catalogVersion;
            this.indexId = indexId;
            this.tableId = tableId;
        }

        public int catalogVersion() {
            return catalogVersion;
        }

        public int indexId() {
            return indexId;
        }

        public int tableId() {
            return tableId;
        }
    }
}
