blob: c9f3db90ea125edc2716ea5585614a2c188a12bc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.admin.internal;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.geode.CancelException;
import org.apache.geode.admin.AdminDistributedSystem;
import org.apache.geode.admin.DistributedSystemHealthConfig;
import org.apache.geode.admin.GemFireHealth;
import org.apache.geode.admin.GemFireHealthConfig;
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.admin.GemFireVM;
import org.apache.geode.internal.admin.GfManagerAgent;
import org.apache.geode.internal.admin.HealthListener;
import org.apache.geode.internal.admin.JoinLeaveListener;
/**
* Provides the implementation of the <code>GemFireHealth</code> administration API. This class is
* responsible for {@linkplain GemFireVM#addHealthListener sending} the {@link GemFireHealthConfig}s
* to the remote member VM in which the health is calculated.
*
*
* @since GemFire 3.5
*/
public class GemFireHealthImpl implements GemFireHealth, JoinLeaveListener, HealthListener {

  /** The agent for the distributed system whose health is being monitored */
  private final GfManagerAgent agent;

  /** The default configuration for checking GemFire health */
  protected GemFireHealthConfig defaultConfig;

  /**
   * Maps the IP address of a host to its <code>GemFireHealthConfig</code>. Note that the mappings
   * are created lazily. Contents are only accessed while holding the monitor of <code>this</code>
   * (or from the constructor).
   */
  private final Map<InetAddress, GemFireHealthConfig> hostConfigs;

  /**
   * Maps the IP address of a host to all of the members (<code>GemFireVM</code>s) that run on that
   * host. Contents are only accessed while holding the monitor of <code>this</code> (or from the
   * constructor).
   */
  private final Map<InetAddress, List<GemFireVM>> hostMembers;

  /**
   * Guards {@link #okayHealth} and {@link #poorHealth} and serializes writes to
   * {@link #overallHealth}. {@link #healthChanged} is not synchronized on <code>this</code> and can
   * run concurrently with the synchronized membership callbacks, so the health sets need a lock of
   * their own. No code holds this lock while acquiring the monitor of <code>this</code>; the lock
   * order is always <code>this</code> -&gt; <code>healthLock</code>.
   */
  private final Object healthLock = new Object();

  /** The members that are known to be in {@link #OKAY_HEALTH}. Guarded by {@link #healthLock}. */
  private final Collection<GemFireVM> okayHealth;

  /** The members that are known to be in {@link #POOR_HEALTH}. Guarded by {@link #healthLock}. */
  private final Collection<GemFireVM> poorHealth;

  /**
   * The overall health of GemFire. Written under {@link #healthLock}; volatile so that
   * {@link #getHealth} can read it without locking.
   */
  private volatile GemFireHealth.Health overallHealth;

  /**
   * Is this GemFireHealthImpl closed? Set under the monitor of <code>this</code>; volatile so that
   * {@link #isClosed()} and {@link #checkClosed()} can read it without locking.
   */
  private volatile boolean isClosed;

  /**
   * The configuration specifying how the health of the distributed system should be computed.
   */
  protected volatile DistributedSystemHealthConfig dsHealthConfig;

  /**
   * Monitors the health of the entire distributed system. Volatile because it is replaced under
   * the <code>hostConfigs</code> monitor but cleared under the monitor of <code>this</code>.
   */
  private volatile DistributedSystemHealthMonitor dsHealthMonitor = null;

  /**
   * The distributed system whose health is monitored by this <code>GemFireHealth</code>.
   */
  private final AdminDistributedSystem system;

  /////////////////////// Constructors ///////////////////////

  /**
   * Creates a new <code>GemFireHealthImpl</code> that monitors the health of the members of the
   * given distributed system.
   *
   * @param agent the agent used to list members and receive join/leave notifications
   * @param system the distributed system whose health is monitored
   */
  protected GemFireHealthImpl(GfManagerAgent agent, AdminDistributedSystem system) {
    this.agent = agent;
    this.system = system;
    this.hostConfigs = new HashMap<InetAddress, GemFireHealthConfig>();
    this.hostMembers = new HashMap<InetAddress, List<GemFireVM>>();
    this.okayHealth = new HashSet<GemFireVM>();
    this.poorHealth = new HashSet<GemFireVM>();
    this.overallHealth = GOOD_HEALTH;
    this.isClosed = false;

    GemFireVM[] apps = this.agent.listApplications();
    for (GemFireVM member : apps) {
      this.noteNewMember(member);
    }

    agent.addJoinLeaveListener(this);
    setDefaultGemFireHealthConfig(createGemFireHealthConfig(null));
    setDistributedSystemHealthConfig(createDistributedSystemHealthConfig());
  }

