JCR-3983 Possibility to parallelize the Garbage Collection
git-svn-id: https://svn.apache.org/repos/asf/jackrabbit/trunk@1746083 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/gc/GarbageCollector.java b/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/gc/GarbageCollector.java
index ba574cf..634938f 100644
--- a/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/gc/GarbageCollector.java
+++ b/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/gc/GarbageCollector.java
@@ -16,29 +16,15 @@
*/
package org.apache.jackrabbit.core.gc;
-import org.apache.jackrabbit.api.management.DataStoreGarbageCollector;
-import org.apache.jackrabbit.api.management.MarkEventListener;
-import org.apache.jackrabbit.core.RepositoryContext;
-import org.apache.jackrabbit.core.SessionImpl;
-import org.apache.jackrabbit.core.data.DataStore;
-import org.apache.jackrabbit.core.id.NodeId;
-import org.apache.jackrabbit.core.id.PropertyId;
-import org.apache.jackrabbit.core.observation.SynchronousEventListener;
-import org.apache.jackrabbit.core.persistence.IterablePersistenceManager;
-import org.apache.jackrabbit.core.persistence.util.NodeInfo;
-import org.apache.jackrabbit.core.state.ItemStateException;
-import org.apache.jackrabbit.core.state.NoSuchItemStateException;
-import org.apache.jackrabbit.core.state.NodeState;
-import org.apache.jackrabbit.core.state.PropertyState;
-import org.apache.jackrabbit.core.value.InternalValue;
-import org.apache.jackrabbit.spi.Name;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jcr.InvalidItemStateException;
@@ -57,6 +43,26 @@
import javax.jcr.observation.EventIterator;
import javax.jcr.observation.ObservationManager;
+import org.apache.jackrabbit.api.management.DataStoreGarbageCollector;
+import org.apache.jackrabbit.api.management.MarkEventListener;
+import org.apache.jackrabbit.core.RepositoryContext;
+import org.apache.jackrabbit.core.SessionImpl;
+import org.apache.jackrabbit.core.data.DataStore;
+import org.apache.jackrabbit.core.id.NodeId;
+import org.apache.jackrabbit.core.id.PropertyId;
+import org.apache.jackrabbit.core.observation.SynchronousEventListener;
+import org.apache.jackrabbit.core.persistence.IterablePersistenceManager;
+import org.apache.jackrabbit.core.persistence.PersistenceManager;
+import org.apache.jackrabbit.core.persistence.util.NodeInfo;
+import org.apache.jackrabbit.core.state.ItemStateException;
+import org.apache.jackrabbit.core.state.NoSuchItemStateException;
+import org.apache.jackrabbit.core.state.NodeState;
+import org.apache.jackrabbit.core.state.PropertyState;
+import org.apache.jackrabbit.core.value.InternalValue;
+import org.apache.jackrabbit.spi.Name;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* Garbage collector for DataStore. This implementation iterates through all
* nodes and reads the binary properties. To detect nodes that are moved while
@@ -80,6 +86,27 @@
* </pre>
*/
public class GarbageCollector implements DataStoreGarbageCollector {
+
+ /**
+  * Unit of work that scans one slice of a persistence manager's node ids by
+  * delegating to {@code scanNodeIdList} of the enclosing collector. It is
+  * submitted to an executor so that several slices can be scanned at once.
+  * The class is deliberately not static: {@link #call()} needs the enclosing
+  * {@code GarbageCollector} instance.
+  */
+ private class ScanNodeIdListTask implements Callable<Void> {
+
+     /** Number of the slice, forwarded to the scan where it tags the log output. */
+     private final int split;
+
+     /** The node ids this task is responsible for. */
+     private final List<NodeId> nodeList;
+
+     /** Persistence manager the node and property states are loaded from. */
+     private final PersistenceManager pm;
+
+     /** Position of {@code pm} among the persistence managers; only used for logging. */
+     private final int pmCount;
+
+     /**
+      * Captures the arguments of the deferred {@code scanNodeIdList} call.
+      *
+      * @param split number of the slice
+      * @param nodeList node ids to scan
+      * @param pm persistence manager to load the items from
+      * @param pmCount position of {@code pm} among the persistence managers
+      */
+     public ScanNodeIdListTask(int split, List<NodeId> nodeList, PersistenceManager pm, int pmCount) {
+         this.pm = pm;
+         this.pmCount = pmCount;
+         this.nodeList = nodeList;
+         this.split = split;
+     }
+
+     /**
+      * Runs the scan for this slice.
+      *
+      * @return always {@code null}
+      * @throws Exception whatever {@code scanNodeIdList} throws
+      */
+     public Void call() throws Exception {
+         GarbageCollector.this.scanNodeIdList(split, nodeList, pm, pmCount);
+         return null;
+     }
+
+ }
/** logger instance */
static final Logger LOG = LoggerFactory.getLogger(GarbageCollector.class);
@@ -99,6 +126,10 @@
private long sleepBetweenNodes;
+ /**
+  * Node count threshold for the parallel scan: when a persistence manager
+  * reports more node ids than this value, its id list is split and scanned
+  * by several threads; otherwise it is scanned by the calling thread.
+  */
+ private long minSplitSize = 100000L;
+
+ /**
+  * Number of slices the node id list is split into for the parallel scan,
+  * which is also the size of the thread pool that scans them.
+  */
+ private int concurrentThreadSize = 3;
+
protected int testDelay;
private final DataStore store;
@@ -148,6 +179,22 @@
return sleepBetweenNodes;
}
+ /**
+  * Returns the node count above which the node ids of a persistence manager
+  * are scanned by several threads instead of the calling thread.
+  *
+  * @return the threshold; 100000 unless changed
+  */
+ public long getMinSplitSize() {
+     return this.minSplitSize;
+ }
+
+ /**
+  * Sets the node count above which the node ids of a persistence manager
+  * are scanned by several threads instead of the calling thread.
+  *
+  * @param minSplitSize the new threshold
+  */
+ public void setMinSplitSize(long minSplitSize) {
+     this.minSplitSize = minSplitSize;
+ }
+
+ /**
+  * Returns the number of threads (and node id slices) used when a
+  * persistence manager is scanned in parallel.
+  *
+  * @return the thread count; 3 unless changed
+  */
+ public int getConcurrentThreadSize() {
+     return this.concurrentThreadSize;
+ }
+
+ /**
+  * Sets the number of threads (and node id slices) used when a persistence
+  * manager is scanned in parallel. The value is stored as given; it is used
+  * later as the size of a fixed thread pool.
+  *
+  * @param concurrentThreadSize the new thread count
+  */
+ public void setConcurrentThreadSize(int concurrentThreadSize) {
+     this.concurrentThreadSize = concurrentThreadSize;
+ }
+
+
/**
* When testing the garbage collection, a delay is used instead of simulating concurrent access.
*
@@ -256,36 +303,83 @@
pmCount++;
List<NodeId> allNodeIds = pm.getAllNodeIds(null, 0);
int overAllCount = allNodeIds.size();
- int count = 0;
- for (NodeId id : allNodeIds) {
- count++;
- if (count % 1000 == 0) {
- LOG.debug(pm.toString() + " ("+pmCount + "/" + pmList.length + "): analyzed " + count + " nodes [" + overAllCount + "]...");
- }
- if (callback != null) {
- callback.beforeScanning(null);
- }
+ if (overAllCount > minSplitSize) {
+ final int splits = getConcurrentThreadSize();
+ ExecutorService executorService = Executors.newFixedThreadPool(splits);
try {
- NodeState state = pm.load(id);
- Set<Name> propertyNames = state.getPropertyNames();
- for (Name name : propertyNames) {
- PropertyId pid = new PropertyId(id, name);
- PropertyState ps = pm.load(pid);
- if (ps.getType() == PropertyType.BINARY) {
- for (InternalValue v : ps.getValues()) {
- // getLength will update the last modified date
- // if the persistence manager scan is running
- v.getLength();
- }
- }
+ Set<Future<Void>> futures = new HashSet<Future<Void>>();
+ List<List<NodeId>> lists = splitIntoParts(allNodeIds, splits);
+ LOG.debug(splits + " concurrent Threads will be started. Split Size: " + lists.get(0).size()+" Total Size: " + overAllCount);
+ for (int i = 0; i < splits; i++) {
+ List<NodeId> subList = lists.get(i);
+ futures.add(executorService.submit(new ScanNodeIdListTask(i + 1, subList, pm, pmCount)));
}
- } catch (NoSuchItemStateException e) {
- // the node may have been deleted or moved in the meantime
- // ignore it
+ for (Future<Void> future : futures) {
+ future.get();
+ }
+ } catch (Exception e) {
+ throw new RepositoryException(e);
+ } finally {
+ executorService.shutdown();
}
+ } else {
+ scanNodeIdList(0, allNodeIds, pm, pmCount);
}
}
}
+
+ /**
+  * Loads every node of {@code nodeList} from {@code pm} and reads the length
+  * of each value of its BINARY properties. Nodes that no longer exist when
+  * they are loaded are skipped. Progress is logged at debug level after
+  * every 1000 nodes.
+  *
+  * @param split number of the slice being scanned; values greater than 0
+  *              add a {@code [Split n]} prefix to the progress message
+  * @param nodeList ids of the nodes to scan
+  * @param pm persistence manager to load the node and property states from
+  * @param pmCount position of {@code pm} among the persistence managers,
+  *                only used in the progress message
+  * @throws RepositoryException declared for the scan callback
+  * @throws ItemStateException if an item cannot be loaded for a reason
+  *                            other than it no longer existing
+  */
+ private void scanNodeIdList(int split, List<NodeId> nodeList, PersistenceManager pm, int pmCount) throws RepositoryException, ItemStateException {
+     final String logPrefix = split > 0 ? "[Split " + split + "] " : "";
+     int scanned = 0;
+     for (NodeId nodeId : nodeList) {
+         scanned++;
+         if (scanned % 1000 == 0) {
+             LOG.debug(logPrefix + pm.toString() + " (" + pmCount + "/" + pmList.length + "): analyzed " + scanned + " nodes [" + nodeList.size() + "]...");
+         }
+         if (callback != null) {
+             callback.beforeScanning(null);
+         }
+         try {
+             NodeState nodeState = pm.load(nodeId);
+             for (Name propertyName : nodeState.getPropertyNames()) {
+                 PropertyState property = pm.load(new PropertyId(nodeId, propertyName));
+                 if (property.getType() != PropertyType.BINARY) {
+                     continue;
+                 }
+                 for (InternalValue value : property.getValues()) {
+                     // getLength will update the last modified date
+                     // if the persistence manager scan is running
+                     value.getLength();
+                 }
+             }
+         } catch (NoSuchItemStateException e) {
+             // the node may have been deleted or moved in the meantime
+             // ignore it
+         }
+     }
+ }
+
+
+ /**
+  * Splits {@code ls} into exactly {@code parts} consecutive slices whose
+  * sizes differ by at most one element; the larger slices come first. When
+  * the list has fewer elements than {@code parts}, the trailing slices are
+  * empty, so callers may safely index the result from 0 to
+  * {@code parts - 1}. Every slice is an independent copy and not a view of
+  * {@code ls}.
+  *
+  * @param ls the list to split
+  * @param parts the number of slices to produce; must be positive
+  * @return a list holding {@code parts} slices that together contain all
+  *         elements of {@code ls} in their original order
+  * @throws IllegalArgumentException if {@code parts} is not positive
+  */
+ private <T> List<List<T>> splitIntoParts(List<T> ls, int parts) {
+     if (parts <= 0) {
+         throw new IllegalArgumentException("parts must be positive: " + parts);
+     }
+     final int total = ls.size();
+     final int chunkSize = total / parts;
+     // the first 'leftOver' slices take one extra element each
+     final int leftOver = total % parts;
+     final List<List<T>> listParts = new ArrayList<List<T>>(parts);
+
+     int from = 0;
+     for (int part = 0; part < parts; part++) {
+         final int to = from + chunkSize + (part < leftOver ? 1 : 0);
+         listParts.add(new ArrayList<T>(ls.subList(from, to)));
+         from = to;
+     }
+     return listParts;
+ }
/**
* Reset modifiedDateOnAccess to 0 and stop the observation