/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.session;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.compute.ComputeJobAdapter;
import org.apache.ignite.compute.ComputeJobContext;
import org.apache.ignite.compute.ComputeJobResult;
import org.apache.ignite.compute.ComputeJobSibling;
import org.apache.ignite.compute.ComputeTaskFuture;
import org.apache.ignite.compute.ComputeTaskSession;
import org.apache.ignite.compute.ComputeTaskSessionFullSupport;
import org.apache.ignite.compute.ComputeTaskSplitAdapter;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.resources.JobContextResource;
import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.resources.TaskSessionResource;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.testframework.junits.common.GridCommonTest;
import org.junit.Test;

/**
 * Tests waiting for session attributes.
 */
@GridCommonTest(group = "Task Session")
@SuppressWarnings({"PublicInnerClass"})
public class GridSessionWaitAttributeSelfTest extends GridCommonAbstractTest {
    /** */
    private static final int ATTR_NUM = 100;

    /** */
    private static final int JOB_NUM = 10;

    /** */
    private static final long WAIT_TIMEOUT = 20000;

    /** */
    private enum WaitAttributeType {
        /** waitForAttribute(Serializable key). */
        WAIT_FOR_ATTRIBUTE_KEY,

        /** waitForAttribute(Serializable key, Serializable val). */
        WAIT_FOR_ATTRIBUTE_KEY_VAL,

        /** waitForAttribute(Serializable key, long timeout). */
        WAIT_FOR_ATTRIBUTE_KEY_TIMEOUT,

        /** waitForAttribute(Serializable key, Serializable val, long timeout). */
        WAIT_FOR_ATTRIBUTE_KEY_VAL_TIMEOUT,

        /** waitForAttributes(Collection<? extends Serializable> keys). */
        WAIT_FOR_ATTRIBUTES_KEYS,

        /** waitForAttributes(Map<? extends Serializable, ? extends Serializable> attrs). */
        WAIT_FOR_ATTRIBUTES_ATTRS,

        /** waitForAttributes(Collection<? extends Serializable> keys, long timeout). */
        WAIT_FOR_ATTRIBUTES_KEYS_TIMEOUT,

        /** waitForAttributes(Map<? extends Serializable, ? extends Serializable> attrs, long timeout). */
        WAIT_FOR_ATTRIBUTES_ATTRS_TIMEOUT
    }

    /** {@inheritDoc} */
    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
        IgniteConfiguration c = super.getConfiguration(igniteInstanceName);

        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();

        discoSpi.setIpFinder(new TcpDiscoveryVmIpFinder(true));

        c.setDiscoverySpi(discoSpi);

        c.setPublicThreadPoolSize(JOB_NUM * 2);

