Merge pull request #737 from apache/SCB-2391
SCB-2391 Added more scenarios in TCC accept tests
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/AlphaIntegrationWithRandomPortTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/AlphaIntegrationWithRandomPortTest.java
index f6a026b..5e05e2d 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/AlphaIntegrationWithRandomPortTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/AlphaIntegrationWithRandomPortTest.java
@@ -117,6 +117,9 @@
@Autowired
private TxConsistentService consistentService;
+ // Setup the most wait time for checking the result,
+ // the number is used to fix random test errors in a slow box
+ private static final Integer waitTime = 4;
private static final AtomicInteger receivedCommandsCounter = new AtomicInteger();
private static final Queue<GrpcCompensateCommand> receivedCommands = new ConcurrentLinkedQueue<>();
@@ -167,7 +170,7 @@
asyncStub.onConnected(compensateResponseObserver).onNext(serviceConfig);
blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent));
// use the asynchronous stub need to wait for some time
- await().atMost(1, SECONDS).until(() -> !eventRepo.findByGlobalTxId(globalTxId).isEmpty());
+ await().atMost(waitTime, SECONDS).until(() -> !eventRepo.findByGlobalTxId(globalTxId).isEmpty());
assertThat(receivedCommands.isEmpty(), is(true));
@@ -187,7 +190,7 @@
public void closeStreamOnDisconnected() {
asyncStub.onConnected(compensateResponseObserver).onNext(serviceConfig);
- await().atMost(1, SECONDS).until(() -> omegaCallbacks.containsKey(serviceConfig.getServiceName()));
+ await().atMost(waitTime, SECONDS).until(() -> omegaCallbacks.containsKey(serviceConfig.getServiceName()));
assertThat(
omegaCallbacks.get(serviceConfig.getServiceName()).get(serviceConfig.getInstanceId()),
@@ -198,20 +201,20 @@
omegaCallbacks.get(serviceConfig.getServiceName()).containsKey(serviceConfig.getInstanceId()),
is(false));
- await().atMost(1, SECONDS).until(compensateResponseObserver::isCompleted);
+ await().atMost(waitTime, SECONDS).until(compensateResponseObserver::isCompleted);
}
@Test
public void closeStreamOfDisconnectedClientOnly() {
asyncStub.onConnected(compensateResponseObserver).onNext(serviceConfig);
- await().atMost(1, SECONDS).until(() -> omegaCallbacks.containsKey(serviceConfig.getServiceName()));
+ await().atMost(waitTime, SECONDS).until(() -> omegaCallbacks.containsKey(serviceConfig.getServiceName()));
GrpcServiceConfig anotherServiceConfig = someServiceConfig();
CompensationStreamObserver anotherResponseObserver = new CompensationStreamObserver();
TxEventServiceStub otherAsyncStub = TxEventServiceGrpc.newStub(clientChannel);
otherAsyncStub.onConnected(anotherResponseObserver).onNext(anotherServiceConfig);
- await().atMost(1, SECONDS).until(() -> omegaCallbacks.containsKey(anotherServiceConfig.getServiceName()));
+ await().atMost(waitTime, SECONDS).until(() -> omegaCallbacks.containsKey(anotherServiceConfig.getServiceName()));
blockingStub.onDisconnected(serviceConfig);
@@ -234,7 +237,7 @@
consistentService.handle(someTxAbortEvent(serviceName, instanceId));
- await().atMost(1, SECONDS).until(() -> omegaCallbacks.get(serviceName).isEmpty());
+ await().atMost(waitTime, SECONDS).until(() -> omegaCallbacks.get(serviceName).isEmpty());
}
@Test
@@ -244,7 +247,7 @@
blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent));
blockingStub.onTxEvent(eventOf(TxEndedEvent, localTxId, parentTxId, new byte[0], compensationMethod));
- await().atMost(1, SECONDS).until(() -> !receivedCommands.isEmpty());
+ await().atMost(waitTime, SECONDS).until(() -> !receivedCommands.isEmpty());
GrpcCompensateCommand command = receivedCommands.poll();
assertThat(command.getGlobalTxId(), is(globalTxId));
@@ -268,7 +271,7 @@
blockingStub.onTxEvent(eventOf(TxEndedEvent, localTxId1, parentTxId1, new byte[0], "method b"));
blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent));
- await().atMost(2, SECONDS).until(() -> receivedCommandsCounter.get() > 1);
+ await().atMost(waitTime, SECONDS).until(() -> receivedCommandsCounter.get() > 1);
assertThat(receivedCommands, containsInAnyOrder(
GrpcCompensateCommand.newBuilder().setGlobalTxId(globalTxId).setLocalTxId(localTxId1).setParentTxId(parentTxId1)
@@ -283,10 +286,10 @@
asyncStub.onConnected(compensateResponseObserver).onNext(serviceConfig);
blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent));
blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent));
- await().atMost(1, SECONDS).until(() -> !eventRepo.findByGlobalTxId(globalTxId).isEmpty());
+ await().atMost(waitTime, SECONDS).until(() -> !eventRepo.findByGlobalTxId(globalTxId).isEmpty());
blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent));
- await().atMost(1, SECONDS).until(() -> !receivedCommands.isEmpty());
+ await().atMost(waitTime, SECONDS).until(() -> !receivedCommands.isEmpty());
GrpcCompensateCommand command = receivedCommands.poll();
assertThat(command.getGlobalTxId(), is(globalTxId));
@@ -310,10 +313,10 @@
TxEventServiceBlockingStub anotherBlockingStub = TxEventServiceGrpc.newBlockingStub(clientChannel);
anotherBlockingStub.onTxEvent(someGrpcEvent(TxStartedEvent, UUID.randomUUID().toString()));
- await().atMost(1, SECONDS).until(() -> eventRepo.count() == 3);
+ await().atMost(waitTime, SECONDS).until(() -> eventRepo.count() == 3);
blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent));
- await().atMost(1, SECONDS).until(() -> !receivedCommands.isEmpty());
+ await().atMost(waitTime, SECONDS).until(() -> !receivedCommands.isEmpty());
assertThat(receivedCommandsCounter.get(), is(1));
assertThat(receivedCommands.poll().getGlobalTxId(), is(globalTxId));
@@ -330,7 +333,7 @@
blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent));
- await().atMost(1000, SECONDS).until(() -> receivedCommandsCounter.get() == 1);
+ await().atMost(waitTime, SECONDS).until(() -> receivedCommandsCounter.get() == 1);
String localTxId1 = UUID.randomUUID().toString();
String parentTxId1 = UUID.randomUUID().toString();
@@ -354,10 +357,10 @@
String anotherLocalTxId2 = UUID.randomUUID().toString();
blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent, globalTxId, anotherLocalTxId2));
- await().atMost(1, SECONDS).until(() -> eventRepo.count() == 7);
+ await().atMost(waitTime, SECONDS).until(() -> eventRepo.count() == 7);
blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent, globalTxId, anotherLocalTxId2));
- await().atMost(1, SECONDS).until(() -> !receivedCommands.isEmpty());
+ await().atMost(waitTime, SECONDS).until(() -> !receivedCommands.isEmpty());
assertThat(receivedCommandsCounter.get(), is(1));
assertThat(receivedCommands.poll().getGlobalTxId(), is(globalTxId));
@@ -375,7 +378,7 @@
blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent, globalTxId, anotherLocalTxId));
- await().atMost(1, SECONDS).until(() -> {
+ await().atMost(waitTime, SECONDS).until(() -> {
List<TxEvent> events = eventRepo.findByGlobalTxId(globalTxId);
return events.size() == 8 && events.get(events.size() - 1).type().equals(SagaEndedEvent.name());
});
@@ -386,14 +389,14 @@
asyncStub.onConnected(compensateResponseObserver).onNext(serviceConfig);
blockingStub.onTxEvent(someGrpcEventWithTimeout(SagaStartedEvent, globalTxId, null, 1));
- await().atMost(2, SECONDS).until(() -> eventRepo.count() == 3);
+ await().atMost(waitTime, SECONDS).until(() -> eventRepo.count() == 3);
List<TxEvent> events = eventRepo.findByGlobalTxId(globalTxId);
assertThat(events.get(0).type(), is(SagaStartedEvent.name()));
assertThat(events.get(1).type(), is(TxAbortedEvent.name()));
assertThat(events.get(2).type(), is(SagaEndedEvent.name()));
- await().atMost(2, SECONDS).until(this::waitTillTimeoutDone);
+ await().atMost(waitTime, SECONDS).until(this::waitTillTimeoutDone);
assertThat(timeoutEntityRepository.count(), is(1L));
Iterable<TxTimeout> timeouts = timeoutEntityRepository.findAll();
@@ -410,7 +413,7 @@
blockingStub.onTxEvent(someGrpcEvent(SagaStartedEvent, globalTxId, globalTxId, null));
blockingStub.onTxEvent(someGrpcEventWithTimeout(TxStartedEvent, localTxId, globalTxId, 1));
- await().atMost(2, SECONDS).until(() -> {
+ await().atMost(waitTime, SECONDS).until(() -> {
List<TxEvent> events = eventRepo.findByGlobalTxId(globalTxId);
return eventRepo.count() == 5 && events.get(events.size() - 1).type().equals(SagaEndedEvent.name());
});
@@ -427,7 +430,7 @@
assertThat(events.get(4).type(), is(TxCompensatedEvent.name()));
}
- await().atMost(2, SECONDS).until(this::waitTillTimeoutDone);
+ await().atMost(waitTime, SECONDS).until(this::waitTillTimeoutDone);
assertThat(timeoutEntityRepository.count(), is(1L));
Iterable<TxTimeout> timeouts = timeoutEntityRepository.findAll();
@@ -446,7 +449,7 @@
blockingStub.onTxEvent(someGrpcEventWithRetry(TxStartedEvent, retryMethod, 0));
blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent));
- await().atMost(1, SECONDS).until(() -> eventRepo.count() == 4);
+ await().atMost(waitTime, SECONDS).until(() -> eventRepo.count() == 4);
List<TxEvent> events = eventRepo.findByGlobalTxId(globalTxId);
assertThat(events.size(), is(4));
diff --git a/pom.xml b/pom.xml
index 61c62f6..b64423c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -45,8 +45,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
- <!-- using log4j 2.7 with Spring Boot 1.x -->
- <log4j.version>2.7</log4j.version>
+ <log4j.version>2.17.1</log4j.version>
<disruptor.version>3.3.7</disruptor.version>
<dubbo.version>2.6.4</dubbo.version>
<spring.boot.version>2.1.6.RELEASE</spring.boot.version>