[improve] add lookup mertics (#377)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/LookupMetrics.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/LookupMetrics.java
new file mode 100644
index 0000000..a86379c
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/LookupMetrics.java
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.lookup;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MetricGroup;
+
+import java.io.Serializable;
+
+/** lookup join metrics. */
+public class LookupMetrics implements Serializable {
+ public static final String HIT_COUNT = "hitCount";
+ public static final String MISS_COUNT = "missCount";
+ public static final String LOAD_COUNT = "loadCount";
+ private transient Counter hitCounter;
+ private transient Counter missCounter;
+ private transient Counter loadCounter;
+
+ public LookupMetrics(MetricGroup metricGroup) {
+ hitCounter = metricGroup.counter(HIT_COUNT);
+ missCounter = metricGroup.counter(MISS_COUNT);
+ loadCounter = metricGroup.counter(LOAD_COUNT);
+ }
+
+ public void incHitCount() {
+ hitCounter.inc();
+ }
+
+ public void incMissCount() {
+ missCounter.inc();
+ }
+
+ public void incLoadCount() {
+ loadCounter.inc();
+ }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataAsyncLookupFunction.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataAsyncLookupFunction.java
index a20a9e1..2cd3c99 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataAsyncLookupFunction.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataAsyncLookupFunction.java
@@ -31,6 +31,7 @@
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.lookup.DorisJdbcLookupReader;
import org.apache.doris.flink.lookup.DorisLookupReader;
+import org.apache.doris.flink.lookup.LookupMetrics;
import org.apache.doris.flink.lookup.LookupSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,6 +53,7 @@
private transient Cache<RowData, List<RowData>> cache;
private DorisLookupReader lookupReader;
private LookupSchema lookupSchema;
+ private LookupMetrics lookupMetrics;
public DorisRowDataAsyncLookupFunction(
DorisOptions options,
@@ -91,6 +93,7 @@
.maximumSize(cacheMaxSize)
.build();
this.lookupReader = new DorisJdbcLookupReader(options, lookupOptions, lookupSchema);
+ this.lookupMetrics = new LookupMetrics(context.getMetricGroup());
}
/** This is a lookup method which is called by Flink framework in runtime. */
@@ -100,10 +103,17 @@
if (cache != null) {
List<RowData> cachedRows = cache.getIfPresent(keyRow);
if (cachedRows != null) {
+ lookupMetrics.incHitCount();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("lookup cache hit for key: {}", keyRow);
+ }
future.complete(cachedRows);
return;
+ } else {
+ lookupMetrics.incMissCount();
}
}
+
CompletableFuture<List<RowData>> resultFuture = lookupReader.asyncGet(keyRow);
resultFuture.handleAsync(
(resultRows, throwable) -> {
@@ -114,11 +124,13 @@
if (resultRows == null || resultRows.isEmpty()) {
if (cache != null) {
cache.put(keyRow, Collections.emptyList());
+ lookupMetrics.incLoadCount();
}
future.complete(Collections.emptyList());
} else {
if (cache != null) {
cache.put(keyRow, resultRows);
+ lookupMetrics.incLoadCount();
}
future.complete(resultRows);
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataJdbcLookupFunction.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataJdbcLookupFunction.java
index 4376a53..a4b0767 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataJdbcLookupFunction.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataJdbcLookupFunction.java
@@ -31,6 +31,7 @@
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.lookup.DorisJdbcLookupReader;
import org.apache.doris.flink.lookup.DorisLookupReader;
+import org.apache.doris.flink.lookup.LookupMetrics;
import org.apache.doris.flink.lookup.LookupSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,6 +51,7 @@
private transient Cache<RowData, List<RowData>> cache;
private DorisLookupReader lookupReader;
private LookupSchema lookupSchema;
+ private LookupMetrics lookupMetrics;
public DorisRowDataJdbcLookupFunction(
DorisOptions options,
@@ -84,6 +86,7 @@
.maximumSize(cacheMaxSize)
.build();
this.lookupReader = new DorisJdbcLookupReader(options, lookupOptions, lookupSchema);
+ this.lookupMetrics = new LookupMetrics(context.getMetricGroup());
}
/**
@@ -96,10 +99,16 @@
if (cache != null) {
List<RowData> cachedRows = cache.getIfPresent(keyRow);
if (cachedRows != null) {
+ lookupMetrics.incHitCount();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("lookup cache hit for key: {}", keyRow);
+ }
for (RowData cachedRow : cachedRows) {
collect(cachedRow);
}
return;
+ } else {
+ lookupMetrics.incMissCount();
}
}
queryRecord(keyRow);
@@ -112,6 +121,7 @@
}
if (cache != null) {
cache.put(keyRow, rowData);
+ lookupMetrics.incLoadCount();
}
rowData.forEach(this::collect);
}