/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.shardingsphere.elasticjob.lite.internal.dag;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.CuratorCache;
import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.framework.recipes.queue.DistributedDelayQueue;
import org.apache.curator.framework.recipes.queue.QueueBuilder;
import org.apache.curator.framework.recipes.queue.QueueSerializer;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.api.JobDagConfiguration;
import org.apache.shardingsphere.elasticjob.infra.concurrent.BlockUtils;
import org.apache.shardingsphere.elasticjob.infra.env.IpUtils;
import org.apache.shardingsphere.elasticjob.infra.exception.DagRuntimeException;
import org.apache.shardingsphere.elasticjob.lite.internal.config.ConfigurationService;
import org.apache.shardingsphere.elasticjob.lite.internal.state.JobStateEnum;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.elasticjob.tracing.JobEventBus;
import org.apache.shardingsphere.elasticjob.tracing.event.DagJobExecutionEvent;
import java.lang.management.ManagementFactory;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
* Job dag service.
*/
@Slf4j
public class DagService implements CuratorCacheListener {
public static final String ROOT_JOB = "self";
private static final String DAG_LATCH_PATH = "/daglatch/";
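    // Default retry interval in seconds, used when the configured retry interval is not positive (see retryJob).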
private static final int DEFAULT_RETRY_INTERVAL = 30;
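    // Retry queue path pattern: /dagretry/<dagName>/<jobName>, one delay queue per dag job.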
private static final String RETRY_PATH = "/dagretry/%s/%s";
private final DagNodeStorage dagNodeStorage;
private final JobDagConfiguration jobDagConfig;
private final String jobName;
private final String dagName;
private final InterProcessMutex mutex;
private final CoordinatorRegistryCenter regCenter;
private final JobEventBus jobEventBus;
private CuratorCache jobStateCache;
private DistributedDelayQueue<String> delayQueue;
public DagService(final CoordinatorRegistryCenter regCenter, final String jobName, final JobEventBus jobEventBus, final JobDagConfiguration jobDagConfig) {
this.jobDagConfig = jobDagConfig;
this.regCenter = regCenter;
this.jobName = jobName;
this.dagNodeStorage = new DagNodeStorage(regCenter, jobDagConfig.getDagName(), jobName);
this.dagName = jobDagConfig.getDagName();
this.jobEventBus = jobEventBus;
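        // Only the dag root job (whose dependency is "self") holds the inter-process mutex used to elect the leader that rebuilds the graph.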
if (StringUtils.equals(jobDagConfig.getDagDependencies(), ROOT_JOB)) {
this.mutex = new InterProcessMutex((CuratorFramework) regCenter.getRawClient(), DAG_LATCH_PATH + dagName);
} else {
this.mutex = null;
}
regDagConfig();
}
public DagService(final CoordinatorRegistryCenter regCenter, final String dagName, final DagNodeStorage dagNodeStorage) {
this.regCenter = regCenter;
this.dagName = dagName;
this.dagNodeStorage = dagNodeStorage;
this.jobName = "";
this.jobDagConfig = null;
this.mutex = null;
this.jobStateCache = null;
this.delayQueue = null;
this.jobEventBus = null;
}
/**
* Init delay queue for retry jobs.
*
* @return DistributedDelayQueue
*/
private DistributedDelayQueue<String> initDelayQueue() {
String retryPath = String.format(RETRY_PATH, dagName, jobName);
        DistributedDelayQueue<String> delayQueue = QueueBuilder.builder((CuratorFramework) regCenter.getRawClient(), new JobRetryTrigger(regCenter, dagName), new QueueSerializer<String>() {
            @Override
            public byte[] serialize(final String item) {
                return item.getBytes(StandardCharsets.UTF_8);
            }
            @Override
            public String deserialize(final byte[] bytes) {
                return new String(bytes, StandardCharsets.UTF_8);
            }
        }, retryPath).buildDelayQueue();
try {
delayQueue.start();
log.info("Dag-{}[{}] start delay queue, path={}", dagName, jobName, retryPath);
//CHECKSTYLE:OFF
} catch (Exception e) {
//CHECKSTYLE:ON
log.error("Dag-{}[{}] start delay queue Exception, path={}", dagName, jobName, retryPath, e);
}
return delayQueue;
}
    private void startJobStateListener() {
        jobStateCache.listenable().addListener(this);
        try {
            jobStateCache.start();
            postEvent(DagJobStates.REG.getValue(), "Job register success");
            log.info("Dag-{} job-{} state path listener started successfully.", dagName, jobName);
            //CHECKSTYLE:OFF
        } catch (Exception exp) {
            //CHECKSTYLE:ON
            log.error("Start dag-{} job-{} state path listener exception.", dagName, jobName, exp);
            postEvent(DagJobStates.REG.getValue(), "Job register Error:" + exp.getMessage());
        }
    }
private void stopJobStateListener() {
jobStateCache.close();
}
    /**
     * Whether the current job is the dag root job.
     *
     * @return true if the current job is the dag root job
     */
    public boolean isDagRootJob() {
        return StringUtils.equals(jobDagConfig.getDagDependencies(), ROOT_JOB);
    }
    /**
     * Get current dag state.
     *
     * @return DagStates
     */
public DagStates getDagStates() {
return DagStates.of(this.dagNodeStorage.currentDagStates());
}
    /**
     * Persist dag config into ZooKeeper, always overwriting.
     */
private void regDagConfig() {
this.dagNodeStorage.persistDagConfig(genDependenciesString());
this.delayQueue = initDelayQueue();
this.jobStateCache = CuratorCache.build((CuratorFramework) regCenter.getRawClient(), this.dagNodeStorage.pathOfJobNodeState());
this.startJobStateListener();
}
private String genDependenciesString() {
return jobDagConfig.getDagDependencies();
}
    /**
     * 1. Elect the leader;
     * 2. Rebuild the DAG graph;
     * 3. Change the DAG state to running.
     */
public void changeDagStatesAndReGraph() {
if (null == mutex) {
log.error("Need root job when change dag states and regraph!");
throw new DagRuntimeException("Need root job when change dag states and regraph!");
}
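        // Only one instance acquires leadership to rebuild the graph; the others block until the DAG state becomes RUNNING.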
if (!acquireDagLeader()) {
blockUntilCompleted();
return;
}
if (getDagStates() == DagStates.RUNNING) {
log.info("Dag-{} states already RUNNING", dagName);
return;
}
try {
String batchNo = getBatchNo();
Map<String, Set<String>> allDagNode = dagNodeStorage.getAllDagConfigJobs();
checkCycle(allDagNode);
dagNodeStorage.initDagGraph(allDagNode, batchNo);
dagNodeStorage.updateDagStates(DagStates.RUNNING);
dagNodeStorage.updateDagJobStates(JobStateEnum.RUNNING);
// create graph event
postEvent(DagJobStates.INIT.getValue(), "Create graph success");
//CHECKSTYLE:OFF
} catch (Exception ex) {
//CHECKSTYLE:ON
postEvent(DagJobStates.INIT.getValue(), "Create graph error:" + ex.getMessage());
} finally {
releaseDagLeader();
}
}
private void blockUntilCompleted() {
int count = 0;
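        // Poll every 300ms, giving up after 200 attempts (roughly 60 seconds).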
while (getDagStates() != DagStates.RUNNING) {
count++;
log.debug("DAG '{}' sleep short time until DAG graph completed. {}", dagName, count);
BlockUtils.sleep(300L);
if (count > 200) {
log.error("Dag-{} Wait a long time with Dag graph NOT complete", dagName);
throw new DagRuntimeException("Dag graph not complete!");
}
}
}
private boolean acquireDagLeader() {
try {
return mutex.acquire(200, TimeUnit.MILLISECONDS);
//CHECKSTYLE:OFF
} catch (Exception exp) {
//CHECKSTYLE:ON
log.debug("Dag-{} acquire lock error!", dagName, exp);
}
return false;
}
private void releaseDagLeader() {
try {
if (mutex.isAcquiredInThisProcess()) {
mutex.release();
}
//CHECKSTYLE:OFF
} catch (Exception exp) {
//CHECKSTYLE:ON
log.debug("Dag-{} release lock error!", dagName, exp);
}
}
    /**
     * Check whether the dag has a cycle.
     *
     * @param allDagNode dag config info
     */
private void checkCycle(final Map<String, Set<String>> allDagNode) {
Map<String, Set<String>> cloneMap = Maps.newHashMap();
allDagNode.forEach((key, value) -> cloneMap.put(key, Sets.newHashSet(value)));
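        // Kahn-style elimination on a copy of the graph: repeatedly remove jobs whose dependencies are all satisfied; any jobs left afterwards form a cycle.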
while (removeSelf(cloneMap)) {
if (log.isDebugEnabled()) {
log.debug("Dag-{} remove root job.", dagName);
}
}
if (!cloneMap.isEmpty()) {
log.error("Dag {} find cycle {}", dagName, cloneMap.keySet().size());
printCycleNode(cloneMap);
throw new DagRuntimeException("Dag job find cycles");
}
log.info("Dag {} checkCycle success", dagName);
}
private void printCycleNode(final Map<String, Set<String>> cloneMap) {
cloneMap.forEach((k, v) -> {
log.error("{} has cycle with {}", k, Joiner.on("|").join(v));
});
}
private boolean removeSelf(final Map<String, Set<String>> cloneMap) {
Iterator<Map.Entry<String, Set<String>>> iterator = cloneMap.entrySet().iterator();
boolean removed = false;
while (iterator.hasNext()) {
Map.Entry<String, Set<String>> next = iterator.next();
Set<String> value = next.getValue();
value.remove("self");
if (value.isEmpty()) {
markKeyAsSelf(cloneMap, next.getKey());
iterator.remove();
removed = true;
}
}
return removed;
}
private void markKeyAsSelf(final Map<String, Set<String>> cloneMap, final String key) {
cloneMap.values().forEach(s -> s.remove(key));
}
private String getBatchNo() {
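        // Batch number = dagName + host IP + PID + timestamp, uniquely identifying one run of the DAG graph.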
String date = DateFormatUtils.format(new Date(), "yyMMddHHmmss");
return dagName + IpUtils.getIp() + ManagementFactory.getRuntimeMXBean().getName().split("@")[0] + date;
}
    /**
     * When a dag job starts to run, check the states of its dependency jobs.
     */
public void checkJobDependenciesState() {
DagJobStates currentJobRunStates = dagNodeStorage.getDagJobRunStates(jobName);
if (currentJobRunStates == DagJobStates.SUCCESS || currentJobRunStates == DagJobStates.FAIL) {
log.info("DAG- {} job- {} 's states is {},Can not run again!", jobDagConfig.getDagName(), jobName, currentJobRunStates);
throw new DagRuntimeException("Dag job has been completed");
}
if (isDagRootJob()) {
log.debug("DAG {} job {} is root,No deps.", jobDagConfig.getDagName(), jobName);
return;
}
        // Each dependency is required to be in SKIP or SUCCESS state.
String[] deps = dagNodeStorage.getJobDenpendencies();
for (String dep : deps) {
if (StringUtils.equals(dep, "self")) {
continue;
}
DagJobStates jobRunStates = dagNodeStorage.getDagJobRunStates(dep);
if (jobRunStates != DagJobStates.SUCCESS && jobRunStates != DagJobStates.SKIP) {
log.info("DAG- {} job- {} Dependens job- {} Not ready!", dagName, jobName, dep);
throw new DagRuntimeException("Dag dependencies jobs not Ready");
}
}
}
private boolean retryJob() {
        // Load dag config from the register center and check whether the job can be retried.
JobDagConfiguration jobDagConfig = getJobDagConfig(false);
int times = dagNodeStorage.getJobRetryTimes();
if (jobDagConfig.getRetryTimes() < 1 || jobDagConfig.getRetryTimes() <= times) {
log.debug("Dag-{}[{}] config retry times{}, current times {} , skip retry!", dagName, jobName, jobDagConfig.getRetryTimes(), times);
if (jobDagConfig.isDagSkipWhenFail()) {
log.info("Dag-{}[{}] fail, mark as SKIP!", dagName, jobName);
dagNodeStorage.updateDagJobStates(JobStateEnum.SKIP);
} else {
dagNodeStorage.updateDagJobStates(JobStateEnum.FAIL);
}
return false;
}
// send to retry queue
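        // Queue items are formatted as "dagName||jobName"; JobRetryTrigger consumes them once the delay elapses.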
try {
long interval = (jobDagConfig.getRetryInterval() <= 0 ? DEFAULT_RETRY_INTERVAL : jobDagConfig.getRetryInterval()) * 1000L;
delayQueue.put(dagName + "||" + jobName, System.currentTimeMillis() + interval);
//CHECKSTYLE:OFF
} catch (Exception exp) {
//CHECKSTYLE:ON
log.error("Dag-{}[{}] retry job to Delay queue Exception!", dagName, jobName, exp);
return false;
}
dagNodeStorage.updateJobRetryTimes(times + 1);
log.info("Dag-{}[{}] Retry job to delay queue success, times-[{}]", dagName, jobName, times + 1);
postEvent("retry", "Put to DQ");
return true;
}
    /**
     * Get JobDagConfig from local or the register center.
     *
     * @param fromLocal true to use the local config, false to load from the register center
     * @return dag config
     */
private JobDagConfiguration getJobDagConfig(final boolean fromLocal) {
if (fromLocal) {
return jobDagConfig;
}
ConfigurationService configurationService = new ConfigurationService(this.regCenter, this.jobName);
JobConfiguration jobConfiguration = configurationService.load(false);
if (jobConfiguration == null || jobConfiguration.getJobDagConfiguration() == null) {
return jobDagConfig;
}
return jobConfiguration.getJobDagConfiguration();
}
    /**
     * Whether there is no dag job running.
     *
     * @return true if no job is running
     */
private boolean hasNoJobRunning() {
return dagNodeStorage.getDagJobListByState(DagJobStates.RUNNING).isEmpty();
}
    /**
     * Acquire the jobs that should be triggered next.
     *
     * @return jobs that should be triggered next
     */
public List<String> nextShouldTriggerJob() {
final Map<String, Set<String>> allDagRunJobs = dagNodeStorage.getAllDagGraphJobs();
final List<String> successList = dagNodeStorage.getDagJobListByState(DagJobStates.SUCCESS);
final List<String> failList = dagNodeStorage.getDagJobListByState(DagJobStates.FAIL);
final List<String> runningList = dagNodeStorage.getDagJobListByState(DagJobStates.RUNNING);
final List<String> skipList = dagNodeStorage.getDagJobListByState(DagJobStates.SKIP);
List<String> nextList = Lists.newLinkedList();
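        // Prune the graph: collapse satisfied dependencies into "self", drop finished jobs, then pick jobs whose remaining dependencies are empty or only "self".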
allDagRunJobs.values().forEach(s -> {
if (s.removeIf(x -> successList.contains(x) || skipList.contains(x))) {
s.add("self");
}
});
        successList.forEach(allDagRunJobs::remove);
        skipList.forEach(allDagRunJobs::remove);
        allDagRunJobs.forEach((key, value) -> {
            if (value.isEmpty() || (value.size() == 1 && value.contains(ROOT_JOB))) {
                nextList.add(key);
            }
        });
nextList.removeAll(runningList);
nextList.removeAll(failList);
log.info("Dag-{} acquire next should Trigger jobs: {}", dagName, nextList);
if (log.isDebugEnabled()) {
log.debug("Dag-{} NextTrigger all job nodes size:{}", dagName, allDagRunJobs.size());
allDagRunJobs.forEach((key, value) -> log.debug("Dag-{} acquire next should Trigger JOBS, Graph- {} = {}", dagName, key, Joiner.on(",").join(value)));
log.debug("Dag-{} acquire next should Trigger JOBS, SUCC-[{}], FAIL-[{}], RUN-[{}] , SKIP-[{}] ,Will Trigger-[{}].", dagName, successList, failList, runningList, skipList, nextList);
}
return nextList;
}
    /**
     * When all dag jobs are completed, compute the final dag state.
     */
private void statisticsDagState() {
DagStates dagStates = null;
Map<String, Set<String>> allDagRunJobs = dagNodeStorage.getAllDagGraphJobs();
List<String> successList = dagNodeStorage.getDagJobListByState(DagJobStates.SUCCESS);
List<String> runningList = dagNodeStorage.getDagJobListByState(DagJobStates.RUNNING);
List<String> failList = dagNodeStorage.getDagJobListByState(DagJobStates.FAIL);
List<String> skipList = dagNodeStorage.getDagJobListByState(DagJobStates.SKIP);
int totalJob = allDagRunJobs.size();
int succJob = successList.size();
int failJob = failList.size();
int skipJob = skipList.size();
int runningJob = runningList.size();
log.info("Dag-{}[Statistics] totalJob-{}, successJob-{}, failJob-{}, runningJob-{}, skipJob-{}", dagName, totalJob, succJob, failJob, runningJob, skipJob);
if (log.isDebugEnabled()) {
log.debug("Dag-{}[Statistics] SUCC-[{}], FAIL-[{}], RUNNING-[{}], SKIP-[{}]", dagName, successList, failList, runningList, skipList);
}
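        // Final state: SUCCESS when every job succeeded or was skipped, FAIL when any job failed, otherwise still RUNNING.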
if ((succJob + skipJob) == totalJob) {
dagStates = DagStates.SUCCESS;
} else if (failJob > 0) {
dagStates = DagStates.FAIL;
} else {
dagStates = DagStates.RUNNING;
}
postEvent(dagStates.getValue(), "Dag Complete");
log.info("Dag-{}[Statistics] DAG run complete, Final State-[{}]!", dagName, dagStates);
dagNodeStorage.updateDagStates(dagStates);
}
private void postEvent(final String state, final String message) {
if (null != jobEventBus) {
jobEventBus.post(new DagJobExecutionEvent(dagName, jobName, dagNodeStorage.currentDagBatchNo(), state, message));
}
}
    /**
     * Listener for the job state node (zkpath: jobName/state).
     *
     * @param type event type
     * @param oldData old data
     * @param data new data
     */
@Override
public void event(final Type type, final ChildData oldData, final ChildData data) {
if (!(type == Type.NODE_CHANGED || type == Type.NODE_CREATED)) {
return;
}
        JobStateEnum jobState = JobStateEnum.of(new String(data.getData(), StandardCharsets.UTF_8));
log.info("Dag-{}[{}] receive job state EVENT-{}", dagName, jobName, jobState);
if (jobState == JobStateEnum.RUNNING || jobState == JobStateEnum.NONE) {
log.debug("Dag-{}[{}] receive job state EVENT NOT last state-[{}], skip.", dagName, jobName, jobState);
return;
}
        // Deal with retry when the job fails.
if (jobState == JobStateEnum.FAIL) {
if (retryJob()) {
log.info("Dag-{}[{}] Put FAIL job to DQ Success! Waiting Triggered!", dagName, jobName);
return;
}
} else {
dagNodeStorage.updateDagJobStates(jobState);
}
postEvent(jobState.getValue(), "Job Complete");
DagStates dagState = getDagStates();
if (dagState == DagStates.PAUSE) {
log.info("Dag-{} current dag state is PAUSE, Do not trigger next jobs!", dagName);
return;
}
        // Acquire the jobs that should be triggered next.
List<String> willTriggerJobs = nextShouldTriggerJob();
        // If no job should be triggered and the running job list is empty, compute the final dag state.
if (willTriggerJobs.isEmpty()) {
if (hasNoJobRunning()) {
log.info("Dag-{}, No job running, Start statistics The DAG State.", dagName);
statisticsDagState();
return;
}
log.info("Dag-{}[{}] No trigger job, Wating for other running jobs.", dagName, jobName);
} else {
            // Otherwise trigger the next jobs.
log.info("Dag-{}[{}] trigger job list [{}].", dagName, jobName, willTriggerJobs);
willTriggerJobs.forEach(job -> {
postEvent("trigger", "Trigger Job");
dagNodeStorage.triggerJob(job);
});
}
}
}