HADOOP-15340. Provide meaningful RPC server name for RpcMetrics. Contributed by Elek Marton.
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
index 639bbad..70fde60 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
@@ -419,8 +419,9 @@
String portRangeConfig)
throws IOException {
super(bindAddress, port, null, numHandlers,
- numReaders, queueSizePerHandler, conf, classNameBase(protocolImpl
- .getClass().getName()), secretManager, portRangeConfig);
+ numReaders, queueSizePerHandler, conf,
+ serverNameFromClass(protocolImpl.getClass()), secretManager,
+ portRangeConfig);
this.verbose = verbose;
registerProtocolAndImpl(RPC.RpcKind.RPC_PROTOCOL_BUFFER, protocolClass,
protocolImpl);
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
index 8f8eda6..9cfadc7 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
@@ -35,6 +35,8 @@
import java.util.Map;
import java.util.HashMap;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import javax.net.SocketFactory;
@@ -808,13 +810,45 @@
/** An RPC Server. */
public abstract static class Server extends org.apache.hadoop.ipc.Server {
- boolean verbose;
- static String classNameBase(String className) {
- String[] names = className.split("\\.", -1);
- if (names == null || names.length == 0) {
- return className;
+
+ boolean verbose;
+
+ private static final Pattern COMPLEX_SERVER_NAME_PATTERN =
+ Pattern.compile("(?:[^\\$]*\\$)*([A-Za-z][^\\$]+)(?:\\$\\d+)?");
+
+ /**
+ * Get a meaningful and short name for a server based on a java class.
+ *
+ * The rules are defined to support the current naming schema of the
+ * generated protobuf classes where the final class usually an anonymous
+ * inner class of an inner class.
+ *
+ * 1. For simple classes it returns with the simple name of the classes
+ * (with the name without package name)
+ *
+ * 2. For inner classes, this is the simple name of the inner class.
+ *
+ * 3. If it is an Object created from a class factory
+ * E.g., org.apache.hadoop.ipc.TestRPC$TestClass$2
+ * this method returns parent class TestClass.
+ *
+ * 4. If it is an anonymous class E.g., 'org.apache.hadoop.ipc.TestRPC$10'
+ * serverNameFromClass returns parent class TestRPC.
+ *
+ *
+ */
+ static String serverNameFromClass(Class<?> clazz) {
+ String name = clazz.getName();
+ String[] names = clazz.getName().split("\\.", -1);
+ if (names != null && names.length > 0) {
+ name = names[names.length - 1];
}
- return names[names.length-1];
+ Matcher matcher = COMPLEX_SERVER_NAME_PATTERN.matcher(name);
+ if (matcher.find()) {
+ return matcher.group(1);
+ } else {
+ return name;
+ }
}
/**
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
index c5da3b1..76d9c40 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
@@ -140,6 +140,10 @@
private RpcSaslProto negotiateResponse;
private ExceptionsHandler exceptionsHandler = new ExceptionsHandler();
private Tracer tracer;
+ /**
+ * Logical name of the server used in metrics and monitor.
+ */
+ private final String serverName;
/**
* Add exception classes for which server won't log stack traces.
@@ -2768,6 +2772,7 @@
this.rpcRequestClass = rpcRequestClass;
this.handlerCount = handlerCount;
this.socketSendBufferSize = 0;
+ this.serverName = serverName;
this.maxDataLength = conf.getInt(CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH,
CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT);
if (queueSizePerHandler != -1) {
@@ -3509,4 +3514,8 @@
idleScanTimer.schedule(idleScanTask, idleScanInterval);
}
}
+
+ public String getServerName() {
+ return serverName;
+ }
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
index f2b5862..0497931 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
@@ -406,7 +406,7 @@
throws IOException {
super(bindAddress, port, null, numHandlers, numReaders,
queueSizePerHandler, conf,
- classNameBase(protocolImpl.getClass().getName()), secretManager,
+ serverNameFromClass(protocolImpl.getClass()), secretManager,
portRangeConfig);
this.verbose = verbose;
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java
index d53d7d3..a36bcd8 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java
@@ -17,10 +17,12 @@
*/
package org.apache.hadoop.ipc.metrics;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.MetricsTag;
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@@ -50,7 +52,9 @@
String port = String.valueOf(server.getListenerAddress().getPort());
name = "RpcActivityForPort" + port;
this.server = server;
- registry = new MetricsRegistry("rpc").tag("port", "RPC port", port);
+ registry = new MetricsRegistry("rpc")
+ .tag("port", "RPC port", port)
+ .tag("serverName", "Name of the RPC server", server.getServerName());
int[] intervals = conf.getInts(
CommonConfigurationKeys.RPC_METRICS_PERCENTILES_INTERVALS_KEY);
rpcQuantileEnable = (intervals.length > 0) && conf.getBoolean(
@@ -292,4 +296,9 @@
public double getDeferredRpcProcessingStdDev() {
return deferredRpcProcessingTime.lastStat().stddev();
}
+
+ @VisibleForTesting
+ public MetricsTag getTag(String tagName) {
+ return registry.getTag(tagName);
+ }
}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
index c6209d2..b596642 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
@@ -48,6 +48,7 @@
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.MetricsAsserts;
import org.apache.hadoop.test.MockitoUtil;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
@@ -90,8 +91,6 @@
import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
@@ -447,6 +446,15 @@
assertCounterGt("SentBytes", 0L, rb);
assertCounterGt("ReceivedBytes", 0L, rb);
+ // Check tags of the metrics
+ assertEquals("" + server.getPort(),
+ server.getRpcMetrics().getTag("port").value());
+
+ assertEquals("TestProtobufRpcProto",
+ server.getRpcMetrics().getTag("serverName").value());
+
+
+
// Number of calls to echo method should be 2
rb = getMetrics(server.rpcDetailedMetrics.name());
assertCounter("EchoNumOps", 2L, rb);
@@ -1362,6 +1370,50 @@
}
}
+ @Test
+ public void testServerNameFromClass() {
+ Assert.assertEquals("TestRPC",
+ RPC.Server.serverNameFromClass(this.getClass()));
+ Assert.assertEquals("TestClass",
+ RPC.Server.serverNameFromClass(TestRPC.TestClass.class));
+
+ Object testing = new TestClass().classFactory();
+ Assert.assertEquals("Embedded",
+ RPC.Server.serverNameFromClass(testing.getClass()));
+
+ testing = new TestClass().classFactoryAbstract();
+ Assert.assertEquals("TestClass",
+ RPC.Server.serverNameFromClass(testing.getClass()));
+
+ testing = new TestClass().classFactoryObject();
+ Assert.assertEquals("TestClass",
+ RPC.Server.serverNameFromClass(testing.getClass()));
+
+ }
+
+ static class TestClass {
+ class Embedded {
+ }
+
+ abstract class AbstractEmbedded {
+
+ }
+
+ private Object classFactory() {
+ return new Embedded();
+ }
+
+ private Object classFactoryAbstract() {
+ return new AbstractEmbedded() {
+ };
+ }
+
+ private Object classFactoryObject() {
+ return new Object() {
+ };
+ }
+
+ }
public static class FakeRequestClass extends RpcWritable {
static volatile IOException exception;
@Override