/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.systemview;

import static java.util.concurrent.CompletableFuture.failedFuture;
import static org.apache.ignite.internal.systemview.SystemViewManagerImpl.NODE_ATTRIBUTES_KEY;
import static org.apache.ignite.internal.systemview.SystemViewManagerImpl.NODE_ATTRIBUTES_LIST_SEPARATOR;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowFast;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.util.CollectionUtils.first;
import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.internal.util.SubscriptionUtils.fromIterable;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.aMapWithSize;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.function.Function;
import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.catalog.CatalogValidationException;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
import org.apache.ignite.internal.lang.InternalTuple;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.network.ClusterNodeImpl;
import org.apache.ignite.internal.schema.SchemaTestUtils;
import org.apache.ignite.internal.systemview.api.SystemView;
import org.apache.ignite.internal.systemview.api.SystemViews;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.type.NativeType;
import org.apache.ignite.internal.type.NativeTypeSpec;
import org.apache.ignite.internal.type.NativeTypes;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.NetworkAddress;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;

/**
 * Test class for {@link SystemViewManagerImpl}.
 */
@ExtendWith(MockitoExtension.class)
public class SystemViewManagerTest extends BaseIgniteAbstractTest {
    private static final String LOCAL_NODE_NAME = "LOCAL_NODE_NAME";

    private final CatalogManager catalog = Mockito.mock(CatalogManager.class);

    private SystemViewManagerImpl viewMgr;

    @BeforeEach
    void setUp() {
        viewMgr = new SystemViewManagerImpl(LOCAL_NODE_NAME, catalog);
    }

    @Test
    public void registerDuplicateNameFails() {
        String name = "testView1";

        SystemView<?> view = dummyView(name);
        SystemView<?> viewWithSameName = dummyView(name);

        viewMgr.register(() -> List.of(view));

        assertThrows(IllegalArgumentException.class, () -> viewMgr.register(() -> List.of(viewWithSameName)));
        verifyNoInteractions(catalog);
    }

    @Test
    public void registerAfterStartFails() {
        assertThat(viewMgr.startAsync(), willCompleteSuccessfully());

        assertThrows(IllegalStateException.class, () -> viewMgr.register(() -> List.of(dummyView("test"))));
        verifyNoInteractions(catalog);
    }

    @Test
    public void startAfterStartFails() {
        when(catalog.catalogInitializationFuture()).thenReturn(nullCompletedFuture());
        when(catalog.catalogReadyFuture(1)).thenReturn(nullCompletedFuture());
        when(catalog.execute(anyList())).thenReturn(nullCompletedFuture());

        viewMgr.register(() -> List.of(dummyView("test")));

        assertThat(viewMgr.startAsync(), willCompleteSuccessfully());

        verify(catalog, times(1)).execute(anyList());
        reset(catalog);

        assertThrows(IllegalStateException.class, viewMgr::startAsync);

        assertThrows(IllegalStateException.class, viewMgr::startAsync);

        verifyNoMoreInteractions(catalog);
    }

    @Test
    public void registrationCompletesWithoutViews() {
        assertThat(viewMgr.startAsync(), willCompleteSuccessfully());

        verifyNoMoreInteractions(catalog);

        assertTrue(viewMgr.completeRegistration().isDone());
    }

    @ParameterizedTest
    @EnumSource(NativeTypeSpec.class)
    public void registerAllColumnTypes(NativeTypeSpec typeSpec) {
        NativeType type = SchemaTestUtils.specToType(typeSpec);

        when(catalog.catalogInitializationFuture()).thenReturn(nullCompletedFuture());
        when(catalog.catalogReadyFuture(1)).thenReturn(nullCompletedFuture());
        when(catalog.execute(anyList())).thenReturn(nullCompletedFuture());

        viewMgr.register(() -> List.of(dummyView("test", type)));
        assertThat(viewMgr.startAsync(), willCompleteSuccessfully());

        verify(catalog, times(1)).execute(anyList());
        assertTrue(viewMgr.completeRegistration().isDone());
    }

