/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.sling.jobs.impl;

import org.apache.sling.jobs.*;
import org.apache.sling.jobs.Types;
import org.apache.sling.jobs.impl.spi.JobStorage;
import org.apache.sling.jobs.impl.storage.InMemoryJobStorage;
import org.apache.sling.mom.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;

import static org.junit.Assert.*;

/**
 */
public class JobManagerImplTest {

    private static final Logger LOGGER = LoggerFactory.getLogger(JobManagerImplTest.class);
    private JobManager jobManager;
    private JobStorage jobStorage;
    private OutboundJobUpdateListener messageSender;
    @Mock
    private TopicManager topicManager;
    @Mock
    private QueueManager queueManager;
    private Map<org.apache.sling.mom.Types.TopicName, Queue<QueueEntry>> topicQueues;
    private Map<org.apache.sling.mom.Types.QueueName, Queue<QueueEntry>> messageQueues;

    public JobManagerImplTest() {
        MockitoAnnotations.initMocks(this);
    }

    @Before
    public void setUp() throws Exception {
        topicQueues = new HashMap<org.apache.sling.mom.Types.TopicName, Queue<QueueEntry>>();
        messageQueues = new HashMap<org.apache.sling.mom.Types.QueueName, Queue<QueueEntry>>();
        //noinspection unchecked
        Mockito.doAnswer(new Answer<Void>() {
            @Override
            public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
                org.apache.sling.mom.Types.TopicName topic = (org.apache.sling.mom.Types.TopicName) invocationOnMock.getArguments()[0];
                org.apache.sling.mom.Types.CommandName command = (org.apache.sling.mom.Types.CommandName) invocationOnMock.getArguments()[1];
                @SuppressWarnings("unchecked") Map<String, Object> properties = (Map<String, Object>) invocationOnMock.getArguments()[2];
                LOGGER.info("Topic Manager publish {} {} {} ", new Object[]{ topic, command, properties });
                Queue<QueueEntry> queue = topicQueues.get(topic);
                if ( queue == null) {
                    queue = new ArrayBlockingQueue<QueueEntry>(100);
                    topicQueues.put(topic, queue);

                }
                queue.add(new QueueEntry(command, properties));
                return null;
            }
        }).when(topicManager)
                .publish(Mockito.any(org.apache.sling.mom.Types.TopicName.class), Mockito.any(org.apache.sling.mom.Types.CommandName.class), Mockito.any(Map.class));

