blob: 21646bd420f67c2dc6843ef4851f7ae8edbe54d6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.distributed.internal;
import static org.apache.geode.distributed.ConfigurationProperties.SECURITY_MANAGER;
import static org.apache.geode.distributed.ConfigurationProperties.SECURITY_POST_PROCESSOR;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileFilter;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import javax.xml.parsers.ParserConfigurationException;
import javax.xml.transform.TransformerException;
import joptsimple.internal.Strings;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.DirectoryFileFilter;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.Logger;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.SAXException;
import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.cache.AttributesFactory;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.DiskStore;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.Scope;
import org.apache.geode.cache.configuration.CacheConfig;
import org.apache.geode.distributed.ConfigurationPersistenceService;
import org.apache.geode.distributed.DistributedLockService;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.distributed.internal.locks.DLockService;
import org.apache.geode.internal.cache.ClusterConfigurationLoader;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.InternalRegionArguments;
import org.apache.geode.internal.cache.persistence.PersistentMemberID;
import org.apache.geode.internal.cache.persistence.PersistentMemberManager;
import org.apache.geode.internal.cache.persistence.PersistentMemberPattern;
import org.apache.geode.internal.cache.xmlcache.CacheXmlGenerator;
import org.apache.geode.internal.config.JAXBService;
import org.apache.geode.logging.internal.log4j.api.LogService;
import org.apache.geode.management.internal.configuration.callbacks.ConfigurationChangeListener;
import org.apache.geode.management.internal.configuration.domain.Configuration;
import org.apache.geode.management.internal.configuration.domain.SharedConfigurationStatus;
import org.apache.geode.management.internal.configuration.domain.XmlEntity;
import org.apache.geode.management.internal.configuration.messages.ConfigurationResponse;
import org.apache.geode.management.internal.configuration.messages.SharedConfigurationStatusResponse;
import org.apache.geode.management.internal.configuration.utils.XmlUtils;
/**
 * {@link ConfigurationPersistenceService} implementation that keeps one {@link Configuration} per
 * group in the {@code _ConfigurationRegion} region and stores the jar files of each group in a
 * per-group directory underneath the cluster configuration directory.
 * <p>
 * Most mutating operations of this class acquire the distributed lock
 * {@code __CLUSTER_CONFIG_LOCK} before they touch the configuration region.
 */
