/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.table;

import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.configuration.schemas.runner.ClusterConfiguration;
import org.apache.ignite.configuration.schemas.runner.NodeConfiguration;
import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
import org.apache.ignite.internal.affinity.AffinityUtils;
import org.apache.ignite.internal.baseline.BaselineManager;
import org.apache.ignite.internal.configuration.ConfigurationManager;
import org.apache.ignite.internal.configuration.schema.ExtendedTableConfigurationSchema;
import org.apache.ignite.internal.configuration.storage.TestConfigurationStorage;
import org.apache.ignite.internal.configuration.tree.NamedListNode;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.client.Entry;
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.SchemaUtils;
import org.apache.ignite.internal.schema.configuration.SchemaConfigurationConverter;
import org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.lang.NodeStoppingException;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.schema.ColumnType;
import org.apache.ignite.schema.SchemaBuilders;
import org.apache.ignite.schema.SchemaTable;
import org.apache.ignite.table.Table;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.MockedStatic;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;

import static org.apache.ignite.configuration.annotation.ConfigurationType.DISTRIBUTED;
import static org.apache.ignite.configuration.annotation.ConfigurationType.LOCAL;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.when;

/**
 * Tests scenarios for table manager.
 */
@ExtendWith({MockitoExtension.class, WorkDirectoryExtension.class})
@MockitoSettings(strictness = Strictness.LENIENT)
public class TableManagerTest {
    /** The logger. */
    private static final IgniteLogger LOG = IgniteLogger.forClass(TableManagerTest.class);

    /** Public prefix for metastorage. */
    private static final String PUBLIC_PREFIX = "dst-cfg.table.tables.";

    /** The name of the table which is statically configured. */
    private static final String STATIC_TABLE_NAME = "t1";

    /** The name of the table which will be configured dynamically. */
    private static final String DYNAMIC_TABLE_NAME = "t2";

    /** The name of table to drop it. */
    private static final String DYNAMIC_TABLE_FOR_DROP_NAME = "t3";

    /** Table partitions. */
    private static final int PARTITIONS = 32;

    /** Node name. */
    private static final String NODE_NAME = "node1";

    /** Node configuration manager. */
    private ConfigurationManager nodeCfgMgr;

    /** Cluster configuration manager. */
    private ConfigurationManager clusterCfgMgr;

    /** MetaStorage manager. */
    @Mock(lenient = true)
    private MetaStorageManager mm;

    /** Schema manager. */
    @Mock(lenient = true)
    private BaselineManager bm;

    /** Raft manager. */
    @Mock(lenient = true)
    private Loza rm;

    @WorkDirectory
    private Path workDir;

    /** Test node. */
    private final ClusterNode node = new ClusterNode(
        UUID.randomUUID().toString(),
        NODE_NAME,
        new NetworkAddress("127.0.0.1", 2245)
    );

    /** Before all test scenarios. */
    @BeforeEach
    void setUp() {
        try {
            nodeCfgMgr = new ConfigurationManager(
                List.of(NodeConfiguration.KEY),
                Map.of(),
                new TestConfigurationStorage(LOCAL),
                List.of()
            );

            clusterCfgMgr = new ConfigurationManager(
                List.of(ClusterConfiguration.KEY, TablesConfiguration.KEY),
                Map.of(),
                new TestConfigurationStorage(DISTRIBUTED),
                Collections.singletonList(ExtendedTableConfigurationSchema.class)
            );

            nodeCfgMgr.start();
            clusterCfgMgr.start();

            nodeCfgMgr.bootstrap("{\n" +
                "   \"node\":{\n" +
                "      \"metastorageNodes\":[\n" +
                "         \"" + NODE_NAME + "\"\n" +
                "      ]\n" +
                "   }\n" +
                "}");
        }
        catch (Exception e) {
            LOG.error("Failed to bootstrap the test configuration manager.", e);

            fail("Failed to configure manager [err=" + e.getMessage() + ']');
        }
    }

    /** Stop configuration manager. */
    @AfterEach
    void tearDown() {
        nodeCfgMgr.stop();
        clusterCfgMgr.stop();
    }

    /**
     * Tests a table which was defined before start through bootstrap configuration.
     */
    @Disabled("https://issues.apache.org/jira/browse/IGNITE-15255")
    @Test
    public void testStaticTableConfigured() {
        TableManager tableManager = new TableManager(
            clusterCfgMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY),
            rm,
            bm,
            mm,
            workDir
        );

        assertEquals(1, tableManager.tables().size());

