/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.ComputeJobAdapter;
import org.apache.ignite.compute.ComputeJobMasterLeaveAware;
import org.apache.ignite.compute.ComputeJobResult;
import org.apache.ignite.compute.ComputeTaskSession;
import org.apache.ignite.compute.ComputeTaskSplitAdapter;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
import org.apache.ignite.internal.util.typedef.CX1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteReducer;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.resources.TaskSessionResource;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.testframework.junits.common.GridCommonTest;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;

/**
 * Test behavior of jobs when master node has failed, but job class implements {@link org.apache.ignite.compute.ComputeJobMasterLeaveAware}
 * interface.
 */
@GridCommonTest(group = "Task Session")
public class GridJobMasterLeaveAwareSelfTest extends GridCommonAbstractTest {
    /** Total grid count within the cloud. */
    private static final int GRID_CNT = 2;

    /** Counts how many times master-leave interface implementation was called. */
    private static volatile CountDownLatch invokeLatch;

    /** Latch which blocks job execution until main thread has sent node fail signal. */
    private static volatile CountDownLatch latch;

    /** Latch which blocks main thread until all jobs start their execution. */
    private static volatile CountDownLatch jobLatch;

    /** Should job wait for callback. */
    private static volatile boolean awaitMasterLeaveCallback = true;

    /** {@inheritDoc} */
    @Override protected void afterTest() throws Exception {
        stopAllGrids();
    }

    /** {@inheritDoc} */
    @Override protected void beforeTest() throws Exception {
        awaitMasterLeaveCallback = true;
        latch = new CountDownLatch(1);
        jobLatch = new CountDownLatch(GRID_CNT - 1);
        invokeLatch = new CountDownLatch(GRID_CNT - 1);
    }

