SLING-7039: Clean up jobs in state dropped and errors.
git-svn-id: https://svn.apache.org/repos/asf/sling/trunk@1804640 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java b/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java
index f3647c6..45ecc3b 100644
--- a/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java
+++ b/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java
@@ -18,16 +18,6 @@
*/
package org.apache.sling.event.impl.jobs.config;
-import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
import org.apache.sling.api.resource.LoginException;
import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.api.resource.ResourceResolver;
@@ -58,6 +48,16 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
/**
* Configuration of the job handling
*
@@ -87,6 +87,11 @@
description="Specify amount in seconds that job manager waits on startup before starting with job handling. "
+ "This can be used to allow enough time to restart a cluster before jobs are eventually reassigned.")
long startup_delay() default 30;
+
+ @AttributeDefinition(name = "Clean-up removed jobs period",
+ description = "Specify the periodic interval in minutes (default is 48h - use 0 to disable) after which " +
+ "removed jobs (ERROR or DROPPED) should be cleaned from the repository.")
+ int cleanup_period() default 2880;
}
/** Logger. */
private final Logger logger = LoggerFactory.getLogger("org.apache.sling.event.impl.jobs");
@@ -149,6 +154,8 @@
/** The resource path where scheduled jobs are stored - ending with a slash. */
private String scheduledJobsPathWithSlash;
+ private volatile int historyCleanUpRemovedJobs;
+
/** List of topology awares. */
private final List<ConfigurationChangeListener> listeners = new ArrayList<>();
@@ -200,6 +207,8 @@
DEFAULT_SCHEDULED_JOBS_PATH);
this.scheduledJobsPathWithSlash = this.scheduledJobsPath + "/";
+ this.historyCleanUpRemovedJobs = config.cleanup_period();
+
// create initial resources
final ResourceResolver resolver = this.createResourceResolver();
try {
@@ -254,6 +263,9 @@
this.stopProcessing();
}
+ public int getHistoryCleanUpRemovedJobs() {
+ return this.historyCleanUpRemovedJobs;
+ }
/**
* Is this component still active?
* @return Active?
@@ -450,7 +462,6 @@
/**
* Stop processing
- * @param deactivate Whether to deactivate the capabilities
*/
private void stopProcessing() {
logger.debug("Stopping job processing...");
diff --git a/src/main/java/org/apache/sling/event/impl/jobs/tasks/CleanUpTask.java b/src/main/java/org/apache/sling/event/impl/jobs/tasks/CleanUpTask.java
index 7fdcb88..2b9cf4f 100644
--- a/src/main/java/org/apache/sling/event/impl/jobs/tasks/CleanUpTask.java
+++ b/src/main/java/org/apache/sling/event/impl/jobs/tasks/CleanUpTask.java
@@ -18,18 +18,23 @@
*/
package org.apache.sling.event.impl.jobs.tasks;
-import java.util.Calendar;
-import java.util.Iterator;
-
import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.api.resource.Resource;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration;
import org.apache.sling.event.impl.jobs.config.TopologyCapabilities;
+import org.apache.sling.event.impl.jobs.queues.ResultBuilderImpl;
import org.apache.sling.event.impl.jobs.scheduling.JobSchedulerImpl;
+import org.apache.sling.event.jobs.Job;
+import org.apache.sling.event.jobs.consumer.JobExecutionContext;
+import org.apache.sling.event.jobs.consumer.JobExecutionResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Iterator;
+
/**
* Maintenance task...
*
@@ -91,9 +96,72 @@
}
}
+
+ if (this.configuration.getHistoryCleanUpRemovedJobs() > 0 &&
+ schedulerRuns % 60 == 1) {
+ Calendar removeDate = Calendar.getInstance();
+ removeDate.add(Calendar.MINUTE, - this.configuration.getHistoryCleanUpRemovedJobs());
+ this.historyCleanUpRemovedJobs(removeDate);
+ }
+
logger.debug("Job manager maintenance: Finished #{}", this.schedulerRuns);
}
+ private void historyCleanUpRemovedJobs(Calendar since) {
+ ResourceResolver resolver = this.configuration.createResourceResolver();
+ try {
+ HistoryCleanUpTask.cleanup(
+ since,
+ resolver,
+ new JobExecutionContext() {
+ @Override
+ public void asyncProcessingFinished(JobExecutionResult result) {
+
+ }
+
+ @Override
+ public boolean isStopped() {
+ return false;
+ }
+
+ @Override
+ public void initProgress(int steps, long eta) {
+
+ }
+
+ @Override
+ public void incrementProgressCount(int steps) {
+
+ }
+
+ @Override
+ public void updateProgress(long eta) {
+
+ }
+
+ @Override
+ public void log(String message, Object... args) {
+
+ }
+
+ @Override
+ public ResultBuilder result() {
+ return new ResultBuilderImpl();
+ }
+ },
+ this.configuration.getStoredCancelledJobsPath(),
+ null,
+ Arrays.asList(
+ Job.JobState.DROPPED.name(),
+ Job.JobState.ERROR.name()
+ ));
+ } catch (PersistenceException e) {
+ this.logger.warn("Exception during job resource tree cleanup.", e);
+ } finally {
+ resolver.close();
+ }
+ }
+
/**
* Simple empty folder removes empty folders for the last ten minutes
* starting five minutes ago.
diff --git a/src/main/java/org/apache/sling/event/impl/jobs/tasks/HistoryCleanUpTask.java b/src/main/java/org/apache/sling/event/impl/jobs/tasks/HistoryCleanUpTask.java
index 29feafa..6cda613 100644
--- a/src/main/java/org/apache/sling/event/impl/jobs/tasks/HistoryCleanUpTask.java
+++ b/src/main/java/org/apache/sling/event/impl/jobs/tasks/HistoryCleanUpTask.java
@@ -18,12 +18,6 @@
*/
package org.apache.sling.event.impl.jobs.tasks;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Calendar;
-import java.util.Iterator;
-import java.util.List;
-
import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.api.resource.Resource;
import org.apache.sling.api.resource.ResourceResolver;
@@ -41,6 +35,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Iterator;
+import java.util.List;
+
/**
* Task to clean up the history,
* A clean up task can be configured with three properties:
@@ -117,7 +117,7 @@
return context.result().succeeded();
}
- private void cleanup(final Calendar removeDate,
+ static void cleanup(final Calendar removeDate,
final ResourceResolver resolver,
final JobExecutionContext context,
final String basePath,
diff --git a/src/test/java/org/apache/sling/event/impl/jobs/tasks/HistoryCleanUpRemovedJobsTest.java b/src/test/java/org/apache/sling/event/impl/jobs/tasks/HistoryCleanUpRemovedJobsTest.java
new file mode 100644
index 0000000..6860ae2
--- /dev/null
+++ b/src/test/java/org/apache/sling/event/impl/jobs/tasks/HistoryCleanUpRemovedJobsTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.tasks;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Map;
+
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.event.impl.jobs.JobImpl;
+import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration;
+import org.apache.sling.event.impl.jobs.scheduling.JobSchedulerImpl;
+import org.apache.sling.event.jobs.Job;
+import org.apache.sling.event.jobs.consumer.JobExecutionContext;
+import org.apache.sling.testing.mock.sling.junit.SlingContext;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Answers;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import com.google.common.collect.Maps;
+
+@RunWith(MockitoJUnitRunner.class)
+public class HistoryCleanUpRemovedJobsTest {
+
+ private static final String JCR_PATH = JobManagerConfiguration.DEFAULT_REPOSITORY_PATH + "/cancelled";
+ private static final String JCR_TOPIC = "test";
+ private static final String JCR_JOB_NAME = "test-job";
+ private static final SimpleDateFormat DATE_FORMATTER = new SimpleDateFormat("yyyy/MM/dd/HH/mm");
+ private static final int MAX_AGE_IN_DAYS = 60;
+
+ @Rule
+ public final SlingContext ctx = new SlingContext();
+
+ @Mock
+ private JobManagerConfiguration configuration;
+ @Mock
+ private JobSchedulerImpl jobScheduler;
+
+ private CleanUpTask task;
+
+ @Before
+ public void setUp() {
+ setupConfiguration();
+ setUpTask();
+ }
+
+ private void setupConfiguration() {
+ Mockito.when(configuration.getStoredCancelledJobsPath()).thenReturn(JCR_PATH);
+ Mockito.when(configuration.createResourceResolver()).thenReturn(ctx.resourceResolver());
+ Mockito.when(configuration.getHistoryCleanUpRemovedJobs()).thenReturn(1);
+ }
+
+ private void setUpTask() {
+ task = new CleanUpTask(configuration, jobScheduler);
+ }
+
+ @Test
+ public void shouldNotDeleteDroppedResourcesYoungerThanRemoveDate() {
+ Calendar calendar = Calendar.getInstance();
+ calendar.add(Calendar.SECOND, -1);
+ Resource resource = createResourceForDate(calendar, Job.JobState.DROPPED.name());
+ task.run();
+ assertNotNull(ctx.resourceResolver().getResource(resource.getPath()));
+ }
+
+ @Test
+ public void shouldNotDeleteErrorResourcesYoungerThanRemoveDate() {
+ Calendar calendar = Calendar.getInstance();
+ calendar.add(Calendar.SECOND, -1);
+ Resource resource = createResourceForDate(calendar, Job.JobState.ERROR.name());
+ task.run();
+ assertNotNull(ctx.resourceResolver().getResource(resource.getPath()));
+ }
+
+ @Test
+ public void shouldNotDeleteSuccessfulResourcesOlderThanRemoveDate() {
+ Calendar calendar = Calendar.getInstance();
+
+ calendar.add(Calendar.MINUTE,-1);
+ Resource resource = createResourceForDate(calendar, Job.JobState.SUCCEEDED.name());
+
+ task.run();
+ assertNotNull(ctx.resourceResolver().getResource(resource.getPath()));
+ }
+
+ @Test
+ public void shouldDeleteDroppedResourcesOlderThanRemoveDate() {
+ Calendar calendar = Calendar.getInstance();
+
+ calendar.add(Calendar.MINUTE,-1);
+ Resource resource = createResourceForDate(calendar, Job.JobState.DROPPED.name());
+
+ task.run();
+ assertNull(ctx.resourceResolver().getResource(resource.getPath()));
+ }
+
+ @Test
+ public void shouldDeleteErrorResourcesOlderThanRemoveDate() {
+ Calendar calendar = Calendar.getInstance();
+
+ calendar.add(Calendar.MINUTE,-1);
+ Resource resource = createResourceForDate(calendar, Job.JobState.DROPPED.name());
+
+ task.run();
+ assertNull(ctx.resourceResolver().getResource(resource.getPath()));
+ }
+
+ private Resource createResourceForDate(Calendar cal, String status) {
+ String path = JCR_PATH + '/' + JCR_TOPIC + '/' + DATE_FORMATTER.format(cal.getTime()) + '/' + JCR_JOB_NAME;
+ return ctx.create().resource(path, JobImpl.PROPERTY_FINISHED_STATE, status);
+ }
+
+}