  /**
   * Describes the state of this instance. Unlike the original implementation this never throws:
   * the aggregated diagnosis (which requires an open instance) is only included while this
   * instance is not closed.
   */
  @Override
  public String toString() {
    StringBuilder sb = new StringBuilder();
    // close() flips isClosed under the monitor of this, so holding it here guarantees that
    // getDiagnosis() below will not fail its closed-check midway.
    synchronized (this) {
      sb.append("closed=").append(this.isClosed);
      sb.append("; hostMembers=").append(this.hostMembers);
      synchronized (this.healthLock) {
        sb.append("; okayHealth=").append(this.okayHealth);
        sb.append("; poorHealth=").append(this.poorHealth);
      }
      sb.append("; overallHealth=").append(this.overallHealth);
      if (!this.isClosed) {
        sb.append("; diagnosis=").append(getDiagnosis());
      }
    }
    return sb.toString();
  }

  ////////////////////// Instance Methods //////////////////////

  /**
   * Returns the <code>DistributedSystem</code> whose health this <code>GemFireHealth</code>
   * monitors.
   */
  public AdminDistributedSystem getDistributedSystem() {
    return this.system;
  }

  /**
   * A "template factory" method for creating a <code>DistributedSystemHealthConfig</code>. It can
   * be overridden by subclasses to produce instances of different
   * <code>DistributedSystemHealthConfig</code> implementations.
   */
  protected DistributedSystemHealthConfig createDistributedSystemHealthConfig() {
    return new DistributedSystemHealthConfigImpl();
  }

  /**
   * A "template factory" method for creating a <code>GemFireHealthConfig</code>. It can be
   * overridden by subclasses to produce instances of different <code>GemFireHealthConfig</code>
   * implementations.
   *
   * @param hostName The host whose health we are configuring (<code>null</code> for the default
   *        configuration)
   */
  protected GemFireHealthConfig createGemFireHealthConfig(String hostName) {
    return new GemFireHealthConfigImpl(hostName);
  }

  /**
   * Throws an {@link IllegalStateException} if this <code>GemFireHealthImpl</code> is closed.
   */
  private void checkClosed() {
    if (this.isClosed) {
      throw new IllegalStateException(
          "Cannot access a closed GemFireHealth instance.");
    }
  }

  /**
   * Returns the overall health of GemFire. Note that this method does not contact any of the member
   * VMs. Instead, it relies on the members to alert it of changes in its health via a
   * {@link HealthListener}.
   *
   * @throws IllegalStateException if this instance is closed
   */
  @Override
  public GemFireHealth.Health getHealth() {
    checkClosed();
    return this.overallHealth;
  }

  /**
   * Resets the overall health to be {@link #GOOD_HEALTH}. It also resets the health in the member
   * VMs.
   *
   * @throws IllegalStateException if this instance is closed
   * @see GemFireVM#resetHealthStatus
   */
  @Override
  public void resetHealth() {
    checkClosed();

    synchronized (this.healthLock) {
      this.overallHealth = GOOD_HEALTH;
      this.okayHealth.clear();
      this.poorHealth.clear();
    }

    synchronized (this) {
      for (List<GemFireVM> members : this.hostMembers.values()) {
        for (GemFireVM member : members) {
          member.resetHealthStatus();
        }
      }
    }
  }

  /**
   * Aggregates the diagnoses from all members of the distributed system, one diagnosis per line.
   *
   * @throws IllegalStateException if this instance is closed
   */
  @Override
  public String getDiagnosis() {
    checkClosed();

    StringBuilder sb = new StringBuilder();
    synchronized (this) {
      // Ask every member about the same health level, even if a notification arrives mid-loop.
      GemFireHealth.Health health = this.overallHealth;
      for (List<GemFireVM> members : this.hostMembers.values()) {
        for (GemFireVM member : members) {
          String[] diagnoses = member.getHealthDiagnosis(health);
          for (String diagnosis : diagnoses) {
            sb.append(diagnosis).append("\n");
          }
        }
      }
    }

    return sb.toString();
  }

  /**
   * Replaces the distributed-system health configuration, stopping any running
   * {@link DistributedSystemHealthMonitor} and starting a new one that evaluates
   * <code>config</code> at the default health evaluation interval.
   *
   * @throws IllegalStateException if this instance is closed
   */
  @Override
  public void setDistributedSystemHealthConfig(DistributedSystemHealthConfig config) {
    checkClosed();

    synchronized (this.hostConfigs) {
      // If too many threads are changing the health config, then we
      // might get an OutOfMemoryError trying to start a new
      // health monitor thread.
      if (this.dsHealthMonitor != null) {
        this.dsHealthMonitor.stop();
      }

      this.dsHealthConfig = config;

      DistributedSystemHealthEvaluator eval =
          new DistributedSystemHealthEvaluator(config, this.agent.getDM());
      int interval = this.getDefaultGemFireHealthConfig().getHealthEvaluationInterval();
      this.dsHealthMonitor = new DistributedSystemHealthMonitor(eval, this, interval);
      this.dsHealthMonitor.start();
    }
  }

