| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.brooklyn.feed.jmx; |
| |
| import static com.google.common.base.Preconditions.checkNotNull; |
| |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.TimeUnit; |
| |
| import javax.management.Notification; |
| import javax.management.NotificationFilter; |
| import javax.management.NotificationListener; |
| import javax.management.ObjectName; |
| |
| import org.apache.brooklyn.api.entity.EntityLocal; |
| import org.apache.brooklyn.config.ConfigKey; |
| import org.apache.brooklyn.core.config.ConfigKeys; |
| import org.apache.brooklyn.core.feed.AbstractFeed; |
| import org.apache.brooklyn.core.feed.AttributePollHandler; |
| import org.apache.brooklyn.core.feed.DelegatingPollHandler; |
| import org.apache.brooklyn.core.feed.PollHandler; |
| import org.apache.brooklyn.core.feed.Poller; |
| import org.apache.brooklyn.entity.software.base.SoftwareProcessImpl; |
| import org.apache.brooklyn.util.time.Duration; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.google.common.collect.HashMultimap; |
| import com.google.common.collect.Iterables; |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.SetMultimap; |
| import com.google.common.collect.Sets; |
| import com.google.common.reflect.TypeToken; |
| |
| |
| /** |
| * Provides a feed of attribute values, by polling or subscribing over jmx. |
| * |
| * Example usage (e.g. in an entity that extends {@link SoftwareProcessImpl}): |
| * <pre> |
| * {@code |
| * private JmxFeed feed; |
| * |
| * //@Override |
| * protected void connectSensors() { |
| * super.connectSensors(); |
| * |
| * feed = JmxFeed.builder() |
| * .entity(this) |
| * .period(500, TimeUnit.MILLISECONDS) |
| * .pollAttribute(new JmxAttributePollConfig<Integer>(ERROR_COUNT) |
| * .objectName(requestProcessorMbeanName) |
| * .attributeName("errorCount")) |
| * .pollAttribute(new JmxAttributePollConfig<Boolean>(SERVICE_UP) |
| * .objectName(serverMbeanName) |
| * .attributeName("Started") |
| * .onError(Functions.constant(false))) |
| * .build(); |
| * } |
| * |
| * {@literal @}Override |
| * protected void disconnectSensors() { |
| * super.disconnectSensors(); |
| * if (feed != null) feed.stop(); |
| * } |
| * } |
| * </pre> |
| * |
| * @author aled |
| */ |
| public class JmxFeed extends AbstractFeed { |
| |
| public static final Logger log = LoggerFactory.getLogger(JmxFeed.class); |
| |
| public static final long JMX_CONNECTION_TIMEOUT_MS = 120*1000; |
| |
| public static final ConfigKey<JmxHelper> HELPER = ConfigKeys.newConfigKey(JmxHelper.class, "helper"); |
| public static final ConfigKey<Boolean> OWN_HELPER = ConfigKeys.newBooleanConfigKey("ownHelper"); |
| public static final ConfigKey<String> JMX_URI = ConfigKeys.newStringConfigKey("jmxUri"); |
| public static final ConfigKey<Long> JMX_CONNECTION_TIMEOUT = ConfigKeys.newLongConfigKey("jmxConnectionTimeout"); |
| |
| @SuppressWarnings("serial") |
| public static final ConfigKey<SetMultimap<String, JmxAttributePollConfig<?>>> ATTRIBUTE_POLLS = ConfigKeys.newConfigKey( |
| new TypeToken<SetMultimap<String, JmxAttributePollConfig<?>>>() {}, |
| "attributePolls"); |
| |
| @SuppressWarnings("serial") |
| public static final ConfigKey<SetMultimap<List<?>, JmxOperationPollConfig<?>>> OPERATION_POLLS = ConfigKeys.newConfigKey( |
| new TypeToken<SetMultimap<List<?>, JmxOperationPollConfig<?>>>() {}, |
| "operationPolls"); |
| |
| @SuppressWarnings("serial") |
| public static final ConfigKey<SetMultimap<NotificationFilter, JmxNotificationSubscriptionConfig<?>>> NOTIFICATION_SUBSCRIPTIONS = ConfigKeys.newConfigKey( |
| new TypeToken<SetMultimap<NotificationFilter, JmxNotificationSubscriptionConfig<?>>>() {}, |
| "notificationPolls"); |
| |
| public static Builder builder() { |
| return new Builder(); |
| } |
| |
| public static class Builder { |
| private EntityLocal entity; |
| private JmxHelper helper; |
| private long jmxConnectionTimeout = JMX_CONNECTION_TIMEOUT_MS; |
| private long period = 500; |
| private TimeUnit periodUnits = TimeUnit.MILLISECONDS; |
| private List<JmxAttributePollConfig<?>> attributePolls = Lists.newArrayList(); |
| private List<JmxOperationPollConfig<?>> operationPolls = Lists.newArrayList(); |
| private List<JmxNotificationSubscriptionConfig<?>> notificationSubscriptions = Lists.newArrayList(); |
| private String uniqueTag; |
| private volatile boolean built; |
| |
| public Builder entity(EntityLocal val) { |
| this.entity = val; |
| return this; |
| } |
| public Builder helper(JmxHelper val) { |
| this.helper = val; |
| return this; |
| } |
| public Builder period(Duration duration) { |
| return period(duration.toMilliseconds(), TimeUnit.MILLISECONDS); |
| } |
| public Builder period(long millis) { |
| return period(millis, TimeUnit.MILLISECONDS); |
| } |
| public Builder period(long val, TimeUnit units) { |
| this.period = val; |
| this.periodUnits = units; |
| return this; |
| } |
| public Builder pollAttribute(JmxAttributePollConfig<?> config) { |
| attributePolls.add(config); |
| return this; |
| } |
| public Builder pollOperation(JmxOperationPollConfig<?> config) { |
| operationPolls.add(config); |
| return this; |
| } |
| public Builder subscribeToNotification(JmxNotificationSubscriptionConfig<?> config) { |
| notificationSubscriptions.add(config); |
| return this; |
| } |
| public Builder uniqueTag(String uniqueTag) { |
| this.uniqueTag = uniqueTag; |
| return this; |
| } |
| public JmxFeed build() { |
| built = true; |
| JmxFeed result = new JmxFeed(this); |
| result.setEntity(checkNotNull(entity, "entity")); |
| result.start(); |
| return result; |
| } |
| @Override |
| protected void finalize() { |
| if (!built) log.warn("JmxFeed.Builder created, but build() never called"); |
| } |
| } |
| |
| private final SetMultimap<ObjectName, NotificationListener> notificationListeners = HashMultimap.create(); |
| |
| /** |
| * For rebind; do not call directly; use builder |
| */ |
| public JmxFeed() { |
| } |
| |
| protected JmxFeed(Builder builder) { |
| super(); |
| if (builder.helper != null) { |
| JmxHelper helper = builder.helper; |
| setConfig(HELPER, helper); |
| setConfig(OWN_HELPER, false); |
| setConfig(JMX_URI, helper.getUrl()); |
| } |
| setConfig(JMX_CONNECTION_TIMEOUT, builder.jmxConnectionTimeout); |
| |
| SetMultimap<String, JmxAttributePollConfig<?>> attributePolls = HashMultimap.<String,JmxAttributePollConfig<?>>create(); |
| for (JmxAttributePollConfig<?> config : builder.attributePolls) { |
| if (!config.isEnabled()) continue; |
| @SuppressWarnings({ "rawtypes", "unchecked" }) |
| JmxAttributePollConfig<?> configCopy = new JmxAttributePollConfig(config); |
| if (configCopy.getPeriod() < 0) configCopy.period(builder.period, builder.periodUnits); |
| attributePolls.put(configCopy.getObjectName().getCanonicalName() + configCopy.getAttributeName(), configCopy); |
| } |
| setConfig(ATTRIBUTE_POLLS, attributePolls); |
| |
| SetMultimap<List<?>, JmxOperationPollConfig<?>> operationPolls = HashMultimap.<List<?>,JmxOperationPollConfig<?>>create(); |
| for (JmxOperationPollConfig<?> config : builder.operationPolls) { |
| if (!config.isEnabled()) continue; |
| @SuppressWarnings({ "rawtypes", "unchecked" }) |
| JmxOperationPollConfig<?> configCopy = new JmxOperationPollConfig(config); |
| if (configCopy.getPeriod() < 0) configCopy.period(builder.period, builder.periodUnits); |
| operationPolls.put(configCopy.buildOperationIdentity(), configCopy); |
| } |
| setConfig(OPERATION_POLLS, operationPolls); |
| |
| SetMultimap<NotificationFilter, JmxNotificationSubscriptionConfig<?>> notificationSubscriptions = HashMultimap.create(); |
| for (JmxNotificationSubscriptionConfig<?> config : builder.notificationSubscriptions) { |
| if (!config.isEnabled()) continue; |
| notificationSubscriptions.put(config.getNotificationFilter(), config); |
| } |
| setConfig(NOTIFICATION_SUBSCRIPTIONS, notificationSubscriptions); |
| initUniqueTag(builder.uniqueTag, attributePolls, operationPolls, notificationSubscriptions); |
| } |
| |
| @Override |
| public void setEntity(EntityLocal entity) { |
| if (getConfig(HELPER) == null) { |
| JmxHelper helper = new JmxHelper(entity); |
| setConfig(HELPER, helper); |
| setConfig(OWN_HELPER, true); |
| setConfig(JMX_URI, helper.getUrl()); |
| } |
| super.setEntity(entity); |
| } |
| |
| public String getJmxUri() { |
| return getConfig(JMX_URI); |
| } |
| |
| protected JmxHelper getHelper() { |
| return getConfig(HELPER); |
| } |
| |
| @SuppressWarnings("unchecked") |
| protected Poller<Object> getPoller() { |
| return (Poller<Object>) super.getPoller(); |
| } |
| |
| @Override |
| protected boolean isConnected() { |
| return super.isConnected() && getHelper().isConnected(); |
| } |
| |
| @Override |
| protected void preStart() { |
| /* |
| * All actions on the JmxHelper are done async (through the poller's threading) so we don't |
| * block on start/rebind if the entity is unreachable |
| * (without this we get a 120s pause in JmxHelper.connect restarting) |
| */ |
| final SetMultimap<NotificationFilter, JmxNotificationSubscriptionConfig<?>> notificationSubscriptions = getConfig(NOTIFICATION_SUBSCRIPTIONS); |
| final SetMultimap<List<?>, JmxOperationPollConfig<?>> operationPolls = getConfig(OPERATION_POLLS); |
| final SetMultimap<String, JmxAttributePollConfig<?>> attributePolls = getConfig(ATTRIBUTE_POLLS); |
| |
| getPoller().submit(new Callable<Void>() { |
| public Void call() { |
| getHelper().connect(getConfig(JMX_CONNECTION_TIMEOUT)); |
| return null; |
| } |
| @Override public String toString() { return "Connect JMX "+getHelper().getUrl(); } |
| }); |
| |
| for (final NotificationFilter filter : notificationSubscriptions.keySet()) { |
| getPoller().submit(new Callable<Void>() { |
| public Void call() { |
| // TODO Could config.getObjectName have wildcards? Is this code safe? |
| Set<JmxNotificationSubscriptionConfig<?>> configs = notificationSubscriptions.get(filter); |
| NotificationListener listener = registerNotificationListener(configs); |
| ObjectName objectName = Iterables.get(configs, 0).getObjectName(); |
| notificationListeners.put(objectName, listener); |
| return null; |
| } |
| @Override public String toString() { return "Register JMX notifications: "+notificationSubscriptions.get(filter); } |
| }); |
| } |
| |
| // Setup polling of sensors |
| for (final String jmxAttributeName : attributePolls.keys()) { |
| registerAttributePoller(attributePolls.get(jmxAttributeName)); |
| } |
| |
| // Setup polling of operations |
| for (final List<?> operationIdentifier : operationPolls.keys()) { |
| registerOperationPoller(operationPolls.get(operationIdentifier)); |
| } |
| } |
| |
| @Override |
| protected void preStop() { |
| super.preStop(); |
| |
| for (Map.Entry<ObjectName, NotificationListener> entry : notificationListeners.entries()) { |
| unregisterNotificationListener(entry.getKey(), entry.getValue()); |
| } |
| notificationListeners.clear(); |
| } |
| |
| @Override |
| protected void postStop() { |
| super.postStop(); |
| JmxHelper helper = getHelper(); |
| Boolean ownHelper = getConfig(OWN_HELPER); |
| if (helper != null && ownHelper) helper.terminate(); |
| } |
| |
| /** |
| * Registers to poll a jmx-operation for an ObjectName, where all the given configs are for the same ObjectName + operation + parameters. |
| */ |
| private void registerOperationPoller(Set<JmxOperationPollConfig<?>> configs) { |
| Set<AttributePollHandler<? super Object>> handlers = Sets.newLinkedHashSet(); |
| long minPeriod = Integer.MAX_VALUE; |
| |
| final ObjectName objectName = Iterables.get(configs, 0).getObjectName(); |
| final String operationName = Iterables.get(configs, 0).getOperationName(); |
| final List<String> signature = Iterables.get(configs, 0).getSignature(); |
| final List<?> params = Iterables.get(configs, 0).getParams(); |
| |
| for (JmxOperationPollConfig<?> config : configs) { |
| handlers.add(new AttributePollHandler<Object>(config, getEntity(), this)); |
| if (config.getPeriod() > 0) minPeriod = Math.min(minPeriod, config.getPeriod()); |
| } |
| |
| getPoller().scheduleAtFixedRate( |
| new Callable<Object>() { |
| public Object call() throws Exception { |
| if (log.isDebugEnabled()) log.debug("jmx operation polling for {} sensors at {} -> {}", new Object[] {getEntity(), getJmxUri(), operationName}); |
| if (signature.size() == params.size()) { |
| return getHelper().operation(objectName, operationName, signature, params); |
| } else { |
| return getHelper().operation(objectName, operationName, params.toArray()); |
| } |
| } |
| }, |
| new DelegatingPollHandler<Object>(handlers), minPeriod); |
| } |
| |
| /** |
| * Registers to poll a jmx-attribute for an ObjectName, where all the given configs are for that same ObjectName + attribute. |
| */ |
| private void registerAttributePoller(Set<JmxAttributePollConfig<?>> configs) { |
| Set<AttributePollHandler<? super Object>> handlers = Sets.newLinkedHashSet(); |
| long minPeriod = Integer.MAX_VALUE; |
| |
| final ObjectName objectName = Iterables.get(configs, 0).getObjectName(); |
| final String jmxAttributeName = Iterables.get(configs, 0).getAttributeName(); |
| |
| for (JmxAttributePollConfig<?> config : configs) { |
| handlers.add(new AttributePollHandler<Object>(config, getEntity(), this)); |
| if (config.getPeriod() > 0) minPeriod = Math.min(minPeriod, config.getPeriod()); |
| } |
| |
| // TODO Not good calling this holding the synchronization lock |
| getPoller().scheduleAtFixedRate( |
| new Callable<Object>() { |
| public Object call() throws Exception { |
| if (log.isTraceEnabled()) log.trace("jmx attribute polling for {} sensors at {} -> {}", new Object[] {getEntity(), getJmxUri(), jmxAttributeName}); |
| return getHelper().getAttribute(objectName, jmxAttributeName); |
| } |
| }, |
| new DelegatingPollHandler<Object>(handlers), minPeriod); |
| } |
| |
| /** |
| * Registers to subscribe to notifications for an ObjectName, where all the given configs are for that same ObjectName + filter. |
| */ |
| private NotificationListener registerNotificationListener(Set<JmxNotificationSubscriptionConfig<?>> configs) { |
| final List<AttributePollHandler<? super javax.management.Notification>> handlers = Lists.newArrayList(); |
| |
| final ObjectName objectName = Iterables.get(configs, 0).getObjectName(); |
| final NotificationFilter filter = Iterables.get(configs, 0).getNotificationFilter(); |
| |
| for (final JmxNotificationSubscriptionConfig<?> config : configs) { |
| AttributePollHandler<javax.management.Notification> handler = new AttributePollHandler<javax.management.Notification>(config, getEntity(), this) { |
| @Override protected Object transformValueOnSuccess(javax.management.Notification val) { |
| if (config.getOnNotification() != null) { |
| return config.getOnNotification().apply(val); |
| } else { |
| Object result = super.transformValueOnSuccess(val); |
| if (result instanceof javax.management.Notification) |
| return ((javax.management.Notification)result).getUserData(); |
| return result; |
| } |
| } |
| }; |
| handlers.add(handler); |
| } |
| final PollHandler<javax.management.Notification> compoundHandler = new DelegatingPollHandler<javax.management.Notification>(handlers); |
| |
| NotificationListener listener = new NotificationListener() { |
| @Override public void handleNotification(Notification notification, Object handback) { |
| compoundHandler.onSuccess(notification); |
| } |
| }; |
| getHelper().addNotificationListener(objectName, listener, filter); |
| |
| return listener; |
| } |
| |
| private void unregisterNotificationListener(ObjectName objectName, NotificationListener listener) { |
| try { |
| getHelper().removeNotificationListener(objectName, listener); |
| } catch (RuntimeException e) { |
| log.warn("Failed to unregister listener: "+objectName+", "+listener+"; continuing...", e); |
| } |
| } |
| |
| @Override |
| public String toString() { |
| return "JmxFeed["+(getManagementContext()!=null&&getManagementContext().isRunning()?getJmxUri():"mgmt-not-running")+"]"; |
| } |
| } |