SCB-1735 Add compensation timeout case for Integration Test
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java
index 7dd2600..15c1a0e 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java
@@ -184,6 +184,33 @@
}
@Test
+ public void middleTxAbortedEventAndCompensationTimeoutTest() {
+ omegaEventSender.onConnected();
+ final String globalTxId = UUID.randomUUID().toString();
+ final String localTxId_1 = UUID.randomUUID().toString();
+ final String localTxId_2 = UUID.randomUUID().toString();
+ omegaEventSender.getOmegaEventSagaSimulator()
+ .middleTxAbortedEventAndCompensationTimeoutEvents(globalTxId, localTxId_1, localTxId_2).stream().forEach(event -> {
+ omegaEventSender.getBlockingStub().onTxEvent(event);
+ });
+ await().atMost(6, SECONDS).until(() -> {
+ SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system)
+ .getLastSagaData();
+ return sagaData != null && sagaData.isTerminated()
+ && sagaData.getLastState() == SagaActorState.SUSPENDED;
+ });
+ SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system)
+ .getLastSagaData();
+ assertEquals(sagaData.getLastState(), SagaActorState.SUSPENDED);
+ assertEquals(sagaData.getTxEntities().size(), 2);
+ assertNotNull(sagaData.getBeginTime());
+ assertNotNull(sagaData.getEndTime());
+ assertTrue(sagaData.getEndTime().getTime() > sagaData.getBeginTime().getTime());
+ assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(), TxState.COMPENSATED_FAILED);
+ assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(), TxState.FAILED);
+ }
+
+ @Test
public void lastTxAbortedEventTest() {
omegaEventSender.onConnected();
final String globalTxId = UUID.randomUUID().toString();
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSagaSimulator.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSagaSimulator.java
index 34348b5..13eb3f1 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSagaSimulator.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSagaSimulator.java
@@ -61,6 +61,19 @@
return sagaEvents;
}
+ public List<GrpcTxEvent> middleTxAbortedEventAndCompensationTimeoutEvents(String globalTxId, String localTxId_1, String localTxId_2){
+ final int localTxId_1_ReverseTimeoutSecond = 2;
+ List<GrpcTxEvent> sagaEvents = new ArrayList<>();
+ sagaEvents.add(sagaStartedEvent(globalTxId));
+ sagaEvents.add(txStartedEvent(globalTxId, localTxId_1, globalTxId, "service a".getBytes(), "method a", 0, localTxId_1_ReverseTimeoutSecond));
+ sagaEvents.add(txEndedEvent(globalTxId, localTxId_1, globalTxId, "service a".getBytes(), "method a"));
+ sagaEvents.add(txStartedEvent(globalTxId, localTxId_2, globalTxId, "service b".getBytes(), "method b"));
+ sagaEvents.add(txAbortedEvent(globalTxId, localTxId_2, globalTxId, NullPointerException.class.getName().getBytes(), "method b"));
+ sagaEvents.add(txCompensateAckTimeoutEvent(globalTxId, localTxId_2, globalTxId));
+ sagaEvents.add(sagaAbortedEvent(globalTxId));
+ return sagaEvents;
+ }
+
public List<GrpcTxEvent> lastTxAbortedEvents(String globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){
List<GrpcTxEvent> sagaEvents = new ArrayList<>();
sagaEvents.add(sagaStartedEvent(globalTxId));
@@ -219,6 +232,13 @@
0, 0, 0, 0);
}
+ private GrpcTxEvent txStartedEvent(String globalTxId,
+ String localTxId, String parentTxId, byte[] payloads, String compensationMethod, int reverseRetries, int reverseTimeout) {
+ return eventOf(EventType.TxStartedEvent, globalTxId, localTxId,
+ parentTxId, payloads, compensationMethod, 0, "",
+ 0, 0, reverseRetries, reverseTimeout);
+ }
+
private GrpcTxEvent txEndedEvent(String globalTxId,
String localTxId, String parentTxId, byte[] payloads, String compensationMethod) {
return eventOf(EventType.TxEndedEvent, globalTxId, localTxId,
@@ -247,6 +267,13 @@
0, 0, 0, 0);
}
+ public GrpcTxEvent txCompensateAckTimeoutEvent(String globalTxId,
+ String localTxId, String parentTxId) {
+ return eventOf(EventType.CompensateAckTimeoutEvent, globalTxId, localTxId,
+ parentTxId, new byte[0], "", 0, "",
+ 0, 0, 0, 0);
+ }
+
private GrpcTxEvent eventOf(EventType eventType,
String globalTxId,
String localTxId,
diff --git a/pack-common/src/main/java/org/apache/servicecomb/pack/common/EventType.java b/pack-common/src/main/java/org/apache/servicecomb/pack/common/EventType.java
index e585d0c..18179e3 100644
--- a/pack-common/src/main/java/org/apache/servicecomb/pack/common/EventType.java
+++ b/pack-common/src/main/java/org/apache/servicecomb/pack/common/EventType.java
@@ -28,5 +28,6 @@
SagaTimeoutEvent,
TxCompensateEvent,
TxCompensateAckFailedEvent,
- TxCompensateAckSucceedEvent
+ TxCompensateAckSucceedEvent,
+ CompensateAckTimeoutEvent
}