blob: 57da627de82c12c9c7a7a9d02f28d98cba79f050 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.control;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryPoolMXBean;
import java.lang.management.MemoryType;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.management.ListenerNotFoundException;
import javax.management.Notification;
import javax.management.NotificationEmitter;
import javax.management.NotificationListener;
import org.apache.logging.log4j.Logger;
import org.apache.geode.CancelException;
import org.apache.geode.Statistics;
import org.apache.geode.SystemFailure;
import org.apache.geode.annotations.Immutable;
import org.apache.geode.annotations.internal.MutableForTesting;
import org.apache.geode.cache.CacheClosedException;
import org.apache.geode.cache.LowMemoryException;
import org.apache.geode.cache.execute.Function;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.control.InternalResourceManager.ResourceType;
import org.apache.geode.internal.cache.control.MemoryThresholds.MemoryState;
import org.apache.geode.internal.cache.control.ResourceAdvisor.ResourceManagerProfile;
import org.apache.geode.internal.statistics.GemFireStatSampler;
import org.apache.geode.internal.statistics.LocalStatListener;
import org.apache.geode.internal.statistics.StatisticsManager;
import org.apache.geode.logging.internal.executors.LoggingExecutors;
import org.apache.geode.logging.internal.log4j.api.LogService;
/**
* Allows for the setting of eviction and critical thresholds. These thresholds are compared against
* current heap usage and, with the help of {#link InternalResourceManager}, dispatches events when
* the thresholds are crossed. Gathering memory usage information from the JVM is done using a
* listener on the MemoryMXBean, by polling the JVM and as a listener on GemFire Statistics output
* in order to accommodate differences in the various JVMs.
*
* @since Geode 1.0
*/
public class HeapMemoryMonitor implements NotificationListener, MemoryMonitor {
private static final Logger logger = LogService.getLogger();
// Allow for an unknown heap pool for VMs we may support in the future.
private static final String HEAP_POOL =
System.getProperty(DistributionConfig.GEMFIRE_PREFIX + "ResourceManager.HEAP_POOL");
// Property for setting the JVM polling interval (below)
public static final String POLLER_INTERVAL_PROP =
DistributionConfig.GEMFIRE_PREFIX + "heapPollerInterval";
// Internal for polling the JVM for changes in heap memory usage.
private static final int POLLER_INTERVAL = Integer.getInteger(POLLER_INTERVAL_PROP, 500);
// This holds a new event as it transitions from updateStateAndSendEvent(...) to fillInProfile()
private ThreadLocal<MemoryEvent> upcomingEvent = new ThreadLocal<MemoryEvent>();
private ScheduledExecutorService pollerExecutor;
// Listener for heap memory usage as reported by the Cache stats.
private final LocalStatListener statListener = new LocalHeapStatListener();
// JVM MXBean used to report changes in heap memory usage
@Immutable
private static final MemoryPoolMXBean tenuredMemoryPoolMXBean;
static {
MemoryPoolMXBean matchingMemoryPoolMXBean = null;
for (MemoryPoolMXBean memoryPoolMXBean : ManagementFactory.getMemoryPoolMXBeans()) {
if (memoryPoolMXBean.isUsageThresholdSupported() && isTenured(memoryPoolMXBean)) {
matchingMemoryPoolMXBean = memoryPoolMXBean;
break;
}
}
tenuredMemoryPoolMXBean = matchingMemoryPoolMXBean;
if (tenuredMemoryPoolMXBean == null) {
logger.error("No tenured pools found. Known pools are: {}",
getAllMemoryPoolNames());
}
}
// Calculated value for the amount of JVM tenured heap memory available.
private static final long tenuredPoolMaxMemory;
/*
* Calculates the max memory for the tenured pool. Works around JDK bug:
* http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=7078465 by getting max memory from runtime
* and subtracting all other heap pools from it.
*/
static {
if (tenuredMemoryPoolMXBean != null && tenuredMemoryPoolMXBean.getUsage().getMax() != -1) {
tenuredPoolMaxMemory = tenuredMemoryPoolMXBean.getUsage().getMax();
} else {
long calculatedMaxMemory = Runtime.getRuntime().maxMemory();
List<MemoryPoolMXBean> pools = ManagementFactory.getMemoryPoolMXBeans();
for (MemoryPoolMXBean p : pools) {
if (p.getType() == MemoryType.HEAP && p.getUsage().getMax() != -1) {
calculatedMaxMemory -= p.getUsage().getMax();
}
}
tenuredPoolMaxMemory = calculatedMaxMemory;
}
}
private volatile MemoryThresholds thresholds = new MemoryThresholds(tenuredPoolMaxMemory);
private volatile MemoryEvent mostRecentEvent = new MemoryEvent(ResourceType.HEAP_MEMORY,
MemoryState.DISABLED, MemoryState.DISABLED, null, 0L, true, this.thresholds);
private volatile MemoryState currentState = MemoryState.DISABLED;
// Set when startMonitoring() and stopMonitoring() are called
boolean started = false;
// Set to true when setEvictionThreshold(...) is called.
private boolean hasEvictionThreshold = false;
private final InternalResourceManager resourceManager;
private final ResourceAdvisor resourceAdvisor;
private final InternalCache cache;
private final ResourceManagerStats stats;
@MutableForTesting
private static boolean testDisableMemoryUpdates = false;
@MutableForTesting
private static long testBytesUsedForThresholdSet = -1;
/**
* Determines if the name of the memory pool MXBean provided matches a list of known tenured pool
* names.
*
* Package private for testing.
*
* @param memoryPoolMXBean The memory pool MXBean to check.
* @return True if the pool name matches a known tenured pool name, false otherwise.
*/
static boolean isTenured(MemoryPoolMXBean memoryPoolMXBean) {
if (memoryPoolMXBean.getType() != MemoryType.HEAP) {
return false;
}
String name = memoryPoolMXBean.getName();
return name.equals("CMS Old Gen") // Sun Concurrent Mark Sweep GC
|| name.equals("PS Old Gen") // Sun Parallel GC
|| name.equals("G1 Old Gen") // Sun G1 GC
|| name.equals("Old Space") // BEA JRockit 1.5, 1.6 GC
|| name.equals("Tenured Gen") // Hitachi 1.5 GC
|| name.equals("Java heap") // IBM 1.5, 1.6 GC
|| name.equals("GenPauseless Old Gen") // azul C4/GPGC collector
// Allow an unknown pool name to monitor
|| (HEAP_POOL != null && name.equals(HEAP_POOL));
}
HeapMemoryMonitor(final InternalResourceManager resourceManager, final InternalCache cache,
final ResourceManagerStats stats) {
this.resourceManager = resourceManager;
this.resourceAdvisor = (ResourceAdvisor) cache.getDistributionAdvisor();
this.cache = cache;
this.stats = stats;
}
/**
* Returns the tenured pool MXBean or throws an IllegaleStateException if one couldn't be found.
*/
public static MemoryPoolMXBean getTenuredMemoryPoolMXBean() {
if (tenuredMemoryPoolMXBean != null) {
return tenuredMemoryPoolMXBean;
}
throw new IllegalStateException(String.format("No tenured pools found. Known pools are: %s",
getAllMemoryPoolNames()));
}
/**
* Returns the names of all available memory pools as a single string.
*/
private static String getAllMemoryPoolNames() {
StringBuilder builder = new StringBuilder("[");
for (MemoryPoolMXBean memoryPoolBean : ManagementFactory.getMemoryPoolMXBeans()) {
builder.append("(Name=").append(memoryPoolBean.getName()).append(";Type=")
.append(memoryPoolBean.getType()).append(";UsageThresholdSupported=")
.append(memoryPoolBean.isUsageThresholdSupported()).append("), ");
}
if (builder.length() > 1) {
builder.setLength(builder.length() - 2);
}
builder.append("]");
return builder.toString();
}
public void setMemoryStateChangeTolerance(int memoryStateChangeTolerance) {
thresholds.setMemoryStateChangeTolerance(memoryStateChangeTolerance);
}
public int getMemoryStateChangeTolerance() {
return thresholds.getMemoryStateChangeTolerance();
}
/**
* Monitoring is done using a combination of data from the JVM and statistics collected from the
* cache. A usage threshold is set on the MemoryMXBean of the JVM to get notifications when the
* JVM crosses the eviction or critical thresholds. A separate usage collection is done either by
* setting up a listener on the cache stats or polling of the JVM, depending on whether stats have
* been enabled. This separate collection is done to return the state of the heap memory back to a
* normal state when memory has been freed.
*/
private void startMonitoring() {
synchronized (this) {
if (this.started) {
return;
}
final boolean statListenerStarted = startCacheStatListener();
if (!statListenerStarted) {
startMemoryPoolPoller();
}
startJVMThresholdListener();
this.started = true;
}
}
/**
* Stops all three mechanisms from monitoring heap usage.
*/
@Override
public void stopMonitoring() {
synchronized (this) {
if (!this.started) {
return;
}
// Stop the poller
this.resourceManager.stopExecutor(this.pollerExecutor);
// Stop the JVM threshold listener
NotificationEmitter emitter = (NotificationEmitter) ManagementFactory.getMemoryMXBean();
try {
emitter.removeNotificationListener(this, null, null);
if (logger.isDebugEnabled()) {
logger.debug("Removed Memory MXBean notification listener" + this);
}
} catch (ListenerNotFoundException ignore) {
if (logger.isDebugEnabled()) {
logger.debug("This instance '{}' was not registered as a Memory MXBean listener", this);
}
}
// Stop the stats listener
final GemFireStatSampler sampler = this.cache.getInternalDistributedSystem().getStatSampler();
if (sampler != null) {
sampler.removeLocalStatListener(this.statListener);
}
this.started = false;
}
}
public static Statistics getTenuredPoolStatistics(StatisticsManager statisticsManager) {
String tenuredPoolName = getTenuredMemoryPoolMXBean().getName();
String tenuredPoolType = "PoolStats";
for (Statistics si : statisticsManager.getStatsList()) {
if (si.getTextId().contains(tenuredPoolName)
&& si.getType().getName().contains(tenuredPoolType)) {
return si;
}
}
return null;
}
/**
* Start a listener on the cache stats to monitor memory usage.
*
* @return True of the listener was correctly started, false otherwise.
*/
private boolean startCacheStatListener() {
final GemFireStatSampler sampler = this.cache.getInternalDistributedSystem().getStatSampler();
if (sampler == null) {
return false;
}
try {
sampler.waitForInitialization();
Statistics si = getTenuredPoolStatistics(
this.cache.getInternalDistributedSystem().getStatisticsManager());
if (si != null) {
sampler.addLocalStatListener(this.statListener, si, "currentUsedMemory");
if (logger.isDebugEnabled()) {
logger.debug("Registered stat listener for " + si.getTextId());
}
return true;
}
} catch (InterruptedException iex) {
Thread.currentThread().interrupt();
this.cache.getCancelCriterion().checkCancelInProgress(iex);
}
return false;
}
/**
* Start a separate thread for polling the JVM for heap memory usage.
*/
private void startMemoryPoolPoller() {
if (tenuredMemoryPoolMXBean == null) {
return;
}
this.pollerExecutor = LoggingExecutors.newScheduledThreadPool("GemfireHeapPoller", 1);
this.pollerExecutor.scheduleAtFixedRate(new HeapPoller(), POLLER_INTERVAL, POLLER_INTERVAL,
TimeUnit.MILLISECONDS);
if (logger.isDebugEnabled()) {
logger.debug(
"Started GemfireHeapPoller to poll the heap every " + POLLER_INTERVAL + " milliseconds");
}
}
void setCriticalThreshold(final float criticalThreshold) {
synchronized (this) {
// If the threshold isn't changing then don't do anything.
if (criticalThreshold == this.thresholds.getCriticalThreshold()) {
return;
}
// Do some basic sanity checking on the new threshold
if (criticalThreshold > 100.0f || criticalThreshold < 0.0f) {
throw new IllegalArgumentException(
"Critical percentage must be greater than 0.0 and less than or equal to 100.0.");
}
if (getTenuredMemoryPoolMXBean() == null) {
throw new IllegalStateException(
String.format("No tenured pools found. Known pools are: %s",
getAllMemoryPoolNames()));
}
if (criticalThreshold != 0 && this.thresholds.isEvictionThresholdEnabled()
&& criticalThreshold <= this.thresholds.getEvictionThreshold()) {
throw new IllegalArgumentException(
"Critical percentage must be greater than the eviction percentage.");
}
this.cache.setQueryMonitorRequiredForResourceManager(criticalThreshold != 0);
this.thresholds = new MemoryThresholds(this.thresholds.getMaxMemoryBytes(), criticalThreshold,
this.thresholds.getEvictionThreshold());
updateStateAndSendEvent();
// Start or stop monitoring based upon whether a threshold has been set
if (this.thresholds.isEvictionThresholdEnabled()
|| this.thresholds.isCriticalThresholdEnabled()) {
startMonitoring();
} else if (!this.thresholds.isEvictionThresholdEnabled()
&& !this.thresholds.isCriticalThresholdEnabled()) {
stopMonitoring();
}
this.stats.changeCriticalThreshold(this.thresholds.getCriticalThresholdBytes());
}
}
@Override
public boolean hasEvictionThreshold() {
return this.hasEvictionThreshold;
}
void setEvictionThreshold(final float evictionThreshold) {
this.hasEvictionThreshold = true;
synchronized (this) {
// If the threshold isn't changing then don't do anything.
if (evictionThreshold == this.thresholds.getEvictionThreshold()) {
return;
}
// Do some basic sanity checking on the new threshold
if (evictionThreshold > 100.0f || evictionThreshold < 0.0f) {
throw new IllegalArgumentException(
"Eviction percentage must be greater than 0.0 and less than or equal to 100.0.");
}
if (getTenuredMemoryPoolMXBean() == null) {
throw new IllegalStateException(
String.format("No tenured pools found. Known pools are: %s",
getAllMemoryPoolNames()));
}
if (evictionThreshold != 0 && this.thresholds.isCriticalThresholdEnabled()
&& evictionThreshold >= this.thresholds.getCriticalThreshold()) {
throw new IllegalArgumentException(
"Eviction percentage must be less than the critical percentage.");
}
this.thresholds = new MemoryThresholds(this.thresholds.getMaxMemoryBytes(),
this.thresholds.getCriticalThreshold(), evictionThreshold);
updateStateAndSendEvent();
// Start or stop monitoring based upon whether a threshold has been set
if (this.thresholds.isEvictionThresholdEnabled()
|| this.thresholds.isCriticalThresholdEnabled()) {
startMonitoring();
} else if (!this.thresholds.isEvictionThresholdEnabled()
&& !this.thresholds.isCriticalThresholdEnabled()) {
stopMonitoring();
}
this.stats.changeEvictionThreshold(this.thresholds.getEvictionThresholdBytes());
}
}
/**
* Compare the number of bytes used (fetched from the JVM) to the thresholds. If necessary, change
* the state and send an event for the state change.
*/
public void updateStateAndSendEvent() {
updateStateAndSendEvent(
testBytesUsedForThresholdSet != -1 ? testBytesUsedForThresholdSet : getBytesUsed(),
"notification");
}
/**
* Compare the number of bytes used to the thresholds. If necessary, change the state and send an
* event for the state change.
*
* @param bytesUsed Number of bytes of heap memory currently used.
* @param eventOrigin Indicates where the event originated e.g. notification vs polling
*/
public void updateStateAndSendEvent(long bytesUsed, String eventOrigin) {
this.stats.changeTenuredHeapUsed(bytesUsed);
synchronized (this) {
MemoryState oldState = this.mostRecentEvent.getState();
MemoryState newState = this.thresholds.computeNextState(oldState, bytesUsed);
if (oldState != newState) {
setUsageThresholdOnMXBean(bytesUsed);
this.currentState = newState;
MemoryEvent event = new MemoryEvent(ResourceType.HEAP_MEMORY, oldState, newState,
this.cache.getMyId(), bytesUsed, true, this.thresholds);
this.upcomingEvent.set(event);
processLocalEvent(event, eventOrigin);
updateStatsFromEvent(event);
// The state didn't change. However, if the state isn't normal and the
// number of bytes used changed, then go ahead and send the event
// again with an updated number of bytes used.
} else if (!oldState.isNormal() && bytesUsed != this.mostRecentEvent.getBytesUsed()) {
MemoryEvent event = new MemoryEvent(ResourceType.HEAP_MEMORY, oldState, newState,
this.cache.getMyId(), bytesUsed, true, this.thresholds);
this.upcomingEvent.set(event);
processLocalEvent(event, eventOrigin);
}
}
}
/**
* Update resource manager stats based upon the given event.
*
* @param event Event from which to derive data for updating stats.
*/
private void updateStatsFromEvent(MemoryEvent event) {
if (event.isLocal()) {
if (event.getState().isCritical() && !event.getPreviousState().isCritical()) {
this.stats.incHeapCriticalEvents();
} else if (!event.getState().isCritical() && event.getPreviousState().isCritical()) {
this.stats.incHeapSafeEvents();
}
if (event.getState().isEviction() && !event.getPreviousState().isEviction()) {
this.stats.incEvictionStartEvents();
} else if (!event.getState().isEviction() && event.getPreviousState().isEviction()) {
this.stats.incEvictionStopEvents();
}
}
}
/**
* Populate heap memory data in the given profile.
*
* @param profile Profile to populate.
*/
@Override
public void fillInProfile(final ResourceManagerProfile profile) {
final MemoryEvent tempEvent = this.upcomingEvent.get();
if (tempEvent != null) {
this.mostRecentEvent = tempEvent;
this.upcomingEvent.set(null);
}
final MemoryEvent eventToPopulate = this.mostRecentEvent;
profile.setHeapData(eventToPopulate.getBytesUsed(), eventToPopulate.getState(),
eventToPopulate.getThresholds());
}
@Override
public MemoryState getState() {
return this.currentState;
}
@Override
public MemoryThresholds getThresholds() {
MemoryThresholds saveThresholds = this.thresholds;
return new MemoryThresholds(saveThresholds.getMaxMemoryBytes(),
saveThresholds.getCriticalThreshold(), saveThresholds.getEvictionThreshold());
}
/**
* Sets the usage threshold on the tenured pool to either the eviction threshold or the critical
* threshold depending on the current number of bytes used
*
* @param bytesUsed Number of bytes of heap memory currently used.
*/
private void setUsageThresholdOnMXBean(final long bytesUsed) {
//// this method has been made a no-op to fix bug 49064
}
/**
* Register with the JVM to get threshold events.
*
* Package private for testing.
*/
void startJVMThresholdListener() {
final MemoryPoolMXBean memoryPoolMXBean = getTenuredMemoryPoolMXBean();
// Set collection threshold to a low value, so that we can get
// notifications after every GC run. After each such collection
// threshold notification we set the usage thresholds to an
// appropriate value.
if (!testDisableMemoryUpdates) {
memoryPoolMXBean.setCollectionUsageThreshold(1);
}
final long usageThreshold = memoryPoolMXBean.getUsageThreshold();
this.cache.getLogger().info(
String.format("Overridding MemoryPoolMXBean heap threshold bytes %s on pool %s",
new Object[] {usageThreshold, memoryPoolMXBean.getName()}));
MemoryMXBean mbean = ManagementFactory.getMemoryMXBean();
NotificationEmitter emitter = (NotificationEmitter) mbean;
emitter.addNotificationListener(this, null, null);
}
/**
* Returns the number of bytes of memory reported by the tenured pool as currently in use.
*/
@Override
public long getBytesUsed() {
return getTenuredMemoryPoolMXBean().getUsage().getUsed();
}
public static long getTenuredPoolMaxMemory() {
return tenuredPoolMaxMemory;
}
/**
* Deliver a memory event from one of the monitors to both local listeners and remote resource
* managers. Also, if a critical event is received and a query monitor has been enabled, then the
* query monitor will be notified.
*
* Package private for testing.
*
* @param event Event to process.
* @param eventOrigin Indicates where the event originated e.g. notification vs polling
*/
synchronized void processLocalEvent(MemoryEvent event, String eventOrigin) {
assert event.isLocal();
if (logger.isDebugEnabled()) {
logger.debug("Handling new local event " + event);
}
if (event.getState().isCritical() && !event.getPreviousState().isCritical()) {
this.cache.getLogger().error(
createCriticalThresholdLogMessage(event, eventOrigin, true));
if (!this.cache.isQueryMonitorDisabledForLowMemory()) {
this.cache.getQueryMonitor().setLowMemory(true, event.getBytesUsed());
}
} else if (!event.getState().isCritical() && event.getPreviousState().isCritical()) {
this.cache.getLogger().error(
createCriticalThresholdLogMessage(event, eventOrigin, false));
if (!this.cache.isQueryMonitorDisabledForLowMemory()) {
this.cache.getQueryMonitor().setLowMemory(false, event.getBytesUsed());
}
}
if (event.getState().isEviction() && !event.getPreviousState().isEviction()) {
this.cache.getLogger().info(String.format("Member: %s above %s eviction threshold",
event.getMember(), "heap"));
} else if (!event.getState().isEviction() && event.getPreviousState().isEviction()) {
this.cache.getLogger().info(String.format("Member: %s below %s eviction threshold",
event.getMember(), "heap"));
}
if (logger.isDebugEnabled()) {
logger.debug("Informing remote members of event " + event);
}
this.resourceAdvisor.updateRemoteProfile();
this.resourceManager.deliverLocalEvent(event);
}
@Override
public void notifyListeners(final Set<ResourceListener> listeners, final ResourceEvent event) {
for (ResourceListener listener : listeners) {
try {
listener.onEvent(event);
} catch (CancelException ignore) {
// ignore
} catch (Throwable t) {
Error err;
if (t instanceof Error && SystemFailure.isJVMFailureError(err = (Error) t)) {
SystemFailure.initiateFailure(err);
// If this ever returns, rethrow the error. We're poisoned now, so
// don't let this thread continue.
throw err;
}
// Whenever you catch Error or Throwable, you must also
// check for fatal JVM error (see above). However, there is
// _still_ a possibility that you are dealing with a cascading
// error condition, so you also need to check to see if the JVM
// is still usable:
SystemFailure.checkFailure();
this.cache.getLogger()
.error("Exception occurred when notifying listeners ", t);
}
}
}
// Handles memory usage notification from MemoryMXBean.
// See ((NotificationEmitter) MemoryMXBean).addNoticiationListener(...).
@Override
public void handleNotification(final Notification notification, final Object callback) {
this.resourceManager.runWithNotifyExecutor(new Runnable() {
@SuppressWarnings("synthetic-access")
@Override
public void run() {
// Not using the information given by the notification in favor
// of constructing fresh information ourselves.
if (!testDisableMemoryUpdates) {
updateStateAndSendEvent();
}
}
});
}
protected Set<DistributedMember> getHeapCriticalMembersFrom(
Set<? extends DistributedMember> members) {
Set<DistributedMember> criticalMembers = getCriticalMembers();
criticalMembers.retainAll(members);
return criticalMembers;
}
private Set<DistributedMember> getCriticalMembers() {
Set<DistributedMember> criticalMembers = new HashSet<>(resourceAdvisor.adviseCriticalMembers());
if (this.mostRecentEvent.getState().isCritical()) {
criticalMembers.add(cache.getMyId());
}
return criticalMembers;
}
public void checkForLowMemory(Function function, DistributedMember targetMember) {
Set<DistributedMember> targetMembers = Collections.singleton(targetMember);
checkForLowMemory(function, targetMembers);
}
public void checkForLowMemory(Function function, Set<? extends DistributedMember> dest) {
LowMemoryException exception = createLowMemoryIfNeeded(function, dest);
if (exception != null) {
throw exception;
}
}
public LowMemoryException createLowMemoryIfNeeded(Function function,
DistributedMember targetMember) {
Set<DistributedMember> targetMembers = Collections.singleton(targetMember);
return createLowMemoryIfNeeded(function, targetMembers);
}
public LowMemoryException createLowMemoryIfNeeded(Function function,
Set<? extends DistributedMember> memberSet) {
if (function.optimizeForWrite()
&& !MemoryThresholds.isLowMemoryExceptionDisabled()) {
Set<DistributedMember> criticalMembersFrom = getHeapCriticalMembersFrom(memberSet);
if (!criticalMembersFrom.isEmpty()) {
return new LowMemoryException(
String.format(
"Function: %s cannot be executed because the members %s are running low on memory",
function.getId(), criticalMembersFrom),
criticalMembersFrom);
}
}
return null;
}
/**
* Determines if the given member is in a heap critical state.
*
* @param member Member to check.
*
* @return True if the member's heap memory is in a critical state, false otherwise.
*/
public boolean isMemberHeapCritical(final InternalDistributedMember member) {
if (member.equals(this.cache.getMyId())) {
return this.mostRecentEvent.getState().isCritical();
}
return this.resourceAdvisor.isHeapCritical(member);
}
protected MemoryEvent getMostRecentEvent() {
return mostRecentEvent;
}
protected HeapMemoryMonitor setMostRecentEvent(
MemoryEvent mostRecentEvent) {
this.mostRecentEvent = mostRecentEvent;
return this;
}
class LocalHeapStatListener implements LocalStatListener {
/*
* (non-Javadoc)
*
* @see org.apache.geode.internal.statistics.LocalStatListener#statValueChanged(double)
*/
@Override
@SuppressWarnings("synthetic-access")
public void statValueChanged(double value) {
final long usedBytes = (long) value;
try {
HeapMemoryMonitor.this.resourceManager.runWithNotifyExecutor(new Runnable() {
@Override
public void run() {
if (!testDisableMemoryUpdates) {
updateStateAndSendEvent(usedBytes, "polling");
}
}
});
if (HeapMemoryMonitor.logger.isDebugEnabled()) {
HeapMemoryMonitor.logger.debug(
"StatSampler scheduled a " + "handleNotification call with " + usedBytes + " bytes");
}
} catch (RejectedExecutionException ignore) {
if (!HeapMemoryMonitor.this.resourceManager.isClosed()) {
logger.warn("No memory events will be delivered because of RejectedExecutionException");
}
} catch (CacheClosedException ignore) {
// nothing to do
}
}
}
@Override
public String toString() {
return "HeapMemoryMonitor [thresholds=" + this.thresholds + ", mostRecentEvent="
+ this.mostRecentEvent + "]";
}
/**
* Polls the heap if stat sampling is disabled.
*/
class HeapPoller implements Runnable {
@SuppressWarnings("synthetic-access")
@Override
public void run() {
if (testDisableMemoryUpdates) {
return;
}
try {
updateStateAndSendEvent(getBytesUsed(), "polling");
} catch (Exception e) {
HeapMemoryMonitor.logger.debug("Poller Thread caught exception:", e);
}
// TODO: do we need to handle errors too?
}
}
/**
* Overrides the value returned by the JVM as the number of bytes of available memory.
*
* @param testMaxMemoryBytes The value to use as the maximum number of bytes of memory available.
*/
public void setTestMaxMemoryBytes(final long testMaxMemoryBytes) {
synchronized (this) {
MemoryThresholds newThresholds;
if (testMaxMemoryBytes == 0) {
newThresholds = new MemoryThresholds(getTenuredPoolMaxMemory());
} else {
newThresholds = new MemoryThresholds(testMaxMemoryBytes,
this.thresholds.getCriticalThreshold(), this.thresholds.getEvictionThreshold());
}
this.thresholds = newThresholds;
StringBuilder builder = new StringBuilder("In testing, the following values were set");
builder.append(" maxMemoryBytes:").append(newThresholds.getMaxMemoryBytes());
builder.append(" criticalThresholdBytes:").append(newThresholds.getCriticalThresholdBytes());
builder.append(" evictionThresholdBytes:").append(newThresholds.getEvictionThresholdBytes());
logger.debug(builder.toString());
}
}
public static void setTestDisableMemoryUpdates(final boolean newTestDisableMemoryUpdates) {
testDisableMemoryUpdates = newTestDisableMemoryUpdates;
}
/**
* Since the setter methods for the eviction and critical thresholds immediately update state
* based upon the new threshold value and the number of bytes currently used by the JVM, there
* needs to be a way to override the number of bytes of memory reported as in use for testing.
* That's what this method and the value it sets are for.
*
* @param newTestBytesUsedForThresholdSet Value to use as the amount of memory in use when calling
* the setEvictionThreshold or setCriticalThreshold methods are called.
*/
public static void setTestBytesUsedForThresholdSet(final long newTestBytesUsedForThresholdSet) {
testBytesUsedForThresholdSet = newTestBytesUsedForThresholdSet;
}
private String createCriticalThresholdLogMessage(MemoryEvent event, String eventOrigin,
boolean above) {
return "Member: " + event.getMember() + " " + (above ? "above" : "below")
+ " heap critical threshold."
+ " Event generated via " + eventOrigin + "."
+ " Used bytes: " + event.getBytesUsed() + "."
+ " Memory thresholds: " + thresholds;
}
}