| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include "common/libhdfs_events_impl.h" |
| #include "common/util.h" |
| #include "fs/filesystem.h" |
| #include "fs/filehandle.h" |
| #include "fs/bad_datanode_tracker.h" |
| #include "reader/block_reader.h" |
| |
| #include <gtest/gtest.h> |
| #include <gmock/gmock.h> |
| #include <gmock/gmock-spec-builders.h> |
| #include <gmock/gmock-generated-actions.h> |
| |
| #include <boost/asio/buffer.hpp> |
| #include <boost/asio/error.hpp> |
| |
| |
| |
| using hadoop::common::TokenProto; |
| using hadoop::hdfs::DatanodeInfoProto; |
| using hadoop::hdfs::DatanodeIDProto; |
| using hadoop::hdfs::ExtendedBlockProto; |
| using hadoop::hdfs::LocatedBlockProto; |
| using hadoop::hdfs::LocatedBlocksProto; |
| |
| using ::testing::_; |
| using ::testing::InvokeArgument; |
| using ::testing::Return; |
| |
| using namespace hdfs; |
| |
| class MockReader : public BlockReader { |
| public: |
| MOCK_METHOD2( |
| AsyncReadPacket, |
| void(const boost::asio::mutable_buffers_1 &, |
| const std::function<void(const Status &, size_t transferred)> &)); |
| |
| MOCK_METHOD5(AsyncRequestBlock, |
| void(const std::string &client_name, |
| const hadoop::hdfs::ExtendedBlockProto *block, |
| uint64_t length, uint64_t offset, |
| const std::function<void(Status)> &handler)); |
| |
| MOCK_METHOD5(AsyncReadBlock, void( |
| const std::string & client_name, |
| const hadoop::hdfs::LocatedBlockProto &block, |
| size_t offset, |
| const MutableBuffer &buffer, |
| const std::function<void(const Status &, size_t)> handler)); |
| |
| virtual void CancelOperation() override { |
| /* no-op, declared pure virtual */ |
| } |
| }; |
| |
| class MockDNConnection : public DataNodeConnection, public std::enable_shared_from_this<MockDNConnection> { |
| void Connect(std::function<void(Status status, std::shared_ptr<DataNodeConnection> dn)> handler) override { |
| handler(Status::OK(), shared_from_this()); |
| } |
| |
| void async_read_some(const MutableBuffer &buf, |
| std::function<void (const boost::system::error_code & error, |
| std::size_t bytes_transferred) > handler) override { |
| (void)buf; |
| handler(boost::asio::error::fault, 0); |
| } |
| |
| void async_write_some(const ConstBuffer &buf, |
| std::function<void (const boost::system::error_code & error, |
| std::size_t bytes_transferred) > handler) override { |
| (void)buf; |
| handler(boost::asio::error::fault, 0); |
| } |
| |
| virtual void Cancel() override { |
| /* no-op, declared pure virtual */ |
| } |
| }; |
| |
| |
| class PartialMockFileHandle : public FileHandleImpl { |
| using FileHandleImpl::FileHandleImpl; |
| public: |
| std::shared_ptr<MockReader> mock_reader_ = std::make_shared<MockReader>(); |
| protected: |
| std::shared_ptr<BlockReader> CreateBlockReader(const BlockReaderOptions &options, |
| std::shared_ptr<DataNodeConnection> dn, |
| std::shared_ptr<hdfs::LibhdfsEvents> event_handlers) override |
| { |
| (void) options; (void) dn; (void) event_handlers; |
| assert(mock_reader_); |
| return mock_reader_; |
| } |
| std::shared_ptr<DataNodeConnection> CreateDataNodeConnection( |
| std::shared_ptr<IoService> io_service, |
| const ::hadoop::hdfs::DatanodeInfoProto & dn, |
| const hadoop::common::TokenProto * token) override { |
| (void) io_service; (void) dn; (void) token; |
| return std::make_shared<MockDNConnection>(); |
| } |
| |
| |
| }; |
| |
| TEST(BadDataNodeTest, TestNoNodes) { |
| auto file_info = std::make_shared<struct FileInfo>(); |
| file_info->file_length_ = 1; //To avoid running into EOF |
| file_info->blocks_.push_back(LocatedBlockProto()); |
| LocatedBlockProto & block = file_info->blocks_[0]; |
| ExtendedBlockProto *b = block.mutable_b(); |
| b->set_poolid(""); |
| b->set_blockid(1); |
| b->set_generationstamp(1); |
| b->set_numbytes(4096); |
| |
| // Set up the one block to have one datanode holding it |
| DatanodeInfoProto *di = block.add_locs(); |
| DatanodeIDProto *dnid = di->mutable_id(); |
| dnid->set_datanodeuuid("foo"); |
| |
| char buf[4096] = { |
| 0, |
| }; |
| std::shared_ptr<IoService> io_service = IoService::MakeShared(); |
| auto bad_node_tracker = std::make_shared<BadDataNodeTracker>(); |
| auto monitors = std::make_shared<LibhdfsEvents>(); |
| bad_node_tracker->AddBadNode("foo"); |
| |
| const auto client_name = GetRandomClientName(); |
| ASSERT_NE(client_name, nullptr); |
| |
| PartialMockFileHandle is("cluster", "file", io_service, client_name, file_info, bad_node_tracker, monitors); |
| Status stat; |
| size_t read = 0; |
| |
| // Exclude the one datanode with the data |
| is.AsyncPreadSome(0, boost::asio::buffer(buf, sizeof(buf)), nullptr, |
| [&stat, &read](const Status &status, const std::string &, size_t transferred) { |
| stat = status; |
| read = transferred; |
| }); |
| |
| // Should fail with no resource available |
| ASSERT_EQ(static_cast<int>(std::errc::resource_unavailable_try_again), stat.code()); |
| ASSERT_EQ(0UL, read); |
| } |
| |
| TEST(BadDataNodeTest, NNEventCallback) { |
| auto file_info = std::make_shared<struct FileInfo>(); |
| file_info->file_length_ = 1; //To avoid running into EOF |
| file_info->blocks_.push_back(LocatedBlockProto()); |
| LocatedBlockProto & block = file_info->blocks_[0]; |
| ExtendedBlockProto *b = block.mutable_b(); |
| b->set_poolid(""); |
| b->set_blockid(1); |
| b->set_generationstamp(1); |
| b->set_numbytes(4096); |
| |
| // Set up the one block to have one datanodes holding it |
| DatanodeInfoProto *di = block.add_locs(); |
| DatanodeIDProto *dnid = di->mutable_id(); |
| dnid->set_datanodeuuid("dn1"); |
| |
| char buf[4096] = { |
| 0, |
| }; |
| std::shared_ptr<IoService> io_service = IoService::MakeShared(); |
| auto tracker = std::make_shared<BadDataNodeTracker>(); |
| |
| |
| // Set up event callbacks |
| int calls = 0; |
| std::vector<std::string> callbacks; |
| auto monitors = std::make_shared<LibhdfsEvents>(); |
| monitors->set_file_callback([&calls, &callbacks] (const char * event, |
| const char * cluster, |
| const char * file, |
| int64_t value) { |
| (void)cluster; (void) file; (void)value; |
| callbacks.push_back(event); |
| |
| // Allow connect call to succeed by fail on read |
| if (calls++ == 1) |
| return event_response::test_err(Status::Error("Test")); |
| |
| return event_response::make_ok(); |
| }); |
| |
| const auto client_name = GetRandomClientName(); |
| ASSERT_NE(client_name, nullptr); |
| |
| PartialMockFileHandle is("cluster", "file", io_service, client_name, file_info, tracker, monitors); |
| Status stat; |
| size_t read = 0; |
| |
| EXPECT_CALL(*is.mock_reader_, AsyncReadBlock(_,_,_,_,_)) |
| // Will return OK, but our callback will subvert it |
| .WillOnce(InvokeArgument<4>( |
| Status::OK(), 0)); |
| |
| is.AsyncPreadSome( |
| 0, boost::asio::buffer(buf, sizeof(buf)), nullptr, |
| [&stat, &read](const Status &status, const std::string &, |
| size_t transferred) { |
| stat = status; |
| read = transferred; |
| }); |
| |
| ASSERT_FALSE(stat.ok()); |
| ASSERT_EQ(2, callbacks.size()); |
| ASSERT_EQ(FILE_DN_CONNECT_EVENT, callbacks[0]); |
| ASSERT_EQ(FILE_DN_READ_EVENT, callbacks[1]); |
| } |
| |
| |
| TEST(BadDataNodeTest, RecoverableError) { |
| auto file_info = std::make_shared<struct FileInfo>(); |
| file_info->file_length_ = 1; //To avoid running into EOF |
| file_info->blocks_.push_back(LocatedBlockProto()); |
| LocatedBlockProto & block = file_info->blocks_[0]; |
| ExtendedBlockProto *b = block.mutable_b(); |
| b->set_poolid(""); |
| b->set_blockid(1); |
| b->set_generationstamp(1); |
| b->set_numbytes(4096); |
| |
| // Set up the one block to have one datanode holding it |
| DatanodeInfoProto *di = block.add_locs(); |
| DatanodeIDProto *dnid = di->mutable_id(); |
| dnid->set_datanodeuuid("foo"); |
| |
| char buf[4096] = { |
| 0, |
| }; |
| std::shared_ptr<IoService> io_service = IoService::MakeShared(); |
| auto tracker = std::make_shared<BadDataNodeTracker>(); |
| auto monitors = std::make_shared<LibhdfsEvents>(); |
| |
| const auto client_name = GetRandomClientName(); |
| ASSERT_NE(client_name, nullptr); |
| |
| PartialMockFileHandle is("cluster", "file", io_service, client_name, file_info, tracker, monitors); |
| Status stat; |
| size_t read = 0; |
| EXPECT_CALL(*is.mock_reader_, AsyncReadBlock(_,_,_,_,_)) |
| // resource unavailable error |
| .WillOnce(InvokeArgument<4>( |
| Status::ResourceUnavailable("Unable to get some resource, try again later"), 0)); |
| |
| |
| is.AsyncPreadSome( |
| 0, boost::asio::buffer(buf, sizeof(buf)), nullptr, |
| [&stat, &read](const Status &status, const std::string &, |
| size_t transferred) { |
| stat = status; |
| read = transferred; |
| }); |
| |
| ASSERT_FALSE(stat.ok()); |
| |
| std::string failing_dn = "id_of_bad_datanode"; |
| if (!stat.ok()) { |
| if (FileHandle::ShouldExclude(stat)) { |
| tracker->AddBadNode(failing_dn); |
| } |
| } |
| |
| ASSERT_FALSE(tracker->IsBadNode(failing_dn)); |
| } |
| |
| TEST(BadDataNodeTest, InternalError) { |
| auto file_info = std::make_shared<struct FileInfo>(); |
| file_info->file_length_ = 1; //To avoid running into EOF |
| file_info->blocks_.push_back(LocatedBlockProto()); |
| LocatedBlockProto & block = file_info->blocks_[0]; |
| ExtendedBlockProto *b = block.mutable_b(); |
| b->set_poolid(""); |
| b->set_blockid(1); |
| b->set_generationstamp(1); |
| b->set_numbytes(4096); |
| |
| // Set up the one block to have one datanode holding it |
| DatanodeInfoProto *di = block.add_locs(); |
| DatanodeIDProto *dnid = di->mutable_id(); |
| dnid->set_datanodeuuid("foo"); |
| |
| char buf[4096] = { |
| 0, |
| }; |
| std::shared_ptr<IoService> io_service = IoService::MakeShared(); |
| auto tracker = std::make_shared<BadDataNodeTracker>(); |
| auto monitors = std::make_shared<LibhdfsEvents>(); |
| |
| const auto client_name = GetRandomClientName(); |
| ASSERT_NE(client_name, nullptr); |
| |
| PartialMockFileHandle is("cluster", "file", io_service, client_name, file_info, tracker, monitors); |
| Status stat; |
| size_t read = 0; |
| EXPECT_CALL(*is.mock_reader_, AsyncReadBlock(_,_,_,_,_)) |
| // resource unavailable error |
| .WillOnce(InvokeArgument<4>( |
| Status::Exception("server_explosion_exception", |
| "the server exploded"), |
| sizeof(buf))); |
| |
| is.AsyncPreadSome( |
| 0, boost::asio::buffer(buf, sizeof(buf)), nullptr, |
| [&stat, &read](const Status &status, const std::string &, |
| size_t transferred) { |
| stat = status; |
| read = transferred; |
| }); |
| |
| ASSERT_FALSE(stat.ok()); |
| |
| std::string failing_dn = "id_of_bad_datanode"; |
| if (!stat.ok()) { |
| if (FileHandle::ShouldExclude(stat)) { |
| tracker->AddBadNode(failing_dn); |
| } |
| } |
| |
| ASSERT_TRUE(tracker->IsBadNode(failing_dn)); |
| } |
| |
| int main(int argc, char *argv[]) { |
| // The following line must be executed to initialize Google Mock |
| // (and Google Test) before running the tests. |
| ::testing::InitGoogleMock(&argc, argv); |
| int exit_code = RUN_ALL_TESTS(); |
| |
| // Clean up static data and prevent valgrind memory leaks |
| google::protobuf::ShutdownProtobufLibrary(); |
| return exit_code; |
| } |