HDDS-2071. Support filters in ozone insight point
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/OzoneProtocolMessageDispatcher.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/OzoneProtocolMessageDispatcher.java
index d67a759..2f4f821 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/OzoneProtocolMessageDispatcher.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/OzoneProtocolMessageDispatcher.java
@@ -59,7 +59,7 @@
     try {
       if (logger.isTraceEnabled()) {
         logger.trace(
-            "{} {} request is received: <json>{}</json>",
+            "[service={}] [type={}] request is received: <json>{}</json>",
             serviceName,
             type.toString(),
             request.toString().replaceAll("\n", "\\\\n"));
@@ -73,7 +73,7 @@
 
       if (logger.isTraceEnabled()) {
         logger.trace(
-            "{} {} request is processed. Response: "
+            "[service={}] [type={}] request is processed. Response: "
                 + "<json>{}</json>",
             serviceName,
             type.toString(),
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java
index 91e0153..766b3a9 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java
@@ -163,12 +163,12 @@
           queuedCount.incrementAndGet();
           if (LOG.isTraceEnabled()) {
             LOG.debug(
-                "Delivering event {} to executor/handler {}: <json>{}</json>",
+                "Delivering [event={}] to executor/handler {}: <json>{}</json>",
                 event.getName(),
                 executorAndHandlers.getKey().getName(),
                 TRACING_SERIALIZER.toJson(payload).replaceAll("\n", "\\\\n"));
           } else if (LOG.isDebugEnabled()) {
-            LOG.debug("Delivering event {} to executor/handler {}: {}",
+            LOG.debug("Delivering [event={}] to executor/handler {}: {}",
                 event.getName(),
                 executorAndHandlers.getKey().getName(),
                 payload.getClass().getSimpleName());
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index ed65ed3..7f88be4 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -142,7 +142,6 @@
     }
   }
 
-
   /**
    * Returns all datanode that are in the given state. This function works by
    * taking a snapshot of the current collection and then returning the list
@@ -154,7 +153,7 @@
   @Override
   public List<DatanodeDetails> getNodes(NodeState nodestate) {
     return nodeStateManager.getNodes(nodestate).stream()
-        .map(node -> (DatanodeDetails)node).collect(Collectors.toList());
+        .map(node -> (DatanodeDetails) node).collect(Collectors.toList());
   }
 
   /**
@@ -165,7 +164,7 @@
   @Override
   public List<DatanodeDetails> getAllNodes() {
     return nodeStateManager.getAllNodes().stream()
-        .map(node -> (DatanodeDetails)node).collect(Collectors.toList());
+        .map(node -> (DatanodeDetails) node).collect(Collectors.toList());
   }
 
   /**
@@ -229,11 +228,11 @@
    * SCM.
    *
    * @param datanodeDetails - Send datanodeDetails with Node info.
-   *                   This function generates and assigns new datanode ID
-   *                   for the datanode. This allows SCM to be run independent
-   *                   of Namenode if required.
-   * @param nodeReport NodeReport.
-   *
+   *                        This function generates and assigns new datanode ID
+   *                        for the datanode. This allows SCM to be run
+   *                        independent
+   *                        of Namenode if required.
+   * @param nodeReport      NodeReport.
    * @return SCMHeartbeatResponseProto
    */
   @Override
@@ -336,7 +335,7 @@
    */
   @Override
   public void processNodeReport(DatanodeDetails datanodeDetails,
-                                NodeReportProto nodeReport) {
+      NodeReportProto nodeReport) {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Processing node report from [datanode={}]",
           datanodeDetails.getHostName());
@@ -361,6 +360,7 @@
 
   /**
    * Returns the aggregated node stats.
+   *
    * @return the aggregated node stats.
    */
   @Override
@@ -379,6 +379,7 @@
 
   /**
    * Return a map of node stats.
+   *
    * @return a map of individual node stats (live/stale but not dead).
    */
   @Override
@@ -386,7 +387,7 @@
 
     final Map<DatanodeDetails, SCMNodeStat> nodeStats = new HashMap<>();
 
-    final List<DatanodeInfo> healthyNodes =  nodeStateManager
+    final List<DatanodeInfo> healthyNodes = nodeStateManager
         .getNodes(NodeState.HEALTHY);
     final List<DatanodeInfo> staleNodes = nodeStateManager
         .getNodes(NodeState.STALE);
@@ -404,6 +405,7 @@
 
   /**
    * Return the node stat of the specified datanode.
+   *
    * @param datanodeDetails - datanode ID.
    * @return node stat if it is live/stale, null if it is decommissioned or
    * doesn't exist.
@@ -440,7 +442,7 @@
   @Override
   public Map<String, Integer> getNodeCount() {
     Map<String, Integer> nodeCountMap = new HashMap<String, Integer>();
-    for(NodeState state : NodeState.values()) {
+    for (NodeState state : NodeState.values()) {
       nodeCountMap.put(state.toString(), getNodeCount(state));
     }
     return nodeCountMap;
@@ -458,7 +460,7 @@
     long ssdUsed = 0L;
     long ssdRemaining = 0L;
 
-    List<DatanodeInfo> healthyNodes =  nodeStateManager
+    List<DatanodeInfo> healthyNodes = nodeStateManager
         .getNodes(NodeState.HEALTHY);
     List<DatanodeInfo> staleNodes = nodeStateManager
         .getNodes(NodeState.STALE);
@@ -494,9 +496,9 @@
     return nodeInfo;
   }
 
-
   /**
    * Get set of pipelines a datanode is part of.
+   *
    * @param datanodeDetails - datanodeID
    * @return Set of PipelineID
    */
@@ -505,9 +507,9 @@
     return nodeStateManager.getPipelineByDnID(datanodeDetails.getUuid());
   }
 
-
   /**
    * Add pipeline information in the NodeManager.
+   *
    * @param pipeline - Pipeline to be added
    */
   @Override
@@ -517,6 +519,7 @@
 
   /**
    * Remove a pipeline information from the NodeManager.
+   *
    * @param pipeline - Pipeline to be removed
    */
   @Override
@@ -526,17 +529,18 @@
 
   @Override
   public void addContainer(final DatanodeDetails datanodeDetails,
-                           final ContainerID containerId)
+      final ContainerID containerId)
       throws NodeNotFoundException {
     nodeStateManager.addContainer(datanodeDetails.getUuid(), containerId);
   }
 
   /**
    * Update set of containers available on a datanode.
+   *
    * @param datanodeDetails - DatanodeID
-   * @param containerIds - Set of containerIDs
+   * @param containerIds    - Set of containerIDs
    * @throws NodeNotFoundException - if datanode is not known. For new datanode
-   *                        use addDatanodeInContainerMap call.
+   *                               use addDatanodeInContainerMap call.
    */
   @Override
   public void setContainers(DatanodeDetails datanodeDetails,
@@ -547,6 +551,7 @@
 
   /**
    * Return set of containerIDs available on a datanode.
+   *
    * @param datanodeDetails - DatanodeID
    * @return - set of containerIDs
    */
@@ -570,7 +575,7 @@
    * DATANODE_COMMAND to the Queue.
    *
    * @param commandForDatanode DatanodeCommand
-   * @param ignored publisher
+   * @param ignored            publisher
    */
   @Override
   public void onMessage(CommandForDatanode commandForDatanode,
@@ -653,6 +658,7 @@
 
   /**
    * Test utility to stop heartbeat check process.
+   *
    * @return ScheduledFuture of next scheduled check that got cancelled.
    */
   @VisibleForTesting
@@ -662,6 +668,7 @@
 
   /**
    * Test utility to resume the paused heartbeat check process.
+   *
    * @return ScheduledFuture of the next scheduled check
    */
   @VisibleForTesting
@@ -671,6 +678,7 @@
 
   /**
    * Test utility to get the count of skipped heartbeat check iterations.
+   *
    * @return count of skipped heartbeat check iterations
    */
   @VisibleForTesting
diff --git a/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/BaseInsightPoint.java b/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/BaseInsightPoint.java
index a23b876..1cc4deb 100644
--- a/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/BaseInsightPoint.java
+++ b/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/BaseInsightPoint.java
@@ -22,6 +22,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.HddsUtils;
@@ -185,4 +186,18 @@
     metrics.add(performance);
   }
 
+  @Override
+  public boolean filterLog(Map<String, String> filters, String logLine) {
+    if (filters == null) {
+      return true;
+    }
+    boolean result = true;
+    for (Entry<String, String> entry : filters.entrySet()) {
+      if (!logLine.matches(
+          String.format(".*\\[%s=%s\\].*", entry.getKey(), entry.getValue()))) {
+        result = false;
+      }
+    }
+    return result;
+  }
 }
diff --git a/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/InsightPoint.java b/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/InsightPoint.java
index 1284cfa..57e1ddd 100644
--- a/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/InsightPoint.java
+++ b/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/InsightPoint.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.ozone.insight;
 
 import java.util.List;
+import java.util.Map;
 
 /**
  * Definition of a specific insight points.
@@ -44,6 +45,9 @@
    */
   List<Class> getConfigurationClasses();
 
-
+  /**
+   * Decide whether the given log line should be displayed, based on the filters.
+   */
+  boolean filterLog(Map<String, String> filters, String logLine);
 
 }
diff --git a/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/LogSubcommand.java b/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/LogSubcommand.java
index 2e8787f..0a06fe7 100644
--- a/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/LogSubcommand.java
+++ b/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/LogSubcommand.java
@@ -23,8 +23,10 @@
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.function.Function;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
@@ -59,6 +61,10 @@
       + "show more information / detailed message")
   private boolean verbose;
 
