blob: 3ba7c21fd28b1caa5f7e8a35cf41edb4ee19c1e0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.management.internal;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import javax.management.Notification;
import javax.management.ObjectName;
import org.apache.logging.log4j.Logger;
import org.apache.geode.CancelException;
import org.apache.geode.StatisticsFactory;
import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.EvictionAction;
import org.apache.geode.cache.EvictionAttributes;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.RegionExistsException;
import org.apache.geode.cache.Scope;
import org.apache.geode.cache.TimeoutException;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.CachePerfStats;
import org.apache.geode.internal.cache.HasCachePerfStats;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.InternalRegionFactory;
import org.apache.geode.internal.statistics.StatisticsClock;
import org.apache.geode.logging.internal.log4j.api.LogService;
import org.apache.geode.management.ManagementException;
/**
* Manager implementation which manages federated MBeans for the entire DistributedSystem and
* controls the JMX server endpoints for JMX clients to connect, such as an RMI server.
*
* <p>
* The FederatingManager is only appropriate for a peer or server in a GemFire distributed system.
*
* @since GemFire 7.0
*/
public class FederatingManager extends Manager implements ManagerMembership {
private static final Logger logger = LogService.getLogger();
/**
* This Executor uses a pool of thread to execute the member addition /removal tasks, This will
* utilize the processing powers available. Going with unbounded queue because tasks wont be
* unbounded in practical situation as number of members will be a finite set at any given point
* of time
*/
private final AtomicReference<ExecutorService> executorService = new AtomicReference<>();
private final AtomicReference<Exception> latestException = new AtomicReference<>();
private final List<Runnable> pendingTasks = new CopyOnWriteArrayList<>();
private final SystemManagementService service;
private final Supplier<ExecutorService> executorServiceSupplier;
private final MBeanProxyFactory proxyFactory;
private final MemberMessenger messenger;
private final ReentrantLock lifecycleLock;
private volatile boolean starting;
@VisibleForTesting
FederatingManager(ManagementResourceRepo repo, InternalDistributedSystem system,
SystemManagementService service, InternalCache cache, StatisticsFactory statisticsFactory,
StatisticsClock statisticsClock, MBeanProxyFactory proxyFactory, MemberMessenger messenger,
ExecutorService executorService) {
this(repo, system, service, cache, statisticsFactory, statisticsClock, proxyFactory, messenger,
() -> executorService);
}
FederatingManager(ManagementResourceRepo repo, InternalDistributedSystem system,
SystemManagementService service, InternalCache cache, StatisticsFactory statisticsFactory,
StatisticsClock statisticsClock, MBeanProxyFactory proxyFactory, MemberMessenger messenger,
Supplier<ExecutorService> executorServiceSupplier) {
super(repo, system, cache, statisticsFactory, statisticsClock);
this.service = service;
this.proxyFactory = proxyFactory;
this.messenger = messenger;
this.executorServiceSupplier = executorServiceSupplier;
lifecycleLock = new ReentrantLock();
}
/**
* This method will be invoked whenever a member wants to be a managing node. The exception
* Management exception has to be handled by the caller.
*/
@Override
public synchronized void startManager() {
try {
lifecycleLock.lock();
try {
if (starting || running) {
return;
}
if (logger.isDebugEnabled()) {
logger.debug("Starting the Federating Manager.... ");
}
starting = true;
executorService.set(executorServiceSupplier.get());
running = true;
} finally {
lifecycleLock.unlock();
}
startManagingActivity();
lifecycleLock.lock();
try {
for (Runnable task : pendingTasks) {
executeTask(task);
}
} finally {
pendingTasks.clear();
starting = false;
lifecycleLock.unlock();
}
messenger.broadcastManagerInfo();
} catch (Exception e) {
cleanupFailedStart();
throw new ManagementException(e);
}
}
private void cleanupFailedStart() {
lifecycleLock.lock();
try {
pendingTasks.clear();
running = false;
starting = false;
} finally {
lifecycleLock.unlock();
}
}
@Override
public synchronized void stopManager() {
lifecycleLock.lock();
try {
// remove hidden management regions and federatedMBeans
if (!running) {
return;
}
if (logger.isDebugEnabled()) {
logger.debug("Stopping the Federating Manager.... ");
}
running = false;
} finally {
lifecycleLock.unlock();
}
stopManagingActivity();
}
@Override
public boolean isRunning() {
return running;
}
/**
* This method will delegate task to another thread and exit, so that it wont block the membership
* listener
*/
@Override
public void addMember(InternalDistributedMember member) {
lifecycleLock.lock();
try {
if (!running) {
return;
}
executeTask(() -> new AddMemberTask(member).call());
} finally {
lifecycleLock.unlock();
}
}
/**
* This method will delegate task to another thread and exit, so that it wont block the membership
* listener
*/
@Override
public void removeMember(DistributedMember member, boolean crashed) {
lifecycleLock.lock();
try {
Runnable task = new RemoveMemberTask(member, crashed);
if (starting) {
pendingTasks.add(task);
} else if (running) {
executeTask(task);
}
} finally {
lifecycleLock.unlock();
}
}
/**
* this method will delegate task to another thread and exit, so that it wont block the membership
* listener
*/
@Override
public void suspectMember(DistributedMember member, InternalDistributedMember whoSuspected,
String reason) {
service.memberSuspect((InternalDistributedMember) member, whoSuspected, reason);
}
public MemberMessenger getMessenger() {
return messenger;
}
/**
* This will return the last updated time of the proxyMBean.
*
* @param objectName {@link ObjectName} of the MBean
*
* @return last updated time of the proxy
*/
long getLastUpdateTime(ObjectName objectName) {
return proxyFactory.getLastUpdateTime(objectName);
}
/**
* Find a particular proxy instance for a {@link ObjectName}, {@link DistributedMember} and
* interface class If the proxy interface does not implement the given interface class a
* {@link ClassCastException} will be thrown
*
* @param objectName {@link ObjectName} of the MBean
* @param interfaceClass interface class implemented by proxy
*
* @return an instance of proxy exposing the given interface
*/
<T> T findProxy(ObjectName objectName, Class<T> interfaceClass) {
return proxyFactory.findProxy(objectName, interfaceClass);
}
/**
* Find a set of proxies given a {@link DistributedMember}.
*
* @param member {@link DistributedMember}
*
* @return a set of {@link ObjectName}
*/
Set<ObjectName> findAllProxies(DistributedMember member) {
return proxyFactory.findAllProxies(member);
}
/**
* This method will be invoked when a node transitions from managed node to managing node This
* method will block for all GIIs to be completed But each GII is given a specific time frame.
* After that the task will be marked as cancelled.
*/
private void startManagingActivity() {
boolean isDebugEnabled = logger.isDebugEnabled();
Collection<Callable<InternalDistributedMember>> giiTaskList = new ArrayList<>();
for (InternalDistributedMember member : system.getDistributionManager()
.getOtherDistributionManagerIds()) {
giiTaskList.add(new AddMemberTask(member));
}
try {
if (isDebugEnabled) {
logger.debug("Management Resource creation started : ");
}
List<Future<InternalDistributedMember>> futureTaskList =
executorService.get().invokeAll(giiTaskList);
for (Future<InternalDistributedMember> futureTask : futureTaskList) {
try {
DistributedMember returnedMember = futureTask.get();
String memberId = returnedMember != null ? returnedMember.getId() : null;
if (futureTask.isDone()) {
if (isDebugEnabled) {
logger.debug("Monitoring Resource Created for : {}", memberId);
}
}
if (futureTask.isCancelled()) {
// Retry mechanism can be added here after discussions
if (isDebugEnabled) {
logger.debug("Monitoring resource Creation Failed for : {}", memberId);
}
}
} catch (ExecutionException e) {
if (isDebugEnabled) {
logger.debug("ExecutionException during Management GII", e);
}
} catch (CancellationException e) {
if (isDebugEnabled) {
logger.debug("InterruptedException while creating Monitoring resource with error",
new ManagementException(e));
}
}
}
} catch (InterruptedException e) {
if (isDebugEnabled) {
logger.debug("InterruptedException while creating Monitoring resource with error",
new ManagementException(e));
}
} finally {
if (isDebugEnabled) {
logger.debug("Management Resource creation completed");
}
}
}
/**
* This method will be invoked whenever a member stops being a managing node. The
* {@code ManagementException} has to be handled by the caller.
*/
private void stopManagingActivity() {
try {
executorService.get().shutdownNow();
for (DistributedMember distributedMember : repo.getMonitoringRegionMap().keySet()) {
removeMemberArtifacts(distributedMember, false);
}
} catch (Exception e) {
throw new ManagementException(e);
}
}
private synchronized void executeTask(Runnable task) {
try {
executorService.get().execute(task);
} catch (RejectedExecutionException ignored) {
// Ignore, we are getting shutdown
}
}
@VisibleForTesting
void addMemberArtifacts(InternalDistributedMember member) {
synchronized (member) {
String appender = MBeanJMXAdapter.getUniqueIDForMember(member);
String monitoringRegionName = ManagementConstants.MONITORING_REGION + "_" + appender;
String notificationRegionName = ManagementConstants.NOTIFICATION_REGION + "_" + appender;
if (cache.getInternalRegion(monitoringRegionName) != null
&& cache.getInternalRegion(notificationRegionName) != null) {
return;
}
try {
// GII wont start at all if its interrupted
if (!Thread.currentThread().isInterrupted()) {
// Create anonymous stats holder for Management Regions
HasCachePerfStats monitoringRegionStats = new HasCachePerfStats() {
@Override
public CachePerfStats getCachePerfStats() {
return new CachePerfStats(cache.getDistributedSystem(),
"RegionStats-managementRegionStats", statisticsClock);
}
@Override
public boolean hasOwnStats() {
return true;
}
};
// Monitoring region for member is created
InternalRegionFactory<String, Object> monitorFactory =
cache.createInternalRegionFactory();
monitorFactory.setScope(Scope.DISTRIBUTED_NO_ACK);
monitorFactory.setDataPolicy(DataPolicy.REPLICATE);
monitorFactory.setConcurrencyChecksEnabled(false);
ManagementCacheListener managementCacheListener =
new ManagementCacheListener(proxyFactory);
monitorFactory.addCacheListener(managementCacheListener);
monitorFactory.setIsUsedForMetaRegion(true);
monitorFactory.setCachePerfStatsHolder(monitoringRegionStats);
// Notification region for member is created
InternalRegionFactory<NotificationKey, Notification> notificationFactory =
cache.createInternalRegionFactory();
notificationFactory.setScope(Scope.DISTRIBUTED_NO_ACK);
notificationFactory.setDataPolicy(DataPolicy.REPLICATE);
notificationFactory.setConcurrencyChecksEnabled(false);
// Fix for issue #49638, evict the internal region _notificationRegion
notificationFactory
.setEvictionAttributes(EvictionAttributes.createLRUEntryAttributes(
ManagementConstants.NOTIF_REGION_MAX_ENTRIES, EvictionAction.LOCAL_DESTROY));
NotificationCacheListener notifListener = new NotificationCacheListener(proxyFactory);
notificationFactory.addCacheListener(notifListener);
notificationFactory.setIsUsedForMetaRegion(true);
notificationFactory.setCachePerfStatsHolder(monitoringRegionStats);
Region<String, Object> proxyMonitoringRegion;
try {
if (!running) {
return;
}
proxyMonitoringRegion = monitorFactory.create(monitoringRegionName);
} catch (TimeoutException | RegionExistsException e) {
if (logger.isDebugEnabled()) {
logger.debug("Error During Internal Region creation", e);
}
throw new ManagementException(e);
}
boolean proxyNotificationRegionCreated = false;
Region<NotificationKey, Notification> proxyNotificationRegion;
try {
if (!running) {
return;
}
proxyNotificationRegion = notificationFactory.create(notificationRegionName);
proxyNotificationRegionCreated = true;
} catch (TimeoutException | RegionExistsException e) {
if (logger.isDebugEnabled()) {
logger.debug("Error During Internal Region creation", e);
}
throw new ManagementException(e);
} finally {
if (!proxyNotificationRegionCreated) {
// Destroy the proxy region if proxy notification region is not created
proxyMonitoringRegion.localDestroyRegion();
}
}
if (logger.isDebugEnabled()) {
logger.debug("Management Region created with Name : {}",
proxyMonitoringRegion.getName());
logger.debug("Notification Region created with Name : {}",
proxyNotificationRegion.getName());
}
// Only the exception case would have destroyed the proxy
// regions. We can safely proceed here.
repo.putEntryInMonitoringRegionMap(member, proxyMonitoringRegion);
repo.putEntryInNotifRegionMap(member, proxyNotificationRegion);
try {
if (!running) {
return;
}
proxyFactory.createAllProxies(member, proxyMonitoringRegion);
managementCacheListener.markReady();
notifListener.markReady();
} catch (Exception e) {
if (logger.isDebugEnabled()) {
logger.debug("Error During GII Proxy creation", e);
}
throw new ManagementException(e);
}
}
} catch (Exception e) {
throw new ManagementException(e);
}
// Before completing task intimate all listening ProxyListener which might send
// notifications.
service.memberJoined(member);
// Send manager info to the added member
messenger.sendManagerInfo(member);
}
}
@VisibleForTesting
void removeMemberArtifacts(DistributedMember member, boolean crashed) {
Region<String, Object> monitoringRegion = repo.getEntryFromMonitoringRegionMap(member);
Region<NotificationKey, Notification> notificationRegion =
repo.getEntryFromNotifRegionMap(member);
if (monitoringRegion == null && notificationRegion == null) {
return;
}
repo.romoveEntryFromMonitoringRegionMap(member);
repo.removeEntryFromNotifRegionMap(member);
// If cache is closed all the regions would have been destroyed implicitly
if (!cache.isClosed()) {
try {
if (monitoringRegion != null) {
proxyFactory.removeAllProxies(member, monitoringRegion);
monitoringRegion.localDestroyRegion();
}
} catch (CancelException | RegionDestroyedException ignore) {
// ignored
}
try {
if (notificationRegion != null) {
notificationRegion.localDestroyRegion();
}
} catch (CancelException | RegionDestroyedException ignore) {
// ignored
}
}
if (!system.getDistributedMember().equals(member)) {
try {
service.memberDeparted((InternalDistributedMember) member, crashed);
} catch (CancelException | RegionDestroyedException ignore) {
// ignored
}
}
}
@VisibleForTesting
public MBeanProxyFactory proxyFactory() {
return proxyFactory;
}
@VisibleForTesting
Exception latestException() {
return latestException.getAndSet(null);
}
@VisibleForTesting
List<Runnable> pendingTasks() {
return pendingTasks;
}
@VisibleForTesting
boolean isStarting() {
return starting;
}
/**
* Actual task of adding a member.
*
* <p>
* Perform the GII request which might originate from transition listener or membership listener.
*
* <p>
* Manager resources are created per member which is visible to this node:
*
* <pre>
* 1) Management Region : its a Replicated NO_ACK region
* 2) Notification Region : its a Replicated Proxy NO_ACK region
* </pre>
*
* <p>
* Listeners are added to the above two regions:
*
* <pre>
* 1) ManagementCacheListener
* 2) NotificationCacheListener
* </pre>
*
* <p>
* This task can be cancelled from the calling thread if a timeout happens. In that case we have
* to handle the thread interrupt.
*/
private class AddMemberTask implements Callable<InternalDistributedMember> {
private final InternalDistributedMember member;
private AddMemberTask(InternalDistributedMember member) {
this.member = member;
}
@Override
public InternalDistributedMember call() {
addMemberArtifacts(member);
return member;
}
}
private class RemoveMemberTask implements Runnable {
private final DistributedMember member;
private final boolean crashed;
private RemoveMemberTask(DistributedMember member, boolean crashed) {
this.member = member;
this.crashed = crashed;
}
@Override
public void run() {
removeMemberArtifacts(member, crashed);
}
}
}