blob: e0f870295a34bd9a80fe94f16c18b2c004ab3d3d [file] [log] [blame]
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.aurora.scheduler.configuration;
import java.util.List;
import java.util.Optional;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.inject.Inject;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import org.apache.aurora.gen.Container;
import org.apache.aurora.gen.CountSlaPolicy;
import org.apache.aurora.gen.DockerParameter;
import org.apache.aurora.gen.JobConfiguration;
import org.apache.aurora.gen.PercentageSlaPolicy;
import org.apache.aurora.gen.SlaPolicy;
import org.apache.aurora.gen.TaskConfig;
import org.apache.aurora.gen.TaskConstraint;
import org.apache.aurora.scheduler.TierManager;
import org.apache.aurora.scheduler.base.JobKeys;
import org.apache.aurora.scheduler.base.UserProvidedStrings;
import org.apache.aurora.scheduler.configuration.executor.ExecutorSettings;
import org.apache.aurora.scheduler.resources.ResourceManager;
import org.apache.aurora.scheduler.resources.ResourceType;
import org.apache.aurora.scheduler.storage.durability.ThriftBackfill;
import org.apache.aurora.scheduler.storage.entities.IConstraint;
import org.apache.aurora.scheduler.storage.entities.IContainer;
import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
import org.apache.aurora.scheduler.storage.entities.IMesosContainer;
import org.apache.aurora.scheduler.storage.entities.IResource;
import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
import org.apache.aurora.scheduler.storage.entities.ITaskConstraint;
import org.apache.aurora.scheduler.storage.entities.IValueConstraint;
import static java.util.Objects.requireNonNull;
import static org.apache.aurora.scheduler.resources.ResourceType.CPUS;
import static org.apache.aurora.scheduler.resources.ResourceType.DISK_MB;
import static org.apache.aurora.scheduler.resources.ResourceType.GPUS;
import static org.apache.aurora.scheduler.resources.ResourceType.PORTS;
import static org.apache.aurora.scheduler.resources.ResourceType.RAM_MB;
/**
 * Validates job and task configurations and populates defaults for optional values.
 *
 * <p>Validation covers job keys, environments, owners, tiers, executors, dedicated constraints,
 * container types, resources, Mesos fetcher URIs, container volumes and SLA policies. Defaults are
 * applied for the tier, the production flag, Docker parameters and task links.
 *
 * TODO(William Farner): Add input validation to all fields (strings not empty, positive ints, etc).
 */
public class ConfigurationManager {
  /** Constraint name that restricts a task to hosts dedicated to a role. */
  public static final String DEDICATED_ATTRIBUTE = "dedicated";

  /** Default pattern that a job key's environment must fully match. */
  public static final String DEFAULT_ALLOWED_JOB_ENVIRONMENTS = "^(prod|devel|test|staging\\d*)$";

  /** A single-value validation rule. */
  private interface Validator<T> {
    void validate(T value) throws TaskDescriptionException;
  }

  /** Requires a numeric value to be strictly greater than a lower bound. */
  private static class GreaterThan implements Validator<Number> {
    private final double min;
    private final String label;

    GreaterThan(double min, String label) {
      this.min = min;
      this.label = label;
    }

    @Override
    public void validate(Number value) throws TaskDescriptionException {
      // Phrased as !(value > min) rather than (min >= value) so that NaN, which compares false
      // against everything, is rejected instead of silently passing.
      if (!(value.doubleValue() > this.min)) {
        throw new TaskDescriptionException(label + " must be greater than " + this.min);
      }
    }
  }

  /** Cluster-wide settings that control which task configurations are accepted. */
  public static class ConfigurationManagerSettings {
    private final ImmutableSet<Container._Fields> allowedContainerTypes;
    private final boolean allowDockerParameters;
    private final List<DockerParameter> defaultDockerParameters;
    private final boolean requireDockerUseExecutor;
    private final boolean allowGpuResource;
    private final boolean enableMesosFetcher;
    private final boolean allowContainerVolumes;
    private final Pattern allowedJobEnvironments;
    private final int minRequiredInstances;
    private final long maxSlaDurationSecs;

