blob: 18c2f5121ad5bebeefb13b2ecb8369e99165672e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.rest.internal.web.controllers;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import org.apache.logging.log4j.Logger;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.stereotype.Controller;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.apache.geode.cache.LowMemoryException;
import org.apache.geode.cache.execute.Execution;
import org.apache.geode.cache.execute.Function;
import org.apache.geode.cache.execute.FunctionException;
import org.apache.geode.cache.execute.FunctionService;
import org.apache.geode.cache.execute.ResultCollector;
import org.apache.geode.internal.cache.execute.NoResult;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.management.internal.exceptions.EntityNotFoundException;
import org.apache.geode.rest.internal.web.exception.GemfireRestException;
import org.apache.geode.rest.internal.web.util.ArrayUtils;
import org.apache.geode.rest.internal.web.util.JSONUtils;
import org.apache.geode.security.ResourcePermission;
/**
 * Spring MVC controller that serves the REST endpoints for listing and executing registered
 * functions.
 *
 * @see org.springframework.stereotype.Controller
 * @since GemFire 8.0
 */
@Controller("functionController")
@Api(value = "functions", description = "Rest api for gemfire function execution",
    tags = "functions")
@RequestMapping(FunctionAccessController.REST_API_VERSION + "/functions")
@SuppressWarnings("unused")
public class FunctionAccessController extends AbstractBaseController {
  // Constant String value indicating the version of the REST API.
  static final String REST_API_VERSION = "/v1";
  private static final Logger logger = LogService.getLogger();

  /**
   * Gets the version of the REST API implemented by this @Controller.
   *
   * @return a String indicating the REST API version.
   */
  @Override
  protected String getRestApiVersion() {
    return REST_API_VERSION;
  }

  /**
   * Lists the ids of all functions currently registered with the {@link FunctionService}.
   *
   * @return result as a JSON document.
   */
  @RequestMapping(method = RequestMethod.GET, produces = {MediaType.APPLICATION_JSON_UTF8_VALUE})
  @ApiOperation(value = "list all functions",
      notes = "list all functions available in the GemFire cluster")
  @ApiResponses({@ApiResponse(code = 200, message = "OK."),
      @ApiResponse(code = 401, message = "Invalid Username or Password."),
      @ApiResponse(code = 403, message = "Insufficient privileges for operation."),
      @ApiResponse(code = 500, message = "GemFire throws an error or exception.")})
  @ResponseBody
  @ResponseStatus(HttpStatus.OK)
  @PreAuthorize("@securityService.authorize('DATA', 'READ')")
  public ResponseEntity<?> list() {
    logger.debug("Listing all registered Functions in GemFire...");
    final Map<String, Function> registeredFunctions = FunctionService.getRegisteredFunctions();
    String listFunctionsAsJson =
        JSONUtils.formulateJsonForListFunctionsCall(registeredFunctions.keySet());
    final HttpHeaders headers = new HttpHeaders();
    headers.setLocation(toUri("functions"));
    return new ResponseEntity<>(listFunctionsAsJson, headers, HttpStatus.OK);
  }

  /**
   * Execute a function on Gemfire data node using REST API call. Arguments to the function are
   * passed as JSON string in the request body.
   *
   * <p>
   * The execution target is chosen in this order of precedence: {@code onRegion}, then
   * {@code onMembers}, then {@code onGroups}; when none is supplied the function is executed on
   * all members returned by {@code getAllMembersInDS()}.
   *
   * @param functionId represents function to be executed
   * @param region name of the region on which function to be executed.
   * @param members list of nodes on which function to be executed.
   * @param groups list of groups on which function to be executed.
   * @param filter list of keys which the function will use to determine on which node to execute
   *        the function.
   * @param argsInBody function argument as a JSON document
   * @return result as a JSON document; an empty body when the function produces no result
   * @throws EntityNotFoundException if no function is registered under {@code functionId}
   * @throws GemfireRestException if the execution target cannot be resolved, the execution fails,
   *         or the result is not a {@link List}
   */
  @RequestMapping(method = RequestMethod.POST, value = "/{functionId:.+}",
      produces = {MediaType.APPLICATION_JSON_VALUE})
  @ApiOperation(value = "execute function",
      notes = "Execute function with arguments on regions, members, or group(s). By default function will be executed on all nodes if none of (onRegion, onMembers, onGroups) specified")
  @ApiResponses({@ApiResponse(code = 200, message = "OK."),
      @ApiResponse(code = 401, message = "Invalid Username or Password."),
      @ApiResponse(code = 403, message = "Insufficient privileges for operation."),
      @ApiResponse(code = 500, message = "if GemFire throws an error or exception"),
      @ApiResponse(code = 400,
          message = "if Function arguments specified as JSON document in the request body is invalid")})
  @ResponseBody
  @ResponseStatus(HttpStatus.OK)
  public ResponseEntity<String> execute(@PathVariable("functionId") String functionId,
      @RequestParam(value = "onRegion", required = false) String region,
      @RequestParam(value = "onMembers", required = false) final String[] members,
      @RequestParam(value = "onGroups", required = false) final String[] groups,
      @RequestParam(value = "filter", required = false) final String[] filter,
      @RequestBody(required = false) final String argsInBody) {
    // Decode the identifiers before anything else so that the function lookup, the permission
    // check and the actual execution all operate on exactly the same function id and region name.
    functionId = decode(functionId);
    if (StringUtils.hasText(region)) {
      region = decode(region);
    }

    final Function function = FunctionService.getFunction(functionId);
    // this exception will be handled by BaseControllerAdvice to eventually return a 404
    if (function == null) {
      throw new EntityNotFoundException(
          String.format("The function %s is not registered.", functionId));
    }

    final Object[] args = argsInBody != null ? jsonToObjectArray(argsInBody) : null;

    // check for required permissions of the function
    final Collection<ResourcePermission> requiredPermissions =
        function.getRequiredPermissions(region, args);
    for (ResourcePermission requiredPermission : requiredPermissions) {
      securityService.authorize(requiredPermission);
    }

    Execution execution = selectExecution(functionId, region, members, groups, argsInBody);

    if (!ArrayUtils.isEmpty(filter)) {
      logger.debug("Executing Function ({}) with filter ({})", functionId,
          ArrayUtils.toString(filter));
      final Set<?> keyFilter = ArrayUtils.asSet(filter);
      execution = execution.withFilter(keyFilter);
    }

    final ResultCollector<?, ?> results = invoke(execution, functionId, args);
    return toResponse(functionId, results);
  }

