// blob: 1c812a0514ae2446f5abc1240df72a65cee25ea9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.asterix.bad.metadata;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.asterix.active.ActiveEvent;
import org.apache.asterix.active.ActiveJob;
import org.apache.asterix.active.ActivityState;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.active.IActiveEntityEventsListener;
import org.apache.asterix.bad.BADConstants;
import org.apache.asterix.bad.ChannelJobInfo;
import org.apache.asterix.bad.runtime.RepetitiveChannelOperatorDescriptor;
import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber;
import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber.ActiveLifecycleEvent;
import org.apache.asterix.external.feed.management.FeedConnectionId;
import org.apache.asterix.runtime.utils.AppContextInfo;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobInfo;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.api.job.JobStatus;
import org.apache.log4j.Logger;
/**
 * Active-entity listener for a single channel: it records the jobs registered for the
 * channel's {@link EntityId}, updates their state on job start/finish events and forwards
 * lifecycle events to registered {@link IActiveLifecycleEventSubscriber}s.
 *
 * <p>Methods that modify the job maps or the subscriber list synchronize on this instance.
 * The plain lookup methods ({@link #getJobSpecification}, {@link #getJobInfo},
 * {@link #getConnections}, {@link #isEntityActive}) are not synchronized.
 */
public class ChannelEventsListener implements IActiveEntityEventsListener {
    private static final Logger LOGGER = Logger.getLogger(ChannelEventsListener.class);

    /** Subscribers notified of lifecycle events; guarded by this instance's monitor. */
    private final List<IActiveLifecycleEventSubscriber> subscribers;
    /** Registered jobs keyed by the numeric id of their {@link JobId}. */
    private final Map<Long, ActiveJob> jobs;
    /** Registered job info keyed by the channel entity the job belongs to. */
    private final Map<EntityId, ChannelJobInfo> jobInfos;
    /** The channel this listener was created for. */
    private final EntityId entityId;

    /**
     * Creates a listener with no registered jobs and no subscribers.
     *
     * @param entityId the channel entity whose jobs this listener tracks
     */
    public ChannelEventsListener(EntityId entityId) {
        this.entityId = entityId;
        subscribers = new ArrayList<>();
        jobs = new HashMap<>();
        jobInfos = new HashMap<>();
    }

    /**
     * Dispatches an active event to the matching handler. Exceptions raised by the handlers
     * are logged and not propagated to the caller.
     *
     * @param event the event to handle
     */
    @Override
    public void notify(ActiveEvent event) {
        try {
            switch (event.getEventKind()) {
                case JOB_START:
                    handleJobStartEvent(event);
                    break;
                case JOB_FINISH:
                    handleJobFinishEvent(event);
                    break;
                case PARTITION_EVENT:
                    LOGGER.warn("Partition Channel Event");
                    break;
                default:
                    break;
            }
        } catch (Exception e) {
            LOGGER.error("Unhandled Exception", e);
        }
    }

    /**
     * Handles a job-start event for a previously registered job. Events for jobs that are
     * not registered with this listener are logged and ignored.
     */
    private synchronized void handleJobStartEvent(ActiveEvent message) throws Exception {
        ActiveJob jobInfo = jobs.get(message.getJobId().getId());
        if (jobInfo == null) {
            // Nothing to update: the job was never registered (or has already been removed).
            LOGGER.warn("Ignoring start event for unregistered channel job " + message.getJobId());
            return;
        }
        handleJobStartMessage((ChannelJobInfo) jobInfo);
    }

