/**
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.aurora.scheduler.mesos;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import javax.inject.Inject;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.protobuf.ByteString;

import org.apache.aurora.GuavaUtils;
import org.apache.aurora.Protobufs;
import org.apache.aurora.codec.ThriftBinaryCodec;
import org.apache.aurora.scheduler.TierManager;
import org.apache.aurora.scheduler.base.JobKeys;
import org.apache.aurora.scheduler.base.SchedulerException;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.configuration.executor.ExecutorSettings;
import org.apache.aurora.scheduler.resources.AcceptedOffer;
import org.apache.aurora.scheduler.resources.ResourceManager;
import org.apache.aurora.scheduler.storage.entities.IAppcImage;
import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
import org.apache.aurora.scheduler.storage.entities.IDockerContainer;
import org.apache.aurora.scheduler.storage.entities.IDockerImage;
import org.apache.aurora.scheduler.storage.entities.IImage;
import org.apache.aurora.scheduler.storage.entities.IJobKey;
import org.apache.aurora.scheduler.storage.entities.IMesosContainer;
import org.apache.aurora.scheduler.storage.entities.IMetadata;
import org.apache.aurora.scheduler.storage.entities.IServerInfo;
import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
import org.apache.mesos.Protos;
import org.apache.mesos.Protos.CommandInfo;
import org.apache.mesos.Protos.ContainerInfo;
import org.apache.mesos.Protos.DiscoveryInfo;
import org.apache.mesos.Protos.ExecutorID;
import org.apache.mesos.Protos.ExecutorInfo;
import org.apache.mesos.Protos.Label;
import org.apache.mesos.Protos.Labels;
import org.apache.mesos.Protos.Offer;
import org.apache.mesos.Protos.Port;
import org.apache.mesos.Protos.Resource;
import org.apache.mesos.Protos.TaskID;
import org.apache.mesos.Protos.TaskInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static java.util.Objects.requireNonNull;

/**
 * A factory to create mesos task objects.
 */
public interface MesosTaskFactory {

  /**
   * Creates a mesos task object.
   *
   * @param task Assigned task to translate into a task object.
   * @param offer Resource offer the task is being assigned to.
   * @return A new task.
   * @throws SchedulerException If the task could not be encoded.
   */
  TaskInfo createFrom(IAssignedTask task, Offer offer) throws SchedulerException;

  // TODO(wfarner): Move this class to its own file to reduce visibility to package private.
  class MesosTaskFactoryImpl implements MesosTaskFactory {
    private static final Logger LOG = LoggerFactory.getLogger(MesosTaskFactoryImpl.class);
    private static final String EXECUTOR_PREFIX = "thermos-";

    @VisibleForTesting
    static final String METADATA_LABEL_PREFIX = "org.apache.aurora.metadata.";

    @VisibleForTesting
    static final String DEFAULT_PORT_PROTOCOL = "TCP";

    @VisibleForTesting
    static final String SOURCE_LABEL = "source";

    private final ExecutorSettings executorSettings;
    private final TierManager tierManager;
    private final IServerInfo serverInfo;

    @Inject
    MesosTaskFactoryImpl(
        ExecutorSettings executorSettings,
        TierManager tierManager,
        IServerInfo serverInfo) {

      this.executorSettings = requireNonNull(executorSettings);
      this.tierManager = requireNonNull(tierManager);
      this.serverInfo = requireNonNull(serverInfo);
    }

    @VisibleForTesting
    static ExecutorID getExecutorId(String taskId) {
      return ExecutorID.newBuilder().setValue(EXECUTOR_PREFIX + taskId).build();
    }

    private static String getJobSourceName(IJobKey jobkey) {
      return String.format("%s.%s.%s", jobkey.getRole(), jobkey.getEnvironment(), jobkey.getName());
    }

    private static String getJobSourceName(ITaskConfig task) {
      return getJobSourceName(task.getJob());
    }

    @VisibleForTesting
    static String getInstanceSourceName(ITaskConfig task, int instanceId) {
      return String.format("%s.%s", getJobSourceName(task), instanceId);
    }

