blob: 3ea6e5c799bfb55a045bcbc6635b9f532c47d412 [file] [log] [blame]
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.aurora.scheduler.offers;
import java.lang.annotation.Retention;
import java.lang.annotation.Target;
import java.util.List;
import javax.inject.Qualifier;
import javax.inject.Singleton;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
import com.google.common.base.Supplier;
import com.google.common.base.Ticker;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Ordering;
import com.google.inject.AbstractModule;
import com.google.inject.PrivateModule;
import com.google.inject.Provides;
import com.google.inject.TypeLiteral;
import org.apache.aurora.common.quantity.Amount;
import org.apache.aurora.common.quantity.Time;
import org.apache.aurora.common.util.Random;
import org.apache.aurora.scheduler.app.MoreModules;
import org.apache.aurora.scheduler.config.CliOptions;
import org.apache.aurora.scheduler.config.splitters.CommaSplitter;
import org.apache.aurora.scheduler.config.types.TimeAmount;
import org.apache.aurora.scheduler.config.validators.NotNegativeAmount;
import org.apache.aurora.scheduler.config.validators.NotNegativeNumber;
import org.apache.aurora.scheduler.events.PubsubEventModule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.ElementType.PARAMETER;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
/**
 * Binding module for resource offer management.
 *
 * <p>Installs the module named by {@code -offer_set_module}, binds the unavailability threshold,
 * wires {@link OfferManager} to {@link OfferManagerImpl} inside a private module and provides the
 * {@link OfferSettings} singleton. All tunables are read from {@link Options}.
 */
public class OfferManagerModule extends AbstractModule {
  private static final Logger LOG = LoggerFactory.getLogger(OfferManagerModule.class);

  /**
   * Command line options for offer handling. Each field is populated from the flag named in its
   * {@code @Parameter} annotation; the initializer is the value used when the flag is absent.
   */
  @Parameters(separators = "=")
  public static class Options {
    /** When true, no hold-time based deferment is bound and the settings TTL is unbounded. */
    @Parameter(names = "-hold_offers_forever",
        description =
            "Hold resource offers indefinitely, disabling automatic offer decline settings.",
        arity = 1)
    public boolean holdOffersForever = false;

    /** Lower bound of the offer hold window. */
    @Parameter(names = "-min_offer_hold_time",
        validateValueWith = NotNegativeAmount.class,
        description = "Minimum amount of time to hold a resource offer before declining.")
    public TimeAmount minOfferHoldTime = new TimeAmount(5, Time.MINUTES);

    /** Width of the random jitter added on top of {@link #minOfferHoldTime}. */
    @Parameter(names = "-offer_hold_jitter_window",
        validateValueWith = NotNegativeAmount.class,
        description = "Maximum amount of random jitter to add to the offer hold time window.")
    public TimeAmount offerHoldJitterWindow = new TimeAmount(1, Time.MINUTES);

    /** Passed through unchanged to {@link OfferSettings}. */
    @Parameter(names = "-offer_filter_duration",
        description =
            "Duration after which we expect Mesos to re-offer unused resources. A short duration "
                + "improves scheduling performance in smaller clusters, but might lead to resource "
                + "starvation for other frameworks if you run many frameworks in your cluster.")
    public TimeAmount offerFilterDuration = new TimeAmount(5, Time.SECONDS);

    /**
     * Bound under {@link UnavailabilityThreshold}. A value below the maximum offer hold time only
     * triggers a warning at configuration time; it is not rejected.
     */
    @Parameter(names = "-unavailability_threshold",
        description =
            "Threshold time, when running tasks should be drained from a host, before a host "
                + "becomes unavailable. Should be greater than min_offer_hold_time + "
                + "offer_hold_jitter_window.")
    public TimeAmount unavailabilityThreshold = new TimeAmount(6, Time.MINUTES);

    /** Orderings consumed by {@link OfferSetModule} to build the offer iteration order. */
    @Parameter(names = "-offer_order",
        description =
            "Iteration order for offers, to influence task scheduling. Multiple orderings will be "
                + "compounded together. E.g. CPU,MEMORY,RANDOM would sort first by cpus offered,"
                + " then memory and finally would randomize any equal offers.",
        splitter = CommaSplitter.class)
    public List<OfferOrder> offerOrder = ImmutableList.of(OfferOrder.RANDOM);

    /**
     * Module class instantiated and installed by {@link OfferManagerModule#configure()}.
     * NOTE(review): the raw {@code Class} type is kept as-is; presumably the option parser needs
     * it - confirm before tightening the type.
     */
    @Parameter(names = "-offer_set_module",
        description = "Custom Guice module to provide a custom OfferSet.")
    @SuppressWarnings("rawtypes")
    public Class offerSetModule = OfferSetModule.class;

    /** Size limit handed to {@link OfferSettings}; the default imposes no practical limit. */
    @Parameter(names = "-offer_static_ban_cache_max_size",
        validateValueWith = NotNegativeNumber.class,
        description =
            "The number of offers to hold in the static ban cache. If no value is specified, "
                + "the cache will grow indefinitely. However, entries will expire within "
                + "'min_offer_hold_time' + 'offer_hold_jitter_window' of being written.")
    public long offerStaticBanCacheMaxSize = Long.MAX_VALUE;
  }

