blob: 83462456ccce093dc75661e00ac45888417d434a [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2002-2013 VMware, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. VMware products are covered by
* more patents listed at http://www.vmware.com/go/patents.
*========================================================================
*/
package com.gemstone.gemfire.internal.cache.control;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryPoolMXBean;
import java.lang.management.MemoryType;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import javax.management.ListenerNotFoundException;
import javax.management.Notification;
import javax.management.NotificationEmitter;
import javax.management.NotificationListener;
import org.apache.logging.log4j.Logger;
import com.gemstone.gemfire.CancelException;
import com.gemstone.gemfire.SystemFailure;
import com.gemstone.gemfire.cache.CacheClosedException;
import com.gemstone.gemfire.cache.query.internal.QueryMonitor;
import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
import com.gemstone.gemfire.internal.GemFireStatSampler;
import com.gemstone.gemfire.internal.LocalStatListener;
import com.gemstone.gemfire.internal.SetUtils;
import com.gemstone.gemfire.internal.StatisticsImpl;
import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
import com.gemstone.gemfire.internal.cache.control.InternalResourceManager.ResourceType;
import com.gemstone.gemfire.internal.cache.control.MemoryThresholds.MemoryState;
import com.gemstone.gemfire.internal.cache.control.ResourceAdvisor.ResourceManagerProfile;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.logging.LoggingThreadGroup;
import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
/**
* Allows for the setting of eviction and critical thresholds. These thresholds
* are compared against current heap usage and, with the help of {#link
* InternalResourceManager}, dispatches events when the thresholds are crossed.
* Gathering memory usage information from the JVM is done using a listener on
* the MemoryMXBean, by polling the JVM and as a listener on GemFire Statistics
* output in order to accommodate differences in the various JVMs.
*
* @author Kirk Lund
* @author Mitch Thomas
* @author Swapnil Bawaskar
* @author David Hoots
* @since 9.0
*/
public class HeapMemoryMonitor implements NotificationListener, ResourceMonitor {
private static final Logger logger = LogService.getLogger();
// Allow for an unknown heap pool for VMs we may support in the future.
private static final String HEAP_POOL = System.getProperty("gemfire.ResourceManager.HEAP_POOL");
// Property for setting the JVM polling interval (below)
public static final String POLLER_INTERVAL_PROP = "gemfire.heapPollerInterval";
// Internal for polling the JVM for changes in heap memory usage.
private static final int POLLER_INTERVAL = Integer.getInteger(POLLER_INTERVAL_PROP, 500).intValue();
// This holds a new event as it transitions from updateStateAndSendEvent(...) to fillInProfile()
private ThreadLocal<MemoryEvent> upcomingEvent = new ThreadLocal<MemoryEvent>();
private ScheduledExecutorService pollerExecutor;
// Listener for heap memory usage as reported by the Cache stats.
private final LocalStatListener statListener = new LocalHeapStatListener();
private volatile MemoryThresholds thresholds = new MemoryThresholds(tenuredPoolMaxMemory);
private volatile MemoryEvent mostRecentEvent = new MemoryEvent(ResourceType.HEAP_MEMORY, MemoryState.DISABLED,
MemoryState.DISABLED, null, 0L, true, this.thresholds);
private volatile MemoryState currentState = MemoryState.DISABLED;
//Set when startMonitoring() and stopMonitoring() are called
private Boolean started = false;
// Set to true when setEvictionThreshold(...) is called.
private boolean hasEvictionThreshold = false;
// Only change state when these counters exceed {@link HeapMemoryMonitor#memoryStateChangeTolerance}
private int criticalToleranceCounter;
private int evictionToleranceCounter;
private final InternalResourceManager resourceManager;
private final ResourceAdvisor resourceAdvisor;
private final GemFireCacheImpl cache;
private final ResourceManagerStats stats;
private static boolean testDisableMemoryUpdates = false;
private static long testBytesUsedForThresholdSet = -1;
/*
* Number of eviction or critical state changes that have to occur before the
* event is delivered. This was introduced because we saw sudden memory usage
* spikes in jrockit VM.
*/
private static final int memoryStateChangeTolerance;
static {
String vendor = System.getProperty("java.vendor");
if (vendor.contains("Sun") || vendor.contains("Oracle")) {
memoryStateChangeTolerance = Integer.getInteger("gemfire.memoryEventTolerance",1);
} else {
memoryStateChangeTolerance = Integer.getInteger("gemfire.memoryEventTolerance",5);
}
}
// JVM MXBean used to report changes in heap memory usage
private static final MemoryPoolMXBean tenuredMemoryPoolMXBean;
static {
MemoryPoolMXBean matchingMemoryPoolMXBean = null;
for (MemoryPoolMXBean memoryPoolMXBean : ManagementFactory.getMemoryPoolMXBeans()) {
if (memoryPoolMXBean.isUsageThresholdSupported() && isTenured(memoryPoolMXBean)) {
matchingMemoryPoolMXBean = memoryPoolMXBean;
break;
}
}
tenuredMemoryPoolMXBean = matchingMemoryPoolMXBean;
if (tenuredMemoryPoolMXBean == null) {
logger.error(LocalizedMessage.create(LocalizedStrings.HeapMemoryMonitor_NO_POOL_FOUND_POOLS_0, getAllMemoryPoolNames()));
}
}
// Calculated value for the amount of JVM tenured heap memory available.
private static final long tenuredPoolMaxMemory;
/*
* Calculates the max memory for the tenured pool. Works around JDK bug:
* http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=7078465 by getting max
* memory from runtime and subtracting all other heap pools from it.
*/
static {
if (tenuredMemoryPoolMXBean != null && tenuredMemoryPoolMXBean.getUsage().getMax() != -1) {
tenuredPoolMaxMemory = tenuredMemoryPoolMXBean.getUsage().getMax();
} else {
long calculatedMaxMemory = Runtime.getRuntime().maxMemory();
List<MemoryPoolMXBean> pools = ManagementFactory.getMemoryPoolMXBeans();
for (MemoryPoolMXBean p : pools) {
if (p.getType() == MemoryType.HEAP && p.getUsage().getMax() != -1) {
calculatedMaxMemory -= p.getUsage().getMax();
}
}
tenuredPoolMaxMemory = calculatedMaxMemory;
}
}
/**
* Determines if the name of the memory pool MXBean provided matches a list of
* known tenured pool names.
*
* Package private for testing.
*
* @param memoryPoolMXBean
* The memory pool MXBean to check.
* @return True if the pool name matches a known tenured pool name, false
* otherwise.
*/
static boolean isTenured(MemoryPoolMXBean memoryPoolMXBean) {
if (memoryPoolMXBean.getType() != MemoryType.HEAP) {
return false;
}
String name = memoryPoolMXBean.getName();
return name.equals("CMS Old Gen") // Sun Concurrent Mark Sweep GC
|| name.equals("PS Old Gen") // Sun Parallel GC
|| name.equals("G1 Old Gen") // Sun G1 GC
|| name.equals("Old Space") // BEA JRockit 1.5, 1.6 GC
|| name.equals("Tenured Gen") // Hitachi 1.5 GC
|| name.equals("Java heap") // IBM 1.5, 1.6 GC
// Allow an unknown pool name to monitor
|| (HEAP_POOL != null && name.equals(HEAP_POOL));
}
HeapMemoryMonitor(final InternalResourceManager resourceManager, final GemFireCacheImpl cache, final ResourceManagerStats stats) {
this.resourceManager = resourceManager;
this.resourceAdvisor = (ResourceAdvisor) cache.getDistributionAdvisor();
this.cache = cache;
this.stats = stats;
}
/**
* Returns the tenured pool MXBean or throws an IllegaleStateException if one
* couldn't be found.
*/
public static MemoryPoolMXBean getTenuredMemoryPoolMXBean() {
if (tenuredMemoryPoolMXBean != null) {
return tenuredMemoryPoolMXBean;
}
throw new IllegalStateException(LocalizedStrings.HeapMemoryMonitor_NO_POOL_FOUND_POOLS_0
.toLocalizedString(getAllMemoryPoolNames()));
}
/**
* Returns the names of all available memory pools as a single string.
*/
private static String getAllMemoryPoolNames() {
StringBuilder builder = new StringBuilder("[");
for (MemoryPoolMXBean memoryPoolBean : ManagementFactory.getMemoryPoolMXBeans()) {
builder.append("(Name=").append(memoryPoolBean.getName()).append(";Type=").append(memoryPoolBean.getType()).append(
";UsageThresholdSupported=").append(memoryPoolBean.isUsageThresholdSupported()).append("), ");
}
if (builder.length() > 1) {
builder.setLength(builder.length() - 2);
}
builder.append("]");
return builder.toString();
}
/**
* Monitoring is done using a combination of data from the JVM and statistics
* collected from the cache. A usage threshold is set on the MemoryMXBean of
* the JVM to get notifications when the JVM crosses the eviction or critical
* thresholds. A separate usage collection is done either by setting up a
* listener on the cache stats or polling of the JVM, depending on whether
* stats have been enabled. This separate collection is done to return the
* state of the heap memory back to a normal state when memory has been freed.
*/
private void startMonitoring() {
synchronized (this) {
if (this.started) {
return;
}
final boolean statListenerStarted = startCacheStatListener();
if (!statListenerStarted) {
startMemoryPoolPoller();
}
startJVMThresholdListener();
this.started = true;
}
}
/**
* Stops all three mechanisms from monitoring heap usage.
*/
@Override
public void stopMonitoring() {
synchronized (this) {
if (!this.started) {
return;
}
// Stop the poller
this.resourceManager.stopExecutor(this.pollerExecutor);
// Stop the JVM threshold listener
NotificationEmitter emitter = (NotificationEmitter) ManagementFactory.getMemoryMXBean();
try {
emitter.removeNotificationListener(this, null, null);
this.cache.getLoggerI18n().fine("Removed Memory MXBean notification listener" + this);
} catch (ListenerNotFoundException e) {
this.cache.getLoggerI18n().fine("This instance '" + toString() + "' was not registered as a Memory MXBean listener");
}
// Stop the stats listener
final GemFireStatSampler sampler = this.cache.getDistributedSystem().getStatSampler();
if (sampler != null) {
sampler.removeLocalStatListener(this.statListener);
}
this.started = false;
}
}
/**
* Start a listener on the cache stats to monitor memory usage.
*
* @return True of the listener was correctly started, false otherwise.
*/
private boolean startCacheStatListener() {
final GemFireStatSampler sampler = this.cache.getDistributedSystem().getStatSampler();
if (sampler == null) {
return false;
}
try {
sampler.waitForInitialization();
String tenuredPoolName = getTenuredMemoryPoolMXBean().getName();
List list = this.cache.getDistributedSystem().getStatsList();
synchronized (list) {
for (Object o : list) {
if (o instanceof StatisticsImpl) {
StatisticsImpl si = (StatisticsImpl) o;
if (si.getTextId().contains(tenuredPoolName) && si.getType().getName().contains("PoolStats")) {
sampler.addLocalStatListener(this.statListener, si, "currentUsedMemory");
if (this.cache.getLoggerI18n().fineEnabled()) {
this.cache.getLoggerI18n().fine("Registered stat listener for " + si.getTextId());
}
return true;
}
}
}
}
} catch (InterruptedException iex) {
Thread.currentThread().interrupt();
this.cache.getCancelCriterion().checkCancelInProgress(iex);
}
return false;
}
/**
* Start a separate thread for polling the JVM for heap memory usage.
*/
private void startMemoryPoolPoller() {
if (tenuredMemoryPoolMXBean == null) {
return;
}
final ThreadGroup threadGroup = LoggingThreadGroup.createThreadGroup("HeapPoller", logger);
final ThreadFactory threadFactory = new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(threadGroup, r, "GemfireHeapPoller");
thread.setDaemon(true);
return thread;
}
};
this.pollerExecutor = Executors.newScheduledThreadPool(1, threadFactory);
this.pollerExecutor.scheduleAtFixedRate(new HeapPoller(), POLLER_INTERVAL, POLLER_INTERVAL, TimeUnit.MILLISECONDS);
if (this.cache.getLoggerI18n().fineEnabled()) {
this.cache.getLoggerI18n().fine("Started GemfireHeapPoller to poll the heap every " + POLLER_INTERVAL + " milliseconds");
}
}
void setCriticalThreshold(final float criticalThreshold) {
synchronized (this) {
// If the threshold isn't changing then don't do anything.
if (criticalThreshold == this.thresholds.getCriticalThreshold()) {
return;
}
// Do some basic sanity checking on the new threshold
if (criticalThreshold > 100.0f || criticalThreshold < 0.0f) {
throw new IllegalArgumentException(LocalizedStrings.MemoryThresholds_CRITICAL_PERCENTAGE_GT_ZERO_AND_LTE_100
.toLocalizedString());
}
if (getTenuredMemoryPoolMXBean() == null) {
throw new IllegalStateException(LocalizedStrings.HeapMemoryMonitor_NO_POOL_FOUND_POOLS_0
.toLocalizedString(getAllMemoryPoolNames()));
}
if (criticalThreshold != 0 && this.thresholds.isEvictionThresholdEnabled()
&& criticalThreshold <= this.thresholds.getEvictionThreshold()) {
throw new IllegalArgumentException(LocalizedStrings.MemoryThresholds_CRITICAL_PERCENTAGE_GTE_EVICTION_PERCENTAGE
.toLocalizedString());
}
this.cache.setQueryMonitorRequiredForResourceManager(criticalThreshold != 0);
this.thresholds = new MemoryThresholds(this.thresholds.getMaxMemoryBytes(), criticalThreshold, this.thresholds
.getEvictionThreshold());
if (testBytesUsedForThresholdSet != -1) {
updateStateAndSendEvent(testBytesUsedForThresholdSet);
} else {
updateStateAndSendEvent(getBytesUsed());
}
// Start or stop monitoring based upon whether a threshold has been set
if (this.thresholds.isEvictionThresholdEnabled() || this.thresholds.isCriticalThresholdEnabled()) {
startMonitoring();
} else if (!this.thresholds.isEvictionThresholdEnabled() && !this.thresholds.isCriticalThresholdEnabled()) {
stopMonitoring();
}
this.stats.changeCriticalThreshold(this.thresholds.getCriticalThresholdBytes());
}
}
float getCriticalThreshold() {
return this.thresholds.getCriticalThreshold();
}
public boolean hasEvictionThreshold() {
return this.hasEvictionThreshold;
}
void setEvictionThreshold(final float evictionThreshold) {
this.hasEvictionThreshold = true;
synchronized (this) {
// If the threshold isn't changing then don't do anything.
if (evictionThreshold == this.thresholds.getEvictionThreshold()) {
return;
}
// Do some basic sanity checking on the new threshold
if (evictionThreshold > 100.0f || evictionThreshold < 0.0f) {
throw new IllegalArgumentException(LocalizedStrings.MemoryThresholds_EVICTION_PERCENTAGE_GT_ZERO_AND_LTE_100
.toLocalizedString());
}
if (getTenuredMemoryPoolMXBean() == null) {
throw new IllegalStateException(LocalizedStrings.HeapMemoryMonitor_NO_POOL_FOUND_POOLS_0
.toLocalizedString(getAllMemoryPoolNames()));
}
if (evictionThreshold != 0 && this.thresholds.isCriticalThresholdEnabled()
&& evictionThreshold >= this.thresholds.getCriticalThreshold()) {
throw new IllegalArgumentException(LocalizedStrings.MemoryMonitor_EVICTION_PERCENTAGE_LTE_CRITICAL_PERCENTAGE
.toLocalizedString());
}
this.thresholds = new MemoryThresholds(this.thresholds.getMaxMemoryBytes(), this.thresholds.getCriticalThreshold(),
evictionThreshold);
if (testBytesUsedForThresholdSet != -1) {
updateStateAndSendEvent(testBytesUsedForThresholdSet);
} else {
updateStateAndSendEvent(getBytesUsed());
}
// Start or stop monitoring based upon whether a threshold has been set
if (this.thresholds.isEvictionThresholdEnabled() || this.thresholds.isCriticalThresholdEnabled()) {
startMonitoring();
} else if (!this.thresholds.isEvictionThresholdEnabled() && !this.thresholds.isCriticalThresholdEnabled()) {
stopMonitoring();
}
this.stats.changeEvictionThreshold(this.thresholds.getEvictionThresholdBytes());
}
}
public float getEvictionThreshold() {
return this.thresholds.getEvictionThreshold();
}
/**
* Compare the number of bytes used (fetched from the JVM) to the thresholds.
* If necessary, change the state and send an event for the state change.
*/
public void updateStateAndSendEvent() {
updateStateAndSendEvent(getBytesUsed());
}
/**
* Compare the number of bytes used to the thresholds. If necessary, change the state
* and send an event for the state change.
*
* Public for testing.
*
* @param bytesUsed Number of bytes of heap memory currently used.
*/
public void updateStateAndSendEvent(long bytesUsed) {
this.stats.changeTenuredHeapUsed(bytesUsed);
synchronized (this) {
MemoryState oldState = this.mostRecentEvent.getState();
MemoryState newState = this.thresholds.computeNextState(oldState, bytesUsed);
if (oldState != newState) {
setUsageThresholdOnMXBean(bytesUsed);
if (!skipEventDueToToleranceLimits(oldState, newState)) {
this.currentState = newState;
MemoryEvent event = new MemoryEvent(ResourceType.HEAP_MEMORY, oldState, newState, this.cache.getMyId(), bytesUsed, true,
this.thresholds);
this.upcomingEvent.set(event);
processLocalEvent(event);
updateStatsFromEvent(event);
}
// The state didn't change. However, if the state isn't normal and the
// number of bytes used changed, then go ahead and send the event
// again with an updated number of bytes used.
} else if (!oldState.isNormal() && bytesUsed != this.mostRecentEvent.getBytesUsed()) {
MemoryEvent event = new MemoryEvent(ResourceType.HEAP_MEMORY, oldState, newState, this.cache.getMyId(), bytesUsed, true,
this.thresholds);
this.upcomingEvent.set(event);
processLocalEvent(event);
}
}
}
/**
* Update resource manager stats based upon the given event.
*
* @param event
* Event from which to derive data for updating stats.
*/
private void updateStatsFromEvent(MemoryEvent event) {
if (event.isLocal()) {
if (event.getState().isCritical() && !event.getPreviousState().isCritical()) {
this.stats.incHeapCriticalEvents();
} else if (!event.getState().isCritical() && event.getPreviousState().isCritical()) {
this.stats.incHeapSafeEvents();
}
if (event.getState().isEviction() && !event.getPreviousState().isEviction()) {
this.stats.incEvictionStartEvents();
} else if (!event.getState().isEviction() && event.getPreviousState().isEviction()) {
this.stats.incEvictionStopEvents();
}
}
}
/**
* Populate heap memory data in the given profile.
*
* @param profile
* Profile to populate.
*/
@Override
public void fillInProfile(final ResourceManagerProfile profile) {
final MemoryEvent tempEvent = this.upcomingEvent.get();
if (tempEvent != null) {
this.mostRecentEvent = tempEvent;
this.upcomingEvent.set(null);
}
final MemoryEvent eventToPopulate = this.mostRecentEvent;
profile.setHeapData(eventToPopulate.getBytesUsed(), eventToPopulate.getState(), eventToPopulate.getThresholds());
}
public MemoryState getState() {
return this.currentState;
}
public MemoryThresholds getThresholds() {
MemoryThresholds saveThresholds = this.thresholds;
return new MemoryThresholds(saveThresholds.getMaxMemoryBytes(), saveThresholds.getCriticalThreshold(),
saveThresholds.getEvictionThreshold());
}
/**
* Sets the usage threshold on the tenured pool to either the eviction
* threshold or the critical threshold depending on the current number of
* bytes used
*
* @param bytesUsed
* Number of bytes of heap memory currently used.
*/
private void setUsageThresholdOnMXBean(final long bytesUsed) {
if (testDisableMemoryUpdates) {
return;
}
final MemoryPoolMXBean memoryPoolMXBean = getTenuredMemoryPoolMXBean();
final MemoryThresholds saveThresholds = this.thresholds;
if (bytesUsed < saveThresholds.getEvictionThresholdBytes()) {
memoryPoolMXBean.setUsageThreshold(saveThresholds.getEvictionThresholdBytes());
} else {
memoryPoolMXBean.setUsageThreshold(saveThresholds.getCriticalThresholdBytes());
}
}
/**
* Register with the JVM to get threshold events.
*
* Package private for testing.
*/
void startJVMThresholdListener() {
final MemoryPoolMXBean memoryPoolMXBean = getTenuredMemoryPoolMXBean();
// Set collection threshold to a low value, so that we can get
// notifications after every GC run. After each such collection
// threshold notification we set the usage thresholds to an
// appropriate value.
if (!testDisableMemoryUpdates) {
memoryPoolMXBean.setCollectionUsageThreshold(1);
}
final long usageThreshold = memoryPoolMXBean.getUsageThreshold();
this.cache.getLoggerI18n().info(
LocalizedStrings.HeapMemoryMonitor_OVERRIDDING_MEMORYPOOLMXBEAN_HEAP_0_NAME_1,
new Object[] { Long.valueOf(usageThreshold), memoryPoolMXBean.getName() });
MemoryMXBean mbean = ManagementFactory.getMemoryMXBean();
NotificationEmitter emitter = (NotificationEmitter) mbean;
emitter.addNotificationListener(this, null, null);
}
/**
* To avoid memory spikes in jrockit, we only deliver events if we receive
* more than {@link HeapMemoryMonitor#memoryStateChangeTolerance} of the same
* state change.
*
* @return True if an event should be skipped, false otherwise.
*/
private boolean skipEventDueToToleranceLimits(MemoryState oldState, MemoryState newState) {
if (testDisableMemoryUpdates) {
return false;
}
if (newState.isEviction() && !oldState.isEviction()) {
this.evictionToleranceCounter++;
this.criticalToleranceCounter = 0;
if (this.evictionToleranceCounter <= memoryStateChangeTolerance) {
if (this.cache.getLoggerI18n().fineEnabled()) {
this.cache.getLoggerI18n().fine("State "+newState+" ignored. toleranceCounter:"
+this.evictionToleranceCounter+" MEMORY_EVENT_TOLERANCE:" + memoryStateChangeTolerance);
}
return true;
}
} else if (newState.isCritical()) {
this.criticalToleranceCounter++;
this.evictionToleranceCounter = 0;
if (this.criticalToleranceCounter <= memoryStateChangeTolerance) {
if (this.cache.getLoggerI18n().fineEnabled()) {
this.cache.getLoggerI18n().fine("State "+newState+" ignored. toleranceCounter:"
+this.criticalToleranceCounter+" MEMORY_EVENT_TOLERANCE:" + memoryStateChangeTolerance);
}
return true;
}
} else {
this.criticalToleranceCounter = 0;
this.evictionToleranceCounter = 0;
if (this.cache.getLoggerI18n().fineEnabled()) {
this.cache.getLoggerI18n().fine("TOLERANCE counters reset");
}
}
return false;
}
/**
* Returns the number of bytes of memory reported by the tenured pool as
* currently in use.
*/
public long getBytesUsed() {
return getTenuredMemoryPoolMXBean().getUsage().getUsed();
}
public static long getTenuredPoolMaxMemory() {
return tenuredPoolMaxMemory;
}
/**
* Deliver a memory event from one of the monitors to both local listeners and
* remote resource managers. Also, if a critical event is received and a query
* monitor has been enabled, then the query monitor will be notified.
*
* Package private for testing.
*
* @param event
* Event to process.
*/
synchronized void processLocalEvent(MemoryEvent event) {
assert event.isLocal();
if (this.cache.getLoggerI18n().fineEnabled()) {
this.cache.getLoggerI18n().fine("Handling new local event " + event);
}
if (event.getState().isCritical() && !event.getPreviousState().isCritical()) {
this.cache.getLoggerI18n().error(LocalizedStrings.MemoryMonitor_MEMBER_ABOVE_CRITICAL_THRESHOLD,
new Object[] { event.getMember(), "heap" });
if (!this.cache.isQueryMonitorDisabledForLowMemory()) {
QueryMonitor.setLowMemory(true, event.getBytesUsed());
this.cache.getQueryMonitor().cancelAllQueriesDueToMemory();
}
} else if (!event.getState().isCritical() && event.getPreviousState().isCritical()) {
this.cache.getLoggerI18n().error(LocalizedStrings.MemoryMonitor_MEMBER_BELOW_CRITICAL_THRESHOLD,
new Object[] { event.getMember(), "heap" });
if (!this.cache.isQueryMonitorDisabledForLowMemory()) {
QueryMonitor.setLowMemory(false, event.getBytesUsed());
}
}
if (event.getState().isEviction() && !event.getPreviousState().isEviction()) {
this.cache.getLoggerI18n().info(LocalizedStrings.MemoryMonitor_MEMBER_ABOVE_HIGH_THRESHOLD,
new Object[] { event.getMember(), "heap" });
} else if (!event.getState().isEviction() && event.getPreviousState().isEviction()) {
this.cache.getLoggerI18n().info(LocalizedStrings.MemoryMonitor_MEMBER_BELOW_HIGH_THRESHOLD,
new Object[] { event.getMember(), "heap" });
}
if (this.cache.getLoggerI18n().fineEnabled()) {
this.cache.getLoggerI18n().fine("Informing remote members of event " + event);
}
this.resourceAdvisor.updateRemoteProfile();
this.resourceManager.deliverLocalEvent(event);
}
@Override
public void notifyListeners(final Set<ResourceListener> listeners, final ResourceEvent event) {
for (ResourceListener listener : listeners) {
try {
listener.onEvent(event);
} catch (CancelException ignore) {
// ignore
} catch (Throwable t) {
Error err;
if (t instanceof Error && SystemFailure.isJVMFailureError(err = (Error) t)) {
SystemFailure.initiateFailure(err);
// If this ever returns, rethrow the error. We're poisoned now, so
// don't let this thread continue.
throw err;
}
// Whenever you catch Error or Throwable, you must also
// check for fatal JVM error (see above). However, there is
// _still_ a possibility that you are dealing with a cascading
// error condition, so you also need to check to see if the JVM
// is still usable:
SystemFailure.checkFailure();
this.cache.getLoggerI18n().error(LocalizedStrings.MemoryMonitor_EXCEPTION_OCCURED_WHEN_NOTIFYING_LISTENERS, t);
}
}
}
// Handles memory usage notification from MemoryMXBean.
// See ((NotificationEmitter) MemoryMXBean).addNoticiationListener(...).
@Override
public void handleNotification(final Notification notification, final Object callback) {
this.resourceManager.runWithNotifyExecutor(new Runnable() {
@SuppressWarnings("synthetic-access")
@Override
public void run() {
// Not using the information given by the notification in favor
// of constructing fresh information ourselves.
if (!testDisableMemoryUpdates) {
updateStateAndSendEvent();
}
}
});
}
/**
* Given a set of members, determine if any member in the set is above
* critical threshold.
*
* @param members
* The set of members to check.
* @return True if the set contains a member above critical threshold, false
* otherwise
*/
public boolean containsHeapCriticalMembers(final Set<InternalDistributedMember> members) {
if (members.contains(this.cache.getMyId()) && this.mostRecentEvent.getState().isCritical()) {
return true;
}
return SetUtils.intersectsWith(members, this.resourceAdvisor.adviseCritialMembers());
}
/**
* Determines if the given member is in a heap critical state.
*
* @param member
* Member to check.
*
* @return True if the member's heap memory is in a critical state, false
* otherwise.
*/
public final boolean isMemberHeapCritical(final InternalDistributedMember member) {
if (member.equals(this.cache.getMyId())) {
return this.mostRecentEvent.getState().isCritical();
}
return this.resourceAdvisor.isHeapCritical(member);
}
class LocalHeapStatListener implements LocalStatListener {
/* (non-Javadoc)
* @see com.gemstone.gemfire.internal.LocalStatListener#statValueChanged(double)
*/
@Override
@SuppressWarnings("synthetic-access")
public void statValueChanged(double value) {
final long usedBytes = (long)value;
try {
HeapMemoryMonitor.this.resourceManager.runWithNotifyExecutor(new Runnable(){
@Override
public void run() {
if (!testDisableMemoryUpdates) {
updateStateAndSendEvent(usedBytes);
}
}
});
if (HeapMemoryMonitor.this.cache.getLoggerI18n().fineEnabled()) {
HeapMemoryMonitor.this.cache.getLoggerI18n().fine("StatSampler scheduled a " +
"handleNotification call with "+usedBytes+" bytes");
}
} catch (RejectedExecutionException e) {
if (!HeapMemoryMonitor.this.resourceManager.isClosed()) {
HeapMemoryMonitor.this.cache.getLoggerI18n().warning(LocalizedStrings.ResourceManager_REJECTED_EXECUTION_CAUSE_NOHEAP_EVENTS);
}
} catch (CacheClosedException e) {
// nothing to do
}
}
}
@Override
public String toString() {
return "HeapMemoryMonitor [thresholds=" + this.thresholds
+ ", mostRecentEvent=" + this.mostRecentEvent
+ ", criticalToleranceCounter=" + this.criticalToleranceCounter
+ ", evictionToleranceCounter=" + this.evictionToleranceCounter + "]";
}
/**
* Polls the heap if stat sampling is disabled.
*
* @author sbawaska
*/
class HeapPoller implements Runnable {
@SuppressWarnings("synthetic-access")
@Override
public void run() {
if (testDisableMemoryUpdates) {
return;
}
try {
updateStateAndSendEvent(getBytesUsed());
} catch (Exception e) {
HeapMemoryMonitor.this.cache.getLoggerI18n().fine("Poller Thread caught exception:",e);
}
//TODO: do we need to handle errors too?
}
}
/**
* Overrides the value returned by the JVM as the number of bytes of available
* memory.
*
* @param testMaxMemoryBytes
* The value to use as the maximum number of bytes of memory
* available.
*/
public void setTestMaxMemoryBytes(final long testMaxMemoryBytes) {
synchronized (this) {
MemoryThresholds newThresholds;
if (testMaxMemoryBytes == 0) {
newThresholds = new MemoryThresholds(getTenuredPoolMaxMemory());
} else {
newThresholds = new MemoryThresholds(testMaxMemoryBytes, this.thresholds.getCriticalThreshold(), this.thresholds
.getEvictionThreshold());
}
this.thresholds = newThresholds;
StringBuilder builder = new StringBuilder("In testing, the following values were set");
builder.append(" maxMemoryBytes:" + newThresholds.getMaxMemoryBytes());
builder.append(" criticalThresholdBytes:" + newThresholds.getCriticalThresholdBytes());
builder.append(" evictionThresholdBytes:" + newThresholds.getEvictionThresholdBytes());
this.cache.getLoggerI18n().fine(builder.toString());
}
}
public static void setTestDisableMemoryUpdates(final boolean newTestDisableMemoryUpdates) {
testDisableMemoryUpdates = newTestDisableMemoryUpdates;
}
/**
* Since the setter methods for the eviction and critical thresholds
* immediately update state based upon the new threshold value and the number
* of bytes currently used by the JVM, there needs to be a way to override the
* number of bytes of memory reported as in use for testing. That's what this
* method and the value it sets are for.
*
* @param newTestBytesUsedForThresholdSet
* Value to use as the amount of memory in use when calling the
* setEvictionThreshold or setCriticalThreshold methods are called.
*/
public static void setTestBytesUsedForThresholdSet(final long newTestBytesUsedForThresholdSet) {
testBytesUsedForThresholdSet = newTestBytesUsedForThresholdSet;
}
}