HDDS-2071. Support filters in ozone insight point
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/OzoneProtocolMessageDispatcher.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/OzoneProtocolMessageDispatcher.java
index d67a759..2f4f821 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/OzoneProtocolMessageDispatcher.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/OzoneProtocolMessageDispatcher.java
@@ -59,7 +59,7 @@
try {
if (logger.isTraceEnabled()) {
logger.trace(
- "{} {} request is received: <json>{}</json>",
+ "[service={}] [type={}] request is received: <json>{}</json>",
serviceName,
type.toString(),
request.toString().replaceAll("\n", "\\\\n"));
@@ -73,7 +73,7 @@
if (logger.isTraceEnabled()) {
logger.trace(
- "{} {} request is processed. Response: "
+ "[service={}] [type={}] request is processed. Response: "
+ "<json>{}</json>",
serviceName,
type.toString(),
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java
index 91e0153..766b3a9 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java
@@ -163,12 +163,12 @@
queuedCount.incrementAndGet();
if (LOG.isTraceEnabled()) {
LOG.debug(
- "Delivering event {} to executor/handler {}: <json>{}</json>",
+ "Delivering [event={}] to executor/handler {}: <json>{}</json>",
event.getName(),
executorAndHandlers.getKey().getName(),
TRACING_SERIALIZER.toJson(payload).replaceAll("\n", "\\\\n"));
} else if (LOG.isDebugEnabled()) {
- LOG.debug("Delivering event {} to executor/handler {}: {}",
+ LOG.debug("Delivering [event={}] to executor/handler {}: {}",
event.getName(),
executorAndHandlers.getKey().getName(),
payload.getClass().getSimpleName());
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index ed65ed3..7f88be4 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -142,7 +142,6 @@
}
}
-
/**
* Returns all datanode that are in the given state. This function works by
* taking a snapshot of the current collection and then returning the list
@@ -154,7 +153,7 @@
@Override
public List<DatanodeDetails> getNodes(NodeState nodestate) {
return nodeStateManager.getNodes(nodestate).stream()
- .map(node -> (DatanodeDetails)node).collect(Collectors.toList());
+ .map(node -> (DatanodeDetails) node).collect(Collectors.toList());
}
/**
@@ -165,7 +164,7 @@
@Override
public List<DatanodeDetails> getAllNodes() {
return nodeStateManager.getAllNodes().stream()
- .map(node -> (DatanodeDetails)node).collect(Collectors.toList());
+ .map(node -> (DatanodeDetails) node).collect(Collectors.toList());
}
/**
@@ -229,11 +228,11 @@
* SCM.
*
* @param datanodeDetails - Send datanodeDetails with Node info.
- * This function generates and assigns new datanode ID
- * for the datanode. This allows SCM to be run independent
- * of Namenode if required.
- * @param nodeReport NodeReport.
- *
+ * This function generates and assigns new datanode ID
+ * for the datanode. This allows SCM to be run
+ * independent
+ * of Namenode if required.
+ * @param nodeReport NodeReport.
* @return SCMHeartbeatResponseProto
*/
@Override
@@ -336,7 +335,7 @@
*/
@Override
public void processNodeReport(DatanodeDetails datanodeDetails,
- NodeReportProto nodeReport) {
+ NodeReportProto nodeReport) {
if (LOG.isDebugEnabled()) {
LOG.debug("Processing node report from [datanode={}]",
datanodeDetails.getHostName());
@@ -361,6 +360,7 @@
/**
* Returns the aggregated node stats.
+ *
* @return the aggregated node stats.
*/
@Override
@@ -379,6 +379,7 @@
/**
* Return a map of node stats.
+ *
* @return a map of individual node stats (live/stale but not dead).
*/
@Override
@@ -386,7 +387,7 @@
final Map<DatanodeDetails, SCMNodeStat> nodeStats = new HashMap<>();
- final List<DatanodeInfo> healthyNodes = nodeStateManager
+ final List<DatanodeInfo> healthyNodes = nodeStateManager
.getNodes(NodeState.HEALTHY);
final List<DatanodeInfo> staleNodes = nodeStateManager
.getNodes(NodeState.STALE);
@@ -404,6 +405,7 @@
/**
* Return the node stat of the specified datanode.
+ *
* @param datanodeDetails - datanode ID.
* @return node stat if it is live/stale, null if it is decommissioned or
* doesn't exist.
@@ -440,7 +442,7 @@
@Override
public Map<String, Integer> getNodeCount() {
Map<String, Integer> nodeCountMap = new HashMap<String, Integer>();
- for(NodeState state : NodeState.values()) {
+ for (NodeState state : NodeState.values()) {
nodeCountMap.put(state.toString(), getNodeCount(state));
}
return nodeCountMap;
@@ -458,7 +460,7 @@
long ssdUsed = 0L;
long ssdRemaining = 0L;
- List<DatanodeInfo> healthyNodes = nodeStateManager
+ List<DatanodeInfo> healthyNodes = nodeStateManager
.getNodes(NodeState.HEALTHY);
List<DatanodeInfo> staleNodes = nodeStateManager
.getNodes(NodeState.STALE);
@@ -494,9 +496,9 @@
return nodeInfo;
}
-
/**
* Get set of pipelines a datanode is part of.
+ *
* @param datanodeDetails - datanodeID
* @return Set of PipelineID
*/
@@ -505,9 +507,9 @@
return nodeStateManager.getPipelineByDnID(datanodeDetails.getUuid());
}
-
/**
* Add pipeline information in the NodeManager.
+ *
* @param pipeline - Pipeline to be added
*/
@Override
@@ -517,6 +519,7 @@
/**
* Remove a pipeline information from the NodeManager.
+ *
* @param pipeline - Pipeline to be removed
*/
@Override
@@ -526,17 +529,18 @@
@Override
public void addContainer(final DatanodeDetails datanodeDetails,
- final ContainerID containerId)
+ final ContainerID containerId)
throws NodeNotFoundException {
nodeStateManager.addContainer(datanodeDetails.getUuid(), containerId);
}
/**
* Update set of containers available on a datanode.
+ *
* @param datanodeDetails - DatanodeID
- * @param containerIds - Set of containerIDs
+ * @param containerIds - Set of containerIDs
* @throws NodeNotFoundException - if datanode is not known. For new datanode
- * use addDatanodeInContainerMap call.
+ * use addDatanodeInContainerMap call.
*/
@Override
public void setContainers(DatanodeDetails datanodeDetails,
@@ -547,6 +551,7 @@
/**
* Return set of containerIDs available on a datanode.
+ *
* @param datanodeDetails - DatanodeID
* @return - set of containerIDs
*/
@@ -570,7 +575,7 @@
* DATANODE_COMMAND to the Queue.
*
* @param commandForDatanode DatanodeCommand
- * @param ignored publisher
+ * @param ignored publisher
*/
@Override
public void onMessage(CommandForDatanode commandForDatanode,
@@ -653,6 +658,7 @@
/**
* Test utility to stop heartbeat check process.
+ *
* @return ScheduledFuture of next scheduled check that got cancelled.
*/
@VisibleForTesting
@@ -662,6 +668,7 @@
/**
* Test utility to resume the paused heartbeat check process.
+ *
* @return ScheduledFuture of the next scheduled check
*/
@VisibleForTesting
@@ -671,6 +678,7 @@
/**
* Test utility to get the count of skipped heartbeat check iterations.
+ *
* @return count of skipped heartbeat check iterations
*/
@VisibleForTesting
diff --git a/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/BaseInsightPoint.java b/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/BaseInsightPoint.java
index a23b876..1cc4deb 100644
--- a/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/BaseInsightPoint.java
+++ b/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/BaseInsightPoint.java
@@ -22,6 +22,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.HddsUtils;
@@ -185,4 +186,18 @@
metrics.add(performance);
}
+ @Override
+ public boolean filterLog(Map<String, String> filters, String logLine) {
+ if (filters == null) {
+ return true;
+ }
+ boolean result = true;
+ for (Entry<String, String> entry : filters.entrySet()) {
+ if (!logLine.matches(
+ String.format(".*\\[%s=%s\\].*", entry.getKey(), entry.getValue()))) {
+ result = result & false;
+ }
+ }
+ return result;
+ }
}
diff --git a/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/InsightPoint.java b/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/InsightPoint.java
index 1284cfa..57e1ddd 100644
--- a/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/InsightPoint.java
+++ b/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/InsightPoint.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.ozone.insight;
import java.util.List;
+import java.util.Map;
/**
* Definition of a specific insight points.
@@ -44,6 +45,9 @@
*/
List<Class> getConfigurationClasses();
-
+ /**
+ * Decide if the specific log should be displayed or not..
+ */
+ boolean filterLog(Map<String, String> filters, String logLine);
}
diff --git a/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/LogSubcommand.java b/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/LogSubcommand.java
index 2e8787f..0a06fe7 100644
--- a/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/LogSubcommand.java
+++ b/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/LogSubcommand.java
@@ -23,8 +23,10 @@
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
+import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -59,6 +61,10 @@
+ "show more information / detailed message")
private boolean verbose;
+ @CommandLine.Option(names = "-f", description = "Enable verbose mode to "
+ + "show more information / detailed message")
+ private Map<String, String> filters;
+
@Override
public Void call() throws Exception {
OzoneConfiguration conf =
@@ -76,7 +82,8 @@
Set<Component> sources = loggers.stream().map(LoggerSource::getComponent)
.collect(Collectors.toSet());
try {
- streamLog(conf, sources, loggers);
+ streamLog(conf, sources, loggers,
+ (logLine) -> insight.filterLog(filters, logLine));
} finally {
for (LoggerSource logger : loggers) {
setLogLevel(conf, logger.getLoggerName(), logger.getComponent(),
@@ -86,12 +93,20 @@
return null;
}
+ /**
+ * Stream log from multiple endpoint.
+ *
+ * @param conf Configuration (to find the log endpoints)
+ * @param sources Components to connect to (like scm, om...)
+ * @param relatedLoggers loggers to display
+ * @param filter any additional filter
+ */
private void streamLog(OzoneConfiguration conf, Set<Component> sources,
- List<LoggerSource> relatedLoggers) {
+ List<LoggerSource> relatedLoggers, Function<String, Boolean> filter) {
List<Thread> loggers = new ArrayList<>();
for (Component sourceComponent : sources) {
loggers.add(new Thread(
- () -> streamLog(conf, sourceComponent, relatedLoggers)));
+ () -> streamLog(conf, sourceComponent, relatedLoggers, filter)));
}
for (Thread thread : loggers) {
thread.start();
@@ -106,7 +121,7 @@
}
private void streamLog(OzoneConfiguration conf, Component logComponent,
- List<LoggerSource> loggers) {
+ List<LoggerSource> loggers, Function<String, Boolean> filter) {
HttpClient client = HttpClientBuilder.create().build();
HttpGet get = new HttpGet(getHost(conf, logComponent) + "/logstream");
@@ -118,7 +133,8 @@
bufferedReader.lines()
.filter(line -> {
for (LoggerSource logger : loggers) {
- if (line.contains(logger.getLoggerName())) {
+ if (line.contains(logger.getLoggerName()) && filter
+ .apply(line)) {
return true;
}
}
diff --git a/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/datanode/RatisInsight.java b/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/datanode/RatisInsight.java
index b87955e..41130677 100644
--- a/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/datanode/RatisInsight.java
+++ b/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/datanode/RatisInsight.java
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -72,4 +73,8 @@
return "More information about one ratis datanode ring.";
}
+ @Override
+ public boolean filterLog(Map<String, String> filters, String logLine) {
+ return true;
+ }
}
diff --git a/hadoop-ozone/insight/src/test/java/org/apache/hadoop/ozone/insight/TestBaseInsightPoint.java b/hadoop-ozone/insight/src/test/java/org/apache/hadoop/ozone/insight/TestBaseInsightPoint.java
new file mode 100644
index 0000000..42fdb39
--- /dev/null
+++ b/hadoop-ozone/insight/src/test/java/org/apache/hadoop/ozone/insight/TestBaseInsightPoint.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.insight;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Assert;
+import static org.junit.Assert.*;
+import org.junit.Test;
+
+/**
+ * Test common insight point utility methods.
+ */
+public class TestBaseInsightPoint {
+
+ @Test
+ public void filterLog() {
+
+ BaseInsightPoint insightPoint = new BaseInsightPoint() {
+ @Override
+ public String getDescription() {
+ return "test";
+ }
+ };
+
+ //with simple filter
+ Map<String, String> filters = new HashMap<>();
+ filters.put("datanode", "123");
+
+ Assert.assertTrue(insightPoint
+ .filterLog(filters, "This a log specific to [datanode=123]"));
+
+ Assert.assertFalse(insightPoint
+ .filterLog(filters, "This a log specific to [datanode=234]"));
+
+ //with empty filters
+ Assert.assertTrue(insightPoint
+ .filterLog(new HashMap<>(), "This a log specific to [datanode=234]"));
+
+ //with multiple filters
+ filters.clear();
+ filters.put("datanode", "123");
+ filters.put("pipeline", "abcd");
+
+ Assert.assertFalse(insightPoint
+ .filterLog(filters, "This a log specific to [datanode=123]"));
+
+ Assert.assertTrue(insightPoint
+ .filterLog(filters,
+ "This a log specific to [datanode=123] [pipeline=abcd]"));
+
+ Assert.assertFalse(insightPoint
+ .filterLog(filters,
+ "This a log specific to [datanode=456] [pipeline=abcd]"));
+
+ }
+}
\ No newline at end of file