| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.admin.internal; |
| |
| import static org.apache.geode.distributed.ConfigurationProperties.DISABLE_TCP; |
| import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS; |
| import static org.apache.geode.distributed.ConfigurationProperties.MCAST_ADDRESS; |
| import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.net.InetAddress; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Properties; |
| import java.util.Set; |
| import java.util.StringTokenizer; |
| import java.util.UUID; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.CancellationException; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.FutureTask; |
| |
| import org.apache.logging.log4j.Logger; |
| |
| import org.apache.geode.CancelException; |
| import org.apache.geode.SystemFailure; |
| import org.apache.geode.admin.AdminException; |
| import org.apache.geode.admin.Alert; |
| import org.apache.geode.admin.AlertLevel; |
| import org.apache.geode.admin.AlertListener; |
| import org.apache.geode.admin.BackupStatus; |
| import org.apache.geode.admin.CacheServer; |
| import org.apache.geode.admin.CacheServerConfig; |
| import org.apache.geode.admin.CacheVm; |
| import org.apache.geode.admin.ConfigurationParameter; |
| import org.apache.geode.admin.DistributedSystemConfig; |
| import org.apache.geode.admin.DistributionLocator; |
| import org.apache.geode.admin.DistributionLocatorConfig; |
| import org.apache.geode.admin.GemFireHealth; |
| import org.apache.geode.admin.ManagedEntity; |
| import org.apache.geode.admin.ManagedEntityConfig; |
| import org.apache.geode.admin.OperationCancelledException; |
| import org.apache.geode.admin.RuntimeAdminException; |
| import org.apache.geode.admin.SystemMember; |
| import org.apache.geode.admin.SystemMemberCacheListener; |
| import org.apache.geode.admin.SystemMembershipEvent; |
| import org.apache.geode.admin.SystemMembershipListener; |
| import org.apache.geode.annotations.internal.MakeNotStatic; |
| import org.apache.geode.cache.persistence.PersistentID; |
| import org.apache.geode.distributed.DistributedMember; |
| import org.apache.geode.distributed.FutureCancelledException; |
| import org.apache.geode.distributed.internal.ClusterDistributionManager; |
| import org.apache.geode.distributed.internal.DistributionManager; |
| import org.apache.geode.distributed.internal.InternalDistributedSystem; |
| import org.apache.geode.distributed.internal.InternalLocator; |
| import org.apache.geode.distributed.internal.membership.InternalDistributedMember; |
| import org.apache.geode.internal.Assert; |
| import org.apache.geode.internal.admin.ApplicationVM; |
| import org.apache.geode.internal.admin.GemFireVM; |
| import org.apache.geode.internal.admin.GfManagerAgent; |
| import org.apache.geode.internal.admin.GfManagerAgentConfig; |
| import org.apache.geode.internal.admin.GfManagerAgentFactory; |
| import org.apache.geode.internal.admin.SSLConfig; |
| import org.apache.geode.internal.admin.remote.CompactRequest; |
| import org.apache.geode.internal.admin.remote.DistributionLocatorId; |
| import org.apache.geode.internal.admin.remote.MissingPersistentIDsRequest; |
| import org.apache.geode.internal.admin.remote.PrepareRevokePersistentIDRequest; |
| import org.apache.geode.internal.admin.remote.RemoteApplicationVM; |
| import org.apache.geode.internal.admin.remote.RemoteTransportConfig; |
| import org.apache.geode.internal.admin.remote.RevokePersistentIDRequest; |
| import org.apache.geode.internal.admin.remote.ShutdownAllRequest; |
| import org.apache.geode.internal.cache.backup.BackupOperation; |
| import org.apache.geode.internal.cache.persistence.PersistentMemberPattern; |
| import org.apache.geode.internal.logging.Banner; |
| import org.apache.geode.internal.logging.InternalLogWriter; |
| import org.apache.geode.internal.logging.LogService; |
| import org.apache.geode.internal.logging.LogWriterFactory; |
| import org.apache.geode.internal.logging.LoggingSession; |
| import org.apache.geode.internal.logging.NullLoggingSession; |
| import org.apache.geode.internal.logging.log4j.LogMarker; |
| import org.apache.geode.internal.util.concurrent.FutureResult; |
| |
| /** |
| * Represents a GemFire distributed system for remote administration/management. |
| * |
| * @since GemFire 3.5 |
| */ |
| public class AdminDistributedSystemImpl implements org.apache.geode.admin.AdminDistributedSystem, |
| org.apache.geode.internal.admin.JoinLeaveListener, |
| org.apache.geode.internal.admin.AlertListener, |
| org.apache.geode.distributed.internal.InternalDistributedSystem.DisconnectListener { |
| |
| private static final Logger logger = LogService.getLogger(); |
| |
| /** |
| * String identity of this distributed system. Assigned in the constructor and again in |
| * {@link #connect(InternalLogWriter)} from the locators string, or from the multicast |
| * address and port when no locators are configured. |
| */ |
| private String id; |
| |
| /** Latest alert broadcast by any system members; may be null (see {@link #getLatestAlert()}) */ |
| private Alert latestAlert; |
| |
| // ------------------------------------------------------------------------- |
| |
| /** |
| * Internal admin agent to delegate low-level work to. Null until |
| * {@link #connect(InternalLogWriter)} assigns it; reset to null by {@link #disconnect()}. |
| */ |
| private volatile GfManagerAgent gfManagerAgent; |
| |
| /** Monitors the health of this distributed system; closed by {@link #disconnect()} if set */ |
| private GemFireHealth health; |
| |
| /** |
| * Set of non-Manager members in this system. The elements are read back as |
| * {@link Future}s (see {@link #getSystemMemberApplications()}); iterate while synchronized |
| * on the set. |
| */ |
| private final Set applicationSet = new HashSet(); |
| |
| /** Set of DistributionLocators for this system, each wrapped in a {@link FutureResult} */ |
| private final Set locatorSet = new HashSet(); |
| |
| /** Set of dedicated CacheServer members in this system, each wrapped in a {@link FutureResult} */ |
| private final Set cacheServerSet = new HashSet(); |
| |
| /** Configuration defining this distributed system */ |
| private final DistributedSystemConfigImpl config; |
| |
| /** Controller for starting and stopping managed entities; created in the constructor */ |
| private ManagedEntityController controller; |
| |
| /** Log file collator for gathering and merging system member logs */ |
| private LogCollator logCollator = new LogCollator(); |
| |
| /** |
| * The level above which alerts will be delivered to the alert listeners. Starts out as |
| * {@link AlertLevel#WARNING}. |
| */ |
| private AlertLevel alertLevel = AlertLevel.WARNING; |
| |
| /** |
| * The alert listeners registered on this distributed system. The set is never mutated in |
| * place: add/remove build a new set and swap the reference while holding {@code alertLock}. |
| */ |
| private volatile Set<AlertListener> alertListeners = Collections.emptySet(); |
| private final Object alertLock = new Object(); |
| |
| /** |
| * Log writer taken from the configuration when one exists there, otherwise created in the |
| * constructor and stored back into the configuration. |
| */ |
| private InternalLogWriter logWriter; |
| |
| /** |
| * The membership listeners registered on this distributed system. Replaced wholesale under |
| * {@code membershipLock} by the add/remove methods. |
| */ |
| private volatile Set membershipListeners = Collections.EMPTY_SET; |
| private final Object membershipLock = new Object(); |
| |
| /** The region listeners registered on this distributed system */ |
| // for feature requests #32887 |
| private volatile List cacheListeners = Collections.EMPTY_LIST; |
| private final Object cacheListLock = new Object(); |
| |
| /** |
| * Logging session started in the constructor and stopped/shut down by {@link #disconnect()}. |
| */ |
| private final LoggingSession loggingSession; |
| |
| /** |
| * Reference to the AdminDistributedSystemImpl instance, kept for feature requests #32887. |
| * <p> |
| * Writes are guarded by {@link #CONNECTION_SYNC}: set in {@link #connect(InternalLogWriter)} |
| * and cleared in {@link #disconnect()}. |
| * <p> |
| * TODO: reimplement this change and SystemMemberCacheEventProcessor to avoid using this static. |
| * SystemMemberCacheEvents should only be sent to Admin VMs that express interest. |
| * <p> |
| * This is volatile to allow SystemFailure to deliver fatal poison-pill to thisAdminDS without |
| * waiting on synchronization. |
| * |
| * @guarded.By CONNECTION_SYNC |
| */ |
| @MakeNotStatic |
| private static volatile AdminDistributedSystemImpl thisAdminDS; |
| |
| /** |
| * Provides synchronization for {@link #connect()} and {@link #disconnect()}. {@link #thisAdminDS} |
| * is also protected by CONNECTION_SYNC and has its lifecycle tied to connect/disconnect. |
| */ |
| private static final Object CONNECTION_SYNC = new Object(); |
| |
| |
| // ------------------------------------------------------------------------- |
| // Constructor(s) |
| // ------------------------------------------------------------------------- |
| |
| private static LoggingSession createLoggingSession() { |
| return NullLoggingSession.create(); |
| } |
| |
| /** |
| * Constructs new DistributedSystemImpl with the given configuration. |
| * |
| * @param config configuration defining this distributed system |
| */ |
| public AdminDistributedSystemImpl(DistributedSystemConfigImpl config) { |
| loggingSession = createLoggingSession(); |
| |
| // init from config... |
| this.config = config; |
| |
| String systemId = this.config.getSystemId(); |
| if (systemId != null && systemId.length() > 0) { |
| this.id = systemId; |
| |
| } |
| if (this.getLocators() != null && this.getLocators().length() > 0) { |
| this.id = this.getLocators(); |
| |
| } else { |
| this.id = new StringBuffer(this.getMcastAddress()).append("[").append(this.getMcastPort()) |
| .append("]").toString(); |
| } |
| |
| // LOG: create LogWriterAppender unless one already exists |
| loggingSession.startSession(); |
| |
| // LOG: look in DistributedSystemConfigImpl for existing LogWriter to use |
| InternalLogWriter existingLogWriter = this.config.getInternalLogWriter(); |
| if (existingLogWriter != null) { |
| this.logWriter = existingLogWriter; |
| } else { |
| // LOG: create LogWriterLogger |
| this.logWriter = LogWriterFactory.createLogWriterLogger(this.config.createLogConfig(), false); |
| if (!Boolean.getBoolean(InternalLocator.INHIBIT_DM_BANNER)) { |
| // LOG: changed statement from config to info |
| this.logWriter.info(new Banner().getString()); |
| } else { |
| logger.debug("skipping banner - " + InternalLocator.INHIBIT_DM_BANNER + " is set to true"); |
| } |
| // Set this log writer in DistributedSystemConfigImpl |
| this.config.setInternalLogWriter(this.logWriter); |
| } |
| |
| // set up other details that depend on config attrs... |
| this.controller = ManagedEntityControllerFactory.createManagedEntityController(this); |
| initializeDistributionLocators(); |
| initializeCacheServers(); |
| } |
| |
| // ------------------------------------------------------------------------- |
| // Initialization |
| // ------------------------------------------------------------------------- |
| |
| /** |
| * Creates DistributionLocator instances for every locator entry in the |
| * {@link org.apache.geode.admin.DistributedSystemConfig} |
| */ |
| private void initializeDistributionLocators() { |
| DistributionLocatorConfig[] configs = this.config.getDistributionLocatorConfigs(); |
| if (configs.length == 0) { |
| // No work to do |
| return; |
| } |
| |
| for (int i = 0; i < configs.length; i++) { |
| // the Locator impl may vary in this class from the config... |
| DistributionLocatorConfig conf = configs[i]; |
| DistributionLocator locator = createDistributionLocatorImpl(conf); |
| this.locatorSet.add(new FutureResult(locator)); |
| } |
| // update locators string... |
| setLocators(parseLocatorSet()); |
| } |
| |
| /** |
| * Creates <code>CacheServer</code> instances for every cache server entry in the |
| * {@link org.apache.geode.admin.DistributedSystemConfig} |
| */ |
| private void initializeCacheServers() { |
| CacheServerConfig[] cacheServerConfigs = this.config.getCacheServerConfigs(); |
| for (int i = 0; i < cacheServerConfigs.length; i++) { |
| try { |
| CacheServerConfig conf = cacheServerConfigs[i]; |
| CacheServerConfigImpl copy = new CacheServerConfigImpl(conf); |
| this.cacheServerSet.add(new FutureResult(createCacheServer(copy))); |
| } catch (java.lang.Exception e) { |
| logger.warn(e.getMessage(), e); |
| continue; |
| } catch (VirtualMachineError err) { |
| SystemFailure.initiateFailure(err); |
| // If this ever returns, rethrow the error. We're poisoned |
| // now, so don't let this thread continue. |
| throw err; |
| } catch (java.lang.Error e) { |
| // Whenever you catch Error or Throwable, you must also |
| // catch VirtualMachineError (see above). However, there is |
| // _still_ a possibility that you are dealing with a cascading |
| // error condition, so you also need to check to see if the JVM |
| // is still usable: |
| SystemFailure.checkFailure(); |
| logger.error(e.getMessage(), e); |
| continue; |
| } |
| } |
| } |
| |
| /** |
| * Checks to make sure that {@link #connect()} has been called. |
| * |
| * @throws IllegalStateException If {@link #connect()} has not been called. |
| */ |
| private void checkConnectCalled() { |
| if (this.gfManagerAgent == null) { |
| throw new IllegalStateException( |
| "connect() has not been invoked on this AdminDistributedSystem."); |
| } |
| } |
| |
| // ------------------------------------------------------------------------- |
| // Attributes of this DistributedSystem |
| // ------------------------------------------------------------------------- |
| |
| public GfManagerAgent getGfManagerAgent() { |
| return this.gfManagerAgent; |
| } |
| |
| @Override |
| public boolean isConnected() { |
| return this.gfManagerAgent != null && this.gfManagerAgent.isConnected(); |
| } |
| |
| @Override |
| public String getId() { |
| return this.id; |
| } |
| |
| @Override |
| public String getName() { |
| String name = this.config.getSystemName(); |
| if (name != null && name.length() > 0) { |
| return name; |
| |
| } else { |
| return getId(); |
| } |
| } |
| |
| public String getSystemName() { |
| return this.config.getSystemName(); |
| } |
| |
| @Override |
| public String getRemoteCommand() { |
| return this.config.getRemoteCommand(); |
| } |
| |
| @Override |
| public void setRemoteCommand(String remoteCommand) { |
| this.config.setRemoteCommand(remoteCommand); |
| } |
| |
| @Override |
| public void setAlertLevel(AlertLevel level) { |
| if (this.isConnected()) { |
| this.gfManagerAgent.setAlertLevel(level.getSeverity()); |
| } |
| |
| this.alertLevel = level; |
| } |
| |
| @Override |
| public AlertLevel getAlertLevel() { |
| return this.alertLevel; |
| } |
| |
| @Override |
| public void addAlertListener(AlertListener listener) { |
| synchronized (this.alertLock) { |
| Set<AlertListener> oldListeners = this.alertListeners; |
| if (!oldListeners.contains(listener)) { |
| Set<AlertListener> newListeners = new HashSet<AlertListener>(oldListeners); |
| newListeners.add(listener); |
| this.alertListeners = newListeners; |
| } |
| } |
| } |
| |
| public int getAlertListenerCount() { |
| synchronized (this.alertLock) { |
| return this.alertListeners.size(); |
| } |
| } |
| |
| @Override |
| public void removeAlertListener(AlertListener listener) { |
| synchronized (this.alertLock) { |
| Set<AlertListener> oldListeners = this.alertListeners; |
| if (oldListeners.contains(listener)) { // fixed bug 34687 |
| Set<AlertListener> newListeners = new HashSet<AlertListener>(oldListeners); |
| if (newListeners.remove(listener)) { |
| this.alertListeners = newListeners; |
| } |
| } |
| } |
| } |
| |
| @Override |
| public void addMembershipListener(SystemMembershipListener listener) { |
| synchronized (this.membershipLock) { |
| Set oldListeners = this.membershipListeners; |
| if (!oldListeners.contains(listener)) { |
| Set newListeners = new HashSet(oldListeners); |
| newListeners.add(listener); |
| this.membershipListeners = newListeners; |
| } |
| } |
| } |
| |
| @Override |
| public void removeMembershipListener(SystemMembershipListener listener) { |
| synchronized (this.membershipLock) { |
| Set oldListeners = this.membershipListeners; |
| if (oldListeners.contains(listener)) { // fixed bug 34687 |
| Set newListeners = new HashSet(oldListeners); |
| if (newListeners.remove(listener)) { |
| this.membershipListeners = newListeners; |
| } |
| } |
| } |
| } |
| |
| @Override |
| public String getMcastAddress() { |
| return this.config.getMcastAddress(); |
| } |
| |
| @Override |
| public int getMcastPort() { |
| return this.config.getMcastPort(); |
| } |
| |
| public boolean getDisableTcp() { |
| return this.config.getDisableTcp(); |
| } |
| |
| public boolean getDisableAutoReconnect() { |
| return this.config.getDisableAutoReconnect(); |
| } |
| |
| @Override |
| public String getLocators() { |
| return this.config.getLocators(); |
| } |
| |
| protected void setLocators(String locators) { |
| this.config.setLocators(locators); |
| } |
| |
| public String getMembershipPortRange() { |
| return this.getConfig().getMembershipPortRange(); |
| } |
| |
| /** get the direct-channel port to use, or zero if not set */ |
| public int getTcpPort() { |
| return this.getConfig().getTcpPort(); |
| } |
| |
| public void setTcpPort(int port) { |
| this.getConfig().setTcpPort(port); |
| } |
| |
| public void setMembershipPortRange(String membershipPortRange) { |
| this.getConfig().setMembershipPortRange(membershipPortRange); |
| } |
| |
| @Override |
| public DistributedSystemConfig getConfig() { |
| return this.config; |
| } |
| |
| /** |
| * Returns true if any members of this system are currently running. |
| */ |
| @Override |
| public boolean isRunning() { |
| if (this.gfManagerAgent == null) |
| return false; |
| // is there a better way?? |
| // this.gfManagerAgent.isConnected() ... this.gfManagerAgent.isListening() |
| |
| if (isAnyMemberRunning()) |
| return true; |
| return false; |
| } |
| |
| /** Returns true if this system can use multicast for communications */ |
| @Override |
| public boolean isMcastEnabled() { |
| return this.getMcastPort() > 0; |
| } |
| |
| ManagedEntityController getEntityController() { |
| return this.controller; |
| } |
| |
| private static final String TIMEOUT_MS_NAME = "AdminDistributedSystemImpl.TIMEOUT_MS"; |
| private static final int TIMEOUT_MS_DEFAULT = 60000; // 30000 -- see bug36470 |
| private static final int TIMEOUT_MS = |
| Integer.getInteger(TIMEOUT_MS_NAME, TIMEOUT_MS_DEFAULT).intValue(); |
| |
| |
| // ------------------------------------------------------------------------- |
| // Operations of this DistributedSystem |
| // ------------------------------------------------------------------------- |
| |
| /** |
| * Starts all managed entities in this system. |
| */ |
| @Override |
| public void start() throws AdminException { |
| // Wait for each managed entity to start (see bug 32569) |
| DistributionLocator[] locs = getDistributionLocators(); |
| for (int i = 0; i < locs.length; i++) { |
| locs[i].start(); |
| } |
| for (int i = 0; i < locs.length; i++) { |
| try { |
| if (!locs[i].waitToStart(TIMEOUT_MS)) { |
| throw new AdminException( |
| String.format("%s did not start after %s ms", |
| new Object[] {locs[i], Integer.valueOf(TIMEOUT_MS)})); |
| } |
| |
| } catch (InterruptedException ex) { |
| Thread.currentThread().interrupt(); |
| throw new AdminException( |
| String.format("Interrupted while waiting for %s to start.", |
| locs[i]), |
| ex); |
| } |
| } |
| |
| CacheServer[] servers = getCacheServers(); |
| for (int i = 0; i < servers.length; i++) { |
| servers[i].start(); |
| } |
| for (int i = 0; i < servers.length; i++) { |
| try { |
| if (!servers[i].waitToStart(TIMEOUT_MS)) { |
| throw new AdminException( |
| String.format("%s did not start after %s ms", |
| new Object[] {servers[i], Integer.valueOf(TIMEOUT_MS)})); |
| } |
| |
| } catch (InterruptedException ex) { |
| Thread.currentThread().interrupt(); |
| throw new AdminException( |
| String.format("Interrupted while waiting for %s to start.", |
| servers[i]), |
| ex); |
| } |
| } |
| } |
| |
| /** |
| * Stops all GemFire managers that are members of this system. |
| */ |
| @Override |
| public void stop() throws AdminException { |
| // Stop cache server before GemFire managers because the cache |
| // server might host a cache proxy that is dependent on the |
| // manager. See bug 32569. |
| |
| // Wait for each managed entity to stop (see bug 32569) |
| long timeout = 30; |
| |
| CacheServer[] servers = getCacheServers(); |
| for (int i = 0; i < servers.length; i++) { |
| servers[i].stop(); |
| } |
| for (int i = 0; i < servers.length; i++) { |
| try { |
| if (!servers[i].waitToStop(timeout * 1000)) { |
| throw new AdminException( |
| String.format("%s did not stop after %s seconds.", |
| new Object[] {servers[i], Long.valueOf(timeout)})); |
| } |
| |
| } catch (InterruptedException ex) { |
| Thread.currentThread().interrupt(); |
| throw new AdminException( |
| String.format("Interrupted while waiting for %s to stop.", |
| servers[i]), |
| ex); |
| } |
| } |
| |
| DistributionLocator[] locs = getDistributionLocators(); |
| for (int i = 0; i < locs.length; i++) { |
| locs[i].stop(); |
| } |
| for (int i = 0; i < locs.length; i++) { |
| try { |
| if (!locs[i].waitToStop(timeout * 1000)) { |
| throw new AdminException( |
| String.format("%s did not stop after %s seconds.", |
| new Object[] {locs[i], Long.valueOf(timeout)})); |
| } |
| |
| } catch (InterruptedException ex) { |
| Thread.currentThread().interrupt(); |
| throw new AdminException( |
| String.format("Interrupted while waiting for %s to stop.", |
| locs[i]), |
| ex); |
| } |
| } |
| } |
| |
| /** Display merged system member logs */ |
| @Override |
| public String displayMergedLogs() { |
| return this.logCollator.collateLogs(this.gfManagerAgent); |
| } |
| |
| /** |
| * Returns the license for this GemFire product; else null if unable to retrieve license |
| * information |
| * |
| * @return license for this GemFire product |
| */ |
| @Override |
| public java.util.Properties getLicense() { |
| SystemMember member = findFirstRunningMember(); |
| if (member != null) { |
| return new Properties(); |
| } else { |
| return null; |
| } |
| } |
| |
| /** |
| * Sets the distribution-related portion of the given managed entity's configuration so that the |
| * entity is part of this distributed system. |
| * |
| * @throws AdminException TODO-javadocs |
| */ |
| private void setDistributionParameters(SystemMember member) throws AdminException { |
| |
| Assert.assertTrue(member instanceof ManagedSystemMemberImpl); |
| |
| // set some config parms to match this system... |
| ConfigurationParameter[] configParms = new ConfigurationParameter[] { |
| new ConfigurationParameterImpl(MCAST_PORT, Integer.valueOf(this.config.getMcastPort())), |
| new ConfigurationParameterImpl(LOCATORS, this.config.getLocators()), |
| new ConfigurationParameterImpl(MCAST_ADDRESS, |
| InetAddressUtil.toInetAddress(this.config.getMcastAddress())), |
| new ConfigurationParameterImpl(DISABLE_TCP, Boolean.valueOf(this.config.getDisableTcp())),}; |
| member.setConfiguration(configParms); |
| } |
| |
| /** |
| * Handles an <code>ExecutionException</code> by examining its cause and throwing an appropriate |
| * runtime exception. |
| */ |
| private static void handle(ExecutionException ex) { |
| Throwable cause = ex.getCause(); |
| |
| if (cause instanceof OperationCancelledException) { |
| // Operation was cancelled, we don't necessary want to propagate |
| // this up to the user. |
| return; |
| } |
| if (cause instanceof CancelException) { // bug 37285 |
| throw new FutureCancelledException( |
| "Future cancelled due to shutdown", |
| ex); |
| } |
| |
| // Don't just throw the cause because the stack trace can be |
| // misleading. For instance, the cause might have occurred in a |
| // different thread. In addition to the cause, we also want to |
| // know which code was waiting for the Future. |
| throw new RuntimeAdminException( |
| "While waiting for Future", |
| ex); |
| } |
| |
| protected void checkCancellation() { |
| DistributionManager dm = this.getDistributionManager(); |
| // TODO does dm == null mean we're dead? |
| if (dm != null) { |
| dm.getCancelCriterion().checkCancelInProgress(null); |
| } |
| } |
| |
| /** |
| * Returns a list of manageable SystemMember instances for each member of this distributed system. |
| * |
| * @return array of system members for each non-manager member |
| */ |
| @Override |
| public SystemMember[] getSystemMemberApplications() throws org.apache.geode.admin.AdminException { |
| synchronized (this.applicationSet) { |
| Collection coll = new ArrayList(this.applicationSet.size()); |
| APPS: for (Iterator iter = this.applicationSet.iterator(); iter.hasNext();) { |
| Future future = (Future) iter.next(); |
| for (;;) { |
| checkCancellation(); |
| boolean interrupted = Thread.interrupted(); |
| try { |
| coll.add(future.get()); |
| break; |
| } catch (InterruptedException ex) { |
| interrupted = true; |
| continue; // keep trying |
| } catch (CancellationException ex) { |
| continue APPS; |
| } catch (ExecutionException ex) { |
| handle(ex); |
| continue APPS; |
| } finally { |
| if (interrupted) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } // for |
| } // APPS |
| SystemMember[] array = new SystemMember[coll.size()]; |
| coll.toArray(array); |
| return array; |
| } |
| } |
| |
| /** |
| * Display in readable format the latest Alert in this distributed system. |
| * |
| * TODO: create an external admin api object for Alert |
| */ |
| @Override |
| public String getLatestAlert() { |
| if (this.latestAlert == null) { |
| return ""; |
| } |
| return this.latestAlert.toString(); |
| } |
| |
| /** |
| * Connects to the currently configured system. |
| */ |
| @Override |
| public void connect() { |
| connect(this.logWriter); |
| } |
| |
| /** |
| * Connects to the currently configured system. This method is public for internal use only |
| * (testing, for example). |
| * |
| * <p> |
| * |
| * See {@link org.apache.geode.distributed.DistributedSystem#connect} for a list of exceptions |
| * that may be thrown. |
| * |
| * @param logWriter the InternalLogWriter to use for any logging |
| */ |
| public void connect(InternalLogWriter logWriter) { |
| synchronized (CONNECTION_SYNC) { |
| // Check if the gfManagerAgent is NOT null. |
| // If it is already listening, then just return since the connection is already established OR |
| // in process. |
| // Otherwise cleanup the state of AdminDistributedSystemImpl. This needs to happen |
| // automatically. |
| if (this.gfManagerAgent != null) { |
| if (this.gfManagerAgent.isListening()) { |
| if (logger.isDebugEnabled()) { |
| logger.debug( |
| "The RemoteGfManagerAgent is already listening for this AdminDistributedSystem."); |
| } |
| return; |
| } |
| this.disconnect(); |
| } |
| |
| if (thisAdminDS != null) { // TODO: beef up toString and add thisAdminDS |
| throw new IllegalStateException( |
| "Only one AdminDistributedSystem connection can be made at once."); |
| } |
| |
| thisAdminDS = this; // added for feature requests #32887 |
| |
| if (this.getLocators().length() == 0) { |
| this.id = this.getMcastAddress() + "[" + this.getMcastPort() + "]"; |
| |
| } else { |
| this.id = this.getLocators(); |
| } |
| |
| if (this.config instanceof DistributedSystemConfigImpl) { |
| ((DistributedSystemConfigImpl) this.config).validate(); |
| ((DistributedSystemConfigImpl) this.config).setDistributedSystem(this); |
| } |
| |
| // LOG: passes the AdminDistributedSystemImpl LogWriterLogger into GfManagerAgentConfig for |
| // RemoteGfManagerAgent |
| GfManagerAgent agent = GfManagerAgentFactory.getManagerAgent(buildAgentConfig(logWriter)); |
| this.gfManagerAgent = agent; |
| |
| // sync to prevent bug 33341 Admin API can double-represent system members |
| synchronized (this.membershipListenerLock) { |
| // build the list of applications... |
| ApplicationVM[] apps = this.gfManagerAgent.listApplications(); |
| for (int i = 0; i < apps.length; i++) { |
| try { |
| nodeJoined(null, apps[i]); |
| } catch (RuntimeAdminException e) { |
| this.logWriter.warning("encountered a problem processing member " + apps[i]); |
| } |
| } |
| } |
| |
| // Build admin objects for all locators (see bug 31959) |
| String locators = this.getLocators(); |
| StringTokenizer st = new StringTokenizer(locators, ","); |
| NEXT: while (st.hasMoreTokens()) { |
| String locator = st.nextToken(); |
| int first = locator.indexOf("["); |
| int last = locator.indexOf("]"); |
| String host = locator.substring(0, first); |
| int colidx = host.lastIndexOf('@'); |
| if (colidx < 0) { |
| colidx = host.lastIndexOf(':'); |
| } |
| String bindAddr = null; |
| if (colidx > 0 && colidx < (host.length() - 1)) { |
| String orig = host; |
| bindAddr = host.substring(colidx + 1, host.length()); |
| host = host.substring(0, colidx); |
| // if the host contains a colon and there's no '@', we probably |
| // parsed an ipv6 address incorrectly - try again |
| if (host.indexOf(':') >= 0) { |
| int bindidx = orig.lastIndexOf('@'); |
| if (bindidx >= 0) { |
| host = orig.substring(0, bindidx); |
| bindAddr = orig.substring(bindidx + 1); |
| } else { |
| host = orig; |
| bindAddr = null; |
| } |
| } |
| } |
| int port = Integer.parseInt(locator.substring(first + 1, last)); |
| |
| synchronized (this.locatorSet) { |
| LOCATORS: for (Iterator iter = this.locatorSet.iterator(); iter.hasNext();) { |
| Future future = (Future) iter.next(); |
| DistributionLocatorImpl impl = null; |
| for (;;) { |
| checkCancellation(); |
| boolean interrupted = Thread.interrupted(); |
| try { |
| impl = (DistributionLocatorImpl) future.get(); |
| break; // success |
| } catch (InterruptedException ex) { |
| interrupted = true; |
| continue; // keep trying |
| } catch (CancellationException ex) { |
| continue LOCATORS; |
| } catch (ExecutionException ex) { |
| handle(ex); |
| continue LOCATORS; |
| } finally { |
| if (interrupted) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } // for |
| |
| DistributionLocatorConfig conf = impl.getConfig(); |
| |
| InetAddress host1 = InetAddressUtil.toInetAddress(host); |
| InetAddress host2 = InetAddressUtil.toInetAddress(conf.getHost()); |
| if (port == conf.getPort() && host1.equals(host2)) { |
| // Already have an admin object for this locator |
| continue NEXT; |
| } |
| } |
| } |
| |
| // None of the existing locators matches the locator in the |
| // string. Contact the locator to get information and create |
| // an admin object for it. |
| InetAddress bindAddress = null; |
| if (bindAddr != null) { |
| bindAddress = InetAddressUtil.toInetAddress(bindAddr); |
| } |
| DistributionLocatorConfig conf = |
| DistributionLocatorConfigImpl.createConfigFor(host, port, bindAddress); |
| if (conf != null) { |
| DistributionLocator impl = createDistributionLocatorImpl(conf); |
| synchronized (this.locatorSet) { |
| this.locatorSet.add(new FutureResult(impl)); |
| } |
| } |
| } |
| } |
| } |
| |
| /** |
| * Polls to determine whether or not the connection to the distributed system has been made. |
| */ |
| @Override |
| public boolean waitToBeConnected(long timeout) throws InterruptedException { |
| |
| if (Thread.interrupted()) |
| throw new InterruptedException(); |
| |
| checkConnectCalled(); |
| |
| long start = System.currentTimeMillis(); |
| while (System.currentTimeMillis() - start < timeout) { |
| if (this.gfManagerAgent.isInitialized()) { |
| return true; |
| |
| } else { |
| Thread.sleep(100); |
| } |
| } |
| |
| return this.isConnected(); |
| } |
| |
| /** |
| * Closes all connections and resources to the connected distributed system. |
| * |
| * @see org.apache.geode.distributed.DistributedSystem#disconnect() |
| */ |
| @Override |
| public void disconnect() { |
| synchronized (CONNECTION_SYNC) { |
| loggingSession.stopSession(); |
| try { |
| if (thisAdminDS == this) { |
| thisAdminDS = null; |
| } |
| if (this.gfManagerAgent != null && this.gfManagerAgent.isListening()) { |
| synchronized (this) { |
| if (this.health != null) { |
| this.health.close(); |
| } |
| } |
| this.gfManagerAgent.removeJoinLeaveListener(this); |
| this.gfManagerAgent.disconnect(); |
| } |
| this.gfManagerAgent = null; |
| if (this.config instanceof DistributedSystemConfigImpl) { |
| ((DistributedSystemConfigImpl) this.config).setDistributedSystem(null); |
| } |
| } finally { |
| loggingSession.shutdown(); |
| } |
| } |
| } |
| |
| /** |
| * Returns the DistributionManager this implementation is using to connect to the distributed |
| * system. |
| */ |
| public DistributionManager getDistributionManager() { |
| if (this.gfManagerAgent == null) { |
| return null; |
| } |
| return this.gfManagerAgent.getDM(); |
| |
| } |
| |
| /** |
| * Returns the internal admin API's agent used for administering this |
| * <code>AdminDistributedSystem</code>. |
| * |
| * @since GemFire 4.0 |
| */ |
| public GfManagerAgent getAdminAgent() { |
| return this.gfManagerAgent; |
| } |
| |
| /** |
| * Adds a new, unstarted <code>DistributionLocator</code> to this distributed system. |
| */ |
| @Override |
| public DistributionLocator addDistributionLocator() { |
| DistributionLocatorConfig conf = new DistributionLocatorConfigImpl(); |
| DistributionLocator locator = createDistributionLocatorImpl(conf); |
| synchronized (this.locatorSet) { |
| this.locatorSet.add(new FutureResult(locator)); |
| } |
| |
| // update locators string... |
| setLocators(parseLocatorSet()); |
| return locator; |
| } |
| |
| @Override |
| public DistributionLocator[] getDistributionLocators() { |
| synchronized (this.locatorSet) { |
| Collection coll = new ArrayList(this.locatorSet.size()); |
| LOCATORS: for (Iterator iter = this.locatorSet.iterator(); iter.hasNext();) { |
| Future future = (Future) iter.next(); |
| for (;;) { |
| checkCancellation(); |
| boolean interrupted = Thread.interrupted(); |
| try { |
| coll.add(future.get()); |
| break; // success |
| } catch (InterruptedException ex) { |
| interrupted = true; |
| continue; // keep trying |
| } catch (CancellationException ex) { |
| continue LOCATORS; |
| } catch (ExecutionException ex) { |
| handle(ex); |
| continue LOCATORS; |
| } finally { |
| if (interrupted) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } // for |
| } |
| |
| DistributionLocator[] array = new DistributionLocator[coll.size()]; |
| coll.toArray(array); |
| return array; |
| } |
| } |
| |
| /** |
| * Updates the locator string that is used to discover members of the distributed system. |
| * |
| * @see #getLocators |
| */ |
| void updateLocatorsString() { |
| this.setLocators(parseLocatorSet()); |
| } |
| |
| protected String parseLocatorSet() { |
| StringBuffer sb = new StringBuffer(); |
| LOCATORS: for (Iterator iter = this.locatorSet.iterator(); iter.hasNext();) { |
| Future future = (Future) iter.next(); |
| DistributionLocator locator = null; |
| for (;;) { |
| checkCancellation(); |
| boolean interrupted = Thread.interrupted(); |
| try { |
| locator = (DistributionLocator) future.get(); |
| break; // success |
| } catch (InterruptedException ex) { |
| interrupted = true; |
| continue; // keep trying |
| } catch (CancellationException ex) { |
| continue LOCATORS; |
| } catch (ExecutionException ex) { |
| handle(ex); |
| continue LOCATORS; |
| } finally { |
| if (interrupted) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } |
| sb.append(locator.getConfig().getHost()); |
| sb.append("[").append(locator.getConfig().getPort()).append("]"); |
| |
| if (iter.hasNext()) { |
| sb.append(","); |
| } |
| } |
| return sb.toString(); |
| } |
| |
| // ------------------------------------------------------------------------- |
| // Listener callback methods |
| // ------------------------------------------------------------------------- |
| |
/**
 * Lock held by the join/leave/crash callbacks; sync to prevent bug 33341 (Admin API can
 * double-represent system members).
 */
private final Object membershipListenerLock = new Object();

// --------- org.apache.geode.internal.admin.JoinLeaveListener ---------
/**
 * Listener callback for when a member has joined this DistributedSystem.
 * <p>
 * If the VM is not yet represented, a {@link Future} for its admin object is added to the cache
 * server set or the application set (so the admin object is not initialized while the set lock
 * is held), the future is then run, and every registered membership listener is notified with
 * <code>memberJoined</code>. If the member is already known, nothing happens.
 *
 * @param source the distributed system that fired nodeJoined
 * @param vm the VM that joined
 * @see org.apache.geode.internal.admin.JoinLeaveListener#nodeJoined
 */
@Override
public void nodeJoined(GfManagerAgent source, final GemFireVM vm) {
  // sync to prevent bug 33341 Admin API can double-represent system members
  synchronized (this.membershipListenerLock) {

    // Already represented? Then there is nothing to create and nobody to notify.
    if (findSystemMember(vm) != null) {
      return;
    }

    FutureTask creation = null;
    if (!(vm instanceof ApplicationVM)) {
      Assert.assertTrue(false, "Unknown GemFireVM type: " + vm.getClass().getName());

    } else {
      final ApplicationVM app = (ApplicationVM) vm;
      if (app.isDedicatedCacheServer()) {
        synchronized (this.cacheServerSet) {
          creation = new AdminFutureTask(vm.getId(), new Callable() {
            @Override
            public Object call() throws Exception {
              logger.info(LogMarker.DM_MARKER, "Adding new CacheServer for {}",
                  vm);
              return createCacheServer(app);
            }
          });
          this.cacheServerSet.add(creation);
        }

      } else {
        synchronized (this.applicationSet) {
          creation = new AdminFutureTask(vm.getId(), new Callable() {
            @Override
            public Object call() throws Exception {
              logger.info(LogMarker.DM_MARKER, "Adding new Application for {}",
                  vm);
              return createSystemMember(app);
            }
          });
          this.applicationSet.add(creation);
        }
      }
    }

    // Create the admin object outside of the "set" locks, then wait for the result.
    creation.run();
    SystemMember joined = null;
    for (;;) {
      checkCancellation();
      boolean wasInterrupted = Thread.interrupted();
      try {
        joined = (SystemMember) creation.get();
        break; // created
      } catch (InterruptedException ex) {
        wasInterrupted = true;
        continue; // retry the wait
      } catch (CancellationException ex) {
        return;
      } catch (ExecutionException ex) {
        handle(ex);
        return;
      } finally {
        if (wasInterrupted) {
          Thread.currentThread().interrupt();
        }
      }
    }

    Assert.assertTrue(joined != null);

    // Notification happens only for newly created members (fix for bug 34517).
    SystemMembershipEvent event = new SystemMembershipEventImpl(joined.getDistributedMember());
    Iterator listeners = this.membershipListeners.iterator();
    while (listeners.hasNext()) {
      SystemMembershipListener listener = (SystemMembershipListener) listeners.next();
      listener.memberJoined(event);
    }
  }
}
| |
| /** |
| * Listener callback for when a member of this DistributedSystem has left. |
| * <p> |
| * Reacts by removing the member. |
| * |
| * @param source the distributed system that fired nodeCrashed |
| * @param vm the VM that left |
| * @see org.apache.geode.internal.admin.JoinLeaveListener#nodeLeft |
| */ |
| @Override |
| public void nodeLeft(GfManagerAgent source, GemFireVM vm) { |
| // sync to prevent bug 33341 Admin API can double-represent system members |
| synchronized (this.membershipListenerLock) { |
| // member has left... |
| SystemMember member = AdminDistributedSystemImpl.this.removeSystemMember(vm.getId()); |
| if (member == null) { |
| return; // reinstated this early-out because removal does not fix 39429 |
| } |
| |
| // Can't call member.getId() because it is nulled-out when the |
| // SystemMember is removed. |
| SystemMembershipEvent event = new SystemMembershipEventImpl(vm.getId()); |
| for (Iterator iter = this.membershipListeners.iterator(); iter.hasNext();) { |
| SystemMembershipListener listener = (SystemMembershipListener) iter.next(); |
| listener.memberLeft(event); |
| } |
| } |
| } |
| |
| /** |
| * Listener callback for when a member of this DistributedSystem has crashed. |
| * <p> |
| * Reacts by removing the member. |
| * |
| * @param source the distributed system that fired nodeCrashed |
| * @param vm the VM that crashed |
| * @see org.apache.geode.internal.admin.JoinLeaveListener#nodeCrashed |
| */ |
| @Override |
| public void nodeCrashed(GfManagerAgent source, GemFireVM vm) { |
| // sync to prevent bug 33341 Admin API can double-represent system members |
| synchronized (this.membershipListenerLock) { |
| // member has crashed... |
| SystemMember member = AdminDistributedSystemImpl.this.removeSystemMember(vm.getId()); |
| if (member == null) { |
| // Unknown member crashed. Hmm... |
| return; |
| } |
| |
| // Can't call member.getId() because it is nulled-out when the |
| // SystemMember is removed. |
| SystemMembershipEvent event = new SystemMembershipEventImpl(vm.getId()); |
| for (Iterator iter = this.membershipListeners.iterator(); iter.hasNext();) { |
| SystemMembershipListener listener = (SystemMembershipListener) iter.next(); |
| listener.memberCrashed(event); |
| } |
| } |
| } |
| |
| // ----------- org.apache.geode.internal.admin.AlertListener ----------- |
| /** |
| * Listener callback for when a SystemMember of this DistributedSystem has crashed. |
| * |
| * @param alert the latest alert from the system |
| * @see org.apache.geode.internal.admin.AlertListener#alert |
| */ |
| @Override |
| public void alert(org.apache.geode.internal.admin.Alert alert) { |
| if (AlertLevel.forSeverity(alert.getLevel()).ordinal < alertLevel.ordinal) { |
| return; |
| } |
| Alert alert2 = new AlertImpl(alert); |
| this.latestAlert = alert2; |
| for (Iterator<AlertListener> iter = this.alertListeners.iterator(); iter.hasNext();) { |
| AlertListener listener = iter.next(); |
| listener.alert(alert2); |
| } |
| } |
| |
| @Override |
| public void onDisconnect(InternalDistributedSystem sys) { |
| logger.debug("Calling AdminDistributedSystemImpl#onDisconnect"); |
| disconnect(); |
| logger.debug("Completed AdminDistributedSystemImpl#onDisconnect"); |
| } |
| |
| // ------------------------------------------------------------------------- |
| // Template methods overridden from superclass... |
| // ------------------------------------------------------------------------- |
| |
| protected CacheServer createCacheServer(ApplicationVM member) throws AdminException { |
| |
| return new CacheServerImpl(this, member); |
| } |
| |
| protected CacheServer createCacheServer(CacheServerConfigImpl conf) throws AdminException { |
| |
| return new CacheServerImpl(this, conf); |
| } |
| |
| /** |
| * Override createSystemMember by instantiating SystemMemberImpl |
| * |
| * @throws AdminException TODO-javadocs |
| */ |
| protected SystemMember createSystemMember(ApplicationVM app) |
| throws org.apache.geode.admin.AdminException { |
| return new SystemMemberImpl(this, app); |
| } |
| |
| /** |
| * Constructs & returns a SystemMember instance using the corresponding InternalDistributedMember |
| * object. |
| * |
| * @param member InternalDistributedMember instance for which a SystemMember instance is to be |
| * constructed. |
| * @return constructed SystemMember instance |
| * @throws org.apache.geode.admin.AdminException if construction of SystemMember instance fails |
| * @since GemFire 6.5 |
| */ |
| protected SystemMember createSystemMember(InternalDistributedMember member) |
| throws org.apache.geode.admin.AdminException { |
| return new SystemMemberImpl(this, member); |
| } |
| |
| /** |
| * Template-method for creating a new <code>DistributionLocatorImpl</code> instance. |
| */ |
| protected DistributionLocatorImpl createDistributionLocatorImpl(DistributionLocatorConfig conf) { |
| return new DistributionLocatorImpl(conf, this); |
| } |
| |
| // ------------------------------------------------------------------------- |
| // Non-public implementation methods... TODO: narrow access levels |
| // ------------------------------------------------------------------------- |
| |
| // TODO: public void connect(...) could stand to have some internals factored out |
| |
| /** |
| * Returns List of Locators including Locators or Multicast. |
| * |
| * @return list of locators or multicast values |
| */ |
| protected List parseLocators() { |
| |
| // assumes host[port] format, delimited by "," |
| List locatorIds = new ArrayList(); |
| if (isMcastEnabled()) { |
| String mcastId = new StringBuffer(this.getMcastAddress()).append("[") |
| .append(this.getMcastPort()).append("]").toString(); |
| locatorIds.add(new DistributionLocatorId(mcastId)); |
| } |
| StringTokenizer st = new StringTokenizer(this.getLocators(), ","); |
| while (st.hasMoreTokens()) { |
| locatorIds.add(new DistributionLocatorId(st.nextToken())); |
| } |
| |
| if (logger.isDebugEnabled()) { |
| StringBuffer sb = new StringBuffer("Locator set is: "); |
| for (Iterator iter = locatorIds.iterator(); iter.hasNext();) { |
| sb.append(iter.next()); |
| sb.append(" "); |
| } |
| logger.debug(sb); |
| } |
| |
| return locatorIds; |
| } |
| |
| /** |
| * Returns whether or not a <code>SystemMember</code> corresponds to a <code>GemFireVM</code>. |
| * |
| * @param examineConfig Should we take the configuration of the member into consideration? In |
| * general, we want to consider the configuration when a member starts up. But when we are |
| * notified that it has shut down, we do not want to examine the configuration because that |
| * might involve contacting the member. Which, of course, cannot be done because it has |
| * shut down. |
| */ |
| private boolean isSame(SystemMemberImpl member, GemFireVM vm, boolean examineConfig) { |
| if (vm.equals(member.getGemFireVM())) { |
| return true; |
| } |
| |
| InternalDistributedMember memberId = member.getInternalId(); |
| InternalDistributedMember vmId = vm.getId(); |
| |
| if (vmId.equals(memberId)) { |
| return true; |
| } |
| |
| if ((member instanceof ManagedSystemMemberImpl) && examineConfig) { |
| |
| // We can't compare information about managers because the |
| // member might have already gone away. Attempts to send it |
| // messages (to get its product directory, for instance) will |
| // time out. |
| |
| ManagedSystemMemberImpl entity = (ManagedSystemMemberImpl) member; |
| |
| // Make sure that the type of the managed entity matches the |
| // type of the internal admin object. |
| if (entity instanceof CacheServer) { |
| if (!(vm instanceof ApplicationVM)) { |
| return false; |
| } |
| |
| ApplicationVM app = (ApplicationVM) vm; |
| if (!app.isDedicatedCacheServer()) { |
| return false; |
| } |
| } |
| |
| ManagedEntityConfig conf = entity.getEntityConfig(); |
| InetAddress managedHost = InetAddressUtil.toInetAddress(conf.getHost()); |
| File managedWorkingDir = new File(conf.getWorkingDirectory()); |
| File managedProdDir = new File(conf.getProductDirectory()); |
| |
| InetAddress vmHost = vm.getHost(); |
| File vmWorkingDir = vm.getWorkingDirectory(); |
| File vmProdDir = vm.getGeodeHomeDir(); |
| |
| if (vmHost.equals(managedHost) && isSameFile(vmWorkingDir, managedWorkingDir) |
| && isSameFile(vmProdDir, managedProdDir)) { |
| return true; |
| } |
| } |
| |
| return false; |
| } |
| |
| /** |
| * Returns whether or not the names of the two files represent the same file. |
| */ |
| private boolean isSameFile(File file1, File file2) { |
| if (file1.equals(file2)) { |
| return true; |
| } |
| |
| if (file1.getAbsoluteFile().equals(file2.getAbsoluteFile())) { |
| return true; |
| } |
| |
| try { |
| if (file1.getCanonicalFile().equals(file2.getCanonicalFile())) { |
| return true; |
| } |
| } catch (IOException ex) { |
| // oh well... |
| logger.info("While getting canonical file", ex); |
| } |
| |
| return false; |
| } |
| |
| /** |
| * Finds and returns the <code>SystemMember</code> that corresponds to the given |
| * <code>GemFireVM</code> or <code>null</code> if no <code>SystemMember</code> corresponds. |
| */ |
| protected SystemMember findSystemMember(GemFireVM vm) { |
| return findSystemMember(vm, true); |
| } |
| |
| /** |
| * Finds and returns the <code>SystemMember</code> that corresponds to the given |
| * <code>GemFireVM</code> or <code>null</code> if no Finds and returns the |
| * <code>SystemMember</code> that corresponds to the given <code>GemFireVM</code> or |
| * <code>null</code> if no <code>SystemMember</code> corresponds. |
| * |
| * |
| * @param vm GemFireVM instance |
| * @param compareConfig Should the members' configurations be compared? <code>true</code> when the |
| * member has joined, <code>false</code> when the member has left Should the members' |
| * configurations be compared? <code>true</code> when the member has joined, |
| * <code>false</code> when the member has left. Additionally also used to check if system |
| * member config is to be synchronized with the VM. |
| */ |
| protected SystemMember findSystemMember(GemFireVM vm, boolean compareConfig) { |
| |
| SystemMemberImpl member = null; |
| |
| synchronized (this.cacheServerSet) { |
| SERVERS: for (Iterator iter = this.cacheServerSet.iterator(); iter.hasNext();) { |
| Future future = (Future) iter.next(); |
| CacheServerImpl cacheServer = null; |
| for (;;) { |
| checkCancellation(); |
| boolean interrupted = Thread.interrupted(); |
| try { |
| cacheServer = (CacheServerImpl) future.get(); |
| break; // success |
| } catch (InterruptedException ex) { |
| interrupted = true; |
| continue; // keep trying |
| } catch (CancellationException ex) { |
| continue SERVERS; |
| } catch (ExecutionException ex) { |
| handle(ex); |
| continue SERVERS; |
| } finally { |
| if (interrupted) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } // for |
| |
| if (isSame(cacheServer, vm, compareConfig)) { |
| member = cacheServer; |
| break; |
| } |
| } |
| } |
| |
| if (member == null) { |
| synchronized (this.applicationSet) { |
| APPS: for (Iterator iter = this.applicationSet.iterator(); iter.hasNext();) { |
| Future future = (Future) iter.next(); |
| SystemMemberImpl application = null; |
| for (;;) { |
| checkCancellation(); |
| boolean interrupted = Thread.interrupted(); |
| try { |
| application = (SystemMemberImpl) future.get(); |
| break; // success |
| } catch (InterruptedException ex) { |
| interrupted = true; |
| continue; // keep trying |
| } catch (CancellationException ex) { |
| continue APPS; |
| } catch (ExecutionException ex) { |
| handle(ex); |
| continue APPS; |
| } finally { |
| if (interrupted) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } // for |
| |
| if (isSame(application, vm, compareConfig)) { |
| member = application; |
| break; |
| } |
| } // APPS |
| } |
| } |
| |
| if (member != null && compareConfig) { |
| try { |
| member.setGemFireVM(vm); |
| |
| } catch (AdminException ex) { |
| logger.warn("Could not set the GemFire VM.", ex); |
| } |
| } |
| |
| return member; |
| } |
| |
| /** |
| * Removes a SystemMember from this system's list of known members. |
| * |
| * @param systemMember the member to remove |
| * @return the system member that was removed; null if no match was found |
| */ |
| protected SystemMember removeSystemMember(SystemMember systemMember) { |
| return removeSystemMember(((SystemMemberImpl) systemMember).getInternalId()); |
| } |
| |
| /** |
| * Removes a SystemMember from this system's list of known members. This method is called in |
| * response to a member leaving the system. TODO: this method is a mess of defns |
| * |
| * @param internalId the unique id that specifies which member to remove |
| * @return the system member that was removed; null if no match was found |
| */ |
| protected SystemMember removeSystemMember(InternalDistributedMember internalId) { |
| if (internalId == null) |
| return null; |
| |
| boolean found = false; |
| SystemMemberImpl member = null; |
| |
| synchronized (this.cacheServerSet) { |
| SERVERS: for (Iterator iter = this.cacheServerSet.iterator(); iter.hasNext() && !found;) { |
| Future future = (Future) iter.next(); |
| if (future instanceof AdminFutureTask) { |
| AdminFutureTask task = (AdminFutureTask) future; |
| if (task.getMemberId().equals(internalId)) { |
| future.cancel(true); |
| |
| } else { |
| // This is not the member we are looking for... |
| continue SERVERS; |
| } |
| } |
| for (;;) { |
| checkCancellation(); |
| boolean interrupted = Thread.interrupted(); |
| try { |
| member = (SystemMemberImpl) future.get(); |
| break; // success |
| } catch (InterruptedException ex) { |
| interrupted = true; |
| continue; // keep trying |
| } catch (CancellationException ex) { |
| continue SERVERS; |
| } catch (ExecutionException ex) { |
| handle(ex); |
| return null; // Dead code |
| } finally { |
| if (interrupted) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } |
| |
| InternalDistributedMember cacheServerId = member.getInternalId(); |
| if (internalId.equals(cacheServerId)) { |
| // found a match... |
| iter.remove(); |
| found = true; |
| } |
| } // SERVERS |
| } |
| |
| synchronized (this.applicationSet) { |
| for (Iterator iter = this.applicationSet.iterator(); iter.hasNext() && !found;) { |
| Future future = (Future) iter.next(); |
| try { |
| if (future instanceof AdminFutureTask) { |
| AdminFutureTask task = (AdminFutureTask) future; |
| if (task.getMemberId().equals(internalId)) { |
| iter.remove(); // Only remove applications |
| found = true; |
| if (future.isDone()) { |
| member = (SystemMemberImpl) future.get(); |
| } |
| break; |
| } else { |
| // This is not the member we are looking for... |
| continue; |
| } |
| } |
| if (future.isDone()) { |
| member = (SystemMemberImpl) future.get(); |
| } else { |
| future.cancel(true); |
| } |
| |
| } catch (InterruptedException ex) { |
| Thread.currentThread().interrupt(); |
| checkCancellation(); |
| throw new RuntimeException( |
| "Interrupted", ex); |
| |
| } catch (CancellationException ex) { |
| continue; |
| |
| } catch (ExecutionException ex) { |
| handle(ex); |
| return null; // Dead code |
| } |
| |
| InternalDistributedMember applicationId = member.getInternalId(); |
| if (internalId.equals(applicationId)) { |
| // found a match... |
| iter.remove(); // Only remove applications |
| found = true; |
| } |
| } |
| } |
| |
| if (found) { |
| try { |
| if (member != null) { |
| member.setGemFireVM(null); |
| } |
| |
| } catch (AdminException ex) { |
| logger.fatal("Unexpected AdminException", ex); |
| } |
| return member; |
| |
| } else { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Couldn't remove member {}", internalId); |
| } |
| return null; |
| } |
| } |
| |
| /** |
| * Builds the configuration needed to connect to a GfManagerAgent which is the main gateway into |
| * the internal.admin api. GfManagerAgent is used to actually connect to the distributed gemfire |
| * system. |
| * |
| * @param logWriter the LogWriterI18n to use for any logging |
| * @return the configuration needed to connect to a GfManagerAgent |
| */ |
| // LOG: saves LogWriterLogger from AdminDistributedSystemImpl for RemoteGfManagerAgentConfig |
| private GfManagerAgentConfig buildAgentConfig(InternalLogWriter logWriter) { |
| RemoteTransportConfig conf = new RemoteTransportConfig(isMcastEnabled(), getDisableTcp(), |
| getDisableAutoReconnect(), getBindAddress(), buildSSLConfig(), parseLocators(), |
| getMembershipPortRange(), getTcpPort(), ClusterDistributionManager.ADMIN_ONLY_DM_TYPE); |
| return new GfManagerAgentConfig(getSystemName(), conf, logWriter, this.alertLevel.getSeverity(), |
| this, this); |
| } |
| |
| protected SSLConfig buildSSLConfig() { |
| SSLConfig conf = new SSLConfig(); |
| if (getConfig() != null) { |
| conf.setEnabled(getConfig().isSSLEnabled()); |
| conf.setProtocols(getConfig().getSSLProtocols()); |
| conf.setCiphers(getConfig().getSSLCiphers()); |
| conf.setRequireAuth(getConfig().isSSLAuthenticationRequired()); |
| conf.setProperties(getConfig().getSSLProperties()); |
| } |
| return conf; |
| } |
| |
| /** |
| * Returns the currently configured address to bind to when administering this system. |
| */ |
| private String getBindAddress() { |
| return this.config.getBindAddress(); |
| } |
| |
| /** Returns whether or not the given member is running */ |
| private boolean isRunning(SystemMember member) { |
| if (member instanceof ManagedEntity) { |
| return ((ManagedEntity) member).isRunning(); |
| |
| } else { |
| // member must be an application VM. It is running |
| return true; |
| } |
| } |
| |
| /** Returns any member manager that is known to be running */ |
| private SystemMember findFirstRunningMember() { |
| synchronized (this.cacheServerSet) { |
| SERVERS: for (Iterator iter = this.cacheServerSet.iterator(); iter.hasNext();) { |
| Future future = (Future) iter.next(); |
| SystemMember member = null; |
| for (;;) { |
| checkCancellation(); |
| boolean interrupted = Thread.interrupted(); |
| try { |
| member = (SystemMember) future.get(); |
| break; // success |
| } catch (InterruptedException ex) { |
| interrupted = true; |
| continue; // keep trying |
| } catch (CancellationException ex) { |
| continue SERVERS; |
| } catch (ExecutionException ex) { |
| handle(ex); |
| return null; // Dead code |
| } finally { |
| if (interrupted) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } // for |
| |
| if (isRunning(member)) { |
| return member; |
| } |
| } |
| } |
| |
| synchronized (this.applicationSet) { |
| APPS: for (Iterator iter = this.applicationSet.iterator(); iter.hasNext();) { |
| Future future = (Future) iter.next(); |
| SystemMember member = null; |
| for (;;) { |
| checkCancellation(); |
| boolean interrupted = Thread.interrupted(); |
| try { |
| member = (SystemMember) future.get(); |
| break; // success |
| } catch (InterruptedException ex) { |
| interrupted = true; |
| continue; // keep trying |
| } catch (CancellationException ex) { |
| continue APPS; |
| } catch (ExecutionException ex) { |
| handle(ex); |
| return null; // Dead code |
| } finally { |
| if (interrupted) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } // for |
| |
| if (isRunning(member)) { |
| return member; |
| } |
| } // APPS |
| } |
| |
| return null; |
| } |
| |
| /** |
| * Returns the instance of system member that is running either as a CacheVm or only ApplicationVm |
| * for the given string representation of the id. |
| * |
| * @param memberId string representation of the member identifier |
| * @return instance of system member which could be either as a CacheVm or Application VM |
| */ |
| protected SystemMember findCacheOrAppVmById(String memberId) { |
| SystemMember found = null; |
| |
| if (memberId != null) { |
| try { |
| boolean foundSender = false; |
| CacheVm[] cacheVms = getCacheVms(); |
| |
| /* |
| * cacheVms could be null. See AdminDistributedSystemImpl.getCacheVmsCollection() for |
| * ExecutionException |
| */ |
| if (cacheVms != null) { |
| for (CacheVm cacheVm : cacheVms) { |
| if (cacheVm.getId().equals(memberId) && cacheVm instanceof CacheVm) { |
| found = (SystemMember) cacheVm; |
| foundSender = true; |
| break; |
| } |
| } |
| } |
| |
| if (!foundSender) { |
| SystemMember[] appVms = getSystemMemberApplications(); |
| |
| for (SystemMember appVm : appVms) { |
| if (appVm.getId().equals(memberId) && appVm instanceof SystemMember) { |
| found = (SystemMember) appVm; |
| foundSender = true; |
| break; |
| } |
| } |
| |
| } |
| } catch (AdminException e) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Could not find System Member for member id: {}", memberId, e); |
| } |
| } |
| } |
| |
| return found; |
| } |
| |
| /** Returns true if any member application is known to be running */ |
| protected boolean isAnyMemberRunning() { |
| return findFirstRunningMember() != null; |
| } |
| |
| // ------------------------------------------------------------------------- |
| // Health methods |
| // ------------------------------------------------------------------------- |
| |
| /** |
| * Lazily initializes the GemFire health monitor |
| * |
| * @see #createGemFireHealth |
| */ |
| @Override |
| public GemFireHealth getGemFireHealth() { |
| synchronized (this) { |
| if (this.health == null || this.health.isClosed()) { |
| try { |
| this.health = createGemFireHealth(this.gfManagerAgent); |
| |
| } catch (AdminException ex) { |
| throw new RuntimeAdminException( |
| "An AdminException was thrown while getting the GemFire health.", |
| ex); |
| } |
| } |
| |
| return this.health; |
| } |
| } |
| |
| /** |
| * A "template factory" method for creating an instance of <code>GemFireHealth</code>. It can be |
| * overridden by subclasses to produce instances of different <code>GemFireHealth</code> |
| * implementations. |
| * |
| * @see #getGemFireHealth |
| */ |
| protected GemFireHealth createGemFireHealth(GfManagerAgent agent) throws AdminException { |
| |
| if (agent == null) { |
| throw new IllegalStateException( |
| "GfManagerAgent must not be null"); |
| } |
| return new GemFireHealthImpl(agent, this); |
| } |
| |
| @Override |
| public CacheVm addCacheVm() throws AdminException { |
| return (CacheVm) addCacheServer(); |
| } |
| |
| @Override |
| public CacheServer addCacheServer() throws AdminException { |
| CacheServerConfigImpl conf = new CacheServerConfigImpl(); |
| CacheServer server = createCacheServer(conf); |
| setDistributionParameters(server); |
| |
| synchronized (this.cacheServerSet) { |
| this.cacheServerSet.add(new FutureResult(server)); |
| } |
| |
| return server; |
| } |
| |
| private Collection getCacheVmsCollection() throws AdminException { |
| synchronized (this.cacheServerSet) { |
| Collection coll = new ArrayList(this.cacheServerSet.size()); |
| SERVERS: for (Iterator iter = this.cacheServerSet.iterator(); iter.hasNext();) { |
| Future future = (Future) iter.next(); |
| Object get = null; |
| for (;;) { |
| checkCancellation(); |
| boolean interrupted = Thread.interrupted(); |
| try { |
| get = future.get(); |
| break; // success |
| } catch (InterruptedException ex) { |
| interrupted = true; |
| continue; // keep trying |
| } catch (CancellationException ex) { |
| continue SERVERS; |
| } catch (ExecutionException ex) { |
| handle(ex); |
| return null; // Dead code |
| } finally { |
| if (interrupted) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } // for |
| coll.add(get); |
| } // SERVERS |
| return coll; |
| } |
| } |
| |
| /** |
| * Returns all the cache server members of the distributed system which are hosting a client queue |
| * for the particular durable-client having the given durableClientId |
| * |
| * @param durableClientId - durable-id of the client |
| * @return array of CacheServer(s) having the queue for the durable client |
| * |
| * @since GemFire 5.6 |
| */ |
| @Override |
| public CacheServer[] getCacheServers(String durableClientId) throws AdminException { |
| Collection serversForDurableClient = new ArrayList(); |
| CacheServer[] servers = getCacheServers(); |
| |
| for (int i = 0; i < servers.length; i++) { |
| RemoteApplicationVM vm = (RemoteApplicationVM) ((CacheServerImpl) servers[i]).getGemFireVM(); |
| if (vm != null && vm.hasDurableClient(durableClientId)) { |
| serversForDurableClient.add(servers[i]); |
| } |
| } |
| CacheServer[] array = new CacheServer[serversForDurableClient.size()]; |
| serversForDurableClient.toArray(array); |
| return array; |
| } |
| |
| @Override |
| public CacheVm[] getCacheVms() throws AdminException { |
| Collection coll = getCacheVmsCollection(); |
| if (coll == null) |
| return null; |
| CacheVm[] array = new CacheVm[coll.size()]; |
| coll.toArray(array); |
| return array; |
| } |
| |
| @Override |
| public CacheServer[] getCacheServers() throws AdminException { |
| Collection coll = getCacheVmsCollection(); |
| if (coll == null) |
| return null; |
| CacheServer[] array = new CacheServer[coll.size()]; |
| coll.toArray(array); |
| return array; |
| } |
| |
| // ------------------------------------------------------------------------- |
| // Overridden java.lang.Object methods |
| // ------------------------------------------------------------------------- |
| |
| /** |
| * Returns a string representation of the object. |
| * |
| * @return a string representation of the object |
| */ |
| @Override // GemStoneAddition |
| public String toString() { |
| return getName(); |
| } |
| |
| /** |
| * returns instance of AdminDistributedSystem that is current connected. See |
| * <code>thisAdminDS</code>. (for feature requests #32887) |
| * <p> |
| * TODO: remove this static method during reimplementation of |
| * {@link SystemMemberCacheEventProcessor} |
| * |
| */ |
| public static AdminDistributedSystemImpl getConnectedInstance() { |
| synchronized (CONNECTION_SYNC) { |
| return thisAdminDS; |
| } |
| } |
| |
| @Override |
| public void addCacheListener(SystemMemberCacheListener listener) { |
| synchronized (this.cacheListLock) { |
| // never modify cacheListeners in place. |
| // this allows iteration without concurrent mod worries |
| List oldListeners = this.cacheListeners; |
| if (!oldListeners.contains(listener)) { |
| List newListeners = new ArrayList(oldListeners); |
| newListeners.add(listener); |
| this.cacheListeners = newListeners; |
| } |
| } |
| } |
| |
| @Override |
| public void removeCacheListener(SystemMemberCacheListener listener) { |
| synchronized (this.cacheListLock) { |
| List oldListeners = this.cacheListeners; |
| if (oldListeners.contains(listener)) { |
| List newListeners = new ArrayList(oldListeners); |
| if (newListeners.remove(listener)) { |
| if (newListeners.isEmpty()) { |
| newListeners = Collections.EMPTY_LIST; |
| } |
| this.cacheListeners = newListeners; |
| } |
| } |
| } |
| } |
| |
| public List getCacheListeners() { |
| return this.cacheListeners; |
| } |
| |
| @Override |
| public SystemMember lookupSystemMember(DistributedMember distributedMember) |
| throws AdminException { |
| if (distributedMember == null) |
| return null; |
| SystemMember[] members = getSystemMemberApplications(); |
| for (int i = 0; i < members.length; i++) { |
| if (distributedMember.equals(members[i].getDistributedMember())) { |
| return members[i]; |
| } |
| } |
| return null; |
| } |
| |
| //////////////////////// Inner Classes //////////////////////// |
| |
| /** |
| * Object that converts an <code>internal.admin.Alert</code> into an external |
| * <code>admin.Alert</code>. |
| */ |
| public class AlertImpl implements Alert { |
| /** The Alert to which most behavior is delegated */ |
| private final org.apache.geode.internal.admin.Alert alert; |
| private SystemMember systemMember; |
| |
| /////////////////////// Constructors /////////////////////// |
| |
| /** |
| * Creates a new <code>Alert</code> that delegates to the given object. |
| */ |
| AlertImpl(org.apache.geode.internal.admin.Alert alert) { |
| this.alert = alert; |
| GemFireVM vm = alert.getGemFireVM(); |
| |
| /* |
| * Related to #39657. Avoid setting GemFireVM again in the system member. Eager initialization |
| * of member variable - systemMember. |
| */ |
| this.systemMember = vm == null ? null : findSystemMember(vm, false); |
| if (this.systemMember == null) { |
| /* |
| * try to use sender information to construct the SystemMember that can be used for disply |
| * purpose at least |
| */ |
| InternalDistributedMember sender = alert.getSender(); |
| if (sender != null) { |
| try { |
| this.systemMember = AdminDistributedSystemImpl.this.createSystemMember(sender); |
| } catch (AdminException e) { |
| /* |
| * AdminException might be thrown if creation of System Member instance fails. |
| */ |
| this.systemMember = null; |
| } |
| } // else this.systemMember will be null |
| } |
| } |
| |
| ////////////////////// Instance Methods ////////////////////// |
| |
| @Override |
| public AlertLevel getLevel() { |
| return AlertLevel.forSeverity(alert.getLevel()); |
| } |
| |
| /* |
| * Eager initialization of system member is done while creating this alert only. |
| */ |
| @Override |
| public SystemMember getSystemMember() { |
| return systemMember; |
| } |
| |
| @Override |
| public String getConnectionName() { |
| return alert.getConnectionName(); |
| } |
| |
| @Override |
| public String getSourceId() { |
| return alert.getSourceId(); |
| } |
| |
| @Override |
| public String getMessage() { |
| return alert.getMessage(); |
| } |
| |
| @Override |
| public java.util.Date getDate() { |
| return alert.getDate(); |
| } |
| |
| @Override |
| public String toString() { |
| return alert.toString(); |
| } |
| } |
| |
| /** |
| * A JSR-166 <code>FutureTask</code> whose {@link #get} method properly handles an |
| * <code>ExecutionException</code> that wraps an <code>InterruptedException</code>. This is |
| * necessary because there are places in the admin API that wrap |
| * <code>InterruptedException</code>s. See bug 32634. |
| * |
| * <P> |
| * |
| * This is by no means an ideal solution to this problem. It would be better to modify the code |
| * invoked by the <code>Callable</code> to explicitly throw <code>InterruptedException</code>. |
| */ |
| static class AdminFutureTask extends FutureTask { |
| |
| /** |
| * The id of the member whose admin object we are creating. Keeping track of this allows us to |
| * cancel a FutureTask for a member that has gone away. |
| */ |
| private final InternalDistributedMember memberId; |
| |
| public AdminFutureTask(InternalDistributedMember memberId, Callable callable) { |
| super(callable); |
| this.memberId = memberId; |
| } |
| |
| /** |
| * Returns the id of the member of the distributed system for which this <code>FutureTask</code> |
| * is doing work. |
| */ |
| public InternalDistributedMember getMemberId() { |
| return this.memberId; |
| } |
| |
| /** |
| * If the <code>ExecutionException</code> is caused by an <code>InterruptedException</code>, |
| * throw the <code>CancellationException</code> instead. |
| */ |
| @Override |
| public Object get() throws InterruptedException, ExecutionException { |
| |
| if (Thread.interrupted()) |
| throw new InterruptedException(); |
| try { |
| return super.get(); |
| |
| } catch (ExecutionException ex) { |
| for (Throwable cause = ex.getCause(); cause != null; cause = cause.getCause()) { |
| if (cause instanceof InterruptedException) { |
| // We interrupted the runnable but we don't want the thread |
| // that called get() to think that the runnable was interrupted. |
| CancellationException ex2 = new CancellationException( |
| "by interrupt"); |
| ex2.setStackTrace(cause.getStackTrace()); |
| throw ex2; |
| } |
| } |
| |
| throw ex; |
| } |
| |
| } |
| |
| } |
| |
| public DistributedMember getDistributedMember() { |
| return getDistributionManager().getId(); |
| } |
| |
| private void connectAdminDS() { |
| connect((InternalLogWriter) this.logWriter); |
| try { |
| thisAdminDS.waitToBeConnected(3000); |
| } catch (InterruptedException ie) { |
| logger.warn("Interrupted while waiting to connect", ie); |
| } |
| } |
| |
| @Override |
| public Set<PersistentID> getMissingPersistentMembers() throws AdminException { |
| connectAdminDS(); |
| DistributionManager dm = getDistributionManager(); |
| if (dm == null) { |
| throw new IllegalStateException( |
| "connect() has not been invoked on this AdminDistributedSystem."); |
| } |
| return getMissingPersistentMembers(dm); |
| } |
| |
| public static Set<PersistentID> getMissingPersistentMembers(DistributionManager dm) { |
| return MissingPersistentIDsRequest.send(dm); |
| } |
| |
| @Override |
| public void revokePersistentMember(InetAddress host, String directory) throws AdminException { |
| connectAdminDS(); |
| DistributionManager dm = getDistributionManager(); |
| if (dm == null) { |
| throw new IllegalStateException( |
| "connect() has not been invoked on this AdminDistributedSystem."); |
| } |
| revokePersistentMember(dm, host, directory); |
| |
| } |
| |
| @Override |
| public void revokePersistentMember(UUID diskStoreID) throws AdminException { |
| connectAdminDS(); |
| DistributionManager dm = getDistributionManager(); |
| if (dm == null) { |
| throw new IllegalStateException( |
| "connect() has not been invoked on this AdminDistributedSystem."); |
| } |
| revokePersistentMember(dm, diskStoreID); |
| |
| } |
| |
| public static void revokePersistentMember(DistributionManager dm, UUID diskStoreID) { |
| PersistentMemberPattern pattern = new PersistentMemberPattern(diskStoreID); |
| boolean success = false; |
| try { |
| // make sure that the disk store we're revoking is actually missing |
| boolean found = false; |
| Set<PersistentID> details = getMissingPersistentMembers(dm); |
| if (details != null) { |
| for (PersistentID id : details) { |
| if (id.getUUID().equals(diskStoreID)) { |
| found = true; |
| break; |
| } |
| } |
| } |
| if (!found) { |
| return; |
| } |
| |
| // Fix for 42607 - verify that the persistent id is not already |
| // running before revoking it. |
| PrepareRevokePersistentIDRequest.send(dm, pattern); |
| success = true; |
| } finally { |
| if (success) { |
| // revoke the persistent member if were able to prepare the revoke |
| RevokePersistentIDRequest.send(dm, pattern); |
| } else { |
| // otherwise, cancel the revoke. |
| PrepareRevokePersistentIDRequest.cancel(dm, pattern); |
| } |
| } |
| } |
| |
| /** |
| * |
| * @deprecated use {@link #revokePersistentMember(UUID)} instead |
| */ |
| public static void revokePersistentMember(DistributionManager dm, InetAddress host, |
| String directory) { |
| |
| PersistentMemberPattern pattern = |
| new PersistentMemberPattern(host, directory, System.currentTimeMillis()); |
| boolean success = false; |
| try { |
| // Fix for 42607 - verify that the persistent id is not already |
| // running before revoking it. |
| PrepareRevokePersistentIDRequest.send(dm, pattern); |
| success = true; |
| } finally { |
| if (success) { |
| // revoke the persistent member if were able to prepare the revoke |
| RevokePersistentIDRequest.send(dm, pattern); |
| } else { |
| // otherwise, cancel the revoke. |
| PrepareRevokePersistentIDRequest.cancel(dm, pattern); |
| } |
| } |
| } |
| |
| @Override |
| public Set shutDownAllMembers() throws AdminException { |
| return shutDownAllMembers(0); |
| } |
| |
| @Override |
| public Set shutDownAllMembers(long timeout) throws AdminException { |
| connectAdminDS(); |
| DistributionManager dm = getDistributionManager(); |
| if (dm == null) { |
| throw new IllegalStateException( |
| "connect() has not been invoked on this AdminDistributedSystem."); |
| } |
| return shutDownAllMembers(dm, timeout); |
| } |
| |
| /** |
| * Shutdown all members. |
| * |
| * @param timeout the amount of time (in ms) to spending trying to shutdown the members |
| * gracefully. After this time period, the members will be forceable shut down. If the |
| * timeout is exceeded, persistent recovery after the shutdown may need to do a GII. -1 |
| * indicates that the shutdown should wait forever. |
| */ |
| public static Set shutDownAllMembers(DistributionManager dm, long timeout) { |
| return ShutdownAllRequest.send(dm, timeout); |
| } |
| |
| @Override |
| public BackupStatus backupAllMembers(File targetDir) throws AdminException { |
| return backupAllMembers(targetDir, null); |
| } |
| |
| @Override |
| public BackupStatus backupAllMembers(File targetDir, File baselineDir) throws AdminException { |
| connectAdminDS(); |
| DistributionManager dm = getDistributionManager(); |
| if (dm == null) { |
| throw new IllegalStateException( |
| "connect() has not been invoked on this AdminDistributedSystem."); |
| } |
| return backupAllMembers(dm, targetDir, baselineDir); |
| } |
| |
| public static BackupStatus backupAllMembers(DistributionManager dm, File targetDir, |
| File baselineDir) throws AdminException { |
| String baselineDirectory = baselineDir == null ? null : baselineDir.toString(); |
| return new BackupOperation(dm, dm.getCache()).backupAllMembers(targetDir.toString(), |
| baselineDirectory); |
| } |
| |
| @Override |
| public Map<DistributedMember, Set<PersistentID>> compactAllDiskStores() throws AdminException { |
| connectAdminDS(); |
| DistributionManager dm = getDistributionManager(); |
| if (dm == null) { |
| throw new IllegalStateException( |
| "connect() has not been invoked on this AdminDistributedSystem."); |
| } |
| return compactAllDiskStores(dm); |
| } |
| |
| public static Map<DistributedMember, Set<PersistentID>> compactAllDiskStores( |
| DistributionManager dm) throws AdminException { |
| return CompactRequest.send(dm); |
| } |
| |
| /** |
| * This method can be used to process ClientMembership events sent for BridgeMembership by bridge |
| * servers to all admin members. |
| * |
| * NOTE: Not implemented currently. JMX implementation which is a subclass of this class i.e. |
| * AdminDistributedSystemJmxImpl implements it. |
| * |
| * @param senderId id of the member that sent the ClientMembership changes for processing (could |
| * be null) |
| * @param clientId id of a client for which the notification was sent |
| * @param clientHost host on which the client is/was running |
| * @param eventType denotes whether the client Joined/Left/Crashed should be one of |
| * ClientMembershipMessage#JOINED, ClientMembershipMessage#LEFT, |
| * ClientMembershipMessage#CRASHED |
| */ |
| public void processClientMembership(String senderId, String clientId, String clientHost, |
| int eventType) {} |
| |
| @Override |
| public void setAlertLevelAsString(String level) { |
| AlertLevel newAlertLevel = AlertLevel.forName(level); |
| |
| if (newAlertLevel != null) { |
| setAlertLevel(newAlertLevel); |
| } else { |
| System.out.println("ERROR:: " + level |
| + " is invalid. Allowed alert levels are: WARNING, ERROR, SEVERE, OFF"); |
| throw new IllegalArgumentException(String.format("%s", |
| level + " is invalid. Allowed alert levels are: WARNING, ERROR, SEVERE, OFF")); |
| } |
| } |
| |
| @Override |
| public String getAlertLevelAsString() { |
| return getAlertLevel().getName(); |
| } |
| } |