// blob: 9c69aca8efd0737dbfacabb1b2f849ef3cda515e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.asterix.active;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.asterix.active.ActiveEvent.EventKind;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
/**
 * Singleton that takes {@link ActiveEvent}s from an inbox queue and forwards each one to the
 * {@link IActiveEntityEventsListener} registered for the entity that owns the event's job.
 *
 * <p>
 * Thread-safety: every read or write of the listener map and the job map is performed while holding this
 * instance's monitor. Listener callbacks ({@code notify}, {@code notifyJobCreation}, {@code isEntityActive}) are
 * invoked outside of the monitor so that a listener calling back into this handler cannot block the registry.
 */
public class ActiveJobNotificationHandler implements Runnable {
    public static final ActiveJobNotificationHandler INSTANCE = new ActiveJobNotificationHandler();
    public static final String ACTIVE_ENTITY_PROPERTY_NAME = "ActiveJob";
    private static final Logger LOGGER = Logger.getLogger(ActiveJobNotificationHandler.class.getName());
    private static final boolean DEBUG = false;
    private final LinkedBlockingQueue<ActiveEvent> eventInbox;
    private final Map<EntityId, IActiveEntityEventsListener> entityEventListener;
    private final Map<JobId, ActiveJob> jobId2ActiveJobInfos;

    private ActiveJobNotificationHandler() {
        this.eventInbox = new LinkedBlockingQueue<>();
        this.jobId2ActiveJobInfos = new HashMap<>();
        this.entityEventListener = new HashMap<>();
    }

    /**
     * Event loop: blocks on the inbox, dispatches each event to the listener of the owning entity and, on
     * {@link EventKind#JOB_FINISH}, stops monitoring the job and drops the listener if its entity is no longer
     * active. The loop ends when the running thread is interrupted.
     */
    @Override
    public void run() {
        Thread.currentThread().setName(ActiveJobNotificationHandler.class.getSimpleName());
        LOGGER.log(Level.INFO, "Started " + ActiveJobNotificationHandler.class.getSimpleName());
        while (!Thread.interrupted()) {
            try {
                ActiveEvent event = getEventInbox().take();
                JobId jobId = event.getJobId();
                IActiveEntityEventsListener listener = findListenerForJob(jobId);
                if (DEBUG) {
                    LOGGER.log(Level.INFO, "Next event is of type " + event.getEventKind());
                }
                if (listener != null) {
                    if (DEBUG) {
                        LOGGER.log(Level.INFO, "Notifying the listener");
                    }
                    listener.notify(event);
                } else {
                    // Either the job is not monitored or its listener was already removed; there is nobody to
                    // notify, but a finished job must still be cleaned up below.
                    LOGGER.log(Level.WARNING, "No listener to notify about event of type " + event.getEventKind()
                            + " for job " + jobId);
                }
                if (event.getEventKind() == EventKind.JOB_FINISH) {
                    removeFinishedJob(jobId);
                    if (listener != null) {
                        removeInactiveListener(listener);
                    }
                }
            } catch (InterruptedException e) {
                // Restore the flag so that the loop condition observes the interruption and exits.
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                LOGGER.log(Level.SEVERE, "Error handling an active job event", e);
            }
        }
        LOGGER.log(Level.INFO, "Stopped " + ActiveJobNotificationHandler.class.getSimpleName());
    }

    /**
     * Looks up the listener registered for the entity that owns the given job.
     *
     * @param jobId
     *            the job to resolve
     * @return the listener, or {@code null} if the job is not monitored or no listener is registered for its entity
     */
    private synchronized IActiveEntityEventsListener findListenerForJob(JobId jobId) {
        ActiveJob jobInfo = jobId2ActiveJobInfos.get(jobId);
        return jobInfo == null ? null : entityEventListener.get(jobInfo.getEntityId());
    }

    /**
     * Stops monitoring the given job. Does nothing if the job is not monitored.
     */
    private synchronized void removeFinishedJob(JobId jobId) {
        if (DEBUG) {
            LOGGER.log(Level.INFO, "Removing the job");
        }
        jobId2ActiveJobInfos.remove(jobId);
    }

    /**
     * Unregisters the listener's entity if the listener reports that the entity is not active anymore.
     */
    private void removeInactiveListener(IActiveEntityEventsListener listener) {
        // The listener is queried before taking the monitor so that foreign code never runs under the lock.
        if (listener.isEntityActive()) {
            return;
        }
        if (DEBUG) {
            LOGGER.log(Level.INFO, "Removing the listener since it is not active anymore");
        }
        EntityId entityId = listener.getEntityId();
        synchronized (this) {
            entityEventListener.remove(entityId);
        }
    }

    /**
     * @param entityId
     *            the entity to look up
     * @return the listener registered for {@code entityId}, or {@code null} if there is none
     */
    public synchronized IActiveEntityEventsListener getActiveEntityListener(EntityId entityId) {
        IActiveEntityEventsListener listener = entityEventListener.get(entityId);
        if (DEBUG) {
            LOGGER.log(Level.INFO, "getActiveEntityListener(EntityId entityId) was called with entity " + entityId);
            LOGGER.log(Level.INFO, "Listener found: " + listener);
        }
        return listener;
    }

