blob: d69d036ec8272b6404dbe425477e5abc647941d0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sling.event.it;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Dictionary;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.sling.event.impl.Barrier;
import org.apache.sling.event.impl.jobs.config.ConfigurationConstants;
import org.apache.sling.event.jobs.Job;
import org.apache.sling.event.jobs.JobManager;
import org.apache.sling.event.jobs.NotificationConstants;
import org.apache.sling.event.jobs.QueueConfiguration;
import org.apache.sling.event.jobs.consumer.JobConsumer;
import org.apache.sling.event.jobs.consumer.JobExecutionContext;
import org.apache.sling.event.jobs.consumer.JobExecutionResult;
import org.apache.sling.event.jobs.consumer.JobExecutor;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.ops4j.pax.exam.junit.PaxExam;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@RunWith(PaxExam.class)
public class JobHandlingTest extends AbstractJobHandlingTest {
/** Job topic used by the tests in this class; {@link #setup()} assigns it (and {@code TOPIC + "2"}) to the test queue. */
public static final String TOPIC = "sling/test";
/** Logger named after the runtime class of this test instance. */
private final Logger log = LoggerFactory.getLogger(this.getClass());
/**
 * Runs the parent setup and then registers a queue configuration for the tests.
 * The queue is named "test", is of type UNORDERED, handles the topics
 * {@link #TOPIC} and {@code TOPIC + "2"}, and is configured with 2 retries
 * and a retry delay of 2000.
 *
 * @throws IOException if the parent setup or the configuration update fails
 */
@Override
@Before
public void setup() throws IOException {
    super.setup();

    // describe the test queue
    final Dictionary<String, Object> queueProps = new Hashtable<String, Object>();
    queueProps.put(ConfigurationConstants.PROP_NAME, "test");
    queueProps.put(ConfigurationConstants.PROP_TYPE, QueueConfiguration.Type.UNORDERED.name());
    queueProps.put(ConfigurationConstants.PROP_TOPICS, new String[] {TOPIC, TOPIC + "2"});
    queueProps.put(ConfigurationConstants.PROP_RETRIES, 2);
    queueProps.put(ConfigurationConstants.PROP_RETRY_DELAY, 2000L);

    // create the test queue as a new factory configuration
    final org.osgi.service.cm.Configuration queueConfig =
            this.configAdmin.createFactoryConfiguration("org.apache.sling.event.jobs.QueueConfiguration", null);
    queueConfig.update(queueProps);

    // NOTE(review): presumably gives the new configuration time to become active - confirm
    this.sleep(1000L);
}
/**
 * Test teardown. Delegates to the parent class cleanup; this class adds
 * no cleanup steps of its own.
 */
@Override
@After
public void cleanup() {
super.cleanup();
}
/**
 * Simple job execution through a {@link JobConsumer}.
 * One job is added; the consumer is expected to be invoked exactly once
 * and to report success, so a second wait on the barrier must time out.
 */
@Test(timeout = DEFAULT_TEST_TIMEOUT)
public void testSimpleJobExecutionUsingJobConsumer() throws Exception {
    final Barrier barrier = new Barrier(2);
    final JobConsumer consumer = new JobConsumer() {
        @Override
        public JobResult process(final Job job) {
            // meet the test thread, then report success
            barrier.block();
            return JobResult.OK;
        }
    };
    this.registerJobConsumer(TOPIC, consumer);
    this.getJobManager().addJob(TOPIC, null);

    // first wait: the consumer has to show up
    final boolean executed = barrier.block(5);
    assertTrue("No event received in the given time.", executed);

    // second wait: nobody may show up again
    barrier.reset();
    final boolean executedAgain = barrier.block(5);
    assertFalse("Unexpected event received in the given time.", executedAgain);
}
/**
 * Simple job execution through a {@link JobExecutor}.
 * One job is added; the executor is expected to be invoked exactly once
 * and to report success via the execution context, so a second wait on
 * the barrier must time out.
 */
@Test(timeout = DEFAULT_TEST_TIMEOUT)
public void testSimpleJobExecutionUsingJobExecutor() throws Exception {
    final Barrier barrier = new Barrier(2);
    final JobExecutor executor = new JobExecutor() {
        @Override
        public JobExecutionResult process(final Job job, final JobExecutionContext context) {
            // meet the test thread, then report success
            barrier.block();
            return context.result().succeeded();
        }
    };
    this.registerJobExecutor(TOPIC, executor);
    this.getJobManager().addJob(TOPIC, null);

    // first wait: the executor has to show up
    final boolean executed = barrier.block(5);
    assertTrue("No event received in the given time.", executed);

    // second wait: nobody may show up again
    barrier.reset();
    final boolean executedAgain = barrier.block(5);
    assertFalse("Unexpected event received in the given time.", executedAgain);
}
/**
 * Adds 300 jobs that all succeed immediately and waits until a
 * TOPIC_JOB_FINISHED notification has been counted for each of them.
 * Afterwards the job manager statistics must report the same number
 * of finished jobs.
 */
@Test(timeout = DEFAULT_TEST_TIMEOUT)
public void testManyJobs() throws Exception {
    final JobConsumer alwaysOk = new JobConsumer() {
        @Override
        public JobResult process(final Job job) {
            return JobResult.OK;
        }
    };
    this.registerJobConsumer(TOPIC, alwaysOk);

    // counts the "job finished" notifications
    final AtomicInteger finishedEvents = new AtomicInteger(0);
    final EventHandler finishedCounter = new EventHandler() {
        @Override
        public void handleEvent(final Event event) {
            finishedEvents.incrementAndGet();
        }
    };
    this.registerEventHandler(NotificationConstants.TOPIC_JOB_FINISHED, finishedCounter);

    // we start "some" jobs
    final int jobCount = 300;
    int added = 0;
    while ( added < jobCount ) {
        this.getJobManager().addJob(TOPIC, null);
        added++;
    }

    // poll until every job has been reported as finished
    while ( finishedEvents.get() < jobCount ) {
        this.sleep(50);
    }

    assertEquals("Finished count", jobCount, finishedEvents.get());
    assertEquals("Finished count", jobCount, this.getJobManager().getStatistics().getNumberOfFinishedJobs());
}
/**
 * Test removing a job whose execution always fails.
 * The consumer is held on two barriers so the test can inspect the job
 * while it is being processed, let it fail, and then remove it by id.
 * After removal the job must no longer be found with QueryType.ALL but
 * must show up exactly once with QueryType.HISTORY.
 */
@Test(timeout = DEFAULT_TEST_TIMEOUT)
public void testCancelJob() throws Exception {
    final Barrier started = new Barrier(2);
    final Barrier resume = new Barrier(2);
    final JobConsumer failingConsumer = new JobConsumer() {
        @Override
        public JobResult process(Job job) {
            started.block();  // tell the test that processing has begun
            resume.block();   // wait until the test lets us continue
            return JobResult.FAILED;
        }
    };
    this.registerJobConsumer(TOPIC, failingConsumer);

    // the same property map is used to create the job and to look it up again
    final Map<String, Object> template = Collections.singletonMap("id", (Object)"cancelJobId");
    @SuppressWarnings("unchecked")
    final Map<String, Object>[] templates = new Map[] { template };

    // create job
    final JobManager jobManager = this.getJobManager();
    jobManager.addJob(TOPIC, template);
    started.block();

    final Collection<Job> active = jobManager.findJobs(JobManager.QueryType.ALL, TOPIC, -1, templates);
    assertEquals(1, active.size());

    // the consumer is parked on the second barrier, the job must still be retrievable
    final Job running = jobManager.getJob(TOPIC, template);
    assertNotNull(running);

    resume.block(); // and continue job
    sleep(200);

    // the job is now in the queue again
    final Job requeued = jobManager.getJob(TOPIC, template);
    assertNotNull(requeued);
    final boolean removed = jobManager.removeJobById(requeued.getId());
    assertTrue(removed);

    final Collection<Job> remaining = jobManager.findJobs(JobManager.QueryType.ALL, TOPIC, -1, templates);
    assertEquals(0, remaining.size());

    final Collection<Job> history = jobManager.findJobs(JobManager.QueryType.HISTORY, TOPIC, -1,
            templates);
    try {
        assertEquals(1, history.size());
    } finally {
        // remove whatever the history query returned, even if the assertion failed
        for (final Job historic : history) {
            jobManager.removeJobById(historic.getId());
        }
    }
}
/**
 * Test looking up a job while it is being processed.
 * The consumer is held between two barriers; in that window
 * {@code getJob(TOPIC, null)} must return a job.
 */
@Test(timeout = DEFAULT_TEST_TIMEOUT)
public void testGetJob() throws Exception {
    final Barrier started = new Barrier(2);
    final Barrier resume = new Barrier(2);
    final JobConsumer consumer = new JobConsumer() {
        @Override
        public JobResult process(Job job) {
            started.block();  // tell the test that processing has begun
            resume.block();   // wait until the test lets us continue
            return JobResult.OK;
        }
    };
    this.registerJobConsumer(TOPIC, consumer);

    final JobManager jobManager = this.getJobManager();
    final Job added = jobManager.addJob(TOPIC, null);
    started.block();

    final Job found = jobManager.getJob(TOPIC, null);
    assertNotNull(found);

    resume.block(); // and continue job
    jobManager.removeJobById(added.getId());
}
/**
 * Reschedule test.
 * The consumer always fails. The job is expected to be executed three
 * times (the initial attempt plus two retries) and not a fourth time.
 * Each invocation whose retry-count property matches the number of
 * previous invocations is recorded; exactly three such records are expected.
 */
@Test(timeout = DEFAULT_TEST_TIMEOUT)
public void testStartJobAndReschedule() throws Exception {
    final List<Integer> observedRetries = new ArrayList<Integer>();
    final Barrier barrier = new Barrier(2);
    final JobConsumer failingConsumer = new JobConsumer() {
        /** Number of invocations seen so far. */
        int invocations;

        @Override
        public JobResult process(Job job) {
            // a missing retry-count property is treated as retry 0
            final int retry = job.getProperty(Job.PROPERTY_JOB_RETRY_COUNT) != null
                    ? (Integer)job.getProperty(Job.PROPERTY_JOB_RETRY_COUNT)
                    : 0;
            if ( retry == invocations ) {
                observedRetries.add(retry);
            }
            invocations++;
            barrier.block();
            return JobResult.FAILED;
        }
    };
    this.registerJobConsumer(TOPIC, failingConsumer);

    final JobManager jobManager = this.getJobManager();
    final Job job = jobManager.addJob(TOPIC, null);

    // first execution plus two retries; each retry is delayed by the queue's retry delay
    for (int attempt = 0; attempt < 3; attempt++) {
        assertTrue("No event received in the given time.", barrier.block(5));
        barrier.reset();
    }

    // we have reached the retry limit so we expect to not get an event
    assertFalse("Unexpected event received in the given time.", barrier.block(5));
    assertEquals("Unexpected number of retries", 3, observedRetries.size());
    jobManager.removeJobById(job.getId());
}
/**
 * Notifications.
 * Five jobs with the ids "1" to "5" are added and treated differently by
 * the consumer:
 * <ul>
 * <li>1 and 4 succeed on the first execution</li>
 * <li>2 fails once and then succeeds</li>
 * <li>3 fails twice and then succeeds</li>
 * <li>5 always fails</li>
 * </ul>
 * Event handlers collect the "id" property of every started, failed,
 * finished and cancelled notification. The test polls until five jobs are
 * finished or cancelled and ten start notifications have arrived, then
 * checks the totals: 4 finished, 1 cancelled, 10 started, 5 failed.
 *
 * @throws Exception if the expected notifications do not arrive within 25 seconds
 */
@Test(timeout = DEFAULT_TEST_TIMEOUT)
public void testNotifications() throws Exception {
    // handlers are presumably invoked on event delivery threads, hence synchronized lists
    final List<String> cancelled = Collections.synchronizedList(new ArrayList<String>());
    final List<String> failed = Collections.synchronizedList(new ArrayList<String>());
    final List<String> finished = Collections.synchronizedList(new ArrayList<String>());
    final List<String> started = Collections.synchronizedList(new ArrayList<String>());
    this.registerJobConsumer(TOPIC,
            new JobConsumer() {
                @Override
                public JobResult process(Job job) {
                    // events 1 and 4 finish the first time
                    final String id = (String)job.getProperty("id");
                    if ( "1".equals(id) || "4".equals(id) ) {
                        return JobResult.OK;
                    // 5 fails always
                    } else if ( "5".equals(id) ) {
                        return JobResult.FAILED;
                    } else {
                        // a missing retry-count property is treated as retry 0
                        int retry = 0;
                        if ( job.getProperty(Job.PROPERTY_JOB_RETRY_COUNT) != null ) {
                            retry = (Integer)job.getProperty(Job.PROPERTY_JOB_RETRY_COUNT);
                        }
                        // 2 fails the first time
                        if ( "2".equals(id) ) {
                            if ( retry == 0 ) {
                                return JobResult.FAILED;
                            } else {
                                return JobResult.OK;
                            }
                        }
                        // 3 fails the first and second time
                        if ( "3".equals(id) ) {
                            if ( retry == 0 || retry == 1 ) {
                                return JobResult.FAILED;
                            } else {
                                return JobResult.OK;
                            }
                        }
                    }
                    // any other id is treated as a failure
                    return JobResult.FAILED;
                }
            });
    this.registerEventHandler(NotificationConstants.TOPIC_JOB_CANCELLED,
            new EventHandler() {
                @Override
                public void handleEvent(Event event) {
                    final String id = (String)event.getProperty("id");
                    cancelled.add(id);
                }
            });
    this.registerEventHandler(NotificationConstants.TOPIC_JOB_FAILED,
            new EventHandler() {
                @Override
                public void handleEvent(Event event) {
                    final String id = (String)event.getProperty("id");
                    failed.add(id);
                }
            });
    this.registerEventHandler(NotificationConstants.TOPIC_JOB_FINISHED,
            new EventHandler() {
                @Override
                public void handleEvent(Event event) {
                    final String id = (String)event.getProperty("id");
                    finished.add(id);
                }
            });
    this.registerEventHandler(NotificationConstants.TOPIC_JOB_STARTED,
            new EventHandler() {
                @Override
                public void handleEvent(Event event) {
                    final String id = (String)event.getProperty("id");
                    started.add(id);
                }
            });
    final JobManager jobManager = this.getJobManager();
    jobManager.addJob(TOPIC, Collections.singletonMap("id", (Object)"1"));
    jobManager.addJob(TOPIC, Collections.singletonMap("id", (Object)"2"));
    jobManager.addJob(TOPIC, Collections.singletonMap("id", (Object)"3"));
    jobManager.addJob(TOPIC, Collections.singletonMap("id", (Object)"4"));
    jobManager.addJob(TOPIC, Collections.singletonMap("id", (Object)"5"));
    int count;
    final long startTime = System.currentTimeMillis();
    do {
        // after 25 seconds we cancel the test
        if ( System.currentTimeMillis() - startTime > 25000 ) {
            throw new Exception("Timeout during notification test.");
        }
        // pause between polls instead of spinning, so the waiting test thread
        // does not compete for CPU with the job threads it is waiting for
        this.sleep(50);
        count = finished.size() + cancelled.size();
    } while ( count < 5 || started.size() < 10 );
    assertEquals("Finished count", 4, finished.size());
    assertEquals("Cancelled count", 1, cancelled.size());
    assertEquals("Started count", 10, started.size());
    assertEquals("Failed count", 5, failed.size());
}
/**
 * Test sending of jobs with and without a processor.
 * Twenty jobs are added, alternating between {@link #TOPIC} (which has a
 * consumer) and {@code TOPIC + "2"} (which has none). The test waits until
 * the statistics report ten finished jobs and then checks that the
 * consumer was invoked ten times.
 */
@Test(timeout = DEFAULT_TEST_TIMEOUT)
public void testNoJobProcessor() throws Exception {
    final AtomicInteger processed = new AtomicInteger(0);
    log.info("testNoJobProcessor : registering job consumer");
    final JobConsumer countingConsumer = new JobConsumer() {
        @Override
        public JobResult process(final Job job) {
            final int got = processed.incrementAndGet();
            log.info("testNoJobProcessor.process : got " + got + " jobs");
            return JobResult.OK;
        }
    };
    this.registerJobConsumer(TOPIC, countingConsumer);

    final JobManager jobManager = this.getJobManager();
    log.info("testNoJobProcessor : starting 20 jobs, 10 on " + TOPIC + " and 10 on " + TOPIC + "2");

    // we start 20 jobs, every second job has no processor
    final int totalJobs = 20;
    final int expectedFinished = totalJobs / 2;
    for (int index = 0; index < totalJobs; index++) {
        final String jobTopic;
        if ( index % 2 == 0 ) {
            jobTopic = TOPIC;
        } else {
            jobTopic = TOPIC + "2";
        }
        jobManager.addJob(jobTopic, null);
    }

    log.info("testNoJobProcessor : waiting for jobs to be received ...");
    while ( jobManager.getStatistics().getNumberOfFinishedJobs() < expectedFinished ) {
        this.sleep(50);
    }

    assertEquals("Finished count", expectedFinished, processed.get());
    // the statistics must not report any jobs for the topic without a consumer
    assertEquals("Unprocessed count", 0, jobManager.getStatistics().getNumberOfJobs());
    assertEquals("Finished count", expectedFinished, jobManager.getStatistics().getNumberOfFinishedJobs());
}
}