blob: 823cc5f5e9a9f83ca9c12267e4658a35707190b1 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.service;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import org.apache.oozie.BundleJobBean;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.command.bundle.BundlePauseXCommand;
import org.apache.oozie.command.bundle.BundleStartXCommand;
import org.apache.oozie.command.bundle.BundleUnpauseXCommand;
import org.apache.oozie.command.coord.CoordPauseXCommand;
import org.apache.oozie.command.coord.CoordUnpauseXCommand;
import org.apache.oozie.executor.jpa.BundleJobsGetNeedStartJPAExecutor;
import org.apache.oozie.executor.jpa.BundleJobsGetPausedJPAExecutor;
import org.apache.oozie.executor.jpa.BundleJobsGetUnpausedJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobsGetPausedJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobsGetUnpausedJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.SchedulerService;
import org.apache.oozie.service.Service;
import org.apache.oozie.service.Services;
import org.apache.oozie.lock.LockToken;
import org.apache.oozie.util.ConfigUtils;
import org.apache.oozie.util.XCallable;
import org.apache.oozie.util.XLog;
import com.google.common.annotations.VisibleForTesting;
/**
* PauseTransitService is the runnable which is scheduled to run at the configured interval, it checks all bundles to
* see if they should be paused, un-paused or started.
*/
public class PauseTransitService implements Service {
public static final String CONF_PREFIX = Service.CONF_PREFIX + "PauseTransitService.";
public static final String CONF_BUNDLE_PAUSE_START_INTERVAL = CONF_PREFIX + "PauseTransit.interval";
private final static XLog LOG = XLog.getLog(PauseTransitService.class);
public static final String CONF_CALLABLE_BATCH_SIZE = CONF_PREFIX + "callable.batch.size";
/**
* PauseTransitRunnable is the runnable which is scheduled to run at the configured interval, it checks all bundles
* to see if they should be paused, un-paused or started.
*/
@VisibleForTesting
public static class PauseTransitRunnable implements Runnable {
private JPAService jpaService = null;
private LockToken lock;
private List<XCallable<Void>> callables;
public PauseTransitRunnable() {
jpaService = Services.get().get(JPAService.class);
if (jpaService == null) {
LOG.error("Missing JPAService");
}
}
public void run() {
try {
// first check if there is some other running instance from the same service;
lock = Services.get().get(MemoryLocksService.class)
.getWriteLock(PauseTransitService.class.getName(), lockTimeout);
if (lock == null) {
LOG.info("This PauseTransitService instance will"
+ "not run since there is already an instance running");
}
else {
LOG.info("Acquired lock for [{0}]", PauseTransitService.class.getName());
updateBundle();
updateCoord();
if (null != callables) {
boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables);
if (ret == false) {
XLog.getLog(getClass()).warn(
"Unable to queue the callables commands for PauseTransitService. "
+ "Most possibly command queue is full. Queue size is :"
+ Services.get().get(CallableQueueService.class).queueSize());
}
callables = null;
}
}
}
catch (Exception ex) {
LOG.warn("Exception happened when pausing/unpausing/starting bundle/coord jobs", ex);
}
finally {
// release lock;
if (lock != null) {
lock.release();
LOG.info("Released lock for [{0}]", PauseTransitService.class.getName());
}
}
}
private void updateBundle() {
Date d = new Date(); // records the start time of this service run;
List<BundleJobBean> jobList = null;
// pause bundles as needed;
try {
jobList = jpaService.execute(new BundleJobsGetUnpausedJPAExecutor(-1));
if (jobList != null) {
for (BundleJobBean bundleJob : jobList) {
if ((bundleJob.getPauseTime() != null) && !bundleJob.getPauseTime().after(d)) {
queueCallable(new BundlePauseXCommand(bundleJob));
LOG.debug("Queuing BundlePauseXCommand for bundle job = " + bundleJob.getId());
}
}
}
}
catch (JPAExecutorException ex) {
LOG.warn("JPAExecutorException happened when pausing/unpausing/starting Bundle jobs", ex);
}
// unpause bundles as needed;
try {
jobList = jpaService.execute(new BundleJobsGetPausedJPAExecutor(-1));
if (jobList != null) {
for (BundleJobBean bundleJob : jobList) {
if ((bundleJob.getPauseTime() == null || bundleJob.getPauseTime().after(d))) {
queueCallable(new BundleUnpauseXCommand(bundleJob));
LOG.debug("Queuing BundleUnpauseXCommand for bundle job = " + bundleJob.getId());
}
}
}
}
catch (JPAExecutorException ex) {
LOG.warn("JPAExecutorException happened when pausing/unpausing/starting Bundle jobs", ex);
}
// start bundles as needed;
try {
jobList = jpaService.execute(new BundleJobsGetNeedStartJPAExecutor(d));
if (jobList != null) {
for (BundleJobBean bundleJob : jobList) {
queueCallable(new BundleStartXCommand(bundleJob.getId()));
LOG.debug("Queuing BundleStartXCommand for bundle job = " + bundleJob.getId());
}
}
}
catch (JPAExecutorException ex) {
LOG.warn("JPAExecutorException happened when pausing/unpausing/starting Bundle jobs", ex);
}
}
private void updateCoord() {
Date d = new Date(); // records the start time of this service run;
List<CoordinatorJobBean> jobList = null;
boolean backwardSupportForCoordStatus = ConfigUtils.isBackwardSupportForCoordStatus();
// pause coordinators as needed;
try {
jobList = jpaService.execute(new CoordJobsGetUnpausedJPAExecutor(-1));
if (jobList != null) {
for (CoordinatorJobBean coordJob : jobList) {
// if namespace 0.1 is used and backward support is true, then ignore this coord job
if (backwardSupportForCoordStatus == true && coordJob.getAppNamespace() != null
&& coordJob.getAppNamespace().equals(SchemaService.COORDINATOR_NAMESPACE_URI_1)) {
continue;
}
if ((coordJob.getPauseTime() != null) && !coordJob.getPauseTime().after(d)) {
queueCallable(new CoordPauseXCommand(coordJob));
LOG.debug("Queuing CoordPauseXCommand for coordinator job = " + coordJob.getId());
}
}
}
}
catch (JPAExecutorException ex) {
LOG.warn("JPAExecutorException happened when pausing/unpausing Coordinator jobs", ex);
}
// unpause coordinators as needed;
try {
jobList = jpaService.execute(new CoordJobsGetPausedJPAExecutor(-1));
if (jobList != null) {
for (CoordinatorJobBean coordJob : jobList) {
// if namespace 0.1 is used and backward support is true, then ignore this coord job
if (backwardSupportForCoordStatus == true && coordJob.getAppNamespace() != null
&& coordJob.getAppNamespace().equals(SchemaService.COORDINATOR_NAMESPACE_URI_1)) {
continue;
}
if ((coordJob.getPauseTime() == null || coordJob.getPauseTime().after(d))) {
queueCallable(new CoordUnpauseXCommand(coordJob));
LOG.debug("Queuing CoordUnpauseXCommand for coordinator job = " + coordJob.getId());
}
}
}
}
catch (JPAExecutorException ex) {
LOG.warn("JPAExecutorException happened when pausing/unpausing Coordinator jobs", ex);
}
}
private void queueCallable(XCallable<Void> callable) {
if (callables == null) {
callables = new ArrayList<XCallable<Void>>();
}
callables.add(callable);
if (callables.size() == ConfigurationService.getInt(CONF_CALLABLE_BATCH_SIZE)) {
boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables);
if (ret == false) {
XLog.getLog(getClass()).warn(
"Unable to queue the callables commands for PauseTransitService. "
+ "Most possibly command queue is full. Queue size is :"
+ Services.get().get(CallableQueueService.class).queueSize());
}
callables = new ArrayList<XCallable<Void>>();
}
}
}
/**
* Initializes the {@link PauseTransitService}.
*
* @param services services instance.
*/
@Override
public void init(Services services) {
Runnable bundlePauseStartRunnable = new PauseTransitRunnable();
services.get(SchedulerService.class).schedule(bundlePauseStartRunnable, 10,
ConfigurationService.getInt(services.getConf(), CONF_BUNDLE_PAUSE_START_INTERVAL),
SchedulerService.Unit.SEC);
}
/**
* Destroy the StateTransit Jobs Service.
*/
@Override
public void destroy() {
}
/**
* Return the public interface for the purge jobs service.
*
* @return {@link PauseTransitService}.
*/
@Override
public Class<? extends Service> getInterface() {
return PauseTransitService.class;
}
}