blob: 6ee70337252a81c7e9605d1f8bdd567204b5c5bb [file] [log] [blame]
// Copyright 2016 Twitter. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.twitter.heron.scheduler;
import java.io.Closeable;
import java.time.Duration;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Logger;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.twitter.heron.api.generated.TopologyAPI;
import com.twitter.heron.api.utils.TopologyUtils;
import com.twitter.heron.common.basics.SysUtils;
import com.twitter.heron.proto.system.PackingPlans;
import com.twitter.heron.proto.system.PhysicalPlans;
import com.twitter.heron.scheduler.utils.Runtime;
import com.twitter.heron.spi.common.Config;
import com.twitter.heron.spi.packing.PackingPlan;
import com.twitter.heron.spi.packing.PackingPlanProtoDeserializer;
import com.twitter.heron.spi.packing.PackingPlanProtoSerializer;
import com.twitter.heron.spi.scheduler.IScalable;
import com.twitter.heron.spi.statemgr.IStateManager;
import com.twitter.heron.spi.statemgr.Lock;
import com.twitter.heron.spi.statemgr.SchedulerStateManagerAdaptor;
import com.twitter.heron.spi.utils.NetworkUtils;
import com.twitter.heron.spi.utils.TMasterException;
import com.twitter.heron.spi.utils.TMasterUtils;
import static com.twitter.heron.api.Config.TOPOLOGY_UPDATE_DEACTIVATE_WAIT_SECS;
import static com.twitter.heron.api.Config.TOPOLOGY_UPDATE_REACTIVATE_WAIT_SECS;
/**
* Class that is able to update a topology. This includes changing the parallelism of
* topology components
*/
public class UpdateTopologyManager implements Closeable {
// Class-wide logger; also used by the logInfo/logFine helpers at the bottom of the class.
private static final Logger LOG = Logger.getLogger(UpdateTopologyManager.class.getName());
// Passed to NetworkUtils.TunnelConfig.build(...) whenever the TMaster has to be contacted.
private Config config;
// Runtime config; the topology name and the state manager adaptor are read from it.
private Config runtime;
// Scheduler hook used to add/remove containers. When absent, updates that would change the
// set of containers are rejected (see the precondition in the private updateTopology).
private Optional<IScalable> scalableScheduler;
// Converts proto packing plans into PackingPlan objects.
private PackingPlanProtoDeserializer deserializer;
// Executor (core pool size 1) that runs the periodic reactivation task; stopped in close().
private ScheduledThreadPoolExecutor reactivateExecutorService;
public UpdateTopologyManager(Config config, Config runtime,
Optional<IScalable> scalableScheduler) {
this.config = config;
this.runtime = runtime;
this.scalableScheduler = scalableScheduler;
this.deserializer = new PackingPlanProtoDeserializer();
this.reactivateExecutorService = new ScheduledThreadPoolExecutor(1);
this.reactivateExecutorService.setMaximumPoolSize(1);
}
/**
 * Stops the reactivation executor immediately, via
 * {@link ScheduledThreadPoolExecutor#shutdownNow()}.
 */
@Override
public void close() {
  reactivateExecutorService.shutdownNow();
}
/**
 * Scales the topology out or in based on the proposedPackingPlan. The whole update runs while
 * holding the topology's UPDATE_TOPOLOGY lock, so that only one actor updates it at a time.
 *
 * @param existingProtoPackingPlan the current plan. If this isn't what's found in the state
 * manager, the update will fail
 * @param proposedProtoPackingPlan packing plan to change the topology to
 * @throws ConcurrentModificationException if the update lock cannot be obtained within
 * 5 seconds, or if the packing plan in the state manager differs from the existing plan given
 * @throws ExecutionException if the update itself fails, e.g. while reactivating the topology
 * @throws InterruptedException if interrupted while waiting for the lock or during the update
 */
public void updateTopology(final PackingPlans.PackingPlan existingProtoPackingPlan,
    final PackingPlans.PackingPlan proposedProtoPackingPlan)
    throws ExecutionException, InterruptedException, ConcurrentModificationException {
  String topologyName = Runtime.topologyName(runtime);
  SchedulerStateManagerAdaptor stateManager = Runtime.schedulerStateManagerAdaptor(runtime);
  Lock lock = stateManager.getLock(topologyName, IStateManager.LockName.UPDATE_TOPOLOGY);

  if (!lock.tryLock(5, TimeUnit.SECONDS)) {
    throw new ConcurrentModificationException(String.format(
        "The update lock can not be obtained for topology %s. Another actor is performing an "
        + "update on it. Failing this request, try again once current update is complete",
        topologyName));
  }

  try {
    // Compare deserialized plans rather than raw protos so that the check uses
    // PackingPlan equality.
    PackingPlans.PackingPlan foundPackingPlan = getPackingPlan(stateManager, topologyName);
    boolean planUnchanged = deserializer.fromProto(existingProtoPackingPlan)
        .equals(deserializer.fromProto(foundPackingPlan));
    if (!planUnchanged) {
      throw new ConcurrentModificationException(String.format(
          "The packing plan in state manager is not the same as the submitted existing "
          + "packing plan for topology %s. Another actor has changed it and has likely "
          + "performed an update on it. Failing this request, try again once other "
          + "update is complete", topologyName));
    }
    updateTopology(existingProtoPackingPlan, proposedProtoPackingPlan, stateManager);
  } finally {
    lock.unlock();
  }
}
/**
 * Performs the actual update; the caller is expected to hold the update lock. The steps are:
 * validate the proposed plan, deactivate the topology if it is running, request any new
 * containers from the scheduler, write the updated packing plan to the state manager,
 * reactivate the topology if it was running, and finally release containers that are no
 * longer used.
 *
 * @param existingProtoPackingPlan the plan the topology currently runs with
 * @param proposedProtoPackingPlan the plan to change the topology to
 * @param stateManager state manager used to read topology state and write the new plan
 * @throws IllegalArgumentException if the proposed plan has no containers
 * @throws IllegalStateException if containers must be added or removed but no scalable
 * scheduler is present
 * @throws ExecutionException if reactivating the topology fails or times out
 * @throws InterruptedException if interrupted while waiting during the update
 */
private void updateTopology(final PackingPlans.PackingPlan existingProtoPackingPlan,
    final PackingPlans.PackingPlan proposedProtoPackingPlan,
    SchedulerStateManagerAdaptor stateManager)
    throws ExecutionException, InterruptedException {
  String topologyName = Runtime.topologyName(runtime);
  PackingPlan existingPackingPlan = deserializer.fromProto(existingProtoPackingPlan);
  PackingPlan proposedPackingPlan = deserializer.fromProto(proposedProtoPackingPlan);

  // The template overloads of Preconditions only build the message when the check fails,
  // so the (potentially large) packing plans are not stringified on the happy path.
  Preconditions.checkArgument(proposedPackingPlan.getContainers().size() > 0,
      "proposed packing plan must have at least 1 container %s", proposedPackingPlan);

  ContainerDelta containerDelta = new ContainerDelta(
      existingPackingPlan.getContainers(), proposedPackingPlan.getContainers());
  int newContainerCount = containerDelta.getContainersToAdd().size();
  int removableContainerCount = containerDelta.getContainersToRemove().size();

  Preconditions.checkState(
      newContainerCount + removableContainerCount == 0 || scalableScheduler.isPresent(),
      "Topology change requires %s new containers and removing %s "
      + "existing containers, but the scheduler does not support scaling, aborting. "
      + "Existing packing plan: %s, proposed packing plan: %s",
      newContainerCount, removableContainerCount, existingPackingPlan, proposedPackingPlan);

  TopologyAPI.Topology topology = getTopology(stateManager, topologyName);
  boolean initiallyRunning = topology.getState() == TopologyAPI.TopologyState.RUNNING;

  // deactivate and sleep
  if (initiallyRunning) {
    // Moves the topology from RUNNING to PAUSED before anything is changed.
    // Will throw exceptions internally if tmaster fails to deactivate
    deactivateTopology(stateManager, topology, proposedPackingPlan);
  }

  Set<PackingPlan.ContainerPlan> updatedContainers =
      new HashSet<>(proposedPackingPlan.getContainers());

  // request new resources if necessary. Once containers are allocated we should make the changes
  // to state manager quickly, otherwise the scheduler might penalize for thrashing on start-up
  if (newContainerCount > 0 && scalableScheduler.isPresent()) {
    Set<PackingPlan.ContainerPlan> containersToAdd = containerDelta.getContainersToAdd();
    Set<PackingPlan.ContainerPlan> containersAdded =
        scalableScheduler.get().addContainers(containersToAdd);
    // Update the PackingPlan with new container-ids
    if (containersAdded != null) {
      updatedContainers.removeAll(containersToAdd);
      updatedContainers.addAll(containersAdded);
    }
  }

  PackingPlan updatedPackingPlan =
      new PackingPlan(proposedPackingPlan.getId(), updatedContainers);
  PackingPlanProtoSerializer serializer = new PackingPlanProtoSerializer();
  PackingPlans.PackingPlan updatedProtoPackingPlan = serializer.toProto(updatedPackingPlan);
  LOG.fine("The updated Packing Plan: " + updatedProtoPackingPlan);

  // update packing plan to trigger the scaling event
  logInfo("Update new PackingPlan: %s",
      stateManager.updatePackingPlan(updatedProtoPackingPlan, topologyName));

  // reactivate topology
  if (initiallyRunning) {
    // wait before reactivating to give the tmaster a chance to receive the packing update and
    // delete the packing plan. Instead we could message tmaster to invalidate the physical plan
    // and/or possibly even update the packing plan directly
    SysUtils.sleep(Duration.ofSeconds(10));
    // Will throw exceptions internally if tmaster fails to reactivate
    reactivateTopology(stateManager, topology, removableContainerCount);
  }

  // Containers are only released once the topology runs on the new plan.
  if (removableContainerCount > 0 && scalableScheduler.isPresent()) {
    scalableScheduler.get().removeContainers(containerDelta.getContainersToRemove());
  }
}
/**
 * Asks the TMaster to move the topology from RUNNING to PAUSED and then, if the topology
 * config sets TOPOLOGY_UPDATE_DEACTIVATE_WAIT_SECS to a positive value, sleeps for that many
 * seconds before returning.
 *
 * @param stateManager state manager handed to the TMaster state transition
 * @param topology the topology to deactivate; its name and config are read
 * @param proposedPackingPlan not used by this method
 * @throws InterruptedException if interrupted while sleeping after the deactivation
 * @throws TMasterException propagated from the TMaster state transition
 */
@VisibleForTesting
void deactivateTopology(SchedulerStateManagerAdaptor stateManager,
    final TopologyAPI.Topology topology,
    PackingPlan proposedPackingPlan)
    throws InterruptedException, TMasterException {
  List<TopologyAPI.Config.KeyValue> kvs = topology.getTopologyConfig().getKvsList();
  long waitSecs =
      TopologyUtils.getConfigWithDefault(kvs, TOPOLOGY_UPDATE_DEACTIVATE_WAIT_SECS, 0L);

  logInfo("Deactivating topology %s before handling update request", topology.getName());

  NetworkUtils.TunnelConfig tunnel =
      NetworkUtils.TunnelConfig.build(config, NetworkUtils.HeronSystem.SCHEDULER);
  TMasterUtils.transitionTopologyState(
      topology.getName(), TMasterUtils.TMasterCommand.DEACTIVATE, stateManager,
      TopologyAPI.TopologyState.RUNNING, TopologyAPI.TopologyState.PAUSED, tunnel);

  if (waitSecs <= 0) {
    // No grace period configured: return right after the state transition.
    logInfo("Deactivated topology %s.", topology.getName());
    return;
  }

  logInfo("Deactivated topology %s. Sleeping for %d seconds before handling update request",
      topology.getName(), waitSecs);
  Thread.sleep(waitSecs * 1000);
}
/**
 * Schedules a periodic task that reactivates the topology once its physical plan is available
 * in the state manager, and blocks until that task finishes or the wait time is exceeded.
 * The wait time comes from TOPOLOGY_UPDATE_REACTIVATE_WAIT_SECS in the topology config and
 * defaults to 10 minutes; the task polls every 10 seconds.
 *
 * @param stateManager state manager polled for the physical plan
 * @param topology the topology to reactivate; its name and config are read
 * @param removableContainerCount number of containers awaiting removal, used in error messages
 * @throws ExecutionException if the reactivation task fails, or if it does not finish within
 * the wait time
 * @throws InterruptedException if interrupted while waiting for the task
 */
@VisibleForTesting
void reactivateTopology(SchedulerStateManagerAdaptor stateManager,
    TopologyAPI.Topology topology,
    int removableContainerCount)
    throws ExecutionException, InterruptedException {
  List<TopologyAPI.Config.KeyValue> topologyConfig = topology.getTopologyConfig().getKvsList();
  long waitSeconds = TopologyUtils.getConfigWithDefault(
      topologyConfig, TOPOLOGY_UPDATE_REACTIVATE_WAIT_SECS, 10 * 60L);
  long delaySeconds = 10;

  logInfo("Waiting for physical plan to be set before re-activating topology %s. "
      + "Will wait up to %s seconds for packing plan to be reset",
      topology.getName(), waitSeconds);

  Enabler enabler = new Enabler(stateManager, topology, waitSeconds, removableContainerCount);
  Future<?> future = this.reactivateExecutorService
      .scheduleWithFixedDelay(enabler, 0, delaySeconds, TimeUnit.SECONDS);
  enabler.setFutureRunnable(future);

  try {
    // A periodic future never completes normally: get() returns control only through
    // cancellation (the task cancels itself when done), failure, timeout or interruption.
    future.get(waitSeconds, TimeUnit.SECONDS);
  } catch (CancellationException e) {
    LOG.fine("Task to re-enable was cancelled.");
  } catch (TimeoutException e) {
    throw new ExecutionException("Timeout waiting for topology to be enabled.", e);
  } finally {
    // On timeout or interruption the periodic task would otherwise stay scheduled and could
    // still reactivate the topology after the caller has been told the update failed.
    // Cancelling an already completed or cancelled future is a no-op.
    future.cancel(true);
  }
}
/**
 * Periodic task that reactivates a topology once its physical plan is available in the state
 * manager. Each run looks up the physical plan; when it is present the TMaster is asked to
 * move the topology from PAUSED to RUNNING and the task cancels itself. When the plan is still
 * missing, the task either waits for its next run or, once the timeout has passed, cancels
 * itself without reactivating.
 *
 * A failed reactivation is reported by throwing from {@link #run()}, which completes the
 * scheduled future exceptionally so that whoever waits on it sees the failure.
 */
private final class Enabler implements Runnable {
  private final SchedulerStateManagerAdaptor stateManager;
  private final String topologyName;
  private final int removableContainerCount;
  // Wall-clock time (millis) after which the task stops waiting for the physical plan.
  private final long timeoutTime;
  // Handle of this task in the executor; set after scheduling, see setFutureRunnable.
  private Future<?> futureRunnable;
  // Remembers a cancel request that arrived before futureRunnable was set.
  private volatile boolean cancelled = false;

