Merge pull request #619 from keyurkarnik/keyurkarnik_akka
Fixed Akka cluster issue to support more than 2 regions (each region's ClusterClient actor now gets a unique name)
diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManager.java b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManager.java
index 322ac6a..ebe8164 100644
--- a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManager.java
+++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManager.java
@@ -74,6 +74,14 @@
* Publish message to all topic subscribers in all regions.
*/
void publishToAllRegions( String topic, Object message, ActorRef sender );
+ /**
+ * Publish message to all topic subscribers in the local region only.
+ */
+ void publishToLocalRegion( String topic, Object message, ActorRef sender );
+ /**
+ * Publish message to all topic subscribers in remote regions only.
+ */
+ void publishToRemoteRegions( String topic, Object message, ActorRef sender );
void leaveCluster();
}
diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
index ea9ada8..27c90cc 100644
--- a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
+++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
@@ -152,9 +152,21 @@
@Override
public void publishToAllRegions( String topic, Object message, ActorRef sender ) {
+ publishToLocalRegion(topic, message, sender);
+ publishToRemoteRegions(topic, message, sender);
+ }
+
+ @Override
+ public void publishToLocalRegion( String topic, Object message, ActorRef sender ) {
+
// send to local subscribers to topic
mediator.tell( new DistributedPubSubMediator.Publish( topic, message ), sender );
+ }
+
+ @Override
+ public void publishToRemoteRegions( String topic, Object message, ActorRef sender ) {
+
// send to each ClusterClient
for ( ActorRef clusterClient : clusterClientsByRegion.values() ) {
clusterClient.tell( new ClusterClient.Publish( topic, message ), sender );
@@ -423,7 +435,7 @@
}
ActorRef clusterClient = system.actorOf( ClusterClient.props(
- ClusterClientSettings.create(system).withInitialContacts( seedPaths )), "client");
+ ClusterClientSettings.create(system).withInitialContacts( seedPaths )), "client-"+region);
clusterClientsByRegion.put( region, clusterClient );
}
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java
index 93b6ddb..5a71305 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java
@@ -38,14 +38,17 @@
private final ActorSystemManager actorSystemManager;
private final UniqueValuesTable table;
+
+ private final UniqueValuesFig uniqueValuesFig;
private int count = 0;
@Inject
- public UniqueValueActor( UniqueValuesTable table, ActorSystemManager actorSystemManager ) {
+ public UniqueValueActor( UniqueValuesTable table, ActorSystemManager actorSystemManager, UniqueValuesFig uniqueValuesFig) {
- this.table = table;
+ this.uniqueValuesFig = uniqueValuesFig;
+ this.table = table;
this.actorSystemManager = actorSystemManager;
}
@@ -86,8 +89,12 @@
getSender().tell( new Response( Response.Status.IS_UNIQUE, res.getConsistentHashKey() ),
getSender() );
-
- actorSystemManager.publishToAllRegions( "content", new Reservation( res ), getSelf() );
+
+ if(uniqueValuesFig.getSkipRemoteRegions()) {
+ actorSystemManager.publishToLocalRegion( "content", new Reservation( res ), getSelf() );
+ } else {
+ actorSystemManager.publishToAllRegions( "content", new Reservation( res ), getSelf() );
+ }
} catch (Throwable t) {
@@ -111,14 +118,24 @@
// cannot reserve, somebody else owns the unique value
Response response = new Response( Response.Status.NOT_UNIQUE, con.getConsistentHashKey());
getSender().tell( response, getSender() );
- actorSystemManager.publishToAllRegions( "content", response, getSelf() );
+ if(uniqueValuesFig.getSkipRemoteRegions()) {
+ actorSystemManager.publishToLocalRegion( "content", response, getSelf() );
+ } else {
+ actorSystemManager.publishToAllRegions( "content", response, getSelf() );
+ }
return;
} else if ( owner == null ) {
// cannot commit without first reserving
Response response = new Response( Response.Status.BAD_REQUEST, con.getConsistentHashKey());
getSender().tell( response, getSender() );
- actorSystemManager.publishToAllRegions( "content", response, getSelf() );
+
+ if(uniqueValuesFig.getSkipRemoteRegions()) {
+ actorSystemManager.publishToLocalRegion( "content", response, getSelf() );
+ } else {
+ actorSystemManager.publishToAllRegions( "content", response, getSelf() );
+ }
+
return;
}
@@ -127,7 +144,11 @@
Response response = new Response( Response.Status.IS_UNIQUE, con.getConsistentHashKey() );
getSender().tell( response, getSender() );
- actorSystemManager.publishToAllRegions( "content", response, getSelf() );
+ if(uniqueValuesFig.getSkipRemoteRegions()) {
+ actorSystemManager.publishToLocalRegion( "content", response, getSelf() );
+ } else {
+ actorSystemManager.publishToAllRegions( "content", response, getSelf() );
+ }
} catch (Throwable t) {
getSender().tell( new Response( Response.Status.ERROR, con.getConsistentHashKey() ),
@@ -158,7 +179,11 @@
getSender() );
// unique value record may have already been cleaned up, also clear cache
- actorSystemManager.publishToAllRegions( "content", new Cancellation( can ), getSelf() );
+ if(uniqueValuesFig.getSkipRemoteRegions()) {
+ actorSystemManager.publishToLocalRegion( "content", new Cancellation( can ), getSelf() );
+ } else {
+ actorSystemManager.publishToAllRegions( "content", new Cancellation( can ), getSelf() );
+ }
return;
}
@@ -170,7 +195,11 @@
getSender().tell( new Response( Response.Status.SUCCESS, can.getConsistentHashKey() ),
getSender() );
- actorSystemManager.publishToAllRegions( "content", new Cancellation( can ), getSelf() );
+ if(uniqueValuesFig.getSkipRemoteRegions()) {
+ actorSystemManager.publishToLocalRegion( "content", new Cancellation( can ), getSelf() );
+ } else {
+ actorSystemManager.publishToAllRegions( "content", new Cancellation( can ), getSelf() );
+ }
} catch (Throwable t) {
getSender().tell( new Response( Response.Status.ERROR, can.getConsistentHashKey() ),
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesFig.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesFig.java
index 0134779..ab0f4a4 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesFig.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesFig.java
@@ -29,6 +29,8 @@
public interface UniqueValuesFig extends GuicyFig, Serializable {
String UNIQUEVALUE_USE_CLUSTER = "collection.uniquevalues.usecluster";
+
+ String UNIQUEVALUE_SKIP_REMOTE_REGIONS = "collection.uniquevalues.skip.remote.regions";
String UNIQUEVALUE_ACTORS = "collection.uniquevalues.actors";
@@ -41,14 +43,25 @@
String UNIQUEVALUE_REQUEST_TIMEOUT = "collection.uniquevalues.request.timeout";
String UNIQUEVALUE_REQUEST_RETRY_COUNT = "collection.uniquevalues.request.retrycount";
+
+
/**
- * Tells Usergrid whether or not to use the Akka Cluster sytem to verify unique values ( more consistent)
+ * Tells Usergrid whether or not to use the Akka Cluster system to verify unique values ( more consistent)
+ * Defaults to false to avoid the extra complications of clustering unless explicitly enabled.
*/
@Key(UNIQUEVALUE_USE_CLUSTER)
- @Default("true")
+ @Default("false")
boolean getUnqiueValueViaCluster();
+
+ /**
+ * Tells Usergrid to restrict UniqueValue-related chatter to the local Akka Cluster only, skipping remote regions.
+ * Defaults to true to avoid the extra complications of cross-region messaging unless explicitly enabled.
+ */
+ @Key(UNIQUEVALUE_SKIP_REMOTE_REGIONS)
+ @Default("true")
+ boolean getSkipRemoteRegions();
/**
* Unique Value cache TTL in seconds.