SCB-1577 Refactoring RPC connections
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/GrpcTxEventEndpointImpl.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/GrpcTxEventEndpointImpl.java
index 210dbd9..6cc6dca 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/GrpcTxEventEndpointImpl.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/GrpcTxEventEndpointImpl.java
@@ -22,6 +22,7 @@
 
 import static java.util.Collections.emptyMap;
 
+import java.lang.invoke.MethodHandles;
 import java.util.Date;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -37,9 +38,12 @@
 import org.apache.servicecomb.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceImplBase;
 
 import io.grpc.stub.StreamObserver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 class GrpcTxEventEndpointImpl extends TxEventServiceImplBase {
 
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   private static final GrpcAck ALLOW = GrpcAck.newBuilder().setAborted(false).build();
   private static final GrpcAck REJECT = GrpcAck.newBuilder().setAborted(true).build();
 
@@ -56,10 +60,29 @@
   }
 
   @Override
-  public void onConnected(GrpcServiceConfig request, StreamObserver<GrpcCompensateCommand> responseObserver) {
-    omegaCallbacks
-        .computeIfAbsent(request.getServiceName(), key -> new ConcurrentHashMap<>())
-        .put(request.getInstanceId(), new GrpcOmegaCallback(responseObserver));
+  public StreamObserver<GrpcServiceConfig> onConnected(StreamObserver<GrpcCompensateCommand> responseObserver) {
+    return new StreamObserver<GrpcServiceConfig>() {
+      @Override
+      public void onNext(GrpcServiceConfig grpcServiceConfig) {
+        omegaCallbacks
+            .computeIfAbsent(grpcServiceConfig.getServiceName(), key -> new ConcurrentHashMap<>())
+            .put(grpcServiceConfig.getInstanceId(), new GrpcOmegaCallback(responseObserver));
+        // Respond to Omega connection request
+        responseObserver.onNext(GrpcCompensateCommand.newBuilder()
+            .setConnectedResponse(true)
+            .build());
+      }
+
+      @Override
+      public void onError(Throwable throwable) {
+        LOG.error(throwable.getMessage());
+      }
+
+      @Override
+      public void onCompleted() {
+        // Do nothing here
+      }
+    };
   }
 
   // TODO: 2018/1/5 connect is async and disconnect is sync, meaning callback may not be registered on disconnected
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java
index 6c23ace..6c80fd0 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java
@@ -54,11 +54,32 @@
   }
 
   @Override
-  public void onConnected(
-      GrpcServiceConfig request, StreamObserver<GrpcCompensateCommand> responseObserver) {
-    omegaCallbacks
-        .computeIfAbsent(request.getServiceName(), key -> new ConcurrentHashMap<>())
-        .put(request.getInstanceId(), new GrpcOmegaCallback(responseObserver));
+  public StreamObserver<GrpcServiceConfig> onConnected(StreamObserver<GrpcCompensateCommand> responseObserver) {
+    return new StreamObserver<GrpcServiceConfig>(){
+
+      private GrpcOmegaCallback grpcOmegaCallback;
+
+      @Override
+      public void onNext(GrpcServiceConfig grpcServiceConfig) {
+        grpcOmegaCallback = new GrpcOmegaCallback(responseObserver);
+        omegaCallbacks
+            .computeIfAbsent(grpcServiceConfig.getServiceName(), key -> new ConcurrentHashMap<>())
+            .put(grpcServiceConfig.getInstanceId(), grpcOmegaCallback);
+        responseObserver.onNext(GrpcCompensateCommand.newBuilder()
+            .setConnectedResponse(true)
+            .build());
+      }
+
+      @Override
+      public void onError(Throwable throwable) {
+        LOG.error(throwable.getMessage());
+      }
+
+      @Override
+      public void onCompleted() {
+        // Do nothing here
+      }
+    };
   }
 
   // TODO: 2018/1/5 connect is async and disconnect is sync, meaning callback may not be registered on disconnected
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/AlphaIntegrationTest.java
index eef73f8..23b41f7 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/AlphaIntegrationTest.java
@@ -140,7 +140,7 @@
   }
 
   @AfterClass