public class InternalConfigurationPersistenceService implements ConfigurationPersistenceService {
private static final Logger logger = LogService.getLogger();
/**
* Name of the directory where the shared configuration artifacts are stored
*/
public static final String CLUSTER_CONFIG_ARTIFACTS_DIR_NAME = "cluster_config";
// Name of the disk store that backs the configuration region (see getConfigurationRegion)
private static final String CLUSTER_CONFIG_DISK_STORE_NAME = "cluster_config";
// Prefix of the disk directory name; the distributed system's name is appended to it
public static final String CLUSTER_CONFIG_DISK_DIR_PREFIX = "ConfigDiskDir_";
/**
* Name of the lock service used for shared configuration
*/
private static final String SHARED_CONFIG_LOCK_SERVICE_NAME = "__CLUSTER_CONFIG_LS";
/**
* Name of the lock for locking the shared configuration
*/
private static final String SHARED_CONFIG_LOCK_NAME = "__CLUSTER_CONFIG_LOCK";
/**
* Name of the region which is used to store the configuration information
*/
public static final String CONFIG_REGION_NAME = "_ConfigurationRegion";
// Version handed to the CacheConfig constructor when a blank cache config is created
private static final String CACHE_CONFIG_VERSION = "1.0";
// Directory holding one sub-directory per group; jar files are stored in those sub-directories
private final Path configDirPath;
// Directory used by the disk store of the configuration region
private final Path configDiskDirPath;
// Persistent members the configuration region is waiting for; filled in by getStatus()
private final Set<PersistentMemberPattern> newerSharedConfigurationLocatorInfo = new HashSet<>();
// Current lifecycle status of this service; starts as NOT_STARTED (set in the constructor)
private final AtomicReference<SharedConfigurationStatus> status = new AtomicReference<>();
private final InternalCache cache;
// Lock service guarding the shared configuration; null for the test-only constructor
private final DistributedLockService sharedConfigLockingService;
private final JAXBService jaxbService;
/**
 * Creates the service for the given cache.
 * <p>
 * The jar/artifact directory is {@code workingDirectory/cluster_config}; the disk store directory
 * is {@code workingDirectory/ConfigDiskDir_<distributed system name>}. The shared configuration
 * lock service is looked up, or created if it does not exist yet.
 *
 * @param cache the cache that hosts the configuration region
 * @param workingDirectory directory under which both configuration directories are resolved
 * @param jaxbService service used to marshal and unmarshal cache configuration xml
 */
public InternalConfigurationPersistenceService(InternalCache cache, Path workingDirectory,
JAXBService jaxbService) {
this(cache,
sharedConfigLockService(cache.getDistributedSystem()),
jaxbService,
workingDirectory.resolve(CLUSTER_CONFIG_ARTIFACTS_DIR_NAME),
workingDirectory
.resolve(CLUSTER_CONFIG_DISK_DIR_PREFIX + cache.getDistributedSystem().getName()));
}
/**
 * Test-only constructor. The cache, the lock service and both directory paths are left
 * {@code null}, so any method that dereferences them will fail on an instance created this way.
 *
 * @param jaxbService service used to marshal and unmarshal cache configuration xml
 */
@VisibleForTesting
public InternalConfigurationPersistenceService(JAXBService jaxbService) {
this(null, null, jaxbService, null, null);
}
/**
 * Canonical constructor that every other constructor delegates to. Stores the collaborators as
 * given and marks the service as {@link SharedConfigurationStatus#NOT_STARTED}.
 */
private InternalConfigurationPersistenceService(InternalCache cache,
    DistributedLockService sharedConfigLockingService, JAXBService jaxbService,
    Path configDirPath, Path configDiskDirPath) {
  // collaborators
  this.cache = cache;
  this.sharedConfigLockingService = sharedConfigLockingService;
  this.jaxbService = jaxbService;

  // file system locations
  this.configDirPath = configDirPath;
  this.configDiskDirPath = configDiskDirPath;

  // nothing has been initialized yet
  this.status.set(SharedConfigurationStatus.NOT_STARTED);
}
/**
 * Returns the shared configuration lock service, creating it when it does not exist yet.
 * <p>
 * If creation fails with an {@link IllegalArgumentException} the service is looked up by name
 * once more and the result of that lookup is returned.
 *
 * @param ds the distributed system the lock service is created in
 */
private static DistributedLockService sharedConfigLockService(DistributedSystem ds) {
  final DistributedLockService existing =
      DLockService.getServiceNamed(SHARED_CONFIG_LOCK_SERVICE_NAME);
  if (existing != null) {
    return existing;
  }

  try {
    return DLockService.create(SHARED_CONFIG_LOCK_SERVICE_NAME,
        (InternalDistributedSystem) ds, true, true);
  } catch (IllegalArgumentException ignored) {
    // creation was rejected; fall back to whatever is registered under the name now
    return DLockService.getServiceNamed(SHARED_CONFIG_LOCK_SERVICE_NAME);
  }
}
/**
 * @return the JAXB service this instance was constructed with
 */
public JAXBService getJaxbService() {
  return this.jaxbService;
}
/**
 * Adds the xml entity to the cache xml of every given group (per the original documentation an
 * already existing entity is replaced) and stores the result in the configuration region.
 * <p>
 * The change listener does not need to be triggered for this modification, so the configuration
 * object read from the region is modified in place. A failure for one group is logged and does
 * not stop the remaining groups from being processed.
 *
 * @param xmlEntity the entity to add
 * @param groups target groups; {@code null} or empty means the cluster-wide configuration
 */
public void addXmlEntity(XmlEntity xmlEntity, String[] groups) {
  lockSharedConfiguration();
  try {
    final Region<String, Configuration> region = getConfigurationRegion();
    final String[] targetGroups = (groups == null || groups.length == 0)
        ? new String[] {ConfigurationPersistenceService.CLUSTER_CONFIG}
        : groups;

    for (String groupName : targetGroups) {
      Configuration groupConfig = region.get(groupName);
      if (groupConfig == null) {
        groupConfig = new Configuration(groupName);
      }

      String currentXml = groupConfig.getCacheXmlContent();
      if (currentXml == null || currentXml.isEmpty()) {
        // no cache xml yet for this group: start from the generated default
        currentXml = generateInitialXmlContent();
      }

      try {
        final Document document = XmlUtils.createAndUpgradeDocumentFromXml(currentXml);
        XmlUtils.addNewNode(document, xmlEntity);
        groupConfig.setCacheXmlContent(XmlUtils.prettyXml(document));
        region.put(groupName, groupConfig);
      } catch (Exception e) {
        logger.error("error updating cluster configuration for group {}", groupName, e);
      }
    }
  } finally {
    unlockSharedConfiguration();
  }
}
/**
 * Deletes the xml entity from the cache xml of the given groups.
 * <p>
 * Groups without a configuration, or without any cache xml, are skipped. A failure for one group
 * is logged and does not stop the remaining groups from being processed.
 *
 * @param xmlEntity the entity to delete
 * @param groups target groups; {@code null} means every group present in the configuration region
 */
public void deleteXmlEntity(final XmlEntity xmlEntity, String[] groups) {
  lockSharedConfiguration();
  try {
    final Region<String, Configuration> region = getConfigurationRegion();
    // No group is specified, so delete in every single group if it exists.
    final String[] targetGroups =
        groups != null ? groups : region.keySet().toArray(new String[0]);

    for (String groupName : targetGroups) {
      final Configuration groupConfig = region.get(groupName);
      if (groupConfig == null) {
        continue;
      }
      final String currentXml = groupConfig.getCacheXmlContent();
      if (currentXml == null || currentXml.isEmpty()) {
        continue;
      }
      try {
        final Document document = XmlUtils.createAndUpgradeDocumentFromXml(currentXml);
        XmlUtils.deleteNode(document, xmlEntity);
        groupConfig.setCacheXmlContent(XmlUtils.prettyXml(document));
        region.put(groupName, groupConfig);
      } catch (Exception e) {
        logger.error("error updating cluster configuration for group {}", groupName, e);
      }
    }
  } finally {
    unlockSharedConfiguration();
  }
}
/**
 * Applies root attribute changes and/or additional properties to the configuration of every
 * given group and stores each configuration back into the configuration region.
 * <p>
 * The change listener does not need to be triggered for this modification, so the configuration
 * object read from the region is modified in place. If the xml update of a group fails the error
 * is logged; the properties are still merged and the configuration is still stored.
 *
 * @param properties properties to merge into the group's properties; may be {@code null}
 * @param xmlEntity carrier of the root attributes to modify; may be {@code null}
 * @param groups target groups; {@code null} means the cluster-wide configuration
 */
public void modifyXmlAndProperties(Properties properties, XmlEntity xmlEntity, String[] groups) {
  lockSharedConfiguration();
  try {
    final String[] targetGroups = groups != null
        ? groups
        : new String[] {ConfigurationPersistenceService.CLUSTER_CONFIG};
    final Region<String, Configuration> region = getConfigurationRegion();

    for (String groupName : targetGroups) {
      Configuration groupConfig = region.get(groupName);
      if (groupConfig == null) {
        groupConfig = new Configuration(groupName);
      }

      if (xmlEntity != null) {
        String currentXml = groupConfig.getCacheXmlContent();
        if (currentXml == null || currentXml.isEmpty()) {
          // no cache xml yet for this group: start from the generated default
          currentXml = generateInitialXmlContent();
        }
        try {
          final Document document = XmlUtils.createAndUpgradeDocumentFromXml(currentXml);
          // Modify the cache attributes
          XmlUtils.modifyRootAttributes(document, xmlEntity);
          // Change the xml content of the configuration; it is put into the region below
          groupConfig.setCacheXmlContent(XmlUtils.prettyXml(document));
        } catch (Exception e) {
          logger.error("error updating cluster configuration for group {}", groupName, e);
        }
      }

      if (properties != null) {
        groupConfig.getGemfireProperties().putAll(properties);
      }

      region.put(groupName, groupConfig);
    }
  } finally {
    unlockSharedConfiguration();
  }
}
/**
 * Copies the given jar files into the directory of every given group and records their names in
 * the group's configuration. Used when deploying jars.
 *
 * @param jarFullPaths full paths of the staged jar files
 * @param groups target groups; {@code null} means the cluster-wide configuration
 * @throws IOException if a group directory cannot be created or a jar cannot be copied
 */
public void addJarsToThisLocator(List<String> jarFullPaths, String[] groups) throws IOException {
  lockSharedConfiguration();
  try {
    final String[] targetGroups = groups != null
        ? groups
        : new String[] {ConfigurationPersistenceService.CLUSTER_CONFIG};
    final Region<String, Configuration> region = getConfigurationRegion();

    for (String groupName : targetGroups) {
      Configuration current = region.get(groupName);
      if (current == null) {
        current = new Configuration(groupName);
        createConfigDirIfNecessary(groupName);
      }

      final Path groupDir = configDirPath.resolve(groupName);
      final Set<String> deployedJarNames = new HashSet<>();
      for (String jarFullPath : jarFullPaths) {
        final File stagedJar = new File(jarFullPath);
        final String jarName = stagedJar.getName();
        deployedJarNames.add(jarName);
        FileUtils.copyFile(stagedJar, groupDir.resolve(jarName).toFile());
      }

      // The region is updated only after the jars are on disk, because the listener needs the
      // files in order to upload them to other locators. A fresh copy of the Configuration is
      // put so that the change listener picks up the changed jar names.
      final String locatorId = cache.getMyId().getId();
      final Configuration updated = new Configuration(current);
      updated.addJarNames(deployedJarNames);
      region.put(groupName, updated, locatorId);
    }
  } finally {
    unlockSharedConfiguration();
  }
}
/**
 * Removes the jar files from the shared configuration and from this locator's file system. Used
 * when undeploying jars.
 * <p>
 * A group that has no configuration is skipped; the remaining groups are still processed. A jar
 * file that cannot be deleted from disk is logged and does not affect the return value.
 *
 * @param jarNames Names of the jar files.
 * @param groups Names of the groups which had the jar file deployed; {@code null} means every
 *        group present in the configuration region.
 * @return true on success, false if an unexpected exception interrupted the operation.
 */
public boolean removeJars(final String[] jarNames, String[] groups) {
  lockSharedConfiguration();
  boolean success = true;
  try {
    Region<String, Configuration> configRegion = getConfigurationRegion();
    if (groups == null) {
      groups = configRegion.keySet().toArray(new String[0]);
    }
    for (String group : groups) {
      Configuration configuration = configRegion.get(group);
      if (configuration == null) {
        // Nothing is recorded for this group. Skip it, but keep processing the other groups
        // (aborting the loop here would silently leave the jars in the remaining groups).
        continue;
      }

      for (String jarRemoved : jarNames) {
        File jar = getPathToJarOnThisLocator(group, jarRemoved).toFile();
        if (jar.exists()) {
          try {
            FileUtils.forceDelete(jar);
          } catch (IOException e) {
            logger.error(
                "Exception occurred while attempting to delete a jar from the filesystem: {}",
                jarRemoved, e);
          }
        }
      }

      // put a copy so that the change listener sees the changed jar names
      Configuration configurationCopy = new Configuration(configuration);
      configurationCopy.removeJarNames(jarNames);
      configRegion.put(group, configurationCopy);
    }
  } catch (Exception e) {
    logger.info("Exception occurred while deleting the jar files", e);
    success = false;
  } finally {
    unlockSharedConfiguration();
  }
  return success;
}
/**
 * Downloads a jar from the first other locator that hosts shared configuration. Only used when
 * a locator is initially starting up.
 * <p>
 * The group directory is created before the list of other locators is checked.
 *
 * @throws IllegalStateException if no locator other than this member is available
 * @throws IOException if the group directory cannot be created or the download fails
 */
public void downloadJarFromOtherLocators(String groupName, String jarName)
    throws IllegalStateException, IOException {
  logger.info("Getting Jar files from other locators");
  final DistributionManager distributionManager = cache.getDistributionManager();
  final DistributedMember self = cache.getMyId();

  final List<DistributedMember> otherLocators =
      new ArrayList<>(distributionManager.getAllHostedLocatorsWithSharedConfiguration().keySet());
  otherLocators.remove(self);

  createConfigDirIfNecessary(groupName);

  if (!otherLocators.isEmpty()) {
    downloadJarFromLocator(groupName, jarName, otherLocators.get(0));
    return;
  }
  throw new IllegalStateException(
      "Request to download jar " + jarName + " but no other locators are present");
}
/**
 * Downloads a jar from the given locator and copies it into the group's directory on this
 * locator, replacing an existing file of the same name. Used by the cluster config change
 * listener when jar names are changed in the internal region.
 *
 * @throws IOException if the group directory cannot be created or the download or copy fails
 */
public void downloadJarFromLocator(String groupName, String jarName,
    DistributedMember sourceLocator) throws IllegalStateException, IOException {
  logger.info("Downloading jar {} from locator {}", jarName, sourceLocator.getName());

  createConfigDirIfNecessary(groupName);

  final Path stagedJar = downloadJar(sourceLocator, groupName, jarName).toPath();
  final Path destination = getPathToJarOnThisLocator(groupName, jarName);
  Files.copy(stagedJar, destination, StandardCopyOption.REPLACE_EXISTING);
}
/**
 * Retrieves a deployed jar from a locator by delegating to a new
 * {@link ClusterConfigurationLoader}. Per the original documentation the file is staged in a
 * temporary location.
 *
 * @param locator the DistributedMember to download from
 * @param groupName the group to use when retrieving the jar
 * @param jarName the name of the deployed jar
 * @return a File referencing the downloaded jar
 */
public File downloadJar(DistributedMember locator, String groupName, String jarName)
    throws IOException {
  return new ClusterConfigurationLoader().downloadJar(locator, groupName, jarName);
}
/**
 * Initializes the shared configuration service: moves the status to STARTED, cleans invalid
 * gateway receivers out of the stored xml, then either loads the configuration from the cluster
 * configuration directory or persists the security settings and downloads any jar that is
 * recorded in the region but missing on this locator. On normal completion the status becomes
 * RUNNING.
 *
 * @param loadSharedConfigFromDir when set to true, loads the configuration from the cluster
 *        configuration directory
 */
void initSharedConfiguration(boolean loadSharedConfigFromDir) throws IOException {
  status.set(SharedConfigurationStatus.STARTED);
  final Region<String, Configuration> region = getConfigurationRegion();
  lockSharedConfiguration();
  try {
    removeInvalidXmlConfigurations(region);
    if (!loadSharedConfigFromDir) {
      persistSecuritySettings(region);
      // for those groups that have jar files, need to download the jars from other locators
      // if it doesn't exist yet
      for (Entry<String, Configuration> groupEntry : region.entrySet()) {
        final Configuration groupConfig = groupEntry.getValue();
        for (String jarName : groupConfig.getJarNames()) {
          final boolean jarOnDisk =
              getPathToJarOnThisLocator(groupEntry.getKey(), jarName).toFile().exists();
          if (!jarOnDisk) {
            downloadJarFromOtherLocators(groupEntry.getKey(), jarName);
          }
        }
      }
    } else {
      logger.info("Reading cluster configuration from '{}' directory",
          InternalConfigurationPersistenceService.CLUSTER_CONFIG_ARTIFACTS_DIR_NAME);
      loadSharedConfigurationFromDir(configDirPath.toFile());
    }
  } finally {
    unlockSharedConfiguration();
  }
  status.set(SharedConfigurationStatus.RUNNING);
}
/**
 * Removes invalid and duplicate gateway-receiver elements from the cache xml of every
 * configuration in the region. A configuration is written back only when its xml was changed.
 *
 * @throws IOException if the stored xml of a configuration cannot be parsed or re-serialized
 */
void removeInvalidXmlConfigurations(Region<String, Configuration> configRegion)
    throws IOException {
  for (Map.Entry<String, Configuration> groupEntry : configRegion.entrySet()) {
    final String groupName = groupEntry.getKey();
    final Configuration groupConfig = groupEntry.getValue();
    final String storedXml = groupConfig.getCacheXmlContent();
    if (storedXml == null || storedXml.isEmpty()) {
      continue;
    }

    try {
      final Document document = XmlUtils.createDocumentFromXml(storedXml);
      // both clean-ups must run, so do not short-circuit them
      final boolean invalidRemoved = removeInvalidGatewayReceivers(document);
      final boolean duplicatesRemoved = removeDuplicateGatewayReceivers(document);
      if (invalidRemoved || duplicatesRemoved) {
        groupConfig.setCacheXmlContent(XmlUtils.prettyXml(document));
        configRegion.put(groupName, groupConfig);
      }
    } catch (SAXException | TransformerException | ParserConfigurationException e) {
      throw new IOException("Unable to parse existing cluster configuration from disk. ", e);
    }
  }
}
/**
 * Removes every {@code gateway-receiver} element that carries a non-blank
 * {@code hostname-for-senders} attribute, or a non-blank {@code bind-address} attribute other
 * than {@code 0.0.0.0}.
 * <p>
 * Both conditions are evaluated before the element is detached, so an element that violates
 * both is removed exactly once. (Removing it for the first violation and then again for the
 * second would dereference the {@code null} parent of the already detached element.)
 *
 * @param document the cache xml document to clean up; modified in place
 * @return true if at least one element was removed
 */
boolean removeInvalidGatewayReceivers(Document document) throws TransformerException {
  boolean modified = false;
  NodeList receiverNodes = document.getElementsByTagName("gateway-receiver");
  // iterate backwards so that removals do not shift the indexes still to be visited
  for (int i = receiverNodes.getLength() - 1; i >= 0; i--) {
    Element receiverElement = (Element) receiverNodes.item(i);

    // Check hostname-for-senders
    String hostNameForSenders = receiverElement.getAttribute("hostname-for-senders");
    boolean invalidHostName = StringUtils.isNotBlank(hostNameForSenders);

    // Check bind-address
    String bindAddress = receiverElement.getAttribute("bind-address");
    boolean invalidBindAddress =
        StringUtils.isNotBlank(bindAddress) && !bindAddress.equals("0.0.0.0");

    if (invalidHostName || invalidBindAddress) {
      receiverElement.getParentNode().removeChild(receiverElement);
      logger.info("Removed invalid cluster configuration gateway-receiver element="
          + XmlUtils.prettyXml(receiverElement));
      modified = true;
    }
  }
  return modified;
}
/**
 * Removes {@code gateway-receiver} elements, first in document order first, until at most one is
 * left in the document.
 *
 * @param document the cache xml document to clean up; modified in place
 * @return true if at least one element was removed
 */
boolean removeDuplicateGatewayReceivers(Document document) throws TransformerException {
  boolean removedAny = false;
  // the node list is looked up again after every removal
  for (NodeList receivers = document.getElementsByTagName("gateway-receiver"); receivers
      .getLength() > 1; receivers = document.getElementsByTagName("gateway-receiver")) {
    final Element firstReceiver = (Element) receivers.item(0);
    firstReceiver.getParentNode().removeChild(firstReceiver);
    logger.info("Removed duplicate cluster configuration gateway-receiver element="
        + XmlUtils.prettyXml(firstReceiver));
    removedAny = true;
  }
  return removedAny;
}
/**
 * Copies the security-manager and security-post-processor settings of this member's distributed
 * system into the properties of the cluster-wide configuration. A cluster-wide configuration is
 * created and put into the region first if none exists.
 */
private void persistSecuritySettings(final Region<String, Configuration> configRegion) {
  final Properties securityProps = cache.getDistributedSystem().getSecurityProperties();

  Configuration clusterConfig = configRegion.get(ConfigurationPersistenceService.CLUSTER_CONFIG);
  if (clusterConfig == null) {
    clusterConfig = new Configuration(ConfigurationPersistenceService.CLUSTER_CONFIG);
    configRegion.put(ConfigurationPersistenceService.CLUSTER_CONFIG, clusterConfig);
  }

  // put security-manager and security-post-processor in the cluster config
  // NOTE(review): the properties are changed on the object after the put above (or on the object
  // read from the region) and are not put again here - confirm that this is sufficient for the
  // change to be persisted.
  final Properties clusterProperties = clusterConfig.getGemfireProperties();
  for (String securityKey : new String[] {SECURITY_MANAGER, SECURITY_POST_PROCESSOR}) {
    if (securityProps.containsKey(securityKey)) {
      clusterProperties.setProperty(securityKey, securityProps.getProperty(securityKey));
    }
  }
}
/**
 * Creates a ConfigurationResponse containing the configuration, and the jar names, of every
 * requested group plus the cluster-wide configuration. This method locks the shared
 * configuration while the response is built and releases the lock only if it was acquired.
 * <p>
 * Note that the cluster-wide group name is added to the passed-in {@code groups} set.
 *
 * @param groups names of the requested groups; must be a modifiable set
 * @return the response, or {@code null} if the shared configuration lock could not be acquired
 */
public ConfigurationResponse createConfigurationResponse(Set<String> groups) {
  if (!lockSharedConfiguration()) {
    // The lock is not held, so there is nothing to build and, importantly, nothing to unlock.
    return null;
  }

  try {
    ConfigurationResponse configResponse = new ConfigurationResponse();
    groups.add(ConfigurationPersistenceService.CLUSTER_CONFIG);
    logger.info("Building up configuration response with following configurations: {}", groups);

    for (String group : groups) {
      Configuration configuration = getConfiguration(group);
      configResponse.addConfiguration(configuration);
      if (configuration != null) {
        configResponse.addJar(group, configuration.getJarNames());
      }
    }
    return configResponse;
  } finally {
    unlockSharedConfiguration();
  }
}
/**
 * Creates a response containing the current status of the shared configuration together with the
 * collected information about locators this locator is waiting for (if any).
 *
 * @return {@link SharedConfigurationStatusResponse} containing the
 *         {@link SharedConfigurationStatus}
 */
SharedConfigurationStatusResponse createStatusResponse() {
  final SharedConfigurationStatusResponse statusResponse =
      new SharedConfigurationStatusResponse();
  // getStatus() may add entries to the waiting-locator set, so it has to be evaluated first
  final SharedConfigurationStatus currentStatus = getStatus();
  statusResponse.setStatus(currentStatus);
  statusResponse.addWaitingLocatorInfo(newerSharedConfigurationLocatorInfo);
  return statusResponse;
}
/**
 * For tests only. TODO: clean this up and remove from production code
 * <p>
 * Destroys the configuration region, destroys the disk store that backs it and deletes the
 * cluster configuration directory.
 * <p>
 * Throws {@code AssertionError} wrapping any exception thrown by operation.
 */
public void destroySharedConfiguration() {
  try {
    Region<String, Configuration> configRegion = getConfigurationRegion();
    if (configRegion != null) {
      configRegion.destroyRegion();
    }
    // Look the disk store up by the name it was created with in getConfigurationRegion(). The
    // artifacts directory constant used here before only worked because both values are equal.
    DiskStore configDiskStore = cache.findDiskStore(CLUSTER_CONFIG_DISK_STORE_NAME);
    if (configDiskStore != null) {
      configDiskStore.destroy();
    }
    FileUtils.deleteDirectory(configDirPath.toFile());
  } catch (Exception exception) {
    throw new AssertionError(exception);
  }
}
/**
 * @return the location of the named jar inside the given group's directory underneath the
 *         cluster configuration directory; the file is not required to exist
 */
public Path getPathToJarOnThisLocator(String groupName, String jarName) {
  final Path groupDir = configDirPath.resolve(groupName);
  return groupDir.resolve(jarName);
}
/**
 * @return the configuration stored for the group in the configuration region, as returned by the
 *         region's {@code get}
 */
public Configuration getConfiguration(String groupName) {
  final Region<String, Configuration> region = getConfigurationRegion();
  return region.get(groupName);
}
/**
 * Puts the configuration into the configuration region under the given group name. The shared
 * configuration lock is not acquired by this method.
 */
public void setConfiguration(String groupName, Configuration configuration) {
  final Region<String, Configuration> region = getConfigurationRegion();
  region.put(groupName, configuration);
}
/**
 * @return true if at least one configuration in the region has non-null cache xml content
 */
public boolean hasXmlConfiguration() {
  for (Configuration groupConfig : getConfigurationRegion().values()) {
    if (groupConfig.getCacheXmlContent() != null) {
      return true;
    }
  }
  return false;
}
/**
 * @return the configurations of all groups currently present in the configuration region, keyed
 *         by group name
 */
public Map<String, Configuration> getEntireConfiguration() {
  final Region<String, Configuration> region = getConfigurationRegion();
  final Set<String> groupNames = region.keySet();
  return region.getAll(groupNames);
}
/**
 * @return the key set of the configuration region, i.e. the names of all groups that have a
 *         configuration
 */
public Set<String> getGroups() {
  final Region<String, Configuration> region = getConfigurationRegion();
  return region.keySet();
}
/**
 * @return the directory underneath which the per-group directories are kept
 */
public Path getClusterConfigDirPath() {
  return this.configDirPath;
}
/**
 * Gets the current status of the ConfigurationPersistenceService. If the status is STARTED and
 * the persistent member manager reports waiting regions, the status is moved to WAITING and the
 * persistent members the configuration region is waiting for are recorded.
 * <p>
 * The waiting regions may not include the configuration region at all; in that case no member
 * information is recorded.
 *
 * @return {@link SharedConfigurationStatus}
 */
public SharedConfigurationStatus getStatus() {
  SharedConfigurationStatus scStatus = status.get();
  if (scStatus == SharedConfigurationStatus.STARTED) {
    PersistentMemberManager pmm = cache.getPersistentMemberManager();
    Map<String, Set<PersistentMemberID>> waitingRegions = pmm.getWaitingRegions();
    if (!waitingRegions.isEmpty()) {
      // only switch if nobody changed the status in the meantime
      status.compareAndSet(SharedConfigurationStatus.STARTED,
          SharedConfigurationStatus.WAITING);
      Set<PersistentMemberID> persMemIds =
          waitingRegions.get(Region.SEPARATOR_CHAR + CONFIG_REGION_NAME);
      // the map has no entry for the configuration region when only other regions are waiting
      if (persMemIds != null) {
        for (PersistentMemberID persMemId : persMemIds) {
          newerSharedConfigurationLocatorInfo.add(new PersistentMemberPattern(persMemId));
        }
      }
    }
  }
  return status.get();
}
/**
 * Replaces the content of the configuration region with the configurations read from the given
 * directory and then re-applies this locator's security settings.
 * <p>
 * Jar files are copied into this locator's cluster configuration directory only when
 * {@code configDir} is a different directory. The directories are compared as absolute,
 * normalized paths; comparing a path string against a {@link Path} object never matches.
 *
 * @param configDir the dir that has all the groups structure underneath it
 * @throws IOException if a configuration cannot be read or a jar cannot be copied
 */
public void loadSharedConfigurationFromDir(File configDir) throws IOException {
  lockSharedConfiguration();
  try {
    File[] groupNames = configDir.listFiles((FileFilter) DirectoryFileFilter.INSTANCE);

    // no need to copy the jars when they are being loaded from where they would be copied to
    Path sourceDir = configDir.toPath().toAbsolutePath().normalize();
    Path ownDir = configDirPath.toAbsolutePath().normalize();
    boolean needToCopyJars = !sourceDir.equals(ownDir);

    logger.info("loading the cluster configuration: ");
    Map<String, Configuration> sharedConfiguration = new HashMap<>();
    for (File groupName : groupNames) {
      Configuration configuration = readConfiguration(groupName);
      logger.info(configuration.getConfigName() + " xml content: \n"
          + configuration.getCacheXmlContent());
      logger.info(configuration.getConfigName() + " properties: "
          + configuration.getGemfireProperties().size());
      logger.info(configuration.getConfigName() + " jars: "
          + Strings.join(configuration.getJarNames(), ", "));
      sharedConfiguration.put(groupName.getName(), configuration);
      if (needToCopyJars && configuration.getJarNames().size() > 0) {
        Path groupDirPath = createConfigDirIfNecessary(configuration.getConfigName()).toPath();
        for (String jarName : configuration.getJarNames()) {
          Files.copy(groupName.toPath().resolve(jarName), groupDirPath.resolve(jarName));
        }
      }
    }

    Region<String, Configuration> clusterRegion = getConfigurationRegion();
    clusterRegion.clear();
    String memberId = cache.getMyId().getId();
    clusterRegion.putAll(sharedConfiguration, memberId);
    // Overwrite the security settings using the locator's properties, ignoring whatever
    // in the import
    persistSecuritySettings(clusterRegion);
  } finally {
    unlockSharedConfiguration();
  }
}
/**
 * Writes the properties and the cache xml of the configuration into the group's directory
 * underneath {@code rootDir}, for exporting purposes. When {@code rootDir} is not this locator's
 * own cluster configuration directory, the group's jar files are copied as well.
 *
 * @param configuration the configuration to export
 * @param rootDir directory underneath which the group's directory is created if necessary
 * @throws IOException if the directory cannot be created or a file cannot be written or copied
 */
public void writeConfigToFile(final Configuration configuration, File rootDir)
    throws IOException {
  File configDir = createConfigDirIfNecessary(rootDir, configuration.getConfigName());

  File propsFile = new File(configDir, configuration.getPropertiesFileName());
  // try-with-resources so that the writer is closed even when store() fails
  try (BufferedWriter bw = new BufferedWriter(new FileWriter(propsFile))) {
    configuration.getGemfireProperties().store(bw, null);
  }

  File xmlFile = new File(configDir, configuration.getCacheXmlFileName());
  FileUtils.writeStringToFile(xmlFile, configuration.getCacheXmlContent(), "UTF-8");

  // copy the jars only if the rootDir is different than the configDirPath; both sides have to be
  // compared as paths, a path string never equals a Path object
  Path exportRoot = rootDir.toPath().toAbsolutePath().normalize();
  if (exportRoot.equals(configDirPath.toAbsolutePath().normalize())) {
    return;
  }

  File locatorConfigDir = configDirPath.resolve(configuration.getConfigName()).toFile();
  if (!locatorConfigDir.exists()) {
    return;
  }
  File[] jarFiles = locatorConfigDir.listFiles(x -> x.getName().endsWith(".jar"));
  if (jarFiles == null) {
    // listFiles returns null when the path is not a directory or cannot be read
    return;
  }
  for (File file : jarFiles) {
    Files.copy(file.toPath(), configDir.toPath().resolve(file.getName()));
  }
}
/**
 * Acquires the shared configuration lock from the shared configuration lock service, passing
 * {@code -1} for both numeric arguments of {@code lock} (presumably: wait without a time limit
 * and hold without a lease limit - confirm against the lock service documentation).
 *
 * @return the value returned by the lock service
 */
public boolean lockSharedConfiguration() {
  final boolean acquired = sharedConfigLockingService.lock(SHARED_CONFIG_LOCK_NAME, -1, -1);
  return acquired;
}
/**
 * Releases the shared configuration lock through the shared configuration lock service.
 */
public void unlockSharedConfiguration() {
  this.sharedConfigLockingService.unlock(SHARED_CONFIG_LOCK_NAME);
}
/**
 * Gets the region containing the shared configuration data. The region, and the disk store that
 * backs it, are created if the region does not exist already. Note : per the original
 * documentation this could block if this locator contains stale persistent configuration data.
 * <p>
 * If creation fails the status of this service is set to STOPPED. Runtime exceptions are
 * rethrown as they are, every other exception is wrapped into a {@link RuntimeException}.
 *
 * @return {@link Region} ConfigurationRegion, this should never be null
 */
public Region<String, Configuration> getConfigurationRegion() {
  final Region<String, Configuration> existingRegion = cache.getRegion(CONFIG_REGION_NAME);
  if (existingRegion != null) {
    return existingRegion;
  }

  try {
    final File diskDir = configDiskDirPath.toFile();
    if (!(diskDir.exists() || diskDir.mkdirs())) {
      throw new IOException("Cannot create directory at " + configDiskDirPath);
    }

    // disk store backing the persistent region
    cache.createDiskStoreFactory().setDiskDirs(new File[] {diskDir}).setAutoCompact(true)
        .setMaxOplogSize(10).create(CLUSTER_CONFIG_DISK_STORE_NAME);

    final AttributesFactory<String, Configuration> attributes = new AttributesFactory<>();
    attributes.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE);
    attributes.setCacheListener(new ConfigurationChangeListener(this, cache));
    attributes.setDiskStoreName(CLUSTER_CONFIG_DISK_STORE_NAME);
    attributes.setScope(Scope.DISTRIBUTED_ACK);

    final InternalRegionArguments metaRegionArgs = new InternalRegionArguments();
    metaRegionArgs.setIsUsedForMetaRegion(true);
    metaRegionArgs.setMetaRegionWithTransactions(false);

    return cache.createVMRegion(CONFIG_REGION_NAME, attributes.create(), metaRegionArgs);
  } catch (RuntimeException e) {
    // the region could not be created
    status.set(SharedConfigurationStatus.STOPPED);
    // throw RuntimeException as is
    throw e;
  } catch (Exception e) {
    // the region could not be created
    status.set(SharedConfigurationStatus.STOPPED);
    // turn all other exceptions into runtime exceptions
    throw new RuntimeException("Error occurred while initializing cluster configuration", e);
  }
}
/**
 * Builds a {@link Configuration} named after the given group directory from the cache xml file,
 * the properties file and the names of the {@code .jar} files found in that directory.
 *
 * @param groupConfigDir directory of one group
 * @return {@link Configuration}
 */
