blob: 1b5a3741f0aa0c7883f3ec7c125a94415e8e36a2 [file] [log] [blame]
/*
* Copyright 2017 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicecomb.saga.core;
import io.servicecomb.saga.core.dag.ByLevelTraveller;
import io.servicecomb.saga.core.dag.FromLeafTraversalDirection;
import io.servicecomb.saga.core.dag.FromRootTraversalDirection;
import io.servicecomb.saga.core.dag.SingleLeafDirectedAcyclicGraph;
import io.servicecomb.saga.core.dag.TraversalDirection;
import java.lang.invoke.MethodHandles;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Executors;
import kamon.annotation.EnableKamon;
import kamon.annotation.Segment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Drives a set of {@link SagaTask}s over a task graph using two {@link TaskRunner}s: one that
 * traverses the graph with a {@link FromRootTraversalDirection} to execute transactions, and one
 * that traverses it with a {@link FromLeafTraversalDirection} to execute compensations.
 *
 * <p>The saga starts on the transaction runner and switches to the compensation runner when a
 * {@link TransactionFailedException} is caught in {@link #run()}, or when {@link #play()} finds
 * aborted transactions or completed compensations among the events of the {@link EventStore}.
 */
@EnableKamon
public class Saga {
  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

  // Size of the fixed thread pool created when the caller does not provide an Executor.
  private static final int DEFAULT_THREAD_POOL_SIZE = 5;

  // Source of the saga events that are gathered into the bookkeeping collections below.
  private final EventStore eventStore;

  // Tasks looked up by the value of SagaRequest#task().
  private final Map<String, SagaTask> tasks;

  // Bookkeeping populated by SagaEvent#gatherTo; completedTransactions is also handed to the
  // CompensationTaskConsumer at construction time.
  private final Set<String> completedTransactions;
  private final Set<String> completedCompensations;
  private final Set<String> abortedTransactions;
  // NOTE(review): presumably requests that were started but never finished — confirm against the
  // SagaEvent implementations that fill this map.
  private final Map<String, SagaRequest> hangingOperations;

  private final TaskRunner transactionTaskRunner;
  private final TaskRunner compensationTaskRunner;

  // The runner currently driving the saga; starts as the transaction runner.
  private volatile SagaState currentTaskRunner;

  /**
   * Creates a saga with a {@link BackwardRecovery} policy and the default executor.
   *
   * @param eventStore store the saga events are gathered from
   * @param tasks tasks keyed by the identifier returned from {@link SagaRequest#task()}
   * @param sagaTaskGraph graph of requests to traverse
   */
  Saga(
      EventStore eventStore,
      Map<String, SagaTask> tasks,
      SingleLeafDirectedAcyclicGraph<SagaResponse, SagaRequest> sagaTaskGraph) {

    this(eventStore, new BackwardRecovery(), tasks, sagaTaskGraph);
  }

  /**
   * Creates a saga with the given recovery policy, running on a newly created fixed thread pool
   * of {@value #DEFAULT_THREAD_POOL_SIZE} threads.
   *
   * @param eventStore store the saga events are gathered from
   * @param recoveryPolicy policy handed (wrapped for logging) to the transaction consumer
   * @param tasks tasks keyed by the identifier returned from {@link SagaRequest#task()}
   * @param sagaTaskGraph graph of requests to traverse
   */
  Saga(
      EventStore eventStore,
      RecoveryPolicy recoveryPolicy,
      Map<String, SagaTask> tasks,
      SingleLeafDirectedAcyclicGraph<SagaResponse, SagaRequest> sagaTaskGraph) {

    this(
        eventStore,
        Executors.newFixedThreadPool(DEFAULT_THREAD_POOL_SIZE),
        recoveryPolicy,
        tasks,
        sagaTaskGraph);
  }