        return c;
    }


    /** {@inheritDoc} */
    @Override protected void beforeTestsStarted() throws Exception {
        startGrid(1);
        startGrid(2);
    }

    /**
     * @param prefix Prefix.
     * @param mtd Method.
     * @param i Index.
     * @return Session attribute key.
     */
    private static String createKey(String prefix, Enum mtd, int i) {
        assert prefix != null;
        assert mtd != null;

        return prefix + "test.key." + mtd.name() + '.' + i;
    }

    /**
     * @param prefix Prefix.
     * @param mtd Method.
     * @param i Index.
     * @return Session attribute value.
     */
    private static String createValue(String prefix, Enum mtd, int i) {
        assert prefix != null;
        assert mtd != null;

        return prefix + "test.value." + mtd.name() + '.' + i;
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testWaitAttribute() throws Exception {
        checkWaitAttributeMethod(WaitAttributeType.WAIT_FOR_ATTRIBUTE_KEY);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testWaitAttributeWithTimeout() throws Exception {
        checkWaitAttributeMethod(WaitAttributeType.WAIT_FOR_ATTRIBUTE_KEY_TIMEOUT);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testWaitAttributeValue() throws Exception {
        checkWaitAttributeMethod(WaitAttributeType.WAIT_FOR_ATTRIBUTE_KEY_VAL);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testWaitAttributeValueWithTimeout() throws Exception {
        checkWaitAttributeMethod(WaitAttributeType.WAIT_FOR_ATTRIBUTE_KEY_VAL_TIMEOUT);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testWaitAttributeValues() throws Exception {
        checkWaitAttributeMethod(WaitAttributeType.WAIT_FOR_ATTRIBUTES_ATTRS);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testWaitAttributeValuesWithTimeout() throws Exception {
        checkWaitAttributeMethod(WaitAttributeType.WAIT_FOR_ATTRIBUTES_ATTRS_TIMEOUT);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testWaitAttributes() throws Exception {
        checkWaitAttributeMethod(WaitAttributeType.WAIT_FOR_ATTRIBUTES_KEYS);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testWaitAttributesWithTimeout() throws Exception {
        checkWaitAttributeMethod(WaitAttributeType.WAIT_FOR_ATTRIBUTES_KEYS_TIMEOUT);
    }

    /**
     * @param type Type.
     * @throws Exception If failed.
     */
    private void checkWaitAttributeMethod(WaitAttributeType type) throws Exception {
        assert type != null;

        Ignite ignite1 = G.ignite(getTestIgniteInstanceName() + '1');
        Ignite ignite2 = G.ignite(getTestIgniteInstanceName() + '2');

        assert ignite1 != null;
        assert ignite2 != null;

        ignite1.compute().localDeployTask(TestSessionTask.class, TestSessionTask.class.getClassLoader());

        ComputeTaskFuture<?> fut = ignite1.compute().executeAsync(TestSessionTask.class.getName(), type);

        fut.getTaskSession().mapFuture().get();

        ComputeTaskSession ses = fut.getTaskSession();

        info("Task job siblings [size=" + ses.getJobSiblings().size() + ", siblings=" + ses.getJobSiblings() + ']');

        for (int i = 0; i < ATTR_NUM; i++) {
            String key = createKey("fut", type, i);
            String val = createValue("fut", type, i);

            ses.setAttribute(key, val);
        }

        // Check all job attributes.
        for (ComputeJobSibling sibling : ses.getJobSiblings()) {
            info("Checking session attributes for sibling: " + sibling);

            checkSessionAttributes(ses, sibling.getJobId().toString(), type);
        }

        // Check that fut attributes have been set.
        checkSessionAttributes(ses, "fut", type);

        // Signal finish.
        ses.setAttribute("done", true);

        fut.get();
    }

    /**
     * @param ses Session.
     * @param prefix Prefix.
     * @param type Type.
     */
    private static void checkSessionAttributes(ComputeTaskSession ses, String prefix, WaitAttributeType type) {
        assert ses != null;
        assert type != null;

        try {
            switch (type) {
                case WAIT_FOR_ATTRIBUTE_KEY: {
                    for (int i = 0; i < ATTR_NUM; i++) {
                        String key = createKey(prefix, type, i);
                        String val = createValue(prefix, type, i);

                        Serializable obj = ses.waitForAttribute(key, 0);

                        assert obj != null :
                            "Failed to wait for attribute [key=" + key + ", val=" + val + ", receivedVal=" + obj + ']';
                        assert val.equals(obj) :
                            "Failed to wait for attribute [key=" + key + ", val=" + val + ", receivedVal=" + obj + ']';

                        //System.out.println(
                        //    Thread.currentThread().getName() +
                        //        ":: Waited for attribute [key=" + key + ", val=" + obj + ", ses=" + ses + ']'
                        //);
                    }

                    break;
                }

                case WAIT_FOR_ATTRIBUTE_KEY_TIMEOUT: {
                    for (int i = 0; i < ATTR_NUM; i++) {
                        String key = createKey(prefix, type, i);
                        String val = createValue(prefix, type, i);

                        Serializable obj = ses.waitForAttribute(key, WAIT_TIMEOUT);

                        assert obj != null :
                            "Failed to wait for attribute [key=" + key + ", val=" + val + ", receivedVal=" + obj + ']';
                        assert val.equals(obj) :
                            "Failed to wait for attribute [key=" + key + ", val=" + val + ", receivedVal=" + obj + ']';
                    }

                    break;
                }

                case WAIT_FOR_ATTRIBUTE_KEY_VAL: {
                    for (int i = 0; i < ATTR_NUM; i++) {
                        String key = createKey(prefix, type, i);
                        String val = createValue(prefix, type, i);

                        boolean attr = ses.waitForAttribute(key, val, 0);

                        assert attr :
                            "Failed to wait for attribute [key=" + key + ", val=" + val + ']';
                    }

                    break;
                }

                case WAIT_FOR_ATTRIBUTE_KEY_VAL_TIMEOUT: {
                    for (int i = 0; i < ATTR_NUM; i++) {
                        String key = createKey(prefix, type, i);
                        String val = createValue(prefix, type, i);

                        boolean attr = ses.waitForAttribute(key, val, WAIT_TIMEOUT);

                        assert attr :
                            "Failed to wait for attribute [key=" + key + ", val=" + val + ']';
                    }

                    break;
                }

                case WAIT_FOR_ATTRIBUTES_ATTRS: {
                    Map<Object, Object> map = new HashMap<>();

                    for (int i = 0; i < ATTR_NUM; i++)
                        map.put(createKey(prefix, type, i), createValue(prefix, type, i));

                    boolean attrs = ses.waitForAttributes(map, 0);

                    assert attrs :
                        "Failed to wait for attribute [attrs=" + map + ']';

                    break;
                }

                case WAIT_FOR_ATTRIBUTES_ATTRS_TIMEOUT: {
                    Map<Object, Object> map = new HashMap<>();

                    for (int i = 0; i < ATTR_NUM; i++)
                        map.put(createKey(prefix, type, i), createValue(prefix, type, i));

                    boolean attrs = ses.waitForAttributes(map, WAIT_TIMEOUT);

                    assert attrs :
                        "Failed to wait for attribute [attrs=" + map + ']';

                    break;
                }

                case WAIT_FOR_ATTRIBUTES_KEYS: {
                    Map<Object, Object> map = new HashMap<>();

                    for (int i = 0; i < ATTR_NUM; i++)
                        map.put(createKey(prefix, type, i), createValue(prefix, type, i));

                    Map<?, ?> res = ses.waitForAttributes(map.keySet(), 0);

                    assert res != null : "Failed to wait for attribute [keys=" + map.keySet() + ']';

                    for (Map.Entry<Object, Object> entry : map.entrySet()) {
                        Object obj = res.get(entry.getKey());

                        assert obj != null : "Failed to get value from result map [key=" + entry.getKey() + ']';
                        assert entry.getValue().equals(obj) : "Fount unexpected value [key=" + entry.getKey()
                            + ", val=" + obj + ", expected=" + entry.getValue();
                    }

                    break;
                }

                case WAIT_FOR_ATTRIBUTES_KEYS_TIMEOUT: {
                    Map<Object, Object> map = new HashMap<>();

                    for (int i = 0; i < ATTR_NUM; i++)
                        map.put(createKey(prefix, type, i), createValue(prefix, type, i));

                    Map<?, ?> res = ses.waitForAttributes(map.keySet(), WAIT_TIMEOUT);

                    assert res != null : "Failed to wait for attribute [keys=" + map.keySet() + ']';

                    for (Map.Entry<Object, Object> entry : map.entrySet()) {
                        Object obj = res.get(entry.getKey());

                        assert obj != null : "Failed to get value from result map [key=" + entry.getKey() + ']';
                        assert entry.getValue().equals(obj) : "Fount unexpected value [key=" + entry.getKey()
                            + ", val=" + obj + ", expected=" + entry.getValue();
                    }

                    break;
                }

                default: {
                    assert false : "Unknown session wait type.";
                }
            }
        }
        catch (InterruptedException e) {
            throw new IgniteException("Got interrupted while waiting for session attributes.", e);
        }
    }

    /** */
    @ComputeTaskSessionFullSupport
    public static class TestSessionTask extends ComputeTaskSplitAdapter<WaitAttributeType, Object> {
        /** {@inheritDoc} */
        @Override protected Collection<TestSessionJob> split(int gridSize, WaitAttributeType type) {
            assert type != null;

            Collection<TestSessionJob> jobs = new ArrayList<>(JOB_NUM);

            for (int i = 0; i < JOB_NUM; i++)
                jobs.add(new TestSessionJob(type));

            return jobs;
        }

        /** {@inheritDoc} */
        @Override public Object reduce(List<ComputeJobResult> results) {
            return null;
        }
    }

    /** */
    public static class TestSessionJob extends ComputeJobAdapter {
        /** */
        @TaskSessionResource
        private ComputeTaskSession taskSes;

        /** */
        @JobContextResource
        private ComputeJobContext jobCtx;

        /** Logger. */
        @LoggerResource
        private IgniteLogger log;

        /**
         * @param arg Wait attribute type.
         */
        public TestSessionJob(WaitAttributeType arg) {
            super(arg);
        }

        /** {@inheritDoc} */
        @Override public Serializable execute() {
            WaitAttributeType m = argument(0);

            checkSessionAttributes(taskSes, "fut", m);

            IgniteUuid jobId = jobCtx.getJobId();

            for (int i = 0; i < ATTR_NUM; i++) {
                String key = createKey(jobId.toString(), m, i);
                String val = createValue(jobId.toString(), m, i);

                taskSes.setAttribute(key, val);
            }

            // Check that attributes just set are present.
            checkSessionAttributes(taskSes, jobId.toString(), m);

            Collection<ComputeJobSibling> siblings = taskSes.getJobSiblings();

            if (log.isInfoEnabled())
                log.info("Got siblings from job [size=" + siblings.size() + ", siblings=" + siblings + ']');

            // Check attributes from siblings.
            for (ComputeJobSibling sibling : taskSes.getJobSiblings()) {
                if (!sibling.getJobId().equals(jobId))
                    checkSessionAttributes(taskSes, sibling.getJobId().toString(), m);
            }

            try {
                taskSes.waitForAttribute("done", true, 0);
            }
            catch (InterruptedException e) {
                throw new IgniteException("Got interrupted while waiting for 'done' attribute.", e);
            }

            return null;
        }
    }
}