private Configuration readConfiguration(File groupConfigDir) throws IOException {
  final Configuration groupConfig = new Configuration(groupConfigDir.getName());

  final File cacheXmlFile = new File(groupConfigDir, groupConfig.getCacheXmlFileName());
  final File propertiesFile = new File(groupConfigDir, groupConfig.getPropertiesFileName());
  groupConfig.setCacheXmlFile(cacheXmlFile);
  groupConfig.setPropertiesFile(propertiesFile);

  final Set<String> jarFileNames = new HashSet<>();
  for (String fileName : groupConfigDir.list()) {
    if (fileName.endsWith(".jar")) {
      jarFileNames.add(fileName);
    }
  }
  groupConfig.addJarNames(jarFileNames);

  return groupConfig;
}
/**
 * Creates the directory of the named configuration underneath this locator's cluster
 * configuration directory if it doesn't already exist.
 *
 * @return the directory of the configuration
 */
private File createConfigDirIfNecessary(final String configName) throws IOException {
  final File clusterConfigDir = configDirPath.toFile();
  return createConfigDirIfNecessary(clusterConfigDir, configName);
}
/**
 * Creates {@code clusterConfigDir} (including missing parents) and the directory of the named
 * configuration underneath it, if they don't already exist.
 *
 * @param clusterConfigDir root directory for the per-configuration directories
 * @param configName name of the configuration, used as the directory name
 * @return the directory of the configuration
 * @throws IOException naming the directory that could not be created
 */