    /**
     * @param allowedContainerTypes Container types tasks may use.
     * @param allowDockerParameters Whether Docker tasks may specify their own parameters.
     * @param defaultDockerParameters Parameters applied to Docker tasks that specify none.
     * @param requireDockerUseExecutor Whether Docker tasks must also carry an executor config.
     * @param allowGpuResource Whether tasks may request GPU resources.
     * @param enableMesosFetcher Whether tasks may specify Mesos fetcher URIs.
     * @param allowContainerVolumes Whether Mesos-container tasks may declare volumes.
     * @param minRequiredInstances Minimum instance count for a job to use a non-coordinator
     *     (count/percentage) SLA policy.
     * @param maxSlaDurationSecs Upper bound for an SLA policy's {@code durationSecs}.
     * @param allowedJobEnvironment Regular expression that job environments must fully match.
     */
    public ConfigurationManagerSettings(
        ImmutableSet<Container._Fields> allowedContainerTypes,
        boolean allowDockerParameters,
        List<DockerParameter> defaultDockerParameters,
        boolean requireDockerUseExecutor,
        boolean allowGpuResource,
        boolean enableMesosFetcher,
        boolean allowContainerVolumes,
        int minRequiredInstances,
        long maxSlaDurationSecs,
        String allowedJobEnvironment) {

      this.allowedContainerTypes = requireNonNull(allowedContainerTypes);
      this.allowDockerParameters = allowDockerParameters;
      this.defaultDockerParameters = requireNonNull(defaultDockerParameters);
      this.requireDockerUseExecutor = requireDockerUseExecutor;
      this.allowGpuResource = allowGpuResource;
      this.enableMesosFetcher = enableMesosFetcher;
      this.allowContainerVolumes = allowContainerVolumes;
      this.allowedJobEnvironments = Pattern.compile(requireNonNull(allowedJobEnvironment));
      this.minRequiredInstances = minRequiredInstances;
      this.maxSlaDurationSecs = maxSlaDurationSecs;
    }
  }

  private final ConfigurationManagerSettings settings;
  private final TierManager tierManager;
  private final ThriftBackfill thriftBackfill;
  private final ExecutorSettings executorSettings;

  @Inject
  public ConfigurationManager(
      ConfigurationManagerSettings settings,
      TierManager tierManager,
      ThriftBackfill thriftBackfill,
      ExecutorSettings executorSettings) {

    this.settings = requireNonNull(settings);
    this.tierManager = requireNonNull(tierManager);
    this.thriftBackfill = requireNonNull(thriftBackfill);
    this.executorSettings = requireNonNull(executorSettings);
  }

  /** Returns the part of the constraint's only value before the first '/'. */
  private static String getRole(IValueConstraint constraint) {
    return Iterables.getOnlyElement(constraint.getValues()).split("/")[0];
  }

  private static boolean isValueConstraint(ITaskConstraint taskConstraint) {
    return taskConstraint.getSetField() == TaskConstraint._Fields.VALUE;
  }

  /**
   * Checks whether any of the given constraints is a dedicated constraint.
   *
   * @param taskConstraints Constraints to inspect.
   * @return {@code true} if a constraint named {@link #DEDICATED_ATTRIBUTE} is present.
   */
  public static boolean isDedicated(Iterable<IConstraint> taskConstraints) {
    return Iterables.any(taskConstraints, getConstraintByName(DEDICATED_ATTRIBUTE));
  }

  /** Returns the task's dedicated constraint, or {@code null} if it has none. */
  @Nullable
  private static IConstraint getDedicatedConstraint(ITaskConfig task) {
    return Iterables.find(task.getConstraints(), getConstraintByName(DEDICATED_ATTRIBUTE), null);
  }

  /**
   * Check validity of and populates defaults in a job configuration. This will return a deep copy
   * of the provided job configuration with default configuration values applied, and configuration
   * map values sanitized and applied to their respective struct fields.
   *
   * @param job Job to validate and populate.
   * @return A deep copy of {@code job} that has been populated.
   * @throws TaskDescriptionException If the job configuration is invalid.
   */
  public IJobConfiguration validateAndPopulate(IJobConfiguration job)
      throws TaskDescriptionException {

    requireNonNull(job);

    if (!job.isSetTaskConfig()) {
      throw new TaskDescriptionException("Job configuration must have taskConfig set.");
    }

    if (job.getInstanceCount() <= 0) {
      throw new TaskDescriptionException("Instance count must be positive.");
    }

    JobConfiguration builder = job.newBuilder();

    if (!JobKeys.isValid(job.getKey())) {
      throw new TaskDescriptionException("Job key " + job.getKey() + " is invalid.");
    }

    if (!settings.allowedJobEnvironments.matcher(job.getKey().getEnvironment()).matches()) {
      throw new TaskDescriptionException(String.format(
          "Job environment %s doesn't match: %s", job.getKey().getEnvironment(),
          settings.allowedJobEnvironments.toString()));
    }

    if (job.isSetOwner() && !UserProvidedStrings.isGoodIdentifier(job.getOwner().getUser())) {
      throw new TaskDescriptionException(
          "Job user contains illegal characters: " + job.getOwner().getUser());
    }

    builder.setTaskConfig(
        validateAndPopulate(
            ITaskConfig.build(builder.getTaskConfig()),
            builder.getInstanceCount()).newBuilder());

    // Only one of [service=true, cron_schedule] may be set.
    if (!Strings.isNullOrEmpty(job.getCronSchedule()) && builder.getTaskConfig().isIsService()) {
      throw new TaskDescriptionException(
          "A service task may not be run on a cron schedule: " + builder);
    }

    return IJobConfiguration.build(builder);
  }

