| package org.apache.helix.zookeeper.zkclient.metric; |
| |
| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.ConcurrentHashMap; |
| import javax.management.JMException; |
| import javax.management.MBeanAttributeInfo; |
| import javax.management.MalformedObjectNameException; |
| import javax.management.ObjectName; |
| |
| import org.apache.helix.monitoring.mbeans.MBeanRegistrar; |
| import org.apache.helix.monitoring.mbeans.MonitorDomainNames; |
| import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMBeanProvider; |
| import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric; |
| import org.apache.helix.monitoring.mbeans.dynamicMBeans.SimpleDynamicMetric; |
| import org.apache.helix.monitoring.mbeans.exception.MetricException; |
| import org.apache.helix.zookeeper.zkclient.ZkEventThread; |
| |
| |
| public class ZkClientMonitor extends DynamicMBeanProvider { |
| |
| public static final String MONITOR_TYPE = "Type"; |
| public static final String MONITOR_KEY = "Key"; |
| protected static final String MBEAN_DESCRIPTION = "Helix Zookeeper Client Monitor"; |
| |
| public enum AccessType { |
| READ, WRITE |
| } |
| |
| private String _sensorName; |
| private String _monitorType; |
| private String _monitorKey; |
| private String _monitorInstanceName; |
| private boolean _monitorRootOnly; |
| |
| private SimpleDynamicMetric<Long> _stateChangeEventCounter; |
| private SimpleDynamicMetric<Long> _expiredSessionCounter; |
| private SimpleDynamicMetric<Long> _dataChangeEventCounter; |
| private SimpleDynamicMetric<Long> _outstandingRequestGauge; |
| private SimpleDynamicMetric<Long> _znodeCompressCounter; |
| |
| private ZkThreadMetric _zkEventThreadMetric; |
| |
| private Map<ZkClientPathMonitor.PredefinedPath, ZkClientPathMonitor> _zkClientPathMonitorMap = |
| new ConcurrentHashMap<>(); |
| |
| public ZkClientMonitor(String monitorType, String monitorKey, String monitorInstanceName, |
| boolean monitorRootOnly, ZkEventThread zkEventThread) { |
| if (monitorKey == null || monitorKey.isEmpty() || monitorType == null || monitorType |
| .isEmpty()) { |
| throw new MetricException("Cannot create ZkClientMonitor without monitor key and type."); |
| } |
| |
| _sensorName = |
| String.format("%s.%s.%s", MonitorDomainNames.HelixZkClient.name(), monitorType, monitorKey); |
| _monitorType = monitorType; |
| _monitorKey = monitorKey; |
| _monitorInstanceName = monitorInstanceName; |
| _monitorRootOnly = monitorRootOnly; |
| |
| _stateChangeEventCounter = new SimpleDynamicMetric("StateChangeEventCounter", 0l); |
| _expiredSessionCounter = new SimpleDynamicMetric("ExpiredSessionCounter", 0l); |
| _dataChangeEventCounter = new SimpleDynamicMetric("DataChangeEventCounter", 0l); |
| _outstandingRequestGauge = new SimpleDynamicMetric("OutstandingRequestGauge", 0l); |
| _znodeCompressCounter = new SimpleDynamicMetric("CompressedZnodeWriteCounter", 0l); |
| |
| if (zkEventThread != null) { |
| boolean result = setAndInitZkEventThreadMonitor(zkEventThread); |
| if (!result) { |
| _logger.error("register zkEventThreadMonitor failed due to an existing one."); |
| } |
| } |
| |
| for (ZkClientPathMonitor.PredefinedPath path : ZkClientPathMonitor.PredefinedPath.values()) { |
| // If monitor root path only, check if the current path is Root. |
| // Otherwise, add monitors for every path. |
| if (!_monitorRootOnly || path.equals(ZkClientPathMonitor.PredefinedPath.Root)) { |
| _zkClientPathMonitorMap.put(path, |
| new ZkClientPathMonitor(path, _monitorType, _monitorKey, _monitorInstanceName) |
| ); |
| } |
| } |
| } |
| |
| public static ObjectName getObjectName(String monitorType, String monitorKey, |
| String monitorInstanceName) throws MalformedObjectNameException { |
| return MBeanRegistrar |
| .buildObjectName(MonitorDomainNames.HelixZkClient.name(), MONITOR_TYPE, monitorType, |
| MONITOR_KEY, |
| (monitorKey + (monitorInstanceName == null ? "" : "." + monitorInstanceName))); |
| } |
| |
| public synchronized boolean setAndInitZkEventThreadMonitor(ZkEventThread zkEventThread) { |
| if (_zkEventThreadMetric == null) { |
| _zkEventThreadMetric = new ZkThreadMetric(zkEventThread); |
| return true; |
| } |
| return false; |
| } |
| |
| @Override |
| public DynamicMBeanProvider register() throws JMException { |
| List<DynamicMetric<?, ?>> attributeList = new ArrayList<>(); |
| attributeList.add(_dataChangeEventCounter); |
| attributeList.add(_outstandingRequestGauge); |
| attributeList.add(_stateChangeEventCounter); |
| attributeList.add(_expiredSessionCounter); |
| attributeList.add(_znodeCompressCounter); |
| if (_zkEventThreadMetric != null) { |
| attributeList.add(_zkEventThreadMetric); |
| } |
| doRegister(attributeList, MBEAN_DESCRIPTION, |
| getObjectName(_monitorType, _monitorKey, _monitorInstanceName)); |
| _zkClientPathMonitorMap.values().stream().forEach( monitor -> { |
| if (monitor != null) { |
| try { |
| monitor.register(); |
| } catch (JMException e) { |
| _logger.error(" {} failed registration", monitor, e); |
| } |
| } |
| }); |
| return this; |
| } |
| |
| /** |
| * After unregistered, the MBean can't be registered again, a new monitor has be to created. |
| */ |
| public void unregister() { |
| super.unregister(); |
| for (ZkClientPathMonitor zkClientPathMonitor : _zkClientPathMonitorMap.values()) { |
| zkClientPathMonitor.unregister(); |
| } |
| } |
| |
| @Override |
| public String getSensorName() { |
| return _sensorName; |
| } |
| |
| public void increaseStateChangeEventCounter() { |
| synchronized (_stateChangeEventCounter) { |
| _stateChangeEventCounter.updateValue(_stateChangeEventCounter.getValue() + 1); |
| } |
| } |
| |
| public void increasExpiredSessionCounter() { |
| synchronized (_expiredSessionCounter) { |
| _expiredSessionCounter.updateValue(_expiredSessionCounter.getValue() + 1); |
| } |
| } |
| |
| public void increaseDataChangeEventCounter() { |
| synchronized (_dataChangeEventCounter) { |
| _dataChangeEventCounter.updateValue(_dataChangeEventCounter.getValue() + 1); |
| } |
| } |
| |
| public void increaseOutstandingRequestGauge() { |
| synchronized (_outstandingRequestGauge) { |
| _outstandingRequestGauge.updateValue(_outstandingRequestGauge.getValue() + 1); |
| } |
| } |
| |
| public void decreaseOutstandingRequestGauge() { |
| synchronized (_outstandingRequestGauge) { |
| _outstandingRequestGauge.updateValue(_outstandingRequestGauge.getValue() - 1); |
| } |
| } |
| |
| public void increaseZnodeCompressCounter() { |
| synchronized (_znodeCompressCounter) { |
| _znodeCompressCounter.updateValue(_znodeCompressCounter.getValue() + 1); |
| } |
| } |
| |
| public void recordDataPropagationLatency(String path, long latencyMilliSec) { |
| if (null == path) { |
| return; |
| } |
| Arrays.stream(ZkClientPathMonitor.PredefinedPath.values()) |
| .filter(predefinedPath -> predefinedPath.match(path)) |
| .forEach(predefinedPath -> { |
| ZkClientPathMonitor zkClientPathMonitor = _zkClientPathMonitorMap.get(predefinedPath); |
| if (zkClientPathMonitor != null) { |
| zkClientPathMonitor.recordDataPropagationLatency(latencyMilliSec); |
| } |
| }); |
| } |
| |
| private void record(String path, int bytes, long latencyMilliSec, boolean isFailure, |
| boolean isRead) { |
| if (null == path) { |
| return; |
| } |
| Arrays.stream(ZkClientPathMonitor.PredefinedPath.values()) |
| .filter(predefinedPath -> predefinedPath.match(path)) |
| .forEach(predefinedPath -> { |
| ZkClientPathMonitor zkClientPathMonitor = _zkClientPathMonitorMap.get(predefinedPath); |
| if (zkClientPathMonitor != null) { |
| zkClientPathMonitor.record(bytes, latencyMilliSec, isFailure, isRead); |
| } |
| }); |
| } |
| |
| public void record(String path, int dataSize, long startTimeMilliSec, AccessType accessType) { |
| switch (accessType) { |
| case READ: |
| record(path, dataSize, System.currentTimeMillis() - startTimeMilliSec, false, true); |
| return; |
| case WRITE: |
| record(path, dataSize, System.currentTimeMillis() - startTimeMilliSec, false, false); |
| return; |
| default: |
| return; |
| } |
| } |
| |
| public void recordFailure(String path, AccessType accessType) { |
| switch (accessType) { |
| case READ: |
| record(path, 0, 0, true, true); |
| return; |
| case WRITE: |
| record(path, 0, 0, true, false); |
| return; |
| default: |
| return; |
| } |
| } |
| |
| /** |
| * Records metrics for async operations |
| */ |
| private void recordAsync(String path, int bytes, long latencyMilliSec, boolean isFailure, |
| AccessType accessType) { |
| if (null == path) { |
| return; |
| } |
| Arrays.stream(ZkClientPathMonitor.PredefinedPath.values()) |
| .filter(predefinedPath -> predefinedPath.match(path)) |
| .forEach(predefinedPath -> { |
| ZkClientPathMonitor zkClientPathMonitor = _zkClientPathMonitorMap.get(predefinedPath); |
| if (zkClientPathMonitor != null) { |
| zkClientPathMonitor.recordAsync(bytes, latencyMilliSec, isFailure, accessType); |
| } |
| }); |
| } |
| |
| public void recordAsync(String path, int dataSize, long startTimeMilliSec, AccessType accessType) { |
| recordAsync(path, dataSize, System.currentTimeMillis() - startTimeMilliSec, false, accessType); |
| } |
| |
| public void recordAsyncFailure(String path, AccessType accessType) { |
| recordAsync(path, 0, 0, true, accessType); |
| } |
| |
| class ZkThreadMetric extends DynamicMetric<ZkEventThread, ZkEventThread> { |
| public ZkThreadMetric(ZkEventThread eventThread) { |
| super("ZkEventThead", eventThread); |
| } |
| |
| @Override |
| protected Set<MBeanAttributeInfo> generateAttributeInfos(String metricName, |
| ZkEventThread eventThread) { |
| Set<MBeanAttributeInfo> attributeInfoSet = new HashSet<>(); |
| attributeInfoSet.add(new MBeanAttributeInfo("PendingCallbackGauge", Long.TYPE.getName(), |
| DEFAULT_ATTRIBUTE_DESCRIPTION, true, false, false)); |
| attributeInfoSet.add(new MBeanAttributeInfo("TotalCallbackCounter", Long.TYPE.getName(), |
| DEFAULT_ATTRIBUTE_DESCRIPTION, true, false, false)); |
| attributeInfoSet.add( |
| new MBeanAttributeInfo("TotalCallbackHandledCounter", Long.TYPE.getName(), |
| DEFAULT_ATTRIBUTE_DESCRIPTION, true, false, false)); |
| return attributeInfoSet; |
| } |
| |
| @Override |
| public Object getAttributeValue(String attributeName) { |
| switch (attributeName) { |
| case "PendingCallbackGauge": |
| return getMetricObject().getPendingEventsCount(); |
| case "TotalCallbackCounter": |
| return getMetricObject().getTotalEventCount(); |
| case "TotalCallbackHandledCounter": |
| return getMetricObject().getTotalHandledEventCount(); |
| default: |
| throw new MetricException("Unknown attribute name: " + attributeName); |
| } |
| } |
| |
| @Override |
| public void updateValue(ZkEventThread newEventThread) { |
| setMetricObject(newEventThread); |
| } |
| } |
| } |