blob: cad0d16252c71c6bec3980f7a71864c2b170ef0e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.control;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.Logger;
import org.apache.geode.CancelCriterion;
import org.apache.geode.InternalGemFireError;
import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.annotations.internal.MutableForTesting;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.control.RebalanceFactory;
import org.apache.geode.cache.control.RebalanceOperation;
import org.apache.geode.cache.control.ResourceManager;
import org.apache.geode.cache.control.RestoreRedundancyOperation;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.DistributionAdvisor.Profile;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.control.ResourceAdvisor.ResourceManagerProfile;
import org.apache.geode.internal.cache.partitioned.LoadProbe;
import org.apache.geode.internal.cache.partitioned.SizedBasedLoadProbe;
import org.apache.geode.internal.classloader.ClassPathLoader;
import org.apache.geode.internal.logging.CoreLoggingExecutors;
import org.apache.geode.internal.monitoring.ThreadsMonitoring;
import org.apache.geode.logging.internal.executors.LoggingExecutors;
import org.apache.geode.logging.internal.log4j.api.LogService;
import org.apache.geode.management.runtime.RestoreRedundancyResults;
import org.apache.geode.util.internal.GeodeGlossary;
/**
* Implementation of ResourceManager with additional internal-only methods.
*/
public class InternalResourceManager implements ResourceManager {
// Logger shared by all resource manager instances.
private static final Logger logger = LogService.getLogger();
// Pool size passed to the scheduled executor created in the constructor. Read from the
// "resource.manager.threads" system property (with the GemFire prefix); defaults to 1.
final int MAX_RESOURCE_MANAGER_EXE_THREADS =
Integer.getInteger(GeodeGlossary.GEMFIRE_PREFIX + "resource.manager.threads", 1);
// Asynchronous startup tasks registered through addStartupTask(). addStartupTask() and
// allOfStartupTasks() synchronize on this collection before touching it.
private final Collection<CompletableFuture<Void>> startupTasks = new ArrayList<>();
/**
 * Kinds of resources a listener can be registered for. Every constant carries a bit mask so that
 * the composite types overlap the concrete types they cover when compared with bitwise AND.
 */
public enum ResourceType {
  /** Heap memory only (mask 0x1). */
  HEAP_MEMORY(0x1),
  /** Off-heap memory only (mask 0x2). */
  OFFHEAP_MEMORY(0x2),
  /** Heap and off-heap memory together (mask 0x3). */
  MEMORY(0x1 | 0x2),
  /** All bits set, so this overlaps every other type. */
  ALL(~0);

  /** Bit mask identifying the resource(s) this constant stands for. */
  final int id;

