/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.geode.management.internal.api;


import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

import org.apache.commons.lang3.NotImplementedException;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.Logger;

import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.cache.configuration.CacheConfig;
import org.apache.geode.cache.configuration.CacheElement;
import org.apache.geode.cache.configuration.GatewayReceiverConfig;
import org.apache.geode.cache.configuration.RegionConfig;
import org.apache.geode.cache.execute.Execution;
import org.apache.geode.cache.execute.Function;
import org.apache.geode.cache.execute.FunctionService;
import org.apache.geode.cache.execute.ResultCollector;
import org.apache.geode.distributed.ConfigurationPersistenceService;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.execute.AbstractExecution;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.management.api.ClusterManagementException;
import org.apache.geode.management.api.ClusterManagementListOperationsResult;
import org.apache.geode.management.api.ClusterManagementListResult;
import org.apache.geode.management.api.ClusterManagementOperation;
import org.apache.geode.management.api.ClusterManagementOperationResult;
import org.apache.geode.management.api.ClusterManagementRealizationException;
import org.apache.geode.management.api.ClusterManagementRealizationResult;
import org.apache.geode.management.api.ClusterManagementResult;
import org.apache.geode.management.api.ClusterManagementResult.StatusCode;
import org.apache.geode.management.api.ClusterManagementService;
import org.apache.geode.management.api.ConfigurationResult;
import org.apache.geode.management.api.CorrespondWith;
import org.apache.geode.management.api.RealizationResult;
import org.apache.geode.management.api.RestfulEndpoint;
import org.apache.geode.management.configuration.MemberConfig;
import org.apache.geode.management.configuration.Pdx;
import org.apache.geode.management.internal.CacheElementOperation;
import org.apache.geode.management.internal.ClusterManagementOperationStatusResult;
import org.apache.geode.management.internal.cli.functions.CacheRealizationFunction;
import org.apache.geode.management.internal.configuration.mutators.ConfigurationManager;
import org.apache.geode.management.internal.configuration.mutators.GatewayReceiverConfigManager;
import org.apache.geode.management.internal.configuration.mutators.PdxManager;
import org.apache.geode.management.internal.configuration.mutators.RegionConfigManager;
import org.apache.geode.management.internal.configuration.validators.CacheElementValidator;
import org.apache.geode.management.internal.configuration.validators.ConfigurationValidator;
import org.apache.geode.management.internal.configuration.validators.GatewayReceiverConfigValidator;
import org.apache.geode.management.internal.configuration.validators.MemberValidator;
import org.apache.geode.management.internal.configuration.validators.RegionConfigValidator;
import org.apache.geode.management.internal.exceptions.EntityExistsException;
import org.apache.geode.management.internal.operation.OperationHistoryManager;
import org.apache.geode.management.internal.operation.OperationHistoryManager.OperationInstance;
import org.apache.geode.management.internal.operation.OperationManager;
import org.apache.geode.management.internal.operation.TaggedWithOperator;
import org.apache.geode.management.runtime.OperationResult;
import org.apache.geode.management.runtime.RuntimeInfo;

public class LocatorClusterManagementService implements ClusterManagementService {
  private static final Logger logger = LogService.getLogger();
  // store for the per-group cache configuration; create/delete/list(filter) treat null as
  // "cluster configuration service is not enabled"
  private final ConfigurationPersistenceService persistenceService;
  // configuration managers, looked up by the concrete class of the configuration object
  private final Map<Class, ConfigurationManager> managers;
  // optional type-specific validators, looked up by the concrete class of the configuration object
  private final Map<Class, ConfigurationValidator> validators;
  // submits operations and looks up previously submitted operation instances
  private final OperationManager operationManager;
  // used to find target servers/groups and to check whether an element already exists
  private final MemberValidator memberValidator;
  // validation applied to every configuration object before the type-specific validator
  private final CacheElementValidator commonValidator;

