blob: bdc17b783b4be6a0559c9a37198a1de66b420953 [file] [log] [blame]
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.aurora.scheduler.updater;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
import javax.inject.Inject;
import javax.inject.Singleton;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
import com.google.common.annotations.VisibleForTesting;
import com.google.inject.AbstractModule;
import com.google.inject.PrivateModule;
import com.google.inject.TypeLiteral;
import org.apache.aurora.common.quantity.Time;
import org.apache.aurora.common.stats.StatsProvider;
import org.apache.aurora.common.util.BackoffStrategy;
import org.apache.aurora.common.util.TruncatedBinaryBackoff;
import org.apache.aurora.scheduler.BatchWorker;
import org.apache.aurora.scheduler.BatchWorker.NoResult;
import org.apache.aurora.scheduler.SchedulerServicesModule;
import org.apache.aurora.scheduler.base.AsyncUtil;
import org.apache.aurora.scheduler.base.TaskGroupKey;
import org.apache.aurora.scheduler.config.CliOptions;
import org.apache.aurora.scheduler.config.types.TimeAmount;
import org.apache.aurora.scheduler.events.PubsubEventModule;
import org.apache.aurora.scheduler.preemptor.BiCache;
import org.apache.aurora.scheduler.preemptor.BiCache.BiCacheSettings;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.entities.IInstanceKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Binding module for scheduling logic and higher-level state management.
*/
/**
 * Binding module for the job update subsystem.
 *
 * <p>Installs a private module that wires the update controller, update factory, SLA kill
 * controller and agent reserver, and exposes only {@link UpdateAgentReserver},
 * {@link JobUpdateController} and {@link JobUpdateEventSubscriber} to the enclosing injector.
 * It also binds {@link UpdateActionBatchWorker} and registers it, together with
 * {@link JobUpdateEventSubscriber}, through
 * {@link SchedulerServicesModule#addSchedulerActiveServiceBinding}.
 */
public class UpdaterModule extends AbstractModule {
  private static final Logger LOG = LoggerFactory.getLogger(UpdaterModule.class);

  /**
   * Command line options consumed by the updater bindings.
   *
   * <p>Fields are public and mutable because they are populated by the command line parser
   * through the {@link Parameter} annotations.
   */
  @Parameters(separators = "=")
  public static class Options {
    @Parameter(names = "-enable_update_affinity",
        description = "Enable best-effort affinity of task updates.",
        arity = 1)
    public boolean enableAffinity = false;

    @Parameter(names = "-update_affinity_reservation_hold_time",
        description = "How long to wait for a reserved agent to reoffer freed up resources.")
    public TimeAmount affinityExpiration = new TimeAmount(3L, Time.MINUTES);

    @Parameter(names = "-sla_aware_action_max_batch_size",
        description = "The maximum number of sla aware update actions that can be processed"
            + " in a batch.")
    public int slaAwareActionMaxBatchSize = 300;

    @Parameter(names = "-sla_aware_kill_retry_min_delay",
        description = "Minimum amount of time to wait between attempting to perform an SLA-Aware"
            + " kill on a task.")
    public TimeAmount slaAwareKillRetryMinDelay = new TimeAmount(1L, Time.MINUTES);

    @Parameter(names = "-sla_aware_kill_retry_max_delay",
        description = "Maximum amount of time to wait between attempting to perform an SLA-Aware"
            + " kill on a task.")
    public TimeAmount slaAwareKillRetryMaxDelay = new TimeAmount(5L, Time.MINUTES);
  }

  private final ScheduledExecutorService executor;
  private final Optional<UpdateActionBatchWorker> batchWorker;
  private final Options options;

  /**
   * Creates a module that uses an executor obtained from
   * {@link AsyncUtil#singleThreadLoggingScheduledExecutor} and lets the injector construct the
   * {@link UpdateActionBatchWorker}.
   *
   * @param options Updater command line options; must not be {@code null}.
   * @throws NullPointerException if {@code options} is {@code null}.
   */
  public UpdaterModule(Options options) {
    this(
        AsyncUtil.singleThreadLoggingScheduledExecutor("updater-%d", LOG),
        Optional.empty(),
        options);
  }