private File createConfigDirIfNecessary(File clusterConfigDir, final String configName)
    throws IOException {
  if (!clusterConfigDir.exists() && !clusterConfigDir.mkdirs()) {
    // report the directory that actually failed, which is not necessarily this locator's own
    // cluster configuration directory
    throw new IOException("Cannot create directory : " + clusterConfigDir);
  }
  Path groupDirPath = clusterConfigDir.toPath().resolve(configName);
  File groupDir = groupDirPath.toFile();
  if (!groupDir.exists() && !groupDir.mkdir()) {
    throw new IOException("Cannot create directory : " + groupDirPath);
  }
  return groupDir;
}
/**
 * @return the text that {@code CacheXmlGenerator.generateDefault} writes, used as the starting
 *         cache xml for a group that has none yet
 */
private String generateInitialXmlContent() {
  final StringWriter xmlBuffer = new StringWriter();
  CacheXmlGenerator.generateDefault(new PrintWriter(xmlBuffer));
  return xmlBuffer.toString();
}
/**
 * Same as {@link #getCacheConfig(String, boolean)} with {@code createNew} set to false.
 */
@Override
public CacheConfig getCacheConfig(String group) {
  final boolean createNew = false;
  return getCacheConfig(group, createNew);
}
/**
 * Returns the cache config unmarshalled from the cache xml stored for the group.
 *
 * @param group name of the group; {@code null} means the cluster-wide configuration
 * @param createNew what to do when the group has no configuration or no cache xml: if true a
 *        blank {@link CacheConfig} is returned, otherwise {@code null}
 */
