/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.samza.clustermanager;

import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.Optional;
import org.apache.samza.SamzaException;
import org.apache.samza.config.ClusterManagerConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.TaskConfig;
import org.apache.samza.job.CommandBuilder;
import org.apache.samza.job.ShellCommandBuilder;
import org.apache.samza.util.ReflectionUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
 * {@link ContainerAllocator} makes requests for physical resources to the resource manager and also runs
 * a processor on an allocated physical resource.
 *
 * <ul>
 *  <li>
 *    When host-affinity is enabled, each request ({@link SamzaResourceRequest}) contains a processorId, which
 *    identifies the processor the request is for, and a "preferredHost", which is determined by the locality mappings
 *    in the coordinator stream.
 *  </li>
 *  <li>
 *    This thread periodically matches outstanding resource requests with allocated resources.
 *    Its period is controlled using the {@code allocatorSleepIntervalMs} parameter
 *  </li>
 *  <li>
 *    When host-affinity is enabled, the resource-request's preferredHost param is set to the host the processor
 *    was last seen on
 *  </li>
 *  <li>
 *    When host-affinity is disabled, the resource-request's preferredHost param is set to {@link ResourceRequestState#ANY_HOST}
 *  </li>
 *  <li>
 *    When host-affinity is enabled and a preferred resource has not been obtained after {@code requestExpiryTimeout}
 *    milliseconds of the request being made, the request is declared expired. Expired requests are handled by
 *    allocating them to *ANY* allocated resource if available. If no surplus resources are available, the current
 *    preferred resource-request is cancelled and a resource-request for ANY_HOST is issued.
 *  </li>
 *  <li>
 *    When host-affinity is not enabled, this periodically wakes up to assign a processor to *ANY* allocated resource.
 *    If there aren't enough resources, it waits by sleeping for {@code allocatorSleepIntervalMs} milliseconds.
 *  </li>
 * </ul>
 *
 * This class is not thread-safe. This class is used in the refactored code path as called by run-jc.sh
 */
public class ContainerAllocator implements Runnable {

  private static final Logger LOG = LoggerFactory.getLogger(ContainerAllocator.class);

  /*
   * State that controls the lifecycle of the allocator thread: the run() loop keeps iterating while this is true,
   * and stop() sets it to false. Declared volatile so that the write made by stop() is visible to the thread
   * executing run().
   */
  private volatile boolean isRunning = true;

  /**
   * Flag for affine host requests. When {@code false}, requests are matched against and issued for
   * {@link ResourceRequestState#ANY_HOST} regardless of the preferred host they carry.
   */
  private final boolean hostAffinityEnabled;

  /**
   * Config and derived config objects. {@code taskConfig} is consulted for the command-builder class name and
   * {@code config} is handed to the command builder when a processor is launched.
   */
  private final TaskConfig taskConfig;
  private final Config config;

  /**
   * A ClusterResourceManager for the allocator to request for resources. In this class it is used to check whether
   * an allocated resource has expired and to launch stream processors; it is also handed to the
   * {@link ResourceRequestState} created by the constructor.
   */
  protected final ClusterResourceManager clusterResourceManager;
  /**
   * The allocator sleeps for allocatorSleepIntervalMs (milliseconds) before it polls its queue for the next request.
   * The value is read from {@code ClusterManagerConfig#getAllocatorSleepTime()}.
   */
  protected final int allocatorSleepIntervalMs;
  /**
   * Each container currently has the same configuration - memory, and numCpuCores.
   * Both values are copied into every {@link SamzaResourceRequest} created by this allocator.
   */
  protected final int containerMemoryMb;
  protected final int containerNumCpuCores;

  /**
   * State corresponding to num failed containers, running processors etc.
   * The allocator updates its request counters, expiry counters and pending/failed processor maps.
   */
  protected final SamzaApplicationState state;

