| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.distributed.internal; |
| |
| import static org.apache.commons.lang3.exception.ExceptionUtils.getStackTrace; |
| import static org.apache.geode.distributed.ConfigurationProperties.BIND_ADDRESS; |
| import static org.apache.geode.distributed.ConfigurationProperties.CACHE_XML_FILE; |
| import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS; |
| import static org.apache.geode.distributed.internal.DistributionConfig.GEMFIRE_PREFIX; |
| import static org.apache.geode.internal.admin.remote.DistributionLocatorId.asDistributionLocatorIds; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.net.ConnectException; |
| import java.net.InetAddress; |
| import java.net.URI; |
| import java.net.UnknownHostException; |
| import java.nio.file.Path; |
| import java.nio.file.Paths; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Map; |
| import java.util.Properties; |
| import java.util.Set; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| |
| import org.apache.commons.lang3.StringUtils; |
| import org.apache.logging.log4j.Logger; |
| |
| import org.apache.geode.CancelException; |
| import org.apache.geode.annotations.VisibleForTesting; |
| import org.apache.geode.annotations.internal.MakeNotStatic; |
| import org.apache.geode.cache.client.internal.locator.ClientConnectionRequest; |
| import org.apache.geode.cache.client.internal.locator.ClientReplacementRequest; |
| import org.apache.geode.cache.client.internal.locator.GetAllServersRequest; |
| import org.apache.geode.cache.client.internal.locator.LocatorListRequest; |
| import org.apache.geode.cache.client.internal.locator.LocatorStatusRequest; |
| import org.apache.geode.cache.client.internal.locator.QueueConnectionRequest; |
| import org.apache.geode.cache.client.internal.locator.wan.LocatorMembershipListener; |
| import org.apache.geode.cache.internal.HttpService; |
| import org.apache.geode.distributed.ConfigurationProperties; |
| import org.apache.geode.distributed.DistributedSystem; |
| import org.apache.geode.distributed.Locator; |
| import org.apache.geode.distributed.internal.InternalDistributedSystem.ConnectListener; |
| import org.apache.geode.distributed.internal.membership.NetLocator; |
| import org.apache.geode.distributed.internal.membership.NetLocatorFactory; |
| import org.apache.geode.distributed.internal.membership.QuorumChecker; |
| import org.apache.geode.distributed.internal.membership.adapter.GMSMembershipManager; |
| import org.apache.geode.distributed.internal.membership.gms.locator.PeerLocatorRequest; |
| import org.apache.geode.distributed.internal.tcpserver.InfoRequest; |
| import org.apache.geode.distributed.internal.tcpserver.TcpClient; |
| import org.apache.geode.distributed.internal.tcpserver.TcpServer; |
| import org.apache.geode.internal.GemFireVersion; |
| import org.apache.geode.internal.admin.remote.DistributionLocatorId; |
| import org.apache.geode.internal.cache.GemFireCacheImpl; |
| import org.apache.geode.internal.cache.InternalCache; |
| import org.apache.geode.internal.cache.InternalCacheBuilder; |
| import org.apache.geode.internal.cache.tier.sockets.TcpServerFactory; |
| import org.apache.geode.internal.cache.wan.WANServiceProvider; |
| import org.apache.geode.internal.config.JAXBService; |
| import org.apache.geode.internal.logging.InternalLogWriter; |
| import org.apache.geode.internal.logging.LogWriterFactory; |
| import org.apache.geode.internal.net.SocketCreator; |
| import org.apache.geode.internal.net.SocketCreatorFactory; |
| import org.apache.geode.internal.statistics.StatisticsConfig; |
| import org.apache.geode.logging.internal.LoggingSession; |
| import org.apache.geode.logging.internal.NullLoggingSession; |
| import org.apache.geode.logging.internal.executors.LoggingThread; |
| import org.apache.geode.logging.internal.log4j.api.LogService; |
| import org.apache.geode.logging.internal.spi.LogConfig; |
| import org.apache.geode.logging.internal.spi.LogConfigListener; |
| import org.apache.geode.logging.internal.spi.LogConfigSupplier; |
| import org.apache.geode.management.api.ClusterManagementService; |
| import org.apache.geode.management.internal.AgentUtil; |
| import org.apache.geode.management.internal.JmxManagerLocator; |
| import org.apache.geode.management.internal.JmxManagerLocatorRequest; |
| import org.apache.geode.management.internal.api.LocatorClusterManagementService; |
| import org.apache.geode.management.internal.configuration.domain.SharedConfigurationStatus; |
| import org.apache.geode.management.internal.configuration.handlers.ClusterManagementServiceInfoRequestHandler; |
| import org.apache.geode.management.internal.configuration.handlers.SharedConfigurationStatusRequestHandler; |
| import org.apache.geode.management.internal.configuration.messages.ClusterManagementServiceInfoRequest; |
| import org.apache.geode.management.internal.configuration.messages.SharedConfigurationStatusRequest; |
| import org.apache.geode.management.internal.configuration.messages.SharedConfigurationStatusResponse; |
| import org.apache.geode.security.AuthTokenEnabledComponents; |
| |
| /** |
| * Provides the implementation of a distribution {@code Locator} as well as internal-only |
| * functionality. |
| * |
| * <p> |
| * This class has APIs that perform essentially three layers of services. At the bottom layer is the |
| * JGroups location service. On top of that you can start a distributed system. And then on top of |
| * that you can start server location services. |
| * |
| * <p> |
| * Server Location Service DistributedSystem Peer Location Service |
| * |
| * <p> |
| * The startLocator() methods provide a way to start all three services in one call. Otherwise, the |
| * services can be started independently {@code locator = createLocator(); |
 * locator.startPeerLocation(); locator.startDistributedSystem();}
| * |
| * @since GemFire 4.0 |
| */ |
| public class InternalLocator extends Locator implements ConnectListener, LogConfigSupplier { |
  private static final Logger logger = LogService.getLogger();

  /**
   * system property name for forcing a locator distribution manager type
   */
  public static final String FORCE_LOCATOR_DM_TYPE = "Locator.forceLocatorDMType";

  /**
   * system property name for inhibiting DM banner
   */
  public static final String INHIBIT_DM_BANNER = "Locator.inhibitDMBanner";

  /**
   * system property name for forcing locators to be preferred as coordinators
   */
  public static final String LOCATORS_PREFERRED_AS_COORDINATORS =
      GEMFIRE_PREFIX + "disable-floating-coordinator";

  /**
   * the locator hosted by this JVM. As of 7.0 it is a singleton.
   *
   * GuardedBy: all reads and writes must synchronize on {@code locatorLock}
   */
  @MakeNotStatic
  private static InternalLocator locator;

  /** lock guarding the static {@code locator} singleton */
  private static final Object locatorLock = new Object();

  /**
   * The tcp server responding to locator requests
   */
  private final TcpServer server;

  /**
   * request handler that dispatches incoming TcpServer requests to registered sub-handlers
   *
   * @since GemFire 5.7
   */
  private final PrimaryHandler handler;

  /** WAN membership listener; null when the WAN service is not available */
  private final LocatorMembershipListener locatorListener;

  /** ensures shutdown processing is performed at most once */
  private final AtomicBoolean shutdownHandled = new AtomicBoolean(false);

  /** logging session for this locator; may be a NullLoggingSession that does nothing */
  private final LoggingSession loggingSession;

  /** listeners notified of log configuration changes (LogConfigSupplier contract) */
  private final Set<LogConfigListener> logConfigListeners = new HashSet<>();

  /** statistics about locator request handling */
  private final LocatorStats locatorStats;

  /** directory used for locator files, e.g. the product-use log (see startPeerLocation) */
  private final Path workingDirectory;

  /**
   * whether the locator was stopped during forced-disconnect processing but a reconnect will occur
   */
  private volatile boolean stoppedForReconnect;

  /** whether a reconnect has taken place after a forced disconnect */
  private volatile boolean reconnected;

  /**
   * whether the locator was stopped during forced-disconnect processing
   */
  private volatile boolean forcedDisconnect;

  /** whether the shared (cluster) configuration service has been started */
  private volatile boolean isSharedConfigurationStarted;

  /** thread performing an asynchronous locator restart, if any */
  private volatile Thread restartThread;

  /**
   * The distributed system owned by this locator, if any. Note that if a ds already exists because
   * the locator is being colocated in a normal member this field will be null.
   */
  private InternalDistributedSystem internalDistributedSystem;

  /**
   * The cache owned by this locator, if any. Note that if a cache already exists because the
   * locator is being colocated in a normal member this field will be null.
   */
  private InternalCache internalCache;

  /**
   * product use logging
   */
  private ProductUseLog productUseLog;

  /** true once peer location services have been started (see startPeerLocation) */
  private boolean peerLocator;

  /** server-location request handler, if server location has been started */
  private ServerLocator serverLocator;

  /** properties used to connect the distributed system; cleared in endStartLocator */
  private Properties env;

  /**
   * the TcpHandler used for peer location
   */
  private NetLocator netLocator;

  /** the effective configuration; never null after construction */
  private DistributionConfigImpl distributionConfig;

  /** WAN locator discoverer; null when the WAN service is not available */
  private WanLocatorDiscoverer locatorDiscoverer;

  /** cluster configuration persistence service, if started */
  private InternalConfigurationPersistenceService configurationPersistenceService;

  /** V2 cluster management service, if started */
  private LocatorClusterManagementService clusterManagementService;
| |
  /**
   * Returns the locator hosted by this JVM, or {@code null} if no locator has been created.
   */
  public static InternalLocator getLocator() {
    synchronized (locatorLock) {
      return locator;
    }
  }
| |
| public static boolean hasLocator() { |
| synchronized (locatorLock) { |
| return locator != null; |
| } |
| } |
| |
| private static void removeLocator(InternalLocator locator) { |
| if (locator == null) { |
| return; |
| } |
| synchronized (locatorLock) { |
| locator.loggingSession.stopSession(); |
| locator.loggingSession.shutdown(); |
| if (locator.equals(InternalLocator.locator)) { |
| InternalLocator.locator = null; |
| } |
| } |
| } |
| |
| /** |
| * Create a locator that listens on a given port. This locator will not have peer or server |
| * location services available until they are started by calling startServerLocation or |
| * startPeerLocation on the locator object. |
| * |
| * @param port the tcp/ip port to listen on |
| * @param loggingSession the LoggingSession to use, may be a NullLoggingSession which does |
| * nothing |
| * @param logFile the file that log messages should be written to |
| * @param logWriter a log writer that should be used (logFile parameter is ignored) |
| * @param securityLogWriter the logWriter to be used for security related log messages |
| * @param distributedSystemProperties optional properties to configure the distributed system |
| * (e.g., mcast addr/port, other locators) |
| * @param startDistributedSystem if true then this locator will also start its own ds |
| * |
| * @deprecated Please use |
| * {@link #createLocator(int, LoggingSession, File, InternalLogWriter, InternalLogWriter, InetAddress, String, Properties, Path)} |
| * instead. |
| */ |
| @Deprecated |
| public static InternalLocator createLocator(int port, LoggingSession loggingSession, File logFile, |
| InternalLogWriter logWriter, InternalLogWriter securityLogWriter, InetAddress bindAddress, |
| String hostnameForClients, Properties distributedSystemProperties, |
| boolean startDistributedSystem) { |
| return createLocator(port, loggingSession, logFile, logWriter, securityLogWriter, bindAddress, |
| hostnameForClients, distributedSystemProperties, |
| Paths.get(System.getProperty("user.dir"))); |
| } |
| |
| /** |
| * Create a locator that listens on a given port. This locator will not have peer or server |
| * location services available until they are started by calling startServerLocation or |
| * startPeerLocation on the locator object. |
| * |
| * @param port the tcp/ip port to listen on |
| * @param loggingSession the LoggingSession to use, may be a NullLoggingSession which does |
| * nothing |
| * @param logFile the file that log messages should be written to |
| * @param logWriter a log writer that should be used (logFile parameter is ignored) |
| * @param securityLogWriter the logWriter to be used for security related log messages |
| * @param distributedSystemProperties optional properties to configure the distributed system |
| * (e.g., mcast addr/port, other locators) |
| * @param workingDirectory the working directory to use for any files |
| */ |
| public static InternalLocator createLocator(int port, LoggingSession loggingSession, File logFile, |
| InternalLogWriter logWriter, InternalLogWriter securityLogWriter, InetAddress bindAddress, |
| String hostnameForClients, Properties distributedSystemProperties, Path workingDirectory) { |
| synchronized (locatorLock) { |
| if (hasLocator()) { |
| throw new IllegalStateException( |
| "A locator can not be created because one already exists in this JVM."); |
| } |
| InternalLocator locator = |
| new InternalLocator(port, loggingSession, logFile, logWriter, securityLogWriter, |
| bindAddress, hostnameForClients, distributedSystemProperties, null, |
| workingDirectory); |
| InternalLocator.locator = locator; |
| return locator; |
| } |
| } |
| |
| private static void setLocator(InternalLocator locator) { |
| synchronized (locatorLock) { |
| if (InternalLocator.locator != null && InternalLocator.locator != locator) { |
| throw new IllegalStateException( |
| "A locator can not be created because one already exists in this JVM."); |
| } |
| |
| InternalLocator.locator = locator; |
| } |
| } |
| |
| /** |
| * Creates a distribution locator that runs in this VM on the given port and bind address in the |
| * default working directory (Java "user.dir" System Property). |
| * |
| * <p> |
| * This is for internal use only as it does not create a distributed system unless told to do so. |
| * |
| * @param port the tcp/ip port to listen on |
| * @param logFile the file that log messages should be written to |
| * @param logWriter a log writer that should be used (logFile parameter is ignored) |
| * @param securityLogWriter the logWriter to be used for security related log messages |
| * @param startDistributedSystem if true, a distributed system is started |
| * @param distributedSystemProperties optional properties to configure the distributed system |
| * (e.g., mcast |
| * addr/port, other locators) |
| * @param hostnameForClients the name to give to clients for connecting to this locator |
| */ |
| public static InternalLocator startLocator(int port, File logFile, InternalLogWriter logWriter, |
| InternalLogWriter securityLogWriter, InetAddress bindAddress, boolean startDistributedSystem, |
| Properties distributedSystemProperties, String hostnameForClients) |
| throws IOException { |
| return startLocator(port, logFile, logWriter, securityLogWriter, bindAddress, |
| startDistributedSystem, distributedSystemProperties, hostnameForClients, |
| Paths.get(System.getProperty("user.dir"))); |
| } |
| |
| /** |
| * Creates a distribution locator that runs in this VM on the given port and bind address in the |
| * specified working directory. |
| * |
| * <p> |
| * This is for internal use only as it does not create a distributed system unless told to do so. |
| * |
| * @param port the tcp/ip port to listen on |
| * @param logFile the file that log messages should be written to |
| * @param logWriter a log writer that should be used (logFile parameter is ignored) |
| * @param securityLogWriter the logWriter to be used for security related log messages |
| * @param startDistributedSystem if true, a distributed system is started |
| * @param distributedSystemProperties optional properties to configure the distributed system |
| * (e.g., mcast |
| * addr/port, other locators) |
| * @param hostnameForClients the name to give to clients for connecting to this locator |
| * @param workingDirectory the working directory to use for any files |
| */ |
| public static InternalLocator startLocator(int port, File logFile, InternalLogWriter logWriter, |
| InternalLogWriter securityLogWriter, InetAddress bindAddress, boolean startDistributedSystem, |
| Properties distributedSystemProperties, String hostnameForClients, Path workingDirectory) |
| throws IOException { |
| System.setProperty(FORCE_LOCATOR_DM_TYPE, "true"); |
| InternalLocator newLocator = null; |
| |
| boolean startedLocator = false; |
| try { |
| |
| // if startDistributedSystem is true then Locator uses a NullLoggingSession (does nothing) |
| LoggingSession loggingSession = |
| startDistributedSystem ? NullLoggingSession.create() : LoggingSession.create(); |
| |
| newLocator = createLocator(port, loggingSession, logFile, logWriter, securityLogWriter, |
| bindAddress, hostnameForClients, distributedSystemProperties, |
| workingDirectory); |
| |
| loggingSession.createSession(newLocator); |
| loggingSession.startSession(); |
| |
| try { |
| newLocator.startPeerLocation(); |
| |
| if (startDistributedSystem) { |
| |
| try { |
| newLocator.startDistributedSystem(); |
| } catch (RuntimeException e) { |
| newLocator.stop(); |
| throw e; |
| } |
| |
| InternalDistributedSystem system = newLocator.internalDistributedSystem; |
| if (system != null) { |
| system.getDistributionManager().addHostedLocators(system.getDistributedMember(), |
| getLocatorStrings(), newLocator.isSharedConfigurationEnabled()); |
| } |
| } |
| } catch (IllegalStateException e) { |
| newLocator.stop(); |
| throw e; |
| } |
| |
| InternalDistributedSystem system = InternalDistributedSystem.getConnectedInstance(); |
| if (system != null) { |
| try { |
| newLocator.startServerLocation(system); |
| } catch (RuntimeException e) { |
| newLocator.stop(); |
| throw e; |
| } |
| } |
| |
| newLocator.endStartLocator(null); |
| startedLocator = true; |
| return newLocator; |
| |
| } finally { |
| System.clearProperty(FORCE_LOCATOR_DM_TYPE); |
| |
| if (!startedLocator) { |
| removeLocator(newLocator); |
| } |
| } |
| } |
| |
| /** |
| * Determines if this VM is a locator which must ignore a shutdown. |
| * |
| * @return true if this VM is a locator which should ignore a shutdown, false if it is a normal |
| * member. |
| */ |
| public static boolean isDedicatedLocator() { |
| InternalLocator internalLocator = getLocator(); |
| if (internalLocator == null) { |
| return false; |
| } |
| |
| InternalDistributedSystem system = internalLocator.internalDistributedSystem; |
| if (system == null) { |
| return false; |
| } |
| DistributionManager distributionManager = system.getDistributionManager(); |
| if (distributionManager.isLoner()) { |
| return false; |
| } |
| ClusterDistributionManager clusterDistributionManager = |
| (ClusterDistributionManager) system.getDistributionManager(); |
| return clusterDistributionManager.getDMType() == ClusterDistributionManager.LOCATOR_DM_TYPE; |
| } |
| |
| /** |
| * Creates a new {@code Locator} with the given port, log file, logWriter, and bind address. |
| * |
| * @param port the tcp/ip port to listen on |
| * @param logFile the file that log messages should be written to |
| * @param logWriter a log writer that should be used (logFile parameter is ignored) |
| * @param securityLogWriter the log writer to be used for security related log messages |
| * @param hostnameForClients the name to give to clients for connecting to this locator |
| * @param distributedSystemProperties optional properties to configure the distributed system |
| * (e.g., mcast addr/port, other locators) |
| * @param distributionConfig the config if being called from a distributed system; otherwise null. |
| * @param workingDirectory the working directory to use for files |
| */ |
| @VisibleForTesting |
| InternalLocator(int port, LoggingSession loggingSession, File logFile, |
| InternalLogWriter logWriter, InternalLogWriter securityLogWriter, InetAddress bindAddress, |
| String hostnameForClients, Properties distributedSystemProperties, |
| DistributionConfigImpl distributionConfig, Path workingDirectory) { |
| this.logFile = logFile; |
| this.bindAddress = bindAddress; |
| this.hostnameForClients = hostnameForClients; |
| |
| this.workingDirectory = workingDirectory; |
| |
| this.distributionConfig = distributionConfig; |
| |
| env = new Properties(); |
| |
| // set bind-address explicitly only if not wildcard and let any explicit |
| // value in distributedSystemProperties take precedence |
| if (bindAddress != null && !bindAddress.isAnyLocalAddress()) { |
| env.setProperty(BIND_ADDRESS, bindAddress.getHostAddress()); |
| } |
| |
| if (distributedSystemProperties != null) { |
| env.putAll(distributedSystemProperties); |
| } |
| env.setProperty(CACHE_XML_FILE, ""); |
| |
| // create a DC so that all of the lookup rules, gemfire.properties, etc, |
| // are considered and we have a config object we can trust |
| if (this.distributionConfig == null) { |
| this.distributionConfig = new DistributionConfigImpl(env); |
| env.clear(); |
| env.putAll(this.distributionConfig.getProps()); |
| } |
| |
| boolean hasLogFileButConfigDoesNot = |
| this.logFile != null && this.distributionConfig.getLogFile() |
| .toString().equals(DistributionConfig.DEFAULT_LOG_FILE.toString()); |
| if (logWriter == null && hasLogFileButConfigDoesNot) { |
| // LOG: this is(was) a hack for when logFile and config don't match -- if config specifies a |
| // different log-file things will break! |
| this.distributionConfig.unsafeSetLogFile(this.logFile); |
| } |
| |
| if (loggingSession == null) { |
| throw new Error("LoggingSession must not be null"); |
| } |
| this.loggingSession = loggingSession; |
| |
| // LOG: create LogWriters for GemFireTracer (or use whatever was passed in) |
| if (logWriter == null) { |
| LogWriterFactory.createLogWriterLogger(this.distributionConfig, false); |
| if (logger.isDebugEnabled()) { |
| logger.debug("LogWriter for locator is created."); |
| } |
| } |
| |
| if (securityLogWriter == null) { |
| securityLogWriter = LogWriterFactory.createLogWriterLogger(this.distributionConfig, true); |
| securityLogWriter.fine("SecurityLogWriter for locator is created."); |
| } |
| |
| SocketCreatorFactory.setDistributionConfig(this.distributionConfig); |
| |
| locatorListener = WANServiceProvider.createLocatorMembershipListener(); |
| if (locatorListener != null) { |
| // We defer setting the port until the handler is init'd - that way we'll have an actual port |
| // in the case where we're starting with port = 0. |
| locatorListener.setConfig(getConfig()); |
| } |
| handler = new PrimaryHandler(this, locatorListener); |
| handler.addHandler(InfoRequest.class, new InfoRequestHandler()); |
| |
| locatorStats = new LocatorStats(); |
| |
| server = new TcpServerFactory().makeTcpServer(port, this.bindAddress, null, |
| this.distributionConfig, handler, new DelayedPoolStatHelper(), toString(), this); |
| } |
| |
| public boolean isSharedConfigurationEnabled() { |
| return distributionConfig.getEnableClusterConfiguration(); |
| } |
| |
| private boolean loadFromSharedConfigDir() { |
| return distributionConfig.getLoadClusterConfigFromDir(); |
| } |
| |
| public boolean isSharedConfigurationRunning() { |
| return configurationPersistenceService != null |
| && configurationPersistenceService.getStatus() == SharedConfigurationStatus.RUNNING; |
| } |
| |
  /**
   * Returns the WAN locator membership listener, or {@code null} when the WAN service is not
   * available.
   */
  public LocatorMembershipListener getLocatorMembershipListener() {
    return locatorListener;
  }
| |
| /** |
| * @deprecated Please use {@link #getLocatorMembershipListener()} instead. |
| */ |
| @Deprecated |
| public LocatorMembershipListener getlocatorMembershipListener() { |
| return getLocatorMembershipListener(); |
| } |
| |
  /**
   * Starts the TCP server that answers locator requests.
   *
   * @throws IOException if the server cannot be started
   */
  private void startTcpServer() throws IOException {
    logger.info("Starting {}", this);
    server.start();
  }
| |
  /**
   * Returns the cluster configuration persistence service, or {@code null} if it has not been
   * started.
   */
  public InternalConfigurationPersistenceService getConfigurationPersistenceService() {
    return configurationPersistenceService;
  }
| |
  /**
   * Returns this locator's effective distribution configuration; never {@code null} after
   * construction.
   */
  public DistributionConfigImpl getConfig() {
    return distributionConfig;
  }
| |
| public InternalCache getCache() { |
| if (internalCache == null) { |
| return GemFireCacheImpl.getInstance(); |
| } |
| return internalCache; |
| } |
| |
| /** |
| * Start peer location in this locator. If you plan on starting a distributed system later, this |
| * method should be called first so that the distributed system can use this locator. |
| * |
| * @return returns the port that the locator to which the locator is bound |
| * @since GemFire 5.7 |
| */ |
| int startPeerLocation() throws IOException { |
| if (isPeerLocator()) { |
| throw new IllegalStateException( |
| String.format("Peer location is already running for %s", this)); |
| } |
| logger.info("Starting peer location for {}", this); |
| |
| // check for settings that would require only locators to hold the |
| // coordinator - e.g., security and network-partition detection |
| boolean locatorsAreCoordinators; |
| boolean networkPartitionDetectionEnabled = |
| distributionConfig.getEnableNetworkPartitionDetection(); |
| String securityUDPDHAlgo = distributionConfig.getSecurityUDPDHAlgo(); |
| if (networkPartitionDetectionEnabled) { |
| locatorsAreCoordinators = true; |
| } else { |
| // check if security is enabled |
| String prop = distributionConfig.getSecurityPeerAuthInit(); |
| locatorsAreCoordinators = prop != null && !prop.isEmpty(); |
| if (!locatorsAreCoordinators) { |
| locatorsAreCoordinators = Boolean.getBoolean(LOCATORS_PREFERRED_AS_COORDINATORS); |
| } |
| } |
| |
| final String locatorsConfigValue = distributionConfig.getLocators(); |
| |
| netLocator = NetLocatorFactory.newLocatorHandler(bindAddress, locatorsConfigValue, |
| locatorsAreCoordinators, networkPartitionDetectionEnabled, locatorStats, securityUDPDHAlgo, |
| workingDirectory); |
| handler.addHandler(PeerLocatorRequest.class, netLocator); |
| peerLocator = true; |
| if (!server.isAlive()) { |
| startTcpServer(); |
| } |
| int boundPort = server.getPort(); |
| File productUseFile = workingDirectory.resolve("locator" + boundPort + "views.log").toFile(); |
| productUseLog = new ProductUseLog(productUseFile); |
| |
| return boundPort; |
| } |
| |
| /** |
| * @return the TcpHandler for peer to peer discovery |
| */ |
| public NetLocator getLocatorHandler() { |
| return netLocator; |
| } |
| |
| /** |
| * For backward-compatibility we retain this method |
| * |
| * @deprecated use a form of the method that does not have peerLocator/serverLocator parameters |
| */ |
| @Deprecated |
| public static InternalLocator startLocator(int locatorPort, File logFile, |
| InternalLogWriter logWriter, InternalLogWriter securityLogWriter, InetAddress bindAddress, |
| Properties distributedSystemProperties, boolean peerLocator, boolean serverLocator, |
| String hostnameForClients, boolean b1) throws IOException { |
| return startLocator(locatorPort, logFile, logWriter, securityLogWriter, bindAddress, true, |
| distributedSystemProperties, hostnameForClients); |
| } |
| |
| /** |
| * Start a distributed system whose life cycle is managed by this locator. When the locator is |
| * stopped, this distributed system will be disconnected. If a distributed system already exists, |
| * this method will have no affect. |
| * |
| * @since GemFire 5.7 |
| */ |
| private void startDistributedSystem() throws IOException { |
| InternalDistributedSystem existing = InternalDistributedSystem.getConnectedInstance(); |
| if (existing != null) { |
| // LOG: changed from config to info |
| logger.info("Using existing distributed system: {}", existing); |
| startCache(existing); |
| } else { |
| |
| StringBuilder sb = new StringBuilder(100); |
| if (bindAddress != null) { |
| sb.append(bindAddress.getHostAddress()); |
| } else { |
| sb.append(SocketCreator.getLocalHost().getHostAddress()); |
| } |
| sb.append('[').append(getPort()).append(']'); |
| String thisLocator = sb.toString(); |
| |
| if (peerLocator) { |
| // append this locator to the locators list from the config properties |
| boolean setLocatorsProp = false; |
| String locatorsConfigValue = distributionConfig.getLocators(); |
| if (StringUtils.isNotBlank(locatorsConfigValue)) { |
| if (!locatorsConfigValue.contains(thisLocator)) { |
| locatorsConfigValue = locatorsConfigValue + ',' + thisLocator; |
| setLocatorsProp = true; |
| } |
| } else { |
| locatorsConfigValue = thisLocator; |
| setLocatorsProp = true; |
| } |
| if (setLocatorsProp) { |
| Properties updateEnv = new Properties(); |
| updateEnv.setProperty(LOCATORS, locatorsConfigValue); |
| distributionConfig.setApiProps(updateEnv); |
| String locatorsPropertyName = GEMFIRE_PREFIX + LOCATORS; |
| if (System.getProperty(locatorsPropertyName) != null) { |
| System.setProperty(locatorsPropertyName, locatorsConfigValue); |
| } |
| } |
| // No longer default mcast-port to zero. |
| } |
| |
| Properties distributedSystemProperties = new Properties(); |
| // LogWriterAppender is now shared via that class |
| // using a DistributionConfig earlier in this method |
| distributedSystemProperties.put(DistributionConfig.DS_CONFIG_NAME, distributionConfig); |
| |
| logger.info("Starting distributed system"); |
| |
| internalDistributedSystem = |
| (InternalDistributedSystem) DistributedSystem.connect(distributedSystemProperties); |
| |
| if (peerLocator) { |
| netLocator.setServices( |
| ((GMSMembershipManager) internalDistributedSystem.getDM().getMembershipManager()) |
| .getServices()); |
| } |
| |
| internalDistributedSystem.addDisconnectListener(sys -> stop(false, false, false)); |
| |
| startCache(internalDistributedSystem); |
| |
| logger.info("Locator started on {}", thisLocator); |
| |
| internalDistributedSystem.setDependentLocator(this); |
| } |
| } |
| |
| private void startCache(DistributedSystem system) throws IOException { |
| InternalCache internalCache = GemFireCacheImpl.getInstance(); |
| if (internalCache == null) { |
| logger.info("Creating cache for locator."); |
| this.internalCache = new InternalCacheBuilder(system.getProperties()) |
| .create((InternalDistributedSystem) system); |
| internalCache = this.internalCache; |
| } else { |
| logger.info("Using existing cache for locator."); |
| ((InternalDistributedSystem) system).handleResourceEvent(ResourceEvent.LOCATOR_START, this); |
| } |
| startJmxManagerLocationService(internalCache); |
| |
| startClusterManagementService(); |
| } |
| |
| private void startClusterManagementService() throws IOException { |
| startConfigurationPersistenceService(); |
| |
| if (internalCache == null) { |
| return; |
| } |
| |
| clusterManagementService = new LocatorClusterManagementService(locator.internalCache, |
| locator.configurationPersistenceService); |
| |
| // start management rest service |
| AgentUtil agentUtil = new AgentUtil(GemFireVersion.getGemFireVersion()); |
| |
| // Find the V2 Management rest WAR file |
| URI gemfireManagementWar = agentUtil.findWarLocation("geode-web-management"); |
| if (gemfireManagementWar == null) { |
| logger.info( |
| "Unable to find GemFire V2 Management REST API WAR file; the Management REST Interface for Geode will not be accessible."); |
| return; |
| } |
| |
| Map<String, Object> serviceAttributes = new HashMap<>(); |
| serviceAttributes.put(HttpService.SECURITY_SERVICE_SERVLET_CONTEXT_PARAM, |
| internalCache.getSecurityService()); |
| serviceAttributes.put(HttpService.CLUSTER_MANAGEMENT_SERVICE_CONTEXT_PARAM, |
| clusterManagementService); |
| |
| String[] authEnabledComponents = distributionConfig.getSecurityAuthTokenEnabledComponents(); |
| |
| boolean managementAuthTokenEnabled = Arrays.stream(authEnabledComponents) |
| .anyMatch(AuthTokenEnabledComponents::hasManagement); |
| serviceAttributes.put(HttpService.AUTH_TOKEN_ENABLED_PARAM, managementAuthTokenEnabled); |
| |
| if (distributionConfig.getEnableManagementRestService()) { |
| internalCache.getOptionalService(HttpService.class).ifPresent(x -> { |
| try { |
| logger.info("Geode Property {}=true Geode Management Rest Service is enabled.", |
| ConfigurationProperties.ENABLE_MANAGEMENT_REST_SERVICE); |
| x.addWebApplication("/management", Paths.get(gemfireManagementWar), serviceAttributes); |
| } catch (Throwable e) { |
| logger.warn("Unable to start management service: {}", e.getMessage()); |
| } |
| }); |
| } else { |
| logger.info("Geode Property {}=false Geode Management Rest Service is disabled.", |
| ConfigurationProperties.ENABLE_MANAGEMENT_REST_SERVICE); |
| } |
| } |
| |
| /** |
| * End the initialization of the locator. This method should be called once the location services |
| * and distributed system are started. |
| * |
| * @param distributedSystem The distributed system to use for the statistics. |
| * @since GemFire 5.7 |
| */ |
| void endStartLocator(InternalDistributedSystem distributedSystem) { |
| env = null; |
| |
| if (distributedSystem == null) { |
| distributedSystem = InternalDistributedSystem.getConnectedInstance(); |
| } |
| |
| if (distributedSystem != null) { |
| onConnect(distributedSystem); |
| } else { |
| InternalDistributedSystem.addConnectListener(this); |
| } |
| |
| locatorDiscoverer = WANServiceProvider.createLocatorDiscoverer(); |
| if (locatorDiscoverer != null) { |
| locatorDiscoverer.discover(getPort(), distributionConfig, locatorListener, |
| hostnameForClients); |
| } |
| } |
| |
| /** |
| * Start server location services in this locator. Server location can only be started once there |
| * is a running distributed system. |
| * |
| * @param distributedSystem The distributed system which the server location services should use. |
| * If null, the method will try to find an already connected distributed system. |
| * @since GemFire 5.7 |
| */ |
| void startServerLocation(InternalDistributedSystem distributedSystem) throws IOException { |
| if (isServerLocator()) { |
| throw new IllegalStateException( |
| String.format("Server location is already running for %s", this)); |
| } |
| logger.info("Starting server location for {}", this); |
| |
| if (distributedSystem == null) { |
| distributedSystem = InternalDistributedSystem.getConnectedInstance(); |
| if (distributedSystem == null) { |
| throw new IllegalStateException( |
| "Since server location is enabled the distributed system must be connected."); |
| } |
| } |
| |
| ServerLocator serverLocator = new ServerLocator(getPort(), bindAddress, hostnameForClients, |
| logFile, productUseLog, getConfig().getName(), distributedSystem, locatorStats); |
| handler.addHandler(LocatorListRequest.class, serverLocator); |
| handler.addHandler(ClientConnectionRequest.class, serverLocator); |
| handler.addHandler(QueueConnectionRequest.class, serverLocator); |
| handler.addHandler(ClientReplacementRequest.class, serverLocator); |
| handler.addHandler(GetAllServersRequest.class, serverLocator); |
| handler.addHandler(LocatorStatusRequest.class, serverLocator); |
| |
| this.serverLocator = serverLocator; |
| if (!server.isAlive()) { |
| startTcpServer(); |
| } |
| // the product use is not guaranteed to be initialized until the server is started, so |
| // the last thing we do is tell it to start logging |
| productUseLog.monitorUse(distributedSystem); |
| } |
| |
| /** |
| * Stop this locator. |
| */ |
| @Override |
| public void stop() { |
| stop(false, false, true); |
| } |
| |
| /** |
| * Stop this locator |
| * |
| * @param stopForReconnect - stopping for distributed system reconnect |
| * @param waitForDisconnect - wait up to 60 seconds for the locator to completely stop |
| */ |
| public void stop(boolean forcedDisconnect, boolean stopForReconnect, boolean waitForDisconnect) { |
| boolean isDebugEnabled = logger.isDebugEnabled(); |
| |
| stoppedForReconnect = stopForReconnect; |
| this.forcedDisconnect = forcedDisconnect; |
| |
| if (server.isShuttingDown()) { |
| // fix for bug 46156 |
| // If we are already shutting down don't do all of this again. |
| // But, give the server a bit of time to shut down so a new |
| // locator can be created, if desired, when this method returns |
| if (!stopForReconnect && waitForDisconnect) { |
| long endOfWait = System.currentTimeMillis() + 60000; |
| if (isDebugEnabled && server.isAlive()) { |
| logger.debug("sleeping to wait for the locator server to shut down..."); |
| } |
| |
| while (server.isAlive() && System.currentTimeMillis() < endOfWait) { |
| try { |
| Thread.sleep(500); |
| } catch (InterruptedException ignored) { |
| Thread.currentThread().interrupt(); |
| return; |
| } |
| } |
| |
| if (isDebugEnabled) { |
| if (server.isAlive()) { |
| logger.debug( |
| "60 seconds have elapsed waiting for the locator server to shut down - terminating wait and returning"); |
| } else { |
| logger.debug("the locator server has shut down"); |
| } |
| } |
| } |
| return; |
| } |
| |
| if (locatorDiscoverer != null) { |
| locatorDiscoverer.stop(); |
| locatorDiscoverer = null; |
| } |
| |
| if (server.isAlive()) { |
| logger.info("Stopping {}", this); |
| try { |
| new TcpClient().stop(bindAddress, getPort()); |
| } catch (ConnectException ignore) { |
| // must not be running |
| } |
| |
| boolean interrupted = Thread.interrupted(); |
| try { |
| // TcpServer up to SHUTDOWN_WAIT_TIME for its executor pool to shut down. |
| // We wait 2 * SHUTDOWN_WAIT_TIME here to account for that shutdown, and then our own. |
| server.join(TcpServer.SHUTDOWN_WAIT_TIME * 2); |
| |
| } catch (InterruptedException ex) { |
| interrupted = true; |
| logger.warn("Interrupted while stopping {}", this, ex); |
| |
| // Continue running -- doing our best to stop everything... |
| } finally { |
| if (interrupted) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| |
| if (server.isAlive()) { |
| logger.fatal("Could not stop {} in 60 seconds", this); |
| } |
| } |
| |
| removeLocator(this); |
| |
| handleShutdown(); |
| logger.info("{} is stopped", this); |
| |
| if (stoppedForReconnect) { |
| if (internalDistributedSystem != null) { |
| launchRestartThread(); |
| } |
| } |
| } |
| |
| /** |
| * answers whether this locator is currently stopped |
| */ |
| public boolean isStopped() { |
| return server == null || !server.isAlive(); |
| } |
| |
  /**
   * Performs the actual teardown of this locator's dependent services. Idempotent: only the first
   * caller does any work. Closes the product-use log and locator statistics, detaches from the
   * distributed system, closes the locator's cache (unless stopping for reconnect or after a
   * forced disconnect), and disconnects the distributed system (unless forcibly disconnected).
   */
  void handleShutdown() {
    // Atomically claim the shutdown so concurrent callers do not tear down twice.
    if (!shutdownHandled.compareAndSet(false, true)) {
      // already shutdown
      return;
    }
    if (productUseLog != null) {
      productUseLog.close();
    }
    if (internalDistributedSystem != null) {
      internalDistributedSystem.setDependentLocator(null);
    }

    // Keep the cache alive when reconnecting or forcibly disconnected; the reconnect
    // machinery owns it in those cases.
    if (internalCache != null && !stoppedForReconnect && !forcedDisconnect) {
      logger.info("Closing locator's cache");
      try {
        internalCache.close();
      } catch (RuntimeException ex) {
        logger.info("Could not close locator's cache because: {}", ex.getMessage(), ex);
      }
    }

    if (locatorStats != null) {
      locatorStats.close();
    }

    if (locatorListener != null) {
      locatorListener.clearLocatorInfo();
    }

    isSharedConfigurationStarted = false;
    if (internalDistributedSystem != null && !forcedDisconnect) {
      if (internalDistributedSystem.isConnected()) {
        logger.info("Disconnecting distributed system for {}", this);
        internalDistributedSystem.disconnect();
      }
    }
  }
| |
| /** |
| * Waits for a locator to be told to stop. |
| * |
| * @throws InterruptedException thrown if the thread is interrupted |
| */ |
| public void waitToStop() throws InterruptedException { |
| boolean restarted; |
| do { |
| DistributedSystem system = internalDistributedSystem; |
| restarted = false; |
| server.join(); |
| if (stoppedForReconnect) { |
| logger.info("waiting for distributed system to disconnect..."); |
| while (system.isConnected()) { |
| Thread.sleep(5000); |
| } |
| logger.info("waiting for distributed system to reconnect..."); |
| try { |
| restarted = system.waitUntilReconnected(-1, TimeUnit.SECONDS); |
| } catch (CancelException e) { |
| // reconnect attempt failed |
| } |
| if (restarted) { |
| logger.info("system restarted"); |
| } else { |
| logger.info("system was not restarted"); |
| } |
| Thread restartThread = this.restartThread; |
| if (restartThread != null) { |
| logger.info("waiting for services to restart..."); |
| restartThread.join(); |
| this.restartThread = null; |
| logger.info("done waiting for services to restart"); |
| } |
| } |
| } while (restarted); |
| } |
| |
| /** |
| * launch a thread that will restart location services |
| */ |
| private void launchRestartThread() { |
| String threadName = "Location services restart thread"; |
| restartThread = new LoggingThread(threadName, () -> { |
| boolean restarted = false; |
| try { |
| restarted = attemptReconnect(); |
| logger.info("attemptReconnect returned {}", restarted); |
| } catch (InterruptedException e) { |
| logger.info("attempt to restart location services was interrupted", e); |
| } catch (IOException e) { |
| logger.info("attempt to restart location services terminated", e); |
| } finally { |
| if (!restarted) { |
| stoppedForReconnect = false; |
| } |
| reconnected = restarted; |
| } |
| restartThread = null; |
| }); |
| restartThread.start(); |
| } |
| |
  /**
   * Returns true if the restart thread successfully reconnected this locator to a restarted
   * distributed system.
   */
  public boolean isReconnected() {
    return reconnected;
  }
| |
| /** |
| * reconnects the locator to a restarting DistributedSystem. If quorum checks are enabled this |
| * will start peer location services before a distributed system is available if the quorum check |
| * succeeds. It will then wait for the system to finish reconnecting before returning. If quorum |
| * checks are not being done this merely waits for the distributed system to reconnect and then |
| * starts location services. |
| * |
| * @return true if able to reconnect the locator to the new distributed system |
| */ |
| private boolean attemptReconnect() throws InterruptedException, IOException { |
| boolean restarted = false; |
| if (stoppedForReconnect) { |
| logger.info("attempting to restart locator"); |
| boolean tcpServerStarted = false; |
| InternalDistributedSystem system = internalDistributedSystem; |
| long waitTime = system.getConfig().getMaxWaitTimeForReconnect() / 2; |
| QuorumChecker quorumChecker = null; |
| |
| while (system.getReconnectedSystem() == null && !system.isReconnectCancelled()) { |
| if (quorumChecker == null) { |
| quorumChecker = internalDistributedSystem.getQuorumChecker(); |
| if (quorumChecker != null) { |
| logger.info("The distributed system returned this quorum checker: {}", quorumChecker); |
| } |
| } |
| |
| if (quorumChecker != null && !tcpServerStarted) { |
| boolean start = quorumChecker |
| .checkForQuorum(3L * internalDistributedSystem.getConfig().getMemberTimeout()); |
| if (start) { |
| // start up peer location. server location is started after the DS finishes reconnecting |
| logger.info("starting peer location"); |
| if (locatorListener != null) { |
| locatorListener.clearLocatorInfo(); |
| } |
| stoppedForReconnect = false; |
| internalDistributedSystem = null; |
| internalCache = null; |
| restartWithoutSystem(); |
| tcpServerStarted = true; |
| setLocator(this); |
| } |
| } |
| |
| try { |
| system.waitUntilReconnected(waitTime, TimeUnit.MILLISECONDS); |
| } catch (CancelException e) { |
| logger.info("Attempt to reconnect failed and further attempts have been terminated"); |
| stoppedForReconnect = false; |
| return false; |
| } |
| } |
| |
| InternalDistributedSystem newSystem = |
| (InternalDistributedSystem) system.getReconnectedSystem(); |
| if (newSystem != null) { |
| boolean noprevlocator = false; |
| if (!hasLocator()) { |
| setLocator(this); |
| noprevlocator = true; |
| } |
| if (!tcpServerStarted) { |
| if (locatorListener != null) { |
| locatorListener.clearLocatorInfo(); |
| } |
| stoppedForReconnect = false; |
| } |
| |
| try { |
| restartWithSystem(newSystem, GemFireCacheImpl.getInstance()); |
| } catch (CancelException e) { |
| stoppedForReconnect = true; |
| if (noprevlocator) { |
| removeLocator(this); |
| } |
| return false; |
| } |
| |
| restarted = true; |
| } |
| } |
| |
| logger.info("restart thread exiting. Service was {}restarted", restarted ? "" : "not "); |
| return restarted; |
| } |
| |
  /**
   * Restarts peer location services (the TcpServer) before a distributed system is available.
   * Called from the reconnect path once a quorum check succeeds.
   *
   * @throws IOException if the TcpServer cannot be started
   * @throws IllegalStateException if a different locator already exists in this JVM
   */
  private void restartWithoutSystem() throws IOException {
    synchronized (locatorLock) {
      // Only one locator may exist per JVM; bail out if another instance took our place.
      if (locator != this && hasLocator()) {
        throw new IllegalStateException(
            "A locator can not be created because one already exists in this JVM.");
      }
      internalDistributedSystem = null;
      internalCache = null;

      logger.info("Locator restart: initializing TcpServer peer location services");
      // No system or cache yet — handlers restart with null state.
      handler.restarting(null, null, null);
      server.restarting();

      if (productUseLog.isClosed()) {
        productUseLog.reopen();
      }

      if (!server.isAlive()) {
        logger.info("Locator restart: starting TcpServer");
        startTcpServer();
      }
    }
  }
| |
  /**
   * Restarts this locator's services against a freshly reconnected distributed system: restarts
   * the TcpServer and handlers, reopens product-use logging, recreates the configuration
   * persistence service when shared configuration is enabled, restarts the cluster management and
   * JMX-manager location services, and finishes with {@code endStartLocator}.
   *
   * @param newSystem the reconnected distributed system
   * @param newCache the cache created by the reconnected system
   * @throws IOException if location services cannot be restarted
   * @throws IllegalStateException if a different locator already exists in this JVM
   */
  private void restartWithSystem(InternalDistributedSystem newSystem, InternalCache newCache)
      throws IOException {
    synchronized (locatorLock) {
      if (locator != this && hasLocator()) {
        throw new IllegalStateException(
            "A locator can not be created because one already exists in this JVM.");
      }
    }
    // NOTE(review): unlike restartWithoutSystem(), the state below is assigned outside
    // locatorLock — confirm this is intentional.
    internalDistributedSystem = newSystem;
    internalCache = newCache;
    internalDistributedSystem.setDependentLocator(this);
    logger.info("Locator restart: initializing TcpServer");

    try {
      handler.restarting(newSystem, newCache, configurationPersistenceService);
      server.restarting();
    } catch (CancelException e) {
      // Roll back the partially-installed state before propagating.
      internalDistributedSystem = null;
      internalCache = null;
      logger.info("Locator restart: attempt to restart location services failed", e);
      throw e;
    }

    if (productUseLog.isClosed()) {
      productUseLog.reopen();
    }
    productUseLog.monitorUse(newSystem);

    if (isSharedConfigurationEnabled()) {
      // Rebuild the persistence service against the new cache before restarting
      // the cluster management service.
      configurationPersistenceService =
          new InternalConfigurationPersistenceService(newCache, workingDirectory,
              JAXBService.create());
      startClusterManagementService();
    }

    if (!server.isAlive()) {
      logger.info("Locator restart: starting TcpServer");
      startTcpServer();
    }

    logger.info("Locator restart: initializing JMX manager");
    startJmxManagerLocationService(newCache);
    endStartLocator(internalDistributedSystem);
    logger.info("Locator restart completed");

    handler.restartCompleted(newSystem);
  }
| |
  /**
   * Returns the cluster management service started by this locator, or null if it has not been
   * started.
   */
  public ClusterManagementService getClusterManagementService() {
    return clusterManagementService;
  }
| |
| @Override |
| public DistributedSystem getDistributedSystem() { |
| if (internalDistributedSystem == null) { |
| return InternalDistributedSystem.getAnyInstance(); |
| } |
| |
| return internalDistributedSystem; |
| } |
| |
  /**
   * Returns true if this locator provides peer location services.
   */
  @Override
  public boolean isPeerLocator() {
    return peerLocator;
  }
| |
  /**
   * Returns true if server location services have been started (see
   * {@code startServerLocation}).
   */
  @Override
  public boolean isServerLocator() {
    return serverLocator != null;
  }
| |
| /** |
| * Returns null if no server locator; otherwise returns the advisee that represents the server |
| * locator. |
| */ |
| public ServerLocator getServerLocatorAdvisee() { |
| return serverLocator; |
| } |
| |
| /** |
| * Return the port on which the locator is actually listening. If called before the locator has |
| * actually started, this method will return null. |
| * |
| * @return the port the locator is listening on or null if it has not yet been started |
| */ |
| @Override |
| public Integer getPort() { |
| if (server != null) { |
| return server.getPort(); |
| } |
| return null; |
| } |
| |
  /**
   * Returns this locator's distribution config, which serves as its log configuration.
   */
  @Override
  public LogConfig getLogConfig() {
    return distributionConfig;
  }
| |
  /**
   * Returns this locator's distribution config, which serves as its statistics configuration.
   */
  @Override
  public StatisticsConfig getStatisticsConfig() {
    return distributionConfig;
  }
| |
  /**
   * Registers a listener to be notified of log configuration changes.
   */
  @Override
  public void addLogConfigListener(LogConfigListener logConfigListener) {
    logConfigListeners.add(logConfigListener);
  }
| |
  /**
   * Unregisters a previously-added log configuration listener.
   */
  @Override
  public void removeLogConfigListener(LogConfigListener logConfigListener) {
    logConfigListeners.remove(logConfigListener);
  }
| |
| public SharedConfigurationStatusResponse getSharedConfigurationStatus() { |
| ExecutorService waitingPoolExecutor = |
| internalCache.getDistributionManager().getExecutors().getWaitingThreadPool(); |
| Future<SharedConfigurationStatusResponse> statusFuture = |
| waitingPoolExecutor.submit(new FetchSharedConfigStatus()); |
| SharedConfigurationStatusResponse response; |
| |
| try { |
| response = statusFuture.get(5, TimeUnit.SECONDS); |
| } catch (Exception e) { |
| logger.info("Exception occurred while fetching the status {}", getStackTrace(e)); |
| response = new SharedConfigurationStatusResponse(); |
| response.setStatus(SharedConfigurationStatus.UNDETERMINED); |
| } |
| return response; |
| } |
| |
  /**
   * Hooks the locator statistics up to a newly-connected distributed system. Registered as a
   * connect listener when no system was available at start-up.
   */
  @Override
  public void onConnect(InternalDistributedSystem sys) {
    try {
      locatorStats.hookupStats(sys,
          SocketCreator.getLocalHost().getCanonicalHostName() + '-' + server.getBindAddress());
    } catch (UnknownHostException e) {
      logger.warn(e);
    }
  }
| |
| /** |
| * Returns collection of locator strings representing every locator instance hosted by this |
| * member. |
| * |
| * @see #getLocators() |
| */ |
| public static Collection<String> getLocatorStrings() { |
| Collection<String> locatorStrings; |
| try { |
| Collection<DistributionLocatorId> locatorIds = asDistributionLocatorIds(getLocators()); |
| locatorStrings = DistributionLocatorId.asStrings(locatorIds); |
| } catch (UnknownHostException ignored) { |
| locatorStrings = null; |
| } |
| if (locatorStrings == null || locatorStrings.isEmpty()) { |
| return null; |
| } |
| return locatorStrings; |
| } |
| |
  /**
   * Starts the cluster configuration (shared configuration) persistence service after installing
   * the TcpServer request handlers. The start is silently skipped when cluster configuration is
   * disabled, already started, or this is not a dedicated locator.
   *
   * @throws IOException if loading the shared configuration from disk fails
   * @throws IllegalStateException if cluster configuration is enabled but jmx-manager is false
   */
  private void startConfigurationPersistenceService() throws IOException {
    installRequestHandlers();

    if (!distributionConfig.getEnableClusterConfiguration()) {
      logger.info("Cluster configuration service is disabled");
      return;
    }

    if (!distributionConfig.getJmxManager()) {
      throw new IllegalStateException(
          "Cannot start cluster configuration without jmx-manager=true");
    }

    if (isSharedConfigurationStarted) {
      logger.info("Cluster configuration service is already started.");
      return;
    }

    if (!isDedicatedLocator()) {
      logger.info(
          "Cluster configuration service not enabled as it is only supported in dedicated locators");
      return;
    }


    if (locator.configurationPersistenceService == null) {
      // configurationPersistenceService will already be created in case of auto-reconnect
      locator.configurationPersistenceService =
          new InternalConfigurationPersistenceService(locator.internalCache, workingDirectory,
              JAXBService.create());
    }
    locator.configurationPersistenceService
        .initSharedConfiguration(locator.loadFromSharedConfigDir());
    logger.info(
        "Cluster configuration service start up completed successfully and is now running ....");
    isSharedConfigurationStarted = true;
  }
| |
| public void startJmxManagerLocationService(InternalCache internalCache) { |
| if (internalCache.getJmxManagerAdvisor() != null) { |
| if (!handler.isHandled(JmxManagerLocatorRequest.class)) { |
| handler.addHandler(JmxManagerLocatorRequest.class, |
| new JmxManagerLocator(internalCache)); |
| } |
| } |
| } |
| |
  /**
   * Registers the shared-configuration-status and cluster-management-service-info request handlers
   * with the TcpServer, skipping any that are already registered.
   */
  private void installRequestHandlers() {
    if (!handler.isHandled(SharedConfigurationStatusRequest.class)) {
      handler.addHandler(SharedConfigurationStatusRequest.class,
          new SharedConfigurationStatusRequestHandler());
      logger.info("SharedConfigStatusRequestHandler installed");
    }

    if (!handler.isHandled(ClusterManagementServiceInfoRequest.class)) {
      handler.addHandler(ClusterManagementServiceInfoRequest.class,
          new ClusterManagementServiceInfoRequestHandler());
      logger.info("ClusterManagementServiceInfoRequestHandler installed");
    }
  }
| |
| public boolean hasHandlerForClass(Class messageClass) { |
| return handler.isHandled(messageClass); |
| } |
| |
| class FetchSharedConfigStatus implements Callable<SharedConfigurationStatusResponse> { |
| |
| @Override |
| public SharedConfigurationStatusResponse call() throws InterruptedException { |
| InternalLocator locator = InternalLocator.this; |
| |
| SharedConfigurationStatusResponse response; |
| if (locator.configurationPersistenceService != null) { |
| response = locator.configurationPersistenceService.createStatusResponse(); |
| } else { |
| response = new SharedConfigurationStatusResponse(); |
| response.setStatus(SharedConfigurationStatus.UNDETERMINED); |
| } |
| return response; |
| } |
| } |
| |
| /** |
| * A helper object so that the TcpServer can record its stats to the proper place. Stats are only |
| * recorded if a distributed system is started. |
| */ |
| protected class DelayedPoolStatHelper implements PoolStatHelper { |
| |
| @Override |
| public void startJob() { |
| locatorStats.incRequestInProgress(1); |
| |
| } |
| |
| @Override |
| public void endJob() { |
| locatorStats.incRequestInProgress(-1); |
| } |
| } |
| } |