IGNITE-21908 Add metrics of distribution among stripes in disruptor (#3645)
diff --git a/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/TestServer.java b/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/TestServer.java
index a43be8b..50551cc 100644
--- a/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/TestServer.java
+++ b/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/TestServer.java
@@ -31,7 +31,7 @@
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.TestClockService;
import org.apache.ignite.internal.lowwatermark.TestLowWatermark;
-import org.apache.ignite.internal.metrics.MetricManager;
+import org.apache.ignite.internal.metrics.MetricManagerImpl;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.NettyBootstrapFactory;
import org.apache.ignite.internal.network.configuration.NetworkConfiguration;
@@ -124,7 +124,7 @@
clusterService,
bootstrapFactory,
() -> CompletableFuture.completedFuture(ClusterTag.clusterTag(msgFactory, "Test Server")),
- mock(MetricManager.class),
+ mock(MetricManagerImpl.class),
metrics,
authenticationManager,
new TestClockService(new HybridClockImpl()),
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
index 4046840..591c83f 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
@@ -37,6 +37,7 @@
import org.apache.ignite.internal.jdbc.proto.ClientMessage;
import org.apache.ignite.internal.marshaller.ReflectionMarshallersProvider;
import org.apache.ignite.internal.metrics.MetricManager;
+import org.apache.ignite.internal.metrics.MetricManagerImpl;
import org.apache.ignite.internal.metrics.exporters.jmx.JmxExporter;
import org.apache.ignite.lang.ErrorGroups;
import org.apache.ignite.network.ClusterNode;
@@ -119,7 +120,7 @@
return null;
}
- var metricManager = new MetricManager(ClientUtils.logger(cfg, MetricManager.class));
+ var metricManager = new MetricManagerImpl(ClientUtils.logger(cfg, MetricManagerImpl.class));
metricManager.start(List.of(new JmxExporter(ClientUtils.logger(cfg, JmxExporter.class))));
metricManager.registerSource(metrics);
diff --git a/modules/client/src/test/java/org/apache/ignite/client/TestServer.java b/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
index 095ecfd..8a8f049 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
@@ -59,7 +59,7 @@
import org.apache.ignite.internal.hlc.TestClockService;
import org.apache.ignite.internal.lowwatermark.TestLowWatermark;
import org.apache.ignite.internal.manager.IgniteComponent;
-import org.apache.ignite.internal.metrics.MetricManager;
+import org.apache.ignite.internal.metrics.MetricManagerImpl;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.NettyBootstrapFactory;
import org.apache.ignite.internal.network.configuration.NetworkConfiguration;
@@ -243,7 +243,7 @@
clusterService,
bootstrapFactory,
() -> CompletableFuture.completedFuture(tag),
- mock(MetricManager.class),
+ mock(MetricManagerImpl.class),
metrics,
authenticationManager,
new TestClockService(clock),
diff --git a/modules/cluster-management/build.gradle b/modules/cluster-management/build.gradle
index 809b2f2..8987125 100644
--- a/modules/cluster-management/build.gradle
+++ b/modules/cluster-management/build.gradle
@@ -61,6 +61,7 @@
testFixturesImplementation testFixtures(project(':ignite-core'))
testFixturesImplementation testFixtures(project(':ignite-configuration'))
testFixturesImplementation testFixtures(project(':ignite-network'))
+ testFixturesImplementation testFixtures(project(':ignite-metrics'))
testFixturesImplementation libs.jetbrains.annotations
integrationTestAnnotationProcessor libs.micronaut.inject.annotation.processor
@@ -73,6 +74,7 @@
integrationTestImplementation testFixtures(project(':ignite-configuration'))
integrationTestImplementation testFixtures(project(':ignite-network'))
integrationTestImplementation testFixtures(project(':ignite-runner'))
+ integrationTestImplementation testFixtures(project(':ignite-metrics:'))
integrationTestImplementation libs.awaitility
integrationTestImplementation libs.jetbrains.annotations
}
diff --git a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java
index 0ebb827..74d4a7a 100644
--- a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java
+++ b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java
@@ -56,6 +56,7 @@
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.metrics.NoOpMetricManager;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.NodeFinder;
import org.apache.ignite.internal.network.StaticNodeFinder;
@@ -104,7 +105,7 @@
Node(TestInfo testInfo, NetworkAddress addr, NodeFinder nodeFinder, Path workDir) {
this.clusterService = clusterService(testInfo, addr.port(), nodeFinder);
- this.raftManager = new Loza(clusterService, raftConfiguration, workDir, new HybridClockImpl());
+ this.raftManager = new Loza(clusterService, new NoOpMetricManager(), raftConfiguration, workDir, new HybridClockImpl());
this.logicalTopology = new LogicalTopologyImpl(raftStorage);
}
diff --git a/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/MockNode.java b/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/MockNode.java
index cf88e75..300cb12 100644
--- a/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/MockNode.java
+++ b/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/MockNode.java
@@ -39,6 +39,7 @@
import org.apache.ignite.internal.configuration.validation.TestConfigurationValidator;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.metrics.NoOpMetricManager;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.NodeFinder;
import org.apache.ignite.internal.network.utils.ClusterServiceTestUtils;
@@ -112,7 +113,7 @@
this.clusterService = ClusterServiceTestUtils.clusterService(testInfo, port, nodeFinder);
- Loza raftManager = new Loza(clusterService, raftConfiguration, workDir, new HybridClockImpl());
+ Loza raftManager = new Loza(clusterService, new NoOpMetricManager(), raftConfiguration, workDir, new HybridClockImpl());
var clusterStateStorage = new RocksDbClusterStateStorage(workDir.resolve("cmg"), clusterService.nodeName());
diff --git a/modules/metastorage/build.gradle b/modules/metastorage/build.gradle
index c23d1ba..6f34083 100644
--- a/modules/metastorage/build.gradle
+++ b/modules/metastorage/build.gradle
@@ -58,6 +58,7 @@
integrationTestImplementation project(':ignite-storage-rocksdb')
integrationTestImplementation project(":ignite-vault")
integrationTestImplementation project(":ignite-security")
+ integrationTestImplementation project(':ignite-metrics')
integrationTestImplementation testFixtures(project(':ignite-core'))
integrationTestImplementation testFixtures(project(':ignite-network'))
integrationTestImplementation testFixtures(project(':ignite-raft'))
@@ -66,6 +67,7 @@
integrationTestImplementation testFixtures(project(':ignite-metastorage'))
integrationTestImplementation testFixtures(project(':ignite-cluster-management'))
integrationTestImplementation testFixtures(project(':ignite-failure-handler'))
+ integrationTestImplementation testFixtures(project(':ignite-metrics:'))
testFixturesImplementation project(':ignite-cluster-management')
testFixturesImplementation project(':ignite-core')
diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
index 74b47c1..80838e5 100644
--- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
+++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
@@ -68,6 +68,7 @@
import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
import org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
import org.apache.ignite.internal.metastorage.server.time.ClusterTime;
+import org.apache.ignite.internal.metrics.NoOpMetricManager;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.StaticNodeFinder;
import org.apache.ignite.internal.raft.Loza;
@@ -110,7 +111,8 @@
var raftGroupEventsClientListener = new RaftGroupEventsClientListener();
- raftManager = new Loza(clusterService, raftConfiguration, workDir.resolve("loza"), clock, raftGroupEventsClientListener);
+ raftManager = new Loza(clusterService, new NoOpMetricManager(), raftConfiguration, workDir.resolve("loza"), clock,
+ raftGroupEventsClientListener);
var logicalTopologyService = mock(LogicalTopologyService.class);
diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
index e93e9d2..cf84819 100644
--- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
+++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
@@ -72,6 +72,7 @@
import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
import org.apache.ignite.internal.metastorage.server.time.ClusterTime;
import org.apache.ignite.internal.metastorage.server.time.ClusterTimeImpl;
+import org.apache.ignite.internal.metrics.NoOpMetricManager;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.StaticNodeFinder;
import org.apache.ignite.internal.network.utils.ClusterServiceTestUtils;
@@ -150,6 +151,7 @@
this.raftManager = new Loza(
clusterService,
+ new NoOpMetricManager(),
raftConfiguration,
basePath.resolve("raft"),
clock,
diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java
index ecdaf47..1caa134 100644
--- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java
+++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java
@@ -95,6 +95,7 @@
import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener;
import org.apache.ignite.internal.metastorage.server.raft.MetastorageGroupId;
import org.apache.ignite.internal.metastorage.server.time.ClusterTimeImpl;
+import org.apache.ignite.internal.metrics.NoOpMetricManager;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.StaticNodeFinder;
import org.apache.ignite.internal.network.utils.ClusterServiceTestUtils;
@@ -192,6 +193,7 @@
this.raftManager = new Loza(
clusterService,
+ new NoOpMetricManager(),
raftConfiguration,
dataPath.resolve(name()),
clock
diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
index d7c7306..dca14a1 100644
--- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
+++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
@@ -74,6 +74,7 @@
import org.apache.ignite.internal.metastorage.dsl.Conditions;
import org.apache.ignite.internal.metastorage.dsl.Operations;
import org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
+import org.apache.ignite.internal.metrics.NoOpMetricManager;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.StaticNodeFinder;
import org.apache.ignite.internal.network.utils.ClusterServiceTestUtils;
@@ -134,6 +135,7 @@
var raftManager = new Loza(
clusterService,
+ new NoOpMetricManager(),
raftConfiguration,
basePath.resolve("raft"),
clock,
diff --git a/modules/metrics/build.gradle b/modules/metrics/build.gradle
index a609f8a..d7eae38 100644
--- a/modules/metrics/build.gradle
+++ b/modules/metrics/build.gradle
@@ -19,6 +19,7 @@
apply from: "$rootDir/buildscripts/publishing.gradle"
apply from: "$rootDir/buildscripts/java-junit5.gradle"
apply from: "$rootDir/buildscripts/java-integration-test.gradle"
+apply from: "$rootDir/buildscripts/java-test-fixtures.gradle"
dependencies {
annotationProcessor project(":ignite-configuration-annotation-processor")
@@ -45,6 +46,8 @@
integrationTestImplementation testFixtures(project(':ignite-core'))
integrationTestImplementation testFixtures(project(':ignite-configuration'))
integrationTestImplementation libs.auto.service.annotations
+
+ testFixturesImplementation project(':ignite-core')
}
description = 'ignite-metrics'
diff --git a/modules/metrics/src/integrationTest/java/org/apache/ignite/internal/metrics/exporters/ItJvmMetricSourceTest.java b/modules/metrics/src/integrationTest/java/org/apache/ignite/internal/metrics/exporters/ItJvmMetricSourceTest.java
index 74609b9..d0ce7704 100644
--- a/modules/metrics/src/integrationTest/java/org/apache/ignite/internal/metrics/exporters/ItJvmMetricSourceTest.java
+++ b/modules/metrics/src/integrationTest/java/org/apache/ignite/internal/metrics/exporters/ItJvmMetricSourceTest.java
@@ -27,6 +27,7 @@
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.metrics.MetricManager;
+import org.apache.ignite.internal.metrics.MetricManagerImpl;
import org.apache.ignite.internal.metrics.configuration.MetricConfiguration;
import org.apache.ignite.internal.metrics.sources.JvmMetricSource;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
@@ -50,7 +51,7 @@
@Test
public void testMemoryUsageMetric() {
- MetricManager metricManager = new MetricManager();
+ MetricManager metricManager = new MetricManagerImpl();
metricManager.configure(simpleConfiguration);
diff --git a/modules/metrics/src/integrationTest/java/org/apache/ignite/internal/metrics/exporters/ItMetricExportersLoadingTest.java b/modules/metrics/src/integrationTest/java/org/apache/ignite/internal/metrics/exporters/ItMetricExportersLoadingTest.java
index 798460d..ca2410b 100644
--- a/modules/metrics/src/integrationTest/java/org/apache/ignite/internal/metrics/exporters/ItMetricExportersLoadingTest.java
+++ b/modules/metrics/src/integrationTest/java/org/apache/ignite/internal/metrics/exporters/ItMetricExportersLoadingTest.java
@@ -28,6 +28,7 @@
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.metrics.MetricManager;
+import org.apache.ignite.internal.metrics.MetricManagerImpl;
import org.apache.ignite.internal.metrics.configuration.MetricConfiguration;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.junit.jupiter.api.Test;
@@ -52,7 +53,7 @@
@Test
public void test() throws Exception {
- MetricManager metricManager = new MetricManager();
+ MetricManager metricManager = new MetricManagerImpl();
metricManager.configure(metricConfiguration);
diff --git a/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/LongGauge.java b/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/LongGauge.java
index 5f8db42..e3b2430 100644
--- a/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/LongGauge.java
+++ b/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/LongGauge.java
@@ -28,7 +28,7 @@
private final LongSupplier val;
/**
- * Constructor.
+ * The constructor.
*
* @param name Name.
* @param desc Description.
diff --git a/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/MetricManager.java b/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/MetricManager.java
index ba5082d..af8905b 100644
--- a/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/MetricManager.java
+++ b/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/MetricManager.java
@@ -17,67 +17,19 @@
package org.apache.ignite.internal.metrics;
-import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
-
import java.util.Collection;
-import java.util.HashMap;
import java.util.Map;
-import java.util.ServiceLoader;
-import java.util.ServiceLoader.Provider;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-import org.apache.ignite.configuration.notifications.ConfigurationNamedListListener;
-import org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent;
import org.apache.ignite.internal.lang.IgniteBiTuple;
-import org.apache.ignite.internal.logger.IgniteLogger;
-import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.metrics.configuration.MetricConfiguration;
-import org.apache.ignite.internal.metrics.configuration.MetricView;
import org.apache.ignite.internal.metrics.exporters.MetricExporter;
-import org.apache.ignite.internal.metrics.exporters.configuration.ExporterView;
import org.jetbrains.annotations.VisibleForTesting;
-
/**
- * Metric manager.
+ * The component services of the metrics. It has functions to switch on / off and register them.
*/
-public class MetricManager implements IgniteComponent {
- /** Logger. */
- private final IgniteLogger log;
-
- /** Metric registry. */
- private final MetricRegistry registry;
-
- private final MetricProvider metricsProvider;
-
- private final Map<String, MetricExporter> enabledMetricExporters = new ConcurrentHashMap<>();
-
- /** Metrics' exporters. */
- private Map<String, MetricExporter> availableExporters;
-
- private MetricConfiguration metricConfiguration;
-
- /**
- * Constructor.
- */
- public MetricManager() {
- this(Loggers.forClass(MetricManager.class));
- }
-
- /**
- * Constructor.
- *
- * @param log Logger.
- */
- public MetricManager(IgniteLogger log) {
- registry = new MetricRegistry();
- metricsProvider = new MetricProvider(registry);
- this.log = log;
- }
-
+public interface MetricManager extends IgniteComponent {
/**
* Method to configure {@link MetricManager} with distributed configuration.
*
@@ -85,18 +37,10 @@
*/
// TODO: IGNITE-17718 when we design the system to configure metrics itself
// TODO: this method should be revisited, but now it is supposed to use only to set distributed configuration for exporters.
- public void configure(MetricConfiguration metricConfiguration) {
- assert this.metricConfiguration == null : "Metric manager must be configured only once, on the start of the node";
+ void configure(MetricConfiguration metricConfiguration);
- this.metricConfiguration = metricConfiguration;
- }
-
- /** {@inheritDoc} */
- @Override public CompletableFuture<Void> startAsync() {
- start(loadExporters());
-
- return nullCompletedFuture();
- }
+ @Override
+ CompletableFuture<Void> startAsync();
/**
* Start component.
@@ -104,71 +48,38 @@
* @param availableExporters Map of (name, exporter) with available exporters.
*/
@VisibleForTesting
- public void start(Map<String, MetricExporter> availableExporters) {
- this.availableExporters = availableExporters;
-
- MetricView conf = metricConfiguration.value();
-
- for (ExporterView exporter : conf.exporters()) {
- checkAndStartExporter(exporter.exporterName(), exporter);
- }
-
- metricConfiguration.exporters().listenElements(new ExporterConfigurationListener());
- }
+ void start(Map<String, MetricExporter> availableExporters);
/**
* Starts component with default configuration.
*
* @param exporters Exporters.
*/
- public void start(Iterable<MetricExporter<?>> exporters) {
- this.availableExporters = new HashMap<>();
+ void start(Iterable<MetricExporter<?>> exporters);
- for (MetricExporter<?> exporter : exporters) {
- exporter.start(metricsProvider, null);
-
- availableExporters.put(exporter.name(), exporter);
- enabledMetricExporters.put(exporter.name(), exporter);
- }
- }
-
- /** {@inheritDoc} */
- @Override public CompletableFuture<Void> stopAsync() {
- for (MetricExporter metricExporter : enabledMetricExporters.values()) {
- metricExporter.stop();
- }
-
- enabledMetricExporters.clear();
-
- return nullCompletedFuture();
- }
+ @Override
+ CompletableFuture<Void> stopAsync();
/**
* Register metric source. See {@link MetricRegistry#registerSource(MetricSource)}.
*
* @param src Metric source.
*/
- public void registerSource(MetricSource src) {
- registry.registerSource(src);
- }
+ void registerSource(MetricSource src);
/**
* Unregister metric source. See {@link MetricRegistry#unregisterSource(MetricSource)}.
*
* @param src Metric source.
*/
- public void unregisterSource(MetricSource src) {
- registry.unregisterSource(src);
- }
+ void unregisterSource(MetricSource src);
/**
* Unregister metric source by name. See {@link MetricRegistry#unregisterSource(String)}.
*
* @param srcName Metric source name.
*/
- public void unregisterSource(String srcName) {
- registry.unregisterSource(srcName);
- }
+ void unregisterSource(String srcName);
/**
* Enable metric source. See {@link MetricRegistry#enable(MetricSource)}.
@@ -176,15 +87,7 @@
* @param src Metric source.
* @return Metric set, or {@code null} if already enabled.
*/
- public MetricSet enable(MetricSource src) {
- MetricSet enabled = registry.enable(src);
-
- if (enabled != null) {
- enabledMetricExporters.values().forEach(e -> e.addMetricSet(enabled));
- }
-
- return enabled;
- }
+ MetricSet enable(MetricSource src);
/**
* Enable metric source by name. See {@link MetricRegistry#enable(String)}.
@@ -192,52 +95,21 @@
* @param srcName Source name.
* @return Metric set, or {@code null} if already enabled.
*/
- public MetricSet enable(final String srcName) {
- MetricSet enabled = registry.enable(srcName);
-
- if (enabled != null) {
- enabledMetricExporters.values().forEach(e -> e.addMetricSet(enabled));
- }
-
- return enabled;
- }
-
- /**
- * Load exporters by {@link ServiceLoader} mechanism.
- *
- * @return list of loaded exporters.
- */
- public static Map<String, MetricExporter> loadExporters() {
- var clsLdr = Thread.currentThread().getContextClassLoader();
-
- return ServiceLoader
- .load(MetricExporter.class, clsLdr)
- .stream()
- .map(Provider::get)
- .collect(Collectors.toMap(e -> e.name(), Function.identity()));
- }
+ MetricSet enable(String srcName);
/**
* Disable metric source. See {@link MetricRegistry#disable(MetricSource)}.
*
* @param src Metric source.
*/
- public void disable(MetricSource src) {
- registry.disable(src);
-
- enabledMetricExporters.values().forEach(e -> e.removeMetricSet(src.name()));
- }
+ void disable(MetricSource src);
/**
* Disable metric source by name. See {@link MetricRegistry#disable(String)}.
*
* @param srcName Metric source name.
*/
- public void disable(final String srcName) {
- registry.disable(srcName);
-
- enabledMetricExporters.values().forEach(e -> e.removeMetricSet(srcName));
- }
+ void disable(String srcName);
/**
* Metrics snapshot. This is a snapshot of metric sets with corresponding version, the values of the metrics in the
@@ -245,61 +117,12 @@
*
* @return Metrics snapshot.
*/
- public IgniteBiTuple<Map<String, MetricSet>, Long> metricSnapshot() {
- return registry.metricSnapshot();
- }
+ IgniteBiTuple<Map<String, MetricSet>, Long> metricSnapshot();
/**
* Gets a collection of metric sources.
*
* @return collection of metric sources
*/
- public Collection<MetricSource> metricSources() {
- return registry.metricSources();
- }
-
- private <T extends ExporterView> void checkAndStartExporter(
- String exporterName,
- T exporterConfiguration) {
- MetricExporter<T> exporter = availableExporters.get(exporterName);
-
- if (exporter != null) {
- exporter.start(metricsProvider, exporterConfiguration);
-
- enabledMetricExporters.put(exporter.name(), exporter);
- } else {
- log.warn("Received configuration for unknown metric exporter with the name '" + exporterName + "'");
- }
- }
-
- private class ExporterConfigurationListener implements ConfigurationNamedListListener<ExporterView> {
- @Override
- public CompletableFuture<?> onCreate(ConfigurationNotificationEvent<ExporterView> ctx) {
- checkAndStartExporter(ctx.newValue().exporterName(), ctx.newValue());
-
- return nullCompletedFuture();
- }
-
- @Override
- public CompletableFuture<?> onDelete(ConfigurationNotificationEvent<ExporterView> ctx) {
- var removed = enabledMetricExporters.remove(ctx.oldValue().exporterName());
-
- if (removed != null) {
- removed.stop();
- }
-
- return nullCompletedFuture();
- }
-
- @Override
- public CompletableFuture<?> onUpdate(ConfigurationNotificationEvent<ExporterView> ctx) {
- MetricExporter exporter = enabledMetricExporters.get(ctx.newValue().exporterName());
-
- if (exporter != null) {
- exporter.reconfigure(ctx.newValue());
- }
-
- return nullCompletedFuture();
- }
- }
+ Collection<MetricSource> metricSources();
}
diff --git a/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/MetricManagerImpl.java b/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/MetricManagerImpl.java
new file mode 100644
index 0000000..4123070
--- /dev/null
+++ b/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/MetricManagerImpl.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metrics;
+
+import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.ServiceLoader;
+import java.util.ServiceLoader.Provider;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.ignite.configuration.notifications.ConfigurationNamedListListener;
+import org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent;
+import org.apache.ignite.internal.lang.IgniteBiTuple;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.metrics.configuration.MetricConfiguration;
+import org.apache.ignite.internal.metrics.configuration.MetricView;
+import org.apache.ignite.internal.metrics.exporters.MetricExporter;
+import org.apache.ignite.internal.metrics.exporters.configuration.ExporterView;
+import org.jetbrains.annotations.VisibleForTesting;
+
+
+/**
+ * Metric manager.
+ */
+public class MetricManagerImpl implements MetricManager {
+ /** Logger. */
+ private final IgniteLogger log;
+
+ /** Metric registry. */
+ private final MetricRegistry registry;
+
+ private final MetricProvider metricsProvider;
+
+ private final Map<String, MetricExporter> enabledMetricExporters = new ConcurrentHashMap<>();
+
+ /** Metrics' exporters. */
+ private Map<String, MetricExporter> availableExporters;
+
+ private MetricConfiguration metricConfiguration;
+
+ /**
+ * Constructor.
+ */
+ public MetricManagerImpl() {
+ this(Loggers.forClass(MetricManagerImpl.class));
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param log Logger.
+ */
+ public MetricManagerImpl(IgniteLogger log) {
+ registry = new MetricRegistry();
+ metricsProvider = new MetricProvider(registry);
+ this.log = log;
+ }
+
+ @Override
+ public void configure(MetricConfiguration metricConfiguration) {
+ assert this.metricConfiguration == null : "Metric manager must be configured only once, on the start of the node";
+
+ this.metricConfiguration = metricConfiguration;
+ }
+
+ @Override
+ public CompletableFuture<Void> startAsync() {
+ start(loadExporters());
+
+ return nullCompletedFuture();
+ }
+
+ @Override
+ @VisibleForTesting
+ public void start(Map<String, MetricExporter> availableExporters) {
+ this.availableExporters = availableExporters;
+
+ MetricView conf = metricConfiguration.value();
+
+ for (ExporterView exporter : conf.exporters()) {
+ checkAndStartExporter(exporter.exporterName(), exporter);
+ }
+
+ metricConfiguration.exporters().listenElements(new ExporterConfigurationListener());
+ }
+
+ @Override
+ public void start(Iterable<MetricExporter<?>> exporters) {
+ this.availableExporters = new HashMap<>();
+
+ for (MetricExporter<?> exporter : exporters) {
+ exporter.start(metricsProvider, null);
+
+ availableExporters.put(exporter.name(), exporter);
+ enabledMetricExporters.put(exporter.name(), exporter);
+ }
+ }
+
+ @Override public CompletableFuture<Void> stopAsync() {
+ for (MetricExporter metricExporter : enabledMetricExporters.values()) {
+ metricExporter.stop();
+ }
+
+ enabledMetricExporters.clear();
+
+ return nullCompletedFuture();
+ }
+
+ @Override
+ public void registerSource(MetricSource src) {
+ registry.registerSource(src);
+ }
+
+ @Override
+ public void unregisterSource(MetricSource src) {
+ registry.unregisterSource(src);
+ }
+
+ @Override
+ public void unregisterSource(String srcName) {
+ registry.unregisterSource(srcName);
+ }
+
+ @Override
+ public MetricSet enable(MetricSource src) {
+ MetricSet enabled = registry.enable(src);
+
+ if (enabled != null) {
+ enabledMetricExporters.values().forEach(e -> e.addMetricSet(enabled));
+ }
+
+ return enabled;
+ }
+
+ @Override
+ public MetricSet enable(final String srcName) {
+ MetricSet enabled = registry.enable(srcName);
+
+ if (enabled != null) {
+ enabledMetricExporters.values().forEach(e -> e.addMetricSet(enabled));
+ }
+
+ return enabled;
+ }
+
+ @Override
+ public void disable(MetricSource src) {
+ registry.disable(src);
+
+ enabledMetricExporters.values().forEach(e -> e.removeMetricSet(src.name()));
+ }
+
+ @Override
+ public void disable(final String srcName) {
+ registry.disable(srcName);
+
+ enabledMetricExporters.values().forEach(e -> e.removeMetricSet(srcName));
+ }
+
+ @Override
+ public IgniteBiTuple<Map<String, MetricSet>, Long> metricSnapshot() {
+ return registry.metricSnapshot();
+ }
+
+ @Override
+ public Collection<MetricSource> metricSources() {
+ return registry.metricSources();
+ }
+
+ private <T extends ExporterView> void checkAndStartExporter(
+ String exporterName,
+ T exporterConfiguration) {
+ MetricExporter<T> exporter = availableExporters.get(exporterName);
+
+ if (exporter != null) {
+ exporter.start(metricsProvider, exporterConfiguration);
+
+ enabledMetricExporters.put(exporter.name(), exporter);
+ } else {
+ log.warn("Received configuration for unknown metric exporter with the name '" + exporterName + "'");
+ }
+ }
+
+ /**
+ * Load exporters by {@link ServiceLoader} mechanism.
+ *
+ * @return list of loaded exporters.
+ */
+ public static Map<String, MetricExporter> loadExporters() {
+ var clsLdr = Thread.currentThread().getContextClassLoader();
+
+ return ServiceLoader
+ .load(MetricExporter.class, clsLdr)
+ .stream()
+ .map(Provider::get)
+ .collect(Collectors.toMap(e -> e.name(), Function.identity()));
+ }
+
+ private class ExporterConfigurationListener implements ConfigurationNamedListListener<ExporterView> {
+ @Override
+ public CompletableFuture<?> onCreate(ConfigurationNotificationEvent<ExporterView> ctx) {
+ checkAndStartExporter(ctx.newValue().exporterName(), ctx.newValue());
+
+ return nullCompletedFuture();
+ }
+
+ @Override
+ public CompletableFuture<?> onDelete(ConfigurationNotificationEvent<ExporterView> ctx) {
+ var removed = enabledMetricExporters.remove(ctx.oldValue().exporterName());
+
+ if (removed != null) {
+ removed.stop();
+ }
+
+ return nullCompletedFuture();
+ }
+
+ @Override
+ public CompletableFuture<?> onUpdate(ConfigurationNotificationEvent<ExporterView> ctx) {
+ MetricExporter exporter = enabledMetricExporters.get(ctx.newValue().exporterName());
+
+ if (exporter != null) {
+ exporter.reconfigure(ctx.newValue());
+ }
+
+ return nullCompletedFuture();
+ }
+ }
+}
diff --git a/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/SimpleMovingAverage.java b/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/SimpleMovingAverage.java
new file mode 100644
index 0000000..d7b020c
--- /dev/null
+++ b/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/SimpleMovingAverage.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metrics;
+
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.function.DoubleFunction;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * The metric calculates the average value for the last several operations.
+ */
+public class SimpleMovingAverage extends AbstractMetric implements DoubleMetric {
+ /** Default window size. */
+ public static final int DFLT_ITEMS = 100;
+
+ /** Size. */
+ private final int items;
+
+ /** Elements. */
+ ConcurrentLinkedDeque<Double> queue = new ConcurrentLinkedDeque<>();
+
+ /**
+ * The constructor.
+ *
+ * @param name Name.
+ * @param desc Description.
+ * @param stringFormatter String formatter to get a readable value.
+ */
+ public SimpleMovingAverage(String name, @Nullable String desc, @Nullable DoubleFunction<String> stringFormatter) {
+ this(name, desc, stringFormatter, DFLT_ITEMS);
+ }
+
+ /**
+ * The constructor.
+ *
+ * @param name Name.
+ * @param desc Description.
+ * @param stringFormatter String formatter to get a readable value.
+ * @param items Quantity items to calculate average value.
+ */
+ public SimpleMovingAverage(String name, @Nullable String desc, DoubleFunction<String> stringFormatter, int items) {
+ super(name, desc);
+
+ this.items = items;
+ }
+
+ /**
+ * Adds some value.
+ *
+ * @param val Value.
+ */
+ public void add(double val) {
+ queue.add(val);
+
+ while (queue.size() > items) {
+ queue.pop();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public double value() {
+ return queue.stream().mapToDouble(a -> a).average().orElse(0.);
+ }
+}
diff --git a/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/configuration/MetricConfigurationModule.java b/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/configuration/MetricConfigurationModule.java
index 7b58867..9b22ef1 100644
--- a/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/configuration/MetricConfigurationModule.java
+++ b/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/configuration/MetricConfigurationModule.java
@@ -24,6 +24,7 @@
import org.apache.ignite.configuration.RootKey;
import org.apache.ignite.configuration.annotation.ConfigurationType;
import org.apache.ignite.internal.metrics.exporters.configuration.JmxExporterConfigurationSchema;
+import org.apache.ignite.internal.metrics.exporters.configuration.LogPushExporterConfigurationSchema;
/**
* Configuration module for metrics' configs.
@@ -45,6 +46,6 @@
/** {@inheritDoc} */
@Override
public Collection<Class<?>> polymorphicSchemaExtensions() {
- return List.of(JmxExporterConfigurationSchema.class);
+ return List.of(JmxExporterConfigurationSchema.class, LogPushExporterConfigurationSchema.class);
}
}
diff --git a/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/exporters/MetricExporter.java b/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/exporters/MetricExporter.java
index 2bae2df..d93db1f 100644
--- a/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/exporters/MetricExporter.java
+++ b/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/exporters/MetricExporter.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.metrics.exporters;
+import org.apache.ignite.internal.metrics.MetricManagerImpl;
import org.apache.ignite.internal.metrics.MetricProvider;
import org.apache.ignite.internal.metrics.MetricSet;
import org.apache.ignite.internal.metrics.exporters.configuration.ExporterConfiguration;
@@ -64,7 +65,7 @@
void reconfigure(CfgT newValue);
/**
- * {@link org.apache.ignite.internal.metrics.MetricManager} invokes this method,
+ * {@link MetricManagerImpl} invokes this method,
* when new metric source was enabled.
*
* @param metricSet Named metric set.
@@ -72,7 +73,7 @@
void addMetricSet(MetricSet metricSet);
/**
- * {@link org.apache.ignite.internal.metrics.MetricManager} invokes this method,
+ * {@link MetricManagerImpl} invokes this method,
* when the metric source was disabled.
*
* @param metricSetName Name of metric set to remove.
diff --git a/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/exporters/configuration/LogPushExporterConfigurationSchema.java b/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/exporters/configuration/LogPushExporterConfigurationSchema.java
new file mode 100644
index 0000000..6e0b630
--- /dev/null
+++ b/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/exporters/configuration/LogPushExporterConfigurationSchema.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metrics.exporters.configuration;
+
+import org.apache.ignite.configuration.annotation.PolymorphicConfigInstance;
+import org.apache.ignite.configuration.annotation.Value;
+import org.apache.ignite.internal.metrics.exporters.log.LogPushExporter;
+
+/**
+ * Configuration for log push exporter.
+ */
+@PolymorphicConfigInstance(LogPushExporter.EXPORTER_NAME)
+public class LogPushExporterConfigurationSchema extends ExporterConfigurationSchema {
+ @Value(hasDefault = true)
+ public int period = 30_000;
+}
diff --git a/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/exporters/jmx/MetricSetMbean.java b/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/exporters/jmx/MetricSetMbean.java
index 9548679..4d4c0de 100644
--- a/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/exporters/jmx/MetricSetMbean.java
+++ b/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/exporters/jmx/MetricSetMbean.java
@@ -33,7 +33,7 @@
import org.apache.ignite.internal.metrics.IntMetric;
import org.apache.ignite.internal.metrics.LongMetric;
import org.apache.ignite.internal.metrics.Metric;
-import org.apache.ignite.internal.metrics.MetricManager;
+import org.apache.ignite.internal.metrics.MetricManagerImpl;
import org.apache.ignite.internal.metrics.MetricSet;
/**
@@ -135,7 +135,7 @@
});
return new MBeanInfo(
- MetricManager.class.getName(),
+ MetricManagerImpl.class.getName(),
metricSet.name(),
attrs.toArray(new MBeanAttributeInfo[0]),
null,
diff --git a/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/exporters/log/LogPushExporter.java b/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/exporters/log/LogPushExporter.java
new file mode 100644
index 0000000..cd00a45
--- /dev/null
+++ b/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/exporters/log/LogPushExporter.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metrics.exporters.log;
+
+import com.google.auto.service.AutoService;
+import java.util.Comparator;
+import java.util.stream.StreamSupport;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.metrics.Metric;
+import org.apache.ignite.internal.metrics.MetricProvider;
+import org.apache.ignite.internal.metrics.MetricSet;
+import org.apache.ignite.internal.metrics.exporters.MetricExporter;
+import org.apache.ignite.internal.metrics.exporters.PushMetricExporter;
+import org.apache.ignite.internal.metrics.exporters.configuration.LogPushExporterView;
+import org.apache.ignite.internal.util.CollectionUtils;
+
+/**
+ * Log push metrics exporter.
+ */
+@AutoService(MetricExporter.class)
+public class LogPushExporter extends PushMetricExporter<LogPushExporterView> {
+ public static final String EXPORTER_NAME = "logPush";
+
+ private IgniteLogger log;
+
+ private long period;
+
+ @Override
+ public void start(MetricProvider metricsProvider, LogPushExporterView configuration) {
+ period = configuration.period();
+ log = Loggers.forClass(LogPushExporter.class);
+
+ super.start(metricsProvider, configuration);
+ }
+
+ @Override
+ protected long period() {
+ return period;
+ }
+
+ @Override
+ public void report() {
+ if (CollectionUtils.nullOrEmpty(metrics().get1().values())) {
+ return;
+ }
+
+ var report = new StringBuilder("Metric report: \n");
+
+ for (MetricSet metricSet : metrics().get1().values()) {
+ report.append(metricSet.name()).append(":\n");
+
+ StreamSupport.stream(metricSet.spliterator(), false).sorted(Comparator.comparing(Metric::name)).forEach(metric ->
+ report.append(metric.name())
+ .append(':')
+ .append(metric.getValueAsString())
+ .append('\n'));
+ }
+
+ log.info(report.toString());
+ }
+
+ @Override
+ public String name() {
+ return EXPORTER_NAME;
+ }
+
+ @Override
+ public void addMetricSet(MetricSet metricSet) {
+ }
+
+ @Override
+ public void removeMetricSet(String metricSetName) {
+ }
+}
diff --git a/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/sources/JvmMetricSource.java b/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/sources/JvmMetricSource.java
index adb6024..0ffb02a 100644
--- a/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/sources/JvmMetricSource.java
+++ b/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/sources/JvmMetricSource.java
@@ -74,37 +74,48 @@
CachedMemoryUsage heapMemoryUsage = new CachedMemoryUsage(memoryMxBean::getHeapMemoryUsage, MEMORY_USAGE_CACHE_TIMEOUT);
metrics.put("memory.heap.Init",
- new LongGauge("memory.heap.Init", "Initial amount of heap memory", () -> heapMemoryUsage.get().getInit()));
+ new LongGauge(
+ "memory.heap.Init",
+ "Initial amount of heap memory",
+ () -> heapMemoryUsage.get().getInit()
+ ));
metrics.put("memory.heap.Used",
new LongGauge("memory.heap.Used",
"Current used amount of heap memory",
- () -> heapMemoryUsage.get().getUsed()));
+ () -> heapMemoryUsage.get().getUsed()
+ ));
metrics.put("memory.heap.Committed",
new LongGauge("memory.heap.Committed",
"Committed amount of heap memory",
- () -> heapMemoryUsage.get().getCommitted()));
+ () -> heapMemoryUsage.get().getCommitted()
+ ));
metrics.put("memory.heap.Max",
new LongGauge("memory.heap.Max",
"Maximum amount of heap memory",
- () -> heapMemoryUsage.get().getMax()));
+ () -> heapMemoryUsage.get().getMax()
+ ));
CachedMemoryUsage nonHeapMemoryUsage = new CachedMemoryUsage(memoryMxBean::getNonHeapMemoryUsage, MEMORY_USAGE_CACHE_TIMEOUT);
metrics.put("memory.non-heap.Init",
new LongGauge("memory.non-heap.Init",
"Initial amount of non-heap memory",
- () -> nonHeapMemoryUsage.get().getInit()));
+ () -> nonHeapMemoryUsage.get().getInit()
+ ));
metrics.put("memory.non-heap.Used",
new LongGauge("memory.non-heap.Used",
"Used amount of non-heap memory",
- () -> nonHeapMemoryUsage.get().getUsed()));
+ () -> nonHeapMemoryUsage.get().getUsed()
+ ));
metrics.put("memory.non-heap.Committed",
new LongGauge("memory.non-heap.Committed",
"Committed amount of non-heap memory",
- () -> nonHeapMemoryUsage.get().getCommitted()));
+ () -> nonHeapMemoryUsage.get().getCommitted()
+ ));
metrics.put("memory.non-heap.Max",
new LongGauge("memory.non-heap.Max",
"Maximum amount of non-heap memory",
- () -> nonHeapMemoryUsage.get().getMax()));
+ () -> nonHeapMemoryUsage.get().getMax()
+ ));
enabled = true;
diff --git a/modules/metrics/src/test/java/org/apache/ignite/internal/metrics/MetricConfigurationTest.java b/modules/metrics/src/test/java/org/apache/ignite/internal/metrics/MetricConfigurationTest.java
index b20aad6..f140346 100644
--- a/modules/metrics/src/test/java/org/apache/ignite/internal/metrics/MetricConfigurationTest.java
+++ b/modules/metrics/src/test/java/org/apache/ignite/internal/metrics/MetricConfigurationTest.java
@@ -53,7 +53,7 @@
@BeforeEach
public void setUp() {
- metricManager = new MetricManager();
+ metricManager = new MetricManagerImpl();
Map<String, MetricExporter> availableExporters = new HashMap<>();
diff --git a/modules/metrics/src/test/java/org/apache/ignite/internal/metrics/MovingAverageTest.java b/modules/metrics/src/test/java/org/apache/ignite/internal/metrics/MovingAverageTest.java
new file mode 100644
index 0000000..6f8f3e1
--- /dev/null
+++ b/modules/metrics/src/test/java/org/apache/ignite/internal/metrics/MovingAverageTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metrics;
+
+import java.util.Arrays;
+
+/**
+ * Metric to calculate moving average value.
+ */
+public class MovingAverageTest extends AbstractDoubleMetricTest {
+ public double[] items = new double[10];
+
+ int pos = 0;
+
+ @Override
+ protected void increment0(DoubleMetric metric) {
+ double avg = getAvg();
+
+ addValue(avg);
+
+ ((SimpleMovingAverage) metric).add(avg);
+ }
+
+ @Override
+ protected void decrement0(DoubleMetric metric) {
+ addValue(0);
+
+ ((SimpleMovingAverage) metric).add(0);
+ }
+
+ @Override
+ protected void add0(DoubleMetric metric, double value) {
+ addValue(value);
+
+ ((SimpleMovingAverage) metric).add(value);
+ }
+
+ @Override
+ protected void setValue0(DoubleMetric metric, double value) {
+ for (int i = 0; i < items.length; i++) {
+ addValue(value);
+
+ ((SimpleMovingAverage) metric).add(value);
+ }
+ }
+
+ @Override
+ protected DoubleMetric createMetric(String name, String description) {
+ return new SimpleMovingAverage(name, description, Double::toString, items.length);
+ }
+
+ @Override
+ protected Double expected() {
+ return getAvg();
+ }
+
+ /**
+ * Adds a value for the proper calculation of the expected one.
+ *
+ * @param value Some value.
+ */
+ private void addValue(double value) {
+ items[pos % items.length] = value;
+ pos++;
+ }
+
+ /**
+ * Calculated average value.
+ *
+ * @return Average value.
+ */
+ private double getAvg() {
+ return pos == 0 ? 0 : Arrays.stream(items, 0, Math.min(pos, items.length)).sum() / (Math.min(pos, items.length));
+ }
+}
diff --git a/modules/metrics/src/testFixtures/java/org/apache/ignite/internal/metrics/NoOpMetricManager.java b/modules/metrics/src/testFixtures/java/org/apache/ignite/internal/metrics/NoOpMetricManager.java
new file mode 100644
index 0000000..1ad4383
--- /dev/null
+++ b/modules/metrics/src/testFixtures/java/org/apache/ignite/internal/metrics/NoOpMetricManager.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metrics;
+
+import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.lang.IgniteBiTuple;
+import org.apache.ignite.internal.metrics.configuration.MetricConfiguration;
+import org.apache.ignite.internal.metrics.exporters.MetricExporter;
+
+/**
+ * The metric manager does nothing in all operations. It is designed to be used in tests where not all component workflow steps might be
+ * fulfilled.
+ */
+public class NoOpMetricManager implements MetricManager {
+ @Override
+ public void configure(MetricConfiguration metricConfiguration) {
+ }
+
+ @Override
+ public CompletableFuture<Void> startAsync() {
+ return nullCompletedFuture();
+ }
+
+ @Override
+ public void start(Map<String, MetricExporter> availableExporters) {
+ }
+
+ @Override
+ public void start(Iterable<MetricExporter<?>> exporters) {
+ }
+
+ @Override
+ public CompletableFuture<Void> stopAsync() {
+ return nullCompletedFuture();
+ }
+
+ @Override
+ public void registerSource(MetricSource src) {
+ }
+
+ @Override
+ public void unregisterSource(MetricSource src) {
+ }
+
+ @Override
+ public void unregisterSource(String srcName) {
+ }
+
+ @Override
+ public MetricSet enable(MetricSource src) {
+ return null;
+ }
+
+ @Override
+ public MetricSet enable(String srcName) {
+ return null;
+ }
+
+ @Override
+ public void disable(MetricSource src) {
+ }
+
+ @Override
+ public void disable(String srcName) {
+ }
+
+ @Override
+ public IgniteBiTuple<Map<String, MetricSet>, Long> metricSnapshot() {
+ return new IgniteBiTuple<>(Collections.emptyMap(), 1L);
+ }
+
+ @Override
+ public Collection<MetricSource> metricSources() {
+ return Collections.emptyList();
+ }
+}
diff --git a/modules/placement-driver/build.gradle b/modules/placement-driver/build.gradle
index 9efc3a5..ed2552c 100644
--- a/modules/placement-driver/build.gradle
+++ b/modules/placement-driver/build.gradle
@@ -56,6 +56,7 @@
integrationTestImplementation project(':ignite-replicator')
integrationTestImplementation project(':ignite-transactions')
integrationTestImplementation project(':ignite-catalog')
+ integrationTestImplementation project(':ignite-metrics')
integrationTestImplementation(testFixtures(project(':ignite-core')))
integrationTestImplementation(testFixtures(project(':ignite-network')))
@@ -66,6 +67,7 @@
integrationTestImplementation(testFixtures(project(':ignite-distribution-zones')))
integrationTestImplementation(testFixtures(project(':ignite-runner')))
integrationTestImplementation(testFixtures(project(':ignite-replicator')))
+ integrationTestImplementation(testFixtures(project(':ignite-metrics:')))
testImplementation(testFixtures(project(':ignite-core')))
testImplementation(testFixtures(project(':ignite-metastorage')))
diff --git a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java
index abcef59..fe784ef 100644
--- a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java
+++ b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java
@@ -53,6 +53,7 @@
import org.apache.ignite.internal.metastorage.impl.MetaStorageServiceImpl;
import org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
import org.apache.ignite.internal.metastorage.server.raft.MetastorageGroupId;
+import org.apache.ignite.internal.metrics.NoOpMetricManager;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.NetworkMessageHandler;
import org.apache.ignite.internal.network.StaticNodeFinder;
@@ -243,6 +244,7 @@
var raftManager = new Loza(
clusterService,
+ new NoOpMetricManager(),
raftConfiguration,
workDir.resolve(nodeName + "_loza"),
nodeClock,
diff --git a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java
index bec4dba..4171b9f 100644
--- a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java
+++ b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java
@@ -72,6 +72,7 @@
import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
import org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
import org.apache.ignite.internal.metastorage.server.raft.MetastorageGroupId;
+import org.apache.ignite.internal.metrics.NoOpMetricManager;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.NetworkMessageHandler;
import org.apache.ignite.internal.network.StaticNodeFinder;
@@ -180,6 +181,7 @@
raftManager = new Loza(
clusterService,
+ new NoOpMetricManager(),
raftConfiguration,
workDir.resolve("loza"),
nodeClock,
diff --git a/modules/raft/build.gradle b/modules/raft/build.gradle
index 89aad16..98b0a17 100644
--- a/modules/raft/build.gradle
+++ b/modules/raft/build.gradle
@@ -42,6 +42,7 @@
implementation project(':ignite-raft-api')
implementation project(':ignite-network')
implementation project(':ignite-rocksdb-common')
+ implementation project(':ignite-metrics')
implementation libs.jetbrains.annotations
implementation libs.fastutil.core
implementation libs.disruptor
@@ -59,6 +60,7 @@
testImplementation(testFixtures(project(':ignite-core')))
testImplementation(testFixtures(project(':ignite-network')))
testImplementation(testFixtures(project(':ignite-configuration')))
+ testImplementation(testFixtures(project(':ignite-metrics:')))
testImplementation project(':ignite-configuration')
testImplementation project(':ignite-core')
testImplementation project(':ignite-network')
@@ -88,6 +90,7 @@
integrationTestImplementation(testFixtures(project(':ignite-configuration')))
integrationTestImplementation(testFixtures(project(':ignite-network')))
integrationTestImplementation testFixtures(project(':ignite-workers'))
+ integrationTestImplementation testFixtures(project(':ignite-metrics:'))
integrationTestImplementation project(':ignite-raft-api')
integrationTestImplementation project(':ignite-failure-handler')
integrationTestImplementation libs.jetbrains.annotations
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLearnersTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLearnersTest.java
index d6cf4d5..d7f2279 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLearnersTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLearnersTest.java
@@ -52,6 +52,7 @@
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.metrics.NoOpMetricManager;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.StaticNodeFinder;
import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
@@ -109,7 +110,7 @@
Path raftDir = workDir.resolve(clusterService.nodeName());
- loza = new Loza(clusterService, raftConfiguration, raftDir, new HybridClockImpl());
+ loza = new Loza(clusterService, new NoOpMetricManager(), raftConfiguration, raftDir, new HybridClockImpl());
}
String consistentId() {
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLozaTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLozaTest.java
index 0c08f1d..940a03a 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLozaTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLozaTest.java
@@ -38,6 +38,7 @@
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.metrics.NoOpMetricManager;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.MessagingService;
import org.apache.ignite.internal.network.NetworkMessage;
@@ -125,7 +126,7 @@
CompletableFuture<NetworkMessage> exception = CompletableFuture.failedFuture(new IOException());
- loza = new Loza(service, raftConfiguration, dataPath, new HybridClockImpl());
+ loza = new Loza(service, new NoOpMetricManager(), raftConfiguration, dataPath, new HybridClockImpl());
assertThat(loza.startAsync(), willCompleteSuccessfully());
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItRaftGroupServiceTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItRaftGroupServiceTest.java
index 43e1589..c1401c0 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItRaftGroupServiceTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItRaftGroupServiceTest.java
@@ -45,6 +45,7 @@
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.metrics.NoOpMetricManager;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.NodeFinder;
import org.apache.ignite.internal.network.StaticNodeFinder;
@@ -215,7 +216,13 @@
TestNode(TestInfo testInfo) {
this.clusterService = ClusterServiceTestUtils.clusterService(testInfo, NODE_PORT_BASE + nodes.size(), NODE_FINDER);
- this.loza = new Loza(clusterService, raftConfiguration, workDir.resolve("node" + nodes.size()), new HybridClockImpl());
+ this.loza = new Loza(
+ clusterService,
+ new NoOpMetricManager(),
+ raftConfiguration,
+ workDir.resolve("node" + nodes.size()),
+ new HybridClockImpl()
+ );
}
String name() {
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItTruncateSuffixAndRestartTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItTruncateSuffixAndRestartTest.java
index a641db2..bbb2adc 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItTruncateSuffixAndRestartTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItTruncateSuffixAndRestartTest.java
@@ -49,6 +49,7 @@
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.metrics.NoOpMetricManager;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.NettyBootstrapFactory;
import org.apache.ignite.internal.network.configuration.NetworkConfiguration;
@@ -184,7 +185,7 @@
assertThat(clusterSvc.startAsync(), willCompleteSuccessfully());
cleanup.add(() -> assertThat(clusterSvc.stopAsync(), willCompleteSuccessfully()));
- raftMgr = new Loza(clusterSvc, raftConfiguration, nodeDir, hybridClock);
+ raftMgr = new Loza(clusterSvc, new NoOpMetricManager(), raftConfiguration, nodeDir, hybridClock);
assertThat(raftMgr.startAsync(), willCompleteSuccessfully());
cleanup.add(() -> assertThat(raftMgr.stopAsync(), willCompleteSuccessfully()));
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/metrics/sources/RaftMetricSource.java b/modules/raft/src/main/java/org/apache/ignite/internal/metrics/sources/RaftMetricSource.java
new file mode 100644
index 0000000..ae0deab
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/metrics/sources/RaftMetricSource.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metrics.sources;
+
+import java.util.HashMap;
+import java.util.stream.LongStream;
+import org.apache.ignite.internal.metrics.DistributionMetric;
+import org.apache.ignite.internal.metrics.Metric;
+import org.apache.ignite.internal.metrics.MetricSet;
+import org.apache.ignite.internal.metrics.MetricSource;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Metrics of striped disruptor.
+ */
+public class RaftMetricSource implements MetricSource {
+ public static final String SOURCE_NAME = "raft";
+
+ /** True, if source is enabled, false otherwise. */
+ private boolean enabled;
+
+ /** Disruptor stripe count. */
+ private final int stripeCount;
+
+ /** Log disruptor stripe count. */
+ private final int logStripeCount;
+
+ /** Metric set. */
+ HashMap<String, Metric> metrics = new HashMap<>();
+
+ /**
+ * Constructor.
+ *
+ * @param stripeCount Count of stripes.
+ * @param logStripeCount Log manager disruptor stripe count.
+ */
+ public RaftMetricSource(int stripeCount, int logStripeCount) {
+ this.stripeCount = stripeCount;
+ this.logStripeCount = logStripeCount;
+
+ initMetrics();
+ }
+
+ @Override
+ public String name() {
+ return SOURCE_NAME;
+ }
+
+ @Override
+ public @Nullable MetricSet enable() {
+ enabled = true;
+
+ return new MetricSet(SOURCE_NAME, metrics);
+ }
+
+ private void initMetrics() {
+ long[] bounds = new long[]{10L, 20L, 30L, 40L, 50L};
+
+ // jraft-fsmcaller-disruptor
+ metrics.put("raft.fsmcaller.disruptor.Batch",
+ new DistributionMetric(
+ "raft.fsmcaller.disruptor.Batch",
+ "The histogram of the batch size to handle in the state machine for partitions",
+ bounds
+ ));
+ metrics.put("raft.fsmcaller.disruptor.Stripes",
+ new DistributionMetric(
+ "raft.fsmcaller.disruptor.Stripes",
+ "The histogram of distribution data by stripes in the state machine for partitions",
+ LongStream.range(0, stripeCount).toArray()
+ ));
+
+ // jraft-nodeimpl-disruptor
+ metrics.put("raft.nodeimpl.disruptor.Batch",
+ new DistributionMetric(
+ "raft.nodeimpl.disruptor.Batch",
+ "The histogram of the batch size to handle node operations for partitions",
+ bounds
+ ));
+ metrics.put("raft.nodeimpl.disruptor.Stripes",
+ new DistributionMetric(
+ "raft.nodeimpl.disruptor.Stripes",
+ "The histogram of distribution data by stripes for node operations for partitions",
+ LongStream.range(0, stripeCount).toArray()
+ ));
+
+ // jraft-readonlyservice-disruptor
+ metrics.put("raft.readonlyservice.disruptor.Batch",
+ new DistributionMetric(
+ "raft.readonlyservice.disruptor.Batch",
+ "The histogram of the batch size to handle readonly operations for partitions",
+ bounds
+ ));
+ metrics.put("raft.readonlyservice.disruptor.Stripes",
+ new DistributionMetric(
+ "raft.readonlyservice.disruptor.Stripes",
+ "The histogram of distribution data by stripes readonly operations for partitions",
+ LongStream.range(0, stripeCount).toArray()
+ ));
+
+ // jraft-logmanager-disruptor
+ metrics.put("raft.logmanager.disruptor.Batch",
+ new DistributionMetric(
+ "raft.logmanager.disruptor.Batch",
+ "The histogram of the batch size to handle in the log for partitions",
+ bounds
+ ));
+ metrics.put("raft.logmanager.disruptor.Stripes",
+ new DistributionMetric(
+ "raft.logmanager.disruptor.Stripes",
+ "The histogram of distribution data by stripes in the log for partitions",
+ LongStream.range(0, logStripeCount).toArray()
+ ));
+ }
+
+ /**
+ * Disruptor metrics source.
+ *
+ * @param name Disruptor name.
+ * @return Object to track metrics.
+ */
+ public DisruptorMetrics disruptorMetrics(String name) {
+ return new DisruptorMetrics(
+ (DistributionMetric) metrics.get(name + ".Batch"),
+ (DistributionMetric) metrics.get(name + ".Stripes")
+ );
+ }
+
+ @Override
+ public void disable() {
+ enabled = false;
+ }
+
+ @Override
+ public boolean enabled() {
+ return enabled;
+ }
+
+ /**
+ * Striped disruptor metrics.
+ */
+ public class DisruptorMetrics {
+ private DistributionMetric batchSizeHistogramMetric;
+ private DistributionMetric stripeHistogramMetric;
+
+ public DisruptorMetrics(DistributionMetric averageBatchSizeMetric, DistributionMetric stripeHistogramMetric) {
+ this.batchSizeHistogramMetric = averageBatchSizeMetric;
+ this.stripeHistogramMetric = stripeHistogramMetric;
+ }
+
+ public boolean enabled() {
+ return enabled;
+ }
+
+ public void addBatchSize(long size) {
+ batchSizeHistogramMetric.add(size);
+ }
+
+ public void hitToStripe(int stripe) {
+ stripeHistogramMetric.add(stripe);
+ }
+ }
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
index a755cc0..f4c6732 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
@@ -35,6 +35,8 @@
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.metrics.MetricManager;
+import org.apache.ignite.internal.metrics.sources.RaftMetricSource;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.MessagingService;
import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
@@ -98,16 +100,20 @@
private final NodeOptions opts;
+ private final MetricManager metricManager;
+
/**
* The constructor.
*
* @param clusterNetSvc Cluster network service.
+ * @param metricManager Metric manager.
* @param raftConfiguration Raft configuration.
* @param dataPath Data path.
* @param clock A hybrid logical clock.
*/
public Loza(
ClusterService clusterNetSvc,
+ MetricManager metricManager,
RaftConfiguration raftConfiguration,
Path dataPath,
HybridClock clock,
@@ -115,6 +121,7 @@
) {
this.clusterNetSvc = clusterNetSvc;
this.raftConfiguration = raftConfiguration;
+ this.metricManager = metricManager;
NodeOptions options = new NodeOptions();
@@ -135,6 +142,7 @@
* The constructor.
*
* @param clusterNetSvc Cluster network service.
+ * @param metricManager Metric manager.
* @param raftConfiguration Raft configuration.
* @param dataPath Data path.
* @param clock A hybrid logical clock.
@@ -142,12 +150,14 @@
@TestOnly
public Loza(
ClusterService clusterNetSvc,
+ MetricManager metricManager,
RaftConfiguration raftConfiguration,
Path dataPath,
HybridClock clock
) {
this(
clusterNetSvc,
+ metricManager,
raftConfiguration,
dataPath,
clock,
@@ -180,6 +190,11 @@
public CompletableFuture<Void> startAsync() {
RaftView raftConfig = raftConfiguration.value();
+ var stripeSource = new RaftMetricSource(raftConfiguration.value().stripes(), raftConfiguration.value().logStripesCount());
+
+ metricManager.registerSource(stripeSource);
+
+ opts.setRaftMetrics(stripeSource);
opts.setRpcInstallSnapshotTimeout(raftConfig.rpcInstallSnapshotTimeout());
opts.setStripes(raftConfig.stripes());
opts.setLogStripesCount(raftConfig.logStripesCount());
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
index 7497c9c..5cb2f14 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
@@ -48,6 +48,7 @@
import org.apache.ignite.internal.lang.IgniteSystemProperties;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.metrics.sources.RaftMetricSource;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.raft.Marshaller;
import org.apache.ignite.internal.raft.Peer;
@@ -319,6 +320,10 @@
actionRequestInterceptor
);
+ if (opts.getRaftMetrics() == null) {
+ opts.setRaftMetrics(new RaftMetricSource(opts.getStripes(), opts.getLogStripesCount()));
+ }
+
if (opts.getfSMCallerExecutorDisruptor() == null) {
opts.setfSMCallerExecutorDisruptor(new StripedDisruptor<>(
opts.getServerName(),
@@ -328,7 +333,8 @@
ApplyTask::new,
opts.getStripes(),
false,
- false
+ false,
+ opts.getRaftMetrics().disruptorMetrics("raft.fsmcaller.disruptor")
));
}
@@ -340,7 +346,8 @@
LogEntryAndClosure::new,
opts.getStripes(),
false,
- false
+ false,
+ opts.getRaftMetrics().disruptorMetrics("raft.nodeimpl.disruptor")
));
}
@@ -352,7 +359,8 @@
ReadIndexEvent::new,
opts.getStripes(),
false,
- false
+ false,
+ opts.getRaftMetrics().disruptorMetrics("raft.readonlyservice.disruptor")
));
}
@@ -364,7 +372,8 @@
StableClosureEvent::new,
opts.getLogStripesCount(),
true,
- opts.isLogYieldStrategy()
+ opts.isLogYieldStrategy(),
+ opts.getRaftMetrics().disruptorMetrics("raft.logmanager.disruptor")
));
opts.setLogStripes(IntStream.range(0, opts.getLogStripesCount()).mapToObj(i -> new Stripe()).collect(toList()));
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
index 972a0bc..963d25b 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
@@ -43,7 +43,7 @@
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
-import org.apache.ignite.internal.raft.JraftGroupEventsListener;
+import org.apache.ignite.internal.metrics.sources.RaftMetricSource;import org.apache.ignite.internal.raft.JraftGroupEventsListener;
import org.apache.ignite.internal.raft.RaftNodeDisruptorConfiguration;
import org.apache.ignite.internal.raft.storage.impl.RocksDbSharedLogStorage;
import org.apache.ignite.internal.raft.storage.impl.StripeAwareLogManager;
@@ -1238,6 +1238,10 @@
if (opts.getClientExecutor() == null && validateOption(opts, "clientExecutor"))
opts.setClientExecutor(JRaftUtils.createClientExecutor(opts, opts.getServerName()));
+ if (opts.getRaftMetrics() == null) {
+ opts.setRaftMetrics(new RaftMetricSource(opts.getStripes(), opts.getLogStripesCount()));
+ }
+
if (opts.getfSMCallerExecutorDisruptor() == null) {
opts.setfSMCallerExecutorDisruptor(new StripedDisruptor<FSMCallerImpl.ApplyTask>(
opts.getServerName(),
@@ -1247,17 +1251,19 @@
() -> new FSMCallerImpl.ApplyTask(),
opts.getStripes(),
false,
- false
+ false,
+ opts.getRaftMetrics().disruptorMetrics("raft.fsmcaller.disruptor")
));
} else if (ownFsmCallerExecutorDisruptorConfig != null) {
opts.setfSMCallerExecutorDisruptor(new StripedDisruptor<FSMCallerImpl.ApplyTask>(
opts.getServerName(),
- "JRaft-FSMCaller-Disruptor-" + ownFsmCallerExecutorDisruptorConfig.getThreadPostfix(),
+ "JRaft-FSMCaller-Disruptor" + ownFsmCallerExecutorDisruptorConfig.getThreadPostfix(),
opts.getRaftOptions().getDisruptorBufferSize(),
() -> new FSMCallerImpl.ApplyTask(),
ownFsmCallerExecutorDisruptorConfig.getStripes(),
false,
- false
+ false,
+ null
));
}
@@ -1269,7 +1275,8 @@
() -> new NodeImpl.LogEntryAndClosure(),
opts.getStripes(),
false,
- false
+ false,
+ opts.getRaftMetrics().disruptorMetrics("raft.nodeimpl.disruptor")
));
}
@@ -1281,7 +1288,8 @@
() -> new ReadOnlyServiceImpl.ReadIndexEvent(),
opts.getStripes(),
false,
- false
+ false,
+ opts.getRaftMetrics().disruptorMetrics("raft.readonlyservice.disruptor")
));
}
@@ -1293,7 +1301,8 @@
() -> new LogManagerImpl.StableClosureEvent(),
opts.getLogStripesCount(),
logStorage instanceof RocksDbSharedLogStorage,
- opts.isLogYieldStrategy()
+ opts.isLogYieldStrategy(),
+ opts.getRaftMetrics().disruptorMetrics("raft.logmanager.disruptor")
));
opts.setLogStripes(IntStream.range(0, opts.getLogStripesCount()).mapToObj(i -> new Stripe()).collect(toList()));
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/StripedDisruptor.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/StripedDisruptor.java
index 875c309..bb27f01 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/StripedDisruptor.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/StripedDisruptor.java
@@ -33,8 +33,10 @@
import java.util.function.BiFunction;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.metrics.sources.RaftMetricSource.DisruptorMetrics;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.raft.jraft.entity.NodeId;
+import org.jetbrains.annotations.Nullable;
/**
* Stripe Disruptor is a set of queues which process several independent groups in one queue (in the stripe).
@@ -64,6 +66,8 @@
/** The Striped disruptor name. */
private final String name;
+ private final DisruptorMetrics metrics;
+
/**
* If {@code false}, this stripe will always pass {@code true} into {@link EventHandler#onEvent(Object, long, boolean)}.
* Otherwise, the data will be provided with batches.
@@ -80,6 +84,7 @@
* @param supportsBatches If {@code false}, this stripe will always pass {@code true} into
* {@link EventHandler#onEvent(Object, long, boolean)}. Otherwise, the data will be provided with batches.
* @param useYieldStrategy If {@code true}, the yield strategy is to be used, otherwise the blocking strategy.
+ * @param metrics Metrics.
*/
public StripedDisruptor(
String nodeName,
@@ -88,7 +93,8 @@
EventFactory<T> eventFactory,
int stripes,
boolean supportsBatches,
- boolean useYieldStrategy
+ boolean useYieldStrategy,
+ @Nullable DisruptorMetrics metrics
) {
this(
nodeName,
@@ -98,7 +104,8 @@
eventFactory,
stripes,
supportsBatches,
- useYieldStrategy
+ useYieldStrategy,
+ metrics
);
}
@@ -112,6 +119,7 @@
* @param supportsBatches If {@code false}, this stripe will always pass {@code true} into
* {@link EventHandler#onEvent(Object, long, boolean)}. Otherwise, the data will be provided with batches.
* @param useYieldStrategy If {@code true}, the yield strategy is to be used, otherwise the blocking strategy.
+ * @param raftMetrics Metrics.
*/
public StripedDisruptor(
String nodeName,
@@ -121,7 +129,8 @@
EventFactory<T> eventFactory,
int stripes,
boolean supportsBatches,
- boolean useYieldStrategy
+ boolean useYieldStrategy,
+ @Nullable DisruptorMetrics raftMetrics
) {
disruptors = new Disruptor[stripes];
queues = new RingBuffer[stripes];
@@ -130,6 +139,7 @@
this.stripes = stripes;
this.name = NamedThreadFactory.threadPrefix(nodeName, poolName);
this.supportsBatches = supportsBatches;
+ this.metrics = raftMetrics;
for (int i = 0; i < stripes; i++) {
String stripeName = format("{}_stripe_{}", poolName, i);
@@ -237,6 +247,9 @@
private class StripeEntryHandler implements EventHandler<T> {
private final ConcurrentHashMap<NodeId, EventHandler<T>> subscribers;
+ /** Size of the batch that is currently being handled. */
+ private int currentBatchSize = 0;
+
/**
* The constructor.
*/
@@ -269,6 +282,18 @@
// TODO: IGNITE-20536 Need to add assert that handler is not null and to implement a no-op handler.
if (handler != null) {
+ if (metrics != null && metrics.enabled()) {
+ metrics.hitToStripe(getStripe(event.nodeId()));
+
+ if (endOfBatch) {
+ metrics.addBatchSize(currentBatchSize + 1);
+
+ currentBatchSize = 0;
+ } else {
+ currentBatchSize ++;
+ }
+ }
+
handler.onEvent(event, sequence, endOfBatch || subscribers.size() > 1 && !supportsBatches);
} else {
LOG.warn(format("Group of the event is unsupported [nodeId={}, event={}]", event.nodeId(), event));
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
index 0d14098..e444800 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
@@ -20,6 +20,7 @@
import java.util.concurrent.ExecutorService;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.metrics.sources.RaftMetricSource;
import org.apache.ignite.internal.raft.JraftGroupEventsListener;
import org.apache.ignite.internal.raft.Marshaller;
import org.apache.ignite.internal.raft.storage.impl.StripeAwareLogManager.Stripe;
@@ -269,11 +270,26 @@
private Marshaller commandsMarshaller;
+ private RaftMetricSource raftMetrics;
+
public NodeOptions() {
raftOptions.setRaftMessagesFactory(getRaftMessagesFactory());
}
/**
+ * Gets raft metrics.
+ *
+ * @return Raft metrics.
+ */
+ public RaftMetricSource getRaftMetrics() {
+ return raftMetrics;
+ }
+
+ public void setRaftMetrics(RaftMetricSource raftMetrics) {
+ this.raftMetrics = raftMetrics;
+ }
+
+ /**
* @return Stripe count.
*/
public int getStripes() {
diff --git a/modules/raft/src/test/java/org/apache/ignite/disruptor/StripedDisruptorTest.java b/modules/raft/src/test/java/org/apache/ignite/disruptor/StripedDisruptorTest.java
index b46e310..8105dff 100644
--- a/modules/raft/src/test/java/org/apache/ignite/disruptor/StripedDisruptorTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/disruptor/StripedDisruptorTest.java
@@ -49,7 +49,8 @@
NodeIdAwareTestObj::new,
1,
false,
- false);
+ false,
+ null);
var nodeId1 = new NodeId("grp1", new PeerId("foo"));
var nodeId2 = new NodeId("grp2", new PeerId("foo"));
@@ -100,7 +101,8 @@
NodeIdAwareTestObj::new,
5,
false,
- false);
+ false,
+ null);
GroupAwareTestObjHandler handler = new GroupAwareTestObjHandler();
diff --git a/modules/raft/src/test/java/org/apache/ignite/internal/raft/LozaTest.java b/modules/raft/src/test/java/org/apache/ignite/internal/raft/LozaTest.java
index 1a9b70c..01fbc54 100644
--- a/modules/raft/src/test/java/org/apache/ignite/internal/raft/LozaTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/internal/raft/LozaTest.java
@@ -27,6 +27,7 @@
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.metrics.NoOpMetricManager;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.MessagingService;
import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
@@ -65,7 +66,7 @@
Mockito.doReturn(mock(MessagingService.class)).when(clusterNetSvc).messagingService();
Mockito.doReturn(mock(TopologyService.class)).when(clusterNetSvc).topologyService();
- Loza loza = new Loza(clusterNetSvc, raftConfiguration, workDir, new HybridClockImpl());
+ Loza loza = new Loza(clusterNetSvc, new NoOpMetricManager(), raftConfiguration, workDir, new HybridClockImpl());
assertThat(loza.startAsync(), willCompleteSuccessfully());
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/FSMCallerTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/FSMCallerTest.java
index 671b5bd..518ded3 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/FSMCallerTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/FSMCallerTest.java
@@ -99,11 +99,12 @@
opts.setClosureQueue(this.closureQueue);
opts.setRaftMessagesFactory(new RaftMessagesFactory());
opts.setfSMCallerExecutorDisruptor(disruptor = new StripedDisruptor<>("test", "TestFSMDisruptor",
- 1024,
- () -> new FSMCallerImpl.ApplyTask(),
- 1,
- false,
- false));
+ 1024,
+ () -> new FSMCallerImpl.ApplyTask(),
+ 1,
+ false,
+ false,
+ null));
assertTrue(this.fsmCaller.init(opts));
}
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceTest.java
index b5f84b8..be3e148 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceTest.java
@@ -89,11 +89,12 @@
opts.setNode(this.node);
opts.setRaftOptions(raftOptions);
opts.setReadOnlyServiceDisruptor(disruptor = new StripedDisruptor<>("test", "TestReadOnlyServiceDisruptor",
- 1024,
- () -> new ReadOnlyServiceImpl.ReadIndexEvent(),
- 1,
- false,
- false));
+ 1024,
+ () -> new ReadOnlyServiceImpl.ReadIndexEvent(),
+ 1,
+ false,
+ false,
+ null));
NodeOptions nodeOptions = new NodeOptions();
ExecutorService executor = JRaftUtils.createExecutor("test-executor", Utils.cpus());
executors.add(executor);
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerTest.java
index d2661b7..4049034 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerTest.java
@@ -102,11 +102,12 @@
opts.setLogStorage(this.logStorage);
opts.setRaftOptions(raftOptions);
opts.setLogManagerDisruptor(disruptor = new StripedDisruptor<>("test", "TestLogManagerDisruptor",
- 1024,
- () -> new LogManagerImpl.StableClosureEvent(),
- 1,
- false,
- false));
+ 1024,
+ () -> new LogManagerImpl.StableClosureEvent(),
+ 1,
+ false,
+ false,
+ null));
assertTrue(this.logManager.init(opts));
}
diff --git a/modules/replicator/build.gradle b/modules/replicator/build.gradle
index 08d76b3..81ce518 100644
--- a/modules/replicator/build.gradle
+++ b/modules/replicator/build.gradle
@@ -44,12 +44,14 @@
integrationTestImplementation project(':ignite-placement-driver-api')
integrationTestImplementation project(':ignite-network-api')
integrationTestImplementation project(':ignite-cluster-management')
+ integrationTestImplementation project(':ignite-metrics')
integrationTestImplementation testFixtures(project)
integrationTestImplementation testFixtures(project(':ignite-core'))
integrationTestImplementation testFixtures(project(':ignite-configuration'))
integrationTestImplementation testFixtures(project(':ignite-network'))
integrationTestImplementation testFixtures(project(':ignite-placement-driver-api'))
integrationTestImplementation testFixtures(project(':ignite-failure-handler'))
+ integrationTestImplementation testFixtures(project(':ignite-metrics'))
testImplementation testFixtures(project(':ignite-core'))
testImplementation testFixtures(project(':ignite-placement-driver-api'))
diff --git a/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java b/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
index 98f68d5..28d9bc8 100644
--- a/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
+++ b/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
@@ -64,6 +64,7 @@
import org.apache.ignite.internal.hlc.TestClockService;
import org.apache.ignite.internal.lang.IgniteTriConsumer;
import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.metrics.NoOpMetricManager;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.NetworkMessageHandler;
import org.apache.ignite.internal.network.StaticNodeFinder;
@@ -170,6 +171,7 @@
var raftManager = new Loza(
clusterService,
+ new NoOpMetricManager(),
raftConfiguration,
workDir.resolve(nodeName + "_loza"),
clock,
diff --git a/modules/runner/build.gradle b/modules/runner/build.gradle
index 6db1d35..aae26f8 100644
--- a/modules/runner/build.gradle
+++ b/modules/runner/build.gradle
@@ -174,6 +174,7 @@
integrationTestImplementation testFixtures(project(':ignite-placement-driver-api'))
integrationTestImplementation testFixtures(project(':ignite-jdbc'))
integrationTestImplementation testFixtures(project(':ignite-failure-handler'))
+ integrationTestImplementation testFixtures(project(':ignite-metrics:'))
integrationTestImplementation libs.jetbrains.annotations
integrationTestImplementation libs.awaitility
integrationTestImplementation libs.rocksdb.jni
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
index 048c70c..a8c311b 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
@@ -60,6 +60,7 @@
import org.apache.ignite.internal.metastorage.configuration.MetaStorageConfiguration;
import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
import org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
+import org.apache.ignite.internal.metrics.NoOpMetricManager;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.StaticNodeFinder;
import org.apache.ignite.internal.network.utils.ClusterServiceTestUtils;
@@ -155,7 +156,13 @@
var raftGroupEventsClientListener = new RaftGroupEventsClientListener();
- raftManager = new Loza(clusterService, raftConfiguration, workDir, clock, raftGroupEventsClientListener);
+ raftManager = new Loza(
+ clusterService,
+ new NoOpMetricManager(),
+ raftConfiguration,
+ workDir, clock,
+ raftGroupEventsClientListener
+ );
var clusterStateStorage = new TestClusterStateStorage();
var logicalTopology = new LogicalTopologyImpl(clusterStateStorage);
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
index a44ded2..ce8d0f5 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
@@ -50,6 +50,7 @@
import org.apache.ignite.internal.metastorage.configuration.MetaStorageConfiguration;
import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
import org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
+import org.apache.ignite.internal.metrics.NoOpMetricManager;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.StaticNodeFinder;
import org.apache.ignite.internal.network.utils.ClusterServiceTestUtils;
@@ -126,7 +127,14 @@
var raftGroupEventsClientListener = new RaftGroupEventsClientListener();
- raftManager = new Loza(clusterService, raftConfiguration, workDir, clock, raftGroupEventsClientListener);
+ raftManager = new Loza(
+ clusterService,
+ new NoOpMetricManager(),
+ raftConfiguration,
+ workDir,
+ clock,
+ raftGroupEventsClientListener
+ );
var clusterStateStorage = new TestClusterStateStorage();
var logicalTopology = new LogicalTopologyImpl(clusterStateStorage);
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index c2b9b80..b88f8d3 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -139,7 +139,8 @@
import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
import org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
import org.apache.ignite.internal.metastorage.server.raft.MetastorageGroupId;
-import org.apache.ignite.internal.metrics.MetricManager;
+import org.apache.ignite.internal.metrics.MetricManagerImpl;
+import org.apache.ignite.internal.metrics.NoOpMetricManager;
import org.apache.ignite.internal.network.MessagingService;
import org.apache.ignite.internal.network.NettyBootstrapFactory;
import org.apache.ignite.internal.network.NettyWorkersRegistrar;
@@ -353,7 +354,7 @@
var raftGroupEventsClientListener = new RaftGroupEventsClientListener();
- var raftMgr = new Loza(clusterSvc, raftConfiguration, dir, hybridClock, raftGroupEventsClientListener);
+ var raftMgr = new Loza(clusterSvc, new NoOpMetricManager(), raftConfiguration, dir, hybridClock, raftGroupEventsClientListener);
var clusterStateStorage = new RocksDbClusterStateStorage(dir.resolve("cmg"), name);
@@ -632,7 +633,7 @@
lowWatermark
);
- var metricManager = new MetricManager();
+ var metricManager = new MetricManagerImpl();
SqlQueryProcessor qryEngine = new SqlQueryProcessor(
registry,
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java
index 620b143..5e54b3d 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java
@@ -249,7 +249,8 @@
() -> new ApplyTask(),
1,
false,
- false
+ false,
+ null
) {
@Override
public RingBuffer<ApplyTask> subscribe(NodeId group, EventHandler<ApplyTask> handler,
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 2dfee69..c90b014 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -133,6 +133,7 @@
import org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
import org.apache.ignite.internal.metastorage.server.raft.MetastorageGroupId;
import org.apache.ignite.internal.metrics.MetricManager;
+import org.apache.ignite.internal.metrics.MetricManagerImpl;
import org.apache.ignite.internal.metrics.configuration.MetricConfiguration;
import org.apache.ignite.internal.metrics.sources.JvmMetricSource;
import org.apache.ignite.internal.network.ChannelType;
@@ -418,7 +419,7 @@
vaultMgr = createVault(workDir);
- metricManager = new MetricManager();
+ metricManager = new MetricManagerImpl();
ConfigurationModules modules = loadConfigurationModules(serviceProviderClassLoader);
@@ -492,6 +493,7 @@
raftMgr = new Loza(
clusterSvc,
+ metricManager,
raftConfiguration,
workDir,
clock,
@@ -1237,6 +1239,11 @@
return failureProcessor;
}
+ @TestOnly
+ public MetricManager metricManager() {
+ return metricManager;
+ }
+
/** {@inheritDoc} */
@Override
public IgniteTransactions transactions() {
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
index de88550..5c5e170 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
@@ -81,7 +81,7 @@
import org.apache.ignite.internal.hlc.TestClockService;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.lang.RunnableX;
-import org.apache.ignite.internal.metrics.MetricManager;
+import org.apache.ignite.internal.metrics.MetricManagerImpl;
import org.apache.ignite.internal.network.ClusterNodeImpl;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.sql.engine.InternalSqlRow;
@@ -191,8 +191,15 @@
public void init() {
testCluster = new TestCluster();
executionServices = nodeNames.stream().map(this::create).collect(Collectors.toList());
- prepareService = new PrepareServiceImpl("test", 0, CaffeineCacheFactory.INSTANCE, null, PLANNING_TIMEOUT, PLANNING_THREAD_COUNT,
- new MetricManager());
+ prepareService = new PrepareServiceImpl(
+ "test",
+ 0,
+ CaffeineCacheFactory.INSTANCE,
+ null,
+ PLANNING_TIMEOUT,
+ PLANNING_THREAD_COUNT,
+ new MetricManagerImpl()
+ );
parserService = new ParserServiceImpl();
prepareService.start();
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
index 8bc31f9..4194a18 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
@@ -76,7 +76,7 @@
import org.apache.ignite.internal.hlc.TestClockService;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
-import org.apache.ignite.internal.metrics.MetricManager;
+import org.apache.ignite.internal.metrics.MetricManagerImpl;
import org.apache.ignite.internal.sql.engine.SqlQueryProcessor;
import org.apache.ignite.internal.sql.engine.exec.ExecutableTable;
import org.apache.ignite.internal.sql.engine.exec.ExecutableTableRegistry;
@@ -624,7 +624,7 @@
var parserService = new ParserServiceImpl();
var prepareService = new PrepareServiceImpl(clusterName, 0, CaffeineCacheFactory.INSTANCE,
new DdlSqlToCommandConverter(), PLANNING_TIMEOUT, PLANNING_THREAD_COUNT,
- mock(MetricManager.class));
+ mock(MetricManagerImpl.class));
Map<String, List<String>> owningNodesByTableName = new HashMap<>();
for (Entry<String, Map<String, ScannableTable>> entry : nodeName2tableName2table.entrySet()) {
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/PlannerTimeoutTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/PlannerTimeoutTest.java
index 035fdb0..c7933d3 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/PlannerTimeoutTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/PlannerTimeoutTest.java
@@ -33,7 +33,7 @@
import org.apache.calcite.plan.volcano.VolcanoTimeoutException;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelVisitor;
-import org.apache.ignite.internal.metrics.MetricManager;
+import org.apache.ignite.internal.metrics.MetricManagerImpl;
import org.apache.ignite.internal.sql.engine.framework.TestBuilders;
import org.apache.ignite.internal.sql.engine.prepare.IgnitePlanner;
import org.apache.ignite.internal.sql.engine.prepare.PlanningContext;
@@ -65,7 +65,7 @@
BaseQueryContext ctx = baseQueryContext(Collections.singletonList(schema), null);
PrepareService prepareService = new PrepareServiceImpl("test", 0,
- CaffeineCacheFactory.INSTANCE, null, plannerTimeout, 1, new MetricManager());
+ CaffeineCacheFactory.INSTANCE, null, plannerTimeout, 1, new MetricManagerImpl());
prepareService.start();
try {
ParserService parserService = new ParserServiceImpl();
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImplTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImplTest.java
index 0546995..1527410 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImplTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImplTest.java
@@ -36,7 +36,7 @@
import java.util.stream.Stream;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.tools.Frameworks;
-import org.apache.ignite.internal.metrics.MetricManager;
+import org.apache.ignite.internal.metrics.MetricManagerImpl;
import org.apache.ignite.internal.sql.engine.framework.TestBuilders;
import org.apache.ignite.internal.sql.engine.prepare.ddl.DdlSqlToCommandConverter;
import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
@@ -293,7 +293,7 @@
private static PrepareService createPlannerService() {
PrepareService service = new PrepareServiceImpl("test", 1_000, CaffeineCacheFactory.INSTANCE,
- mock(DdlSqlToCommandConverter.class), 5_000, 2, mock(MetricManager.class));
+ mock(DdlSqlToCommandConverter.class), 5_000, 2, mock(MetricManagerImpl.class));
createdServices.add(service);
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/metrics/PlanningCacheMetricsTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/metrics/PlanningCacheMetricsTest.java
index b57a2f9..352e02d 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/metrics/PlanningCacheMetricsTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/metrics/PlanningCacheMetricsTest.java
@@ -22,6 +22,7 @@
import java.util.Collections;
import org.apache.ignite.internal.metrics.MetricManager;
+import org.apache.ignite.internal.metrics.MetricManagerImpl;
import org.apache.ignite.internal.metrics.MetricSet;
import org.apache.ignite.internal.sql.engine.framework.TestBuilders;
import org.apache.ignite.internal.sql.engine.planner.AbstractPlannerTest;
@@ -46,7 +47,7 @@
@Test
public void plannerCacheStatisticsTest() throws Exception {
- MetricManager metricManager = new MetricManager();
+ MetricManager metricManager = new MetricManagerImpl();
// Run clean up tasks in the current thread, so no eviction event is delayed.
CacheFactory cacheFactory = CaffeineCacheFactory.create(Runnable::run);
PrepareService prepareService = new PrepareServiceImpl("test", 2, cacheFactory, null, 15_000L, 2, metricManager);
diff --git a/modules/table/build.gradle b/modules/table/build.gradle
index a9cce38..327a2b3 100644
--- a/modules/table/build.gradle
+++ b/modules/table/build.gradle
@@ -109,6 +109,7 @@
testFixturesImplementation(testFixtures(project(':ignite-placement-driver-api')))
testFixturesImplementation(testFixtures(project(':ignite-low-watermark')))
testFixturesImplementation(testFixtures(project(':ignite-failure-handler')))
+ testFixturesImplementation(testFixtures(project(':ignite-metrics')))
testFixturesImplementation libs.jetbrains.annotations
testFixturesImplementation libs.fastutil.core
testFixturesImplementation libs.mockito.core
@@ -137,6 +138,7 @@
integrationTestImplementation project(':ignite-sql-engine')
integrationTestImplementation project(':ignite-failure-handler')
integrationTestImplementation project(':ignite-low-watermark')
+ integrationTestImplementation project(':ignite-metrics')
integrationTestImplementation(testFixtures(project))
integrationTestImplementation(testFixtures(project(':ignite-api')))
integrationTestImplementation(testFixtures(project(':ignite-core')))
@@ -154,6 +156,7 @@
integrationTestImplementation(testFixtures(project(':ignite-runner')))
integrationTestImplementation(testFixtures(project(':ignite-low-watermark')))
integrationTestImplementation(testFixtures(project(':ignite-failure-handler')))
+ integrationTestImplementation(testFixtures(project(':ignite-metrics')))
integrationTestImplementation libs.fastutil.core
integrationTestImplementation libs.jetbrains.annotations
integrationTestImplementation libs.calcite.core
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java
index 223e1ae..4884d64 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java
@@ -46,6 +46,7 @@
import org.apache.ignite.internal.hlc.TestClockService;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.lang.SafeTimeReorderException;
+import org.apache.ignite.internal.metrics.NoOpMetricManager;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.StaticNodeFinder;
import org.apache.ignite.internal.network.utils.ClusterServiceTestUtils;
@@ -262,6 +263,7 @@
raftManager = new Loza(
clusterService,
+ new NoOpMetricManager(),
raftConfiguration,
workDir.resolve(nodeName + "_loza"),
new HybridClockImpl(),
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index 73b885d..0c742a9 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -139,6 +139,7 @@
import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
import org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
import org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
+import org.apache.ignite.internal.metrics.NoOpMetricManager;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.DefaultMessagingService;
import org.apache.ignite.internal.network.StaticNodeFinder;
@@ -1034,7 +1035,14 @@
var raftGroupEventsClientListener = new RaftGroupEventsClientListener();
- raftManager = spy(new Loza(clusterService, raftConfiguration, dir, hybridClock, raftGroupEventsClientListener));
+ raftManager = spy(new Loza(
+ clusterService,
+ new NoOpMetricManager(),
+ raftConfiguration,
+ dir,
+ hybridClock,
+ raftGroupEventsClientListener
+ ));
var clusterStateStorage = new TestClusterStateStorage();
var logicalTopology = new LogicalTopologyImpl(clusterStateStorage);
diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index 7f57021..a7b3fe4 100644
--- a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++ b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -83,6 +83,7 @@
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.lowwatermark.LowWatermark;
import org.apache.ignite.internal.lowwatermark.TestLowWatermark;
+import org.apache.ignite.internal.metrics.NoOpMetricManager;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.NodeFinder;
import org.apache.ignite.internal.network.StaticNodeFinder;
@@ -388,6 +389,7 @@
var raftSrv = new Loza(
clusterService,
+ new NoOpMetricManager(),
raftConfig,
workDir.resolve("node" + i),
clock