/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.rest.internal.web.controllers;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;

import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import org.apache.logging.log4j.Logger;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.stereotype.Controller;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.ResponseStatus;

import org.apache.geode.cache.LowMemoryException;
import org.apache.geode.cache.execute.Execution;
import org.apache.geode.cache.execute.Function;
import org.apache.geode.cache.execute.FunctionException;
import org.apache.geode.cache.execute.FunctionService;
import org.apache.geode.cache.execute.ResultCollector;
import org.apache.geode.internal.cache.execute.NoResult;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.management.internal.exceptions.EntityNotFoundException;
import org.apache.geode.rest.internal.web.exception.GemfireRestException;
import org.apache.geode.rest.internal.web.util.ArrayUtils;
import org.apache.geode.rest.internal.web.util.JSONUtils;
import org.apache.geode.security.ResourcePermission;

/**
 * REST controller serving the requests related to function execution: listing the registered
 * functions and executing a function identified by its id.
 *
 * @see org.springframework.stereotype.Controller
 * @since GemFire 8.0
 */
@Controller("functionController")
@Api(value = "functions", description = "Rest api for gemfire function execution",
    tags = "functions")
@RequestMapping(FunctionAccessController.REST_API_VERSION + "/functions")
@SuppressWarnings("unused")
public class FunctionAccessController extends AbstractBaseController {
  // Constant String value indicating the version of the REST API.
  static final String REST_API_VERSION = "/v1";
  private static final Logger logger = LogService.getLogger();

  /**
   * Gets the version of the REST API implemented by this @Controller.
   *
   * @return a String indicating the REST API version.
   */
  @Override
  protected String getRestApiVersion() {
    return REST_API_VERSION;
  }

  /**
   * Lists the ids of all functions registered with the {@link FunctionService}.
   *
   * @return a 200 response whose body is the JSON document produced by
   *         {@link JSONUtils#formulateJsonForListFunctionsCall} and whose Location header points
   *         at the "functions" resource.
   */
  @RequestMapping(method = RequestMethod.GET, produces = {MediaType.APPLICATION_JSON_UTF8_VALUE})
  @ApiOperation(value = "list all functions",
      notes = "list all functions available in the GemFire cluster")
  @ApiResponses({@ApiResponse(code = 200, message = "OK."),
      @ApiResponse(code = 401, message = "Invalid Username or Password."),
      @ApiResponse(code = 403, message = "Insufficient privileges for operation."),
      @ApiResponse(code = 500, message = "GemFire throws an error or exception.")})
  @ResponseBody
  @ResponseStatus(HttpStatus.OK)
  @PreAuthorize("@securityService.authorize('DATA', 'READ')")
  public ResponseEntity<?> list() {
    logger.debug("Listing all registered Functions in GemFire...");

    final Map<String, Function> registeredFunctions = FunctionService.getRegisteredFunctions();
    String listFunctionsAsJson =
        JSONUtils.formulateJsonForListFunctionsCall(registeredFunctions.keySet());
    final HttpHeaders headers = new HttpHeaders();
    headers.setLocation(toUri("functions"));
    return new ResponseEntity<>(listFunctionsAsJson, headers, HttpStatus.OK);
  }

  /**
   * Execute a function on Gemfire data node using REST API call. Arguments to the function are
   * passed as JSON string in the request body.
   *
   * <p>
   * The execution target is chosen from the first of {@code onRegion}, {@code onMembers} and
   * {@code onGroups} that is supplied; when none is supplied the function is executed on all
   * members returned by {@code getAllMembersInDS()}.
   *
   * @param functionId represents function to be executed
   * @param region list of regions on which function to be executed.
   * @param members list of nodes on which function to be executed.
   * @param groups list of groups on which function to be executed.
   * @param filter list of keys which the function will use to determine on which node to execute
   *        the function.
   * @param argsInBody function argument as a JSON document
   * @return result as a JSON document; the body is empty when the function produces no result
   * @throws EntityNotFoundException if no function is registered under {@code functionId}
   * @throws GemfireRestException if the target cannot be resolved, the execution fails, or the
   *         result is not a {@link List}
   */
  @RequestMapping(method = RequestMethod.POST, value = "/{functionId:.+}",
      produces = {MediaType.APPLICATION_JSON_VALUE})
  @ApiOperation(value = "execute function",
      notes = "Execute function with arguments on regions, members, or group(s). By default function will be executed on all nodes if none of (onRegion, onMembers, onGroups) specified")
  @ApiResponses({@ApiResponse(code = 200, message = "OK."),
      @ApiResponse(code = 401, message = "Invalid Username or Password."),
      @ApiResponse(code = 403, message = "Insufficient privileges for operation."),
      @ApiResponse(code = 500, message = "if GemFire throws an error or exception"),
      @ApiResponse(code = 400,
          message = "if Function arguments specified as JSON document in the request body is invalid")})
  @ResponseBody
  @ResponseStatus(HttpStatus.OK)
  public ResponseEntity<String> execute(@PathVariable("functionId") String functionId,
      @RequestParam(value = "onRegion", required = false) String region,
      @RequestParam(value = "onMembers", required = false) final String[] members,
      @RequestParam(value = "onGroups", required = false) final String[] groups,
      @RequestParam(value = "filter", required = false) final String[] filter,
      @RequestBody(required = false) final String argsInBody) {

    Function function = FunctionService.getFunction(functionId);

    // this exception will be handled by BaseControllerAdvice to eventually return a 404
    if (function == null) {
      throw new EntityNotFoundException(
          String.format("The function %s is not registered.", functionId));
    }

    Object[] args = null;
    if (argsInBody != null) {
      args = jsonToObjectArray(argsInBody);
    }

    // check for required permissions of the function
    Collection<ResourcePermission> requiredPermissions =
        function.getRequiredPermissions(region, args);
    for (ResourcePermission requiredPermission : requiredPermissions) {
      securityService.authorize(requiredPermission);
    }

    // NOTE(review): the lookup and the permission check above use the function id and region
    // name as received, while the execution below uses their decoded forms. If decode() can
    // change either value the two may refer to different names -- confirm this is intended.
    functionId = decode(functionId);

    Execution execution = selectExecution(functionId, region, members, groups, argsInBody);

    if (!ArrayUtils.isEmpty(filter)) {
      logger.debug("Executing Function ({}) with filter ({})", functionId,
          ArrayUtils.toString(filter));

      final Set<?> filterKeys = ArrayUtils.asSet(filter);
      execution = execution.withFilter(filterKeys);
    }

    final ResultCollector<?, ?> results = runFunction(execution, functionId, args);
    return toJsonResponse(functionId, results);
  }

