SCB-1577 Refactoring RPC connections
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/GrpcTxEventEndpointImpl.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/GrpcTxEventEndpointImpl.java
index 210dbd9..6cc6dca 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/GrpcTxEventEndpointImpl.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/GrpcTxEventEndpointImpl.java
@@ -22,6 +22,7 @@
import static java.util.Collections.emptyMap;
+import java.lang.invoke.MethodHandles;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -37,9 +38,12 @@
import org.apache.servicecomb.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceImplBase;
import io.grpc.stub.StreamObserver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
class GrpcTxEventEndpointImpl extends TxEventServiceImplBase {
+ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final GrpcAck ALLOW = GrpcAck.newBuilder().setAborted(false).build();
private static final GrpcAck REJECT = GrpcAck.newBuilder().setAborted(true).build();
@@ -56,10 +60,29 @@
}
@Override
- public void onConnected(GrpcServiceConfig request, StreamObserver<GrpcCompensateCommand> responseObserver) {
- omegaCallbacks
- .computeIfAbsent(request.getServiceName(), key -> new ConcurrentHashMap<>())
- .put(request.getInstanceId(), new GrpcOmegaCallback(responseObserver));
+ public StreamObserver<GrpcServiceConfig> onConnected(StreamObserver<GrpcCompensateCommand> responseObserver) {
+ return new StreamObserver<GrpcServiceConfig>() {
+ @Override
+ public void onNext(GrpcServiceConfig grpcServiceConfig) {
+ omegaCallbacks
+ .computeIfAbsent(grpcServiceConfig.getServiceName(), key -> new ConcurrentHashMap<>())
+ .put(grpcServiceConfig.getInstanceId(), new GrpcOmegaCallback(responseObserver));
+ // Respond to Omega connection request
+ responseObserver.onNext(GrpcCompensateCommand.newBuilder()
+ .setConnectedResponse(true)
+ .build());
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ LOG.error(throwable.getMessage());
+ }
+
+ @Override
+ public void onCompleted() {
+ // Do nothing here
+ }
+ };
}
// TODO: 2018/1/5 connect is async and disconnect is sync, meaning callback may not be registered on disconnected
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java
index 6c23ace..6c80fd0 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java
@@ -54,11 +54,32 @@
}
@Override
- public void onConnected(
- GrpcServiceConfig request, StreamObserver<GrpcCompensateCommand> responseObserver) {
- omegaCallbacks
- .computeIfAbsent(request.getServiceName(), key -> new ConcurrentHashMap<>())
- .put(request.getInstanceId(), new GrpcOmegaCallback(responseObserver));
+ public StreamObserver<GrpcServiceConfig> onConnected(StreamObserver<GrpcCompensateCommand> responseObserver) {
+ return new StreamObserver<GrpcServiceConfig>(){
+
+ private GrpcOmegaCallback grpcOmegaCallback;
+
+ @Override
+ public void onNext(GrpcServiceConfig grpcServiceConfig) {
+ grpcOmegaCallback = new GrpcOmegaCallback(responseObserver);
+ omegaCallbacks
+ .computeIfAbsent(grpcServiceConfig.getServiceName(), key -> new ConcurrentHashMap<>())
+ .put(grpcServiceConfig.getInstanceId(), grpcOmegaCallback);
+ responseObserver.onNext(GrpcCompensateCommand.newBuilder()
+ .setConnectedResponse(true)
+ .build());
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ LOG.error(throwable.getMessage());
+ }
+
+ @Override
+ public void onCompleted() {
+ // Do nothing here
+ }
+ };
}
// TODO: 2018/1/5 connect is async and disconnect is sync, meaning callback may not be registered on disconnected
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/AlphaIntegrationTest.java
index eef73f8..23b41f7 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/AlphaIntegrationTest.java
@@ -140,7 +140,7 @@
}
@AfterClass
- public static void tearDown() throws Exception {
+ public static void tearDown() {
clientChannel.shutdown();
clientChannel = null;
}
@@ -154,7 +154,7 @@
}
@After
- public void after() throws Exception {
+ public void after() {
blockingStub.onDisconnected(serviceConfig);
deleteAllTillSuccessful();
}
@@ -180,7 +180,7 @@
@Test
public void persistsEvent() {
- asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+ asyncStub.onConnected(compensateResponseObserver).onNext(serviceConfig);
blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent));
// use the asynchronous stub need to wait for some time
await().atMost(1, SECONDS).until(() -> !eventRepo.findByGlobalTxId(globalTxId).isEmpty());
@@ -201,7 +201,7 @@
@Test
public void closeStreamOnDisconnected() {
- asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+ asyncStub.onConnected(compensateResponseObserver).onNext(serviceConfig);
await().atMost(1, SECONDS).until(() -> omegaCallbacks.containsKey(serviceConfig.getServiceName()));
@@ -219,12 +219,13 @@
@Test
public void closeStreamOfDisconnectedClientOnly() {
- asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+ asyncStub.onConnected(compensateResponseObserver).onNext(serviceConfig);
await().atMost(1, SECONDS).until(() -> omegaCallbacks.containsKey(serviceConfig.getServiceName()));
GrpcServiceConfig anotherServiceConfig = someServiceConfig();
CompensationStreamObserver anotherResponseObserver = new CompensationStreamObserver();
- TxEventServiceGrpc.newStub(clientChannel).onConnected(anotherServiceConfig, anotherResponseObserver);
+ TxEventServiceStub otherAsyncStub = TxEventServiceGrpc.newStub(clientChannel);
+ otherAsyncStub.onConnected(anotherResponseObserver).onNext(anotherServiceConfig);
await().atMost(1, SECONDS).until(() -> omegaCallbacks.containsKey(anotherServiceConfig.getServiceName()));
@@ -240,8 +241,8 @@
}
@Test
- public void removeCallbackOnClientDown() throws Exception {
- asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+ public void removeCallbackOnClientDown() {
+ asyncStub.onConnected(compensateResponseObserver).onNext(serviceConfig);
blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent));
blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent));
@@ -253,8 +254,8 @@
}
@Test
- public void compensateImmediatelyWhenGlobalTxAlreadyAborted() throws Exception {
- asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+ public void compensateImmediatelyWhenGlobalTxAlreadyAborted() {
+ asyncStub.onConnected(compensateResponseObserver).onNext(serviceConfig);
blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent));
blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent));
@@ -272,7 +273,7 @@
@Test
public void doNotCompensateDuplicateTxOnFailure() {
// duplicate events with same content but different timestamp
- asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+ asyncStub.onConnected(compensateResponseObserver).onNext(serviceConfig);
blockingStub.onTxEvent(eventOf(TxStartedEvent, localTxId, parentTxId, "service a".getBytes(), "method a"));
blockingStub.onTxEvent(eventOf(TxStartedEvent, localTxId, parentTxId, "service a".getBytes(), "method a"));
blockingStub.onTxEvent(eventOf(TxEndedEvent, localTxId, parentTxId, new byte[0], "method a"));
@@ -295,7 +296,7 @@
@Test
public void getCompensateCommandOnFailure() {
- asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+ asyncStub.onConnected(compensateResponseObserver).onNext(serviceConfig);
blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent));
blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent));
await().atMost(1, SECONDS).until(() -> !eventRepo.findByGlobalTxId(globalTxId).isEmpty());
@@ -313,13 +314,14 @@
@Test
public void compensateOnlyFailedGlobalTransaction() {
- asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+ asyncStub.onConnected(compensateResponseObserver).onNext(serviceConfig);
blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent));
blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent));
// simulates connection from another service with different globalTxId
GrpcServiceConfig anotherServiceConfig = someServiceConfig();
- TxEventServiceGrpc.newStub(clientChannel).onConnected(anotherServiceConfig, new CompensationStreamObserver());
+ TxEventServiceStub otherAsyncStub =TxEventServiceGrpc.newStub(clientChannel);
+ otherAsyncStub.onConnected(new CompensationStreamObserver()).onNext(anotherServiceConfig);
TxEventServiceBlockingStub anotherBlockingStub = TxEventServiceGrpc.newBlockingStub(clientChannel);
anotherBlockingStub.onTxEvent(someGrpcEvent(TxStartedEvent, UUID.randomUUID().toString()));
@@ -337,7 +339,7 @@
@Test
public void doNotStartSubTxOnFailure() {
- asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+ asyncStub.onConnected(compensateResponseObserver).onNext(serviceConfig);
blockingStub.onTxEvent(eventOf(TxStartedEvent, localTxId, parentTxId, "service a".getBytes(), "method a"));
blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent));
@@ -356,7 +358,7 @@
@Test
public void compensateOnlyCompletedTransactions() {
- asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+ asyncStub.onConnected(compensateResponseObserver).onNext(serviceConfig);
blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent));
blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent));
@@ -378,8 +380,8 @@
}
@Test
- public void sagaEndedEventIsAlwaysInTheEnd() throws Exception {
- asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+ public void sagaEndedEventIsAlwaysInTheEnd() {
+ asyncStub.onConnected(compensateResponseObserver).onNext(serviceConfig);
blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent));
blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent));
@@ -397,7 +399,7 @@
@Test
public void abortTimeoutSagaStartedEvent() {
- asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+ asyncStub.onConnected(compensateResponseObserver).onNext(serviceConfig);
blockingStub.onTxEvent(someGrpcEventWithTimeout(SagaStartedEvent, globalTxId, null, 1));
await().atMost(2, SECONDS).until(() -> eventRepo.count() == 3);
@@ -420,7 +422,7 @@
@Test
public void abortTimeoutTxStartedEvent() {
- asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+ asyncStub.onConnected(compensateResponseObserver).onNext(serviceConfig);
blockingStub.onTxEvent(someGrpcEvent(SagaStartedEvent, globalTxId, globalTxId, null));
blockingStub.onTxEvent(someGrpcEventWithTimeout(TxStartedEvent, localTxId, globalTxId, 1));
@@ -455,8 +457,8 @@
}
@Test
- public void doNotCompensateRetryingEvents() throws InterruptedException {
- asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+ public void doNotCompensateRetryingEvents() {
+ asyncStub.onConnected(compensateResponseObserver).onNext(serviceConfig);
blockingStub.onTxEvent(someGrpcEventWithRetry(TxStartedEvent, retryMethod, 1));
blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent));
blockingStub.onTxEvent(someGrpcEventWithRetry(TxStartedEvent, retryMethod, 0));
@@ -584,9 +586,12 @@
@Override
public void onNext(GrpcCompensateCommand command) {
// intercept received command
- consumer.accept(command);
- receivedCommands.add(command);
- receivedCommandsCounter.incrementAndGet();
+ if(!command.getConnectedResponse()){
+ // ignore the connection response
+ consumer.accept(command);
+ receivedCommands.add(command);
+ receivedCommandsCounter.incrementAndGet();
+ }
}
@Override
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/AlphaIntegrationWithRandomPortTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/AlphaIntegrationWithRandomPortTest.java
index 899a4fb..d3556db 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/AlphaIntegrationWithRandomPortTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/AlphaIntegrationWithRandomPortTest.java
@@ -164,7 +164,7 @@
@Test
public void persistsEvent() {
- asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+ asyncStub.onConnected(compensateResponseObserver).onNext(serviceConfig);
blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent));
// use the asynchronous stub need to wait for some time
await().atMost(1, SECONDS).until(() -> !eventRepo.findByGlobalTxId(globalTxId).isEmpty());
@@ -185,7 +185,7 @@
@Test
public void closeStreamOnDisconnected() {
- asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+ asyncStub.onConnected(compensateResponseObserver).onNext(serviceConfig);
await().atMost(1, SECONDS).until(() -> omegaCallbacks.containsKey(serviceConfig.getServiceName()));
@@ -203,12 +203,13 @@
@Test
public void closeStreamOfDisconnectedClientOnly() {
- asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+ asyncStub.onConnected(compensateResponseObserver).onNext(serviceConfig);
await().atMost(1, SECONDS).until(() -> omegaCallbacks.containsKey(serviceConfig.getServiceName()));
GrpcServiceConfig anotherServiceConfig = someServiceConfig();
CompensationStreamObserver anotherResponseObserver = new CompensationStreamObserver();
- TxEventServiceGrpc.newStub(clientChannel).onConnected(anotherServiceConfig, anotherResponseObserver);
+ TxEventServiceStub otherAsyncStub = TxEventServiceGrpc.newStub(clientChannel);
+ otherAsyncStub.onConnected(anotherResponseObserver).onNext(anotherServiceConfig);
await().atMost(1, SECONDS).until(() -> omegaCallbacks.containsKey(anotherServiceConfig.getServiceName()));
@@ -224,8 +225,8 @@
}
@Test
- public void removeCallbackOnClientDown() throws Exception {
- asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+ public void removeCallbackOnClientDown() {
+ asyncStub.onConnected(compensateResponseObserver).onNext(serviceConfig);
blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent));
blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent));
@@ -237,8 +238,8 @@
}
@Test
- public void compensateImmediatelyWhenGlobalTxAlreadyAborted() throws Exception {
- asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+ public void compensateImmediatelyWhenGlobalTxAlreadyAborted() {
+ asyncStub.onConnected(compensateResponseObserver).onNext(serviceConfig);
blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent));
blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent));
@@ -256,7 +257,7 @@
@Test
public void doNotCompensateDuplicateTxOnFailure() {
// duplicate events with same content but different timestamp
- asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+ asyncStub.onConnected(compensateResponseObserver).onNext(serviceConfig);
blockingStub.onTxEvent(eventOf(TxStartedEvent, localTxId, parentTxId, "service a".getBytes(), "method a"));
blockingStub.onTxEvent(eventOf(TxStartedEvent, localTxId, parentTxId, "service a".getBytes(), "method a"));
blockingStub.onTxEvent(eventOf(TxEndedEvent, localTxId, parentTxId, new byte[0], "method a"));
@@ -267,7 +268,7 @@
blockingStub.onTxEvent(eventOf(TxEndedEvent, localTxId1, parentTxId1, new byte[0], "method b"));
blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent));
- await().atMost(1, SECONDS).until(() -> receivedCommandsCounter.get() > 1);
+ await().atMost(2, SECONDS).until(() -> receivedCommandsCounter.get() > 1);
assertThat(receivedCommands, contains(
GrpcCompensateCommand.newBuilder().setGlobalTxId(globalTxId).setLocalTxId(localTxId1).setParentTxId(parentTxId1)
@@ -279,7 +280,7 @@
@Test
public void getCompensateCommandOnFailure() {
- asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+ asyncStub.onConnected(compensateResponseObserver).onNext(serviceConfig);
blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent));
blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent));
await().atMost(1, SECONDS).until(() -> !eventRepo.findByGlobalTxId(globalTxId).isEmpty());
@@ -297,13 +298,14 @@
@Test
public void compensateOnlyFailedGlobalTransaction() {
- asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+ asyncStub.onConnected(compensateResponseObserver).onNext(serviceConfig);
blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent));
blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent));
// simulates connection from another service with different globalTxId
GrpcServiceConfig anotherServiceConfig = someServiceConfig();
- TxEventServiceGrpc.newStub(clientChannel).onConnected(anotherServiceConfig, new CompensationStreamObserver());
+ TxEventServiceStub otherAsyncStub =TxEventServiceGrpc.newStub(clientChannel);
+ otherAsyncStub.onConnected(new CompensationStreamObserver()).onNext(anotherServiceConfig);
TxEventServiceBlockingStub anotherBlockingStub = TxEventServiceGrpc.newBlockingStub(clientChannel);
anotherBlockingStub.onTxEvent(someGrpcEvent(TxStartedEvent, UUID.randomUUID().toString()));
@@ -321,14 +323,14 @@
@Test
public void doNotStartSubTxOnFailure() {
- asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+ asyncStub.onConnected(compensateResponseObserver).onNext(serviceConfig);
blockingStub.onTxEvent(eventOf(TxStartedEvent, localTxId, parentTxId, "service a".getBytes(), "method a"));
blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent));
blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent));
- await().atMost(1, SECONDS).until(() -> receivedCommandsCounter.get() == 1);
+ await().atMost(1000, SECONDS).until(() -> receivedCommandsCounter.get() == 1);
String localTxId1 = UUID.randomUUID().toString();
String parentTxId1 = UUID.randomUUID().toString();
@@ -340,7 +342,7 @@
@Test
public void compensateOnlyCompletedTransactions() {
- asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+ asyncStub.onConnected(compensateResponseObserver).onNext(serviceConfig);
blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent));
blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent));
@@ -362,8 +364,8 @@
}
@Test
- public void sagaEndedEventIsAlwaysInTheEnd() throws Exception {
- asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+ public void sagaEndedEventIsAlwaysInTheEnd() {
+ asyncStub.onConnected(compensateResponseObserver).onNext(serviceConfig);
blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent));
blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent));
@@ -381,7 +383,7 @@
@Test
public void abortTimeoutSagaStartedEvent() {
- asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+ asyncStub.onConnected(compensateResponseObserver).onNext(serviceConfig);
blockingStub.onTxEvent(someGrpcEventWithTimeout(SagaStartedEvent, globalTxId, null, 1));
await().atMost(2, SECONDS).until(() -> eventRepo.count() == 3);
@@ -404,7 +406,7 @@
@Test
public void abortTimeoutTxStartedEvent() {
- asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+ asyncStub.onConnected(compensateResponseObserver).onNext(serviceConfig);
blockingStub.onTxEvent(someGrpcEvent(SagaStartedEvent, globalTxId, globalTxId, null));
blockingStub.onTxEvent(someGrpcEventWithTimeout(TxStartedEvent, localTxId, globalTxId, 1));
@@ -437,8 +439,8 @@
}
@Test
- public void doNotCompensateRetryingEvents() throws InterruptedException {
- asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+ public void doNotCompensateRetryingEvents() {
+ asyncStub.onConnected(compensateResponseObserver).onNext(serviceConfig);
blockingStub.onTxEvent(someGrpcEventWithRetry(TxStartedEvent, retryMethod, 1));
blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent));
blockingStub.onTxEvent(someGrpcEventWithRetry(TxStartedEvent, retryMethod, 0));
@@ -566,9 +568,12 @@
@Override
public void onNext(GrpcCompensateCommand command) {
// intercept received command
- consumer.accept(command);
- receivedCommands.add(command);
- receivedCommandsCounter.incrementAndGet();
+ if(!command.getConnectedResponse()){
+ // ignore the connection response
+ consumer.accept(command);
+ receivedCommands.add(command);
+ receivedCommandsCounter.incrementAndGet();
+ }
}
@Override
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSender.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSender.java
index 68d8501..90bea6a 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSender.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSender.java
@@ -64,7 +64,7 @@
.setServiceName(serviceName)
.setInstanceId(instanceId)
.build();
- asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+ asyncStub.onConnected(compensateResponseObserver).onNext(serviceConfig);
omegaEventSagaSimulator = OmegaEventSagaSimulator.builder().serviceName(serviceName).instanceId(instanceId).build();
}
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/pack/omega/connector/grpc/core/ReconnectStreamObserver.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/pack/omega/connector/grpc/core/ReconnectStreamObserver.java
index 9fb1270..68d1592 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/pack/omega/connector/grpc/core/ReconnectStreamObserver.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/pack/omega/connector/grpc/core/ReconnectStreamObserver.java
@@ -20,6 +20,7 @@
import io.grpc.stub.StreamObserver;
import java.lang.invoke.MethodHandles;
+import java.util.concurrent.CountDownLatch;
import org.apache.servicecomb.pack.omega.transaction.MessageSender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,6 +33,8 @@
private final MessageSender messageSender;
+ private final CountDownLatch latch = new CountDownLatch(1);
+
public ReconnectStreamObserver(
LoadBalanceContext loadContext, MessageSender messageSender) {
this.loadContext = loadContext;
@@ -42,10 +45,23 @@
public void onError(Throwable t) {
LOG.error("Failed to process grpc coordinate command.", t);
loadContext.getGrpcOnErrorHandler().handle(messageSender);
+ cancelWait();
}
@Override
public void onCompleted() {
// Do nothing here
}
+
+ public void cancelWait(){
+ latch.countDown();
+ }
+
+ public void waitConnected() {
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
}
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/GrpcCompensateStreamObserver.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/GrpcCompensateStreamObserver.java
index 0cf9f42..373250d 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/GrpcCompensateStreamObserver.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/GrpcCompensateStreamObserver.java
@@ -49,11 +49,16 @@
LOG.info("Received compensate command, global tx id: {}, local tx id: {}, compensation method: {}",
command.getGlobalTxId(), command.getLocalTxId(), command.getCompensationMethod());
- messageHandler.onReceive(
- command.getGlobalTxId(),
- command.getLocalTxId(),
- command.getParentTxId().isEmpty() ? null : command.getParentTxId(),
- command.getCompensationMethod(),
- deserializer.deserialize(command.getPayloads().toByteArray()));
+ if(command.getConnectedResponse()){
+ // Wait until you receive an Alpha connection response
+ cancelWait();
+ }else{
+ messageHandler.onReceive(
+ command.getGlobalTxId(),
+ command.getLocalTxId(),
+ command.getParentTxId().isEmpty() ? null : command.getParentTxId(),
+ command.getCompensationMethod(),
+ deserializer.deserialize(command.getPayloads().toByteArray()));
+ }
}
}
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/GrpcSagaClientMessageSender.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/GrpcSagaClientMessageSender.java
index 2e81ecc..0ec9619 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/GrpcSagaClientMessageSender.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/GrpcSagaClientMessageSender.java
@@ -19,6 +19,8 @@
import com.google.protobuf.ByteString;
import io.grpc.ManagedChannel;
+import io.grpc.stub.StreamObserver;
+import java.lang.invoke.MethodHandles;
import org.apache.servicecomb.pack.contract.grpc.ServerMeta;
import org.apache.servicecomb.pack.omega.connector.grpc.core.LoadBalanceContext;
import org.apache.servicecomb.pack.omega.context.ServiceConfig;
@@ -35,8 +37,11 @@
import org.apache.servicecomb.pack.contract.grpc.TxEventServiceGrpc;
import org.apache.servicecomb.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceBlockingStub;
import org.apache.servicecomb.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceStub;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class GrpcSagaClientMessageSender implements SagaMessageSender {
+
private final String target;
private final TxEventServiceStub asyncEventService;
@@ -68,7 +73,8 @@
@Override
public void onConnected() {
- asyncEventService.onConnected(serviceConfig, compensateStreamObserver);
+ asyncEventService.onConnected(compensateStreamObserver).onNext(serviceConfig);
+ compensateStreamObserver.waitConnected();
}
@Override
diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/SagaLoadBalanceSenderWithTLSTest.java b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/SagaLoadBalanceSenderWithTLSTest.java
index 6873661..2f5813d 100644
--- a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/SagaLoadBalanceSenderWithTLSTest.java
+++ b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/SagaLoadBalanceSenderWithTLSTest.java
@@ -131,7 +131,7 @@
}
@Test
- public void broadcastConnectionAndDisconnection() throws Exception {
+ public void broadcastConnectionAndDisconnection() {
messageSender.onConnected();
await().atMost(1, SECONDS).until( new Callable<Boolean>() {
@Override
diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/SagaLoadBalancedSenderTestBase.java b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/SagaLoadBalancedSenderTestBase.java
index f29a203..3c00030 100644
--- a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/SagaLoadBalancedSenderTestBase.java
+++ b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/SagaLoadBalancedSenderTestBase.java
@@ -108,7 +108,7 @@
protected final SagaLoadBalanceSender messageSender = newMessageSender(addresses);
@AfterClass
- public static void tearDown() throws Exception {
+ public static void tearDown() {
for(Server server: servers.values()) {
server.shutdown();
}
@@ -117,7 +117,7 @@
protected abstract SagaLoadBalanceSender newMessageSender(String[] addresses);
@After
- public void after() throws Exception {
+ public void after() {
messageSender.onDisconnected();
messageSender.close();
for (Queue<TxEvent> queue :eventsMap.values()) {
@@ -142,9 +142,26 @@
}
@Override
- public void onConnected(GrpcServiceConfig request, StreamObserver<GrpcCompensateCommand> responseObserver) {
+ public StreamObserver<GrpcServiceConfig> onConnected(final StreamObserver<GrpcCompensateCommand> responseObserver) {
this.responseObserver = responseObserver;
- connected.add("Connected " + request.getServiceName());
+ return new StreamObserver<GrpcServiceConfig>() {
+
+ @Override
+ public void onNext(GrpcServiceConfig grpcServiceConfig) {
+ connected.add("Connected " + grpcServiceConfig.getServiceName());
+ responseObserver.onNext(GrpcCompensateCommand.newBuilder().setConnectedResponse(true).build());
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ throw new RuntimeException(throwable);
+ }
+
+ @Override
+ public void onCompleted() {
+ // Do nothing here
+ }
+ };
}
@Override