  /**
   * Builds the {@link Execution} for the requested target: region first, then members, then
   * groups, falling back to every member returned by {@code getAllMembersInDS()}.
   */
  private Execution selectExecution(final String functionId, final String region,
      final String[] members, final String[] groups, final String argsInBody) {
    if (StringUtils.hasText(region)) {
      logger.debug("Executing Function ({}) with arguments ({}) on Region ({})...", functionId,
          ArrayUtils.toString(argsInBody), region);
      try {
        return FunctionService.onRegion(getRegion(region));
      } catch (FunctionException fe) {
        throw new GemfireRestException(
            String.format("The Region identified by name (%1$s) could not found!", region), fe);
      }
    }

    if (ArrayUtils.isNotEmpty(members)) {
      logger.debug("Executing Function ({}) with arguments ({}) on Member ({})...", functionId,
          ArrayUtils.toString(argsInBody), ArrayUtils.toString(members));
      try {
        return FunctionService.onMembers(getMembers(members));
      } catch (FunctionException fe) {
        throw new GemfireRestException(
            "Could not found the specified members in distributed system!", fe);
      }
    }

    if (ArrayUtils.isNotEmpty(groups)) {
      logger.debug("Executing Function ({}) with arguments ({}) on Groups ({})...", functionId,
          ArrayUtils.toString(argsInBody), ArrayUtils.toString(groups));
      try {
        return FunctionService.onMembers(groups);
      } catch (FunctionException fe) {
        throw new GemfireRestException("no member(s) are found belonging to the provided group(s)!",
            fe);
      }
    }

    // Default case is to execute function on all existing data node in DS, document this.
    logger.debug("Executing Function ({}) with arguments ({}) on all Members...", functionId,
        ArrayUtils.toString(argsInBody));
    try {
      return FunctionService.onMembers(getAllMembersInDS());
    } catch (FunctionException fe) {
      throw new GemfireRestException(
          "Distributed system does not contain any valid data node to run the specified function!",
          fe);
    }
  }

  /**
   * Runs the function identified by {@code functionId} on {@code execution}, translating the
   * runtime failures the original endpoint handled into {@link GemfireRestException}s.
   */
  private ResultCollector<?, ?> invoke(final Execution execution, final String functionId,
      final Object[] args) {
    try {
      if (args == null) {
        // execute function with no args
        return execution.execute(functionId);
      }
      // A single argument is passed through unwrapped; anything else is passed as the array.
      if (args.length == 1) {
        return execution.setArguments(args[0]).execute(functionId);
      }
      return execution.setArguments(args).execute(functionId);
    } catch (ClassCastException cce) {
      throw new GemfireRestException("Key is of an inappropriate type for this region!", cce);
    } catch (NullPointerException npe) {
      throw new GemfireRestException(
          "Specified key is null and this region does not permit null keys!", npe);
    } catch (LowMemoryException lme) {
      throw new GemfireRestException("Server has encountered low memory condition!", lme);
    } catch (IllegalArgumentException ie) {
      throw new GemfireRestException("Input parameter is null! ", ie);
    } catch (FunctionException fe) {
      throw new GemfireRestException("Server has encountered error while executing the function!",
          fe);
    }
  }

  /**
   * Collects the function result and renders it as the HTTP response: an empty body for
   * {@link NoResult}, a JSON document for a {@link List} result, and a
   * {@link GemfireRestException} for anything else.
   */
  private ResponseEntity<String> toResponse(final String functionId,
      final ResultCollector<?, ?> results) {
    try {
      final HttpHeaders headers = new HttpHeaders();
      headers.setLocation(toUri("functions", functionId));

      if (results instanceof NoResult) {
        return new ResponseEntity<>("", headers, HttpStatus.OK);
      }

      final Object functionResult = results.getResult();
      if (!(functionResult instanceof List<?>)) {
        throw new GemfireRestException(
            "Function has returned results that could not be converted into Restful (JSON) format!");
      }

      // The result is only known to be a List, not an ArrayList, so copy it rather than
      // down-casting; a blind cast fails with ClassCastException for any other List type.
      final ArrayList<Object> resultList = new ArrayList<Object>((List<?>) functionResult);
      final String functionResultAsJson = JSONUtils.convertCollectionToJson(resultList);
      return new ResponseEntity<>(functionResultAsJson, headers, HttpStatus.OK);
    } catch (FunctionException fe) {
      // Record the failure through the class logger instead of printing to stderr.
      logger.error("Function ({}) failed while its results were being processed", functionId, fe);
      throw new GemfireRestException(
          "Server has encountered an error while processing function execution!", fe);
    }
  }
}