  ResourceType(final int mask) {
    id = mask;
  }
}
// Listener sets keyed by resource type. The constructor registers one set each for HEAP_MEMORY
// and OFFHEAP_MEMORY; no other keys are ever added by this class.
private final Map<ResourceType, Set<ResourceListener<?>>> listeners = new HashMap<>();
// Scheduled executor exposed through getExecutor() for resource related tasks.
private final ScheduledExecutorService scheduledExecutor;
// Executor used by runWithNotifyExecutor() to dispatch notifications.
private final ExecutorService notifyExecutor;
// A map of in progress rebalance operations. The value is Boolean because ConcurrentHashMap does
// not support null values.
private final Map<RebalanceOperation, Boolean> inProgressRebalanceOperations =
new ConcurrentHashMap<>();
// A map of in progress restore redundancy completable futures. The value is Boolean because
// ConcurrentHashMap does not support null values.
private final Map<CompletableFuture<RestoreRedundancyResults>, Boolean> inProgressRedundancyOperations =
new ConcurrentHashMap<>();
// The cache this resource manager belongs to.
final InternalCache cache;
// Load probe instantiated in the constructor; replaceable through setLoadProbe().
private LoadProbe loadProbe;
private final ResourceManagerStats stats;
private final ResourceAdvisor resourceAdvisor;
// Set to true as the last step of close().
private boolean closed;
// Unmodifiable map holding the heap and off-heap monitors created in the constructor.
private final Map<ResourceType, ResourceMonitor> resourceMonitors;
// Test hook; setResourceObserver() substitutes a no-op adapter for null, so this is never null.
@MutableForTesting
private static ResourceObserver observer = new ResourceObserverAdapter();
// Class name of the LoadProbe implementation to instantiate. Overridable through the
// "ResourceManager.PR_LOAD_PROBE_CLASS" system property (with the GemFire prefix); defaults to
// SizedBasedLoadProbe.
private static final String PR_LOAD_PROBE_CLASS =
System.getProperty(GeodeGlossary.GEMFIRE_PREFIX + "ResourceManager.PR_LOAD_PROBE_CLASS",
SizedBasedLoadProbe.class.getName());
/**
 * Returns the resource manager of the given cache as an {@code InternalResourceManager}.
 *
 * @param cache the cache whose resource manager is requested
 * @return the cache's resource manager, cast to the internal implementation type
 * @throws ClassCastException if the cache's resource manager is not an InternalResourceManager
 */
public static InternalResourceManager getInternalResourceManager(Cache cache) {
  final ResourceManager manager = cache.getResourceManager();
  return (InternalResourceManager) manager;
}
/**
 * Factory method that builds a new resource manager for the given cache.
 *
 * @param cache the cache the new resource manager will serve
 * @return a freshly constructed resource manager
 */
public static InternalResourceManager createResourceManager(final InternalCache cache) {
  final InternalResourceManager manager = new InternalResourceManager(cache);
  return manager;
}
/**
 * Builds the resource manager for the given cache: creates the statistics, both executors, the
 * load probe, the heap and off-heap monitors, and the (initially empty) listener sets.
 *
 * @param cache the cache this resource manager serves
 * @throws InternalGemFireError if the configured load probe class cannot be loaded or
 *         instantiated
 */
private InternalResourceManager(InternalCache cache) {
  this.cache = cache;
  resourceAdvisor = (ResourceAdvisor) cache.getDistributionAdvisor();
  stats = new ResourceManagerStats(cache.getDistributedSystem());

  // Create a new executor that other classes may use for handling resource
  // related tasks
  scheduledExecutor = LoggingExecutors
      .newScheduledThreadPool(MAX_RESOURCE_MANAGER_EXE_THREADS, "ResourceManagerRecoveryThread ");

  // Initialize the load probe. The no-arg constructor is invoked through
  // getDeclaredConstructor() rather than the deprecated Class.newInstance(): the latter lets
  // checked exceptions thrown by the constructor escape undeclared, whereas this form wraps
  // them in InvocationTargetException, which the catch below reports as the cause.
  try {
    Class<?> loadProbeClass = ClassPathLoader.getLatest().forName(PR_LOAD_PROBE_CLASS);
    loadProbe = (LoadProbe) loadProbeClass.getDeclaredConstructor().newInstance();
  } catch (Exception e) {
    throw new InternalGemFireError("Unable to instantiate " + PR_LOAD_PROBE_CLASS, e);
  }

  // Create a new executor the resource manager and the monitors it creates
  // can use to handle dispatching of notifications.
  notifyExecutor =
      CoreLoggingExecutors.newSerialThreadPoolWithFeedStatistics(0,
          stats.getResourceEventQueueStatHelper(), "Notification Handler",
          thread -> thread.setPriority(Thread.MAX_PRIORITY), null,
          stats.getResourceEventPoolStatHelper(), getThreadMonitorObj());

  // Create the monitors. They receive "this", so everything they may need from the manager
  // has to be initialized above this point.
  Map<ResourceType, ResourceMonitor> tempMonitors = new HashMap<>();
  tempMonitors.put(ResourceType.HEAP_MEMORY,
      new HeapMemoryMonitor(this, cache, stats, new TenuredHeapConsumptionMonitor()));
  tempMonitors.put(ResourceType.OFFHEAP_MEMORY,
      new OffHeapMemoryMonitor(this, cache, cache.getOffHeapStore(), stats));
  resourceMonitors = Collections.unmodifiableMap(tempMonitors);

  // Initialize the listener sets so that it only needs to be done once
  for (ResourceType resourceType : new ResourceType[] {ResourceType.HEAP_MEMORY,
      ResourceType.OFFHEAP_MEMORY}) {
    Set<ResourceListener<?>> emptySet = new CopyOnWriteArraySet<>();
    listeners.put(resourceType, emptySet);
  }
}
/**
 * Shuts the resource manager down: stops every monitor, stops both executors, closes the
 * statistics and finally marks this manager as closed.
 */
public void close() {
  resourceMonitors.values().forEach(monitor -> monitor.stopMonitoring());

  stopExecutor(scheduledExecutor);
  stopExecutor(notifyExecutor);

  stats.close();
  closed = true;
}
/**
 * Reports whether this resource manager has been closed.
 *
 * @return the closed flag, which {@code close()} sets to true as its final step
 */
boolean isClosed() {
return closed;
}
/**
 * Lets every resource monitor contribute its state to the given profile.
 *
 * @param profile the profile to populate; must be a {@code ResourceManagerProfile}
 */
public void fillInProfile(final Profile profile) {
  assert profile instanceof ResourceManagerProfile;
  resourceMonitors.values()
      .forEach(monitor -> monitor.fillInProfile((ResourceManagerProfile) profile));
}
/**
 * Registers the listener for every resource type.
 *
 * @param listener the listener to register
 */
public void addResourceListener(final ResourceListener<?> listener) {
  addResourceListener(ResourceType.ALL, listener);
}

/**
 * Registers the listener with each listener set whose resource type overlaps the requested type.
 * Overlap is decided by a bitwise AND of the two types' ids.
 *
 * @param resourceType the type (possibly composite) to register for
 * @param listener the listener to register
 */
public void addResourceListener(final ResourceType resourceType,
    final ResourceListener<?> listener) {
  final int requestedMask = resourceType.id;
  listeners.forEach((registeredType, registered) -> {
    if ((registeredType.id & requestedMask) != 0) {
      registered.add(listener);
    }
  });
}
/**
 * Unregisters the listener from every resource type.
 *
 * @param listener the listener to remove
 */
public void removeResourceListener(final ResourceListener<?> listener) {
  removeResourceListener(ResourceType.ALL, listener);
}

/**
 * Removes the listener from each listener set whose resource type overlaps the requested type.
 * Overlap is decided by a bitwise AND of the two types' ids.
 *
 * @param resourceType the type (possibly composite) to unregister from
 * @param listener the listener to remove
 */
public void removeResourceListener(final ResourceType resourceType,
    final ResourceListener<?> listener) {
  final int requestedMask = resourceType.id;
  listeners.forEach((registeredType, registered) -> {
    if ((registeredType.id & requestedMask) != 0) {
      registered.remove(listener);
    }
  });
}
/**
 * Looks up the listener set registered under exactly the given resource type.
 *
 * <p>
 * The returned set is the internal one, not a copy. Only HEAP_MEMORY and OFFHEAP_MEMORY have a
 * set registered by the constructor, so any other type yields {@code null}.
 *
 * @param resourceType the exact map key to look up
 * @return the listener set stored for that type, or {@code null} if none is stored
 */
public Set<ResourceListener<?>> getResourceListeners(final ResourceType resourceType) {
return listeners.get(resourceType);
}
/**
 * Hands an event that originated in a remote resource manager to the notification executor,
 * which then delivers it to the local listeners.
 *
 * @param event the remote event to deliver; must not be a local event
 */
public void deliverEventFromRemote(final ResourceEvent event) {
  assert !event.isLocal();

  if (logger.isTraceEnabled()) {
    final String message =
        "New remote event to deliver for member " + event.getMember() + ": event=" + event;
    logger.trace(message);
  }

  runWithNotifyExecutor(() -> deliverLocalEvent(event));
}
/**
 * Delivers the event to the listeners registered for its resource type, using the monitor that
 * owns that type, and then counts the delivery in the statistics.
 *
 * @param event the event to deliver
 */
void deliverLocalEvent(ResourceEvent event) {
  // Holding the lock for the whole notification ensures one event has been handled by all
  // listeners before the next one starts being delivered.
  synchronized (listeners) {
    final ResourceMonitor monitor = resourceMonitors.get(event.getType());
    final Set<ResourceListener<?>> targets = listeners.get(event.getType());
    monitor.notifyListeners(targets, event);
  }
  stats.incResourceEventsDelivered();
}
/**
 * Returns the monitor registered for heap memory.
 *
 * @return the heap memory monitor
 */
public HeapMemoryMonitor getHeapMonitor() {
  final ResourceMonitor heapMonitor = resourceMonitors.get(ResourceType.HEAP_MEMORY);
  return (HeapMemoryMonitor) heapMonitor;
}
/**
 * Returns the monitor registered for off-heap memory.
 *
 * @return the off-heap memory monitor
 */
public OffHeapMemoryMonitor getOffHeapMonitor() {
  final ResourceMonitor offHeapMonitor = resourceMonitors.get(ResourceType.OFFHEAP_MEMORY);
  return (OffHeapMemoryMonitor) offHeapMonitor;
}
/**
 * Selects the heap or off-heap memory monitor.
 *
 * @param offheap {@code true} for the off-heap monitor, {@code false} for the heap monitor
 * @return the selected monitor
 */
public MemoryMonitor getMemoryMonitor(boolean offheap) {
  return offheap ? getOffHeapMonitor() : getHeapMonitor();
}
/**
 * Submits the runnable to the notification executor. A rejection by the executor is swallowed;
 * it is logged as a warning unless this manager is already closed.
 *
 * @param runnable the work to execute on the notification executor
 */
void runWithNotifyExecutor(Runnable runnable) {
  try {
    notifyExecutor.execute(runnable);
  } catch (RejectedExecutionException rejected) {
    if (closed) {
      // Rejections are expected once the manager has been closed; stay silent.
      return;
    }
    logger.warn("No memory events will be delivered because of RejectedExecutionException");
  }
}
/**
 * Creates a factory for configuring and starting rebalance operations on this manager's cache.
 *
 * @return a new rebalance factory
 */
@Override
public RebalanceFactory createRebalanceFactory() {
  final RebalanceFactory factory = new RebalanceFactoryImpl();
  return factory;
}
/**
 * Returns a read-only view of the rebalance operations currently registered as in progress.
 *
 * @return an unmodifiable set view over the in-progress rebalance operations
 */
@Override
public Set<RebalanceOperation> getRebalanceOperations() {
  final Set<RebalanceOperation> inProgress = inProgressRebalanceOperations.keySet();
  return Collections.unmodifiableSet(inProgress);
}
/**
 * Records the given rebalance operation as in progress.
 *
 * @param op the operation to record
 */
void addInProgressRebalance(RebalanceOperation op) {
  // The map is used as a set; the value is only a non-null placeholder.
  final Boolean placeholder = Boolean.TRUE;
  inProgressRebalanceOperations.put(op, placeholder);
}
/**
 * Removes the given rebalance operation from the in-progress registry.
 *
 * @param op the operation to forget
 */
void removeInProgressRebalance(RebalanceOperation op) {
  inProgressRebalanceOperations.remove(op);
}
/**
 * Rebalance factory bound to the enclosing resource manager's cache. Region include and exclude
 * sets are stored as given and applied through a path filter when an operation is launched.
 */
class RebalanceFactoryImpl implements RebalanceFactory {
  private Set<String> includedRegions;
  private Set<String> excludedRegions;

