SLING-7039: Clean up jobs in state dropped and errors.

git-svn-id: https://svn.apache.org/repos/asf/sling/trunk@1804640 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java b/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java
index f3647c6..45ecc3b 100644
--- a/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java
+++ b/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java
@@ -18,16 +18,6 @@
  */
 package org.apache.sling.event.impl.jobs.config;
 
-import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.sling.api.resource.LoginException;
 import org.apache.sling.api.resource.PersistenceException;
 import org.apache.sling.api.resource.ResourceResolver;
@@ -58,6 +48,16 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
 /**
  * Configuration of the job handling
  *
@@ -87,6 +87,11 @@
               description="Specify amount in seconds that job manager waits on startup before starting with job handling. "
                         + "This can be used to allow enough time to restart a cluster before jobs are eventually reassigned.")
         long startup_delay() default 30;
+
+        @AttributeDefinition(name = "Clean-up removed jobs period",
+            description = "Specify the periodic interval in minutes (default is 48h - use 0 to disable) after which " +
+                    "removed jobs (ERROR or DROPPED) should be cleaned from the repository.")
+        int cleanup_period() default 2880;
     }
     /** Logger. */
     private final Logger logger = LoggerFactory.getLogger("org.apache.sling.event.impl.jobs");
@@ -149,6 +154,8 @@
     /** The resource path where scheduled jobs are stored - ending with a slash. */
     private String scheduledJobsPathWithSlash;
 
+    private volatile int historyCleanUpRemovedJobs;
+
     /** List of topology awares. */
     private final List<ConfigurationChangeListener> listeners = new ArrayList<>();
 
@@ -200,6 +207,8 @@
             DEFAULT_SCHEDULED_JOBS_PATH);
         this.scheduledJobsPathWithSlash = this.scheduledJobsPath + "/";
 
+        this.historyCleanUpRemovedJobs = config.cleanup_period();
+
         // create initial resources
         final ResourceResolver resolver = this.createResourceResolver();
         try {
@@ -254,6 +263,9 @@
         this.stopProcessing();
     }
 