    @Test
    public void managerStartsSuccessfullyEvenIfCatalogRespondsWithError() {
        CatalogValidationException expected = new CatalogValidationException("Expected exception.");

        when(catalog.catalogInitializationFuture()).thenReturn(nullCompletedFuture());
        when(catalog.catalogReadyFuture(1)).thenReturn(nullCompletedFuture());
        when(catalog.execute(anyList())).thenReturn(failedFuture(expected));

        viewMgr.register(() -> List.of(dummyView("test")));

        assertThat(viewMgr.startAsync(), willCompleteSuccessfully());

        verify(catalog, times(1)).execute(anyList());

        assertThat(viewMgr.completeRegistration(), willBe(nullValue()));
    }

    @Test
    public void nodeAttributesUpdatedAfterStart() {
        when(catalog.catalogInitializationFuture()).thenReturn(nullCompletedFuture());
        when(catalog.catalogReadyFuture(1)).thenReturn(nullCompletedFuture());
        when(catalog.execute(anyList())).thenReturn(nullCompletedFuture());

        String name1 = "view1";
        String name2 = "view2";

        viewMgr.register(() -> List.of(dummyView(name1), dummyView(name2)));

        assertThat(viewMgr.nodeAttributes(), aMapWithSize(0));

        assertThat(viewMgr.startAsync(), willCompleteSuccessfully());

        assertThat(viewMgr.nodeAttributes(), is(Map.of(NODE_ATTRIBUTES_KEY, String.join(NODE_ATTRIBUTES_LIST_SEPARATOR, name1.toUpperCase(
                Locale.ROOT), name2.toUpperCase(Locale.ROOT)))));
    }

    @Test
    public void registrationFutureCompletesWhenComponentStops() {
        assertThat(viewMgr.stopAsync(), willCompleteSuccessfully());

        assertThat(viewMgr.completeRegistration(), willThrowFast(NodeStoppingException.class));
    }

    @Test
    public void startAfterStopFails() {
        assertThat(viewMgr.stopAsync(), willCompleteSuccessfully());

        //noinspection ThrowableNotThrown
        assertThrowsWithCause(viewMgr::startAsync, NodeStoppingException.class);
    }

    @Test
    public void registerAfterStopFails() {
        assertThat(viewMgr.stopAsync(), willCompleteSuccessfully());

        //noinspection ThrowableNotThrown
        assertThrowsWithCause(() -> viewMgr.register(() -> List.of(dummyView("test"))), NodeStoppingException.class);
    }

    @Test
    public void stopAfterStopDoesNothing() {
        assertThat(viewMgr.stopAsync(), willCompleteSuccessfully());
        assertThat(viewMgr.stopAsync(), willCompleteSuccessfully());
    }

    @Test
    @SuppressWarnings("DataFlowIssue")
    void managerDerivesViewPlacementFromLogicalTopologyEvents() {
        String viewName = "MY_VIEW";

        assertThat(viewMgr.owningNodes(viewName), empty());

        List<String> allNodes = List.of("A", "B", "C");

        LogicalTopologySnapshot topologySnapshot = topologySnapshot(viewName, allNodes, 0, 2);
        viewMgr.onNodeJoined(first(topologySnapshot.nodes()), topologySnapshot);

        assertThat(viewMgr.owningNodes(viewName), hasItem("A"));
        assertThat(viewMgr.owningNodes(viewName), hasItem("C"));

        topologySnapshot = topologySnapshot(viewName, allNodes, 0, 1);
        viewMgr.onNodeLeft(first(topologySnapshot.nodes()), topologySnapshot);

        assertThat(viewMgr.owningNodes(viewName), hasItem("A"));
        assertThat(viewMgr.owningNodes(viewName), hasItem("B"));

        topologySnapshot = topologySnapshot(viewName, allNodes, 1, 2);
        viewMgr.onTopologyLeap(topologySnapshot);

        assertThat(viewMgr.owningNodes(viewName), hasItem("B"));
        assertThat(viewMgr.owningNodes(viewName), hasItem("C"));
    }