  /**
   * Creates a service with the default managers and validators registered.
   *
   * @param cache the cache handed to the validators, managers and operation manager that need it
   * @param persistenceService the cluster configuration store
   */
  public LocatorClusterManagementService(InternalCache cache,
      ConfigurationPersistenceService persistenceService) {
    this(persistenceService, new ConcurrentHashMap<>(), new ConcurrentHashMap<>(),
        new MemberValidator(cache, persistenceService), new CacheElementValidator(),
        new OperationManager(cache, new OperationHistoryManager()));

    // regions: manager and type-specific validator
    managers.put(RegionConfig.class, new RegionConfigManager());
    validators.put(RegionConfig.class, new RegionConfigValidator(cache));

    // pdx: manager only, no type-specific validator is registered
    managers.put(Pdx.class, new PdxManager());

    // gateway receivers: manager and type-specific validator
    managers.put(GatewayReceiverConfig.class, new GatewayReceiverConfigManager(cache));
    validators.put(GatewayReceiverConfig.class, new GatewayReceiverConfigValidator());
  }

  /**
   * Creates a service from explicitly supplied collaborators; nothing is registered or created
   * here, every argument is stored as given.
   */
  @VisibleForTesting
  public LocatorClusterManagementService(ConfigurationPersistenceService persistenceService,
      Map<Class, ConfigurationManager> managers,
      Map<Class, ConfigurationValidator> validators,
      MemberValidator memberValidator,
      CacheElementValidator commonValidator,
      OperationManager operationManager) {
    // persistence and operation tracking
    this.persistenceService = persistenceService;
    this.operationManager = operationManager;

    // per-type handlers
    this.managers = managers;
    this.validators = validators;

    // validation helpers
    this.commonValidator = commonValidator;
    this.memberValidator = memberValidator;
  }

  /**
   * Creates the given configuration element: validates it, realizes it on the servers of its
   * group and, when every member succeeded, persists it in the cluster configuration.
   *
   * @param config the element to create
   * @return the successful result, carrying the per-member status and, for a
   *         {@link RestfulEndpoint}, the URI of the created element
   * @throws ClusterManagementException if the type is not supported, validation fails or the
   *         element already exists
   * @throws ClusterManagementRealizationException if the final result is not successful
   */
  @Override
  public <T extends CacheElement> ClusterManagementRealizationResult create(T config) {
    // rejects configuration types that have no registered manager
    ConfigurationManager configurationManager = getConfigurationManager(config);

    if (persistenceService == null) {
      ClusterManagementRealizationResult disabled = new ClusterManagementRealizationResult(
          StatusCode.ERROR, "Cluster configuration service needs to be enabled.");
      return assertSuccessful(disabled);
    }

    // final because the persistence lambda below captures it
    final String group = config.getConfigGroup();
    try {
      // attributes common to every configuration object come first
      commonValidator.validate(CacheElementOperation.CREATE, config);

      // then the type-specific rules, if a validator is registered for this type
      ConfigurationValidator typeValidator = validators.get(config.getClass());
      if (typeValidator != null) {
        typeValidator.validate(CacheElementOperation.CREATE, config);
      }

      // finally make sure the element does not already exist on all/some members of the group
      memberValidator.validateCreate(config, configurationManager);
    } catch (EntityExistsException e) {
      raise(StatusCode.ENTITY_EXISTS, e);
    } catch (IllegalArgumentException e) {
      raise(StatusCode.ILLEGAL_ARGUMENT, e);
    }

    // realize the element on every server of the group
    Set<DistributedMember> servers = memberValidator.findServers(group);
    List<RealizationResult> memberResults = executeAndGetFunctionResult(
        new CacheRealizationFunction(),
        Arrays.asList(config, CacheElementOperation.CREATE),
        servers);

    ClusterManagementRealizationResult result = new ClusterManagementRealizationResult();
    for (RealizationResult memberResult : memberResults) {
      result.addMemberStatus(memberResult);
    }

    // a failed member status changes the status code; do not persist in that case
    if (result.getStatusCode() != StatusCode.OK) {
      result.setStatus(StatusCode.ERROR, "Failed to create on all members.");
      return assertSuccessful(result);
    }

    // persist the element in the cache config of the group
    persistenceService.updateCacheConfig(group, groupConfig -> {
      try {
        configurationManager.add(config, groupConfig);
        result.setStatus(StatusCode.OK,
            "Successfully updated configuration for " + group + ".");
      } catch (Exception e) {
        String message = "Failed to update cluster configuration for " + group + ".";
        logger.error(message, e);
        result.setStatus(StatusCode.FAIL_TO_PERSIST, message);
        return null;
      }
      return groupConfig;
    });

    // expose the HATEOAS link of the element that was just created
    if (result.isSuccessful() && config instanceof RestfulEndpoint) {
      RestfulEndpoint endpoint = (RestfulEndpoint) config;
      result.setUri(endpoint.getUri());
    }
    return assertSuccessful(result);
  }