+    public int getHistoryCleanUpRemovedJobs() {
+        return this.historyCleanUpRemovedJobs;
+    }
     /**
      * Is this component still active?
      * @return Active?
@@ -450,7 +462,6 @@
 
     /**
      * Stop processing
-     * @param deactivate Whether to deactivate the capabilities
      */
     private void stopProcessing() {
         logger.debug("Stopping job processing...");
diff --git a/src/main/java/org/apache/sling/event/impl/jobs/tasks/CleanUpTask.java b/src/main/java/org/apache/sling/event/impl/jobs/tasks/CleanUpTask.java
index 7fdcb88..2b9cf4f 100644
--- a/src/main/java/org/apache/sling/event/impl/jobs/tasks/CleanUpTask.java
+++ b/src/main/java/org/apache/sling/event/impl/jobs/tasks/CleanUpTask.java
@@ -18,18 +18,23 @@
  */
 package org.apache.sling.event.impl.jobs.tasks;
 
-import java.util.Calendar;
-import java.util.Iterator;
-
 import org.apache.sling.api.resource.PersistenceException;
 import org.apache.sling.api.resource.Resource;
 import org.apache.sling.api.resource.ResourceResolver;
 import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration;
 import org.apache.sling.event.impl.jobs.config.TopologyCapabilities;
+import org.apache.sling.event.impl.jobs.queues.ResultBuilderImpl;
 import org.apache.sling.event.impl.jobs.scheduling.JobSchedulerImpl;
+import org.apache.sling.event.jobs.Job;
+import org.apache.sling.event.jobs.consumer.JobExecutionContext;
+import org.apache.sling.event.jobs.consumer.JobExecutionResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Iterator;
+
 /**
  * Maintenance task...
  *
@@ -91,9 +96,72 @@
             }
         }
 
+
+        if (this.configuration.getHistoryCleanUpRemovedJobs() > 0 &&
+                schedulerRuns % 60 == 1) {
+            Calendar removeDate = Calendar.getInstance();
+            removeDate.add(Calendar.MINUTE, - this.configuration.getHistoryCleanUpRemovedJobs());
+            this.historyCleanUpRemovedJobs(removeDate);
+        }
+
         logger.debug("Job manager maintenance: Finished #{}", this.schedulerRuns);
     }
 
+    private void historyCleanUpRemovedJobs(Calendar since) {
+        ResourceResolver resolver = this.configuration.createResourceResolver();
+        try {
+            HistoryCleanUpTask.cleanup(
+                    since,
+                    resolver,
+                    new JobExecutionContext() {
+                        @Override
+                        public void asyncProcessingFinished(JobExecutionResult result) {
+
+                        }
+
+                        @Override
+                        public boolean isStopped() {
+                            return false;
+                        }
+
+                        @Override
+                        public void initProgress(int steps, long eta) {
+
+                        }
+
+                        @Override
+                        public void incrementProgressCount(int steps) {
+
+                        }
+
+                        @Override
+                        public void updateProgress(long eta) {
+
+                        }
+
+                        @Override
+                        public void log(String message, Object... args) {
+
+                        }
+
+                        @Override
+                        public ResultBuilder result() {
+                            return new ResultBuilderImpl();
+                        }
+                    },
+                    this.configuration.getStoredCancelledJobsPath(),
+                    null,
+                    Arrays.asList(
+                            Job.JobState.DROPPED.name(),
+                            Job.JobState.ERROR.name()
+                    ));
+        } catch (PersistenceException e) {
+            this.logger.warn("Exception during job resource tree cleanup.", e);
+        } finally {
+            resolver.close();
+        }
+    }
+
     /**
      * Simple empty folder removes empty folders for the last ten minutes
      * starting five minutes ago.
diff --git a/src/main/java/org/apache/sling/event/impl/jobs/tasks/HistoryCleanUpTask.java b/src/main/java/org/apache/sling/event/impl/jobs/tasks/HistoryCleanUpTask.java
index 29feafa..6cda613 100644
--- a/src/main/java/org/apache/sling/event/impl/jobs/tasks/HistoryCleanUpTask.java
+++ b/src/main/java/org/apache/sling/event/impl/jobs/tasks/HistoryCleanUpTask.java
@@ -18,12 +18,6 @@
  */
 package org.apache.sling.event.impl.jobs.tasks;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Calendar;
-import java.util.Iterator;
-import java.util.List;
-
 import org.apache.sling.api.resource.PersistenceException;
 import org.apache.sling.api.resource.Resource;
 import org.apache.sling.api.resource.ResourceResolver;
@@ -41,6 +35,12 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Iterator;
+import java.util.List;
+
 /**
  * Task to clean up the history,
  * A clean up task can be configured with three properties:
@@ -117,7 +117,7 @@
         return context.result().succeeded();
     }
 
-    private void cleanup(final Calendar removeDate,
+    static void cleanup(final Calendar removeDate,
             final ResourceResolver resolver,
             final JobExecutionContext context,
             final String basePath,
diff --git a/src/test/java/org/apache/sling/event/impl/jobs/tasks/HistoryCleanUpRemovedJobsTest.java b/src/test/java/org/apache/sling/event/impl/jobs/tasks/HistoryCleanUpRemovedJobsTest.java
new file mode 100644
index 0000000..6860ae2
--- /dev/null
+++ b/src/test/java/org/apache/sling/event/impl/jobs/tasks/HistoryCleanUpRemovedJobsTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.tasks;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Map;
+
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.event.impl.jobs.JobImpl;
+import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration;
+import org.apache.sling.event.impl.jobs.scheduling.JobSchedulerImpl;
+import org.apache.sling.event.jobs.Job;
+import org.apache.sling.event.jobs.consumer.JobExecutionContext;
+import org.apache.sling.testing.mock.sling.junit.SlingContext;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Answers;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import com.google.common.collect.Maps;
+
+@RunWith(MockitoJUnitRunner.class)
+public class HistoryCleanUpRemovedJobsTest {
+
+    private static final String JCR_PATH = JobManagerConfiguration.DEFAULT_REPOSITORY_PATH + "/cancelled";
+    private static final String JCR_TOPIC = "test";
+    private static final String JCR_JOB_NAME = "test-job";
+    private static final SimpleDateFormat DATE_FORMATTER = new SimpleDateFormat("yyyy/MM/dd/HH/mm");
+    private static final int MAX_AGE_IN_DAYS = 60;
+
+    @Rule
+    public final SlingContext ctx = new SlingContext();
+
+    @Mock
+    private JobManagerConfiguration configuration;
+    @Mock
+    private JobSchedulerImpl jobScheduler;
+
+    private CleanUpTask task;
+
+    @Before
+    public void setUp() {
+        setupConfiguration();
+        setUpTask();
+    }
+
+    private void setupConfiguration() {
+        Mockito.when(configuration.getStoredCancelledJobsPath()).thenReturn(JCR_PATH);
+        Mockito.when(configuration.createResourceResolver()).thenReturn(ctx.resourceResolver());
+        Mockito.when(configuration.getHistoryCleanUpRemovedJobs()).thenReturn(1);
+    }
+
+    private void setUpTask() {
+        task = new CleanUpTask(configuration, jobScheduler);
+    }
+
+    @Test
+    public void shouldNotDeleteDroppedResourcesYoungerThanRemoveDate() {
+        Calendar calendar = Calendar.getInstance();
+        calendar.add(Calendar.SECOND, -1);
+        Resource resource = createResourceForDate(calendar, Job.JobState.DROPPED.name());
+        task.run();
+        assertNotNull(ctx.resourceResolver().getResource(resource.getPath()));
+    }
+
+    @Test
+    public void shouldNotDeleteErrorResourcesYoungerThanRemoveDate() {
+        Calendar calendar = Calendar.getInstance();
+        calendar.add(Calendar.SECOND, -1);
+        Resource resource = createResourceForDate(calendar, Job.JobState.ERROR.name());
+        task.run();
+        assertNotNull(ctx.resourceResolver().getResource(resource.getPath()));
+    }
+
+    @Test
+    public void shouldNotDeleteSuccessfulResourcesOlderThanRemoveDate() {
+        Calendar calendar = Calendar.getInstance();
+
+        calendar.add(Calendar.MINUTE,-1);
+        Resource resource = createResourceForDate(calendar, Job.JobState.SUCCEEDED.name());
+
+        task.run();
+        assertNotNull(ctx.resourceResolver().getResource(resource.getPath()));
+    }
+
+    @Test
+    public void shouldDeleteDroppedResourcesOlderThanRemoveDate() {
+        Calendar calendar = Calendar.getInstance();
+
+        calendar.add(Calendar.MINUTE,-1);
+        Resource resource = createResourceForDate(calendar, Job.JobState.DROPPED.name());
+
+        task.run();
+        assertNull(ctx.resourceResolver().getResource(resource.getPath()));
+    }
+
+    @Test
+    public void shouldDeleteErrorResourcesOlderThanRemoveDate() {
+        Calendar calendar = Calendar.getInstance();
+
+        calendar.add(Calendar.MINUTE,-1);
+        Resource resource = createResourceForDate(calendar, Job.JobState.DROPPED.name());
+
+        task.run();
+        assertNull(ctx.resourceResolver().getResource(resource.getPath()));
+    }
+
+    private Resource createResourceForDate(Calendar cal, String status) {
+        String path = JCR_PATH + '/' + JCR_TOPIC + '/' + DATE_FORMATTER.format(cal.getTime()) + '/' + JCR_JOB_NAME;
+        return ctx.create().resource(path, JobImpl.PROPERTY_FINISHED_STATE, status);
+    }
+
+}