@Override
public CacheConfig getCacheConfig(String group, boolean createNew) {
  final String groupName = (group == null) ? CLUSTER_CONFIG : group;

  final Configuration groupConfig = getConfiguration(groupName);
  final String storedXml = (groupConfig == null) ? null : groupConfig.getCacheXmlContent();

  // neither a missing group nor an existing group without xml has anything to unmarshal
  if (storedXml == null || storedXml.isEmpty()) {
    return createNew ? new CacheConfig(CACHE_CONFIG_VERSION) : null;
  }
  return jaxbService.unMarshall(storedXml);
}
/**
 * Applies the mutator to the group's cache config (a blank one if the group has none) while
 * holding the shared configuration lock, and stores the marshalled result as the group's cache
 * xml. Nothing is stored when the mutator returns {@code null}.
 *
 * @param group name of the group; {@code null} means the cluster-wide configuration
 * @param mutator function producing the cache config to persist, or {@code null} for no change
 */
@Override
public void updateCacheConfig(String group, UnaryOperator<CacheConfig> mutator) {
  final String groupName = (group != null) ? group : CLUSTER_CONFIG;

  lockSharedConfiguration();
  try {
    final CacheConfig mutated = mutator.apply(getCacheConfig(groupName, true));
    if (mutated != null) {
      Configuration groupConfig = getConfiguration(groupName);
      if (groupConfig == null) {
        groupConfig = new Configuration(groupName);
      }
      groupConfig.setCacheXmlContent(jaxbService.marshall(mutated));
      getConfigurationRegion().put(groupName, groupConfig);
    }
    // a null result from the mutator indicates that no change needs to be persisted
  } finally {
    unlockSharedConfiguration();
  }
}
}