| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include "mock_connection.h" |
| |
| #include "datatransfer.pb.h" |
| #include "common/util.h" |
| #include "common/cancel_tracker.h" |
| #include "reader/block_reader.h" |
| #include "reader/datatransfer.h" |
| #include "reader/fileinfo.h" |
| |
| #include <google/protobuf/io/coded_stream.h> |
| #include <google/protobuf/io/zero_copy_stream_impl.h> |
| #include <gtest/gtest.h> |
| #include <gmock/gmock.h> |
| #include <gmock/gmock-spec-builders.h> |
| #include <gmock/gmock-generated-actions.h> |
| #include <boost/system/error_code.hpp> |
| #include <boost/asio/buffer.hpp> |
| #include <boost/asio/io_service.hpp> |
| |
| #include <iostream> |
| |
| using namespace hdfs; |
| |
| using ::hadoop::common::TokenProto; |
| using ::hadoop::hdfs::BlockOpResponseProto; |
| using ::hadoop::hdfs::ChecksumProto; |
| using ::hadoop::hdfs::DataTransferEncryptorMessageProto; |
| using ::hadoop::hdfs::ExtendedBlockProto; |
| using ::hadoop::hdfs::PacketHeaderProto; |
| using ::hadoop::hdfs::ReadOpChecksumInfoProto; |
| using ::hadoop::hdfs::LocatedBlockProto; |
| using ::hadoop::hdfs::LocatedBlocksProto; |
| |
| using boost::asio::buffer; |
| using boost::system::error_code; |
| using boost::asio::mutable_buffers_1; |
| using ::testing::_; |
| using ::testing::InvokeArgument; |
| using ::testing::Return; |
| using std::make_pair; |
| using std::string; |
| |
| namespace pb = ::google::protobuf; |
| namespace pbio = pb::io; |
| |
| namespace hdfs { |
| |
| class MockDNConnection : public MockConnectionBase, public DataNodeConnection{ |
| public: |
| MockDNConnection(boost::asio::io_service &io_service) |
| : MockConnectionBase(&io_service), OnRead([](){}) {} |
| MOCK_METHOD0(Produce, ProducerResult()); |
| |
| MOCK_METHOD1(Connect, void(std::function<void(Status status, std::shared_ptr<DataNodeConnection> dn)>)); |
| |
| /* event handler to trigger side effects */ |
| std::function<void(void)> OnRead; |
| |
| void async_read_some(const MutableBuffer &buf, |
| std::function<void (const boost::system::error_code & error, |
| std::size_t bytes_transferred) > handler) override { |
| this->OnRead(); |
| this->MockConnectionBase::async_read_some(buf, handler); |
| } |
| |
| void async_write_some(const ConstBuffer &buf, |
| std::function<void (const boost::system::error_code & error, |
| std::size_t bytes_transferred) > handler) override { |
| this->MockConnectionBase::async_write_some(buf, handler); |
| } |
| |
| void Cancel() override { |
| /* no-op, declared pure virtual */ |
| } |
| }; |
| |
| // Mocks AsyncReadPacket and AsyncRequestBlock but not AsyncReadBlock, so we |
| // can test the logic of AsyncReadBlock |
| class PartialMockReader : public BlockReaderImpl { |
| public: |
| PartialMockReader() : |
| BlockReaderImpl(BlockReaderOptions(), std::shared_ptr<DataNodeConnection>(), CancelTracker::New()) {}; |
| |
| MOCK_METHOD2( |
| AsyncReadPacket, |
| void(const boost::asio::mutable_buffers_1 &, |
| const std::function<void(const Status &, size_t transferred)> &)); |
| |
| MOCK_METHOD5(AsyncRequestBlock, |
| void(const std::string &client_name, |
| const hadoop::hdfs::ExtendedBlockProto *block, |
| uint64_t length, uint64_t offset, |
| const std::function<void(Status)> &handler)); |
| }; |
| |
| |
| } |
| |
| static inline string ToDelimitedString(const pb::MessageLite *msg) { |
| string res; |
| res.reserve(hdfs::DelimitedPBMessageSize(msg)); |
| pbio::StringOutputStream os(&res); |
| pbio::CodedOutputStream out(&os); |
| out.WriteVarint32(msg->ByteSize()); |
| msg->SerializeToCodedStream(&out); |
| return res; |
| } |
| |
| static inline std::pair<error_code, string> Produce(const std::string &s) { |
| return make_pair(error_code(), s); |
| } |
| |
| static inline std::pair<error_code, string> ProducePacket( |
| const std::string &data, const std::string &checksum, int offset_in_block, |
| int seqno, bool last_packet) { |
| PacketHeaderProto proto; |
| proto.set_datalen(data.size()); |
| proto.set_offsetinblock(offset_in_block); |
| proto.set_seqno(seqno); |
| proto.set_lastpacketinblock(last_packet); |
| |
| char prefix[6]; |
| *reinterpret_cast<unsigned *>(prefix) = |
| htonl(data.size() + checksum.size() + sizeof(int32_t)); |
| *reinterpret_cast<short *>(prefix + sizeof(int32_t)) = |
| htons(proto.ByteSize()); |
| std::string payload(prefix, sizeof(prefix)); |
| payload.reserve(payload.size() + proto.ByteSize() + checksum.size() + |
| data.size()); |
| proto.AppendToString(&payload); |
| payload += checksum; |
| payload += data; |
| return std::make_pair(error_code(), std::move(payload)); |
| } |
| |
| TEST(RemoteBlockReaderTest, TestReadSingleTrunk) { |
| auto file_info = std::make_shared<struct FileInfo>(); |
| LocatedBlocksProto blocks; |
| LocatedBlockProto block; |
| char buf[4096] = { |
| 0, |
| }; |
| |
| Status stat; |
| size_t read = 0; |
| PartialMockReader reader; |
| EXPECT_CALL(reader, AsyncRequestBlock(_, _, _, _, _)) |
| .WillOnce(InvokeArgument<4>(Status::OK())); |
| EXPECT_CALL(reader, AsyncReadPacket(_, _)) |
| .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf))); |
| |
| const auto client_name = GetRandomClientName(); |
| ASSERT_NE(client_name, nullptr); |
| reader.AsyncReadBlock( |
| *client_name, block, 0, boost::asio::buffer(buf, sizeof(buf)), |
| [&stat, &read](const Status &status, size_t transferred) { |
| stat = status; |
| read = transferred; |
| }); |
| ASSERT_TRUE(stat.ok()); |
| ASSERT_EQ(sizeof(buf), read); |
| read = 0; |
| } |
| |
| TEST(RemoteBlockReaderTest, TestReadMultipleTrunk) { |
| LocatedBlockProto block; |
| char buf[4096] = { |
| 0, |
| }; |
| Status stat; |
| size_t read = 0; |
| |
| PartialMockReader reader; |
| EXPECT_CALL(reader, AsyncRequestBlock(_, _, _, _, _)) |
| .WillOnce(InvokeArgument<4>(Status::OK())); |
| |
| EXPECT_CALL(reader, AsyncReadPacket(_, _)) |
| .Times(4) |
| .WillRepeatedly(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4)); |
| |
| const auto client_name = GetRandomClientName(); |
| ASSERT_NE(client_name, nullptr); |
| reader.AsyncReadBlock( |
| *client_name, block, 0, boost::asio::buffer(buf, sizeof(buf)), |
| [&stat, &read](const Status &status, size_t transferred) { |
| stat = status; |
| read = transferred; |
| }); |
| ASSERT_TRUE(stat.ok()); |
| ASSERT_EQ(sizeof(buf), read); |
| read = 0; |
| } |
| |
| TEST(RemoteBlockReaderTest, TestReadError) { |
| LocatedBlockProto block; |
| char buf[4096] = { |
| 0, |
| }; |
| Status stat; |
| size_t read = 0; |
| PartialMockReader reader; |
| EXPECT_CALL(reader, AsyncRequestBlock(_, _, _, _, _)) |
| .WillOnce(InvokeArgument<4>(Status::OK())); |
| |
| EXPECT_CALL(reader, AsyncReadPacket(_, _)) |
| .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4)) |
| .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4)) |
| .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4)) |
| .WillOnce(InvokeArgument<1>(Status::Error("error"), 0)); |
| |
| const auto client_name = GetRandomClientName(); |
| ASSERT_NE(client_name, nullptr); |
| reader.AsyncReadBlock( |
| *client_name, block, 0, boost::asio::buffer(buf, sizeof(buf)), |
| [&stat, &read](const Status &status, size_t transferred) { |
| stat = status; |
| read = transferred; |
| }); |
| ASSERT_FALSE(stat.ok()); |
| ASSERT_EQ(sizeof(buf) / 4 * 3, read); |
| read = 0; |
| } |
| |
| template <class Stream = MockDNConnection, class Handler> |
| static std::shared_ptr<BlockReaderImpl> |
| ReadContent(std::shared_ptr<Stream> conn, const ExtendedBlockProto &block, |
| uint64_t length, uint64_t offset, const mutable_buffers_1 &buf, |
| const Handler &handler, CancelHandle cancel_handle = CancelTracker::New()) { |
| BlockReaderOptions options; |
| auto reader = std::make_shared<BlockReaderImpl>(options, conn, cancel_handle); |
| Status result; |
| reader->AsyncRequestBlock("libhdfs++", &block, length, offset, |
| [buf, reader, handler](const Status &stat) { |
| if (!stat.ok()) { |
| handler(stat, 0); |
| } else { |
| reader->AsyncReadPacket(buf, handler); |
| } |
| }); |
| return reader; |
| } |
| |
| TEST(RemoteBlockReaderTest, TestReadWholeBlock) { |
| static const size_t kChunkSize = 512; |
| static const string kChunkData(kChunkSize, 'a'); |
| boost::asio::io_service io_service; |
| auto conn = std::make_shared<MockDNConnection>(io_service); |
| BlockOpResponseProto block_op_resp; |
| |
| block_op_resp.set_status(::hadoop::hdfs::Status::SUCCESS); |
| EXPECT_CALL(*conn, Produce()) |
| .WillOnce(Return(Produce(ToDelimitedString(&block_op_resp)))) |
| .WillOnce(Return(ProducePacket(kChunkData, "", 0, 1, true))); |
| |
| ExtendedBlockProto block; |
| block.set_poolid("foo"); |
| block.set_blockid(0); |
| block.set_generationstamp(0); |
| |
| bool done = false; |
| std::string data(kChunkSize, 0); |
| ReadContent(conn, block, kChunkSize, 0, |
| buffer(const_cast<char *>(data.c_str()), data.size()), |
| [&data, &io_service, &done](const Status &stat, size_t transferred) { |
| ASSERT_TRUE(stat.ok()); |
| ASSERT_EQ(kChunkSize, transferred); |
| ASSERT_EQ(kChunkData, data); |
| done = true; |
| io_service.stop(); |
| }); |
| io_service.run(); |
| ASSERT_TRUE(done); |
| } |
| |
| /* used for cancelation tests, global to avoid cluttering capture lists */ |
| CancelHandle packet_canceller; |
| |
| TEST(RemoteBlockReaderTest, TestCancelWhileReceiving) { |
| packet_canceller = CancelTracker::New(); |
| |
| static const size_t kChunkSize = 512; |
| static const string kChunkData(kChunkSize, 'a'); |
| boost::asio::io_service io_service; |
| auto conn = std::make_shared<MockDNConnection>(io_service); |
| BlockOpResponseProto block_op_resp; |
| |
| /** |
| * async_read would normally get called 5 times here; once for each |
| * continuation in the pipeline. Cancel will be triggered on the |
| * fourth call to catch the pipeline mid-execution. |
| **/ |
| int call_count = 0; |
| int trigger_at_count = 4; |
| auto cancel_trigger = [&call_count, &trigger_at_count]() { |
| call_count += 1; |
| std::cout << "read called " << call_count << " times" << std::endl; |
| if(call_count == trigger_at_count) |
| packet_canceller->set_canceled(); |
| }; |
| |
| conn->OnRead = cancel_trigger; |
| |
| block_op_resp.set_status(::hadoop::hdfs::Status::SUCCESS); |
| EXPECT_CALL(*conn, Produce()) |
| .WillOnce(Return(Produce(ToDelimitedString(&block_op_resp)))) |
| .WillOnce(Return(ProducePacket(kChunkData, "", 0, 1, true))); |
| |
| ExtendedBlockProto block; |
| block.set_poolid("foo"); |
| block.set_blockid(0); |
| block.set_generationstamp(0); |
| |
| bool done = false; |
| std::string data(kChunkSize, 0); |
| ReadContent(conn, block, kChunkSize, 0, |
| buffer(const_cast<char *>(data.c_str()), data.size()), |
| [&data, &io_service, &done](const Status &stat, size_t transferred) { |
| ASSERT_EQ(stat.code(), Status::kOperationCanceled); |
| ASSERT_EQ(0, transferred); |
| done = true; |
| io_service.stop(); |
| }, packet_canceller); |
| |
| io_service.run(); |
| ASSERT_TRUE(done); |
| } |
| |
| TEST(RemoteBlockReaderTest, TestReadWithinChunk) { |
| static const size_t kChunkSize = 1024; |
| static const size_t kLength = kChunkSize / 4 * 3; |
| static const size_t kOffset = kChunkSize / 4; |
| static const string kChunkData = string(kOffset, 'a') + string(kLength, 'b'); |
| |
| boost::asio::io_service io_service; |
| auto conn = std::make_shared<MockDNConnection>(io_service); |
| BlockOpResponseProto block_op_resp; |
| ReadOpChecksumInfoProto *checksum_info = |
| block_op_resp.mutable_readopchecksuminfo(); |
| checksum_info->set_chunkoffset(0); |
| ChecksumProto *checksum = checksum_info->mutable_checksum(); |
| checksum->set_type(::hadoop::hdfs::ChecksumTypeProto::CHECKSUM_NULL); |
| checksum->set_bytesperchecksum(512); |
| block_op_resp.set_status(::hadoop::hdfs::Status::SUCCESS); |
| |
| EXPECT_CALL(*conn, Produce()) |
| .WillOnce(Return(Produce(ToDelimitedString(&block_op_resp)))) |
| .WillOnce(Return(ProducePacket(kChunkData, "", kOffset, 1, true))); |
| |
| ExtendedBlockProto block; |
| block.set_poolid("foo"); |
| block.set_blockid(0); |
| block.set_generationstamp(0); |
| |
| bool done = false; |
| |
| string data(kLength, 0); |
| ReadContent(conn, block, data.size(), kOffset, |
| buffer(const_cast<char *>(data.c_str()), data.size()), |
| [&data, &io_service,&done](const Status &stat, size_t transferred) { |
| ASSERT_TRUE(stat.ok()); |
| ASSERT_EQ(kLength, transferred); |
| ASSERT_EQ(kChunkData.substr(kOffset, kLength), data); |
| done = true; |
| io_service.stop(); |
| }); |
| io_service.run(); |
| ASSERT_TRUE(done); |
| } |
| |
| TEST(RemoteBlockReaderTest, TestReadMultiplePacket) { |
| static const size_t kChunkSize = 1024; |
| static const string kChunkData(kChunkSize, 'a'); |
| |
| boost::asio::io_service io_service; |
| auto conn = std::make_shared<MockDNConnection>(io_service); |
| BlockOpResponseProto block_op_resp; |
| block_op_resp.set_status(::hadoop::hdfs::Status::SUCCESS); |
| |
| EXPECT_CALL(*conn, Produce()) |
| .WillOnce(Return(Produce(ToDelimitedString(&block_op_resp)))) |
| .WillOnce(Return(ProducePacket(kChunkData, "", 0, 1, false))) |
| .WillOnce(Return(ProducePacket(kChunkData, "", kChunkSize, 2, true))); |
| |
| ExtendedBlockProto block; |
| block.set_poolid("foo"); |
| block.set_blockid(0); |
| block.set_generationstamp(0); |
| |
| string data(kChunkSize, 0); |
| mutable_buffers_1 buf = buffer(const_cast<char *>(data.c_str()), data.size()); |
| BlockReaderOptions options; |
| auto reader = std::make_shared<BlockReaderImpl>(options, conn, CancelTracker::New()); |
| Status result; |
| reader->AsyncRequestBlock( |
| "libhdfs++", &block, data.size(), 0, |
| [buf, reader, &data, &io_service](const Status &stat) { |
| ASSERT_TRUE(stat.ok()); |
| reader->AsyncReadPacket( |
| buf, [buf, reader, &data, &io_service](const Status &stat, size_t transferred) { |
| ASSERT_TRUE(stat.ok()); |
| ASSERT_EQ(kChunkSize, transferred); |
| ASSERT_EQ(kChunkData, data); |
| data.clear(); |
| data.resize(kChunkSize); |
| transferred = 0; |
| reader->AsyncReadPacket( |
| buf, [&data,&io_service](const Status &stat, size_t transferred) { |
| ASSERT_TRUE(stat.ok()); |
| ASSERT_EQ(kChunkSize, transferred); |
| ASSERT_EQ(kChunkData, data); |
| io_service.stop(); |
| }); |
| }); |
| }); |
| io_service.run(); |
| } |
| |
| TEST(RemoteBlockReaderTest, TestReadCancelBetweenPackets) { |
| packet_canceller = CancelTracker::New(); |
| |
| static const size_t kChunkSize = 1024; |
| static const string kChunkData(kChunkSize, 'a'); |
| |
| boost::asio::io_service io_service; |
| auto conn = std::make_shared<MockDNConnection>(io_service); |
| BlockOpResponseProto block_op_resp; |
| block_op_resp.set_status(::hadoop::hdfs::Status::SUCCESS); |
| |
| EXPECT_CALL(*conn, Produce()) |
| .WillOnce(Return(Produce(ToDelimitedString(&block_op_resp)))) |
| .WillOnce(Return(ProducePacket(kChunkData, "", 0, 1, false))); |
| /* the second AsyncReadPacket should never attempt to read */ |
| |
| ExtendedBlockProto block; |
| block.set_poolid("foo"); |
| block.set_blockid(0); |
| block.set_generationstamp(0); |
| |
| string data(kChunkSize, 0); |
| mutable_buffers_1 buf = buffer(const_cast<char *>(data.c_str()), data.size()); |
| BlockReaderOptions options; |
| auto reader = std::make_shared<BlockReaderImpl>(options, conn, packet_canceller); |
| Status result; |
| reader->AsyncRequestBlock( |
| "libhdfs++", &block, data.size(), 0, |
| [buf, reader, &data, &io_service](const Status &stat) { |
| ASSERT_TRUE(stat.ok()); |
| reader->AsyncReadPacket( |
| buf, [buf, reader, &data, &io_service](const Status &stat, size_t transferred) { |
| ASSERT_TRUE(stat.ok()); |
| ASSERT_EQ(kChunkSize, transferred); |
| ASSERT_EQ(kChunkData, data); |
| data.clear(); |
| data.resize(kChunkSize); |
| transferred = 0; |
| |
| /* Cancel the operation.*/ |
| packet_canceller->set_canceled(); |
| |
| reader->AsyncReadPacket( |
| buf, [&data,&io_service](const Status &stat, size_t transferred) { |
| ASSERT_EQ(stat.code(), Status::kOperationCanceled); |
| ASSERT_EQ(0, transferred); |
| io_service.stop(); |
| }); |
| }); |
| }); |
| io_service.run(); |
| } |
| |
| |
| TEST(RemoteBlockReaderTest, TestSaslConnection) { |
| static const size_t kChunkSize = 512; |
| static const string kChunkData(kChunkSize, 'a'); |
| static const string kAuthPayload = "realm=\"0\",nonce=\"+GAWc+O6yEAWpew/" |
| "qKah8qh4QZLoOLCDcTtEKhlS\",qop=\"auth\"," |
| "charset=utf-8,algorithm=md5-sess"; |
| boost::asio::io_service io_service; |
| auto conn = std::make_shared<MockDNConnection>(io_service); |
| BlockOpResponseProto block_op_resp; |
| block_op_resp.set_status(::hadoop::hdfs::Status::SUCCESS); |
| |
| DataTransferEncryptorMessageProto sasl_resp0, sasl_resp1; |
| sasl_resp0.set_status( |
| ::hadoop::hdfs:: |
| DataTransferEncryptorMessageProto_DataTransferEncryptorStatus_SUCCESS); |
| sasl_resp0.set_payload(kAuthPayload); |
| sasl_resp1.set_status( |
| ::hadoop::hdfs:: |
| DataTransferEncryptorMessageProto_DataTransferEncryptorStatus_SUCCESS); |
| |
| EXPECT_CALL(*conn, Produce()) |
| .WillOnce(Return(Produce(ToDelimitedString(&sasl_resp0)))) |
| .WillOnce(Return(Produce(ToDelimitedString(&sasl_resp1)))) |
| .WillOnce(Return(Produce(ToDelimitedString(&block_op_resp)))) |
| .WillOnce(Return(ProducePacket(kChunkData, "", 0, 1, true))); |
| |
| auto sasl_conn = std::make_shared<DataTransferSaslStream<MockDNConnection> >(conn, "foo", "bar"); |
| ExtendedBlockProto block; |
| block.set_poolid("foo"); |
| block.set_blockid(0); |
| block.set_generationstamp(0); |
| |
| std::string data(kChunkSize, 0); |
| sasl_conn->Handshake([sasl_conn, &block, &data, &io_service]( |
| const Status &s) { |
| ASSERT_TRUE(s.ok()); |
| ReadContent(sasl_conn, block, kChunkSize, 0, |
| buffer(const_cast<char *>(data.c_str()), data.size()), |
| [&data, &io_service](const Status &stat, size_t transferred) { |
| ASSERT_TRUE(stat.ok()); |
| ASSERT_EQ(kChunkSize, transferred); |
| ASSERT_EQ(kChunkData, data); |
| io_service.stop(); |
| }); |
| }); |
| io_service.run(); |
| } |
| |
| int main(int argc, char *argv[]) { |
| // The following line must be executed to initialize Google Mock |
| // (and Google Test) before running the tests. |
| ::testing::InitGoogleMock(&argc, argv); |
| int exit_code = RUN_ALL_TESTS(); |
| |
| // Clean up static data and prevent valgrind memory leaks |
| google::protobuf::ShutdownProtobufLibrary(); |
| return exit_code; |
| } |