  /**
   * Deletes the given configuration element: validates it, finds every group that contains it,
   * removes it from the servers of those groups and, when every member succeeded, removes it
   * from the persisted configuration of each group.
   *
   * @param config the element to delete; only its type and id are used to locate it
   * @return the successful result, carrying the per-member status
   * @throws ClusterManagementException if the type is not supported, validation fails or no group
   *         contains the element
   * @throws ClusterManagementRealizationException if the deletion failed on the members or the
   *         configuration could not be persisted for at least one group
   */
  @Override
  public <T extends CacheElement> ClusterManagementRealizationResult delete(
      T config) {
    // validate that user used the correct config object type
    ConfigurationManager configurationManager = getConfigurationManager(config);

    if (persistenceService == null) {
      return assertSuccessful(new ClusterManagementRealizationResult(StatusCode.ERROR,
          "Cluster configuration service needs to be enabled."));
    }

    try {
      // first validate common attributes of all configuration object
      commonValidator.validate(CacheElementOperation.DELETE, config);

      ConfigurationValidator validator = validators.get(config.getClass());
      if (validator != null) {
        validator.validate(CacheElementOperation.DELETE, config);
      }
    } catch (IllegalArgumentException e) {
      raise(StatusCode.ILLEGAL_ARGUMENT, e);
    }

    String[] groupsWithThisElement =
        memberValidator.findGroupsWithThisElement(config.getId(), configurationManager);
    if (groupsWithThisElement.length == 0) {
      raise(StatusCode.ENTITY_NOT_FOUND,
          config.getClass().getSimpleName() + " '" + config.getId() + "' does not exist.");
    }

    // execute function on all members of the groups that contain the element
    ClusterManagementRealizationResult result = new ClusterManagementRealizationResult();

    List<RealizationResult> functionResults = executeAndGetFunctionResult(
        new CacheRealizationFunction(),
        Arrays.asList(config, CacheElementOperation.DELETE),
        memberValidator.findServers(groupsWithThisElement));
    functionResults.forEach(result::addMemberStatus);

    // if any false result is added to the member list
    if (result.getStatusCode() != StatusCode.OK) {
      result.setStatus(StatusCode.ERROR, "Failed to delete on all members.");
      // report the failure by throwing, exactly like create() and like every other failure path
      // of this method, instead of silently handing an unsuccessful result back to the caller
      return assertSuccessful(result);
    }

    // persist configuration in cache config, one group at a time, remembering which groups
    // were updated and which failed
    List<String> updatedGroups = new ArrayList<>();
    List<String> failedGroups = new ArrayList<>();
    for (String finalGroup : groupsWithThisElement) {
      persistenceService.updateCacheConfig(finalGroup, cacheConfigForGroup -> {
        try {
          configurationManager.delete(config, cacheConfigForGroup);
          updatedGroups.add(finalGroup);
        } catch (Exception e) {
          logger.error("Failed to update cluster configuration for " + finalGroup + ".", e);
          failedGroups.add(finalGroup);
          return null;
        }
        return cacheConfigForGroup;
      });
    }

    if (failedGroups.isEmpty()) {
      result.setStatus(StatusCode.OK,
          "Successfully removed configuration for " + updatedGroups + ".");
    } else {
      String message = "Failed to update cluster configuration for " + failedGroups + ".";
      result.setStatus(StatusCode.FAIL_TO_PERSIST, message);
    }

    return assertSuccessful(result);
  }

