HIVE-24187: Handle _files creation for HA config with same nameservice name on source and destination
(Pravin Kumar Sinha, reviewed by Aasha Medhi)
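
With HDFS HA, the source and target clusters may both be configured with the
same logical nameservice name (for example "ns1"). The encoded entries written
into the _files metadata during REPL DUMP then resolve back to the local
cluster when read on the target, which breaks lazy data copy. This change adds
two configs that let the dump rewrite the nameservice in each encoded entry to
a unique alias representing the remote cluster. It also broadens the
metrics-collection catch blocks in the repl dump/load and Ranger/Atlas tasks
from SemanticException to Exception, so metric-reporting failures are logged
rather than propagated.

Illustrative example (values assumed; '#' stands for ReplChangeManager's URI
fragment separator):
  before: hdfs://ns1/warehouse/db.db/t1/f1#<checksum>#hdfs://ns1/cmroot#<subDir>
  after:  hdfs://nsRemote/warehouse/db.db/t1/f1#<checksum>#hdfs://nsRemote/cmroot#<subDir>
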
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 6138b9c..04325aa 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -522,6 +522,14 @@
     REPLCMINTERVAL("hive.repl.cm.interval","3600s",
         new TimeValidator(TimeUnit.SECONDS),
         "Inteval for cmroot cleanup thread."),
+    REPL_HA_DATAPATH_REPLACE_REMOTE_NAMESERVICE("hive.repl.ha.datapath.replace.remote.nameservice", false,
+            "When HDFS is HA enabled and both source and target clusters are configured with same nameservice name," +
+                    "enable this flag and provide a new unique logical name for representing the remote cluster " +
+                    "nameservice using config " + "'hive.repl.ha.datapath.replace.remote.nameservice.name'."),
+    REPL_HA_DATAPATH_REPLACE_REMOTE_NAMESERVICE_NAME("hive.repl.ha.datapath.replace.remote.nameservice.name", null,
+            "When HDFS is HA enabled and both source and target clusters are configured with same nameservice name, " +
+                    "use this config to provide a unique logical name for nameservice on the remote cluster (should " +
+                    "be different from nameservice name on the local cluster)"),
     REPL_FUNCTIONS_ROOT_DIR("hive.repl.replica.functions.root.dir","/user/${system:user.name}/repl/functions/",
         "Root directory on the replica warehouse where the repl sub-system will store jars from the primary warehouse"),
     REPL_APPROX_MAX_LOAD_TASKS("hive.repl.approx.max.load.tasks", 10000,
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index 956e6ca..0cc7282 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -77,6 +77,7 @@
 import static org.junit.Assert.assertTrue;
 
 public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcrossInstances {
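+  // Logical nameservice alias used in these tests to stand for the remote cluster.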
+  private static final String NS_REMOTE = "nsRemote";
   @BeforeClass
   public static void classLevelSetup() throws Exception {
     HashMap<String, String> overrides = new HashMap<>();
@@ -1605,6 +1606,106 @@
   }
 
   @Test
+  public void testHdfsNameserviceLazyCopy() throws Throwable {
+    List<String> clause = getHdfsNameserviceClause();
+    clause.add("'" + HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY_FOR_EXTERNAL_TABLE.varname + "'='true'");
+    primary.run("use " + primaryDbName)
+            .run("create table  acid_table (key int, value int) partitioned by (load_date date) " +
+                    "clustered by(key) into 2 buckets stored as orc tblproperties ('transactional'='true')")
+            .run("create table table1 (i int)")
+            .run("insert into table1 values (1)")
+            .run("insert into table1 values (2)")
+            .run("create external table ext_table1 (id int)")
+            .run("insert into ext_table1 values (3)")
+            .run("insert into ext_table1 values (4)")
+            .dump(primaryDbName, clause);
+
+    try {
+      replica.load(replicatedDbName, primaryDbName, clause);
+      Assert.fail("Expected the UnknownHostException to be thrown.");
+    } catch (IllegalArgumentException ex) {
+      assertTrue(ex.getMessage().contains("java.net.UnknownHostException: " + NS_REMOTE));
+    }
+  }
+
+  @Test
+  public void testHdfsNameserviceLazyCopyIncr() throws Throwable {
+    List<String> clause = getHdfsNameserviceClause();
+    primary.run("use " + primaryDbName)
+            .run("create table  acid_table (key int, value int) partitioned by (load_date date) " +
+                    "clustered by(key) into 2 buckets stored as orc tblproperties ('transactional'='true')")
+            .run("create table table1 (i String)")
+            .run("insert into table1 values (1)")
+            .run("insert into table1 values (2)")
+            .dump(primaryDbName);
+
+    replica.load(replicatedDbName, primaryDbName, clause)
+            .run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(new String[] {"acid_table", "table1"})
+            .run("select * from table1")
+            .verifyResults(new String[] {"1", "2"});
+
+    primary.run("use " + primaryDbName)
+            .run("insert into table1 values (3)")
+            .dump(primaryDbName, clause);
+    try {
+      replica.load(replicatedDbName, primaryDbName, clause);
+      Assert.fail("Expected the UnknownHostException to be thrown.");
+    } catch (IllegalArgumentException ex) {
+      assertTrue(ex.getMessage().contains("java.net.UnknownHostException: " + NS_REMOTE));
+    }
+  }
+
+  @Test
+  public void testHdfsNameserviceWithDataCopy() throws Throwable {
+    List<String> clause = getHdfsNameserviceClause();
+    // NS replacement parameters have no effect when data is also copied to staging
+    clause.add("'" + HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname + "'='false'");
+    primary.run("use " + primaryDbName)
+            .run("create table  acid_table (key int, value int) partitioned by (load_date date) " +
+                    "clustered by(key) into 2 buckets stored as orc tblproperties ('transactional'='true')")
+            .run("create table table1 (i String)")
+            .run("insert into table1 values (1)")
+            .run("insert into table1 values (2)")
+            .dump(primaryDbName, clause);
+    replica.load(replicatedDbName, primaryDbName, clause)
+            .run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(new String[] {"acid_table", "table1"})
+            .run("select * from table1")
+            .verifyResults(new String[] {"1", "2"});
+
+    primary.run("use " + primaryDbName)
+            .run("insert into table1 values (3)")
+            .dump(primaryDbName, clause);
+    replica.load(replicatedDbName, primaryDbName, clause)
+            .run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(new String[] {"acid_table", "table1"})
+            .run("select * from table1")
+            .verifyResults(new String[] {"1", "2", "3"});
+  }
+
+  @Test
+  public void testCreateFunctionWithHdfsNameservice() throws Throwable {
+    Path identityUdfLocalPath = new Path("../../data/files/identity_udf.jar");
+    Path identityUdf1HdfsPath = new Path(primary.functionsRoot, "idFunc1" + File.separator + "identity_udf1.jar");
+    setupUDFJarOnHDFS(identityUdfLocalPath, identityUdf1HdfsPath);
+    List<String> clause = getHdfsNameserviceClause();
+    primary.run("CREATE FUNCTION " + primaryDbName
+            + ".idFunc1 as 'IdentityStringUDF' "
+            + "using jar  '" + identityUdf1HdfsPath.toString() + "'");
+    primary.dump(primaryDbName, clause);
+    try {
+      replica.load(replicatedDbName, primaryDbName, clause);
+      Assert.fail("Expected the UnknownHostException to be thrown.");
+    } catch (IllegalArgumentException ex) {
+      assertTrue(ex.getMessage().contains("java.net.UnknownHostException: " + NS_REMOTE));
+    }
+  }
+
+  @Test
   public void testRangerReplicationRetryExhausted() throws Throwable {
     List<String> clause = Arrays.asList("'" + HiveConf.ConfVars.REPL_INCLUDE_AUTHORIZATION_METADATA + "'='true'",
       "'" + HiveConf.ConfVars.REPL_RETRY_INTIAL_DELAY + "'='1s'", "'" + HiveConf.ConfVars.REPL_RETRY_TOTAL_DURATION
@@ -1963,4 +2064,12 @@
     FileSystem fs = primary.miniDFSCluster.getFileSystem();
     fs.copyFromLocalFile(identityUdfLocalPath, identityUdfHdfsPath);
   }
+
+  private List<String> getHdfsNameserviceClause() {
+    List<String> withClause = new ArrayList<>();
+    withClause.add("'" + HiveConf.ConfVars.REPL_HA_DATAPATH_REPLACE_REMOTE_NAMESERVICE.varname + "'='true'");
+    withClause.add("'" + HiveConf.ConfVars.REPL_HA_DATAPATH_REPLACE_REMOTE_NAMESERVICE_NAME.varname + "'='"
+            + NS_REMOTE + "'");
+    return withClause;
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java
index f56ca3d..45d67c6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java
@@ -118,7 +118,7 @@
         } else {
           work.getMetricCollector().reportStageEnd(getName(), Status.FAILED);
         }
-      } catch (SemanticException ex) {
+      } catch (Exception ex) {
         LOG.error("Failed to collect Metrics ", ex);
       }
       return errorCode;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java
index 9ae4a73..3a65847 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java
@@ -102,7 +102,7 @@
         } else {
           work.getMetricCollector().reportStageEnd(getName(), Status.FAILED);
         }
-      } catch (SemanticException ex) {
+      } catch (Exception ex) {
         LOG.error("Failed to collect Metrics ", ex);
       }
       return errorCode;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerDumpTask.java
index 29dcbbb..eb2af1d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerDumpTask.java
@@ -148,7 +148,7 @@
         } else {
           work.getMetricCollector().reportStageEnd(getName(), Status.FAILED);
         }
-      } catch (SemanticException ex) {
+      } catch (Exception ex) {
         LOG.error("Failed to collect Metrics ", ex);
       }
       return errorCode;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerLoadTask.java
index d09aaac..0049f76 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerLoadTask.java
@@ -166,7 +166,7 @@
         } else {
           work.getMetricCollector().reportStageEnd(getName(), Status.FAILED);
         }
-      } catch (SemanticException ex) {
+      } catch (Exception ex) {
         LOG.error("Failed to collect Metrics ", ex);
       }
       return errorCode;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index 21dafb5..cfae99f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -209,7 +209,7 @@
         } else {
           work.getMetricCollector().reportStageEnd(getName(), Status.FAILED);
         }
-      } catch (SemanticException ex) {
+      } catch (Exception ex) {
         LOG.error("Failed to collect Metrics", ex);
       }
       return errorCode;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
index 66bb3da..6265df1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
@@ -147,7 +147,7 @@
         } else {
           work.getMetricCollector().reportStageEnd(getName(), Status.FAILED);
         }
-      } catch (SemanticException ex) {
+      } catch (Exception ex) {
         LOG.error("Failed to collect Metrics ", ex);
       }
       return errorCode;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
index c09d7f7..c7656bc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
@@ -23,9 +23,11 @@
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.repl.ReplScope;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.ReplChangeManager;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.utils.StringUtils;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -49,6 +51,8 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.PrintWriter;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -72,6 +76,40 @@
     writeOutput(listValues, outputFile, hiveConf, false);
   }
 
+  /**
+   * Given a ReplChangeManager-encoded uri, replaces the nameservice and returns the modified encoded uri.
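+   * Example (illustrative values), with the replacement nameservice set to "nsRemote":
+   *   hdfs://ns1/warehouse/t1/f1#[checksum]#hdfs://ns1/cmroot#[subDir]
+   * becomes
+   *   hdfs://nsRemote/warehouse/t1/f1#[checksum]#hdfs://nsRemote/cmroot#[subDir]
+   * Only the data path and cm root hosts change; checksum and subDir are preserved.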
+   */
+  public static String replaceNameserviceInEncodedURI(String cmEncodedURI, HiveConf hiveConf) throws SemanticException {
+    String newNS = hiveConf.get(HiveConf.ConfVars.REPL_HA_DATAPATH_REPLACE_REMOTE_NAMESERVICE_NAME.varname);
+    if (StringUtils.isEmpty(newNS)) {
+      throw new SemanticException(ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE
+              .format("Configuration 'hive.repl.ha.datapath.replace.remote.nameservice.name' is not valid: "
+                      + (newNS == null ? "null" : newNS), ReplUtils.REPL_HIVE_SERVICE));
+    }
+    String[] decodedURISplits = ReplChangeManager.decodeFileUri(cmEncodedURI);
+    // Replace the nameservice in both the data path and the repl cm root path, then construct
+    // the new URI. Checksum and subDir remain unchanged.
+    String modifiedURI = ReplChangeManager.encodeFileUri(replaceHost(decodedURISplits[0], newNS), decodedURISplits[1],
+                                                         replaceHost(decodedURISplits[2], newNS), decodedURISplits[3]);
+    LOG.debug("Modified encoded uri from {} to {}", cmEncodedURI, modifiedURI);
+    return modifiedURI;
+  }
+
+  private static String replaceHost(String originalURIStr, String newHost) throws SemanticException {
+    if (StringUtils.isEmpty(originalURIStr)) {
+      return originalURIStr;
+    }
+    URI origUri = URI.create(originalURIStr);
+    try {
+      return new URI(origUri.getScheme(),
+              origUri.getUserInfo(), newHost, origUri.getPort(),
+              origUri.getPath(), origUri.getQuery(),
+              origUri.getFragment()).toString();
+    } catch (URISyntaxException ex) {
+      throw new SemanticException(ex);
+    }
+  }
+
   public static void writeOutput(List<List<String>> listValues, Path outputFile, HiveConf hiveConf, boolean update)
           throws SemanticException {
     Retryable retryable = Retryable.builder()
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java
index 1aff738..6a59b2f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java
@@ -32,6 +32,7 @@
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.repl.CopyUtils;
 import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
 import org.slf4j.Logger;
@@ -97,11 +98,15 @@
   }
 
   protected void writeEncodedDumpFiles(Context withinContext, Iterable<String> files, Path dataPath)
-          throws IOException {
+          throws IOException, SemanticException {
+    boolean replaceNSInHACase = withinContext.hiveConf.getBoolVar(
+            HiveConf.ConfVars.REPL_HA_DATAPATH_REPLACE_REMOTE_NAMESERVICE);
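+    // When source and target share a nameservice name, rewrite each encoded entry with the
+    // remote alias so the target resolves the path to the source cluster.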
     // encoded filename/checksum of files, write into _files
     try (BufferedWriter fileListWriter = writer(withinContext, dataPath)) {
       for (String file : files) {
-        fileListWriter.write(file);
+        String encodedFilePath = replaceNSInHACase
+                ? Utils.replaceNameserviceInEncodedURI(file, withinContext.hiveConf) : file;
+        fileListWriter.write(encodedFilePath);
         fileListWriter.newLine();
       }
     }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java
index 265fefe..4b29ff5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java
@@ -59,7 +59,7 @@
 
   private void writeDumpFiles(Table qlMdTable, Partition ptn, Iterable<String> files, Context withinContext,
                               Path dataPath)
-          throws IOException, LoginException, MetaException, HiveFatalException {
+          throws IOException, LoginException, MetaException, HiveFatalException, SemanticException {
     boolean copyAtLoad = withinContext.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET);
     if (copyAtLoad) {
       // encoded filename/checksum of files, write into _files
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java
index caa089f..a83f8c9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hive.ql.parse.repl.dump.io;
 
 import java.io.BufferedWriter;
-import java.io.DataOutputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
@@ -32,12 +31,9 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.ReplChangeManager;
-import org.apache.hadoop.hive.metastore.utils.Retry;
-import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.util.Retryable;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
@@ -48,7 +44,6 @@
 import org.apache.hadoop.hive.ql.parse.repl.CopyUtils;
 import org.apache.hadoop.hive.ql.plan.ExportWork.MmContext;
 import org.apache.hadoop.hive.shims.Utils;
-import org.apache.hadoop.io.IOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -186,7 +181,7 @@
    * The data export here is a list of files either in table/partition that are written to the _files
    * in the exportRootDataDir provided.
    */
-   void exportFilesAsList() throws SemanticException, IOException, LoginException {
+   void exportFilesAsList() throws SemanticException {
     if (dataPathList.isEmpty()) {
       return;
     }
@@ -222,7 +217,7 @@
   }
 
   private void writeFilesList(FileStatus[] fileStatuses, BufferedWriter writer, String encodedSubDirs)
-          throws IOException {
+          throws IOException, SemanticException {
     for (FileStatus fileStatus : fileStatuses) {
       if (fileStatus.isDirectory()) {
         // Write files inside the sub-directory.
@@ -252,10 +247,14 @@
   }
 
   private String encodedUri(FileStatus fileStatus, String encodedSubDir)
-          throws IOException {
+          throws IOException, SemanticException {
     ReplChangeManager replChangeManager = ReplChangeManager.getInstance();
     Path currentDataFilePath = fileStatus.getPath();
     String checkSum = ReplChangeManager.checksumFor(currentDataFilePath, dataFileSystem);
-    return replChangeManager.encodeFileUri(currentDataFilePath.toString(), checkSum, encodedSubDir);
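+    // In same-nameservice HA setups the encoded URI must carry the remote alias so that the
+    // target-side copy resolves it to the source cluster (Utils is fully qualified below to
+    // avoid the clash with org.apache.hadoop.hive.shims.Utils imported above).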
+    String cmEncodedURI = replChangeManager.encodeFileUri(currentDataFilePath.toString(), checkSum, encodedSubDir);
+    if (hiveConf.getBoolVar(HiveConf.ConfVars.REPL_HA_DATAPATH_REPLACE_REMOTE_NAMESERVICE)) {
+      return org.apache.hadoop.hive.ql.parse.repl.dump.Utils.replaceNameserviceInEncodedURI(cmEncodedURI, hiveConf);
+    }
+    return cmEncodedURI;
   }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FunctionSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FunctionSerializer.java
index 2e87267..b1cfcbf 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FunctionSerializer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FunctionSerializer.java
@@ -29,6 +29,7 @@
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.repl.PathBuilder;
+import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
 import org.apache.thrift.TException;
 import org.apache.thrift.TSerializer;
 import org.apache.thrift.protocol.TJSONProtocol;
@@ -67,6 +68,9 @@
           String encodedSrcUri = ReplChangeManager.getInstance(hiveConf)
                   .encodeFileUri(qualifiedUri.toString(), checkSum, null);
           if (copyAtLoad) {
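+            // Lazy copy: the target reads this URI directly, so rewrite it with the remote
+            // nameservice alias when both clusters share a nameservice name.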
+            if (hiveConf.getBoolVar(HiveConf.ConfVars.REPL_HA_DATAPATH_REPLACE_REMOTE_NAMESERVICE)) {
+              encodedSrcUri = Utils.replaceNameserviceInEncodedURI(encodedSrcUri, hiveConf);
+            }
             resourceUris.add(new ResourceUri(uri.getResourceType(), encodedSrcUri));
           } else {
             Path newBinaryPath = new Path(functionDataRoot, qualifiedUri.getName());
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
index 67c74d0..338e351 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
@@ -411,11 +411,18 @@
     if (instance == null) {
       throw new IllegalStateException("Uninitialized ReplChangeManager instance.");
     }
+    Path cmRootPath = getCmRoot(new Path(fileUriStr));
+    String cmRoot = null;
+    if (cmRootPath != null) {
+      cmRoot = FileUtils.makeQualified(cmRootPath, conf).toString();
+    }
+    return ReplChangeManager.encodeFileUri(fileUriStr, fileChecksum, cmRoot, encodedSubDir);
+  }
+
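+  /**
+   * Variant of encodeFileUri that takes an already-qualified cm root instead of deriving it
+   * from the file URI. This lets repl dump re-encode an existing entry with a rewritten cm
+   * root (used by the HA nameservice replacement path).
+   */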
+  public static String encodeFileUri(String fileUriStr, String fileChecksum, String cmRoot, String encodedSubDir) {
     String encodedUri = fileUriStr;
-    Path cmRoot = getCmRoot(new Path(fileUriStr));
     if ((fileChecksum != null) && (cmRoot != null)) {
-      encodedUri = encodedUri + URI_FRAGMENT_SEPARATOR + fileChecksum
-              + URI_FRAGMENT_SEPARATOR + FileUtils.makeQualified(cmRoot, conf);
+      encodedUri = encodedUri + URI_FRAGMENT_SEPARATOR + fileChecksum + URI_FRAGMENT_SEPARATOR + cmRoot;
     } else {
       encodedUri = encodedUri + URI_FRAGMENT_SEPARATOR + URI_FRAGMENT_SEPARATOR;
     }