    /** {@inheritDoc} */
    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);

        cfg.setCommunicationSpi(new CommunicationSpi());

        CacheConfiguration ccfg = defaultCacheConfiguration();

        ccfg.setCacheMode(PARTITIONED);

        cfg.setCacheConfiguration(ccfg);

        return cfg;
    }

    /**
     * Get predicate which allows task execution on all nodes except the last one.
     *
     * @return Predicate.
     */
    private IgnitePredicate<ClusterNode> excludeLastPredicate() {
        return new IgnitePredicate<ClusterNode>() {
            @Override public boolean apply(ClusterNode e) {
                return !e.id().equals(grid(GRID_CNT - 1).localNode().id());
            }
        };
    }

    /**
     * Constructor.
     */
    public GridJobMasterLeaveAwareSelfTest() {
        super(/* don't start grid */ false);
    }

    /**
     * Ensure that {@link org.apache.ignite.compute.ComputeJobMasterLeaveAware} callback is invoked on job which is initiated by
     * master and is currently running on it.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testLocalJobOnMaster() throws Exception {
        invokeLatch = new CountDownLatch(1);
        jobLatch = new CountDownLatch(1);

        Ignite g = startGrid(0);

        g.compute().executeAsync(new TestTask(1), null);

        jobLatch.await();

        // Count down the latch in a separate thread.
        new Thread(new Runnable() {
            @Override public void run() {
                try {
                    U.sleep(500);
                }
                catch (IgniteInterruptedCheckedException ignore) {
                    // No-op.
                }

                latch.countDown();
            }
        }).start();

        stopGrid(0, true);

        latch.countDown();

        assert invokeLatch.await(5000, MILLISECONDS);
    }

    /**
     * Ensure that {@link org.apache.ignite.compute.ComputeJobMasterLeaveAware} callback is invoked
     * when master node leaves topology normally.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testMasterStoppedNormally() throws Exception {
        // Start grids.
        for (int i = 0; i < GRID_CNT; i++)
            startGrid(i);

        int lastGridIdx = GRID_CNT - 1;

        compute(grid(lastGridIdx).cluster().forPredicate(excludeLastPredicate()))
            .executeAsync(new TestTask(GRID_CNT - 1), null);

        jobLatch.await();

        stopGrid(lastGridIdx, true);

        latch.countDown();

        assert invokeLatch.await(5000, MILLISECONDS);
    }

    /**
     * Ensure that {@link org.apache.ignite.compute.ComputeJobMasterLeaveAware} callback is invoked when master node leaves topology
     * abruptly (e.g. due to a network failure or immediate node shutdown).
     *
     * @throws Exception If failed.
     */
    @Test
    public void testMasterStoppedAbruptly() throws Exception {
        // Start grids.
        for (int i = 0; i < GRID_CNT; i++)
            startGrid(i);

        int lastGridIdx = GRID_CNT - 1;

        compute(grid(lastGridIdx).cluster().forPredicate(excludeLastPredicate()))
            .executeAsync(new TestTask(GRID_CNT - 1), null);

        jobLatch.await();

        ((CommunicationSpi)grid(lastGridIdx).configuration().getCommunicationSpi()).blockMessages();

        stopGrid(lastGridIdx, true);

        latch.countDown();

        assert invokeLatch.await(5000, MILLISECONDS);
    }

    /**
     * Ensure that {@link org.apache.ignite.compute.ComputeJobMasterLeaveAware} callback is invoked when fails to send
     * {@link GridJobExecuteResponse} to master node.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testCannotSendJobExecuteResponse() throws Exception {
        awaitMasterLeaveCallback = false;

        // Start grids.
        for (int i = 0; i < GRID_CNT; i++)
            startGrid(i);

        int lastGridIdx = GRID_CNT - 1;

        compute(grid(lastGridIdx).cluster().forPredicate(excludeLastPredicate()))
            .executeAsync(new TestTask(GRID_CNT - 1), null);

        jobLatch.await();

        for (int i = 0; i < lastGridIdx; i++)
            ((CommunicationSpi)grid(i).configuration().getCommunicationSpi()).waitLatch();

        latch.countDown();

        // Ensure that all worker nodes has already started job response sending.
        for (int i = 0; i < lastGridIdx; i++)
            ((CommunicationSpi)grid(i).configuration().getCommunicationSpi()).awaitResponse();

        // Now we stop master grid.
        stopGrid(lastGridIdx, true);

        waitForTopology(GRID_CNT - 1);

        // Release communication SPI wait latches. As master node is stopped, job worker will receive and exception.
        for (int i = 0; i < lastGridIdx; i++)
            ((CommunicationSpi)grid(i).configuration().getCommunicationSpi()).releaseWaitLatch();

        assert invokeLatch.await(5000, MILLISECONDS);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testApply1() throws Exception {
        testMasterLeaveAwareCallback(1, new CX1<ClusterGroup, IgniteFuture<?>>() {
            @Override public IgniteFuture<?> applyx(ClusterGroup grid) {
                return compute(grid).applyAsync(new TestClosure(), "arg");
            }
        });
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testApply2() throws Exception {
        testMasterLeaveAwareCallback(2, new CX1<ClusterGroup, IgniteFuture<?>>() {
            @Override public IgniteFuture<?> applyx(ClusterGroup grid) {
                return compute(grid).applyAsync(new TestClosure(), Arrays.asList("arg1", "arg2"));
            }
        });
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testApply3() throws Exception {
        testMasterLeaveAwareCallback(2, new CX1<ClusterGroup, IgniteFuture<?>>() {
            @Override public IgniteFuture<?> applyx(ClusterGroup grid) {
                return compute(grid).applyAsync(new TestClosure(),
                    Arrays.asList("arg1", "arg2"),
                    new IgniteReducer<Void, Object>() {
                        @Override public boolean collect(@Nullable Void aVoid) {
                            return true;
                        }

                        @Override public Object reduce() {
                            return null;
                        }
                    });
            }
        });
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testRun1() throws Exception {
        testMasterLeaveAwareCallback(1, new CX1<ClusterGroup, IgniteFuture<?>>() {
            @Override public IgniteFuture<?> applyx(ClusterGroup prj) {
                return compute(prj).runAsync(new TestRunnable());
            }
        });
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testRun2() throws Exception {
        testMasterLeaveAwareCallback(2, new CX1<ClusterGroup, IgniteFuture<?>>() {
            @Override public IgniteFuture<?> applyx(ClusterGroup prj) {
                return compute(prj).runAsync(Arrays.asList(new TestRunnable(), new TestRunnable()));
            }
        });
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testCall1() throws Exception {
        testMasterLeaveAwareCallback(1, new CX1<ClusterGroup, IgniteFuture<?>>() {
            @Override public IgniteFuture<?> applyx(ClusterGroup prj) {
                return compute(prj).callAsync(new TestCallable());
            }
        });
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testCall2() throws Exception {
        testMasterLeaveAwareCallback(2, new CX1<ClusterGroup, IgniteFuture<?>>() {
            @Override public IgniteFuture<?> applyx(ClusterGroup prj) {
                return compute(prj).callAsync(Arrays.asList(new TestCallable(), new TestCallable()));
            }
        });
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testCall3() throws Exception {
        testMasterLeaveAwareCallback(2, new CX1<ClusterGroup, IgniteFuture<?>>() {
            @Override public IgniteFuture<?> applyx(ClusterGroup prj) {
                return compute(prj).callAsync(
                    Arrays.asList(new TestCallable(), new TestCallable()),
                    new IgniteReducer<Void, Object>() {
                        @Override public boolean collect(@Nullable Void aVoid) {
                            return true;
                        }

                        @Override public Object reduce() {
                            return null;
                        }
                    });
            }
        });
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testBroadcast1() throws Exception {
        testMasterLeaveAwareCallback(1, new CX1<ClusterGroup, IgniteFuture<?>>() {
            @Override public IgniteFuture<?> applyx(ClusterGroup prj) {
                return compute(prj).broadcastAsync(new TestRunnable());
            }
        });
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testBroadcast2() throws Exception {
        testMasterLeaveAwareCallback(1, new CX1<ClusterGroup, IgniteFuture<?>>() {
            @Override public IgniteFuture<?> applyx(ClusterGroup prj) {
                return compute(prj).broadcastAsync(new TestCallable());
            }
        });
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testBroadcast3() throws Exception {
        testMasterLeaveAwareCallback(1, new CX1<ClusterGroup, IgniteFuture<?>>() {
            @Override public IgniteFuture<?> applyx(ClusterGroup prj) {
                return compute(prj).broadcastAsync(new TestClosure(), "arg");
            }
        });
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testAffinityRun() throws Exception {
        testMasterLeaveAwareCallback(1, new CX1<ClusterGroup, IgniteFuture<?>>() {
            @Override public IgniteFuture<?> applyx(ClusterGroup prj) {
                Affinity<Object> aff = prj.ignite().affinity(DEFAULT_CACHE_NAME);

                ClusterNode node = F.first(prj.nodes());

                return compute(prj).affinityRunAsync(DEFAULT_CACHE_NAME, keyForNode(aff, node), new TestRunnable());
            }
        });
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testAffinityCall() throws Exception {
        testMasterLeaveAwareCallback(1, new CX1<ClusterGroup, IgniteFuture<?>>() {
            @Override public IgniteFuture<?> applyx(ClusterGroup prj) {
                Affinity<Object> aff = prj.ignite().affinity(DEFAULT_CACHE_NAME);

                ClusterNode node = F.first(prj.nodes());

                return compute(prj).affinityCallAsync(DEFAULT_CACHE_NAME, keyForNode(aff, node), new TestCallable());
            }
        });
    }

    /**
     * @param aff Cache affinity.
     * @param node Node.
     * @return Finds some cache key for which given node is primary.
     */
    private Object keyForNode(Affinity<Object> aff, ClusterNode node) {
        assertNotNull(node);

        Object key = null;

        for (int i = 0; i < 1000; i++) {
            if (aff.isPrimary(node, i)) {
                key = i;

                break;
            }
        }

        assertNotNull(key);

        return key;
    }

    /**
     * @param expJobs Expected jobs number.
     * @param taskStarter Task started.
     * @throws Exception If failed.
     */
    private void testMasterLeaveAwareCallback(int expJobs, IgniteClosure<ClusterGroup, IgniteFuture<?>> taskStarter)
        throws Exception {
        jobLatch = new CountDownLatch(expJobs);
        invokeLatch = new CountDownLatch(expJobs);

        for (int i = 0; i < GRID_CNT; i++)
            startGrid(i);

        int lastGridIdx = GRID_CNT - 1;

        IgniteFuture<?> fut = taskStarter.apply(grid(lastGridIdx).cluster().forPredicate(excludeLastPredicate()));

        jobLatch.await();

        stopGrid(lastGridIdx, true);

        latch.countDown();

        assert invokeLatch.await(5000, MILLISECONDS);

        GridTestUtils.assertThrows(log, () -> fut.get(), IgniteException.class, null);
    }

    /**
     */
    private static class TestMasterLeaveAware {
        /** */
        private final CountDownLatch latch0 = new CountDownLatch(1);

        /**
         * @param log Logger.
         */
        private void execute(IgniteLogger log) {
            try {
                log.info("Started execute.");

                // Countdown shared job latch so that the main thread know that all jobs are
                // inside the "execute" routine.
                jobLatch.countDown();

                log.info("After job latch.");

                // Await for the main thread to allow jobs to proceed.
                latch.await();

                log.info("After latch.");

                if (awaitMasterLeaveCallback) {
                    latch0.await();

                    log.info("After latch0.");
                }
                else
                    log.info("Latch 0 skipped.");
            }
            catch (InterruptedException e) {
                // We do not expect any interruptions here, hence this statement.
                fail("Unexpected exception: " + e);
            }
        }

        /**
         * @param log Logger.
         * @param job Actual job.
         */
        private void onMasterLeave(IgniteLogger log, Object job) {
            log.info("Callback executed: " + job);

            latch0.countDown();

            invokeLatch.countDown();
        }
    }

    /**
     * Master leave aware callable.
     */
    private static class TestCallable implements IgniteCallable<Void>, ComputeJobMasterLeaveAware {
        /** Task session. */
        @LoggerResource
        private IgniteLogger log;

        /** */
        private TestMasterLeaveAware masterLeaveAware = new TestMasterLeaveAware();

        /** {@inheritDoc} */
        @Override public Void call() throws Exception {
            masterLeaveAware.execute(log);

            return null;
        }

        /** {@inheritDoc} */
        @Override public void onMasterNodeLeft(ComputeTaskSession ses) {
            masterLeaveAware.onMasterLeave(log, this);
        }
    }

    /**
     * Master leave aware runnable.
     */
    private static class TestRunnable implements IgniteRunnable, ComputeJobMasterLeaveAware {
        /** Task session. */
        @LoggerResource
        private IgniteLogger log;

        /** */
        private TestMasterLeaveAware masterLeaveAware = new TestMasterLeaveAware();

        /** {@inheritDoc} */
        @Override public void run() {
            masterLeaveAware.execute(log);
        }

        /** {@inheritDoc} */
        @Override public void onMasterNodeLeft(ComputeTaskSession ses) {
            masterLeaveAware.onMasterLeave(log, this);
        }
    }

    /**
     * Master leave aware closure.
     */
    private static class TestClosure implements IgniteClosure<String, Void>, ComputeJobMasterLeaveAware {
        /** Task session. */
        @LoggerResource
        private IgniteLogger log;

        /** */
        private TestMasterLeaveAware masterLeaveAware = new TestMasterLeaveAware();

        /** {@inheritDoc} */
        @Override public Void apply(String arg) {
            masterLeaveAware.execute(log);

            return null;
        }

        /** {@inheritDoc} */
        @Override public void onMasterNodeLeft(ComputeTaskSession ses) {
            masterLeaveAware.onMasterLeave(log, this);
        }
    }

    /**
     * Base implementation of dummy task which produces predefined amount of test jobs on split stage.
     */
    private static class TestTask extends ComputeTaskSplitAdapter<String, Integer> {
        /** How many jobs to produce. */
        private int jobCnt;

        /** */
        @TaskSessionResource
        private ComputeTaskSession taskSes;

        /**
         * Constructor.
         *
         * @param jobCnt How many jobs to produce on split stage.
         */
        private TestTask(int jobCnt) {
            this.jobCnt = jobCnt;
        }

        /** {@inheritDoc} */
        @Override protected Collection<? extends ComputeJob> split(int gridSize, String arg) {
            Collection<ComputeJobAdapter> jobs = new ArrayList<>(jobCnt);

            for (int i = 0; i < jobCnt; i++)
                jobs.add(new TestJob());

            return jobs;
        }

        /** {@inheritDoc} */
        @Override public Integer reduce(List<ComputeJobResult> results) {
            return null;
        }
    }

    /**
     * Base implementation of dummy test job.
     */
    private static class TestJob extends ComputeJobAdapter implements ComputeJobMasterLeaveAware {
        /** Task session. */
        @LoggerResource
        private IgniteLogger log;

        /** */
        private TestMasterLeaveAware masterLeaveAware = new TestMasterLeaveAware();

        /**
         * Constructor
         */
        private TestJob() {
            super(new Object());
        }

        /** {@inheritDoc} */
        @Override public Object execute() {
            masterLeaveAware.execute(log);

            return null;
        }

        /** {@inheritDoc} */
        @Override public void onMasterNodeLeft(ComputeTaskSession ses) {
            masterLeaveAware.onMasterLeave(log, this);
        }
    }

    /**
     * Communication SPI which could optionally block outgoing messages.
     */
    private static class CommunicationSpi extends TcpCommunicationSpi {
        /** Whether to block all outgoing messages. */
        private volatile boolean block;

        /** Job execution response latch. */
        private CountDownLatch respLatch = new CountDownLatch(1);

        /** Whether to wait for a wait latch before returning. */
        private volatile boolean wait;

        /** Wait latch. */
        private CountDownLatch waitLatch = new CountDownLatch(1);

        /** {@inheritDoc} */
        @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure)
            throws IgniteSpiException {
            sendMessage0(node, msg, ackClosure);
        }

        /**
         * Send message optionally either blocking it or throwing an exception if it is of
         * {@link GridJobExecuteResponse} type.
         *
         * @param node Destination node.
         * @param msg Message to be sent.
         * @param ackClosure Ack closure.
         * @throws org.apache.ignite.spi.IgniteSpiException If failed.
         */
        private void sendMessage0(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure)
            throws IgniteSpiException {
            if (msg instanceof GridIoMessage) {
                GridIoMessage msg0 = (GridIoMessage)msg;

                if (msg0.message() instanceof GridJobExecuteResponse) {
                    respLatch.countDown();

                    if (wait) {
                        try {
                            U.await(waitLatch);
                        }
                        catch (IgniteInterruptedCheckedException ignore) {
                            // No-op.
                        }
                    }
                }
            }

            if (!block)
                super.sendMessage(node, msg, ackClosure);
        }

        /**
         * Block all outgoing message.
         */
        void blockMessages() {
            block = true;
        }

        /**
         * Whether to block on a wait latch.
         */
        private void waitLatch() {
            wait = true;
        }

        /**
         * Count down wait latch.
         */
        private void releaseWaitLatch() {
            waitLatch.countDown();
        }

        /**
         * Await for job execution response to come.
         *
         * @throws IgniteInterruptedCheckedException If interrupted.
         */
        private void awaitResponse() throws IgniteInterruptedCheckedException {
            U.await(respLatch);
        }
    }
}
