Merge pull request #736 from apache/SCB-2394

SCB-2394 Try to fix the random test errors in GitHub Actions
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/AlphaIntegrationWithRandomPortTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/AlphaIntegrationWithRandomPortTest.java
index f6a026b..5e05e2d 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/AlphaIntegrationWithRandomPortTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/AlphaIntegrationWithRandomPortTest.java
@@ -117,6 +117,9 @@
   @Autowired
   private TxConsistentService consistentService;
 
+  // Maximum time, in seconds, to wait when polling for an expected result.
+  // The value is deliberately generous to avoid random test failures on slow machines.
+  private static final Integer waitTime = 4;
   private static final AtomicInteger receivedCommandsCounter = new AtomicInteger();
   private static final Queue<GrpcCompensateCommand> receivedCommands = new ConcurrentLinkedQueue<>();
 
@@ -167,7 +170,7 @@
     asyncStub.onConnected(compensateResponseObserver).onNext(serviceConfig);
     blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent));
     // use the asynchronous stub need to wait for some time
-    await().atMost(1, SECONDS).until(() -> !eventRepo.findByGlobalTxId(globalTxId).isEmpty());
+    await().atMost(waitTime, SECONDS).until(() -> !eventRepo.findByGlobalTxId(globalTxId).isEmpty());
 
     assertThat(receivedCommands.isEmpty(), is(true));
 
@@ -187,7 +190,7 @@
   public void closeStreamOnDisconnected() {
     asyncStub.onConnected(compensateResponseObserver).onNext(serviceConfig);
 
-    await().atMost(1, SECONDS).until(() -> omegaCallbacks.containsKey(serviceConfig.getServiceName()));
+    await().atMost(waitTime, SECONDS).until(() -> omegaCallbacks.containsKey(serviceConfig.getServiceName()));
 
     assertThat(
         omegaCallbacks.get(serviceConfig.getServiceName()).get(serviceConfig.getInstanceId()),
@@ -198,20 +201,20 @@
         omegaCallbacks.get(serviceConfig.getServiceName()).containsKey(serviceConfig.getInstanceId()),
         is(false));
 
-    await().atMost(1, SECONDS).until(compensateResponseObserver::isCompleted);
+    await().atMost(waitTime, SECONDS).until(compensateResponseObserver::isCompleted);
   }
 
   @Test
   public void closeStreamOfDisconnectedClientOnly() {
     asyncStub.onConnected(compensateResponseObserver).onNext(serviceConfig);
-    await().atMost(1, SECONDS).until(() -> omegaCallbacks.containsKey(serviceConfig.getServiceName()));
+    await().atMost(waitTime, SECONDS).until(() -> omegaCallbacks.containsKey(serviceConfig.getServiceName()));
 
     GrpcServiceConfig anotherServiceConfig = someServiceConfig();
     CompensationStreamObserver anotherResponseObserver = new CompensationStreamObserver();
     TxEventServiceStub otherAsyncStub = TxEventServiceGrpc.newStub(clientChannel);
     otherAsyncStub.onConnected(anotherResponseObserver).onNext(anotherServiceConfig);
 
-    await().atMost(1, SECONDS).until(() -> omegaCallbacks.containsKey(anotherServiceConfig.getServiceName()));
+    await().atMost(waitTime, SECONDS).until(() -> omegaCallbacks.containsKey(anotherServiceConfig.getServiceName()));
 
     blockingStub.onDisconnected(serviceConfig);
 
@@ -234,7 +237,7 @@
 
     consistentService.handle(someTxAbortEvent(serviceName, instanceId));
 
-    await().atMost(1, SECONDS).until(() -> omegaCallbacks.get(serviceName).isEmpty());
+    await().atMost(waitTime, SECONDS).until(() -> omegaCallbacks.get(serviceName).isEmpty());
   }
 
   @Test
@@ -244,7 +247,7 @@
     blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent));
 
     blockingStub.onTxEvent(eventOf(TxEndedEvent, localTxId, parentTxId, new byte[0], compensationMethod));
-    await().atMost(1, SECONDS).until(() -> !receivedCommands.isEmpty());
+    await().atMost(waitTime, SECONDS).until(() -> !receivedCommands.isEmpty());
 
     GrpcCompensateCommand command = receivedCommands.poll();
     assertThat(command.getGlobalTxId(), is(globalTxId));
@@ -268,7 +271,7 @@
     blockingStub.onTxEvent(eventOf(TxEndedEvent, localTxId1, parentTxId1, new byte[0], "method b"));
 
     blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent));
-    await().atMost(2, SECONDS).until(() -> receivedCommandsCounter.get() > 1);
+    await().atMost(waitTime, SECONDS).until(() -> receivedCommandsCounter.get() > 1);
 
     assertThat(receivedCommands, containsInAnyOrder(
         GrpcCompensateCommand.newBuilder().setGlobalTxId(globalTxId).setLocalTxId(localTxId1).setParentTxId(parentTxId1)
@@ -283,10 +286,10 @@
     asyncStub.onConnected(compensateResponseObserver).onNext(serviceConfig);
     blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent));
     blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent));
-    await().atMost(1, SECONDS).until(() -> !eventRepo.findByGlobalTxId(globalTxId).isEmpty());
+    await().atMost(waitTime, SECONDS).until(() -> !eventRepo.findByGlobalTxId(globalTxId).isEmpty());
 
     blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent));
