blob: d144aaf0adab2fcfae54c71e6d4e886ee05884d5 [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* one or more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.internal.cache.control;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.Logger;
import com.gemstone.gemfire.CancelCriterion;
import com.gemstone.gemfire.InternalGemFireError;
import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.control.RebalanceFactory;
import com.gemstone.gemfire.cache.control.RebalanceOperation;
import com.gemstone.gemfire.cache.control.ResourceManager;
import com.gemstone.gemfire.distributed.DistributedMember;
import com.gemstone.gemfire.distributed.internal.DistributionAdvisor.Profile;
import com.gemstone.gemfire.distributed.internal.OverflowQueueWithDMStats;
import com.gemstone.gemfire.distributed.internal.SerialQueuedExecutorWithDMStats;
import com.gemstone.gemfire.internal.ClassPathLoader;
import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
import com.gemstone.gemfire.internal.cache.LocalRegion;
import com.gemstone.gemfire.internal.cache.PartitionedRegion;
import com.gemstone.gemfire.internal.cache.control.ResourceAdvisor.ResourceManagerProfile;
import com.gemstone.gemfire.internal.cache.partitioned.LoadProbe;
import com.gemstone.gemfire.internal.cache.partitioned.SizedBasedLoadProbe;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.logging.LoggingThreadGroup;
import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
/**
* Implementation of ResourceManager with additional internal-only methods.
* TODO: cleanup raw and typed collections
*
* @author Kirk Lund
* @author Mitch Thomas
* @author Swapnil Bawaskar
* @author David Hoots
*/
public class InternalResourceManager implements ResourceManager {
private static final Logger logger = LogService.getLogger();
public enum ResourceType {
HEAP_MEMORY(0x1), OFFHEAP_MEMORY(0x2), MEMORY(0x3), ALL(0xFFFFFFFF);
int id;
private ResourceType(final int id) {
this.id = id;
}
};
private Map<ResourceType, Set<ResourceListener>> listeners = new HashMap<ResourceType, Set<ResourceListener>>();
private final ScheduledExecutorService scheduledExecutor;
private final ExecutorService notifyExecutor;
//The set of in progress rebalance operations.
private final Set<RebalanceOperation> inProgressOperations = new HashSet<RebalanceOperation>();
private final Object inProgressOperationsLock = new Object();
final GemFireCacheImpl cache;
private final LoadProbe loadProbe;
private final ResourceManagerStats stats;
private final ResourceAdvisor resourceAdvisor;
private boolean closed = true;
private final Map<ResourceType, ResourceMonitor> resourceMonitors;
private static ResourceObserver observer = new ResourceObserverAdapter();
private static String PR_LOAD_PROBE_CLASS = System.getProperty(
"gemfire.ResourceManager.PR_LOAD_PROBE_CLASS", SizedBasedLoadProbe.class
.getName());
public static InternalResourceManager getInternalResourceManager(Cache cache) {
return (InternalResourceManager) cache.getResourceManager();
}
public static InternalResourceManager createResourceManager(final GemFireCacheImpl cache) {
return new InternalResourceManager(cache);
}
private InternalResourceManager(GemFireCacheImpl cache) {
this.cache = cache;
this.resourceAdvisor = (ResourceAdvisor) cache.getDistributionAdvisor();
this.stats = new ResourceManagerStats(cache.getDistributedSystem());
// Create a new executor that other classes may use for handling resource
// related tasks
final ThreadGroup thrdGrp = LoggingThreadGroup.createThreadGroup(
"ResourceManagerThreadGroup", logger);
ThreadFactory tf = new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(thrdGrp, r, "ResourceManagerRecoveryThread");
thread.setDaemon(true);
return thread;
}
};
this.scheduledExecutor = new ScheduledThreadPoolExecutor(1, tf);
// Initialize the load probe
try {
Class loadProbeClass = ClassPathLoader.getLatest().forName(PR_LOAD_PROBE_CLASS);
this.loadProbe = (LoadProbe) loadProbeClass.newInstance();
} catch (Exception e) {
throw new InternalGemFireError("Unable to instantiate " + PR_LOAD_PROBE_CLASS, e);
}
// Create a new executor the resource manager and the monitors it creates
// can use to handle dispatching of notifications.
final ThreadGroup listenerInvokerthrdGrp = LoggingThreadGroup.createThreadGroup(
"ResourceListenerInvokerThreadGroup", logger);
ThreadFactory eventProcessorFactory = new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(listenerInvokerthrdGrp, r, "Notification Handler");
thread.setDaemon(true);
thread.setPriority(Thread.MAX_PRIORITY);
return thread;
}
};
BlockingQueue<Runnable> threadQ = new OverflowQueueWithDMStats(this.stats.getResourceEventQueueStatHelper());
this.notifyExecutor = new SerialQueuedExecutorWithDMStats(threadQ, this.stats.getResourceEventPoolStatHelper(),
eventProcessorFactory);
// Create the monitors
Map<ResourceType, ResourceMonitor> tempMonitors = new HashMap<ResourceType, ResourceMonitor>();
tempMonitors.put(ResourceType.HEAP_MEMORY, new HeapMemoryMonitor(this, cache, this.stats));
tempMonitors.put(ResourceType.OFFHEAP_MEMORY, new OffHeapMemoryMonitor(this, cache, this.stats));
this.resourceMonitors = Collections.unmodifiableMap(tempMonitors);
// Initialize the listener sets so that it only needs to be done once
for (ResourceType resourceType : new ResourceType[] { ResourceType.HEAP_MEMORY, ResourceType.OFFHEAP_MEMORY }) {
Set<ResourceListener> emptySet = new CopyOnWriteArraySet<ResourceListener>();
this.listeners.put(resourceType, emptySet);
}
this.closed = false;
}
public void close() {
for (ResourceMonitor monitor : this.resourceMonitors.values()) {
monitor.stopMonitoring();
}
stopExecutor(this.scheduledExecutor);
stopExecutor(this.notifyExecutor);
this.stats.close();
this.closed = true;
}
boolean isClosed() {
return this.closed;
}
public void fillInProfile(final Profile profile) {
assert profile instanceof ResourceManagerProfile;
for (ResourceMonitor monitor : this.resourceMonitors.values()) {
monitor.fillInProfile((ResourceManagerProfile) profile);
}
}
public void addResourceListener(final ResourceListener listener) {
addResourceListener(ResourceType.ALL, listener);
}
public void addResourceListener(final ResourceType resourceType, final ResourceListener listener) {
for (Map.Entry<ResourceType, Set<ResourceListener>> mapEntry : this.listeners.entrySet()) {
if ((mapEntry.getKey().id & resourceType.id) != 0) {
mapEntry.getValue().add(listener);
}
}
}
public void removeResourceListener(final ResourceListener listener) {
removeResourceListener(ResourceType.ALL, listener);
}
public void removeResourceListener(final ResourceType resourceType, final ResourceListener listener) {
for (Map.Entry<ResourceType, Set<ResourceListener>> mapEntry : this.listeners.entrySet()) {
if ((mapEntry.getKey().id & resourceType.id) != 0) {
mapEntry.getValue().remove(listener);
}
}
}
public Set<ResourceListener> getResourceListeners(final ResourceType resourceType) {
return this.listeners.get(resourceType);
}
/**
* Deliver an event received from remote resource managers to the local
* listeners.
*
* @param event
* Event to deliver.
*/
public void deliverEventFromRemote(final ResourceEvent event) {
assert !event.isLocal();
if (this.cache.getLoggerI18n().fineEnabled()) {
this.cache.getLoggerI18n().fine("New remote event to deliver for member " + event.getMember() + ": event=" + event);
}
if (this.cache.getLoggerI18n().fineEnabled()) {
this.cache.getLoggerI18n().fine("Remote event to deliver for member " + event.getMember() + ":" + event);
}
runWithNotifyExecutor(new Runnable() {
@Override
public void run() {
deliverLocalEvent(event);
}
});
return;
}
void deliverLocalEvent(ResourceEvent event) {
// TODO OFFHEAP MERGE: the fix for 49555 did the following:
// if (listener instanceof LocalRegion) {
// LocalRegion lr = (LocalRegion)listener;
// if (lr.isDestroyed()) {
// // do not added destroyed region to fix bug 49555
// return;
// }
// }
// need to figure out where this code should now be
// Wait for an event to be handled by all listeners before starting to send another event
synchronized (this.listeners) {
this.resourceMonitors.get(event.getType()).notifyListeners(this.listeners.get(event.getType()), event);
}
this.stats.incResourceEventsDelivered();
}
public HeapMemoryMonitor getHeapMonitor() {
return (HeapMemoryMonitor) this.resourceMonitors.get(ResourceType.HEAP_MEMORY);
}
public OffHeapMemoryMonitor getOffHeapMonitor() {
return (OffHeapMemoryMonitor) this.resourceMonitors.get(ResourceType.OFFHEAP_MEMORY);
}
/**
* Use threshold event processor to execute the event embedded in the
* runnable.
*
* @param runnable
* Runnable to execute.
*/
void runWithNotifyExecutor(Runnable runnable) {
try {
this.notifyExecutor.execute(runnable);
} catch (RejectedExecutionException e) {
if (!isClosed()) {
this.cache.getLoggerI18n().warning(LocalizedStrings.ResourceManager_REJECTED_EXECUTION_CAUSE_NOHEAP_EVENTS);
}
}
}
@Override
public RebalanceFactory createRebalanceFactory() {
return new RebalanceFactoryImpl();
}
@Override
public Set<RebalanceOperation> getRebalanceOperations() {
synchronized(this.inProgressOperationsLock) {
return new HashSet<RebalanceOperation>(this.inProgressOperations);
}
}
void addInProgressRebalance(RebalanceOperation op) {
synchronized(this.inProgressOperationsLock) {
this.inProgressOperations.add(op);
}
}
void removeInProgressRebalance(RebalanceOperation op) {
synchronized(this.inProgressOperationsLock) {
this.inProgressOperations.remove(op);
}
}
class RebalanceFactoryImpl implements RebalanceFactory {
private Set<String> includedRegions;
private Set<String> excludedRegions;
@Override
public RebalanceOperation simulate() {
RegionFilter filter = new FilterByPath(this.includedRegions, this.excludedRegions);
RebalanceOperationImpl op = new RebalanceOperationImpl(InternalResourceManager.this.cache, true, filter);
op.start();
return op;
}
@Override
public RebalanceOperation start() {
RegionFilter filter = new FilterByPath(this.includedRegions, this.excludedRegions);
RebalanceOperationImpl op = new RebalanceOperationImpl(InternalResourceManager.this.cache, false,filter);
op.start();
return op;
}
@Override
public RebalanceFactory excludeRegions(Set<String> regions) {
this.excludedRegions = regions;
return this;
}
@Override
public RebalanceFactory includeRegions(Set<String> regions) {
this.includedRegions = regions;
return this;
}
}
void stopExecutor(ExecutorService executor) {
if (executor == null) {
return;
}
executor.shutdown();
final int secToWait = Integer.getInteger("gemfire.prrecovery-close-timeout", 120).intValue();
try {
executor.awaitTermination(secToWait, TimeUnit.SECONDS);
}
catch (InterruptedException x) {
Thread.currentThread().interrupt();
logger.debug("Failed in interrupting the Resource Manager Thread due to interrupt");
}
if (! executor.isTerminated()) {
logger.warn(LocalizedMessage.create(
LocalizedStrings.ResourceManager_FAILED_TO_STOP_RESOURCE_MANAGER_THREADS,
new Object[]{secToWait}));
}
}
public ScheduledExecutorService getExecutor() {
return this.scheduledExecutor;
}
public ResourceManagerStats getStats() {
return this.stats;
}
/**
* For testing only, an observer which is called when
* rebalancing is started and finished for a particular region.
* This observer is called even the "rebalancing" is actually
* redundancy recovery for a particular region.
* @param observer
*/
public static void setResourceObserver(ResourceObserver observer) {
if(observer == null) {
observer = new ResourceObserverAdapter();
}
InternalResourceManager.observer = observer;
}
/**
* Get the resource observer used for testing. Never returns null.
*/
public static ResourceObserver getResourceObserver() {
return observer;
}
/**
* For testing only. Receives callbacks for resource related events.
* @author dsmith
*/
public static interface ResourceObserver {
/**
* Indicates that rebalancing has started on a given region.
* @param region
*/
public void rebalancingStarted(Region region);
/**
* Indicates that rebalancing has finished on a given region.
* @param region
*/
public void rebalancingFinished(Region region);
/**
* Indicates that recovery has started on a given region.
* @param region
*/
public void recoveryStarted(Region region);
/**
* Indicates that recovery has finished on a given region.
* @param region
*/
public void recoveryFinished(Region region);
/**
* Indicated that a membership event triggered a recovery
* operation, but the recovery operation will not be executed
* because there is already an existing recovery operation
* waiting to happen on this region.
* @param region
*/
public void recoveryConflated(PartitionedRegion region);
/**
* Indicates that a bucket is being moved from the source member to the
* target member.
* @param region the region
* @param bucketId the bucket being moved
* @param source the member the bucket is moving from
* @param target the member the bucket is moving to
*/
public void movingBucket(Region region,
int bucketId,
DistributedMember source,
DistributedMember target);
/**
* Indicates that a bucket primary is being moved from the source member
* to the target member.
* @param region the region
* @param bucketId the bucket primary being moved
* @param source the member the bucket primary is moving from
* @param target the member the bucket primary is moving to
*/
public void movingPrimary(Region region,
int bucketId,
DistributedMember source,
DistributedMember target);
}
public static class ResourceObserverAdapter implements ResourceObserver {
@Override
public void rebalancingFinished(Region region) {
rebalancingOrRecoveryFinished(region);
}
@Override
public void rebalancingStarted(Region region) {
rebalancingOrRecoveryStarted(region);
}
@Override
public void recoveryFinished(Region region) {
rebalancingOrRecoveryFinished(region);
}
@Override
public void recoveryStarted(Region region) {
rebalancingOrRecoveryStarted(region);
}
/**
* Indicated the a rebalance or a recovery has started.
*/
@SuppressWarnings("unused")
public void rebalancingOrRecoveryStarted(Region region) {
//do nothing
}
/**
* Indicated the a rebalance or a recovery has finished.
*/
@SuppressWarnings("unused")
public void rebalancingOrRecoveryFinished(Region region) {
//do nothing
}
@Override
public void recoveryConflated(PartitionedRegion region) {
//do nothing
}
@Override
public void movingBucket(Region region,
int bucketId,
DistributedMember source,
DistributedMember target) {
//do nothing
}
@Override
public void movingPrimary(Region region,
int bucketId,
DistributedMember source,
DistributedMember target) {
//do nothing
}
}
/* (non-Javadoc)
* @see com.gemstone.gemfire.distributed.internal.DistributionAdvisee#getCancelCriterion()
*/
public CancelCriterion getCancelCriterion() {
return this.cache.getCancelCriterion();
}
public ResourceAdvisor getResourceAdvisor() {
return this.resourceAdvisor;
}
public LoadProbe getLoadProbe() {
return this.loadProbe;
}
/* (non-Javadoc)
* @see com.gemstone.gemfire.cache.control.ResourceManager#setEvictionHeapPercentage(int)
*/
@Override
public void setCriticalOffHeapPercentage(float offHeapPercentage) {
getOffHeapMonitor().setCriticalThreshold(offHeapPercentage);
}
/**
* {@inheritDoc}
*/
@Override
public float getCriticalOffHeapPercentage() {
return getOffHeapMonitor().getCriticalThreshold();
}
/**
* {@inheritDoc}
*/
@Override
public void setEvictionOffHeapPercentage(float offHeapPercentage) {
getOffHeapMonitor().setEvictionThreshold(offHeapPercentage);
}
/**
* {@inheritDoc}
*/
@Override
public float getEvictionOffHeapPercentage() {
return getOffHeapMonitor().getEvictionThreshold();
}
/**
* {@inheritDoc}
*/
@Override
public void setCriticalHeapPercentage(float heapPercentage) {
getHeapMonitor().setCriticalThreshold(heapPercentage);
}
/**
* {@inheritDoc}
*/
@Override
public float getCriticalHeapPercentage() {
return getHeapMonitor().getCriticalThreshold();
}
/**
* {@inheritDoc}
*/
@Override
public void setEvictionHeapPercentage(float heapPercentage) {
getHeapMonitor().setEvictionThreshold(heapPercentage);
}
/**
* {@inheritDoc}
*/
@Override
public float getEvictionHeapPercentage() {
return getHeapMonitor().getEvictionThreshold();
}
}