    /**
     * @return a snapshot array of the jobs currently being monitored
     */
    public synchronized ActiveJob[] getActiveJobs() {
        if (DEBUG) {
            LOGGER.log(Level.INFO, "getActiveJobs() was called");
            LOGGER.log(Level.INFO, "Number of jobs found: " + jobId2ActiveJobInfos.size());
        }
        return jobId2ActiveJobInfos.values().toArray(new ActiveJob[jobId2ActiveJobInfos.size()]);
    }

    /**
     * @param jobId
     *            the job to check
     * @return {@code true} if the job is currently being monitored by this handler
     */
    public synchronized boolean isActiveJob(JobId jobId) {
        boolean found = jobId2ActiveJobInfos.get(jobId) != null;
        if (DEBUG) {
            LOGGER.log(Level.INFO, "isActiveJob(JobId jobId) called with jobId: " + jobId);
            LOGGER.log(Level.INFO, "Job was found to be: " + (found ? "Active" : "Inactive"));
        }
        return found;
    }

    /**
     * Called when a job is created. If the job specification carries an {@link ActiveJob} under
     * {@link #ACTIVE_ENTITY_PROPERTY_NAME}, the job is handed to {@link #monitorJob(JobId, ActiveJob)} and, when a
     * listener can be resolved for it, that listener is told about the creation. Jobs without such a property are
     * ignored.
     *
     * @param jobId
     *            the id of the created job
     * @param jobSpecification
     *            the specification of the created job
     */
    public void notifyJobCreation(JobId jobId, JobSpecification jobSpecification) {
        if (DEBUG) {
            LOGGER.log(Level.INFO,
                    "notifyJobCreation(JobId jobId, JobSpecification jobSpecification) was called with jobId = "
                            + jobId);
        }
        Object property = jobSpecification.getProperty(ACTIVE_ENTITY_PROPERTY_NAME);
        // instanceof is false for null, so no separate null check is needed.
        if (!(property instanceof ActiveJob)) {
            if (DEBUG) {
                LOGGER.log(Level.INFO, "Job was is not active. property found to be: " + property);
            }
            return;
        }
        monitorJob(jobId, (ActiveJob) property);
        // monitorJob may have refused the job (no listener registered), hence the lookup may come back empty.
        IActiveEntityEventsListener listener = findListenerForJob(jobId);
        if (DEBUG) {
            LOGGER.log(Level.INFO, "Job was found to be: " + (listener != null ? "Active" : "Inactive"));
        }
        if (listener != null) {
            listener.notifyJobCreation(jobId, jobSpecification);
            if (DEBUG) {
                LOGGER.log(Level.INFO, "Listener was notified" + jobId);
            }
        } else if (DEBUG) {
            LOGGER.log(Level.INFO, "Listener was not notified since it was not registered for the job " + jobId);
        }
    }

    /**
     * @return the queue from which {@link #run()} takes the events to dispatch
     */
    public LinkedBlockingQueue<ActiveEvent> getEventInbox() {
        return eventInbox;
    }

    /**
     * @return a snapshot array of all currently registered listeners
     */
    public synchronized IActiveEntityEventsListener[] getEventListeners() {
        if (DEBUG) {
            LOGGER.log(Level.INFO, "getEventListeners() was called");
            LOGGER.log(Level.INFO, "returning " + entityEventListener.size() + " Listeners");
        }
        return entityEventListener.values().toArray(new IActiveEntityEventsListener[entityEventListener.size()]);
    }

    /**
     * Registers a listener under its entity id.
     *
     * @param listener
     *            the listener to register
     * @throws HyracksDataException
     *             if a listener is already registered for the same entity id
     */
    public synchronized void registerListener(IActiveEntityEventsListener listener) throws HyracksDataException {
        EntityId entityId = listener.getEntityId();
        if (DEBUG) {
            LOGGER.log(Level.INFO, "registerListener(IActiveEntityEventsListener listener) was called for the entity "
                    + entityId);
        }
        if (entityEventListener.containsKey(entityId)) {
            throw new HyracksDataException("Active Entity Listener " + entityId + " is already registered");
        }
        entityEventListener.put(entityId, listener);
    }

    /**
     * Starts monitoring a job on behalf of the entity of {@code activeJob}. The request is logged and ignored if
     * no listener is registered for that entity or if the job is already monitored.
     *
     * @param jobId
     *            the id of the job to monitor
     * @param activeJob
     *            the active job description that identifies the owning entity
     */
    public synchronized void monitorJob(JobId jobId, ActiveJob activeJob) {
        if (DEBUG) {
            LOGGER.log(Level.INFO, "monitorJob(JobId jobId, ActiveJob activeJob) called with job id: " + jobId);
            boolean found = jobId2ActiveJobInfos.get(jobId) != null;
            LOGGER.log(Level.INFO, "Job was found to be: " + (found ? "Active" : "Inactive"));
        }
        if (!entityEventListener.containsKey(activeJob.getEntityId())) {
            LOGGER.severe("No listener was found for the entity: " + activeJob.getEntityId());
            return;
        }
        if (jobId2ActiveJobInfos.containsKey(jobId)) {
            LOGGER.severe("Job is already being monitored for job: " + jobId);
            return;
        }
        if (DEBUG) {
            LOGGER.log(Level.INFO, "monitoring started for job id: " + jobId);
        }
        jobId2ActiveJobInfos.put(jobId, activeJob);
    }
}