  private Enabler(SchedulerStateManagerAdaptor stateManager,
      TopologyAPI.Topology topology,
      long timeoutSeconds,
      int removableContainerCount) {
    this.stateManager = stateManager;
    this.removableContainerCount = removableContainerCount;
    this.topologyName = topology.getName();
    this.timeoutTime = System.currentTimeMillis() + timeoutSeconds * 1000;
  }

  /**
   * Hands the task its own future. The first run may happen before this is called; if that
   * run already asked for cancellation, the request is applied now.
   */
  private synchronized void setFutureRunnable(Future<?> futureRunnable) {
    this.futureRunnable = futureRunnable;
    if (this.cancelled) {
      cancel();
    }
  }

  /** Marks the task as cancelled and cancels its future if that is already known. */
  private void cancel() {
    this.cancelled = true;
    if (this.futureRunnable != null && !this.futureRunnable.isCancelled()) {
      logInfo("Cancelling Topology reactivation task for topology %s", topologyName);
      this.futureRunnable.cancel(true);
    }
  }

  @Override
  public synchronized void run() {
    PhysicalPlans.PhysicalPlan physicalPlan = stateManager.getPhysicalPlan(topologyName);

    if (physicalPlan != null) {
      logInfo("Received physical plan for topology %s. "
          + "Reactivating topology after scaling event", topologyName);
      NetworkUtils.TunnelConfig tunnelConfig =
          NetworkUtils.TunnelConfig.build(config, NetworkUtils.HeronSystem.SCHEDULER);
      try {
        TMasterUtils.transitionTopologyState(
            topologyName, TMasterUtils.TMasterCommand.ACTIVATE, stateManager,
            TopologyAPI.TopologyState.PAUSED, TopologyAPI.TopologyState.RUNNING, tunnelConfig);
      } catch (TMasterException e) {
        // Deliberately NOT cancelling here: cancelling the future before the exception leaves
        // run() makes the executor drop the exception, and the waiter would only observe a
        // cancellation. Throwing from a periodic task already suppresses further runs.
        if (removableContainerCount < 1) {
          throw new TopologyRuntimeManagementException(String.format(
              "Topology reactivation failed for topology %s after topology update",
              topologyName), e);
        } else {
          throw new TopologyRuntimeManagementException(String.format(
              "Topology reactivation failed for topology %s after topology "
              + "update but before releasing %d no longer used containers",
              topologyName, removableContainerCount), e);
        }
      }
      // Reactivated: stop polling, and skip the "plan not found" reporting below.
      cancel();
      return;
    }

    if (System.currentTimeMillis() > this.timeoutTime) {
      LOG.warning(String.format("New physical plan not received within configured timeout for "
          + "topology %s. Not reactivating", topologyName));
      cancel();
    } else {
      logInfo("Couldn't fetch physical plan for topology %s. This is probably because stream "
          + "managers are still registering with TMaster. Will sleep and try again",
          topologyName);
    }
  }
}
/**
 * Reads the packing plan currently stored in the state manager for the given topology.
 *
 * @param stateManager state manager to query
 * @param topologyName name of the topology whose packing plan is wanted
 * @return whatever the state manager returns for that topology
 */
@VisibleForTesting
PackingPlans.PackingPlan getPackingPlan(SchedulerStateManagerAdaptor stateManager,
    String topologyName) {
  PackingPlans.PackingPlan storedPlan = stateManager.getPackingPlan(topologyName);
  return storedPlan;
}
/**
 * Returns the topology. It's key that we get the topology from the physical plan to reflect any
 * state changes since launch. The stateManager.getTopology(name) method returns the topology from
 * the time of submission. See additional commentary in topology.proto and physical_plan.proto.
 *
 * @param stateManager state manager to read the physical plan from
 * @param topologyName name of the topology to look up
 * @return the topology embedded in the physical plan
 * @throws NullPointerException if the state manager has no physical plan for the topology
 */
@VisibleForTesting
TopologyAPI.Topology getTopology(SchedulerStateManagerAdaptor stateManager, String topologyName) {
  PhysicalPlans.PhysicalPlan physicalPlan = stateManager.getPhysicalPlan(topologyName);
  // The state manager can return null when no physical plan is stored; fail with a message
  // that names the topology instead of a bare NullPointerException.
  Preconditions.checkNotNull(physicalPlan,
      "No physical plan found for topology %s, cannot determine its current state",
      topologyName);
  return physicalPlan.getTopology();
}
/**
 * Computes the difference between a current and a proposed set of containers: which containers
 * have to be added and which have to be removed. Containers are matched by id. A proposed
 * container whose id is unknown to the current set is an addition; a current container whose id
 * is missing from the proposed set is a removal.
 *
 * Note that only ids are compared. The InstancePlans inside a container are not looked at, even
 * though they may differ between the current and the proposed plan for the same container id.
 * Changing the size of a container is not supported and will be ignored.
 */
@VisibleForTesting
static class ContainerDelta {
  private final Set<PackingPlan.ContainerPlan> containersToAdd;
  private final Set<PackingPlan.ContainerPlan> containersToRemove;

