/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.messaging;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.Ignite;
import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.testframework.junits.IgniteConfigVariationsAbstractTest;
import org.junit.Test;

/**
 * The test checks process messaging.
 */
public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVariationsAbstractTest {
    /**
     * Message topic.
     */
    private static final String MESSAGE_TOPIC = "topic";

    /** */
    private static final int MSGS = 100;

    /**
     * Static latch.
     */
    public static CountDownLatch LATCH;

    /** {@inheritDoc} */
    @Override protected boolean expectedClient(String testGridName) {
        return getTestIgniteInstanceName(CLIENT_NODE_IDX).equals(testGridName)
            || getTestIgniteInstanceName(3).equals(testGridName)
            || getTestIgniteInstanceName(5).equals(testGridName);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testLocalServer() throws Exception {
        runInAllDataModes(new TestRunnable() {
            @Override public void run() throws Exception {
                localServerInternal(false);
            }
        });
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testLocalServerAsync() throws Exception {
        runInAllDataModes(new TestRunnable() {
            @Override public void run() throws Exception {
                localServerInternal(true);
            }
        });
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testLocalListener() throws Exception {
        runInAllDataModes(new TestRunnable() {
            @Override public void run() throws Exception {
                localListenerInternal();
            }
        });
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testServerClientMessage() throws Exception {
        if (!testsCfg.withClients())
            return;

        runInAllDataModes(new TestRunnable() {
            @Override public void run() throws Exception {
                serverClientMessage(false);
            }
        });
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testServerClientMessageAsync() throws Exception {
        if (!testsCfg.withClients())
            return;

        runInAllDataModes(new TestRunnable() {
            @Override public void run() throws Exception {
                serverClientMessage(true);
            }
        });
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testClientClientMessage() throws Exception {
        if (!testsCfg.withClients())
            return;

        runInAllDataModes(new TestRunnable() {
            @Override public void run() throws Exception {
                clientClientMessage(false);
            }
        });
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testClientClientMessageAsync() throws Exception {
        if (!testsCfg.withClients())
            return;

        runInAllDataModes(new TestRunnable() {
            @Override public void run() throws Exception {
                clientClientMessage(true);
            }
        });
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testClientServerMessage() throws Exception {
        if (!testsCfg.withClients())
            return;

        runInAllDataModes(new TestRunnable() {
            @Override public void run() throws Exception {
                clientServerMessage(false);
            }
        });
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testClientServerMessageAsync() throws Exception {
        if (!testsCfg.withClients())
            return;

        runInAllDataModes(new TestRunnable() {
            @Override public void run() throws Exception {
                clientServerMessage(true);
            }
        });
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testCollectionMessage() throws Exception {
        runInAllDataModes(new TestRunnable() {
            @Override public void run() throws Exception {
                collectionMessage();
            }
        });
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testOrderedMessage() throws Exception {
        runInAllDataModes(new TestRunnable() {
            @Override public void run() throws Exception {
                orderedMessage();
            }
        });
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testClientServerOrderedMessage() throws Exception {
        if (!testsCfg.withClients())
            return;

        runInAllDataModes(new TestRunnable() {
            @Override public void run() throws Exception {
                clientServerOrderedMessage();
            }
        });
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testClientClientOrderedMessage() throws Exception {
        if (!testsCfg.withClients())
            return;

        runInAllDataModes(new TestRunnable() {
            @Override public void run() throws Exception {
                clientClientOrderedMessage();
            }
        });
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testServerClientOrderedMessage() throws Exception {
        if (!testsCfg.withClients())
            return;

        runInAllDataModes(new TestRunnable() {
            @Override public void run() throws Exception {
                serverClientOrderedMessage();
            }
        });
    }

    /**
     * Single server test.
     *
     * @param async Async message send flag.
     * @throws Exception If failed.
     */
    private void localServerInternal(boolean async) throws Exception {
        int messages = MSGS;

        Ignite ignite = grid(SERVER_NODE_IDX);

        LATCH = new CountDownLatch(messages);

        ClusterGroup grp = grid(SERVER_NODE_IDX).cluster().forLocal();

        UUID opId = registerListener(grp);

        try {
            for (int i = 0; i < messages; i++)
                sendMessage(ignite, grp, value(i), async);

            assertTrue(LATCH.await(10, TimeUnit.SECONDS));

        }
        finally {
            ignite.message().stopRemoteListen(opId);
        }
    }

    /**
     * Single server test with local listener.
     * @throws Exception If failed.
     */
    private void localListenerInternal() throws Exception {
        int messages = MSGS;

        Ignite ignite = grid(SERVER_NODE_IDX);

        LATCH = new CountDownLatch(messages);

        ClusterGroup grp = grid(SERVER_NODE_IDX).cluster().forLocal();

        MessageListener c = new MessageListener();

        try {
            ignite.message(grp).localListen("localListenerTopic", c);

            for (int i = 0; i < messages; i++)
                ignite.message(grp).send("localListenerTopic", value(i));

            assertTrue(LATCH.await(10, TimeUnit.SECONDS));

        }
        finally {
            ignite.message().stopLocalListen("localListenerTopic", c);
        }
    }

    /**
     * Server sends a message and client receives it.
     *
     * @param async Async message send flag.
     * @throws Exception If failed.
     */
    private void serverClientMessage(boolean async) throws Exception {
        Ignite ignite = grid(SERVER_NODE_IDX);

        ClusterGroup grp = ignite.cluster().forClients();

        assert !grp.nodes().isEmpty();

        registerListenerAndSendMessages(ignite, grp, async);
    }

    /**
     * Client sends a message and client receives it.
     *
     * @param async Async message send flag.
     * @throws Exception If failed.
     */
    private void clientClientMessage(boolean async) throws Exception {
        Ignite ignite = grid(CLIENT_NODE_IDX);

        ClusterGroup grp = ignite.cluster().forClients();

        assert !grp.nodes().isEmpty();

        registerListenerAndSendMessages(ignite, grp, async);
    }

    /**
     * Client sends a message and client receives it.
     *
     * @param async Async message send flag.
     * @throws Exception If failed.
     */
    private void clientServerMessage(boolean async) throws Exception {
        Ignite ignite = grid(CLIENT_NODE_IDX);

        ClusterGroup grp = ignite.cluster().forServers();

        assert !grp.nodes().isEmpty();

        registerListenerAndSendMessages(ignite, grp, async);
    }

    /**
     * @param ignite Ignite.
     * @param grp Cluster group.
     * @param async Async message send flag.
     * @throws Exception If fail.
     */
    private void registerListenerAndSendMessages(Ignite ignite, ClusterGroup grp, boolean async) throws Exception {
        int messages = MSGS;

        LATCH = new CountDownLatch(grp.nodes().size() * messages);

        UUID opId = registerListener(grp);

        try {
            for (int i = 0; i < messages; i++)
                sendMessage(ignite, grp, value(i), async);

            assertTrue(LATCH.await(10, TimeUnit.SECONDS));

        }
        finally {
            ignite.message().stopRemoteListen(opId);
        }
    }

    /**
     *
     * @throws Exception If fail.
     */
    private void collectionMessage() throws Exception {
        Ignite ignite = grid(SERVER_NODE_IDX);

        ClusterGroup grp = gridCount() > 1 ? ignite.cluster().forRemotes() : ignite.cluster().forLocal();

        assert !grp.nodes().isEmpty();

        int messages = MSGS;

        LATCH = new CountDownLatch(grp.nodes().size() * messages);

        UUID opId = ignite.message(grp).remoteListen(MESSAGE_TOPIC, new MessageListener());

        try {
            List<Object> msgs = new ArrayList<>();
            for (int i = 0; i < messages; i++)
                msgs.add(value(i));

            ignite.message(grp).send(MESSAGE_TOPIC, msgs);

            assertTrue(LATCH.await(10, TimeUnit.SECONDS));

        }
        finally {
            ignite.message().stopRemoteListen(opId);
        }

    }

    /**
     * @throws Exception If fail.
     */
    private void orderedMessage() throws Exception {
        Ignite ignite = grid(SERVER_NODE_IDX);

        ClusterGroup grp = gridCount() > 1 ? ignite.cluster().forRemotes() : ignite.cluster().forLocal();

        assert !grp.nodes().isEmpty();

        registerListenerAndSendOrderedMessages(ignite, grp);
    }

    /**
     * @throws Exception If fail.
     */
    private void clientServerOrderedMessage() throws Exception {
        Ignite ignite = grid(CLIENT_NODE_IDX);

        ClusterGroup grp = ignite.cluster().forServers();

        assert !grp.nodes().isEmpty();

        registerListenerAndSendOrderedMessages(ignite, grp);
    }

    /**
     * @throws Exception If fail.
     */
    private void clientClientOrderedMessage() throws Exception {
        Ignite ignite = grid(CLIENT_NODE_IDX);

        ClusterGroup grp = ignite.cluster().forClients();

        assert !grp.nodes().isEmpty();

        registerListenerAndSendOrderedMessages(ignite, grp);
    }

    /**
     * @throws Exception If fail.
     */
    private void serverClientOrderedMessage() throws Exception {
        Ignite ignite = grid(SERVER_NODE_IDX);

        ClusterGroup grp = ignite.cluster().forClients();

        assert !grp.nodes().isEmpty();

        registerListenerAndSendOrderedMessages(ignite, grp);
    }

    /**
     * @param ignite Ignite.
     * @param grp Cluster group.
     * @throws Exception If fail.
     */
    private void registerListenerAndSendOrderedMessages(Ignite ignite, ClusterGroup grp) throws Exception {
        int messages = MSGS;

        LATCH = new CountDownLatch(grp.nodes().size() * messages);

        UUID opId = ignite.message(grp).remoteListen(MESSAGE_TOPIC, new OrderedMessageListener());

        try {
            for (int i = 0; i < messages; i++)
                ignite.message(grp).sendOrdered(MESSAGE_TOPIC, value(i), 2000);

            assertTrue(LATCH.await(10, TimeUnit.SECONDS));

        }
        finally {
            ignite.message().stopRemoteListen(opId);
        }

    }

    /**
     * @param nodeSnd Sender Ignite node.
     * @param grp Cluster group.
     * @param msg Message.
     * @param async Async message send flag.
     */
    private void sendMessage(Ignite nodeSnd, ClusterGroup grp, Object msg, boolean async) {
        if (async)
            nodeSnd.message(grp).withAsync().send(MESSAGE_TOPIC, msg);
        else
            nodeSnd.message(grp).send(MESSAGE_TOPIC, msg);
    }

    /**
     * @param grp Cluster group.
     * @return Message listener uuid.
     * @throws Exception If failed.
     */
    private UUID registerListener(ClusterGroup grp) throws Exception {
        Ignite ignite = grid(SERVER_NODE_IDX);

        IgniteBiPredicate<UUID, Object> lsnr = new MessageListener();

        return ignite.message(grp).remoteListen(MESSAGE_TOPIC, lsnr);
    }

    /**
     * Ignite predicate.
     */
    private static class MessageListener implements IgniteBiPredicate<UUID, Object> {
        /**
         * Default constructor.
         */
        public MessageListener() {
            // No-op.
        }

        /** {@inheritDoc} */
        @Override public boolean apply(UUID nodeId, Object msg) {
            LATCH.countDown();

            return true;
        }
    }

    /**
     * Ignite order predicate.
     */
    private static class OrderedMessageListener implements IgniteBiPredicate<UUID, TestObject> {
        /**
         * Counter.
         */
        private AtomicInteger cntr;

        /**
         * Default constructor.
         */
        OrderedMessageListener() {
            cntr = new AtomicInteger(0);
        }

        /** {@inheritDoc} */
        @Override public boolean apply(UUID nodeId, TestObject msg) {
            assertEquals(cntr.getAndIncrement(), msg.value());

            LATCH.countDown();

            return true;
        }
    }
}
