blob: 847f0a43c511c8ebf8a9dc3175b9109e9fe58192 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.integ.testsuite.dag.scheduler;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
import org.apache.hudi.integ.testsuite.dag.WorkflowDag;
import org.apache.hudi.integ.testsuite.dag.WriterContext;
import org.apache.hudi.integ.testsuite.dag.nodes.DagNode;
import org.apache.hudi.integ.testsuite.dag.nodes.DelayNode;
import org.apache.spark.api.java.JavaSparkContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import static org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config.CONFIG_NAME;
/**
* The Dag scheduler schedules the workflow DAGs. It will convert DAG to node set and execute the nodes according to the relations between nodes.
*/
public class DagScheduler {
  private static final Logger log = LoggerFactory.getLogger(DagScheduler.class);
  // Immutable after construction; the scheduler is single-use per DAG.
  private final WorkflowDag workflowDag;
  private final ExecutionContext executionContext;

  public DagScheduler(WorkflowDag workflowDag, WriterContext writerContext, JavaSparkContext jsc) {
    this.workflowDag = workflowDag;
    this.executionContext = new ExecutionContext(jsc, writerContext);
  }

  /**
   * Method to start executing workflow DAGs.
   *
   * @throws Exception Thrown if schedule failed.
   */
  public void schedule() throws Exception {
    ExecutorService service = Executors.newFixedThreadPool(2);
    try {
      execute(service, workflowDag);
    } finally {
      // Orderly shutdown first; if tasks are still running after the grace
      // period (or execute() threw), force-kill them so the JVM can exit.
      service.shutdown();
      if (!service.awaitTermination(1, TimeUnit.MINUTES)) {
        log.info("Forcing shutdown of executor service, this might kill running tasks");
        service.shutdownNow();
      }
    }
  }

  /**
   * Method to start executing the nodes in workflow DAGs.
   *
   * @param service ExecutorService used to run nodes of the same level in parallel
   * @param workflowDag instance of workflow dag that needs to be executed
   * @throws Exception will be thrown if any error occurred
   */
  private void execute(ExecutorService service, WorkflowDag workflowDag) throws Exception {
    // Nodes at the same level are executed in parallel
    log.info("Running workloads");
    List<DagNode> nodes = workflowDag.getNodeList();
    int curRound = 1;
    do {
      log.warn("===================================================================");
      log.warn("Running workloads for round num {}", curRound);
      log.warn("===================================================================");
      // Clone the root nodes every round so per-node state (e.g. isCompleted)
      // from a previous round does not leak into this one.
      Queue<DagNode> queue = new PriorityQueue<>();
      for (DagNode dagNode : nodes) {
        queue.add(dagNode.clone());
      }
      do {
        // Drain one DAG level: submit every queued node, collect children for
        // the next level, then barrier on the futures before descending.
        List<Future<?>> futures = new ArrayList<>();
        Set<DagNode> childNodes = new HashSet<>();
        while (!queue.isEmpty()) {
          DagNode nodeToExecute = queue.poll();
          log.warn("Executing node \"{}\" :: {}", nodeToExecute.getConfig().getOtherConfigs().get(CONFIG_NAME), nodeToExecute.getConfig());
          int finalCurRound = curRound;
          futures.add(service.submit(() -> executeNode(nodeToExecute, finalCurRound)));
          if (nodeToExecute.getChildNodes().size() > 0) {
            childNodes.addAll(nodeToExecute.getChildNodes());
          }
        }
        queue.addAll(childNodes);
        childNodes.clear();
        // Propagates any node failure as ExecutionException; bounded wait so a
        // hung node cannot stall the suite forever.
        for (Future<?> future : futures) {
          future.get(1, TimeUnit.HOURS);
        }
      } while (!queue.isEmpty());
      log.info("Finished workloads for round num {}", curRound);
      // Insert the configured delay between rounds, but not after the last one.
      if (curRound < workflowDag.getRounds()) {
        new DelayNode(workflowDag.getIntermittentDelayMins()).execute(executionContext, curRound);
      }
    } while (curRound++ < workflowDag.getRounds());
    log.info("Finished workloads");
  }

  /**
   * Execute the given node, repeating it {@code repeatCount} times as per its config.
   * Runs on an executor thread; exceptions are wrapped in {@link HoodieException}
   * and surface to the scheduler via {@code Future#get}.
   *
   * @param node The node to be executed
   * @param curRound current round number, forwarded to the node's execute call
   */
  protected void executeNode(DagNode node, int curRound) {
    if (node.isCompleted()) {
      throw new RuntimeException("DagNode already completed! Cannot re-execute");
    }
    try {
      // NOTE(review): if repeatCount <= 0 the node is marked completed without
      // ever executing — preserved from the original behavior.
      int repeatCount = node.getConfig().getRepeatCount();
      while (repeatCount > 0) {
        node.execute(executionContext, curRound);
        log.info("Finished executing {}", node.getName());
        repeatCount--;
      }
      node.setCompleted(true);
    } catch (Exception e) {
      log.error("Exception executing node", e);
      throw new HoodieException(e);
    }
  }
}