blob: f9b1c7cf30f93336fb850da09c1f2b7178cbdc17 [file] [log] [blame]
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.aurora.scheduler.mesos;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.inject.Inject;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.protobuf.ByteString;
import org.apache.aurora.GuavaUtils;
import org.apache.aurora.Protobufs;
import org.apache.aurora.codec.ThriftBinaryCodec;
import org.apache.aurora.scheduler.TierManager;
import org.apache.aurora.scheduler.base.JobKeys;
import org.apache.aurora.scheduler.base.SchedulerException;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.configuration.executor.ExecutorSettings;
import org.apache.aurora.scheduler.resources.AcceptedOffer;
import org.apache.aurora.scheduler.resources.ResourceManager;
import org.apache.aurora.scheduler.storage.entities.IAppcImage;
import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
import org.apache.aurora.scheduler.storage.entities.IDockerContainer;
import org.apache.aurora.scheduler.storage.entities.IDockerImage;
import org.apache.aurora.scheduler.storage.entities.IImage;
import org.apache.aurora.scheduler.storage.entities.IJobKey;
import org.apache.aurora.scheduler.storage.entities.IMesosContainer;
import org.apache.aurora.scheduler.storage.entities.IMetadata;
import org.apache.aurora.scheduler.storage.entities.IServerInfo;
import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
import org.apache.mesos.Protos;
import org.apache.mesos.Protos.CommandInfo;
import org.apache.mesos.Protos.ContainerInfo;
import org.apache.mesos.Protos.DiscoveryInfo;
import org.apache.mesos.Protos.ExecutorID;
import org.apache.mesos.Protos.ExecutorInfo;
import org.apache.mesos.Protos.Label;
import org.apache.mesos.Protos.Labels;
import org.apache.mesos.Protos.Offer;
import org.apache.mesos.Protos.Port;
import org.apache.mesos.Protos.Resource;
import org.apache.mesos.Protos.TaskID;
import org.apache.mesos.Protos.TaskInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static java.util.Objects.requireNonNull;
/**
* A factory to create mesos task objects.
*/
public interface MesosTaskFactory {
/**
* Creates a mesos task object.
*
* @param task Assigned task to translate into a task object.
* @param offer Resource offer the task is being assigned to.
* @return A new task.
* @throws SchedulerException If the task could not be encoded.
*/
TaskInfo createFrom(IAssignedTask task, Offer offer) throws SchedulerException;
// TODO(wfarner): Move this class to its own file to reduce visibility to package private.
class MesosTaskFactoryImpl implements MesosTaskFactory {
private static final Logger LOG = LoggerFactory.getLogger(MesosTaskFactoryImpl.class);
private static final String EXECUTOR_PREFIX = "thermos-";
@VisibleForTesting
static final String METADATA_LABEL_PREFIX = "org.apache.aurora.metadata.";
@VisibleForTesting
static final String DEFAULT_PORT_PROTOCOL = "TCP";
@VisibleForTesting
static final String SOURCE_LABEL = "source";
private final ExecutorSettings executorSettings;
private final TierManager tierManager;
private final IServerInfo serverInfo;
@Inject
MesosTaskFactoryImpl(
ExecutorSettings executorSettings,
TierManager tierManager,
IServerInfo serverInfo) {
this.executorSettings = requireNonNull(executorSettings);
this.tierManager = requireNonNull(tierManager);
this.serverInfo = requireNonNull(serverInfo);
}
@VisibleForTesting
static ExecutorID getExecutorId(String taskId) {
return ExecutorID.newBuilder().setValue(EXECUTOR_PREFIX + taskId).build();
}
private static String getJobSourceName(IJobKey jobkey) {
return String.format("%s.%s.%s", jobkey.getRole(), jobkey.getEnvironment(), jobkey.getName());
}
private static String getJobSourceName(ITaskConfig task) {
return getJobSourceName(task.getJob());
}
@VisibleForTesting
static String getInstanceSourceName(ITaskConfig task, int instanceId) {
return String.format("%s.%s", getJobSourceName(task), instanceId);
}
@VisibleForTesting
static String getInverseJobSourceName(IJobKey job) {
return String.format("%s.%s.%s", job.getName(), job.getEnvironment(), job.getRole());
}
private static byte[] serializeTask(IAssignedTask task) throws SchedulerException {
try {
return ThriftBinaryCodec.encode(task.newBuilder());
} catch (ThriftBinaryCodec.CodingException e) {
LOG.error("Unable to serialize task.", e);
throw new SchedulerException("Internal error.", e);
}
}
@Override
public TaskInfo createFrom(IAssignedTask task, Offer offer) throws SchedulerException {
requireNonNull(task);
requireNonNull(offer);
ITaskConfig config = task.getTask();
AcceptedOffer acceptedOffer;
// TODO(wfarner): Re-evaluate if/why we need to continue handling unset assignedPorts field.
try {
acceptedOffer = AcceptedOffer.create(
offer,
task,
executorSettings.getExecutorOverhead(),
tierManager.getTier(task.getTask()));
} catch (ResourceManager.InsufficientResourcesException e) {
throw new SchedulerException(e);
}
Iterable<Resource> resources = acceptedOffer.getTaskResources();
LOG.debug(
"Setting task resources to {}",
Iterables.transform(resources, Protobufs::toString));
TaskInfo.Builder taskBuilder = TaskInfo.newBuilder()
.setName(JobKeys.canonicalString(Tasks.getJob(task)))
.setTaskId(TaskID.newBuilder().setValue(task.getTaskId()))
.setSlaveId(offer.getSlaveId())
.addAllResources(resources);
configureTaskLabels(config.getMetadata(), taskBuilder);
if (executorSettings.shouldPopulateDiscoverInfo()) {
configureDiscoveryInfos(task, taskBuilder);
}
if (config.getContainer().isSetMesos()) {
ExecutorInfo.Builder executorInfoBuilder = configureTaskForExecutor(task, acceptedOffer);
Optional<ContainerInfo.Builder> containerInfoBuilder = configureTaskForImage(
task.getTask().getContainer().getMesos());
if (containerInfoBuilder.isPresent()) {
executorInfoBuilder.setContainer(containerInfoBuilder.get());
}
taskBuilder.setExecutor(executorInfoBuilder.build());
} else if (config.getContainer().isSetDocker()) {
IDockerContainer dockerContainer = config.getContainer().getDocker();
if (config.isSetExecutorConfig()) {
ExecutorInfo.Builder execBuilder = configureTaskForExecutor(task, acceptedOffer)
.setContainer(getDockerContainerInfo(dockerContainer));
taskBuilder.setExecutor(execBuilder.build());
} else {
LOG.warn("Running Docker-based task without an executor.");
taskBuilder.setContainer(getDockerContainerInfo(dockerContainer))
.setCommand(CommandInfo.newBuilder().setShell(false));
}
} else {
throw new SchedulerException("Task had no supported container set.");
}
if (taskBuilder.hasExecutor()) {
taskBuilder.setData(ByteString.copyFrom(serializeTask(task)));
}
return taskBuilder.build();
}
private Optional<ContainerInfo.Builder> configureTaskForImage(IMesosContainer mesosContainer) {
requireNonNull(mesosContainer);
if (mesosContainer.isSetImage()) {
IImage image = mesosContainer.getImage();
Protos.Image.Builder imageBuilder = Protos.Image.newBuilder();
if (image.isSetAppc()) {
IAppcImage appcImage = image.getAppc();
imageBuilder.setType(Protos.Image.Type.APPC);
imageBuilder.setAppc(Protos.Image.Appc.newBuilder()
.setName(appcImage.getName())
.setId(appcImage.getImageId()));
} else if (image.isSetDocker()) {
IDockerImage dockerImage = image.getDocker();
imageBuilder.setType(Protos.Image.Type.DOCKER);
imageBuilder.setDocker(Protos.Image.Docker.newBuilder()
.setName(dockerImage.getName() + ":" + dockerImage.getTag()));
} else {
throw new SchedulerException("Task had no supported image set.");
}
ContainerInfo.MesosInfo.Builder mesosContainerBuilder =
ContainerInfo.MesosInfo.newBuilder();
mesosContainerBuilder.setImage(imageBuilder);
return Optional.of(ContainerInfo.newBuilder()
.setType(ContainerInfo.Type.MESOS)
.setMesos(mesosContainerBuilder)
.addAllVolumes(executorSettings.getExecutorConfig().getVolumeMounts()));
}
return Optional.absent();
}
private ContainerInfo getDockerContainerInfo(IDockerContainer config) {
Iterable<Protos.Parameter> parameters = Iterables.transform(config.getParameters(),
item -> Protos.Parameter.newBuilder().setKey(item.getName())
.setValue(item.getValue()).build());
ContainerInfo.DockerInfo.Builder dockerBuilder = ContainerInfo.DockerInfo.newBuilder()
.setImage(config.getImage()).addAllParameters(parameters);
return ContainerInfo.newBuilder()
.setType(ContainerInfo.Type.DOCKER)
.setDocker(dockerBuilder.build())
.addAllVolumes(executorSettings.getExecutorConfig().getVolumeMounts())
.build();
}
private ExecutorInfo.Builder configureTaskForExecutor(
IAssignedTask task,
AcceptedOffer acceptedOffer) {
ExecutorInfo.Builder builder = executorSettings.getExecutorConfig().getExecutor().toBuilder()
.setExecutorId(getExecutorId(task.getTaskId()))
.setLabels(
Labels.newBuilder().addLabels(
Label.newBuilder()
.setKey(SOURCE_LABEL)
.setValue(getInstanceSourceName(task.getTask(), task.getInstanceId()))));
//TODO: (rdelvalle) add output_file when Aurora's Mesos dep is updated (MESOS-4735)
List<CommandInfo.URI> mesosFetcherUris = task.getTask().getMesosFetcherUris().stream()
.map(u -> Protos.CommandInfo.URI.newBuilder().setValue(u.getValue())
.setExecutable(false)
.setExtract(u.isExtract())
.setCache(u.isCache()).build())
.collect(Collectors.toList());
builder.setCommand(builder.getCommand().toBuilder().addAllUris(mesosFetcherUris));
Iterable<Resource> executorResources = acceptedOffer.getExecutorResources();
LOG.debug(
"Setting executor resources to {}",
Iterables.transform(executorResources, Protobufs::toString));
builder.clearResources().addAllResources(executorResources);
return builder;
}
private void configureTaskLabels(Set<IMetadata> metadata, TaskInfo.Builder taskBuilder) {
ImmutableSet<Label> labels = metadata.stream()
.map(m -> Label.newBuilder()
.setKey(METADATA_LABEL_PREFIX + m.getKey())
.setValue(m.getValue())
.build())
.collect(GuavaUtils.toImmutableSet());
if (!labels.isEmpty()) {
taskBuilder.setLabels(Labels.newBuilder().addAllLabels(labels));
}
}
private void configureDiscoveryInfos(IAssignedTask task, TaskInfo.Builder taskBuilder) {
DiscoveryInfo.Builder builder = taskBuilder.getDiscoveryBuilder();
builder.setVisibility(DiscoveryInfo.Visibility.CLUSTER);
builder.setName(getInverseJobSourceName(task.getTask().getJob()));
builder.setEnvironment(task.getTask().getJob().getEnvironment());
// A good sane choice for default location is current Aurora cluster name.
builder.setLocation(serverInfo.getClusterName());
for (Map.Entry<String, Integer> entry : task.getAssignedPorts().entrySet()) {
builder.getPortsBuilder().addPorts(
Port.newBuilder()
.setName(entry.getKey())
.setNumber(entry.getValue())
.setProtocol(DEFAULT_PORT_PROTOCOL)
);
}
}
}
}