  /**
   * Returns the current distributed-system health configuration.
   *
   * @throws IllegalStateException if this instance is closed
   */
  @Override
  public DistributedSystemHealthConfig getDistributedSystemHealthConfig() {
    checkClosed();
    return this.dsHealthConfig;
  }

  /**
   * Returns the configuration used for hosts that have no host-specific configuration.
   *
   * @throws IllegalStateException if this instance is closed
   */
  @Override
  public GemFireHealthConfig getDefaultGemFireHealthConfig() {
    checkClosed();
    return this.defaultConfig;
  }

  /**
   * Installs <code>config</code> as the default configuration and re-registers this health
   * listener with every known member, using the host-specific configuration where one exists and
   * <code>config</code> otherwise. The distributed-system monitor is then restarted because the
   * evaluation interval may have changed.
   *
   * @throws IllegalStateException if this instance is closed
   * @throws IllegalArgumentException if <code>config</code> is bound to a specific host
   */
  @Override
  public void setDefaultGemFireHealthConfig(GemFireHealthConfig config) {
    checkClosed();

    if (config.getHostName() != null) {
      throw new IllegalArgumentException(
          String.format("The GemFireHealthConfig for %s cannot serve as the default health config.",
              config.getHostName()));
    }

    this.defaultConfig = config;

    synchronized (this) {
      for (Map.Entry<InetAddress, List<GemFireVM>> entry : this.hostMembers.entrySet()) {
        InetAddress hostIpAddress = entry.getKey();

        GemFireHealthConfig hostConfig = this.hostConfigs.get(hostIpAddress);
        if (hostConfig == null) {
          hostConfig = config;
        }

        for (GemFireVM member : entry.getValue()) {
          Assert.assertTrue(member.getHost().equals(hostIpAddress));
          member.addHealthListener(this, hostConfig);
        }
      }
    }

    // We only need to do this if the health monitoring interval has
    // changed. This is probably not the most efficient way of doing
    // things.
    if (this.dsHealthConfig != null) {
      setDistributedSystemHealthConfig(this.dsHealthConfig);
    }
  }

  /**
   * Returns the GemFireHealthConfig object for the given host name, creating (and remembering) one
   * if none exists yet.
   *
   * @param hostName host name for which the GemFire Health Config is needed
   *
   * @throws IllegalStateException if this instance is closed
   * @throws IllegalArgumentException if host with given name could not be found
   */
  @Override
  public synchronized GemFireHealthConfig getGemFireHealthConfig(String hostName) {
    checkClosed();

    InetAddress hostIpAddress = null;
    try {
      hostIpAddress = InetAddress.getByName(hostName);
    } catch (UnknownHostException e) {
      throw new IllegalArgumentException(
          String.format("Could not find a host with name %s.",
              hostName),
          e);
    }

    GemFireHealthConfig config = this.hostConfigs.get(hostIpAddress);
    if (config == null) {
      config = createGemFireHealthConfig(hostName);
      this.hostConfigs.put(hostIpAddress, config);
    }

    return config;
  }

  /**
   * Sets the GemFireHealthConfig object for the given host name. The configuration is remembered
   * for that host so that later default-config changes and newly-joined members on the host keep
   * using it, and it is applied immediately to every member currently running on the host.
   *
   * @param hostName host name for which the GemFire Health Config is needed
   * @param config GemFireHealthConfig object to set
   *
   * @throws IllegalStateException if this instance is closed
   * @throws IllegalArgumentException if (1) given host name & the host name in the given config do
   *         not match OR (2) host with given name could not be found OR (3) there are no GemFire
   *         components running on the given host
   */
  @Override
  public void setGemFireHealthConfig(String hostName, GemFireHealthConfig config) {
    checkClosed();

    synchronized (this) {
      String configHost = config.getHostName();
      if (configHost == null || !configHost.equals(hostName)) {
        StringBuilder sb = new StringBuilder();
        sb.append("The GemFireHealthConfig configures ");
        if (configHost == null) {
          sb.append("the default host ");
        } else {
          sb.append("host \"");
          sb.append(config.getHostName());
          sb.append("\" ");
        }
        sb.append("not \"" + hostName + "\"");
        throw new IllegalArgumentException(sb.toString());
      }

      InetAddress hostIpAddress = null;
      try {
        hostIpAddress = InetAddress.getByName(hostName);
      } catch (UnknownHostException e) {
        throw new IllegalArgumentException(
            String.format("Could not find a host with name %s.",
                hostName),
            e);
      }

      List<GemFireVM> members = this.hostMembers.get(hostIpAddress);
      if (members == null || members.isEmpty()) {
        throw new IllegalArgumentException(
            String.format("There are no GemFire components on host %s.",
                hostName));
      }

      // Record the config so setDefaultGemFireHealthConfig and nodeJoined, which both consult
      // hostConfigs, do not silently revert this host to the default configuration.
      this.hostConfigs.put(hostIpAddress, config);

      for (GemFireVM member : members) {
        member.addHealthListener(this, config);
      }
    }
  }