  @VisibleForTesting
  ContainerDelta(Set<PackingPlan.ContainerPlan> currentContainers,
      Set<PackingPlan.ContainerPlan> proposedContainers) {
    this.containersToAdd = plansWithUnknownIds(proposedContainers, toIdSet(currentContainers));
    this.containersToRemove =
        plansWithUnknownIds(currentContainers, toIdSet(proposedContainers));
  }

  /**
   * Returns, as an unmodifiable set, every plan in candidates whose id is not in knownIds.
   */
  private static Set<PackingPlan.ContainerPlan> plansWithUnknownIds(
      Set<PackingPlan.ContainerPlan> candidates, Set<Integer> knownIds) {
    Set<PackingPlan.ContainerPlan> selected = new HashSet<>();
    for (PackingPlan.ContainerPlan candidate : candidates) {
      if (knownIds.contains(candidate.getId())) {
        continue;
      }
      selected.add(candidate);
    }
    return Collections.unmodifiableSet(selected);
  }

  @VisibleForTesting
  Set<PackingPlan.ContainerPlan> getContainersToRemove() {
    return containersToRemove;
  }

  @VisibleForTesting
  Set<PackingPlan.ContainerPlan> getContainersToAdd() {
    return containersToAdd;
  }
}
/**
 * Collects the ids of the given container plans into a new, mutable set.
 *
 * @param containers container plans to read ids from
 * @return the distinct ids of those plans
 */
private static Set<Integer> toIdSet(Set<PackingPlan.ContainerPlan> containers) {
  Set<Integer> ids = new HashSet<>();
  for (PackingPlan.ContainerPlan plan : containers) {
    ids.add(plan.getId());
  }
  return ids;
}
/**
 * Logs a message at INFO level, built with {@link String#format(String, Object...)}.
 *
 * @param format format string
 * @param values arguments referenced by the format string
 */
private static void logInfo(String format, Object... values) {
  String message = String.format(format, values);
  LOG.info(message);
}
/**
 * Logs a message at FINE level, built with {@link String#format(String, Object...)}. The
 * message is supplied lazily, so nothing is formatted unless FINE logging is enabled.
 *
 * @param format format string
 * @param values arguments referenced by the format string
 */
private static void logFine(final String format, final Object... values) {
  LOG.fine(() -> String.format(format, values));
}
}