-  public static void tearDown() throws Exception {
+  public static void tearDown() {
     clientChannel.shutdown();
     clientChannel = null;
   }
@@ -154,7 +154,7 @@
   }
 
   @After
-  public void after() throws Exception {
+  public void after() {
     blockingStub.onDisconnected(serviceConfig);
     deleteAllTillSuccessful();
   }
@@ -180,7 +180,7 @@
 
   @Test
   public void persistsEvent() {
-    asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+    asyncStub.onConnected(compensateResponseObserver).onNext(serviceConfig);
     blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent));
     // use the asynchronous stub need to wait for some time
     await().atMost(1, SECONDS).until(() -> !eventRepo.findByGlobalTxId(globalTxId).isEmpty());
@@ -201,7 +201,7 @@
 
   @Test
   public void closeStreamOnDisconnected() {
-    asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+    asyncStub.onConnected(compensateResponseObserver).onNext(serviceConfig);
 
     await().atMost(1, SECONDS).until(() -> omegaCallbacks.containsKey(serviceConfig.getServiceName()));
 
@@ -219,12 +219,13 @@
 
   @Test
   public void closeStreamOfDisconnectedClientOnly() {
-    asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+    asyncStub.onConnected(compensateResponseObserver).onNext(serviceConfig);
     await().atMost(1, SECONDS).until(() -> omegaCallbacks.containsKey(serviceConfig.getServiceName()));
 
     GrpcServiceConfig anotherServiceConfig = someServiceConfig();
     CompensationStreamObserver anotherResponseObserver = new CompensationStreamObserver();
-    TxEventServiceGrpc.newStub(clientChannel).onConnected(anotherServiceConfig, anotherResponseObserver);
+    TxEventServiceStub otherAsyncStub = TxEventServiceGrpc.newStub(clientChannel);
+    otherAsyncStub.onConnected(anotherResponseObserver).onNext(anotherServiceConfig);
 
     await().atMost(1, SECONDS).until(() -> omegaCallbacks.containsKey(anotherServiceConfig.getServiceName()));
 
@@ -240,8 +241,8 @@
   }
 
   @Test
-  public void removeCallbackOnClientDown() throws Exception {
-    asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+  public void removeCallbackOnClientDown() {
+    asyncStub.onConnected(compensateResponseObserver).onNext(serviceConfig);
     blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent));
     blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent));
 
@@ -253,8 +254,8 @@
   }
 
   @Test
-  public void compensateImmediatelyWhenGlobalTxAlreadyAborted() throws Exception {
-    asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+  public void compensateImmediatelyWhenGlobalTxAlreadyAborted() {
+    asyncStub.onConnected(compensateResponseObserver).onNext(serviceConfig);
     blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent));
     blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent));
 
@@ -272,7 +273,7 @@
   @Test
   public void doNotCompensateDuplicateTxOnFailure() {
     // duplicate events with same content but different timestamp
-    asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+    asyncStub.onConnected(compensateResponseObserver).onNext(serviceConfig);
     blockingStub.onTxEvent(eventOf(TxStartedEvent, localTxId, parentTxId, "service a".getBytes(), "method a"));
     blockingStub.onTxEvent(eventOf(TxStartedEvent, localTxId, parentTxId, "service a".getBytes(), "method a"));
     blockingStub.onTxEvent(eventOf(TxEndedEvent, localTxId, parentTxId, new byte[0], "method a"));
@@ -295,7 +296,7 @@
 
   @Test
   public void getCompensateCommandOnFailure() {
-    asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+    asyncStub.onConnected(compensateResponseObserver).onNext(serviceConfig);
     blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent));
     blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent));
     await().atMost(1, SECONDS).until(() -> !eventRepo.findByGlobalTxId(globalTxId).isEmpty());
