/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal;

import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCluster;
import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.ComputeJobAdapter;
import org.apache.ignite.compute.ComputeJobResult;
import org.apache.ignite.compute.ComputeTaskFuture;
import org.apache.ignite.compute.ComputeTaskSplitAdapter;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.Event;
import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteReducer;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;

import static org.apache.ignite.events.EventType.EVT_JOB_STARTED;

/**
 * Abstract test for {@link org.apache.ignite.cluster.ClusterGroup}
 */
@SuppressWarnings("deprecation")
public abstract class ClusterGroupAbstractTest extends GridCommonAbstractTest implements Externalizable {
    /** Waiting timeout. */
    private static final int WAIT_TIMEOUT = 30000;

    /** Utility static variable. */
    private static final AtomicInteger cnt = new AtomicInteger(0);

    /** Mutex. */
    private static final Object mux = new Object();

    /** Projection. */
    private ClusterGroup prj;

    /** Runnable job. */
    private IgniteRunnable runJob = new TestRunnable();

    /** Callable job. */
    private IgniteCallable<String> calJob = new TestCallable<>();

    /** Closure job. */
    private IgniteClosure<String, String> clrJob = new IgniteClosure<String, String>() {
        @Override public String apply(String s) {
            return s;
        }

        @Override public String toString() {
            return "clrJob";
        }
    };

    /** Reducer. */
    private IgniteReducer<String, Object> rdc = new IgniteReducer<String, Object>() {
        @Override public boolean collect(String e) {
            return true;
        }

        @Nullable @Override public Object reduce() {
            return null;
        }

        @Override public String toString() {
            return "rdc";
        }
    };

    /** */
    protected ClusterGroupAbstractTest() {
        // No-op.
    }

