/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.stream.camel;

import java.io.IOException;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import com.squareup.okhttp.MediaType;
import com.squareup.okhttp.OkHttpClient;
import com.squareup.okhttp.Request;
import com.squareup.okhttp.RequestBody;
import com.squareup.okhttp.Response;
import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.ServiceStatus;
import org.apache.camel.component.properties.PropertiesComponent;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.support.LifecycleStrategySupport;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.CacheEvent;
import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.util.lang.GridMapEntry;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.stream.StreamMultipleTupleExtractor;
import org.apache.ignite.stream.StreamSingleTupleExtractor;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;

import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;

/**
 * Test class for {@link CamelStreamer}.
 */
public class IgniteCamelStreamerTest extends GridCommonAbstractTest {
    /** Media type attached to every request body: UTF-8 encoded plain text. */
    private static final MediaType TEXT_PLAIN = MediaType.parse("text/plain;charset=utf-8");

    /** Test data set: keys {@code 0..99}, each mapped to the value {@code "v" + key}. */
    private static final Map<Integer, String> TEST_DATA = new HashMap<>();

    // Fill the test data once, at class initialization.
    static {
        for (int key = 0; key < 100; key++)
            TEST_DATA.put(key, "v" + key);
    }

    /** Camel streamer exercised by the current test; created in {@link #beforeTest()}. */
    private CamelStreamer<Integer, String> streamer;

    /** Ignite data streamer the Camel streamer feeds into. */
    private IgniteDataStreamer<Integer, String> dataStreamer;

    /** URL the HTTP endpoint is exposed on; test messages are posted to it. */
    private String url;

    /** Identifier of the remote event listener registered by the current test, if any. */
    private UUID remoteLsnr;

    /** HTTP client used to post the test messages. */
    private final OkHttpClient httpClient = new OkHttpClient();

    /**
     * Constructor.
     * <p>
     * NOTE(review): the {@code true} flag is handed to the parent test class; presumably it asks the
     * framework to start a grid for the tests (they call {@code grid()} without starting one) - confirm
     * against {@code GridCommonAbstractTest}.
     */
    public IgniteCamelStreamerTest() {
        super(true);
    }