        assertNotNull(tableManager.table(STATIC_TABLE_NAME));
    }

    /**
     * Tests create a table through public API.
     */
    @Test
    public void testCreateTable() {
        CompletableFuture<TableManager> tblManagerFut = new CompletableFuture<>();

        SchemaTable scmTbl = SchemaBuilders.tableBuilder("PUBLIC", DYNAMIC_TABLE_NAME).columns(
            SchemaBuilders.column("key", ColumnType.INT64).asNonNull().build(),
            SchemaBuilders.column("val", ColumnType.INT64).asNullable().build()
        ).withPrimaryKey("key").build();

        Table table = mockManagersAndCreateTable(scmTbl, tblManagerFut);

        assertNotNull(table);

        assertSame(table, tblManagerFut.join().table(scmTbl.canonicalName()));
    }

    /**
     * Tests drop a table through public API.
     */
    @Test
    public void testDropTable() {
        CompletableFuture<TableManager> tblManagerFut = new CompletableFuture<>();

        SchemaTable scmTbl = SchemaBuilders.tableBuilder("PUBLIC", DYNAMIC_TABLE_FOR_DROP_NAME).columns(
            SchemaBuilders.column("key", ColumnType.INT64).asNonNull().build(),
            SchemaBuilders.column("val", ColumnType.INT64).asNullable().build()
        ).withPrimaryKey("key").build();

        mockManagersAndCreateTable(scmTbl, tblManagerFut);

        TableManager tableManager = tblManagerFut.join();

        tableManager.dropTable(scmTbl.canonicalName());

        assertNull(tableManager.table(scmTbl.canonicalName()));

        assertEquals(0, tableManager.tables().size());
    }

    /**
     * Instantiates a table and prepares Table manager.
     */
    @Test
    public void testGetTableDuringCreation() throws Exception {
        CompletableFuture<TableManager> tblManagerFut = new CompletableFuture<>();

        SchemaTable scmTbl = SchemaBuilders.tableBuilder("PUBLIC", DYNAMIC_TABLE_FOR_DROP_NAME).columns(
            SchemaBuilders.column("key", ColumnType.INT64).asNonNull().build(),
            SchemaBuilders.column("val", ColumnType.INT64).asNullable().build()
        ).withPrimaryKey("key").build();

        Phaser phaser = new Phaser(2);

        CompletableFuture<Table> createFut = CompletableFuture.supplyAsync(() ->
            mockManagersAndCreateTableWithDelay(scmTbl, tblManagerFut, phaser)
        );

        CompletableFuture<Table> getFut = CompletableFuture.supplyAsync(() -> {
            phaser.awaitAdvance(0);

            return tblManagerFut.join().table(scmTbl.canonicalName());
        });

        CompletableFuture<Collection<Table>> getAllTablesFut = CompletableFuture.supplyAsync(() -> {
            phaser.awaitAdvance(0);

            return tblManagerFut.join().tables();
        });

        assertFalse(createFut.isDone());
        assertFalse(getFut.isDone());
        assertFalse(getAllTablesFut.isDone());

        phaser.arrive();

        assertSame(createFut.join(), getFut.join());

        assertEquals(1, getAllTablesFut.join().size());
    }

    /**
     * Tries to create a table that already exists.
     */
    @Test
    public void testDoubledCreateTable() {
        CompletableFuture<TableManager> tblManagerFut = new CompletableFuture<>();

        SchemaTable scmTbl = SchemaBuilders.tableBuilder("PUBLIC", DYNAMIC_TABLE_NAME)
            .columns(
                SchemaBuilders.column("key", ColumnType.INT64).asNonNull().build(),
                SchemaBuilders.column("val", ColumnType.INT64).asNullable().build())
            .withPrimaryKey("key")
            .build();

        Table table = mockManagersAndCreateTable(scmTbl, tblManagerFut);

        assertNotNull(table);

        assertThrows(RuntimeException.class, () -> tblManagerFut.join().createTable(scmTbl.canonicalName(), tblCh -> SchemaConfigurationConverter.convert(scmTbl, tblCh)
            .changeReplicas(1)
            .changePartitions(10)));

        assertSame(table, tblManagerFut.join().getOrCreateTable(scmTbl.canonicalName(), tblCh -> SchemaConfigurationConverter.convert(scmTbl, tblCh)
            .changeReplicas(1)
            .changePartitions(10)));
    }

    /**
     * Instantiates Table manager and creates a table in it.
     *
     * @param schemaTable Configuration schema for a table.
     * @param tblManagerFut Future for table manager.
     * @return Table.
     */
    private TableImpl mockManagersAndCreateTable(
        SchemaTable schemaTable,
        CompletableFuture<TableManager> tblManagerFut
    ) {
        return mockManagersAndCreateTableWithDelay(schemaTable, tblManagerFut, null);
    }

    /**
     * Instantiates a table and prepares Table manager. When the latch would open, the method completes.
     *
     * @param schemaTable Configuration schema for a table.
     * @param tblManagerFut Future for table manager.
     * @param phaser Phaser for the wait.
     * @return Table manager.
     */
    @NotNull private TableImpl mockManagersAndCreateTableWithDelay(
        SchemaTable schemaTable,
        CompletableFuture<TableManager> tblManagerFut,
        Phaser phaser
    ) {
        when(rm.prepareRaftGroup(any(), any(), any())).thenReturn(CompletableFuture.completedFuture(null));

        AtomicBoolean tableCreatedFlag = new AtomicBoolean();

        try (MockedStatic<SchemaUtils> schemaServiceMock = mockStatic(SchemaUtils.class)) {
            schemaServiceMock.when(() -> SchemaUtils.prepareSchemaDescriptor(anyInt(), any())).
                thenReturn(mock(SchemaDescriptor.class));
        }

        try (MockedStatic<AffinityUtils> affinityServiceMock = mockStatic(AffinityUtils.class)) {
            ArrayList<List<ClusterNode>> assignment = new ArrayList<>(PARTITIONS);

            for (int part = 0; part < PARTITIONS; part++)
                assignment.add(new ArrayList<>(Collections.singleton(node)));

            affinityServiceMock.when(() -> AffinityUtils.calculateAssignments(any(), anyInt(), anyInt())).
                thenReturn(assignment);
        }

        TableManager tableManager = new TableManager(
            clusterCfgMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY),
            rm,
            bm,
            mm,
            workDir
        );

        TableImpl tbl2 = null;

        try {
            tableManager.start();

            tblManagerFut.complete(tableManager);

            when(mm.range(eq(new ByteArray(PUBLIC_PREFIX)), any())).thenAnswer(invocation -> {
                Cursor<Entry> cursor = mock(Cursor.class);

                when(cursor.hasNext()).thenReturn(false);

                return cursor;
            });

            int tablesBeforeCreation = tableManager.tables().size();

            clusterCfgMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY).tables().listen(ctx -> {
                boolean createTbl = ctx.newValue().get(schemaTable.canonicalName()) != null &&
                    ctx.oldValue().get(schemaTable.canonicalName()) == null;

                boolean dropTbl = ctx.oldValue().get(schemaTable.canonicalName()) != null &&
                    ctx.newValue().get(schemaTable.canonicalName()) == null;

                if (!createTbl && !dropTbl)
                    return CompletableFuture.completedFuture(null);

                tableCreatedFlag.set(createTbl);

                try {
                    when(mm.range(eq(new ByteArray(PUBLIC_PREFIX)), any())).thenAnswer(invocation -> {
                        AtomicBoolean firstRecord = new AtomicBoolean(createTbl);

                        Cursor<Entry> cursor = mock(Cursor.class);

                        when(cursor.hasNext()).thenAnswer(hasNextInvocation ->
                            firstRecord.compareAndSet(true, false));

                        Entry mockEntry = mock(Entry.class);

                        when(mockEntry.key()).thenReturn(new ByteArray(PUBLIC_PREFIX + "uuid." + NamedListNode.NAME));

                        when(mockEntry.value()).thenReturn(ByteUtils.toBytes(schemaTable.canonicalName()));

                        when(cursor.next()).thenReturn(mockEntry);

                        return cursor;
                    });
                }
                catch (NodeStoppingException e) {
                    LOG.error("Node was stopped during table creation.", e);

                    fail();
                }

                if (phaser != null)
                    phaser.arriveAndAwaitAdvance();

                return CompletableFuture.completedFuture(null);
            });

            tbl2 = (TableImpl)tableManager.createTable(schemaTable.canonicalName(), tblCh -> SchemaConfigurationConverter.convert(schemaTable, tblCh)
                .changeReplicas(1)
                .changePartitions(10)
            );

            assertNotNull(tbl2);

            assertEquals(tablesBeforeCreation + 1, tableManager.tables().size());
        }
        catch (NodeStoppingException e) {
            LOG.error("Node was stopped during table creation.", e);

            fail();
        }
        finally {
            tableManager.stop();
        }

        return tbl2;
    }
}