    @VisibleForTesting
    static String getInverseJobSourceName(IJobKey job) {
      return String.format("%s.%s.%s", job.getName(), job.getEnvironment(), job.getRole());
    }

    private static byte[] serializeTask(IAssignedTask task) throws SchedulerException {
      try {
        return ThriftBinaryCodec.encode(task.newBuilder());
      } catch (ThriftBinaryCodec.CodingException e) {
        LOG.error("Unable to serialize task.", e);
        throw new SchedulerException("Internal error.", e);
      }
    }

    @Override
    public TaskInfo createFrom(IAssignedTask task, Offer offer) throws SchedulerException {
      requireNonNull(task);
      requireNonNull(offer);

      ITaskConfig config = task.getTask();
      AcceptedOffer acceptedOffer;
      // TODO(wfarner): Re-evaluate if/why we need to continue handling unset assignedPorts field.
      try {
        acceptedOffer = AcceptedOffer.create(
            offer,
            task,
            executorSettings.getExecutorOverhead(),
            tierManager.getTier(task.getTask()));
      } catch (ResourceManager.InsufficientResourcesException e) {
        throw new SchedulerException(e);
      }
      Iterable<Resource> resources = acceptedOffer.getTaskResources();

      LOG.debug(
          "Setting task resources to {}",
          Iterables.transform(resources, Protobufs::toString));

      TaskInfo.Builder taskBuilder = TaskInfo.newBuilder()
          .setName(JobKeys.canonicalString(Tasks.getJob(task)))
          .setTaskId(TaskID.newBuilder().setValue(task.getTaskId()))
          .setSlaveId(offer.getSlaveId())
          .addAllResources(resources);

      configureTaskLabels(config.getMetadata(), taskBuilder);

      if (executorSettings.shouldPopulateDiscoverInfo()) {
        configureDiscoveryInfos(task, taskBuilder);
      }

      if (config.getContainer().isSetMesos()) {
        ExecutorInfo.Builder executorInfoBuilder = configureTaskForExecutor(task, acceptedOffer);

        Optional<ContainerInfo.Builder> containerInfoBuilder = configureTaskForImage(
            task.getTask().getContainer().getMesos());
        if (containerInfoBuilder.isPresent()) {
          executorInfoBuilder.setContainer(containerInfoBuilder.get());
        }

        taskBuilder.setExecutor(executorInfoBuilder.build());
      } else if (config.getContainer().isSetDocker()) {
        IDockerContainer dockerContainer = config.getContainer().getDocker();
        if (config.isSetExecutorConfig()) {
          ExecutorInfo.Builder execBuilder = configureTaskForExecutor(task, acceptedOffer)
              .setContainer(getDockerContainerInfo(dockerContainer));
          taskBuilder.setExecutor(execBuilder.build());
        } else {
          LOG.warn("Running Docker-based task without an executor.");
          taskBuilder.setContainer(getDockerContainerInfo(dockerContainer))
              .setCommand(CommandInfo.newBuilder().setShell(false));
        }
      } else {
        throw new SchedulerException("Task had no supported container set.");
      }

      if (taskBuilder.hasExecutor()) {
        taskBuilder.setData(ByteString.copyFrom(serializeTask(task)));
      }
      return taskBuilder.build();
    }

    private Optional<ContainerInfo.Builder> configureTaskForImage(IMesosContainer mesosContainer) {
      requireNonNull(mesosContainer);

      if (mesosContainer.isSetImage()) {
        IImage image = mesosContainer.getImage();

        Protos.Image.Builder imageBuilder = Protos.Image.newBuilder();

        if (image.isSetAppc()) {
          IAppcImage appcImage = image.getAppc();

          imageBuilder.setType(Protos.Image.Type.APPC);
          imageBuilder.setAppc(Protos.Image.Appc.newBuilder()
              .setName(appcImage.getName())
              .setId(appcImage.getImageId()));
        } else if (image.isSetDocker()) {
          IDockerImage dockerImage = image.getDocker();

          imageBuilder.setType(Protos.Image.Type.DOCKER);
          imageBuilder.setDocker(Protos.Image.Docker.newBuilder()
              .setName(dockerImage.getName() + ":" + dockerImage.getTag()));
        } else {
          throw new SchedulerException("Task had no supported image set.");
        }

        ContainerInfo.MesosInfo.Builder mesosContainerBuilder =
            ContainerInfo.MesosInfo.newBuilder();

        mesosContainerBuilder.setImage(imageBuilder);

        return Optional.of(ContainerInfo.newBuilder()
            .setType(ContainerInfo.Type.MESOS)
            .setMesos(mesosContainerBuilder)
            .addAllVolumes(executorSettings.getExecutorConfig().getVolumeMounts()));
      }

      return Optional.absent();
    }

