IMPALA-11736: Copy data between ofs buckets
When Impala moves a file - such as for a LOAD DATA statement - it checks
whether the source and destination are on the same filesystem. If they
are, it uses hdfsRename; otherwise it uses hdfsMove to move the file
between filesystems.
Ozone's ofs protocol allows one filesystem to reference multiple volumes
and buckets by path, but does not support rename across them. All other
filesystems Impala supports include the bucket name (if they use that
concept) in the authority. This patch updates the check for whether two
paths are on the same filesystem to also require that they share the
same volume and bucket before using hdfsRename.
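For example (illustrative paths; "om-host:9862" stands in for an Ozone
Manager address), the following two paths share a scheme and authority
but name different buckets, so they must be moved by copy rather than
renamed:

  ofs://om-host:9862/vol1/bucketA/dir/file
  ofs://om-host:9862/vol1/bucketB/dir/file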
Testing: ran test suite with Ozone.
Change-Id: Ic61f01672fa605fec0377885b13a1621573e424e
Reviewed-on: http://gerrit.cloudera.org:8080/19262
Reviewed-by: Joe McDonnell <joemcdonnell@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
diff --git a/be/src/runtime/dml-exec-state.cc b/be/src/runtime/dml-exec-state.cc
index b3cb806..94c0ba5 100644
--- a/be/src/runtime/dml-exec-state.cc
+++ b/be/src/runtime/dml-exec-state.cc
@@ -297,7 +297,9 @@
dir_deletion_ops.Add(DELETE, move.first);
} else {
VLOG_ROW << "Moving tmp file: " << move.first << " to " << move.second;
- if (FilesystemsMatch(move.first.c_str(), move.second.c_str())) {
+ // Files can't be renamed across different filesystems (considering both scheme and
+ // authority) or across different Ozone buckets/volumes.
+ if (FilesystemsAndBucketsMatch(move.first.c_str(), move.second.c_str())) {
move_ops.Add(RENAME, move.first, move.second);
} else {
move_ops.Add(MOVE, move.first, move.second);
diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h
index de3bcc0..31d9016 100644
--- a/be/src/runtime/exec-env.h
+++ b/be/src/runtime/exec-env.h
@@ -244,7 +244,7 @@
friend class DataStreamTest;
// For access to InitHadoopConfig().
- FRIEND_TEST(HdfsUtilTest, CheckFilesystemsMatch);
+ FRIEND_TEST(HdfsUtilTest, CheckFilesystemsAndBucketsMatch);
static ExecEnv* exec_env_;
bool is_fe_tests_ = false;
diff --git a/be/src/util/hdfs-util-test.cc b/be/src/util/hdfs-util-test.cc
index 748ab23..220f035 100644
--- a/be/src/util/hdfs-util-test.cc
+++ b/be/src/util/hdfs-util-test.cc
@@ -26,47 +26,72 @@
namespace impala {
-TEST(HdfsUtilTest, CheckFilesystemsMatch) {
+TEST(HdfsUtilTest, CheckFilesystemsAndBucketsMatch) {
// We do this to retrieve the default FS from the frontend without starting the rest
// of the ExecEnv services.
ExecEnv exec_env;
ASSERT_OK(exec_env.InitHadoopConfig());
// Tests with both paths qualified.
- EXPECT_TRUE(FilesystemsMatch("s3a://dummybucket/temp_dir/temp_path",
- "s3a://dummybucket/temp_dir_2/temp_path_2"));
- EXPECT_FALSE(FilesystemsMatch("s3a://dummybucket/temp_dir/temp_path",
- "s3a://dummybucket_2/temp_dir_2/temp_path_2"));
- EXPECT_FALSE(FilesystemsMatch("s3a://dummybucket/temp_dir/temp_path",
- "hdfs://namenode/temp_dir2/temp_path_2"));
- EXPECT_FALSE(FilesystemsMatch("hdfs://namenode/temp_dir/temp_path",
- "hdfs://namenode_2/temp_dir2/temp_path_2"));
- EXPECT_TRUE(FilesystemsMatch("hdfs://namenode:9999/temp_dir/temp_path",
- "hdfs://namenode:9999/temp_dir2/temp_path_2"));
- EXPECT_FALSE(FilesystemsMatch("hdfs://namenode:9999/temp_dir/temp_path",
- "hdfs://namenode:8888/temp_dir2/temp_path_2"));
- EXPECT_TRUE(FilesystemsMatch("file:/path/to/dir/filename.parq",
- "file:///path/to/dir/filename.parq"));
- EXPECT_TRUE(FilesystemsMatch("file:/path/to/dir/filename.parq",
- "file:/path_2/to/dir/filename.parq"));
- EXPECT_TRUE(FilesystemsMatch("file:///path/to/dir/filename.parq",
- "file:/path_2/to/dir/filename.parq"));
- EXPECT_FALSE(FilesystemsMatch("file:/path/to/dir/filename.parq",
- "file2://path/to/dir/filename.parq"));
- EXPECT_FALSE(FilesystemsMatch("hdfs://", "s3a://dummybucket/temp_dir/temp_path"));
- EXPECT_TRUE(FilesystemsMatch("hdfs://namenode", "hdfs://namenode/"));
+ EXPECT_TRUE(FilesystemsAndBucketsMatch("s3a://dummybucket/temp_dir/temp_path",
+ "s3a://dummybucket/temp_dir_2/temp_path_2"));
+ EXPECT_FALSE(FilesystemsAndBucketsMatch("s3a://dummybucket/temp_dir/temp_path",
+ "s3a://dummybucket_2/temp_dir_2/temp_path_2"));
+ EXPECT_FALSE(FilesystemsAndBucketsMatch("s3a://dummybucket/temp_dir/temp_path",
+ "hdfs://namenode/temp_dir2/temp_path_2"));
+ EXPECT_FALSE(FilesystemsAndBucketsMatch("hdfs://namenode/temp_dir/temp_path",
+ "hdfs://namenode_2/temp_dir2/temp_path_2"));
+ EXPECT_TRUE(FilesystemsAndBucketsMatch("hdfs://namenode:9999/temp_dir/temp_path",
+ "hdfs://namenode:9999/temp_dir2/temp_path_2"));
+ EXPECT_FALSE(FilesystemsAndBucketsMatch("hdfs://namenode:9999/temp_dir/temp_path",
+ "hdfs://namenode:8888/temp_dir2/temp_path_2"));
+ EXPECT_TRUE(FilesystemsAndBucketsMatch("file:/path/to/dir/filename.parq",
+ "file:///path/to/dir/filename.parq"));
+ EXPECT_TRUE(FilesystemsAndBucketsMatch("file:/path/to/dir/filename.parq",
+ "file:/path_2/to/dir/filename.parq"));
+ EXPECT_TRUE(FilesystemsAndBucketsMatch("file:///path/to/dir/filename.parq",
+ "file:/path_2/to/dir/filename.parq"));
+ EXPECT_FALSE(FilesystemsAndBucketsMatch("file:/path/to/dir/filename.parq",
+ "file2://path/to/dir/filename.parq"));
+ EXPECT_FALSE(FilesystemsAndBucketsMatch("hdfs://",
+ "s3a://dummybucket/temp_dir/temp_path"));
+ EXPECT_TRUE(FilesystemsAndBucketsMatch("hdfs://namenode", "hdfs://namenode/"));
+ EXPECT_TRUE(FilesystemsAndBucketsMatch("o3fs://bucket.volume.namenode:9862/path1",
+ "o3fs://bucket.volume.namenode:9862/path2"));
+ EXPECT_FALSE(FilesystemsAndBucketsMatch("o3fs://bucket1.volume.namenode:9862/path1",
+ "o3fs://bucket2.volume.namenode:9862/path2"));
+
+ // With ofs, the first two components of an unqualified path are treated as the
+ // volume and bucket, so bare relative paths would compare as different buckets.
+ // Prepend a common volume/bucket prefix so these cases still match.
+ std::string relpath1 = "tempdir/temppath";
+ std::string relpath2 = "tempdir2/temppath2";
+ std::string default_fs = exec_env.default_fs();
+ if (default_fs.rfind(FILESYS_PREFIX_OFS, 0) == 0) {
+ relpath1 = "volume/bucket/" + relpath1;
+ relpath2 = "volume/bucket/" + relpath2;
+ default_fs += "/volume/bucket";
+ }
// Tests with both paths unqualified.
- EXPECT_TRUE(FilesystemsMatch("tempdir/temppath", "tempdir2/temppath2"));
+ EXPECT_TRUE(FilesystemsAndBucketsMatch(relpath1.c_str(), relpath2.c_str()));
// Tests with one path qualified and the other unqualified.
- const char* default_fs = exec_env.default_fs().c_str();
- EXPECT_TRUE(FilesystemsMatch(default_fs, "temp_dir/temp_path"));
- EXPECT_TRUE(FilesystemsMatch("temp_dir/temp_path", default_fs));
- EXPECT_FALSE(FilesystemsMatch("badscheme://namenode/temp_dir/temp_path",
+ EXPECT_TRUE(FilesystemsAndBucketsMatch(default_fs.c_str(), relpath1.c_str()));
+ EXPECT_TRUE(FilesystemsAndBucketsMatch(relpath1.c_str(), default_fs.c_str()));
+ EXPECT_FALSE(FilesystemsAndBucketsMatch("badscheme://namenode/temp_dir/temp_path",
"temp_dir/temp_path"));
- EXPECT_FALSE(FilesystemsMatch("badscheme://namenode:1234/temp_dir/temp_path",
+ EXPECT_FALSE(FilesystemsAndBucketsMatch("badscheme://namenode:1234/temp_dir/temp_path",
"temp_dir/temp_path"));
+
+ // Tests for ofs with volume/bucket.
+ EXPECT_TRUE(FilesystemsAndBucketsMatch("ofs://namenode:9862/volume/bucket/path1",
+ "ofs://namenode:9862/volume/bucket/path2"));
+ EXPECT_FALSE(FilesystemsAndBucketsMatch("ofs://namenode:9862/volume/bucket1/path1",
+ "ofs://namenode:9862/volume/bucket2/path2"));
+ EXPECT_FALSE(FilesystemsAndBucketsMatch("ofs://namenode:9862/volume1/bucket/path1",
+ "ofs://namenode:9862/volume2/bucket/path2"));
+ EXPECT_FALSE(FilesystemsAndBucketsMatch("ofs://namenode:9862/volume1/bucket1/path1",
+ "ofs://namenode:9862/volume2/bucket2/path2"));
}
TEST(HdfsUtilTest, CheckGetBaseName) {
diff --git a/be/src/util/hdfs-util.cc b/be/src/util/hdfs-util.cc
index 1d6db5f..46ab67a 100644
--- a/be/src/util/hdfs-util.cc
+++ b/be/src/util/hdfs-util.cc
@@ -86,7 +86,7 @@
return Status::OK();
}
-bool IsSpecificPath(
+static bool IsSpecificPath(
const char* path, const char* specific_prefix, bool check_default_fs) {
size_t prefix_len = strlen(specific_prefix);
if (check_default_fs && strstr(path, ":/") == NULL) {
@@ -154,7 +154,7 @@
return after_authority - path;
}
-bool FilesystemsMatch(const char* path_a, const char* path_b) {
+static bool FilesystemsMatch(const char* path_a, const char* path_b) {
int fs_a_name_length = GetFilesystemNameLength(path_a);
int fs_b_name_length = GetFilesystemNameLength(path_b);
@@ -179,6 +179,39 @@
return strncmp(path_a, path_b, fs_a_name_length) == 0;
}
+// Returns the length of the leading "volume/bucket" prefix of 'path', where
+// 'path' has already had its scheme and authority removed.
+static int VolumeBucketLength(const char* path) {
+ if (*path == '\0') return 0;
+ const char* after_volume = strchr(path, '/');
+ if (after_volume == nullptr) return strlen(path);
+ const char* after_bucket = strchr(after_volume + 1, '/');
+ if (after_bucket == nullptr) return strlen(path);
+ return after_bucket - path;
+}
+
+static bool OfsBucketsMatch(const char* path_a, const char* path_b) {
+ // Examine only the path elements.
+ path_a = path_a + GetFilesystemNameLength(path_a);
+ path_b = path_b + GetFilesystemNameLength(path_b);
+ // Skip past starting slash for comparison to unqualified paths.
+ if (*path_a == '/') ++path_a;
+ if (*path_b == '/') ++path_b;
+
+ int vba_len = VolumeBucketLength(path_a);
+ int vbb_len = VolumeBucketLength(path_b);
+ if (vba_len != vbb_len) return false;
+ return strncmp(path_a, path_b, vba_len) == 0;
+}
+
+bool FilesystemsAndBucketsMatch(const char* path_a, const char* path_b) {
+ if (!FilesystemsMatch(path_a, path_b)) return false;
+
+ // path_a and path_b are in the same filesystem, so we just need to check one prefix.
+ if (IsSpecificPath(path_a, FILESYS_PREFIX_OFS, true)) {
+ return OfsBucketsMatch(path_a, path_b);
+ }
+ return true;
+}
+
string GetBaseName(const char* path) {
int fs_name_length = GetFilesystemNameLength(path);
if (fs_name_length >= strlen(path)) return ".";
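A few worked examples of the helpers above, as a sketch of the expected
behavior (assuming the definitions in this hunk; "nn:9862" is a made-up
authority):

  // VolumeBucketLength sees only the path portion; OfsBucketsMatch strips
  // the scheme/authority and any leading '/' before calling it.
  VolumeBucketLength("vol1/bucketA/dir/f");  // 12 == strlen("vol1/bucketA")
  VolumeBucketLength("vol1/bucketA");        // 12: no second '/', whole string
  VolumeBucketLength("vol1");                // 4: no bucket component yet

  // OfsBucketsMatch then compares the volume/bucket prefixes:
  OfsBucketsMatch("ofs://nn:9862/vol1/bucketA/x",
                  "ofs://nn:9862/vol1/bucketB/y");  // false: buckets differ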
diff --git a/be/src/util/hdfs-util.h b/be/src/util/hdfs-util.h
index a31a76d..cb9629a 100644
--- a/be/src/util/hdfs-util.h
+++ b/be/src/util/hdfs-util.h
@@ -33,6 +33,7 @@
extern const char* FILESYS_PREFIX_GCS;
extern const char* FILESYS_PREFIX_COS;
extern const char* FILESYS_PREFIX_OZONE;
+extern const char* FILESYS_PREFIX_OFS;
/// Utility function to get error messages from HDFS. This function takes prefix/file and
/// appends errno to it. Note: any stdlib function can reset errno, this should be called
@@ -83,8 +84,10 @@
/// Returns true iff the path refers to a location on an SFS filesystem.
bool IsSFSPath(const char* path, bool check_default_fs = true);
-/// Returns true iff 'pathA' and 'pathB' are on the same filesystem.
-bool FilesystemsMatch(const char* pathA, const char* pathB);
+/// Returns true iff 'pathA' and 'pathB' are on the same filesystem and bucket.
+/// Most filesystems embed the bucket in the authority, but Ozone's ofs protocol
+/// allows addressing volumes and buckets via the path and does not support
+/// renames across them.
+bool FilesystemsAndBucketsMatch(const char* pathA, const char* pathB);
/// Returns the terminal component of 'path'.
/// E.g. if 'path' is "hdfs://localhost:8020/a/b/c", "c" is returned.
diff --git a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
index 2eca3b7..9b95fd6 100644
--- a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
+++ b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
@@ -247,6 +247,32 @@
return numFilesMoved;
}
+ // Returns the first two components (volume, bucket) of the path, ignoring any
+ // scheme and authority.
+ public static String volumeBucketSubstring(Path p) {
+ String path = Path.getPathWithoutSchemeAndAuthority(p).toString();
+ if (path.startsWith("/")) path = path.substring(1);
+ int afterVolume = path.indexOf('/');
+ if (afterVolume == -1) return path;
+ int afterBucket = path.indexOf('/', afterVolume + 1);
+ if (afterBucket == -1) return path;
+ return path.substring(0, afterBucket);
+ }
+
+ /**
+ * Returns true if the source and dest paths are in the same bucket. Ozone's ofs
+ * encodes the volume/bucket into the path; all other filesystems make it part of
+ * the authority portion of the URI.
+ */
+ public static boolean isSameBucket(Path source, Path dest) throws IOException {
+ if (!isPathOnFileSystem(source, dest.getFileSystem(CONF))) return false;
+
+ // Return true for anything besides OFS.
+ if (!hasScheme(source, SCHEME_OFS)) return true;
+
+ // Compare (volume, bucket) for source and dest.
+ return volumeBucketSubstring(source).equals(volumeBucketSubstring(dest));
+ }
+
/**
* Relocates the given file to a new location (either another directory or a
* file in the same or different filesystem). The file is generally moved (renamed) to
@@ -261,7 +287,6 @@
public static void relocateFile(Path sourceFile, Path dest,
boolean renameIfAlreadyExists) throws IOException {
FileSystem destFs = dest.getFileSystem(CONF);
- FileSystem sourceFs = sourceFile.getFileSystem(CONF);
Path destFile =
destFs.isDirectory(dest) ? new Path(dest, sourceFile.getName()) : dest;
@@ -272,7 +297,7 @@
destFile = new Path(destDir,
appendToBaseFileName(destFile.getName(), UUID.randomUUID().toString()));
}
- boolean sameFileSystem = isPathOnFileSystem(sourceFile, destFs);
+ boolean sameBucket = isSameBucket(sourceFile, dest);
boolean destIsDfs = isDistributedFileSystem(destFs);
// If the source and the destination are on different file systems, or in different
@@ -282,15 +307,12 @@
arePathsInSameHdfsEncryptionZone(destFs, sourceFile, destFile);
// We can do a rename if the src and dst are in the same encryption zone in the same
// distributed filesystem.
- boolean doRename = destIsDfs && sameFileSystem && sameEncryptionZone;
+ boolean doRename = destIsDfs && sameBucket && sameEncryptionZone;
// Alternatively, we can do a rename if the src and dst are on the same
- // non-distributed filesystem.
- if (!doRename) doRename = !destIsDfs && sameFileSystem;
+ // non-distributed filesystem in the same bucket (if it has that concept).
+ if (!doRename) doRename = !destIsDfs && sameBucket;
if (doRename) {
- if (LOG.isTraceEnabled()) {
- LOG.trace(String.format(
- "Moving '%s' to '%s'", sourceFile.toString(), destFile.toString()));
- }
+ LOG.trace("Moving '{}' to '{}'", sourceFile, destFile);
// Move (rename) the file.
if (!destFs.rename(sourceFile, destFile)) {
throw new IOException(String.format(
@@ -298,24 +320,20 @@
}
return;
}
- if (destIsDfs && sameFileSystem) {
- Preconditions.checkState(!doRename);
- // We must copy rather than move if the source and dest are in different
- // encryption zones. A move would return an error from the NN because a move is a
+ Preconditions.checkState(!doRename);
+ if (destIsDfs && sameBucket) {
+ Preconditions.checkState(!sameEncryptionZone);
+ // We must copy rather than move if the source and dest are in different
+ // encryption zones. A move would return an error from the NN because a move is a
// metadata-only operation and the files would not be encrypted/decrypted properly
// on the DNs.
- if (LOG.isTraceEnabled()) {
- LOG.trace(String.format(
- "Copying source '%s' to '%s' because HDFS encryption zones are different.",
- sourceFile, destFile));
- }
+ LOG.trace(
+ "Copying source '{}' to '{}' because HDFS encryption zones are different.",
+ sourceFile, destFile);
} else {
- Preconditions.checkState(!sameFileSystem);
- if (LOG.isTraceEnabled()) {
- LOG.trace(String.format("Copying '%s' to '%s' between filesystems.",
- sourceFile, destFile));
- }
+ LOG.trace("Copying '{}' to '{}' between filesystems.", sourceFile, destFile);
}
+ FileSystem sourceFs = sourceFile.getFileSystem(CONF);
FileUtil.copy(sourceFs, sourceFile, destFs, destFile, true, true, CONF);
}
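The rename-vs-copy decision in relocateFile() now reduces to the
following sketch (a paraphrase with abbreviated names, not the literal
Java above):

  // Rename only within one filesystem and one bucket, and, for HDFS,
  // within one encryption zone; every other combination copies the file.
  bool do_rename = (dest_is_dfs && same_bucket && same_encryption_zone)
      || (!dest_is_dfs && same_bucket);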
diff --git a/fe/src/test/java/org/apache/impala/common/FileSystemUtilTest.java b/fe/src/test/java/org/apache/impala/common/FileSystemUtilTest.java
index e3b167b..b73bfef 100644
--- a/fe/src/test/java/org/apache/impala/common/FileSystemUtilTest.java
+++ b/fe/src/test/java/org/apache/impala/common/FileSystemUtilTest.java
@@ -17,6 +17,7 @@
package org.apache.impala.common;
+import org.apache.impala.common.Pair;
import static org.apache.impala.common.FileSystemUtil.HIVE_TEMP_FILE_PREFIX;
import static org.apache.impala.common.FileSystemUtil.SPARK_TEMP_FILE_PREFIX;
import static org.apache.impala.common.FileSystemUtil.isIgnoredDir;
@@ -212,6 +213,23 @@
}
}
+ @Test
+ public void testVolumeBucketSubstring() throws IOException {
+ List<Pair<String, String>> cases = Arrays.asList(
+ Pair.create(mockLocation(FileSystemUtil.SCHEME_OFS), "volume1/bucket2"),
+ Pair.create("ofs://svc1:9876/volume/bucket/file", "volume/bucket"),
+ Pair.create("ofs://svc1:9876/volume/bucket/", "volume/bucket"),
+ Pair.create("ofs://svc1:9876/volume/bucket", "volume/bucket"),
+ Pair.create("ofs://svc1:9876/volume/", "volume"),
+ Pair.create("ofs://svc1:9876/volume", "volume"),
+ Pair.create("ofs://svc1:9876/", "")
+ );
+ for (Pair<String, String> c : cases) {
+ Path p = new Path(c.first);
+ assertEquals(c.second, FileSystemUtil.volumeBucketSubstring(p));
+ }
+ }
+
private boolean testIsInIgnoredDirectory(Path input) {
return testIsInIgnoredDirectory(input, true);
}
diff --git a/tests/metadata/test_load.py b/tests/metadata/test_load.py
index c9e5372..316d3b2 100644
--- a/tests/metadata/test_load.py
+++ b/tests/metadata/test_load.py
@@ -28,7 +28,7 @@
create_uncompressed_text_dimension)
from tests.common.skip import SkipIfLocal
from tests.common.test_vector import ImpalaTestDimension
-from tests.util.filesystem_utils import WAREHOUSE
+from tests.util.filesystem_utils import get_fs_path, WAREHOUSE
TEST_TBL_PART = "test_load"
TEST_TBL_NOPART = "test_load_nopart"
@@ -37,6 +37,8 @@
MULTIAGG_PATH = '%s/alltypesaggmultifiles/year=2010/month=1/day=1' % WAREHOUSE
HIDDEN_FILES = ["{0}/3/.100101.txt".format(STAGING_PATH),
"{0}/3/_100101.txt".format(STAGING_PATH)]
+# A path outside WAREHOUSE, which will be a different bucket for Ozone/ofs.
+TMP_STAGING_PATH = get_fs_path('/tmp/test_load_staging')
@SkipIfLocal.hdfs_client
class TestLoadData(ImpalaTestSuite):
@@ -108,6 +110,45 @@
@SkipIfLocal.hdfs_client
+class TestLoadDataExternal(ImpalaTestSuite):
+
+ @classmethod
+ def get_workload(cls):
+ return 'functional-query'
+
+ @classmethod
+ def add_test_dimensions(cls):
+ super(TestLoadDataExternal, cls).add_test_dimensions()
+ cls.ImpalaTestMatrix.add_dimension(create_single_exec_option_dimension())
+ cls.ImpalaTestMatrix.add_dimension(
+ create_uncompressed_text_dimension(cls.get_workload()))
+
+ def _clean_test_tables(self):
+ self.client.execute("drop table if exists functional.{0}".format(TEST_TBL_NOPART))
+ self.filesystem_client.delete_file_dir(TMP_STAGING_PATH, recursive=True)
+
+ def teardown_method(self, method):
+ self._clean_test_tables()
+
+ def setup_method(self, method):
+ # Defensively clean the data dirs if they exist.
+ self._clean_test_tables()
+
+ self.filesystem_client.make_dir(TMP_STAGING_PATH)
+ self.filesystem_client.copy(ALLTYPES_PATH, "{0}/100101.txt".format(TMP_STAGING_PATH))
+
+ self.client.execute("create table functional.{0} like functional.alltypesnopart"
+ " location '{1}/{0}'".format(TEST_TBL_NOPART, WAREHOUSE))
+
+ def test_load(self, vector):
+ self.execute_query_expect_success(self.client, "load data inpath '{0}/100101.txt'"
+ " into table functional.{1}".format(TMP_STAGING_PATH, TEST_TBL_NOPART))
+ result = self.execute_scalar(
+ "select count(*) from functional.{0}".format(TEST_TBL_NOPART))
+ assert result == '310'
+
+
+@SkipIfLocal.hdfs_client
class TestAsyncLoadData(ImpalaTestSuite):
@classmethod