blob: 64a67d6978e1fdf93ae2ded57ea6fe16193036a7 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.service;
import java.io.IOException;
import java.io.StringReader;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.BundleActionBean;
import org.apache.oozie.BundleJobBean;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.client.Job;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.bundle.BundleCoordSubmitXCommand;
import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
import org.apache.oozie.command.coord.CoordActionInputCheckXCommand;
import org.apache.oozie.command.coord.CoordActionReadyXCommand;
import org.apache.oozie.command.coord.CoordActionStartXCommand;
import org.apache.oozie.command.coord.CoordKillXCommand;
import org.apache.oozie.command.coord.CoordPushDependencyCheckXCommand;
import org.apache.oozie.command.coord.CoordResumeXCommand;
import org.apache.oozie.command.coord.CoordSuspendXCommand;
import org.apache.oozie.command.wf.ActionEndXCommand;
import org.apache.oozie.command.wf.ActionStartXCommand;
import org.apache.oozie.command.wf.KillXCommand;
import org.apache.oozie.command.wf.ResumeXCommand;
import org.apache.oozie.command.wf.SignalXCommand;
import org.apache.oozie.command.wf.SuspendXCommand;
import org.apache.oozie.executor.jpa.BundleActionQueryExecutor;
import org.apache.oozie.executor.jpa.BundleJobQueryExecutor;
import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
import org.apache.oozie.executor.jpa.BundleActionQueryExecutor.BundleActionQuery;
import org.apache.oozie.executor.jpa.BundleJobQueryExecutor.BundleJobQuery;
import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor;
import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
import org.apache.oozie.util.ELUtils;
import org.apache.oozie.util.JobUtils;
import org.apache.oozie.util.XCallable;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.XmlUtils;
import org.jdom.Attribute;
import org.jdom.Element;
/**
* The Recovery Service checks for pending actions and premater coordinator jobs older than a configured age and then
* queues them for execution.
*/
public class RecoveryService implements Service {
public static final String RECOVERY_SERVICE_CONF_PREFIX = Service.CONF_PREFIX + "RecoveryService.";
public static final String CONF_PREFIX_WF_ACTIONS = RECOVERY_SERVICE_CONF_PREFIX + "wf.actions.";
public static final String CONF_PREFIX_COORD = RECOVERY_SERVICE_CONF_PREFIX + "coord.";
public static final String CONF_PREFIX_BUNDLE = RECOVERY_SERVICE_CONF_PREFIX + "bundle.";
/**
* Time interval, in seconds, at which the recovery service will be scheduled to run.
*/
public static final String CONF_SERVICE_INTERVAL = RECOVERY_SERVICE_CONF_PREFIX + "interval";
/**
* The number of callables to be queued in a batch.
*/
public static final String CONF_CALLABLE_BATCH_SIZE = RECOVERY_SERVICE_CONF_PREFIX + "callable.batch.size";
/**
* Delay for the push missing dependencies in milliseconds.
*/
public static final String CONF_PUSH_DEPENDENCY_INTERVAL = RECOVERY_SERVICE_CONF_PREFIX + "push.dependency.interval";
/**
* Age of actions to queue, in seconds.
*/
public static final String CONF_WF_ACTIONS_OLDER_THAN = CONF_PREFIX_WF_ACTIONS + "older.than";
public static final String CONF_WF_ACTIONS_CREATED_TIME_INTERVAL = CONF_PREFIX_WF_ACTIONS + "created.time.interval";
/**
* Age of coordinator jobs to recover, in seconds.
*/
public static final String CONF_COORD_OLDER_THAN = CONF_PREFIX_COORD + "older.than";
/**
* Age of Bundle jobs to recover, in seconds.
*/
public static final String CONF_BUNDLE_OLDER_THAN = CONF_PREFIX_BUNDLE + "older.than";
private static final String INSTRUMENTATION_GROUP = "recovery";
private static final String INSTR_RECOVERED_ACTIONS_COUNTER = "actions";
private static final String INSTR_RECOVERED_COORD_ACTIONS_COUNTER = "coord_actions";
private static final String INSTR_RECOVERED_BUNDLE_ACTIONS_COUNTER = "bundle_actions";
public static final long ONE_DAY_MILLISCONDS = 25 * 60 * 60 * 1000;
/**
* RecoveryRunnable is the Runnable which is scheduled to run with the configured interval, and takes care of the
* queuing of commands.
*/
static class RecoveryRunnable implements Runnable {
private final long olderThan;
private final long coordOlderThan;
private final long bundleOlderThan;
private long delay = 0;
private List<XCallable<?>> callables;
private List<XCallable<?>> delayedCallables;
private StringBuilder msg = null;
private JPAService jpaService = null;
public RecoveryRunnable(long olderThan, long coordOlderThan,long bundleOlderThan) {
this.olderThan = olderThan;
this.coordOlderThan = coordOlderThan;
this.bundleOlderThan = bundleOlderThan;
}
public void run() {
XLog.Info.get().clear();
XLog log = XLog.getLog(getClass());
msg = new StringBuilder();
jpaService = Services.get().get(JPAService.class);
runWFRecovery();
runCoordActionRecovery();
runBundleRecovery();
log.debug("QUEUED [{0}] for potential recovery", msg.toString());
boolean ret = false;
if (null != callables) {
ret = Services.get().get(CallableQueueService.class).queueSerial(callables);
if (ret == false) {
log.warn("Unable to queue the callables commands for RecoveryService. "
+ "Most possibly command queue is full. Queue size is :"
+ Services.get().get(CallableQueueService.class).queueSize());
}
callables = null;
}
if (null != delayedCallables) {
ret = Services.get().get(CallableQueueService.class).queueSerial(delayedCallables, this.delay);
if (ret == false) {
log.warn("Unable to queue the delayedCallables commands for RecoveryService. "
+ "Most possibly Callable queue is full. Queue size is :"
+ Services.get().get(CallableQueueService.class).queueSize());
}
delayedCallables = null;
this.delay = 0;
}
}
private void runBundleRecovery(){
XLog.Info.get().clear();
XLog log = XLog.getLog(getClass());
List<BundleActionBean> bactions = null;
try {
bactions = BundleActionQueryExecutor.getInstance().getList(
BundleActionQuery.GET_BUNDLE_WAITING_ACTIONS_OLDER_THAN, bundleOlderThan);
}
catch (JPAExecutorException ex) {
log.warn("Error reading bundle actions from database", ex);
return;
}
msg.append(", BUNDLE_ACTIONS : ").append(bactions.size());
for (BundleActionBean baction : bactions) {
try {
Services.get().get(InstrumentationService.class).get()
.incr(INSTRUMENTATION_GROUP, INSTR_RECOVERED_BUNDLE_ACTIONS_COUNTER, 1);
if (baction.getCoordId() == null && baction.getStatus() != Job.Status.PREP) {
log.error("CoordId is null for Bundle action " + baction.getBundleActionId());
continue;
}
if (Services.get().get(JobsConcurrencyService.class).isJobIdForThisServer(baction.getBundleId())) {
if (baction.getStatus() == Job.Status.PREP && baction.getCoordId() == null) {
CoordinatorJobBean coordJobs = CoordJobQueryExecutor.getInstance().getIfExist(
CoordJobQuery.GET_COORD_JOBS_FOR_BUNDLE_BY_APPNAME_ID, baction.getCoordName(),
baction.getBundleId());
if (coordJobs == null) {
log.debug("Coord [{0}] for bundle [{1}] is not yet submitted , submitting new one",
baction.getCoordName(), baction.getBundleId());
BundleJobBean bundleJob = null;
if (jpaService != null) {
bundleJob = BundleJobQueryExecutor.getInstance().get(
BundleJobQuery.GET_BUNDLE_JOB_ID_JOBXML_CONF, baction.getBundleId());
}
Element bAppXml = XmlUtils.parseXml(bundleJob.getJobXml());
@SuppressWarnings("unchecked")
List<Element> coordElems = bAppXml.getChildren("coordinator", bAppXml.getNamespace());
for (Element coordElem : coordElems) {
Attribute name = coordElem.getAttribute("name");
String coordName = name.getValue();
Configuration coordConf = mergeConfig(coordElem, bundleJob);
try {
coordName = ELUtils.resolveAppName(coordName, coordConf);
}
catch (Exception e) {
log.error("Error evaluating coord name " + e.getMessage(), e);
continue;
}
if (coordName.equals(baction.getCoordName())) {
coordConf.set(OozieClient.BUNDLE_ID, baction.getBundleId());
queueCallable(new BundleCoordSubmitXCommand(coordConf, bundleJob.getId(),
coordName));
}
}
}
else {
log.debug(
"Coord [{0}] for bundle [{1}] is submitted , but bundle action is not updated.",
baction.getCoordName(), baction.getBundleId());
coordJobs = CoordJobQueryExecutor.getInstance().getIfExist(
CoordJobQuery.GET_COORD_JOB_SUSPEND_KILL, baction.getCoordName(),
coordJobs.getId());
queueCallable(new BundleStatusUpdateXCommand(coordJobs, baction.getStatus()));
}
}
else if (baction.getStatus() == Job.Status.KILLED) {
queueCallable(new CoordKillXCommand(baction.getCoordId()));
}
else if (baction.getStatus() == Job.Status.SUSPENDED
|| baction.getStatus() == Job.Status.SUSPENDEDWITHERROR) {
queueCallable(new CoordSuspendXCommand(baction.getCoordId()));
}
else if (baction.getStatus() == Job.Status.RUNNING
|| baction.getStatus() == Job.Status.RUNNINGWITHERROR) {
queueCallable(new CoordResumeXCommand(baction.getCoordId()));
}
}
}
catch (Exception ex) {
log.error("Exception, {0}", ex.getMessage(), ex);
}
}
}
/**
* Recover coordinator actions that are staying in WAITING or SUBMITTED too long
*/
private void runCoordActionRecovery() {
Set<String> readyJobs = new HashSet<String>();
XLog.Info.get().clear();
XLog log = XLog.getLog(getClass());
long pushMissingDepInterval = ConfigurationService.getLong(CONF_PUSH_DEPENDENCY_INTERVAL);
long pushMissingDepDelay = pushMissingDepInterval;
Timestamp ts = new Timestamp(System.currentTimeMillis() - this.coordOlderThan * 1000);
List<CoordinatorActionBean> cactions = new ArrayList<CoordinatorActionBean>();
try {
cactions.addAll(CoordActionQueryExecutor.getInstance().getList(
CoordActionQuery.GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN, ts));
cactions.addAll(CoordActionQueryExecutor.getInstance().getList(
CoordActionQuery.GET_COORD_ACTIONS_WAITING_READY_SUBMITTED_OLDER_THAN, ts));
}
catch (JPAExecutorException ex) {
log.warn("Error reading coord actions from database", ex);
return;
}
msg.append(", COORD_ACTIONS : " + cactions.size());
for (CoordinatorActionBean caction : cactions) {
try {
if (Services.get().get(JobsConcurrencyService.class).isJobIdForThisServer(caction.getId())) {
Services.get().get(InstrumentationService.class).get()
.incr(INSTRUMENTATION_GROUP, INSTR_RECOVERED_COORD_ACTIONS_COUNTER, 1);
if (caction.getStatus() == CoordinatorActionBean.Status.WAITING) {
queueCallable(new CoordActionInputCheckXCommand(caction.getId(), caction.getJobId()));
log.debug("Recover a coord action from [WAITING] and resubmit CoordActionInputCheckXCommand :[{0}]"
, caction.getId());
if (caction.getPushMissingDependencies() != null
&& caction.getPushMissingDependencies().length() != 0) {
queueCallable(new CoordPushDependencyCheckXCommand(caction.getId(), true, true),
pushMissingDepDelay);
pushMissingDepDelay = pushMissingDepDelay + pushMissingDepInterval;
log.debug("Recover a coord action from [WAITING] and resubmit CoordPushDependencyCheckX :[{0}]"
, caction.getId());
}
}
else if (caction.getStatus() == CoordinatorActionBean.Status.SUBMITTED) {
CoordinatorJobBean coordJob = CoordJobQueryExecutor.getInstance().get(
CoordJobQuery.GET_COORD_JOB_USER_APPNAME, caction.getJobId());
queueCallable(new CoordActionStartXCommand(caction.getId(), coordJob.getUser(),
coordJob.getAppName(), caction.getJobId()));
log.debug("Recover a coord action from [SUBMITTED] and resubmit CoordActionStartCommand :[{0}]",
caction.getId());
}
else if (caction.getStatus() == CoordinatorActionBean.Status.SUSPENDED) {
if (caction.getExternalId() != null && caction.getPending() > 1) {
queueCallable(new SuspendXCommand(caction.getExternalId()));
log.debug("Recover a coord action from [SUSPENDED] and resubmit SuspendXCommand :[{0}]"
, caction.getId());
}
}
else if (caction.getStatus() == CoordinatorActionBean.Status.KILLED) {
if (caction.getExternalId() != null) {
queueCallable(new KillXCommand(caction.getExternalId()));
log.debug("Recover a coord action from [KILLED] and resubmit KillXCommand :[{0}]"
, caction.getId());
}
}
else if (caction.getStatus() == CoordinatorActionBean.Status.RUNNING) {
if (caction.getExternalId() != null) {
queueCallable(new ResumeXCommand(caction.getExternalId()));
log.debug("Recover a coord action from [RUNNING] and resubmit ResumeXCommand :[{0}]"
, caction.getId());
}
}
else if (caction.getStatus() == CoordinatorActionBean.Status.READY) {
readyJobs.add(caction.getJobId());
}
}
}
catch (Exception ex) {
log.error("Exception, {0}", ex.getMessage(), ex);
}
}
runCoordActionRecoveryForReady(readyJobs);
}
/**
* Recover coordinator actions that are staying in READY too long
*/
private void runCoordActionRecoveryForReady(Set<String> jobIds) {
XLog.Info.get().clear();
XLog log = XLog.getLog(getClass());
List<String> coordJobIds = new ArrayList<String>(jobIds);
try {
coordJobIds = Services.get().get(JobsConcurrencyService.class).getJobIdsForThisServer(coordJobIds);
msg.append(", COORD_READY_JOBS : " + coordJobIds.size());
for (String jobid : coordJobIds) {
queueCallable(new CoordActionReadyXCommand(jobid));
log.debug("Recover a coord action from [READY] resubmit CoordActionReadyXCommand :[{0}]", jobid);
}
}
catch (Exception ex) {
log.error("Exception, {0}", ex.getMessage(), ex);
}
}
/**
* Recover wf actions
*/
private void runWFRecovery() {
XLog.Info.get().clear();
XLog log = XLog.getLog(getClass());
// queue command for action recovery
long createdTimeInterval = new Date().getTime() - ConfigurationService.getLong(CONF_WF_ACTIONS_CREATED_TIME_INTERVAL)
* ONE_DAY_MILLISCONDS;
List<WorkflowActionBean> actions = null;
try {
actions = WorkflowActionQueryExecutor.getInstance().getList(WorkflowActionQuery.GET_PENDING_ACTIONS,
olderThan, createdTimeInterval);
}
catch (JPAExecutorException ex) {
log.warn("Exception while reading pending actions from storage", ex);
return;
}
// log.debug("QUEUING[{0}] pending wf actions for potential recovery",
// actions.size());
msg.append(" WF_ACTIONS " + actions.size());
for (WorkflowActionBean action : actions) {
try {
if (Services.get().get(JobsConcurrencyService.class).isJobIdForThisServer(action.getId())) {
Services.get().get(InstrumentationService.class).get()
.incr(INSTRUMENTATION_GROUP, INSTR_RECOVERED_ACTIONS_COUNTER, 1);
if (action.getStatus() == WorkflowActionBean.Status.PREP
|| action.getStatus() == WorkflowActionBean.Status.START_MANUAL) {
queueCallable(new ActionStartXCommand(action.getId(), action.getType()));
log.debug("Recover a workflow action from [{0}] status and resubmit ActionStartXCommand :[{1}]",
action.getStatus(), action.getId());
}
else if (action.getStatus() == WorkflowActionBean.Status.START_RETRY) {
Date nextRunTime = action.getPendingAge();
queueCallable(new ActionStartXCommand(action.getId(), action.getType()), nextRunTime.getTime()
- System.currentTimeMillis());
log.debug("Recover a workflow action from [START_RETRY] status and resubmit ActionStartXCommand :[{0}]"
, action.getId());
}
else if (action.getStatus() == WorkflowActionBean.Status.DONE
|| action.getStatus() == WorkflowActionBean.Status.END_MANUAL) {
queueCallable(new ActionEndXCommand(action.getId(), action.getType()));
log.debug("Recover a workflow action from [{0}] status and resubmit ActionEndXCommand :[{1}]",
action.getStatus(), action.getId());
}
else if (action.getStatus() == WorkflowActionBean.Status.END_RETRY) {
Date nextRunTime = action.getPendingAge();
queueCallable(new ActionEndXCommand(action.getId(), action.getType()), nextRunTime.getTime()
- System.currentTimeMillis());
log.debug("Recover a workflow action from [END_RETRY] status and resubmit ActionEndXCommand :[{0}]",
action.getId());
}
else if (action.getStatus() == WorkflowActionBean.Status.OK
|| action.getStatus() == WorkflowActionBean.Status.ERROR) {
queueCallable(new SignalXCommand(action.getJobId(), action.getId()));
log.debug("Recover a workflow action from [{0}] status and resubmit SignalXCommand :[{1}]",
action.getStatus(), action.getId());
}
else if (action.getStatus() == WorkflowActionBean.Status.USER_RETRY) {
queueCallable(new ActionStartXCommand(action.getId(), action.getType()));
log.debug("Recover a workflow action from [USER_RETRY] status and resubmit ActionStartXCommand :[{0}]"
, action.getId());
}
}
}
catch (Exception ex) {
log.error("Exception, {0}", ex.getMessage(), ex);
}
}
}
/**
* Adds callables to a list. If the number of callables in the list reaches {@link
* RecoveryService#CONF_CALLABLE_BATCH_SIZE}, the entire batch is queued and the callables list is reset.
*
* @param callable the callable to queue.
*/
private void queueCallable(XCallable<?> callable) {
if (callables == null) {
callables = new ArrayList<XCallable<?>>();
}
callables.add(callable);
if (callables.size() == Services.get().getConf().getInt(CONF_CALLABLE_BATCH_SIZE, 10)) {
boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables);
if (ret == false) {
XLog.getLog(getClass()).warn(
"Unable to queue the callables commands for RecoveryService. "
+ "Most possibly command queue is full. Queue size is :"
+ Services.get().get(CallableQueueService.class).queueSize());
}
callables = new ArrayList<XCallable<?>>();
}
}
/**
* Adds callables to a list. If the number of callables in the list reaches {@link
* RecoveryService#CONF_CALLABLE_BATCH_SIZE}, the entire batch is queued with the delay set to the maximum delay
* of the callables in the list. The callables list and the delay is reset.
*
* @param callable the callable to queue.
* @param delay the delay for the callable.
*/
private void queueCallable(XCallable<?> callable, long delay) {
if (delayedCallables == null) {
delayedCallables = new ArrayList<XCallable<?>>();
}
this.delay = Math.max(this.delay, delay);
delayedCallables.add(callable);
if (delayedCallables.size() == ConfigurationService.getInt(CONF_CALLABLE_BATCH_SIZE)){
boolean ret = Services.get().get(CallableQueueService.class).queueSerial(delayedCallables, this.delay);
if (ret == false) {
XLog.getLog(getClass()).warn("Unable to queue the delayedCallables commands for RecoveryService. "
+ "Most possibly Callable queue is full. Queue size is :"
+ Services.get().get(CallableQueueService.class).queueSize());
}
delayedCallables = new ArrayList<XCallable<?>>();
this.delay = 0;
}
}
}
/**
* Initializes the RecoveryService.
*
* @param services services instance.
*/
@Override
public void init(Services services) {
Configuration conf = services.getConf();
Runnable recoveryRunnable = new RecoveryRunnable(
ConfigurationService.getInt(conf, CONF_WF_ACTIONS_OLDER_THAN),
ConfigurationService.getInt(conf, CONF_COORD_OLDER_THAN),
ConfigurationService.getInt(conf, CONF_BUNDLE_OLDER_THAN));
services.get(SchedulerService.class).schedule(recoveryRunnable, 10, getRecoveryServiceInterval(conf),
SchedulerService.Unit.SEC);
}
public int getRecoveryServiceInterval(Configuration conf){
return ConfigurationService.getInt(conf, CONF_SERVICE_INTERVAL);
}
/**
* Destroy the Recovery Service.
*/
@Override
public void destroy() {
}
/**
* Return the public interface for the Recovery Service.
*
* @return {@link RecoveryService}.
*/
@Override
public Class<? extends Service> getInterface() {
return RecoveryService.class;
}
/**
* Merge Bundle job config and the configuration from the coord job to pass
* to Coord Engine
*
* @param coordElem the coordinator configuration
* @return Configuration merged configuration
* @throws CommandException thrown if failed to merge configuration
*/
private static Configuration mergeConfig(Element coordElem,BundleJobBean bundleJob) throws CommandException {
XLog.Info.get().clear();
XLog log = XLog.getLog("RecoveryService");
String jobConf = bundleJob.getConf();
// Step 1: runConf = jobConf
Configuration runConf = null;
try {
runConf = new XConfiguration(new StringReader(jobConf));
}
catch (IOException e1) {
log.warn("Configuration parse error in:" + jobConf);
throw new CommandException(ErrorCode.E1306, e1.getMessage(), e1);
}
// Step 2: Merge local properties into runConf
// extract 'property' tags under 'configuration' block in the coordElem
// convert Element to XConfiguration
Element localConfigElement = coordElem.getChild("configuration", coordElem.getNamespace());
if (localConfigElement != null) {
String strConfig = XmlUtils.prettyPrint(localConfigElement).toString();
Configuration localConf;
try {
localConf = new XConfiguration(new StringReader(strConfig));
}
catch (IOException e1) {
log.warn("Configuration parse error in:" + strConfig);
throw new CommandException(ErrorCode.E1307, e1.getMessage(), e1);
}
// copy configuration properties in the coordElem to the runConf
XConfiguration.copy(localConf, runConf);
}
// Step 3: Extract value of 'app-path' in coordElem, save it as a
// new property called 'oozie.coord.application.path', and normalize.
String appPath = coordElem.getChild("app-path", coordElem.getNamespace()).getValue();
runConf.set(OozieClient.COORDINATOR_APP_PATH, appPath);
// Normalize coordinator appPath here;
try {
JobUtils.normalizeAppPath(runConf.get(OozieClient.USER_NAME), runConf.get(OozieClient.GROUP_NAME), runConf);
}
catch (IOException e) {
throw new CommandException(ErrorCode.E1001, runConf.get(OozieClient.COORDINATOR_APP_PATH));
}
return runConf;
}
}