/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
 */

package org.apache.druid.indexing.overlord.autoscaling;

import com.google.common.base.Joiner;
import com.google.common.base.Predicate;
import com.google.common.base.Supplier;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
import org.apache.druid.indexing.overlord.WorkerTaskRunner;
import org.apache.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig;
import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import org.apache.druid.indexing.worker.Worker;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;

/**
 * Provisioning strategy that adjusts the target worker count one worker at a time, within the autoscaler's min/max
 * bounds: it scales up when tasks have been pending longer than the configured timeout and scales down when workers
 * become lazy (idle).
 */
public class SimpleWorkerProvisioningStrategy extends AbstractWorkerProvisioningStrategy
{
private static final EmittingLogger log = new EmittingLogger(SimpleWorkerProvisioningStrategy.class);
private final SimpleWorkerProvisioningConfig config;
private final Supplier<WorkerBehaviorConfig> workerConfigRef;
@Inject
public SimpleWorkerProvisioningStrategy(
SimpleWorkerProvisioningConfig config,
Supplier<WorkerBehaviorConfig> workerConfigRef,
ProvisioningSchedulerConfig provisioningSchedulerConfig
)
{
    this(
        config,
        workerConfigRef,
        provisioningSchedulerConfig,
        () -> ScheduledExecutors.fixed(1, "SimpleResourceManagement-manager--%d")
    );
}
public SimpleWorkerProvisioningStrategy(
SimpleWorkerProvisioningConfig config,
Supplier<WorkerBehaviorConfig> workerConfigRef,
ProvisioningSchedulerConfig provisioningSchedulerConfig,
Supplier<ScheduledExecutorService> execFactory
)
{
super(provisioningSchedulerConfig, execFactory);
this.config = config;
this.workerConfigRef = workerConfigRef;
}
@Override
public Provisioner makeProvisioner(WorkerTaskRunner runner)
{
return new SimpleProvisioner(runner);
}
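
  /**
   * Tracks the target worker count along with the autoscaler node ids that are currently being provisioned or
   * terminated, and emits alerts when either operation exceeds the configured max scaling duration.
   */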
private class SimpleProvisioner implements Provisioner
{
private final WorkerTaskRunner runner;
private final ScalingStats scalingStats = new ScalingStats(config.getNumEventsToTrack());
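    // Autoscaler node ids we have asked to provision but that have not yet shown up as workers.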
private final Set<String> currentlyProvisioning = new HashSet<>();
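    // Autoscaler node ids we have asked to terminate but that still appear as lazy workers.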
private final Set<String> currentlyTerminating = new HashSet<>();
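    // Desired number of workers; -1 until initialized from the current pool size in updateTargetWorkerCount().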
private int targetWorkerCount = -1;
private DateTime lastProvisionTime = DateTimes.nowUtc();
private DateTime lastTerminateTime = lastProvisionTime;
SimpleProvisioner(WorkerTaskRunner runner)
{
this.runner = runner;
}
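
    /**
     * Provision workers through the autoscaler until the target count is covered by valid and in-flight workers.
     * Provision requests that stay outstanding longer than the configured max scaling duration trigger an alert and
     * are terminated.
     */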
@Override
public synchronized boolean doProvision()
{
Collection<? extends TaskRunnerWorkItem> pendingTasks = runner.getPendingTasks();
Collection<ImmutableWorkerInfo> workers = runner.getWorkers();
boolean didProvision = false;
final DefaultWorkerBehaviorConfig workerConfig =
PendingTaskBasedWorkerProvisioningStrategy.getDefaultWorkerBehaviorConfig(workerConfigRef, "provision", log);
if (workerConfig == null) {
return false;
}
final Predicate<ImmutableWorkerInfo> isValidWorker = ProvisioningUtil.createValidWorkerPredicate(config);
final int currValidWorkers = Collections2.filter(workers, isValidWorker).size();
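      // Map the known workers' IPs back to autoscaler node ids; any id that now appears is no longer "provisioning".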
      final List<String> workerNodeIds = workerConfig.getAutoScaler().ipToIdLookup(
          Lists.newArrayList(
              Iterables.transform(workers, worker -> worker.getWorker().getIp())
          )
      );
currentlyProvisioning.removeAll(workerNodeIds);
updateTargetWorkerCount(workerConfig, pendingTasks, workers);
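      // Ask the autoscaler for more nodes, one request at a time, until the shortfall is covered.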
int want = targetWorkerCount - (currValidWorkers + currentlyProvisioning.size());
while (want > 0) {
final AutoScalingData provisioned = workerConfig.getAutoScaler().provision();
final List<String> newNodes;
if (provisioned == null || (newNodes = provisioned.getNodeIds()).isEmpty()) {
log.warn("NewNodes is empty, returning from provision loop");
break;
} else {
currentlyProvisioning.addAll(newNodes);
lastProvisionTime = DateTimes.nowUtc();
scalingStats.addProvisionEvent(provisioned);
want -= provisioned.getNodeIds().size();
didProvision = true;
}
}
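      // If provisioning has been outstanding for too long, alert, terminate the stuck nodes and stop tracking them.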
if (!currentlyProvisioning.isEmpty()) {
Duration durSinceLastProvision = new Duration(lastProvisionTime, DateTimes.nowUtc());
log.info("%s provisioning. Current wait time: %s", currentlyProvisioning, durSinceLastProvision);
if (durSinceLastProvision.isLongerThan(config.getMaxScalingDuration().toStandardDuration())) {
log.makeAlert("Worker node provisioning taking too long!")
.addData("millisSinceLastProvision", durSinceLastProvision.getMillis())
.addData("provisioningCount", currentlyProvisioning.size())
.emit();
workerConfig.getAutoScaler().terminateWithIds(Lists.newArrayList(currentlyProvisioning));
currentlyProvisioning.clear();
}
}
return didProvision;
}
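
    /**
     * Terminate excess workers by marking them lazy in the runner and shutting them down through the autoscaler.
     * Termination requests that stay outstanding longer than the configured max scaling duration trigger an alert and
     * are forgotten.
     */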
@Override
public synchronized boolean doTerminate()
{
Collection<? extends TaskRunnerWorkItem> pendingTasks = runner.getPendingTasks();
final DefaultWorkerBehaviorConfig workerConfig =
PendingTaskBasedWorkerProvisioningStrategy.getDefaultWorkerBehaviorConfig(workerConfigRef, "terminate", log);
if (workerConfig == null) {
return false;
}
boolean didTerminate = false;
      final Set<String> workerNodeIds = Sets.newHashSet(
          workerConfig.getAutoScaler().ipToIdLookup(
              Lists.newArrayList(
                  Iterables.transform(runner.getLazyWorkers(), Worker::getIp)
              )
          )
      );
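      // Drop node ids that no longer show up as lazy workers; those terminations have presumably completed.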
currentlyTerminating.retainAll(workerNodeIds);
Collection<ImmutableWorkerInfo> workers = runner.getWorkers();
updateTargetWorkerCount(workerConfig, pendingTasks, workers);
if (currentlyTerminating.isEmpty()) {
final int excessWorkers = (workers.size() + currentlyProvisioning.size()) - targetWorkerCount;
if (excessWorkers > 0) {
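          // Ask the runner to mark up to excessWorkers workers lazy, then terminate them by IP through the autoscaler.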
final Predicate<ImmutableWorkerInfo> isLazyWorker = ProvisioningUtil.createLazyWorkerPredicate(config);
          final Collection<String> laziestWorkerIps =
              Collections2.transform(runner.markWorkersLazy(isLazyWorker, excessWorkers), Worker::getIp);
if (laziestWorkerIps.isEmpty()) {
log.info("Wanted to terminate %,d workers, but couldn't find any lazy ones!", excessWorkers);
} else {
log.info(
"Terminating %,d workers (wanted %,d): %s",
laziestWorkerIps.size(),
excessWorkers,
Joiner.on(", ").join(laziestWorkerIps)
);
final AutoScalingData terminated = workerConfig.getAutoScaler()
.terminate(ImmutableList.copyOf(laziestWorkerIps));
if (terminated != null) {
currentlyTerminating.addAll(terminated.getNodeIds());
lastTerminateTime = DateTimes.nowUtc();
scalingStats.addTerminateEvent(terminated);
didTerminate = true;
}
}
}
} else {
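        // Termination is still in flight; alert and reset the bookkeeping if it has taken too long.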
Duration durSinceLastTerminate = new Duration(lastTerminateTime, DateTimes.nowUtc());
log.info("%s terminating. Current wait time: %s", currentlyTerminating, durSinceLastTerminate);
if (durSinceLastTerminate.isLongerThan(config.getMaxScalingDuration().toStandardDuration())) {
log.makeAlert("Worker node termination taking too long!")
.addData("millisSinceLastTerminate", durSinceLastTerminate.getMillis())
.addData("terminatingCount", currentlyTerminating.size())
.emit();
currentlyTerminating.clear();
}
}
return didTerminate;
}
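
    /**
     * Recompute the target worker count: initialize it from the current pool size on first use, then move it up or
     * down by one worker at a time, but only while no provisioning or termination is in flight.
     */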
private void updateTargetWorkerCount(
final DefaultWorkerBehaviorConfig workerConfig,
final Collection<? extends TaskRunnerWorkItem> pendingTasks,
final Collection<ImmutableWorkerInfo> zkWorkers
)
{
final Collection<ImmutableWorkerInfo> validWorkers = Collections2.filter(
zkWorkers,
ProvisioningUtil.createValidWorkerPredicate(config)
);
final Predicate<ImmutableWorkerInfo> isLazyWorker = ProvisioningUtil.createLazyWorkerPredicate(config);
final int minWorkerCount = workerConfig.getAutoScaler().getMinNumWorkers();
final int maxWorkerCount = workerConfig.getAutoScaler().getMaxNumWorkers();
if (minWorkerCount > maxWorkerCount) {
log.error("Huh? minWorkerCount[%d] > maxWorkerCount[%d]. I give up!", minWorkerCount, maxWorkerCount);
return;
}
if (targetWorkerCount < 0) {
// Initialize to size of current worker pool, subject to pool size limits
targetWorkerCount = Math.max(
Math.min(
zkWorkers.size(),
maxWorkerCount
),
minWorkerCount
);
log.info(
"Starting with a target of %,d workers (current = %,d, min = %,d, max = %,d).",
targetWorkerCount,
validWorkers.size(),
minWorkerCount,
maxWorkerCount
);
}
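      // Adjust the target only when nothing is currently provisioning or terminating, one worker at a time.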
final boolean notTakingActions = currentlyProvisioning.isEmpty()
&& currentlyTerminating.isEmpty();
final boolean shouldScaleUp = notTakingActions
&& validWorkers.size() >= targetWorkerCount
&& targetWorkerCount < maxWorkerCount
&& (hasTaskPendingBeyondThreshold(pendingTasks)
|| targetWorkerCount < minWorkerCount);
final boolean shouldScaleDown = notTakingActions
&& validWorkers.size() == targetWorkerCount
&& targetWorkerCount > minWorkerCount
&& Iterables.any(validWorkers, isLazyWorker);
if (shouldScaleUp) {
targetWorkerCount = Math.max(targetWorkerCount + 1, minWorkerCount);
log.info(
"I think we should scale up to %,d workers (current = %,d, min = %,d, max = %,d).",
targetWorkerCount,
validWorkers.size(),
minWorkerCount,
maxWorkerCount
);
} else if (shouldScaleDown) {
targetWorkerCount = Math.min(targetWorkerCount - 1, maxWorkerCount);
log.info(
"I think we should scale down to %,d workers (current = %,d, min = %,d, max = %,d).",
targetWorkerCount,
validWorkers.size(),
minWorkerCount,
maxWorkerCount
);
} else {
log.info(
"Our target is %,d workers, and I'm okay with that (current = %,d, min = %,d, max = %,d).",
targetWorkerCount,
validWorkers.size(),
minWorkerCount,
maxWorkerCount
);
}
}
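
    /**
     * Whether any pending task has been waiting in the queue for at least the configured pending task timeout.
     */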
private boolean hasTaskPendingBeyondThreshold(Collection<? extends TaskRunnerWorkItem> pendingTasks)
{
long now = System.currentTimeMillis();
for (TaskRunnerWorkItem pendingTask : pendingTasks) {
final Duration durationSinceInsertion = new Duration(pendingTask.getQueueInsertionTime().getMillis(), now);
final Duration timeoutDuration = config.getPendingTaskTimeout().toStandardDuration();
if (durationSinceInsertion.isEqual(timeoutDuration) || durationSinceInsertion.isLongerThan(timeoutDuration)) {
return true;
}
}
return false;
}
@Override
public ScalingStats getStats()
{
return scalingStats;
}
}
}