| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.asterix.app.active; |
| |
| import static org.apache.asterix.common.exceptions.ErrorCode.ACTIVE_ENTITY_NOT_RUNNING; |
| |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.EnumSet; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Locale; |
| import java.util.Set; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.TimeUnit; |
| |
| import org.apache.asterix.active.ActiveEvent; |
| import org.apache.asterix.active.ActiveEvent.Kind; |
| import org.apache.asterix.active.ActiveRuntimeId; |
| import org.apache.asterix.active.ActivityState; |
| import org.apache.asterix.active.EntityId; |
| import org.apache.asterix.active.IActiveEntityEventSubscriber; |
| import org.apache.asterix.active.IRetryPolicyFactory; |
| import org.apache.asterix.active.NoRetryPolicyFactory; |
| import org.apache.asterix.active.message.ActiveManagerMessage; |
| import org.apache.asterix.active.message.ActivePartitionMessage; |
| import org.apache.asterix.active.message.ActivePartitionMessage.Event; |
| import org.apache.asterix.active.message.ActiveStatsRequestMessage; |
| import org.apache.asterix.active.message.StopRuntimeParameters; |
| import org.apache.asterix.common.api.IMetadataLockManager; |
| import org.apache.asterix.common.cluster.IClusterStateManager; |
| import org.apache.asterix.common.dataflow.ICcApplicationContext; |
| import org.apache.asterix.common.exceptions.ErrorCode; |
| import org.apache.asterix.common.exceptions.RuntimeDataException; |
| import org.apache.asterix.common.messaging.api.ICCMessageBroker; |
| import org.apache.asterix.common.messaging.api.INcAddressedMessage; |
| import org.apache.asterix.common.metadata.DataverseName; |
| import org.apache.asterix.common.metadata.IDataset; |
| import org.apache.asterix.external.feed.watch.WaitForStateSubscriber; |
| import org.apache.asterix.metadata.api.IActiveEntityController; |
| import org.apache.asterix.metadata.declared.MetadataProvider; |
| import org.apache.asterix.metadata.entities.Dataset; |
| import org.apache.asterix.translator.IStatementExecutor; |
| import org.apache.commons.lang3.tuple.Pair; |
| import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; |
| import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; |
| import org.apache.hyracks.api.client.IHyracksClientConnection; |
| import org.apache.hyracks.api.exceptions.HyracksDataException; |
| import org.apache.hyracks.api.job.JobId; |
| import org.apache.hyracks.api.job.JobStatus; |
| import org.apache.hyracks.api.util.InvokeUtil; |
| import org.apache.hyracks.util.ExitUtil; |
| import org.apache.hyracks.util.Span; |
| import org.apache.logging.log4j.Level; |
| import org.apache.logging.log4j.LogManager; |
| import org.apache.logging.log4j.Logger; |
| |
| public abstract class ActiveEntityEventsListener implements IActiveEntityController { |
| |
| // class-wide logger; 'level' is the level used for this class's lifecycle tracing |
| private static final Logger LOGGER = LogManager.getLogger(); |
| private static final Level level = Level.DEBUG; |
| // event handed to subscribers by setState on every state assignment |
| private static final ActiveEvent STATE_CHANGED = new ActiveEvent(null, Kind.STATE_CHANGED, null, null); |
| // states in which waitForNonTransitionState keeps callers waiting |
| private static final EnumSet<ActivityState> TRANSITION_STATES = EnumSet.of(ActivityState.RESUMING, |
| ActivityState.STARTING, ActivityState.STOPPING, ActivityState.RECOVERING, ActivityState.CANCELLING); |
| // stats value installed by the constructor and restored by stop() |
| private static final String DEFAULT_ACTIVE_STATS = "{\"Stats\":\"N/A\"}"; |
| // unit passed to doStop together with the configured active stop timeout |
| protected static final TimeUnit TIMEOUT_UNIT = TimeUnit.SECONDS; |
| protected final IClusterStateManager clusterStateManager; |
| // notification handler this listener registers with (constructor) and unregisters from (unregister) |
| protected final ActiveNotificationHandler handler; |
| // subscribers informed by notifySubscribers; finished subscribers are dropped there |
| protected final List<IActiveEntityEventSubscriber> subscribers = new ArrayList<>(); |
| protected final IStatementExecutor statementExecutor; |
| protected final ICcApplicationContext appCtx; |
| // listener-owned metadata provider, created from appCtx in the constructor |
| protected final MetadataProvider metadataProvider; |
| protected final IHyracksClientConnection hcc; |
| protected final EntityId entityId; |
| // datasets associated with this entity; returned as-is (not copied) by getDatasets() |
| private final Set<Dataset> datasets; |
| // event published to subscribers whenever 'stats' is replaced |
| protected final ActiveEvent statsUpdatedEvent; |
| protected final String runtimeName; |
| // a NoRetryPolicyFactory.INSTANCE value makes recover() stop the entity instead of retrying |
| protected final IRetryPolicyFactory retryPolicyFactory; |
| // mutables |
| protected volatile ActivityState state; |
| // runtime locations; their count is the number of registrations required before RUNNING |
| private AlgebricksAbsolutePartitionConstraint locations; |
| // value 'state' held before the most recent setState call |
| protected ActivityState prevState; |
| // id of the job started by doStart; reset to null by finish() |
| protected JobId jobId; |
| // -1 until the first successful refreshStats, afterwards the wall-clock millis of the last refresh |
| protected volatile long statsTimestamp; |
| protected volatile String stats; |
| // true while a refreshStats call is in flight |
| protected volatile boolean isFetchingStats; |
| // counts of RUNTIME_REGISTERED / RUNTIME_DEREGISTERED partition messages; both reset by doStart |
| protected int numRegistered; |
| protected int numDeRegistered; |
| // recovery task created by recover() or resume(); cancelled and cleared by stop() |
| protected volatile RecoveryTask rt; |
| // set by suspend() and setState(SUSPENDED), cleared by resume(); while true, new requests wait |
| protected volatile boolean suspended = false; |
| // failures |
| // failure recorded by finish(); cleared when entering STARTING, RECOVERING or RESUMING |
| protected Exception jobFailure; |
| // NOTE(review): the four fields below are never assigned in this class; presumably maintained by |
| // subclasses or collaborators - TODO confirm |
| protected Exception resumeFailure; |
| protected Exception startFailure; |
| protected Exception stopFailure; |
| protected Exception recoverFailure; |
| |
| /** |
| * Creates a listener for an active entity. The listener starts in the {@code STOPPED} state with default |
| * stats and registers itself with the application's active notification handler before returning. |
| * |
| * @param statementExecutor the statement executor kept for use by this listener and its subclasses |
| * @param appCtx the application context; source of the cluster state manager, the metadata provider |
| * and the active notification handler |
| * @param hcc the Hyracks client connection |
| * @param entityId the id of the entity this listener represents |
| * @param datasets the datasets associated with the entity; copied into an internal set |
| * @param locations the runtime locations of the entity |
| * @param runtimeName the runtime name used when addressing runtimes and in error messages |
| * @param retryPolicyFactory the factory consulted when deciding whether recovery is attempted |
| * @throws HyracksDataException declared for subclasses and collaborators |
| */ |
| public ActiveEntityEventsListener(IStatementExecutor statementExecutor, ICcApplicationContext appCtx, |
| IHyracksClientConnection hcc, EntityId entityId, List<Dataset> datasets, |
| AlgebricksAbsolutePartitionConstraint locations, String runtimeName, IRetryPolicyFactory retryPolicyFactory) |
| throws HyracksDataException { |
| // plain references handed in by the caller |
| this.statementExecutor = statementExecutor; |
| this.hcc = hcc; |
| this.entityId = entityId; |
| this.runtimeName = runtimeName; |
| this.retryPolicyFactory = retryPolicyFactory; |
| this.locations = locations; |
| this.appCtx = appCtx; |
| // collaborators derived from the application context and the arguments |
| this.clusterStateManager = appCtx.getClusterStateManager(); |
| this.metadataProvider = MetadataProvider.createWithDefaultNamespace(appCtx); |
| this.datasets = new HashSet<>(datasets); |
| this.statsUpdatedEvent = new ActiveEvent(null, Kind.STATS_UPDATED, entityId, null); |
| // initial mutable state |
| this.state = ActivityState.STOPPED; |
| this.stats = DEFAULT_ACTIVE_STATS; |
| this.statsTimestamp = -1; |
| this.isFetchingStats = false; |
| this.numRegistered = 0; |
| this.numDeRegistered = 0; |
| // registration comes last so that the handler sees an initialized listener |
| this.handler = (ActiveNotificationHandler) appCtx.getActiveNotificationHandler(); |
| this.handler.registerListener(this); |
| } |
| |
| protected synchronized void setState(ActivityState newState) { |
| LOGGER.log(level, "State of {} is being set to {} from {}", getEntityId(), newState, state); |
| this.prevState = state; |
| this.state = newState; |
| if (newState == ActivityState.STARTING || newState == ActivityState.RECOVERING |
| || newState == ActivityState.RESUMING) { |
| jobFailure = null; |
| } else if (newState == ActivityState.SUSPENDED) { |
| suspended = true; |
| } |
| notifySubscribers(STATE_CHANGED); |
| } |
| |
| @Override |
| public synchronized void notify(ActiveEvent event) { |
| try { |
| if (LOGGER.isEnabled(level)) { |
| LOGGER.log(level, "EventListener is notified."); |
| } |
| ActiveEvent.Kind eventKind = event.getEventKind(); |
| switch (eventKind) { |
| case JOB_CREATED: |
| case JOB_STARTED: |
| break; |
| case JOB_FINISHED: |
| finish(event); |
| break; |
| case PARTITION_EVENT: |
| handle((ActivePartitionMessage) event.getEventObject()); |
| break; |
| default: |
| LOGGER.log(Level.DEBUG, "Unhandled feed event notification: " + event); |
| break; |
| } |
| notifySubscribers(event); |
| } catch (Exception e) { |
| LOGGER.log(Level.ERROR, "Unhandled Exception", e); |
| } |
| } |
| |
| protected synchronized void handle(ActivePartitionMessage message) { |
| if (message.getEvent() == Event.RUNTIME_REGISTERED) { |
| numRegistered++; |
| if (allPartitionsRegisteredAndNotCancelling()) { |
| setState(ActivityState.RUNNING); |
| } |
| } else if (message.getEvent() == Event.RUNTIME_DEREGISTERED) { |
| numDeRegistered++; |
| } |
| } |
| |
| /** |
| * @return {@code true} when the number of registrations equals the number of locations and the entity is |
| * not in the CANCELLING state |
| */ |
| private boolean allPartitionsRegisteredAndNotCancelling() { |
| if (numRegistered != locations.getLocations().length) { |
| return false; |
| } |
| return state != ActivityState.CANCELLING; |
| } |
| |
| @SuppressWarnings("unchecked") |
| protected void finish(ActiveEvent event) throws HyracksDataException { |
| if (LOGGER.isEnabled(level)) { |
| LOGGER.log(level, "Active job {} finished", jobId); |
| } |
| JobId lastJobId = jobId; |
| if (numRegistered != numDeRegistered) { |
| LOGGER.log(Level.WARN, |
| "Active job {} finished with reported runtime registrations = {} and deregistrations = {}", jobId, |
| numRegistered, numDeRegistered); |
| } |
| jobId = null; |
| Pair<JobStatus, List<Exception>> status = (Pair<JobStatus, List<Exception>>) event.getEventObject(); |
| JobStatus jobStatus = status.getLeft(); |
| List<Exception> exceptions = status.getRight(); |
| if (LOGGER.isEnabled(level)) { |
| LOGGER.log(level, "Active job {} finished with status {}", lastJobId, jobStatus); |
| } |
| if (!jobSuccessfullyTerminated(jobStatus)) { |
| jobFailure = exceptions.isEmpty() ? new RuntimeDataException(ErrorCode.UNREPORTED_TASK_FAILURE_EXCEPTION) |
| : exceptions.get(0); |
| LOGGER.error("Active Job {} failed", lastJobId, jobFailure); |
| setState((state == ActivityState.STOPPING || state == ActivityState.CANCELLING) ? ActivityState.STOPPED |
| : ActivityState.TEMPORARILY_FAILED); |
| if (prevState == ActivityState.RUNNING) { |
| recover(); |
| } |
| } else { |
| setState(state == ActivityState.SUSPENDING ? ActivityState.SUSPENDED : ActivityState.STOPPED); |
| } |
| } |
| |
| /** |
| * Tells whether the reported job status is {@code TERMINATED}. The comparison is made from the constant's |
| * side so that a missing ({@code null}) status is treated as an unsuccessful termination instead of |
| * raising a NullPointerException in the middle of event handling. |
| * |
| * @param jobStatus the status reported for the finished job; may be {@code null} |
| * @return {@code true} only if the status equals {@code JobStatus.TERMINATED} |
| */ |
| private boolean jobSuccessfullyTerminated(JobStatus jobStatus) { |
| return JobStatus.TERMINATED.equals(jobStatus); |
| } |
| |
| /** |
| * Introduces a subscriber to this listener and keeps it for future notifications unless it reports being |
| * done right after the introduction. |
| * |
| * @param subscriber the subscriber to add |
| */ |
| @Override |
| public synchronized void subscribe(IActiveEntityEventSubscriber subscriber) { |
| subscriber.subscribed(this); |
| if (subscriber.isDone()) { |
| // already satisfied; no need to keep it around |
| return; |
| } |
| subscribers.add(subscriber); |
| } |
| |
| /** |
| * @return the id of the entity this listener was created for |
| */ |
| @Override |
| public EntityId getEntityId() { |
| return entityId; |
| } |
| |
| /** |
| * @return the current activity state; this is a plain read of the volatile field, taken without the |
| * listener's monitor |
| */ |
| @Override |
| public ActivityState getState() { |
| return state; |
| } |
| |
| /** |
| * @param dataset the dataset to look for |
| * @return {@code true} if the entity is active and the dataset belongs to its dataset set |
| */ |
| @Override |
| public synchronized boolean isEntityUsingDataset(IDataset dataset) { |
| if (!isActive()) { |
| return false; |
| } |
| return getDatasets().contains(dataset); |
| } |
| |
| /** |
| * Removes a dataset from this entity's dataset set. Only allowed while the entity is not active. |
| * |
| * @param dataset the dataset to remove |
| * @return {@code true} if the set contained the dataset |
| * @throws HyracksDataException (a RuntimeDataException) if the entity is currently active |
| */ |
| @Override |
| public synchronized boolean remove(Dataset dataset) throws HyracksDataException { |
| if (!isActive()) { |
| return getDatasets().remove(dataset); |
| } |
| throw new RuntimeDataException(ErrorCode.CANNOT_REMOVE_DATASET_FROM_ACTIVE_ENTITY, entityId, state); |
| } |
| |
| /** |
| * Adds a dataset to this entity's dataset set. Only allowed while the entity is not active. |
| * |
| * @param dataset the dataset to add |
| * @return {@code true} if the set did not already contain the dataset |
| * @throws HyracksDataException (a RuntimeDataException) if the entity is currently active |
| */ |
| @Override |
| public synchronized boolean add(Dataset dataset) throws HyracksDataException { |
| if (!isActive()) { |
| return getDatasets().add(dataset); |
| } |
| throw new RuntimeDataException(ErrorCode.CANNOT_ADD_DATASET_TO_ACTIVE_ENTITY, entityId, state); |
| } |
| |
| /** |
| * @return the id stored by {@code doStart} for the current job, or {@code null} once {@code finish} has |
| * cleared it. NOTE(review): the field is neither volatile nor read under the monitor here. |
| */ |
| public JobId getJobId() { |
| return jobId; |
| } |
| |
| /** |
| * @return the most recently stored stats string; the default "N/A" document until a refresh succeeds and |
| * again after the entity is stopped |
| */ |
| @Override |
| public String getStats() { |
| return stats; |
| } |
| |
| /** |
| * @return the {@code System.currentTimeMillis()} value recorded by the last successful stats refresh, or |
| * -1 if no refresh has succeeded yet |
| */ |
| @Override |
| public long getStatsTimeStamp() { |
| return statsTimestamp; |
| } |
| |
| /** |
| * Wraps the per-runtime stats responses in a single document of the form |
| * <code>{"Stats": [r0, r1, ...]}</code>. The responses are inserted verbatim, separated by ", ". |
| * An empty response list yields <code>{"Stats": []}</code> rather than failing on the missing first |
| * element. |
| * |
| * @param responses the responses to combine, in the order they should appear; must not be {@code null} |
| * @return the combined stats document |
| */ |
| public String formatStats(List<String> responses) { |
| return "{\"Stats\": [" + String.join(", ", responses) + "]}"; |
| } |
| |
| @SuppressWarnings("unchecked") |
| @Override |
| public void refreshStats(long timeout) throws HyracksDataException { |
| LOGGER.log(level, "refreshStats called"); |
| // first check state & if we are fetching outside of the lock- in the event we are recovering it may take some |
| // time to obtain the lock... |
| ensureRunning(); |
| if (isFetchingStats) { |
| LOGGER.log(level, "returning immediately since fetchingStats = " + isFetchingStats); |
| return; |
| } |
| synchronized (this) { |
| // now that we have the lock, again verify the state & ensure we are not already fetching new stats |
| ensureRunning(); |
| if (isFetchingStats) { |
| LOGGER.log(level, "returning immediately since fetchingStats = " + isFetchingStats); |
| return; |
| } else { |
| isFetchingStats = true; |
| } |
| } |
| ICCMessageBroker messageBroker = |
| (ICCMessageBroker) metadataProvider.getApplicationContext().getServiceContext().getMessageBroker(); |
| long reqId = messageBroker.newRequestId(); |
| List<INcAddressedMessage> requests = new ArrayList<>(); |
| List<String> ncs = Arrays.asList(locations.getLocations()); |
| for (int i = 0; i < ncs.size(); i++) { |
| requests.add(new ActiveStatsRequestMessage(new ActiveRuntimeId(entityId, runtimeName, i), reqId)); |
| } |
| try { |
| List<String> responses = |
| (List<String>) messageBroker.sendSyncRequestToNCs(reqId, ncs, requests, timeout, false); |
| stats = formatStats(responses); |
| statsTimestamp = System.currentTimeMillis(); |
| notifySubscribers(statsUpdatedEvent); |
| } catch (Exception e) { |
| throw HyracksDataException.create(e); |
| } |
| isFetchingStats = false; |
| } |
| |
| protected void ensureRunning() throws RuntimeDataException { |
| if (state != ActivityState.RUNNING) { |
| throw new RuntimeDataException(ACTIVE_ENTITY_NOT_RUNNING, runtimeName, String.valueOf(state).toLowerCase()); |
| } |
| } |
| |
| protected synchronized void notifySubscribers(ActiveEvent event) { |
| notifyAll(); |
| Iterator<IActiveEntityEventSubscriber> it = subscribers.iterator(); |
| while (it.hasNext()) { |
| IActiveEntityEventSubscriber subscriber = it.next(); |
| if (subscriber.isDone()) { |
| it.remove(); |
| } else { |
| subscriber.notify(event); |
| if (subscriber.isDone()) { |
| it.remove(); |
| } |
| } |
| } |
| } |
| |
| /** |
| * @return the runtime locations currently assigned to this entity (as given to the constructor or to |
| * {@code setLocations}) |
| */ |
| public AlgebricksAbsolutePartitionConstraint getLocations() { |
| return locations; |
| } |
| |
| /** |
| * Blocks the caller until the entity is neither in one of the {@code TRANSITION_STATES} nor suspended. It is |
| * called at the beginning of requested actions (start, stop, suspend) so that requests do not interleave. |
| * Waiters are woken up by the {@code notifyAll()} calls made in {@code notifySubscribers} and {@code resume} |
| * and re-check the condition each time. |
| * |
| * @throws InterruptedException |
| * if the calling thread is interrupted while waiting |
| */ |
| protected synchronized void waitForNonTransitionState() throws InterruptedException { |
| while (TRANSITION_STATES.contains(state) || suspended) { |
| this.wait(); |
| } |
| } |
| |
| /** |
| * Starts recovery of the entity. With a retry policy the entity is marked TEMPORARILY_FAILED and a new |
| * recovery task is submitted to the controller service executor; without one (the no-retry factory) the |
| * entity is simply moved to STOPPED. |
| */ |
| @Override |
| public synchronized void recover() { |
| if (LOGGER.isEnabled(level)) { |
| LOGGER.log(level, "Recover is called on {}", entityId); |
| } |
| if (retryPolicyFactory != NoRetryPolicyFactory.INSTANCE) { |
| final ExecutorService recoveryExecutor = |
| appCtx.getServiceContext().getControllerService().getExecutor(); |
| setState(ActivityState.TEMPORARILY_FAILED); |
| LOGGER.log(level, "Recovery task has been submitted"); |
| rt = createRecoveryTask(); |
| recoveryExecutor.submit(rt.recover()); |
| return; |
| } |
| LOGGER.log(level, "But it has no recovery policy, so it is set to permanent failure"); |
| setState(ActivityState.STOPPED); |
| } |
| |
| @Override |
| public synchronized void start(MetadataProvider metadataProvider) |
| throws HyracksDataException, InterruptedException { |
| waitForNonTransitionState(); |
| if (state != ActivityState.STOPPED) { |
| throw new RuntimeDataException(ErrorCode.ACTIVE_ENTITY_ALREADY_STARTED, entityId, state); |
| } |
| try { |
| setState(ActivityState.STARTING); |
| doStart(metadataProvider); |
| setRunning(metadataProvider, true); |
| } catch (Exception e) { |
| setState(ActivityState.STOPPED); |
| LOGGER.log(Level.ERROR, "Failed to start the entity " + entityId, e); |
| throw HyracksDataException.create(e); |
| } |
| } |
| |
| @SuppressWarnings("squid:S1181") |
| protected synchronized void doStart(MetadataProvider metadataProvider) throws HyracksDataException { |
| WaitForStateSubscriber subscriber = new WaitForStateSubscriber(this, |
| EnumSet.of(ActivityState.RUNNING, ActivityState.TEMPORARILY_FAILED, ActivityState.STOPPED)); |
| jobId = compileAndStartJob(metadataProvider); |
| numRegistered = 0; |
| numDeRegistered = 0; |
| try { |
| subscriber.sync(); |
| if (subscriber.getFailure() != null) { |
| throw subscriber.getFailure(); |
| } |
| } catch (InterruptedException ie) { |
| // interrupted.. check if the subscriber is done |
| if (subscriber.isDone()) { |
| if (subscriber.getFailure() != null) { |
| throw HyracksDataException.create(subscriber.getFailure()); |
| } |
| Thread.currentThread().interrupt(); |
| } else { |
| // Subscriber is not done yet. so, we need to cancel, we have the jobId |
| setState(ActivityState.CANCELLING); |
| cancelJob(ie); |
| throw HyracksDataException.create(ie); |
| } |
| } catch (Throwable e) { |
| throw HyracksDataException.create(e); |
| } |
| } |
| |
| /** |
| * Recovery hook; in this class it is the same operation as {@code doStart}. |
| * |
| * @param metadataProvider the metadata provider passed on to {@code doStart} |
| * @throws HyracksDataException if {@code doStart} fails |
| */ |
| protected synchronized void doRecover(MetadataProvider metadataProvider) throws HyracksDataException { |
| doStart(metadataProvider); |
| } |
| |
| /** |
| * Cancels the current job and waits, ignoring interruptions, for up to two minutes until the entity |
| * becomes STOPPED or TEMPORARILY_FAILED. If that does not happen in time the process is halted. |
| * |
| * @param th the failure that led to the cancellation; cancellation problems are attached to it |
| */ |
| private void cancelJob(Throwable th) { |
| cancelJobSafely(metadataProvider, th); |
| // this can also be reached because of a failure while in the suspending state |
| final WaitForStateSubscriber cancelWaiter = |
| new WaitForStateSubscriber(this, EnumSet.of(ActivityState.STOPPED, ActivityState.TEMPORARILY_FAILED)); |
| final Span deadline = Span.start(2, TimeUnit.MINUTES); |
| InvokeUtil.doUninterruptibly(() -> { |
| if (cancelWaiter.sync(deadline)) { |
| return; |
| } |
| ExitUtil.halt(ExitUtil.EC_FAILED_TO_CANCEL_ACTIVE_START_STOP); |
| }); |
| } |
| |
| /** |
| * Asks Hyracks to cancel the current job without letting a cancellation problem escape: any throwable |
| * raised by the request is logged and added as suppressed to {@code e}. |
| * |
| * @param metadataProvider the metadata provider whose application context supplies the client connection |
| * @param e the failure that triggered the cancellation |
| */ |
| @SuppressWarnings("squid:S1181") |
| protected void cancelJobSafely(MetadataProvider metadataProvider, Throwable e) { |
| try { |
| metadataProvider.getApplicationContext().getHcc().cancelJob(jobId); |
| } catch (Throwable cancelFailure) { |
| LOGGER.warn("Failed to cancel active job {}", jobId, cancelFailure); |
| e.addSuppressed(cancelFailure); |
| } |
| } |
| |
| /** |
| * Compiles and starts the job of this entity. {@code doStart} stores the returned id in {@code jobId}. |
| * |
| * @param metadataProvider the metadata provider to use |
| * @return the id of the started job |
| * @throws HyracksDataException if the job cannot be compiled or started |
| */ |
| protected abstract JobId compileAndStartJob(MetadataProvider metadataProvider) throws HyracksDataException; |
| |
| @SuppressWarnings("squid:S1181") |
| protected synchronized void doStop(MetadataProvider metadataProvider, long timeout, TimeUnit unit) |
| throws HyracksDataException { |
| ActivityState intention = state; |
| Set<ActivityState> waitFor; |
| if (intention == ActivityState.STOPPING) { |
| waitFor = EnumSet.of(ActivityState.STOPPED); |
| } else if (intention == ActivityState.SUSPENDING) { |
| waitFor = EnumSet.of(ActivityState.SUSPENDED, ActivityState.TEMPORARILY_FAILED); |
| } else { |
| throw new IllegalStateException("stop with what intention?? Current state is " + intention); |
| } |
| WaitForStateSubscriber subscriber = new WaitForStateSubscriber(this, waitFor); |
| // Note: once we start sending stop messages, we can't go back until the entity is stopped |
| final String nameBefore = Thread.currentThread().getName(); |
| try { |
| Thread.currentThread().setName(nameBefore + " : WaitForCompletionForJobId: " + jobId); |
| sendStopMessages(metadataProvider, timeout, unit); |
| if (LOGGER.isDebugEnabled()) { |
| LOGGER.debug("Waiting for its state to become " + waitFor); |
| } |
| subscriber.sync(); |
| if (LOGGER.isDebugEnabled()) { |
| LOGGER.debug("Disconnect has been completed " + waitFor); |
| } |
| } catch (InterruptedException ie) { |
| forceStop(subscriber, ie); |
| Thread.currentThread().interrupt(); |
| } catch (Throwable e) { |
| LOGGER.error("forcing active job stop due to", e); |
| forceStop(subscriber, e); |
| } finally { |
| Thread.currentThread().setName(nameBefore); |
| } |
| } |
| |
| /** |
| * Completes a stop that ran into a failure: if the awaited state has not been reached yet the job is |
| * cancelled, and the failure is logged as a warning. |
| * |
| * @param subscriber the subscriber waiting for the stop to complete |
| * @param e the failure encountered while stopping |
| */ |
| private void forceStop(WaitForStateSubscriber subscriber, Throwable e) { |
| if (!subscriber.isDone()) { |
| cancelJob(e); |
| } |
| // Stop should not throw an exception if the entity was stopped. |
| // Simply log the failure. |
| LOGGER.warn("Failure encountered while stopping {}", this, e); |
| } |
| |
| protected void sendStopMessages(MetadataProvider metadataProvider, long timeout, TimeUnit unit) throws Exception { |
| ICcApplicationContext applicationCtx = metadataProvider.getApplicationContext(); |
| ICCMessageBroker messageBroker = (ICCMessageBroker) applicationCtx.getServiceContext().getMessageBroker(); |
| AlgebricksAbsolutePartitionConstraint runtimeLocations = getLocations(); |
| int partition = 0; |
| if (LOGGER.isInfoEnabled()) { |
| LOGGER.log(Level.INFO, "Sending stop messages to " + runtimeLocations); |
| } |
| for (String location : runtimeLocations.getLocations()) { |
| if (LOGGER.isInfoEnabled()) { |
| LOGGER.log(Level.INFO, "Sending to " + location); |
| } |
| ActiveRuntimeId runtimeId = getActiveRuntimeId(partition++); |
| messageBroker.sendApplicationMessageToNC(new ActiveManagerMessage(ActiveManagerMessage.Kind.STOP_ACTIVITY, |
| runtimeId, new StopRuntimeParameters(timeout, unit)), location); |
| } |
| } |
| |
| /** |
| * Supplies the runtime id for a partition. {@code sendStopMessages} calls it with the zero-based position |
| * of each location when addressing its stop message. |
| * |
| * @param partition the partition number |
| * @return the runtime id of that partition |
| */ |
| protected abstract ActiveRuntimeId getActiveRuntimeId(int partition); |
| |
| /** |
| * Performs the entity-specific suspension work. {@code suspend} runs it on the controller service |
| * executor after moving the entity to SUSPENDING and then waits for SUSPENDED or TEMPORARILY_FAILED. |
| * |
| * @param metadataProvider the metadata provider given to {@code suspend} |
| * @throws HyracksDataException if the suspension fails |
| */ |
| protected abstract void doSuspend(MetadataProvider metadataProvider) throws HyracksDataException; |
| |
| /** |
| * Entity-specific resume hook. It is not invoked from this class; presumably the recovery task calls it |
| * while resuming - TODO confirm against RecoveryTask. |
| * |
| * @param metadataProvider the metadata provider to use |
| * @throws HyracksDataException if the resume fails |
| */ |
| protected abstract void doResume(MetadataProvider metadataProvider) throws HyracksDataException; |
| |
| /** |
| * Records whether the entity is running. {@code start} passes {@code true} after a successful |
| * {@code doStart}; {@code stop} passes {@code false}. What is recorded, and where, is left to the |
| * implementation. |
| * |
| * @param metadataProvider the metadata provider given to {@code start} or {@code stop} |
| * @param running the new running flag |
| */ |
| protected abstract void setRunning(MetadataProvider metadataProvider, boolean running); |
| |
| @Override |
| public final synchronized void stop(MetadataProvider metadataProvider) |
| throws HyracksDataException, InterruptedException { |
| waitForNonTransitionState(); |
| if (state != ActivityState.RUNNING && state != ActivityState.TEMPORARILY_FAILED) { |
| throw new RuntimeDataException(ErrorCode.ACTIVE_ENTITY_CANNOT_BE_STOPPED, entityId, state); |
| } |
| if (state == ActivityState.TEMPORARILY_FAILED) { |
| if (rt != null) { |
| setState(ActivityState.STOPPING); |
| rt.cancel(); |
| rt = null; |
| } |
| setState(ActivityState.STOPPED); |
| try { |
| setRunning(metadataProvider, false); |
| } catch (Exception e) { |
| LOGGER.log(Level.ERROR, "Failed to set the entity state as not running " + entityId, e); |
| throw HyracksDataException.create(e); |
| } |
| } else if (state == ActivityState.RUNNING) { |
| setState(ActivityState.STOPPING); |
| try { |
| doStop(metadataProvider, appCtx.getActiveProperties().getActiveStopTimeout(), TIMEOUT_UNIT); |
| } catch (Exception e) { |
| setState(ActivityState.STOPPED); |
| LOGGER.log(Level.ERROR, "Failed to stop the entity " + entityId, e); |
| throw HyracksDataException.create(e); |
| } finally { |
| setRunning(metadataProvider, false); |
| } |
| } else { |
| throw new RuntimeDataException(ErrorCode.ACTIVE_ENTITY_CANNOT_BE_STOPPED, entityId, state); |
| } |
| this.stats = DEFAULT_ACTIVE_STATS; |
| notifySubscribers(statsUpdatedEvent); |
| } |
| |
| /** |
| * @return the recovery task most recently created by {@code recover} or {@code resume}, or {@code null} |
| * if none was created or {@code stop} cleared it |
| */ |
| public RecoveryTask getRecoveryTask() { |
| return rt; |
| } |
| |
| /** |
| * Suspends the entity. The call first waits for any ongoing transition or suspension to end, then: |
| * <ul> |
| * <li>STOPPED: only the {@code suspended} flag is raised;</li> |
| * <li>SUSPENDED: the call fails;</li> |
| * <li>TEMPORARILY_FAILED: the entity is moved to SUSPENDED directly;</li> |
| * <li>otherwise: the entity is moved to SUSPENDING, {@code doSuspend} is run on the controller service |
| * executor and, outside the monitor, the call waits for the task and for the entity to become SUSPENDED |
| * or TEMPORARILY_FAILED.</li> |
| * </ul> |
| * If waiting fails while the entity is still SUSPENDING, the previous state is restored when a job is |
| * still assigned, otherwise the entity is moved to STOPPED. When the failure is an interruption, the |
| * thread's interrupt flag is restored before the failure is rethrown wrapped, so callers can still |
| * observe the interruption. |
| * |
| * @param metadataProvider the metadata provider used to reach the executor and passed to {@code doSuspend} |
| * @throws HyracksDataException if the entity is already suspended or the suspension fails |
| * @throws InterruptedException if interrupted while waiting for a non-transition state |
| */ |
| @Override |
| public void suspend(MetadataProvider metadataProvider) throws HyracksDataException, InterruptedException { |
| WaitForStateSubscriber subscriber; |
| Future<Void> suspendTask; |
| synchronized (this) { |
| if (LOGGER.isEnabled(level)) { |
| LOGGER.log(level, "suspending entity " + entityId); |
| LOGGER.log(level, "Waiting for ongoing activities"); |
| } |
| waitForNonTransitionState(); |
| if (LOGGER.isEnabled(level)) { |
| LOGGER.log(level, "Proceeding with suspension. Current state is " + state); |
| } |
| if (state == ActivityState.STOPPED) { |
| suspended = true; |
| return; |
| } |
| if (state == ActivityState.SUSPENDED) { |
| throw new RuntimeDataException(ErrorCode.ACTIVE_ENTITY_ALREADY_SUSPENDED, entityId, state); |
| } |
| if (state == ActivityState.TEMPORARILY_FAILED) { |
| suspended = true; |
| setState(ActivityState.SUSPENDED); |
| return; |
| } |
| setState(ActivityState.SUSPENDING); |
| subscriber = new WaitForStateSubscriber(this, |
| EnumSet.of(ActivityState.SUSPENDED, ActivityState.TEMPORARILY_FAILED)); |
| suspendTask = metadataProvider.getApplicationContext().getServiceContext().getControllerService() |
| .getExecutor().submit(() -> { |
| doSuspend(metadataProvider); |
| return null; |
| }); |
| LOGGER.log(level, "Suspension task has been submitted"); |
| } |
| try { |
| LOGGER.log(level, "Waiting for suspension task to complete"); |
| suspendTask.get(); |
| LOGGER.log(level, "waiting for state to become SUSPENDED or TEMPORARILY_FAILED"); |
| subscriber.sync(); |
| suspended = true; |
| } catch (Exception e) { |
| synchronized (this) { |
| if (LOGGER.isErrorEnabled()) { |
| LOGGER.log(Level.ERROR, "Failure while waiting for " + entityId + " to become suspended", e); |
| } |
| // failed to suspend |
| if (state == ActivityState.SUSPENDING) { |
| if (jobId != null) { |
| // job is still running |
| // restore state |
| setState(prevState); |
| } else { |
| setState(ActivityState.STOPPED); |
| } |
| } |
| if (e instanceof InterruptedException) { |
| // the interruption is about to be wrapped; keep it visible through the interrupt flag |
| Thread.currentThread().interrupt(); |
| } |
| throw HyracksDataException.create(e); |
| } |
| } |
| } |
| |
| /** |
| * Resumes a suspended entity. A STOPPED entity only has its {@code suspended} flag cleared. Otherwise |
| * the entity must be SUSPENDED or TEMPORARILY_FAILED: if it was TEMPORARILY_FAILED before, it is put |
| * back into that state; if not, it is moved to RESUMING and a new recovery task attempts to resume or |
| * recover it (a failure of that attempt is logged, not thrown). In all of these cases the |
| * {@code suspended} flag is cleared and waiting threads are woken up. |
| * |
| * @param metadataProvider the metadata provider handed to the recovery task |
| * @throws HyracksDataException if the entity cannot be resumed from its current state |
| */ |
| @Override |
| public synchronized void resume(MetadataProvider metadataProvider) throws HyracksDataException { |
| if (state == ActivityState.STOPPED) { |
| suspended = false; |
| notifyAll(); |
| return; |
| } |
| if (!(state == ActivityState.SUSPENDED || state == ActivityState.TEMPORARILY_FAILED)) { |
| throw new RuntimeDataException(ErrorCode.ACTIVE_ENTITY_CANNOT_RESUME_FROM_STATE, entityId, state); |
| } |
| try { |
| if (prevState == ActivityState.TEMPORARILY_FAILED) { |
| setState(ActivityState.TEMPORARILY_FAILED); |
| } else { |
| setState(ActivityState.RESUMING); |
| rt = new RecoveryTask(appCtx, this, retryPolicyFactory); |
| try { |
| rt.resumeOrRecover(metadataProvider); |
| } catch (Exception resumeError) { |
| LOGGER.log(Level.WARN, "Failure while attempting to resume " + entityId, resumeError); |
| } |
| } |
| } finally { |
| suspended = false; |
| notifyAll(); |
| } |
| } |
| |
| /** |
| * @return {@code true} unless the entity is STOPPED or CANCELLING |
| */ |
| @Override |
| public boolean isActive() { |
| return !(state == ActivityState.STOPPED || state == ActivityState.CANCELLING); |
| } |
| |
| /** |
| * Removes this listener from the active notification handler it registered with in the constructor. |
| * |
| * @throws HyracksDataException if the handler rejects the request |
| */ |
| @Override |
| public void unregister() throws HyracksDataException { |
| handler.unregisterListener(this); |
| } |
| |
| /** |
| * Replaces the runtime locations of this entity. |
| * NOTE(review): the field is not volatile and this setter does not take the monitor - confirm callers |
| * provide the needed synchronization. |
| * |
| * @param locations the new locations |
| */ |
| public void setLocations(AlgebricksAbsolutePartitionConstraint locations) { |
| this.locations = locations; |
| } |
| |
| /** |
| * @return the failure recorded by {@code finish} for the last failed job, or {@code null} if none was |
| * recorded or it was cleared by entering STARTING, RECOVERING or RESUMING |
| */ |
| @Override |
| public Exception getJobFailure() { |
| return jobFailure; |
| } |
| |
| /** |
| * @return the internal, mutable set of datasets associated with this entity; no copy is made, so changes |
| * made through the returned set affect this listener |
| */ |
| @Override |
| public Set<Dataset> getDatasets() { |
| return datasets; |
| } |
| |
| /** |
| * Swaps the stored instance of a dataset for the given one. Nothing happens if no equal dataset is |
| * stored. |
| * |
| * @param dataset the dataset instance that should replace its equal counterpart |
| */ |
| @Override |
| public synchronized void replace(Dataset dataset) { |
| final Set<Dataset> current = getDatasets(); |
| // remove() reports whether an equal element was present; only then is the new instance stored |
| if (current.remove(dataset)) { |
| current.add(dataset); |
| } |
| } |
| |
| /** |
| * @return the string form of the entity id |
| * @throws HyracksDataException declared for overriding implementations; not thrown here |
| */ |
| @Override |
| public String getDisplayName() throws HyracksDataException { |
| return this.getEntityId().toString(); |
| } |
| |
| /** |
| * @return the current value of the {@code suspended} flag, read while holding the listener's monitor |
| */ |
| @Override |
| public synchronized boolean isSuspended() { |
| return suspended; |
| } |
| |
| /** |
| * Factory for the recovery task used by {@code recover}; subclasses may override it. Note that |
| * {@code resume} constructs its task directly and does not go through this method. |
| * |
| * @return a new recovery task for this listener |
| */ |
| protected RecoveryTask createRecoveryTask() { |
| return new RecoveryTask(appCtx, this, retryPolicyFactory); |
| } |
| |
| /** |
| * Acquires the locks needed to suspend this entity: the write lock on the active entity itself followed |
| * by the dataset locks taken by {@code acquireSuspendDatasetsLocks}. |
| * |
| * @param metadataProvider the metadata provider supplying the lock manager and the lock list |
| * @param targetDataset the dataset to leave out when locking datasets; may be {@code null} |
| * @throws AlgebricksException if a lock cannot be acquired |
| */ |
| public void acquireSuspendLocks(MetadataProvider metadataProvider, Dataset targetDataset) |
| throws AlgebricksException { |
| final IMetadataLockManager lockManager = metadataProvider.getApplicationContext().getMetadataLockManager(); |
| // write lock on the listener's entity first |
| lockManager.acquireActiveEntityWriteLock(metadataProvider.getLocks(), entityId.getDatabaseName(), |
| entityId.getDataverseName(), entityId.getEntityName()); |
| // then the datasets, except the target dataset |
| acquireSuspendDatasetsLocks(metadataProvider, lockManager, targetDataset); |
| } |
| |
| protected void acquireSuspendDatasetsLocks(MetadataProvider metadataProvider, IMetadataLockManager lockManager, |
| Dataset targetDataset) throws AlgebricksException { |
| for (Dataset dataset : getDatasets()) { |
| if (targetDataset != null && targetDataset.equals(dataset)) { |
| // DDL operation already acquired the proper lock for the operation |
| continue; |
| } |
| lockManager.acquireDatasetExclusiveModificationLock(metadataProvider.getLocks(), dataset.getDatabaseName(), |
| dataset.getDataverseName(), dataset.getDatasetName()); |
| } |
| } |
| |
| /** |
| * @return a JSON-like description holding the simple class name, the entity id and the current state |
| */ |
| @Override |
| public String toString() { |
| final StringBuilder description = new StringBuilder(); |
| description.append("{\"class\":\"").append(getClass().getSimpleName()).append("\","); |
| description.append("\"entityId\":\"").append(entityId).append("\","); |
| description.append("\"state\":\"").append(state).append("\""); |
| description.append("}"); |
| return description.toString(); |
| } |
| } |