CAUSEWAY-3738: factors out some helper methods is all
diff --git a/extensions/core/commandlog/applib/src/main/java/org/apache/causeway/extensions/commandlog/applib/job/RunBackgroundCommandsJob.java b/extensions/core/commandlog/applib/src/main/java/org/apache/causeway/extensions/commandlog/applib/job/RunBackgroundCommandsJob.java
index 16c9371..4dec5bf 100644
--- a/extensions/core/commandlog/applib/src/main/java/org/apache/causeway/extensions/commandlog/applib/job/RunBackgroundCommandsJob.java
+++ b/extensions/core/commandlog/applib/src/main/java/org/apache/causeway/extensions/commandlog/applib/job/RunBackgroundCommandsJob.java
@@ -25,12 +25,15 @@
import javax.inject.Inject;
+import org.apache.causeway.commons.functional.Try;
import org.apache.causeway.extensions.commandlog.applib.dom.CommandLogEntryRepository;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.PersistJobDataAfterExecution;
+
+import org.springframework.dao.DeadlockLoserDataAccessException;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Propagation;
@@ -67,6 +70,9 @@
@Log4j2
public class RunBackgroundCommandsJob implements Job {
+ final static int RETRY_COUNT = 3;
+ final static long RETRY_INTERVAL_MILLIS = 1000;
+
@Inject InteractionService interactionService;
@Inject TransactionService transactionService;
@Inject ClockService clockService;
@@ -83,8 +89,8 @@
return;
}
- UserMemento user = UserMemento.ofNameAndRoleNames("scheduler_user", "admin_role");
- InteractionContext interactionContext = InteractionContext.builder().user(user).build();
+ val userMemento = UserMemento.ofNameAndRoleNames("scheduler_user", "admin_role");
+ val interactionContext = InteractionContext.builder().user(userMemento).build();
// we obtain the list of Commands first; we use their CommandDto as it is serializable across transactions
final Optional<List<CommandDto>> commandDtosIfAny = pendingCommandDtos(interactionContext);
@@ -92,35 +98,46 @@
// for each command, we execute within its own transaction. Failure of one should not impact the next.
commandDtosIfAny.ifPresent(commandDtos -> {
for (val commandDto : commandDtos) {
- executeWithinOwnTransaction(commandDto, interactionContext);
+ executeCommandWithinOwnTransaction(commandDto, interactionContext);
}
});
}
- private Optional<List<CommandDto>> pendingCommandDtos(InteractionContext interactionContext) {
- final Optional<List<CommandDto>> commandDtosIfAny =
- interactionService.callAndCatch(interactionContext, () ->
- transactionService.callTransactional(Propagation.REQUIRES_NEW, () ->
- commandLogEntryRepository.findBackgroundAndNotYetStarted()
- .stream()
- .map(CommandLogEntry::getCommandDto)
- .collect(Collectors.toList())
- )
- .ifFailureFail()
- .valueAsNonNullElseFail()
- )
- .ifFailureFail() // we give up if unable to find these
- .getValue();
- return commandDtosIfAny;
+ private Optional<List<CommandDto>> pendingCommandDtos(final InteractionContext interactionContext) {
+ return interactionService.callAndCatch(interactionContext, () ->
+ transactionService.callTransactional(Propagation.REQUIRES_NEW, () ->
+ commandLogEntryRepository.findBackgroundAndNotYetStarted()
+ .stream()
+ .map(CommandLogEntry::getCommandDto)
+ .collect(Collectors.toList())
+ )
+ .ifFailureFail()
+ .valueAsNonNullElseFail()
+ )
+ .ifFailureFail() // we give up if unable to find these
+ .getValue();
}
- private void executeWithinOwnTransaction(CommandDto commandDto, InteractionContext interactionContext) {
- interactionService.runAndCatch(interactionContext, () -> {
- executeCommandWithinOwnTransactionElseFail(commandDto);
- })
- .ifFailure(throwable -> {
+ private void executeCommandWithinOwnTransaction(
+ final CommandDto commandDto,
+ final InteractionContext interactionContext
+ ) {
+ int retryCount = RETRY_COUNT;
+ while(retryCount > 0) {
+ Try<?> result = interactionService.runAndCatch(interactionContext, () -> {
+ executeCommandWithinOwnTransactionElseFail(commandDto);
+ });
+ if (isEncounteredDeadlock(result)) {
+ retryCount--;
+ log.debug("Deadlock occurred, retrying command: " + CommandDtoUtils.dtoMapper().toString(commandDto));
+ sleep(RETRY_INTERVAL_MILLIS);
+ } else {
+ retryCount=0; // ie break
+ result.ifFailure(throwable -> {
logAndCaptureFailure(throwable, commandDto, interactionContext);
});
+ }
+ }
}
private void executeCommandWithinOwnTransactionElseFail(CommandDto commandDto) {
@@ -157,4 +174,21 @@
});
}
+ private static boolean isEncounteredDeadlock(Try<?> result) {
+ if (!result.isFailure()) {
+ return false;
+ }
+ return result.getFailure()
+ .map(throwable -> throwable instanceof DeadlockLoserDataAccessException)
+ .orElse(false);
+ }
+
+ private static void sleep(long retryIntervalMs) {
+ try {
+ Thread.sleep(retryIntervalMs);
+ } catch (InterruptedException e) {
+ // do nothing - continue
+ }
+ }
+
}