  /**
   * Tells the members of the distributed system that we are no longer interested in monitoring
   * their health. Calling this more than once has no additional effect on the internal state.
   *
   * @see GemFireVM#removeHealthListener
   */
  @Override
  public void close() {
    this.agent.removeJoinLeaveListener(this);

    synchronized (this) {
      if (this.isClosed) {
        return;
      }

      this.isClosed = true;

      if (this.dsHealthMonitor != null) {
        this.dsHealthMonitor.stop();
        this.dsHealthMonitor = null;
      }

      try {
        for (List<GemFireVM> members : this.hostMembers.values()) {
          for (GemFireVM member : members) {
            member.removeHealthListener();
          }
        }
      } catch (CancelException ignored) {
        // if the DS is disconnected, stop trying to distribute to other members
      }

      this.hostConfigs.clear();
      this.hostMembers.clear();

      synchronized (this.healthLock) {
        this.okayHealth.clear();
        this.poorHealth.clear();
      }
    }
  }

  @Override
  public boolean isClosed() {
    return this.isClosed;
  }

  /**
   * Makes note of the newly-joined member by adding it to the member list of its host. Callers
   * must hold the monitor of <code>this</code> (or be the constructor).
   */
  private void noteNewMember(GemFireVM member) {
    InetAddress hostIpAddress = member.getHost();
    List<GemFireVM> members = this.hostMembers.get(hostIpAddress);
    if (members == null) {
      members = new ArrayList<GemFireVM>();
      this.hostMembers.put(hostIpAddress, members);
    }
    members.add(member);
  }

  /**
   * Records the newly-joined member and registers this health listener with it, using its host's
   * configuration if one exists and the default configuration otherwise.
   */
  @Override
  public synchronized void nodeJoined(GfManagerAgent source, GemFireVM joined) {
    noteNewMember(joined);

    InetAddress hostIpAddress = joined.getHost();

    GemFireHealthConfig config = this.hostConfigs.get(hostIpAddress);
    if (config == null) {
      config = this.getDefaultGemFireHealthConfig();
    }
    joined.addHealthListener(this, config);
  }

  /**
   * Makes note of the newly-left member: removes it from its host (dropping the host's
   * configuration once no members remain there) and from the health sets, then recomputes the
   * overall health.
   */
  @Override
  public synchronized void nodeLeft(GfManagerAgent source, GemFireVM left) {
    InetAddress hostIpAddress = left.getHost();
    List<GemFireVM> members = this.hostMembers.get(hostIpAddress);
    if (members != null) {
      members.remove(left);
      if (members.isEmpty()) {
        // No more members on the host
        this.hostConfigs.remove(hostIpAddress);
        this.hostMembers.remove(hostIpAddress);
      }
    }

    synchronized (this.healthLock) {
      this.okayHealth.remove(left);
      this.poorHealth.remove(left);
      reevaluateHealth();
    }
  }

  /**
   * Does the same thing as {@link #nodeLeft}
   */
  @Override
  public void nodeCrashed(GfManagerAgent source, GemFireVM crashed) {
    nodeLeft(source, crashed);
  }

  /**
   * Re-evaluates the overall health of GemFire: poor if any member is poor, otherwise okay if any
   * member is okay, otherwise good. Callers must hold {@link #healthLock}.
   */
  private void reevaluateHealth() {
    if (!this.poorHealth.isEmpty()) {
      this.overallHealth = POOR_HEALTH;

    } else if (!this.okayHealth.isEmpty()) {
      this.overallHealth = OKAY_HEALTH;

    } else {
      this.overallHealth = GOOD_HEALTH;
    }
  }

  /**
   * Records a member's new health status and recomputes the overall health.
   *
   * @param member the member whose health changed
   * @param status the member's new health; must be one of the three known health constants
   */
  @Override
  public void healthChanged(GemFireVM member, GemFireHealth.Health status) {
    synchronized (this.healthLock) {
      if (status == GOOD_HEALTH) {
        this.okayHealth.remove(member);
        this.poorHealth.remove(member);

      } else if (status == OKAY_HEALTH) {
        this.okayHealth.add(member);
        this.poorHealth.remove(member);

      } else if (status == POOR_HEALTH) {
        this.okayHealth.remove(member);
        this.poorHealth.add(member);

      } else {
        Assert.assertTrue(false, "Unknown health code: " + status);
      }

      reevaluateHealth();
    }
  }
}