  /**
   * Builds the {@link Execution} for the requested target. Precedence is region, then members,
   * then groups, and finally all members of the distributed system.
   */
  private Execution selectExecution(final String functionId, final String region,
      final String[] members, final String[] groups, final String argsInBody) {
    if (StringUtils.hasText(region)) {
      logger.debug("Executing Function ({}) with arguments ({}) on Region ({})...", functionId,
          ArrayUtils.toString(argsInBody), region);

      final String decodedRegion = decode(region);
      try {
        return FunctionService.onRegion(getRegion(decodedRegion));
      } catch (FunctionException fe) {
        throw new GemfireRestException(
            String.format("The Region identified by name (%1$s) could not found!", decodedRegion),
            fe);
      }
    }

    if (ArrayUtils.isNotEmpty(members)) {
      logger.debug("Executing Function ({}) with arguments ({}) on Member ({})...", functionId,
          ArrayUtils.toString(argsInBody), ArrayUtils.toString(members));

      try {
        return FunctionService.onMembers(getMembers(members));
      } catch (FunctionException fe) {
        throw new GemfireRestException(
            "Could not found the specified members in distributed system!", fe);
      }
    }

    if (ArrayUtils.isNotEmpty(groups)) {
      logger.debug("Executing Function ({}) with arguments ({}) on Groups ({})...", functionId,
          ArrayUtils.toString(argsInBody), ArrayUtils.toString(groups));

      try {
        return FunctionService.onMembers(groups);
      } catch (FunctionException fe) {
        throw new GemfireRestException("no member(s) are found belonging to the provided group(s)!",
            fe);
      }
    }

    // Default case is to execute function on all existing data node in DS, document this.
    logger.debug("Executing Function ({}) with arguments ({}) on all Members...", functionId,
        ArrayUtils.toString(argsInBody));

    try {
      return FunctionService.onMembers(getAllMembersInDS());
    } catch (FunctionException fe) {
      throw new GemfireRestException(
          "Distributed system does not contain any valid data node to run the specified  function!",
          fe);
    }
  }

  /**
   * Runs the function on the prepared execution and translates the failures it may raise into
   * {@link GemfireRestException}s.
   */
  private ResultCollector<?, ?> runFunction(final Execution execution, final String functionId,
      final Object[] args) {
    try {
      if (args == null) {
        // execute function with no args
        return execution.execute(functionId);
      }
      // a single argument is handed over as-is rather than wrapped in a one-element array
      if (args.length == 1) {
        return execution.setArguments(args[0]).execute(functionId);
      }
      return execution.setArguments(args).execute(functionId);
    } catch (ClassCastException cce) {
      throw new GemfireRestException("Key is of an inappropriate type for this region!", cce);
    } catch (NullPointerException npe) {
      throw new GemfireRestException(
          "Specified key is null and this region does not permit null keys!", npe);
    } catch (LowMemoryException lme) {
      throw new GemfireRestException("Server has encountered low memory condition!", lme);
    } catch (IllegalArgumentException ie) {
      throw new GemfireRestException("Input parameter is null! ", ie);
    } catch (FunctionException fe) {
      throw new GemfireRestException("Server has encountered error while executing the function!",
          fe);
    }
  }

  /**
   * Collects the function result and renders it as the JSON body of a 200 response whose
   * Location header points at the executed function.
   */
  private ResponseEntity<String> toJsonResponse(final String functionId,
      final ResultCollector<?, ?> results) {
    try {
      final HttpHeaders headers = new HttpHeaders();
      headers.setLocation(toUri("functions", functionId));

      if (results instanceof NoResult) {
        return new ResponseEntity<>("", headers, HttpStatus.OK);
      }

      final Object functionResult = results.getResult();
      if (!(functionResult instanceof List<?>)) {
        throw new GemfireRestException(
            "Function has returned results that could not be converted into Restful (JSON) format!");
      }

      // Any List is accepted above, so only reuse the instance when it really is an ArrayList;
      // other List implementations are copied instead of failing with a ClassCastException.
      final List<?> resultList = (List<?>) functionResult;
      @SuppressWarnings("unchecked")
      final ArrayList<Object> jsonInput = resultList instanceof ArrayList
          ? (ArrayList<Object>) resultList
          : new ArrayList<Object>(resultList);

      final String functionResultAsJson = JSONUtils.convertCollectionToJson(jsonInput);
      return new ResponseEntity<>(functionResultAsJson, headers, HttpStatus.OK);
    } catch (FunctionException fe) {
      // report through the logging framework instead of printing the stack trace to stderr
      logger.error("Server has encountered an error while processing function execution!", fe);
      throw new GemfireRestException(
          "Server has encountered an error while processing function execution!", fe);
    }
  }
}