@@ -313,13 +314,14 @@
 
   @Test
   public void compensateOnlyFailedGlobalTransaction() {
-    asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+    asyncStub.onConnected(compensateResponseObserver).onNext(serviceConfig);
     blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent));
     blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent));
 
     // simulates connection from another service with different globalTxId
     GrpcServiceConfig anotherServiceConfig = someServiceConfig();
-    TxEventServiceGrpc.newStub(clientChannel).onConnected(anotherServiceConfig, new CompensationStreamObserver());
+    TxEventServiceStub otherAsyncStub =TxEventServiceGrpc.newStub(clientChannel);
+    otherAsyncStub.onConnected(new CompensationStreamObserver()).onNext(anotherServiceConfig);
 
     TxEventServiceBlockingStub anotherBlockingStub = TxEventServiceGrpc.newBlockingStub(clientChannel);
     anotherBlockingStub.onTxEvent(someGrpcEvent(TxStartedEvent, UUID.randomUUID().toString()));
@@ -337,7 +339,7 @@
 
   @Test
   public void doNotStartSubTxOnFailure() {
-    asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+    asyncStub.onConnected(compensateResponseObserver).onNext(serviceConfig);
 
     blockingStub.onTxEvent(eventOf(TxStartedEvent, localTxId, parentTxId, "service a".getBytes(), "method a"));
     blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent));
@@ -356,7 +358,7 @@
 
   @Test
   public void compensateOnlyCompletedTransactions() {
-    asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+    asyncStub.onConnected(compensateResponseObserver).onNext(serviceConfig);
     blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent));
     blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent));
 
@@ -378,8 +380,8 @@
   }
 
   @Test
-  public void sagaEndedEventIsAlwaysInTheEnd() throws Exception {
-    asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+  public void sagaEndedEventIsAlwaysInTheEnd() {
+    asyncStub.onConnected(compensateResponseObserver).onNext(serviceConfig);
     blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent));
     blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent));
 
@@ -397,7 +399,7 @@
 
   @Test
   public void abortTimeoutSagaStartedEvent() {
-    asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+    asyncStub.onConnected(compensateResponseObserver).onNext(serviceConfig);
     blockingStub.onTxEvent(someGrpcEventWithTimeout(SagaStartedEvent, globalTxId, null, 1));
 
     await().atMost(2, SECONDS).until(() -> eventRepo.count() == 3);
@@ -420,7 +422,7 @@
 
   @Test
   public void abortTimeoutTxStartedEvent() {
-    asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+    asyncStub.onConnected(compensateResponseObserver).onNext(serviceConfig);
     blockingStub.onTxEvent(someGrpcEvent(SagaStartedEvent, globalTxId, globalTxId, null));
     blockingStub.onTxEvent(someGrpcEventWithTimeout(TxStartedEvent, localTxId, globalTxId, 1));
 
@@ -455,8 +457,8 @@
   }
 
   @Test
-  public void doNotCompensateRetryingEvents() throws InterruptedException {
-    asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+  public void doNotCompensateRetryingEvents() {
+    asyncStub.onConnected(compensateResponseObserver).onNext(serviceConfig);
     blockingStub.onTxEvent(someGrpcEventWithRetry(TxStartedEvent, retryMethod, 1));
     blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent));
     blockingStub.onTxEvent(someGrpcEventWithRetry(TxStartedEvent, retryMethod, 0));
@@ -584,9 +586,12 @@
     @Override
     public void onNext(GrpcCompensateCommand command) {
       // intercept received command
-      consumer.accept(command);
-      receivedCommands.add(command);
-      receivedCommandsCounter.incrementAndGet();
+      if(!command.getConnectedResponse()){
+        // ignore the connection response
+        consumer.accept(command);
+        receivedCommands.add(command);
+        receivedCommandsCounter.incrementAndGet();
+      }
     }
 
     @Override
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/AlphaIntegrationWithRandomPortTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/AlphaIntegrationWithRandomPortTest.java
index 899a4fb..d3556db 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/AlphaIntegrationWithRandomPortTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/AlphaIntegrationWithRandomPortTest.java
@@ -164,7 +164,7 @@
 
   @Test
   public void persistsEvent() {
-    asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+    asyncStub.onConnected(compensateResponseObserver).onNext(serviceConfig);
     blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent));
     // use the asynchronous stub need to wait for some time
     await().atMost(1, SECONDS).until(() -> !eventRepo.findByGlobalTxId(globalTxId).isEmpty());