    /**
     * {@inheritDoc}
     * <p>
     * Enables all event types on top of the parent configuration; the tests rely on cache PUT events.
     */
    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);

        return cfg.setIncludeEventTypes(EventType.EVTS_ALL);
    }

    /**
     * Prepares the default cache, picks a free local port for the HTTP endpoint and creates the streamers.
     *
     * @throws Exception If the port probe or the streamer setup fails.
     */
    @Override
    @SuppressWarnings("unchecked")
    public void beforeTest() throws Exception {
        grid().<Integer, String>getOrCreateCache(defaultCacheConfiguration());

        // Bind to port 0 to obtain a currently free port; the probe socket is closed again on leaving
        // the block, so the port is only reserved for the URL, not held open.
        try (ServerSocket probe = new ServerSocket(0)) {
            url = "http://localhost:" + probe.getLocalPort() + "/ignite";
        }

        // Data streamer first, then the Camel streamer that wraps it.
        dataStreamer = grid().dataStreamer(DEFAULT_CACHE_NAME);
        streamer = createCamelStreamer(dataStreamer);
    }

    /**
     * Stops the Camel streamer, removes any remote event listener the test left registered, closes the
     * data streamer and clears the cache.
     * <p>
     * A listener is left behind whenever a test fails before its final cache assertions run; removing
     * it here keeps it from receiving events produced by later tests.
     *
     * @throws Exception If closing the data streamer or clearing the cache fails.
     */
    @Override public void afterTest() throws Exception {
        try {
            streamer.stop();
        }
        catch (Exception ignored) {
            // Best effort: the streamer may never have been started or may already be stopped.
        }

        try {
            if (remoteLsnr != null) {
                grid().events(grid().cluster().forCacheNodes(DEFAULT_CACHE_NAME)).stopRemoteListen(remoteLsnr);

                // Reset so that a following test which never subscribes does not reuse a stale id.
                remoteLsnr = null;
            }
        }
        finally {
            // Always release the data streamer and wipe the cache, even if listener removal failed.
            dataStreamer.close();

            grid().cache(DEFAULT_CACHE_NAME).clear();
        }
    }

    /**
     * Posts 50 messages carrying one {@code key,value} pair each and checks that all of them reach the cache.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testSendOneEntryPerMessage() throws Exception {
        final int entries = 50;

        streamer.setSingleTupleExtractor(singleTupleExtractor());

        // The listener must be in place before anything is streamed.
        CountDownLatch putsSeen = subscribeToPutEvents(entries);

        streamer.start();

        // One HTTP request per entry.
        sendMessages(0, entries, false);

        assertTrue(putsSeen.await(10, TimeUnit.SECONDS));
        assertCacheEntriesLoaded(entries);
    }

    /**
     * Posts a single message carrying 50 newline-separated {@code key,value} pairs and checks that all of
     * them reach the cache.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testMultipleEntriesInOneMessage() throws Exception {
        final int entries = 50;

        streamer.setMultipleTupleExtractor(multipleTupleExtractor());

        // The listener must be in place before anything is streamed.
        CountDownLatch putsSeen = subscribeToPutEvents(entries);

        streamer.start();

        // All entries travel in one HTTP request.
        sendMessages(0, entries, true);

        assertTrue(putsSeen.await(10, TimeUnit.SECONDS));
        assertCacheEntriesLoaded(entries);
    }

    /**
     * Installs a response processor that sets a fixed reply body and checks that every HTTP response
     * carries that body, in addition to the entries reaching the cache.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testResponseProcessorIsCalled() throws Exception {
        final String expReply = "Foo bar";

        streamer.setSingleTupleExtractor(singleTupleExtractor());
        streamer.setResponseProcessor(new Processor() {
            @Override public void process(Exchange exchange) throws Exception {
                exchange.getOut().setBody(expReply);
            }
        });

        // The listener must be in place before anything is streamed.
        CountDownLatch putsSeen = subscribeToPutEvents(50);

        streamer.start();

        // Each reply must be the body written by the response processor.
        for (String reply : sendMessages(0, 50, false))
            assertEquals(expReply, reply);

        assertTrue(putsSeen.await(10, TimeUnit.SECONDS));
        assertCacheEntriesLoaded(50);
    }

    /**
     * Supplies a user-created Camel context to the streamer and checks that the streamer actually uses
     * it, by counting the endpoints added to that context.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testUserSpecifiedCamelContext() throws Exception {
        // Probe: incremented whenever an endpoint is added to the user context.
        final AtomicInteger endpointsAdded = new AtomicInteger();

        CamelContext userCtx = new DefaultCamelContext();

        userCtx.setTracing(true);
        userCtx.addLifecycleStrategy(new LifecycleStrategySupport() {
            @Override public void onEndpointAdd(Endpoint endpoint) {
                endpointsAdded.incrementAndGet();
            }
        });

        streamer.setSingleTupleExtractor(singleTupleExtractor());
        streamer.setCamelContext(userCtx);

        // The listener must be in place before anything is streamed.
        CountDownLatch putsSeen = subscribeToPutEvents(50);

        streamer.start();

        sendMessages(0, 50, false);

        assertTrue(putsSeen.await(10, TimeUnit.SECONDS));
        assertCacheEntriesLoaded(50);

        // At least one endpoint must have been registered on the context we supplied.
        assertTrue(endpointsAdded.get() > 0);
    }

    /**
     * Uses a user-created Camel context with a properties component and an endpoint URI containing a
     * {@code {{test.contextPath}}} placeholder, then streams entries through the resolved endpoint.
     * <p>
     * NOTE(review): {@code test.contextPath} is presumably defined in {@code camel.test.properties} on the
     * test classpath - confirm.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testUserSpecifiedCamelContextWithPropertyPlaceholders() throws Exception {
        CamelContext userCtx = new DefaultCamelContext();

        userCtx.addComponent("properties", new PropertiesComponent("camel.test.properties"));

        // Swap the fixed context path for the placeholder and rebuild the streamer around the new URL.
        url = url.replace("/ignite", "{{test.contextPath}}");

        streamer = createCamelStreamer(dataStreamer);

        streamer.setSingleTupleExtractor(singleTupleExtractor());
        streamer.setCamelContext(userCtx);

        // The listener must be in place before anything is streamed.
        CountDownLatch putsSeen = subscribeToPutEvents(50);

        streamer.start();

        // The messages must go to the resolved URL: take the URI of the first endpoint in the context
        // and drop its "jetty:" prefix.
        String resolvedUri = streamer.getCamelContext().getEndpoints().iterator().next().getEndpointUri();

        url = resolvedUri.replace("jetty:", "");

        sendMessages(0, 50, false);

        assertTrue(putsSeen.await(10, TimeUnit.SECONDS));
        assertCacheEntriesLoaded(50);
    }

    /**
     * Checks that starting the streamer with an unusable endpoint URI fails with an
     * {@link IgniteException} and leaves the Camel context stopped with no endpoints registered.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testInvalidEndpointUri() throws Exception {
        streamer.setSingleTupleExtractor(singleTupleExtractor());
        streamer.setEndpointUri("abc");

        try {
            streamer.start();

            fail("Streamer started; should have failed.");
        }
        catch (IgniteException ignored) {
            // assertEquals reports the offending value when the check fails, unlike assertTrue(a == b).
            assertEquals(ServiceStatus.Stopped, streamer.getCamelContext().getStatus());
            assertEquals(0, streamer.getCamelContext().getEndpointRegistry().size());
        }
    }

    /**
     * Builds a Camel streamer bound to the test grid, the given data streamer and the current {@link #url}
     * (exposed through the {@code jetty:} component). Also configures the data streamer to allow overwrites
     * and sets its auto-flush frequency to 1.
     *
     * @param dataStreamer Data streamer the Camel streamer should feed.
     * @return Configured Camel streamer; it is not started here.
     */
    private CamelStreamer<Integer, String> createCamelStreamer(IgniteDataStreamer<Integer, String> dataStreamer) {
        dataStreamer.allowOverwrite(true);
        dataStreamer.autoFlushFrequency(1);

        CamelStreamer<Integer, String> camelStreamer = new CamelStreamer<>();

        camelStreamer.setIgnite(grid());
        camelStreamer.setStreamer(dataStreamer);
        camelStreamer.setEndpointUri("jetty:" + url);

        return camelStreamer;
    }

    /**
     * Posts test entries {@code fromIdx .. fromIdx + cnt - 1} to the endpoint at {@link #url}.
     *
     * @param fromIdx Key of the first test entry to send.
     * @param cnt Number of consecutive entries to send.
     * @param singleMessage If {@code true}, all entries go into one request as newline-terminated
     *      {@code key,value} lines; otherwise each entry is sent in its own request.
     * @return HTTP response payloads, one per request, in sending order.
     * @throws IOException If an HTTP call fails.
     */
    private List<String> sendMessages(int fromIdx, int cnt, boolean singleMessage) throws IOException {
        List<String> replies = new ArrayList<>();

        int endIdx = fromIdx + cnt;

        if (!singleMessage) {
            for (int key = fromIdx; key < endIdx; key++)
                replies.add(postPlainText(key + "," + TEST_DATA.get(key)));

            return replies;
        }

        StringBuilder batch = new StringBuilder();

        for (int key = fromIdx; key < endIdx; key++)
            batch.append(key).append(",").append(TEST_DATA.get(key)).append("\n");

        replies.add(postPlainText(batch.toString()));

        return replies;
    }

    /**
     * Posts one plain-text payload to {@link #url} and returns the response body.
     *
     * @param payload Request body.
     * @return Response body as a string.
     * @throws IOException If the HTTP call fails.
     */
    private String postPlainText(String payload) throws IOException {
        Request req = new Request.Builder()
            .url(url)
            .post(RequestBody.create(TEXT_PLAIN, payload))
            .build();

        Response resp = httpClient.newCall(req).execute();

        return resp.body().string();
    }

    /**
     * Creates an extractor that reads the exchange body as a single {@code key,value} string.
     *
     * @return Extractor producing one entry per exchange: the integer before the first comma as key and
     *      the following comma-separated token as value.
     */
    private static StreamSingleTupleExtractor<Exchange, Integer, String> singleTupleExtractor() {
        return new StreamSingleTupleExtractor<Exchange, Integer, String>() {
            @Override public Map.Entry<Integer, String> extract(Exchange exchange) {
                String body = exchange.getIn().getBody(String.class);

                List<String> tokens = Splitter.on(",").splitToList(body);

                Integer key = Integer.parseInt(tokens.get(0));
                String val = tokens.get(1);

                return new GridMapEntry<>(key, val);
            }
        };
    }

    /**
     * Creates an extractor that reads the exchange body as newline-separated {@code key,value} lines.
     *
     * @return Extractor producing one map per exchange, with the keys parsed as integers; empty lines
     *      are skipped.
     */
    private static StreamMultipleTupleExtractor<Exchange, Integer, String> multipleTupleExtractor() {
        return new StreamMultipleTupleExtractor<Exchange, Integer, String>() {
            @Override public Map<Integer, String> extract(Exchange exchange) {
                String body = exchange.getIn().getBody(String.class);

                Map<String, String> rawPairs = Splitter.on("\n")
                    .omitEmptyStrings()
                    .withKeyValueSeparator(",")
                    .split(body);

                Map<Integer, String> parsed = new HashMap<>();

                // Convert the textual keys to integers; values are taken over unchanged.
                for (Map.Entry<String, String> pair : rawPairs.entrySet())
                    parsed.put(Integer.parseInt(pair.getKey()), pair.getValue());

                return parsed;
            }
        };
    }

    /**
     * Registers a remote listener for cache PUT events on the nodes hosting the default cache and
     * remembers its id in {@link #remoteLsnr}.
     *
     * @param expect Number of PUT events to wait for.
     * @return Latch that is counted down once per received PUT event, starting at {@code expect}.
     */
    private CountDownLatch subscribeToPutEvents(int expect) {
        // One count per expected PUT event.
        final CountDownLatch putsLatch = new CountDownLatch(expect);

        @SuppressWarnings("serial") IgniteBiPredicate<UUID, CacheEvent> onPut =
            new IgniteBiPredicate<UUID, CacheEvent>() {
            @Override public boolean apply(UUID nodeId, CacheEvent evt) {
                putsLatch.countDown();

                // Always answer true. NOTE(review): presumably this keeps the listener subscribed - confirm.
                return true;
            }
        };

        Ignite ignite = grid();

        // No remote filter (second argument is null); only PUT events are requested.
        remoteLsnr = ignite.events(ignite.cluster().forCacheNodes(DEFAULT_CACHE_NAME))
            .remoteListen(onPut, null, EVT_CACHE_OBJECT_PUT);

        return putsLatch;
    }

    /**
     * Asserts that the cache holds exactly the first {@code cnt} test entries (in ascending key order),
     * then unregisters the remote listener whose id is stored in {@link #remoteLsnr}.
     * <p>
     * The listener is removed in a {@code finally} block so that a failed assertion does not leave it
     * registered for the following tests.
     *
     * @param cnt Number of entries expected in the cache.
     */
    private void assertCacheEntriesLoaded(int cnt) {
        try {
            IgniteCache<Integer, String> cache = grid().cache(DEFAULT_CACHE_NAME);

            // Check the first cnt test keys, taken in ascending order.
            List<Integer> sortedKeys = new ArrayList<>(new TreeSet<>(TEST_DATA.keySet()));

            for (Integer key : sortedKeys.subList(0, cnt))
                assertEquals(TEST_DATA.get(key), cache.get(key));

            // The cache must contain exactly the specified number of entries.
            assertEquals(cnt, cache.size(CachePeekMode.ALL));
        }
        finally {
            if (remoteLsnr != null) {
                grid().events(grid().cluster().forCacheNodes(DEFAULT_CACHE_NAME)).stopRemoteListen(remoteLsnr);

                // The id is no longer valid once the listener has been removed.
                remoteLsnr = null;
            }
        }
    }

}
