SCB-1696 Added property retryDelayInMilliseconds to TxStartedEvent and used as reverse compensation retry interval
diff --git a/alpha/alpha-benchmark/src/main/java/org/apache/servicecomb/pack/alpha/benchmark/SagaEventBenchmark.java b/alpha/alpha-benchmark/src/main/java/org/apache/servicecomb/pack/alpha/benchmark/SagaEventBenchmark.java
index ebb1856..1ab6c74 100644
--- a/alpha/alpha-benchmark/src/main/java/org/apache/servicecomb/pack/alpha/benchmark/SagaEventBenchmark.java
+++ b/alpha/alpha-benchmark/src/main/java/org/apache/servicecomb/pack/alpha/benchmark/SagaEventBenchmark.java
@@ -193,27 +193,27 @@
List<TxEvent> sagaEvents = new ArrayList<>();
sagaEvents.add(
new TxEvent(EventType.SagaStartedEvent, globalTxId, globalTxId, globalTxId, "", 0, null,
- 0, 0, 0, 0));
+ 0, 0, 0, 0, 0));
sagaEvents.add(
new TxEvent(EventType.TxStartedEvent, globalTxId, localTxId_1, globalTxId, "service a", 0,
- null, 0, 0, 0, 0));
+ null, 0, 0, 0, 0, 0));
sagaEvents.add(
new TxEvent(EventType.TxEndedEvent, globalTxId, localTxId_1, globalTxId, "service a", 0,
- null, 0, 0, 0, 0));
+ null, 0, 0, 0, 0, 0));
sagaEvents.add(
new TxEvent(EventType.TxStartedEvent, globalTxId, localTxId_2, globalTxId, "service b", 0,
- null, 0, 0, 0, 0));
+ null, 0, 0, 0, 0, 0));
sagaEvents.add(
new TxEvent(EventType.TxEndedEvent, globalTxId, localTxId_2, globalTxId, "service b", 0,
- null, 0, 0, 0, 0));
+ null, 0, 0, 0, 0, 0));
sagaEvents.add(
new TxEvent(EventType.TxStartedEvent, globalTxId, localTxId_3, globalTxId, "service c", 0,
- null, 0, 0, 0, 0));
+ null, 0, 0, 0, 0, 0));
sagaEvents.add(
new TxEvent(EventType.TxEndedEvent, globalTxId, localTxId_3, globalTxId, "service c", 0,
- null, 0, 0, 0, 0));
+ null, 0, 0, 0, 0, 0));
sagaEvents.add(
- new TxEvent(EventType.SagaEndedEvent, globalTxId, globalTxId, globalTxId, "", 0, null, 0, 0, 0, 0));
+ new TxEvent(EventType.SagaEndedEvent, globalTxId, globalTxId, globalTxId, "", 0, null, 0, 0, 0, 0, 0));
return sagaEvents;
}
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/pack/alpha/core/fsm/event/TxStartedEvent.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/pack/alpha/core/fsm/event/TxStartedEvent.java
index b2f13ac..d1fc68d 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/pack/alpha/core/fsm/event/TxStartedEvent.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/pack/alpha/core/fsm/event/TxStartedEvent.java
@@ -28,6 +28,7 @@
private int forwardTimeout;
private int reverseRetries;
private int reverseTimeout;
+ private int retryDelayInMilliseconds;
public String getCompensationMethod() {
return compensationMethod;
@@ -85,6 +86,14 @@
this.reverseTimeout = reverseTimeout;
}
+ public int getRetryDelayInMilliseconds() {
+ return retryDelayInMilliseconds;
+ }
+
+ public void setRetryDelayInMilliseconds(int retryDelayInMilliseconds) {
+ this.retryDelayInMilliseconds = retryDelayInMilliseconds;
+ }
+
public static Builder builder() {
return new Builder();
}
@@ -157,6 +166,11 @@
return this;
}
+ public Builder retryDelayInMilliseconds(int retryDelayInMilliseconds) {
+ txStartedEvent.setRetryDelayInMilliseconds(retryDelayInMilliseconds);
+ return this;
+ }
+
public Builder createTime(Date createTime){
txStartedEvent.setCreateTime(createTime);
return this;
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
index a1a8d08..7c6eee6 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
@@ -538,27 +538,27 @@
} catch (AlphaException ex) {
LOG.error(ex.getMessage(), ex);
try {
- Thread.sleep(1000);
+ Thread.sleep(txEntity.getRetryDelayInMilliseconds());
} catch (InterruptedException e) {
LOG.error(e.getMessage(), e);
}
compensation(txEntity, data);
} catch (Exception ex) {
LOG.error("compensation failed " + txEntity.getLocalTxId(), ex);
- if (txEntity.getRetries() > 0) {
+ if (txEntity.getReverseRetries() > 0) {
// which means the retry number
- if (txEntity.getRetriesCounter().incrementAndGet() < txEntity.getRetries()) {
+ if (txEntity.getRetriesCounter().incrementAndGet() < txEntity.getReverseRetries()) {
try {
- Thread.sleep(1000);
+ Thread.sleep(txEntity.getRetryDelayInMilliseconds());
} catch (InterruptedException e) {
LOG.error(e.getMessage(), e);
}
compensation(txEntity, data);
}
- } else if (txEntity.getRetries() == -1) {
+ } else if (txEntity.getReverseRetries() == -1) {
// which means retry it until succeed
try {
- Thread.sleep(1000);
+ Thread.sleep(txEntity.getRetryDelayInMilliseconds());
} catch (InterruptedException e) {
LOG.error(e.getMessage(), e);
}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/AddTxEventDomain.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/AddTxEventDomain.java
index 17b0d86..f7ea638 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/AddTxEventDomain.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/AddTxEventDomain.java
@@ -23,7 +23,9 @@
public class AddTxEventDomain implements DomainEvent {
private TxState state = TxState.ACTIVE;
- private int retries;
+ private int reverseRetries;
+ private int reverseTimeout;
+ private int retryDelayInMilliseconds;
private String compensationMethod;
private byte[] payloads;
private BaseEvent event;
@@ -32,7 +34,9 @@
this.event = event;
this.compensationMethod = event.getCompensationMethod();
this.payloads = event.getPayloads();
- this.retries = event.getForwardRetries();
+ this.reverseRetries = event.getReverseRetries();
+ this.reverseTimeout = event.getReverseTimeout();
+ this.retryDelayInMilliseconds = event.getRetryDelayInMilliseconds();
}
public TxState getState() {
@@ -47,12 +51,28 @@
this.compensationMethod = compensationMethod;
}
- public int getRetries() {
- return retries;
+ public int getReverseRetries() {
+ return reverseRetries;
}
- public void setRetries(int retries) {
- this.retries = retries;
+ public void setReverseRetries(int reverseRetries) {
+ this.reverseRetries = reverseRetries;
+ }
+
+ public int getReverseTimeout() {
+ return reverseTimeout;
+ }
+
+ public void setReverseTimeout(int reverseTimeout) {
+ this.reverseTimeout = reverseTimeout;
+ }
+
+ public int getRetryDelayInMilliseconds() {
+ return retryDelayInMilliseconds;
+ }
+
+ public void setRetryDelayInMilliseconds(int retryDelayInMilliseconds) {
+ this.retryDelayInMilliseconds = retryDelayInMilliseconds;
}
public byte[] getPayloads() {
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/TxEntity.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/TxEntity.java
index 5fd6430..8c2ea04 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/TxEntity.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/TxEntity.java
@@ -34,7 +34,9 @@
private String compensationMethod;
private byte[] payloads;
private byte[] throwablePayLoads;
- private int retries;
+ private int reverseRetries;
+ private int reverseTimeout;
+ private int retryDelayInMilliseconds = 5;
private AtomicInteger retriesCounter = new AtomicInteger();
public String getServiceName() {
@@ -125,12 +127,28 @@
this.throwablePayLoads = throwablePayLoads;
}
- public int getRetries() {
- return retries;
+ public int getReverseRetries() {
+ return reverseRetries;
}
- public void setRetries(int retries) {
- this.retries = retries;
+ public void setReverseRetries(int reverseRetries) {
+ this.reverseRetries = reverseRetries;
+ }
+
+ public int getReverseTimeout() {
+ return reverseTimeout;
+ }
+
+ public void setReverseTimeout(int reverseTimeout) {
+ this.reverseTimeout = reverseTimeout;
+ }
+
+ public int getRetryDelayInMilliseconds() {
+ return retryDelayInMilliseconds;
+ }
+
+ public void setRetryDelayInMilliseconds(int retryDelayInMilliseconds) {
+ this.retryDelayInMilliseconds = retryDelayInMilliseconds;
}
public AtomicInteger getRetriesCounter() {
@@ -204,8 +222,18 @@
return this;
}
- public Builder retries(int retries) {
- txEntity.setRetries(retries);
+ public Builder reverseRetries(int reverseRetries) {
+ txEntity.setReverseRetries(reverseRetries);
+ return this;
+ }
+
+ public Builder reverseTimeout(int reverseTimeout) {
+ txEntity.setReverseTimeout(reverseTimeout);
+ return this;
+ }
+
+ public Builder retryDelayInMilliseconds(int retryDelayInMilliseconds) {
+ txEntity.setRetryDelayInMilliseconds(retryDelayInMilliseconds);
return this;
}
diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/RetryableMessageSenderTest.java b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/RetryableMessageSenderTest.java
index cffd839..69cbe08 100644
--- a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/RetryableMessageSenderTest.java
+++ b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/RetryableMessageSenderTest.java
@@ -45,7 +45,7 @@
private final String localTxId = uniquify("localTxId");
private final TxStartedEvent event = new TxStartedEvent(globalTxId, localTxId, null, "method x",
- 0, null, 0, 0, 0, 0);
+ 0, null, 0, 0, 0, 0, 0);
@Test
public void sendEventWhenSenderIsAvailable() {
diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/SagaLoadBalancedSenderTest.java b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/SagaLoadBalancedSenderTest.java
index d8bb453..02cef1a 100644
--- a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/SagaLoadBalancedSenderTest.java
+++ b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/SagaLoadBalancedSenderTest.java
@@ -305,7 +305,7 @@
assertThat(messageSender.send(event).aborted(), is(false));
TxEvent rejectEvent = new TxStartedEvent(globalTxId, localTxId, parentTxId, "reject", 0, "", 0,
- 0, 0, 0, "blah");
+ 0, 0, 0, 0, "blah");
assertThat(messageSender.send(rejectEvent).aborted(), is(true));
}
diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/SagaLoadBalancedSenderTestBase.java b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/SagaLoadBalancedSenderTestBase.java
index 32a3493..06bbc0d 100644
--- a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/SagaLoadBalancedSenderTestBase.java
+++ b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/SagaLoadBalancedSenderTestBase.java
@@ -74,7 +74,7 @@
protected final String compensationMethod = getClass().getCanonicalName();
protected final TxEvent event = new TxEvent(EventType.TxStartedEvent, globalTxId, localTxId, parentTxId,
- compensationMethod, 0, "", 0, 0, 0, 0, "blah");
+ compensationMethod, 0, "", 0, 0, 0, 0, 0, "blah");
protected final String serviceName = uniquify("serviceName");
@@ -177,6 +177,7 @@
request.getForwardTimeout(),
request.getReverseRetries(),
request.getReverseTimeout(),
+ request.getRetryDelayInMilliseconds(),
new String(request.getPayloads().toByteArray())));
sleep();
diff --git a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/pack/omega/transaction/spring/TransactionInterceptionTest.java b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/pack/omega/transaction/spring/TransactionInterceptionTest.java
index 39d2f56..16ed116 100644
--- a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/pack/omega/transaction/spring/TransactionInterceptionTest.java
+++ b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/pack/omega/transaction/spring/TransactionInterceptionTest.java
@@ -30,7 +30,6 @@
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
import static org.mockito.Mockito.when;
import akka.actor.AbstractLoggingActor;
@@ -139,7 +138,7 @@
assertArrayEquals(
new String[] {
new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, "", 0,
- 0, 0, 0, user).toString(),
+ 0, 0, 0, 0, user).toString(),
new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString()},
toArray(messages)
);
@@ -161,7 +160,7 @@
assertArrayEquals(
new String[] {
new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, "", 0,
- 0, 0, 0, illegalUser).toString(),
+ 0, 0, 0, 0, illegalUser).toString(),
new TxAbortedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, throwable).toString()},
toArray(messages)
);
@@ -184,10 +183,10 @@
assertArrayEquals(
new String[] {
new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, "", 0,
- 0, 0, 0, user).toString(),
+ 0, 0, 0, 0, user).toString(),
new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString(),
new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod, 0, "", 0,
- 0, 0, 0, anotherUser).toString(),
+ 0, 0, 0, 0, anotherUser).toString(),
new TxEndedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod).toString(),
new TxCompensatedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString(),
new TxCompensatedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod).toString()
@@ -234,11 +233,11 @@
assertThat(messages.size(), is(3));
assertThat(messages.get(0),
- is(new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod2, 0, retryMethod, 2, 0, 0, 0, user, 3)
+ is(new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod2, 0, retryMethod, 2, 0, 0, 0, 0, user, 3)
.toString()));
assertThat(messages.get(1),
- is(new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod2, 0, retryMethod, 1, 0, 0, 0, user, 3)
+ is(new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod2, 0, retryMethod, 1, 0, 0, 0, 0, user, 3)
.toString()));
String abortedEvent2 = messages.get(2);
@@ -268,9 +267,9 @@
assertArrayEquals(
new String[] {
- new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, "", 0, 0, 0, 0, user).toString(),
+ new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, "", 0, 0, 0, 0, 0, user).toString(),
new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString(),
- new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod, 0, "", 0, 0, 0, 0, jack).toString(),
+ new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod, 0, "", 0, 0, 0, 0, 0, jack).toString(),
new TxEndedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod).toString()},
toArray(messages)
);
@@ -308,9 +307,9 @@
assertArrayEquals(
new String[] {
- new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, "", 0, 0, 0, 0, user).toString(),
+ new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, "", 0, 0, 0, 0, 0, user).toString(),
new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString(),
- new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod, 0, "", 0, 0, 0, 0, jack).toString(),
+ new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod, 0, "", 0, 0, 0, 0, 0, jack).toString(),
new TxEndedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod).toString()},
toArray(messages)
);
@@ -335,7 +334,7 @@
assertArrayEquals(
new String[] {
- new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, "", 0, 0, 0, 0, user).toString(),
+ new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, "", 0, 0, 0, 0, 0, user).toString(),
new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString()},
toArray(messages)
);
@@ -353,7 +352,7 @@
assertArrayEquals(
new String[] {
- new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, "", 0, 0, 0, 0, user).toString(),
+ new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, "", 0, 0, 0, 0, 0, user).toString(),
new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString()},
toArray(messages)
);
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/CompensableInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/CompensableInterceptor.java
index cc639c7..69483b6 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/CompensableInterceptor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/CompensableInterceptor.java
@@ -30,9 +30,9 @@
@Override
public AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout, String retriesMethod,
- int forwardRetries, int forwardTimeout, int reverseRetries, int reverseTimeout, Object... message) {
+ int forwardRetries, int forwardTimeout, int reverseRetries, int reverseTimeout, int retryDelayInMilliseconds, Object... message) {
return sender.send(new TxStartedEvent(context.globalTxId(), context.localTxId(), parentTxId, compensationMethod,
- timeout, retriesMethod, forwardRetries, forwardTimeout, reverseRetries, reverseTimeout, message));
+ timeout, retriesMethod, forwardRetries, forwardTimeout, reverseRetries, reverseTimeout, retryDelayInMilliseconds, message));
}
@Override
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/DefaultRecovery.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/DefaultRecovery.java
index 54f5a87..16f568e 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/DefaultRecovery.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/DefaultRecovery.java
@@ -58,7 +58,7 @@
AlphaResponse response = interceptor.preIntercept(parentTxId, compensationSignature, compensable.forwardTimeout(),
retrySignature, forwardRetries, compensable.forwardTimeout(),
- compensable.reverseRetries(), compensable.reverseTimeout(), joinPoint.getArgs());
+ compensable.reverseRetries(), compensable.reverseTimeout(), compensable.retryDelayInMilliseconds(), joinPoint.getArgs());
if (response.aborted()) {
String abortedLocalTxId = context.localTxId();
context.setLocalTxId(parentTxId);
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/EventAwareInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/EventAwareInterceptor.java
index f077b38..97abc66 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/EventAwareInterceptor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/EventAwareInterceptor.java
@@ -21,7 +21,7 @@
AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout,
String retriesMethod, int forwardRetries, int forwardTimeout, int reverseRetries,
- int reverseTimeout,
+ int reverseTimeout, int retryDelayInMilliseconds,
Object... message);
void postIntercept(String parentTxId, String compensationMethod);
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/NoOpEventAwareInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/NoOpEventAwareInterceptor.java
index 07e2eab..8964584 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/NoOpEventAwareInterceptor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/NoOpEventAwareInterceptor.java
@@ -24,7 +24,7 @@
@Override
public AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout,
String retriesMethod, int forwardRetries, int forwardTimeout, int reverseRetries,
- int reverseTimeout, Object... message) {
+ int reverseTimeout, int retryDelayInMilliseconds, Object... message) {
return new AlphaResponse(false);
}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaAbortedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaAbortedEvent.java
index 36d57a2..d73f4bb 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaAbortedEvent.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaAbortedEvent.java
@@ -27,7 +27,7 @@
public SagaAbortedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Throwable throwable) {
super(EventType.SagaAbortedEvent, globalTxId, localTxId, parentTxId, compensationMethod, 0, "", 0,
- 0, 0, 0, stackTrace(throwable));
+ 0, 0, 0, 0, stackTrace(throwable));
}
private static String stackTrace(Throwable e) {
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaEndedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaEndedEvent.java
index f2f7553..21ed738 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaEndedEvent.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaEndedEvent.java
@@ -21,6 +21,6 @@
public class SagaEndedEvent extends TxEvent {
SagaEndedEvent(String globalTxId, String localTxId) {
- super(EventType.SagaEndedEvent, globalTxId, localTxId, null, "", 0, "", 0, 0, 0, 0);
+ super(EventType.SagaEndedEvent, globalTxId, localTxId, null, "", 0, "", 0, 0, 0, 0, 0);
}
}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaStartedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaStartedEvent.java
index edbfb32..31aab19 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaStartedEvent.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaStartedEvent.java
@@ -22,6 +22,6 @@
public class SagaStartedEvent extends TxEvent {
public SagaStartedEvent(String globalTxId, String localTxId, int timeout) {
// use "" instead of null as compensationMethod requires not null in sql
- super(EventType.SagaStartedEvent, globalTxId, localTxId, null, "", timeout, "", 0, 0, 0, 0);
+ super(EventType.SagaStartedEvent, globalTxId, localTxId, null, "", timeout, "", 0, 0, 0, 0, 0);
}
}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TxAbortedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TxAbortedEvent.java
index c6bc5a7..9a3d555 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TxAbortedEvent.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TxAbortedEvent.java
@@ -28,7 +28,7 @@
public TxAbortedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Throwable throwable) {
super(EventType.TxAbortedEvent, globalTxId, localTxId, parentTxId, compensationMethod, 0, "", 0,
- 0, 0, 0, stackTrace(throwable));
+ 0, 0, 0, 0, stackTrace(throwable));
}
private static String stackTrace(Throwable e) {
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TxCompensateAckFailedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TxCompensateAckFailedEvent.java
index 0c4adef..db009c2 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TxCompensateAckFailedEvent.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TxCompensateAckFailedEvent.java
@@ -21,6 +21,6 @@
public class TxCompensateAckFailedEvent extends TxEvent {
public TxCompensateAckFailedEvent(String globalTxId, String localTxId, String parentTxId) {
- super(EventType.TxCompensateAckFailedEvent, globalTxId, localTxId, parentTxId, "", 0, "", 0,0 ,0 ,0);
+ super(EventType.TxCompensateAckFailedEvent, globalTxId, localTxId, parentTxId, "", 0, "", 0,0 ,0 ,0, 0);
}
}
\ No newline at end of file
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TxCompensateAckSucceedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TxCompensateAckSucceedEvent.java
index 3191999..32870f7 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TxCompensateAckSucceedEvent.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TxCompensateAckSucceedEvent.java
@@ -21,6 +21,6 @@
public class TxCompensateAckSucceedEvent extends TxEvent {
public TxCompensateAckSucceedEvent(String globalTxId, String localTxId, String parentTxId) {
- super(EventType.TxCompensateAckSucceedEvent, globalTxId, localTxId, parentTxId, "", 0, "", 0, 0, 0, 0);
+ super(EventType.TxCompensateAckSucceedEvent, globalTxId, localTxId, parentTxId, "", 0, "", 0, 0, 0, 0, 0);
}
}
\ No newline at end of file
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TxCompensatedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TxCompensatedEvent.java
index 3355eed..2290b31 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TxCompensatedEvent.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TxCompensatedEvent.java
@@ -21,6 +21,6 @@
public class TxCompensatedEvent extends TxEvent {
public TxCompensatedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod) {
- super(EventType.TxCompensatedEvent, globalTxId, localTxId, parentTxId, compensationMethod, 0, "", 0, 0, 0, 0);
+ super(EventType.TxCompensatedEvent, globalTxId, localTxId, parentTxId, compensationMethod, 0, "", 0, 0, 0, 0, 0);
}
}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TxEndedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TxEndedEvent.java
index 7b55c51..8209905 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TxEndedEvent.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TxEndedEvent.java
@@ -21,6 +21,6 @@
public class TxEndedEvent extends TxEvent {
public TxEndedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod) {
- super(EventType.TxEndedEvent, globalTxId, localTxId, parentTxId, compensationMethod, 0, "", 0, 0, 0, 0);
+ super(EventType.TxEndedEvent, globalTxId, localTxId, parentTxId, compensationMethod, 0, "", 0, 0, 0, 0, 0);
}
}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TxEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TxEvent.java
index c860c80..b7dabaa 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TxEvent.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TxEvent.java
@@ -37,10 +37,12 @@
private final int forwardTimeout;
private final int reverseRetries;
private final int reverseTimeout;
+ private final int retryDelayInMilliseconds;
-
- public TxEvent(EventType type, String globalTxId, String localTxId, String parentTxId, String compensationMethod,
- int timeout, String retryMethod, int forwardRetries, int forwardTimeout, int reverseRetries, int reverseTimeout, Object... payloads) {
+ public TxEvent(EventType type, String globalTxId, String localTxId, String parentTxId,
+ String compensationMethod,
+ int timeout, String retryMethod, int forwardRetries, int forwardTimeout, int reverseRetries,
+ int reverseTimeout, int retryDelayInMilliseconds, Object... payloads) {
this.timestamp = System.currentTimeMillis();
this.type = type;
this.globalTxId = globalTxId;
@@ -53,6 +55,7 @@
this.forwardTimeout = forwardTimeout;
this.reverseRetries = reverseRetries;
this.reverseTimeout = reverseTimeout;
+ this.retryDelayInMilliseconds = retryDelayInMilliseconds;
this.payloads = payloads;
}
@@ -108,6 +111,10 @@
return reverseTimeout;
}
+ public int retryDelayInMilliseconds() {
+ return retryDelayInMilliseconds;
+ }
+
@Override
public String toString() {
return type.name() + "{" +
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TxStartedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TxStartedEvent.java
index 59112bf..d358af6 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TxStartedEvent.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TxStartedEvent.java
@@ -22,8 +22,8 @@
public class TxStartedEvent extends TxEvent {
public TxStartedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod,
- int timeout, String retryMethod, int forwardRetries, int forwardTimeout, int reverseRetries, int reverseTimeout, Object... payloads) {
+ int timeout, String retryMethod, int forwardRetries, int forwardTimeout, int reverseRetries, int reverseTimeout, int retryDelayInMilliseconds, Object... payloads) {
super(EventType.TxStartedEvent, globalTxId, localTxId, parentTxId, compensationMethod, timeout, retryMethod,
- forwardRetries, forwardTimeout, reverseRetries, reverseTimeout, payloads);
+ forwardRetries, forwardTimeout, reverseRetries, reverseTimeout, retryDelayInMilliseconds, payloads);
}
}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/annotations/Compensable.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/annotations/Compensable.java
index f2f547c..1ea6a3e 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/annotations/Compensable.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/annotations/Compensable.java
@@ -73,7 +73,7 @@
*/
String compensationMethod() default "";
- int retryDelayInMilliseconds() default 0;
+ int retryDelayInMilliseconds() default 5;
/**
* <code>@Compensable</code> forward compensable method timeout, in seconds. <br>
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/pack/omega/transaction/CompensableInterceptorTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/pack/omega/transaction/CompensableInterceptorTest.java
index 941c4ab..81899e4 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/pack/omega/transaction/CompensableInterceptorTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/pack/omega/transaction/CompensableInterceptorTest.java
@@ -91,11 +91,14 @@
@Test
public void sendsTxStartedEventBefore() throws Exception {
+ int timeout = new Random().nextInt();
int forwardRetries = new Random().nextInt();
int forwardTimeout = new Random().nextInt();
int reverseRetries = new Random().nextInt();
int reverseTimeout = new Random().nextInt();
- interceptor.preIntercept(parentTxId, compensationMethod, 0, retryMethod, forwardRetries, forwardTimeout, reverseRetries, reverseTimeout, message);
+ int retryDelayInMilliseconds = new Random().nextInt();
+ interceptor.preIntercept(parentTxId, compensationMethod, timeout, retryMethod, forwardRetries,
+ forwardTimeout, reverseRetries, reverseTimeout, retryDelayInMilliseconds, message);
TxEvent event = messages.get(0);
@@ -106,6 +109,8 @@
assertThat(event.forwardTimeout(), is(forwardTimeout));
assertThat(event.reverseRetries(), is(reverseRetries));
assertThat(event.reverseTimeout(), is(reverseTimeout));
+ assertThat(event.timeout(), is(timeout));
+ assertThat(event.retryDelayInMilliseconds(), is(retryDelayInMilliseconds));
assertThat(event.retryMethod(), is(retryMethod));
assertThat(event.type(), is(EventType.TxStartedEvent));
assertThat(event.compensationMethod(), is(compensationMethod));
diff --git a/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto b/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
index 876ae66..0f90bdc 100644
--- a/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
+++ b/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
@@ -47,7 +47,8 @@
int32 forwardRetries = 12;
int32 reverseRetries = 13;
int32 reverseTimeout = 14;
- string retryMethod = 15;
+ int32 retryDelayInMilliseconds = 15;
+ string retryMethod = 16;
}
message GrpcCompensateCommand {