blob: 95508ec0874915825f205af9265326683a28eb22 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.service;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.action.hadoop.PasswordMasker;
import org.apache.oozie.client.event.Event;
import org.apache.oozie.client.event.Event.MessageType;
import org.apache.oozie.client.event.JobEvent;
import org.apache.oozie.client.event.SLAEvent;
import org.apache.oozie.event.BundleJobEvent;
import org.apache.oozie.event.CoordinatorActionEvent;
import org.apache.oozie.event.CoordinatorJobEvent;
import org.apache.oozie.event.EventQueue;
import org.apache.oozie.event.MemoryEventQueue;
import org.apache.oozie.event.WorkflowActionEvent;
import org.apache.oozie.event.WorkflowJobEvent;
import org.apache.oozie.event.listener.JobEventListener;
import org.apache.oozie.sla.listener.SLAEventListener;
import org.apache.oozie.util.LogUtils;
import org.apache.oozie.util.XLog;
/**
* Service class that handles the events system - creating events queue,
* managing configured properties and managing and invoking various event
* listeners via worker threads
*/
public class EventHandlerService implements Service {
public static final String CONF_PREFIX = Service.CONF_PREFIX + "EventHandlerService.";
public static final String CONF_QUEUE_SIZE = CONF_PREFIX + "queue.size";
public static final String CONF_EVENT_QUEUE = CONF_PREFIX + "event.queue";
public static final String CONF_LISTENERS = CONF_PREFIX + "event.listeners";
public static final String CONF_FILTER_APP_TYPES = CONF_PREFIX + "filter.app.types";
public static final String CONF_BATCH_SIZE = CONF_PREFIX + "batch.size";
public static final String CONF_WORKER_THREADS = CONF_PREFIX + "worker.threads";
public static final String CONF_WORKER_INTERVAL = CONF_PREFIX + "worker.interval";
private static EventQueue eventQueue;
private XLog LOG;
private Map<MessageType, List<?>> listenerMap = new HashMap<MessageType, List<?>>();
private Set<String> apptypes;
private static boolean eventsEnabled = false;
private int numWorkers;
@Override
public void init(Services services) throws ServiceException {
try {
Configuration conf = services.getConf();
LOG = XLog.getLog(getClass());
Class<? extends EventQueue> queueImpl = (Class<? extends EventQueue>) ConfigurationService.getClass
(conf, CONF_EVENT_QUEUE);
eventQueue = queueImpl == null ? new MemoryEventQueue() : (EventQueue) queueImpl.newInstance();
eventQueue.init(conf);
// initialize app-types to switch on events for
initApptypes(conf);
// initialize event listeners
initEventListeners(conf);
// initialize worker threads via Scheduler
initWorkerThreads(conf, services);
eventsEnabled = true;
LOG.info("EventHandlerService initialized. Event queue = [{0}], Event listeners configured = [{1}],"
+ " Events configured for App-types = [{2}], Num Worker Threads = [{3}]", eventQueue.getClass()
.getName(), listenerMap.toString(), apptypes, numWorkers);
}
catch (Exception ex) {
throw new ServiceException(ErrorCode.E0100, ex.getMessage(), ex);
}
}
private void initApptypes(Configuration conf) {
apptypes = new HashSet<String>();
for (String jobtype : ConfigurationService.getStrings(conf, CONF_FILTER_APP_TYPES)) {
String tmp = jobtype.trim().toLowerCase();
if (tmp.length() == 0) {
continue;
}
apptypes.add(tmp);
}
}
private void initEventListeners(Configuration conf) throws Exception {
Class<?>[] listenerClass = ConfigurationService.getClasses(conf, CONF_LISTENERS);
for (int i = 0; i < listenerClass.length; i++) {
Object listener = null;
try {
listener = listenerClass[i].newInstance();
}
catch (InstantiationException e) {
LOG.warn("Could not create event listener instance, " + e);
}
catch (IllegalAccessException e) {
LOG.warn("Illegal access to event listener instance, " + e);
}
addEventListener(listener, conf, listenerClass[i].getName());
}
}
@SuppressWarnings({ "rawtypes", "unchecked" })
public void addEventListener(Object listener, Configuration conf, String name) throws Exception {
if (listener instanceof JobEventListener) {
List listenersList = listenerMap.get(MessageType.JOB);
if (listenersList == null) {
listenersList = new ArrayList();
listenerMap.put(MessageType.JOB, listenersList);
}
listenersList.add(listener);
((JobEventListener) listener).init(conf);
}
else if (listener instanceof SLAEventListener) {
List listenersList = listenerMap.get(MessageType.SLA);
if (listenersList == null) {
listenersList = new ArrayList();
listenerMap.put(MessageType.SLA, listenersList);
}
listenersList.add(listener);
((SLAEventListener) listener).init(conf);
}
else {
LOG.warn("Event listener [{0}] is of undefined type", name);
}
}
public static boolean isEnabled() {
return eventsEnabled;
}
private void initWorkerThreads(Configuration conf, Services services) throws ServiceException {
numWorkers = ConfigurationService.getInt(conf, CONF_WORKER_THREADS);
int interval = ConfigurationService.getInt(conf, CONF_WORKER_INTERVAL);
SchedulerService ss = services.get(SchedulerService.class);
int available = ss.getSchedulableThreads(conf);
if (numWorkers + 3 > available) {
throw new ServiceException(ErrorCode.E0100, getClass().getName(), "Event worker threads requested ["
+ numWorkers + "] cannot be handled with current settings. Increase "
+ SchedulerService.SCHEDULER_THREADS);
}
Runnable eventWorker = new EventWorker();
// schedule staggered runnables every 1 min interval by default
for (int i = 0; i < numWorkers; i++) {
ss.schedule(eventWorker, 10 + i * 20, interval, SchedulerService.Unit.SEC);
}
}
@Override
public void destroy() {
eventsEnabled = false;
for (Entry<MessageType, List<?>> entry : listenerMap.entrySet()) {
List<?> listeners = entry.getValue();
MessageType type = entry.getKey();
for (Object listener : listeners) {
if (type == MessageType.JOB) {
((JobEventListener) listener).destroy();
} else if (type == MessageType.SLA) {
((SLAEventListener) listener).destroy();
}
}
}
}
@Override
public Class<? extends Service> getInterface() {
return EventHandlerService.class;
}
public boolean isSupportedApptype(String appType) {
if (!apptypes.contains(appType.toLowerCase())) {
return false;
}
return true;
}
public void setAppTypes(Set<String> types) {
apptypes = types;
}
public Set<String> getAppTypes() {
return apptypes;
}
public String listEventListeners() {
return listenerMap.toString();
}
public void queueEvent(Event event) {
LOG = LogUtils.setLogPrefix(LOG, event);
LOG.debug("Queueing event : {0}", event);
LOG.trace("Stack trace while queueing event : {0}", event, new Throwable());
eventQueue.add(event);
LogUtils.clearLogPrefix();
}
public EventQueue getEventQueue() {
return eventQueue;
}
public class EventWorker implements Runnable {
@Override
public void run() {
if (Thread.currentThread().isInterrupted()) {
return;
}
try {
if (!eventQueue.isEmpty()) {
List<Event> work = eventQueue.pollBatch();
for (Event event : work) {
LOG = LogUtils.setLogPrefix(LOG, event);
LOG.debug("Processing event : {0}", event);
MessageType msgType = event.getMsgType();
List<?> listeners = listenerMap.get(msgType);
if (listeners != null) {
Iterator<?> iter = listeners.iterator();
while (iter.hasNext()) {
try {
if (msgType == MessageType.JOB) {
invokeJobEventListener((JobEventListener) iter.next(), (JobEvent) event);
}
else if (msgType == MessageType.SLA) {
invokeSLAEventListener((SLAEventListener) iter.next(), (SLAEvent) event);
}
else {
iter.next();
}
}
catch (Throwable error) {
XLog.getLog(EventHandlerService.class).debug("Throwable in EventWorker thread run : ",
error);
XLog.getLog(EventHandlerService.class).warn("Throwable in EventWorker thread run. " +
"Error message: {0}",
new PasswordMasker().maskPasswordsIfNecessary(error.getMessage()));
}
}
}
}
}
}
catch (Throwable error) {
XLog.getLog(EventHandlerService.class).debug("Throwable in EventWorker thread run : ",
error);
XLog.getLog(EventHandlerService.class).warn("Throwable in EventWorker thread run. " +
"Error message: {0}",
new PasswordMasker().maskPasswordsIfNecessary(error.getMessage()));
}
}
private void invokeJobEventListener(JobEventListener jobListener, JobEvent event) {
switch (event.getAppType()) {
case WORKFLOW_JOB:
jobListener.onWorkflowJobEvent((WorkflowJobEvent)event);
break;
case WORKFLOW_ACTION:
jobListener.onWorkflowActionEvent((WorkflowActionEvent)event);
break;
case COORDINATOR_JOB:
jobListener.onCoordinatorJobEvent((CoordinatorJobEvent)event);
break;
case COORDINATOR_ACTION:
jobListener.onCoordinatorActionEvent((CoordinatorActionEvent)event);
break;
case BUNDLE_JOB:
jobListener.onBundleJobEvent((BundleJobEvent)event);
break;
default:
XLog.getLog(EventHandlerService.class).info("Undefined Job Event app-type - {0}",
event.getAppType());
}
}
private void invokeSLAEventListener(SLAEventListener slaListener, SLAEvent event) {
switch (event.getEventStatus()) {
case START_MET:
slaListener.onStartMet(event);
break;
case START_MISS:
slaListener.onStartMiss(event);
break;
case END_MET:
slaListener.onEndMet(event);
break;
case END_MISS:
slaListener.onEndMiss(event);
break;
case DURATION_MET:
slaListener.onDurationMet(event);
break;
case DURATION_MISS:
slaListener.onDurationMiss(event);
break;
default:
XLog.getLog(EventHandlerService.class).info("Undefined SLA event type - {0}", event.getSLAStatus());
}
}
}
}