blob: 3136cd19131a118c623b401e7ad7060d8e202d9d [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* more patents listed at http://www.pivotal.io/patents.
*========================================================================
*/
package com.gemstone.gemfire.admin.internal;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import org.apache.logging.log4j.Logger;
import com.gemstone.gemfire.CancelException;
import com.gemstone.gemfire.LogWriter;
import com.gemstone.gemfire.SystemFailure;
import com.gemstone.gemfire.admin.AdminException;
import com.gemstone.gemfire.admin.Alert;
import com.gemstone.gemfire.admin.AlertLevel;
import com.gemstone.gemfire.admin.AlertListener;
import com.gemstone.gemfire.admin.BackupStatus;
import com.gemstone.gemfire.admin.CacheServer;
import com.gemstone.gemfire.admin.CacheServerConfig;
import com.gemstone.gemfire.admin.CacheVm;
import com.gemstone.gemfire.admin.ConfigurationParameter;
import com.gemstone.gemfire.admin.DistributedSystemConfig;
import com.gemstone.gemfire.admin.DistributionLocator;
import com.gemstone.gemfire.admin.DistributionLocatorConfig;
import com.gemstone.gemfire.admin.GemFireHealth;
import com.gemstone.gemfire.admin.ManagedEntity;
import com.gemstone.gemfire.admin.ManagedEntityConfig;
import com.gemstone.gemfire.admin.OperationCancelledException;
import com.gemstone.gemfire.admin.RuntimeAdminException;
import com.gemstone.gemfire.admin.SystemMember;
import com.gemstone.gemfire.admin.SystemMemberCacheListener;
import com.gemstone.gemfire.admin.SystemMembershipEvent;
import com.gemstone.gemfire.admin.SystemMembershipListener;
import com.gemstone.gemfire.cache.persistence.PersistentID;
import com.gemstone.gemfire.distributed.DistributedMember;
import com.gemstone.gemfire.distributed.FutureCancelledException;
import com.gemstone.gemfire.distributed.internal.DM;
import com.gemstone.gemfire.distributed.internal.DistributionConfig;
import com.gemstone.gemfire.distributed.internal.DistributionManager;
import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
import com.gemstone.gemfire.internal.Assert;
import com.gemstone.gemfire.internal.Banner;
import com.gemstone.gemfire.internal.admin.ApplicationVM;
import com.gemstone.gemfire.internal.admin.GemFireVM;
import com.gemstone.gemfire.internal.admin.GfManagerAgent;
import com.gemstone.gemfire.internal.admin.GfManagerAgentConfig;
import com.gemstone.gemfire.internal.admin.GfManagerAgentFactory;
import com.gemstone.gemfire.internal.admin.SSLConfig;
import com.gemstone.gemfire.internal.admin.remote.CompactRequest;
import com.gemstone.gemfire.internal.admin.remote.DistributionLocatorId;
import com.gemstone.gemfire.internal.admin.remote.MissingPersistentIDsRequest;
import com.gemstone.gemfire.internal.admin.remote.PrepareRevokePersistentIDRequest;
import com.gemstone.gemfire.internal.admin.remote.RemoteApplicationVM;
import com.gemstone.gemfire.internal.admin.remote.RemoteTransportConfig;
import com.gemstone.gemfire.internal.admin.remote.RevokePersistentIDRequest;
import com.gemstone.gemfire.internal.admin.remote.ShutdownAllRequest;
import com.gemstone.gemfire.internal.cache.persistence.PersistentMemberPattern;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.InternalLogWriter;
import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.logging.LogWriterFactory;
import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
import com.gemstone.gemfire.internal.logging.log4j.LogWriterAppender;
import com.gemstone.gemfire.internal.logging.log4j.LogWriterAppenders;
import com.gemstone.gemfire.internal.util.concurrent.FutureResult;
/**
* Represents a GemFire distributed system for remote administration/management.
*
* @author Kirk Lund
* @since 3.5
*/
public class AdminDistributedSystemImpl
implements com.gemstone.gemfire.admin.AdminDistributedSystem,
com.gemstone.gemfire.internal.admin.JoinLeaveListener,
com.gemstone.gemfire.internal.admin.AlertListener,
com.gemstone.gemfire.distributed.internal.InternalDistributedSystem.DisconnectListener {
private static final Logger logger = LogService.getLogger();
/** String identity of this distributed system */
private String id;
/** Latest alert broadcast by any system members */
private Alert latestAlert;
// -------------------------------------------------------------------------
/** Internal admin agent to delegate low-level work to */
private volatile GfManagerAgent gfManagerAgent;
/** Monitors the health of this distributed system */
private GemFireHealth health;
/** Set of non-Manager members in this system */
private final Set applicationSet = new HashSet();
/** Set of DistributionLocators for this system */
private final Set locatorSet = new HashSet();
/** Set of dedicated CacheServer members in this system */
private final Set cacheServerSet = new HashSet();
/** Configuration defining this distributed system */
private final DistributedSystemConfigImpl config;
/** Controller for starting and stopping managed entities */
private ManagedEntityController controller;
/** Log file collator for gathering and merging system member logs */
private LogCollator logCollator = new LogCollator();
/** The level above which alerts will be delivered to the alert
* listeners */
private AlertLevel alertLevel = AlertLevel.WARNING;
/** The alert listeners registered on this distributed system. */
private volatile Set<AlertListener> alertListeners = Collections.emptySet();
private final Object alertLock = new Object();
private LogWriterAppender logWriterAppender;
private InternalLogWriter logWriter;
/** The membership listeners registered on this distributed system */
private volatile Set membershipListeners = Collections.EMPTY_SET;
private final Object membershipLock = new Object();
/* The region listeners registered on this distributed system */
//for feature requests #32887
private volatile List cacheListeners = Collections.EMPTY_LIST;
private final Object cacheListLock = new Object();
/**
* reference to AdminDistributedSystemImpl instance
* for feature requests #32887.
* <p>
* Guarded by {@link #CONNECTION_SYNC}.
* <p>
* TODO: reimplement this change and SystemMemberCacheEventProcessor to avoid
* using this static. SystemMemberCacheEvents should only be sent to Admin
* VMs that express interest.
* <p>
* This is volatile to allow SystemFailure to deliver fatal poison-pill
* to thisAdminDS without waiting on synchronization.
*
* @guarded.By CONNECTION_SYNC
*/
private static volatile AdminDistributedSystemImpl thisAdminDS;
/**
* Provides synchronization for {@link #connect()} and {@link #disconnect()}.
* {@link #thisAdminDS} is also now protected by CONNECTION_SYNC and has its
* lifecycle properly tied to connect/disconnect.
*/
private static final Object CONNECTION_SYNC = new Object();
// -------------------------------------------------------------------------
// Constructor(s)
// -------------------------------------------------------------------------
/**
* Constructs new DistributedSystemImpl with the given configuration.
*
* @param config configuration defining this distributed system
*/
public AdminDistributedSystemImpl(DistributedSystemConfigImpl config) {
// init from config...
this.config = config;
String systemId = this.config.getSystemId();
if (systemId != null && systemId.length() > 0) {
this.id = systemId;
} if (this.getLocators() != null && this.getLocators().length() > 0) {
this.id = this.getLocators();
} else {
this.id = new StringBuffer(this.getMcastAddress()).append("[").append(
this.getMcastPort()).append("]").toString();
}
// LOG: create LogWriterAppender unless one already exists
this.logWriterAppender = LogWriterAppenders.getOrCreateAppender(LogWriterAppenders.Identifier.MAIN, false, this.config.createLogConfig(), false);
// LOG: look in DistributedSystemConfigImpl for existing LogWriter to use
InternalLogWriter existingLogWriter = this.config.getInternalLogWriter();
if (existingLogWriter != null) {
this.logWriter = existingLogWriter;
} else {
// LOG: create LogWriterLogger
this.logWriter = LogWriterFactory.createLogWriterLogger(false, false, this.config.createLogConfig(), false);
// LOG: changed statement from config to info
this.logWriter.info(Banner.getString(null));
// Set this log writer in DistributedSystemConfigImpl
this.config.setInternalLogWriter(this.logWriter);
}
// set up other details that depend on config attrs...
this.controller = ManagedEntityControllerFactory.createManagedEntityController(this);
initializeDistributionLocators();
initializeCacheServers();
}
// -------------------------------------------------------------------------
// Initialization
// -------------------------------------------------------------------------
/**
* Creates DistributionLocator instances for every locator entry in the
* {@link com.gemstone.gemfire.admin.DistributedSystemConfig}
*/
private void initializeDistributionLocators() {
DistributionLocatorConfig[] configs =
this.config.getDistributionLocatorConfigs();
if (configs.length == 0) {
// No work to do
return;
}
for (int i = 0; i < configs.length; i++) {
// the Locator impl may vary in this class from the config...
DistributionLocatorConfig conf = configs[i];
DistributionLocator locator =
createDistributionLocatorImpl(conf);
this.locatorSet.add(new FutureResult(locator));
}
// update locators string...
setLocators(parseLocatorSet());
}
/**
* Creates <code>CacheServer</code> instances for every cache server
* entry in the {@link
* com.gemstone.gemfire.admin.DistributedSystemConfig}
*/
private void initializeCacheServers() {
CacheServerConfig[] cacheServerConfigs =
this.config.getCacheServerConfigs();
for (int i = 0; i < cacheServerConfigs.length; i++) {
try {
CacheServerConfig conf = cacheServerConfigs[i];
CacheServerConfigImpl copy =
new CacheServerConfigImpl(conf);
this.cacheServerSet.add(new FutureResult(createCacheServer(copy)));
} catch (java.lang.Exception e) {
logger.warn(e.getMessage(), e);
continue;
} catch (VirtualMachineError err) {
SystemFailure.initiateFailure(err);
// If this ever returns, rethrow the error. We're poisoned
// now, so don't let this thread continue.
throw err;
} catch (java.lang.Error e) {
// Whenever you catch Error or Throwable, you must also
// catch VirtualMachineError (see above). However, there is
// _still_ a possibility that you are dealing with a cascading
// error condition, so you also need to check to see if the JVM
// is still usable:
SystemFailure.checkFailure();
logger.error(e.getMessage(), e);
continue;
}
}
}
/**
* Checks to make sure that {@link #connect()} has been called.
*
* @throws IllegalStateException
* If {@link #connect()} has not been called.
*/
private void checkConnectCalled() {
if (this.gfManagerAgent == null) {
throw new IllegalStateException(LocalizedStrings.AdminDistributedSystemImpl_CONNECT_HAS_NOT_BEEN_INVOKED_ON_THIS_ADMINDISTRIBUTEDSYSTEM.toLocalizedString());
}
}
// -------------------------------------------------------------------------
// Attributes of this DistributedSystem
// -------------------------------------------------------------------------
public GfManagerAgent getGfManagerAgent() {
return this.gfManagerAgent;
}
public boolean isConnected() {
return this.gfManagerAgent != null && this.gfManagerAgent.isConnected();
}
public String getId() {
return this.id;
}
public String getName() {
String name = this.config.getSystemName();
if (name != null && name.length() > 0) {
return name;
} else {
return getId();
}
}
public String getSystemName() {
return this.config.getSystemName();
}
public String getRemoteCommand() {
return this.config.getRemoteCommand();
}
public void setRemoteCommand(String remoteCommand) {
this.config.setRemoteCommand(remoteCommand);
}
public void setAlertLevel(AlertLevel level) {
if (this.isConnected()) {
this.gfManagerAgent.setAlertLevel(level.getSeverity());
}
this.alertLevel = level;
}
public AlertLevel getAlertLevel() {
return this.alertLevel;
}
public void addAlertListener(AlertListener listener) {
synchronized (this.alertLock) {
Set<AlertListener> oldListeners = this.alertListeners;
if (!oldListeners.contains(listener)) {
Set<AlertListener> newListeners = new HashSet<AlertListener>(oldListeners);
newListeners.add(listener);
this.alertListeners = newListeners;
}
}
}
public int getAlertListenerCount() {
synchronized (this.alertLock) {
return this.alertListeners.size();
}
}
public void removeAlertListener(AlertListener listener) {
synchronized (this.alertLock) {
Set<AlertListener> oldListeners = this.alertListeners;
if (oldListeners.contains(listener)) { // fixed bug 34687
Set<AlertListener> newListeners = new HashSet<AlertListener>(oldListeners);
if (newListeners.remove(listener)) {
this.alertListeners = newListeners;
}
}
}
}
public void addMembershipListener(SystemMembershipListener listener) {
synchronized (this.membershipLock) {
Set oldListeners = this.membershipListeners;
if (!oldListeners.contains(listener)) {
Set newListeners = new HashSet(oldListeners);
newListeners.add(listener);
this.membershipListeners = newListeners;
}
}
}
public void removeMembershipListener(SystemMembershipListener listener) {
synchronized (this.membershipLock) {
Set oldListeners = this.membershipListeners;
if (oldListeners.contains(listener)) { // fixed bug 34687
Set newListeners = new HashSet(oldListeners);
if (newListeners.remove(listener)) {
this.membershipListeners = newListeners;
}
}
}
}
public String getMcastAddress() {
return this.config.getMcastAddress();
}
public int getMcastPort() {
return this.config.getMcastPort();
}
public boolean getDisableTcp() {
return this.config.getDisableTcp();
}
public boolean getDisableAutoReconnect() {
return this.config.getDisableAutoReconnect();
}
public String getLocators() {
return this.config.getLocators();
}
protected void setLocators(String locators) {
this.config.setLocators(locators);
}
public String getMembershipPortRange() {
return this.getConfig().getMembershipPortRange();
}
/** get the direct-channel port to use, or zero if not set */
public int getTcpPort() {
return this.getConfig().getTcpPort();
}
public void setTcpPort(int port) {
this.getConfig().setTcpPort(port);
}
public void setMembershipPortRange(String membershipPortRange) {
this.getConfig().setMembershipPortRange(membershipPortRange);
}
public DistributedSystemConfig getConfig() {
return this.config;
}
/**
* Returns true if any members of this system are currently running.
*/
public boolean isRunning() {
if (this.gfManagerAgent == null) return false;
// is there a better way??
// this.gfManagerAgent.isConnected() ... this.gfManagerAgent.isListening()
if (isAnyMemberRunning()) return true;
return false;
}
/** Returns true if this system is using multicast instead of locators */
public boolean isMcastDiscovery() {
return this.isMcastEnabled() && (this.getLocators().length() == 0);
}
/** Returns true if this system can use multicast for communications */
public boolean isMcastEnabled() {
return this.getMcastPort() > 0 ;
}
ManagedEntityController getEntityController() {
return this.controller;
}
static private final String TIMEOUT_MS_NAME
= "AdminDistributedSystemImpl.TIMEOUT_MS";
static private final int TIMEOUT_MS_DEFAULT = 60000; // 30000 -- see bug36470
static private final int TIMEOUT_MS
= Integer.getInteger(TIMEOUT_MS_NAME, TIMEOUT_MS_DEFAULT).intValue();
// -------------------------------------------------------------------------
// Operations of this DistributedSystem
// -------------------------------------------------------------------------
/**
* Starts all managed entities in this system.
*/
public void start() throws AdminException {
// Wait for each managed entity to start (see bug 32569)
DistributionLocator[] locs = getDistributionLocators();
for (int i = 0; i < locs.length; i++) {
locs[i].start();
}
for (int i = 0; i < locs.length; i++) {
try {
if (!locs[i].waitToStart(TIMEOUT_MS)) {
throw new AdminException(LocalizedStrings.AdminDistributedSystemImpl_0_DID_NOT_START_AFTER_1_MS.toLocalizedString(new Object[] {locs[i], Integer.valueOf(TIMEOUT_MS)}));
}
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
throw new AdminException(LocalizedStrings.AdminDistributedSystemImpl_INTERRUPTED_WHILE_WAITING_FOR_0_TO_START.toLocalizedString(locs[i]), ex);
}
}
CacheServer[] servers = getCacheServers();
for (int i = 0; i < servers.length; i++) {
servers[i].start();
}
for (int i = 0; i < servers.length; i++) {
try {
if (!servers[i].waitToStart(TIMEOUT_MS)) {
throw new AdminException(LocalizedStrings.AdminDistributedSystemImpl_0_DID_NOT_START_AFTER_1_MS.toLocalizedString(new Object[] {servers[i], Integer.valueOf(TIMEOUT_MS)}));
}
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
throw new AdminException(LocalizedStrings.AdminDistributedSystemImpl_INTERRUPTED_WHILE_WAITING_FOR_0_TO_START.toLocalizedString(servers[i]), ex);
}
}
}
/**
* Stops all GemFire managers that are members of this system.
*/
public void stop() throws AdminException {
// Stop cache server before GemFire managers because the cache
// server might host a cache proxy that is dependent on the
// manager. See bug 32569.
// Wait for each managed entity to stop (see bug 32569)
long timeout = 30;
CacheServer[] servers = getCacheServers();
for (int i = 0; i < servers.length; i++) {
servers[i].stop();
}
for (int i = 0; i < servers.length; i++) {
try {
if (!servers[i].waitToStop(timeout * 1000)) {
throw new AdminException(LocalizedStrings.AdminDistributedSystemImpl_0_DID_NOT_STOP_AFTER_1_SECONDS.toLocalizedString(new Object[] {servers[i], Long.valueOf(timeout)}));
}
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
throw new AdminException(LocalizedStrings.AdminDistributedSystemImpl_INTERRUPTED_WHILE_WAITING_FOR_0_TO_STOP.toLocalizedString(servers[i]), ex);
}
}
DistributionLocator[] locs = getDistributionLocators();
for (int i = 0; i < locs.length; i++) {
locs[i].stop();
}
for (int i = 0; i < locs.length; i++) {
try {
if (!locs[i].waitToStop(timeout * 1000)) {
throw new AdminException(LocalizedStrings.AdminDistributedSystemImpl_0_DID_NOT_STOP_AFTER_1_SECONDS.toLocalizedString(new Object[] {locs[i], Long.valueOf(timeout)}));
}
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
throw new AdminException(LocalizedStrings.AdminDistributedSystemImpl_INTERRUPTED_WHILE_WAITING_FOR_0_TO_STOP.toLocalizedString(locs[i]), ex);
}
}
}
/** Display merged system member logs */
public String displayMergedLogs() {
return this.logCollator.collateLogs(this.gfManagerAgent);
}
/**
* Returns the license for this GemFire product; else null if unable to
* retrieve license information
*
* @return license for this GemFire product
*/
public java.util.Properties getLicense() {
SystemMember member = findFirstRunningMember();
if (member != null) {
return new Properties();
} else {
return null;
}
}
/**
* Sets the distribution-related portion of the given managed entity's
* configuration so that the entity is part of this distributed system.
*
* @throws AdminException
* TODO-javadocs
*/
private void setDistributionParameters(SystemMember member)
throws AdminException {
Assert.assertTrue(member instanceof ManagedSystemMemberImpl);
// set some config parms to match this system...
ConfigurationParameter[] configParms = new ConfigurationParameter[] {
new ConfigurationParameterImpl(
DistributionConfig.MCAST_PORT_NAME,
Integer.valueOf(this.config.getMcastPort())),
new ConfigurationParameterImpl(
DistributionConfig.LOCATORS_NAME,
this.config.getLocators()),
new ConfigurationParameterImpl(
DistributionConfig.MCAST_ADDRESS_NAME,
InetAddressUtil.toInetAddress(this.config.getMcastAddress())),
new ConfigurationParameterImpl(
DistributionConfig.DISABLE_TCP_NAME,
Boolean.valueOf(this.config.getDisableTcp()) ),
};
member.setConfiguration(configParms);
}
/**
* Handles an <code>ExecutionException</code> by examining its cause
* and throwing an appropriate runtime exception.
*/
private static void handle(ExecutionException ex) {
Throwable cause = ex.getCause();
if (cause instanceof OperationCancelledException) {
// Operation was cancelled, we don't necessary want to propagate
// this up to the user.
return;
}
if (cause instanceof CancelException) { // bug 37285
throw new FutureCancelledException(LocalizedStrings.AdminDistributedSystemImpl_FUTURE_CANCELLED_DUE_TO_SHUTDOWN.toLocalizedString(), ex);
}
// Don't just throw the cause because the stack trace can be
// misleading. For instance, the cause might have occurred in a
// different thread. In addition to the cause, we also want to
// know which code was waiting for the Future.
throw new RuntimeAdminException(LocalizedStrings.AdminDistributedSystemImpl_WHILE_WAITING_FOR_FUTURE.toLocalizedString(), ex);
}
protected void checkCancellation() {
DM dm = this.getDistributionManager();
// TODO does dm == null mean we're dead?
if (dm != null) {
dm.getCancelCriterion().checkCancelInProgress(null);
}
}
/**
* Returns a list of manageable SystemMember instances for each
* member of this distributed system.
*
* @return array of system members for each non-manager member
*/
public SystemMember[] getSystemMemberApplications()
throws com.gemstone.gemfire.admin.AdminException {
synchronized(this.applicationSet) {
Collection coll = new ArrayList(this.applicationSet.size());
APPS: for (Iterator iter = this.applicationSet.iterator();
iter.hasNext(); ) {
Future future = (Future) iter.next();
// this.logger.info("DEBUG: getSystemMemberApplications: " + future);
for (;;) {
checkCancellation();
boolean interrupted = Thread.interrupted();
try {
coll.add(future.get());
break;
}
catch (InterruptedException ex) {
interrupted = true;
continue; // keep trying
}
catch (CancellationException ex) {
// this.logger.info("DEBUG: cancelled: " + future, ex);
continue APPS;
}
catch (ExecutionException ex) {
// this.logger.info("DEBUG: executed: " + future);
handle(ex);
continue APPS;
}
finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
} // for
} // APPS
SystemMember[] array = new SystemMember[coll.size()];
coll.toArray(array);
return array;
}
}
/**
* Display in readable format the latest Alert in this distributed system.
*
* TODO: create an external admin api object for Alert
*/
public String getLatestAlert() {
if (this.latestAlert == null) {
return "";
}
return this.latestAlert.toString();
}
/**
* Connects to the currently configured system.
*/
public void connect() {
connect(this.logWriter);
}
/**
* Connects to the currently configured system. This method is
* public for internal use only (testing, for example).
*
* <p>
*
* See {@link
* com.gemstone.gemfire.distributed.DistributedSystem#connect} for a
* list of exceptions that may be thrown.
*
* @param logWriter the InternalLogWriter to use for any logging
*/
public void connect(InternalLogWriter logWriter) {
synchronized (CONNECTION_SYNC) {
//Check if the gfManagerAgent is NOT null.
//If it is already listening, then just return since the connection is already established OR in process.
//Otherwise cleanup the state of AdminDistributedSystemImpl. This needs to happen automatically.
if(this.gfManagerAgent != null) {
if(this.gfManagerAgent.isListening()) {
if (logger.isDebugEnabled()) {
logger.debug("The RemoteGfManagerAgent is already listening for this AdminDistributedSystem.");
}
return;
}
this.disconnect();
}
if (thisAdminDS != null) { // TODO: beef up toString and add thisAdminDS
throw new IllegalStateException(LocalizedStrings.AdminDistributedSystemImpl_ONLY_ONE_ADMINDISTRIBUTEDSYSTEM_CONNECTION_CAN_BE_MADE_AT_ONCE.toLocalizedString());
}
thisAdminDS = this; //added for feature requests #32887
if (this.getLocators().length() == 0) {
this.id =
this.getMcastAddress() + "[" + this.getMcastPort() + "]";
} else {
this.id = this.getLocators();
}
if (this.config instanceof DistributedSystemConfigImpl) {
((DistributedSystemConfigImpl) this.config).validate();
((DistributedSystemConfigImpl) this.config).setDistributedSystem(this);
}
// LOG: passes the AdminDistributedSystemImpl LogWriterLogger into GfManagerAgentConfig for RemoteGfManagerAgent
GfManagerAgent agent =
GfManagerAgentFactory.getManagerAgent(buildAgentConfig(logWriter));
this.gfManagerAgent = agent;
// sync to prevent bug 33341 Admin API can double-represent system members
synchronized(this.membershipListenerLock) {
// build the list of applications...
ApplicationVM[] apps = this.gfManagerAgent.listApplications();
for (int i = 0; i < apps.length; i++) {
try {
nodeJoined(null, apps[i]);
} catch (RuntimeAdminException e) {
this.logWriter.warning("encountered a problem processing member " + apps[i]);
}
}
}
// Build admin objects for all locators (see bug 31959)
String locators = this.getLocators();
StringTokenizer st = new StringTokenizer(locators, ",");
NEXT:
while(st.hasMoreTokens()) {
String locator = st.nextToken();
int first = locator.indexOf("[");
int last = locator.indexOf("]");
String host = locator.substring(0, first);
int colidx = host.lastIndexOf('@');
if (colidx < 0) {
colidx = host.lastIndexOf(':');
}
String bindAddr = null;
if (colidx > 0 && colidx < (host.length()-1)) {
String orig = host;
bindAddr = host.substring(colidx+1, host.length());
host = host.substring(0, colidx);
// if the host contains a colon and there's no '@', we probably
// parsed an ipv6 address incorrectly - try again
if (host.indexOf(':') >= 0) {
int bindidx = orig.lastIndexOf('@');
if (bindidx >= 0) {
host = orig.substring(0, bindidx);
bindAddr = orig.substring(bindidx+1);
}
else {
host = orig;
bindAddr = null;
}
}
}
int port = Integer.parseInt(locator.substring(first+1, last));
synchronized (this.locatorSet) {
LOCATORS:
for (Iterator iter = this.locatorSet.iterator();
iter.hasNext(); ) {
Future future = (Future) iter.next();
DistributionLocatorImpl impl = null;
for (;;) {
checkCancellation();
boolean interrupted = Thread.interrupted();
try {
impl = (DistributionLocatorImpl) future.get();
break; // success
}
catch (InterruptedException ex) {
interrupted = true;
continue; // keep trying
}
catch (CancellationException ex) {
continue LOCATORS;
}
catch (ExecutionException ex) {
handle(ex);
continue LOCATORS;
}
finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
} // for
DistributionLocatorConfig conf = impl.getConfig();
InetAddress host1 = InetAddressUtil.toInetAddress(host);
InetAddress host2 =
InetAddressUtil.toInetAddress(conf.getHost());
if (port == conf.getPort() && host1.equals(host2)) {
// Already have an admin object for this locator
continue NEXT;
}
}
}
// None of the existing locators matches the locator in the
// string. Contact the locator to get information and create
// an admin object for it.
InetAddress bindAddress = null;
if (bindAddr != null) {
bindAddress = InetAddressUtil.toInetAddress(bindAddr);
}
DistributionLocatorConfig conf =
DistributionLocatorConfigImpl.createConfigFor(host, port,
bindAddress);
if (conf != null) {
DistributionLocator impl =
createDistributionLocatorImpl(conf);
synchronized (this.locatorSet) {
this.locatorSet.add(new FutureResult(impl));
}
}
}
}
}
/**
* Polls to determine whether or not the connection to the
* distributed system has been made.
*/
public boolean waitToBeConnected(long timeout)
throws InterruptedException {
if (Thread.interrupted()) throw new InterruptedException();
checkConnectCalled();
long start = System.currentTimeMillis();
while (System.currentTimeMillis() - start < timeout) {
if (this.gfManagerAgent.isInitialized()) {
return true;
} else {
Thread.sleep(100);
}
}
return this.isConnected();
}
/**
* Closes all connections and resources to the connected distributed system.
*
* @see com.gemstone.gemfire.distributed.DistributedSystem#disconnect()
*/
public void disconnect() {
synchronized (CONNECTION_SYNC) {
// if (!isConnected()) {
// throw new IllegalStateException(this + " is not connected");
// }
// Assert.assertTrue(thisAdminDS == this);
if (this.logWriterAppender != null) {
LogWriterAppenders.stop(LogWriterAppenders.Identifier.MAIN);
}
try {
if (thisAdminDS == this) {
thisAdminDS = null;
}
if (this.gfManagerAgent != null && this.gfManagerAgent.isListening()){
synchronized (this) {
if (this.health != null) {
this.health.close();
}
}
this.gfManagerAgent.removeJoinLeaveListener(this);
this.gfManagerAgent.disconnect();
}
this.gfManagerAgent = null;
if (this.config instanceof DistributedSystemConfigImpl) {
((DistributedSystemConfigImpl) this.config).setDistributedSystem(null);
}
} finally {
if (logWriterAppender != null) {
LogWriterAppenders.destroy(LogWriterAppenders.Identifier.MAIN);
}
}
}
}
/**
* Returns the DistributionManager this implementation is using to
* connect to the distributed system.
*/
public DM getDistributionManager() {
if (this.gfManagerAgent == null) {
return null;
}
return this.gfManagerAgent.getDM();
}
/**
* Returns the internal admin API's agent used for administering
* this <code>AdminDistributedSystem</code>.
*
* @since 4.0
*/
public GfManagerAgent getAdminAgent() {
return this.gfManagerAgent;
}
/**
* Adds a new, unstarted <code>DistributionLocator</code> to this
* distributed system.
*/
public DistributionLocator addDistributionLocator() {
DistributionLocatorConfig conf =
new DistributionLocatorConfigImpl();
DistributionLocator locator =
createDistributionLocatorImpl(conf);
synchronized (this.locatorSet) {
this.locatorSet.add(new FutureResult(locator));
}
// update locators string...
setLocators(parseLocatorSet());
return locator;
}
public DistributionLocator[] getDistributionLocators() {
synchronized(this.locatorSet) {
Collection coll = new ArrayList(this.locatorSet.size());
LOCATORS: for (Iterator iter = this.locatorSet.iterator();
iter.hasNext();) {
Future future = (Future) iter.next();
for (;;) {
checkCancellation();
boolean interrupted = Thread.interrupted();
try {
coll.add(future.get());
break; // success
}
catch (InterruptedException ex) {
interrupted = true;
continue; // keep trying
}
catch (CancellationException ex) {
continue LOCATORS;
}
catch (ExecutionException ex) {
handle(ex);
continue LOCATORS;
}
finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
} // for
}
DistributionLocator[] array =
new DistributionLocator[coll.size()];
coll.toArray(array);
return array;
}
}
/**
* Updates the locator string that is used to discover members of
* the distributed system.
*
* @see #getLocators
*/
void updateLocatorsString() {
this.setLocators(parseLocatorSet());
}
protected String parseLocatorSet() {
StringBuffer sb = new StringBuffer();
LOCATORS: for (Iterator iter = this.locatorSet.iterator(); iter.hasNext();) {
Future future = (Future) iter.next();
DistributionLocator locator = null;
for (;;) {
checkCancellation();
boolean interrupted = Thread.interrupted();
try {
locator = (DistributionLocator) future.get();
break; // success
}
catch (InterruptedException ex) {
interrupted = true;
continue; // keep trying
}
catch (CancellationException ex) {
continue LOCATORS;
}
catch (ExecutionException ex) {
handle(ex);
continue LOCATORS;
}
finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}
sb.append(locator.getConfig().getHost());
sb.append("[").append(locator.getConfig().getPort()).append("]");
if (iter.hasNext()) {
sb.append(",");
}
}
return sb.toString();
}
// -------------------------------------------------------------------------
// Listener callback methods
// -------------------------------------------------------------------------
/** sync to prevent bug 33341 Admin API can double-represent system members */
private final Object membershipListenerLock = new Object();
// --------- com.gemstone.gemfire.internal.admin.JoinLeaveListener ---------
/**
* Listener callback for when a member has joined this DistributedSystem.
* <p>
* React by adding the SystemMember to this system's
* internal lists, if they are not already there. Notice that we
* add a {@link Future} into the list so that the admin object is
* not initialized while locks are held.
*
* @param source the distributed system that fired nodeJoined
* @param vm the VM that joined
* @see com.gemstone.gemfire.internal.admin.JoinLeaveListener#nodeJoined
*/
public void nodeJoined(GfManagerAgent source, final GemFireVM vm) {
// sync to prevent bug 33341 Admin API can double-represent system members
synchronized(this.membershipListenerLock) {
// this.logger.info("DEBUG: nodeJoined: " + vm.getId(), new RuntimeException("STACK"));
// does it already exist?
SystemMember member = findSystemMember(vm);
// if not then create it...
if (member == null) {
// this.logger.info("DEBUG: no existing member: " + vm.getId());
FutureTask future = null;
//try {
if (vm instanceof ApplicationVM) {
final ApplicationVM app = (ApplicationVM) vm;
if (app.isDedicatedCacheServer()) {
synchronized (this.cacheServerSet) {
future = new AdminFutureTask(vm.getId(), new Callable() {
public Object call() throws Exception {
logger.info(LogMarker.DM, LocalizedMessage.create(LocalizedStrings.AdminDistributedSystemImpl_ADDING_NEW_CACHESERVER_FOR__0, vm));
return createCacheServer(app);
}
});
this.cacheServerSet.add(future);
}
} else {
synchronized (this.applicationSet) {
future = new AdminFutureTask(vm.getId(), new Callable() {
public Object call() throws Exception {
logger.info(LogMarker.DM, LocalizedMessage.create(LocalizedStrings.AdminDistributedSystemImpl_ADDING_NEW_APPLICATION_FOR__0, vm));
return createSystemMember(app);
}
});
this.applicationSet.add(future);
}
}
} else {
Assert.assertTrue(false, "Unknown GemFireVM type: " +
vm.getClass().getName());
}
// } catch (AdminException ex) {
// String s = "Could not create a SystemMember for " + vm;
// this.logger.warning(s, ex);
// }
// Wait for the SystemMember to be created. We want to do this
// outside of the "set" locks.
future.run();
for (;;) {
checkCancellation();
boolean interrupted = Thread.interrupted();
try {
member = (SystemMember) future.get();
break; // success
}
catch (InterruptedException ex) {
interrupted = true;
continue; // keep trying
}
catch (CancellationException ex) {
// this.logger.info("DEBUG: run cancelled: " + future, ex);
return;
}
catch (ExecutionException ex) {
// this.logger.info("DEBUG: run executed: " + future, ex);
handle(ex);
return;
}
finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
} // for
Assert.assertTrue(member != null);
// moved this up into the if that creates a new member to fix bug 34517
SystemMembershipEvent event = new SystemMembershipEventImpl(member.getDistributedMember());
for (Iterator iter = this.membershipListeners.iterator();
iter.hasNext(); ) {
SystemMembershipListener listener =
(SystemMembershipListener) iter.next();
listener.memberJoined(event);
}
// } else {
// this.logger.info("DEBUG: found existing member: " + member);
}
}
}
/**
* Listener callback for when a member of this DistributedSystem has left.
* <p>
* Reacts by removing the member.
*
* @param source the distributed system that fired nodeCrashed
* @param vm the VM that left
* @see com.gemstone.gemfire.internal.admin.JoinLeaveListener#nodeLeft
*/
public void nodeLeft(GfManagerAgent source, GemFireVM vm) {
// sync to prevent bug 33341 Admin API can double-represent system members
synchronized(this.membershipListenerLock) {
// member has left...
SystemMember member =
AdminDistributedSystemImpl.this.removeSystemMember(vm.getId());
if (member == null) {
return; // reinstated this early-out because removal does not fix 39429
}
// Can't call member.getId() because it is nulled-out when the
// SystemMember is removed.
SystemMembershipEvent event = new SystemMembershipEventImpl(vm.getId());
for (Iterator iter = this.membershipListeners.iterator();
iter.hasNext(); ) {
SystemMembershipListener listener =
(SystemMembershipListener) iter.next();
listener.memberLeft(event);
}
}
}
/**
* Listener callback for when a member of this DistributedSystem has crashed.
* <p>
* Reacts by removing the member.
*
* @param source the distributed system that fired nodeCrashed
* @param vm the VM that crashed
* @see com.gemstone.gemfire.internal.admin.JoinLeaveListener#nodeCrashed
*/
public void nodeCrashed(GfManagerAgent source, GemFireVM vm) {
// sync to prevent bug 33341 Admin API can double-represent system members
synchronized(this.membershipListenerLock) {
// member has crashed...
SystemMember member =
AdminDistributedSystemImpl.this.removeSystemMember(vm.getId());
if (member == null) {
// Unknown member crashed. Hmm...
return;
}
// Can't call member.getId() because it is nulled-out when the
// SystemMember is removed.
SystemMembershipEvent event = new SystemMembershipEventImpl(vm.getId());
for (Iterator iter = this.membershipListeners.iterator();
iter.hasNext(); ) {
SystemMembershipListener listener =
(SystemMembershipListener) iter.next();
listener.memberCrashed(event);
}
}
}
// ----------- com.gemstone.gemfire.internal.admin.AlertListener -----------
/**
* Listener callback for when a SystemMember of this DistributedSystem has
* crashed.
*
* @param alert the latest alert from the system
* @see com.gemstone.gemfire.internal.admin.AlertListener#alert
*/
public void alert(com.gemstone.gemfire.internal.admin.Alert alert) {
if (AlertLevel.forSeverity(alert.getLevel()).ordinal < alertLevel.ordinal) {
return;
}
Alert alert2 = new AlertImpl(alert);
this.latestAlert = alert2;
for (Iterator<AlertListener> iter = this.alertListeners.iterator();
iter.hasNext(); ) {
AlertListener listener = iter.next();
listener.alert(alert2);
}
}
public void onDisconnect(InternalDistributedSystem sys) {
logger.debug("Calling AdminDistributedSystemImpl#onDisconnect");
disconnect();
logger.debug("Completed AdminDistributedSystemImpl#onDisconnect");
}
// -------------------------------------------------------------------------
// Template methods overriden from superclass...
// -------------------------------------------------------------------------
protected CacheServer createCacheServer(ApplicationVM member)
throws AdminException {
return new CacheServerImpl(this, member);
}
protected CacheServer createCacheServer(CacheServerConfigImpl conf)
throws AdminException {
return new CacheServerImpl(this, conf);
}
/** Override createSystemMember by instantiating SystemMemberImpl
*
* @throws AdminException TODO-javadocs
*/
protected SystemMember createSystemMember(ApplicationVM app)
throws com.gemstone.gemfire.admin.AdminException {
return new SystemMemberImpl(this, app);
}
/**
* Constructs & returns a SystemMember instance using the corresponding
* InternalDistributedMember object.
*
* @param member
* InternalDistributedMember instance for which a SystemMember
* instance is to be constructed.
* @return constructed SystemMember instance
* @throws com.gemstone.gemfire.admin.AdminException
* if construction of SystemMember instance fails
* @since 6.5
*/
protected SystemMember createSystemMember(InternalDistributedMember member)
throws com.gemstone.gemfire.admin.AdminException {
return new SystemMemberImpl(this, member);
}
/**
* Template-method for creating a new
* <code>DistributionLocatorImpl</code> instance.
*/
protected DistributionLocatorImpl
createDistributionLocatorImpl(DistributionLocatorConfig conf) {
return new DistributionLocatorImpl(conf, this);
}
// -------------------------------------------------------------------------
// Non-public implementation methods... TODO: narrow access levels
// -------------------------------------------------------------------------
// TODO: public void connect(...) could stand to have some internals factored out
/**
* Returns List of Locators including Locators or Multicast.
*
* @return list of locators or multicast values
*/
protected List parseLocators() {
// assumes host[port] format, delimited by ","
List locatorIds = new ArrayList();
if (isMcastEnabled()) {
String mcastId = new StringBuffer(
this.getMcastAddress()).append("[").append(
this.getMcastPort()).append("]").toString();
locatorIds.add(new DistributionLocatorId(mcastId));
}
if (!isMcastDiscovery()) {
StringTokenizer st = new StringTokenizer(this.getLocators(), ",");
while (st.hasMoreTokens()) {
locatorIds.add(new DistributionLocatorId(st.nextToken()));
}
}
if (logger.isDebugEnabled()) {
StringBuffer sb = new StringBuffer("Locator set is: ");
for (Iterator iter = locatorIds.iterator(); iter.hasNext(); ) {
sb.append(iter.next());
sb.append(" ");
}
logger.debug(sb);
}
return locatorIds;
}
/**
* Returns whether or not a <code>SystemMember</code> corresponds
* to a <code>GemFireVM</code>.
*
* @param examineConfig
* Should we take the configuration of the member into
* consideration? In general, we want to consider the
* configuration when a member starts up. But when we are
* notified that it has shut down, we do not want to examine
* the configuration because that might involve contacting
* the member. Which, of course, cannot be done because it
* has shut down.
*/
private boolean isSame(SystemMemberImpl member, GemFireVM vm,
boolean examineConfig) {
if (vm.equals(member.getGemFireVM())) {
return true;
}
InternalDistributedMember memberId = member.getInternalId();
InternalDistributedMember vmId = vm.getId();
if (vmId.equals(memberId)) {
return true;
}
if ((member instanceof ManagedSystemMemberImpl) &&
examineConfig) {
// We can't compare information about managers because the
// member might have already gone away. Attempts to send it
// messages (to get its product directory, for instance) will
// time out.
ManagedSystemMemberImpl entity =
(ManagedSystemMemberImpl) member;
// Make sure that the type of the managed entity matches the
// type of the internal admin object.
if (entity instanceof CacheServer) {
if (!(vm instanceof ApplicationVM)) {
return false;
}
ApplicationVM app = (ApplicationVM) vm;
if (!app.isDedicatedCacheServer()) {
return false;
}
}
ManagedEntityConfig conf = entity.getEntityConfig();
InetAddress managedHost =
InetAddressUtil.toInetAddress(conf.getHost());
File managedWorkingDir = new File(conf.getWorkingDirectory());
File managedProdDir = new File(conf.getProductDirectory());
InetAddress vmHost = vm.getHost();
File vmWorkingDir = vm.getWorkingDirectory();
File vmProdDir = vm.getGemFireDir();
if (vmHost.equals(managedHost) &&
isSameFile(vmWorkingDir, managedWorkingDir) &&
isSameFile(vmProdDir, managedProdDir)) {
return true;
}
}
return false;
}
/**
* Returns whether or not the names of the two files represent the
* same file.
*/
private boolean isSameFile(File file1, File file2) {
if (file1.equals(file2)) {
return true;
}
if (file1.getAbsoluteFile().equals(file2.getAbsoluteFile())) {
return true;
}
try {
if (file1.getCanonicalFile().equals(file2.getCanonicalFile())) {
return true;
}
// StringBuffer sb = new StringBuffer();
// sb.append("File 1: ");
// sb.append(file1);
// sb.append("\nFile 2: ");
// sb.append(file2);
// sb.append("\n Absolute 1: ");
// sb.append(file1.getAbsoluteFile());
// sb.append("\n Absolute 2: ");
// sb.append(file2.getAbsoluteFile());
// sb.append("\n Canonical 1: ");
// sb.append(file1.getCanonicalFile());
// sb.append("\n Canonical 2: ");
// sb.append(file2.getCanonicalFile());
// logger.info(sb.toString());
} catch (IOException ex) {
// oh well...
logger.info(LocalizedMessage.create(LocalizedStrings.AdminDistributedSystemImpl_WHILE_GETTING_CANONICAL_FILE), ex);
}
return false;
}
/**
* Finds and returns the <code>SystemMember</code> that corresponds
* to the given <code>GemFireVM</code> or <code>null</code> if no
* <code>SystemMember</code> corresponds.
*/
protected SystemMember findSystemMember(GemFireVM vm) {
return findSystemMember(vm, true);
}
/**
* Finds and returns the <code>SystemMember</code> that corresponds to the
* given <code>GemFireVM</code> or <code>null</code> if no Finds and returns
* the <code>SystemMember</code> that corresponds to the given
* <code>GemFireVM</code> or <code>null</code> if no <code>SystemMember</code>
* corresponds.
*
*
* @param vm
* GemFireVM instance
* @param compareConfig
* Should the members' configurations be compared? <code>true</code>
* when the member has joined, <code>false</code> when the member has
* left Should the members' configurations be compared?
* <code>true</code> when the member has joined, <code>false</code>
* when the member has left. Additionally also used to check if system
* member config is to be synchronized with the VM.
*
* @throws AdminException
* if member configuration can not be synchronized with VM
*/
protected SystemMember findSystemMember(GemFireVM vm,
boolean compareConfig) {
SystemMemberImpl member = null;
synchronized (this.cacheServerSet) {
SERVERS: for (Iterator iter = this.cacheServerSet.iterator();
iter.hasNext(); ) {
Future future = (Future) iter.next();
CacheServerImpl cacheServer = null;
for (;;) {
checkCancellation();
boolean interrupted = Thread.interrupted();
try {
cacheServer = (CacheServerImpl) future.get();
break; // success
}
catch (InterruptedException ex) {
interrupted = true;
continue; // keep trying
}
catch (CancellationException ex) {
continue SERVERS;
}
catch (ExecutionException ex) {
handle(ex);
continue SERVERS;
}
finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
} // for
if (isSame(cacheServer, vm, compareConfig)) {
member = cacheServer;
break;
}
}
}
if (member == null) {
synchronized (this.applicationSet) {
APPS: for (Iterator iter = this.applicationSet.iterator();
iter.hasNext(); ) {
Future future = (Future) iter.next();
SystemMemberImpl application = null;
for (;;) {
checkCancellation();
boolean interrupted = Thread.interrupted();
try {
application = (SystemMemberImpl) future.get();
break; // success
}
catch (InterruptedException ex) {
interrupted = true;
continue; // keep trying
}
catch (CancellationException ex) {
continue APPS;
}
catch (ExecutionException ex) {
handle(ex);
continue APPS;
}
finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
} // for
if (isSame(application, vm, compareConfig)) {
member = application;
break;
}
} // APPS
}
}
if (member != null && compareConfig) {
try {
member.setGemFireVM(vm);
} catch (AdminException ex) {
logger.warn(LocalizedMessage.create(LocalizedStrings.AdminDistributedSystem_COULD_NOT_SET_THE_GEMFIRE_VM), ex);
}
}
return member;
}
/**
* Removes a SystemMember from this system's list of known members.
*
* @param systemMember the member to remove
* @return the system member that was removed; null if no match was found
*/
protected SystemMember removeSystemMember(SystemMember systemMember) {
return removeSystemMember(
((SystemMemberImpl) systemMember).getInternalId());
}
/**
* Removes a SystemMember from this system's list of known members. This
* method is called in response to a member leaving the system.
* TODO: this method is a mess of defns
*
* @param internalId the unique id that specifies which member to remove
* @return the system member that was removed; null if no match was found
*/
protected SystemMember removeSystemMember(InternalDistributedMember internalId) {
if (internalId == null) return null;
// this.logger.info("DEBUG: removeSystemMember: " + internalId, new RuntimeException("STACK"));
boolean found = false;
SystemMemberImpl member = null;
synchronized(this.cacheServerSet) {
SERVERS: for (Iterator iter = this.cacheServerSet.iterator();
iter.hasNext() && !found; ) {
Future future = (Future) iter.next();
if (future instanceof AdminFutureTask) {
AdminFutureTask task = (AdminFutureTask) future;
if (task.getMemberId().equals(internalId)) {
// this.logger.info("DEBUG: removeSystemMember cs cancelling: " + future);
future.cancel(true);
} else {
// This is not the member we are looking for...
continue SERVERS;
}
}
for (;;) {
checkCancellation();
boolean interrupted = Thread.interrupted();
try {
member = (SystemMemberImpl) future.get();
break; // success
} catch (InterruptedException ex) {
interrupted = true;
continue; // keep trying
} catch (CancellationException ex) {
continue SERVERS;
} catch (ExecutionException ex) {
handle(ex);
return null; // Dead code
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}
InternalDistributedMember cacheServerId = member.getInternalId();
if (internalId.equals(cacheServerId)) {
// found a match...
iter.remove();
found = true;
}
} // SERVERS
}
synchronized(this.applicationSet) {
for (Iterator iter = this.applicationSet.iterator();
iter.hasNext() && !found; ) {
Future future = (Future) iter.next();
try {
if (future instanceof AdminFutureTask) {
AdminFutureTask task = (AdminFutureTask) future;
if (task.getMemberId().equals(internalId)) {
iter.remove(); // Only remove applications
found = true;
if (future.isDone()) {
member = (SystemMemberImpl) future.get();
}
break;
} else {
// This is not the member we are looking for...
continue;
}
}
if (future.isDone()) {
member = (SystemMemberImpl) future.get();
} else {
// this.logger.info("DEBUG: removeSystemMember as cancelling: " + future);
future.cancel(true);
}
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
checkCancellation();
throw new RuntimeException(LocalizedStrings.AdminDistributedSystemImpl_INTERRUPTED.toLocalizedString(), ex);
} catch (CancellationException ex) {
continue;
} catch (ExecutionException ex) {
handle(ex);
return null; // Dead code
}
InternalDistributedMember applicationId = member.getInternalId();
if (internalId.equals(applicationId)) {
// found a match...
iter.remove(); // Only remove applications
found = true;
}
}
}
if (found) {
try {
if (member != null) {
member.setGemFireVM(null);
}
} catch (AdminException ex) {
logger.fatal(LocalizedMessage.create(LocalizedStrings.AdminDistributedSystem_UNEXPECTED_ADMINEXCEPTION), ex);
}
return member;
} else {
if (logger.isDebugEnabled()) {
logger.debug("Couldn't remove member {}", internalId);
}
return null;
}
}
/**
* Builds the configuration needed to connect to a GfManagerAgent which is the
* main gateway into the internal.admin api. GfManagerAgent is used to
* actually connect to the distributed gemfire system.
*
* @param logWriter the LogWriterI18n to use for any logging
* @return the configuration needed to connect to a GfManagerAgent
*/
// LOG: saves LogWriterLogger from AdminDistributedSystemImpl for RemoteGfManagerAgentConfig
private GfManagerAgentConfig buildAgentConfig(InternalLogWriter logWriter) {
RemoteTransportConfig conf = new RemoteTransportConfig(
isMcastEnabled(), isMcastDiscovery(), getDisableTcp(),
getDisableAutoReconnect(),
getBindAddress(), buildSSLConfig(), parseLocators(),
getMembershipPortRange(), getTcpPort());
return new GfManagerAgentConfig(
getSystemName(), conf, logWriter, this.alertLevel.getSeverity(), this, this);
}
protected SSLConfig buildSSLConfig() {
SSLConfig conf = new SSLConfig();
if (getConfig() != null) {
conf.setEnabled(getConfig().isSSLEnabled());
conf.setProtocols(getConfig().getSSLProtocols());
conf.setCiphers(getConfig().getSSLCiphers());
conf.setRequireAuth(getConfig().isSSLAuthenticationRequired());
conf.setProperties(getConfig().getSSLProperties());
}
return conf;
}
/**
* Returns the currently configured address to bind to when administering
* this system.
*/
private String getBindAddress() {
return this.config.getBindAddress();
// String bindAddress =
// System.getProperty("gemfire.jg-bind-address");
// if (bindAddress == null || bindAddress.length() == 0) {
// return DistributionConfig.DEFAULT_BIND_ADDRESS;
// }
// return bindAddress;
}
/** Returns whether or not the given member is running */
private boolean isRunning(SystemMember member) {
if (member instanceof ManagedEntity) {
return ((ManagedEntity) member).isRunning();
} else {
// member must be an application VM. It is running
return true;
}
}
/** Returns any member manager that is known to be running */
private SystemMember findFirstRunningMember() {
synchronized(this.cacheServerSet) {
SERVERS: for (Iterator iter = this.cacheServerSet.iterator();
iter.hasNext();){
Future future = (Future) iter.next();
SystemMember member = null;
for (;;) {
checkCancellation();
boolean interrupted = Thread.interrupted();
try {
member = (SystemMember) future.get();
break; // success
}
catch (InterruptedException ex) {
interrupted = true;
continue; // keep trying
}
catch (CancellationException ex) {
continue SERVERS;
}
catch (ExecutionException ex) {
handle(ex);
return null; // Dead code
}
finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
} // for
if (isRunning(member)) {
return member;
}
}
}
synchronized(this.applicationSet) {
APPS: for (Iterator iter = this.applicationSet.iterator();
iter.hasNext();) {
Future future = (Future) iter.next();
SystemMember member = null;
for (;;) {
checkCancellation();
boolean interrupted = Thread.interrupted();
try {
member = (SystemMember) future.get();
break; // success
}
catch (InterruptedException ex) {
interrupted = true;
continue; // keep trying
}
catch (CancellationException ex) {
continue APPS;
}
catch (ExecutionException ex) {
handle(ex);
return null; // Dead code
}
finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
} // for
if (isRunning(member)) {
return member;
}
} // APPS
}
return null;
}
/**
* Returns the instance of system member that is running either as a CacheVm
* or only ApplicationVm for the given string representation of the id.
*
* @param memberId
* string representation of the member identifier
* @return instance of system member which could be either as a CacheVm or
* Application VM
*/
protected SystemMember findCacheOrAppVmById(String memberId) {
SystemMember found = null;
if (memberId != null) {
try {
boolean foundSender = false;
CacheVm[] cacheVms = getCacheVms();
/* cacheVms could be null. See
* AdminDistributedSystemImpl.getCacheVmsCollection() for
* ExecutionException */
if (cacheVms != null) {
for (CacheVm cacheVm : cacheVms) {
if (cacheVm.getId().equals(memberId) &&
cacheVm instanceof CacheVm) {
found = (SystemMember) cacheVm;
foundSender = true;
break;
}
}
}
if (!foundSender) {
SystemMember[] appVms = getSystemMemberApplications();
for (SystemMember appVm : appVms) {
if (appVm.getId().equals(memberId) &&
appVm instanceof SystemMember) {
found = (SystemMember) appVm;
foundSender = true;
break;
}
}
}
} catch (AdminException e) {
if (logger.isDebugEnabled()) {
logger.debug("Could not find System Member for member id: {}", memberId, e);
}
}
}
return found;
}
/** Returns true if any member application is known to be running */
protected boolean isAnyMemberRunning() {
return findFirstRunningMember() != null;
}
// -------------------------------------------------------------------------
// Health methods
// -------------------------------------------------------------------------
/**
* Lazily initializes the GemFire health monitor
*
* @see #createGemFireHealth
*/
public final GemFireHealth getGemFireHealth() {
synchronized (this) {
if (this.health == null || this.health.isClosed()) {
try {
this.health = createGemFireHealth(this.gfManagerAgent);
} catch (AdminException ex) {
throw new RuntimeAdminException(LocalizedStrings.AdminDistributedSystemImpl_AN_ADMINEXCEPTION_WAS_THROWN_WHILE_GETTING_THE_GEMFIRE_HEALTH.toLocalizedString(), ex);
}
}
return this.health;
}
}
/**
* A "template factory" method for creating an instance of
* <code>GemFireHealth</code>. It can be overridden by subclasses
* to produce instances of different <code>GemFireHealth</code>
* implementations.
*
* @see #getGemFireHealth
*/
protected GemFireHealth createGemFireHealth(GfManagerAgent agent)
throws AdminException {
if (agent == null) {
throw new IllegalStateException(LocalizedStrings.AdminDistributedSystemImpl_GFMANAGERAGENT_MUST_NOT_BE_NULL.toLocalizedString());
}
return new GemFireHealthImpl(agent, this);
}
public CacheVm addCacheVm() throws AdminException {
return (CacheVm)addCacheServer();
}
public CacheServer addCacheServer() throws AdminException {
CacheServerConfigImpl conf = new CacheServerConfigImpl();
CacheServer server = createCacheServer(conf);
setDistributionParameters(server);
synchronized (this.cacheServerSet) {
this.cacheServerSet.add(new FutureResult(server));
}
return server;
}
private Collection getCacheVmsCollection() throws AdminException {
synchronized(this.cacheServerSet) {
Collection coll = new ArrayList(this.cacheServerSet.size());
SERVERS: for (Iterator iter = this.cacheServerSet.iterator();
iter.hasNext(); ) {
Future future = (Future) iter.next();
Object get = null;
for (;;) {
checkCancellation();
boolean interrupted = Thread.interrupted();
try {
get = future.get();
break; // success
}
catch (InterruptedException ex) {
interrupted = true;
continue; // keep trying
}
catch (CancellationException ex) {
continue SERVERS;
}
catch (ExecutionException ex) {
handle(ex);
return null; // Dead code
}
finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
} // for
coll.add(get);
} // SERVERS
return coll;
}
}
/**
* Returns all the cache server members of the distributed system which are
* hosting a client queue for the particular durable-client having the given
* durableClientId
*
* @param durableClientId -
* durable-id of the client
* @return array of CacheServer(s) having the queue for the durable client
* @throws AdminException
*
* @since 5.6
*/
public CacheServer[] getCacheServers(String durableClientId)
throws AdminException
{
Collection serversForDurableClient = new ArrayList();
CacheServer[] servers = getCacheServers();
for (int i = 0; i < servers.length; i++) {
RemoteApplicationVM vm = (RemoteApplicationVM)((CacheServerImpl)servers[i])
.getGemFireVM();
if (vm != null && vm.hasDurableClient(durableClientId)) {
serversForDurableClient.add(servers[i]);
}
}
CacheServer[] array = new CacheServer[serversForDurableClient.size()];
serversForDurableClient.toArray(array);
return array;
}
public CacheVm[] getCacheVms() throws AdminException {
Collection coll = getCacheVmsCollection();
if (coll == null) return null;
CacheVm[] array = new CacheVm[coll.size()];
coll.toArray(array);
return array;
}
public CacheServer[] getCacheServers() throws AdminException {
Collection coll = getCacheVmsCollection();
if (coll == null) return null;
CacheServer[] array = new CacheServer[coll.size()];
coll.toArray(array);
return array;
}
// -------------------------------------------------------------------------
// Overriden java.lang.Object methods
// -------------------------------------------------------------------------
/**
* Returns a string representation of the object.
*
* @return a string representation of the object
*/
@Override // GemStoneAddition
public String toString() {
return getName();
}
/**
* returns instance of AdminDistributedSystem that is current connected. See
* <code>thisAdminDS</code>. (for feature requests #32887)
* <p>
* TODO: remove this static method during reimplementation of
* {@link SystemMemberCacheEventProcessor}
*
* @return AdminDistributedSystem
*/
public static AdminDistributedSystemImpl getConnectedInstance() {
synchronized (CONNECTION_SYNC) {
return thisAdminDS;
}
}
public void addCacheListener(SystemMemberCacheListener listener) {
synchronized (this.cacheListLock) {
// never modify cacheListeners in place.
// this allows iteration without concurrent mod worries
List oldListeners = this.cacheListeners;
if (!oldListeners.contains(listener)) {
List newListeners = new ArrayList(oldListeners);
newListeners.add(listener);
this.cacheListeners = newListeners;
}
}
}
public void removeCacheListener(SystemMemberCacheListener listener) {
synchronized (this.cacheListLock) {
List oldListeners = this.cacheListeners;
if (oldListeners.contains(listener)) {
List newListeners = new ArrayList(oldListeners);
if (newListeners.remove(listener)) {
if (newListeners.isEmpty()) {
newListeners = Collections.EMPTY_LIST;
}
this.cacheListeners = newListeners;
}
}
}
}
public List getCacheListeners() {
return this.cacheListeners;
}
public SystemMember lookupSystemMember(DistributedMember distributedMember)
throws AdminException {
if (distributedMember == null) return null;
SystemMember[] members = getSystemMemberApplications();
for (int i = 0; i < members.length; i++) {
if (distributedMember.equals(members[i].getDistributedMember())) {
return members[i];
}
}
return null;
}
//////////////////////// Inner Classes ////////////////////////
/**
* Object that converts an <code>internal.admin.Alert</code> into an
* external <code>admin.Alert</code>.
*/
public class AlertImpl implements Alert {
/** The Alert to which most behavior is delegated */
private final com.gemstone.gemfire.internal.admin.Alert alert;
private SystemMember systemMember;
/////////////////////// Constructors ///////////////////////
/**
* Creates a new <code>Alert</code> that delegates to the given
* object.
*/
AlertImpl(com.gemstone.gemfire.internal.admin.Alert alert) {
this.alert = alert;
GemFireVM vm = alert.getGemFireVM();
/*
* Related to #39657.
* Avoid setting GemFireVM again in the system member.
* Eager initialization of member variable - systemMember.
*/
this.systemMember = vm == null ? null : findSystemMember(vm, false);
if (this.systemMember == null) {
/*
* try to use sender information to construct the SystemMember that can
* be used for disply purpose at least
*/
InternalDistributedMember sender = alert.getSender();
if (sender != null) {
try {
this.systemMember =
AdminDistributedSystemImpl.this.createSystemMember(sender);
} catch (AdminException e) {
/*
* AdminException might be thrown if creation of System Member
* instance fails.
*/
this.systemMember = null;
}
} //else this.systemMember will be null
}
}
////////////////////// Instance Methods //////////////////////
public AlertLevel getLevel() {
return AlertLevel.forSeverity(alert.getLevel());
}
/*
* Eager initialization of system member is done while creating this alert
* only.
*/
public SystemMember getSystemMember() {
return systemMember;
}
public String getConnectionName() {
return alert.getConnectionName();
}
public String getSourceId() {
return alert.getSourceId();
}
public String getMessage() {
return alert.getMessage();
}
public java.util.Date getDate() {
return alert.getDate();
}
@Override
public String toString() {
return alert.toString();
}
}
/**
* A JSR-166 <code>FutureTask</code> whose {@link #get} method
* properly handles an <code>ExecutionException</code> that wraps an
* <code>InterruptedException</code>. This is necessary because
* there are places in the admin API that wrap
* <code>InterruptedException</code>s. See bug 32634.
*
* <P>
*
* This is by no means an ideal solution to this problem. It would
* be better to modify the code invoked by the <code>Callable</code>
* to explicitly throw <code>InterruptedException</code>.
*/
static class AdminFutureTask extends FutureTask {
/** The id of the member whose admin object we are creating.
* Keeping track of this allows us to cancel a FutureTask for a
* member that has gone away. */
private final InternalDistributedMember memberId;
public AdminFutureTask(InternalDistributedMember memberId,
Callable callable) {
super(callable);
this.memberId = memberId;
}
/**
* Returns the id of the member of the distributed system for
* which this <code>FutureTask</code> is doing work.
*/
public InternalDistributedMember getMemberId() {
return this.memberId;
}
/**
* If the <code>ExecutionException</code> is caused by an
* <code>InterruptedException</code>, throw the
* <code>CancellationException</code> instead.
*/
@Override
public Object get()
throws InterruptedException, ExecutionException {
if (Thread.interrupted()) throw new InterruptedException();
try {
return super.get();
} catch (ExecutionException ex) {
for (Throwable cause = ex.getCause(); cause != null;
cause = cause.getCause()) {
if (cause instanceof InterruptedException) {
// We interrupted the runnable but we don't want the thread
// that called get to think he was interrupted.
CancellationException ex2 = new CancellationException(LocalizedStrings.AdminDistributedSystemImpl_BY_INTERRUPT.toLocalizedString());
ex2.setStackTrace(cause.getStackTrace());
throw ex2;
}
}
throw ex;
}
}
}
public DistributedMember getDistributedMember() {
return getDistributionManager().getId();
}
private void connectAdminDS() {
connect((InternalLogWriter)this.logWriter);
try {
thisAdminDS.waitToBeConnected(3000);
} catch (InterruptedException ie) {
logger.warn("Interrupted while waiting to connect", ie);
}
}
public Set<PersistentID> getMissingPersistentMembers()
throws AdminException {
connectAdminDS();
DM dm = getDistributionManager();
if(dm == null) {
throw new IllegalStateException(LocalizedStrings.AdminDistributedSystemImpl_CONNECT_HAS_NOT_BEEN_INVOKED_ON_THIS_ADMINDISTRIBUTEDSYSTEM.toLocalizedString());
}
return getMissingPersistentMembers(dm);
}
public static Set<PersistentID> getMissingPersistentMembers(DM dm) {
return MissingPersistentIDsRequest.send(dm);
}
public void revokePersistentMember(InetAddress host,
String directory) throws AdminException {
connectAdminDS();
DM dm = getDistributionManager();
if(dm == null) {
throw new IllegalStateException(LocalizedStrings.AdminDistributedSystemImpl_CONNECT_HAS_NOT_BEEN_INVOKED_ON_THIS_ADMINDISTRIBUTEDSYSTEM.toLocalizedString());
}
revokePersistentMember(dm, host, directory);
}
public void revokePersistentMember(UUID diskStoreID) throws AdminException {
connectAdminDS();
DM dm = getDistributionManager();
if(dm == null) {
throw new IllegalStateException(LocalizedStrings.AdminDistributedSystemImpl_CONNECT_HAS_NOT_BEEN_INVOKED_ON_THIS_ADMINDISTRIBUTEDSYSTEM.toLocalizedString());
}
revokePersistentMember(dm, diskStoreID);
}
public static void revokePersistentMember(DM dm, UUID diskStoreID) {
PersistentMemberPattern pattern = new PersistentMemberPattern(diskStoreID);
boolean success = false;
try {
// make sure that the disk store we're revoking is actually missing
boolean found = false;
Set<PersistentID> details = getMissingPersistentMembers(dm);
if (details != null) {
for (PersistentID id : details) {
if (id.getUUID().equals(diskStoreID)) {
found = true;
break;
}
}
}
if (!found) {
return;
}
//Fix for 42607 - verify that the persistent id is not already
//running before revoking it.
PrepareRevokePersistentIDRequest.send(dm, pattern);
success = true;
} finally {
if(success) {
//revoke the persistent member if were able to prepare the revoke
RevokePersistentIDRequest.send(dm, pattern);
} else {
//otherwise, cancel the revoke.
PrepareRevokePersistentIDRequest.cancel(dm, pattern);
}
}
}
/**
*
* @deprecated use {@link #revokePersistentMember(UUID)} instead
*/
public static void revokePersistentMember(DM dm, InetAddress host, String directory) {
PersistentMemberPattern pattern = new PersistentMemberPattern(host, directory, System.currentTimeMillis());
boolean success = false;
try {
//Fix for 42607 - verify that the persistent id is not already
//running before revoking it.
PrepareRevokePersistentIDRequest.send(dm, pattern);
success = true;
} finally {
if(success) {
//revoke the persistent member if were able to prepare the revoke
RevokePersistentIDRequest.send(dm, pattern);
} else {
//otherwise, cancel the revoke.
PrepareRevokePersistentIDRequest.cancel(dm, pattern);
}
}
}
public Set shutDownAllMembers() throws AdminException {
return shutDownAllMembers(0);
}
public Set shutDownAllMembers(long timeout) throws AdminException {
connectAdminDS();
DM dm = getDistributionManager();
if(dm == null) {
throw new IllegalStateException(LocalizedStrings.AdminDistributedSystemImpl_CONNECT_HAS_NOT_BEEN_INVOKED_ON_THIS_ADMINDISTRIBUTEDSYSTEM.toLocalizedString());
}
return shutDownAllMembers(dm, timeout);
}
/**
* Shutdown all members.
* @param dm
* @param timeout the amount of time (in ms) to spending trying to shutdown the members
* gracefully. After this time period, the members will be forceable shut down. If the
* timeout is exceeded, persistent recovery after the shutdown may need to do a GII. -1
* indicates that the shutdown should wait forever.
*/
public static Set shutDownAllMembers(DM dm, long timeout) {
return ShutdownAllRequest.send(dm, timeout);
}
public BackupStatus backupAllMembers(File targetDir) throws AdminException {
return backupAllMembers(targetDir, null);
}
public BackupStatus backupAllMembers(File targetDir, File baselineDir) throws AdminException {
connectAdminDS();
DM dm = getDistributionManager();
if(dm == null) {
throw new IllegalStateException(LocalizedStrings.AdminDistributedSystemImpl_CONNECT_HAS_NOT_BEEN_INVOKED_ON_THIS_ADMINDISTRIBUTEDSYSTEM.toLocalizedString());
}
return backupAllMembers(dm, targetDir, baselineDir);
}
public static BackupStatus backupAllMembers(DM dm, File targetDir, File baselineDir)
throws AdminException {
Set<PersistentID> missingMembers = getMissingPersistentMembers(dm);
Set recipients = dm.getOtherDistributionManagerIds();
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss");
targetDir = new File(targetDir, format.format(new Date()));
FlushToDiskRequest.send(dm, recipients);
Map<DistributedMember, Set<PersistentID>> existingDataStores
= PrepareBackupRequest.send(dm, recipients);
Map<DistributedMember, Set<PersistentID>> successfulMembers
= FinishBackupRequest.send(dm, recipients, targetDir, baselineDir);
// It's possible that when calling getMissingPersistentMembers, some members are
// still creating/recovering regions, and at FinishBackupRequest.send, the
// regions at the members are ready. Logically, since the members in successfulMembers
// should override the previous missingMembers
for(Set<PersistentID> onlineMembersIds : successfulMembers.values()) {
missingMembers.removeAll(onlineMembersIds);
}
existingDataStores.keySet().removeAll(successfulMembers.keySet());
for(Set<PersistentID> lostMembersIds : existingDataStores.values()) {
missingMembers.addAll(lostMembersIds);
}
return new BackupStatusImpl(successfulMembers, missingMembers);
}
public Map<DistributedMember, Set<PersistentID>> compactAllDiskStores() throws AdminException {
connectAdminDS();
DM dm = getDistributionManager();
if(dm == null) {
throw new IllegalStateException(LocalizedStrings.AdminDistributedSystemImpl_CONNECT_HAS_NOT_BEEN_INVOKED_ON_THIS_ADMINDISTRIBUTEDSYSTEM.toLocalizedString());
}
return compactAllDiskStores(dm);
}
public static Map<DistributedMember, Set<PersistentID>> compactAllDiskStores(DM dm)
throws AdminException {
return CompactRequest.send(dm);
}
/**
* This method can be used to process ClientMembership events sent for
* BridgeMembership by bridge servers to all admin members.
*
* NOTE: Not implemented currently. JMX implementation which is a subclass of
* this class i.e. AdminDistributedSystemJmxImpl implements it.
*
* @param senderId
* id of the member that sent the ClientMembership changes for
* processing (could be null)
* @param clientId
* id of a client for which the notification was sent
* @param clientHost
* host on which the client is/was running
* @param eventType
* denotes whether the client Joined/Left/Crashed should be one of
* ClientMembershipMessage#JOINED, ClientMembershipMessage#LEFT,
* ClientMembershipMessage#CRASHED
*/
public void processClientMembership(String senderId, String clientId,
String clientHost, int eventType) {
}
public void setAlertLevelAsString(String level) {
AlertLevel newAlertLevel = AlertLevel.forName(level);
if (newAlertLevel != null) {
setAlertLevel(newAlertLevel);
} else {
System.out.println("ERROR:: "+level+" is invalid. Allowed alert levels are: WARNING, ERROR, SEVERE, OFF");
throw new IllegalArgumentException(LocalizedStrings.DEBUG.toLocalizedString(level+" is invalid. Allowed alert levels are: WARNING, ERROR, SEVERE, OFF"));
}
}
public String getAlertLevelAsString() {
return getAlertLevel().getName();
}
}