@@ -185,7 +185,7 @@
 
   @Test
   public void closeStreamOnDisconnected() {
-    asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+    asyncStub.onConnected(compensateResponseObserver).onNext(serviceConfig);
 
     await().atMost(1, SECONDS).until(() -> omegaCallbacks.containsKey(serviceConfig.getServiceName()));
 
@@ -203,12 +203,13 @@
 
   @Test
   public void closeStreamOfDisconnectedClientOnly() {
-    asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+    asyncStub.onConnected(compensateResponseObserver).onNext(serviceConfig);
     await().atMost(1, SECONDS).until(() -> omegaCallbacks.containsKey(serviceConfig.getServiceName()));
 
     GrpcServiceConfig anotherServiceConfig = someServiceConfig();
     CompensationStreamObserver anotherResponseObserver = new CompensationStreamObserver();
-    TxEventServiceGrpc.newStub(clientChannel).onConnected(anotherServiceConfig, anotherResponseObserver);
+    TxEventServiceStub otherAsyncStub = TxEventServiceGrpc.newStub(clientChannel);
+    otherAsyncStub.onConnected(anotherResponseObserver).onNext(anotherServiceConfig);
 
     await().atMost(1, SECONDS).until(() -> omegaCallbacks.containsKey(anotherServiceConfig.getServiceName()));
 
@@ -224,8 +225,8 @@
   }
 
   @Test
-  public void removeCallbackOnClientDown() throws Exception {
-    asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+  public void removeCallbackOnClientDown() {
+    asyncStub.onConnected(compensateResponseObserver).onNext(serviceConfig);
     blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent));
     blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent));
 
@@ -237,8 +238,8 @@
   }
 
   @Test
-  public void compensateImmediatelyWhenGlobalTxAlreadyAborted() throws Exception {
-    asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+  public void compensateImmediatelyWhenGlobalTxAlreadyAborted() {
+    asyncStub.onConnected(compensateResponseObserver).onNext(serviceConfig);
     blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent));
     blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent));
 
@@ -256,7 +257,7 @@
   @Test
   public void doNotCompensateDuplicateTxOnFailure() {
     // duplicate events with same content but different timestamp
-    asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+    asyncStub.onConnected(compensateResponseObserver).onNext(serviceConfig);
     blockingStub.onTxEvent(eventOf(TxStartedEvent, localTxId, parentTxId, "service a".getBytes(), "method a"));
     blockingStub.onTxEvent(eventOf(TxStartedEvent, localTxId, parentTxId, "service a".getBytes(), "method a"));
     blockingStub.onTxEvent(eventOf(TxEndedEvent, localTxId, parentTxId, new byte[0], "method a"));
@@ -267,7 +268,7 @@
     blockingStub.onTxEvent(eventOf(TxEndedEvent, localTxId1, parentTxId1, new byte[0], "method b"));
 
     blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent));
