blob: 5cab2817b5fc2f4577d1940ece2e2632db65e125 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sling.jobs.impl;
import org.apache.sling.jobs.*;
import org.apache.sling.jobs.Types;
import org.apache.sling.jobs.impl.spi.JobStorage;
import org.apache.sling.jobs.impl.storage.InMemoryJobStorage;
import org.apache.sling.mom.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import static org.junit.Assert.*;
/**
*/
public class JobManagerImplTest {
private static final Logger LOGGER = LoggerFactory.getLogger(JobManagerImplTest.class);
private JobManager jobManager;
private JobStorage jobStorage;
private OutboundJobUpdateListener messageSender;
@Mock
private TopicManager topicManager;
@Mock
private QueueManager queueManager;
private Map<org.apache.sling.mom.Types.TopicName, Queue<QueueEntry>> topicQueues;
private Map<org.apache.sling.mom.Types.QueueName, Queue<QueueEntry>> messageQueues;
public JobManagerImplTest() {
MockitoAnnotations.initMocks(this);
}
@Before
public void setUp() throws Exception {
topicQueues = new HashMap<org.apache.sling.mom.Types.TopicName, Queue<QueueEntry>>();
messageQueues = new HashMap<org.apache.sling.mom.Types.QueueName, Queue<QueueEntry>>();
//noinspection unchecked
Mockito.doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
org.apache.sling.mom.Types.TopicName topic = (org.apache.sling.mom.Types.TopicName) invocationOnMock.getArguments()[0];
org.apache.sling.mom.Types.CommandName command = (org.apache.sling.mom.Types.CommandName) invocationOnMock.getArguments()[1];
@SuppressWarnings("unchecked") Map<String, Object> properties = (Map<String, Object>) invocationOnMock.getArguments()[2];
LOGGER.info("Topic Manager publish {} {} {} ", new Object[]{ topic, command, properties });
Queue<QueueEntry> queue = topicQueues.get(topic);
if ( queue == null) {
queue = new ArrayBlockingQueue<QueueEntry>(100);
topicQueues.put(topic, queue);
}
queue.add(new QueueEntry(command, properties));
return null;
}
}).when(topicManager)
.publish(Mockito.any(org.apache.sling.mom.Types.TopicName.class), Mockito.any(org.apache.sling.mom.Types.CommandName.class), Mockito.any(Map.class));
//noinspection unchecked
Mockito.doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
org.apache.sling.mom.Types.QueueName topic = (org.apache.sling.mom.Types.QueueName) invocationOnMock.getArguments()[0];
@SuppressWarnings("unchecked") Map<String, Object> properties = (Map<String, Object>) invocationOnMock.getArguments()[1];
LOGGER.info("Queue Manager add {} {} {} ", new Object[]{ topic, properties });
Queue<QueueEntry> queue = messageQueues.get(topic);
if ( queue == null) {
queue = new ArrayBlockingQueue<QueueEntry>(100);
messageQueues.put(topic, queue);
}
queue.add(new QueueEntry(properties));
return null;
}
}).when(queueManager)
.add(Mockito.any(org.apache.sling.mom.Types.QueueName.class), Mockito.any(Map.class));
messageSender = new OutboundJobUpdateListener(topicManager, queueManager);
jobStorage = new InMemoryJobStorage();
jobManager = new JobManagerImpl(jobStorage, messageSender);
}
@After
public void tearDown() throws Exception {
messageSender.dispose();
jobStorage.dispose();
}
@Test
public void testCreateJob() throws Exception {
String testId = "testGetJobById"+System.currentTimeMillis();
Job job = createJob(testId);
assertEquals(testId, job.getProperties().get("testid"));
org.apache.sling.mom.Types.TopicName topicName = org.apache.sling.mom.Types.topicName("testtopic");
assertNotNull(messageQueues.get(topicName));
assertEquals(1, messageQueues.get(topicName).size());
QueueEntry qe = messageQueues.get(topicName).remove();
assertNotNull(qe);
Map<String, Object> jobProperties = qe.getProperties();
// pretend to consume the job message
JobImpl dequeuedJob = new JobImpl(new JobUpdateImpl(jobProperties));
assertEquals(job.getId(), dequeuedJob.getId());
// the operation of the job object is tested in other unit tests, don't repeat that here.
assertEquals(testId, dequeuedJob.getProperties().get("testid"));
}
@Test
public void testGetJobById() throws Exception {
String testId = "testGetJobById"+System.currentTimeMillis();
Job job = createJob(testId);
Job searchedJob = jobManager.getJobById(job.getId());
assertNotNull(searchedJob);
assertEquals(job.getId(), searchedJob.getId());
assertEquals(testId, searchedJob.getProperties().get("testid"));
}
private Job createJob(String testId) {
Map<String, Object> testProps = new HashMap<String, Object>();
testProps.put("job.name", "Jobname ");
testProps.put("testid", testId);
Job job = jobManager.newJobBuilder(Types.jobQueue("testtopic"), Types.jobType("testtype"))
.addProperties(testProps)
.add();
assertNotNull(job);
return job;
}
@Test
public void testStopJobById() throws Exception {
String testId = "testGetJobById"+System.currentTimeMillis();
Job job = createJob(testId);
Thread.sleep(10);
jobManager.stopJobById(job.getId());
Queue<QueueEntry> messageQ = messageQueues.get(org.apache.sling.mom.Types.queueName("testtopic"));
Queue<QueueEntry> topicQ = topicQueues.get(org.apache.sling.mom.Types.topicName("testtopic"));
assertEquals(1,messageQ.size());
QueueEntry qe = messageQ.remove();
assertNotNull(qe);
Map<String, Object> jobProperties = qe.getProperties();
// get the job off the queue.
JobImpl dequeuedJob = new JobImpl(new JobUpdateImpl(jobProperties));
assertEquals(job.getId(), dequeuedJob.getId());
// the operation of the job object is tested in other unit tests, don't repeat that here.
assertEquals(testId, dequeuedJob.getProperties().get("testid"));
// get any messages sent to the topic.
assertEquals(1, topicQ.size());
QueueEntry stoppedQE = topicQ.remove();
Map<String, Object> stoppedJobProperties = stoppedQE.getProperties();
assertNotNull(stoppedJobProperties);
JobImpl stoppedJob = new JobImpl(new JobUpdateImpl(stoppedJobProperties));
assertEquals(JobUpdate.JobUpdateCommand.STOP_JOB.asCommandName(), stoppedQE.getCommand());
assertEquals(job.getId(), stoppedJob.getId());
// the stop message to the topic wont have any properties.
}
@Test
public void testAbortJob() throws Exception {
String testId = "testGetJobById"+System.currentTimeMillis();
Job job = createJob(testId);
Thread.sleep(10);
jobManager.abortJob(job.getId());
LOGGER.info("Message Queues {}", messageQueues);
LOGGER.info("Topic Queues {}", topicQueues);
Queue<QueueEntry> messageQ = messageQueues.get(org.apache.sling.mom.Types.queueName("testtopic"));
Queue<QueueEntry> topicQ = topicQueues.get(org.apache.sling.mom.Types.topicName("testtopic"));
assertEquals(1,messageQ.size());
QueueEntry qe = messageQ.remove();
assertNotNull(qe);
Map<String, Object> jobProperties = qe.getProperties();
// get the job off the queue.
JobImpl dequeuedJob = new JobImpl(new JobUpdateImpl(jobProperties));
assertEquals(job.getId(), dequeuedJob.getId());
// the operation of the job object is tested in other unit tests, dont repeat that here.
assertEquals(testId, dequeuedJob.getProperties().get("testid"));
// get any messages sent to the topic.
assertEquals(1, topicQ.size());
QueueEntry stoppedQE = topicQ.remove();
Map<String, Object> stoppedJobProperties = stoppedQE.getProperties();
assertNotNull(stoppedJobProperties);
JobImpl stoppedJob = new JobImpl(new JobUpdateImpl(stoppedJobProperties));
assertEquals(JobUpdate.JobUpdateCommand.ABORT_JOB.asCommandName(), stoppedQE.getCommand());
assertEquals(job.getId(), stoppedJob.getId());
}
@Test
public void testRetryJobById() throws Exception {
String testId = "testGetJobById"+System.currentTimeMillis();
Job job = createJob(testId);
Thread.sleep(10);
jobManager.retryJobById(job.getId());
Queue<QueueEntry> messageQ = messageQueues.get(org.apache.sling.mom.Types.queueName("testtopic"));
Queue<QueueEntry> topicQ = topicQueues.get(org.apache.sling.mom.Types.topicName("testtopic"));
assertEquals(1,messageQ.size());
QueueEntry qe = messageQ.remove();
assertNotNull(qe);
Map<String, Object> jobProperties = qe.getProperties();
// get the job off the queue.
JobImpl dequeuedJob = new JobImpl(new JobUpdateImpl(jobProperties));
assertEquals(job.getId(), dequeuedJob.getId());
// the operation of the job object is tested in other unit tests, don't repeat that here.
assertEquals(testId, dequeuedJob.getProperties().get("testid"));
// get any messages sent to the topic.
assertEquals(1, topicQ.size());
QueueEntry stoppedQE = topicQ.remove();
Map<String, Object> stoppedJobProperties = stoppedQE.getProperties();
assertNotNull(stoppedJobProperties);
JobImpl stoppedJob = new JobImpl(new JobUpdateImpl(stoppedJobProperties));
assertEquals(JobUpdate.JobUpdateCommand.RETRY_JOB.asCommandName(), stoppedQE.getCommand());
assertEquals(job.getId(), stoppedJob.getId());
}
}