blob: bc1dc2673b18ed67278d65e98a8a1292eb0ac112 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.service;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.AppType;
import org.apache.oozie.client.event.SLAEvent;
import org.apache.oozie.event.WorkflowJobEvent;
import org.apache.oozie.executor.jpa.BundleJobGetForUserJPAExecutor;
import org.apache.oozie.executor.jpa.CoordinatorJobGetForUserJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.WorkflowJobGetForUserJPAExecutor;
import org.apache.oozie.util.XLog;
/**
 * Service that resolves the name of the JMS topic associated with a job.
 * <p>
 * The mapping is read from the {@link #TOPIC_NAME} property: a comma separated
 * list of {@code key=value} pairs. A key is either {@code default} or one of
 * the {@link JobType} names. The value for {@code default} must be one of the
 * {@link TopicType} patterns ({@code ${username}} or {@code ${jobId}}); the
 * value for a job type is either one of those patterns or a fixed topic name
 * that does not contain {@code $}. Every name returned by the
 * {@code getTopic} methods is prefixed with the value of
 * {@link #TOPIC_PREFIX}.
 */
public class JMSTopicService implements Service {

    public static final String CONF_PREFIX = Service.CONF_PREFIX + "JMSTopicService.";
    public static final String TOPIC_NAME = CONF_PREFIX + "topic.name";
    public static final String TOPIC_PREFIX = CONF_PREFIX + "topic.prefix";

    // Assigned in init(); not available before the service has been initialized.
    private static XLog LOG;
    private Configuration conf;
    // Job type name -> configured topic pattern or fixed topic name.
    private final Map<String, String> topicMap = new HashMap<String, String>();
    private static final List<String> JOB_TYPE_CONSTANTS = new ArrayList<String>();
    private static final List<String> ALLOWED_TOPIC_NAMES = new ArrayList<String>();
    // Looked up once, when this service object is constructed.
    // NOTE(review): assumes JPAService is already registered at that point - confirm service ordering.
    private JPAService jpaService = Services.get().get(JPAService.class);
    // Pattern used for every job type that has no entry of its own in topicMap.
    private String defaultTopicName = TopicType.USER.value;
    private String topicPrefix;

    static {
        ALLOWED_TOPIC_NAMES.add(TopicType.USER.value);
        ALLOWED_TOPIC_NAMES.add(TopicType.JOBID.value);
    }

    static {
        JOB_TYPE_CONSTANTS.add(JobType.WORKFLOW.value);
        JOB_TYPE_CONSTANTS.add(JobType.COORDINATOR.value);
        JOB_TYPE_CONSTANTS.add(JobType.BUNDLE.value);
    }

    /**
     * Job types that may be used as keys in the {@link #TOPIC_NAME} property.
     */
    public static enum JobType {
        WORKFLOW("WORKFLOW"), COORDINATOR("COORDINATOR"), BUNDLE("BUNDLE");

        private String value;

        JobType(String value) {
            this.value = value;
        }

        String getValue() {
            return value;
        }
    }

    /**
     * Topic patterns that are replaced with a job specific value when a topic
     * name is resolved.
     */
    public static enum TopicType {
        USER("${username}"), JOBID("${jobId}");

        private String value;

        TopicType(String value) {
            this.value = value;
        }

        String getValue() {
            return value;
        }
    }

    /**
     * Initializes the service: parses the {@link #TOPIC_NAME} property and
     * reads the {@link #TOPIC_PREFIX} property (empty string when not set).
     *
     * @param services services instance supplying the configuration
     * @throws ServiceException if the topic configuration is missing or malformed
     */
    @Override
    public void init(Services services) throws ServiceException {
        LOG = XLog.getLog(getClass());
        conf = services.getConf();
        parseTopicConfiguration();
        topicPrefix = conf.get(TOPIC_PREFIX, "");
    }

