/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.sling.event.it;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.sling.discovery.TopologyEvent;
import org.apache.sling.discovery.TopologyEvent.Type;
import org.apache.sling.discovery.TopologyEventListener;
import org.apache.sling.discovery.TopologyView;
import org.apache.sling.event.impl.jobs.config.ConfigurationConstants;
import org.apache.sling.event.jobs.Job;
import org.apache.sling.event.jobs.JobManager;
import org.apache.sling.event.jobs.NotificationConstants;
import org.apache.sling.event.jobs.QueueConfiguration;
import org.apache.sling.event.jobs.consumer.JobConsumer;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.ops4j.pax.exam.Configuration;
import org.ops4j.pax.exam.Option;
import org.ops4j.pax.exam.junit.PaxExam;
import org.ops4j.pax.exam.spi.reactors.ExamReactorStrategy;
import org.ops4j.pax.exam.spi.reactors.PerMethod;
import org.osgi.framework.InvalidSyntaxException;
import org.osgi.framework.ServiceReference;
import org.osgi.framework.ServiceRegistration;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventHandler;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.ops4j.pax.exam.CoreOptions.options;
import static org.ops4j.pax.exam.cm.ConfigurationAdminOptions.factoryConfiguration;

/**
 * Integration test which creates jobs from several threads for a fixed duration
 * while another thread repeatedly sends topology changing/changed events to the
 * job handling's topology listener. The test is done once every created job has
 * been reported as finished.
 */
@RunWith(PaxExam.class)
@ExamReactorStrategy(PerMethod.class)
public class ChaosTest extends AbstractJobHandlingTest {

    /** Duration for firing jobs in seconds. */
    private static final long DURATION = 1 * 60;

    /** Additional time in seconds the chaos thread keeps running after job creation has stopped. */
    private static final long CHAOS_EXTRA_DURATION = 30;

    private static final int NUM_ORDERED_THREADS = 3;
    private static final int NUM_PARALLEL_THREADS = 6;
    private static final int NUM_ROUND_THREADS = 6;

    private static final int NUM_ORDERED_TOPICS = 2;
    private static final int NUM_PARALLEL_TOPICS = 8;
    private static final int NUM_ROUND_TOPICS = 8;

    private static final String ORDERED_TOPIC_PREFIX = "sling/chaos/ordered/";
    private static final String PARALLEL_TOPIC_PREFIX = "sling/chaos/parallel/";
    private static final String ROUND_TOPIC_PREFIX = "sling/chaos/round/";

    /** Class name of the topology listener that receives the simulated topology events. */
    private static final String TOPOLOGY_HANDLER_CLASS = "org.apache.sling.event.impl.jobs.config.TopologyHandler";

    private static final String[] ORDERED_TOPICS = createTopics(ORDERED_TOPIC_PREFIX, NUM_ORDERED_TOPICS);
    private static final String[] PARALLEL_TOPICS = createTopics(PARALLEL_TOPIC_PREFIX, NUM_PARALLEL_TOPICS);
    private static final String[] ROUND_TOPICS = createTopics(ROUND_TOPIC_PREFIX, NUM_ROUND_TOPICS);

    /**
     * Create the topic names <code>prefix + 0</code> ... <code>prefix + (count - 1)</code>.
     *
     * @param prefix The common topic prefix
     * @param count The number of topics to create
     * @return The array with the topic names
     */
    private static String[] createTopics(final String prefix, final int count) {
        final String[] topics = new String[count];
        for(int i=0; i<count; i++) {
            topics[i] = prefix + i;
        }
        return topics;
    }