  /**
   * Validates the {@link TaskConfig}'s {@link SlaPolicy}, if any.
   *
   * A valid {@link SlaPolicy} is one that allows at least 1 instance to be drained for a job.
   *
   * @param config {@link TaskConfig} to be validated.
   * @param instanceCount number of instances in the job.
   * @throws TaskDescriptionException thrown when {@link SlaPolicy} is not valid.
   */
  private void validateSlaPolicy(
      TaskConfig config,
      int instanceCount) throws TaskDescriptionException {

    if (config.isSetSlaPolicy()) {
      // Build the immutable view once and reuse it for both tier lookups.
      ITaskConfig task = ITaskConfig.build(config);
      if (tierManager.getTier(task).isRevocable()
          || tierManager.getTier(task).isPreemptible()) {
        throw new TaskDescriptionException(String.format(
            "Tier '%s' does not support SlaPolicy.",
            config.getTier()));
      }

      SlaPolicy slaPolicy = config.getSlaPolicy();
      // Only coordinator policies are exempt from the minimum instance count.
      if (!slaPolicy.isSetCoordinatorSlaPolicy()
          && instanceCount < settings.minRequiredInstances) {
        throw new TaskDescriptionException(String.format(
            "Job with fewer than %d instances cannot have Percentage/Count SlaPolicy.",
            settings.minRequiredInstances));
      }

      if (slaPolicy.isSetCountSlaPolicy()) {
        validateCountSlaPolicy(instanceCount, slaPolicy.getCountSlaPolicy());
      }

      if (slaPolicy.isSetPercentageSlaPolicy()) {
        validatePercentageSlaPolicy(instanceCount, slaPolicy.getPercentageSlaPolicy());
      }
    }
  }

  /**
   * Rejects a percentage policy that would leave no instance killable, or whose duration exceeds
   * the cluster-wide maximum.
   */
  private void validatePercentageSlaPolicy(
      int instanceCount,
      PercentageSlaPolicy slaPolicy) throws TaskDescriptionException {

    if (slaPolicy.isSetPercentage()) {
      double percentage = slaPolicy.getPercentage() / 100.0;
      // Fewer than one instance may be taken down while keeping `percentage` of them running.
      if (instanceCount - instanceCount * percentage < 1) {
        throw new TaskDescriptionException(String.format(
            "Current PercentageSlaPolicy: percentage=%f will not allow any instances to be killed. "
                + "Must be less than %f.",
            slaPolicy.getPercentage(),
            ((double) (instanceCount - 1)) / instanceCount * 100.0));
      }
    }

    if (slaPolicy.isSetDurationSecs()
        && slaPolicy.getDurationSecs() > settings.maxSlaDurationSecs) {
      throw new TaskDescriptionException(String.format(
          "PercentageSlaPolicy: durationSecs=%d must be less than cluster-wide maximum of %d secs.",
          slaPolicy.getDurationSecs(),
          settings.maxSlaDurationSecs));
    }
  }

  /**
   * Rejects a count policy that would leave no instance killable, or whose duration exceeds the
   * cluster-wide maximum.
   */
  private void validateCountSlaPolicy(
      int instanceCount,
      CountSlaPolicy slaPolicy) throws TaskDescriptionException {

    if (slaPolicy.isSetCount()
        && instanceCount - slaPolicy.getCount() < 1) {
      throw new TaskDescriptionException(String.format(
          "Current CountSlaPolicy: count=%d will not allow any instances to be killed. "
              + "Must be less than instanceCount=%d.",
          slaPolicy.getCount(),
          instanceCount));
    }

    if (slaPolicy.isSetDurationSecs()
        && slaPolicy.getDurationSecs() > settings.maxSlaDurationSecs) {
      throw new TaskDescriptionException(String.format(
          "CountSlaPolicy: durationSecs=%d must be less than cluster-wide maximum of %d secs.",
          slaPolicy.getDurationSecs(),
          settings.maxSlaDurationSecs));
    }
  }

