/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.component.ignite;

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.component.ignite.compute.IgniteComputeComponent;
import org.apache.camel.util.ObjectHelper;
import org.apache.ignite.Ignite;
import org.apache.ignite.Ignition;
import org.apache.ignite.events.EventType;
import org.junit.After;
import org.junit.Test;

import static com.google.common.truth.Truth.assert_;

public class IgniteComputeTest extends AbstractIgniteTest {

    private static final List<Ignite> ADDITIONAL_INSTANCES = Lists.newArrayList();
    private static final List<UUID> LISTENERS = Lists.newArrayList();

    @Override
    protected String getScheme() {
        return "ignite-compute";
    }

    @Override
    protected AbstractIgniteComponent createComponent() {
        return IgniteComputeComponent.fromConfiguration(createConfiguration());
    }

    @Test
    public void testExecuteWithWrongPayload() {
        try {
            template.requestBody("ignite-compute:" + resourceUid + "?executionType=EXECUTE", TestIgniteComputeResources.TEST_CALLABLE, String.class);
        } catch (Exception e) {
            assert_().that(ObjectHelper.getException(RuntimeCamelException.class, e).getMessage()).startsWith("Ignite Compute endpoint with EXECUTE");
            return;
        }

        fail();
    }

    @Test
    @SuppressWarnings("unchecked")
    public void testCall() {
        TestIgniteComputeResources.COUNTER.set(0);

        // Single Callable.
        String result = template.requestBody("ignite-compute:" + resourceUid + "?executionType=CALL", TestIgniteComputeResources.TEST_CALLABLE, String.class);

        assert_().that(result).isEqualTo("hello");

        // Collection of Callables.
        Object[] callables = new Object[5];
        Arrays.fill(callables, TestIgniteComputeResources.TEST_CALLABLE);
        Collection<String> colResult = template.requestBody("ignite-compute:" + resourceUid + "?executionType=CALL", Lists.newArrayList(callables), Collection.class);

        assert_().that(colResult).containsExactly("hello", "hello", "hello", "hello", "hello").inOrder();

        // Callables with a Reducer.
        String reduced = template.requestBodyAndHeader("ignite-compute:" + resourceUid + "?executionType=CALL", Lists.newArrayList(callables),
                                                       IgniteConstants.IGNITE_COMPUTE_REDUCER, TestIgniteComputeResources.STRING_JOIN_REDUCER, String.class);

        assert_().that(reduced).isEqualTo("hellohellohellohellohello");
    }

    @Test
    public void testRun() {
        TestIgniteComputeResources.COUNTER.set(0);

        // Single Runnable.
        Object result = template.requestBody("ignite-compute:" + resourceUid + "?executionType=RUN", TestIgniteComputeResources.TEST_RUNNABLE_COUNTER, Object.class);
        assert_().that(result).isNull();
        assert_().that(TestIgniteComputeResources.COUNTER.get()).isEqualTo(1);

        // Multiple Runnables.
        Object[] runnables = new Object[5];
        Arrays.fill(runnables, TestIgniteComputeResources.TEST_RUNNABLE_COUNTER);
        result = template.requestBody("ignite-compute:" + resourceUid + "?executionType=RUN", Lists.newArrayList(runnables), Collection.class);
        assert_().that(result).isNull();
        assert_().that(TestIgniteComputeResources.COUNTER.get()).isEqualTo(6);
    }

    @Test
    @SuppressWarnings("unchecked")
    public void testBroadcast() {
        TestIgniteComputeResources.COUNTER.set(0);

        startAdditionalGridInstance();
        startAdditionalGridInstance();

        ignite().events().enableLocal(EventType.EVT_JOB_FINISHED);
        LISTENERS.add(ignite().events().remoteListen(null, TestIgniteComputeResources.EVENT_COUNTER, EventType.EVT_JOB_FINISHED));

        // Single Runnable.
        Object result = template.requestBody("ignite-compute:" + resourceUid + "?executionType=BROADCAST", TestIgniteComputeResources.TEST_RUNNABLE, Object.class);
        assert_().that(result).isNull();
        assert_().that(TestIgniteComputeResources.COUNTER.get()).isEqualTo(3);

        // Single Callable.
        Collection<String> colResult = template.requestBody("ignite-compute:" + resourceUid + "?executionType=BROADCAST", TestIgniteComputeResources.TEST_CALLABLE,
                                                            Collection.class);
        assert_().that(colResult).isNotNull();
        assert_().that(colResult).containsExactly("hello", "hello", "hello").inOrder();

        // Single Closure.
        colResult = template.requestBodyAndHeader("ignite-compute:" + resourceUid + "?executionType=BROADCAST", TestIgniteComputeResources.TEST_CLOSURE,
                                                  IgniteConstants.IGNITE_COMPUTE_PARAMS, "Camel", Collection.class);
        assert_().that(colResult).isNotNull();
        assert_().that(colResult).containsExactly("hello Camel", "hello Camel", "hello Camel").inOrder();
    }