    /**
     * Let the current thread sleep. If the thread gets interrupted, the interrupt
     * flag is restored so that the caller can stop its work.
     *
     * @param millis The time to sleep in milliseconds
     * @return <code>true</code> if the thread slept for the whole time,
     *         <code>false</code> if it was interrupted
     */
    private static boolean pauseThread(final long millis) {
        try {
            Thread.sleep(millis);
            return true;
        } catch ( final InterruptedException ie) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    @Configuration
    public Option[] configuration() {
        return options(
            baseConfiguration(),
            // create ordered test queue
            factoryConfiguration("org.apache.sling.event.jobs.QueueConfiguration")
                .put(ConfigurationConstants.PROP_NAME, "chaos-ordered")
                .put(ConfigurationConstants.PROP_TYPE, QueueConfiguration.Type.ORDERED.name())
                .put(ConfigurationConstants.PROP_TOPICS, ORDERED_TOPICS)
                .put(ConfigurationConstants.PROP_RETRIES, 2)
                .put(ConfigurationConstants.PROP_RETRY_DELAY, 2000L)
                .asOption(),
            // create round robin test queue
            factoryConfiguration("org.apache.sling.event.jobs.QueueConfiguration")
                .put(ConfigurationConstants.PROP_NAME, "chaos-roundrobin")
                .put(ConfigurationConstants.PROP_TYPE, QueueConfiguration.Type.TOPIC_ROUND_ROBIN.name())
                .put(ConfigurationConstants.PROP_TOPICS, ROUND_TOPICS)
                .put(ConfigurationConstants.PROP_RETRIES, 2)
                .put(ConfigurationConstants.PROP_RETRY_DELAY, 2000L)
                .put(ConfigurationConstants.PROP_MAX_PARALLEL, 5)
                .asOption()
        );
    }

    /**
     * Setup consumers: every test topic gets its own consumer.
     */
    private void setupJobConsumers() {
        this.registerSucceedingConsumers(ORDERED_TOPICS);
        this.registerSucceedingConsumers(PARALLEL_TOPICS);
        this.registerSucceedingConsumers(ROUND_TOPICS);
    }

    /**
     * Register one job consumer per topic which always returns {@code JobResult.OK}.
     *
     * @param topics The topics to register a consumer for
     */
    private void registerSucceedingConsumers(final String[] topics) {
        for(final String topic : topics) {
            this.registerJobConsumer(topic,

                new JobConsumer() {

                    @Override
                    public JobResult process(final Job job) {
                        return JobResult.OK;
                    }
                });
        }
    }

    /**
     * Thread adding jobs for {@link #DURATION} seconds. The topics are used in a
     * round robin fashion and the thread sleeps for a random time of less than
     * 200ms between two attempts.
     */
    private static final class CreateJobThread extends Thread {

        private final String[] topics;

        private final JobManager jobManager;

        private final Random random = new Random();

        /** Counter per topic, incremented for each successfully added job. */
        private final Map<String, AtomicLong> created;

        /** Incremented once when this thread is done. */
        private final AtomicLong finishedThreads;

        public CreateJobThread(final JobManager jobManager,
                final String[] topics,
                final Map<String, AtomicLong> created,
                final AtomicLong finishedThreads) {
            this.topics = topics;
            this.jobManager = jobManager;
            this.created = created;
            this.finishedThreads = finishedThreads;
        }

        @Override
        public void run() {
            int index = 0;
            final long endTime = System.currentTimeMillis() + DURATION * 1000;
            while ( System.currentTimeMillis() < endTime ) {
                final String topic = topics[index];
                // only move on to the next topic if the job could be added,
                // otherwise the same topic is tried again
                if ( jobManager.addJob(topic, null) != null ) {
                    created.get(topic).incrementAndGet();
                    index = (index + 1) % topics.length;
                }
                // stop on interrupt - otherwise every following sleep would fail
                // immediately and this loop would add jobs without any pause
                if ( !pauseThread(random.nextInt(200)) ) {
                    break;
                }
            }
            finishedThreads.incrementAndGet();
        }

    }

    /**
     * Setup job creation threads. The threads are only added to the list, they
     * are not started.
     */
    private void setupJobCreationThreads(final List<Thread> threads,
            final JobManager jobManager,
            final Map<String, AtomicLong> created,
            final AtomicLong finishedThreads) {
        for(int i=0;i<NUM_ORDERED_THREADS;i++) {
            threads.add(new CreateJobThread(jobManager, ORDERED_TOPICS, created, finishedThreads));
        }
        for(int i=0;i<NUM_PARALLEL_THREADS;i++) {
            threads.add(new CreateJobThread(jobManager, PARALLEL_TOPICS, created, finishedThreads));
        }
        for(int i=0;i<NUM_ROUND_THREADS;i++) {
            threads.add(new CreateJobThread(jobManager, ROUND_TOPICS, created, finishedThreads));
        }
    }

    /**
     * Wait for the first topology init event and return its view.
     * A temporary topology listener is registered for this and unregistered
     * again before this method returns.
     *
     * @return The view of the first received init event
     */
    private TopologyView waitForInitialView() {
        // the listener is called from a different thread than the polling one,
        // therefore the view is handed over through an atomic reference
        final AtomicReference<TopologyView> initialView = new AtomicReference<>();
        final ServiceRegistration<TopologyEventListener> reg = this.bc.registerService(TopologyEventListener.class, new TopologyEventListener() {

            @Override
            public void handleTopologyEvent(final TopologyEvent event) {
                if ( event.getType() == Type.TOPOLOGY_INIT ) {
                    // keep the first view only
                    initialView.compareAndSet(null, event.getNewView());
                }
            }
        }, null);
        try {
            while ( initialView.get() == null ) {
                this.sleep(10);
            }
        } finally {
            reg.unregister();
        }
        return initialView.get();
    }

    /**
     * Setup chaos thread(s)
     *
     * Chaos is right now created by sending topology changing/changed events randomly
     * to the topology listener of the job handling. The thread is only added to the
     * list, it is not started.
     */
    private void setupChaosThreads(final List<Thread> threads,
            final AtomicLong finishedThreads) {
        final TopologyView view = this.waitForInitialView();

        final Collection<ServiceReference<TopologyEventListener>> refs;
        try {
            refs = this.bc.getServiceReferences(TopologyEventListener.class, null);
        } catch ( final InvalidSyntaxException e) {
            // fail instead of silently running the test without any chaos
            throw new IllegalStateException("Unable to look up topology event listeners", e);
        }
        assertNotNull(refs);
        assertFalse(refs.isEmpty());

        TopologyEventListener found = null;
        ServiceReference<TopologyEventListener> foundRef = null;
        for(final ServiceReference<TopologyEventListener> ref : refs) {
            final TopologyEventListener listener = this.bc.getService(ref);
            if ( listener != null && TOPOLOGY_HANDLER_CLASS.equals(listener.getClass().getName()) ) {
                found = listener;
                foundRef = ref;
                break;
            }
            this.bc.ungetService(ref);
        }
        assertNotNull(found);
        final TopologyEventListener tel = found;
        final ServiceReference<TopologyEventListener> telRef = foundRef;

        threads.add(new Thread() {

            private final Random random = new Random();

            @Override
            public void run() {
                // this thread runs longer than the job creation threads
                final long endTime = System.currentTimeMillis() + (DURATION + CHAOS_EXTRA_DURATION) * 1000;
                while ( System.currentTimeMillis() < endTime ) {
                    // stable topology for 15 to 39 seconds
                    final int stableSeconds = random.nextInt(25) + 15;
                    if ( !pauseThread(stableSeconds * 1000L) ) {
                        break;
                    }
                    tel.handleTopologyEvent(new TopologyEvent(Type.TOPOLOGY_CHANGING, view, null));
                    // changing topology for 3 to 22 seconds
                    final int changingSeconds = random.nextInt(20) + 3;
                    final boolean interrupted = !pauseThread(changingSeconds * 1000L);
                    // always send the changed event after a changing event, even on interrupt
                    tel.handleTopologyEvent(new TopologyEvent(Type.TOPOLOGY_CHANGED, view, view));
                    if ( interrupted ) {
                        break;
                    }
                }
                // the listener is not used anymore, release the service
                bc.ungetService(telRef);
                finishedThreads.incrementAndGet();
            }
        });
    }

    /**
     * Create jobs for {@link #DURATION} seconds while topology events are sent
     * and wait until all created jobs are finished. The only limit for the
     * waiting is the test timeout.
     */
    @Test(timeout=DURATION * 16000L)
    public void testDoChaos() throws Exception {

        // setup added, created and finished map
        // added and finished are filled by notifications
        // created is filled by the threads starting jobs
        // the maps are completely filled here and never modified afterwards,
        // only the contained counters are updated
        final Map<String, AtomicLong> added = new HashMap<>();
        final Map<String, AtomicLong> created = new HashMap<>();
        final Map<String, AtomicLong> finished = new HashMap<>();
        final List<String> topics = new ArrayList<>();
        for(final String[] group : new String[][] {ORDERED_TOPICS, PARALLEL_TOPICS, ROUND_TOPICS}) {
            for(final String topic : group) {
                added.put(topic, new AtomicLong());
                created.put(topic, new AtomicLong());
                finished.put(topic, new AtomicLong());
                topics.add(topic);
            }
        }

        final List<Thread> threads = new ArrayList<>();
        final AtomicLong finishedThreads = new AtomicLong();

        this.registerEventHandler("org/apache/sling/event/notification/job/*",
                new EventHandler() {

                    @Override
                    public void handleEvent(final Event event) {
                        final Map<String, AtomicLong> counters;
                        if ( NotificationConstants.TOPIC_JOB_FINISHED.equals(event.getTopic())) {
                            counters = finished;
                        } else if ( NotificationConstants.TOPIC_JOB_ADDED.equals(event.getTopic())) {
                            counters = added;
                        } else {
                            return;
                        }
                        final String topic = (String) event.getProperty(NotificationConstants.NOTIFICATION_PROPERTY_JOB_TOPIC);
                        // ignore notifications without a topic or for a topic not created by this test
                        final AtomicLong counter = counters.get(topic);
                        if ( counter != null ) {
                            counter.incrementAndGet();
                        }
                    }
                });

        // setup job consumers
        this.setupJobConsumers();

        // setup job creation tests
        this.setupJobCreationThreads(threads, jobManager, created, finishedThreads);

        this.setupChaosThreads(threads, finishedThreads);

        System.out.println("Starting threads...");
        // start threads
        for(final Thread t : threads) {
            t.setDaemon(true);
            t.start();
        }

        System.out.println("Sleeping for " + DURATION + " seconds to wait for threads to finish...");
        // for sure we can sleep for the duration
        this.sleep(DURATION * 1000);

        System.out.println("Polling for threads to finish...");
        // wait until threads are finished
        while ( finishedThreads.get() < threads.size() ) {
            this.sleep(100);
        }

        System.out.println("Waiting for job handling to finish...");
        // a topic is done as soon as the number of finished jobs equals the number of created jobs
        final Set<String> pendingTopics = new HashSet<>(topics);
        while ( !pendingTopics.isEmpty() ) {
            final Iterator<String> iter = pendingTopics.iterator();
            while ( iter.hasNext() ) {
                final String topic = iter.next();
                if ( finished.get(topic).get() == created.get(topic).get() ) {
                    iter.remove();
                }
            }
            if ( !pendingTopics.isEmpty() ) {
                this.sleep(100);
            }
        }
/* We could try to enable this with Oak again - but right now JR observation handler is too
 * slow.
            System.out.println("Checking notifications...");
            for(final String topic : topics) {
                assertEquals("Checking topic " + topic, created.get(topic).get(), added.get(topic).get());
            }
 */

    }
}