  /**
   * Not supported by this implementation.
   *
   * @throws NotImplementedException always
   */
  @Override
  public <T extends CacheElement> ClusterManagementRealizationResult update(
      T config) {
    throw new NotImplementedException("Not implemented");
  }

  /**
   * Lists the persisted configuration elements matching the filter and, when the filter reports
   * that it has runtime info, the runtime information gathered from the members.
   *
   * <p>
   * A {@link MemberConfig} filter is not looked up in the persisted configuration; the filter
   * itself becomes the only element of the list. For every other type the elements of all groups
   * are collected, equal elements found in several groups are merged into one entry carrying all
   * of its groups, and only then is the filter's group applied.
   *
   * @param filter the element used as the search criteria
   * @return the matching elements; an empty result if nothing is persisted for the filter
   * @throws ClusterManagementException if the cluster configuration service is not enabled or
   *         the filter type is not supported
   */
  @Override
  public <T extends CacheElement & CorrespondWith<R>, R extends RuntimeInfo> ClusterManagementListResult<T, R> list(
      T filter) {
    ClusterManagementListResult<T, R> result = new ClusterManagementListResult<>();

    if (persistenceService == null) {
      return assertSuccessful(new ClusterManagementListResult<>(StatusCode.ERROR,
          "Cluster configuration service needs to be enabled."));
    }

    List<T> resultList = new ArrayList<>();

    if (filter instanceof MemberConfig) {
      // members are not persisted configuration: the filter itself is the single list entry
      resultList.add(filter);
    } else {
      ConfigurationManager<T> manager = getConfigurationManager(filter);
      // gather elements on all the groups, consolidate the group information and then do the filter
      // so that when we filter by a specific group, we still show that a particular element might
      // also belong to another group.
      for (String group : persistenceService.getGroups()) {
        CacheConfig currentPersistedConfig = persistenceService.getCacheConfig(group, true);
        List<T> listInGroup = manager.list(filter, currentPersistedConfig);
        for (T element : listInGroup) {
          element.setGroup(group);
          resultList.add(element);
        }
      }

      // if empty result, return immediately
      if (resultList.size() == 0) {
        return result;
      }

      // right now the list contains [{regionA, group1}, {regionA, group2}...], if the elements are
      // MultiGroupCacheElement, we need to consolidate the list into [{regionA, [group1, group2]}
      // (indexOf relies on the element's equals() to recognize the same element in another group)
      List<T> consolidatedResultList = new ArrayList<>();
      for (T element : resultList) {
        int index = consolidatedResultList.indexOf(element);
        if (index >= 0) {
          T exist = consolidatedResultList.get(index);
          exist.addGroup(element.getGroup());
        } else {
          consolidatedResultList.add(element);
        }
      }
      // apply the group filter only when the caller actually specified a group
      if (StringUtils.isNotBlank(filter.getGroup())) {
        consolidatedResultList = consolidatedResultList.stream()
            .filter(e -> (e.getGroups().contains(filter.getConfigGroup())))
            .collect(Collectors.toList());
      }
      resultList = consolidatedResultList;
    }

    // gather the runtime info for each configuration objects
    List<ConfigurationResult<T, R>> responses = new ArrayList<>();
    boolean hasRuntimeInfo = filter.hasRuntimeInfo();

    for (T element : resultList) {
      // captured BEFORE the clear() below and used afterwards to find the target servers.
      // NOTE(review): if getGroups() hands out the element's live list, clearing it below also
      // empties this reference, so findServers would be called with an empty array for
      // cluster-level elements - confirm that this is the intended "all servers" lookup
      List<String> groups = element.getGroups();
      ConfigurationResult<T, R> response = new ConfigurationResult<>(element);

      // if "cluster" is the only group, clear it, so that the returning json does not show
      // "cluster" as a group value
      if (element.getGroups().size() == 1 && CacheElement.CLUSTER.equals(element.getGroup())) {
        element.getGroups().clear();
      }

      // the response is always part of the result, with or without runtime info
      responses.add(response);
      // skip the runtime lookup when the filter reports that it has no runtime info
      if (!hasRuntimeInfo) {
        continue;
      }

      Set<DistributedMember> members;

      if (filter instanceof MemberConfig) {
        members =
            memberValidator.findMembers(filter.getId(), filter.getGroups().toArray(new String[0]));
      } else {
        members = memberValidator.findServers(groups.toArray(new String[0]));
      }

      // no member belongs to these groups
      if (members.size() == 0) {
        continue;
      }

      // if this cacheElement's runtime info only contains global info (no per member info), we will
      // only need to issue get function on any member instead of all of them.
      if (element.isGlobalRuntime()) {
        members = Collections.singleton(members.iterator().next());
      }

      List<R> runtimeInfos = executeAndGetFunctionResult(new CacheRealizationFunction(),
          Arrays.asList(element, CacheElementOperation.GET),
          members);
      response.setRuntimeInfo(runtimeInfos);
    }

    result.setResult(responses);
    return assertSuccessful(result);
  }