  /** Starts a simulated rebalance using the currently configured region sets. */
  @Override
  public RebalanceOperation simulate() {
    return launch(true);
  }

  /** Starts a real rebalance using the currently configured region sets. */
  @Override
  public RebalanceOperation start() {
    return launch(false);
  }

  /** Remembers the regions to exclude and returns this factory for chaining. */
  @Override
  public RebalanceFactory excludeRegions(Set<String> regions) {
    excludedRegions = regions;
    return this;
  }

  /** Remembers the regions to include and returns this factory for chaining. */
  @Override
  public RebalanceFactory includeRegions(Set<String> regions) {
    includedRegions = regions;
    return this;
  }

  /** Builds the region filter, creates the operation, starts it and returns it. */
  private RebalanceOperation launch(boolean simulation) {
    final RegionFilter regionFilter = new FilterByPath(includedRegions, excludedRegions);
    final RebalanceOperationImpl operation =
        new RebalanceOperationImpl(cache, simulation, regionFilter);
    operation.start();
    return operation;
  }
}
/**
 * Creates a restore redundancy operation bound to this manager's cache.
 *
 * @return a new restore redundancy operation
 */
@Override
public RestoreRedundancyOperation createRestoreRedundancyOperation() {
  final RestoreRedundancyOperation operation = new RestoreRedundancyOperationImpl(cache);
  return operation;
}
/**
 * Returns a read-only view of the restore redundancy futures currently registered as in
 * progress.
 *
 * @return an unmodifiable set view over the in-progress restore redundancy futures
 */
@Override
public Set<CompletableFuture<RestoreRedundancyResults>> getRestoreRedundancyFutures() {
  final Set<CompletableFuture<RestoreRedundancyResults>> inProgress =
      inProgressRedundancyOperations.keySet();
  return Collections.unmodifiableSet(inProgress);
}
/**
 * Records the given restore redundancy future as in progress.
 *
 * @param completableFuture the future to record
 */
void addInProgressRestoreRedundancy(
    CompletableFuture<RestoreRedundancyResults> completableFuture) {
  // The map is used as a set; the value is only a non-null placeholder.
  final Boolean placeholder = Boolean.TRUE;
  inProgressRedundancyOperations.put(completableFuture, placeholder);
}
/**
 * Removes the given restore redundancy future from the in-progress registry.
 *
 * @param completableFuture the future to forget
 */
void removeInProgressRestoreRedundancy(
    final CompletableFuture<RestoreRedundancyResults> completableFuture) {
  inProgressRedundancyOperations.remove(completableFuture);
}
/**
 * Shuts the executor down, waiting a bounded time for running tasks before forcing termination.
 *
 * <p>
 * The wait is read from the "prrecovery-close-timeout" system property (with the GemFire prefix)
 * and defaults to 5 seconds. If the executor has not terminated after the wait, or the wait was
 * interrupted, {@code shutdownNow()} is called and every returned task that is a {@code Future}
 * is cancelled.
 *
 * @param executor the executor to stop; {@code null} is ignored
 */
void stopExecutor(ExecutorService executor) {
  if (executor == null) {
    return;
  }
  executor.shutdown();
  final int secToWait =
      Integer.getInteger(GeodeGlossary.GEMFIRE_PREFIX + "prrecovery-close-timeout", 5);
  try {
    executor.awaitTermination(secToWait, TimeUnit.SECONDS);
  } catch (InterruptedException x) {
    // Restore the interrupt status for the caller and fall through to the forced shutdown.
    Thread.currentThread().interrupt();
    logger.debug("Interrupted while waiting for the resource manager threads to terminate");
  }
  if (!executor.isTerminated()) {
    logger.warn("Failed to stop resource manager threads in {} seconds, forcing shutdown",
        secToWait);
    List<Runnable> remainingTasks = executor.shutdownNow();
    // Tasks that never started are cancelled so anything waiting on them is released.
    remainingTasks.forEach(runnable -> {
      if (runnable instanceof Future) {
        ((Future<?>) runnable).cancel(true);
      }
    });
  }
}
/**
 * Returns the scheduled executor created by this resource manager for resource related tasks.
 *
 * @return the scheduled executor; {@code close()} shuts it down
 */
public ScheduledExecutorService getExecutor() {
return scheduledExecutor;
}
/**
 * Returns the statistics object created by this resource manager.
 *
 * @return the resource manager statistics
 */
public ResourceManagerStats getStats() {
return stats;
}
/**
 * For testing only. Installs the observer that is called when rebalancing starts and finishes
 * for a particular region. The observer is also called when the "rebalancing" is actually
 * redundancy recovery for a particular region.
 *
 * @param observer the observer to install; {@code null} installs a no-op
 *        {@code ResourceObserverAdapter}
 */
public static void setResourceObserver(ResourceObserver observer) {
  InternalResourceManager.observer =
      (observer != null) ? observer : new ResourceObserverAdapter();
}
/**
 * Returns the resource observer used for testing. Never returns null, because the field starts
 * out as a no-op adapter and {@code setResourceObserver} replaces a null argument with one.
 *
 * @return the currently installed observer
 */
public static ResourceObserver getResourceObserver() {
return observer;
}
/**
 * For testing only. Receives callbacks for resource related events.
 */
public interface ResourceObserver {
/**
 * Indicates that rebalancing has started on a given region.
 *
 * @param region the region being rebalanced
 */
void rebalancingStarted(Region<?, ?> region);
/**
 * Indicates that rebalancing has finished on a given region.
 *
 * @param region the region that was rebalanced
 */
void rebalancingFinished(Region<?, ?> region);
/**
 * Indicates that recovery has started on a given region.
 *
 * @param region the region being recovered
 */
void recoveryStarted(Region<?, ?> region);
/**
 * Indicates that recovery has finished on a given region.
 *
 * @param region the region that was recovered
 */
void recoveryFinished(Region<?, ?> region);
/**
 * Indicates that a membership event triggered a recovery operation, but the recovery operation
 * will not be executed because there is already an existing recovery operation waiting to
 * happen on this region.
 *
 * @param region the region whose recovery was conflated
 */
void recoveryConflated(PartitionedRegion region);
/**
 * Indicates that a bucket is being moved from the source member to the target member.
 *
 * @param region the region
 * @param bucketId the bucket being moved
 * @param source the member the bucket is moving from
 * @param target the member the bucket is moving to
 */
void movingBucket(Region<?, ?> region, int bucketId, DistributedMember source,
DistributedMember target);
/**
 * Indicates that a bucket primary is being moved from the source member to the target member.
 *
 * @param region the region
 * @param bucketId the bucket primary being moved
 * @param source the member the bucket primary is moving from
 * @param target the member the bucket primary is moving to
 */
void movingPrimary(Region<?, ?> region, int bucketId, DistributedMember source,
DistributedMember target);
}
/**
 * No-op {@link ResourceObserver}. The rebalancing and recovery callbacks are funnelled into the
 * two combined hooks {@link #rebalancingOrRecoveryStarted} and
 * {@link #rebalancingOrRecoveryFinished}, so a subclass can override just those two to observe
 * both kinds of operation.
 */
public static class ResourceObserverAdapter implements ResourceObserver {