-    await().atMost(1, SECONDS).until(() -> receivedCommandsCounter.get() > 1);
+    await().atMost(2, SECONDS).until(() -> receivedCommandsCounter.get() > 1);
 
     assertThat(receivedCommands, contains(
         GrpcCompensateCommand.newBuilder().setGlobalTxId(globalTxId).setLocalTxId(localTxId1).setParentTxId(parentTxId1)
@@ -279,7 +280,7 @@
 
   @Test
   public void getCompensateCommandOnFailure() {
-    asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+    asyncStub.onConnected(compensateResponseObserver).onNext(serviceConfig);
     blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent));
     blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent));
     await().atMost(1, SECONDS).until(() -> !eventRepo.findByGlobalTxId(globalTxId).isEmpty());
@@ -297,13 +298,14 @@
 
   @Test
   public void compensateOnlyFailedGlobalTransaction() {
-    asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+    asyncStub.onConnected(compensateResponseObserver).onNext(serviceConfig);
     blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent));
     blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent));
 
     // simulates connection from another service with different globalTxId
     GrpcServiceConfig anotherServiceConfig = someServiceConfig();
-    TxEventServiceGrpc.newStub(clientChannel).onConnected(anotherServiceConfig, new CompensationStreamObserver());
+    TxEventServiceStub otherAsyncStub =TxEventServiceGrpc.newStub(clientChannel);
+    otherAsyncStub.onConnected(new CompensationStreamObserver()).onNext(anotherServiceConfig);
 
     TxEventServiceBlockingStub anotherBlockingStub = TxEventServiceGrpc.newBlockingStub(clientChannel);
     anotherBlockingStub.onTxEvent(someGrpcEvent(TxStartedEvent, UUID.randomUUID().toString()));
@@ -321,14 +323,14 @@
 
   @Test
   public void doNotStartSubTxOnFailure() {
-    asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+    asyncStub.onConnected(compensateResponseObserver).onNext(serviceConfig);
 
     blockingStub.onTxEvent(eventOf(TxStartedEvent, localTxId, parentTxId, "service a".getBytes(), "method a"));
     blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent));
 
     blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent));
 
-    await().atMost(1, SECONDS).until(() -> receivedCommandsCounter.get() == 1);
+    await().atMost(1000, SECONDS).until(() -> receivedCommandsCounter.get() == 1);
 
     String localTxId1 = UUID.randomUUID().toString();
     String parentTxId1 = UUID.randomUUID().toString();
@@ -340,7 +342,7 @@
 
   @Test
   public void compensateOnlyCompletedTransactions() {
-    asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+    asyncStub.onConnected(compensateResponseObserver).onNext(serviceConfig);
     blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent));
     blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent));
 
@@ -362,8 +364,8 @@
   }
 
   @Test
-  public void sagaEndedEventIsAlwaysInTheEnd() throws Exception {
-    asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+  public void sagaEndedEventIsAlwaysInTheEnd() {
+    asyncStub.onConnected(compensateResponseObserver).onNext(serviceConfig);
     blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent));
     blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent));
 