  /**
   * ResourceRequestState indicates the state of all unfulfilled and allocated container requests.
   * The instance is created by this allocator in its constructor.
   */
  protected final ResourceRequestState resourceRequestState;
  /**
   * The configured expiration timeout of a resource request, in milliseconds. It is read from
   * {@code ClusterManagerConfig#getContainerRequestTimeout()} and used whenever the {@link ContainerManager} does not
   * supply a per-request override. Per the original note, the value is
   * {@code ClusterManagerConfig#CLUSTER_MANAGER_REQUEST_TIMEOUT_MS} if specified, or
   * {@code ClusterManagerConfig#DEFAULT_CONTAINER_REQUEST_TIMEOUT_MS} otherwise.
   */
  private final int configuredRequestExpiryTimeout;

  /**
   * Delegate that handles container launches, expired requests and expired resources on behalf of the allocator,
   * and that may supply a per-request expiry timeout override.
   */
  private final ContainerManager containerManager;

  public ContainerAllocator(ClusterResourceManager clusterResourceManager,
      Config config,
      SamzaApplicationState state,
      boolean hostAffinityEnabled,
      ContainerManager containerManager) {
    ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config);
    this.clusterResourceManager = clusterResourceManager;
    this.allocatorSleepIntervalMs = clusterManagerConfig.getAllocatorSleepTime();
    this.resourceRequestState = new ResourceRequestState(hostAffinityEnabled, this.clusterResourceManager);
    this.containerMemoryMb = clusterManagerConfig.getContainerMemoryMb();
    this.containerNumCpuCores = clusterManagerConfig.getNumCores();
    this.taskConfig = new TaskConfig(config);
    this.state = state;
    this.config = config;
    this.hostAffinityEnabled = hostAffinityEnabled;
    this.containerManager = containerManager;
    this.configuredRequestExpiryTimeout = clusterManagerConfig.getContainerRequestTimeout();
  }

  /**
   * Continually schedule processors to run on resources obtained from the cluster manager.
   * The loop frequency is governed by thread sleeps for allocatorSleepIntervalMs ms.
   *
   * Each pass assigns ready requests, moves delayed requests that have become ready to the active queue, and
   * releases extra resources. An unexpected exception in a pass is logged and the loop still sleeps before the
   * next pass, so a persistent failure cannot turn into a tight retry loop.
   *
   * Terminates when the isRunning flag is cleared, or when the thread is interrupted while sleeping. In the
   * latter case the interrupt status is restored before returning; continuing to loop with the interrupt flag set
   * would make every subsequent sleep fail immediately and spin.
   */
  @Override
  public void run() {
    while (isRunning) {
      try {
        assignResourceRequests();

        // Move delayed requests that are ready to the active request queue
        resourceRequestState.sendPendingDelayedResourceRequests();

        // Release extra resources and update the entire system's state
        resourceRequestState.releaseExtraResources();
      } catch (Exception e) {
        LOG.error("Got unknown Exception in AllocatorThread.", e);
      }

      try {
        Thread.sleep(allocatorSleepIntervalMs);
      } catch (InterruptedException e) {
        LOG.warn("Got InterruptedException in AllocatorThread.", e);
        // Preserve the interrupt for whoever owns this thread, then stop looping.
        Thread.currentThread().interrupt();
        break;
      }
    }
  }

  /**
   * Assigns resources received from the cluster manager to processors.
   *
   * During the run() method, the thread sleeps for allocatorSleepIntervalMs ms. It then invokes assignResourceRequests,
   * and tries to allocate any unsatisfied request that is still in the request queue (see {@link ResourceRequestState})
   * with allocated resources.
   *
   * When host-affinity is disabled, all allocated resources are buffered by the key "ANY_HOST"
   * When host-affinity is enabled, all allocated resources are buffered by the hostName as key
   *
   * If the requested host is not available, the thread checks to see if the request has expired. If it has not
   * expired, processing stops until the next invocation. If it has expired then following cases are possible
   *
   * Case 1. If this expired request is due to a container placement action mark the request as failed and return
   * Case 2: Otherwise for a normal resource request following cases are possible
   *    Case 2.1  If StandbyContainer is present refer to {@code StandbyContainerManager#handleExpiredResourceRequest}
   *    Case 2.2: host-affinity is enabled, allocator thread looks for allocated resources on ANY_HOST and issues a
   *              container start if available, otherwise issue an ANY_HOST request
   *    Case 2.3: host-affinity is disabled, allocator thread does not handle expired requests, it waits for cluster
   *              manager to return resources on ANY_HOST
   *         TODO: SAMZA-2330 Handle expired request for host affinity disabled case
   *
   * If the requested host is available then following cases are possible
   * Case 1. If the container launch request is due to an existing container placement action, issue a stop on active
   *         container and wait for the active container to be stopped before issuing a start.
   * Case 2. If StandbyContainer is present refer to {@code StandbyContainerManager#checkStandbyConstraintsAndRunStreamProcessor}
   * Case 3. Otherwise just invoke a container start on the allocated resource for the pending request
   *
   * When host-affinity is enabled and a {@code StandbyContainerManager} is present, the allocator transfers the request
   * to it for checking StandByConstraints before launching a processor
   */
  void assignResourceRequests() {
    while (true) {
      // Peek exactly once per iteration so that the presence check and the request that gets processed come from
      // the same lookup, instead of an unchecked Optional.get() on a second, separate peek.
      Optional<SamzaResourceRequest> pendingRequest = peekReadyPendingRequest();
      if (!pendingRequest.isPresent()) {
        break;
      }

      SamzaResourceRequest request = pendingRequest.get();
      String processorId = request.getProcessorId();
      String preferredHost = hostAffinityEnabled ? request.getPreferredHost() : ResourceRequestState.ANY_HOST;
      Instant requestCreationTime = request.getRequestTimestamp();

      LOG.info("Handling assignment for Processor ID: {} with request {}", processorId, request);
      if (hasAllocatedResource(preferredHost)) {

        // Found allocated container on preferredHost
        LOG.info("Found an available container for Processor ID: {} on the host: {}", processorId, preferredHost);

        // Needs to be only updated when host affinity is enabled
        if (hostAffinityEnabled) {
          state.matchedResourceRequests.incrementAndGet();
        }

        boolean containerLaunchComplete =
            containerManager.handleContainerLaunch(request, preferredHost, peekAllocatedResource(preferredHost),
                resourceRequestState, this);

        /*
         * Some Container launch requests are due to Container Placement actions like move, restarts. Under those
         * circumstances a container launch needs to wait for the previous container incarnation to stop. In this scenario
         * the allocator thread needs to sleep and recheck the stop again
         */
        if (!containerLaunchComplete) {
          break;
        }

      } else {

        LOG.info("Did not find any allocated containers for running Processor ID: {} on the host: {}.",
            processorId, preferredHost);

        if (isRequestExpired(request)) {
          updateExpiryMetrics(request);
          containerManager.handleExpiredRequest(processorId, preferredHost, request, this, resourceRequestState);
          // SAMZA-2601: to prevent infinite looping and logs filling up the disk, when host affinity is disabled,
          // we explicitly break the loop here and the whole process gets retried in run() after allocatorSleepIntervalMs
          if (!hostAffinityEnabled) {
            LOG.info("Host affinity is disabled on expired request.");
            break;
          }
        } else {
          // Both timestamps are logged as Instants so they can be compared directly when reading the log.
          LOG.info("Request for Processor ID: {} on preferred host {} has not expired yet. "
                  + "Request creation time: {}. Current Time: {}. Request timeout: {} ms", processorId, preferredHost,
              requestCreationTime, Instant.now(), getRequestTimeout(request).toMillis());
          break;
        }
      }
    }
  }

  /**
   * Records the assignment of a request to a resource and launches the processor on that resource. The caller is
   * expected to have verified that a resource is available on the preferred host before calling this method.
   *
   * If the resource picked for the host is reported as expired by the {@link ClusterResourceManager}, handling is
   * delegated to the {@link ContainerManager} and no processor is launched by this call.
   *
   * @param request             the {@link SamzaResourceRequest} which is being handled.
   * @param preferredHost       the preferred host on which the processor should be run or
   *                            {@link ResourceRequestState#ANY_HOST} if there is no host preference.
   * @throws                    SamzaException if there is no allocated resource in the specified host.
   */
  protected void runStreamProcessor(SamzaResourceRequest request, String preferredHost) {
    final String processorId = request.getProcessorId();
    final CommandBuilder launchCommand = getCommandBuilder(processorId);

    // Look up (without removing) the resource that will host the processor.
    final SamzaResource target = peekAllocatedResource(preferredHost);
    if (target == null) {
      throw new SamzaException("Expected resource for Processor ID: " + processorId + " was unavailable on host: " + preferredHost);
    }

    // An expired resource cannot be used; the container manager takes over from here.
    if (clusterResourceManager.isResourceExpired(target)) {
      containerManager.handleExpiredResource(request, target, preferredHost, resourceRequestState, this);
      return;
    }

    resourceRequestState.updateStateAfterAssignment(request, preferredHost, target);

    LOG.info("Found Container ID: {} for Processor ID: {} on host: {} for request creation time: {}.",
        target.getContainerId(), processorId, preferredHost, request.getRequestTimestamp());

    // The processor must be marked "pending" BEFORE the launch is issued. Doing it afterwards opens a race in which
    // the launch callback arrives first and does not find the processor in the pending map (SAMZA-2117).
    state.failedProcessors.remove(processorId);
    state.pendingProcessors.put(processorId, target);

    clusterResourceManager.launchStreamProcessor(target, launchCommand);
  }

  /**
   * Issues the initial resource requests, one per entry of the given mapping.
   *
   * A request targets {@link ResourceRequestState#ANY_HOST} when host-affinity is disabled or when the mapping
   * holds no host for the processor; otherwise it targets the mapped host.
   *
   * @param processorToHostMapping A Map of [processorId, hostName], where processorId is the ID of the Samza processor
   *                               to run on the resource. hostName is the host on which the resource should be allocated.
   *                               The hostName value is null, either
   *                                - when host-affinity has never been enabled, or
   *                                - when host-affinity is enabled and job is run for the first time
   *                                - when the number of containers has been increased.
   */
  public void requestResources(Map<String, String> processorToHostMapping) {
    for (Map.Entry<String, String> mapping : processorToHostMapping.entrySet()) {
      final String processorId = mapping.getKey();
      final String mappedHost = mapping.getValue();

      final String targetHost;
      if (hostAffinityEnabled && mappedHost != null) {
        targetHost = mappedHost;
      } else {
        // Only worth logging when affinity is on but no host is known for this processor.
        if (hostAffinityEnabled) {
          LOG.info("No preferred host mapping found for Processor ID: {}. Requesting resource on ANY_HOST", processorId);
        }
        targetHost = ResourceRequestState.ANY_HOST;
      }

      requestResource(processorId, targetHost);
    }
  }

  /**
   * Tells whether {@link #peekReadyPendingRequest()} currently yields a request.
   * NOTE(review): which requests count as "ready" (presumably those whose request timestamp has been reached) is
   * decided by {@link ResourceRequestState}, not by this class.
   *
   * @return {@code true} if there is a pending request, {@code false} otherwise.
   */
  protected final boolean hasReadyPendingRequest() {
    final Optional<SamzaResourceRequest> nextRequest = peekReadyPendingRequest();
    return nextRequest.isPresent();
  }

  /**
   * Retrieves, but does not remove, the next pending request as reported by
   * {@code ResourceRequestState#peekPendingRequest()}.
   *
   * @return  an {@link Optional} holding the pending request, or an empty {@link Optional} if there is none.
   */
  protected final Optional<SamzaResourceRequest> peekReadyPendingRequest() {
    return Optional.ofNullable(resourceRequestState.peekPendingRequest());
  }

  /**
   * Creates a resource request with no delay and issues it.
   *
   * @param processorId Samza processor ID that will be run when a resource is allocated for this request
   * @param preferredHost name of the host that you prefer to run the processor on
   */
  public final void requestResource(String processorId, String preferredHost) {
    final SamzaResourceRequest immediateRequest =
        getResourceRequestWithDelay(processorId, preferredHost, Duration.ZERO);
    issueResourceRequest(immediateRequest);
  }

  /**
   * Creates a resource request whose timestamp is the current time plus the given delay, and issues it.
   *
   * @param processorId Samza processor ID that will be run when a resource is allocated for this request
   * @param preferredHost name of the host that you prefer to run the processor on
   * @param delay the {@link Duration} to add to the request timestamp
   */
  public final void requestResourceWithDelay(String processorId, String preferredHost, Duration delay) {
    issueResourceRequest(getResourceRequestWithDelay(processorId, preferredHost, delay));
  }

  /**
   * Builds, without issuing, a {@link SamzaResourceRequest} whose timestamp is the current time.
   *
   * @param processorId Samza processor ID that will be run when a resource is allocated for this request
   * @param preferredHost name of the host that you prefer to run the processor on
   * @return the created request
   */
  public final SamzaResourceRequest getResourceRequest(String processorId, String preferredHost) {
    final Duration noDelay = Duration.ZERO;
    return getResourceRequestWithDelay(processorId, preferredHost, noDelay);
  }

  /**
   * Builds, without issuing, a {@link SamzaResourceRequest} that carries this allocator's container CPU and memory
   * settings and a request timestamp equal to the current time plus the specified delay.
   *
   * @param processorId Samza processor ID that will be run when a resource is allocated for this request
   * @param preferredHost name of the host that you prefer to run the processor on
   * @param delay the {@link Duration} to add to the request timestamp
   * @return the created request
   */
  public final SamzaResourceRequest getResourceRequestWithDelay(String processorId, String preferredHost, Duration delay) {
    final Instant requestTimestamp = Instant.now().plus(delay);
    return new SamzaResourceRequest(
        containerNumCpuCores,
        containerMemoryMb,
        preferredHost,
        processorId,
        requestTimestamp);
  }

  /**
   * Registers the request with the {@link ResourceRequestState} and updates the request counters: the total
   * container-request counter always, plus either the ANY_HOST or the preferred-host counter depending on the
   * request's preferred host.
   *
   * @param request the request to issue
   */
  public final void issueResourceRequest(SamzaResourceRequest request) {
    resourceRequestState.addResourceRequest(request);
    state.containerRequests.incrementAndGet();

    final boolean targetsAnyHost = ResourceRequestState.ANY_HOST.equals(request.getPreferredHost());
    if (!targetsAnyHost) {
      state.preferredHostRequests.incrementAndGet();
      return;
    }
    state.anyHostRequests.incrementAndGet();
  }

  /**
   * Tells whether {@link #peekAllocatedResource(String)} yields a resource for the given host.
   *
   * @param host  the host for which a resource is needed.
   * @return      {@code true} if there is a resource allocated for the specified host, {@code false} otherwise.
   */
  protected boolean hasAllocatedResource(String host) {
    final SamzaResource firstOnHost = peekAllocatedResource(host);
    return firstOnHost != null;
  }

  /**
   * Looks up, without removing, the resource that {@code ResourceRequestState#peekResource(String)} reports for
   * the specified host.
   *
   * @param host  the host on which a resource is needed.
   * @return      the first {@link SamzaResource} allocated for the specified host or {@code null} if there isn't one.
   */
  protected SamzaResource peekAllocatedResource(String host) {
    final SamzaResource firstOnHost = resourceRequestState.peekResource(host);
    return firstOnHost;
  }

  /**
   * Instantiates the command builder class named by the task config and configures it with the job config, the
   * given processorId and the URL of the job model manager's server. {@link ShellCommandBuilder} is the class name
   * passed to the task-config lookup (presumably as the default when none is configured).
   *
   * @param processorId to configure the builder with.
   * @return the constructed builder object
   */
  private CommandBuilder getCommandBuilder(String processorId) {
    final String builderClassName = taskConfig.getCommandClass(ShellCommandBuilder.class.getName());
    final CommandBuilder builder = ReflectionUtil.getObj(builderClassName, CommandBuilder.class);

    builder
        .setConfig(config)
        .setId(processorId)
        .setUrl(state.jobModelManager.server().getUrl());

    return builder;
  }

  /**
   * Hands an allocated resource over to the {@link ResourceRequestState}, which buffers allocated resources
   * (see allocatedResources in {@link ResourceRequestState}).
   *
   * @param samzaResource the allocated resource to buffer
   */
  public final void addResource(SamzaResource samzaResource) {
    this.resourceRequestState.addResource(samzaResource);
  }

  /**
   * Asks the {@link ResourceRequestState} to release the single resource identified by the given container ID.
   *
   * @param containerId container ID
   */
  public final void releaseResource(String containerId) {
    this.resourceRequestState.releaseResource(containerId);
  }

  /**
   * Requests the allocator to stop by clearing the {@code isRunning} flag, which the {@link #run()} loop checks
   * at the start of every iteration.
   */
  public void stop() {
    this.isRunning = false;
  }


  /**
   * Decides whether a request has expired, i.e. whether the time elapsed since its request timestamp is strictly
   * greater than the timeout that applies to it. An expired request is also reported in the log.
   *
   * @param request the request to check
   * @return true if request has expired
   */
  protected boolean isRequestExpired(SamzaResourceRequest request) {
    final long nowMs = Instant.now().toEpochMilli();
    final long elapsedMs = nowMs - request.getRequestTimestamp().toEpochMilli();

    if (elapsedMs <= getRequestTimeout(request).toMillis()) {
      return false;
    }

    LOG.info("Request for Processor ID: {} on host: {} with creation time: {} has expired at current time: {} after timeout: {} ms.",
        request.getProcessorId(), request.getPreferredHost(), request.getRequestTimestamp(), nowMs, getRequestTimeout(request).toMillis());
    return true;
  }

  /**
   * Determines the expiry timeout that applies to a request. The {@link ContainerManager} may supply an override
   * (container placement actions like move and restart can optionally do so); when it does not, the timeout is
   * {@code configuredRequestExpiryTimeout} milliseconds.
   *
   * @param request the request whose timeout is needed
   * @return the timeout to apply to the request
   */
  private Duration getRequestTimeout(SamzaResourceRequest request) {
    final Optional<Duration> actionOverride = containerManager.getActionExpiryTimeout(request);
    if (actionOverride.isPresent()) {
      return actionOverride.get();
    }
    return Duration.ofMillis(configuredRequestExpiryTimeout);
  }

  /**
   * Bumps the expiry counter that matches the request's preferred host: the ANY_HOST counter when the request
   * targeted {@link ResourceRequestState#ANY_HOST}, the preferred-host counter otherwise.
   *
   * @param request the expired request
   */
  private void updateExpiryMetrics(SamzaResourceRequest request) {
    final boolean targetedAnyHost = ResourceRequestState.ANY_HOST.equals(request.getPreferredHost());
    if (!targetedAnyHost) {
      state.expiredPreferredHostRequests.incrementAndGet();
      return;
    }
    state.expiredAnyHostRequests.incrementAndGet();
  }
}