-    await().atMost(1, SECONDS).until(() -> !receivedCommands.isEmpty());
+    await().atMost(waitTime, SECONDS).until(() -> !receivedCommands.isEmpty());
 
     GrpcCompensateCommand command = receivedCommands.poll();
     assertThat(command.getGlobalTxId(), is(globalTxId));
@@ -310,10 +313,10 @@
     TxEventServiceBlockingStub anotherBlockingStub = TxEventServiceGrpc.newBlockingStub(clientChannel);
     anotherBlockingStub.onTxEvent(someGrpcEvent(TxStartedEvent, UUID.randomUUID().toString()));
 
-    await().atMost(1, SECONDS).until(() -> eventRepo.count() == 3);
+    await().atMost(waitTime, SECONDS).until(() -> eventRepo.count() == 3);
 
     blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent));
-    await().atMost(1, SECONDS).until(() -> !receivedCommands.isEmpty());
+    await().atMost(waitTime, SECONDS).until(() -> !receivedCommands.isEmpty());
 
     assertThat(receivedCommandsCounter.get(), is(1));
     assertThat(receivedCommands.poll().getGlobalTxId(), is(globalTxId));
@@ -330,7 +333,7 @@
 
     blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent));
 
-    await().atMost(1000, SECONDS).until(() -> receivedCommandsCounter.get() == 1);
+    await().atMost(waitTime, SECONDS).until(() -> receivedCommandsCounter.get() == 1);
 
     String localTxId1 = UUID.randomUUID().toString();
     String parentTxId1 = UUID.randomUUID().toString();
@@ -354,10 +357,10 @@
     String anotherLocalTxId2 = UUID.randomUUID().toString();
     blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent, globalTxId, anotherLocalTxId2));
 
-    await().atMost(1, SECONDS).until(() -> eventRepo.count() == 7);
+    await().atMost(waitTime, SECONDS).until(() -> eventRepo.count() == 7);
 
     blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent, globalTxId, anotherLocalTxId2));
-    await().atMost(1, SECONDS).until(() -> !receivedCommands.isEmpty());
+    await().atMost(waitTime, SECONDS).until(() -> !receivedCommands.isEmpty());
 
     assertThat(receivedCommandsCounter.get(), is(1));
     assertThat(receivedCommands.poll().getGlobalTxId(), is(globalTxId));
@@ -375,7 +378,7 @@
 
     blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent, globalTxId, anotherLocalTxId));
 
-    await().atMost(1, SECONDS).until(() -> {
+    await().atMost(waitTime, SECONDS).until(() -> {
       List<TxEvent> events = eventRepo.findByGlobalTxId(globalTxId);
       return events.size() == 8 && events.get(events.size() - 1).type().equals(SagaEndedEvent.name());
     });
@@ -386,14 +389,14 @@
     asyncStub.onConnected(compensateResponseObserver).onNext(serviceConfig);
     blockingStub.onTxEvent(someGrpcEventWithTimeout(SagaStartedEvent, globalTxId, null, 1));
 
-    await().atMost(2, SECONDS).until(() -> eventRepo.count() == 3);
+    await().atMost(waitTime, SECONDS).until(() -> eventRepo.count() == 3);
 
     List<TxEvent> events = eventRepo.findByGlobalTxId(globalTxId);
     assertThat(events.get(0).type(), is(SagaStartedEvent.name()));
     assertThat(events.get(1).type(), is(TxAbortedEvent.name()));
     assertThat(events.get(2).type(), is(SagaEndedEvent.name()));
 
-    await().atMost(2, SECONDS).until(this::waitTillTimeoutDone);
+    await().atMost(waitTime, SECONDS).until(this::waitTillTimeoutDone);
 
     assertThat(timeoutEntityRepository.count(), is(1L));
     Iterable<TxTimeout> timeouts = timeoutEntityRepository.findAll();
@@ -410,7 +413,7 @@
     blockingStub.onTxEvent(someGrpcEvent(SagaStartedEvent, globalTxId, globalTxId, null));
     blockingStub.onTxEvent(someGrpcEventWithTimeout(TxStartedEvent, localTxId, globalTxId, 1));
 
-    await().atMost(2, SECONDS).until(() -> {
+    await().atMost(waitTime, SECONDS).until(() -> {
       List<TxEvent> events = eventRepo.findByGlobalTxId(globalTxId);
       return eventRepo.count() == 5 && events.get(events.size() - 1).type().equals(SagaEndedEvent.name());
     });
@@ -427,7 +430,7 @@
       assertThat(events.get(4).type(), is(TxCompensatedEvent.name()));
     }
 
-    await().atMost(2, SECONDS).until(this::waitTillTimeoutDone);
+    await().atMost(waitTime, SECONDS).until(this::waitTillTimeoutDone);
 
     assertThat(timeoutEntityRepository.count(), is(1L));
     Iterable<TxTimeout> timeouts = timeoutEntityRepository.findAll();
@@ -446,7 +449,7 @@
     blockingStub.onTxEvent(someGrpcEventWithRetry(TxStartedEvent, retryMethod, 0));
     blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent));
 
-    await().atMost(1, SECONDS).until(() -> eventRepo.count() == 4);
+    await().atMost(waitTime, SECONDS).until(() -> eventRepo.count() == 4);
 
     List<TxEvent> events = eventRepo.findByGlobalTxId(globalTxId);
     assertThat(events.size(), is(4));