  /**
   * Binding annotation for the threshold to veto tasks with unavailability.
   */
  @Qualifier
  @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
  public @interface UnavailabilityThreshold { }

  /**
   * Default value of {@code -offer_set_module}: binds {@link OfferSet} to {@link OfferSetImpl},
   * ordered according to {@code -offer_order}.
   */
  public static class OfferSetModule extends AbstractModule {
    private final CliOptions options;

    public OfferSetModule(CliOptions options) {
      this.options = options;
    }

    @Override
    protected void configure() {
      // Only OfferSet is exposed; the ordering and the implementation binding stay private.
      install(new PrivateModule() {
        @Override
        protected void configure() {
          bind(new TypeLiteral<Ordering<HostOffer>>() { })
              .toInstance(OfferOrderBuilder.create(options.offer.offerOrder));
          bind(OfferSetImpl.class).in(Singleton.class);
          bind(OfferSet.class).to(OfferSetImpl.class);
          expose(OfferSet.class);
        }
      });
    }
  }

  private final CliOptions cliOptions;

  /**
   * Creates the module.
   *
   * @param cliOptions Parsed command line options; {@code cliOptions.offer} supplies the offer
   *     settings and the whole object is handed to the configured offer set module.
   */
  public OfferManagerModule(CliOptions cliOptions) {
    this.cliOptions = cliOptions;
  }

  /**
   * Longest time an offer may be held: minimum hold time plus the full jitter window.
   *
   * @param options Offer options to read both durations from.
   * @param unit Unit both durations are converted to before being added.
   * @return The sum expressed in {@code unit}.
   */
  private static long maxOfferHold(Options options, Time unit) {
    return options.minOfferHoldTime.as(unit) + options.offerHoldJitterWindow.as(unit);
  }

  @Override
  protected void configure() {
    Options options = cliOptions.offer;

    if (!options.holdOffersForever) {
      // Compare in milliseconds, the same unit the jitter delay below is built from. Converting
      // each duration to whole seconds first can drop a sub-second part of the hold window and
      // silently skip the warning for a threshold that is in fact too small.
      long maxOfferHoldMs = maxOfferHold(options, Time.MILLISECONDS);
      if (options.unavailabilityThreshold.as(Time.MILLISECONDS) < maxOfferHoldMs) {
        LOG.warn("unavailability_threshold ({}) is less than the sum of min_offer_hold_time ({})"
                + " and offer_hold_jitter_window ({}). This creates risks of races between "
                + "launching and draining",
            options.unavailabilityThreshold,
            options.minOfferHoldTime,
            options.offerHoldJitterWindow);
      }
    }

    // The offer set implementation is pluggable via -offer_set_module.
    install(MoreModules.instantiate(options.offerSetModule, cliOptions));

    bind(new TypeLiteral<Amount<Long, Time>>() { })
        .annotatedWith(UnavailabilityThreshold.class)
        .toInstance(options.unavailabilityThreshold);

    // Keep the deferment strategy and OfferManagerImpl private; only OfferManager is exposed.
    install(new PrivateModule() {
      @Override
      protected void configure() {
        if (options.holdOffersForever) {
          bind(Deferment.class).to(Deferment.Noop.class);
        } else {
          bind(new TypeLiteral<Supplier<Amount<Long, Time>>>() { }).toInstance(
              new RandomJitterReturnDelay(
                  options.minOfferHoldTime.as(Time.MILLISECONDS),
                  options.offerHoldJitterWindow.as(Time.MILLISECONDS),
                  Random.Util.newDefaultRandom()));
          bind(Deferment.class).to(Deferment.DelayedDeferment.class);
        }

        bind(OfferManager.class).to(OfferManagerImpl.class);
        bind(OfferManagerImpl.class).in(Singleton.class);
        expose(OfferManager.class);
      }
    });

    PubsubEventModule.bindSubscriber(binder(), OfferManager.class);
  }

  /**
   * Builds the shared {@link OfferSettings} from the command line options.
   *
   * @param offerSet Offer set produced by the configured offer set module.
   * @return Settings carrying the filter duration, offer set, maximum hold time in seconds,
   *     static ban cache size limit and the system ticker.
   */
  @Provides
  @Singleton
  OfferSettings provideOfferSettings(OfferSet offerSet) {
    Options options = cliOptions.offer;

    // The static ban cache in OfferManager is evicted both by maximum size and by how long an
    // offer stays valid, to suit single- and multi-framework environments alike. With a finite
    // hold time, entries can expire after the maximum hold time, the longest an offer is valid;
    // operators then rarely need to care about cache size, which mirrors prior behavior. When
    // offers are held forever, entries never expire and the operator-supplied maximum size is
    // what prevents a memory leak.
    // The finite case is deliberately still computed in whole seconds, exactly as before.
    long maxOfferHoldSecs = options.holdOffersForever
        ? Long.MAX_VALUE
        : maxOfferHold(options, Time.SECONDS);

    return new OfferSettings(
        options.offerFilterDuration,
        offerSet,
        Amount.of(maxOfferHoldSecs, Time.SECONDS),
        options.offerStaticBanCacheMaxSize,
        Ticker.systemTicker());
  }
}