Merge pull request #619 from keyurkarnik/keyurkarnik_akka
Fixed akka cluster issue to support more than 2 regions
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/Export.java b/stack/tools/src/main/java/org/apache/usergrid/tools/Export.java
index b07d09d..9a67945 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/Export.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/Export.java
@@ -41,6 +41,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.MissingOptionException;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
@@ -59,7 +60,6 @@
import org.apache.usergrid.persistence.collection.EntityCollectionManager;
import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.graph.GraphManager;
import org.apache.usergrid.persistence.graph.SearchByEdgeType;
import org.apache.usergrid.persistence.graph.impl.SimpleEdge;
@@ -78,6 +78,7 @@
import com.fasterxml.jackson.core.JsonGenerator;
import com.google.common.base.Optional;
import com.google.common.collect.BiMap;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
@@ -89,7 +90,8 @@
public class Export extends ExportingToolBase {
static final Logger logger = LoggerFactory.getLogger( Export.class );
- public static final String LAST_ID = "lastId";
+ private static final String ENTITY_FETCHER_THREADS = "entityFetchThreads";
+ private static final String ENTITY_MEMBER_FETCHER_MULT = "entityThreadMult";
@Autowired
@@ -100,11 +102,23 @@
private AllEntityIdsObservable allEntityIdsObs;
private SimpleEdge lastEdge = null;
+ //number of threads for fetching entity contents. Each thread will handle a batch of 1000 entity ids
+ private int entityFetcherThreads = 50;
+ //after an individual entity is fetched, the entity members like assets, connections etc need to be fetched
+ //depending on how heavy the assets/connections might be, we might need to multiply the factor so that more threads are allocated
+ //for pulling the members quickly without the queue backing up.
+ private int entityMemberFetcherMultiplier = 1;
+
+
//TODO : Add blocking queues for these executors where appropriate
- private ExecutorService orgAppCollParallelizer = Executors.newFixedThreadPool(3);
- private ExecutorService entityFetcher = Executors.newFixedThreadPool(10);
- private ExecutorService enitityMemberFetcher = Executors.newFixedThreadPool(10);
- private ExecutorService assetsFetcher = Executors.newFixedThreadPool(10);
+ private ExecutorService orgAppCollParallelizer;
+
+ //fetches the entity content
+ private ExecutorService entityFetcher;
+ //fetches the entity members like connections etc for a given entity
+ private ExecutorService entityMemberFetcher;
+ //fetches the assets for a given entity
+ private ExecutorService assetsFetcher;
@Override
@@ -113,13 +127,70 @@
Options options = super.createOptions();
- Option lastId = OptionBuilder.withArgName( LAST_ID ).hasArg()
- .withDescription( "Last Entity Id to resume from" ).create( LAST_ID );
- options.addOption( lastId);
+
+ Option entityFetcherThreads = OptionBuilder.withArgName( ENTITY_FETCHER_THREADS ).hasArg()
+ .withDescription( "Number of threads to fetch entities in parallel (defaults to 50)" ).create( ENTITY_FETCHER_THREADS );
+ options.addOption( entityFetcherThreads);
+
+ Option entityMemberFetcherMultiplier = OptionBuilder.withArgName( ENTITY_MEMBER_FETCHER_MULT ).hasArg()
+ .withDescription( "This defines the number of threads for fetching entity members like assets/collections by multiplying the number of entity fetcher threads. Defaults to 1" ).create( ENTITY_MEMBER_FETCHER_MULT );
+ options.addOption( entityMemberFetcherMultiplier);
return options;
}
+
+    /**
+     * Validates the command line: runs the base-class validation, then checks that the optional
+     * entity fetcher thread count and entity member fetcher multiplier, when present, parse as ints.
+     *
+     * @param line the parsed command line
+     * @throws MissingOptionException if either option is present but its value is not parseable by
+     *         Integer.parseInt (note: this type is reused for a malformed value, not only for a
+     *         missing option); also propagates anything thrown by super.validateOptions
+     */
+    @Override
+    protected void validateOptions(CommandLine line) throws MissingOptionException {
+        super.validateOptions(line);
+        // NOTE(review): the messages below say "positive integer", but only parseability is checked
+        // here; 0 and negative values pass validation (applyExportParams later substitutes defaults).
+        if (line.hasOption(ENTITY_FETCHER_THREADS)) {
+            try {
+                Integer.parseInt(line.getOptionValue(ENTITY_FETCHER_THREADS));
+            } catch (NumberFormatException e) {
+                // The NumberFormatException cause and the offending value are not carried over.
+                throw new MissingOptionException("Entity fetcher threads need to be a positive integer");
+            }
+        }
+
+        if (line.hasOption(ENTITY_MEMBER_FETCHER_MULT)) {
+            try {
+                Integer.parseInt(line.getOptionValue(ENTITY_MEMBER_FETCHER_MULT));
+            } catch (NumberFormatException e) {
+                throw new MissingOptionException("Entity member thread multiplier needs to be a positive integer");
+            }
+        }
+
+    }
+
+    /**
+     * Applies export parameters: runs the base-class handling, reads the optional entity fetcher
+     * thread count and entity member fetcher multiplier, and creates the executor pools sized from them.
+     * <p>
+     * A thread count below 1 falls back to 50; a multiplier below 1 falls back to 1 and one above 5
+     * is capped at 5. These adjustments are applied silently (nothing is logged).
+     *
+     * @param line the parsed command line; option values are presumably already checked by
+     *             validateOptions - an unparseable value here would throw NumberFormatException
+     */
+    @Override
+    protected void applyExportParams(CommandLine line) {
+
+        super.applyExportParams(line);
+
+        if (line.hasOption(ENTITY_FETCHER_THREADS)) {
+            entityFetcherThreads = Integer.parseInt(line.getOptionValue(ENTITY_FETCHER_THREADS));
+
+            // Non-positive input is replaced by the default rather than rejected.
+            if (entityFetcherThreads < 1) {
+                entityFetcherThreads = 50;
+            }
+        }
+
+        if (line.hasOption(ENTITY_MEMBER_FETCHER_MULT)) {
+            entityMemberFetcherMultiplier = Integer.parseInt(line.getOptionValue(ENTITY_MEMBER_FETCHER_MULT));
+
+            // Clamp the multiplier to [1, 5]; the option description does not mention the upper cap.
+            if (entityMemberFetcherMultiplier < 1) {
+                entityMemberFetcherMultiplier = 1;
+            }
+            if (entityMemberFetcherMultiplier > 5) {
+                entityMemberFetcherMultiplier = 5;
+            }
+        }
+
+        // The executor fields have no initializer, so they stay null until this method runs.
+        // NOTE(review): calling this method again replaces the pools without shutting down the old ones.
+        // The org/app/collection pool has a fixed size of 5, independent of the options above.
+        orgAppCollParallelizer = Executors.newFixedThreadPool(5, new ThreadFactoryBuilder().setNameFormat("OrgAppColl-Parallelizer-%d").build());
+        entityFetcher = Executors.newFixedThreadPool(entityFetcherThreads, new ThreadFactoryBuilder().setNameFormat("Export-EntityFetcher-%d").build());
+        // Member and asset pools are each sized entityFetcherThreads * entityMemberFetcherMultiplier.
+        // NOTE(review): entityFetcherThreads has no upper bound, so this product can overflow; a
+        // non-positive result makes newFixedThreadPool throw IllegalArgumentException.
+        entityMemberFetcher = Executors.newFixedThreadPool(entityFetcherThreads * entityMemberFetcherMultiplier, new ThreadFactoryBuilder().setNameFormat("Export-EntityMemberFetcher-%d").build());
+        assetsFetcher = Executors.newFixedThreadPool(entityFetcherThreads * entityMemberFetcherMultiplier, new ThreadFactoryBuilder().setNameFormat("Export-AssetFetcher-%d").build());
+
+    }
@Override
public void runTool( CommandLine line ) throws Exception {
@@ -432,7 +503,7 @@
ConnectableObservable<Results> entityObs = Observable.just(entities)
.publish();
- entityObs.subscribeOn(Schedulers.from(enitityMemberFetcher));
+ entityObs.subscribeOn(Schedulers.from(entityMemberFetcher));
// fetch and write connections