  /**
   * Looks up exactly one configuration element by delegating to {@link #list(CacheElement)}.
   *
   * @param config the element used as the search criteria
   * @return the list result holding the single matching element
   * @throws ClusterManagementException with ENTITY_NOT_FOUND if nothing matches, or with ERROR if
   *         more than one element matches
   */
  @Override
  public <T extends CacheElement & CorrespondWith<R>, R extends RuntimeInfo> ClusterManagementListResult<T, R> get(
      T config) {
    ClusterManagementListResult<T, R> listed = list(config);
    int matchCount = listed.getResult().size();
    String typeName = config.getClass().getSimpleName();

    if (matchCount == 0) {
      raise(StatusCode.ENTITY_NOT_FOUND,
          typeName + " '" + config.getId() + "' does not exist.");
    }

    if (matchCount > 1) {
      raise(StatusCode.ERROR, "Expect only one matching " + typeName + ".");
    }

    return assertSuccessful(listed);
  }

  /**
   * Submits the operation to the operation manager and returns a result with ACCEPTED status
   * whose URI can be used to check on the operation.
   *
   * @param op the operation to start; when it is {@link TaggedWithOperator}, its operator is
   *        recorded on the submitted instance
   * @return the result describing the started operation
   */
  @Override
  public <A extends ClusterManagementOperation<V>, V extends OperationResult> ClusterManagementOperationResult<V> start(
      A op) {
    OperationInstance<A, V> submitted = operationManager.submit(op);
    if (op instanceof TaggedWithOperator) {
      TaggedWithOperator tagged = (TaggedWithOperator) op;
      submitted.setOperator(tagged.getOperator());
    }

    ClusterManagementResult accepted = new ClusterManagementResult(
        StatusCode.ACCEPTED, "Operation started.  Use the URI to check its status.");
    ClusterManagementOperationResult<V> started =
        toClusterManagementListOperationsResult(accepted, submitted);

    return assertSuccessful(started);
  }

  /**
   * Lists the operation instances the operation manager returns for the given operation type,
   * each converted to a result that reflects the current state of its future.
   *
   * @param opType the operation used to select the instances to list
   * @return the list of operation results
   */
  @Override
  public <A extends ClusterManagementOperation<V>, V extends OperationResult> ClusterManagementListOperationsResult<V> list(
      A opType) {
    List<ClusterManagementOperationResult<V>> operationResults =
        operationManager.listOperationInstances(opType).stream()
            .map(this::toClusterManagementListOperationsResult)
            .collect(Collectors.toList());

    return assertSuccessful(new ClusterManagementListOperationsResult<>(operationResults));
  }