    private ContainerInfo getDockerContainerInfo(IDockerContainer config) {
      Iterable<Protos.Parameter> parameters = Iterables.transform(config.getParameters(),
          item -> Protos.Parameter.newBuilder().setKey(item.getName())
            .setValue(item.getValue()).build());

      ContainerInfo.DockerInfo.Builder dockerBuilder = ContainerInfo.DockerInfo.newBuilder()
          .setImage(config.getImage()).addAllParameters(parameters);
      return ContainerInfo.newBuilder()
          .setType(ContainerInfo.Type.DOCKER)
          .setDocker(dockerBuilder.build())
          .addAllVolumes(executorSettings.getExecutorConfig().getVolumeMounts())
          .build();
    }

    private ExecutorInfo.Builder configureTaskForExecutor(
        IAssignedTask task,
        AcceptedOffer acceptedOffer) {

      ExecutorInfo.Builder builder = executorSettings.getExecutorConfig().getExecutor().toBuilder()
          .setExecutorId(getExecutorId(task.getTaskId()))
          .setLabels(
              Labels.newBuilder().addLabels(
                  Label.newBuilder()
                      .setKey(SOURCE_LABEL)
                      .setValue(getInstanceSourceName(task.getTask(), task.getInstanceId()))));

      //TODO: (rdelvalle) add output_file when Aurora's Mesos dep is updated (MESOS-4735)
      List<CommandInfo.URI> mesosFetcherUris = task.getTask().getMesosFetcherUris().stream()
          .map(u -> Protos.CommandInfo.URI.newBuilder().setValue(u.getValue())
              .setExecutable(false)
              .setExtract(u.isExtract())
              .setCache(u.isCache()).build())
          .collect(Collectors.toList());

      builder.setCommand(builder.getCommand().toBuilder().addAllUris(mesosFetcherUris));

      Iterable<Resource> executorResources = acceptedOffer.getExecutorResources();
      LOG.debug(
          "Setting executor resources to {}",
          Iterables.transform(executorResources, Protobufs::toString));
      builder.clearResources().addAllResources(executorResources);
      return builder;
    }

    private void configureTaskLabels(Set<IMetadata> metadata, TaskInfo.Builder taskBuilder) {
      ImmutableSet<Label> labels = metadata.stream()
          .map(m -> Label.newBuilder()
              .setKey(METADATA_LABEL_PREFIX + m.getKey())
              .setValue(m.getValue())
              .build())
          .collect(GuavaUtils.toImmutableSet());

      if (!labels.isEmpty()) {
        taskBuilder.setLabels(Labels.newBuilder().addAllLabels(labels));
      }
    }

    private void configureDiscoveryInfos(IAssignedTask task, TaskInfo.Builder taskBuilder) {
      DiscoveryInfo.Builder builder = taskBuilder.getDiscoveryBuilder();
      builder.setVisibility(DiscoveryInfo.Visibility.CLUSTER);
      builder.setName(getInverseJobSourceName(task.getTask().getJob()));
      builder.setEnvironment(task.getTask().getJob().getEnvironment());
      // A good sane choice for default location is current Aurora cluster name.
      builder.setLocation(serverInfo.getClusterName());
      for (Map.Entry<String, Integer> entry : task.getAssignedPorts().entrySet()) {
        builder.getPortsBuilder().addPorts(
            Port.newBuilder()
                .setName(entry.getKey())
                .setNumber(entry.getValue())
                .setProtocol(DEFAULT_PORT_PROTOCOL)
        );
      }
    }
  }
}