@@ -381,7 +383,7 @@
 
   @Test
   public void abortTimeoutSagaStartedEvent() {
-    asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+    asyncStub.onConnected(compensateResponseObserver).onNext(serviceConfig);
     blockingStub.onTxEvent(someGrpcEventWithTimeout(SagaStartedEvent, globalTxId, null, 1));
 
     await().atMost(2, SECONDS).until(() -> eventRepo.count() == 3);
@@ -404,7 +406,7 @@
 
   @Test
   public void abortTimeoutTxStartedEvent() {
-    asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+    asyncStub.onConnected(compensateResponseObserver).onNext(serviceConfig);
     blockingStub.onTxEvent(someGrpcEvent(SagaStartedEvent, globalTxId, globalTxId, null));
     blockingStub.onTxEvent(someGrpcEventWithTimeout(TxStartedEvent, localTxId, globalTxId, 1));
 
@@ -437,8 +439,8 @@
   }
 
   @Test
-  public void doNotCompensateRetryingEvents() throws InterruptedException {
-    asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+  public void doNotCompensateRetryingEvents() {
+    asyncStub.onConnected(compensateResponseObserver).onNext(serviceConfig);
     blockingStub.onTxEvent(someGrpcEventWithRetry(TxStartedEvent, retryMethod, 1));
     blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent));
     blockingStub.onTxEvent(someGrpcEventWithRetry(TxStartedEvent, retryMethod, 0));
@@ -566,9 +568,12 @@
     @Override
     public void onNext(GrpcCompensateCommand command) {
       // intercept received command
-      consumer.accept(command);
-      receivedCommands.add(command);
-      receivedCommandsCounter.incrementAndGet();
+      if(!command.getConnectedResponse()){
+        // ignore the connection response
+        consumer.accept(command);
+        receivedCommands.add(command);
+        receivedCommandsCounter.incrementAndGet();
+      }
     }
 
     @Override
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSender.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSender.java
index 68d8501..90bea6a 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSender.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSender.java
@@ -64,7 +64,7 @@
         .setServiceName(serviceName)
         .setInstanceId(instanceId)
         .build();
-    asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+    asyncStub.onConnected(compensateResponseObserver).onNext(serviceConfig);
     omegaEventSagaSimulator = OmegaEventSagaSimulator.builder().serviceName(serviceName).instanceId(instanceId).build();
 
   }
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/pack/omega/connector/grpc/core/ReconnectStreamObserver.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/pack/omega/connector/grpc/core/ReconnectStreamObserver.java
index 9fb1270..68d1592 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/pack/omega/connector/grpc/core/ReconnectStreamObserver.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/pack/omega/connector/grpc/core/ReconnectStreamObserver.java
@@ -20,6 +20,7 @@
 import io.grpc.stub.StreamObserver;
 import java.lang.invoke.MethodHandles;
 
+import java.util.concurrent.CountDownLatch;
 import org.apache.servicecomb.pack.omega.transaction.MessageSender;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -32,6 +33,8 @@
 
   private final MessageSender messageSender;
 
+  private final CountDownLatch latch = new CountDownLatch(1);
+
   public ReconnectStreamObserver(
       LoadBalanceContext loadContext, MessageSender messageSender) {
     this.loadContext = loadContext;
@@ -42,10 +45,23 @@
   public void onError(Throwable t) {
     LOG.error("Failed to process grpc coordinate command.", t);
     loadContext.getGrpcOnErrorHandler().handle(messageSender);
+    cancelWait();
   }
 
   @Override
   public void onCompleted() {
     // Do nothing here
   }
+
+  public void cancelWait(){
+    latch.countDown();
+  }
+
+  public void waitConnected() {
+    try {
+      latch.await();
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+  }
 }
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/GrpcCompensateStreamObserver.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/GrpcCompensateStreamObserver.java
index 0cf9f42..373250d 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/GrpcCompensateStreamObserver.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/GrpcCompensateStreamObserver.java
@@ -49,11 +49,16 @@
     LOG.info("Received compensate command, global tx id: {}, local tx id: {}, compensation method: {}",
         command.getGlobalTxId(), command.getLocalTxId(), command.getCompensationMethod());
 
-    messageHandler.onReceive(
-        command.getGlobalTxId(),
-        command.getLocalTxId(),
-        command.getParentTxId().isEmpty() ? null : command.getParentTxId(),
-        command.getCompensationMethod(),
-        deserializer.deserialize(command.getPayloads().toByteArray()));
+    if(command.getConnectedResponse()){
+      // Wait until you receive an Alpha connection response
+      cancelWait();
+    }else{
+      messageHandler.onReceive(
+          command.getGlobalTxId(),
+          command.getLocalTxId(),
+          command.getParentTxId().isEmpty() ? null : command.getParentTxId(),
+          command.getCompensationMethod(),
+          deserializer.deserialize(command.getPayloads().toByteArray()));
+    }
   }
 }
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/GrpcSagaClientMessageSender.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/GrpcSagaClientMessageSender.java
index 2e81ecc..0ec9619 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/GrpcSagaClientMessageSender.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/GrpcSagaClientMessageSender.java
@@ -19,6 +19,8 @@
 
 import com.google.protobuf.ByteString;
 import io.grpc.ManagedChannel;
