blob: 8ad0a5e66f2fb5fb117dadcb5e2654846ec2f93f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.metric;
import java.io.Serializable;
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;
import java.lang.management.OperatingSystemMXBean;
import java.lang.management.RuntimeMXBean;
import java.lang.management.ThreadMXBean;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.function.Consumer;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.managers.GridManagerAdapter;
import org.apache.ignite.internal.processors.metastorage.DistributedMetaStorage;
import org.apache.ignite.internal.processors.metastorage.DistributedMetastorageLifecycleListener;
import org.apache.ignite.internal.processors.metastorage.ReadableDistributedMetaStorage;
import org.apache.ignite.internal.processors.metric.impl.AtomicLongMetric;
import org.apache.ignite.internal.processors.metric.impl.DoubleMetricImpl;
import org.apache.ignite.internal.processors.metric.impl.HistogramMetricImpl;
import org.apache.ignite.internal.processors.metric.impl.HitRateMetric;
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
import org.apache.ignite.internal.util.StripedExecutor;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.spi.metric.HistogramMetric;
import org.apache.ignite.spi.metric.Metric;
import org.apache.ignite.spi.metric.MetricExporterSpi;
import org.apache.ignite.spi.metric.ReadOnlyMetricRegistry;
import org.apache.ignite.spi.metric.ReadOnlyMetricManager;
import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_PHY_RAM;
import static org.apache.ignite.internal.processors.metric.impl.MetricUtils.fromFullName;
import static org.apache.ignite.internal.processors.metric.impl.MetricUtils.metricName;
import static org.apache.ignite.internal.util.IgniteUtils.notifyListeners;
/**
* This manager should provide {@link ReadOnlyMetricManager} for each configured {@link MetricExporterSpi}.
*
* @see MetricExporterSpi
* @see MetricRegistry
*/
public class GridMetricManager extends GridManagerAdapter<MetricExporterSpi> implements ReadOnlyMetricManager {
/** */
public static final String ACTIVE_COUNT_DESC = "Approximate number of threads that are actively executing tasks.";
/** */
public static final String COMPLETED_TASK_DESC = "Approximate total number of tasks that have completed execution.";
/** */
public static final String CORE_SIZE_DESC = "The core number of threads.";
/** */
public static final String LARGEST_SIZE_DESC = "Largest number of threads that have ever simultaneously been in the pool.";
/** */
public static final String MAX_SIZE_DESC = "The maximum allowed number of threads.";
/** */
public static final String POOL_SIZE_DESC = "Current number of threads in the pool.";
/** */
public static final String TASK_COUNT_DESC = "Approximate total number of tasks that have been scheduled for execution.";
/** */
public static final String QUEUE_SIZE_DESC = "Current size of the execution queue.";
/** */
public static final String KEEP_ALIVE_TIME_DESC = "Thread keep-alive time, which is the amount of time which threads in excess of " +
"the core pool size may remain idle before being terminated.";
/** */
public static final String IS_SHUTDOWN_DESC = "True if this executor has been shut down.";
/** */
public static final String IS_TERMINATED_DESC = "True if all tasks have completed following shut down.";
/** */
public static final String IS_TERMINATING_DESC = "True if terminating but not yet terminated.";
/** */
public static final String REJ_HND_DESC = "Class name of current rejection handler.";
/** */
public static final String THRD_FACTORY_DESC = "Class name of thread factory used to create new threads.";
/** Group for a thread pools. */
public static final String THREAD_POOLS = "threadPools";
/** Metrics update frequency. */
private static final long METRICS_UPDATE_FREQ = 3000;
/** System metrics prefix. */
public static final String SYS_METRICS = "sys";
/** Ignite node metrics prefix. */
public static final String IGNITE_METRICS = "ignite";
/** Partition map exchange metrics prefix. */
public static final String PME_METRICS = "pme";
/** Transaction metrics prefix. */
public static final String TX_METRICS = "tx";
/** GC CPU load metric name. */
public static final String GC_CPU_LOAD = "GcCpuLoad";
/** GC CPU load metric description. */
public static final String GC_CPU_LOAD_DESCRIPTION = "GC CPU load.";
/** CPU load metric name. */
public static final String CPU_LOAD = "CpuLoad";
/** CPU load metric description. */
public static final String CPU_LOAD_DESCRIPTION = "CPU load.";
/** Up time metric name. */
public static final String UP_TIME = "UpTime";
/** Thread count metric name. */
public static final String THREAD_CNT = "ThreadCount";
/** Peak thread count metric name. */
public static final String PEAK_THREAD_CNT = "PeakThreadCount";
/** Total started thread count metric name. */
public static final String TOTAL_STARTED_THREAD_CNT = "TotalStartedThreadCount";
/** Daemon thread count metric name. */
public static final String DAEMON_THREAD_CNT = "DaemonThreadCount";
/** PME duration metric name. */
public static final String PME_DURATION = "Duration";
/** PME cache operations blocked duration metric name. */
public static final String PME_OPS_BLOCKED_DURATION = "CacheOperationsBlockedDuration";
/** Histogram of PME durations metric name. */
public static final String PME_DURATION_HISTOGRAM = "DurationHistogram";
/** Histogram of blocking PME durations metric name. */
public static final String PME_OPS_BLOCKED_DURATION_HISTOGRAM = "CacheOperationsBlockedDurationHistogram";
/** JVM interface to memory consumption info */
private static final MemoryMXBean mem = ManagementFactory.getMemoryMXBean();
/** */
private static final OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
/** */
private static final RuntimeMXBean rt = ManagementFactory.getRuntimeMXBean();
/** */
private static final ThreadMXBean threads = ManagementFactory.getThreadMXBean();
/** */
private static final Collection<GarbageCollectorMXBean> gc = ManagementFactory.getGarbageCollectorMXBeans();
/** Prefix for {@link HitRateMetric} configuration property name. */
public static final String HITRATE_CFG_PREFIX = metricName("metrics", "hitrate");
/** Prefix for {@link HistogramMetric} configuration property name. */
public static final String HISTOGRAM_CFG_PREFIX = metricName("metrics", "histogram");
/** Registered metrics registries. */
private final ConcurrentHashMap<String, ReadOnlyMetricRegistry> registries = new ConcurrentHashMap<>();
/** Metric registry creation listeners. */
private final List<Consumer<ReadOnlyMetricRegistry>> metricRegCreationLsnrs = new CopyOnWriteArrayList<>();
/** Metric registry remove listeners. */
private final List<Consumer<ReadOnlyMetricRegistry>> metricRegRemoveLsnrs = new CopyOnWriteArrayList<>();
/** Read-only metastorage. */
private volatile ReadableDistributedMetaStorage roMetastorage;
/** Metastorage with the write access. */
private volatile DistributedMetaStorage metastorage;
/** Metrics update worker. */
private GridTimeoutProcessor.CancelableTask metricsUpdateTask;
/** GC CPU load. */
private final DoubleMetricImpl gcCpuLoad;
/** CPU load. */
private final DoubleMetricImpl cpuLoad;
/** Heap memory metrics. */
private final MemoryUsageMetrics heap;
/** Nonheap memory metrics. */
private final MemoryUsageMetrics nonHeap;
/**
* @param ctx Kernal context.
*/
public GridMetricManager(GridKernalContext ctx) {
super(ctx, ctx.config().getMetricExporterSpi());
ctx.addNodeAttribute(ATTR_PHY_RAM, totalSysMemory());
heap = new MemoryUsageMetrics(SYS_METRICS, metricName("memory", "heap"));
nonHeap = new MemoryUsageMetrics(SYS_METRICS, metricName("memory", "nonheap"));
heap.update(mem.getHeapMemoryUsage());
nonHeap.update(mem.getNonHeapMemoryUsage());
MetricRegistry sysreg = registry(SYS_METRICS);
gcCpuLoad = sysreg.doubleMetric(GC_CPU_LOAD, GC_CPU_LOAD_DESCRIPTION);
cpuLoad = sysreg.doubleMetric(CPU_LOAD, CPU_LOAD_DESCRIPTION);
sysreg.register("SystemLoadAverage", os::getSystemLoadAverage, Double.class, null);
sysreg.register(UP_TIME, rt::getUptime, null);
sysreg.register(THREAD_CNT, threads::getThreadCount, null);
sysreg.register(PEAK_THREAD_CNT, threads::getPeakThreadCount, null);
sysreg.register(TOTAL_STARTED_THREAD_CNT, threads::getTotalStartedThreadCount, null);
sysreg.register(DAEMON_THREAD_CNT, threads::getDaemonThreadCount, null);
sysreg.register("CurrentThreadCpuTime", threads::getCurrentThreadCpuTime, null);
sysreg.register("CurrentThreadUserTime", threads::getCurrentThreadUserTime, null);
MetricRegistry pmeReg = registry(PME_METRICS);
long[] pmeBounds = new long[] {500, 1000, 5000, 30000};
pmeReg.histogram(PME_DURATION_HISTOGRAM, pmeBounds,
"Histogram of PME durations in milliseconds.");
pmeReg.histogram(PME_OPS_BLOCKED_DURATION_HISTOGRAM, pmeBounds,
"Histogram of cache operations blocked PME durations in milliseconds.");
}
/** {@inheritDoc} */
@Override protected void onKernalStart0() {
metricsUpdateTask = ctx.timeout().schedule(new MetricsUpdater(), METRICS_UPDATE_FREQ, METRICS_UPDATE_FREQ);
}
/** {@inheritDoc} */
@Override public void start() throws IgniteCheckedException {
for (MetricExporterSpi spi : getSpis())
spi.setMetricRegistry(this);
startSpi();
ctx.internalSubscriptionProcessor().registerDistributedMetastorageListener(
new DistributedMetastorageLifecycleListener() {
/** {@inheritDoc} */
@Override public void onReadyForRead(ReadableDistributedMetaStorage metastorage) {
roMetastorage = metastorage;
try {
metastorage.iterate(HITRATE_CFG_PREFIX, (name, val) -> onHitRateConfigChanged(
name.substring(HITRATE_CFG_PREFIX.length() + 1), (Long) val));
metastorage.iterate(HISTOGRAM_CFG_PREFIX, (name, val) -> onHistogramConfigChanged(
name.substring(HISTOGRAM_CFG_PREFIX.length() + 1), (long[]) val));
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
metastorage.listen(n -> n.startsWith(HITRATE_CFG_PREFIX),
(name, oldVal, newVal) -> onHitRateConfigChanged(
name.substring(HITRATE_CFG_PREFIX.length() + 1), (Long) newVal));
metastorage.listen(n -> n.startsWith(HISTOGRAM_CFG_PREFIX),
(name, oldVal, newVal) -> onHistogramConfigChanged(
name.substring(HISTOGRAM_CFG_PREFIX.length() + 1), (long[]) newVal));
}
/** {@inheritDoc} */
@Override public void onReadyForWrite(DistributedMetaStorage metastorage) {
GridMetricManager.this.metastorage = metastorage;
}
});
}
/** {@inheritDoc} */
@Override public void stop(boolean cancel) throws IgniteCheckedException {
stopSpi();
// Stop discovery worker and metrics updater.
U.closeQuiet(metricsUpdateTask);
}
/**
* Gets or creates metric registry.
*
* @param name Group name.
* @return Group of metrics.
*/
public MetricRegistry registry(String name) {
return (MetricRegistry)registries.computeIfAbsent(name, n -> {
MetricRegistry mreg = new MetricRegistry(name,
mname -> readFromMetastorage(metricName(HITRATE_CFG_PREFIX, mname)),
mname -> readFromMetastorage(metricName(HISTOGRAM_CFG_PREFIX, mname)),
log);
notifyListeners(mreg, metricRegCreationLsnrs, log);
return mreg;
});
}
/**
* Reads value from {@link #roMetastorage}.
*
* @param key Key.
* @param <T> Key type.
* @return Value or {@code null} if not found.
*/
private <T extends Serializable> T readFromMetastorage(String key) {
if (roMetastorage == null)
return null;
try {
return roMetastorage.read(key);
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
}
/** {@inheritDoc} */
@NotNull @Override public Iterator<ReadOnlyMetricRegistry> iterator() {
return registries.values().iterator();
}
/** {@inheritDoc} */
@Override public void addMetricRegistryCreationListener(Consumer<ReadOnlyMetricRegistry> lsnr) {
metricRegCreationLsnrs.add(lsnr);
}
/** {@inheritDoc} */
@Override public void addMetricRegistryRemoveListener(Consumer<ReadOnlyMetricRegistry> lsnr) {
metricRegRemoveLsnrs.add(lsnr);
}
/**
* Removes metric registry.
*
* @param regName Metric registry name.
*/
public void remove(String regName) {
ReadOnlyMetricRegistry mreg = registries.remove(regName);
if (mreg == null)
return;
notifyListeners(mreg, metricRegRemoveLsnrs, log);
try {
GridCompoundFuture opsFut = new GridCompoundFuture<>();
for (Metric m : mreg) {
if (m instanceof HitRateMetric)
opsFut.add(metastorage.removeAsync(metricName(HITRATE_CFG_PREFIX, m.name())));
else if (m instanceof HistogramMetric)
opsFut.add(metastorage.removeAsync(metricName(HISTOGRAM_CFG_PREFIX, m.name())));
}
opsFut.markInitialized();
opsFut.get();
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
}
/**
* Change {@link HitRateMetric} configuration if it exists.
*
* @param name Metric name.
* @param rateTimeInterval New rate time interval.
* @throws IgniteCheckedException If write of configuration failed.
* @see HitRateMetric#reset(long, int)
*/
public void configureHitRate(String name, long rateTimeInterval) throws IgniteCheckedException {
A.notNullOrEmpty(name, "name");
A.ensure(rateTimeInterval > 0, "rateTimeInterval should be positive");
A.notNull(metastorage, "Metastorage not ready. Node not started?");
if (ctx.isStopping())
throw new NodeStoppingException("Operation has been cancelled (node is stopping)");
metastorage.write(metricName(HITRATE_CFG_PREFIX, name), rateTimeInterval);
}
/**
* Stores {@link HistogramMetric} configuration in metastorage.
*
* @param name Metric name.
* @param bounds New bounds.
* @throws IgniteCheckedException If write of configuration failed.
*/
public void configureHistogram(String name, long[] bounds) throws IgniteCheckedException {
A.notNullOrEmpty(name, "name");
A.notEmpty(bounds, "bounds");
A.notNull(metastorage, "Metastorage not ready. Node not started?");
if (ctx.isStopping())
throw new NodeStoppingException("Operation has been cancelled (node is stopping)");
metastorage.write(metricName(HISTOGRAM_CFG_PREFIX, name), bounds);
}
/**
* Change {@link HitRateMetric} instance configuration.
*
* @param name Metric name.
* @param rateTimeInterval New rateTimeInterval.
* @see HistogramMetricImpl#reset(long[])
*/
private void onHitRateConfigChanged(String name, @Nullable Long rateTimeInterval) {
if (rateTimeInterval == null)
return;
A.ensure(rateTimeInterval > 0, "rateTimeInterval should be positive");
HitRateMetric m = find(name, HitRateMetric.class);
if (m == null)
return;
m.reset(rateTimeInterval);
}
/**
* Change {@link HistogramMetric} instance configuration.
*
* @param name Metric name.
* @param bounds New bounds.
*/
private void onHistogramConfigChanged(String name, @Nullable long[] bounds) {
if (bounds == null)
return;
HistogramMetricImpl m = find(name, HistogramMetricImpl.class);
if (m == null)
return;
m.reset(bounds);
}
/**
* @param name Metric name.
* @param type Metric type.
* @return Metric.
*/
private <T extends Metric> T find(String name, Class<T> type) {
A.notNull(name, "name");
T2<String, String> splitted = fromFullName(name);
MetricRegistry mreg = (MetricRegistry)registries.get(splitted.get1());
if (mreg == null) {
if (log.isInfoEnabled())
log.info("Metric registry not found[registry=" + splitted.get1() + ']');
return null;
}
Metric m = mreg.findMetric(splitted.get2());
if (m == null) {
if (log.isInfoEnabled())
log.info("Metric not found[registry=" + splitted.get1() + ", metricName=" + splitted.get2() + ']');
return null;
}
if (!m.getClass().isAssignableFrom(type)) {
log.error("Metric '" + name + "' has wrong type[type=" + m.getClass().getSimpleName() + ']');
return null;
}
return (T) m;
}
/**
* Registers all metrics for thread pools.
*
* @param utilityCachePool Utility cache pool.
* @param execSvc Executor service.
* @param svcExecSvc Services' executor service.
* @param sysExecSvc System executor service.
* @param stripedExecSvc Striped executor.
* @param p2pExecSvc P2P executor service.
* @param mgmtExecSvc Management executor service.
* @param igfsExecSvc IGFS executor service.
* @param dataStreamExecSvc Data stream executor service.
* @param restExecSvc Reset executor service.
* @param affExecSvc Affinity executor service.
* @param idxExecSvc Indexing executor service.
* @param callbackExecSvc Callback executor service.
* @param qryExecSvc Query executor service.
* @param schemaExecSvc Schema executor service.
* @param rebalanceExecSvc Rebalance executor service.
* @param rebalanceStripedExecSvc Rebalance striped executor service.
* @param customExecSvcs Custom named executors.
*/
public void registerThreadPools(
ExecutorService utilityCachePool,
ExecutorService execSvc,
ExecutorService svcExecSvc,
ExecutorService sysExecSvc,
StripedExecutor stripedExecSvc,
ExecutorService p2pExecSvc,
ExecutorService mgmtExecSvc,
ExecutorService igfsExecSvc,
StripedExecutor dataStreamExecSvc,
ExecutorService restExecSvc,
ExecutorService affExecSvc,
@Nullable ExecutorService idxExecSvc,
IgniteStripedThreadPoolExecutor callbackExecSvc,
ExecutorService qryExecSvc,
ExecutorService schemaExecSvc,
ExecutorService rebalanceExecSvc,
IgniteStripedThreadPoolExecutor rebalanceStripedExecSvc,
@Nullable final Map<String, ? extends ExecutorService> customExecSvcs
) {
// Executors
monitorExecutor("GridUtilityCacheExecutor", utilityCachePool);
monitorExecutor("GridExecutionExecutor", execSvc);
monitorExecutor("GridServicesExecutor", svcExecSvc);
monitorExecutor("GridSystemExecutor", sysExecSvc);
monitorExecutor("GridClassLoadingExecutor", p2pExecSvc);
monitorExecutor("GridManagementExecutor", mgmtExecSvc);
monitorExecutor("GridIgfsExecutor", igfsExecSvc);
monitorExecutor("GridAffinityExecutor", affExecSvc);
monitorExecutor("GridCallbackExecutor", callbackExecSvc);
monitorExecutor("GridQueryExecutor", qryExecSvc);
monitorExecutor("GridSchemaExecutor", schemaExecSvc);
monitorExecutor("GridRebalanceExecutor", rebalanceExecSvc);
monitorExecutor("GridRebalanceStripedExecutor", rebalanceStripedExecSvc);
monitorStripedPool("GridDataStreamExecutor", dataStreamExecSvc);
if (idxExecSvc != null)
monitorExecutor("GridIndexingExecutor", idxExecSvc);
if (ctx.config().getConnectorConfiguration() != null)
monitorExecutor("GridRestExecutor", restExecSvc);
if (stripedExecSvc != null) {
// Striped executor uses a custom adapter.
monitorStripedPool("StripedExecutor", stripedExecSvc);
}
if (customExecSvcs != null) {
for (Map.Entry<String, ? extends ExecutorService> entry : customExecSvcs.entrySet())
monitorExecutor(entry.getKey(), entry.getValue());
}
}
/**
* Creates a MetricSet for an executor.
*
* @param name Name of the bean to register.
* @param execSvc Executor to register a bean for.
*/
private void monitorExecutor(String name, ExecutorService execSvc) {
MetricRegistry mreg = registry(metricName(THREAD_POOLS, name));
if (execSvc instanceof ThreadPoolExecutor) {
ThreadPoolExecutor exec = (ThreadPoolExecutor)execSvc;
mreg.register("ActiveCount", exec::getActiveCount, ACTIVE_COUNT_DESC);
mreg.register("CompletedTaskCount", exec::getCompletedTaskCount, COMPLETED_TASK_DESC);
mreg.register("CorePoolSize", exec::getCorePoolSize, CORE_SIZE_DESC);
mreg.register("LargestPoolSize", exec::getLargestPoolSize, LARGEST_SIZE_DESC);
mreg.register("MaximumPoolSize", exec::getMaximumPoolSize, MAX_SIZE_DESC);
mreg.register("PoolSize", exec::getPoolSize, POOL_SIZE_DESC);
mreg.register("TaskCount", exec::getTaskCount, TASK_COUNT_DESC);
mreg.register("QueueSize", () -> exec.getQueue().size(), QUEUE_SIZE_DESC);
mreg.register("KeepAliveTime", () -> exec.getKeepAliveTime(MILLISECONDS), KEEP_ALIVE_TIME_DESC);
mreg.register("Shutdown", exec::isShutdown, IS_SHUTDOWN_DESC);
mreg.register("Terminated", exec::isTerminated, IS_TERMINATED_DESC);
mreg.register("Terminating", exec::isTerminating, IS_TERMINATING_DESC);
mreg.register("RejectedExecutionHandlerClass", () -> {
RejectedExecutionHandler hnd = exec.getRejectedExecutionHandler();
return hnd == null ? "" : hnd.getClass().getName();
}, String.class, REJ_HND_DESC);
mreg.register("ThreadFactoryClass", () -> {
ThreadFactory factory = exec.getThreadFactory();
return factory == null ? "" : factory.getClass().getName();
}, String.class, THRD_FACTORY_DESC);
}
else {
mreg.longMetric("ActiveCount", ACTIVE_COUNT_DESC).value(0);
mreg.longMetric("CompletedTaskCount", COMPLETED_TASK_DESC).value(0);
mreg.longMetric("CorePoolSize", CORE_SIZE_DESC).value(0);
mreg.longMetric("LargestPoolSize", LARGEST_SIZE_DESC).value(0);
mreg.longMetric("MaximumPoolSize", MAX_SIZE_DESC).value(0);
mreg.longMetric("PoolSize", POOL_SIZE_DESC).value(0);
mreg.longMetric("TaskCount", TASK_COUNT_DESC);
mreg.longMetric("QueueSize", QUEUE_SIZE_DESC).value(0);
mreg.longMetric("KeepAliveTime", KEEP_ALIVE_TIME_DESC).value(0);
mreg.register("Shutdown", execSvc::isShutdown, IS_SHUTDOWN_DESC);
mreg.register("Terminated", execSvc::isTerminated, IS_TERMINATED_DESC);
mreg.longMetric("Terminating", IS_TERMINATING_DESC);
mreg.objectMetric("RejectedExecutionHandlerClass", String.class, REJ_HND_DESC).value("");
mreg.objectMetric("ThreadFactoryClass", String.class, THRD_FACTORY_DESC).value("");
}
}
/**
* Creates a MetricSet for an stripped executor.
*
* @param name name of the bean to register
* @param svc Executor.
*/
private void monitorStripedPool(String name, StripedExecutor svc) {
MetricRegistry mreg = registry(metricName(THREAD_POOLS, name));
mreg.register("DetectStarvation",
svc::detectStarvation,
"True if possible starvation in striped pool is detected.");
mreg.register("StripesCount",
svc::stripesCount,
"Stripes count.");
mreg.register("Shutdown",
svc::isShutdown,
"True if this executor has been shut down.");
mreg.register("Terminated",
svc::isTerminated,
"True if all tasks have completed following shut down.");
mreg.register("TotalQueueSize",
svc::queueSize,
"Total queue size of all stripes.");
mreg.register("TotalCompletedTasksCount",
svc::completedTasks,
"Completed tasks count of all stripes.");
mreg.register("StripesCompletedTasksCounts",
svc::stripesCompletedTasks,
long[].class,
"Number of completed tasks per stripe.");
mreg.register("ActiveCount",
svc::activeStripesCount,
"Number of active tasks of all stripes.");
mreg.register("StripesActiveStatuses",
svc::stripesActiveStatuses,
boolean[].class,
"Number of active tasks per stripe.");
mreg.register("StripesQueueSizes",
svc::stripesQueueSizes,
int[].class,
"Size of queue per stripe.");
}
/**
* @return Memory usage of non-heap memory.
*/
public MemoryUsage nonHeapMemoryUsage() {
// Workaround of exception in WebSphere.
// We received the following exception:
// java.lang.IllegalArgumentException: used value cannot be larger than the committed value
// at java.lang.management.MemoryUsage.<init>(MemoryUsage.java:105)
// at com.ibm.lang.management.MemoryMXBeanImpl.getNonHeapMemoryUsageImpl(Native Method)
// at com.ibm.lang.management.MemoryMXBeanImpl.getNonHeapMemoryUsage(MemoryMXBeanImpl.java:143)
// at org.apache.ignite.spi.metrics.jdk.GridJdkLocalMetricsSpi.getMetrics(GridJdkLocalMetricsSpi.java:242)
//
// We so had to workaround this with exception handling, because we can not control classes from WebSphere.
try {
return mem.getNonHeapMemoryUsage();
}
catch (IllegalArgumentException ignored) {
return new MemoryUsage(0, 0, 0, 0);
}
}
/**
* Returns the current memory usage of the heap.
* @return Memory usage or fake value with zero in case there was exception during take of metrics.
*/
public MemoryUsage heapMemoryUsage() {
// Catch exception here to allow discovery proceed even if metrics are not available
// java.lang.IllegalArgumentException: committed = 5274103808 should be < max = 5274095616
// at java.lang.management.MemoryUsage.<init>(Unknown Source)
try {
return mem.getHeapMemoryUsage();
}
catch (IllegalArgumentException ignored) {
return new MemoryUsage(0, 0, 0, 0);
}
}
/**
* @return Total system memory.
*/
private long totalSysMemory() {
try {
return U.<Long>property(os, "totalPhysicalMemorySize");
}
catch (RuntimeException ignored) {
return -1;
}
}
/** */
private class MetricsUpdater implements Runnable {
/** */
private long prevGcTime = -1;
/** */
private long prevCpuTime = -1;
/** {@inheritDoc} */
@Override public void run() {
heap.update(heapMemoryUsage());
nonHeap.update(nonHeapMemoryUsage());
gcCpuLoad.value(getGcCpuLoad());
cpuLoad.value(getCpuLoad());
}
/**
* @return GC CPU load.
*/
private double getGcCpuLoad() {
long gcTime = 0;
for (GarbageCollectorMXBean bean : gc) {
long colTime = bean.getCollectionTime();
if (colTime > 0)
gcTime += colTime;
}
gcTime /= os.getAvailableProcessors();
double gc = 0;
if (prevGcTime > 0) {
long gcTimeDiff = gcTime - prevGcTime;
gc = (double)gcTimeDiff / METRICS_UPDATE_FREQ;
}
prevGcTime = gcTime;
return gc;
}
/**
* @return CPU load.
*/
private double getCpuLoad() {
long cpuTime;
try {
cpuTime = U.<Long>property(os, "processCpuTime");
}
catch (IgniteException ignored) {
return -1;
}
// Method reports time in nanoseconds across all processors.
cpuTime /= 1000000 * os.getAvailableProcessors();
double cpu = 0;
if (prevCpuTime > 0) {
long cpuTimeDiff = cpuTime - prevCpuTime;
// CPU load could go higher than 100% because calculating of cpuTimeDiff also takes some time.
cpu = Math.min(1.0, (double)cpuTimeDiff / METRICS_UPDATE_FREQ);
}
prevCpuTime = cpuTime;
return cpu;
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(MetricsUpdater.class, this, super.toString());
}
}
/** Memory usage metrics. */
public class MemoryUsageMetrics {
/** @see MemoryUsage#getInit() */
private final AtomicLongMetric init;
/** @see MemoryUsage#getUsed() */
private final AtomicLongMetric used;
/** @see MemoryUsage#getCommitted() */
private final AtomicLongMetric committed;
/** @see MemoryUsage#getMax() */
private final AtomicLongMetric max;
/**
* @param group Metric registry.
* @param metricNamePrefix Metric name prefix.
*/
public MemoryUsageMetrics(String group, String metricNamePrefix) {
MetricRegistry mreg = registry(group);
this.init = mreg.longMetric(metricName(metricNamePrefix, "init"), null);
this.used = mreg.longMetric(metricName(metricNamePrefix, "used"), null);
this.committed = mreg.longMetric(metricName(metricNamePrefix, "committed"), null);
this.max = mreg.longMetric(metricName(metricNamePrefix, "max"), null);
}
/** Updates metric to the provided values. */
public void update(MemoryUsage usage) {
init.value(usage.getInit());
used.value(usage.getUsed());
committed.value(usage.getCommitted());
max.value(usage.getMax());
}
}
}