| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.brooklyn.core.mgmt.internal; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.collect.Lists; |
| import org.apache.brooklyn.api.entity.Entity; |
| import org.apache.brooklyn.api.mgmt.ManagementContext; |
| import org.apache.brooklyn.api.mgmt.Task; |
| import org.apache.brooklyn.core.entity.Entities; |
| import org.apache.brooklyn.core.entity.EntityInternal; |
| import org.apache.brooklyn.core.entity.trait.Startable; |
| import org.apache.brooklyn.util.collections.MutableList; |
| import org.apache.brooklyn.util.collections.MutableSet; |
| import org.apache.brooklyn.util.core.task.Tasks; |
| import org.apache.brooklyn.util.exceptions.Exceptions; |
| import org.apache.brooklyn.util.exceptions.RuntimeInterruptedException; |
| import org.apache.brooklyn.util.javalang.Threads; |
| import org.apache.brooklyn.util.time.CountdownTimer; |
| import org.apache.brooklyn.util.time.Duration; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.util.List; |
| import java.util.Set; |
| import java.util.concurrent.Semaphore; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| |
| public class BrooklynShutdownHooks { |
| |
| private static final Logger log = LoggerFactory.getLogger(BrooklynShutdownHooks.class); |
| |
| private static final Duration DEFAULT_SHUTDOWN_TIMEOUT = Duration.TWO_MINUTES; |
| |
| private static final AtomicBoolean isShutdownHookRegistered = new AtomicBoolean(); |
| private static final List<Entity> entitiesToStopOnShutdown = Lists.newArrayList(); |
| private static final List<ManagementContext> managementContextsToStopAppsOnShutdown = Lists.newArrayList(); |
| private static final List<ManagementContext> managementContextsToTerminateOnShutdown = Lists.newArrayList(); |
| private static final AtomicBoolean isShutDown = new AtomicBoolean(false); |
| |
| // private static final Object mutex = new Object(); |
| private static final Semaphore semaphore = new Semaphore(1); |
| |
| /** |
| * Max time to wait for shutdown to complete, when stopping the entities from {@link #invokeStopOnShutdown(Entity)}. |
| * Default is two minutes - deliberately long because stopping cloud VMs can often take a minute. |
| */ |
| private static volatile Duration shutdownTimeout = DEFAULT_SHUTDOWN_TIMEOUT; |
| |
| public static void setShutdownTimeout(Duration val) { |
| shutdownTimeout = val; |
| } |
| |
| public static void invokeStopOnShutdown(Entity entity) { |
| if (!(entity instanceof Startable)) { |
| log.warn("Not adding entity {} for stop-on-shutdown as not an instance of {}", entity, Startable.class.getSimpleName()); |
| return; |
| } |
| try { |
| semaphore.acquire(); |
| if (isShutDown.get()) { |
| semaphore.release(); |
| try { |
| log.warn("Call to invokeStopOnShutdown for "+entity+" while system already shutting down; invoking stop now and throwing exception"); |
| Entities.destroy(entity, false); |
| throw new IllegalStateException("Call to invokeStopOnShutdown for "+entity+" while system already shutting down"); |
| } catch (Exception e) { |
| throw new IllegalStateException("Call to invokeStopOnShutdown for "+entity+" while system already shutting down, had error: "+e, e); |
| } |
| } |
| |
| try { |
| // TODO should be a weak reference in case it is destroyed before shutdown |
| // (only applied to certain entities started via launcher so not a big leak) |
| entitiesToStopOnShutdown.add(entity); |
| } finally { |
| semaphore.release(); |
| } |
| addShutdownHookIfNotAlready(); |
| |
| } catch (Exception e) { |
| throw Exceptions.propagate(e); |
| } |
| } |
| |
| public static void resetShutdownFlag() { |
| try { |
| semaphore.acquire(); |
| isShutDown.compareAndSet(true, false); |
| } catch (InterruptedException e) { |
| throw new IllegalStateException("Could not reset shutdown flag", e); |
| } finally { |
| semaphore.release(); |
| } |
| } |
| |
| public static void invokeStopAppsOnShutdown(ManagementContext managementContext) { |
| try { |
| semaphore.acquire(); |
| if (isShutDown.get()) { |
| semaphore.release(); |
| try { |
| log.warn("Call to invokeStopAppsOnShutdown for "+managementContext+" while system already shutting down; invoking stop now and throwing exception"); |
| destroyAndWait(managementContext.getApplications(), shutdownTimeout); |
| |
| throw new IllegalStateException("Call to invokeStopAppsOnShutdown for "+managementContext+" while system already shutting down"); |
| } catch (Exception e) { |
| throw new IllegalStateException("Call to invokeStopAppsOnShutdown for "+managementContext+" while system already shutting down, had error: "+e, e); |
| } |
| } |
| |
| // TODO weak reference, as per above |
| managementContextsToStopAppsOnShutdown.add(managementContext); |
| semaphore.release(); |
| addShutdownHookIfNotAlready(); |
| |
| } catch (Exception e) { |
| throw Exceptions.propagate(e); |
| } |
| } |
| |
| public static void invokeTerminateOnShutdown(ManagementContext managementContext) { |
| try { |
| semaphore.acquire(); |
| if (isShutDown.get()) { |
| semaphore.release(); |
| try { |
| log.warn("Call to invokeStopOnShutdown for "+managementContext+" while system already shutting down; invoking stop now and throwing exception"); |
| ((ManagementContextInternal)managementContext).terminate(); |
| throw new IllegalStateException("Call to invokeTerminateOnShutdown for "+managementContext+" while system already shutting down"); |
| } catch (Exception e) { |
| throw new IllegalStateException("Call to invokeTerminateOnShutdown for "+managementContext+" while system already shutting down, had error: "+e, e); |
| } |
| } |
| |
| // TODO weak reference, as per above |
| managementContextsToTerminateOnShutdown.add(managementContext); |
| semaphore.release(); |
| addShutdownHookIfNotAlready(); |
| |
| } catch (Exception e) { |
| throw Exceptions.propagate(e); |
| } |
| } |
| |
| private static void addShutdownHookIfNotAlready() { |
| if (isShutdownHookRegistered.compareAndSet(false, true)) { |
| Threads.addShutdownHook(BrooklynShutdownHookJob.newInstanceForReal()); |
| } |
| } |
| |
| @VisibleForTesting |
| public static class BrooklynShutdownHookJob implements Runnable { |
| |
| final boolean setStaticShutDownFlag; |
| |
| private BrooklynShutdownHookJob(boolean setStaticShutDownFlag) { |
| this.setStaticShutDownFlag = setStaticShutDownFlag; |
| } |
| |
| public static BrooklynShutdownHookJob newInstanceForReal() { |
| return new BrooklynShutdownHookJob(true); |
| } |
| |
| /** testing instance does not actually set the `isShutDown` bit */ |
| public static BrooklynShutdownHookJob newInstanceForTesting() { |
| return new BrooklynShutdownHookJob(false); |
| } |
| |
| @Override |
| public void run() { |
| // First stop entities; on interrupt, abort waiting for tasks - but let shutdown hook continue |
| Set<Entity> entitiesToStop = MutableSet.of(); |
| try { |
| semaphore.acquire(); |
| if (setStaticShutDownFlag) |
| isShutDown.set(true); |
| semaphore.release(); |
| } catch (Exception e) { |
| throw Exceptions.propagate(e); |
| } |
| entitiesToStop.addAll(entitiesToStopOnShutdown); |
| for (ManagementContext mgmt: managementContextsToStopAppsOnShutdown) { |
| if (mgmt.isRunning()) { |
| entitiesToStop.addAll(mgmt.getApplications()); |
| } |
| } |
| |
| if (entitiesToStop.isEmpty()) { |
| log.debug("Brooklyn shutdown: no entities to stop"); |
| } else { |
| log.info("Brooklyn shutdown: stopping entities "+entitiesToStop); |
| destroyAndWait(entitiesToStop, shutdownTimeout); |
| } |
| |
| // Then terminate management contexts |
| log.debug("Brooklyn terminateOnShutdown shutdown-hook invoked: terminating management contexts: "+managementContextsToTerminateOnShutdown); |
| for (ManagementContext managementContext: managementContextsToTerminateOnShutdown) { |
| try { |
| if (!managementContext.isRunning()) |
| continue; |
| ((ManagementContextInternal)managementContext).terminate(); |
| } catch (RuntimeException e) { |
| log.info("terminateOnShutdown of "+managementContext+" returned error (continuing): "+e, e); |
| } |
| } |
| } |
| } |
| |
| protected static void destroyAndWait(Iterable<? extends Entity> entitiesToStop, Duration timeout) { |
| MutableList<Task<?>> stops = MutableList.of(); |
| for (Entity entityToStop: entitiesToStop) { |
| final Entity entity = entityToStop; |
| if (!Entities.isManaged(entity)) continue; |
| Task<Object> t = Tasks.builder().dynamic(false).displayName("destroying "+entity).body(new Runnable() { |
| @Override public void run() { Entities.destroy(entity, false); } |
| }).build(); |
| stops.add( ((EntityInternal)entity).getExecutionContext().submit(t) ); |
| } |
| CountdownTimer timer = CountdownTimer.newInstanceStarted(timeout); |
| for (Task<?> t: stops) { |
| try { |
| Duration durationRemaining = timer.getDurationRemaining(); |
| Object result = t.getUnchecked(durationRemaining.isPositive() ? durationRemaining : Duration.ONE_MILLISECOND); |
| if (log.isDebugEnabled()) log.debug("stopOnShutdown of {} completed: {}", t, result); |
| } catch (RuntimeInterruptedException e) { |
| Thread.currentThread().interrupt(); |
| if (log.isDebugEnabled()) log.debug("stopOnShutdown of "+t+" interrupted: "+e); |
| break; |
| } catch (RuntimeException e) { |
| Exceptions.propagateIfFatal(e); |
| log.warn("Shutdown hook "+t+" returned error (continuing): "+e); |
| if (log.isDebugEnabled()) log.debug("stopOnShutdown of "+t+" returned error (continuing to stop others): "+e, e); |
| } |
| } |
| } |
| |
| } |