blob: 40c47d5043db880f689c354b33aa78f2f9c1695c [file] [log] [blame]
/*
* Copyright 2017 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicecomb.saga.core;
import io.servicecomb.saga.core.dag.Node;
import io.servicecomb.saga.core.dag.Traveller;
import java.util.Collection;
import java.util.Set;
import kamon.annotation.EnableKamon;
import kamon.annotation.Segment;
/**
 * {@link SagaState} that steps a {@link Traveller} forward and passes the traveller's node
 * collection to a {@link TaskConsumer} after every step.
 *
 * <p>The node collection is fetched from the traveller once per call and reused across steps;
 * presumably {@code traveller.next(...)} refills that same collection — confirm against the
 * {@code Traveller} implementation.
 */
@EnableKamon
class TaskRunner implements SagaState {

  private final Traveller<SagaResponse, SagaRequest> traveller;
  private final TaskConsumer taskConsumer;

  /**
   * @param traveller    source of the nodes to process
   * @param taskConsumer receiver of each batch of nodes
   */
  TaskRunner(Traveller<SagaResponse, SagaRequest> traveller, TaskConsumer taskConsumer) {
    this.traveller = traveller;
    this.taskConsumer = taskConsumer;
  }

  /**
   * @return whatever {@code traveller.hasNext()} reports
   */
  @Override
  public boolean hasNext() {
    return traveller.hasNext();
  }

  /**
   * Consumes any nodes already present in the traveller's collection, then keeps advancing the
   * traveller and consuming its nodes until it reports no further step.
   */
  @Segment(name = "runTask", category = "application", library = "kamon")
  @Override
  public void run() {
    Collection<Node<SagaResponse, SagaRequest>> batch = traveller.nodes();

    // finish pending tasks from saga log at startup
    if (!batch.isEmpty()) {
      consumeAndReset(batch);
    }

    while (traveller.hasNext()) {
      traveller.next(null);
      consumeAndReset(batch);
    }
  }

  /**
   * Advances the traveller step by step, passing its nodes and {@code completedOperations} to
   * {@code taskConsumer.replay(...)}, and stops as soon as that call returns {@code true} or the
   * traveller has no further step.
   *
   * <p>Unlike {@link #run()}, the node collection is not cleared here between steps; presumably
   * {@code taskConsumer.replay(...)} prunes it itself — confirm against the consumer.
   *
   * @param completedOperations operations handed through to the consumer unchanged
   */
  @Override
  public void replay(Set<String> completedOperations) {
    Collection<Node<SagaResponse, SagaRequest>> batch = traveller.nodes();

    // hasNext() stays the first operand so it is queried on every check, including the last one.
    for (boolean played = false; traveller.hasNext() && !played; ) {
      traveller.next(null);
      played = taskConsumer.replay(batch, completedOperations);
    }
  }

  /** Hands the batch to the consumer, then empties it so the next step starts clean. */
  private void consumeAndReset(Collection<Node<SagaResponse, SagaRequest>> batch) {
    taskConsumer.consume(batch);
    batch.clear();
  }
}