[FLINK-19130] [core] Rename MetricsFactory to FunctionTypeMetricsFactory
Now that we have multiple types of metrics with different scopes
(per-function / per-operator), this interface should be renamed to
convey that it is a factory specific for per-function scoped metrics.
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/Reductions.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/Reductions.java
index 44686e0..4a90a20 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/Reductions.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/Reductions.java
@@ -32,11 +32,11 @@
import org.apache.flink.statefun.flink.core.di.ObjectContainer;
import org.apache.flink.statefun.flink.core.message.Message;
import org.apache.flink.statefun.flink.core.message.MessageFactory;
+import org.apache.flink.statefun.flink.core.metrics.FlinkFuncionTypeMetricsFactory;
import org.apache.flink.statefun.flink.core.metrics.FlinkFunctionDispatcherMetrics;
-import org.apache.flink.statefun.flink.core.metrics.FlinkMetricsFactory;
+import org.apache.flink.statefun.flink.core.metrics.FuncionTypeMetricsFactory;
import org.apache.flink.statefun.flink.core.metrics.FunctionDispatcherMetrics;
import org.apache.flink.statefun.flink.core.metrics.FunctionTypeMetricsRepository;
-import org.apache.flink.statefun.flink.core.metrics.MetricsFactory;
import org.apache.flink.statefun.flink.core.state.FlinkState;
import org.apache.flink.statefun.flink.core.state.State;
import org.apache.flink.statefun.flink.core.types.DynamicallyRegisteredTypes;
@@ -103,7 +103,10 @@
container.add("function-loader", FunctionLoader.class, PredefinedFunctionLoader.class);
container.add(Reductions.class);
container.add(LocalFunctionGroup.class);
- container.add("metrics-factory", MetricsFactory.class, new FlinkMetricsFactory(metricGroup));
+ container.add(
+ "function-metrics-factory",
+ FuncionTypeMetricsFactory.class,
+ new FlinkFuncionTypeMetricsFactory(metricGroup));
container.add(
"function-dispatcher-metrics",
FunctionDispatcherMetrics.class,
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/StatefulFunctionRepository.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/StatefulFunctionRepository.java
index d2af982..5167c82 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/StatefulFunctionRepository.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/StatefulFunctionRepository.java
@@ -23,9 +23,9 @@
import org.apache.flink.statefun.flink.core.di.Inject;
import org.apache.flink.statefun.flink.core.di.Label;
import org.apache.flink.statefun.flink.core.message.MessageFactory;
+import org.apache.flink.statefun.flink.core.metrics.FuncionTypeMetricsFactory;
import org.apache.flink.statefun.flink.core.metrics.FunctionTypeMetrics;
import org.apache.flink.statefun.flink.core.metrics.FunctionTypeMetricsRepository;
-import org.apache.flink.statefun.flink.core.metrics.MetricsFactory;
import org.apache.flink.statefun.flink.core.state.FlinkStateBinder;
import org.apache.flink.statefun.flink.core.state.PersistedStates;
import org.apache.flink.statefun.flink.core.state.State;
@@ -36,18 +36,18 @@
private final ObjectOpenHashMap<FunctionType, StatefulFunction> instances;
private final State flinkState;
private final FunctionLoader functionLoader;
- private final MetricsFactory metricsFactory;
+ private final FuncionTypeMetricsFactory metricsFactory;
private final MessageFactory messageFactory;
@Inject
StatefulFunctionRepository(
@Label("function-loader") FunctionLoader functionLoader,
- @Label("metrics-factory") MetricsFactory metricsFactory,
+ @Label("function-metrics-factory") FuncionTypeMetricsFactory functionMetricsFactory,
@Label("state") State state,
MessageFactory messageFactory) {
this.instances = new ObjectOpenHashMap<>();
this.functionLoader = Objects.requireNonNull(functionLoader);
- this.metricsFactory = Objects.requireNonNull(metricsFactory);
+ this.metricsFactory = Objects.requireNonNull(functionMetricsFactory);
this.flinkState = Objects.requireNonNull(state);
this.messageFactory = Objects.requireNonNull(messageFactory);
}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/metrics/FlinkMetricsFactory.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/metrics/FlinkFuncionTypeMetricsFactory.java
similarity index 90%
rename from statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/metrics/FlinkMetricsFactory.java
rename to statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/metrics/FlinkFuncionTypeMetricsFactory.java
index d65c896..4d003bb 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/metrics/FlinkMetricsFactory.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/metrics/FlinkFuncionTypeMetricsFactory.java
@@ -21,11 +21,11 @@
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.statefun.sdk.FunctionType;
-public class FlinkMetricsFactory implements MetricsFactory {
+public class FlinkFuncionTypeMetricsFactory implements FuncionTypeMetricsFactory {
private final MetricGroup metricGroup;
- public FlinkMetricsFactory(MetricGroup metricGroup) {
+ public FlinkFuncionTypeMetricsFactory(MetricGroup metricGroup) {
this.metricGroup = Objects.requireNonNull(metricGroup);
}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/metrics/MetricsFactory.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/metrics/FuncionTypeMetricsFactory.java
similarity index 95%
rename from statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/metrics/MetricsFactory.java
rename to statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/metrics/FuncionTypeMetricsFactory.java
index 7b558f3..2634ed0 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/metrics/MetricsFactory.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/metrics/FuncionTypeMetricsFactory.java
@@ -19,7 +19,7 @@
import org.apache.flink.statefun.sdk.FunctionType;
-public interface MetricsFactory {
+public interface FuncionTypeMetricsFactory {
FunctionTypeMetrics forType(FunctionType functionType);
}