    /** {@inheritDoc} */
    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);

        cfg.setIncludeEventTypes(EventType.EVTS_ALL);

        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setForceServerMode(true);

        return cfg;
    }

    /** {@inheritDoc} */
    @Override protected void beforeTest() throws Exception {
        prj = projection();

        cnt.set(0);
    }

    /**
     * @return Projection.
     */
    protected abstract ClusterGroup projection();

    /**
     * @return Local node ID.
     */
    @Nullable protected abstract UUID localNodeId();

    /**
     * @return Remote nodes IDs.
     */
    protected Collection<UUID> remoteNodeIds() {
        return F.nodeIds(projection().forRemotes().nodes());
    }

    /**
     * @return Projection size.
     */
    private int projectionSize() {
        int size = localNodeId() != null ? 1 : 0;

        size += remoteNodeIds().size();

        assert size > 0;

        return size;
    }

    /**
     * @return Collection of projection node IDs.
     */
    private Collection<UUID> projectionNodeIds() {
        Collection<UUID> ids = new LinkedList<>();

        UUID id = localNodeId();

        if (id != null)
            ids.add(id);

        ids.addAll(remoteNodeIds());

        assert !ids.isEmpty();

        return ids;
    }

    /**
     * Test for projection on not existing node IDs.
     */
    @Test
    public void testInvalidProjection() {
        Collection<UUID> ids = new HashSet<>();

        ids.add(UUID.randomUUID());
        ids.add(UUID.randomUUID());

        ClusterGroup invalidPrj = prj.forNodeIds(ids);

        assertEquals(0, invalidPrj.nodes().size());
    }

    /**
     * @throws Exception If test failed.
     */
    @Test
    public void testProjection() throws Exception {
        assert prj != null;

        assert prj.ignite() != null;
        assert prj.predicate() != null;

        int size = projectionSize();

        assert prj.nodes().size() == size;

        Collection<UUID> nodeIds = projectionNodeIds();

        for (ClusterNode node : prj.nodes())
            assert nodeIds.contains(node.id());
    }

    /**
     * @throws Exception If test failed.
     */
    @Test
    public void testRemoteNodes() throws Exception {
        Collection<UUID> remoteNodeIds = remoteNodeIds();

        UUID locNodeId = localNodeId();

        int size = remoteNodeIds.size();

        String name = "oneMoreGrid";

        try {
            Ignite g = startGrid(name);

            UUID excludedId = g.cluster().localNode().id();

            assertEquals(size, prj.forRemotes().nodes().size());

            for (ClusterNode node : prj.forRemotes().nodes()) {
                UUID id = node.id();

                assert !id.equals(locNodeId) && remoteNodeIds.contains(id) && !excludedId.equals(id);
            }
        }
        finally {
            stopGrid(name);
        }
    }

    /**
     * @throws Exception If test failed.
     */
    @Test
    public void testRemoteProjection() throws Exception {
        Collection<UUID> remoteNodeIds = remoteNodeIds();

        ClusterGroup remotePrj = projection().forRemotes();

        Collection<UUID> prjNodeIds = F.nodeIds(remotePrj.nodes());

        assert prjNodeIds.size() == remoteNodeIds.size();

        assert prjNodeIds.containsAll(remoteNodeIds());

        assert !prjNodeIds.contains(localNodeId());

        String name = "oneMoreGrid";

        try {
            Ignite g = startGrid(name);

            UUID excludedId = g.cluster().localNode().id();

            assert !F.nodeIds(remotePrj.nodes()).contains(excludedId);
        }
        finally {
            stopGrid(name);
        }
    }

    /**
     * @throws Exception If test failed.
     */
    @Test
    public void testExecution() throws Exception {
        String name = "oneMoreGrid";

        Collection<IgniteBiTuple<Ignite, IgnitePredicate<Event>>> lsnrs = new LinkedList<>();

        try {
            final AtomicInteger cnt = new AtomicInteger();

            Ignite g = startGrid(name);

            IgnitePredicate<Event> lsnr;

            if (!IgniteCluster.class.isAssignableFrom(projection().getClass())) {
                g.events().localListen(lsnr = new IgnitePredicate<Event>() {
                    @Override public boolean apply(Event evt) {
                        assert evt.type() == EVT_JOB_STARTED;

                        assert false;

                        return true;
                    }
                }, EVT_JOB_STARTED);

                lsnrs.add(F.t(g, lsnr));
            }

            for (ClusterNode node : prj.nodes()) {
                g = G.ignite(node.id());

                g.events().localListen(lsnr = new IgnitePredicate<Event>() {
                    @Override public boolean apply(Event evt) {
                        assert evt.type() == EVT_JOB_STARTED;

                        synchronized (mux) {
                            cnt.incrementAndGet();

                            mux.notifyAll();
                        }

                        return true;
                    }
                }, EVT_JOB_STARTED);

                lsnrs.add(F.t(g, lsnr));
            }

            run1(cnt);
            run2(cnt);

            call1(cnt);
            call2(cnt);
            call3(cnt);
            call4(cnt);
            call5(cnt);

            forkjoin1(cnt);
            forkjoin2(cnt);

            exec1(cnt);
            exec2(cnt);

            executorService(cnt);

            checkActiveFutures();
        }
        finally {
            for (IgniteBiTuple<Ignite, IgnitePredicate<Event>> t : lsnrs)
                t.get1().events().stopLocalListen(t.get2(), EVT_JOB_STARTED);

            stopGrid(name);
        }
    }

    /**
     * @param cnt Counter.
     * @throws Exception If failed.
     */
    private void run1(AtomicInteger cnt) throws Exception {
        IgniteFuture<Void> fut = compute(prj).broadcastAsync(runJob);

        waitForExecution(fut);

        cnt.set(0);

        compute(prj).broadcast(runJob);

        waitForValue(cnt, projectionSize());
    }

    /**
     * @param cnt Counter.
     * @throws Exception If failed.
     */
    private void run2(AtomicInteger cnt) throws Exception {
        Collection<IgniteRunnable> jobs = F.asList(runJob);

        IgniteFuture<Void> fut = compute(prj).runAsync(jobs);

        waitForExecution(fut);

        cnt.set(0);

        compute(prj).run(jobs);

        waitForValue(cnt, jobs.size());
    }

    /**
     * @param cnt Counter.
     * @throws Exception If failed.
     */
    private void call1(AtomicInteger cnt) throws Exception {
        IgniteFuture<Collection<String>> fut = compute(prj).broadcastAsync(calJob);

        waitForExecution(fut);

        cnt.set(0);

        compute(prj).broadcast(calJob);

        waitForValue(cnt, projectionSize());
    }

    /**
     * @param cnt Counter.
     * @throws Exception If failed.
     */
    private void call2(AtomicInteger cnt) throws Exception {
        Collection<IgniteCallable<String>> jobs = F.asList(calJob);

        IgniteFuture<Collection<String>> fut = compute(prj).callAsync(jobs);

        waitForExecution(fut);

        cnt.set(0);

        compute(prj).call(jobs);

        waitForValue(cnt, jobs.size());
    }

    /**
     * @param cnt Counter.
     * @throws Exception If failed.
     */
    private void call3(AtomicInteger cnt) throws Exception {
        IgniteFuture<String> fut = compute(prj).applyAsync(clrJob, (String)null);

        waitForExecution(fut);

        cnt.set(0);

        compute(prj).apply(clrJob, (String)null);

        waitForValue(cnt, 1);
    }

    /**
     * @param cnt Counter.
     * @throws Exception If failed.
     */
    private void call4(AtomicInteger cnt) throws Exception {
        Collection<String> args = F.asList("a", "b", "c");

        IgniteFuture<Collection<String>> fut = compute(prj).applyAsync(clrJob, args);

        waitForExecution(fut);

        cnt.set(0);

        compute(prj).apply(clrJob, args);

        waitForValue(cnt, args.size());
    }

    /**
     * @param cnt Counter.
     * @throws Exception If failed.
     */
    private void call5(AtomicInteger cnt) throws Exception {
        IgniteFuture<Collection<String>> fut = compute(prj).broadcastAsync(new TestClosure(), "arg");

        waitForExecution(fut);

        cnt.set(0);

        Collection<String> res = compute(prj).broadcast(new TestClosure(), "arg");

        assertEquals(projectionSize(), res.size());

        waitForValue(cnt, projectionSize());

        for (String resStr : res)
            assertEquals("arg", resStr);
    }

    /**
     * @param cnt Counter.
     * @throws Exception If failed.
     */
    private void forkjoin1(AtomicInteger cnt) throws Exception {
        Collection<String> args = F.asList("a", "b", "c");

        IgniteFuture fut = compute(prj).applyAsync(clrJob, args, rdc);

        waitForExecution(fut);

        cnt.set(0);

        compute(prj).apply(clrJob, args, rdc);

        waitForValue(cnt, args.size());
    }

    /**
     * @param cnt Counter.
     * @throws Exception If failed.
     */
    private void forkjoin2(AtomicInteger cnt) throws Exception {
        Collection<IgniteCallable<String>> jobs = F.asList(calJob);

        IgniteFuture<Object> fut = compute(prj).callAsync(jobs, rdc);

        waitForExecution(fut);

        cnt.set(0);

        compute(prj).call(jobs, rdc);

        waitForValue(cnt, jobs.size());
    }

    /**
     * @param cnt Counter.
     * @throws Exception If failed.
     */
    private void exec1(AtomicInteger cnt) throws Exception {
        cnt.set(0);

        compute(prj).execute(TestTask.class.getName(), null);

        waitForValue(cnt, projectionSize());

        cnt.set(0);

        compute(prj).execute(new TestTask(), null);

        waitForValue(cnt, projectionSize());

        cnt.set(0);

        compute(prj).execute(TestTask.class, null);

        waitForValue(cnt, projectionSize());
    }

    /**
     * @param cnt Counter.
     * @throws Exception If failed.
     */
    private void exec2(AtomicInteger cnt) throws Exception {
        cnt.set(0);

        compute(prj).withTimeout(WAIT_TIMEOUT).execute(TestTask.class.getName(), null);

        waitForValue(cnt, projectionSize());

        cnt.set(0);

        compute(prj).withTimeout(WAIT_TIMEOUT).execute(new TestTask(), null);

        waitForValue(cnt, projectionSize());

        cnt.set(0);

        compute(prj).withTimeout(WAIT_TIMEOUT).execute(TestTask.class, null);

        waitForValue(cnt, projectionSize());
    }

    /**
     * @param cnt Counter.
     * @throws Exception If failed.
     */
    private void executorService(AtomicInteger cnt) throws Exception {
        cnt.set(0);

        ExecutorService execSrvc = prj.ignite().executorService(prj);

        Future<String> fut = execSrvc.submit(new TestCallable<String>() {
            @Override public String call() throws Exception {
                return "submit1";
            }
        });

        waitForValue(cnt, 1);

        assertEquals("submit1", fut.get());

        cnt.set(0);

        fut = execSrvc.submit(new TestRunnable(), "submit2");

        waitForValue(cnt, 1);

        assertEquals("submit2", fut.get());

        cnt.set(0);

        Future<?> runFut = execSrvc.submit(new TestRunnable());

        waitForValue(cnt, 1);

        runFut.get();
    }

    /**
     * @param fut Execution future.
     * @throws InterruptedException Thrown if wait was interrupted.
     */
    @SuppressWarnings({"UnconditionalWait"})
    private void waitForExecution(IgniteFuture fut) throws InterruptedException {
        long sleep = 250;

        long threshold = System.currentTimeMillis() + WAIT_TIMEOUT;

        do {
            synchronized (mux) {
                mux.wait(sleep);
            }
        }
        while (fut != null && !fut.isDone() && !fut.isCancelled() && threshold > System.currentTimeMillis());

        assert fut == null || fut.isDone();
    }

    /**
     * @param cnt Counter to check.
     * @param val Value to check.
     * @throws InterruptedException Thrown if wait was interrupted.
     */
    private void waitForValue(AtomicInteger cnt, int val) throws InterruptedException {
        assert cnt != null;
        assert val > 0;

        long threshold = System.currentTimeMillis() + WAIT_TIMEOUT;

        long time;

        while (threshold > (time = System.currentTimeMillis()))
            synchronized (mux) {
                if (cnt.get() == val)
                    break;

                mux.wait(threshold - time);
            }

        assert cnt.get() == val;
    }

    /**
     * @throws Exception If test failed.
     */
    private void checkActiveFutures() throws Exception {
        assertEquals(0, compute(prj).activeTaskFutures().size());

        cnt.set(0);

        Collection<IgniteFuture<Object>> futsList = new ArrayList<>();

        for (int i = 0; i < 10; i++) {
            IgniteFuture<Object> fut = compute(prj).callAsync(new TestWaitCallable<>());

            assertFalse(fut.isDone());

            Map<IgniteUuid, ComputeTaskFuture<Object>> futs = compute(prj).activeTaskFutures();

            assertEquals(i + 1, futs.size());

            assertTrue(futs.containsKey(((ComputeTaskFuture)fut).getTaskSession().getId()));

            futsList.add(fut);
        }

        synchronized (mux) {
            cnt.incrementAndGet();

            mux.notifyAll();
        }

        for (IgniteFuture<Object> fut : futsList)
            fut.get();

        assertEquals(0, compute(prj).activeTaskFutures().size());
    }

    /**
     *  Test closure.
     */
    private static class TestClosure implements IgniteClosure<String, String> {
        /** {@inheritDoc} */
        @Override public String apply(String s) {
            return s;
        }
    }

    /**
     * Test runnable.
     */
    private static class TestRunnable implements IgniteRunnable {
        /** {@inheritDoc} */
        @Override public void run() {
            // No-op.
        }
    }

    /**
     * Test callable.
     */
    private static class TestCallable<T> implements IgniteCallable<T> {
        /** {@inheritDoc} */
        @Nullable @Override public T call() throws Exception {
            return null;
        }
    }

    /**
     * Test callable.
     */
    private static class TestWaitCallable<T> implements IgniteCallable<T> {
        /** {@inheritDoc} */
        @Nullable @Override public T call() throws Exception {
            synchronized (mux) {
                while (cnt.get() == 0)
                    mux.wait();
            }

            return null;
        }
    }

    /**
     * Test task.
     */
    @SuppressWarnings({"PublicInnerClass"})
    public static class TestTask extends ComputeTaskSplitAdapter<String, Void> {
        /** {@inheritDoc} */
        @Override protected Collection<? extends ComputeJob> split(int gridSize, String arg) {
            Collection<ComputeJob> jobs = new HashSet<>();

            for (int i = 0; i < gridSize; i++)
                jobs.add(new TestJob());

            return jobs;
        }

        /** {@inheritDoc} */
        @Nullable @Override public Void reduce(List<ComputeJobResult> results) {
            return null;
        }
    }

    /**
     * Test job.
     */
    @SuppressWarnings({"PublicInnerClass"})
    public static class TestJob extends ComputeJobAdapter {
        /** {@inheritDoc} */
        @Nullable @Override public Object execute() {
            return null;
        }
    }

    /** {@inheritDoc} */
    @Override public void writeExternal(ObjectOutput out) throws IOException {
        // No-op.
    }

    /** {@inheritDoc} */
    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
        // No-op.
    }
}