  @VisibleForTesting
  static final String NO_DOCKER_PARAMETERS =
      "This scheduler is configured to disallow Docker parameters.";

  @VisibleForTesting
  static final String EXECUTOR_REQUIRED_WITH_DOCKER =
      "This scheduler is configured to require an executor for Docker-based tasks.";

  @VisibleForTesting
  static final String MESOS_FETCHER_DISABLED =
      "Mesos Fetcher for individual jobs is disabled in this cluster.";

  @VisibleForTesting
  public static final String NO_EXECUTOR_OR_CONTAINER = "Configuration may not be null.";

  @VisibleForTesting
  static final String INVALID_EXECUTOR_CONFIG = "Executor name may not be left unset.";

  @VisibleForTesting
  static final String NO_CONTAINER_VOLUMES =
      "This scheduler is configured to disallow container volumes.";

  /**
   * Check validity of and populates defaults in a task configuration. This will return a deep copy
   * of the provided task configuration with default configuration values applied, and configuration
   * map values sanitized and applied to their respective struct fields.
   *
   * @param config Task config to validate and populate.
   * @param instanceCount The number of instances in the job.
   * @return A new task config built from a populated copy of {@code config}; {@code config} itself
   *     is not modified.
   * @throws TaskDescriptionException If the task is invalid.
   */
  public ITaskConfig validateAndPopulate(
      ITaskConfig config,
      int instanceCount) throws TaskDescriptionException {

    TaskConfig builder = config.newBuilder();

    if (config.isSetTier() && !UserProvidedStrings.isGoodIdentifier(config.getTier())) {
      throw new TaskDescriptionException("Tier contains illegal characters: " + config.getTier());
    }

    if (!config.isSetTier()) {
      builder.setTier(tierManager.getDefaultTierName());
    }

    try {
      // Explicitly set the production field as it is used for Quota calculations. This always
      // overwrites whatever production value the client supplied.
      builder.setProduction(tierManager.getTier(config).isProduction());
    } catch (IllegalArgumentException e) {
      throw new TaskDescriptionException(e.getMessage(), e);
    }

    if (!JobKeys.isValid(config.getJob())) {
      // Job key is set but invalid
      throw new TaskDescriptionException("Job key " + config.getJob() + " is invalid.");
    }

    // A task must either have an executor configuration or specify a Docker container.
    if (!builder.isSetExecutorConfig()
        && !(builder.isSetContainer() && builder.getContainer().isSetDocker())) {

      throw new TaskDescriptionException(NO_EXECUTOR_OR_CONTAINER);
    }

    // Docker containers don't require executors, validate the rest
    if (builder.isSetExecutorConfig()) {
      if (!builder.getExecutorConfig().isSetName()) {
        throw new TaskDescriptionException(INVALID_EXECUTOR_CONFIG);
      }

      executorSettings.getExecutorConfig(builder.getExecutorConfig().getName()).orElseThrow(
          () -> new TaskDescriptionException("Configuration for executor '"
              + builder.getExecutorConfig().getName()
              + "' doesn't exist."));
    }

    // A dedicated constraint must name exactly one value whose role prefix is '*' or the job's
    // own role.
    IConstraint constraint = getDedicatedConstraint(config);
    if (constraint != null) {
      if (!isValueConstraint(constraint.getConstraint())) {
        throw new TaskDescriptionException("A dedicated constraint must be of value type.");
      }

      IValueConstraint valueConstraint = constraint.getConstraint().getValue();

      if (valueConstraint.getValues().size() != 1) {
        throw new TaskDescriptionException("A dedicated constraint must have exactly one value");
      }

      String dedicatedRole = getRole(valueConstraint);
      if (!("*".equals(dedicatedRole) || config.getJob().getRole().equals(dedicatedRole))) {
        throw new TaskDescriptionException(
            "Only " + dedicatedRole + " may use hosts dedicated for that role.");
      }
    }

    Optional<Container._Fields> containerType;
    if (config.isSetContainer()) {
      IContainer containerConfig = config.getContainer();
      // A container union with no member set reports a null set field; ofNullable lets the
      // emptiness check below report it instead of throwing NullPointerException here.
      containerType = Optional.ofNullable(containerConfig.getSetField());
      if (containerConfig.isSetDocker()) {
        if (!containerConfig.getDocker().isSetImage()) {
          throw new TaskDescriptionException("A container must specify an image.");
        }

        if (containerConfig.getDocker().getParameters().isEmpty()) {
          builder.getContainer().getDocker()
              .setParameters(ImmutableList.copyOf(settings.defaultDockerParameters));
        } else if (!settings.allowDockerParameters) {
          throw new TaskDescriptionException(NO_DOCKER_PARAMETERS);
        }

        if (settings.requireDockerUseExecutor && !config.isSetExecutorConfig()) {
          throw new TaskDescriptionException(EXECUTOR_REQUIRED_WITH_DOCKER);
        }
      }
    } else {
      // Default to mesos container type if unset.
      containerType = Optional.of(Container._Fields.MESOS);
    }

    if (!containerType.isPresent()) {
      throw new TaskDescriptionException("A job must have a container type.");
    }

    if (!settings.allowedContainerTypes.contains(containerType.get())) {
      throw new TaskDescriptionException(
          "This scheduler is not configured to allow the container type "
              + containerType.get().toString());
    }

    thriftBackfill.backfillTask(builder);

    // Collect (sorted, comma-separated) every single-valued resource type that appears twice.
    String types = config.getResources().stream()
        .collect(Collectors.groupingBy(e -> ResourceType.fromResource(e)))
        .entrySet().stream()
        .filter(e -> !e.getKey().isMultipleAllowed() && e.getValue().size() > 1)
        .map(r -> r.getKey().getAuroraName())
        .sorted()
        .collect(Collectors.joining(", "));

    if (!Strings.isNullOrEmpty(types)) {
      throw new TaskDescriptionException("Multiple resource values are not supported for " + types);
    }

    Validator<Number> cpuvalidator = new GreaterThan(0.0, "num_cpus");
    cpuvalidator.validate(
        ResourceManager.quantityOf(ResourceManager.getTaskResources(config, CPUS)));

    Validator<Number> ramvalidator = new GreaterThan(0.0, "ram_mb");
    ramvalidator.validate(
        ResourceManager.quantityOf(ResourceManager.getTaskResources(config, RAM_MB)));

    Validator<Number> diskvalidator = new GreaterThan(0.0, "disk_mb");
    diskvalidator.validate(
        ResourceManager.quantityOf(ResourceManager.getTaskResources(config, DISK_MB)));

    if (!settings.allowGpuResource && config.getResources().stream()
        .anyMatch(r -> ResourceType.fromResource(r).equals(GPUS))) {

      throw new TaskDescriptionException("GPU resource support is disabled in this cluster.");
    }

    if (!settings.enableMesosFetcher && !config.getMesosFetcherUris().isEmpty()) {
      throw new TaskDescriptionException(MESOS_FETCHER_DISABLED);
    }

    // Guard on isSetContainer(): the branch above shows an unset container is a valid input.
    if (config.isSetContainer() && config.getContainer().isSetMesos()) {
      IMesosContainer container = config.getContainer().getMesos();
      if (!settings.allowContainerVolumes && !container.getVolumes().isEmpty()) {
        throw new TaskDescriptionException(NO_CONTAINER_VOLUMES);
      }
    }

    validateSlaPolicy(builder, instanceCount);

    maybeFillLinks(builder);

    return ITaskConfig.build(builder);
  }