+import io.grpc.stub.StreamObserver;
+import java.lang.invoke.MethodHandles;
 import org.apache.servicecomb.pack.contract.grpc.ServerMeta;
 import org.apache.servicecomb.pack.omega.connector.grpc.core.LoadBalanceContext;
 import org.apache.servicecomb.pack.omega.context.ServiceConfig;
@@ -35,8 +37,11 @@
 import org.apache.servicecomb.pack.contract.grpc.TxEventServiceGrpc;
 import org.apache.servicecomb.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceBlockingStub;
 import org.apache.servicecomb.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceStub;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class GrpcSagaClientMessageSender implements SagaMessageSender {
+
   private final String target;
 
   private final TxEventServiceStub asyncEventService;
@@ -68,7 +73,8 @@
 
   @Override
   public void onConnected() {
-    asyncEventService.onConnected(serviceConfig, compensateStreamObserver);
+    asyncEventService.onConnected(compensateStreamObserver).onNext(serviceConfig);
+    compensateStreamObserver.waitConnected();
   }
 
   @Override
diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/SagaLoadBalanceSenderWithTLSTest.java b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/SagaLoadBalanceSenderWithTLSTest.java
index 6873661..2f5813d 100644
--- a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/SagaLoadBalanceSenderWithTLSTest.java
+++ b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/SagaLoadBalanceSenderWithTLSTest.java
@@ -131,7 +131,7 @@
   }
 
   @Test
-  public void broadcastConnectionAndDisconnection() throws Exception {
+  public void broadcastConnectionAndDisconnection() {
     messageSender.onConnected();
     await().atMost(1, SECONDS).until( new Callable<Boolean>() {
       @Override
diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/SagaLoadBalancedSenderTestBase.java b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/SagaLoadBalancedSenderTestBase.java
index f29a203..3c00030 100644
--- a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/SagaLoadBalancedSenderTestBase.java
+++ b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/SagaLoadBalancedSenderTestBase.java
@@ -108,7 +108,7 @@
   protected final SagaLoadBalanceSender messageSender = newMessageSender(addresses);
 
   @AfterClass
-  public static void tearDown() throws Exception {
+  public static void tearDown() {
     for(Server server: servers.values()) {
       server.shutdown();
     }
@@ -117,7 +117,7 @@
   protected abstract SagaLoadBalanceSender newMessageSender(String[] addresses);
 
   @After
-  public void after() throws Exception {
+  public void after() {
     messageSender.onDisconnected();
     messageSender.close();
     for (Queue<TxEvent> queue :eventsMap.values()) {
@@ -142,9 +142,26 @@
     }
 
     @Override
-    public void onConnected(GrpcServiceConfig request, StreamObserver<GrpcCompensateCommand> responseObserver) {
+    public StreamObserver<GrpcServiceConfig> onConnected(final StreamObserver<GrpcCompensateCommand> responseObserver) {
       this.responseObserver = responseObserver;
-      connected.add("Connected " + request.getServiceName());
+      return new StreamObserver<GrpcServiceConfig>() {
+
+        @Override
+        public void onNext(GrpcServiceConfig grpcServiceConfig) {
+          connected.add("Connected " + grpcServiceConfig.getServiceName());
+          responseObserver.onNext(GrpcCompensateCommand.newBuilder().setConnectedResponse(true).build());
+        }
+
+        @Override
+        public void onError(Throwable throwable) {
+          throw new RuntimeException(throwable);
+        }
+
+        @Override
+        public void onCompleted() {
+          // Do nothing here
+        }
+      };
     }
 
     @Override