    /**
     * Parses the comma separated {@code key=value} list stored under
     * {@link #TOPIC_NAME}, filling {@code topicMap} and
     * {@code defaultTopicName}.
     *
     * @throws ServiceException if the property is not set, an entry is not a
     *         single {@code key=value} pair, a key is not a known job type, or
     *         a value is not permitted for its key
     */
    private void parseTopicConfiguration() throws ServiceException {
        String topicName = ConfigurationService.get(conf, TOPIC_NAME);
        if (topicName == null) {
            throw new ServiceException(ErrorCode.E0100, getClass().getName(), "JMS topic cannot be null ");
        }
        LOG.info("Topic Name is [{0}]", topicName);
        for (String entry : topicName.trim().split(",")) {
            String[] keyValue = entry.trim().split("=");
            if (keyValue.length != 2) {
                throw new ServiceException(ErrorCode.E0100, getClass().getName(), "Property " + entry
                        + " has incorrect syntax; It should be specified as key value pair");
            }
            String key = keyValue[0].trim();
            String value = keyValue[1].trim();
            if (key.equals("default")) {
                // The default must be one of the substitution patterns.
                if (!ALLOWED_TOPIC_NAMES.contains(value)) {
                    throw new ServiceException(ErrorCode.E0100, getClass().getName(), "Topic name " + value
                            + " not allowed in default; allowed topics are " + ALLOWED_TOPIC_NAMES);
                }
                defaultTopicName = value;
            }
            else if (!JOB_TYPE_CONSTANTS.contains(key)) {
                throw new ServiceException(ErrorCode.E0100, getClass().getName(),
                        "Incorrect job type for defining JMS topic: " + key + "; allowed job types are "
                                + JOB_TYPE_CONSTANTS);
            }
            else if (!ALLOWED_TOPIC_NAMES.contains(value) && value.contains("$")) {
                // A '$' is only accepted as part of one of the known patterns.
                throw new ServiceException(ErrorCode.E0100, getClass().getName(), "JMS topic value " + value
                        + " for a job type is incorrect. Correct values are " + ALLOWED_TOPIC_NAMES);
            }
            else {
                topicMap.put(key, value);
            }
        }
    }

    /**
     * Retrieve topic from Job id.
     * <p>
     * Action ids (containing {@code -W@}, {@code -C@} or {@code -B_}) are cut
     * back to the id of the job they belong to before the topic is resolved.
     * <p>
     * NOTE(review): an id that matches none of the recognised shapes yields the
     * prefix followed by the text {@code null}; confirm whether callers rely on
     * that.
     *
     * @param jobId job id
     * @return topicPrefix + topicName
     * @throws JPAExecutorException if JPA executor has a problem
     */
    public String getTopic(String jobId) throws JPAExecutorException {
        String topicName = null;
        if (jobId.contains("-W@")) {
            topicName = getTopicForWorkflow(parentOf(jobId, '@'));
        }
        else if (jobId.endsWith("-W")) {
            topicName = getTopicForWorkflow(jobId);
        }
        else if (jobId.contains("-C@")) {
            topicName = getTopicForCoordinator(parentOf(jobId, '@'));
        }
        // Match the full "-C" / "-B" suffix, as the workflow branch does with
        // "-W". A bare "C" would also capture ids handled by a later branch,
        // e.g. a bundle action id whose trailing segment ends in 'C'.
        else if (jobId.endsWith("-C")) {
            topicName = getTopicForCoordinator(jobId);
        }
        else if (jobId.contains("-B_")) {
            topicName = getTopicForBundle(parentOf(jobId, '_'));
        }
        else if (jobId.endsWith("-B")) {
            topicName = getTopicForBundle(jobId);
        }
        return topicPrefix + topicName;
    }

    /**
     * Retrieve Topic without a database lookup, using the supplied user and
     * ids to replace a {@code ${username}} or {@code ${jobId}} pattern. For
     * action application types the parent job id is used as the job id.
     *
     * @param appType application type
     * @param user user
     * @param jobId job id
     * @param parentJobId parent job id
     * @return topicPrefix + topicName
     */
    public String getTopic(AppType appType, String user, String jobId, String parentJobId) {
        JobType jobType = null;
        boolean action = false;
        if (appType == AppType.COORDINATOR_JOB || appType == AppType.COORDINATOR_ACTION) {
            jobType = JobType.COORDINATOR;
            action = (appType == AppType.COORDINATOR_ACTION);
        }
        else if (appType == AppType.WORKFLOW_JOB || appType == AppType.WORKFLOW_ACTION) {
            jobType = JobType.WORKFLOW;
            action = (appType == AppType.WORKFLOW_ACTION);
        }
        else if (appType == AppType.BUNDLE_JOB || appType == AppType.BUNDLE_ACTION) {
            jobType = JobType.BUNDLE;
            action = (appType == AppType.BUNDLE_ACTION);
        }
        String pattern = (jobType != null) ? patternFor(jobType) : defaultTopicName;
        String id = action ? parentJobId : jobId;

        // Resolve the pattern whether it came from the default or from a
        // per-job-type entry, so that this method agrees with getTopic(String).
        String topicName;
        if (TopicType.USER.value.equals(pattern)) {
            topicName = user;
        }
        else if (TopicType.JOBID.value.equals(pattern)) {
            topicName = id;
        }
        else {
            topicName = pattern;
        }
        return topicPrefix + topicName;
    }