  /**
   * Builds a base status from the state of a future result.
   *
   * <ul>
   * <li>not done yet: IN_PROGRESS</li>
   * <li>done normally: OK</li>
   * <li>done exceptionally: ERROR, with the message of the {@link ExecutionException}, or a
   * cancellation message when the future was cancelled</li>
   * </ul>
   *
   * @param future the future of the operation result
   * @return a new result describing the current state; never throws for a failed or cancelled
   *         future
   */
  private static <V extends OperationResult> ClusterManagementResult getStatus(
      CompletableFuture<V> future) {
    // isDone() is checked first: once it is true the outcome can no longer change, so a future
    // that fails between two checks can not be misreported as a success
    if (!future.isDone()) {
      return new ClusterManagementResult(StatusCode.IN_PROGRESS, "Operation in progress.");
    }

    if (!future.isCompletedExceptionally()) {
      return new ClusterManagementResult(StatusCode.OK, "Operation finished successfully.");
    }

    String error = "Operation failed.";
    try {
      // the future is already complete, get() is only called to obtain the failure
      future.get();
    } catch (InterruptedException ignored) {
      Thread.currentThread().interrupt();
    } catch (CancellationException e) {
      // a cancelled future also counts as completed exceptionally, but get() reports it with this
      // unchecked exception instead of an ExecutionException
      error = "Operation was cancelled.";
    } catch (ExecutionException e) {
      error = e.getMessage();
    }
    return new ClusterManagementResult(StatusCode.ERROR, error);
  }

  /**
   * Builds a result object from a base status and an operation instance. The URI of the result
   * is composed of the REST context, the REST version, the endpoint of the operation and the id
   * of the instance.
   */
  private <A extends ClusterManagementOperation<V>, V extends OperationResult> ClusterManagementOperationResult<V> toClusterManagementListOperationsResult(
      ClusterManagementResult status, OperationInstance<A, V> operationInstance) {
    ClusterManagementOperationResult<V> operationResult =
        new ClusterManagementOperationResult<>(status,
            operationInstance.getFutureResult(),
            operationInstance.getOperationStart(),
            operationInstance.getFutureOperationEnded(),
            operationInstance.getOperator());

    String uri = RestfulEndpoint.URI_CONTEXT + RestfulEndpoint.URI_VERSION
        + operationInstance.getOperation().getEndpoint() + "/" + operationInstance.getId();
    operationResult.setUri(uri);

    return operationResult;
  }

  /**
   * Builds a result object from an operation instance, deriving the base status from the current
   * state of the instance's future result.
   */
  private <A extends ClusterManagementOperation<V>, V extends OperationResult> ClusterManagementOperationResult<V> toClusterManagementListOperationsResult(
      OperationInstance<A, V> operationInstance) {
    ClusterManagementResult currentStatus = getStatus(operationInstance.getFutureResult());
    return toClusterManagementListOperationsResult(currentStatus, operationInstance);
  }

  /**
   * Reports the current status of a previously started operation. This is intended for use by the
   * REST controller. For Java usage, please use
   * {@link ClusterManagementOperationResult#getFutureResult()}
   *
   * @param opId the id of the operation to look up
   * @return the status of the operation; the end time and the operation result are only set when
   *         the operation has finished without an exception
   * @throws ClusterManagementException with ENTITY_NOT_FOUND if no operation has the given id
   */
  public <V extends OperationResult> ClusterManagementOperationStatusResult<V> checkStatus(
      String opId) {
    final OperationInstance<?, V> instance = operationManager.getOperationInstance(opId);
    if (instance == null) {
      raise(StatusCode.ENTITY_NOT_FOUND, "Operation '" + opId + "' does not exist.");
    }

    final CompletableFuture<V> futureResult = instance.getFutureResult();
    ClusterManagementOperationStatusResult<V> statusResult =
        new ClusterManagementOperationStatusResult<>(getStatus(futureResult));
    statusResult.setOperator(instance.getOperator());
    statusResult.setOperationStart(instance.getOperationStart());

    boolean finishedNormally = futureResult.isDone() && !futureResult.isCompletedExceptionally();
    if (!finishedNormally) {
      // still running or failed: there is no end time or result to report
      return statusResult;
    }

    try {
      statusResult.setOperationEnded(instance.getFutureOperationEnded().get());
      statusResult.setResult(futureResult.get());
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } catch (ExecutionException ignored) {
      // deliberately ignored: the status is returned without end time and/or result
    }
    return statusResult;
  }

