Merge pull request #619 from keyurkarnik/keyurkarnik_akka

Fixed akka cluster issue to support more than 2 regions
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/Export.java b/stack/tools/src/main/java/org/apache/usergrid/tools/Export.java
index b07d09d..9a67945 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/Export.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/Export.java
@@ -41,6 +41,7 @@
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.MissingOptionException;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
@@ -59,7 +60,6 @@
 import org.apache.usergrid.persistence.collection.EntityCollectionManager;
 import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.GraphManager;
 import org.apache.usergrid.persistence.graph.SearchByEdgeType;
 import org.apache.usergrid.persistence.graph.impl.SimpleEdge;
@@ -78,6 +78,7 @@
 import com.fasterxml.jackson.core.JsonGenerator;
 import com.google.common.base.Optional;
 import com.google.common.collect.BiMap;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 
@@ -89,7 +90,8 @@
 public class Export extends ExportingToolBase {
 
     static final Logger logger = LoggerFactory.getLogger( Export.class );
-    public static final String LAST_ID = "lastId";
+	private static final String ENTITY_FETCHER_THREADS = "entityFetchThreads";
+	private static final String ENTITY_MEMBER_FETCHER_MULT = "entityThreadMult";
     
     
     @Autowired
@@ -100,11 +102,23 @@
     private AllEntityIdsObservable allEntityIdsObs;
     private SimpleEdge lastEdge = null;
     
+    // Number of threads used to fetch entity contents. Each thread handles a batch of 1000 entity ids.
+    private int entityFetcherThreads = 50;
+    // After an individual entity is fetched, its members (assets, connections, etc.) need to be fetched as well.
+    // Depending on how heavy the assets/connections are, this multiplier can be raised so that more threads
+    // are allocated for pulling the members quickly without the queue backing up.
+    private int entityMemberFetcherMultiplier = 1;
+    
+    
     //TODO : Add blocking queues for these executors where appropriate
-    private ExecutorService orgAppCollParallelizer = Executors.newFixedThreadPool(3);
-    private ExecutorService entityFetcher = Executors.newFixedThreadPool(10);
-	private ExecutorService enitityMemberFetcher = Executors.newFixedThreadPool(10);
-	private ExecutorService assetsFetcher = Executors.newFixedThreadPool(10);
+    private ExecutorService orgAppCollParallelizer;
+    
+    //fetches the entity content
+    private ExecutorService entityFetcher;
+    //fetches the entity members like connections etc for a given entity
+	private ExecutorService entityMemberFetcher;
+	//fetches the assets for a given entity
+	private ExecutorService assetsFetcher;
 	
 
     @Override
@@ -113,13 +127,70 @@
   
     	Options options = super.createOptions();
     	
-    	Option lastId = OptionBuilder.withArgName( LAST_ID ).hasArg()
-                .withDescription( "Last Entity Id to resume from" ).create( LAST_ID );
-    	options.addOption( lastId);
+    	
+    	Option entityFetcherThreads = OptionBuilder.withArgName( ENTITY_FETCHER_THREADS ).hasArg()
+                .withDescription( "Number of threads to fetch entities in parallel (defaults to 50)" ).create( ENTITY_FETCHER_THREADS );
+    	options.addOption( entityFetcherThreads);
+    	
+    	Option entityMemberFetcherMultiplier = OptionBuilder.withArgName( ENTITY_MEMBER_FETCHER_MULT ).hasArg()
+                .withDescription( "This defines the number of threads for fetching entity members like assets/collections by multiplying the number of entity fetcher threads. Defaults to 1" ).create( ENTITY_MEMBER_FETCHER_MULT );
+    	options.addOption( entityMemberFetcherMultiplier);
     	
     	return options;
     }
+    
+    @Override
+	protected void validateOptions(CommandLine line) throws MissingOptionException {
+		super.validateOptions(line);
 
+		if (line.hasOption(ENTITY_FETCHER_THREADS)) {
+			try {
+				Integer.parseInt(line.getOptionValue(ENTITY_FETCHER_THREADS));
+			} catch (NumberFormatException e) {
+				throw new MissingOptionException("Entity fetcher threads need to be a positive integer");
+			}
+		}
+
+		if (line.hasOption(ENTITY_MEMBER_FETCHER_MULT)) {
+			try {
+				Integer.parseInt(line.getOptionValue(ENTITY_MEMBER_FETCHER_MULT));
+			} catch (NumberFormatException e) {
+				throw new MissingOptionException("Entity member thread multiplier needs to be a positive integer");
+			}
+		}
+
+	}
+    
+    @Override
+	protected void applyExportParams(CommandLine line) {
+
+		super.applyExportParams(line);
+
+		if (line.hasOption(ENTITY_FETCHER_THREADS)) {
+			entityFetcherThreads = Integer.parseInt(line.getOptionValue(ENTITY_FETCHER_THREADS));
+
+			if (entityFetcherThreads < 1) {
+				entityFetcherThreads = 50;
+			}
+		}
+
+		if (line.hasOption(ENTITY_MEMBER_FETCHER_MULT)) {
+			entityMemberFetcherMultiplier = Integer.parseInt(line.getOptionValue(ENTITY_MEMBER_FETCHER_MULT));
+
+			if (entityMemberFetcherMultiplier < 1) {
+				entityMemberFetcherMultiplier = 1;
+			}
+			if (entityMemberFetcherMultiplier > 5) {
+				entityMemberFetcherMultiplier = 5;
+			}
+		}
+
+		orgAppCollParallelizer = Executors.newFixedThreadPool(5, new ThreadFactoryBuilder().setNameFormat("OrgAppColl-Parallelizer-%d").build());
+		entityFetcher = Executors.newFixedThreadPool(entityFetcherThreads, new ThreadFactoryBuilder().setNameFormat("Export-EntityFetcher-%d").build());
+		entityMemberFetcher = Executors.newFixedThreadPool(entityFetcherThreads * entityMemberFetcherMultiplier, new ThreadFactoryBuilder().setNameFormat("Export-EntityMemberFetcher-%d").build());
+		assetsFetcher = Executors.newFixedThreadPool(entityFetcherThreads * entityMemberFetcherMultiplier, new ThreadFactoryBuilder().setNameFormat("Export-AssetFetcher-%d").build());
+
+	}
     
     @Override
     public void runTool( CommandLine line ) throws Exception {
@@ -432,7 +503,7 @@
 							
 							ConnectableObservable<Results> entityObs = Observable.just(entities)
 									.publish();
-							entityObs.subscribeOn(Schedulers.from(enitityMemberFetcher));
+							entityObs.subscribeOn(Schedulers.from(entityMemberFetcher));
 	
 							
 							// fetch and write connections