| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.falcon.workflow.engine; |
| |
| import org.apache.commons.lang3.StringUtils; |
| import org.apache.falcon.FalconException; |
| import org.apache.falcon.LifeCycle; |
| import org.apache.falcon.entity.ClusterHelper; |
| import org.apache.falcon.entity.EntityUtil; |
| import org.apache.falcon.entity.store.ConfigurationStore; |
| import org.apache.falcon.entity.v0.Entity; |
| import org.apache.falcon.entity.v0.EntityGraph; |
| import org.apache.falcon.entity.v0.EntityType; |
| import org.apache.falcon.entity.v0.Frequency; |
| import org.apache.falcon.entity.v0.Frequency.TimeUnit; |
| import org.apache.falcon.entity.v0.SchemaHelper; |
| import org.apache.falcon.entity.v0.cluster.Cluster; |
| import org.apache.falcon.entity.v0.feed.Feed; |
| import org.apache.falcon.hadoop.HadoopClientFactory; |
| import org.apache.falcon.oozie.OozieBundleBuilder; |
| import org.apache.falcon.oozie.OozieEntityBuilder; |
| import org.apache.falcon.oozie.bundle.BUNDLEAPP; |
| import org.apache.falcon.oozie.bundle.CONFIGURATION.Property; |
| import org.apache.falcon.oozie.bundle.COORDINATOR; |
| import org.apache.falcon.resource.APIResult; |
| import org.apache.falcon.resource.InstancesResult; |
| import org.apache.falcon.resource.InstancesResult.Instance; |
| import org.apache.falcon.resource.InstancesResult.WorkflowStatus; |
| import org.apache.falcon.resource.InstancesSummaryResult; |
| import org.apache.falcon.resource.InstancesSummaryResult.InstanceSummary; |
| import org.apache.falcon.security.CurrentUser; |
| import org.apache.falcon.update.UpdateHelper; |
| import org.apache.falcon.util.DateUtil; |
| import org.apache.falcon.util.OozieUtils; |
| import org.apache.falcon.util.RuntimeProperties; |
| import org.apache.falcon.util.StartupProperties; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.oozie.client.BundleJob; |
| import org.apache.oozie.client.CoordinatorAction; |
| import org.apache.oozie.client.CoordinatorJob; |
| import org.apache.oozie.client.CoordinatorJob.Timeunit; |
| import org.apache.oozie.client.JMSConnectionInfo; |
| import org.apache.oozie.client.Job; |
| import org.apache.oozie.client.Job.Status; |
| import org.apache.oozie.client.OozieClient; |
| import org.apache.oozie.client.OozieClientException; |
| import org.apache.oozie.client.WorkflowAction; |
| import org.apache.oozie.client.WorkflowJob; |
| import org.apache.oozie.client.rest.RestConstants; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.ByteArrayInputStream; |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Calendar; |
| import java.util.Collections; |
| import java.util.Comparator; |
| import java.util.Date; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Properties; |
| import java.util.Set; |
| import java.util.TimeZone; |
| |
| /** |
| * Workflow engine which uses Oozie's APIs. |
| */ |
| public class OozieWorkflowEngine extends AbstractWorkflowEngine { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(OozieWorkflowEngine.class); |
| |
| // Sentinel standing in for "no bundle found for this cluster"; this class compares against it by identity (==). |
| private static final BundleJob MISSING = new NullBundleJob(); |
| |
| // Status preconditions for workflow-level and coordinator-action-level operations. |
| // NOTE(review): the code consuming these four lists is outside this part of the file - confirm before editing. |
| private static final List<WorkflowJob.Status> WF_KILL_PRECOND = |
| Arrays.asList(WorkflowJob.Status.PREP, WorkflowJob.Status.RUNNING, WorkflowJob.Status.SUSPENDED, |
| WorkflowJob.Status.FAILED); |
| private static final List<WorkflowJob.Status> WF_SUSPEND_PRECOND = Arrays.asList(WorkflowJob.Status.RUNNING); |
| private static final List<WorkflowJob.Status> WF_RESUME_PRECOND = Arrays.asList(WorkflowJob.Status.SUSPENDED); |
| private static final List<CoordinatorAction.Status> COORD_RERUN_PRECOND = |
| Arrays.asList(CoordinatorAction.Status.TIMEDOUT, CoordinatorAction.Status.FAILED, |
| CoordinatorAction.Status.KILLED, CoordinatorAction.Status.SUCCEEDED, CoordinatorAction.Status.IGNORED); |
| // Oozie bundle statuses grouped per BundleStatus value; isBundleInState() checks membership in these lists. |
| private static final List<Job.Status> BUNDLE_ACTIVE_STATUS = |
| Arrays.asList(Job.Status.PREP, Job.Status.RUNNING, Job.Status.SUSPENDED, Job.Status.PREPSUSPENDED, |
| Job.Status.RUNNINGWITHERROR, Job.Status.PAUSED, Status.PREPPAUSED, Status.PAUSEDWITHERROR); |
| private static final List<Job.Status> BUNDLE_SUSPENDED_STATUS = |
| Arrays.asList(Job.Status.PREPSUSPENDED, Job.Status.SUSPENDED, Status.SUSPENDEDWITHERROR); |
| private static final List<Job.Status> BUNDLE_RUNNING_STATUS = Arrays.asList(Job.Status.PREP, Job.Status.RUNNING, |
| Job.Status.RUNNINGWITHERROR); |
| private static final List<Job.Status> BUNDLE_SUCCEEDED_STATUS = Arrays.asList(Job.Status.SUCCEEDED); |
| private static final List<Job.Status> BUNDLE_FAILED_STATUS = Arrays.asList(Job.Status.FAILED, |
| Job.Status.DONEWITHERROR); |
| private static final List<Job.Status> BUNDLE_KILLED_STATUS = Arrays.asList(Job.Status.KILLED); |
| |
| // Bundle statuses from which doBundleAction() is willing to issue a suspend / resume. |
| private static final List<Job.Status> BUNDLE_SUSPEND_PRECOND = |
| Arrays.asList(Job.Status.PREP, Job.Status.RUNNING, Job.Status.DONEWITHERROR); |
| private static final List<Job.Status> BUNDLE_RESUME_PRECOND = |
| Arrays.asList(Job.Status.SUSPENDED, Job.Status.PREPSUSPENDED); |
| // Property keys handed to getIncludedClusters() to restrict instance actions to certain (source) clusters. |
| private static final String FALCON_INSTANCE_ACTION_CLUSTERS = "falcon.instance.action.clusters"; |
| private static final String FALCON_INSTANCE_SOURCE_CLUSTERS = "falcon.instance.source.clusters"; |
| // Runtime property read by dryRunInternal(); a value of "true" skips the Oozie dry run. |
| private static final String FALCON_SKIP_DRYRUN = "falcon.skip.dryrun"; |
| |
| // NOTE(review): presumably used when polling workflow status; the retry code is not in this part of the file. |
| private static final int WORKFLOW_STATUS_RETRY_DELAY_MS = 100; // milliseconds |
| private static final String WORKFLOW_STATUS_RETRY_COUNT = "workflow.status.retry.count"; |
| |
| // NOTE(review): presumably the action names of the parent workflow; consumer not visible here - confirm. |
| private static final List<String> PARENT_WF_ACTION_NAMES = Arrays.asList( |
| "pre-processing", |
| "recordsize", |
| "succeeded-post-processing", |
| "failed-post-processing" |
| ); |
| |
| // NOTE(review): if these are used as regular expressions, "[\\d+]" is a character class (one digit or '+'), |
| // not a bracketed index - confirm the intended pattern against the consumer. |
| private static final String[] BUNDLE_UPDATEABLE_PROPS = |
| new String[]{"parallel", "clusters.clusters[\\d+].validity.end", }; |
| |
| // Shared configuration store; this class uses it to resolve cluster entities by name. |
| public static final ConfigurationStore STORE = ConfigurationStore.get(); |
| |
| /** |
|  * Creates the engine and registers an {@code OozieHouseKeepingService} through {@code registerListener}. |
|  */ |
| public OozieWorkflowEngine() { |
|     OozieHouseKeepingService houseKeeping = new OozieHouseKeepingService(); |
|     registerListener(houseKeeping); |
| } |
| |
| /** |
|  * Checks whether the Oozie server of the given cluster reports the NORMAL system mode. |
|  * |
|  * @param cluster cluster whose Oozie server is queried |
|  * @return true only when the reported system mode is NORMAL |
|  * @throws FalconException if the Oozie client call fails |
|  */ |
| @Override |
| public boolean isAlive(Cluster cluster) throws FalconException { |
|     final OozieClient.SYSTEM_MODE mode; |
|     try { |
|         mode = OozieClientFactory.get(cluster).getSystemMode(); |
|     } catch (OozieClientException e) { |
|         throw new FalconException("Unable to reach Oozie server.", e); |
|     } |
|     return mode == OozieClient.SYSTEM_MODE.NORMAL; |
| } |
| |
| @Override |
| public void schedule(Entity entity, Boolean skipDryRun, Map<String, String> suppliedProps) throws FalconException { |
| if (StartupProperties.isServerInSafeMode()) { |
| throwSafemodeException("SCHEDULE"); |
| } |
| //Adding group information to pass to oozie |
| if (entity.getACL() != null && entity.getACL().getGroup() != null) { |
| if (suppliedProps == null) { |
| suppliedProps = new HashMap<>(); |
| } |
| suppliedProps.put(OozieClient.GROUP_NAME, entity.getACL().getGroup()); |
| } |
| Map<String, BundleJob> bundleMap = findLatestBundle(entity); |
| List<String> schedClusters = new ArrayList<String>(); |
| for (Map.Entry<String, BundleJob> entry : bundleMap.entrySet()) { |
| String cluster = entry.getKey(); |
| BundleJob bundleJob = entry.getValue(); |
| if (bundleJob == MISSING) { |
| schedClusters.add(cluster); |
| } else { |
| LOG.debug("Entity {} is already scheduled on cluster {}", entity.getName(), cluster); |
| } |
| } |
| |
| if (!schedClusters.isEmpty()) { |
| OozieEntityBuilder builder = OozieEntityBuilder.get(entity); |
| for (String clusterName: schedClusters) { |
| Cluster cluster = STORE.get(EntityType.CLUSTER, clusterName); |
| prepareEntityBuildPath(entity, cluster); |
| Path buildPath = EntityUtil.getNewStagingPath(cluster, entity); |
| Properties properties = builder.build(cluster, buildPath, suppliedProps); |
| if (properties == null) { |
| LOG.info("Entity {} is not scheduled on cluster {}", entity.getName(), cluster); |
| continue; |
| } |
| |
| //Do dryRun of coords before schedule as schedule is asynchronous |
| LOG.debug("The properties passed to oozie are: {}", properties.stringPropertyNames().toString()); |
| dryRunInternal(cluster, new Path(properties.getProperty(OozieEntityBuilder.ENTITY_PATH)), skipDryRun); |
| scheduleEntity(clusterName, properties, entity); |
| } |
| } |
| } |
| |
| private void throwSafemodeException(String operation) throws FalconException { |
| String error = "Workflow Engine does not allow " + operation + " opeartion when Falcon server is in safemode"; |
| LOG.error(error); |
| throw new FalconException(error); |
| } |
| |
| /** |
| * Prepare the staging and logs dir for this entity with default permissions. |
| * |
| * @param entity entity |
| * @param cluster cluster entity |
| * @throws FalconException |
| */ |
| private void prepareEntityBuildPath(Entity entity, Cluster cluster) throws FalconException { |
| Path stagingPath = EntityUtil.getBaseStagingPath(cluster, entity); |
| Path logPath = EntityUtil.getLogPath(cluster, entity); |
| |
| try { |
| FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem( |
| ClusterHelper.getConfiguration(cluster)); |
| HadoopClientFactory.mkdirsWithDefaultPerms(fs, stagingPath); |
| HadoopClientFactory.mkdirsWithDefaultPerms(fs, logPath); |
| } catch (IOException e) { |
| throw new FalconException("Error preparing base staging dirs: " + stagingPath, e); |
| } |
| } |
| |
| /** |
|  * Builds the entity under a temporary path below /tmp and dry-runs the result against the cluster. |
|  * |
|  * @param entity entity to validate |
|  * @param clusterName name of the cluster to dry-run on |
|  * @param skipDryRun when TRUE the Oozie dry run itself is skipped; may be null |
|  * @throws FalconException if the server is in safemode, or building or the dry run fails |
|  */ |
| @Override |
| public void dryRun(Entity entity, String clusterName, Boolean skipDryRun) throws FalconException { |
|     if (StartupProperties.isServerInSafeMode()) { |
|         throwSafemodeException("DRYRUN"); |
|     } |
|     OozieEntityBuilder entityBuilder = OozieEntityBuilder.get(entity); |
|     String tempDirName = "falcon" + entity.getName() + System.currentTimeMillis(); |
|     Path tempBuildPath = new Path("/tmp", tempDirName); |
|     Cluster targetCluster = STORE.get(EntityType.CLUSTER, clusterName); |
|     Properties built = entityBuilder.build(targetCluster, tempBuildPath); |
|     if (built == null) { |
|         // The builder produced nothing for this cluster, so there is nothing to dry-run. |
|         return; |
|     } |
|     Path entityPath = new Path(built.getProperty(OozieEntityBuilder.ENTITY_PATH)); |
|     dryRunInternal(targetCluster, entityPath, skipDryRun); |
| } |
| |
| /** |
|  * Dry-runs every coordinator of the bundle found at the build path, unless dry run is skipped |
|  * by the request parameter or by the runtime property "falcon.skip.dryrun". |
|  * |
|  * @param cluster cluster whose Oozie client performs the dry run |
|  * @param buildPath path from which the bundle definition is unmarshalled |
|  * @param skipDryRun request-level switch; TRUE skips the dry run, null or FALSE defers to runtime properties |
|  * @throws FalconException wrapping any Oozie client failure |
|  */ |
| private void dryRunInternal(Cluster cluster, Path buildPath, Boolean skipDryRun) throws FalconException { |
|     if (Boolean.TRUE.equals(skipDryRun)) { |
|         LOG.info("Skipping dryrun as directed by param in cli/RestApi."); |
|         return; |
|     } |
|     String runtimeSwitch = RuntimeProperties.get().getProperty(FALCON_SKIP_DRYRUN, "false").toLowerCase(); |
|     if (Boolean.parseBoolean(runtimeSwitch)) { |
|         LOG.info("Skipping dryrun as directed by Runtime properties."); |
|         return; |
|     } |
| |
|     BUNDLEAPP bundleApp = OozieBundleBuilder.unmarshal(cluster, buildPath); |
|     OozieClient oozie = OozieClientFactory.get(cluster.getName()); |
|     for (COORDINATOR coordinator : bundleApp.getCoordinator()) { |
|         // Each coordinator is dry-run with its app path plus its own configuration properties. |
|         Properties coordProps = new Properties(); |
|         coordProps.setProperty(OozieClient.COORDINATOR_APP_PATH, coordinator.getAppPath()); |
|         for (Property configured : coordinator.getConfiguration().getProperty()) { |
|             coordProps.setProperty(configured.getName(), configured.getValue()); |
|         } |
|         try { |
|             LOG.info("dryRun with properties {}", coordProps); |
|             oozie.dryrun(coordProps); |
|         } catch (OozieClientException e) { |
|             throw new FalconException(e); |
|         } |
|     } |
| } |
| |
| @Override |
| public boolean isActive(Entity entity) throws FalconException { |
| return isBundleInState(findLatestBundle(entity), BundleStatus.ACTIVE); |
| } |
| |
| @Override |
| public boolean isSuspended(Entity entity) throws FalconException { |
| return isBundleInState(findLatestBundle(entity), BundleStatus.SUSPENDED); |
| } |
| |
| /** |
|  * Tells whether the latest bundles of the entity are all succeeded, all failed or all killed. |
|  * |
|  * @param entity entity to check |
|  * @return true if one of the three terminal states holds for every latest bundle |
|  * @throws FalconException if the bundles cannot be looked up |
|  */ |
| @Override |
| public boolean isCompleted(Entity entity) throws FalconException { |
|     Map<String, BundleJob> latestBundles = findLatestBundle(entity); |
|     if (isBundleInState(latestBundles, BundleStatus.SUCCEEDED)) { |
|         return true; |
|     } |
|     if (isBundleInState(latestBundles, BundleStatus.FAILED)) { |
|         return true; |
|     } |
|     return isBundleInState(latestBundles, BundleStatus.KILLED); |
| } |
| |
| /** |
|  * Tells whether no cluster holds a bundle for the entity. |
|  * |
|  * @param entity entity to check |
|  * @return true if the latest bundle of every cluster is the MISSING sentinel (or there are no clusters) |
|  * @throws FalconException if the bundles cannot be looked up |
|  */ |
| @Override |
| public boolean isMissing(Entity entity) throws FalconException { |
|     for (BundleJob latest : findLatestBundle(entity).values()) { |
|         if (latest != MISSING) { |
|             // At least one cluster has a real bundle. |
|             return false; |
|         } |
|     } |
|     return true; |
| } |
| |
| /** |
|  * Bundle states that isBundleInState() can test for. |
|  */ |
| private enum BundleStatus { |
|     ACTIVE, |
|     RUNNING, |
|     SUSPENDED, |
|     FAILED, |
|     KILLED, |
|     SUCCEEDED |
| } |
| |
| private boolean isBundleInState(Map<String, BundleJob> bundles, |
| BundleStatus status) throws FalconException { |
| |
| // Need a separate list to avoid concurrent modification. |
| List<String> bundlesToRemove = new ArrayList<>(); |
| // After removing MISSING bundles for clusters, if bundles.size() == 0, entity is not scheduled. Return false. |
| for (Map.Entry<String, BundleJob> clusterBundle : bundles.entrySet()) { |
| if (clusterBundle.getValue() == MISSING) { // There is no active bundle for this cluster |
| bundlesToRemove.add(clusterBundle.getKey()); |
| } |
| } |
| for (String bundleToRemove : bundlesToRemove) { |
| bundles.remove(bundleToRemove); |
| } |
| if (bundles.size() == 0) { |
| return false; |
| } |
| |
| for (BundleJob bundle : bundles.values()) { |
| switch (status) { |
| case ACTIVE: |
| if (!BUNDLE_ACTIVE_STATUS.contains(bundle.getStatus())) { |
| return false; |
| } |
| break; |
| |
| case RUNNING: |
| if (!BUNDLE_RUNNING_STATUS.contains(bundle.getStatus())) { |
| return false; |
| } |
| break; |
| |
| case SUSPENDED: |
| if (!BUNDLE_SUSPENDED_STATUS.contains(bundle.getStatus())) { |
| return false; |
| } |
| break; |
| |
| case FAILED: |
| if (!BUNDLE_FAILED_STATUS.contains(bundle.getStatus())) { |
| return false; |
| } |
| break; |
| |
| case KILLED: |
| if (!BUNDLE_KILLED_STATUS.contains(bundle.getStatus())) { |
| return false; |
| } |
| break; |
| |
| case SUCCEEDED: |
| if (!BUNDLE_SUCCEEDED_STATUS.contains(bundle.getStatus())) { |
| return false; |
| } |
| break; |
| default: |
| } |
| LOG.debug("Bundle {} is in state {}", bundle.getAppName(), status.name()); |
| } |
| return true; |
| } |
| |
| /** |
|  * Returns all bundles of the entity on the requested cluster. |
|  * Bundles are looked up by workflow name (first 256 matches) and kept only when their app path |
|  * is a staging path of the entity. |
|  * |
|  * @param entity entity whose bundles are searched |
|  * @param clusterName name of the cluster to search on |
|  * @return fully loaded bundles, possibly empty, never null |
|  * @throws FalconException wrapping any Oozie client failure |
|  */ |
| private List<BundleJob> findBundles(Entity entity, String clusterName) throws FalconException { |
|     Cluster cluster = STORE.get(EntityType.CLUSTER, clusterName); |
|     List<BundleJob> matching = new ArrayList<BundleJob>(); |
|     try { |
|         OozieClient oozie = OozieClientFactory.get(cluster.getName()); |
|         String nameFilter = OozieClient.FILTER_NAME + "=" + EntityUtil.getWorkflowName(entity) + ";"; |
|         List<BundleJob> candidates = oozie.getBundleJobsInfo(nameFilter, 0, 256); |
|         if (candidates == null) { |
|             return matching; |
|         } |
|         for (BundleJob candidate : candidates) { |
|             // Path is extracted twice as to handle changes in hadoop configurations for nameservices. |
|             Path appPath = new Path((new Path(candidate.getAppPath())).toUri().getPath()); |
|             if (!EntityUtil.isStagingPath(cluster, entity, appPath)) { |
|                 continue; |
|             } |
|             // Load bundle as coord info is not returned in getBundleJobsInfo() |
|             matching.add(getBundleInfo(clusterName, candidate.getId())); |
|             LOG.trace("Found bundle {} with app path {} and status {}", |
|                     candidate.getId(), candidate.getAppPath(), candidate.getStatus()); |
|         } |
|     } catch (OozieClientException e) { |
|         throw new FalconException(e); |
|     } |
|     return matching; |
| } |
| |
| /** |
|  * Returns all bundles of the entity, grouped by cluster. |
|  * |
|  * @param entity entity whose bundles are searched |
|  * @return map from cluster name to the bundles found on that cluster |
|  * @throws FalconException if a lookup on any cluster fails |
|  */ |
| private Map<String, List<BundleJob>> findBundles(Entity entity) throws FalconException { |
|     Map<String, List<BundleJob>> bundlesByCluster = new HashMap<String, List<BundleJob>>(); |
|     for (String clusterName : EntityUtil.getClustersDefinedInColos(entity)) { |
|         List<BundleJob> clusterBundles = findBundles(entity, clusterName); |
|         bundlesByCluster.put(clusterName, clusterBundles); |
|     } |
|     return bundlesByCluster; |
| } |
| |
| /** |
|  * Returns the latest (last created) bundle of the entity for each cluster. |
|  * |
|  * @param entity entity whose bundles are searched |
|  * @return map from cluster name to its latest bundle, or to the MISSING sentinel when none exists |
|  * @throws FalconException if a lookup on any cluster fails |
|  */ |
| private Map<String, BundleJob> findLatestBundle(Entity entity) throws FalconException { |
|     Map<String, BundleJob> latestByCluster = new HashMap<String, BundleJob>(); |
|     for (String clusterName : EntityUtil.getClustersDefinedInColos(entity)) { |
|         latestByCluster.put(clusterName, findLatestBundle(entity, clusterName)); |
|     } |
|     return latestByCluster; |
| } |
| |
| /** |
|  * Returns the latest (last created) bundle of the entity on the requested cluster. |
|  * |
|  * @param entity entity whose bundles are searched |
|  * @param cluster name of the cluster to search on |
|  * @return the bundle with the greatest created time, or the MISSING sentinel when there is none |
|  * @throws FalconException if the lookup fails |
|  */ |
| private BundleJob findLatestBundle(Entity entity, String cluster) throws FalconException { |
|     List<BundleJob> found = findBundles(entity, cluster); |
|     if (found == null || found.isEmpty()) { |
|         return MISSING; |
|     } |
| |
|     // Linear scan; on equal created times the earlier list element wins. |
|     BundleJob latest = found.get(0); |
|     for (BundleJob candidate : found.subList(1, found.size())) { |
|         if (candidate.getCreatedTime().compareTo(latest.getCreatedTime()) > 0) { |
|             latest = candidate; |
|         } |
|     } |
|     return latest; |
| } |
| |
| /** |
|  * Suspends the bundles of the entity on all of its clusters. |
|  * |
|  * @param entity entity to suspend |
|  * @return result string of the bundle action |
|  * @throws FalconException if the bundle action fails |
|  */ |
| @Override |
| public String suspend(Entity entity) throws FalconException { |
|     final BundleAction requested = BundleAction.SUSPEND; |
|     return doBundleAction(entity, requested); |
| } |
| |
| /** |
|  * Resumes the bundles of the entity on all of its clusters. |
|  * |
|  * @param entity entity to resume |
|  * @return result string of the bundle action |
|  * @throws FalconException if the server is in safemode or the bundle action fails |
|  */ |
| @Override |
| public String resume(Entity entity) throws FalconException { |
|     final BundleAction requested = BundleAction.RESUME; |
|     return doBundleAction(entity, requested); |
| } |
| |
| /** |
|  * Kills the bundles of the entity on all of its clusters. |
|  * |
|  * @param entity entity to delete |
|  * @return result string of the bundle action |
|  * @throws FalconException if the server is in safemode or the bundle action fails |
|  */ |
| @Override |
| public String delete(Entity entity) throws FalconException { |
|     final BundleAction requested = BundleAction.KILL; |
|     return doBundleAction(entity, requested); |
| } |
| |
| /** |
|  * Kills the bundles of the entity on a single cluster. |
|  * |
|  * @param entity entity to delete |
|  * @param cluster name of the cluster to act on |
|  * @return result string of the bundle action |
|  * @throws FalconException if the bundle action fails |
|  */ |
| @Override |
| public String delete(Entity entity, String cluster) throws FalconException { |
|     final BundleAction requested = BundleAction.KILL; |
|     return doBundleAction(entity, requested, cluster); |
| } |
| |
| /** |
|  * Operations that doBundleAction() can apply to the bundles of an entity. |
|  */ |
| private enum BundleAction { |
|     SUSPEND, |
|     RESUME, |
|     KILL |
| } |
| |
| /** |
|  * Applies the bundle action on every cluster of the entity. |
|  * In safemode only SUSPEND is allowed; any other action is rejected. |
|  * |
|  * @param entity entity whose bundles are acted on |
|  * @param action action to apply |
|  * @return result of the last cluster processed, or null when the entity has no clusters |
|  * @throws FalconException if the action is rejected in safemode or fails on a cluster |
|  */ |
| private String doBundleAction(Entity entity, BundleAction action) throws FalconException { |
|     if (StartupProperties.isServerInSafeMode()) { |
|         boolean allowedInSafemode = action.equals(BundleAction.SUSPEND); |
|         if (!allowedInSafemode) { |
|             throwSafemodeException(action.name()); |
|         } |
|     } |
|     String lastResult = null; |
|     for (String clusterName : EntityUtil.getClustersDefinedInColos(entity)) { |
|         lastResult = doBundleAction(entity, action, clusterName); |
|     } |
|     return lastResult; |
| } |
| |
| /** |
|  * Applies the bundle action to every bundle of the entity on one cluster, |
|  * notifying the registered listeners before and after. |
|  * |
|  * @param entity entity whose bundles are acted on |
|  * @param action action to apply |
|  * @param cluster name of the cluster to act on |
|  * @return the string "SUCCESS" |
|  * @throws FalconException if the lookup, a listener or the Oozie call fails |
|  */ |
| private String doBundleAction(Entity entity, BundleAction action, String cluster) throws FalconException { |
|     List<BundleJob> clusterBundles = findBundles(entity, cluster); |
|     beforeAction(entity, action, cluster); |
|     for (BundleJob bundle : clusterBundles) { |
|         switch (action) { |
|         case SUSPEND: { |
|             // Suspend only bundles that are not suspended yet and satisfy the suspend preconditions. |
|             boolean alreadySuspended = BUNDLE_SUSPENDED_STATUS.contains(bundle.getStatus()); |
|             if (!alreadySuspended && BUNDLE_SUSPEND_PRECOND.contains(bundle.getStatus())) { |
|                 suspend(cluster, bundle.getId()); |
|             } |
|             break; |
|         } |
|         case RESUME: { |
|             // Resume only bundles that are not running and satisfy the resume preconditions. |
|             boolean alreadyRunning = BUNDLE_RUNNING_STATUS.contains(bundle.getStatus()); |
|             if (!alreadyRunning && BUNDLE_RESUME_PRECOND.contains(bundle.getStatus())) { |
|                 resume(cluster, bundle.getId()); |
|             } |
|             break; |
|         } |
|         case KILL: |
|             // Kill is issued for every bundle regardless of its current status. |
|             killBundle(cluster, bundle); |
|             break; |
| |
|         default: |
|         } |
|     } |
|     afterAction(entity, action, cluster); |
|     return "SUCCESS"; |
| } |
| |
| /** |
|  * Kills every coordinator of the bundle and then the bundle itself. |
|  * |
|  * @param clusterName name of the cluster hosting the bundle |
|  * @param job bundle to kill |
|  * @throws FalconException wrapping any Oozie client failure |
|  */ |
| private void killBundle(String clusterName, BundleJob job) throws FalconException { |
|     OozieClient oozie = OozieClientFactory.get(clusterName); |
|     try { |
|         // Coordinators go first, the bundle last. |
|         for (CoordinatorJob coordinator : job.getCoordinators()) { |
|             String coordId = coordinator.getId(); |
|             oozie.kill(coordId); |
|             LOG.debug("Killed coord {} on cluster {}", coordinator.getId(), clusterName); |
|         } |
| |
|         String bundleId = job.getId(); |
|         oozie.kill(bundleId); |
|         LOG.debug("Killed bundle {} on cluster {}", job.getId(), clusterName); |
|     } catch (OozieClientException e) { |
|         throw new FalconException(e); |
|     } |
| } |
| |
| /** |
|  * Invokes the "before" callback matching the action on every registered listener. |
|  * |
|  * @param entity entity about to be acted on |
|  * @param action bundle action about to be applied |
|  * @param cluster name of the cluster the action targets |
|  * @throws FalconException if a listener callback fails |
|  */ |
| private void beforeAction(Entity entity, BundleAction action, String cluster) throws FalconException { |
|     for (WorkflowEngineActionListener registered : listeners) { |
|         switch (action) { |
|         case SUSPEND: |
|             registered.beforeSuspend(entity, cluster); |
|             break; |
|         case RESUME: |
|             registered.beforeResume(entity, cluster); |
|             break; |
|         case KILL: |
|             registered.beforeDelete(entity, cluster); |
|             break; |
|         default: |
|             // No callback exists for other actions. |
|         } |
|     } |
| } |
| |
| /** |
|  * Invokes the "after" callback matching the action on every registered listener. |
|  * |
|  * @param entity entity that was acted on |
|  * @param action bundle action that was applied |
|  * @param cluster name of the cluster the action targeted |
|  * @throws FalconException if a listener callback fails |
|  */ |
| private void afterAction(Entity entity, BundleAction action, String cluster) throws FalconException { |
|     for (WorkflowEngineActionListener registered : listeners) { |
|         switch (action) { |
|         case SUSPEND: |
|             registered.afterSuspend(entity, cluster); |
|             break; |
|         case RESUME: |
|             registered.afterResume(entity, cluster); |
|             break; |
|         case KILL: |
|             registered.afterDelete(entity, cluster); |
|             break; |
|         default: |
|             // No callback exists for other actions. |
|         } |
|     } |
| } |
| |
| /** |
|  * Lists the running workflow instances of the entity across its clusters. |
|  * Workflows without a parent id are ignored. |
|  * |
|  * @param entity entity whose running instances are listed |
|  * @param lifeCycles not read by this method |
|  * @return a SUCCEEDED result holding one instance per running workflow with a parent coordinator action |
|  * @throws FalconException wrapping any Oozie client failure |
|  */ |
| @Override |
| public InstancesResult getRunningInstances(Entity entity, List<LifeCycle> lifeCycles) throws FalconException { |
|     try { |
|         Set<String> clusterNames = EntityUtil.getClustersDefinedInColos(entity); |
|         List<Instance> running = new ArrayList<Instance>(); |
| |
|         for (String clusterName : clusterNames) { |
|             OozieClient oozie = OozieClientFactory.get(clusterName); |
|             List<String> workflowNames = EntityUtil.getWorkflowNames(entity); |
|             List<WorkflowJob> runningWorkflows = getRunningWorkflows(clusterName, workflowNames); |
|             if (runningWorkflows == null) { |
|                 continue; |
|             } |
|             for (WorkflowJob listed : runningWorkflows) { |
|                 // Re-fetch the workflow by id before reading its parent id and start time. |
|                 WorkflowJob workflow = oozie.getJobInfo(listed.getId()); |
|                 String parentId = workflow.getParentId(); |
|                 if (StringUtils.isEmpty(parentId)) { |
|                     continue; |
|                 } |
| |
|                 CoordinatorAction coordAction = oozie.getCoordActionInfo(parentId); |
|                 String nominalTime = SchemaHelper.formatDateUTC(coordAction.getNominalTime()); |
|                 Instance runningInstance = new Instance(clusterName, nominalTime, WorkflowStatus.RUNNING); |
|                 runningInstance.startTime = workflow.getStartTime(); |
|                 if (entity.getEntityType() == EntityType.FEED) { |
|                     runningInstance.sourceCluster = getSourceCluster(clusterName, coordAction, entity); |
|                 } |
|                 running.add(runningInstance); |
|             } |
|         } |
| |
|         InstancesResult result = new InstancesResult(APIResult.Status.SUCCEEDED, "Running Instances"); |
|         result.setInstances(running.toArray(new Instance[running.size()])); |
|         return result; |
|     } catch (OozieClientException e) { |
|         throw new FalconException(e); |
|     } |
| } |
| |
| /** |
|  * Kills the instances of the entity between start and end. |
|  * |
|  * @param entity entity whose instances are killed |
|  * @param start start of the instance range |
|  * @param end end of the instance range |
|  * @param props properties used to restrict the clusters acted on; passed through unchanged |
|  * @param lifeCycles life cycles to consider |
|  * @return per-instance outcome of the KILL action |
|  * @throws FalconException if the action cannot be performed |
|  */ |
| @Override |
| public InstancesResult killInstances(Entity entity, Date start, Date end, |
|                                      Properties props, List<LifeCycle> lifeCycles) throws FalconException { |
|     InstancesResult outcome = doJobAction(JobAction.KILL, entity, start, end, props, lifeCycles); |
|     return outcome; |
| } |
| |
| /** |
|  * Ignores the instances of the entity between start and end. |
|  * |
|  * @param entity entity whose instances are ignored |
|  * @param start start of the instance range |
|  * @param end end of the instance range |
|  * @param props properties used to restrict the clusters acted on; passed through unchanged |
|  * @param lifeCycles life cycles to consider |
|  * @return per-instance outcome of the IGNORE action |
|  * @throws FalconException if the action cannot be performed |
|  */ |
| @Override |
| public InstancesResult ignoreInstances(Entity entity, Date start, Date end, Properties props, |
|                                        List<LifeCycle> lifeCycles) throws FalconException { |
|     InstancesResult outcome = doJobAction(JobAction.IGNORE, entity, start, end, props, lifeCycles); |
|     return outcome; |
| } |
| /** |
|  * Re-runs the instances of the entity between start and end. |
|  * Rejected while the server is in safemode, like the other instance actions that start work. |
|  * |
|  * @param entity entity whose instances are re-run |
|  * @param start start of the instance range |
|  * @param end end of the instance range |
|  * @param props properties passed through to the rerun |
|  * @param lifeCycles life cycles to consider |
|  * @param isForced whether the rerun is forced; null is treated as false |
|  * @return per-instance outcome of the RERUN action |
|  * @throws FalconException if the server is in safemode or the action cannot be performed |
|  */ |
| @Override |
| public InstancesResult reRunInstances(Entity entity, Date start, Date end, |
|                                       Properties props, List<LifeCycle> lifeCycles, |
|                                       Boolean isForced) throws FalconException { |
|     // This method calls the overload that takes isForced directly, so the safemode guard that |
|     // lives in the shorter doJobAction overload is never reached for RERUN; enforce it here. |
|     if (StartupProperties.isServerInSafeMode()) { |
|         throwSafemodeException(JobAction.RERUN.name()); |
|     } |
|     boolean forced = Boolean.TRUE.equals(isForced); |
|     return doJobAction(JobAction.RERUN, entity, start, end, props, lifeCycles, false, forced); |
| } |
| |
| /** |
|  * Suspends the instances of the entity between start and end. |
|  * |
|  * @param entity entity whose instances are suspended |
|  * @param start start of the instance range |
|  * @param end end of the instance range |
|  * @param props properties used to restrict the clusters acted on; passed through unchanged |
|  * @param lifeCycles life cycles to consider |
|  * @return per-instance outcome of the SUSPEND action |
|  * @throws FalconException if the action cannot be performed |
|  */ |
| @Override |
| public InstancesResult suspendInstances(Entity entity, Date start, Date end, |
|                                         Properties props, List<LifeCycle> lifeCycles) throws FalconException { |
|     InstancesResult outcome = doJobAction(JobAction.SUSPEND, entity, start, end, props, lifeCycles); |
|     return outcome; |
| } |
| |
| /** |
|  * Resumes the instances of the entity between start and end. |
|  * |
|  * @param entity entity whose instances are resumed |
|  * @param start start of the instance range |
|  * @param end end of the instance range |
|  * @param props properties used to restrict the clusters acted on; passed through unchanged |
|  * @param lifeCycles life cycles to consider |
|  * @return per-instance outcome of the RESUME action |
|  * @throws FalconException if the server is in safemode or the action cannot be performed |
|  */ |
| @Override |
| public InstancesResult resumeInstances(Entity entity, Date start, Date end, |
|                                        Properties props, List<LifeCycle> lifeCycles) throws FalconException { |
|     InstancesResult outcome = doJobAction(JobAction.RESUME, entity, start, end, props, lifeCycles); |
|     return outcome; |
| } |
| |
| /** |
|  * Reports the status of the instances of the entity between start and end. |
|  * |
|  * @param entity entity whose instances are inspected |
|  * @param start start of the instance range |
|  * @param end end of the instance range |
|  * @param lifeCycles life cycles to consider |
|  * @param allAttempts when TRUE every attempt of an instance is reported; may be null |
|  * @return per-instance status |
|  * @throws FalconException if the status cannot be retrieved |
|  */ |
| @Override |
| public InstancesResult getStatus(Entity entity, Date start, Date end, |
|                                  List<LifeCycle> lifeCycles, Boolean allAttempts) throws FalconException { |
|     // Status queries carry no properties. |
|     final Properties noProps = null; |
|     return doJobAction(JobAction.STATUS, entity, start, end, noProps, lifeCycles, allAttempts); |
| } |
| |
| /** |
|  * Summarises the instances of the entity between start and end, per cluster. |
|  * |
|  * @param entity entity whose instances are summarised |
|  * @param start start of the instance range |
|  * @param end end of the instance range |
|  * @param lifeCycles life cycles to consider |
|  * @return per-cluster instance summary |
|  * @throws FalconException if the summary cannot be built |
|  */ |
| @Override |
| public InstancesSummaryResult getSummary(Entity entity, Date start, Date end, |
|                                          List<LifeCycle> lifeCycles) throws FalconException { |
|     // Summary queries carry no properties. |
|     final Properties noProps = null; |
|     return doSummaryJobAction(entity, start, end, noProps, lifeCycles); |
| } |
| |
| /** |
|  * Retrieves the parameters of the instances of the entity between start and end. |
|  * |
|  * @param entity entity whose instances are inspected |
|  * @param start start of the instance range |
|  * @param end end of the instance range |
|  * @param lifeCycles life cycles to consider |
|  * @return per-instance outcome of the PARAMS action |
|  * @throws FalconException if the parameters cannot be retrieved |
|  */ |
| @Override |
| public InstancesResult getInstanceParams(Entity entity, Date start, Date end, |
|                                          List<LifeCycle> lifeCycles) throws FalconException { |
|     // Parameter queries carry no properties. |
|     final Properties noProps = null; |
|     return doJobAction(JobAction.PARAMS, entity, start, end, noProps, lifeCycles); |
| } |
| |
| /** |
|  * Tells whether Oozie publishes JMS notifications for the job on Falcon's entity topic. |
|  * |
|  * @param cluster name of the cluster hosting the job |
|  * @param jobID id of the Oozie job |
|  * @return true if JMS connection info with JNDI properties exists and the job's topic equals the |
|  *         configured "entity.topic" (default "FALCON.ENTITY.TOPIC"); false otherwise, including |
|  *         when the Oozie client call fails |
|  * @throws FalconException if the Oozie client cannot be obtained |
|  */ |
| @Override |
| public boolean isNotificationEnabled(String cluster, String jobID) throws FalconException { |
|     OozieClient oozie = OozieClientFactory.get(cluster); |
|     try { |
|         JMSConnectionInfo connectionInfo = oozie.getJMSConnectionInfo(); |
|         if (connectionInfo == null || connectionInfo.getJNDIProperties().isEmpty()) { |
|             return false; |
|         } |
|         String falconTopic = StartupProperties.get().getProperty("entity.topic", "FALCON.ENTITY.TOPIC"); |
|         String jobTopic = oozie.getJMSTopicName(jobID); |
|         return falconTopic.equals(jobTopic); |
|     } catch (OozieClientException e) { |
|         // Best effort: a failed lookup is reported as "notifications not enabled". |
|         LOG.debug("Error while retrieving JMS connection info", e); |
|         return false; |
|     } |
| } |
| |
| /** |
|  * Operations that can be requested on the instances of an entity. |
|  */ |
| private enum JobAction { |
|     KILL, |
|     SUSPEND, |
|     RESUME, |
|     RERUN, |
|     STATUS, |
|     SUMMARY, |
|     PARAMS, |
|     IGNORE |
| } |
| |
| private WorkflowJob getWorkflowInfo(String cluster, String wfId) throws FalconException { |
| try { |
| return OozieClientFactory.get(cluster).getJobInfo(wfId); |
| } catch (OozieClientException e) { |
| throw new FalconException(e); |
| } |
| } |
| |
| private List<WorkflowJob> getWfsForCoordAction(String cluster, String coordActionId) throws FalconException { |
| try { |
| return OozieClientFactory.get(cluster).getWfsForCoordAction(coordActionId); |
| } catch (OozieClientException e) { |
| throw new FalconException(e); |
| } |
| } |
| |
| private InstancesResult doJobAction(JobAction action, Entity entity, Date start, Date end, |
| Properties props, List<LifeCycle> lifeCycles) throws FalconException { |
| if (StartupProperties.isServerInSafeMode() |
| && (action.equals(JobAction.RERUN) || action.equals(JobAction.RESUME))) { |
| throwSafemodeException(action.name()); |
| } |
| return doJobAction(action, entity, start, end, props, lifeCycles, null); |
| } |
| |
| //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck |
| /** |
|  * Performs an instance action on every coordinator action of the entity in the given range, |
|  * optionally restricted to the clusters / source clusters listed in the properties. |
|  * |
|  * Failures on individual instances do not abort the call: the instance is reported with |
|  * status ERROR and the overall status becomes PARTIAL (or FAILED when fewer than two |
|  * instances were processed). |
|  * |
|  * @param action action to perform |
|  * @param entity entity whose instances are acted on |
|  * @param start start of the instance range |
|  * @param end end of the instance range |
|  * @param props properties passed to the action and used for cluster filtering; may be null |
|  * @param lifeCycles life cycles to consider |
|  * @param allAttempts for STATUS only: TRUE reports every attempt of each instance; may be null |
|  * @param isForced passed through to the action |
|  * @return per-instance outcome together with the overall status |
|  * @throws FalconException if the coordinator actions or the cluster filters cannot be resolved |
|  */ |
| private InstancesResult doJobAction(JobAction action, Entity entity, Date start, Date end, |
|                                     Properties props, List<LifeCycle> lifeCycles, |
|                                     Boolean allAttempts, boolean isForced) throws FalconException { |
|     Map<String, List<CoordinatorAction>> actionsMap = getCoordActions(entity, start, end, lifeCycles); |
|     List<String> clusterList = getIncludedClusters(props, FALCON_INSTANCE_ACTION_CLUSTERS); |
|     List<String> sourceClusterList = getIncludedClusters(props, FALCON_INSTANCE_SOURCE_CLUSTERS); |
|     APIResult.Status overallStatus = APIResult.Status.SUCCEEDED; |
|     int instanceCount = 0; |
| |
|     List<Instance> instances = new ArrayList<Instance>(); |
|     for (Map.Entry<String, List<CoordinatorAction>> entry : actionsMap.entrySet()) { |
|         String cluster = entry.getKey(); |
|         if (!clusterList.isEmpty() && !clusterList.contains(cluster)) { |
|             continue; |
|         } |
| |
|         List<CoordinatorAction> actions = entry.getValue(); |
|         String sourceCluster = null; |
|         for (CoordinatorAction coordinatorAction : actions) { |
|             if (entity.getEntityType() == EntityType.FEED) { |
|                 sourceCluster = getSourceCluster(cluster, coordinatorAction, entity); |
|                 if (!sourceClusterList.isEmpty() && !sourceClusterList.contains(sourceCluster)) { |
|                     continue; |
|                 } |
|             } |
|             instanceCount++; |
|             String nominalTimeStr = SchemaHelper.formatDateUTC(coordinatorAction.getNominalTime()); |
|             InstancesResult.Instance instance = |
|                     new InstancesResult.Instance(cluster, nominalTimeStr, null); |
|             instance.sourceCluster = sourceCluster; |
|             if (action.equals(JobAction.STATUS) && Boolean.TRUE.equals(allAttempts)) { |
|                 List<InstancesResult.Instance> instanceList; |
|                 try { |
|                     performAction(cluster, action, coordinatorAction, props, instance, isForced); |
|                     instanceList = getAllInstances(cluster, coordinatorAction, nominalTimeStr); |
|                     // Happens when the action is in READY/WAITING, when no workflow is kicked off yet. |
|                     if (instanceList.isEmpty() || StringUtils.isBlank(coordinatorAction.getExternalId())) { |
|                         instanceList.add(instance); |
|                     } |
|                 } catch (FalconException e) { |
|                     LOG.warn("Unable to perform action {} on cluster {}", action, cluster, e); |
|                     instance.status = WorkflowStatus.ERROR; |
|                     overallStatus = APIResult.Status.PARTIAL; |
|                     // Report the failed instance, as the other branch does, instead of dropping it. |
|                     instanceList = Collections.singletonList(instance); |
|                 } |
|                 for (InstancesResult.Instance instanceResult : instanceList) { |
|                     instanceResult.details = coordinatorAction.getMissingDependencies(); |
|                     instanceResult.sourceCluster = sourceCluster; |
|                     instances.add(instanceResult); |
|                 } |
|             } else { |
|                 try { |
|                     performAction(cluster, action, coordinatorAction, props, instance, isForced); |
|                 } catch (FalconException e) { |
|                     LOG.warn("Unable to perform action {} on cluster {}", action, cluster, e); |
|                     instance.status = WorkflowStatus.ERROR; |
|                     overallStatus = APIResult.Status.PARTIAL; |
|                 } |
|                 instance.details = coordinatorAction.getMissingDependencies(); |
|                 instances.add(instance); |
|             } |
|         } |
|     } |
|     // With at most one instance processed, a partial failure is a complete failure. |
|     if (instanceCount < 2 && overallStatus == APIResult.Status.PARTIAL) { |
|         overallStatus = APIResult.Status.FAILED; |
|     } |
|     InstancesResult instancesResult = new InstancesResult(overallStatus, action.name()); |
|     instancesResult.setInstances(instances.toArray(new Instance[instances.size()])); |
|     return instancesResult; |
| } |
| |
| //RESUME CHECKSTYLE CHECK ParameterNumberCheck |
| |
| private InstancesResult doJobAction(JobAction action, Entity entity, Date start, Date end, Properties props, |
| List<LifeCycle> lifeCycles, Boolean allAttempts) throws FalconException { |
| return doJobAction(action, entity, start, end, props, lifeCycles, allAttempts, false); |
| } |
| |
| /** |
|  * Builds a per-cluster summary of the instances of the entity between start and end. |
|  * For every applicable coordinator the materialized actions in the range are fetched and folded |
|  * into the cluster's summary; instances not yet materialized are counted under "UNSCHEDULED". |
|  * |
|  * @param entity entity whose instances are summarised |
|  * @param start start of the instance range |
|  * @param end end of the instance range |
|  * @param props properties used to restrict the clusters considered; may be null |
|  * @param lifeCycles life cycles to consider |
|  * @return a SUCCEEDED result holding one summary per considered cluster |
|  * @throws FalconException if bundles or coordinator details cannot be retrieved |
|  */ |
| private InstancesSummaryResult doSummaryJobAction(Entity entity, Date start, |
|                                                   Date end, Properties props, |
|                                                   List<LifeCycle> lifeCycles) throws FalconException { |
| |
|     Map<String, List<BundleJob>> bundlesByCluster = findBundles(entity); |
|     List<InstanceSummary> summaries = new ArrayList<InstanceSummary>(); |
|     List<String> includedClusters = getIncludedClusters(props, FALCON_INSTANCE_ACTION_CLUSTERS); |
| |
|     for (Map.Entry<String, List<BundleJob>> clusterEntry : bundlesByCluster.entrySet()) { |
|         Map<String, Long> countsByStatus = new HashMap<String, Long>(); |
|         String clusterName = clusterEntry.getKey(); |
|         if (!includedClusters.isEmpty() && !includedClusters.contains(clusterName)) { |
|             continue; |
|         } |
| |
|         List<BundleJob> clusterBundles = clusterEntry.getValue(); |
|         OozieClient oozie = OozieClientFactory.get(clusterName); |
|         List<CoordinatorJob> coords = getApplicableCoords(oozie, start, end, clusterBundles, lifeCycles); |
|         long unscheduled = 0; |
| |
|         for (int index = 0; index < coords.size(); index++) { |
|             CoordinatorJob coord = coords.get(index); |
|             Frequency frequency = createFrequency(String.valueOf(coord.getFrequency()), coord.getTimeUnit()); |
|             TimeZone timeZone = EntityUtil.getTimeZone(coord.getTimeZone()); |
|             Date rangeStart = EntityUtil.getNextStartTime(coord.getStartTime(), frequency, timeZone, start); |
| |
|             // Do not look past what the coordinator has materialized so far. |
|             Date lastActionTime = coord.getLastActionTime(); |
|             Date rangeEnd; |
|             if (lastActionTime != null && lastActionTime.before(end)) { |
|                 rangeEnd = lastActionTime; |
|             } else { |
|                 rangeEnd = end; |
|             } |
| |
|             // The coordinator at index 0 is the one treated as the last coordinator. |
|             // NOTE(review): presumably getApplicableCoords returns the newest first - confirm. |
|             boolean treatedAsLast = (index == 0); |
| |
|             int firstAction = EntityUtil.getInstanceSequence(coord.getStartTime(), frequency, timeZone, rangeStart); |
|             int lastMaterialized = |
|                     EntityUtil.getInstanceSequence(coord.getStartTime(), frequency, timeZone, rangeEnd); |
|             int requestedLast = EntityUtil.getInstanceSequence(coord.getStartTime(), frequency, timeZone, end); |
| |
|             if (lastMaterialized < firstAction) { |
|                 continue; |
|             } |
| |
|             if (treatedAsLast && requestedLast != lastMaterialized) { |
|                 unscheduled = requestedLast - lastMaterialized; |
|             } |
| |
|             CoordinatorJob detailed; |
|             try { |
|                 int length = lastMaterialized - firstAction; |
|                 detailed = oozie.getCoordJobInfo(coord.getId(), null, firstAction, length); |
|             } catch (OozieClientException e) { |
|                 LOG.debug("Unable to get details for coordinator {}", coord.getId(), e); |
|                 throw new FalconException(e); |
|             } |
| |
|             if (detailed != null) { |
|                 updateInstanceSummary(detailed, countsByStatus); |
|             } |
|         } |
| |
|         if (unscheduled > 0) { |
|             countsByStatus.put("UNSCHEDULED", unscheduled); |
|         } |
| |
|         summaries.add(new InstanceSummary(clusterName, countsByStatus)); |
|     } |
| |
|     InstancesSummaryResult result = |
|             new InstancesSummaryResult(APIResult.Status.SUCCEEDED, JobAction.SUMMARY.name()); |
|     result.setInstancesSummary(summaries.toArray(new InstanceSummary[summaries.size()])); |
|     return result; |
| } |
| |
| private void populateInstanceActions(String cluster, WorkflowJob wfJob, Instance instance) |
| throws FalconException { |
| |
| List<InstancesResult.InstanceAction> instanceActions = new ArrayList<InstancesResult.InstanceAction>(); |
| |
| List<WorkflowAction> wfActions = wfJob.getActions(); |
| |
| // We wanna capture job urls for all user-actions & non succeeded actions of the main workflow |
| for (WorkflowAction action : wfActions) { |
| if (action.getType().equalsIgnoreCase("sub-workflow") && StringUtils.isNotEmpty(action.getExternalId())) { |
| // if the action is sub-workflow, get job urls of all actions within the sub-workflow |
| List<WorkflowAction> subWorkFlowActions = getWorkflowInfo(cluster, |
| action.getExternalId()).getActions(); |
| for (WorkflowAction subWfAction : subWorkFlowActions) { |
| if (!subWfAction.getType().startsWith(":")) { |
| InstancesResult.InstanceAction instanceAction = |
| new InstancesResult.InstanceAction(subWfAction.getName(), |
| subWfAction.getExternalStatus(), subWfAction.getConsoleUrl()); |
| instanceActions.add(instanceAction); |
| } |
| } |
| } else if (!action.getType().startsWith(":")) { |
| // if the action is a transition node it starts with :, we don't need their statuses |
| if (PARENT_WF_ACTION_NAMES.contains(action.getName()) |
| && !Status.SUCCEEDED.toString().equals(action.getExternalStatus())) { |
| // falcon actions in the main workflow are defined in the list |
| // get job urls for all non succeeded actions of the main workflow |
| InstancesResult.InstanceAction instanceAction = |
| new InstancesResult.InstanceAction(action.getName(), action.getExternalStatus(), |
| action.getConsoleUrl()); |
| instanceActions.add(instanceAction); |
| } else if (!PARENT_WF_ACTION_NAMES.contains(action.getName()) |
| && !StringUtils.equals(action.getExternalId(), "-")) { |
| // if user-action is pig/hive there is no sub-workflow, we wanna capture their urls as well |
| InstancesResult.InstanceAction instanceAction = |
| new InstancesResult.InstanceAction(action.getName(), action.getExternalStatus(), |
| action.getConsoleUrl()); |
| instanceActions.add(instanceAction); |
| } |
| } |
| } |
| instance.actions = instanceActions.toArray(new InstancesResult.InstanceAction[instanceActions.size()]); |
| } |
| |
| private InstancesResult.KeyValuePair[] getWFParams(WorkflowJob jobInfo) { |
| Configuration conf = new Configuration(false); |
| conf.addResource(new ByteArrayInputStream(jobInfo.getConf().getBytes())); |
| InstancesResult.KeyValuePair[] wfParams = new InstancesResult.KeyValuePair[conf.size()]; |
| int i = 0; |
| for (Map.Entry<String, String> entry : conf) { |
| wfParams[i++] = new InstancesResult.KeyValuePair(entry.getKey(), entry.getValue()); |
| } |
| return wfParams; |
| } |
| |
| private void updateInstanceSummary(CoordinatorJob coordJob, Map<String, Long> instancesSummary) { |
| List<CoordinatorAction> actions = coordJob.getActions(); |
| |
| for (CoordinatorAction coordAction : actions) { |
| if (instancesSummary.containsKey(coordAction.getStatus().name())) { |
| instancesSummary.put(coordAction.getStatus().name(), |
| instancesSummary.get(coordAction.getStatus().name()) + 1L); |
| } else { |
| instancesSummary.put(coordAction.getStatus().name(), 1L); |
| } |
| } |
| } |
| |
| private List<InstancesResult.Instance> getAllInstances(String cluster, CoordinatorAction coordinatorAction, |
| String nominalTimeStr) throws FalconException { |
| List<InstancesResult.Instance> instanceList = new ArrayList<>(); |
| if (StringUtils.isNotBlank(coordinatorAction.getId())) { |
| List<WorkflowJob> workflowJobList = getWfsForCoordAction(cluster, coordinatorAction.getId()); |
| if (workflowJobList != null && workflowJobList.size()>0) { |
| for (WorkflowJob workflowJob : workflowJobList) { |
| InstancesResult.Instance newInstance = new InstancesResult.Instance(cluster, nominalTimeStr, null); |
| WorkflowJob wfJob = getWorkflowInfo(cluster, workflowJob.getId()); |
| if (wfJob!=null) { |
| newInstance.startTime = wfJob.getStartTime(); |
| newInstance.endTime = wfJob.getEndTime(); |
| newInstance.logFile = wfJob.getConsoleUrl(); |
| populateInstanceActions(cluster, wfJob, newInstance); |
| newInstance.status = WorkflowStatus.valueOf(mapActionStatus(wfJob.getStatus().name())); |
| instanceList.add(newInstance); |
| } |
| } |
| } |
| } |
| return instanceList; |
| } |
| |
| private void performAction(String cluster, JobAction action, CoordinatorAction coordinatorAction, |
| Properties props, InstancesResult.Instance instance, boolean isForced) throws FalconException { |
| WorkflowJob jobInfo = null; |
| String status = coordinatorAction.getStatus().name(); |
| if (StringUtils.isNotEmpty(coordinatorAction.getExternalId())) { |
| jobInfo = getWorkflowInfo(cluster, coordinatorAction.getExternalId()); |
| status = jobInfo.getStatus().name(); |
| instance.startTime = jobInfo.getStartTime(); |
| instance.endTime = jobInfo.getEndTime(); |
| instance.logFile = jobInfo.getConsoleUrl(); |
| instance.runId = jobInfo.getRun(); |
| } |
| |
| switch (action) { |
| case KILL: |
| if (jobInfo == null) { |
| StringBuilder scope = new StringBuilder(); |
| scope.append(SchemaHelper.formatDateUTC(coordinatorAction.getNominalTime())).append("::") |
| .append(SchemaHelper.formatDateUTC(coordinatorAction.getNominalTime())); |
| kill(cluster, coordinatorAction.getJobId(), "date", scope.toString()); |
| status = Status.KILLED.name(); |
| break; |
| } |
| if (!WF_KILL_PRECOND.contains(jobInfo.getStatus())) { |
| break; |
| } |
| |
| |
| kill(cluster, jobInfo.getId()); |
| status = Status.KILLED.name(); |
| break; |
| |
| case IGNORE: |
| if (!status.equals(Status.IGNORED.name())) { |
| ignore(cluster, coordinatorAction.getJobId(), coordinatorAction.getActionNumber()); |
| } |
| status = mapActionStatus(Status.IGNORED.name()); |
| break; |
| |
| case SUSPEND: |
| if (jobInfo == null || !WF_SUSPEND_PRECOND.contains(jobInfo.getStatus())) { |
| break; |
| } |
| |
| suspend(cluster, jobInfo.getId()); |
| status = Status.SUSPENDED.name(); |
| break; |
| |
| case RESUME: |
| if (jobInfo == null || !WF_RESUME_PRECOND.contains(jobInfo.getStatus())) { |
| break; |
| } |
| |
| resume(cluster, jobInfo.getId()); |
| status = Status.RUNNING.name(); |
| break; |
| |
| case RERUN: |
| if (COORD_RERUN_PRECOND.contains(coordinatorAction.getStatus())) { |
| status = reRunCoordAction(cluster, coordinatorAction, props, isForced).name(); |
| } |
| break; |
| |
| case STATUS: |
| if (StringUtils.isNotEmpty(coordinatorAction.getExternalId())) { |
| populateInstanceActions(cluster, jobInfo, instance); |
| } |
| break; |
| |
| case PARAMS: |
| if (StringUtils.isNotEmpty(coordinatorAction.getExternalId())) { |
| instance.wfParams = getWFParams(jobInfo); |
| } |
| break; |
| |
| default: |
| throw new IllegalArgumentException("Unhandled action " + action); |
| } |
| |
| // status can be CoordinatorAction.Status, WorkflowJob.Status or Job.Status |
| try { |
| instance.status = WorkflowStatus.valueOf(mapActionStatus(status)); |
| } catch (IllegalArgumentException e) { |
| LOG.error("Job status not defined in Instance status: {}", status); |
| instance.status = WorkflowStatus.UNDEFINED; |
| } |
| } |
| |
| public CoordinatorAction.Status reRunCoordAction(String cluster, CoordinatorAction coordinatorAction, |
| Properties props, boolean isForced) throws FalconException { |
| try { |
| OozieClient client = OozieClientFactory.get(cluster); |
| if (props == null) { |
| props = new Properties(); |
| } |
| // In case if both props exists, throw an exception. |
| // This case will occur when user runs workflow with skip-nodes property and |
| // try to do force rerun or rerun with fail-nodes property. |
| if (props.containsKey(OozieClient.RERUN_FAIL_NODES) |
| && props.containsKey(OozieClient.RERUN_SKIP_NODES)) { |
| String msg = "Both " + OozieClient.RERUN_SKIP_NODES + " and " + OozieClient.RERUN_FAIL_NODES |
| + " are present in workflow params for " + coordinatorAction.getExternalId(); |
| LOG.error(msg); |
| throw new FalconException(msg); |
| } |
| |
| //if user has set any of these oozie rerun properties then force rerun flag is ignored |
| if (props.containsKey(OozieClient.RERUN_FAIL_NODES)) { |
| isForced = false; |
| } |
| Properties jobprops; |
| // Get conf when workflow is launched. |
| if (coordinatorAction.getExternalId() != null) { |
| WorkflowJob jobInfo = client.getJobInfo(coordinatorAction.getExternalId()); |
| |
| jobprops = OozieUtils.toProperties(jobInfo.getConf()); |
| // Clear the rerun properties from existing configuration |
| jobprops.remove(OozieClient.RERUN_FAIL_NODES); |
| jobprops.remove(OozieClient.RERUN_SKIP_NODES); |
| jobprops.putAll(props); |
| jobprops.remove(OozieClient.BUNDLE_APP_PATH); |
| } else { |
| jobprops = props; |
| } |
| |
| client.reRunCoord(coordinatorAction.getJobId(), RestConstants.JOB_COORD_SCOPE_ACTION, |
| Integer.toString(coordinatorAction.getActionNumber()), true, true, !isForced, jobprops); |
| LOG.info("Rerun job {} on cluster {}", coordinatorAction.getId(), cluster); |
| return assertCoordActionStatus(cluster, coordinatorAction.getId(), |
| org.apache.oozie.client.CoordinatorAction.Status.RUNNING, |
| org.apache.oozie.client.CoordinatorAction.Status.WAITING, |
| org.apache.oozie.client.CoordinatorAction.Status.READY); |
| } catch (Exception e) { |
| LOG.error("Unable to rerun workflows", e); |
| throw new FalconException(e); |
| } |
| } |
| |
| private CoordinatorAction.Status assertCoordActionStatus(String cluster, String coordActionId, |
| org.apache.oozie.client.CoordinatorAction.Status... statuses) throws FalconException, OozieClientException { |
| OozieClient client = OozieClientFactory.get(cluster); |
| CoordinatorAction actualStatus = client.getCoordActionInfo(coordActionId); |
| for (int counter = 0; counter < 3; counter++) { |
| for (org.apache.oozie.client.CoordinatorAction.Status status : statuses) { |
| if (status.equals(actualStatus.getStatus())) { |
| return status; |
| } |
| } |
| try { |
| Thread.sleep(5000); |
| } catch (InterruptedException e) { |
| e.printStackTrace(); |
| } |
| actualStatus = client.getCoordActionInfo(coordActionId); |
| } |
| throw new FalconException("For Job" + coordActionId + ", actual statuses: " + actualStatus + ", " |
| + "expected statuses: " + Arrays.toString(statuses)); |
| } |
| |
| private String getSourceCluster(String cluster, CoordinatorAction coordinatorAction, Entity entity) |
| throws FalconException { |
| try { |
| CoordinatorJob coordJob = OozieClientFactory.get(cluster).getCoordJobInfo(coordinatorAction.getJobId()); |
| return EntityUtil.getWorkflowNameSuffix(coordJob.getAppName(), entity); |
| } catch (OozieClientException e) { |
| throw new FalconException("Unable to get oozie job id:" + e); |
| } |
| } |
| |
| private List<String> getIncludedClusters(Properties props, String clustersType) { |
| String clusters = props == null ? "" : props.getProperty(clustersType, ""); |
| List<String> clusterList = new ArrayList<String>(); |
| for (String cluster : clusters.split(",")) { |
| if (StringUtils.isNotEmpty(cluster)) { |
| clusterList.add(cluster.trim()); |
| } |
| } |
| return clusterList; |
| } |
| |
| private String mapActionStatus(String status) { |
| if (CoordinatorAction.Status.READY.toString().equals(status)) { |
| return InstancesResult.WorkflowStatus.READY.name(); |
| } else if (CoordinatorAction.Status.WAITING.toString().equals(status) |
| || CoordinatorAction.Status.SUBMITTED.toString().equals(status)) { |
| return InstancesResult.WorkflowStatus.WAITING.name(); |
| } else if (CoordinatorAction.Status.KILLED.toString().equals(status)) { |
| return InstancesResult.WorkflowStatus.KILLED.name(); |
| } else if (CoordinatorAction.Status.IGNORED.toString().equals(status)) { |
| return InstancesResult.WorkflowStatus.KILLED_OR_IGNORED.name(); |
| } else if (CoordinatorAction.Status.TIMEDOUT.toString().equals(status)) { |
| return InstancesResult.WorkflowStatus.TIMEDOUT.name(); |
| } else if (WorkflowJob.Status.PREP.toString().equals(status)) { |
| return InstancesResult.WorkflowStatus.RUNNING.name(); |
| } else { |
| return status; |
| } |
| } |
| |
| @SuppressWarnings("MagicConstant") |
| protected Map<String, List<CoordinatorAction>> getCoordActions(Entity entity, Date start, Date end, |
| List<LifeCycle> lifeCycles) throws FalconException { |
| Map<String, List<BundleJob>> bundlesMap = findBundles(entity); |
| Map<String, List<CoordinatorAction>> actionsMap = new HashMap<String, List<CoordinatorAction>>(); |
| |
| for (Map.Entry<String, List<BundleJob>> entry : bundlesMap.entrySet()) { |
| String cluster = entry.getKey(); |
| List<BundleJob> bundles = entry.getValue(); |
| OozieClient client = OozieClientFactory.get(cluster); |
| List<CoordinatorJob> applicableCoords = |
| getApplicableCoords(client, start, end, bundles, lifeCycles); |
| List<CoordinatorAction> actions = new ArrayList<CoordinatorAction>(); |
| int maxRetentionInstancesCount = |
| Integer.parseInt(RuntimeProperties.get().getProperty("retention.instances.displaycount", "2")); |
| int retentionInstancesCount = 0; |
| |
| for (CoordinatorJob coord : applicableCoords) { |
| Date nextMaterializedTime = coord.getNextMaterializedTime(); |
| if (nextMaterializedTime == null) { |
| continue; |
| } |
| |
| boolean retentionCoord = isRetentionCoord(coord); |
| Frequency freq = createFrequency(String.valueOf(coord.getFrequency()), coord.getTimeUnit()); |
| TimeZone tz = EntityUtil.getTimeZone(coord.getTimeZone()); |
| |
| Date iterEnd = ((nextMaterializedTime.before(end) || retentionCoord) ? nextMaterializedTime : end); |
| Calendar endCal = Calendar.getInstance(EntityUtil.getTimeZone(coord.getTimeZone())); |
| endCal.setTime(EntityUtil.getNextStartTime(coord.getStartTime(), freq, tz, iterEnd)); |
| endCal.add(freq.getTimeUnit().getCalendarUnit(), -(Integer.parseInt((coord.getFrequency())))); |
| |
| while (start.compareTo(endCal.getTime()) <= 0) { |
| if (retentionCoord) { |
| if (retentionInstancesCount >= maxRetentionInstancesCount) { |
| break; |
| } |
| retentionInstancesCount++; |
| } |
| |
| int sequence = EntityUtil.getInstanceSequence(coord.getStartTime(), freq, tz, endCal.getTime()); |
| String actionId = coord.getId() + "@" + sequence; |
| addCoordAction(client, actions, actionId); |
| endCal.add(freq.getTimeUnit().getCalendarUnit(), -(Integer.parseInt((coord.getFrequency())))); |
| } |
| } |
| actionsMap.put(cluster, actions); |
| } |
| return actionsMap; |
| } |
| |
| private boolean isRetentionCoord(CoordinatorJob coord){ |
| return coord.getAppName().contains(LifeCycle.EVICTION.getTag().name()); |
| } |
| |
| private void addCoordAction(OozieClient client, List<CoordinatorAction> actions, String actionId) { |
| CoordinatorAction coordActionInfo = null; |
| try { |
| coordActionInfo = client.getCoordActionInfo(actionId); |
| } catch (OozieClientException e) { |
| LOG.debug("Unable to get action for " + actionId + " " + e.getMessage()); |
| } |
| if (coordActionInfo != null) { |
| actions.add(coordActionInfo); |
| } |
| } |
| |
| private Frequency createFrequency(String frequency, Timeunit timeUnit) { |
| return new Frequency(frequency, OozieTimeUnit.valueOf(timeUnit.name()).getFalconTimeUnit()); |
| } |
| |
| /** |
| * TimeUnit as understood by Oozie. |
| */ |
| private enum OozieTimeUnit { |
| MINUTE(TimeUnit.minutes), HOUR(TimeUnit.hours), DAY(TimeUnit.days), WEEK(null), MONTH(TimeUnit.months), |
| END_OF_DAY(null), END_OF_MONTH(null), NONE(null); |
| |
| private TimeUnit falconTimeUnit; |
| |
| private OozieTimeUnit(TimeUnit falconTimeUnit) { |
| this.falconTimeUnit = falconTimeUnit; |
| } |
| |
| public TimeUnit getFalconTimeUnit() { |
| if (falconTimeUnit == null) { |
| throw new IllegalStateException("Invalid coord frequency: " + name()); |
| } |
| return falconTimeUnit; |
| } |
| } |
| |
| private List<CoordinatorJob> getApplicableCoords(OozieClient client, Date start, Date end, |
| List<BundleJob> bundles, |
| List<LifeCycle> lifeCycles) throws FalconException { |
| List<CoordinatorJob> applicableCoords = new ArrayList<CoordinatorJob>(); |
| try { |
| for (BundleJob bundle : bundles) { |
| List<CoordinatorJob> coords = client.getBundleJobInfo(bundle.getId()).getCoordinators(); |
| for (CoordinatorJob coord : coords) { |
| // ignore coords in PREP state, not yet running and retention coord |
| |
| if (coord.getStatus() == Status.PREP |
| || !isCoordApplicable(coord.getAppName(), lifeCycles)) { |
| continue; |
| } |
| |
| // if end time is before coord-start time or start time is |
| // after coord-end time ignore. |
| if (!(end.compareTo(coord.getStartTime()) <= 0 || start.compareTo(coord.getEndTime()) >= 0)) { |
| applicableCoords.add(coord); |
| } |
| } |
| } |
| |
| sortDescByStartTime(applicableCoords); |
| return applicableCoords; |
| } catch (OozieClientException e) { |
| throw new FalconException(e); |
| } |
| } |
| |
| private boolean isCoordApplicable(String appName, List<LifeCycle> lifeCycles) { |
| if (lifeCycles != null && !lifeCycles.isEmpty()) { |
| for (LifeCycle lifeCycle : lifeCycles) { |
| if (appName.contains(lifeCycle.getTag().name())) { |
| return true; |
| } |
| } |
| } |
| return false; |
| } |
| |
| protected void sortDescByStartTime(List<CoordinatorJob> consideredCoords) { |
| Collections.sort(consideredCoords, new Comparator<CoordinatorJob>() { |
| @Override |
| public int compare(CoordinatorJob left, CoordinatorJob right) { |
| Date leftStart = left.getStartTime(); |
| Date rightStart = right.getStartTime(); |
| return rightStart.compareTo(leftStart); |
| } |
| }); |
| } |
| |
| |
| private boolean canUpdateBundle(Entity oldEntity, Entity newEntity) throws FalconException { |
| return EntityUtil.equals(oldEntity, newEntity, BUNDLE_UPDATEABLE_PROPS); |
| } |
| |
| @Override |
| public String update(Entity oldEntity, Entity newEntity, |
| String cluster, Boolean skipDryRun) throws FalconException { |
| BundleJob bundle = findLatestBundle(oldEntity, cluster); |
| |
| boolean entityUpdated = false; |
| if (bundle != MISSING) { |
| entityUpdated = UpdateHelper.isEntityUpdated(oldEntity, newEntity, cluster, new Path(bundle.getAppPath())); |
| } |
| |
| Cluster clusterEntity = ConfigurationStore.get().get(EntityType.CLUSTER, cluster); |
| StringBuilder result = new StringBuilder(); |
| //entity is scheduled before and entity is updated |
| if (bundle != MISSING && entityUpdated) { |
| LOG.info("Updating entity through Workflow Engine {}", newEntity.toShortString()); |
| Date newEndTime = EntityUtil.getEndTime(newEntity, cluster); |
| if (newEndTime.before(DateUtil.now())) { |
| throw new FalconException("Entity's end time " + SchemaHelper.formatDateUTC(newEndTime) |
| + " is before current time. Entity can't be updated. Use remove and add"); |
| } |
| |
| LOG.debug("Updating for cluster: {}, bundle: {}", cluster, bundle.getId()); |
| |
| if (canUpdateBundle(oldEntity, newEntity)) { |
| // only concurrency and endtime are changed. So, change coords |
| LOG.info("Change operation is adequate! : {}, bundle: {}", cluster, bundle.getId()); |
| updateCoords(cluster, bundle, EntityUtil.getParallel(newEntity), |
| EntityUtil.getEndTime(newEntity, cluster), newEntity); |
| return getUpdateString(newEntity, new Date(), bundle, bundle); |
| } |
| |
| LOG.debug("Going to update! : {} for cluster {}, bundle: {}", |
| newEntity.toShortString(), cluster, bundle.getId()); |
| result.append(updateInternal(oldEntity, newEntity, clusterEntity, bundle, |
| bundle.getUser(), skipDryRun)).append("\n"); |
| LOG.info("Entity update complete: {} for cluster {}, bundle: {}", newEntity.toShortString(), cluster, |
| bundle.getId()); |
| } |
| |
| result.append(updateDependents(clusterEntity, oldEntity, newEntity, skipDryRun)); |
| return result.toString(); |
| } |
| |
| @Override |
| public String touch(Entity entity, String cluster, Boolean skipDryRun) throws FalconException { |
| BundleJob bundle = findLatestBundle(entity, cluster); |
| Cluster clusterEntity = ConfigurationStore.get().get(EntityType.CLUSTER, cluster); |
| StringBuilder result = new StringBuilder(); |
| if (bundle != MISSING) { |
| LOG.info("Updating entity {} for cluster: {}, bundle: {}", |
| entity.toShortString(), cluster, bundle.getId()); |
| String output = updateInternal(entity, entity, clusterEntity, bundle, CurrentUser.getUser(), skipDryRun); |
| result.append(output).append("\n"); |
| LOG.info("Entity update complete: {} for cluster {}, bundle: {}", entity.toShortString(), cluster, |
| bundle.getId()); |
| } |
| return result.toString(); |
| } |
| |
| private String getUpdateString(Entity entity, Date date, BundleJob oldBundle, BundleJob newBundle) { |
| StringBuilder builder = new StringBuilder(); |
| builder.append(entity.toShortString()).append("/Effective Time: ").append(SchemaHelper.formatDateUTC(date)); |
| if (StringUtils.isNotEmpty(oldBundle.getId())) { |
| builder.append(". Old bundle id: " + oldBundle.getId()); |
| } |
| builder.append(". Old coordinator id: "); |
| List<String> coords = new ArrayList<String>(); |
| for (CoordinatorJob coord : oldBundle.getCoordinators()) { |
| coords.add(coord.getId()); |
| } |
| builder.append(StringUtils.join(coords, ',')); |
| |
| if (newBundle != null) { |
| coords.clear(); |
| for (CoordinatorJob coord : newBundle.getCoordinators()) { |
| coords.add(coord.getId()); |
| } |
| if (coords.isEmpty()) { |
| builder.append(". New bundle id: "); |
| builder.append(newBundle.getId()); |
| } else { |
| builder.append(". New coordinator id: "); |
| builder.append(StringUtils.join(coords, ',')); |
| } |
| } |
| return builder.toString(); |
| } |
| |
| private String updateDependents(Cluster cluster, Entity oldEntity, |
| Entity newEntity, Boolean skipDryRun) throws FalconException { |
| //Update affected entities |
| Set<Entity> affectedEntities = EntityGraph.get().getDependents(oldEntity); |
| StringBuilder result = new StringBuilder(); |
| for (Entity affectedEntity : affectedEntities) { |
| if (affectedEntity.getEntityType() != EntityType.PROCESS) { |
| continue; |
| } |
| |
| LOG.info("Dependent entity {} need to be updated", affectedEntity.toShortString()); |
| if (!UpdateHelper.shouldUpdate(oldEntity, newEntity, affectedEntity, cluster.getName())) { |
| continue; |
| } |
| |
| BundleJob affectedProcBundle = findLatestBundle(affectedEntity, cluster.getName()); |
| if (affectedProcBundle == MISSING) { |
| LOG.info("Dependent entity {} is not scheduled", affectedEntity.getName()); |
| continue; |
| } |
| |
| LOG.info("Triggering update for {}, {}", cluster, affectedProcBundle.getId()); |
| |
| result.append(updateInternal(affectedEntity, affectedEntity, cluster, affectedProcBundle, |
| affectedProcBundle.getUser(), skipDryRun)).append("\n"); |
| LOG.info("Entity update complete: {} for cluster {}, bundle: {}", |
| affectedEntity.toShortString(), cluster, affectedProcBundle.getId()); |
| } |
| LOG.info("All dependent entities updated for: {}", oldEntity.toShortString()); |
| return result.toString(); |
| } |
| |
| @SuppressWarnings("MagicConstant") |
| private Date getCoordLastActionTime(CoordinatorJob coord) { |
| if (coord.getNextMaterializedTime() != null) { |
| Calendar cal = Calendar.getInstance(EntityUtil.getTimeZone(coord.getTimeZone())); |
| cal.setTime(coord.getLastActionTime()); |
| Frequency freq = createFrequency(String.valueOf(coord.getFrequency()), coord.getTimeUnit()); |
| cal.add(freq.getTimeUnit().getCalendarUnit(), -freq.getFrequencyAsInt()); |
| return cal.getTime(); |
| } |
| return null; |
| } |
| |
| private void updateCoords(String cluster, BundleJob bundle, |
| int concurrency, Date endTime, Entity entity) throws FalconException { |
| if (endTime.compareTo(DateUtil.now()) <= 0) { |
| throw new FalconException("End time " + SchemaHelper.formatDateUTC(endTime) + " can't be in the past"); |
| } |
| |
| if (bundle.getCoordinators() == null || bundle.getCoordinators().isEmpty()) { |
| throw new FalconException("Invalid state. Oozie coords are still not created. Try again later"); |
| } |
| |
| // change coords |
| for (CoordinatorJob coord : bundle.getCoordinators()) { |
| |
| Frequency delay = null; |
| //get Delay to calculate coordinator end time in case of feed replication with delay. |
| if (entity.getEntityType().equals(EntityType.FEED)) { |
| delay = getDelay((Feed) entity, coord); |
| } |
| |
| //calculate next start time based on delay. |
| endTime = (delay == null) ? endTime |
| : EntityUtil.getNextInstanceTimeWithDelay(endTime, delay, EntityUtil.getTimeZone(entity)); |
| LOG.debug("Updating endtime of coord {} to {} on cluster {}", |
| coord.getId(), SchemaHelper.formatDateUTC(endTime), cluster); |
| |
| Date lastActionTime = getCoordLastActionTime(coord); |
| if (lastActionTime == null) { // nothing is materialized |
| LOG.info("Nothing is materialized for this coord: {}", coord.getId()); |
| if (endTime.compareTo(coord.getStartTime()) <= 0) { |
| LOG.info("Setting end time to START TIME {}", SchemaHelper.formatDateUTC(coord.getStartTime())); |
| change(cluster, coord.getId(), concurrency, coord.getStartTime(), null); |
| } else { |
| LOG.info("Setting end time to START TIME {}", SchemaHelper.formatDateUTC(endTime)); |
| change(cluster, coord.getId(), concurrency, endTime, null); |
| } |
| } else { |
| LOG.info("Actions have materialized for this coord: {}, last action {}", |
| coord.getId(), SchemaHelper.formatDateUTC(lastActionTime)); |
| if (!endTime.after(lastActionTime)) { |
| Date pauseTime = DateUtil.offsetTime(endTime, -1*60); |
| // set pause time which deletes future actions |
| LOG.info("Setting pause time on coord: {} to {}", |
| coord.getId(), SchemaHelper.formatDateUTC(pauseTime)); |
| change(cluster, coord.getId(), concurrency, null, SchemaHelper.formatDateUTC(pauseTime)); |
| } |
| change(cluster, coord.getId(), concurrency, endTime, null); |
| } |
| } |
| } |
| |
| private Frequency getDelay(Feed entity, CoordinatorJob coord) { |
| Feed feed = entity; |
| for (org.apache.falcon.entity.v0.feed.Cluster entityCluster : feed.getClusters().getClusters()){ |
| if (coord.getAppName().contains(entityCluster.getName()) && coord.getAppName().contains("REPLICATION") |
| && entityCluster.getDelay() != null){ |
| return entityCluster.getDelay(); |
| } |
| } |
| return null; |
| } |
| |
| private String updateInternal(Entity oldEntity, Entity newEntity, Cluster cluster, BundleJob oldBundle, |
| String user, Boolean skipDryRun) throws FalconException { |
| String currentUser = CurrentUser.getUser(); |
| switchUser(user); |
| |
| String clusterName = cluster.getName(); |
| |
| Date effectiveTime = getEffectiveTime(cluster, newEntity); |
| LOG.info("Effective time " + effectiveTime); |
| try { |
| //Validate that new entity can be scheduled |
| dryRunForUpdate(cluster, newEntity, effectiveTime, skipDryRun); |
| |
| boolean suspended = BUNDLE_SUSPENDED_STATUS.contains(oldBundle.getStatus()); |
| |
| //Set end times for old coords |
| updateCoords(clusterName, oldBundle, EntityUtil.getParallel(oldEntity), effectiveTime, newEntity); |
| //schedule new entity |
| String newJobId = scheduleForUpdate(newEntity, cluster, effectiveTime); |
| BundleJob newBundle = null; |
| if (newJobId != null) { |
| newBundle = getBundleInfo(clusterName, newJobId); |
| } |
| |
| //Sometimes updateCoords() resumes the suspended coords. So, if already suspended, resume now |
| //Also suspend new bundle |
| if (suspended) { |
| doBundleAction(newEntity, BundleAction.SUSPEND, cluster.getName()); |
| } |
| return getUpdateString(newEntity, effectiveTime, oldBundle, newBundle); |
| } finally { |
| // Switch back to current user in case of exception. |
| switchUser(currentUser); |
| } |
| } |
| |
| private Date getEffectiveTime(Cluster cluster, Entity newEntity) { |
| //pick effective time as now() + 3 min to handle any time diff between falcon and oozie |
| //oozie rejects changes with endtime < now |
| Date effectiveTime = DateUtil.offsetTime(DateUtil.now(), 3*60); |
| |
| //pick start time for new bundle which is after effectiveTime |
| return EntityUtil.getNextStartTime(newEntity, cluster, effectiveTime); |
| } |
| |
| private void dryRunForUpdate(Cluster cluster, Entity entity, Date startTime, |
| Boolean skipDryRun) throws FalconException { |
| Entity clone = entity.copy(); |
| EntityUtil.setStartDate(clone, cluster.getName(), startTime); |
| try { |
| dryRun(clone, cluster.getName(), skipDryRun); |
| } catch (FalconException e) { |
| throw new FalconException("The new entity " + entity.toShortString() + " can't be scheduled", e); |
| } |
| } |
| |
| private String scheduleForUpdate(Entity entity, Cluster cluster, Date startDate) throws FalconException { |
| Entity clone = entity.copy(); |
| EntityUtil.setStartDate(clone, cluster.getName(), startDate); |
| Path buildPath = EntityUtil.getNewStagingPath(cluster, clone); |
| OozieEntityBuilder builder = OozieEntityBuilder.get(clone); |
| Properties properties = builder.build(cluster, buildPath); |
| if (properties != null) { |
| LOG.info("Scheduling {} on cluster {} with props {}", entity.toShortString(), cluster.getName(), |
| properties); |
| return scheduleEntity(cluster.getName(), properties, entity); |
| } else { |
| LOG.info("No new workflow to be scheduled for this " + entity.toShortString()); |
| return null; |
| } |
| } |
| |
| private void switchUser(String user) { |
| if (!CurrentUser.getUser().equals(user)) { |
| CurrentUser.authenticate(user); |
| } |
| } |
| |
| private BundleJob getBundleInfo(String cluster, String bundleId) throws FalconException { |
| try { |
| return OozieClientFactory.get(cluster).getBundleJobInfo(bundleId); |
| } catch (OozieClientException e) { |
| throw new FalconException(e); |
| } |
| } |
| |
| private List<WorkflowJob> getRunningWorkflows(String cluster, List<String> wfNames) throws |
| FalconException { |
| StringBuilder filter = new StringBuilder(); |
| filter.append(OozieClient.FILTER_STATUS).append('=').append(Job.Status.RUNNING.name()); |
| for (String wfName : wfNames) { |
| filter.append(';').append(OozieClient.FILTER_NAME).append('=').append(wfName); |
| } |
| |
| try { |
| return OozieClientFactory.get(cluster).getJobsInfo(filter.toString(), 1, 1000); |
| } catch (OozieClientException e) { |
| throw new FalconException(e); |
| } |
| } |
| |
| @Override |
| public String reRun(String cluster, String id, Properties props, boolean isForced) throws FalconException { |
| OozieClient client = OozieClientFactory.get(cluster); |
| String actionId = id; |
| try { |
| // If a workflow job is supplied, get its parent coord action |
| if (id.endsWith("-W")) { |
| actionId = client.getJobInfo(id).getParentId(); |
| } |
| if (StringUtils.isBlank(actionId) || !actionId.contains("-C@")) { |
| throw new FalconException("coord action id supplied for rerun, " + actionId + ", is not valid."); |
| } |
| return reRunCoordAction(cluster, client.getCoordActionInfo(actionId), props, isForced).name(); |
| } catch (Exception e) { |
| LOG.error("Unable to rerun action " + actionId, e); |
| throw new FalconException(e); |
| } |
| } |
| |
| |
| private String assertStatus(String cluster, String jobId, Status... statuses) throws FalconException { |
| String actualStatus = null; |
| int retryCount; |
| String retry = RuntimeProperties.get().getProperty(WORKFLOW_STATUS_RETRY_COUNT, "30"); |
| try { |
| retryCount = Integer.parseInt(retry); |
| } catch (NumberFormatException nfe) { |
| throw new FalconException("Invalid value provided for runtime property \"" |
| + WORKFLOW_STATUS_RETRY_COUNT + "\". Please provide an integer value."); |
| } |
| for (int counter = 0; counter < retryCount; counter++) { |
| actualStatus = getWorkflowStatus(cluster, jobId); |
| if (!statusEquals(actualStatus, statuses)) { |
| try { |
| Thread.sleep(WORKFLOW_STATUS_RETRY_DELAY_MS); |
| } catch (InterruptedException ignore) { |
| //ignore |
| } |
| } else { |
| return actualStatus; |
| } |
| } |
| throw new FalconException("For Job" + jobId + ", actual statuses: " + actualStatus + ", expected statuses: " |
| + Arrays.toString(statuses)); |
| } |
| |
| private boolean statusEquals(String left, Status... right) { |
| for (Status rightElement : right) { |
| if (left.equals(rightElement.name())) { |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| @Override |
| public String getWorkflowStatus(String cluster, String jobId) throws FalconException { |
| |
| OozieClient client = OozieClientFactory.get(cluster); |
| try { |
| if (jobId.endsWith("-W")) { |
| WorkflowJob jobInfo = client.getJobInfo(jobId); |
| return jobInfo.getStatus().name(); |
| } else if (jobId.endsWith("-C")) { |
| CoordinatorJob coord = client.getCoordJobInfo(jobId); |
| return coord.getStatus().name(); |
| } else if (jobId.endsWith("-B")) { |
| BundleJob bundle = client.getBundleJobInfo(jobId); |
| return bundle.getStatus().name(); |
| } else if (jobId.contains("-C@")) { |
| return client.getCoordActionInfo(jobId).getStatus().name(); |
| } |
| throw new IllegalArgumentException("Unhandled jobs id: " + jobId); |
| } catch (Exception e) { |
| LOG.error("Unable to get status of workflows", e); |
| throw new FalconException(e); |
| } |
| } |
| |
| private String scheduleEntity(String cluster, Properties props, Entity entity) throws FalconException { |
| for (WorkflowEngineActionListener listener : listeners) { |
| listener.beforeSchedule(entity, cluster); |
| } |
| String jobId = run(cluster, props); |
| for (WorkflowEngineActionListener listener : listeners) { |
| listener.afterSchedule(entity, cluster); |
| } |
| return jobId; |
| } |
| |
| private String run(String cluster, Properties props) throws FalconException { |
| try { |
| LOG.info("Scheduling on cluster {} with properties {}", cluster, props); |
| props.setProperty(OozieClient.USER_NAME, CurrentUser.getUser()); |
| String jobId = OozieClientFactory.get(cluster).run(props); |
| LOG.info("Submitted {} on cluster {}", jobId, cluster); |
| return jobId; |
| } catch (OozieClientException e) { |
| LOG.error("Unable to schedule workflows", e); |
| throw new FalconException("Unable to schedule workflows", e); |
| } |
| } |
| |
| private void suspend(String cluster, String jobId) throws FalconException { |
| try { |
| OozieClientFactory.get(cluster).suspend(jobId); |
| assertStatus(cluster, jobId, Status.PREPSUSPENDED, Status.SUSPENDED, Status.SUCCEEDED, Status.FAILED, |
| Status.KILLED); |
| LOG.info("Suspended job {} on cluster {}", jobId, cluster); |
| } catch (OozieClientException e) { |
| throw new FalconException(e); |
| } |
| } |
| |
| private void resume(String cluster, String jobId) throws FalconException { |
| try { |
| OozieClientFactory.get(cluster).resume(jobId); |
| assertStatus(cluster, jobId, Status.PREP, Status.RUNNING, Status.SUCCEEDED, Status.FAILED, Status.KILLED); |
| LOG.info("Resumed job {} on cluster {}", jobId, cluster); |
| } catch (OozieClientException e) { |
| throw new FalconException(e); |
| } |
| } |
| |
| private void ignore(String cluster, String jobId, int instanceNumber) throws FalconException { |
| try { |
| OozieClientFactory.get(cluster).ignore(jobId, String.valueOf(instanceNumber)); |
| assertStatus(cluster, jobId + "@" + instanceNumber, |
| Status.IGNORED, Status.FAILED, Status.SUCCEEDED, Status.DONEWITHERROR); |
| LOG.info("Ignored job {} on cluster {}", jobId, cluster); |
| } catch (OozieClientException e) { |
| throw new FalconException(e); |
| } |
| } |
| |
| private void kill(String cluster, String jobId, String rangeType, String scope) throws FalconException { |
| try { |
| OozieClientFactory.get(cluster).kill(jobId, rangeType, scope); |
| LOG.info("Killed job {} for instances {} on cluster {}", jobId, scope, cluster); |
| } catch (OozieClientException e) { |
| throw new FalconException(e); |
| } |
| } |
| |
| private void kill(String cluster, String jobId) throws FalconException { |
| try { |
| OozieClientFactory.get(cluster).kill(jobId); |
| assertStatus(cluster, jobId, Status.KILLED, Status.SUCCEEDED, Status.FAILED); |
| LOG.info("Killed job {} on cluster {}", jobId, cluster); |
| } catch (OozieClientException e) { |
| throw new FalconException(e); |
| } |
| } |
| |
| private void change(String cluster, String jobId, String changeValue) throws FalconException { |
| try { |
| OozieClientFactory.get(cluster).change(jobId, changeValue); |
| LOG.info("Changed bundle/coord {}: {} on cluster {}", jobId, changeValue, cluster); |
| } catch (OozieClientException e) { |
| throw new FalconException(e); |
| } |
| } |
| |
| private void change(String cluster, String id, int concurrency, Date endTime, String pauseTime) |
| throws FalconException { |
| StringBuilder changeValue = new StringBuilder(); |
| changeValue.append(OozieClient.CHANGE_VALUE_CONCURRENCY).append("=").append(concurrency).append(";"); |
| if (endTime != null) { |
| String endTimeStr = SchemaHelper.formatDateUTC(endTime); |
| changeValue.append(OozieClient.CHANGE_VALUE_ENDTIME).append("=").append(endTimeStr).append(";"); |
| } |
| if (pauseTime != null) { |
| changeValue.append(OozieClient.CHANGE_VALUE_PAUSETIME).append("=").append(pauseTime); |
| } |
| |
| String changeValueStr = changeValue.toString(); |
| if (changeValue.toString().endsWith(";")) { |
| changeValueStr = changeValue.substring(0, changeValueStr.length() - 1); |
| } |
| |
| change(cluster, id, changeValueStr); |
| |
| // assert that its really changed |
| try { |
| OozieClient client = OozieClientFactory.get(cluster); |
| CoordinatorJob coord = client.getCoordJobInfo(id); |
| for (int counter = 0; counter < 3; counter++) { |
| Date intendedPauseTime = (StringUtils.isEmpty(pauseTime) ? null : SchemaHelper.parseDateUTC(pauseTime)); |
| if (coord.getConcurrency() != concurrency || (endTime != null && !coord.getEndTime().equals(endTime)) |
| || (intendedPauseTime != null && !intendedPauseTime.equals(coord.getPauseTime()))) { |
| try { |
| Thread.sleep(100); |
| } catch (InterruptedException ignore) { |
| //ignore |
| } |
| } else { |
| return; |
| } |
| coord = client.getCoordJobInfo(id); |
| } |
| LOG.error("Failed to change coordinator. Current value {}, {}, {}", |
| coord.getConcurrency(), SchemaHelper.formatDateUTC(coord.getEndTime()), |
| SchemaHelper.formatDateUTC(coord.getPauseTime())); |
| throw new FalconException("Failed to change coordinator " + id + " with change value " + changeValueStr); |
| } catch (OozieClientException e) { |
| throw new FalconException(e); |
| } |
| } |
| |
| @Override |
| public Properties getWorkflowProperties(String cluster, String jobId) throws FalconException { |
| try { |
| WorkflowJob jobInfo = OozieClientFactory.get(cluster).getJobInfo(jobId); |
| String conf = jobInfo.getConf(); |
| return OozieUtils.toProperties(conf); |
| } catch (Exception e) { |
| throw new FalconException(e); |
| } |
| } |
| |
| @Override |
| public InstancesResult getJobDetails(String cluster, String jobId) throws FalconException { |
| Instance[] instances = new Instance[1]; |
| Instance instance = new Instance(); |
| try { |
| WorkflowJob jobInfo = OozieClientFactory.get(cluster).getJobInfo(jobId); |
| instance.startTime = jobInfo.getStartTime(); |
| if (jobInfo.getStatus().name().equals(Status.RUNNING.name())) { |
| instance.endTime = new Date(); |
| } else { |
| instance.endTime = jobInfo.getEndTime(); |
| } |
| instance.cluster = cluster; |
| instance.runId = jobInfo.getRun(); |
| instance.status = WorkflowStatus.valueOf(jobInfo.getStatus().name()); |
| instance.wfParams = getWFParams(jobInfo); |
| instances[0] = instance; |
| InstancesResult result = new InstancesResult(APIResult.Status.SUCCEEDED, |
| "Instance for workflow id:" + jobId); |
| result.setInstances(instances); |
| return result; |
| } catch (Exception e) { |
| throw new FalconException(e); |
| } |
| } |
| |
| @Override |
| public Boolean isWorkflowKilledByUser(String cluster, String jobId) throws FalconException { |
| // In case of a kill being issued from falcon api, the state will be moved to IGNORE |
| // In case of a failure, the Oozie action has an errorCode. |
| // In case of no errorCode in any of the actions would mean its killed by user |
| try { |
| OozieClient oozieClient = OozieClientFactory.get(cluster); |
| String parentId = oozieClient.getJobInfo(jobId).getParentId(); |
| if (oozieClient.getCoordActionInfo(parentId).getStatus().equals(CoordinatorAction.Status.IGNORED)) { |
| return true; |
| } |
| // Check for error code in all the actions in main workflow |
| List<WorkflowAction> wfActions = oozieClient.getJobInfo(jobId).getActions(); |
| for (WorkflowAction subWfAction : wfActions) { |
| if (StringUtils.isNotEmpty(subWfAction.getErrorCode())) { |
| return false; |
| } |
| } |
| // Assumption taken, there are no sub workflows in user action. |
| String subWfId = getUserWorkflowAction(wfActions); |
| List<WorkflowAction> subWfActions; |
| // Check for error code in all the user-workflow(sub-workflow)'s actions. |
| if (StringUtils.isNotBlank(subWfId)) { |
| subWfActions = oozieClient.getJobInfo(subWfId).getActions(); |
| for (WorkflowAction subWfAction : subWfActions) { |
| if (StringUtils.isNotEmpty(subWfAction.getErrorCode())) { |
| return false; |
| } |
| } |
| } |
| return true; |
| } catch (Exception e) { |
| throw new FalconException(e); |
| } |
| } |
| |
| @Override |
| public String getName() { |
| return "oozie"; |
| } |
| |
| private String getUserWorkflowAction(List<WorkflowAction> actionsList){ |
| for (WorkflowAction wfAction : actionsList) { |
| if (StringUtils.equals(wfAction.getName(), "user-action")) { |
| return wfAction.getExternalId(); |
| } |
| } |
| return null; |
| } |
| } |