    @Test
    void viewScanTest() {
        when(catalog.catalogInitializationFuture()).thenReturn(nullCompletedFuture());
        when(catalog.catalogReadyFuture(1)).thenReturn(nullCompletedFuture());
        when(catalog.execute(anyList())).thenReturn(nullCompletedFuture());

        String nodeView = "NODE_VIEW";
        String clusterView = "CLUSTER_VIEW";

        class Pojo {
            private final int c1;
            private final int c2;

            private Pojo(int c1, int c2) {
                this.c1 = c1;
                this.c2 = c2;
            }
        }

        Publisher<Pojo> dataSet = fromIterable(List.of(new Pojo(1, 1), new Pojo(2, 2)));

        viewMgr.register(() -> List.of(
                SystemViews.<Pojo>nodeViewBuilder()
                        .name(nodeView)
                        .nodeNameColumnAlias("NODE")
                        .addColumn("C1", NativeTypes.INT32, p -> p.c1)
                        .addColumn("C2", NativeTypes.INT32, p -> p.c2)
                        .dataProvider(dataSet)
                        .build(),
                SystemViews.<Pojo>clusterViewBuilder()
                        .name(clusterView)
                        .addColumn("C1", NativeTypes.INT32, p -> p.c1)
                        .addColumn("C2", NativeTypes.INT32, p -> p.c2)
                        .dataProvider(dataSet)
                        .build()
        ));

        assertThat(viewMgr.startAsync(), willCompleteSuccessfully());

        {
            DrainAllSubscriber<InternalTuple> subs = new DrainAllSubscriber<>();

            viewMgr.scanView(clusterView).subscribe(subs);

            List<InternalTuple> entries = await(subs.completion);

            assertThat(entries, hasSize(2));
            assertThat(entries.get(0).intValue(0), equalTo(1));
            assertThat(entries.get(0).intValue(1), equalTo(1));
            assertThat(entries.get(1).intValue(0), equalTo(2));
            assertThat(entries.get(1).intValue(1), equalTo(2));
        }

        {
            DrainAllSubscriber<InternalTuple> subs = new DrainAllSubscriber<>();

            viewMgr.scanView(nodeView).subscribe(subs);

            List<InternalTuple> entries = await(subs.completion);

            assertThat(entries, hasSize(2));
            assertThat(entries.get(0).stringValue(0), equalTo(LOCAL_NODE_NAME));
            assertThat(entries.get(0).intValue(1), equalTo(1));
            assertThat(entries.get(0).intValue(2), equalTo(1));
            assertThat(entries.get(1).stringValue(0), equalTo(LOCAL_NODE_NAME));
            assertThat(entries.get(1).intValue(1), equalTo(2));
            assertThat(entries.get(1).intValue(2), equalTo(2));
        }

    }

    private static SystemView<?> dummyView(String name) {
        return dummyView(name, NativeTypes.INT32);
    }

    private static <T> SystemView<T> dummyView(String name, NativeType type) {
        return (SystemView<T>) SystemViews.nodeViewBuilder()
                .nodeNameColumnAlias("NODE")
                .name(name)
                .addColumn("c1", type, (Function<Object, T>) Function.identity())
                .dataProvider(fromIterable(List.of()))
                .build();
    }

    private static LogicalTopologySnapshot topologySnapshot(String viewName, List<String> allNodes, int... owningNodes) {
        BitSet owningNodesSet = new BitSet();

        for (int idx : owningNodes) {
            owningNodesSet.set(idx);
        }

        List<LogicalNode> topology = new ArrayList<>(allNodes.size());

        for (int i = 0; i < allNodes.size(); i++) {
            String name = allNodes.get(i);

            ClusterNode clusterNode = new ClusterNodeImpl(name, name, new NetworkAddress("127.0.0.1", 1010 + i));

            Map<String, String> systemAttributes;
            if (owningNodesSet.get(i)) {
                systemAttributes = Map.of(NODE_ATTRIBUTES_KEY, viewName);
            } else {
                systemAttributes = Map.of();
            }

            topology.add(new LogicalNode(clusterNode, Map.of(), systemAttributes, List.of()));
        }

        return new LogicalTopologySnapshot(1, topology);
    }

    static class DrainAllSubscriber<T> implements Subscriber<T> {
        private final List<T> entries = new ArrayList<>();

        CompletableFuture<List<T>> completion = new CompletableFuture<>();

        @Override
        public void onSubscribe(Subscription subscription) {
            subscription.request(Long.MAX_VALUE);
        }

        @Override
        public void onNext(T item) {
            entries.add(item);
        }

        @Override
        public void onError(Throwable throwable) {
            completion.completeExceptionally(throwable);
        }

        @Override
        public void onComplete() {
            completion.complete(entries);
        }
    }
}
