JCR-3983 Possibility to parallelize the Garbage Collection

git-svn-id: https://svn.apache.org/repos/asf/jackrabbit/trunk@1746083 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/gc/GarbageCollector.java b/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/gc/GarbageCollector.java
index ba574cf..634938f 100644
--- a/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/gc/GarbageCollector.java
+++ b/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/gc/GarbageCollector.java
@@ -16,29 +16,15 @@
  */
 package org.apache.jackrabbit.core.gc;
 
-import org.apache.jackrabbit.api.management.DataStoreGarbageCollector;
-import org.apache.jackrabbit.api.management.MarkEventListener;
-import org.apache.jackrabbit.core.RepositoryContext;
-import org.apache.jackrabbit.core.SessionImpl;
-import org.apache.jackrabbit.core.data.DataStore;
-import org.apache.jackrabbit.core.id.NodeId;
-import org.apache.jackrabbit.core.id.PropertyId;
-import org.apache.jackrabbit.core.observation.SynchronousEventListener;
-import org.apache.jackrabbit.core.persistence.IterablePersistenceManager;
-import org.apache.jackrabbit.core.persistence.util.NodeInfo;
-import org.apache.jackrabbit.core.state.ItemStateException;
-import org.apache.jackrabbit.core.state.NoSuchItemStateException;
-import org.apache.jackrabbit.core.state.NodeState;
-import org.apache.jackrabbit.core.state.PropertyState;
-import org.apache.jackrabbit.core.value.InternalValue;
-import org.apache.jackrabbit.spi.Name;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.jcr.InvalidItemStateException;
@@ -57,6 +43,26 @@
 import javax.jcr.observation.EventIterator;
 import javax.jcr.observation.ObservationManager;
 
+import org.apache.jackrabbit.api.management.DataStoreGarbageCollector;
+import org.apache.jackrabbit.api.management.MarkEventListener;
+import org.apache.jackrabbit.core.RepositoryContext;
+import org.apache.jackrabbit.core.SessionImpl;
+import org.apache.jackrabbit.core.data.DataStore;
+import org.apache.jackrabbit.core.id.NodeId;
+import org.apache.jackrabbit.core.id.PropertyId;
+import org.apache.jackrabbit.core.observation.SynchronousEventListener;
+import org.apache.jackrabbit.core.persistence.IterablePersistenceManager;
+import org.apache.jackrabbit.core.persistence.PersistenceManager;
+import org.apache.jackrabbit.core.persistence.util.NodeInfo;
+import org.apache.jackrabbit.core.state.ItemStateException;
+import org.apache.jackrabbit.core.state.NoSuchItemStateException;
+import org.apache.jackrabbit.core.state.NodeState;
+import org.apache.jackrabbit.core.state.PropertyState;
+import org.apache.jackrabbit.core.value.InternalValue;
+import org.apache.jackrabbit.spi.Name;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * Garbage collector for DataStore. This implementation iterates through all
  * nodes and reads the binary properties. To detect nodes that are moved while
@@ -80,6 +86,27 @@
  * </pre>
  */
 public class GarbageCollector implements DataStoreGarbageCollector {
+    
+    /** Callable that scans one sub-list of node ids by delegating to the enclosing collector's scanNodeIdList. */
+    private class ScanNodeIdListTask implements Callable<Void> {
+
+        private final int split;
+        private final List<NodeId> nodeList;
+        private final PersistenceManager pm;
+        private final int pmCount;
+
+        public ScanNodeIdListTask(int split, List<NodeId> nodeList, PersistenceManager pm, int pmCount) {
+            this.split = split;
+            this.nodeList = nodeList;
+            this.pm = pm;
+            this.pmCount = pmCount;
+        }
+
+        public Void call() throws Exception {
+            scanNodeIdList(split, nodeList, pm, pmCount);
+            return null; // Callable<Void> carries no result
+        }
+    }
 
     /** logger instance */
     static final Logger LOG = LoggerFactory.getLogger(GarbageCollector.class);
@@ -99,6 +126,10 @@
 
     private long sleepBetweenNodes;
 
+    private long minSplitSize = 100000;
+
+    private int concurrentThreadSize = 3;
+
     protected int testDelay;
 
     private final DataStore store;
@@ -148,6 +179,22 @@
         return sleepBetweenNodes;
     }
 