  /** Forwards to {@link #rebalancingOrRecoveryStarted}. */
  @Override
  public void rebalancingStarted(Region<?, ?> region) {
    rebalancingOrRecoveryStarted(region);
  }

  /** Forwards to {@link #rebalancingOrRecoveryFinished}. */
  @Override
  public void rebalancingFinished(Region<?, ?> region) {
    rebalancingOrRecoveryFinished(region);
  }

  /** Forwards to {@link #rebalancingOrRecoveryStarted}. */
  @Override
  public void recoveryStarted(Region<?, ?> region) {
    rebalancingOrRecoveryStarted(region);
  }

  /** Forwards to {@link #rebalancingOrRecoveryFinished}. */
  @Override
  public void recoveryFinished(Region<?, ?> region) {
    rebalancingOrRecoveryFinished(region);
  }

  /**
   * Indicates that a rebalance or a recovery has started.
   */
  public void rebalancingOrRecoveryStarted(Region<?, ?> region) {
    // intentionally empty
  }

  /**
   * Indicates that a rebalance or a recovery has finished.
   */
  public void rebalancingOrRecoveryFinished(Region<?, ?> region) {
    // intentionally empty
  }

  @Override
  public void recoveryConflated(PartitionedRegion region) {
    // intentionally empty
  }

