SCB-1735 Catch the compensation method timeout exception and send a CompensateAckTimeoutEvent event and record
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/pack/alpha/core/fsm/event/internal/CompensateAckTimeoutEvent.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/pack/alpha/core/fsm/event/internal/CompensateAckTimeoutEvent.java
new file mode 100644
index 0000000..d21503b
--- /dev/null
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/pack/alpha/core/fsm/event/internal/CompensateAckTimeoutEvent.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.alpha.core.fsm.event.internal;
+
+import java.util.Date;
+import org.apache.servicecomb.pack.alpha.core.fsm.event.base.TxEvent;
+
+public class CompensateAckTimeoutEvent extends TxEvent {
+
+ private byte[] payloads;
+
+ public byte[] getPayloads() {
+ return payloads;
+ }
+
+ public void setPayloads(byte[] payloads) {
+ this.payloads = payloads;
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static final class Builder {
+
+ private CompensateAckTimeoutEvent txCompensatedEvent;
+
+ private Builder() {
+ txCompensatedEvent = new CompensateAckTimeoutEvent();
+ }
+
+ public Builder serviceName(String serviceName) {
+ txCompensatedEvent.setServiceName(serviceName);
+ return this;
+ }
+
+ public Builder instanceId(String instanceId) {
+ txCompensatedEvent.setInstanceId(instanceId);
+ return this;
+ }
+
+ public Builder parentTxId(String parentTxId) {
+ txCompensatedEvent.setParentTxId(parentTxId);
+ return this;
+ }
+
+ public Builder localTxId(String localTxId) {
+ txCompensatedEvent.setLocalTxId(localTxId);
+ return this;
+ }
+
+ public Builder globalTxId(String globalTxId) {
+ txCompensatedEvent.setGlobalTxId(globalTxId);
+ return this;
+ }
+
+ public Builder createTime(Date createTime){
+ txCompensatedEvent.setCreateTime(createTime);
+ return this;
+ }
+
+ public Builder payloads(byte[] payloads){
+ txCompensatedEvent.setPayloads(payloads);
+ return this;
+ }
+
+ public CompensateAckTimeoutEvent build() {
+ return txCompensatedEvent;
+ }
+ }
+}
\ No newline at end of file
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
index a7c74e2..5e8aee8 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
@@ -21,17 +21,21 @@
import akka.actor.Props;
import akka.cluster.sharding.ShardRegion;
import akka.persistence.fsm.AbstractPersistentFSM;
+import java.io.PrintWriter;
+import java.io.StringWriter;
import java.lang.invoke.MethodHandles;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.servicecomb.pack.alpha.core.AlphaException;
import org.apache.servicecomb.pack.alpha.core.fsm.SuspendedType;
import org.apache.servicecomb.pack.alpha.core.fsm.TxState;
import org.apache.servicecomb.pack.alpha.core.fsm.event.TxCompensateAckFailedEvent;
import org.apache.servicecomb.pack.alpha.core.fsm.event.TxCompensateAckSucceedEvent;
import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent;
+import org.apache.servicecomb.pack.alpha.core.fsm.event.internal.CompensateAckTimeoutEvent;
import org.apache.servicecomb.pack.alpha.fsm.domain.AddTxEventDomain;
import org.apache.servicecomb.pack.alpha.fsm.domain.DomainEvent;
import org.apache.servicecomb.pack.alpha.fsm.domain.SagaEndedDomain;
@@ -55,7 +59,7 @@
public class SagaActor extends
AbstractPersistentFSM<SagaActorState, SagaData, DomainEvent> {
-
+ protected static final int PAYLOADS_MAX_LENGTH = 10240;
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private String persistenceId;
private long sagaBeginTime;
@@ -245,6 +249,13 @@
self().tell(ComponsitedCheckEvent.builder().build(), self());
}));
}
+ ).event(CompensateAckTimeoutEvent.class, SagaData.class,
+ (event, data) -> {
+ UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event);
+ return stay().applying(domainEvent).andThen(exec(_data -> {
+ self().tell(ComponsitedCheckEvent.builder().build(), self());
+ }));
+ }
).event(ComponsitedCheckEvent.class, SagaData.class,
(event, data) -> {
if (data.getTxEntities().hasCompensationSentTx() || !data.isTerminated()) {
@@ -485,15 +496,16 @@
compensation(v, data);
}
});
- } else if (domainEvent.getState() == TxState.COMPENSATED_SUCCEED || domainEvent.getState() == TxState.COMPENSATED_FAILED) {
+ } else if (domainEvent.getState() == TxState.COMPENSATED_SUCCEED) {
// decrement the compensation running counter by one
data.getCompensationRunningCounter().decrementAndGet();
txEntity.setState(domainEvent.getState());
- if (domainEvent.getState() == TxState.COMPENSATED_SUCCEED) {
- LOG.info("compensation is succeed {}", txEntity.getLocalTxId());
- } else {
- LOG.info("compensation is failed {}", txEntity.getLocalTxId());
- }
+ LOG.info("compensation is succeed {}", txEntity.getLocalTxId());
+ } else if (domainEvent.getState() == TxState.COMPENSATED_FAILED) {
+ data.getCompensationRunningCounter().decrementAndGet();
+ txEntity.setState(domainEvent.getState());
+ txEntity.setThrowablePayLoads(domainEvent.getThrowablePayLoads());
+ LOG.info("compensation is failed {}", txEntity.getLocalTxId());
}
} else if (event instanceof SagaEndedDomain) {
SagaEndedDomain domainEvent = (SagaEndedDomain) event;
@@ -562,6 +574,24 @@
}
compensation(txEntity, data);
} catch (Exception ex) {
+ if (ex instanceof TimeoutException) {
+ StringWriter writer = new StringWriter();
+ ex.printStackTrace(new PrintWriter(writer));
+ String stackTrace = writer.toString();
+ if (stackTrace.length() > PAYLOADS_MAX_LENGTH) {
+ stackTrace = stackTrace.substring(0, PAYLOADS_MAX_LENGTH);
+ }
+ CompensateAckTimeoutEvent event = CompensateAckTimeoutEvent.builder()
+ .createTime(new Date(System.currentTimeMillis()))
+ .globalTxId(txEntity.getGlobalTxId())
+ .parentTxId(txEntity.getParentTxId())
+ .localTxId(txEntity.getLocalTxId())
+ .serviceName(txEntity.getServiceName())
+ .instanceId(txEntity.getInstanceId())
+ .payloads(stackTrace.getBytes())
+ .build();
+ self().tell(event, self());
+ }
LOG.error("compensation failed " + txEntity.getLocalTxId(), ex);
if (txEntity.getReverseRetries() > 0) {
// which means the retry number
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/UpdateTxEventDomain.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/UpdateTxEventDomain.java
index 79d5dc0..24877f7 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/UpdateTxEventDomain.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/UpdateTxEventDomain.java
@@ -21,6 +21,7 @@
import org.apache.servicecomb.pack.alpha.core.fsm.event.TxAbortedEvent;
import org.apache.servicecomb.pack.alpha.core.fsm.event.TxCompensateAckFailedEvent;
import org.apache.servicecomb.pack.alpha.core.fsm.event.TxCompensateAckSucceedEvent;
+import org.apache.servicecomb.pack.alpha.core.fsm.event.internal.CompensateAckTimeoutEvent;
import org.apache.servicecomb.pack.alpha.core.fsm.event.TxEndedEvent;
import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent;
@@ -60,6 +61,14 @@
this.state = TxState.COMPENSATED_FAILED;
}
+ public UpdateTxEventDomain(CompensateAckTimeoutEvent event) {
+ this.event = event;
+ this.parentTxId = event.getParentTxId();
+ this.localTxId = event.getLocalTxId();
+ this.throwablePayLoads = event.getPayloads();
+ this.state = TxState.COMPENSATED_FAILED;
+ }
+
public String getParentTxId() {
return parentTxId;
}