    @Test
    public void testExecute() {
        TestIgniteComputeResources.COUNTER.set(0);

        startAdditionalGridInstance();
        startAdditionalGridInstance();

        ignite().events().enableLocal(EventType.EVT_JOB_RESULTED);
        LISTENERS.add(ignite().events().remoteListen(null, TestIgniteComputeResources.EVENT_COUNTER, EventType.EVT_JOB_RESULTED));

        // ComputeTask instance.
        String result = template.requestBodyAndHeader("ignite-compute:" + resourceUid + "?executionType=EXECUTE", TestIgniteComputeResources.COMPUTE_TASK,
                                                      IgniteConstants.IGNITE_COMPUTE_PARAMS, 10, String.class);
        assert_().that(result).isNotNull();
        assert_().that(Splitter.on(",").splitToList(result)).containsAllOf("a0", "a1", "a2", "a3", "a4", "a5", "a6", "a7", "a8", "a9");

        // ComputeTask class.
        result = template.requestBodyAndHeader("ignite-compute:" + resourceUid + "?executionType=EXECUTE", TestIgniteComputeResources.COMPUTE_TASK.getClass(),
                                               IgniteConstants.IGNITE_COMPUTE_PARAMS, 10, String.class);
        assert_().that(result).isNotNull();
        assert_().that(Splitter.on(",").splitToList(result)).containsAllOf("a0", "a1", "a2", "a3", "a4", "a5", "a6", "a7", "a8", "a9");
    }

    @Test
    @SuppressWarnings("unchecked")
    public void testApply() {
        TestIgniteComputeResources.COUNTER.set(0);

        // Closure with a single parameter.
        String result = template.requestBodyAndHeader("ignite-compute:" + resourceUid + "?executionType=APPLY", TestIgniteComputeResources.TEST_CLOSURE,
                                                      IgniteConstants.IGNITE_COMPUTE_PARAMS, "Camel", String.class);
        assert_().that(result).isEqualTo("hello Camel");

        // Closure with a Collection of parameters.
        Collection<String> colResult = template.requestBodyAndHeader("ignite-compute:" + resourceUid + "?executionType=APPLY", TestIgniteComputeResources.TEST_CLOSURE,
                                                                     IgniteConstants.IGNITE_COMPUTE_PARAMS, Lists.newArrayList("Camel1", "Camel2", "Camel3"), Collection.class);
        assert_().that(colResult).containsAllOf("hello Camel1", "hello Camel2", "hello Camel3");

        // Closure with a Collection of parameters and a Reducer.
        Map<String, Object> headers = ImmutableMap.<String, Object> of(IgniteConstants.IGNITE_COMPUTE_PARAMS, Lists.newArrayList("Camel1", "Camel2", "Camel3"),
                                                                       IgniteConstants.IGNITE_COMPUTE_REDUCER, TestIgniteComputeResources.STRING_JOIN_REDUCER);
        result = template.requestBodyAndHeaders("ignite-compute:" + resourceUid + "?executionType=APPLY", TestIgniteComputeResources.TEST_CLOSURE, headers, String.class);
        assert_().that(result).isEqualTo("hello Camel1hello Camel2hello Camel3");
    }

    @Override
    public boolean isCreateCamelContextPerClass() {
        return true;
    }

    private void startAdditionalGridInstance() {
        ADDITIONAL_INSTANCES.add(Ignition.start(createConfiguration()));
    }

    @After
    public void stopAdditionalIgniteInstances() {
        for (Ignite ignite : ADDITIONAL_INSTANCES) {
            ignite.close();
        }
        ADDITIONAL_INSTANCES.clear();
    }

    @After
    public void stopRemoteListeners() {
        for (UUID uuid : LISTENERS) {
            ignite().events().stopRemoteListen(uuid);
        }
        LISTENERS.clear();
    }

}
