https://issues.apache.org/jira/browse/AMQ-6152
Ensure that when add / remove commands are colocated they don't prevent
the log from being GC'd once it is unreferenced.
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java
index bcb819c..82b9ff5 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java
@@ -505,8 +505,12 @@
// now that the job is removed from the index we can store the remove info and
// then dereference the log files that hold the initial add command and the most
- // recent update command.
- this.store.referenceRemovedLocation(tx, location, removed);
+ // recent update command. If the remove and the add that created the job are in
+ // the same file we don't need to track it and just let a normal GC of the logs
+ // remove it when the log is unreferenced.
+ if (removed.getLocation().getDataFileId() != location.getDataFileId()) {
+ this.store.referenceRemovedLocation(tx, location, removed);
+ }
}
}
@@ -589,8 +593,12 @@
// now that the job is removed from the index we can store the remove info and
// then dereference the log files that hold the initial add command and the most
- // recent update command.
- this.store.referenceRemovedLocation(tx, location, job);
+ // recent update command. If the remove and the add that created the job are in
+ // the same file we don't need to track it and just let a normal GC of the logs
+ // remove it when the log is unreferenced.
+ if (job.getLocation().getDataFileId() != location.getDataFileId()) {
+ this.store.referenceRemovedLocation(tx, location, job);
+ }
}
}
}
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java
index 1a08931..f73b6a3 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java
@@ -397,22 +397,22 @@
Iterator<Entry<Integer, List<Integer>>> removals = metaData.getRemoveLocationTracker().iterator(tx);
List<Integer> orphans = new ArrayList<Integer>();
while (removals.hasNext()) {
- boolean orphanedRemve = true;
+ boolean orphanedRemove = true;
Entry<Integer, List<Integer>> entry = removals.next();
// If this log is not a GC candidate then there's no need to do a check to rule it out
if (gcCandidateSet.contains(entry.getKey())) {
for (Integer addLocation : entry.getValue()) {
if (completeFileSet.contains(addLocation)) {
- orphanedRemve = false;
+ LOG.trace("A remove in log {} has an add still in existence in {}.", entry.getKey(), addLocation);
+ orphanedRemove = false;
break;
}
}
// If it's not orphaned than we can't remove it, otherwise we
// stop tracking it it's log will get deleted on the next check.
- if (!orphanedRemve) {
- LOG.trace("A remove in log {} has an add still in existance.", entry.getKey());
+ if (!orphanedRemove) {
gcCandidateSet.remove(entry.getKey());
} else {
LOG.trace("All removes in log {} are orphaned, file can be GC'd", entry.getKey());
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreCheckpointTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreCheckpointTest.java
index c013a4c..3685f34 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreCheckpointTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreCheckpointTest.java
@@ -29,6 +29,7 @@
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.Wait;
+import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -45,6 +46,10 @@
@Before
public void setUp() throws Exception {
+
+ // investigate gc issue - store usage not getting released
+ org.apache.log4j.Logger.getLogger(JobSchedulerStoreImpl.class).setLevel(Level.TRACE);
+
File directory = new File("target/test/ScheduledJobsDB");
IOHelper.mkdirs(directory);
IOHelper.deleteChildren(directory);
@@ -80,7 +85,7 @@
}
@Test
- public void test() throws Exception {
+ public void testStoreCleanupLinear() throws Exception {
final int COUNT = 10;
final CountDownLatch latch = new CountDownLatch(COUNT);
scheduler.addListener(new JobListener() {
@@ -122,4 +127,39 @@
LOG.info("Number of journal log files: {}", getNumJournalFiles());
}
+
+ @Test
+ public void testColocatedAddRemoveCleanup() throws Exception {
+ final CountDownLatch latch = new CountDownLatch(1);
+ scheduler.addListener(new JobListener() {
+ @Override
+ public void scheduledJob(String id, ByteSequence job) {
+ latch.countDown();
+ }
+ });
+
+ byte[] data = new byte[1024];
+ for (int i = 0; i < data.length; ++i) {
+ data[i] = (byte) (i % 256);
+ }
+
+ long time = TimeUnit.SECONDS.toMillis(2);
+ scheduler.schedule("Message-1", new ByteSequence(data), "", time, 0, 0);
+
+ assertTrue(latch.await(70, TimeUnit.SECONDS));
+ assertEquals(0, latch.getCount());
+
+ scheduler.schedule("Message-2", new ByteSequence(data), "", time, 0, 0);
+ scheduler.schedule("Message-3", new ByteSequence(data), "", time, 0, 0);
+
+ assertTrue("Should be only one log left: " + getNumJournalFiles(), Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return getNumJournalFiles() == 1;
+ }
+ }, TimeUnit.MINUTES.toMillis(2)));
+
+ LOG.info("Number of journal log files: {}", getNumJournalFiles());
+ }
}