/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.table.distributed;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.function.BiPredicate;
import java.util.function.Consumer;
import org.apache.ignite.configuration.internal.ConfigurationManager;
import org.apache.ignite.configuration.schemas.table.TableChange;
import org.apache.ignite.configuration.schemas.table.TableView;
import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
import org.apache.ignite.internal.affinity.AffinityManager;
import org.apache.ignite.internal.affinity.event.AffinityEvent;
import org.apache.ignite.internal.affinity.event.AffinityEventParameters;
import org.apache.ignite.internal.manager.Producer;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.schema.SchemaManager;
import org.apache.ignite.internal.schema.SchemaRegistry;
import org.apache.ignite.internal.schema.event.SchemaEvent;
import org.apache.ignite.internal.schema.event.SchemaEventParameters;
import org.apache.ignite.internal.table.TableImpl;
import org.apache.ignite.internal.table.distributed.raft.PartitionCommandListener;
import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
import org.apache.ignite.internal.table.event.TableEvent;
import org.apache.ignite.internal.table.event.TableEventParameters;
import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.internal.metastorage.client.Conditions;
import org.apache.ignite.internal.metastorage.client.Operations;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.raft.client.service.RaftGroupService;
import org.apache.ignite.table.Table;
import org.apache.ignite.table.manager.IgniteTables;
import org.jetbrains.annotations.NotNull;

/**
 * Table manager.
 */
public class TableManager extends Producer<TableEvent, TableEventParameters> implements IgniteTables {
    /** The logger. */
    private static final IgniteLogger LOG = IgniteLogger.forClass(TableManager.class);

    /** Internal prefix for the metasorage. */
    private static final String INTERNAL_PREFIX = "internal.tables.";

    /** Meta storage service. */
    private final MetaStorageManager metaStorageMgr;

    /** Configuration manager. */
    private final ConfigurationManager configurationMgr;

    /** Raft manmager. */
    private final Loza raftMgr;

    /** Schema manager. */
    private final SchemaManager schemaMgr;

    /** Affinity manager. */
    private final AffinityManager affMgr;

    /** Tables. */
    private final Map<String, TableImpl> tables = new ConcurrentHashMap<>();

    /**
     * Creates a new table manager.
     *
     * @param configurationMgr Configuration manager.
     * @param metaStorageMgr Meta storage manager.
     * @param schemaMgr Schema manager.
     * @param affMgr Affinity manager.
     * @param raftMgr Raft manager.
     * @param vaultManager Vault manager.
     */
    public TableManager(
        ConfigurationManager configurationMgr,
        MetaStorageManager metaStorageMgr,
        SchemaManager schemaMgr,
        AffinityManager affMgr,
        Loza raftMgr,
        VaultManager vaultManager
    ) {
        this.configurationMgr = configurationMgr;
        this.metaStorageMgr = metaStorageMgr;
        this.affMgr = affMgr;
        this.raftMgr = raftMgr;
        this.schemaMgr = schemaMgr;

        listenForTableChange();
    }

    /**
     * Creates local structures for a table.
     *
     * @param name Table name.
     * @param tblId Table id.
     * @param assignment Affinity assignment.
     * @param schemaReg Schema registry for the table.
     */
    private void createTableLocally(
        String name,
        UUID tblId,
        List<List<ClusterNode>> assignment,
        SchemaRegistry schemaReg
    ) {
        int partitions = assignment.size();

        HashMap<Integer, RaftGroupService> partitionMap = new HashMap<>(partitions);

        for (int p = 0; p < partitions; p++) {
            partitionMap.put(p, raftMgr.startRaftGroup(
                raftGroupName(tblId, p),
                assignment.get(p),
                new PartitionCommandListener()
            ));
        }

        InternalTableImpl internalTable = new InternalTableImpl(tblId, partitionMap, partitions);

        tables.put(name, new TableImpl(internalTable, schemaReg));

        onEvent(TableEvent.CREATE, new TableEventParameters(
            tblId,
            name,
            schemaReg,
            internalTable
        ), null);
    }

    /**
     * Drops local structures for a table.
     *
     * @param name Table name.
     * @param tblId Table id.
     * @param assignment Affinity assignment.
     */
    private void dropTableLocally(String name, UUID tblId, List<List<ClusterNode>> assignment) {
        int partitions = assignment.size();

        for (int p = 0; p < partitions; p++)
            raftMgr.stopRaftGroup(raftGroupName(tblId, p), assignment.get(p));

        TableImpl table = tables.get(name);

        assert table != null : "There is no table with the name specified [name=" + name + ']';

        onEvent(TableEvent.DROP, new TableEventParameters(
            tblId,
            name,
            table.schemaView(),
            table.internalTable()
        ), null);
    }

    /**
     * Compounds a RAFT group unique name.
     *
     * @param tableId Table identifier.
     * @param partition Muber of table partition.
     * @return A RAFT group name.
     */
    @NotNull private String raftGroupName(UUID tableId, int partition) {
        return tableId + "_part_" + partition;
    }

