blob: f3613f1ae3a1a74db19a0a5c12b571d01e11217f [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <atomic>
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <memory>
#include <sstream> // IWYU pragma: keep
#include <string>
#include <thread>
#include <vector>
#include <gtest/gtest.h>
#include <boost/filesystem.hpp> // NOLINT
#include "arrow/buffer.h"
#include "arrow/io/hdfs.h"
#include "arrow/io/hdfs_internal.h"
#include "arrow/io/interfaces.h"
#include "arrow/status.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/util.h"
namespace arrow {
namespace io {
// Produce `size` pseudo-random bytes (seed 0, so the content is
// deterministic across runs).
std::vector<uint8_t> RandomData(int64_t size) {
  std::vector<uint8_t> result(size);
  random_bytes(size, 0, result.data());
  return result;
}
// Tag type selecting the JNI-based libhdfs driver for the typed test suite.
struct JNIDriver {
  static HdfsDriver type;
};
// Tag type selecting the Pivotal libhdfs3 driver for the typed test suite.
struct PivotalDriver {
  static HdfsDriver type;
};
// Typed gtest fixture: DRIVER (JNIDriver or PivotalDriver) selects which
// libhdfs implementation the tests exercise.  Connection parameters come
// from ARROW_HDFS_TEST_* environment variables.
template <typename DRIVER>
class TestHadoopFileSystem : public ::testing::Test {
 public:
  // Ensure an empty scratch directory exists, first removing any leftovers
  // from a previous run.
  Status MakeScratchDir() {
    if (client_->Exists(scratch_dir_)) {
      RETURN_NOT_OK((client_->Delete(scratch_dir_, true)));
    }
    return client_->MakeDirectory(scratch_dir_);
  }

  // Create (or, when `append` is true, append to) `path` and write `size`
  // bytes from `buffer`.  The trailing parameters are forwarded verbatim to
  // OpenWritable; 0 selects the HDFS defaults.
  Status WriteDummyFile(const std::string& path, const uint8_t* buffer, int64_t size,
                        bool append = false, int buffer_size = 0, int16_t replication = 0,
                        int default_block_size = 0) {
    std::shared_ptr<HdfsOutputStream> file;
    RETURN_NOT_OK(client_->OpenWritable(path, append, buffer_size, replication,
                                        default_block_size, &file));
    RETURN_NOT_OK(file->Write(buffer, size));
    RETURN_NOT_OK(file->Close());
    return Status::OK();
  }

  // Path of `name` inside the per-test scratch directory.
  std::string ScratchPath(const std::string& name) {
    std::stringstream ss;
    ss << scratch_dir_ << "/" << name;
    return ss.str();
  }

  // Fully-qualified hdfs://host:port URI for `relpath`.  A specialization
  // below returns plain paths for the libhdfs3 driver.
  std::string HdfsAbsPath(const std::string& relpath) {
    std::stringstream ss;
    ss << "hdfs://" << conf_.host << ":" << conf_.port << relpath;
    return ss.str();
  }

  // Set up shared state between unit tests
  void SetUp() {
    internal::LibHdfsShim* driver_shim;

    client_ = nullptr;
    // Unique per-run scratch location under the system temp directory.
    scratch_dir_ =
        boost::filesystem::unique_path(boost::filesystem::temp_directory_path() /
                                       "arrow-hdfs/scratch-%%%%")
            .string();

    loaded_driver_ = false;

    Status msg;

    // Try to load the shared library for the requested driver.  On failure,
    // loaded_driver_ stays false and SKIP_IF_NO_DRIVER() turns every test
    // into a graceful no-op.
    if (DRIVER::type == HdfsDriver::LIBHDFS) {
      msg = ConnectLibHdfs(&driver_shim);
      if (!msg.ok()) {
        std::cout << "Loading libhdfs failed, skipping tests gracefully" << std::endl;
        return;
      }
    } else {
      msg = ConnectLibHdfs3(&driver_shim);
      if (!msg.ok()) {
        std::cout << "Loading libhdfs3 failed, skipping tests gracefully. "
                  << msg.ToString() << std::endl;
        return;
      }
    }

    loaded_driver_ = true;

    // Host and port have defaults; the user must be provided explicitly.
    const char* host = std::getenv("ARROW_HDFS_TEST_HOST");
    const char* port = std::getenv("ARROW_HDFS_TEST_PORT");
    const char* user = std::getenv("ARROW_HDFS_TEST_USER");

    ASSERT_TRUE(user != nullptr) << "Set ARROW_HDFS_TEST_USER";

    conf_.host = host == nullptr ? "localhost" : host;
    conf_.user = user;
    conf_.port = port == nullptr ? 20500 : atoi(port);
    conf_.driver = DRIVER::type;

    ASSERT_OK(HadoopFileSystem::Connect(&conf_, &client_));
  }

  // Delete the scratch directory and drop the connection, if SetUp() got far
  // enough to establish one.
  void TearDown() {
    if (client_) {
      if (client_->Exists(scratch_dir_)) {
        ARROW_EXPECT_OK(client_->Delete(scratch_dir_, true));
      }
      ARROW_EXPECT_OK(client_->Disconnect());
    }
  }

  HdfsConnectionConfig conf_;
  bool loaded_driver_;  // true only after the driver shim loaded successfully

  // Resources shared amongst unit tests
  std::string scratch_dir_;
  std::shared_ptr<HadoopFileSystem> client_;
};
// The libhdfs3 driver reports plain paths, so no hdfs://host:port prefix is
// prepended.
template <>
std::string TestHadoopFileSystem<PivotalDriver>::HdfsAbsPath(const std::string& relpath) {
  return relpath;
}
// Tests cannot run without a working driver; bail out early (and
// gracefully) when SetUp() failed to load one.
#define SKIP_IF_NO_DRIVER() \
  if (!this->loaded_driver_) { \
    std::cout << "Driver not loaded, skipping" << std::endl; \
    return; \
  }

HdfsDriver JNIDriver::type = HdfsDriver::LIBHDFS;
HdfsDriver PivotalDriver::type = HdfsDriver::LIBHDFS3;

// Instantiate every TYPED_TEST below once per driver implementation.
typedef ::testing::Types<JNIDriver, PivotalDriver> DriverTypes;
TYPED_TEST_CASE(TestHadoopFileSystem, DriverTypes);
TYPED_TEST(TestHadoopFileSystem, ConnectsAgain) {
  SKIP_IF_NO_DRIVER();

  // A second, independent connection with the same configuration must
  // succeed and shut down cleanly while the fixture's client stays open.
  std::shared_ptr<HadoopFileSystem> fs;
  ASSERT_OK(HadoopFileSystem::Connect(&this->conf_, &fs));
  ASSERT_OK(fs->Disconnect());
}
TYPED_TEST(TestHadoopFileSystem, MultipleClients) {
  SKIP_IF_NO_DRIVER();

  ASSERT_OK(this->MakeScratchDir());

  // Open two concurrent connections with identical configuration.
  std::shared_ptr<HadoopFileSystem> fs1;
  std::shared_ptr<HadoopFileSystem> fs2;
  ASSERT_OK(HadoopFileSystem::Connect(&this->conf_, &fs1));
  ASSERT_OK(HadoopFileSystem::Connect(&this->conf_, &fs2));
  ASSERT_OK(fs1->Disconnect());

  // The second connection keeps working after the first has shut down.
  std::vector<HdfsPathInfo> listing;
  ASSERT_OK(fs2->ListDirectory(this->scratch_dir_, &listing));
  ASSERT_OK(fs2->Disconnect());
}
TYPED_TEST(TestHadoopFileSystem, MakeDirectory) {
  SKIP_IF_NO_DRIVER();

  std::string dir_path = this->ScratchPath("create-directory");

  // Start from a clean slate in case of a previous partial run.
  if (this->client_->Exists(dir_path)) {
    ASSERT_OK(this->client_->Delete(dir_path, true));
  }

  // Creating the directory makes it visible and empty.
  ASSERT_OK(this->client_->MakeDirectory(dir_path));
  ASSERT_TRUE(this->client_->Exists(dir_path));
  std::vector<HdfsPathInfo> contents;
  ARROW_EXPECT_OK(this->client_->ListDirectory(dir_path, &contents));
  ASSERT_EQ(0, contents.size());

  // After recursive deletion the path is gone and listing it is an IOError.
  ARROW_EXPECT_OK(this->client_->Delete(dir_path, true));
  ASSERT_FALSE(this->client_->Exists(dir_path));
  ASSERT_RAISES(IOError, this->client_->ListDirectory(dir_path, &contents));
}
TYPED_TEST(TestHadoopFileSystem, GetCapacityUsed) {
  SKIP_IF_NO_DRIVER();

  // Who knows what is actually in your DFS cluster, but expect it to have
  // positive used bytes and capacity
  int64_t capacity = 0;
  ASSERT_OK(this->client_->GetCapacity(&capacity));
  ASSERT_LT(0, capacity);

  int64_t used = 0;
  ASSERT_OK(this->client_->GetUsed(&used));
  ASSERT_LT(0, used);
}
TYPED_TEST(TestHadoopFileSystem, GetPathInfo) {
  SKIP_IF_NO_DRIVER();

  ASSERT_OK(this->MakeScratchDir());

  HdfsPathInfo info;

  // The scratch dir reports as a directory owned by the configured user.
  ASSERT_OK(this->client_->GetPathInfo(this->scratch_dir_, &info));
  ASSERT_EQ(ObjectType::DIRECTORY, info.kind);
  ASSERT_EQ(this->HdfsAbsPath(this->scratch_dir_), info.name);
  ASSERT_EQ(this->conf_.user, info.owner);

  // TODO(wesm): test group, other attrs

  // A freshly written file reports the expected kind, name, owner and size.
  auto file_path = this->ScratchPath("test-file");
  const int kSize = 100;
  std::vector<uint8_t> payload = RandomData(kSize);
  ASSERT_OK(this->WriteDummyFile(file_path, payload.data(), kSize));

  ASSERT_OK(this->client_->GetPathInfo(file_path, &info));
  ASSERT_EQ(ObjectType::FILE, info.kind);
  ASSERT_EQ(this->HdfsAbsPath(file_path), info.name);
  ASSERT_EQ(this->conf_.user, info.owner);
  ASSERT_EQ(kSize, info.size);
}
TYPED_TEST(TestHadoopFileSystem, GetPathInfoNotExist) {
  // ARROW-2919: Test that the error message is reasonable
  SKIP_IF_NO_DRIVER();
  ASSERT_OK(this->MakeScratchDir());
  auto path = this->ScratchPath("path-does-not-exist");

  HdfsPathInfo info;
  Status s = this->client_->GetPathInfo(path, &info);
  ASSERT_TRUE(s.IsIOError());

  // The error message must mention the offending path.  Use ASSERT_NE
  // against npos rather than ASSERT_LT(find(...), npos): the intent is
  // "substring found", and on failure ASSERT_LT would print a meaningless
  // huge index instead of a clear not-found comparison.
  const std::string error_message = s.ToString();
  ASSERT_NE(std::string::npos, error_message.find(path));
}
TYPED_TEST(TestHadoopFileSystem, AppendToFile) {
  SKIP_IF_NO_DRIVER();
  ASSERT_OK(this->MakeScratchDir());

  auto path = this->ScratchPath("test-file");
  const int kSize = 100;
  std::vector<uint8_t> contents = RandomData(kSize);
  ASSERT_OK(this->WriteDummyFile(path, contents.data(), kSize));

  // Write the same bytes once more in append mode; the file size doubles.
  ASSERT_OK(this->WriteDummyFile(path, contents.data(), kSize, true));

  HdfsPathInfo info;
  ASSERT_OK(this->client_->GetPathInfo(path, &info));
  ASSERT_EQ(kSize * 2, info.size);
}
TYPED_TEST(TestHadoopFileSystem, ListDirectory) {
  SKIP_IF_NO_DRIVER();

  const int kSize = 100;
  std::vector<uint8_t> payload = RandomData(kSize);

  auto file1 = this->ScratchPath("test-file-1");
  auto file2 = this->ScratchPath("test-file-2");
  auto subdir = this->ScratchPath("test-dir-1");

  // Populate the scratch dir with two files of different sizes and one
  // subdirectory.
  ASSERT_OK(this->MakeScratchDir());
  ASSERT_OK(this->WriteDummyFile(file1, payload.data(), kSize));
  ASSERT_OK(this->WriteDummyFile(file2, payload.data(), kSize / 2));
  ASSERT_OK(this->client_->MakeDirectory(subdir));

  std::vector<HdfsPathInfo> listing;
  ASSERT_OK(this->client_->ListDirectory(this->scratch_dir_, &listing));
  // ListDirectory appends to its output argument, so listing a second time
  // doubles the number of entries (3 -> 6).
  ASSERT_OK(this->client_->ListDirectory(this->scratch_dir_, &listing));
  ASSERT_EQ(6, static_cast<int>(listing.size()));

  // The listing order is unspecified; match each entry by name.
  for (const auto& info : listing) {
    if (info.name == this->HdfsAbsPath(file1)) {
      ASSERT_EQ(ObjectType::FILE, info.kind);
      ASSERT_EQ(kSize, info.size);
    } else if (info.name == this->HdfsAbsPath(file2)) {
      ASSERT_EQ(ObjectType::FILE, info.kind);
      ASSERT_EQ(kSize / 2, info.size);
    } else if (info.name == this->HdfsAbsPath(subdir)) {
      ASSERT_EQ(ObjectType::DIRECTORY, info.kind);
    } else {
      FAIL() << "Unexpected path: " << info.name;
    }
  }
}
TYPED_TEST(TestHadoopFileSystem, ReadableMethods) {
  SKIP_IF_NO_DRIVER();
  ASSERT_OK(this->MakeScratchDir());

  auto path = this->ScratchPath("test-file");
  const int kSize = 100;
  std::vector<uint8_t> expected = RandomData(kSize);
  ASSERT_OK(this->WriteDummyFile(path, expected.data(), kSize));

  std::shared_ptr<HdfsReadableFile> file;
  ASSERT_OK(this->client_->OpenReadable(path, &file));

  // Test GetSize -- move this into its own unit test if ever needed
  ASSERT_OK_AND_EQ(kSize, file->GetSize());

  // Two sequential 50-byte reads cover the whole file.
  uint8_t scratch[50];
  ASSERT_OK_AND_EQ(50, file->Read(50, scratch));
  ASSERT_EQ(0, std::memcmp(scratch, expected.data(), 50));

  ASSERT_OK_AND_EQ(50, file->Read(50, scratch));
  ASSERT_EQ(0, std::memcmp(scratch, expected.data() + 50, 50));

  // EOF
  ASSERT_OK_AND_EQ(0, file->Read(1, scratch));

  // ReadAt past the end returns a short read rather than failing.
  ASSERT_OK_AND_EQ(40, file->ReadAt(60, 100, scratch));
  ASSERT_EQ(0, std::memcmp(scratch, expected.data() + 60, 40));

  // Seek, Tell
  ASSERT_OK(file->Seek(60));
  ASSERT_OK_AND_EQ(60, file->Tell());
}
TYPED_TEST(TestHadoopFileSystem, LargeFile) {
  SKIP_IF_NO_DRIVER();
  ASSERT_OK(this->MakeScratchDir());

  auto path = this->ScratchPath("test-large-file");
  const int kSize = 1000000;
  std::vector<uint8_t> payload = RandomData(kSize);
  ASSERT_OK(this->WriteDummyFile(path, payload.data(), kSize));

  // Read the file back in one shot with the default read buffer size.
  std::shared_ptr<HdfsReadableFile> file;
  ASSERT_OK(this->client_->OpenReadable(path, &file));
  ASSERT_FALSE(file->closed());

  std::shared_ptr<Buffer> dest;
  ASSERT_OK(AllocateBuffer(nullptr, kSize, &dest));
  ASSERT_OK_AND_EQ(kSize, file->Read(kSize, dest->mutable_data()));
  ASSERT_EQ(0, std::memcmp(dest->data(), payload.data(), kSize));

  // explicit buffer size
  std::shared_ptr<HdfsReadableFile> file2;
  ASSERT_OK(this->client_->OpenReadable(path, 1 << 18, &file2));

  std::shared_ptr<Buffer> dest2;
  ASSERT_OK(AllocateBuffer(nullptr, kSize, &dest2));
  ASSERT_OK_AND_EQ(kSize, file2->Read(kSize, dest2->mutable_data()));
  ASSERT_EQ(0, std::memcmp(dest2->data(), payload.data(), kSize));
}
TYPED_TEST(TestHadoopFileSystem, RenameFile) {
  SKIP_IF_NO_DRIVER();
  ASSERT_OK(this->MakeScratchDir());

  auto src_path = this->ScratchPath("src-file");
  auto dst_path = this->ScratchPath("dst-file");

  const int kSize = 100;
  std::vector<uint8_t> payload = RandomData(kSize);
  ASSERT_OK(this->WriteDummyFile(src_path, payload.data(), kSize));

  // After the rename, only the destination path should exist.
  ASSERT_OK(this->client_->Rename(src_path, dst_path));
  ASSERT_FALSE(this->client_->Exists(src_path));
  ASSERT_TRUE(this->client_->Exists(dst_path));
}
TYPED_TEST(TestHadoopFileSystem, ChmodChown) {
  SKIP_IF_NO_DRIVER();
  ASSERT_OK(this->MakeScratchDir());

  auto path = this->ScratchPath("path-to-chmod");

  const int kSize = 100;
  std::vector<uint8_t> payload = RandomData(kSize);
  ASSERT_OK(this->WriteDummyFile(path, payload.data(), kSize));

  // Chmod is reflected in the reported permissions.
  HdfsPathInfo info;
  int16_t mode = 0755;
  ASSERT_OK(this->client_->Chmod(path, mode));
  ASSERT_OK(this->client_->GetPathInfo(path, &info));
  ASSERT_EQ(mode, info.permissions);

  // Chown is reflected in the reported owner and group.
  std::string owner = "hadoop";
  std::string group = "hadoop";
  ASSERT_OK(this->client_->Chown(path, owner.c_str(), group.c_str()));
  ASSERT_OK(this->client_->GetPathInfo(path, &info));
  ASSERT_EQ("hadoop", info.owner);
  ASSERT_EQ("hadoop", info.group);
}
TYPED_TEST(TestHadoopFileSystem, ThreadSafety) {
  SKIP_IF_NO_DRIVER();
  ASSERT_OK(this->MakeScratchDir());
  auto src_path = this->ScratchPath("threadsafety");

  std::string data = "foobar";
  ASSERT_OK(this->WriteDummyFile(src_path, reinterpret_cast<const uint8_t*>(data.c_str()),
                                 static_cast<int64_t>(data.size())));

  std::shared_ptr<HdfsReadableFile> file;
  ASSERT_OK(this->client_->OpenReadable(src_path, &file));

  std::atomic<int> correct_count(0);
  int niter = 1000;

  // Alternate between two overlapping positioned reads on the shared file
  // handle and count how many round-trips match the written bytes.
  auto ReadData = [&file, &correct_count, &data, &niter]() {
    for (int i = 0; i < niter; ++i) {
      std::shared_ptr<Buffer> buffer;
      if (i % 2 == 0) {
        ASSERT_OK_AND_ASSIGN(buffer, file->ReadAt(3, 3));
        if (0 == memcmp(data.c_str() + 3, buffer->data(), 3)) {
          correct_count += 1;
        }
      } else {
        ASSERT_OK_AND_ASSIGN(buffer, file->ReadAt(0, 4));
        if (0 == memcmp(data.c_str() + 0, buffer->data(), 4)) {
          correct_count += 1;
        }
      }
    }
  };

  // Hammer the single shared handle from four concurrent readers.
  std::vector<std::thread> workers;
  for (int i = 0; i < 4; ++i) {
    workers.emplace_back(ReadData);
  }
  for (auto& worker : workers) {
    worker.join();
  }

  // Every read on every thread should have seen consistent data.
  ASSERT_EQ(niter * 4, correct_count);
}
} // namespace io
} // namespace arrow