blob: 3a81c099d60aa92ad879e7f496f03c6f987ea86b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.asterix.app.active;
import java.util.EnumSet;
import java.util.List;
import org.apache.asterix.active.ActiveRuntimeId;
import org.apache.asterix.active.ActivityState;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.active.IActiveEntityEventSubscriber;
import org.apache.asterix.active.IRetryPolicyFactory;
import org.apache.asterix.algebra.base.ILangExtension;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.common.utils.JobUtils;
import org.apache.asterix.external.feed.watch.WaitForStateSubscriber;
import org.apache.asterix.external.operators.FeedIntakeOperatorNodePushable;
import org.apache.asterix.lang.common.statement.StartFeedStatement;
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.metadata.entities.Dataset;
import org.apache.asterix.metadata.entities.Feed;
import org.apache.asterix.metadata.entities.FeedConnection;
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.asterix.utils.FeedOperations;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
public class FeedEventsListener extends ActiveEntityEventsListener {
private final Feed feed;
private final List<FeedConnection> feedConnections;
private final ILangExtension.Language translatorLang;
public FeedEventsListener(IStatementExecutor statementExecutor, ICcApplicationContext appCtx,
IHyracksClientConnection hcc, EntityId entityId, List<Dataset> datasets,
AlgebricksAbsolutePartitionConstraint locations, String runtimeName, IRetryPolicyFactory retryPolicyFactory,
Feed feed, final List<FeedConnection> feedConnections, ILangExtension.Language translatorLang)
throws HyracksDataException {
super(statementExecutor, appCtx, hcc, entityId, datasets, locations, runtimeName, retryPolicyFactory);
this.feed = feed;
this.feedConnections = feedConnections;
this.translatorLang = translatorLang;
}
@Override
public synchronized boolean remove(Dataset dataset) throws HyracksDataException {
super.remove(dataset);
feedConnections.removeIf(o -> o.getDatabaseName().equals(dataset.getDatabaseName())
&& o.getDataverseName().equals(dataset.getDataverseName())
&& o.getDatasetName().equals(dataset.getDatasetName()));
return false;
}
public synchronized void addFeedConnection(FeedConnection feedConnection) {
feedConnections.add(feedConnection);
}
public Feed getFeed() {
return feed;
}
@Override
public synchronized void start(MetadataProvider metadataProvider)
throws HyracksDataException, InterruptedException {
super.start(metadataProvider);
// Note: The current implementation of the wait for completion flag is problematic due to locking issues:
// Locks obtained during the start of the feed are not released, and so, the feed can't be stopped
// and also, read locks over dataverses, datasets, etc, are never released.
boolean wait =
Boolean.parseBoolean((String) metadataProvider.getConfig().get(StartFeedStatement.WAIT_FOR_COMPLETION));
if (wait) {
IActiveEntityEventSubscriber stoppedSubscriber =
new WaitForStateSubscriber(this, EnumSet.of(ActivityState.STOPPED));
stoppedSubscriber.sync();
}
}
@Override
protected JobId compileAndStartJob(MetadataProvider mdProvider) throws HyracksDataException {
try {
Pair<JobSpecification, AlgebricksAbsolutePartitionConstraint> jobInfo =
FeedOperations.buildStartFeedJob(mdProvider, feed, feedConnections, statementExecutor, hcc);
JobSpecification feedJob = jobInfo.getLeft();
feedJob.setProperty(ActiveNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME, entityId);
// TODO(Yingyi): currently we do not check IFrameWriter protocol violations for Feed jobs.
// We will need to design general exception handling mechanism for feeds.
setLocations(jobInfo.getRight());
return JobUtils.runJob(hcc, feedJob, false);
} catch (Exception e) {
throw HyracksDataException.create(e);
}
}
@Override
protected void setRunning(MetadataProvider metadataProvider, boolean running) {
// No op
}
@Override
protected void doSuspend(MetadataProvider metadataProvider) throws HyracksDataException {
throw new RuntimeDataException(ErrorCode.OPERATION_NOT_SUPPORTED);
}
@Override
protected void doResume(MetadataProvider metadataProvider) throws HyracksDataException {
throw new RuntimeDataException(ErrorCode.OPERATION_NOT_SUPPORTED);
}
@Override
protected ActiveRuntimeId getActiveRuntimeId(int partition) {
return new ActiveRuntimeId(entityId, FeedIntakeOperatorNodePushable.class.getSimpleName(), partition);
}
}