    /**
     * Returns the part of an action id that precedes the first occurrence of
     * the given separator.
     */
    private static String parentOf(String actionId, char separator) {
        return actionId.substring(0, actionId.indexOf(separator));
    }

    /**
     * Returns the topic pattern or fixed name configured for the job type,
     * falling back to the default pattern when the type has no entry.
     */
    private String patternFor(JobType jobType) {
        String configured = topicMap.get(jobType.value);
        return (configured != null) ? configured : defaultTopicName;
    }

    /**
     * Resolves the un-prefixed topic name of a workflow job; the
     * {@code ${username}} pattern is resolved through the JPA service.
     */
    private String getTopicForWorkflow(String jobId) throws JPAExecutorException {
        String topicName = patternFor(JobType.WORKFLOW);
        if (TopicType.USER.value.equals(topicName)) {
            topicName = jpaService.execute(new WorkflowJobGetForUserJPAExecutor(jobId));
        }
        else if (TopicType.JOBID.value.equals(topicName)) {
            topicName = jobId;
        }
        return topicName;
    }

    /**
     * Resolves the un-prefixed topic name of a coordinator job; the
     * {@code ${username}} pattern is resolved through the JPA service.
     */
    private String getTopicForCoordinator(String jobId) throws JPAExecutorException {
        String topicName = patternFor(JobType.COORDINATOR);
        if (TopicType.USER.value.equals(topicName)) {
            topicName = jpaService.execute(new CoordinatorJobGetForUserJPAExecutor(jobId));
        }
        else if (TopicType.JOBID.value.equals(topicName)) {
            topicName = jobId;
        }
        return topicName;
    }

    /**
     * Resolves the un-prefixed topic name of a bundle job; the
     * {@code ${username}} pattern is resolved through the JPA service.
     */
    private String getTopicForBundle(String jobId) throws JPAExecutorException {
        String topicName = patternFor(JobType.BUNDLE);
        if (TopicType.USER.value.equals(topicName)) {
            topicName = jpaService.execute(new BundleJobGetForUserJPAExecutor(jobId));
        }
        else if (TopicType.JOBID.value.equals(topicName)) {
            topicName = jobId;
        }
        return topicName;
    }

    /**
     * Returns the unresolved topic pattern (or fixed name) in effect for each
     * application type. Job and action types of the same family share a value.
     *
     * @return properties keyed by {@link AppType}
     */
    public Properties getTopicPatternProperties() {
        Properties props = new Properties();

        String wfTopic = patternFor(JobType.WORKFLOW);
        props.put(AppType.WORKFLOW_JOB, wfTopic);
        props.put(AppType.WORKFLOW_ACTION, wfTopic);

        String coordTopic = patternFor(JobType.COORDINATOR);
        props.put(AppType.COORDINATOR_JOB, coordTopic);
        props.put(AppType.COORDINATOR_ACTION, coordTopic);

        String bundleTopic = patternFor(JobType.BUNDLE);
        props.put(AppType.BUNDLE_JOB, bundleTopic);
        props.put(AppType.BUNDLE_ACTION, bundleTopic);

        return props;
    }

    /**
     * @return the prefix prepended to every resolved topic name
     */
    public String getTopicPrefix() {
        return topicPrefix;
    }

    /**
     * Discards the per-job-type topic configuration.
     */
    @Override
    public void destroy() {
        topicMap.clear();
    }

    @Override
    public Class<? extends Service> getInterface() {
        return JMSTopicService.class;
    }
}