// blob: 99ebb6c38e053ab10adbb331526bb75fa1dc2389 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.command;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.FaultInjection;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.XException;
import org.apache.oozie.service.CallableQueueService;
import org.apache.oozie.service.DagXLogInfoService;
import org.apache.oozie.service.InstrumentationService;
import org.apache.oozie.service.MemoryLocksService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.StoreService;
import org.apache.oozie.service.XLogService;
import org.apache.oozie.store.Store;
import org.apache.oozie.store.StoreException;
import org.apache.oozie.store.WorkflowStore;
import org.apache.oozie.util.Instrumentation;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XCallable;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.MemoryLocks.LockToken;
/**
* Base class for all synchronous and asynchronous DagEngine commands.
*/
public abstract class Command<T, S extends Store> implements XCallable<T> {
/**
* The instrumentation group used for Commands.
*/
private static final String INSTRUMENTATION_GROUP = "commands";
/**
* Creation timestamp of this command, taken from {@link System#currentTimeMillis()} in the constructor.
*/
private final long createdTime;
/**
* The instrumentation group used for Jobs.
*/
private static final String INSTRUMENTATION_JOB_GROUP = "jobs";
/**
* Timeout passed to the memory locks service when {@link #lock(String)} requests a write lock.
* NOTE(review): presumably milliseconds - confirm against MemoryLocksService.
*/
private static final long LOCK_TIMEOUT = 1000;
/**
* Not referenced in this class. NOTE(review): presumably the delay subclasses use to requeue themselves after a
* failed lock attempt - confirm against the subclasses.
*/
protected static final long LOCK_FAILURE_REQUEUE_INTERVAL = 30000;
/**
* Instrumentation instance obtained from the {@link InstrumentationService} in the constructor.
*/
protected Instrumentation instrumentation;
// Callables queued by queueCallable(...); the list is re-created at the start of every call() and queued after
// a successful execution.
private List<XCallable<Void>> callables;
// Callables queued by queueCallable(callable, delay); re-created at the start of every call().
private List<XCallable<Void>> delayedCallables;
// Largest delay requested so far for the delayed callables; reset to 0 at the start of every call().
private long delay = 0;
// Callables queued by queueCallableForException(...); queued only when call() catches an XException.
private List<XCallable<Void>> exceptionCallables;
// Command name; also used as the instrumentation cron/counter name.
private String name;
// Callable type returned by getType().
private String type;
// Command key: the name, an underscore and a random UUID (built in the constructor).
private String key;
// Priority returned by getPriority().
private int priority;
// Mask passed to the logging calls made by call().
private int logMask;
// When true, call() obtains a store, begins a transaction before execute() and commits it afterwards.
private boolean withStore;
/**
* Dryrun flag, set by the six-argument constructor; it is not read anywhere in this class.
*/
protected boolean dryrun = false;
// Lock tokens acquired through lock(String); released and cleared in the finally block of call().
private ArrayList<LockToken> locks = null;
/**
* Copy of the {@link XLog.Info} taken at construction time. This variable is package private for testing purposes
* only.
*/
XLog.Info logInfo;
/**
 * Build a command that works against a store. <p/> This is a shorthand for the five-argument constructor with
 * <code>withStore</code> set to <code>true</code>; the {@link XLog.Info} values current at construction time are
 * captured for the later execution.
 *
 * @param name name of the command.
 * @param type type of the command.
 * @param priority command priority, used when the command is queued for asynchronous execution.
 * @param logMask mask applied to the logging calls issued by the command.
 */
public Command(String name, String type, int priority, int logMask) {
    this(name, type, priority, logMask, true);
}
/**
* Create a command. <p/> The current {@link XLog.Info} values are captured for execution.
*
* @param name command name.
* @param type command type.
* @param priority priority of the command, used when queuing for asynchronous execution.
* @param logMask log mask for the command logging calls.
* @param withStore indicates if the command needs a {@link org.apache.oozie.store.WorkflowStore} instance or not.
*/
public Command(String name, String type, int priority, int logMask, boolean withStore) {
this.name = ParamChecker.notEmpty(name, "name");
this.type = ParamChecker.notEmpty(type, "type");
this.key = name + "_" + UUID.randomUUID();
this.priority = priority;
this.withStore = withStore;
this.logMask = logMask;
instrumentation = Services.get().get(InstrumentationService.class).get();
logInfo = new XLog.Info(XLog.Info.get());
createdTime = System.currentTimeMillis();
locks = new ArrayList<LockToken>();
}
/**
 * Build a command with an explicit dryrun setting. <p/> All other arguments are handled by the five-argument
 * constructor; this one only records the dryrun flag afterwards.
 *
 * @param name name of the command.
 * @param type type of the command.
 * @param priority command priority, used when the command is queued for asynchronous execution.
 * @param logMask mask applied to the logging calls issued by the command.
 * @param withStore <code>true</code> if the command must be given a store instance when it runs.
 * @param dryrun indicates if dryrun option is enabled. if enabled coordinator will show a diagnostic output without
 * really submitting the job
 */
public Command(String name, String type, int priority, int logMask, boolean withStore, boolean dryrun) {
    this(name, type, priority, logMask, withStore);
    this.dryrun = dryrun;
}
/**
 * Accessor for the command name given at construction time.
 *
 * @return the command name.
 */
@Override
public String getName() {
    return this.name;
}
/**
 * Accessor for the callable type given at construction time. <p/> The callable type is used for concurrency
 * throttling in the {@link org.apache.oozie.service.CallableQueueService}.
 *
 * @return the callable type.
 */
@Override
public String getType() {
    return this.type;
}
/**
 * Accessor for the command priority given at construction time.
 *
 * @return the command priority.
 */
@Override
public int getPriority() {
    return this.priority;
}
/**
 * Accessor for the moment this command was constructed, as recorded from {@link System#currentTimeMillis()}.
 *
 * @return the creation time of the callable, in milliseconds.
 */
@Override
public long getCreatedTime() {
    return this.createdTime;
}
/**
 * Execute the command by running {@link #execute(Store)} with all the necessary context set up. <p/> The
 * {@link XLog.Info} of the current thread is set to the values captured at instance creation time. <p/> The command
 * execution is logged and instrumented. <p/> If the command uses a store, a store instance is obtained from the
 * {@link StoreService}, a transaction is begun before the execution and committed after it. If an exception or
 * error is thrown and the transaction is still active, it is rolled back instead. <p/> Callables queued through
 * the <code>queueCallable</code> methods are queued for execution only after a successful execution (and commit).
 * <p/> If an {@link XException} is thrown, those callables are not queued; instead the callables registered with
 * {@link #queueCallableForException(XCallable)} are queued. For any other exception or error no callables are
 * queued at all. <p/> In every case the locks acquired through {@link #lock(String)} are released and the store
 * transaction, if no longer active, is closed.
 *
 * @return the value returned by {@link #execute(Store)}.
 * @throws CommandException thrown if the command could not be executed successfully; a {@link CommandException}
 * raised by the command is rethrown as is, any other exception is wrapped.
 */
@SuppressWarnings({"ThrowFromFinallyBlock", "unchecked"})
public final T call() throws CommandException {
    XLog.Info.get().setParameters(logInfo);
    XLog log = XLog.getLog(getClass());
    log.trace(logMask, "Start");
    Instrumentation.Cron cron = new Instrumentation.Cron();
    cron.start();
    // Fresh queues for every invocation; they are filled by the queueCallable* methods during execute().
    callables = new ArrayList<XCallable<Void>>();
    delayedCallables = new ArrayList<XCallable<Void>>();
    exceptionCallables = new ArrayList<XCallable<Void>>();
    delay = 0;
    S store = null;
    boolean exception = false;
    try {
        if (withStore) {
            store = (S) Services.get().get(StoreService.class).getStore(getStoreClass());
            store.beginTrx();
        }
        T result = execute(store);
        if (withStore) {
            if (store == null) {
                throw new IllegalStateException("WorkflowStore should not be null");
            }
            if (FaultInjection.isActive("org.apache.oozie.command.SkipCommitFaultInjection")) {
                throw new RuntimeException("Skipping Commit for Failover Testing");
            }
            store.commitTrx();
        }
        // TODO figure out the reject due to concurrency problems and remove
        // the delayed queuing for callables.
        queueOrLogFailure(callables, 10);
        queueOrLogFailure(delayedCallables, delay);
        return result;
    }
    catch (XException ex) {
        log.error(logMask | XLog.OPS, "XException, {0}", ex);
        if (store != null) {
            log.info(XLog.STD, "XException - connection logs from store {0}, {1}", store.getConnection(), store
                    .isClosed());
        }
        exception = true;
        rollbackIfActive(store, log);
        // TODO figure out the reject due to concurrency problems and remove
        // the delayed queuing for callables.
        queueOrLogFailure(exceptionCallables, 10);
        if (ex instanceof CommandException) {
            throw (CommandException) ex;
        }
        else {
            throw new CommandException(ex);
        }
    }
    catch (Exception ex) {
        log.error(logMask | XLog.OPS, "Exception, {0}", ex);
        exception = true;
        rollbackIfActive(store, log);
        throw new CommandException(ErrorCode.E0607, ex);
    }
    catch (Error er) {
        log.error(logMask | XLog.OPS, "Error, {0}", er);
        exception = true;
        rollbackIfActive(store, log);
        throw er;
    }
    finally {
        FaultInjection.deactivate("org.apache.oozie.command.SkipCommitFaultInjection");
        cron.stop();
        instrumentation.addCron(INSTRUMENTATION_GROUP, name, cron);
        incrCommandCounter(1);
        log.trace(logMask, "End");
        releaseLocks();
        closeStore(store, exception, log);
    }
}

/**
 * Hand the given callables to the {@link CallableQueueService} for serial execution and log a warning if the
 * service reports that it did not queue them.
 *
 * @param toQueue callables to queue.
 * @param queueDelay delay handed to the queue service.
 */
private void queueOrLogFailure(List<XCallable<Void>> toQueue, long queueDelay) {
    boolean queued = Services.get().get(CallableQueueService.class).queueSerial(toQueue, queueDelay);
    if (!queued) {
        logQueueCallableFalse(toQueue);
    }
}

/**
 * Roll back the store transaction if there is a store and its transaction is still active. A
 * {@link RuntimeException} raised by the rollback is logged and not propagated, so that the exception which
 * triggered the rollback is the one the caller sees.
 *
 * @param store store used by the command, may be <code>null</code>.
 * @param log log to report a rollback failure to.
 */
private void rollbackIfActive(S store, XLog log) {
    if (store != null && store.isActive()) {
        try {
            store.rollbackTrx();
        }
        catch (RuntimeException rex) {
            log.warn(logMask | XLog.OPS, "openjpa error, {0}, {1}", name, rex.getMessage(), rex);
        }
    }
}

/**
 * Release every lock acquired through {@link #lock(String)} and forget the tokens.
 */
private void releaseLocks() {
    if (locks != null) {
        for (LockToken lock : locks) {
            lock.release();
        }
        locks.clear();
    }
}

/**
 * Close the store transaction once it is no longer active. <p/> If the transaction is still active (neither
 * committed nor rolled back) it is not closed and a warning is logged. A {@link RuntimeException} raised while
 * closing is only logged when the command already failed, otherwise it is propagated.
 *
 * @param store store used by the command, may be <code>null</code>.
 * @param exception <code>true</code> if the command execution already ended with an exception or error.
 * @param log log to report problems to.
 */
private void closeStore(S store, boolean exception, XLog log) {
    if (store == null) {
        return;
    }
    if (!store.isActive()) {
        try {
            store.closeTrx();
        }
        catch (RuntimeException rex) {
            if (exception) {
                // Do not mask the original failure with the close failure.
                log.warn(logMask | XLog.OPS, "openjpa error, {0}, {1}", name, rex.getMessage(), rex);
            }
            else {
                throw rex;
            }
        }
    }
    else {
        log.warn(logMask | XLog.OPS, "transaction is not committed or rolled back before closing entitymanager.");
    }
}
/**
 * Register a callable to be queued once the current {@link #call()} invocation has completed successfully and the
 * store transaction, if any, has been committed. <p/> Every callable registered this way during one invocation is
 * queued together for a single serial execution. <p/> If the invocation ends with an exception these callables are
 * dropped without being queued. <p/> The backing list is created at the start of {@link #call()}.
 *
 * @param callable callable to queue for execution.
 */
protected void queueCallable(XCallable<Void> callable) {
    this.callables.add(callable);
}
/**
 * Register several callables to be queued once the current {@link #call()} invocation has completed successfully
 * and the store transaction, if any, has been committed. <p/> Every callable registered this way during one
 * invocation is queued together for a single serial execution. <p/> If the invocation ends with an exception these
 * callables are dropped without being queued.
 *
 * @param callables list of callables to queue for execution.
 */
protected void queueCallable(List<? extends XCallable<Void>> callables) {
    final List<XCallable<Void>> pending = this.callables;
    pending.addAll(callables);
}
/**
 * Register a callable for delayed queuing once the current {@link #call()} invocation has completed successfully
 * and the store transaction, if any, has been committed. <p/> All delayed callables registered during one
 * invocation are queued together for a single serial execution, using the largest delay requested by any of them.
 * <p/> If the invocation ends with an exception these callables are dropped without being queued.
 *
 * @param callable callable to queue for delayed execution.
 * @param delay the queue delay in milliseconds
 */
protected void queueCallable(XCallable<Void> callable, long delay) {
    this.delayedCallables.add(callable);
    // Keep only the largest delay requested so far.
    if (delay > this.delay) {
        this.delay = delay;
    }
}
/**
 * Register a callable that is queued only if the current {@link #call()} invocation fails. <p/> When the invocation
 * completes without an exception, the callables registered through this method are dropped without being queued.
 * <p/> All callables registered this way during one invocation are queued together for a single serial execution.
 *
 * @param callable callable to queue for execution in the case of an exception.
 */
protected void queueCallableForException(XCallable<Void> callable) {
    this.exceptionCallables.add(callable);
}
/**
 * Log a warning listing the names of the callables that could not be queued. <p/> The names are written as a
 * comma separated list between square brackets; an empty or <code>null</code> list is reported as <code>[]</code>.
 *
 * @param callables the callables that failed to be queued, may be empty or <code>null</code>.
 */
protected void logQueueCallableFalse(List<? extends XCallable<Void>> callables) {
    StringBuilder sb = new StringBuilder(
            "Unable to queue the callables, delayedQueue is full or system is in SAFEMODE - failed to queue:[");
    if (callables != null) {
        String separator = "";
        for (XCallable<Void> callable : callables) {
            sb.append(separator).append(callable.getName());
            separator = ", ";
        }
    }
    // Close the bracket unconditionally so that an empty list still yields a well-formed message.
    sb.append("]");
    XLog.getLog(getClass()).warn(sb.toString());
}
/**
* Command subclasses must implement this method to perform their task. <p/> It is invoked through
* {@link #execute(Store)} by {@link #call()}. When the command uses a store, {@link #call()} begins a transaction
* before the invocation and commits it only if this method ends successfully. Otherwise the transaction is rolled
* back if it is still active.
*
* @param store the store instance for the command, <code>null</code> if the command does not use a store.
* @return the return value of the command.
* @throws StoreException thrown if the store could not perform an operation.
* @throws CommandException thrown if the command could not perform its operation.
*/
protected abstract T call(S store) throws StoreException, CommandException;
// TODO: implement the following method on all sub-commands and break down the transactions:
// protected abstract T execute(String id) throws CommandException;
/**
* Command subclasses must implement this method so that the correct {@link Store} can be passed to
* {@link #call(Store)}. <p/> When the command uses a store, {@link #call()} hands the returned class to the
* {@link StoreService} to obtain the store instance.
*
* @return the Store class for use by the command.
*/
protected abstract Class<? extends Store> getStoreClass();
/**
 * Populate the log info of this command from a coordinator job bean and apply it to the current thread. <p/> The
 * group and the user are only filled in when they are not set yet; the job id and the application name are always
 * overwritten and the token is set to the empty string.
 *
 * @param cBean coordinator bean.
 */
protected void setLogInfo(CoordinatorJobBean cBean) {
    final XLog.Info info = logInfo;
    if (null == info.getParameter(XLogService.GROUP)) {
        info.setParameter(XLogService.GROUP, cBean.getGroup());
    }
    if (null == info.getParameter(XLogService.USER)) {
        info.setParameter(XLogService.USER, cBean.getUser());
    }
    info.setParameter(DagXLogInfoService.JOB, cBean.getId());
    info.setParameter(DagXLogInfoService.TOKEN, "");
    info.setParameter(DagXLogInfoService.APP, cBean.getAppName());
    // Make the updated context effective for the logging done by the current thread.
    XLog.Info.get().setParameters(info);
}
/**
 * Populate the log info of this command from a coordinator action bean and apply it to the current thread. <p/>
 * Only the job id and the action id are updated; the token parameter is left untouched.
 *
 * @param action action bean.
 */
protected void setLogInfo(CoordinatorActionBean action) {
    final XLog.Info info = logInfo;
    info.setParameter(DagXLogInfoService.JOB, action.getJobId());
    info.setParameter(DagXLogInfoService.ACTION, action.getId());
    XLog.Info.get().setParameters(info);
}
/**
 * Populate the log info of this command from a workflow job bean and apply it to the current thread. <p/> The group
 * and the user are only filled in when they are not set yet; the job id, the log token and the application name are
 * always overwritten.
 *
 * @param workflow workflow bean.
 */
protected void setLogInfo(WorkflowJobBean workflow) {
    final XLog.Info info = logInfo;
    if (null == info.getParameter(XLogService.GROUP)) {
        info.setParameter(XLogService.GROUP, workflow.getGroup());
    }
    if (null == info.getParameter(XLogService.USER)) {
        info.setParameter(XLogService.USER, workflow.getUser());
    }
    info.setParameter(DagXLogInfoService.JOB, workflow.getId());
    info.setParameter(DagXLogInfoService.TOKEN, workflow.getLogToken());
    info.setParameter(DagXLogInfoService.APP, workflow.getAppName());
    // Make the updated context effective for the logging done by the current thread.
    XLog.Info.get().setParameters(info);
}
/**
 * Populate the log info of this command from a workflow action bean and apply it to the current thread. <p/> The
 * job id, the log token and the action id are overwritten.
 *
 * @param action action bean.
 */
protected void setLogInfo(WorkflowActionBean action) {
    final XLog.Info info = logInfo;
    info.setParameter(DagXLogInfoService.JOB, action.getJobId());
    info.setParameter(DagXLogInfoService.TOKEN, action.getLogToken());
    info.setParameter(DagXLogInfoService.ACTION, action.getId());
    XLog.Info.get().setParameters(info);
}
/**
 * Remove the action parameter from both the log info of this command and the log info of the current thread.
 */
// TODO check if they are used, else delete
protected void resetLogInfoAction() {
    final String actionKey = DagXLogInfoService.ACTION;
    logInfo.clearParameter(actionKey);
    XLog.Info.get().clearParameter(actionKey);
}
/**
 * Remove the job, application and token parameters from both the log info of this command and the log info of the
 * current thread.
 */
// TODO check if they are used, else delete
protected void resetLogInfoWorkflow() {
    final String[] workflowKeys = {DagXLogInfoService.JOB, DagXLogInfoService.APP, DagXLogInfoService.TOKEN};
    // First the command's own copy ...
    for (String parameter : workflowKeys) {
        logInfo.clearParameter(parameter);
    }
    // ... then the context of the current thread.
    for (String parameter : workflowKeys) {
        XLog.Info.get().clearParameter(parameter);
    }
}
/**
 * Increment an instrumentation counter; does nothing when no instrumentation instance is available.
 *
 * @param group the group name.
 * @param name the counter name.
 * @param count increment count.
 */
private void incrCounter(String group, String name, int count) {
    if (instrumentation == null) {
        return;
    }
    instrumentation.incr(group, name, count);
}
/**
 * Increment the counter of this command in the commands instrumentation group. The counter name is the command
 * name.
 *
 * @param count the increment count.
 */
protected void incrCommandCounter(int count) {
    incrCounter(INSTRUMENTATION_GROUP, this.name, count);
}
/**
 * Increment a counter in the jobs instrumentation group, using the command name as the counter name.
 *
 * @param count the increment count.
 */
protected void incrJobCounter(int count) {
    incrJobCounter(this.name, count);
}
/**
 * Increment the named counter in the jobs instrumentation group.
 *
 * @param name the job name, used as the counter name.
 * @param count the increment count.
 */
protected void incrJobCounter(String name, int count) {
    final String group = INSTRUMENTATION_JOB_GROUP;
    incrCounter(group, name, count);
}
/**
 * Accessor for the {@link Instrumentation} instance this command reports to.
 *
 * @return the {@link Instrumentation} instance in use.
 */
protected Instrumentation getInstrumentation() {
    return this.instrumentation;
}
/**
 * Return a short textual identity of the command: its type and its priority separated by a comma.
 *
 * @return the identity, in the form <code>type,priority</code>.
 */
@Override
public String toString() {
    return getType() + "," + getPriority();
}
/**
 * Try to acquire a write lock on the given id through the {@link MemoryLocksService}. <p/> A token obtained this
 * way is remembered by the command and released at the end of {@link #call()}. A <code>null</code> or empty id is
 * rejected with a warning and no lock is requested.
 *
 * @param id id of the resource to lock.
 * @return <code>true</code> if the lock was acquired, <code>false</code> if the id was null/empty or the lock
 * service did not return a token.
 * @throws InterruptedException declared because the lock request may be interrupted.
 */
protected boolean lock(String id) throws InterruptedException {
    if (id == null || id.isEmpty()) {
        XLog.getLog(getClass()).warn("lock(): Id is null or empty :" + id + ":");
        return false;
    }
    MemoryLocksService lockService = Services.get().get(MemoryLocksService.class);
    LockToken token = lockService.getWriteLock(id, LOCK_TIMEOUT);
    if (token == null) {
        return false;
    }
    // Remember the token so it can be released when the command finishes.
    locks.add(token);
    return true;
}
/*
* TODO - remove store coupling to EM. Store will only contain queries
* protected EntityManager getEntityManager() { return
* store.getEntityManager(); }
*/
/**
 * Run the command logic by delegating to {@link #call(Store)}. <p/> This is the method {@link #call()} invokes
 * between beginning and committing the store transaction.
 *
 * @param store the store instance for the command, <code>null</code> if the command does not use a store.
 * @return the value returned by {@link #call(Store)}.
 * @throws CommandException thrown if the command could not perform its operation.
 * @throws StoreException thrown if the store could not perform an operation.
 */
protected T execute(S store) throws CommandException, StoreException {
    return call(store);
}
/**
 * Accessor for the command key, which is built at construction time from the command name and a random UUID.
 *
 * @return command key
 */
@Override
public String getKey() {
    return key;
}
}