blob: 562ea6964acd7fc2c5af1c70ac53e376c561fa9c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.geode.management.internal.api;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.Logger;
import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.cache.configuration.CacheConfig;
import org.apache.geode.cache.configuration.CacheElement;
import org.apache.geode.cache.configuration.GatewayReceiverConfig;
import org.apache.geode.cache.configuration.RegionConfig;
import org.apache.geode.cache.execute.Execution;
import org.apache.geode.cache.execute.Function;
import org.apache.geode.cache.execute.FunctionService;
import org.apache.geode.cache.execute.ResultCollector;
import org.apache.geode.distributed.ConfigurationPersistenceService;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.execute.AbstractExecution;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.management.api.ClusterManagementException;
import org.apache.geode.management.api.ClusterManagementListOperationsResult;
import org.apache.geode.management.api.ClusterManagementListResult;
import org.apache.geode.management.api.ClusterManagementOperation;
import org.apache.geode.management.api.ClusterManagementOperationResult;
import org.apache.geode.management.api.ClusterManagementRealizationException;
import org.apache.geode.management.api.ClusterManagementRealizationResult;
import org.apache.geode.management.api.ClusterManagementResult;
import org.apache.geode.management.api.ClusterManagementResult.StatusCode;
import org.apache.geode.management.api.ClusterManagementService;
import org.apache.geode.management.api.ConfigurationResult;
import org.apache.geode.management.api.CorrespondWith;
import org.apache.geode.management.api.RealizationResult;
import org.apache.geode.management.api.RestfulEndpoint;
import org.apache.geode.management.configuration.MemberConfig;
import org.apache.geode.management.configuration.Pdx;
import org.apache.geode.management.internal.CacheElementOperation;
import org.apache.geode.management.internal.ClusterManagementOperationStatusResult;
import org.apache.geode.management.internal.cli.functions.CacheRealizationFunction;
import org.apache.geode.management.internal.configuration.mutators.ConfigurationManager;
import org.apache.geode.management.internal.configuration.mutators.GatewayReceiverConfigManager;
import org.apache.geode.management.internal.configuration.mutators.PdxManager;
import org.apache.geode.management.internal.configuration.mutators.RegionConfigManager;
import org.apache.geode.management.internal.configuration.validators.CacheElementValidator;
import org.apache.geode.management.internal.configuration.validators.ConfigurationValidator;
import org.apache.geode.management.internal.configuration.validators.GatewayReceiverConfigValidator;
import org.apache.geode.management.internal.configuration.validators.MemberValidator;
import org.apache.geode.management.internal.configuration.validators.RegionConfigValidator;
import org.apache.geode.management.internal.exceptions.EntityExistsException;
import org.apache.geode.management.internal.operation.OperationHistoryManager;
import org.apache.geode.management.internal.operation.OperationHistoryManager.OperationInstance;
import org.apache.geode.management.internal.operation.OperationManager;
import org.apache.geode.management.internal.operation.TaggedWithOperator;
import org.apache.geode.management.runtime.OperationResult;
import org.apache.geode.management.runtime.RuntimeInfo;
public class LocatorClusterManagementService implements ClusterManagementService {
private static final Logger logger = LogService.getLogger();
private final ConfigurationPersistenceService persistenceService;
private final Map<Class, ConfigurationManager> managers;
private final Map<Class, ConfigurationValidator> validators;
private final OperationManager operationManager;
private final MemberValidator memberValidator;
private final CacheElementValidator commonValidator;
public LocatorClusterManagementService(InternalCache cache,
ConfigurationPersistenceService persistenceService) {
this(persistenceService, new ConcurrentHashMap<>(), new ConcurrentHashMap<>(),
new MemberValidator(cache, persistenceService), new CacheElementValidator(),
new OperationManager(cache, new OperationHistoryManager()));
// initialize the list of managers
managers.put(RegionConfig.class, new RegionConfigManager());
managers.put(Pdx.class, new PdxManager());
managers.put(GatewayReceiverConfig.class, new GatewayReceiverConfigManager(cache));
// initialize the list of validators
validators.put(RegionConfig.class, new RegionConfigValidator(cache));
validators.put(GatewayReceiverConfig.class, new GatewayReceiverConfigValidator());
}
@VisibleForTesting
public LocatorClusterManagementService(ConfigurationPersistenceService persistenceService,
Map<Class, ConfigurationManager> managers,
Map<Class, ConfigurationValidator> validators,
MemberValidator memberValidator,
CacheElementValidator commonValidator,
OperationManager operationManager) {
this.persistenceService = persistenceService;
this.managers = managers;
this.validators = validators;
this.memberValidator = memberValidator;
this.commonValidator = commonValidator;
this.operationManager = operationManager;
}
@Override
public <T extends CacheElement> ClusterManagementRealizationResult create(T config) {
// validate that user used the correct config object type
ConfigurationManager configurationManager = getConfigurationManager(config);
if (persistenceService == null) {
return assertSuccessful(new ClusterManagementRealizationResult(StatusCode.ERROR,
"Cluster configuration service needs to be enabled."));
}
String group = config.getConfigGroup();
try {
// first validate common attributes of all configuration object
commonValidator.validate(CacheElementOperation.CREATE, config);
ConfigurationValidator validator = validators.get(config.getClass());
if (validator != null) {
validator.validate(CacheElementOperation.CREATE, config);
}
// check if this config already exists on all/some members of this group
memberValidator.validateCreate(config, configurationManager);
// execute function on all members
} catch (EntityExistsException e) {
raise(StatusCode.ENTITY_EXISTS, e);
} catch (IllegalArgumentException e) {
raise(StatusCode.ILLEGAL_ARGUMENT, e);
}
Set<DistributedMember> targetedMembers = memberValidator.findServers(group);
ClusterManagementRealizationResult result = new ClusterManagementRealizationResult();
List<RealizationResult> functionResults = executeAndGetFunctionResult(
new CacheRealizationFunction(),
Arrays.asList(config, CacheElementOperation.CREATE),
targetedMembers);
functionResults.forEach(result::addMemberStatus);
// if any false result is added to the member list
if (result.getStatusCode() != StatusCode.OK) {
result.setStatus(StatusCode.ERROR, "Failed to create on all members.");
return assertSuccessful(result);
}
// persist configuration in cache config
final String finalGroup = group; // the below lambda requires a reference that is final
persistenceService.updateCacheConfig(finalGroup, cacheConfigForGroup -> {
try {
configurationManager.add(config, cacheConfigForGroup);
result.setStatus(StatusCode.OK,
"Successfully updated configuration for " + finalGroup + ".");
} catch (Exception e) {
String message = "Failed to update cluster configuration for " + finalGroup + ".";
logger.error(message, e);
result.setStatus(StatusCode.FAIL_TO_PERSIST, message);
return null;
}
return cacheConfigForGroup;
});
// add the config object which includes the HATOS information of the element created
if (result.isSuccessful() && config instanceof RestfulEndpoint) {
result.setUri(((RestfulEndpoint) config).getUri());
}
return assertSuccessful(result);
}
@Override
public <T extends CacheElement> ClusterManagementRealizationResult delete(
T config) {
// validate that user used the correct config object type
ConfigurationManager configurationManager = getConfigurationManager(config);
if (persistenceService == null) {
return assertSuccessful(new ClusterManagementRealizationResult(StatusCode.ERROR,
"Cluster configuration service needs to be enabled."));
}
try {
// first validate common attributes of all configuration object
commonValidator.validate(CacheElementOperation.DELETE, config);
ConfigurationValidator validator = validators.get(config.getClass());
if (validator != null) {
validator.validate(CacheElementOperation.DELETE, config);
}
} catch (IllegalArgumentException e) {
raise(StatusCode.ILLEGAL_ARGUMENT, e);
}
String[] groupsWithThisElement =
memberValidator.findGroupsWithThisElement(config.getId(), configurationManager);
if (groupsWithThisElement.length == 0) {
raise(StatusCode.ENTITY_NOT_FOUND,
config.getClass().getSimpleName() + " '" + config.getId() + "' does not exist.");
}
// execute function on all members
ClusterManagementRealizationResult result = new ClusterManagementRealizationResult();
List<RealizationResult> functionResults = executeAndGetFunctionResult(
new CacheRealizationFunction(),
Arrays.asList(config, CacheElementOperation.DELETE),
memberValidator.findServers(groupsWithThisElement));
functionResults.forEach(result::addMemberStatus);
// if any false result is added to the member list
if (result.getStatusCode() != StatusCode.OK) {
result.setStatus(StatusCode.ERROR, "Failed to delete on all members.");
return result;
}
// persist configuration in cache config
List<String> updatedGroups = new ArrayList<>();
List<String> failedGroups = new ArrayList<>();
for (String finalGroup : groupsWithThisElement) {
persistenceService.updateCacheConfig(finalGroup, cacheConfigForGroup -> {
try {
configurationManager.delete(config, cacheConfigForGroup);
updatedGroups.add(finalGroup);
} catch (Exception e) {
logger.error("Failed to update cluster configuration for " + finalGroup + ".", e);
failedGroups.add(finalGroup);
return null;
}
return cacheConfigForGroup;
});
}
if (failedGroups.isEmpty()) {
result.setStatus(StatusCode.OK,
"Successfully removed configuration for " + updatedGroups + ".");
} else {
String message = "Failed to update cluster configuration for " + failedGroups + ".";
result.setStatus(StatusCode.FAIL_TO_PERSIST, message);
}
return assertSuccessful(result);
}
@Override
public <T extends CacheElement> ClusterManagementRealizationResult update(
T config) {
throw new NotImplementedException("Not implemented");
}
@Override
public <T extends CacheElement & CorrespondWith<R>, R extends RuntimeInfo> ClusterManagementListResult<T, R> list(
T filter) {
ClusterManagementListResult<T, R> result = new ClusterManagementListResult<>();
if (persistenceService == null) {
return assertSuccessful(new ClusterManagementListResult<>(StatusCode.ERROR,
"Cluster configuration service needs to be enabled."));
}
List<T> resultList = new ArrayList<>();
if (filter instanceof MemberConfig) {
resultList.add(filter);
} else {
ConfigurationManager<T> manager = getConfigurationManager(filter);
// gather elements on all the groups, consolidate the group information and then do the filter
// so that when we filter by a specific group, we still show that a particular element might
// also belong to another group.
for (String group : persistenceService.getGroups()) {
CacheConfig currentPersistedConfig = persistenceService.getCacheConfig(group, true);
List<T> listInGroup = manager.list(filter, currentPersistedConfig);
for (T element : listInGroup) {
element.setGroup(group);
resultList.add(element);
}
}
// if empty result, return immediately
if (resultList.size() == 0) {
return result;
}
// right now the list contains [{regionA, group1}, {regionA, group2}...], if the elements are
// MultiGroupCacheElement, we need to consolidate the list into [{regionA, [group1, group2]}
List<T> consolidatedResultList = new ArrayList<>();
for (T element : resultList) {
int index = consolidatedResultList.indexOf(element);
if (index >= 0) {
T exist = consolidatedResultList.get(index);
exist.addGroup(element.getGroup());
} else {
consolidatedResultList.add(element);
}
}
if (StringUtils.isNotBlank(filter.getGroup())) {
consolidatedResultList = consolidatedResultList.stream()
.filter(e -> (e.getGroups().contains(filter.getConfigGroup())))
.collect(Collectors.toList());
}
resultList = consolidatedResultList;
}
// gather the runtime info for each configuration objects
List<ConfigurationResult<T, R>> responses = new ArrayList<>();
boolean hasRuntimeInfo = filter.hasRuntimeInfo();
for (T element : resultList) {
List<String> groups = element.getGroups();
ConfigurationResult<T, R> response = new ConfigurationResult<>(element);
// if "cluster" is the only group, clear it, so that the returning json does not show
// "cluster" as a group value
if (element.getGroups().size() == 1 && CacheElement.CLUSTER.equals(element.getGroup())) {
element.getGroups().clear();
}
responses.add(response);
// do not gather runtime if this type of CacheElement is RespondWith<RuntimeInfo>
if (!hasRuntimeInfo) {
continue;
}
Set<DistributedMember> members;
if (filter instanceof MemberConfig) {
members =
memberValidator.findMembers(filter.getId(), filter.getGroups().toArray(new String[0]));
} else {
members = memberValidator.findServers(groups.toArray(new String[0]));
}
// no member belongs to these groups
if (members.size() == 0) {
continue;
}
// if this cacheElement's runtime info only contains global info (no per member info), we will
// only need to issue get function on any member instead of all of them.
if (element.isGlobalRuntime()) {
members = Collections.singleton(members.iterator().next());
}
List<R> runtimeInfos = executeAndGetFunctionResult(new CacheRealizationFunction(),
Arrays.asList(element, CacheElementOperation.GET),
members);
response.setRuntimeInfo(runtimeInfos);
}
result.setResult(responses);
return assertSuccessful(result);
}
@Override
public <T extends CacheElement & CorrespondWith<R>, R extends RuntimeInfo> ClusterManagementListResult<T, R> get(
T config) {
ClusterManagementListResult<T, R> list = list(config);
List<ConfigurationResult<T, R>> result = list.getResult();
if (result.size() == 0) {
raise(StatusCode.ENTITY_NOT_FOUND,
config.getClass().getSimpleName() + " '" + config.getId() + "' does not exist.");
}
if (result.size() > 1) {
raise(StatusCode.ERROR,
"Expect only one matching " + config.getClass().getSimpleName() + ".");
}
return assertSuccessful(list);
}
@Override
public <A extends ClusterManagementOperation<V>, V extends OperationResult> ClusterManagementOperationResult<V> start(
A op) {
OperationInstance<A, V> operationInstance = operationManager.submit(op);
if (op instanceof TaggedWithOperator) {
operationInstance.setOperator(((TaggedWithOperator) op).getOperator());
}
ClusterManagementResult result = new ClusterManagementResult(
StatusCode.ACCEPTED, "Operation started. Use the URI to check its status.");
return assertSuccessful(toClusterManagementListOperationsResult(result, operationInstance));
}
@Override
public <A extends ClusterManagementOperation<V>, V extends OperationResult> ClusterManagementListOperationsResult<V> list(
A opType) {
return assertSuccessful(new ClusterManagementListOperationsResult<>(
operationManager.listOperationInstances(opType).stream()
.map(this::toClusterManagementListOperationsResult).collect(Collectors.toList())));
}
/**
* builds a base status from the state of a future result
*/
private static <V extends OperationResult> ClusterManagementResult getStatus(
CompletableFuture<V> future) {
if (future.isCompletedExceptionally()) {
String error = "Operation failed.";
try {
future.get();
} catch (InterruptedException ignore) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
error = e.getMessage();
}
return new ClusterManagementResult(StatusCode.ERROR, error);
} else if (future.isDone()) {
return new ClusterManagementResult(StatusCode.OK, "Operation finished successfully.");
} else {
return new ClusterManagementResult(StatusCode.IN_PROGRESS, "Operation in progress.");
}
}
/**
* builds a result object from a base status and an operation instance
*/
private <A extends ClusterManagementOperation<V>, V extends OperationResult> ClusterManagementOperationResult<V> toClusterManagementListOperationsResult(
ClusterManagementResult status, OperationInstance<A, V> operationInstance) {
ClusterManagementOperationResult<V> result = new ClusterManagementOperationResult<>(status,
operationInstance.getFutureResult(), operationInstance.getOperationStart(),
operationInstance.getFutureOperationEnded(), operationInstance.getOperator());
result.setUri(RestfulEndpoint.URI_CONTEXT + RestfulEndpoint.URI_VERSION
+ operationInstance.getOperation().getEndpoint() + "/" + operationInstance.getId());
return result;
}
/**
* builds a result object from an operation instance
*/
private <A extends ClusterManagementOperation<V>, V extends OperationResult> ClusterManagementOperationResult<V> toClusterManagementListOperationsResult(
OperationInstance<A, V> operationInstance) {
return toClusterManagementListOperationsResult(getStatus(operationInstance.getFutureResult()),
operationInstance);
}
/**
* this is intended for use by the REST controller. for Java usage, please use
* {@link ClusterManagementOperationResult#getFutureResult()}
*/
public <V extends OperationResult> ClusterManagementOperationStatusResult<V> checkStatus(
String opId) {
final OperationInstance<?, V> operationInstance = operationManager.getOperationInstance(opId);
if (operationInstance == null) {
raise(StatusCode.ENTITY_NOT_FOUND, "Operation '" + opId + "' does not exist.");
}
final CompletableFuture<V> status = operationInstance.getFutureResult();
ClusterManagementOperationStatusResult<V> result =
new ClusterManagementOperationStatusResult<>(getStatus(status));
result.setOperator(operationInstance.getOperator());
result.setOperationStart(operationInstance.getOperationStart());
if (status.isDone() && !status.isCompletedExceptionally()) {
try {
result.setOperationEnded(operationInstance.getFutureOperationEnded().get());
result.setResult(status.get());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException ignore) {
}
}
return result;
}
private <T extends ClusterManagementResult> T assertSuccessful(T result) {
if (!result.isSuccessful()) {
if (result instanceof ClusterManagementRealizationResult) {
throw new ClusterManagementRealizationException(
(ClusterManagementRealizationResult) result);
} else {
throw new ClusterManagementException(result);
}
}
return result;
}
private static void raise(StatusCode statusCode, String statusMessage) {
throw new ClusterManagementException(new ClusterManagementResult(statusCode, statusMessage));
}
private static void raise(StatusCode statusCode, Exception e) {
throw new ClusterManagementException(new ClusterManagementResult(statusCode, e.getMessage()),
e);
}
public boolean isConnected() {
return true;
}
@Override
public void close() {
operationManager.close();
}
private <T extends CacheElement> ConfigurationManager<T> getConfigurationManager(
T config) {
ConfigurationManager configurationManager = managers.get(config.getClass());
if (configurationManager == null) {
raise(StatusCode.ILLEGAL_ARGUMENT, String.format("%s is not supported.",
config.getClass().getSimpleName()));
}
return configurationManager;
}
@VisibleForTesting
<R> List<R> executeAndGetFunctionResult(Function function, Object args,
Set<DistributedMember> targetMembers) {
if (targetMembers.size() == 0) {
return Collections.emptyList();
}
Execution execution = FunctionService.onMembers(targetMembers).setArguments(args);
((AbstractExecution) execution).setIgnoreDepartedMembers(true);
ResultCollector rc = execution.execute(function);
return (List<R>) rc.getResult();
}
}