  /**
   * Provides a filter for the given constraint name.
   *
   * @param name The name of the constraint.
   * @return A filter that matches the constraint.
   */
  public static Predicate<IConstraint> getConstraintByName(final String name) {
    return constraint -> constraint.getName().equals(name);
  }

  /**
   * Adds default "health" and "http" task links when the task defines no links and requests
   * ports with those names. Mutates {@code task} in place.
   */
  private static void maybeFillLinks(TaskConfig task) {
    if (task.getTaskLinksSize() == 0) {
      ImmutableMap.Builder<String, String> links = ImmutableMap.builder();
      for (IResource resource : ResourceManager.getTaskResources(ITaskConfig.build(task), PORTS)) {
        if ("health".equals(resource.getNamedPort())) {
          links.put("health", "http://%host%:%port:health%");
        } else if ("http".equals(resource.getNamedPort())) {
          links.put("http", "http://%host%:%port:http%");
        }
      }
      task.setTaskLinks(links.build());
    }
  }

  /**
   * Thrown when an invalid task or job configuration is encountered.
   */
  public static class TaskDescriptionException extends Exception {
    public TaskDescriptionException(String msg, Exception e) {
      super(msg, e);
    }

    public TaskDescriptionException(String msg) {
      super(msg);
    }
  }
}