blob: 017ee0e9baf4e2ad58a8b47076063074888e36a0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.servicecomb.pack.alpha.fsm;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.cluster.sharding.ShardRegion;
import akka.persistence.fsm.AbstractPersistentFSM;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.lang.invoke.MethodHandles;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.servicecomb.pack.alpha.core.AlphaException;
import org.apache.servicecomb.pack.alpha.core.fsm.SuspendedType;
import org.apache.servicecomb.pack.alpha.core.fsm.TxState;
import org.apache.servicecomb.pack.alpha.core.fsm.event.TxCompensateAckFailedEvent;
import org.apache.servicecomb.pack.alpha.core.fsm.event.TxCompensateAckSucceedEvent;
import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent;
import org.apache.servicecomb.pack.alpha.core.fsm.event.internal.CompensateAckTimeoutEvent;
import org.apache.servicecomb.pack.alpha.fsm.domain.AddTxEventDomain;
import org.apache.servicecomb.pack.alpha.fsm.domain.DomainEvent;
import org.apache.servicecomb.pack.alpha.fsm.domain.SagaEndedDomain;
import org.apache.servicecomb.pack.alpha.fsm.domain.SagaStartedDomain;
import org.apache.servicecomb.pack.alpha.fsm.domain.UpdateTxEventDomain;
import org.apache.servicecomb.pack.alpha.core.fsm.event.SagaAbortedEvent;
import org.apache.servicecomb.pack.alpha.core.fsm.event.SagaEndedEvent;
import org.apache.servicecomb.pack.alpha.core.fsm.event.SagaStartedEvent;
import org.apache.servicecomb.pack.alpha.core.fsm.event.SagaTimeoutEvent;
import org.apache.servicecomb.pack.alpha.core.fsm.event.TxAbortedEvent;
import org.apache.servicecomb.pack.alpha.core.fsm.event.internal.ComponsitedCheckEvent;
import org.apache.servicecomb.pack.alpha.core.fsm.event.TxEndedEvent;
import org.apache.servicecomb.pack.alpha.core.fsm.event.TxStartedEvent;
import org.apache.servicecomb.pack.alpha.fsm.model.SagaData;
import org.apache.servicecomb.pack.alpha.fsm.model.TxEntity;
import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SagaDataExtension;
import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SpringAkkaExtension;
import org.apache.servicecomb.pack.common.Environment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.Duration;
public class SagaActor extends
AbstractPersistentFSM<SagaActorState, SagaData, DomainEvent> {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private String persistenceId;
private long sagaBeginTime;
private long sagaEndTime;
public static Props props(String persistenceId) {
return Props.create(SagaActor.class, persistenceId);
}
public SagaActor(String persistenceId) {
if (persistenceId != null) {
this.persistenceId = persistenceId;
} else {
this.persistenceId = getSelf().path().name();
}
startWith(SagaActorState.IDLE, SagaData.builder().build());
when(SagaActorState.IDLE,
matchEvent(SagaStartedEvent.class,
(event, data) -> {
sagaBeginTime = System.currentTimeMillis();
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(context().system()).doSagaBeginCounter();
SagaStartedDomain domainEvent = new SagaStartedDomain(event);
if (event.getTimeout() > 0) {
data.setTimeout(event.getTimeout());
return goTo(SagaActorState.READY)
.applying(domainEvent)
.forMax(Duration.create(event.getTimeout(), TimeUnit.SECONDS));
} else {
return goTo(SagaActorState.READY)
.applying(domainEvent);
}
}
)
);
when(SagaActorState.READY,
matchEvent(TxStartedEvent.class, SagaData.class,
(event, data) -> {
AddTxEventDomain domainEvent = new AddTxEventDomain(event);
if (data.getExpirationTime() != null) {
return goTo(SagaActorState.PARTIALLY_ACTIVE)
.applying(domainEvent)
.forMax(Duration.create(data.getTimeout(), TimeUnit.MILLISECONDS));
} else {
return goTo(SagaActorState.PARTIALLY_ACTIVE)
.applying(domainEvent);
}
}
).event(SagaEndedEvent.class,
(event, data) -> {
SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.SUSPENDED, SuspendedType.UNPREDICTABLE);
return goTo(SagaActorState.SUSPENDED)
.applying(domainEvent);
}
).event(SagaAbortedEvent.class,
(event, data) -> {
SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.SUSPENDED, SuspendedType.UNPREDICTABLE);
return goTo(SagaActorState.SUSPENDED)
.applying(domainEvent);
}
).event(Collections.singletonList(StateTimeout()), SagaData.class,
(event, data) -> {
SagaEndedDomain domainEvent = new SagaEndedDomain(null, SagaActorState.SUSPENDED, SuspendedType.TIMEOUT);
return goTo(SagaActorState.SUSPENDED)
.applying(domainEvent);
})
);
when(SagaActorState.PARTIALLY_ACTIVE,
matchEvent(TxEndedEvent.class, SagaData.class,
(event, data) -> {
UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event);
if (data.getExpirationTime() != null) {
return goTo(SagaActorState.PARTIALLY_COMMITTED)
.applying(domainEvent)
.forMax(Duration.create(data.getTimeout(), TimeUnit.MILLISECONDS));
} else {
return goTo(SagaActorState.PARTIALLY_COMMITTED)
.applying(domainEvent);
}
}
).event(TxStartedEvent.class,
(event, data) -> {
AddTxEventDomain domainEvent = new AddTxEventDomain(event);
if (data.getExpirationTime() != null) {
return stay()
.applying(domainEvent)
.forMax(Duration.create(data.getTimeout(), TimeUnit.MILLISECONDS));
} else {
return stay().applying(domainEvent);
}
}
).event(SagaTimeoutEvent.class,
(event, data) -> {
SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.SUSPENDED,
SuspendedType.TIMEOUT);
return goTo(SagaActorState.SUSPENDED)
.applying(domainEvent);
}
).event(TxAbortedEvent.class,
(event, data) -> {
UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event);
return goTo(SagaActorState.FAILED)
.applying(domainEvent);
}
).event(Collections.singletonList(StateTimeout()), SagaData.class,
(event, data) -> {
SagaEndedDomain domainEvent = new SagaEndedDomain(null, SagaActorState.SUSPENDED, SuspendedType.TIMEOUT);
return goTo(SagaActorState.SUSPENDED).applying(domainEvent);
})
);
when(SagaActorState.PARTIALLY_COMMITTED,
matchEvent(TxStartedEvent.class,
(event, data) -> {
AddTxEventDomain domainEvent = new AddTxEventDomain(event);
if (data.getExpirationTime() != null) {
return goTo(SagaActorState.PARTIALLY_ACTIVE)
.applying(domainEvent)
.forMax(Duration.create(data.getTimeout(), TimeUnit.MILLISECONDS));
} else {
return goTo(SagaActorState.PARTIALLY_ACTIVE)
.applying(domainEvent);
}
}
).event(TxEndedEvent.class,
(event, data) -> {
UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event);
if (data.getExpirationTime() != null) {
return stay()
.applying(domainEvent)
.forMax(Duration.create(data.getTimeout(), TimeUnit.MILLISECONDS));
} else {
return stay().applying(domainEvent);
}
}
).event(SagaTimeoutEvent.class,
(event, data) -> {
SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.SUSPENDED, SuspendedType.TIMEOUT);
return goTo(SagaActorState.SUSPENDED)
.applying(domainEvent);
}
).event(SagaEndedEvent.class,
(event, data) -> {
SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.COMMITTED);
return goTo(SagaActorState.COMMITTED)
.applying(domainEvent);
}
).event(SagaAbortedEvent.class,
(event, data) -> {
SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.FAILED);
return goTo(SagaActorState.FAILED).applying(domainEvent);
}
).event(TxAbortedEvent.class,
(event, data) -> {
UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event);
return goTo(SagaActorState.FAILED).applying(domainEvent);
}
).event(Collections.singletonList(StateTimeout()), SagaData.class,
(event, data) -> {
SagaEndedDomain domainEvent = new SagaEndedDomain(null, SagaActorState.SUSPENDED, SuspendedType.TIMEOUT);
return goTo(SagaActorState.SUSPENDED).applying(domainEvent);
})
);
when(SagaActorState.FAILED,
matchEvent(SagaTimeoutEvent.class, SagaData.class,
(event, data) -> {
SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.SUSPENDED, SuspendedType.TIMEOUT);
return goTo(SagaActorState.SUSPENDED)
.applying(domainEvent);
}
).event(TxCompensateAckSucceedEvent.class, SagaData.class,
(event, data) -> {
UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event);
return stay().applying(domainEvent).andThen(exec(_data -> {
self().tell(ComponsitedCheckEvent.builder().build(), self());
}));
}
).event(TxCompensateAckFailedEvent.class, SagaData.class,
(event, data) -> {
UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event);
return stay().applying(domainEvent).andThen(exec(_data -> {
self().tell(ComponsitedCheckEvent.builder().build(), self());
}));
}
).event(CompensateAckTimeoutEvent.class, SagaData.class,
(event, data) -> {
UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event);
return stay().applying(domainEvent).andThen(exec(_data -> {
self().tell(ComponsitedCheckEvent.builder().build(), self());
}));
}
).event(ComponsitedCheckEvent.class, SagaData.class,
(event, data) -> {
if (data.getTxEntities().hasCompensationSentTx() || !data.isTerminated()) {
return stay();
} else {
if(data.getSuspendedType() == SuspendedType.COMPENSATE_FAILED) {
SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.SUSPENDED, SuspendedType.COMPENSATE_FAILED);
return goTo(SagaActorState.SUSPENDED).applying(domainEvent);
} else {
SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.COMPENSATED);
return goTo(SagaActorState.COMPENSATED).applying(domainEvent);
}
}
}
).event(SagaAbortedEvent.class, SagaData.class,
(event, data) -> {
data.setTerminated(true);
if (data.getTxEntities().hasCommittedTx()) {
SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.FAILED);
return stay()
.applying(domainEvent);
} else if (data.getTxEntities().hasCompensationSentTx()) {
SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.FAILED);
return stay()
.applying(domainEvent);
} else {
SagaEndedDomain domainEvent = new SagaEndedDomain(event,
SagaActorState.COMPENSATED);
return goTo(SagaActorState.COMPENSATED)
.applying(domainEvent);
}
}
).event(TxStartedEvent.class, SagaData.class,
(event, data) -> {
AddTxEventDomain domainEvent = new AddTxEventDomain(event);
return stay().applying(domainEvent);
}
).event(TxEndedEvent.class, SagaData.class,
(event, data) -> {
UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event);
return stay().applying(domainEvent).andThen(exec(_data -> {
TxEntity txEntity = _data.getTxEntities().get(event.getLocalTxId());
// call compensate
compensation(txEntity, _data);
}));
}
).event(Arrays.asList(StateTimeout()), SagaData.class,
(event, data) -> {
SagaEndedDomain domainEvent = new SagaEndedDomain(SagaActorState.SUSPENDED, SuspendedType.TIMEOUT);
return goTo(SagaActorState.SUSPENDED)
.applying(domainEvent);
})
);
when(SagaActorState.COMMITTED,
matchEvent(org.apache.servicecomb.pack.alpha.core.fsm.event.internal.StopEvent.class,
(event, data) -> {
beforeStop(event, stateName(), data);
return stop();
}
)
);
when(SagaActorState.SUSPENDED,
matchEvent(org.apache.servicecomb.pack.alpha.core.fsm.event.internal.StopEvent.class,
(event, data) -> {
beforeStop(event, stateName(), data);
return stop();
}
)
);
when(SagaActorState.COMPENSATED,
matchEvent(org.apache.servicecomb.pack.alpha.core.fsm.event.internal.StopEvent.class,
(event, data) -> {
beforeStop(event, stateName(), data);
return stop();
}
)
);
whenUnhandled(
matchAnyEvent((event, data) -> {
if (event instanceof BaseEvent){
LOG.error("Unhandled event {}", event);
}
return stay();
})
);
onTransition(
matchState(null, null, (from, to) -> {
if (stateData().getGlobalTxId() != null) {
stateData().setLastState(to);
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(getContext().getSystem())
.putSagaData(stateData().getGlobalTxId(), stateData());
}
if (LOG.isDebugEnabled()) {
LOG.debug("transition [{}] {} -> {}", stateData().getGlobalTxId(), from, to);
}
if (to == SagaActorState.COMMITTED ||
to == SagaActorState.SUSPENDED ||
to == SagaActorState.COMPENSATED) {
self().tell(org.apache.servicecomb.pack.alpha.core.fsm.event.internal.StopEvent.builder().build(), self());
}
})
);
onTermination(
matchStop(
Normal(), (state, data) -> {
LOG.info("stopped [{}] {}", data.getGlobalTxId(), state);
}
)
);
}
private void beforeStop(BaseEvent event, SagaActorState state, SagaData data){
if (LOG.isDebugEnabled()) {
LOG.debug("stop [{}] {}", data.getGlobalTxId(), state);
}
try{
sagaEndTime = System.currentTimeMillis();
data.setLastState(state);
data.setEndTime(new Date());
data.setTerminated(true);
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(getContext().getSystem())
.stopSagaData(data.getGlobalTxId(), data);
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(context().system()).doSagaEndCounter();
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(context().system())
.doSagaAvgTime(sagaEndTime - sagaBeginTime);
// destroy self from cluster shard region
getContext().getParent()
.tell(new ShardRegion.Passivate(PoisonPill.getInstance()), getSelf());
// clear self mailbox from persistence
// 已经停止的Actor使用以下两个命令清理,但是 highestSequenceNr 不会被删除,需要手工清理
// 以下基于 journal-redis 说明:
// 假设 globalTxId=ed2cdb9c-e86c-4b01-9f43-8e34704e7694, 那么在 Redis 中会生成三个 key
// journal:persistenceIds
// journal:persisted:ed2cdb9c-e86c-4b01-9f43-8e34704e7694
// journal:persisted:ed2cdb9c-e86c-4b01-9f43-8e34704e7694:highestSequenceNr
//
// 1. journal:persistenceIds 是 set 类型, 记录了所有的 globalTxId, 使用 smembers journal:persistenceIds 可以看到
// 2. journal:persisted:ed2cdb9c-e86c-4b01-9f43-8e34704e7694 是 zset 类型, 记录了这个事务的所有事件
// 使用 zrange journal:persisted:ed2cdb9c-e86c-4b01-9f43-8e34704e7694 1 -1 可以看到
// 3. journal:persisted:ed2cdb9c-e86c-4b01-9f43-8e34704e7694:highestSequenceNr 是 string 类型, 里面记录这序列号
//
// 何如清理:
// 通过 deleteMessages 和 deleteSnapshot 可以清理部分数据,但是 highestSequenceNr 还是无法自动删除,需要定期手动清理
// 遍历 journal:persistenceIds 集合,用每一条数据item拼接成key journal:persisted:item 和 journal:persisted:item:highestSequenceNr
// 如果没有成对出现就说明是已经终止的actor 那么可以将 journal:persisted:item 从 journal:persistenceIds 删除
// 并删除 journal:persisted:item:highestSequenceNr
//
// 目前可以看到的解释是 https://github.com/akka/akka/issues/21181
//
// Lua script akka-persistence-redis-clean.lua
// local ids = redis.call('smembers','journal:persistenceIds');
// local delkeys = {};
// for k, v in pairs(ids) do
// local jpid = 'journal:persisted:' .. v;
// local jpidnr = 'journal:persisted:' .. v .. ':highestSequenceNr';
// local hasjpid = redis.call('exists',jpid);
// if(hasjpid == 0)
// then
// local hasjpidnr = redis.call('exists',jpidnr);
// if(hasjpidnr == 1)
// then
// redis.call('del',jpidnr);
// table.insert(delkeys,jpid);
// end
// end
// end
// return delkeys;
deleteMessages(lastSequenceNr());
deleteSnapshot(snapshotSequenceNr());
}catch(Exception e){
LOG.error("stop [{}] fail",data.getGlobalTxId());
throw e;
}
}
@Override
public SagaData applyEvent(DomainEvent event, SagaData data) {
try{
if (this.recoveryRunning()) {
LOG.info("recovery {}",event.getEvent());
}else if (LOG.isDebugEnabled()) {
LOG.debug("persistence {}", event.getEvent());
}
if (event.getEvent() != null && !(event
.getEvent() instanceof ComponsitedCheckEvent)) {
data.logEvent(event.getEvent());
}
if (event instanceof SagaStartedDomain) {
SagaStartedDomain domainEvent = (SagaStartedDomain) event;
data.setServiceName(domainEvent.getEvent().getServiceName());
data.setInstanceId(domainEvent.getEvent().getInstanceId());
data.setGlobalTxId(domainEvent.getEvent().getGlobalTxId());
data.setBeginTime(domainEvent.getEvent().getCreateTime());
data.setExpirationTime(domainEvent.getExpirationTime());
} else if (event instanceof AddTxEventDomain) {
AddTxEventDomain domainEvent = (AddTxEventDomain) event;
if (!data.getTxEntities().exists(domainEvent.getEvent().getLocalTxId())) {
TxEntity txEntity = TxEntity.builder()
.serviceName(domainEvent.getEvent().getServiceName())
.instanceId(domainEvent.getEvent().getInstanceId())
.globalTxId(domainEvent.getEvent().getGlobalTxId())
.localTxId(domainEvent.getEvent().getLocalTxId())
.parentTxId(domainEvent.getEvent().getParentTxId())
.compensationMethod(domainEvent.getCompensationMethod())
.payloads(domainEvent.getPayloads())
.state(domainEvent.getState())
.reverseRetries(domainEvent.getReverseRetries())
.reverseTimeout(domainEvent.getReverseTimeout())
.retryDelayInMilliseconds(domainEvent.getRetryDelayInMilliseconds())
.beginTime(domainEvent.getEvent().getCreateTime())
.build();
data.getTxEntities().put(txEntity.getLocalTxId(), txEntity);
} else {
LOG.warn("TxEntity {} already exists", domainEvent.getEvent().getLocalTxId());
}
} else if (event instanceof UpdateTxEventDomain) {
UpdateTxEventDomain domainEvent = (UpdateTxEventDomain) event;
TxEntity txEntity = data.getTxEntities().get(domainEvent.getLocalTxId());
txEntity.setEndTime(domainEvent.getEvent().getCreateTime());
if (domainEvent.getState() == TxState.COMMITTED) {
txEntity.setState(domainEvent.getState());
} else if (domainEvent.getState() == TxState.FAILED) {
txEntity.setState(domainEvent.getState());
txEntity.setThrowablePayLoads(domainEvent.getThrowablePayLoads());
data.getTxEntities().forEachReverse((k, v) -> {
if (v.getState() == TxState.COMMITTED) {
// call compensate
compensation(v, data);
}
});
} else if (domainEvent.getState() == TxState.COMPENSATED_SUCCEED) {
// decrement the compensation running counter by one
data.getCompensationRunningCounter().decrementAndGet();
txEntity.setState(domainEvent.getState());
LOG.info("compensation is succeed {}", txEntity.getLocalTxId());
} else if (domainEvent.getState() == TxState.COMPENSATED_FAILED) {
data.getCompensationRunningCounter().decrementAndGet();
txEntity.setState(domainEvent.getState());
txEntity.setThrowablePayLoads(domainEvent.getThrowablePayLoads());
LOG.info("compensation is failed {}", txEntity.getLocalTxId());
}
} else if (event instanceof SagaEndedDomain) {
SagaEndedDomain domainEvent = (SagaEndedDomain) event;
if (domainEvent.getState() == SagaActorState.FAILED) {
data.setTerminated(true);
data.getTxEntities().forEachReverse((k, v) -> {
if (v.getState() == TxState.COMMITTED) {
// call compensate
compensation(v, data);
}
});
} else if (domainEvent.getState() == SagaActorState.SUSPENDED) {
data.setEndTime(event.getEvent() != null ? event.getEvent().getCreateTime() : new Date());
data.setTerminated(true);
data.setSuspendedType(domainEvent.getSuspendedType());
} else if (domainEvent.getState() == SagaActorState.COMPENSATED) {
data.setEndTime(event.getEvent() != null ? event.getEvent().getCreateTime() : new Date());
data.setTerminated(true);
} else if (domainEvent.getState() == SagaActorState.COMMITTED) {
data.setEndTime(event.getEvent() != null ? event.getEvent().getCreateTime() : new Date());
data.setTerminated(true);
}
}
}catch (Exception ex){
LOG.error("apply {}", event.getEvent(), ex);
LOG.error(ex.getMessage(), ex);
beforeStop(event.getEvent(), SagaActorState.SUSPENDED, data);
stop();
//TODO 增加 SagaActor 处理失败指标
}
return data;
}
@Override
public void onRecoveryCompleted() {
if(stateName() != SagaActorState.IDLE){
LOG.info("recovery completed [{}] state={}", stateData().getGlobalTxId(), stateName());
}
}
@Override
public Class domainEventClass() {
return DomainEvent.class;
}
@Override
public String persistenceId() {
return persistenceId;
}
//call omega compensate method
private void compensation(TxEntity txEntity, SagaData data) {
// increments the compensation running counter by one
data.getCompensationRunningCounter().incrementAndGet();
txEntity.setState(TxState.COMPENSATION_SENT);
try {
SpringAkkaExtension.SPRING_EXTENSION_PROVIDER.get(context().system()).compensate(txEntity);
LOG.info("compensate {} {} {}", txEntity.getServiceName(), txEntity.getInstanceId(), txEntity.getLocalTxId());
} catch (AlphaException ex) {
LOG.error(ex.getMessage(), ex);
try {
Thread.sleep(txEntity.getRetryDelayInMilliseconds());
} catch (InterruptedException e) {
LOG.error(e.getMessage(), e);
}
compensation(txEntity, data);
} catch (Exception ex) {
if (ex instanceof TimeoutException) {
StringWriter writer = new StringWriter();
ex.printStackTrace(new PrintWriter(writer));
String stackTrace = writer.toString();
if (stackTrace.length() > Environment.getInstance().getPayloadsMaxLength()) {
stackTrace = stackTrace.substring(0, Environment.getInstance().getPayloadsMaxLength());
}
CompensateAckTimeoutEvent event = CompensateAckTimeoutEvent.builder()
.createTime(new Date(System.currentTimeMillis()))
.globalTxId(txEntity.getGlobalTxId())
.parentTxId(txEntity.getParentTxId())
.localTxId(txEntity.getLocalTxId())
.serviceName(txEntity.getServiceName())
.instanceId(txEntity.getInstanceId())
.payloads(stackTrace.getBytes())
.build();
self().tell(event, self());
}
LOG.error("compensation failed " + txEntity.getLocalTxId(), ex);
if (txEntity.getReverseRetries() > 0) {
// which means the retry number
if (txEntity.getRetriesCounter().incrementAndGet() < txEntity.getReverseRetries()) {
LOG.info("Retry compensate {}/{} after {} ms", txEntity.getRetriesCounter().get() + 1, txEntity.getReverseRetries(),
txEntity.getRetryDelayInMilliseconds());
try {
Thread.sleep(txEntity.getRetryDelayInMilliseconds());
} catch (InterruptedException e) {
LOG.error(e.getMessage(), e);
}
compensation(txEntity, data);
} else {
data.setSuspendedType(SuspendedType.COMPENSATE_FAILED);
}
} else {
data.setSuspendedType(SuspendedType.COMPENSATE_FAILED);
}
}
}
}