HDFS-16738. Invalid CallerContext caused NullPointerException (#4791)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
index 63c7721..9e397b9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
@@ -541,18 +541,21 @@
    * @return The actual client's machine.
    */
   public static String getClientMachine(final String[] ipProxyUsers) {
+    String clientMachine = null;
     String cc = clientInfoFromContext(ipProxyUsers);
     if (cc != null) {
       // if the rpc has a caller context of "clientIp:1.2.3.4,CLI",
       // return "1.2.3.4" as the client machine.
       String key = CallerContext.CLIENT_IP_STR +
           CallerContext.Builder.KEY_VALUE_SEPARATOR;
-      return parseSpecialValue(cc, key);
+      clientMachine = parseSpecialValue(cc, key);
     }
 
-    String clientMachine = Server.getRemoteAddress();
-    if (clientMachine == null) { //not a RPC client
-      clientMachine = "";
+    if (clientMachine == null) {
+      clientMachine = Server.getRemoteAddress();
+      if (clientMachine == null) { // not an RPC client
+        clientMachine = "";
+      }
     }
     return clientMachine;
   }
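
Note on the resulting lookup order: getClientMachine() now tries the
clientIp value from the caller context first (only honored for users listed
in dfs.namenode.ip-proxy-users), then falls back to the RPC remote address,
and finally to the empty string for non-RPC callers. Below is a minimal
sketch of the proxy side of this contract; the class name, method, and
address are illustrative and not part of this patch:

    import org.apache.hadoop.ipc.CallerContext;

    public class ClientIpForwardingSketch { // hypothetical helper
      public static void forwardClientIp(String realClientIp) {
        // Build "clientIp:<addr>" from the same constants the NameNode
        // parses in getClientMachine() above.
        String ctx = CallerContext.CLIENT_IP_STR
            + CallerContext.Builder.KEY_VALUE_SEPARATOR
            + realClientIp;
        CallerContext.setCurrent(new CallerContext.Builder(ctx).build());
        // RPCs issued from this thread now carry the context; the NameNode
        // resolves realClientIp as the client machine. A context without
        // the clientIp key no longer yields null, thanks to this fix.
      }
    }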
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRpcServer.java
index 74d85bc..2960a7e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRpcServer.java
@@ -28,22 +28,28 @@
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_BIND_HOST_KEY;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.security.PrivilegedExceptionAction;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
 import org.apache.hadoop.ipc.CallerContext;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Test;
 
 public class TestNameNodeRpcServer {
 
@@ -91,6 +97,67 @@
   // trials. 1/3^20=3e-10, so that should be good enough.
   static final int ITERATIONS_TO_USE = 20;
 
+  @Test(timeout = 30000)
+  public void testNamenodeRpcClientIpProxyWithFailBack() throws Exception {
+    // Start an HA cluster with 3 datanodes and register fake_joe as an
+    // ip-proxy user so the NameNode inspects the caller context.
+    Configuration conf = new HdfsConfiguration();
+    conf.set(DFS_NAMENODE_IP_PROXY_USERS, "fake_joe");
+    final CallerContext original = CallerContext.getCurrent();
+
+    MiniQJMHACluster qjmhaCluster = null;
+    try {
+      String baseDir = GenericTestUtils.getRandomizedTempPath();
+      MiniQJMHACluster.Builder builder = new MiniQJMHACluster.Builder(conf);
+      builder.getDfsBuilder().numDataNodes(3);
+      qjmhaCluster = builder.baseDir(baseDir).build();
+      MiniDFSCluster dfsCluster = qjmhaCluster.getDfsCluster();
+      dfsCluster.waitActive();
+      dfsCluster.transitionToActive(0);
+
+      // Set a caller context without a clientIp entry; before this fix
+      // getClientMachine() returned null for such a context, causing an NPE.
+      CallerContext.setCurrent(
+          new CallerContext.Builder("test", conf)
+              .build());
+
+      dfsCluster.getFileSystem(0).setPermission(
+          new Path("/"), FsPermission.getDirDefault());
+
+      // Run as fake_joe, the user configured in DFS_NAMENODE_IP_PROXY_USERS
+      UserGroupInformation joe =
+          UserGroupInformation.createUserForTesting("fake_joe",
+              new String[]{"fake_group"});
+
+      FileSystem joeFs = joe.doAs((PrivilegedExceptionAction<FileSystem>) () ->
+          FileSystem.get(dfsCluster.getURI(0), conf));
+
+      Path testPath = new Path("/foo");
+      // Write a sample file
+      FSDataOutputStream stream = joeFs.create(testPath);
+      stream.write("Hello world!\n".getBytes(StandardCharsets.UTF_8));
+      stream.close();
+
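+      // Fail over from nn0 to nn1 while the caller context is still set.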
+      dfsCluster.transitionToStandby(0);
+      dfsCluster.transitionToActive(1);
+
+      DistributedFileSystem nn1 = dfsCluster.getFileSystem(1);
+      assertNotNull(nn1.getFileStatus(testPath));
+    } finally {
+      CallerContext.setCurrent(original);
+      if (qjmhaCluster != null) {
+        try {
+          qjmhaCluster.shutdown();
+        } catch (IOException e) {
+          e.printStackTrace();
+        }
+      }
+      // Reset the config
+      conf.unset(DFS_NAMENODE_IP_PROXY_USERS);
+    }
+  }
+
   /**
    * A test to make sure that if an authorized user adds "clientIp:" to their
    * caller context, it will be used to make locality decisions on the NN.
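
To run only the new regression test locally, standard Maven surefire
filtering should work from hadoop-hdfs-project/hadoop-hdfs (exact flags may
vary with the build environment):

    mvn test -Dtest=TestNameNodeRpcServer#testNamenodeRpcClientIpProxyWithFailBack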