  /**
   * Creates a saga whose transaction consumer submits work through the given executor.
   *
   * @param eventStore store the saga events are gathered from
   * @param executor executor backing the {@link ExecutorCompletionService} of the transaction
   *     consumer
   * @param recoveryPolicy policy handed (wrapped in a {@link LoggingRecoveryPolicy}) to the
   *     transaction consumer
   * @param tasks tasks keyed by the identifier returned from {@link SagaRequest#task()}
   * @param sagaTaskGraph graph of requests to traverse
   */
  public Saga(
      EventStore eventStore,
      Executor executor,
      RecoveryPolicy recoveryPolicy,
      Map<String, SagaTask> tasks,
      SingleLeafDirectedAcyclicGraph<SagaResponse, SagaRequest> sagaTaskGraph) {

    this.eventStore = eventStore;
    this.tasks = tasks;

    this.completedTransactions = new HashSet<>();
    this.completedCompensations = new HashSet<>();
    this.abortedTransactions = new HashSet<>();
    this.hangingOperations = new HashMap<>();

    this.transactionTaskRunner = new TaskRunner(
        traveller(sagaTaskGraph, new FromRootTraversalDirection<>()),
        new TransactionTaskConsumer(
            tasks,
            new ExecutorCompletionService<>(executor),
            new LoggingRecoveryPolicy(recoveryPolicy)));

    // The compensation consumer shares the completedTransactions set owned by this saga.
    this.compensationTaskRunner = new TaskRunner(
        traveller(sagaTaskGraph, new FromLeafTraversalDirection<>()),
        new CompensationTaskConsumer(tasks, completedTransactions));

    this.currentTaskRunner = this.transactionTaskRunner;
  }

  /**
   * Runs the current task runner repeatedly until it reports no further work. When a
   * {@link TransactionFailedException} is caught, the saga switches to the compensation runner
   * and continues looping on it.
   *
   * @return the message of the most recently caught {@link TransactionFailedException}, or
   *     {@code null} if none was caught
   */
  @Segment(name = "runSaga", category = "application", library = "kamon")
  public String run() {
    log.info("Starting Saga");

    String failureInfo = null;
    do {
      try {
        currentTaskRunner.run();
      } catch (TransactionFailedException e) {
        failureInfo = e.getMessage();
        log.error("Failed to run operation", e);
        switchToCompensation();
      }
    } while (currentTaskRunner.hasNext());

    log.info("Completed Saga");
    return failureInfo;
  }

  /**
   * Replays the events of the event store: gathers them, replays the completed transactions on
   * the transaction runner and, if any transaction was aborted or any compensation completed,
   * makes the compensation runner current and replays the completed compensations on it.
   */
  public void play() {
    log.info("Start playing events");

    gatherEvents(eventStore);
    transactionTaskRunner.replay(completedTransactions);

    boolean nothingToCompensate = abortedTransactions.isEmpty() && completedCompensations.isEmpty();
    if (!nothingToCompensate) {
      currentTaskRunner = compensationTaskRunner;
      compensationTaskRunner.replay(completedCompensations);
    }

    log.info("Completed playing events");
  }

  // Makes the compensation runner current, refreshes the bookkeeping from the event store and
  // then commits and compensates every hanging operation.
  private void switchToCompensation() {
    currentTaskRunner = compensationTaskRunner;

    // Is it possible that other parallel transactions haven't written their start event to the
    // saga log by now? If so, not all events are gathered here and some transactions will be
    // missed.
    gatherEvents(eventStore);

    hangingOperations.values().forEach(this::commitThenCompensate);
  }

  // Commits the request on its task and then compensates it on the same task.
  private void commitThenCompensate(SagaRequest request) {
    taskFor(request).commit(request);
    taskFor(request).compensate(request);
  }

  // Looks up the task registered under the request's task identifier.
  private SagaTask taskFor(SagaRequest request) {
    return tasks.get(request.task());
  }

  // Lets every event sort itself into the hanging / aborted / completed collections.
  private void gatherEvents(Iterable<SagaEvent> events) {
    for (SagaEvent sagaEvent : events) {
      sagaEvent.gatherTo(
          hangingOperations,
          abortedTransactions,
          completedTransactions,
          completedCompensations);
    }
  }

  // Builds a by-level traveller over the graph in the requested direction.
  private static ByLevelTraveller<SagaResponse, SagaRequest> traveller(
      SingleLeafDirectedAcyclicGraph<SagaResponse, SagaRequest> graph,
      TraversalDirection<SagaResponse, SagaRequest> direction) {

    return new ByLevelTraveller<>(graph, direction);
  }
}