+  @CommandLine.Option(names = "-f", description = "Filter the output. Use "
+      + "key=value pairs to show only log lines with matching [key=value] tags")
+  private Map<String, String> filters;
+
   @Override
   public Void call() throws Exception {
     OzoneConfiguration conf =
@@ -76,7 +82,8 @@
     Set<Component> sources = loggers.stream().map(LoggerSource::getComponent)
         .collect(Collectors.toSet());
     try {
-      streamLog(conf, sources, loggers);
+      streamLog(conf, sources, loggers,
+          (logLine) -> insight.filterLog(filters, logLine));
     } finally {
       for (LoggerSource logger : loggers) {
         setLogLevel(conf, logger.getLoggerName(), logger.getComponent(),
@@ -86,12 +93,20 @@
     return null;
   }
 
+  /**
+   * Stream log from multiple endpoint.
+   *
+   * @param conf           Configuration (to find the log endpoints)
+   * @param sources        Components to connect to (like scm, om...)
+   * @param relatedLoggers loggers to display
+   * @param filter         any additional filter
+   */
   private void streamLog(OzoneConfiguration conf, Set<Component> sources,
-      List<LoggerSource> relatedLoggers) {
+      List<LoggerSource> relatedLoggers, Function<String, Boolean> filter) {
     List<Thread> loggers = new ArrayList<>();
     for (Component sourceComponent : sources) {
       loggers.add(new Thread(
-          () -> streamLog(conf, sourceComponent, relatedLoggers)));
+          () -> streamLog(conf, sourceComponent, relatedLoggers, filter)));
     }
     for (Thread thread : loggers) {
       thread.start();
@@ -106,7 +121,7 @@
   }
 
   private void streamLog(OzoneConfiguration conf, Component logComponent,
-      List<LoggerSource> loggers) {
+      List<LoggerSource> loggers, Function<String, Boolean> filter) {
     HttpClient client = HttpClientBuilder.create().build();
 
     HttpGet get = new HttpGet(getHost(conf, logComponent) + "/logstream");
@@ -118,7 +133,8 @@
         bufferedReader.lines()
             .filter(line -> {
               for (LoggerSource logger : loggers) {
-                if (line.contains(logger.getLoggerName())) {
+                if (line.contains(logger.getLoggerName()) && filter
+                    .apply(line)) {
                   return true;
                 }
               }
diff --git a/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/datanode/RatisInsight.java b/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/datanode/RatisInsight.java
index b87955e..41130677 100644
--- a/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/datanode/RatisInsight.java
+++ b/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/datanode/RatisInsight.java
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -72,4 +73,8 @@
     return "More information about one ratis datanode ring.";
   }
 
+  @Override
+  public boolean filterLog(Map<String, String> filters, String logLine) {
+    return true;
+  }
 }
diff --git a/hadoop-ozone/insight/src/test/java/org/apache/hadoop/ozone/insight/TestBaseInsightPoint.java b/hadoop-ozone/insight/src/test/java/org/apache/hadoop/ozone/insight/TestBaseInsightPoint.java
new file mode 100644
index 0000000..42fdb39
--- /dev/null
+++ b/hadoop-ozone/insight/src/test/java/org/apache/hadoop/ozone/insight/TestBaseInsightPoint.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.insight;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Assert;
+
+import org.junit.Test;
+
+/**
+ * Test common insight point utility methods.
+ */
+public class TestBaseInsightPoint {
+
+  @Test
+  public void filterLog() {
+
+    BaseInsightPoint insightPoint = new BaseInsightPoint() {
+      @Override
+      public String getDescription() {
+        return "test";
+      }
+    };
+
+    //with simple filter
+    Map<String, String> filters = new HashMap<>();
+    filters.put("datanode", "123");
+
+    Assert.assertTrue(insightPoint
+        .filterLog(filters, "This a log specific to [datanode=123]"));
+
+    Assert.assertFalse(insightPoint
+        .filterLog(filters, "This a log specific to [datanode=234]"));
+
+    //with empty filters
+    Assert.assertTrue(insightPoint
+        .filterLog(new HashMap<>(), "This a log specific to [datanode=234]"));
+
+    //with multiple filters
+    filters.clear();
+    filters.put("datanode", "123");
+    filters.put("pipeline", "abcd");
+
+    Assert.assertFalse(insightPoint
+        .filterLog(filters, "This a log specific to [datanode=123]"));
+
+    Assert.assertTrue(insightPoint
+        .filterLog(filters,
+            "This a log specific to [datanode=123] [pipeline=abcd]"));
+
+    Assert.assertFalse(insightPoint
+        .filterLog(filters,
+            "This a log specific to [datanode=456] [pipeline=abcd]"));
+
+  }
+}