blob: a739a191cf9ae82a8f2559dba76ac2481b62798e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.service;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.oozie.action.ActionExecutor;
import org.apache.oozie.action.control.EndActionExecutor;
import org.apache.oozie.action.control.ForkActionExecutor;
import org.apache.oozie.action.control.JoinActionExecutor;
import org.apache.oozie.action.control.KillActionExecutor;
import org.apache.oozie.action.control.StartActionExecutor;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XLog;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.oozie.util.Instrumentable;
import org.apache.oozie.util.Instrumentation;
public class ActionService implements Service, Instrumentable {
public static final String CONF_ACTION_EXECUTOR_CLASSES = CONF_PREFIX + "ActionService.executor.classes";
public static final String CONF_ACTION_EXECUTOR_EXT_CLASSES = CONF_PREFIX + "ActionService.executor.ext.classes";
private Services services;
private Map<String, Class<? extends ActionExecutor>> executors;
private static XLog LOG = XLog.getLog(ActionService.class);
@SuppressWarnings({"unchecked", "deprecation"})
@Override
public void init(Services services) throws ServiceException {
this.services = services;
ActionExecutor.enableInit();
ActionExecutor.resetInitInfo();
ActionExecutor.disableInit();
executors = new HashMap<String, Class<? extends ActionExecutor>>();
Class<? extends ActionExecutor>[] classes = new Class[] { StartActionExecutor.class,
EndActionExecutor.class, KillActionExecutor.class, ForkActionExecutor.class, JoinActionExecutor.class };
registerExecutors(classes);
classes = (Class<? extends ActionExecutor>[]) ConfigurationService.getClasses
(services.getConf(), CONF_ACTION_EXECUTOR_CLASSES);
registerExecutors(classes);
classes = (Class<? extends ActionExecutor>[]) ConfigurationService.getClasses
(services.getConf(), CONF_ACTION_EXECUTOR_EXT_CLASSES);
registerExecutors(classes);
initExecutors();
}
private void registerExecutors(Class<? extends ActionExecutor>[] classes) {
if (classes != null) {
for (Class<? extends ActionExecutor> executorClass : classes) {
@SuppressWarnings("deprecation")
ActionExecutor executor = (ActionExecutor) ReflectionUtils.newInstance(executorClass, services.getConf());
executors.put(executor.getType(), executorClass);
}
}
}
private void initExecutors() {
for (Class<? extends ActionExecutor> executorClass : executors.values()) {
initExecutor(executorClass);
}
LOG.info("Initialized action types: " + getActionTypes());
}
@Override
public void destroy() {
ActionExecutor.enableInit();
ActionExecutor.resetInitInfo();
ActionExecutor.disableInit();
executors = null;
}
@Override
public Class<? extends Service> getInterface() {
return ActionService.class;
}
@Override
public void instrument(Instrumentation instr) {
instr.addVariable("configuration", "action.types", new Instrumentation.Variable<String>() {
@Override
public String getValue() {
Set<String> actionTypes = getActionTypes();
if (actionTypes != null) {
return actionTypes.toString();
}
return "(unavailable)";
}
});
}
@SuppressWarnings("unchecked")
@VisibleForTesting
public void registerAndInitExecutor(Class<? extends ActionExecutor> klass) {
ActionExecutor.enableInit();
ActionExecutor.resetInitInfo();
ActionExecutor.disableInit();
registerExecutors(new Class[]{klass});
initExecutors();
}
private void initExecutor(Class<? extends ActionExecutor> klass) {
@SuppressWarnings("deprecation")
ActionExecutor executor = (ActionExecutor) ReflectionUtils.newInstance(klass, services.getConf());
LOG.debug("Initializing action type [{0}] class [{1}]", executor.getType(), klass);
ActionExecutor.enableInit();
executor.initActionType();
ActionExecutor.disableInit();
LOG.trace("Initialized Executor for action type [{0}] class [{1}]", executor.getType(), klass);
}
public ActionExecutor getExecutor(String actionType) {
ParamChecker.notEmpty(actionType, "actionType");
Class<? extends ActionExecutor> executorClass = executors.get(actionType);
return (executorClass != null) ? (ActionExecutor) ReflectionUtils.newInstance(executorClass, null) : null;
}
public boolean hasActionType(String actionType) {
ParamChecker.notEmpty(actionType, "actionType");
return executors.containsKey(actionType);
}
Set<String> getActionTypes() {
return executors.keySet();
}
}