blob: 369885dea34f839561a49df7511460535f20e0ed [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.lens.server;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import org.apache.lens.server.api.LensConfConstants;
import org.apache.lens.server.api.error.LensException;
import org.apache.lens.server.api.events.LensEvent;
import org.apache.lens.server.api.events.LensEventListener;
import org.apache.lens.server.api.events.LensEventService;
import org.apache.lens.server.api.health.HealthStatus;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hive.service.AbstractService;
import lombok.extern.slf4j.Slf4j;
/**
* Implementation of LensEventService
*/
@Slf4j
public class EventServiceImpl extends AbstractService implements LensEventService {
/** The event listeners. */
private final Map<Class<? extends LensEvent>, List<LensEventListener>> eventListeners
= new HashMap<Class<? extends LensEvent>, List<LensEventListener>>();
/** The event handler pool. */
private ExecutorService eventHandlerPool;
/**
* Instantiates a new event service impl.
*
* @param name the name
*/
public EventServiceImpl(String name) {
super(name);
}
/*
* (non-Javadoc)
*
* @see org.apache.hive.service.AbstractService#init(org.apache.hadoop.hive.conf.HiveConf)
*/
@Override
public synchronized void init(HiveConf hiveConf) {
int numProcs = Runtime.getRuntime().availableProcessors();
ThreadFactory factory = new BasicThreadFactory.Builder()
.namingPattern("Event_Service_Thread-%d")
.daemon(false)
.priority(Thread.NORM_PRIORITY)
.build();
eventHandlerPool = Executors.newFixedThreadPool(hiveConf.getInt(LensConfConstants.EVENT_SERVICE_THREAD_POOL_SIZE,
numProcs), factory);
super.init(hiveConf);
}
/*
* (non-Javadoc)
*
* @see
* org.apache.lens.server.api.events.LensEventService#removeListener
* (org.apache.lens.server.api.events.LensEventListener)
*/
@Override
public void removeListener(LensEventListener listener) {
synchronized (eventListeners) {
for (List<LensEventListener> listeners : eventListeners.values()) {
if (listeners.remove(listener)) {
log.info("Removed listener {}", listener);
}
}
}
}
/**
* Handle event.
*
* @param listeners the listeners
* @param evt the evt
*/
@SuppressWarnings("unchecked")
private void handleEvent(List<LensEventListener> listeners, LensEvent evt) {
if (listeners != null && !listeners.isEmpty()) {
for (LensEventListener listener : listeners) {
try {
listener.onEvent(evt);
} catch (Exception exc) {
log.error("Error in handling event {} for listener {}", evt.getEventId(), listener, exc);
}
}
}
}
/**
* The Class EventHandler.
*/
private final class EventHandler implements Runnable {
/** The event. */
final LensEvent event;
/**
* Instantiates a new event handler.
*
* @param event the event
*/
EventHandler(LensEvent event) {
this.event = event;
}
/*
* (non-Javadoc)
*
* @see java.lang.Runnable#run()
*/
public void run() {
Class<? extends LensEvent> evtClass = event.getClass();
// Call listeners directly listening for this event type
handleEvent(eventListeners.get(evtClass), event);
Class<?> superClass = evtClass.getSuperclass();
// Call listeners which listen of super types of this event type
while (LensEvent.class.isAssignableFrom(superClass)) {
if (eventListeners.containsKey(superClass)) {
handleEvent(eventListeners.get(superClass), event);
}
superClass = superClass.getSuperclass();
}
}
}
/*
* (non-Javadoc)
*
* @see org.apache.lens.server.api.events.LensEventService#notifyEvent(org.apache.lens.server.api.events.LensEvent)
*/
@SuppressWarnings("unchecked")
@Override
public void notifyEvent(final LensEvent evt) throws LensException {
if (getServiceState() != STATE.STARTED) {
throw new LensException("Event service is not in STARTED state. Current state is " + getServiceState());
}
if (evt == null) {
return;
}
eventHandlerPool.submit(new EventHandler(evt));
}
/*
* (non-Javadoc)
*
* @see org.apache.lens.server.api.events.LensEventService#getListeners(java.lang.Class)
*/
@Override
public <T extends LensEvent> Collection<LensEventListener> getListeners(Class<T> eventType) {
return Collections.unmodifiableList(eventListeners.get(eventType));
}
@Override
public HealthStatus getHealthStatus() {
return (this.getServiceState().equals(STATE.STARTED)
&& !eventHandlerPool.isShutdown()
&& !eventHandlerPool.isTerminated())
? new HealthStatus(true, "Event service is healthy.")
: new HealthStatus(false, "Event service is unhealthy.");
}
/*
* (non-Javadoc)
*
* @see org.apache.hive.service.AbstractService#start()
*/
@Override
public synchronized void start() {
super.start();
}
/*
* (non-Javadoc)
*
* @see org.apache.hive.service.AbstractService#stop()
*/
@Override
public void stop() {
if (eventHandlerPool != null) {
List<Runnable> pending = eventHandlerPool.shutdownNow();
if (pending != null && !pending.isEmpty()) {
StringBuilder pendingMsg = new StringBuilder("Pending Events:");
for (Runnable handler : pending) {
if (handler instanceof EventHandler) {
pendingMsg.append(((EventHandler) handler).event.getEventId()).append(",");
}
}
log.info("Event listener service stopped while {} events still pending", pending.size());
log.info(pendingMsg.toString());
}
}
log.info("Event service stopped");
super.stop();
}
public Map<Class<? extends LensEvent>, List<LensEventListener>> getEventListeners() {
return eventListeners;
}
/*
* (non-Javadoc)
*
* @see org.apache.lens.server.api.events.LensEventService#addListenerForType(org.apache.lens.server.api.events.
* LensEventListener, java.lang.Class)
*/
@Override
public <T extends LensEvent> void addListenerForType(LensEventListener<? super T> listener, Class<T> eventType) {
synchronized (eventListeners) {
List<LensEventListener> listeners = eventListeners.get(eventType);
if (listeners == null) {
listeners = new ArrayList<LensEventListener>();
eventListeners.put(eventType, listeners);
}
listeners.add(listener);
}
log.info("Added listener {} for type:{}", listener, eventType.getName());
}
/*
* (non-Javadoc)
*
* @see org.apache.lens.server.api.events.LensEventService#removeListenerForType(org.apache.lens.server.api.events.
* LensEventListener, java.lang.Class)
*/
@Override
public <T extends LensEvent> void removeListenerForType(LensEventListener<? super T> listener, Class<T> eventType) {
synchronized (eventListeners) {
List<LensEventListener> listeners = eventListeners.get(eventType);
if (listeners != null) {
if (listeners.remove(listener)) {
log.info("Removed listener {}", listener);
}
}
}
}
}