  /**
   * Creates a module with an explicit executor and, optionally, a pre-built batch worker.
   *
   * @param executor Executor bound as {@link ScheduledExecutorService} inside the private
   *     updater module.
   * @param batchWorker When present, this instance is bound as the
   *     {@link UpdateActionBatchWorker}; when empty, the injector creates a singleton.
   * @param options Updater command line options.
   * @throws NullPointerException if any argument is {@code null}.
   */
  @VisibleForTesting
  UpdaterModule(ScheduledExecutorService executor,
      Optional<UpdateActionBatchWorker> batchWorker,
      Options options) {
    // Fail fast on every argument: a null batchWorker or options would otherwise only surface
    // as an unexplained NullPointerException later, inside configure().
    this.executor = Objects.requireNonNull(executor, "executor");
    this.batchWorker = Objects.requireNonNull(batchWorker, "batchWorker");
    this.options = Objects.requireNonNull(options, "options");
  }

  @Override
  protected void configure() {
    // Bindings inside the private module are visible to the rest of the injector only when
    // explicitly exposed.
    install(new PrivateModule() {
      @Override
      protected void configure() {
        if (options.enableAffinity) {
          // Affinity enabled: bind the cache settings built from the configured hold time,
          // the cache itself, and the reserver implementation.
          bind(BiCacheSettings.class).toInstance(
              new BiCacheSettings(options.affinityExpiration, "update_affinity"));
          bind(new TypeLiteral<BiCache<IInstanceKey, TaskGroupKey>>() { }).in(Singleton.class);
          bind(UpdateAgentReserver.class).to(UpdateAgentReserver.UpdateAgentReserverImpl.class);
          bind(UpdateAgentReserver.UpdateAgentReserverImpl.class).in(Singleton.class);
        } else {
          // Affinity disabled: use NullAgentReserver (presumably a no-op implementation;
          // confirm against UpdateAgentReserver).
          bind(UpdateAgentReserver.class).to(UpdateAgentReserver.NullAgentReserver.class);
          bind(UpdateAgentReserver.NullAgentReserver.class).in(Singleton.class);
        }
        expose(UpdateAgentReserver.class);

        // Backoff strategy built from the configured min/max SLA-aware kill retry delays.
        bind(BackoffStrategy.class).toInstance(new TruncatedBinaryBackoff(
            options.slaAwareKillRetryMinDelay,
            options.slaAwareKillRetryMaxDelay));
        bind(SlaKillController.class).in(Singleton.class);

        bind(ScheduledExecutorService.class).toInstance(executor);

        bind(UpdateFactory.class).to(UpdateFactory.UpdateFactoryImpl.class);
        bind(UpdateFactory.UpdateFactoryImpl.class).in(Singleton.class);

        bind(JobUpdateController.class).to(JobUpdateControllerImpl.class);
        bind(JobUpdateControllerImpl.class).in(Singleton.class);
        expose(JobUpdateController.class);

        bind(JobUpdateEventSubscriber.class);
        expose(JobUpdateEventSubscriber.class);
      }
    });

    // Use the supplied batch worker when one was given (the test constructor); otherwise let
    // the injector build a singleton.
    if (batchWorker.isPresent()) {
      bind(UpdateActionBatchWorker.class).toInstance(batchWorker.get());
    } else {
      bind(UpdateActionBatchWorker.class).in(Singleton.class);
    }

    SchedulerServicesModule.addSchedulerActiveServiceBinding(binder())
        .to(UpdateActionBatchWorker.class);

    PubsubEventModule.bindSubscriber(binder(), JobUpdateEventSubscriber.class);
    SchedulerServicesModule.addSchedulerActiveServiceBinding(binder())
        .to(JobUpdateEventSubscriber.class);
  }

  /**
   * {@link BatchWorker} used for update actions. It passes the value of the
   * {@code -sla_aware_action_max_batch_size} option to the {@link BatchWorker} constructor.
   */
  public static class UpdateActionBatchWorker extends BatchWorker<NoResult> {
    @Inject
    UpdateActionBatchWorker(CliOptions options, Storage storage, StatsProvider statsProvider) {
      super(storage, statsProvider, options.updater.slaAwareActionMaxBatchSize);
    }

    @Override
    protected String serviceName() {
      return "UpdateActionBatchWorker";
    }
  }
}