SCB-1577 Remove connection blocking wait
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/GrpcTxEventEndpointImpl.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/GrpcTxEventEndpointImpl.java
index e5977c2..e04a494 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/GrpcTxEventEndpointImpl.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/GrpcTxEventEndpointImpl.java
@@ -67,10 +67,6 @@
omegaCallbacks
.computeIfAbsent(grpcServiceConfig.getServiceName(), key -> new ConcurrentHashMap<>())
.put(grpcServiceConfig.getInstanceId(), new GrpcOmegaCallback(responseObserver));
- // Respond to Omega connection request
- responseObserver.onNext(GrpcCompensateCommand.newBuilder()
- .setConnectedResponse(true)
- .build());
}
@Override
@@ -80,7 +76,6 @@
@Override
public void onCompleted() {
- // Do nothing here
LOG.info("Omega client called method onCompleted of GrpcServiceConfig");
}
};
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java
index 6c80fd0..540a8d0 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java
@@ -65,9 +65,6 @@
omegaCallbacks
.computeIfAbsent(grpcServiceConfig.getServiceName(), key -> new ConcurrentHashMap<>())
.put(grpcServiceConfig.getInstanceId(), grpcOmegaCallback);
- responseObserver.onNext(GrpcCompensateCommand.newBuilder()
- .setConnectedResponse(true)
- .build());
}
@Override
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/AlphaIntegrationTest.java
index 23b41f7..3717e0a 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/AlphaIntegrationTest.java
@@ -586,12 +586,9 @@
@Override
public void onNext(GrpcCompensateCommand command) {
// intercept received command
- if(!command.getConnectedResponse()){
- // ignore the connection response
- consumer.accept(command);
- receivedCommands.add(command);
- receivedCommandsCounter.incrementAndGet();
- }
+ consumer.accept(command);
+ receivedCommands.add(command);
+ receivedCommandsCounter.incrementAndGet();
}
@Override
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/AlphaIntegrationWithRandomPortTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/AlphaIntegrationWithRandomPortTest.java
index d3556db..13cfbf5 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/AlphaIntegrationWithRandomPortTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/AlphaIntegrationWithRandomPortTest.java
@@ -568,12 +568,9 @@
@Override
public void onNext(GrpcCompensateCommand command) {
// intercept received command
- if(!command.getConnectedResponse()){
- // ignore the connection response
- consumer.accept(command);
- receivedCommands.add(command);
- receivedCommandsCounter.incrementAndGet();
- }
+ consumer.accept(command);
+ receivedCommands.add(command);
+ receivedCommandsCounter.incrementAndGet();
}
@Override
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/pack/omega/connector/grpc/core/ReconnectStreamObserver.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/pack/omega/connector/grpc/core/ReconnectStreamObserver.java
index 68d1592..8828dd6 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/pack/omega/connector/grpc/core/ReconnectStreamObserver.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/pack/omega/connector/grpc/core/ReconnectStreamObserver.java
@@ -33,8 +33,6 @@
private final MessageSender messageSender;
- private final CountDownLatch latch = new CountDownLatch(1);
-
public ReconnectStreamObserver(
LoadBalanceContext loadContext, MessageSender messageSender) {
this.loadContext = loadContext;
@@ -45,23 +43,10 @@
public void onError(Throwable t) {
LOG.error("Failed to process grpc coordinate command.", t);
loadContext.getGrpcOnErrorHandler().handle(messageSender);
- cancelWait();
}
@Override
public void onCompleted() {
// Do nothing here
}
-
- public void cancelWait(){
- latch.countDown();
- }
-
- public void waitConnected() {
- try {
- latch.await();
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
}
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/GrpcCompensateStreamObserver.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/GrpcCompensateStreamObserver.java
index 373250d..0cf9f42 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/GrpcCompensateStreamObserver.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/GrpcCompensateStreamObserver.java
@@ -49,16 +49,11 @@
LOG.info("Received compensate command, global tx id: {}, local tx id: {}, compensation method: {}",
command.getGlobalTxId(), command.getLocalTxId(), command.getCompensationMethod());
- if(command.getConnectedResponse()){
- // Wait until you receive an Alpha connection response
- cancelWait();
- }else{
- messageHandler.onReceive(
- command.getGlobalTxId(),
- command.getLocalTxId(),
- command.getParentTxId().isEmpty() ? null : command.getParentTxId(),
- command.getCompensationMethod(),
- deserializer.deserialize(command.getPayloads().toByteArray()));
- }
+ messageHandler.onReceive(
+ command.getGlobalTxId(),
+ command.getLocalTxId(),
+ command.getParentTxId().isEmpty() ? null : command.getParentTxId(),
+ command.getCompensationMethod(),
+ deserializer.deserialize(command.getPayloads().toByteArray()));
}
}
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/GrpcSagaClientMessageSender.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/GrpcSagaClientMessageSender.java
index 0ec9619..37996a0 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/GrpcSagaClientMessageSender.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/GrpcSagaClientMessageSender.java
@@ -74,7 +74,6 @@
@Override
public void onConnected() {
asyncEventService.onConnected(compensateStreamObserver).onNext(serviceConfig);
- compensateStreamObserver.waitConnected();
}
@Override
diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/SagaLoadBalancedSenderTestBase.java b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/SagaLoadBalancedSenderTestBase.java
index 3c00030..2f19080 100644
--- a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/SagaLoadBalancedSenderTestBase.java
+++ b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/SagaLoadBalancedSenderTestBase.java
@@ -149,7 +149,6 @@
@Override
public void onNext(GrpcServiceConfig grpcServiceConfig) {
connected.add("Connected " + grpcServiceConfig.getServiceName());
- responseObserver.onNext(GrpcCompensateCommand.newBuilder().setConnectedResponse(true).build());
}
@Override
diff --git a/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto b/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
index 5049e46..dd3a73c 100644
--- a/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
+++ b/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
@@ -53,6 +53,5 @@
string parentTxId = 3;
string compensationMethod = 4;
bytes payloads = 5;
- bool connectedResponse = 6;
}