        //noinspection unchecked
        Mockito.doAnswer(new Answer<Void>() {
            @Override
            public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
                org.apache.sling.mom.Types.QueueName topic = (org.apache.sling.mom.Types.QueueName) invocationOnMock.getArguments()[0];
                @SuppressWarnings("unchecked") Map<String, Object> properties = (Map<String, Object>) invocationOnMock.getArguments()[1];
                LOGGER.info("Queue Manager add {} {} {} ", new Object[]{ topic, properties });
                Queue<QueueEntry> queue = messageQueues.get(topic);
                if ( queue == null) {
                    queue = new ArrayBlockingQueue<QueueEntry>(100);
                    messageQueues.put(topic, queue);

                }
                queue.add(new QueueEntry(properties));
                return null;
            }
        }).when(queueManager)
                .add(Mockito.any(org.apache.sling.mom.Types.QueueName.class), Mockito.any(Map.class));

        messageSender = new OutboundJobUpdateListener(topicManager, queueManager);
        jobStorage = new InMemoryJobStorage();
        jobManager = new JobManagerImpl(jobStorage, messageSender);

    }

    @After
    public void tearDown() throws Exception {
        messageSender.dispose();
        jobStorage.dispose();
    }

    @Test
    public void testCreateJob() throws Exception {
        String testId = "testGetJobById"+System.currentTimeMillis();
        Job job = createJob(testId);

        assertEquals(testId, job.getProperties().get("testid"));
        org.apache.sling.mom.Types.TopicName topicName = org.apache.sling.mom.Types.topicName("testtopic");
        assertNotNull(messageQueues.get(topicName));
        assertEquals(1, messageQueues.get(topicName).size());
        QueueEntry qe = messageQueues.get(topicName).remove();
        assertNotNull(qe);
        Map<String, Object> jobProperties = qe.getProperties();

        // pretend to consume the job message
        JobImpl dequeuedJob = new JobImpl(new JobUpdateImpl(jobProperties));
        assertEquals(job.getId(), dequeuedJob.getId());
        // the operation of the job object is tested in other unit tests, don't repeat that here.
        assertEquals(testId, dequeuedJob.getProperties().get("testid"));

    }

    @Test
    public void testGetJobById() throws Exception {
        String testId = "testGetJobById"+System.currentTimeMillis();
        Job job = createJob(testId);


        Job searchedJob = jobManager.getJobById(job.getId());
        assertNotNull(searchedJob);
        assertEquals(job.getId(), searchedJob.getId());
        assertEquals(testId, searchedJob.getProperties().get("testid"));

    }

    private Job createJob(String testId) {
        Map<String, Object> testProps = new HashMap<String, Object>();
        testProps.put("job.name", "Jobname ");
        testProps.put("testid", testId);
        Job job = jobManager.newJobBuilder(Types.jobQueue("testtopic"), Types.jobType("testtype"))
                .addProperties(testProps)
                .add();
        assertNotNull(job);
        return job;
    }


    @Test
    public void testStopJobById() throws Exception {
        String testId = "testGetJobById"+System.currentTimeMillis();
        Job job = createJob(testId);
        Thread.sleep(10);
        jobManager.stopJobById(job.getId());
        Queue<QueueEntry> messageQ = messageQueues.get(org.apache.sling.mom.Types.queueName("testtopic"));
        Queue<QueueEntry> topicQ = topicQueues.get(org.apache.sling.mom.Types.topicName("testtopic"));
        assertEquals(1,messageQ.size());
        QueueEntry qe = messageQ.remove();
        assertNotNull(qe);
        Map<String, Object> jobProperties = qe.getProperties();

        // get the job off the queue.
        JobImpl dequeuedJob = new JobImpl(new JobUpdateImpl(jobProperties));
        assertEquals(job.getId(), dequeuedJob.getId());
        // the operation of the job object is tested in other unit tests, don't repeat that here.
        assertEquals(testId, dequeuedJob.getProperties().get("testid"));


        // get any messages sent to the topic.
        assertEquals(1, topicQ.size());
        QueueEntry stoppedQE = topicQ.remove();
        Map<String, Object> stoppedJobProperties = stoppedQE.getProperties();
        assertNotNull(stoppedJobProperties);
        JobImpl stoppedJob = new JobImpl(new JobUpdateImpl(stoppedJobProperties));
        assertEquals(JobUpdate.JobUpdateCommand.STOP_JOB.asCommandName(), stoppedQE.getCommand());
        assertEquals(job.getId(), stoppedJob.getId());
        // the stop message to the topic wont have any properties.



    }

    @Test
    public void testAbortJob() throws Exception {
        String testId = "testGetJobById"+System.currentTimeMillis();
        Job job = createJob(testId);
        Thread.sleep(10);
        jobManager.abortJob(job.getId());
        LOGGER.info("Message Queues  {}", messageQueues);
        LOGGER.info("Topic Queues  {}", topicQueues);
        Queue<QueueEntry> messageQ = messageQueues.get(org.apache.sling.mom.Types.queueName("testtopic"));
        Queue<QueueEntry> topicQ = topicQueues.get(org.apache.sling.mom.Types.topicName("testtopic"));
        assertEquals(1,messageQ.size());
        QueueEntry qe = messageQ.remove();
        assertNotNull(qe);
        Map<String, Object> jobProperties = qe.getProperties();

        // get the job off the queue.
        JobImpl dequeuedJob = new JobImpl(new JobUpdateImpl(jobProperties));
        assertEquals(job.getId(), dequeuedJob.getId());
        // the operation of the job object is tested in other unit tests, dont repeat that here.
        assertEquals(testId, dequeuedJob.getProperties().get("testid"));


        // get any messages sent to the topic.
        assertEquals(1, topicQ.size());
        QueueEntry stoppedQE = topicQ.remove();
        Map<String, Object> stoppedJobProperties = stoppedQE.getProperties();
        assertNotNull(stoppedJobProperties);
        JobImpl stoppedJob = new JobImpl(new JobUpdateImpl(stoppedJobProperties));
        assertEquals(JobUpdate.JobUpdateCommand.ABORT_JOB.asCommandName(), stoppedQE.getCommand());
        assertEquals(job.getId(), stoppedJob.getId());

    }

    @Test
    public void testRetryJobById() throws Exception {
        String testId = "testGetJobById"+System.currentTimeMillis();
        Job job = createJob(testId);
        Thread.sleep(10);
        jobManager.retryJobById(job.getId());
        Queue<QueueEntry> messageQ = messageQueues.get(org.apache.sling.mom.Types.queueName("testtopic"));
        Queue<QueueEntry> topicQ = topicQueues.get(org.apache.sling.mom.Types.topicName("testtopic"));
        assertEquals(1,messageQ.size());
        QueueEntry qe = messageQ.remove();
        assertNotNull(qe);
        Map<String, Object> jobProperties = qe.getProperties();

        // get the job off the queue.
        JobImpl dequeuedJob = new JobImpl(new JobUpdateImpl(jobProperties));
        assertEquals(job.getId(), dequeuedJob.getId());
        // the operation of the job object is tested in other unit tests, don't repeat that here.
        assertEquals(testId, dequeuedJob.getProperties().get("testid"));


        // get any messages sent to the topic.
        assertEquals(1, topicQ.size());
        QueueEntry stoppedQE = topicQ.remove();
        Map<String, Object> stoppedJobProperties = stoppedQE.getProperties();
        assertNotNull(stoppedJobProperties);
        JobImpl stoppedJob = new JobImpl(new JobUpdateImpl(stoppedJobProperties));
        assertEquals(JobUpdate.JobUpdateCommand.RETRY_JOB.asCommandName(), stoppedQE.getCommand());
        assertEquals(job.getId(), stoppedJob.getId());

    }

}