    /**
     * Handles a job-finish event for a previously registered job. Events for jobs that are
     * not registered with this listener are logged and ignored.
     */
    private synchronized void handleJobFinishEvent(ActiveEvent message) throws Exception {
        ActiveJob jobInfo = jobs.get(message.getJobId().getId());
        if (jobInfo == null) {
            // Nothing to clean up and no subscriber to notify for an unknown job.
            LOGGER.warn("Ignoring finish event for unregistered channel job " + message.getJobId());
            return;
        }
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Channel Job finished for " + jobInfo);
        }
        handleJobFinishMessage((ChannelJobInfo) jobInfo);
    }

    /**
     * Removes the finished job from both maps and notifies subscribers with
     * {@code ACTIVE_JOB_FAILED} when the job status reported by the Hyracks client connection
     * is {@code FAILURE}, and with {@code ACTIVE_JOB_ENDED} otherwise.
     */
    private synchronized void handleJobFinishMessage(ChannelJobInfo cInfo) throws Exception {
        EntityId channelJobId = cInfo.getEntityId();
        IHyracksClientConnection hcc = AppContextInfo.INSTANCE.getHcc();
        JobInfo info = hcc.getJobInfo(cInfo.getJobId());
        // Constant-first equals keeps the comparison safe when no status is reported.
        boolean failure = JobStatus.FAILURE.equals(info.getStatus());
        jobInfos.remove(channelJobId);
        jobs.remove(cInfo.getJobId().getId());
        // notify event listeners
        ActiveLifecycleEvent event =
                failure ? ActiveLifecycleEvent.ACTIVE_JOB_FAILED : ActiveLifecycleEvent.ACTIVE_JOB_ENDED;
        notifyEventSubscribers(event);
    }

    /** Delivers {@code event} to every registered subscriber, in registration order. */
    private void notifyEventSubscribers(ActiveLifecycleEvent event) {
        for (IActiveLifecycleEventSubscriber subscriber : subscribers) {
            subscriber.handleEvent(event);
        }
    }

    /**
     * Collects the locations of every {@link RepetitiveChannelOperatorDescriptor} instance in
     * the job, stores them on {@code cInfo} and marks the job {@code ACTIVE}.
     *
     * <p>NOTE(review): this method is {@code static synchronized}, i.e. it locks on the class
     * and therefore serializes start handling across all listener instances while the job
     * info is fetched. The only caller already holds the instance monitor - confirm whether
     * the class-wide lock is still needed.
     */
    private static synchronized void handleJobStartMessage(ChannelJobInfo cInfo) throws Exception {
        List<OperatorDescriptorId> channelOperatorIds = new ArrayList<>();
        Map<OperatorDescriptorId, IOperatorDescriptor> operators = cInfo.getSpec().getOperatorMap();
        for (Entry<OperatorDescriptorId, IOperatorDescriptor> entry : operators.entrySet()) {
            IOperatorDescriptor opDesc = entry.getValue();
            if (opDesc instanceof RepetitiveChannelOperatorDescriptor) {
                channelOperatorIds.add(opDesc.getOperatorId());
            }
        }
        IHyracksClientConnection hcc = AppContextInfo.INSTANCE.getHcc();
        JobInfo info = hcc.getJobInfo(cInfo.getJobId());
        List<String> locations = new ArrayList<>();
        for (OperatorDescriptorId channelOperatorId : channelOperatorIds) {
            Map<Integer, String> operatorLocations = info.getOperatorLocations().get(channelOperatorId);
            // Looked up by index 0..size-1; presumably the keys are partition numbers - verify.
            int nOperatorInstances = operatorLocations.size();
            for (int i = 0; i < nOperatorInstances; i++) {
                locations.add(operatorLocations.get(i));
            }
        }
        cInfo.setLocations(locations);
        cInfo.setState(ActivityState.ACTIVE);
    }

    /**
     * Registers a newly created job with this listener. Registration failures are logged
     * (with their stack trace) and not propagated.
     *
     * @param jobId the id of the created job
     * @param spec the specification of the created job
     */
    @Override
    public void notifyJobCreation(JobId jobId, JobSpecification spec) {
        try {
            registerJob(jobId, spec);
        } catch (Exception e) {
            LOGGER.error("Failed to register channel job [" + jobId + "] for channel " + entityId, e);
        }
    }

    /**
     * Records a new job for this listener's channel in state {@code CREATED} and notifies
     * subscribers with {@code ACTIVE_JOB_STARTED}.
     *
     * @param jobId the id of the job to register
     * @param jobSpec the specification of the job
     * @throws IllegalStateException if a job with the same id is already registered, or if
     *         this listener's channel already has a registered job
     */
    public synchronized void registerJob(JobId jobId, JobSpecification jobSpec) {
        if (jobs.get(jobId.getId()) != null) {
            throw new IllegalStateException("Channel job already registered");
        }
        // jobInfos is keyed by EntityId, so the duplicate check must use the entity id
        // (probing it with the numeric job id can never match).
        if (jobInfos.containsKey(entityId)) {
            throw new IllegalStateException("Channel job already registered");
        }
        ChannelJobInfo cInfo = new ChannelJobInfo(entityId, jobId, ActivityState.CREATED, jobSpec);
        jobs.put(jobId.getId(), cInfo);
        jobInfos.put(entityId, cInfo);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Registered channel job [" + jobId + "]" + " for channel " + entityId);
        }
        notifyEventSubscribers(ActiveLifecycleEvent.ACTIVE_JOB_STARTED);
    }

    /**
     * Returns the specification of the job registered for the given entity.
     *
     * @param activeJobId the entity to look up
     * @return the job specification, or {@code null} if no job is registered for the entity
     */
    public JobSpecification getJobSpecification(EntityId activeJobId) {
        ChannelJobInfo cInfo = jobInfos.get(activeJobId);
        return cInfo == null ? null : cInfo.getSpec();
    }

    /**
     * Returns the job info registered for the given entity.
     *
     * @param activeJobId the entity to look up
     * @return the job info, or {@code null} if no job is registered for the entity
     */
    public ChannelJobInfo getJobInfo(EntityId activeJobId) {
        return jobInfos.get(activeJobId);
    }

    /**
     * Adds a subscriber that will receive subsequent lifecycle events.
     *
     * @param subscriber the subscriber to add
     */
    public synchronized void registerEventSubscriber(IActiveLifecycleEventSubscriber subscriber) {
        subscribers.add(subscriber);
    }

    /**
     * Removes a previously registered subscriber. Synchronized like registration so the
     * subscriber list is never modified while events are being delivered from another thread.
     *
     * @param subscriber the subscriber to remove
     */
    public synchronized void deregisterEventSubscriber(IActiveLifecycleEventSubscriber subscriber) {
        subscribers.remove(subscriber);
    }

    /**
     * Reports whether the job registered for the given entity is in state {@code ACTIVE};
     * when it is, {@code eventSubscriber} is registered as a side effect.
     *
     * @param activeJobId the entity to look up
     * @param eventSubscriber the subscriber to register if the channel is active
     * @return {@code true} if a job is registered for the entity and its state is
     *         {@code ACTIVE}
     */
    public synchronized boolean isChannelActive(EntityId activeJobId, IActiveLifecycleEventSubscriber eventSubscriber) {
        ChannelJobInfo cInfo = jobInfos.get(activeJobId);
        boolean active = cInfo != null && ActivityState.ACTIVE.equals(cInfo.getState());
        if (active) {
            registerEventSubscriber(eventSubscriber);
        }
        return active;
    }

    /**
     * Returns the keys of the job-info map as an array.
     *
     * <p>NOTE(review): the map is keyed by {@link EntityId}, but the keys are copied into a
     * {@code FeedConnectionId[]}. Unless the stored keys are actually
     * {@code FeedConnectionId} instances, {@code toArray} throws an
     * {@code ArrayStoreException} whenever the map is non-empty. This looks like a leftover
     * from feed code - confirm with callers before relying on it.
     */
    public FeedConnectionId[] getConnections() {
        return jobInfos.keySet().toArray(new FeedConnectionId[jobInfos.size()]);
    }

    /**
     * @return {@code true} while at least one job is registered with this listener
     */
    @Override
    public boolean isEntityActive() {
        return !jobs.isEmpty();
    }

    /**
     * @return the channel entity this listener was created for
     */
    @Override
    public EntityId getEntityId() {
        return entityId;
    }

    /**
     * Reports whether the named dataset is this channel's subscriptions or results dataset,
     * i.e. lives in the channel's dataverse and is named after the channel with
     * {@link BADConstants#subscriptionEnding} or {@link BADConstants#resultsEnding} appended.
     *
     * @param dataverseName the dataverse of the dataset
     * @param datasetName the name of the dataset
     * @return {@code true} if the dataset belongs to this channel
     */
    @Override
    public boolean isEntityUsingDataset(String dataverseName, String datasetName) {
        if (!entityId.getDataverse().equals(dataverseName)) {
            return false;
        }
        String subscriptionsName = entityId.getEntityName() + BADConstants.subscriptionEnding;
        String resultsName = entityId.getEntityName() + BADConstants.resultsEnding;
        return datasetName.equals(subscriptionsName) || datasetName.equals(resultsName);
    }
}