/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.slider.server.appmaster.rpc;

import com.google.common.base.Preconditions;
import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.slider.api.ClusterDescription;
import org.apache.slider.api.SliderClusterProtocol;
import org.apache.slider.api.proto.Messages;
import org.apache.slider.api.types.ApplicationLivenessInformation;
import org.apache.slider.api.types.ComponentInformation;
import org.apache.slider.api.types.ContainerInformation;
import org.apache.slider.core.conf.AggregateConf;
import org.apache.slider.core.conf.ConfTree;
import org.apache.slider.core.exceptions.NoSuchNodeException;
import org.apache.slider.core.exceptions.ServiceNotReadyException;
import org.apache.slider.core.exceptions.SliderException;
import org.apache.slider.core.main.LauncherExitCodes;
import org.apache.slider.core.persist.AggregateConfSerDeser;
import org.apache.slider.core.persist.ConfTreeSerDeser;
import org.apache.slider.server.appmaster.AppMasterActionOperations;
import org.apache.slider.server.appmaster.actions.ActionFlexCluster;
import org.apache.slider.server.appmaster.actions.ActionHalt;
import org.apache.slider.server.appmaster.actions.ActionKillContainer;
import org.apache.slider.server.appmaster.actions.ActionStopSlider;
import org.apache.slider.server.appmaster.actions.ActionUpgradeContainers;
import org.apache.slider.server.appmaster.actions.AsyncAction;
import org.apache.slider.server.appmaster.actions.QueueAccess;
import org.apache.slider.server.appmaster.management.MetricsAndMonitoring;
import org.apache.slider.server.appmaster.state.RoleInstance;
import org.apache.slider.server.appmaster.state.StateAccessForProviders;
import org.apache.slider.server.appmaster.web.rest.application.resources.ContentCache;
import org.apache.slider.server.services.security.CertificateManager;
import org.apache.slider.server.services.security.KeystoreGenerator;
import org.apache.slider.server.services.security.SecurityStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.apache.slider.api.proto.RestTypeMarshalling.marshall;
import static org.apache.slider.server.appmaster.web.rest.RestPaths.LIVE_COMPONENTS;
import static org.apache.slider.server.appmaster.web.rest.RestPaths.LIVE_CONTAINERS;
import static org.apache.slider.server.appmaster.web.rest.RestPaths.LIVE_RESOURCES;
import static org.apache.slider.server.appmaster.web.rest.RestPaths.MODEL_DESIRED;
import static org.apache.slider.server.appmaster.web.rest.RestPaths.MODEL_DESIRED_APPCONF;
import static org.apache.slider.server.appmaster.web.rest.RestPaths.MODEL_DESIRED_RESOURCES;
import static org.apache.slider.server.appmaster.web.rest.RestPaths.MODEL_RESOLVED;
import static org.apache.slider.server.appmaster.web.rest.RestPaths.MODEL_RESOLVED_APPCONF;
import static org.apache.slider.server.appmaster.web.rest.RestPaths.MODEL_RESOLVED_RESOURCES;

/**
 * Implement the {@link SliderClusterProtocol}.
 */
@SuppressWarnings("unchecked")

