HIVE-24187: Handle _files creation for HA config with same nameservice name on source and destination
(Pravin Kumar Sinha, reviewed by Aasha Medhi)
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 6138b9c..04325aa 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -522,6 +522,14 @@
REPLCMINTERVAL("hive.repl.cm.interval","3600s",
new TimeValidator(TimeUnit.SECONDS),
"Inteval for cmroot cleanup thread."),
+ REPL_HA_DATAPATH_REPLACE_REMOTE_NAMESERVICE("hive.repl.ha.datapath.replace.remote.nameservice", false,
+ "When HDFS is HA enabled and both source and target clusters are configured with same nameservice name, " +
+ "enable this flag and provide a new unique logical name for representing the remote cluster " +
+ "nameservice using config 'hive.repl.ha.datapath.replace.remote.nameservice.name'."),
+ REPL_HA_DATAPATH_REPLACE_REMOTE_NAMESERVICE_NAME("hive.repl.ha.datapath.replace.remote.nameservice.name", null,
+ "When HDFS is HA enabled and both source and target clusters are configured with same nameservice name, " +
+ "use this config to provide a unique logical name for nameservice on the remote cluster (should " +
+ "be different from nameservice name on the local cluster)."),
REPL_FUNCTIONS_ROOT_DIR("hive.repl.replica.functions.root.dir","/user/${system:user.name}/repl/functions/",
"Root directory on the replica warehouse where the repl sub-system will store jars from the primary warehouse"),
REPL_APPROX_MAX_LOAD_TASKS("hive.repl.approx.max.load.tasks", 10000,
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index 956e6ca..0cc7282 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -77,6 +77,7 @@
import static org.junit.Assert.assertTrue;
public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcrossInstances {
+ private static final String NS_REMOTE = "nsRemote";
@BeforeClass
public static void classLevelSetup() throws Exception {
HashMap<String, String> overrides = new HashMap<>();
@@ -1605,6 +1606,106 @@
}
@Test
+ public void testHdfsNameserviceLazyCopy() throws Throwable {
+ List<String> clause = getHdfsNameserviceClause();
+ clause.add("'" + HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY_FOR_EXTERNAL_TABLE.varname + "'='true'");
+ primary.run("use " + primaryDbName)
+ .run("create table acid_table (key int, value int) partitioned by (load_date date) " +
+ "clustered by(key) into 2 buckets stored as orc tblproperties ('transactional'='true')")
+ .run("create table table1 (i int)")
+ .run("insert into table1 values (1)")
+ .run("insert into table1 values (2)")
+ .run("create external table ext_table1 (id int)")
+ .run("insert into ext_table1 values (3)")
+ .run("insert into ext_table1 values (4)")
+ .dump(primaryDbName, clause);
+
+ // Load must fail: data paths in the dump were rewritten to the logical nameservice
+ // NS_REMOTE, which is not resolvable in this mini-cluster's HDFS configuration.
+ try {
+ replica.load(replicatedDbName, primaryDbName, clause);
+ Assert.fail("Expected the UnknownHostException to be thrown.");
+ } catch (IllegalArgumentException ex) {
+ assertTrue(ex.getMessage().contains("java.net.UnknownHostException: " + NS_REMOTE));
+ }
+ }
+
+ @Test
+ public void testHdfsNameserviceLazyCopyIncr() throws Throwable {
+ List<String> clause = getHdfsNameserviceClause();
+ primary.run("use " + primaryDbName)
+ .run("create table acid_table (key int, value int) partitioned by (load_date date) " +
+ "clustered by(key) into 2 buckets stored as orc tblproperties ('transactional'='true')")
+ .run("create table table1 (i String)")
+ .run("insert into table1 values (1)")
+ .run("insert into table1 values (2)")
+ .dump(primaryDbName);
+
+ // Bootstrap load succeeds since the bootstrap dump was taken without NS replacement.
+ replica.load(replicatedDbName, primaryDbName, clause)
+ .run("use " + replicatedDbName)
+ .run("show tables")
+ .verifyResults(new String[] {"acid_table", "table1"})
+ .run("select * from table1")
+ .verifyResults(new String[] {"1", "2"});
+
+ primary.run("use " + primaryDbName)
+ .run("insert into table1 values (3)")
+ .dump(primaryDbName, clause);
+ // Incremental load must fail: event data paths now carry the unresolvable NS_REMOTE nameservice.
+ try {
+ replica.load(replicatedDbName, primaryDbName, clause);
+ Assert.fail("Expected the UnknownHostException to be thrown.");
+ } catch (IllegalArgumentException ex) {
+ assertTrue(ex.getMessage().contains("java.net.UnknownHostException: " + NS_REMOTE));
+ }
+ }
+
+ @Test
+ public void testHdfsNameserviceWithDataCopy() throws Throwable {
+ List<String> clause = getHdfsNameserviceClause();
+ //NS replacement parameters has no effect when data is also copied to staging
+ clause.add("'" + HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname + "'='false'");
+ primary.run("use " + primaryDbName)
+ .run("create table acid_table (key int, value int) partitioned by (load_date date) " +
+ "clustered by(key) into 2 buckets stored as orc tblproperties ('transactional'='true')")
+ .run("create table table1 (i String)")
+ .run("insert into table1 values (1)")
+ .run("insert into table1 values (2)")
+ .dump(primaryDbName, clause);
+ replica.load(replicatedDbName, primaryDbName, clause)
+ .run("use " + replicatedDbName)
+ .run("show tables")
+ .verifyResults(new String[] {"acid_table", "table1"})
+ .run("select * from table1")
+ .verifyResults(new String[] {"1", "2"});
+
+ primary.run("use " + primaryDbName)
+ .run("insert into table1 values (3)")
+ .dump(primaryDbName, clause);
+ replica.load(replicatedDbName, primaryDbName, clause)
+ .run("use " + replicatedDbName)
+ .run("show tables")
+ .verifyResults(new String[] {"acid_table", "table1"})
+ .run("select * from table1")
+ .verifyResults(new String[] {"1", "2", "3"});
+ }
+
+ @Test
+ public void testCreateFunctionWithHdfsNameservice() throws Throwable {
+ Path identityUdfLocalPath = new Path("../../data/files/identity_udf.jar");
+ Path identityUdf1HdfsPath = new Path(primary.functionsRoot, "idFunc1" + File.separator + "identity_udf1.jar");
+ setupUDFJarOnHDFS(identityUdfLocalPath, identityUdf1HdfsPath);
+ List<String> clause = getHdfsNameserviceClause();
+ primary.run("CREATE FUNCTION " + primaryDbName
+ + ".idFunc1 as 'IdentityStringUDF' "
+ + "using jar '" + identityUdf1HdfsPath.toString() + "'");
+ primary.dump(primaryDbName, clause);
+ // Load must fail: the function's jar uri was rewritten to the unresolvable NS_REMOTE nameservice.
+ try {
+ replica.load(replicatedDbName, primaryDbName, clause);
+ Assert.fail("Expected the UnknownHostException to be thrown.");
+ } catch (IllegalArgumentException ex) {
+ assertTrue(ex.getMessage().contains("java.net.UnknownHostException: " + NS_REMOTE));
+ }
+ }
+
+ @Test
public void testRangerReplicationRetryExhausted() throws Throwable {
List<String> clause = Arrays.asList("'" + HiveConf.ConfVars.REPL_INCLUDE_AUTHORIZATION_METADATA + "'='true'",
"'" + HiveConf.ConfVars.REPL_RETRY_INTIAL_DELAY + "'='1s'", "'" + HiveConf.ConfVars.REPL_RETRY_TOTAL_DURATION
@@ -1963,4 +2064,12 @@
FileSystem fs = primary.miniDFSCluster.getFileSystem();
fs.copyFromLocalFile(identityUdfLocalPath, identityUdfHdfsPath);
}
+
+ // Builds a repl WITH clause enabling remote-nameservice replacement and mapping
+ // the remote HDFS nameservice to the logical name NS_REMOTE.
+ private List<String> getHdfsNameserviceClause() {
+ List<String> withClause = new ArrayList<>();
+ withClause.add("'" + HiveConf.ConfVars.REPL_HA_DATAPATH_REPLACE_REMOTE_NAMESERVICE.varname + "'='true'");
+ withClause.add("'" + HiveConf.ConfVars.REPL_HA_DATAPATH_REPLACE_REMOTE_NAMESERVICE_NAME.varname + "'='"
+ + NS_REMOTE + "'");
+ return withClause;
+ }
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java
index f56ca3d..45d67c6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java
@@ -118,7 +118,7 @@
} else {
work.getMetricCollector().reportStageEnd(getName(), Status.FAILED);
}
- } catch (SemanticException ex) {
+ } catch (Exception ex) {
LOG.error("Failed to collect Metrics ", ex);
}
return errorCode;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java
index 9ae4a73..3a65847 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java
@@ -102,7 +102,7 @@
} else {
work.getMetricCollector().reportStageEnd(getName(), Status.FAILED);
}
- } catch (SemanticException ex) {
+ } catch (Exception ex) {
LOG.error("Failed to collect Metrics ", ex);
}
return errorCode;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerDumpTask.java
index 29dcbbb..eb2af1d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerDumpTask.java
@@ -148,7 +148,7 @@
} else {
work.getMetricCollector().reportStageEnd(getName(), Status.FAILED);
}
- } catch (SemanticException ex) {
+ } catch (Exception ex) {
LOG.error("Failed to collect Metrics ", ex);
}
return errorCode;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerLoadTask.java
index d09aaac..0049f76 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerLoadTask.java
@@ -166,7 +166,7 @@
} else {
work.getMetricCollector().reportStageEnd(getName(), Status.FAILED);
}
- } catch (SemanticException ex) {
+ } catch (Exception ex) {
LOG.error("Failed to collect Metrics ", ex);
}
return errorCode;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index 21dafb5..cfae99f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -209,7 +209,7 @@
} else {
work.getMetricCollector().reportStageEnd(getName(), Status.FAILED);
}
- } catch (SemanticException ex) {
+ } catch (Exception ex) {
LOG.error("Failed to collect Metrics", ex);
}
return errorCode;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
index 66bb3da..6265df1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
@@ -147,7 +147,7 @@
} else {
work.getMetricCollector().reportStageEnd(getName(), Status.FAILED);
}
- } catch (SemanticException ex) {
+ } catch (Exception ex) {
LOG.error("Failed to collect Metrics ", ex);
}
return errorCode;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
index c09d7f7..c7656bc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
@@ -23,9 +23,11 @@
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.repl.ReplScope;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.ReplChangeManager;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.utils.StringUtils;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -49,6 +51,8 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintWriter;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -72,6 +76,40 @@
writeOutput(listValues, outputFile, hiveConf, false);
}
+ /**
+ * Given a ReplChangeManager's encoded uri, it replaces the nameservice and returns the modified encoded uri.
+ * @throws SemanticException if the replacement nameservice name config is unset or empty.
+ */
+ public static String replaceNameserviceInEncodedURI(String cmEncodedURI, HiveConf hiveConf) throws SemanticException {
+ String newNS = hiveConf.get(HiveConf.ConfVars.REPL_HA_DATAPATH_REPLACE_REMOTE_NAMESERVICE_NAME.varname);
+ if (StringUtils.isEmpty(newNS)) {
+ // Parentheses around the ternary are required: '+' binds tighter than '?:', so without
+ // them the condition becomes (message + newNS) == null, which is always false.
+ throw new SemanticException(ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE
+ .format("Configuration 'hive.repl.ha.datapath.replace.remote.nameservice.name' is not valid "
+ + (newNS == null ? "null" : newNS), ReplUtils.REPL_HIVE_SERVICE));
+ }
+ String[] decodedURISplits = ReplChangeManager.decodeFileUri(cmEncodedURI);
+ // replace both data path and repl cm root path and construct new URI. Checksum and subDir will be same as old.
+ String modifiedURI = ReplChangeManager.encodeFileUri(replaceHost(decodedURISplits[0], newNS), decodedURISplits[1],
+ replaceHost(decodedURISplits[2], newNS), decodedURISplits[3]);
+ LOG.debug("Modified encoded uri {}, to {} ", cmEncodedURI, modifiedURI);
+ return modifiedURI;
+ }
+
+ // Swaps only the host (nameservice) component of the given uri; all other components
+ // (scheme, user info, port, path, query, fragment) are preserved as-is.
+ private static String replaceHost(String originalURIStr, String newHost) throws SemanticException {
+ // Empty/null uris (e.g. no CM root recorded in the encoded uri) pass through untouched.
+ if (StringUtils.isEmpty(originalURIStr)) {
+ return originalURIStr;
+ }
+ URI origUri = URI.create(originalURIStr);
+ try {
+ return new URI(origUri.getScheme(),
+ origUri.getUserInfo(), newHost, origUri.getPort(),
+ origUri.getPath(), origUri.getQuery(),
+ origUri.getFragment()).toString();
+ } catch (URISyntaxException ex) {
+ throw new SemanticException(ex);
+ }
+ }
+
+
public static void writeOutput(List<List<String>> listValues, Path outputFile, HiveConf hiveConf, boolean update)
throws SemanticException {
Retryable retryable = Retryable.builder()
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java
index 1aff738..6a59b2f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java
@@ -32,6 +32,7 @@
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.repl.CopyUtils;
import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
import org.slf4j.Logger;
@@ -97,11 +98,15 @@
}
protected void writeEncodedDumpFiles(Context withinContext, Iterable<String> files, Path dataPath)
- throws IOException {
+ throws IOException, SemanticException {
+ boolean replaceNSInHACase = withinContext.hiveConf.getBoolVar(
+ HiveConf.ConfVars.REPL_HA_DATAPATH_REPLACE_REMOTE_NAMESERVICE);
// encoded filename/checksum of files, write into _files
try (BufferedWriter fileListWriter = writer(withinContext, dataPath)) {
for (String file : files) {
- fileListWriter.write(file);
+ String encodedFilePath = replaceNSInHACase ? Utils.replaceNameserviceInEncodedURI(file, withinContext.hiveConf):
+ file;
+ fileListWriter.write(encodedFilePath);
fileListWriter.newLine();
}
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java
index 265fefe..4b29ff5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java
@@ -59,7 +59,7 @@
private void writeDumpFiles(Table qlMdTable, Partition ptn, Iterable<String> files, Context withinContext,
Path dataPath)
- throws IOException, LoginException, MetaException, HiveFatalException {
+ throws IOException, LoginException, MetaException, HiveFatalException, SemanticException {
boolean copyAtLoad = withinContext.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET);
if (copyAtLoad) {
// encoded filename/checksum of files, write into _files
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java
index caa089f..a83f8c9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.hive.ql.parse.repl.dump.io;
import java.io.BufferedWriter;
-import java.io.DataOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStreamWriter;
@@ -32,12 +31,9 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.ReplChangeManager;
-import org.apache.hadoop.hive.metastore.utils.Retry;
-import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.util.Retryable;
import org.apache.hadoop.hive.ql.io.AcidUtils;
@@ -48,7 +44,6 @@
import org.apache.hadoop.hive.ql.parse.repl.CopyUtils;
import org.apache.hadoop.hive.ql.plan.ExportWork.MmContext;
import org.apache.hadoop.hive.shims.Utils;
-import org.apache.hadoop.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -186,7 +181,7 @@
* The data export here is a list of files either in table/partition that are written to the _files
* in the exportRootDataDir provided.
*/
- void exportFilesAsList() throws SemanticException, IOException, LoginException {
+ void exportFilesAsList() throws SemanticException {
if (dataPathList.isEmpty()) {
return;
}
@@ -222,7 +217,7 @@
}
private void writeFilesList(FileStatus[] fileStatuses, BufferedWriter writer, String encodedSubDirs)
- throws IOException {
+ throws IOException, SemanticException {
for (FileStatus fileStatus : fileStatuses) {
if (fileStatus.isDirectory()) {
// Write files inside the sub-directory.
@@ -252,10 +247,14 @@
}
private String encodedUri(FileStatus fileStatus, String encodedSubDir)
- throws IOException {
+ throws IOException, SemanticException {
ReplChangeManager replChangeManager = ReplChangeManager.getInstance();
Path currentDataFilePath = fileStatus.getPath();
String checkSum = ReplChangeManager.checksumFor(currentDataFilePath, dataFileSystem);
- return replChangeManager.encodeFileUri(currentDataFilePath.toString(), checkSum, encodedSubDir);
+ String cmEncodedUri = replChangeManager.encodeFileUri(currentDataFilePath.toString(), checkSum, encodedSubDir);
+ // In HA setups where source and target share a nameservice name, rewrite the encoded uri
+ // so its paths carry the configured logical name of the remote cluster's nameservice.
+ if (hiveConf.getBoolVar(HiveConf.ConfVars.REPL_HA_DATAPATH_REPLACE_REMOTE_NAMESERVICE)) {
+ return org.apache.hadoop.hive.ql.parse.repl.dump.Utils.replaceNameserviceInEncodedURI(cmEncodedUri, hiveConf);
+ }
+ return cmEncodedUri;
}
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FunctionSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FunctionSerializer.java
index 2e87267..b1cfcbf 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FunctionSerializer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FunctionSerializer.java
@@ -29,6 +29,7 @@
import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.repl.PathBuilder;
+import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TJSONProtocol;
@@ -67,6 +68,9 @@
String encodedSrcUri = ReplChangeManager.getInstance(hiveConf)
.encodeFileUri(qualifiedUri.toString(), checkSum, null);
if (copyAtLoad) {
+ if (hiveConf.getBoolVar(HiveConf.ConfVars.REPL_HA_DATAPATH_REPLACE_REMOTE_NAMESERVICE)) {
+ encodedSrcUri = Utils.replaceNameserviceInEncodedURI(encodedSrcUri, hiveConf);
+ }
resourceUris.add(new ResourceUri(uri.getResourceType(), encodedSrcUri));
} else {
Path newBinaryPath = new Path(functionDataRoot, qualifiedUri.getName());
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
index 67c74d0..338e351 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
@@ -411,11 +411,18 @@
if (instance == null) {
throw new IllegalStateException("Uninitialized ReplChangeManager instance.");
}
+ Path cmRootPath = getCmRoot(new Path(fileUriStr));
+ String cmRoot = null;
+ if (cmRootPath != null) {
+ cmRoot = FileUtils.makeQualified(cmRootPath, conf).toString();
+ }
+ return ReplChangeManager.encodeFileUri(fileUriStr, fileChecksum, cmRoot, encodedSubDir);
+ }
+
+ public static String encodeFileUri(String fileUriStr, String fileChecksum, String cmRoot, String encodedSubDir) {
String encodedUri = fileUriStr;
- Path cmRoot = getCmRoot(new Path(fileUriStr));
if ((fileChecksum != null) && (cmRoot != null)) {
- encodedUri = encodedUri + URI_FRAGMENT_SEPARATOR + fileChecksum
- + URI_FRAGMENT_SEPARATOR + FileUtils.makeQualified(cmRoot, conf);
+ encodedUri = encodedUri + URI_FRAGMENT_SEPARATOR + fileChecksum + URI_FRAGMENT_SEPARATOR + cmRoot;
} else {
encodedUri = encodedUri + URI_FRAGMENT_SEPARATOR + URI_FRAGMENT_SEPARATOR;
}