SLING-7778 : fix race-condition between adding a scheduled job, removing it and observation
diff --git a/src/main/java/org/apache/sling/event/impl/jobs/scheduling/ScheduledJobHandler.java b/src/main/java/org/apache/sling/event/impl/jobs/scheduling/ScheduledJobHandler.java
index 0df2a9c..f925e4a 100644
--- a/src/main/java/org/apache/sling/event/impl/jobs/scheduling/ScheduledJobHandler.java
+++ b/src/main/java/org/apache/sling/event/impl/jobs/scheduling/ScheduledJobHandler.java
@@ -223,6 +223,7 @@
holder.read = System.currentTimeMillis();
holder.info = this.addOrUpdateScheduledJob(properties, h == null ? null : h.info);
+ this.scheduledJobs.put(key, holder);
this.jobScheduler.scheduleJob(holder.info);
return holder.info;
}
diff --git a/src/test/java/org/apache/sling/event/it/SchedulingTest.java b/src/test/java/org/apache/sling/event/it/SchedulingTest.java
index 567ee70..4107a3b 100644
--- a/src/test/java/org/apache/sling/event/it/SchedulingTest.java
+++ b/src/test/java/org/apache/sling/event/it/SchedulingTest.java
@@ -22,6 +22,8 @@
import static org.junit.Assert.assertNotNull;
import java.io.IOException;
+import java.util.Date;
+import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.sling.event.jobs.Job;
@@ -32,12 +34,16 @@
import org.junit.Test;
import org.junit.runner.RunWith;
import org.ops4j.pax.exam.junit.PaxExam;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
@RunWith(PaxExam.class)
public class SchedulingTest extends AbstractJobHandlingTest {
private static final String TOPIC = "job/scheduled/topic";
+ private final Logger logger = LoggerFactory.getLogger(getClass());
+
@Override
@Before
public void setup() throws IOException {
@@ -84,4 +90,40 @@
info2.unschedule();
assertEquals(0, this.getJobManager().getScheduledJobs().size()); // scheduled jobs
}
+
+ @Test
+ public void schedulingLoadTest() throws Exception {
+ logger.info("schedulingLoadTest: start");
+ final AtomicInteger counter = new AtomicInteger();
+ final int NUM_ITERATIONS = 1500;
+ final String ownTopic = "random/" + UUID.randomUUID().toString();
+ this.registerJobConsumer(ownTopic, new JobConsumer() {
+
+ @Override
+ public JobResult process(final Job job) {
+ if ( job.getTopic().equals(ownTopic) ) {
+ counter.incrementAndGet();
+ }
+ return JobResult.OK;
+ }
+
+ });
+ for(int i=0; i<NUM_ITERATIONS; i++) {
+ logger.info("schedulingLoadTest: loop-" + i);
+ this.getJobManager().createJob(ownTopic).schedule().at(new Date(System.currentTimeMillis() + 2500)).add();
+ Thread.sleep(1);
+ }
+ logger.info("schedulingLoadTest: done, letting jobs be triggered, currently at {} jobs, {} schedules", counter.get(), this.getJobManager().getScheduledJobs().size());
+ final long timeout = System.currentTimeMillis() + 30000;
+ while(System.currentTimeMillis() < timeout) {
+ if ((counter.get() == NUM_ITERATIONS) && (this.getJobManager().getScheduledJobs().size() == 0)) {
+ break;
+ }
+ logger.info("schedulingLoadTest: currently at {} jobs, {} schedules", counter.get(), getJobManager().getScheduledJobs().size());
+ Thread.sleep(100);
+ }
+ assertEquals(NUM_ITERATIONS, counter.get());
+ assertEquals(0, this.getJobManager().getScheduledJobs().size());
+ logger.info("schedulingLoadTest: end");
+ }
}