    /**
     * Listens on a drop or create table.
     */
    private void listenForTableChange() {
        //TODO: IGNITE-14652 Change a metastorage update in listeners to multi-invoke
        configurationMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY).tables().listen(ctx -> {
            Set<String> tablesToStart = (ctx.newValue() == null || ctx.newValue().namedListKeys() == null) ?
                Collections.emptySet() : new HashSet<>(ctx.newValue().namedListKeys());

            tablesToStart.removeAll(ctx.oldValue().namedListKeys());

            long revision = ctx.storageRevision();

            List<CompletableFuture<Boolean>> futs = new ArrayList<>();

            boolean hasMetastorageLocally = MetaStorageManager.hasMetastorageLocally(configurationMgr);

            for (String tblName : tablesToStart) {
                TableView tableView = ctx.newValue().get(tblName);

                UUID tblId = new UUID(revision, 0L);

                if (hasMetastorageLocally) {
                    var key = new ByteArray(INTERNAL_PREFIX + tblId);
                    futs.add(metaStorageMgr.invoke(
                        Conditions.notExists(key),
                        Operations.put(key, tableView.name().getBytes(StandardCharsets.UTF_8)),
                        Operations.noop())
                        .thenCompose(res -> schemaMgr.initSchemaForTable(tblId, tableView.name()))
                        .thenCompose(res -> affMgr.calculateAssignments(tblId, tableView.name())));
                }

                final CompletableFuture<AffinityEventParameters> affinityReadyFut = new CompletableFuture<>();
                final CompletableFuture<SchemaEventParameters> schemaReadyFut = new CompletableFuture<>();

                CompletableFuture.allOf(affinityReadyFut, schemaReadyFut)
                    .exceptionally(e -> {
                        LOG.error("Failed to create a new table [name=" + tblName + ", id=" + tblId + ']', e);

                        onEvent(TableEvent.CREATE, new TableEventParameters(
                            tblId,
                            tblName,
                            null,
                            null
                        ), e);

                        return null;
                    })
                    .thenRun(() -> createTableLocally(
                        tblName,
                        tblId,
                        affinityReadyFut.join().assignment(),
                        schemaReadyFut.join().schemaRegistry()
                    ));

                affMgr.listen(AffinityEvent.CALCULATED, (parameters, e) -> {
                    if (!tblId.equals(parameters.tableId()))
                        return false;

                    if (e == null)
                        affinityReadyFut.complete(parameters);
                    else
                        affinityReadyFut.completeExceptionally(e);

                    return true;
                });

                schemaMgr.listen(SchemaEvent.INITIALIZED, (parameters, e) -> {
                    if (!tblId.equals(parameters.tableId()))
                        return false;

                    if (e == null)
                        schemaReadyFut.complete(parameters);
                    else
                        schemaReadyFut.completeExceptionally(e);

                    return true;
                });
            }

            Set<String> tablesToStop = (ctx.oldValue() == null || ctx.oldValue().namedListKeys() == null) ?
                Collections.emptySet() : new HashSet<>(ctx.oldValue().namedListKeys());

            tablesToStop.removeAll(ctx.newValue().namedListKeys());

            for (String tblName : tablesToStop) {
                TableImpl t = tables.get(tblName);

                UUID tblId = t.internalTable().tableId();

                if (hasMetastorageLocally) {
                    var key = new ByteArray(INTERNAL_PREFIX + tblId);

                    futs.add(affMgr.removeAssignment(tblId)
                        .thenCompose(res -> schemaMgr.unregisterSchemas(tblId))
                        .thenCompose(res ->
                            metaStorageMgr.invoke(Conditions.exists(key),
                                Operations.remove(key),
                                Operations.noop())));
                }

                affMgr.listen(AffinityEvent.REMOVED, (parameters, e) -> {
                    if (!tblId.equals(parameters.tableId()))
                        return false;

                    if (e == null)
                        dropTableLocally(tblName, tblId, parameters.assignment());
                    else {
                        LOG.error("Failed to drop a table [name=" + tblName + ", id=" + tblId + ']', e);

                        onEvent(TableEvent.DROP, new TableEventParameters(
                            tblId,
                            tblName,
                            null,
                            null
                        ), e);
                    }

                    return true;
                });
            }

            return CompletableFuture.allOf(futs.toArray(CompletableFuture[]::new));
        });
    }

    /** {@inheritDoc} */
    @Override public Table createTable(String name, Consumer<TableChange> tableInitChange) {
        CompletableFuture<Table> tblFut = new CompletableFuture<>();

        listen(TableEvent.CREATE, (params, e) -> {
            String tableName = params.tableName();

            if (!name.equals(tableName))
                return false;

            if (e == null) {
                tblFut.complete(tables.get(name));
            }
            else
                tblFut.completeExceptionally(e);

            return true;
        });

        try {
            configurationMgr.configurationRegistry()
                .getConfiguration(TablesConfiguration.KEY).tables().change(change ->
                change.create(name, tableInitChange)).get();
        }
        catch (InterruptedException | ExecutionException e) {
            LOG.error("Table wasn't created [name=" + name + ']', e);

            tblFut.completeExceptionally(e);
        }

        return tblFut.join();
    }

    /** {@inheritDoc} */
    @Override public void dropTable(String name) {
        CompletableFuture<Void> dropTblFut = new CompletableFuture<>();

        listen(TableEvent.DROP, new BiPredicate<>() {
            @Override public boolean test(TableEventParameters params, Throwable e) {
                String tableName = params.tableName();

                if (!name.equals(tableName))
                    return false;

                if (e == null) {
                    Table droppedTable = tables.remove(tableName);

                    assert droppedTable != null;

                    dropTblFut.complete(null);
                }
                else
                    dropTblFut.completeExceptionally(e);

                return true;
            }
        });

        try {
            configurationMgr
                .configurationRegistry()
                .getConfiguration(TablesConfiguration.KEY)
                .tables()
                .change(change -> change.delete(name)).get();
        }
        catch (InterruptedException | ExecutionException e) {
            LOG.error("Table wasn't dropped [name=" + name + ']', e);

            dropTblFut.completeExceptionally(e);
        }

        dropTblFut.join();
    }

    /** {@inheritDoc} */
    @Override public List<Table> tables() {
        return new ArrayList<>(tables.values());
    }

    /** {@inheritDoc} */
    @Override public Table table(String name) {
        return tables.get(name);
    }
}