+    public long getMinSplitSize() { // threshold: scan() splits a node id list only when it is larger than this
+        return minSplitSize;
+    }
+
+    public void setMinSplitSize(long minSplitSize) { // node id lists larger than this are scanned in parallel splits
+        this.minSplitSize = minSplitSize;
+    }
+
+    public int getConcurrentThreadSize() { // used as both the thread pool size and the number of splits of a parallel scan
+        return concurrentThreadSize;
+    }
+
+    public void setConcurrentThreadSize(int concurrentThreadSize) { // values below 1 are raised to 1
+        this.concurrentThreadSize = Math.max(1, concurrentThreadSize); // sizes a fixed thread pool, which rejects sizes < 1
+    }
+
     /**
      * When testing the garbage collection, a delay is used instead of simulating concurrent access.
      *
@@ -256,36 +303,83 @@
             pmCount++;
             List<NodeId> allNodeIds = pm.getAllNodeIds(null, 0);
             int overAllCount = allNodeIds.size();
-            int count = 0;
-            for (NodeId id : allNodeIds) {
-                count++;
-                if (count % 1000 == 0) {
-                    LOG.debug(pm.toString() + " ("+pmCount + "/" + pmList.length + "): analyzed " + count + " nodes [" + overAllCount + "]...");
-                }
-                if (callback != null) {
-                    callback.beforeScanning(null);
-                }
+            if (overAllCount > minSplitSize) {
+                final int splits = getConcurrentThreadSize();
+                ExecutorService executorService = Executors.newFixedThreadPool(splits);
                 try {
-                    NodeState state = pm.load(id);
-                    Set<Name> propertyNames = state.getPropertyNames();
-                    for (Name name : propertyNames) {
-                        PropertyId pid = new PropertyId(id, name);
-                        PropertyState ps = pm.load(pid);
-                        if (ps.getType() == PropertyType.BINARY) {
-                            for (InternalValue v : ps.getValues()) {
-                                // getLength will update the last modified date
-                                // if the persistence manager scan is running
-                                v.getLength();
-                            }
-                        }
+                    Set<Future<Void>> futures = new HashSet<Future<Void>>();
+                    List<List<NodeId>> lists = splitIntoParts(allNodeIds, splits);
+                    LOG.debug(splits + " concurrent Threads will be started. Split Size: " + lists.get(0).size()+" Total Size: " + overAllCount);
+                    for (int i = 0; i < splits; i++) {
+                        List<NodeId> subList = lists.get(i);
+                        futures.add(executorService.submit(new ScanNodeIdListTask(i + 1, subList, pm, pmCount)));
                     }
-                } catch (NoSuchItemStateException e) {
-                    // the node may have been deleted or moved in the meantime
-                    // ignore it
+                    for (Future<Void> future : futures) {
+                        future.get();
+                    }
+                } catch (Exception e) {
+                    throw new RepositoryException(e);
+                } finally {
+                    executorService.shutdown();
                 }
+            } else {
+                scanNodeIdList(0, allNodeIds, pm, pmCount);
             }
         }
     }
+    
+    /**
+     * Loads every node in the list and calls getLength() on each value of its
+     * BINARY properties; nodes that can no longer be loaded are skipped.
+     *
+     * @param split split number, shown as a log prefix when greater than 0
+     * @param pmCount number of the persistence manager, shown in log output
+     */
+    private void scanNodeIdList(int split, List<NodeId> nodeList, PersistenceManager pm, int pmCount) throws RepositoryException, ItemStateException {
+        final String splitPrefix = split > 0 ? "[Split " + split + "] " : "";
+        int scanned = 0;
+        for (NodeId nodeId : nodeList) {
+            if (++scanned % 1000 == 0) {
+                LOG.debug(splitPrefix + pm.toString() + " (" + pmCount + "/" + pmList.length + "): analyzed " + scanned + " nodes [" + nodeList.size() + "]...");
+            }
+            if (callback != null) {
+                callback.beforeScanning(null);
+            }
+            try {
+                for (Name propertyName : pm.load(nodeId).getPropertyNames()) {
+                    PropertyState property = pm.load(new PropertyId(nodeId, propertyName));
+                    if (property.getType() != PropertyType.BINARY) {
+                        continue;
+                    }
+                    for (InternalValue value : property.getValues()) {
+                        // reading the length is what updates the last modified
+                        // date if the persistence manager scan is running
+                        value.getLength();
+                    }
+                }
+            } catch (NoSuchItemStateException e) {
+                // deleted or moved in the meantime - nothing to scan for this id
+            }
+        }
+    }
+
+    /** Splits ls into exactly parts consecutive copied sub-lists whose sizes differ by at most
+     *  one; when ls has fewer elements than parts, the trailing sub-lists are empty. */
+    private <T> List<List<T>> splitIntoParts(List<T> ls, int parts) {
+        if (parts < 1) {
+            throw new IllegalArgumentException("parts must be >= 1: " + parts);
+        }
+        final List<List<T>> listParts = new ArrayList<List<T>>(parts);
+        final int chunkSize = ls.size() / parts;
+        final int leftOver = ls.size() % parts;
+        for (int part = 0, from = 0; part < parts; part++) {
+            // the first leftOver parts each take one extra element
+            final int to = from + chunkSize + (part < leftOver ? 1 : 0);
+            listParts.add(new ArrayList<T>(ls.subList(from, to)));
+            from = to;
+        }
+        return listParts;
+    }
 
     /**
      * Reset modifiedDateOnAccess to 0 and stop the observation