SCB-1508 Omega uses the onGetServerMeta method to get the Alpha configuration
diff --git a/acceptance-tests/acceptance-pack-akka-spring-demo/pom.xml b/acceptance-tests/acceptance-pack-akka-spring-demo/pom.xml
index e9008cb..dde0a3b 100644
--- a/acceptance-tests/acceptance-pack-akka-spring-demo/pom.xml
+++ b/acceptance-tests/acceptance-pack-akka-spring-demo/pom.xml
@@ -185,7 +185,6 @@
-Dorg.jboss.byteman.debug=true -Dorg.jboss.byteman.verbose=true
-javaagent:/maven/saga/byteman.jar=port:9092,address:0.0.0.0,listener:true
</JAVA_OPTS>
- <alpha.feature.akka.enabled>true</alpha.feature.akka.enabled>
</env>
<wait>
<log>Started [a-zA-Z]+ in [0-9.]+ seconds</log>
@@ -218,7 +217,6 @@
-Dorg.jboss.byteman.debug=true -Dorg.jboss.byteman.verbose=true
-javaagent:/maven/saga/byteman.jar=port:9093,address:0.0.0.0,listener:true
</JAVA_OPTS>
- <alpha.feature.akka.enabled>true</alpha.feature.akka.enabled>
</env>
<wait>
<log>Started [a-zA-Z]+ in [0-9.]+ seconds</log>
@@ -251,7 +249,6 @@
-Dorg.jboss.byteman.debug=true -Dorg.jboss.byteman.verbose=true
-javaagent:/maven/saga/byteman.jar=port:9091,address:0.0.0.0,listener:true
</JAVA_OPTS>
- <alpha.feature.akka.enabled>true</alpha.feature.akka.enabled>
</env>
<wait>
<log>Started [a-zA-Z]+ in [0-9.]+ seconds</log>
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/AlphaConfig.java
index ea82b4d..ce9f688 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/AlphaConfig.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/AlphaConfig.java
@@ -35,6 +35,8 @@
import org.apache.servicecomb.pack.alpha.server.tcc.callback.TccPendingTaskRunner;
import org.apache.servicecomb.pack.alpha.server.tcc.service.TccEventScanner;
import org.apache.servicecomb.pack.alpha.server.tcc.service.TccTxEventService;
+import org.apache.servicecomb.pack.common.AlphaMetaKeys;
+import org.apache.servicecomb.pack.contract.grpc.ServerMeta;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -153,8 +155,10 @@
ServerStartable serverStartable(GrpcServerConfig serverConfig, TxConsistentService txConsistentService,
Map<String, Map<String, OmegaCallback>> omegaCallbacks, GrpcTccEventService grpcTccEventService,
TccPendingTaskRunner tccPendingTaskRunner, TccEventScanner tccEventScanner, @Qualifier("alphaEventBus") EventBus eventBus) throws IOException {
+ ServerMeta serverMeta = ServerMeta.newBuilder()
+ .putMeta(AlphaMetaKeys.AkkaEnabled.name(), String.valueOf(false)).build();
ServerStartable bootstrap = new GrpcStartable(serverConfig, eventBus,
- new GrpcTxEventEndpointImpl(txConsistentService, omegaCallbacks), grpcTccEventService);
+ new GrpcTxEventEndpointImpl(txConsistentService, omegaCallbacks, serverMeta), grpcTccEventService);
new Thread(bootstrap::start).start();
tccPendingTaskRunner.start();
tccEventScanner.start();
@@ -171,8 +175,10 @@
ServerStartable serverStartableWithAkka(GrpcServerConfig serverConfig,
Map<String, Map<String, OmegaCallback>> omegaCallbacks, GrpcTccEventService grpcTccEventService,
TccPendingTaskRunner tccPendingTaskRunner, TccEventScanner tccEventScanner, @Qualifier("alphaEventBus") EventBus eventBus, ActorEventChannel actorEventChannel) throws IOException {
+ ServerMeta serverMeta = ServerMeta.newBuilder()
+ .putMeta(AlphaMetaKeys.AkkaEnabled.name(), String.valueOf(true)).build();
ServerStartable bootstrap = new GrpcStartable(serverConfig, eventBus,
- new GrpcSagaEventService(actorEventChannel, omegaCallbacks), grpcTccEventService);
+ new GrpcSagaEventService(actorEventChannel, omegaCallbacks, serverMeta), grpcTccEventService);
new Thread(bootstrap::start).start();
tccPendingTaskRunner.start();
tccEventScanner.start();
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/GrpcTxEventEndpointImpl.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/GrpcTxEventEndpointImpl.java
index 73f4419..210dbd9 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/GrpcTxEventEndpointImpl.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/GrpcTxEventEndpointImpl.java
@@ -33,6 +33,7 @@
import org.apache.servicecomb.pack.contract.grpc.GrpcCompensateCommand;
import org.apache.servicecomb.pack.contract.grpc.GrpcServiceConfig;
import org.apache.servicecomb.pack.contract.grpc.GrpcTxEvent;
+import org.apache.servicecomb.pack.contract.grpc.ServerMeta;
import org.apache.servicecomb.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceImplBase;
import io.grpc.stub.StreamObserver;
@@ -45,11 +46,13 @@
private final TxConsistentService txConsistentService;
private final Map<String, Map<String, OmegaCallback>> omegaCallbacks;
+ private final ServerMeta serverMeta;
GrpcTxEventEndpointImpl(TxConsistentService txConsistentService,
- Map<String, Map<String, OmegaCallback>> omegaCallbacks) {
+ Map<String, Map<String, OmegaCallback>> omegaCallbacks, ServerMeta serverMeta) {
this.txConsistentService = txConsistentService;
this.omegaCallbacks = omegaCallbacks;
+ this.serverMeta = serverMeta;
}
@Override
@@ -93,4 +96,10 @@
responseObserver.onNext(ok ? ALLOW : REJECT);
responseObserver.onCompleted();
}
+
+ @Override
+ public void onGetServerMeta(GrpcServiceConfig request, StreamObserver<ServerMeta> responseObserver){
+ responseObserver.onNext(this.serverMeta);
+ responseObserver.onCompleted();
+ }
}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java
index f711670..6c23ace 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java
@@ -32,6 +32,7 @@
import org.apache.servicecomb.pack.contract.grpc.GrpcCompensateCommand;
import org.apache.servicecomb.pack.contract.grpc.GrpcServiceConfig;
import org.apache.servicecomb.pack.contract.grpc.GrpcTxEvent;
+import org.apache.servicecomb.pack.contract.grpc.ServerMeta;
import org.apache.servicecomb.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceImplBase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,11 +44,13 @@
private final Map<String, Map<String, OmegaCallback>> omegaCallbacks;
private final ActorEventChannel actorEventChannel;
+ private final ServerMeta serverMeta;
public GrpcSagaEventService(ActorEventChannel actorEventChannel,
- Map<String, Map<String, OmegaCallback>> omegaCallbacks) {
+ Map<String, Map<String, OmegaCallback>> omegaCallbacks, ServerMeta serverMeta) {
this.actorEventChannel = actorEventChannel;
this.omegaCallbacks = omegaCallbacks;
+ this.serverMeta = serverMeta;
}
@Override
@@ -150,4 +153,10 @@
responseObserver.onNext(ok ? ALLOW : REJECT);
responseObserver.onCompleted();
}
+
+ @Override
+ public void onGetServerMeta(GrpcServiceConfig request, StreamObserver<ServerMeta> responseObserver){
+ responseObserver.onNext(this.serverMeta);
+ responseObserver.onCompleted();
+ }
}
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/AlphaIntegrationTest.java
index fa501c3..eef73f8 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/AlphaIntegrationTest.java
@@ -30,6 +30,7 @@
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import com.google.protobuf.ByteString;
@@ -48,10 +49,12 @@
import org.apache.servicecomb.pack.alpha.core.*;
import org.apache.servicecomb.pack.common.EventType;
+import org.apache.servicecomb.pack.common.AlphaMetaKeys;
import org.apache.servicecomb.pack.contract.grpc.GrpcAck;
import org.apache.servicecomb.pack.contract.grpc.GrpcCompensateCommand;
import org.apache.servicecomb.pack.contract.grpc.GrpcServiceConfig;
import org.apache.servicecomb.pack.contract.grpc.GrpcTxEvent;
+import org.apache.servicecomb.pack.contract.grpc.ServerMeta;
import org.apache.servicecomb.pack.contract.grpc.TxEventServiceGrpc;
import org.apache.servicecomb.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceBlockingStub;
import org.apache.servicecomb.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceStub;
@@ -170,6 +173,12 @@
}
@Test
+ public void serverMetaTest(){
+ ServerMeta serverMeta = blockingStub.onGetServerMeta(serviceConfig);
+ assertEquals(Boolean.parseBoolean(serverMeta.getMetaMap().get(AlphaMetaKeys.AkkaEnabled.name())),false);
+ }
+
+ @Test
public void persistsEvent() {
asyncStub.onConnected(serviceConfig, compensateResponseObserver);
blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent));
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java
index e76eadd..15fedc3 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java
@@ -36,6 +36,8 @@
import org.apache.servicecomb.pack.alpha.server.AlphaApplication;
import org.apache.servicecomb.pack.alpha.server.AlphaConfig;
import org.apache.servicecomb.pack.common.EventType;
+import org.apache.servicecomb.pack.common.AlphaMetaKeys;
+import org.apache.servicecomb.pack.contract.grpc.ServerMeta;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -106,6 +108,12 @@
}
@Test
+ public void serverMetaTest(){
+ ServerMeta serverMeta = omegaEventSender.onGetServerMeta();
+ assertEquals(Boolean.parseBoolean(serverMeta.getMetaMap().get(AlphaMetaKeys.AkkaEnabled.name())),true);
+ }
+
+ @Test
public void successfulTest() {
final String globalTxId = UUID.randomUUID().toString();
final String localTxId_1 = UUID.randomUUID().toString();
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSender.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSender.java
index fe1f965..68d8501 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSender.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSender.java
@@ -29,6 +29,7 @@
import org.apache.servicecomb.pack.contract.grpc.GrpcAck;
import org.apache.servicecomb.pack.contract.grpc.GrpcCompensateCommand;
import org.apache.servicecomb.pack.contract.grpc.GrpcServiceConfig;
+import org.apache.servicecomb.pack.contract.grpc.ServerMeta;
import org.apache.servicecomb.pack.contract.grpc.TxEventServiceGrpc;
import org.apache.servicecomb.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceBlockingStub;
import org.apache.servicecomb.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceStub;
@@ -72,6 +73,10 @@
blockingStub.onDisconnected(serviceConfig);
}
+ public ServerMeta onGetServerMeta(){
+ return blockingStub.onGetServerMeta(serviceConfig);
+ }
+
public void setOmegaCallbacks(
Map<String, Map<String, OmegaCallback>> omegaCallbacks) {
this.omegaCallbacks = omegaCallbacks;
diff --git a/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/pack/omega/spring/OmegaSpringConfig.java b/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/pack/omega/spring/OmegaSpringConfig.java
index 9284323..ec8c624 100644
--- a/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/pack/omega/spring/OmegaSpringConfig.java
+++ b/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/pack/omega/spring/OmegaSpringConfig.java
@@ -18,6 +18,8 @@
package org.apache.servicecomb.pack.omega.spring;
import com.google.common.collect.ImmutableList;
+import org.apache.servicecomb.pack.common.AlphaMetaKeys;
+import org.apache.servicecomb.pack.contract.grpc.ServerMeta;
import org.apache.servicecomb.pack.omega.connector.grpc.AlphaClusterDiscovery;
import org.apache.servicecomb.pack.omega.connector.grpc.AlphaClusterConfig;
import org.apache.servicecomb.pack.omega.connector.grpc.core.FastestSender;
@@ -55,8 +57,6 @@
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- @Value("${alpha.feature.akka.enabled:false}")
- private boolean alphaFeatureAkkaEnabled;
@Bean(name = {"omegaUniqueIdGenerator"})
IdGenerator<String> idGenerator() {
@@ -64,9 +64,10 @@
}
@Bean
- OmegaContext omegaContext(@Qualifier("omegaUniqueIdGenerator") IdGenerator<String> idGenerator) {
- LOG.info("alpha.feature.akka.enabled={}",alphaFeatureAkkaEnabled);
- return new OmegaContext(idGenerator,alphaFeatureAkkaEnabled);
+ OmegaContext omegaContext(@Qualifier("omegaUniqueIdGenerator") IdGenerator<String> idGenerator, SagaMessageSender messageSender) {
+ ServerMeta serverMeta = messageSender.onGetServerMeta();
+ boolean akkaEnabeld = Boolean.parseBoolean(serverMeta.getMetaMap().get(AlphaMetaKeys.AkkaEnabled.name()));
+ return new OmegaContext(idGenerator,akkaEnabeld);
}
@Bean(name = {"compensationContext"})