https://issues.apache.org/jira/browse/AMQ-6152

Ensure that when a job's add and remove commands are colocated in the same
journal log file, they don't prevent that log from being GC'd once it is
unreferenced.
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java
index bcb819c..82b9ff5 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java
@@ -505,8 +505,12 @@
 
             // now that the job is removed from the index we can store the remove info and
             // then dereference the log files that hold the initial add command and the most
-            // recent update command.
-            this.store.referenceRemovedLocation(tx, location, removed);
+            // recent update command.  If the remove and the add that created the job are in
+            // the same file, there is nothing to track: a normal GC of the logs removes that
+            // file once it is unreferenced.
+            if (removed.getLocation().getDataFileId() != location.getDataFileId()) {
+                this.store.referenceRemovedLocation(tx, location, removed);
+            }
         }
     }
 
@@ -589,8 +593,12 @@
 
                     // now that the job is removed from the index we can store the remove info and
                     // then dereference the log files that hold the initial add command and the most
-                    // recent update command.
-                    this.store.referenceRemovedLocation(tx, location, job);
+                    // recent update command.  If the remove and the add that created the job are in
+                    // the same file, there is nothing to track: a normal GC of the logs removes that
+                    // file once it is unreferenced.
+                    if (job.getLocation().getDataFileId() != location.getDataFileId()) {
+                        this.store.referenceRemovedLocation(tx, location, job);
+                    }
                 }
             }
         }
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java
index 1a08931..f73b6a3 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java
@@ -397,22 +397,22 @@
                 Iterator<Entry<Integer, List<Integer>>> removals = metaData.getRemoveLocationTracker().iterator(tx);
                 List<Integer> orphans = new ArrayList<Integer>();
                 while (removals.hasNext()) {
-                    boolean orphanedRemve = true;
+                    boolean orphanedRemove = true;
                     Entry<Integer, List<Integer>> entry = removals.next();
 
                     // If this log is not a GC candidate then there's no need to do a check to rule it out
                     if (gcCandidateSet.contains(entry.getKey())) {
                         for (Integer addLocation : entry.getValue()) {
                             if (completeFileSet.contains(addLocation)) {
-                                orphanedRemve = false;
+                                LOG.trace("A remove in log {} has an add still in existance in {}.", entry.getKey(), addLocation);
+                                orphanedRemove = false;
                                 break;
                             }
                         }
 
                         // If it's not orphaned than we can't remove it, otherwise we
                         // stop tracking it it's log will get deleted on the next check.
-                        if (!orphanedRemve) {
-                            LOG.trace("A remove in log {} has an add still in existance.", entry.getKey());
+                        if (!orphanedRemove) {
                             gcCandidateSet.remove(entry.getKey());
                         } else {
                             LOG.trace("All removes in log {} are orphaned, file can be GC'd", entry.getKey());
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreCheckpointTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreCheckpointTest.java
index c013a4c..3685f34 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreCheckpointTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreCheckpointTest.java
@@ -29,6 +29,7 @@
 import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.util.IOHelper;
 import org.apache.activemq.util.Wait;
+import org.apache.log4j.Level;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -45,6 +46,10 @@
 
     @Before
     public void setUp() throws Exception {
+
+        // investigate gc issue - store usage not getting released
+        org.apache.log4j.Logger.getLogger(JobSchedulerStoreImpl.class).setLevel(Level.TRACE);
+
         File directory = new File("target/test/ScheduledJobsDB");
         IOHelper.mkdirs(directory);
         IOHelper.deleteChildren(directory);
@@ -80,7 +85,7 @@
     }
 
     @Test
-    public void test() throws Exception {
+    public void testStoreCleanupLinear() throws Exception {
         final int COUNT = 10;
         final CountDownLatch latch = new CountDownLatch(COUNT);
         scheduler.addListener(new JobListener() {
@@ -122,4 +127,39 @@
 
         LOG.info("Number of journal log files: {}", getNumJournalFiles());
     }
+
+    @Test
+    public void testColocatedAddRemoveCleanup() throws Exception {
+        final CountDownLatch latch = new CountDownLatch(1);
+        scheduler.addListener(new JobListener() {
+            @Override
+            public void scheduledJob(String id, ByteSequence job) {
+                latch.countDown();
+            }
+        });
+
+        byte[] data = new byte[1024];
+        for (int i = 0; i < data.length; ++i) {
+            data[i] = (byte) (i % 256);
+        }
+
+        long time = TimeUnit.SECONDS.toMillis(2);
+        scheduler.schedule("Message-1", new ByteSequence(data), "", time, 0, 0);
+
+        assertTrue(latch.await(70, TimeUnit.SECONDS));
+        assertEquals(0, latch.getCount());
+
+        scheduler.schedule("Message-2", payload, "", time, 0, 0);
+        scheduler.schedule("Message-3", payload, "", time, 0, 0);
+
+        assertTrue("Should be only one log left: " + getNumJournalFiles(), Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return getNumJournalFiles() == 1;
+            }
+        }, TimeUnit.MINUTES.toMillis(2)));
+
+        LOG.info("Number of journal log files: {}", getNumJournalFiles());
+    }
 }