blob: 023f18246bf42321b2b7c656395de769cb8b40ae [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.management.internal;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;
import javax.management.Notification;
import javax.management.ObjectName;
import org.apache.logging.log4j.Logger;
import org.apache.geode.CancelException;
import org.apache.geode.cache.execute.FunctionService;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.DistributedSystemDisconnectedException;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.ResourceEvent;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.InternalCacheForClientAccess;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.management.AlreadyRunningException;
import org.apache.geode.management.AsyncEventQueueMXBean;
import org.apache.geode.management.CacheServerMXBean;
import org.apache.geode.management.DiskStoreMXBean;
import org.apache.geode.management.DistributedLockServiceMXBean;
import org.apache.geode.management.DistributedRegionMXBean;
import org.apache.geode.management.DistributedSystemMXBean;
import org.apache.geode.management.GatewayReceiverMXBean;
import org.apache.geode.management.GatewaySenderMXBean;
import org.apache.geode.management.LocatorMXBean;
import org.apache.geode.management.LockServiceMXBean;
import org.apache.geode.management.ManagementException;
import org.apache.geode.management.ManagerMXBean;
import org.apache.geode.management.MemberMXBean;
import org.apache.geode.management.RegionMXBean;
import org.apache.geode.management.internal.beans.ManagementAdapter;
import org.apache.geode.management.membership.MembershipEvent;
import org.apache.geode.management.membership.MembershipListener;
/**
* This is the concrete implementation of ManagementService which is the gateway to various JMX
* operations over a GemFire System
*
* @since GemFire 7.0
*/
public class SystemManagementService extends BaseManagementService {
private static final Logger logger = LogService.getLogger();
/**
* The concrete implementation of DistributedSystem that provides internal-only functionality.
*/
private final InternalDistributedSystem system;
/**
* core component for distribution
*/
private LocalManager localManager;
/**
* This is a notification hub to listen all the notifications emitted from all the MBeans in a
* peer cache./cache server
*/
private final NotificationHub notificationHub;
/**
* whether the service is closed or not if cache is closed automatically this service will be
* closed
*/
private volatile boolean closed = false;
/**
* has the management service has started yet
*/
private volatile boolean isStarted = false;
/**
* Adapter to interact with platform MBean server
*/
private final MBeanJMXAdapter jmxAdapter;
private final InternalCacheForClientAccess cache;
private FederatingManager federatingManager;
private final ManagementAgent agent;
private final ManagementResourceRepo repo;
/**
* This membership listener will listen on membership events after the node has transformed into a
* Managing node.
*/
private ManagementMembershipListener listener;
/**
* Proxy aggregator to create aggregate MBeans e.g. DistributedSystem and DistributedRegion
* GemFire comes with a default aggregator.
*/
private final List<ProxyListener> proxyListeners;
private final UniversalListenerContainer universalListenerContainer =
new UniversalListenerContainer();
public static BaseManagementService newSystemManagementService(
InternalCacheForClientAccess cache) {
return new SystemManagementService(cache).init();
}
protected SystemManagementService(InternalCacheForClientAccess cache) {
this.cache = cache;
this.system = (InternalDistributedSystem) cache.getDistributedSystem();
// This is a safe check to ensure Management service does not start for a
// system which is disconnected.
// Most likely scenario when this will happen is when a cache is closed and we are at this
// point.
if (!system.isConnected()) {
throw new DistributedSystemDisconnectedException(
"This connection to a distributed system has been disconnected.");
}
this.jmxAdapter = new MBeanJMXAdapter(this.system.getDistributedMember());
this.repo = new ManagementResourceRepo();
this.notificationHub = new NotificationHub(repo);
if (system.getConfig().getJmxManager()) {
this.agent = new ManagementAgent(system.getConfig(), cache);
} else {
this.agent = null;
}
ManagementFunction function = new ManagementFunction(notificationHub);
FunctionService.registerFunction(function);
this.proxyListeners = new CopyOnWriteArrayList<>();
}
/**
* This method will initialize all the internal components for Management and Monitoring
*
* It will a)start an JMX connectorServer b) create a notification hub c)register the
* ManagementFunction
*/
private SystemManagementService init() {
try {
this.localManager = new LocalManager(repo, system, this, cache);
this.localManager.startManager();
this.listener = new ManagementMembershipListener(this);
system.getDistributionManager().addMembershipListener(listener);
isStarted = true;
return this;
} catch (CancelException e) {
// Rethrow all CancelExceptions (fix for defect 46339)
throw e;
} catch (Exception e) {
// Wrap all other exceptions as ManagementExceptions
logger.error(e.getMessage(), e);
throw new ManagementException(e);
}
}
/**
* For internal Use only
*/
public LocalManager getLocalManager() {
return localManager;
}
public NotificationHub getNotificationHub() {
return notificationHub;
}
public FederatingManager getFederatingManager() {
return federatingManager;
}
public MBeanJMXAdapter getJMXAdapter() {
return jmxAdapter;
}
public ManagementAgent getManagementAgent() {
return agent;
}
public boolean isStartedAndOpen() {
return isStarted && !closed && system.isConnected();
}
private void verifyManagementService() {
if (!isStarted) {
throw new ManagementException(
"Management Service Not Started Yet");
}
if (!system.isConnected()) {
throw new ManagementException(
"Not Connected To Distributed System");
}
if (closed) {
throw new ManagementException(
"Management Service Is Closed");
}
}
@Override
public void close() {
synchronized (instances) {
if (closed) {
// its a no op, hence not logging any exception
return;
}
if (logger.isDebugEnabled()) {
logger.debug("Closing Management Service");
}
if (listener != null && system.isConnected()) {
system.getDistributionManager().removeMembershipListener(listener);
}
// Stop the Federating Manager first . It will ensure MBeans are not getting federated.
// while un-registering
if (federatingManager != null && federatingManager.isRunning()) {
federatingManager.stopManager();
}
this.notificationHub.cleanUpListeners();
jmxAdapter.cleanJMXResource();
if (localManager.isRunning()) {
localManager.stopManager();
}
if (this.agent != null && this.agent.isRunning()) {
this.agent.stopAgent();
}
getInternalCache().getJmxManagerAdvisor().broadcastChange();
instances.remove(cache);
localManager = null;
closed = true;
}
}
@Override
public <T> void federate(ObjectName objectName, Class<T> interfaceClass,
boolean notificationEmitter) {
verifyManagementService();
if (!objectName.getDomain().equalsIgnoreCase(ManagementConstants.OBJECTNAME__DEFAULTDOMAIN)) {
throw new ManagementException(
"Not A GemFire Domain MBean, can not Federate");
}
if (!jmxAdapter.isRegistered(objectName)) {
throw new ManagementException(
"MBean Not Registered In GemFire Domain");
}
if (notificationEmitter && !jmxAdapter.hasNotificationSupport(objectName)) {
throw new ManagementException(
"MBean Does Not Have Notification Support");
}
// All validation Passed. Now create the federation Component
Object object = jmxAdapter.getMBeanObject(objectName);
FederationComponent fedComp =
new FederationComponent(object, objectName, interfaceClass, notificationEmitter);
if (ManagementAdapter.refreshOnInit.contains(interfaceClass)) {
fedComp.refreshObjectState(true);// Fixes 46387
}
localManager.markForFederation(objectName, fedComp);
if (isManager()) {
afterCreateProxy(objectName, interfaceClass, object, fedComp);
}
}
@Override
public CacheServerMXBean getLocalCacheServerMXBean(int serverPort) {
return jmxAdapter.getClientServiceMXBean(serverPort);
}
@Override
public long getLastUpdateTime(ObjectName objectName) {
if (!isStartedAndOpen()) {
return 0;
}
if (federatingManager == null) {
return 0;
} else if (!federatingManager.isRunning()) {
return 0;
}
if (jmxAdapter.isLocalMBean(objectName)) {
return 0;
}
return federatingManager.getLastUpdateTime(objectName);
}
@Override
public DiskStoreMXBean getLocalDiskStoreMBean(String diskStoreName) {
return jmxAdapter.getLocalDiskStoreMXBean(diskStoreName);
}
@Override
public LockServiceMXBean getLocalLockServiceMBean(String lockServiceName) {
return jmxAdapter.getLocalLockServiceMXBean(lockServiceName);
}
@Override
public RegionMXBean getLocalRegionMBean(String regionPath) {
return jmxAdapter.getLocalRegionMXBean(regionPath);
}
public <T> T getMBeanProxy(ObjectName objectName, Class<T> interfaceClass) {
if (!isStartedAndOpen()) {
return null;
}
if (federatingManager == null) {
return null;
} else if (!federatingManager.isRunning()) {
return null;
}
return federatingManager.findProxy(objectName, interfaceClass);
}
@Override
public MemberMXBean getMemberMXBean() {
return jmxAdapter.getMemberMXBean();
}
@Override
public Set<ObjectName> queryMBeanNames(DistributedMember member) {
if (!isStartedAndOpen()) {
return Collections.emptySet();
}
if (cache.getDistributedSystem().getDistributedMember().equals(member)) {
return jmxAdapter.getLocalGemFireMBean().keySet();
} else {
if (federatingManager == null) {
return Collections.emptySet();
} else if (!federatingManager.isRunning()) {
return Collections.emptySet();
}
return federatingManager.findAllProxies(member);
}
}
@Override
public Set<ObjectName> getAsyncEventQueueMBeanNames(DistributedMember member) {
Set<ObjectName> mBeanNames = this.queryMBeanNames(member);
return mBeanNames.stream().filter(x -> "AsyncEventQueue".equals(x.getKeyProperty("service")))
.collect(Collectors.toSet());
}
@Override
public ObjectName registerMBean(Object object, ObjectName objectName) {
verifyManagementService();
return jmxAdapter.registerMBean(object, objectName, false);
}
public ObjectName registerInternalMBean(Object object, ObjectName objectName) {
verifyManagementService();
return jmxAdapter.registerMBean(object, objectName, true);
}
@Override
public void unregisterMBean(ObjectName objectName) {
if (!isStartedAndOpen()) {
return;
}
verifyManagementService();
if (isManager()) {
FederationComponent removedObj = localManager.getFedComponents().get(objectName);
if (removedObj != null) { // only for MBeans local to Manager , not
// proxies
afterRemoveProxy(objectName, removedObj.getInterfaceClass(), removedObj.getMBeanObject(),
removedObj);
}
}
jmxAdapter.unregisterMBean(objectName);
localManager.unMarkForFederation(objectName);
}
@Override
public boolean isManager() {
return isManagerCreated() && federatingManager.isRunning();
}
public boolean isManagerCreated() {
return isStartedAndOpen() && federatingManager != null;
}
@Override
public void startManager() {
if (!getInternalCache().getInternalDistributedSystem().getConfig().getJmxManager()) {
// fix for 45900
throw new ManagementException(
"Could not start the manager because the gemfire property \"jmx-manager\" is false.");
}
synchronized (instances) {
verifyManagementService();
if (federatingManager != null && federatingManager.isRunning()) {
throw new AlreadyRunningException(
"Manager is already running");
}
boolean needsToBeStarted = false;
if (!isManagerCreated()) {
createManager();
needsToBeStarted = true;
} else if (!federatingManager.isRunning()) {
needsToBeStarted = true;
}
if (needsToBeStarted) {
boolean started = false;
try {
system.handleResourceEvent(ResourceEvent.MANAGER_START, null);
federatingManager.startManager();
if (this.agent != null) {
this.agent.startAgent();
}
getInternalCache().getJmxManagerAdvisor().broadcastChange();
started = true;
} catch (RuntimeException | Error e) {
logger.error("Jmx manager could not be started because {}", e.getMessage(), e);
throw e;
} finally {
if (!started) {
if (federatingManager != null) {
federatingManager.stopManager();
}
system.handleResourceEvent(ResourceEvent.MANAGER_STOP, null);
}
}
}
}
}
private InternalCache getInternalCache() {
return this.cache;
}
/**
* Creates a Manager instance in stopped state.
*/
public boolean createManager() {
synchronized (instances) {
if (federatingManager != null) {
return false;
}
system.handleResourceEvent(ResourceEvent.MANAGER_CREATE, null);
// An initialised copy of federating manager
federatingManager = new FederatingManager(jmxAdapter, repo, system, this, cache);
getInternalCache().getJmxManagerAdvisor().broadcastChange();
return true;
}
}
/**
* It will stop the federating Manager and restart the Local cache operation
*/
@Override
public void stopManager() {
synchronized (instances) {
verifyManagementService();
if (federatingManager != null) {
federatingManager.stopManager();
system.handleResourceEvent(ResourceEvent.MANAGER_STOP, null);
getInternalCache().getJmxManagerAdvisor().broadcastChange();
if (this.agent != null && this.agent.isRunning()) {
this.agent.stopAgent();
}
}
}
}
@Override
public boolean isClosed() {
return closed;
}
@Override
public DistributedLockServiceMXBean getDistributedLockServiceMXBean(String lockServiceName) {
return jmxAdapter.getDistributedLockServiceMXBean(lockServiceName);
}
@Override
public DistributedRegionMXBean getDistributedRegionMXBean(String regionName) {
return jmxAdapter.getDistributedRegionMXBean(regionName);
}
@Override
public DistributedSystemMXBean getDistributedSystemMXBean() {
return jmxAdapter.getDistributedSystemMXBean();
}
public void addProxyListener(ProxyListener listener) {
this.proxyListeners.add(listener);
}
public void removeProxyListener(ProxyListener listener) {
this.proxyListeners.remove(listener);
}
public List<ProxyListener> getProxyListeners() {
return this.proxyListeners;
}
@Override
public ManagerMXBean getManagerMXBean() {
return jmxAdapter.getManagerMXBean();
}
@Override
public ObjectName getCacheServerMBeanName(int serverPort, DistributedMember member) {
return MBeanJMXAdapter.getClientServiceMBeanName(serverPort, member);
}
@Override
public ObjectName getDiskStoreMBeanName(DistributedMember member, String diskName) {
return MBeanJMXAdapter.getDiskStoreMBeanName(member, diskName);
}
@Override
public ObjectName getDistributedLockServiceMBeanName(String lockService) {
return MBeanJMXAdapter.getDistributedLockServiceName(lockService);
}
@Override
public ObjectName getDistributedRegionMBeanName(String regionPath) {
return MBeanJMXAdapter.getDistributedRegionMbeanName(regionPath);
}
@Override
public ObjectName getDistributedSystemMBeanName() {
return MBeanJMXAdapter.getDistributedSystemName();
}
@Override
public ObjectName getGatewayReceiverMBeanName(DistributedMember member) {
return MBeanJMXAdapter.getGatewayReceiverMBeanName(member);
}
@Override
public ObjectName getGatewaySenderMBeanName(DistributedMember member, String gatewaySenderId) {
return MBeanJMXAdapter.getGatewaySenderMBeanName(member, gatewaySenderId);
}
@Override
public ObjectName getAsyncEventQueueMBeanName(DistributedMember member, String queueId) {
return MBeanJMXAdapter.getAsyncEventQueueMBeanName(member, queueId);
}
@Override
public ObjectName getLockServiceMBeanName(DistributedMember member, String lockServiceName) {
return MBeanJMXAdapter.getLockServiceMBeanName(member, lockServiceName);
}
@Override
public ObjectName getManagerMBeanName() {
return MBeanJMXAdapter.getManagerName();
}
@Override
public ObjectName getMemberMBeanName(DistributedMember member) {
return MBeanJMXAdapter.getMemberMBeanName(member);
}
@Override
public ObjectName getRegionMBeanName(DistributedMember member, String regionPath) {
return MBeanJMXAdapter.getRegionMBeanName(member, regionPath);
}
@Override
public GatewayReceiverMXBean getLocalGatewayReceiverMXBean() {
return jmxAdapter.getGatewayReceiverMXBean();
}
@Override
public GatewaySenderMXBean getLocalGatewaySenderMXBean(String senderId) {
return jmxAdapter.getGatewaySenderMXBean(senderId);
}
@Override
public AsyncEventQueueMXBean getLocalAsyncEventQueueMXBean(String queueId) {
return jmxAdapter.getAsyncEventQueueMXBean(queueId);
}
@Override
public ObjectName getLocatorMBeanName(DistributedMember member) {
return MBeanJMXAdapter.getLocatorMBeanName(member);
}
@Override
public LocatorMXBean getLocalLocatorMXBean() {
return jmxAdapter.getLocatorMXBean();
}
public boolean afterCreateProxy(ObjectName objectName, Class interfaceClass, Object proxyObject,
FederationComponent newVal) {
for (ProxyListener listener : proxyListeners) {
listener.afterCreateProxy(objectName, interfaceClass, proxyObject, newVal);
}
return true;
}
public boolean afterPseudoCreateProxy(ObjectName objectName, Class interfaceClass,
Object proxyObject, FederationComponent newVal) {
for (ProxyListener listener : proxyListeners) {
listener.afterPseudoCreateProxy(objectName, interfaceClass, proxyObject, newVal);
}
return true;
}
public boolean afterRemoveProxy(ObjectName objectName, Class interfaceClass, Object proxyObject,
FederationComponent oldVal) {
for (ProxyListener listener : proxyListeners) {
listener.afterRemoveProxy(objectName, interfaceClass, proxyObject, oldVal);
}
return true;
}
public boolean afterUpdateProxy(ObjectName objectName, Class interfaceClass, Object proxyObject,
FederationComponent newVal, FederationComponent oldVal) {
for (ProxyListener listener : proxyListeners) {
listener.afterUpdateProxy(objectName, interfaceClass, proxyObject, newVal, oldVal);
}
return true;
}
public void handleNotification(Notification notification) {
for (ProxyListener listener : proxyListeners) {
listener.handleNotification(notification);
}
}
@Override
public <T> T getMBeanInstance(ObjectName objectName, Class<T> interfaceClass) {
if (jmxAdapter.isLocalMBean(objectName)) {
return jmxAdapter.findMBeanByName(objectName, interfaceClass);
} else {
return this.getMBeanProxy(objectName, interfaceClass);
}
}
public void logFine(String s) {
if (logger.isDebugEnabled()) {
logger.debug(s);
}
}
public void memberJoined(InternalDistributedMember id) {
for (ProxyListener listener : proxyListeners) {
listener.memberJoined(system.getDistributionManager(), id);
}
}
public void memberDeparted(InternalDistributedMember id, boolean crashed) {
for (ProxyListener listener : proxyListeners) {
listener.memberDeparted(system.getDistributionManager(), id, crashed);
}
}
public void memberSuspect(InternalDistributedMember id, InternalDistributedMember whoSuspected,
String reason) {
for (ProxyListener listener : proxyListeners) {
listener.memberSuspect(system.getDistributionManager(), id, whoSuspected, reason);
}
}
public void quorumLost(Set<InternalDistributedMember> failures,
List<InternalDistributedMember> remaining) {
for (ProxyListener listener : proxyListeners) {
listener.quorumLost(system.getDistributionManager(), failures, remaining);
}
}
public static class UniversalListenerContainer {
private List<MembershipListener> membershipListeners = new CopyOnWriteArrayList<>();
public void memberJoined(InternalDistributedMember id) {
MembershipEvent event = createEvent(id);
for (MembershipListener listener : membershipListeners) {
try {
listener.memberJoined(event);
} catch (Exception e) {
logger.error("Could not invoke listener event memberJoined for listener[{}] due to ",
listener.getClass(), e.getMessage(), e);
}
}
}
public void memberDeparted(InternalDistributedMember id, boolean crashed) {
MembershipEvent event = createEvent(id);
if (!crashed) {
for (MembershipListener listener : membershipListeners) {
try {
listener.memberLeft(event);
} catch (Exception e) {
logger.error("Could not invoke listener event memberLeft for listener[{}] due to ",
listener.getClass(), e.getMessage(), e);
}
}
} else {
for (MembershipListener listener : membershipListeners) {
try {
listener.memberCrashed(event);
} catch (Exception e) {
logger.error("Could not invoke listener event memberCrashed for listener[{}] due to ",
listener.getClass(), e.getMessage(), e);
}
}
}
}
private MembershipEvent createEvent(InternalDistributedMember id) {
final String memberId = id.getId();
final DistributedMember member = id;
return new MembershipEvent() {
@Override
public String getMemberId() {
return memberId;
}
@Override
public DistributedMember getDistributedMember() {
return member;
}
};
}
/**
* Registers a listener that receives call backs when a member joins or leaves the distributed
* system.
*/
public void addMembershipListener(MembershipListener listener) {
membershipListeners.add(listener);
}
/**
* Unregisters a membership listener
*
* @see #addMembershipListener
*/
public void removeMembershipListener(MembershipListener listener) {
membershipListeners.remove(listener);
}
}
public UniversalListenerContainer getUniversalListenerContainer() {
return universalListenerContainer;
}
@Override
public void addMembershipListener(MembershipListener listener) {
universalListenerContainer.addMembershipListener(listener);
}
@Override
public void removeMembershipListener(MembershipListener listener) {
universalListenerContainer.removeMembershipListener(listener);
}
}