SCB-1735 Add compensation timeout case for Integration Test
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java
index 7dd2600..15c1a0e 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java
@@ -184,6 +184,33 @@
   }
 
   @Test
+  public void middleTxAbortedEventAndCompensationTimeoutTest() {
+    omegaEventSender.onConnected();
+    final String globalTxId = UUID.randomUUID().toString();
+    final String localTxId_1 = UUID.randomUUID().toString();
+    final String localTxId_2 = UUID.randomUUID().toString();
+    omegaEventSender.getOmegaEventSagaSimulator()
+        .middleTxAbortedEventAndCompensationTimeoutEvents(globalTxId, localTxId_1, localTxId_2).stream().forEach(event -> {
+      omegaEventSender.getBlockingStub().onTxEvent(event);
+    });
+    await().atMost(6, SECONDS).until(() -> {
+      SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system)
+          .getLastSagaData();
+      return sagaData != null && sagaData.isTerminated()
+          && sagaData.getLastState() == SagaActorState.SUSPENDED;
+    });
+    SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system)
+        .getLastSagaData();
+    assertEquals(sagaData.getLastState(), SagaActorState.SUSPENDED);
+    assertEquals(sagaData.getTxEntities().size(), 2);
+    assertNotNull(sagaData.getBeginTime());
+    assertNotNull(sagaData.getEndTime());
+    assertTrue(sagaData.getEndTime().getTime() > sagaData.getBeginTime().getTime());
+    assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(), TxState.COMPENSATED_FAILED);
+    assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(), TxState.FAILED);
+  }
+
+  @Test
   public void lastTxAbortedEventTest() {
     omegaEventSender.onConnected();
     final String globalTxId = UUID.randomUUID().toString();
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSagaSimulator.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSagaSimulator.java
index 34348b5..13eb3f1 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSagaSimulator.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSagaSimulator.java
@@ -61,6 +61,19 @@
     return sagaEvents;
   }
 
+  public List<GrpcTxEvent> middleTxAbortedEventAndCompensationTimeoutEvents(String globalTxId, String localTxId_1, String localTxId_2){
+    final int localTxId_1_ReverseTimeoutSecond = 2;
+    List<GrpcTxEvent> sagaEvents = new ArrayList<>();
+    sagaEvents.add(sagaStartedEvent(globalTxId));
+    sagaEvents.add(txStartedEvent(globalTxId, localTxId_1, globalTxId, "service a".getBytes(), "method a", 0, localTxId_1_ReverseTimeoutSecond));
+    sagaEvents.add(txEndedEvent(globalTxId, localTxId_1, globalTxId, "service a".getBytes(), "method a"));
+    sagaEvents.add(txStartedEvent(globalTxId, localTxId_2, globalTxId, "service b".getBytes(), "method b"));
+    sagaEvents.add(txAbortedEvent(globalTxId, localTxId_2, globalTxId, NullPointerException.class.getName().getBytes(), "method b"));
+    sagaEvents.add(txCompensateAckTimeoutEvent(globalTxId, localTxId_2, globalTxId));
+    sagaEvents.add(sagaAbortedEvent(globalTxId));
+    return sagaEvents;
+  }
+
   public List<GrpcTxEvent> lastTxAbortedEvents(String globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){
     List<GrpcTxEvent> sagaEvents = new ArrayList<>();
     sagaEvents.add(sagaStartedEvent(globalTxId));
@@ -219,6 +232,13 @@
         0, 0, 0, 0);
   }
 
+  private GrpcTxEvent txStartedEvent(String globalTxId,
+      String localTxId, String parentTxId, byte[] payloads, String compensationMethod, int reverseRetries, int reverseTimeout) {
+    return eventOf(EventType.TxStartedEvent, globalTxId, localTxId,
+        parentTxId, payloads, compensationMethod, 0, "",
+        0, 0, reverseRetries, reverseTimeout);
+  }
+
   private GrpcTxEvent txEndedEvent(String globalTxId,
       String localTxId, String parentTxId, byte[] payloads, String compensationMethod) {
     return eventOf(EventType.TxEndedEvent, globalTxId, localTxId,
@@ -247,6 +267,13 @@
         0, 0, 0, 0);
   }
 
+  public GrpcTxEvent txCompensateAckTimeoutEvent(String globalTxId,
+      String localTxId, String parentTxId) {
+    return eventOf(EventType.CompensateAckTimeoutEvent, globalTxId, localTxId,
+        parentTxId, new byte[0], "", 0, "",
+        0, 0, 0, 0);
+  }
+
   private GrpcTxEvent eventOf(EventType eventType,
       String globalTxId,
       String localTxId,
diff --git a/pack-common/src/main/java/org/apache/servicecomb/pack/common/EventType.java b/pack-common/src/main/java/org/apache/servicecomb/pack/common/EventType.java
index e585d0c..18179e3 100644
--- a/pack-common/src/main/java/org/apache/servicecomb/pack/common/EventType.java
+++ b/pack-common/src/main/java/org/apache/servicecomb/pack/common/EventType.java
@@ -28,5 +28,6 @@
   SagaTimeoutEvent,
   TxCompensateEvent,
   TxCompensateAckFailedEvent,
-  TxCompensateAckSucceedEvent
+  TxCompensateAckSucceedEvent,
+  CompensateAckTimeoutEvent
 }