  /**
   * Returns the given result unchanged when it is successful and throws otherwise.
   *
   * @param result the result to check
   * @return the same result instance
   * @throws ClusterManagementRealizationException if an unsuccessful result is a
   *         {@link ClusterManagementRealizationResult}
   * @throws ClusterManagementException for any other unsuccessful result
   */
  private <T extends ClusterManagementResult> T assertSuccessful(T result) {
    if (result.isSuccessful()) {
      return result;
    }

    // realization results get their own exception type
    if (result instanceof ClusterManagementRealizationResult) {
      ClusterManagementRealizationResult realizationResult =
          (ClusterManagementRealizationResult) result;
      throw new ClusterManagementRealizationException(realizationResult);
    }

    throw new ClusterManagementException(result);
  }

  /**
   * Always throws a {@link ClusterManagementException} that wraps a result built from the given
   * status code and message.
   */
  private static void raise(StatusCode statusCode, String statusMessage) {
    ClusterManagementResult failure = new ClusterManagementResult(statusCode, statusMessage);
    throw new ClusterManagementException(failure);
  }

  /**
   * Always throws a {@link ClusterManagementException} that wraps a result built from the given
   * status code and the message of the exception; the exception itself is kept as the cause.
   */
  private static void raise(StatusCode statusCode, Exception e) {
    ClusterManagementResult failure = new ClusterManagementResult(statusCode, e.getMessage());
    throw new ClusterManagementException(failure, e);
  }

  /**
   * Reports whether this service is connected.
   *
   * @return always {@code true} for this implementation
   */
  public boolean isConnected() {
    return true;
  }

  /**
   * Closes this service by closing its operation manager.
   */
  @Override
  public void close() {
    operationManager.close();
  }

  /**
   * Finds the configuration manager registered for the concrete class of the given element.
   *
   * @param config the element whose manager is needed
   * @return the registered manager
   * @throws ClusterManagementException with ILLEGAL_ARGUMENT if no manager is registered for the
   *         class of the element
   */
  private <T extends CacheElement> ConfigurationManager<T> getConfigurationManager(
      T config) {
    Class<?> configType = config.getClass();
    ConfigurationManager registered = managers.get(configType);

    if (registered == null) {
      String unsupported = String.format("%s is not supported.", configType.getSimpleName());
      raise(StatusCode.ILLEGAL_ARGUMENT, unsupported);
    }

    return registered;
  }

  /**
   * Executes the function with the given arguments on the target members and returns what the
   * result collector gathered.
   *
   * @param function the function to execute
   * @param args the arguments handed to the function
   * @param targetMembers the members to execute on
   * @return the collected results, or an empty list when there are no target members (the
   *         function is not executed in that case)
   */
  @VisibleForTesting
  <R> List<R> executeAndGetFunctionResult(Function function, Object args,
      Set<DistributedMember> targetMembers) {
    // nothing to execute on
    if (targetMembers.size() == 0) {
      return Collections.emptyList();
    }

    Execution memberExecution = FunctionService.onMembers(targetMembers).setArguments(args);
    AbstractExecution configurable = (AbstractExecution) memberExecution;
    configurable.setIgnoreDepartedMembers(true);

    ResultCollector collector = memberExecution.execute(function);
    return (List<R>) collector.getResult();
  }
}