  @Override
  public void movingBucket(Region<?, ?> region, int bucketId, DistributedMember source,
      DistributedMember target) {
    // intentionally empty
  }

  @Override
  public void movingPrimary(Region<?, ?> region, int bucketId, DistributedMember source,
      DistributedMember target) {
    // intentionally empty
  }
}
/**
 * Returns the cancel criterion of the cache this resource manager belongs to.
 *
 * @return the cache's cancel criterion
 */
public CancelCriterion getCancelCriterion() {
  final CancelCriterion criterion = cache.getCancelCriterion();
  return criterion;
}
/**
 * Returns the resource advisor, which the constructor obtains from the cache's distribution
 * advisor.
 *
 * @return the resource advisor
 */
public ResourceAdvisor getResourceAdvisor() {
return resourceAdvisor;
}
/**
 * Returns the load probe currently in use: the one instantiated by the constructor unless it
 * was replaced through {@code setLoadProbe}.
 *
 * @return the current load probe
 */
public LoadProbe getLoadProbe() {
return loadProbe;
}
/**
 * Replaces the load probe. This method is for test purposes only.
 *
 * @param probe the load probe to use from now on
 * @return the load probe that was in use before the call
 */
public LoadProbe setLoadProbe(LoadProbe probe) {
  final LoadProbe previous = loadProbe;
  loadProbe = probe;
  return previous;
}
/**
 * {@inheritDoc}
 *
 * <p>
 * Delegates to the off-heap memory monitor's critical threshold.
 */
@Override
public void setCriticalOffHeapPercentage(float offHeapPercentage) {
  final OffHeapMemoryMonitor offHeapMonitor = getOffHeapMonitor();
  offHeapMonitor.setCriticalThreshold(offHeapPercentage);
}
/**
 * {@inheritDoc}
 *
 * <p>
 * Reads the off-heap memory monitor's critical threshold.
 */
@Override
public float getCriticalOffHeapPercentage() {
  final OffHeapMemoryMonitor offHeapMonitor = getOffHeapMonitor();
  return offHeapMonitor.getCriticalThreshold();
}
/**
 * {@inheritDoc}
 *
 * <p>
 * Delegates to the off-heap memory monitor's eviction threshold.
 */
@Override
public void setEvictionOffHeapPercentage(float offHeapPercentage) {
  final OffHeapMemoryMonitor offHeapMonitor = getOffHeapMonitor();
  offHeapMonitor.setEvictionThreshold(offHeapPercentage);
}
/**
 * {@inheritDoc}
 *
 * <p>
 * Reads the off-heap memory monitor's eviction threshold.
 */
@Override
public float getEvictionOffHeapPercentage() {
  final OffHeapMemoryMonitor offHeapMonitor = getOffHeapMonitor();
  return offHeapMonitor.getEvictionThreshold();
}
/**
 * {@inheritDoc}
 *
 * <p>
 * Delegates to the heap memory monitor's critical threshold.
 */
@Override
public void setCriticalHeapPercentage(float heapPercentage) {
  final HeapMemoryMonitor heapMonitor = getHeapMonitor();
  heapMonitor.setCriticalThreshold(heapPercentage);
}
/**
 * {@inheritDoc}
 *
 * <p>
 * Reads the heap memory monitor's critical threshold.
 */
@Override
public float getCriticalHeapPercentage() {
  final HeapMemoryMonitor heapMonitor = getHeapMonitor();
  return heapMonitor.getCriticalThreshold();
}
/**
 * {@inheritDoc}
 *
 * <p>
 * Delegates to the heap memory monitor's eviction threshold.
 */
@Override
public void setEvictionHeapPercentage(float heapPercentage) {
  final HeapMemoryMonitor heapMonitor = getHeapMonitor();
  heapMonitor.setEvictionThreshold(heapPercentage);
}
/**
 * {@inheritDoc}
 *
 * <p>
 * Reads the heap memory monitor's eviction threshold.
 */
@Override
public float getEvictionHeapPercentage() {
  final HeapMemoryMonitor heapMonitor = getHeapMonitor();
  return heapMonitor.getEvictionThreshold();
}
/**
 * Fetches the thread monitoring facility from the cache's distribution manager.
 *
 * @return the thread monitoring object, or {@code null} when the cache has no distribution
 *         manager
 */
private ThreadsMonitoring getThreadMonitorObj() {
  final DistributionManager dm = cache.getDistributionManager();
  if (dm == null) {
    return null;
  }
  return dm.getThreadMonitoring();
}
/**
 * Registers a task representing an asynchronous action performed during startup.
 *
 * @param startupTask the CompletableFuture startup task
 * @throws NullPointerException if {@code startupTask} is null
 */
public void addStartupTask(CompletableFuture<Void> startupTask) {
  final CompletableFuture<Void> task = Objects.requireNonNull(startupTask);
  synchronized (startupTasks) {
    startupTasks.add(task);
  }
}
/**
 * Drains the registered startup tasks and combines them into a single future.
 *
 * @return a CompletableFuture that completes when all of the drained startup tasks complete
 */
public CompletableFuture<Void> allOfStartupTasks() {
  final CompletableFuture<?>[] drained;
  synchronized (startupTasks) {
    // Snapshot and clear under the same lock so no task is lost or returned twice.
    drained = startupTasks.toArray(new CompletableFuture[0]);
    startupTasks.clear();
  }
  return CompletableFuture.allOf(drained);
}
/**
 * Exposes the startup task collection for tests. The returned collection is the internal one,
 * not a copy, and this accessor does not synchronize on it.
 *
 * @return the internal collection of registered startup tasks
 */
@VisibleForTesting
Collection<CompletableFuture<Void>> getStartupTasks() {
return startupTasks;
}
}