[improve] add lookup mertics (#377)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/LookupMetrics.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/LookupMetrics.java
new file mode 100644
index 0000000..a86379c
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/LookupMetrics.java
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.lookup;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MetricGroup;
+
+import java.io.Serializable;
+
+/** lookup join metrics. */
+public class LookupMetrics implements Serializable {
+    public static final String HIT_COUNT = "hitCount";
+    public static final String MISS_COUNT = "missCount";
+    public static final String LOAD_COUNT = "loadCount";
+    private transient Counter hitCounter;
+    private transient Counter missCounter;
+    private transient Counter loadCounter;
+
+    public LookupMetrics(MetricGroup metricGroup) {
+        hitCounter = metricGroup.counter(HIT_COUNT);
+        missCounter = metricGroup.counter(MISS_COUNT);
+        loadCounter = metricGroup.counter(LOAD_COUNT);
+    }
+
+    public void incHitCount() {
+        hitCounter.inc();
+    }
+
+    public void incMissCount() {
+        missCounter.inc();
+    }
+
+    public void incLoadCount() {
+        loadCounter.inc();
+    }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataAsyncLookupFunction.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataAsyncLookupFunction.java
index a20a9e1..2cd3c99 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataAsyncLookupFunction.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataAsyncLookupFunction.java
@@ -31,6 +31,7 @@
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.lookup.DorisJdbcLookupReader;
 import org.apache.doris.flink.lookup.DorisLookupReader;
+import org.apache.doris.flink.lookup.LookupMetrics;
 import org.apache.doris.flink.lookup.LookupSchema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,6 +53,7 @@
     private transient Cache<RowData, List<RowData>> cache;
     private DorisLookupReader lookupReader;
     private LookupSchema lookupSchema;
+    private LookupMetrics lookupMetrics;
 
     public DorisRowDataAsyncLookupFunction(
             DorisOptions options,
@@ -91,6 +93,7 @@
                                 .maximumSize(cacheMaxSize)
                                 .build();
         this.lookupReader = new DorisJdbcLookupReader(options, lookupOptions, lookupSchema);
+        this.lookupMetrics = new LookupMetrics(context.getMetricGroup());
     }
 
     /** This is a lookup method which is called by Flink framework in runtime. */
@@ -100,10 +103,17 @@
         if (cache != null) {
             List<RowData> cachedRows = cache.getIfPresent(keyRow);
             if (cachedRows != null) {
+                lookupMetrics.incHitCount();
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("lookup cache hit for key: {}", keyRow);
+                }
                 future.complete(cachedRows);
                 return;
+            } else {
+                lookupMetrics.incMissCount();
             }
         }
+
         CompletableFuture<List<RowData>> resultFuture = lookupReader.asyncGet(keyRow);
         resultFuture.handleAsync(
                 (resultRows, throwable) -> {
@@ -114,11 +124,13 @@
                             if (resultRows == null || resultRows.isEmpty()) {
                                 if (cache != null) {
                                     cache.put(keyRow, Collections.emptyList());
+                                    lookupMetrics.incLoadCount();
                                 }
                                 future.complete(Collections.emptyList());
                             } else {
                                 if (cache != null) {
                                     cache.put(keyRow, resultRows);
+                                    lookupMetrics.incLoadCount();
                                 }
                                 future.complete(resultRows);
                             }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataJdbcLookupFunction.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataJdbcLookupFunction.java
index 4376a53..a4b0767 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataJdbcLookupFunction.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataJdbcLookupFunction.java
@@ -31,6 +31,7 @@
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.lookup.DorisJdbcLookupReader;
 import org.apache.doris.flink.lookup.DorisLookupReader;
+import org.apache.doris.flink.lookup.LookupMetrics;
 import org.apache.doris.flink.lookup.LookupSchema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,6 +51,7 @@
     private transient Cache<RowData, List<RowData>> cache;
     private DorisLookupReader lookupReader;
     private LookupSchema lookupSchema;
+    private LookupMetrics lookupMetrics;
 
     public DorisRowDataJdbcLookupFunction(
             DorisOptions options,
@@ -84,6 +86,7 @@
                                 .maximumSize(cacheMaxSize)
                                 .build();
         this.lookupReader = new DorisJdbcLookupReader(options, lookupOptions, lookupSchema);
+        this.lookupMetrics = new LookupMetrics(context.getMetricGroup());
     }
 
     /**
@@ -96,10 +99,16 @@
         if (cache != null) {
             List<RowData> cachedRows = cache.getIfPresent(keyRow);
             if (cachedRows != null) {
+                lookupMetrics.incHitCount();
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("lookup cache hit for key: {}", keyRow);
+                }
                 for (RowData cachedRow : cachedRows) {
                     collect(cachedRow);
                 }
                 return;
+            } else {
+                lookupMetrics.incMissCount();
             }
         }
         queryRecord(keyRow);
@@ -112,6 +121,7 @@
         }
         if (cache != null) {
             cache.put(keyRow, rowData);
+            lookupMetrics.incLoadCount();
         }
         rowData.forEach(this::collect);
     }