public class SliderIPCService extends AbstractService
    implements SliderClusterProtocol {

  /** Class-wide logger; protected, so visible to subclasses. */
  protected static final Logger log =
      LoggerFactory.getLogger(SliderIPCService.class);

  /** Queues onto which RPC-triggered actions are scheduled or put. */
  private final QueueAccess actionQueues;
  /** View of the application state used to answer queries. */
  private final StateAccessForProviders state;
  /** Metrics sink; marked once per call that goes through onRpcCall(). */
  private final MetricsAndMonitoring metricsAndMonitoring;
  /** AM operations handle, passed to the kill-container action. */
  private final AppMasterActionOperations amOperations;
  /** Cache that the live/model lookups are resolved against. */
  private final ContentCache cache;
  /**
   * Generates the keystores/truststores returned to clients.
   * NOTE(review): the constructor does not null-check this field.
   */
  private final CertificateManager certificateManager;

  /**
   * Prefix prepended to the operation name to form the name of the
   * meter/counter marked for each RPC call.
   */
  public static final String METRICS_PREFIX =
      "org.apache.slider.api.SliderIPCService.";

  /**
   * Build the IPC service around the collaborators it delegates to.
   * <p>
   * All arguments except {@code certificateManager} are checked with
   * {@link Preconditions#checkArgument(boolean, Object)}, so a null value
   * is rejected with an {@link IllegalArgumentException}.
   * @param amOperations access to any AM operations
   * @param certificateManager source of the key/trust stores handed out by
   * {@link #getClientCertificateStore}; not validated here
   * @param state state view
   * @param actionQueues queues for actions
   * @param metricsAndMonitoring metrics
   * @param cache content cache used to resolve the live and model lookups
   */
  public SliderIPCService(AppMasterActionOperations amOperations,
      CertificateManager certificateManager,
      StateAccessForProviders state,
      QueueAccess actionQueues,
      MetricsAndMonitoring metricsAndMonitoring,
      ContentCache cache) {
    super("SliderIPCService");

    // validation: same order and messages as the checks always had
    Preconditions.checkArgument(
        amOperations != null, "null amOperations");
    Preconditions.checkArgument(
        state != null, "null appState");
    Preconditions.checkArgument(
        actionQueues != null, "null actionQueues");
    Preconditions.checkArgument(
        metricsAndMonitoring != null, "null metricsAndMonitoring");
    Preconditions.checkArgument(
        cache != null, "null cache");

    // bind, in parameter order
    this.amOperations = amOperations;
    this.certificateManager = certificateManager;
    this.state = state;
    this.actionQueues = actionQueues;
    this.metricsAndMonitoring = metricsAndMonitoring;
    this.cache = cache;
  }

  /**
   * Resolve the protocol signature by delegating to the static
   * {@link ProtocolSignature} helper with this service as the server.
   */
  @Override   //SliderClusterProtocol
  public ProtocolSignature getProtocolSignature(String protocol,
      long clientVersion,
      int clientMethodsHash) throws IOException {
    final ProtocolSignature signature =
        ProtocolSignature.getProtocolSignature(
            this,
            protocol,
            clientVersion,
            clientMethodsHash);
    return signature;
  }


  /**
   * Report the protocol version implemented by this service.
   * Both arguments are ignored; the answer is always
   * {@link SliderClusterProtocol#versionID}.
   */
  @Override   //SliderClusterProtocol
  public long getProtocolVersion(String protocol, long clientVersion)
      throws IOException {
    final long implementedVersion = SliderClusterProtocol.versionID;
    return implementedVersion;
  }

  /**
   * Bookkeeping performed at the start of an RPC call: the operation is
   * logged at debug level and the meter/counter named
   * {@link #METRICS_PREFIX} + {@code operation} is marked.
   * <p>
   * This implementation does not itself raise any exception; the
   * {@code throws} clause leaves room for overriding implementations.
   * @param operation operation to log
   * @throws IOException problems
   */
  protected void onRpcCall(String operation) throws IOException {
    final String metricName = METRICS_PREFIX + operation;
    log.debug("Received call to {}", operation);
    metricsAndMonitoring.markMeterAndCounter(metricName);
  }

  /**
   * Pass an action to the action queues' scheduler.
   * @param action action to schedule for delayed execution
   */
  public void schedule(AsyncAction action) {
    this.actionQueues.schedule(action);
  }

  /**
   * Put an action onto the action queues for immediate execution
   * in the executor thread.
   * @param action action to execute
   */
  public void queue(AsyncAction action) {
    this.actionQueues.put(action);
  }

  /**
   * Handle a client request to stop the application.
   * <p>
   * The stop is not carried out inline: an {@link ActionStopSlider} is built
   * from the request and handed to {@link #schedule(AsyncAction)}, after
   * which the default response is returned.
   * @param request stop request; its message is used as the stop text
   * @return the default (empty) response
   * @throws IOException problems
   * @throws YarnException declared by the protocol
   */
  @Override //SliderClusterProtocol
  public Messages.StopClusterResponseProto stopCluster(
      Messages.StopClusterRequestProto request)
      throws IOException, YarnException {
    onRpcCall("stop");
    String message = request.getMessage();
    // protobuf string getters hand back "" rather than null for a field
    // with no content, so an empty message must also trigger the default
    if (message == null || message.isEmpty()) {
      message = "application stopped by client";
    }
    ActionStopSlider stopSlider =
        new ActionStopSlider(message,
            1000, TimeUnit.MILLISECONDS,
            LauncherExitCodes.EXIT_SUCCESS,
            FinalApplicationStatus.SUCCEEDED,
            message);
    log.info("SliderAppMasterApi.stopCluster: {}", stopSlider);
    schedule(stopSlider);
    return Messages.StopClusterResponseProto.getDefaultInstance();
  }

  /**
   * Handle a client request to upgrade containers.
   * <p>
   * An {@link ActionUpgradeContainers} carrying the container and component
   * lists from the request is handed to {@link #schedule(AsyncAction)};
   * the default response is then returned.
   * @param request upgrade request: message, container list, component list
   * @return the default (empty) response
   * @throws IOException problems
   * @throws YarnException declared by the protocol
   */
  @Override //SliderClusterProtocol
  public Messages.UpgradeContainersResponseProto upgradeContainers(
      Messages.UpgradeContainersRequestProto request) throws IOException,
      YarnException {
    onRpcCall("upgrade");
    String message = request.getMessage();
    // protobuf string getters hand back "" rather than null for a field
    // with no content, so an empty message must also trigger the default
    if (message == null || message.isEmpty()) {
      message = "application containers upgraded by client";
    }
    ActionUpgradeContainers upgradeContainers =
        new ActionUpgradeContainers(
            "Upgrade containers",
            1000, TimeUnit.MILLISECONDS,
            LauncherExitCodes.EXIT_SUCCESS,
            FinalApplicationStatus.SUCCEEDED,
            request.getContainerList(),
            request.getComponentList(),
            message);
    log.info("SliderAppMasterApi.upgradeContainers: {}", upgradeContainers);
    schedule(upgradeContainers);
    return Messages.UpgradeContainersResponseProto.getDefaultInstance();
  }

  /**
   * Handle a flex request: the JSON cluster spec in the request is parsed
   * into a {@link ConfTree} and an {@link ActionFlexCluster} carrying it is
   * scheduled.
   * @param request request holding the JSON resource specification
   * @return a response whose {@code response} field is {@code true}
   * @throws IOException problems, including a spec that cannot be parsed
   */
  @Override //SliderClusterProtocol
  public Messages.FlexClusterResponseProto flexCluster(
      Messages.FlexClusterRequestProto request)
      throws IOException {
    onRpcCall("flex");
    final String clusterSpecJson = request.getClusterSpec();
    final ConfTree updatedResources =
        new ConfTreeSerDeser().fromJson(clusterSpecJson);
    final ActionFlexCluster flexAction =
        new ActionFlexCluster("flex", 1, TimeUnit.MILLISECONDS,
            updatedResources);
    schedule(flexAction);

    final Messages.FlexClusterResponseProto.Builder response =
        Messages.FlexClusterResponseProto.newBuilder();
    response.setResponse(true);
    return response.build();
  }

  /**
   * Return the cluster status as JSON: the status is refreshed through the
   * state view and the resulting {@link ClusterDescription} is serialized.
   * @param request ignored
   * @return response whose cluster spec is the JSON status
   * @throws IOException problems
   * @throws YarnException declared by the protocol
   */
  @Override //SliderClusterProtocol
  public Messages.GetJSONClusterStatusResponseProto getJSONClusterStatus(
      Messages.GetJSONClusterStatusRequestProto request)
      throws IOException, YarnException {
    onRpcCall("getstatus");
    // refresh first, then json-ify what came back
    final ClusterDescription description = state.refreshClusterStatus();
    final String statusJson = description.toJsonString();

    final Messages.GetJSONClusterStatusResponseProto.Builder response =
        Messages.GetJSONClusterStatusResponseProto.newBuilder();
    response.setClusterSpec(statusJson);
    return response.build();
  }

  /**
   * Return the three parts of the instance definition snapshot (internal,
   * resources and application configuration), each serialized to JSON.
   * @param request ignored
   * @return response carrying the three JSON documents
   * @throws IOException problems
   * @throws YarnException declared by the protocol
   */
  @Override
  public Messages.GetInstanceDefinitionResponseProto getInstanceDefinition(
      Messages.GetInstanceDefinitionRequestProto request)
      throws IOException, YarnException {

    onRpcCall("getinstancedefinition");
    final AggregateConf snapshot = state.getInstanceDefinitionSnapshot();
    final String internalJson = snapshot.getInternal().toJson();
    final String resourcesJson = snapshot.getResources().toJson();
    final String appConfJson = snapshot.getAppConf().toJson();
    // only enforced when the JVM runs with assertions enabled
    assert internalJson != null;
    assert resourcesJson != null;
    assert appConfJson != null;
    log.debug("Generating getInstanceDefinition Response");
    return Messages.GetInstanceDefinitionResponseProto.newBuilder()
        .setInternal(internalJson)
        .setResources(resourcesJson)
        .setApplication(appConfJson)
        .build();
  }

  /**
   * List the IDs of the live nodes in a role.
   * @param request request naming the role
   * @return response with one UUID entry per live node enumerated for the
   * role; the list may be empty
   * @throws IOException problems
   * @throws YarnException problems enumerating the nodes
   */
  @Override //SliderClusterProtocol
  public Messages.ListNodeUUIDsByRoleResponseProto listNodeUUIDsByRole(
      Messages.ListNodeUUIDsByRoleRequestProto request)
      throws IOException, YarnException {
    // the operation name feeds the metric name: it previously carried a
    // stray ')' which ended up in the meter/counter name
    onRpcCall("listnodes");
    String role = request.getRole();
    Messages.ListNodeUUIDsByRoleResponseProto.Builder builder =
        Messages.ListNodeUUIDsByRoleResponseProto.newBuilder();
    List<RoleInstance> nodes = state.enumLiveNodesInRole(role);
    for (RoleInstance node : nodes) {
      builder.addUuid(node.id);
    }
    return builder.build();
  }

  /**
   * Look up a single live instance by the container ID held in the
   * request's UUID field and return its protobuf form.
   * @param request request carrying the container ID
   * @return response wrapping the instance
   * @throws IOException problems
   * @throws YarnException declared by the protocol
   */
  @Override //SliderClusterProtocol
  public Messages.GetNodeResponseProto getNode(
      Messages.GetNodeRequestProto request)
      throws IOException, YarnException {
    onRpcCall("getnode");
    final String containerId = request.getUuid();
    final RoleInstance instance =
        state.getLiveInstanceByContainerID(containerId);
    final Messages.GetNodeResponseProto.Builder response =
        Messages.GetNodeResponseProto.newBuilder();
    response.setClusterNode(instance.toProtobuf());
    return response.build();
  }

  /**
   * Look up the live instances matching the container IDs in the request
   * and return them in protobuf form.
   * @param request request carrying the list of container IDs
   * @return response with one entry per instance returned by the state
   * view; possibly empty
   * @throws IOException problems
   * @throws YarnException declared by the protocol
   */
  @Override //SliderClusterProtocol
  public Messages.GetClusterNodesResponseProto getClusterNodes(
      Messages.GetClusterNodesRequestProto request)
      throws IOException, YarnException {
    onRpcCall("getclusternodes");
    final List<RoleInstance> instances =
        state.getLiveInstancesByContainerIDs(request.getUuidList());

    final Messages.GetClusterNodesResponseProto.Builder response =
        Messages.GetClusterNodesResponseProto.newBuilder();
    for (RoleInstance instance : instances) {
      response.addClusterNode(instance.toProtobuf());
    }
    // the response may legitimately contain no nodes at all
    return response.build();
  }

  /**
   * Echo operation: the request text is logged (its length, then the text
   * itself, both at info level) and sent straight back.
   * @param request request carrying the text
   * @return response carrying the same text
   * @throws IOException problems
   * @throws YarnException declared by the protocol
   */
  @Override
  public Messages.EchoResponseProto echo(Messages.EchoRequestProto request)
      throws IOException, YarnException {
    onRpcCall("echo");
    final String payload = request.getText();
    log.info("Echo request size ={}", payload.length());
    log.info(payload);
    // bounce the payload back unchanged
    return Messages.EchoResponseProto.newBuilder()
        .setText(payload)
        .build();
  }

  /**
   * Request the kill of a live container. The container is resolved through
   * the state view and an {@link ActionKillContainer} for it is queued.
   * @param request request carrying the container ID
   * @return response with {@code success} set to {@code true}
   * @throws IOException problems, including a failed container lookup
   * @throws YarnException declared by the protocol
   */
  @Override
  public Messages.KillContainerResponseProto killContainer(
      Messages.KillContainerRequestProto request)
      throws IOException, YarnException {
    onRpcCall("killcontainer");
    final String containerID = request.getId();
    log.info("Kill Container {}", containerID);
    // the lookup throws NoSuchNodeException if the container is missing
    final RoleInstance target =
        state.getLiveInstanceByContainerID(containerID);
    final ActionKillContainer killAction =
        new ActionKillContainer(target.getId(), 0, TimeUnit.MILLISECONDS,
            amOperations);
    queue(killAction);
    return Messages.KillContainerResponseProto.newBuilder()
        .setSuccess(true)
        .build();
  }


  /**
   * Schedule a halt of the application master. An {@link ActionHalt} is
   * built from the signal, text and delay in the request (the delay is
   * passed with {@link TimeUnit#MILLISECONDS}) and scheduled.
   * @param request request carrying signal, text and delay
   * @return the default (empty) response
   * @throws IOException problems
   */
  @Override
  public Messages.AMSuicideResponseProto amSuicide(
      Messages.AMSuicideRequestProto request)
      throws IOException {
    onRpcCall("amsuicide");
    final int signal = request.getSignal();
    final String requestText = request.getText();
    // a missing text is normalised to the empty string
    final String text = requestText != null ? requestText : "";
    final int delay = request.getDelay();
    log.info("AM Suicide with signal {}, message {} delay = {}", signal, text,
        delay);
    schedule(new ActionHalt(signal, text, delay, TimeUnit.MILLISECONDS));
    return Messages.AMSuicideResponseProto.getDefaultInstance();
  }

  /**
   * Return the application liveness information held by the state view,
   * marshalled to its protobuf form.
   * @param request ignored
   * @return the marshalled liveness information
   * @throws IOException problems
   */
  @Override
  public Messages.ApplicationLivenessInformationProto getLivenessInformation(
      Messages.GetApplicationLivenessRequestProto request) throws IOException {
    return marshall(state.getApplicationLivenessInformation());
  }

  /**
   * Return all live containers. The name-to-container map is fetched from
   * the cache under {@code LIVE_CONTAINERS}; each entry contributes its key
   * to the names list and its marshalled value to the containers list.
   * @param request ignored
   * @return response with the parallel names and containers lists
   * @throws IOException if the cache lookup fails
   */
  @Override
  public Messages.GetLiveContainersResponseProto getLiveContainers(
      Messages.GetLiveContainersRequestProto request)
      throws IOException {
    final Object cached = cache.lookupWithIOE(LIVE_CONTAINERS);
    final Map<String, ContainerInformation> containers =
        (Map<String, ContainerInformation>) cached;

    final Messages.GetLiveContainersResponseProto.Builder response =
        Messages.GetLiveContainersResponseProto.newBuilder();
    for (Map.Entry<String, ContainerInformation> container
        : containers.entrySet()) {
      response.addNames(container.getKey());
      response.addContainers(marshall(container.getValue()));
    }
    return response.build();
  }

  /**
   * Return the information on one live container: the instance is resolved
   * by container ID, serialized and marshalled.
   * @param request request carrying the container ID
   * @return the marshalled container information
   * @throws IOException problems, including a failed container lookup
   */
  @Override
  public Messages.ContainerInformationProto getLiveContainer(
      Messages.GetLiveContainerRequestProto request) throws IOException {
    final RoleInstance instance =
        state.getLiveInstanceByContainerID(request.getContainerId());
    return marshall(instance.serialize());
  }

  /**
   * Return all live components. The name-to-component map is fetched from
   * the cache under {@code LIVE_COMPONENTS}; each entry contributes its key
   * to the names list and its marshalled value to the components list.
   * @param request ignored
   * @return response with the parallel names and components lists
   * @throws IOException if the cache lookup fails
   */
  @Override
  public Messages.GetLiveComponentsResponseProto getLiveComponents(
      Messages.GetLiveComponentsRequestProto request) throws IOException {
    final Object cached = cache.lookupWithIOE(LIVE_COMPONENTS);
    final Map<String, ComponentInformation> components =
        (Map<String, ComponentInformation>) cached;

    final Messages.GetLiveComponentsResponseProto.Builder response =
        Messages.GetLiveComponentsResponseProto.newBuilder();
    for (Map.Entry<String, ComponentInformation> component
        : components.entrySet()) {
      response.addNames(component.getKey());
      response.addComponents(marshall(component.getValue()));
    }
    return response.build();
  }

  /**
   * Return the aggregate configuration cached under {@code MODEL_DESIRED},
   * as wrapped JSON.
   */
  @Override
  public Messages.WrappedJsonProto getModelDesired(
      Messages.EmptyPayloadProto request) throws IOException {
    final Messages.WrappedJsonProto wrapped =
        lookupAggregateConf(MODEL_DESIRED);
    return wrapped;
  }

  /**
   * Return the configuration tree cached under
   * {@code MODEL_DESIRED_APPCONF}, as wrapped JSON.
   */
  @Override
  public Messages.WrappedJsonProto getModelDesiredAppconf(
      Messages.EmptyPayloadProto request) throws IOException {
    final Messages.WrappedJsonProto wrapped =
        lookupConfTree(MODEL_DESIRED_APPCONF);
    return wrapped;
  }

  /**
   * Return the configuration tree cached under
   * {@code MODEL_DESIRED_RESOURCES}, as wrapped JSON.
   */
  @Override
  public Messages.WrappedJsonProto getModelDesiredResources(
      Messages.EmptyPayloadProto request) throws IOException {
    final Messages.WrappedJsonProto wrapped =
        lookupConfTree(MODEL_DESIRED_RESOURCES);
    return wrapped;
  }

  /**
   * Return the aggregate configuration cached under {@code MODEL_RESOLVED},
   * as wrapped JSON.
   */
  @Override
  public Messages.WrappedJsonProto getModelResolved(
      Messages.EmptyPayloadProto request) throws IOException {
    final Messages.WrappedJsonProto wrapped =
        lookupAggregateConf(MODEL_RESOLVED);
    return wrapped;
  }

  /**
   * Return the configuration tree cached under
   * {@code MODEL_RESOLVED_APPCONF}, as wrapped JSON.
   */
  @Override
  public Messages.WrappedJsonProto getModelResolvedAppconf(
      Messages.EmptyPayloadProto request) throws IOException {
    final Messages.WrappedJsonProto wrapped =
        lookupConfTree(MODEL_RESOLVED_APPCONF);
    return wrapped;
  }

  /**
   * Return the configuration tree cached under
   * {@code MODEL_RESOLVED_RESOURCES}, as wrapped JSON.
   */
  @Override
  public Messages.WrappedJsonProto getModelResolvedResources(
      Messages.EmptyPayloadProto request) throws IOException {
    final Messages.WrappedJsonProto wrapped =
        lookupConfTree(MODEL_RESOLVED_RESOURCES);
    return wrapped;
  }

  /**
   * Return the configuration tree cached under {@code LIVE_RESOURCES},
   * as wrapped JSON.
   */
  @Override
  public Messages.WrappedJsonProto getLiveResources(
      Messages.EmptyPayloadProto request) throws IOException {
    final Messages.WrappedJsonProto wrapped =
        lookupConfTree(LIVE_RESOURCES);
    return wrapped;
  }

  /**
   * Return the information on a single named component.
   * @param request request carrying the component name
   * @return the marshalled component information
   * @throws FileNotFoundException if the state view rejects the name with a
   * {@link YarnRuntimeException}; that exception is attached as the cause
   * @throws IOException other problems
   */
  @Override
  public Messages.ComponentInformationProto getLiveComponent(
      Messages.GetLiveComponentRequestProto request) throws IOException {
    String name = request.getName();
    try {
      return marshall(state.getComponentInformation(name));
    } catch (YarnRuntimeException e) {
      // FileNotFoundException has no (message, cause) constructor, so the
      // original failure is attached afterwards rather than being lost
      FileNotFoundException notFound =
          new FileNotFoundException("Unknown component: " + name);
      notFound.initCause(e);
      throw notFound;
    }
  }

  /**
   * Helper: fetch the entry cached under a key, treat it as an
   * {@link AggregateConf}, serialize it to JSON and wrap the result.
   * @param key key to resolve
   * @return the configuration, as wrapped JSON
   * @throws IOException on a failure
   */
  protected Messages.WrappedJsonProto lookupAggregateConf(String key)
      throws IOException {
    final Object cached = cache.lookupWithIOE(key);
    return wrap(AggregateConfSerDeser.toString((AggregateConf) cached));
  }

  /**
   * Helper: fetch the entry cached under a key, treat it as a
   * {@link ConfTree}, serialize it to JSON and wrap the result.
   * @param key key to resolve
   * @return the configuration tree, as wrapped JSON
   * @throws IOException on a failure
   */
  protected Messages.WrappedJsonProto lookupConfTree(String key)
      throws IOException {
    final Object cached = cache.lookupWithIOE(key);
    return wrap(ConfTreeSerDeser.toString((ConfTree) cached));
  }

  /**
   * Wrap a JSON string in a {@code WrappedJsonProto} message.
   * @param json JSON text to carry
   * @return the built message
   */
  private Messages.WrappedJsonProto wrap(String json) {
    return Messages.WrappedJsonProto.newBuilder()
        .setJson(json)
        .build();
  }

  /**
   * Generate a keystore or truststore for a client and return it in
   * marshalled form.
   * <p>
   * The request's type field is parsed as a {@code SecurityStore.StoreType}
   * name. {@code keystore} produces a container keystore for the
   * hostname/requester; {@code truststore} produces a container truststore
   * for the requester.
   * @param request request carrying hostname, requester ID, password and
   * store type
   * @return the marshalled store
   * @throws IOException if the type is not a recognised store type name, is
   * neither keystore nor truststore, or store generation fails with a
   * {@link SliderException} (attached as the cause)
   */
  @Override
  public Messages.GetCertificateStoreResponseProto getClientCertificateStore(
      Messages.GetCertificateStoreRequestProto request) throws IOException {
    String hostname = request.getHostname();
    String clientId = request.getRequesterId();
    String password = request.getPassword();
    String type = request.getType();

    // parse once; valueOf() signals an unknown name with an unchecked
    // IllegalArgumentException, which must surface as the declared
    // IOException instead of escaping the RPC handler
    SecurityStore.StoreType storeType;
    try {
      storeType = SecurityStore.StoreType.valueOf(type);
    } catch (IllegalArgumentException e) {
      throw new IOException("Illegal store type: " + type, e);
    }

    SecurityStore store;
    try {
      if (SecurityStore.StoreType.keystore.equals(storeType)) {
        store = certificateManager.generateContainerKeystore(hostname,
                                                             clientId,
                                                             null,
                                                             password);
      } else if (SecurityStore.StoreType.truststore.equals(storeType)) {
        store = certificateManager.generateContainerTruststore(clientId,
                                                               null,
                                                               password);
      } else {
        // a valid enum name that this method has no handling for
        throw new IOException("Illegal store type: " + type);
      }
    } catch (SliderException e) {
      throw new IOException(e);
    }
    return marshall(store);
  }
}
