Merge pull request #736 from apache/SCB-2394
SCB-2394 Try to fix the github action random test errors
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/AlphaIntegrationWithRandomPortTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/AlphaIntegrationWithRandomPortTest.java
index f6a026b..5e05e2d 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/AlphaIntegrationWithRandomPortTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/AlphaIntegrationWithRandomPortTest.java
@@ -117,6 +117,9 @@
@Autowired
private TxConsistentService consistentService;
+ // Setup the most wait time for checking the result,
+ // the number is used to fix random test errors in a slow box
+ private static final Integer waitTime = 4;
private static final AtomicInteger receivedCommandsCounter = new AtomicInteger();
private static final Queue<GrpcCompensateCommand> receivedCommands = new ConcurrentLinkedQueue<>();
@@ -167,7 +170,7 @@
asyncStub.onConnected(compensateResponseObserver).onNext(serviceConfig);
blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent));
// use the asynchronous stub need to wait for some time
- await().atMost(1, SECONDS).until(() -> !eventRepo.findByGlobalTxId(globalTxId).isEmpty());
+ await().atMost(waitTime, SECONDS).until(() -> !eventRepo.findByGlobalTxId(globalTxId).isEmpty());
assertThat(receivedCommands.isEmpty(), is(true));
@@ -187,7 +190,7 @@
public void closeStreamOnDisconnected() {
asyncStub.onConnected(compensateResponseObserver).onNext(serviceConfig);
- await().atMost(1, SECONDS).until(() -> omegaCallbacks.containsKey(serviceConfig.getServiceName()));
+ await().atMost(waitTime, SECONDS).until(() -> omegaCallbacks.containsKey(serviceConfig.getServiceName()));
assertThat(
omegaCallbacks.get(serviceConfig.getServiceName()).get(serviceConfig.getInstanceId()),
@@ -198,20 +201,20 @@
omegaCallbacks.get(serviceConfig.getServiceName()).containsKey(serviceConfig.getInstanceId()),
is(false));
- await().atMost(1, SECONDS).until(compensateResponseObserver::isCompleted);
+ await().atMost(waitTime, SECONDS).until(compensateResponseObserver::isCompleted);
}
@Test
public void closeStreamOfDisconnectedClientOnly() {
asyncStub.onConnected(compensateResponseObserver).onNext(serviceConfig);
- await().atMost(1, SECONDS).until(() -> omegaCallbacks.containsKey(serviceConfig.getServiceName()));
+ await().atMost(waitTime, SECONDS).until(() -> omegaCallbacks.containsKey(serviceConfig.getServiceName()));
GrpcServiceConfig anotherServiceConfig = someServiceConfig();
CompensationStreamObserver anotherResponseObserver = new CompensationStreamObserver();
TxEventServiceStub otherAsyncStub = TxEventServiceGrpc.newStub(clientChannel);
otherAsyncStub.onConnected(anotherResponseObserver).onNext(anotherServiceConfig);
- await().atMost(1, SECONDS).until(() -> omegaCallbacks.containsKey(anotherServiceConfig.getServiceName()));
+ await().atMost(waitTime, SECONDS).until(() -> omegaCallbacks.containsKey(anotherServiceConfig.getServiceName()));
blockingStub.onDisconnected(serviceConfig);
@@ -234,7 +237,7 @@
consistentService.handle(someTxAbortEvent(serviceName, instanceId));
- await().atMost(1, SECONDS).until(() -> omegaCallbacks.get(serviceName).isEmpty());
+ await().atMost(waitTime, SECONDS).until(() -> omegaCallbacks.get(serviceName).isEmpty());
}
@Test
@@ -244,7 +247,7 @@
blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent));
blockingStub.onTxEvent(eventOf(TxEndedEvent, localTxId, parentTxId, new byte[0], compensationMethod));
- await().atMost(1, SECONDS).until(() -> !receivedCommands.isEmpty());
+ await().atMost(waitTime, SECONDS).until(() -> !receivedCommands.isEmpty());
GrpcCompensateCommand command = receivedCommands.poll();
assertThat(command.getGlobalTxId(), is(globalTxId));
@@ -268,7 +271,7 @@
blockingStub.onTxEvent(eventOf(TxEndedEvent, localTxId1, parentTxId1, new byte[0], "method b"));
blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent));
- await().atMost(2, SECONDS).until(() -> receivedCommandsCounter.get() > 1);
+ await().atMost(waitTime, SECONDS).until(() -> receivedCommandsCounter.get() > 1);
assertThat(receivedCommands, containsInAnyOrder(
GrpcCompensateCommand.newBuilder().setGlobalTxId(globalTxId).setLocalTxId(localTxId1).setParentTxId(parentTxId1)
@@ -283,10 +286,10 @@
asyncStub.onConnected(compensateResponseObserver).onNext(serviceConfig);
blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent));
blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent));
- await().atMost(1, SECONDS).until(() -> !eventRepo.findByGlobalTxId(globalTxId).isEmpty());
+ await().atMost(waitTime, SECONDS).until(() -> !eventRepo.findByGlobalTxId(globalTxId).isEmpty());
blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent));
- await().atMost(1, SECONDS).until(() -> !receivedCommands.isEmpty());
+ await().atMost(waitTime, SECONDS).until(() -> !receivedCommands.isEmpty());
GrpcCompensateCommand command = receivedCommands.poll();
assertThat(command.getGlobalTxId(), is(globalTxId));
@@ -310,10 +313,10 @@
TxEventServiceBlockingStub anotherBlockingStub = TxEventServiceGrpc.newBlockingStub(clientChannel);
anotherBlockingStub.onTxEvent(someGrpcEvent(TxStartedEvent, UUID.randomUUID().toString()));
- await().atMost(1, SECONDS).until(() -> eventRepo.count() == 3);
+ await().atMost(waitTime, SECONDS).until(() -> eventRepo.count() == 3);
blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent));
- await().atMost(1, SECONDS).until(() -> !receivedCommands.isEmpty());
+ await().atMost(waitTime, SECONDS).until(() -> !receivedCommands.isEmpty());
assertThat(receivedCommandsCounter.get(), is(1));
assertThat(receivedCommands.poll().getGlobalTxId(), is(globalTxId));
@@ -330,7 +333,7 @@
blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent));
- await().atMost(1000, SECONDS).until(() -> receivedCommandsCounter.get() == 1);
+ await().atMost(waitTime, SECONDS).until(() -> receivedCommandsCounter.get() == 1);
String localTxId1 = UUID.randomUUID().toString();
String parentTxId1 = UUID.randomUUID().toString();
@@ -354,10 +357,10 @@
String anotherLocalTxId2 = UUID.randomUUID().toString();
blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent, globalTxId, anotherLocalTxId2));
- await().atMost(1, SECONDS).until(() -> eventRepo.count() == 7);
+ await().atMost(waitTime, SECONDS).until(() -> eventRepo.count() == 7);
blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent, globalTxId, anotherLocalTxId2));
- await().atMost(1, SECONDS).until(() -> !receivedCommands.isEmpty());
+ await().atMost(waitTime, SECONDS).until(() -> !receivedCommands.isEmpty());
assertThat(receivedCommandsCounter.get(), is(1));
assertThat(receivedCommands.poll().getGlobalTxId(), is(globalTxId));
@@ -375,7 +378,7 @@
blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent, globalTxId, anotherLocalTxId));
- await().atMost(1, SECONDS).until(() -> {
+ await().atMost(waitTime, SECONDS).until(() -> {
List<TxEvent> events = eventRepo.findByGlobalTxId(globalTxId);
return events.size() == 8 && events.get(events.size() - 1).type().equals(SagaEndedEvent.name());
});
@@ -386,14 +389,14 @@
asyncStub.onConnected(compensateResponseObserver).onNext(serviceConfig);
blockingStub.onTxEvent(someGrpcEventWithTimeout(SagaStartedEvent, globalTxId, null, 1));
- await().atMost(2, SECONDS).until(() -> eventRepo.count() == 3);
+ await().atMost(waitTime, SECONDS).until(() -> eventRepo.count() == 3);
List<TxEvent> events = eventRepo.findByGlobalTxId(globalTxId);
assertThat(events.get(0).type(), is(SagaStartedEvent.name()));
assertThat(events.get(1).type(), is(TxAbortedEvent.name()));
assertThat(events.get(2).type(), is(SagaEndedEvent.name()));
- await().atMost(2, SECONDS).until(this::waitTillTimeoutDone);
+ await().atMost(waitTime, SECONDS).until(this::waitTillTimeoutDone);
assertThat(timeoutEntityRepository.count(), is(1L));
Iterable<TxTimeout> timeouts = timeoutEntityRepository.findAll();
@@ -410,7 +413,7 @@
blockingStub.onTxEvent(someGrpcEvent(SagaStartedEvent, globalTxId, globalTxId, null));
blockingStub.onTxEvent(someGrpcEventWithTimeout(TxStartedEvent, localTxId, globalTxId, 1));
- await().atMost(2, SECONDS).until(() -> {
+ await().atMost(waitTime, SECONDS).until(() -> {
List<TxEvent> events = eventRepo.findByGlobalTxId(globalTxId);
return eventRepo.count() == 5 && events.get(events.size() - 1).type().equals(SagaEndedEvent.name());
});
@@ -427,7 +430,7 @@
assertThat(events.get(4).type(), is(TxCompensatedEvent.name()));
}
- await().atMost(2, SECONDS).until(this::waitTillTimeoutDone);
+ await().atMost(waitTime, SECONDS).until(this::waitTillTimeoutDone);
assertThat(timeoutEntityRepository.count(), is(1L));
Iterable<TxTimeout> timeouts = timeoutEntityRepository.findAll();
@@ -446,7 +449,7 @@
blockingStub.onTxEvent(someGrpcEventWithRetry(TxStartedEvent, retryMethod, 0));
blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent));
- await().atMost(1, SECONDS).until(() -> eventRepo.count() == 4);
+ await().atMost(waitTime, SECONDS).until(() -> eventRepo.count() == 4);
List<TxEvent> events = eventRepo.findByGlobalTxId(globalTxId);
assertThat(events.size(), is(4));