| /******************************************************************** |
| * 2014 - |
| * open source under Apache License Version 2.0 |
| ********************************************************************/ |
| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| #include "gtest/gtest.h" |
| #include "gmock/gmock.h" |
| |
| #include "client/FileSystem.h" |
| #include "client/FileSystemImpl.h" |
| #include "client/FileSystemInter.h" |
| #include "client/OutputStream.h" |
| #include "client/OutputStreamImpl.h" |
| #include "client/Packet.h" |
| #include "client/Pipeline.h" |
| #include "DateTime.h" |
| #include "MockFileSystemInter.h" |
| #include "MockCryptoCodec.h" |
| #include "MockLeaseRenewer.h" |
| #include "MockPipeline.h" |
| #include "NamenodeStub.h" |
| #include "server/ExtendedBlock.h" |
| #include "TestDatanodeStub.h" |
| #include "TestUtil.h" |
| #include "Thread.h" |
| #include "XmlConfig.h" |
| |
| #include <string> |
| |
| using namespace Hdfs; |
| using namespace Internal; |
| using namespace Hdfs::Mock; |
| using namespace testing; |
| using ::testing::AtLeast; |
| |
| #define BASE_DIR "test/" |
| |
| class TestOutputStream: public ::testing::Test { |
| public: |
| TestOutputStream() { |
| renewer = MakeMockLeaseRenewer(); |
| } |
| |
| ~TestOutputStream() { |
| ResetMockLeaseRenewer(renewer); |
| } |
| |
| protected: |
| shared_ptr<LeaseRenewer> renewer; |
| MockFileSystemInter fs; |
| OutputStreamImpl ous; |
| }; |
| |
| class MockPipelineStub: public PipelineStub { |
| public: |
| MOCK_METHOD0(getPipeline, shared_ptr<MockPipeline> ()); |
| }; |
| |
| class MockNamenodeStub: public NamenodeStub { |
| public: |
| MOCK_METHOD0(getNamenode, MockNamenode * ()); |
| }; |
| |
| static void LeaseRenew(int flag) { |
| Config conf; |
| FileStatus fileinfo; |
| fileinfo.setBlocksize(2048); |
| fileinfo.setLength(1024); |
| shared_ptr<LocatedBlock> lastBlock(new LocatedBlock); |
| lastBlock->setNumBytes(0); |
| std::pair<shared_ptr<LocatedBlock>, shared_ptr<FileStatus> > lastBlockWithStatus; |
| lastBlockWithStatus.first = lastBlock; |
| lastBlockWithStatus.second = shared_ptr<FileStatus>(new FileStatus(fileinfo)); |
| MockNamenodeStub stub; |
| SessionConfig sconf(conf); |
| shared_ptr<MockFileSystemInter> myfs(new MockFileSystemInter()); |
| EXPECT_CALL(*myfs, getFileStatus(_)).Times(AtMost(1)).WillOnce(Return(fileinfo)); |
| EXPECT_CALL(*myfs, getConf()).Times(1).WillOnce(ReturnRef(sconf)); |
| //EXPECT_CALL(stub, getNamenode()).Times(1).WillOnce(Return(nn)); |
| OutputStreamImpl leaseous; |
| |
| if (flag & Append) { |
| EXPECT_CALL(*myfs, append(_)).Times(1).WillOnce(Return(lastBlockWithStatus)); |
| } else { |
| EXPECT_CALL(*myfs, create(_, _, _, _, _, _)).Times(1); |
| } |
| |
| EXPECT_CALL(*myfs, complete(_, _)).Times(1).WillOnce(Return(true)); |
| EXPECT_CALL(GetMockLeaseRenewer(), StartRenew(_)).Times(1); |
| EXPECT_CALL(GetMockLeaseRenewer(), StopRenew(_)).Times(1); |
| EXPECT_CALL(*myfs, getStandardPath(_)).Times(1); |
| EXPECT_NO_THROW(DebugException(leaseous.open(myfs, BASE_DIR"testrenewlease", flag, 0644, true, 0, 2048))); |
| EXPECT_NO_THROW(leaseous.close()); |
| }; |
| |
| static void heartBeatSender(int flag) { |
| OutputStreamImpl ous; |
| shared_ptr<MockPipeline> pipeline(new MockPipeline()); |
| MockPipelineStub stub; |
| ous.stub = &stub; |
| MockFileSystemInter * fs = new MockFileSystemInter; |
| FileStatus fileinfo; |
| fileinfo.setBlocksize(2048); |
| fileinfo.setLength(1024); |
| Config conf; |
| const SessionConfig sessionConf(conf); |
| shared_ptr<LocatedBlock> lastBlock(new LocatedBlock); |
| lastBlock->setNumBytes(0); |
| std::pair<shared_ptr<LocatedBlock>, shared_ptr<FileStatus> > lastBlockWithStatus; |
| lastBlockWithStatus.first = lastBlock; |
| lastBlockWithStatus.second = shared_ptr<FileStatus>(new FileStatus(fileinfo)); |
| EXPECT_CALL(*fs, getStandardPath(_)).Times(1).WillOnce(Return("/testheartBeat")); |
| EXPECT_CALL(*fs, getConf()).Times(1).WillOnce(ReturnRef(sessionConf)); |
| |
| if (flag & Append) { |
| EXPECT_CALL(*fs, append(_)).Times(1).WillOnce(Return(lastBlockWithStatus)); |
| } else { |
| EXPECT_CALL(*fs, create(_, _, _, _, _, _)).Times(1); |
| } |
| |
| EXPECT_CALL(*fs, registerOpenedOutputStream()).Times(1); |
| EXPECT_NO_THROW(ous.open(shared_ptr<FileSystemInter>(fs), "testheartBeat", flag, 0644, false, 3, 1024 * 1024)); |
| char buffer[20]; |
| Hdfs::FillBuffer(buffer, sizeof(buffer), 0); |
| EXPECT_NO_THROW(ous.append(buffer, sizeof(buffer))); |
| EXPECT_CALL(stub, getPipeline()).Times(1).WillOnce(Return(pipeline)); |
| EXPECT_CALL(*pipeline, send(_)).Times(3); |
| EXPECT_CALL(*pipeline, flush()).Times(1); |
| EXPECT_NO_THROW(ous.flush()); |
| sleep_for(seconds(21)); |
| EXPECT_CALL(*pipeline, close(_)).Times(1).WillOnce(Return(lastBlock)); |
| EXPECT_CALL(*fs, fsync(_)).Times(1); |
| EXPECT_CALL(*fs, complete(_, _)).Times(1).WillOnce(Return(true)); |
| EXPECT_CALL(*fs, unregisterOpenedOutputStream()).Times(1); |
| EXPECT_NO_THROW(ous.close()); |
| } |
| |
| static void heartBeatSenderThrow(int flag) { |
| OutputStreamImpl ous; |
| shared_ptr<MockPipeline> pipeline(new MockPipeline()); |
| MockPipelineStub stub; |
| ous.stub = &stub; |
| MockFileSystemInter * fs = new MockFileSystemInter; |
| FileStatus fileinfo; |
| fileinfo.setBlocksize(2048); |
| fileinfo.setLength(1024); |
| Config conf; |
| const SessionConfig sessionConf(conf); |
| shared_ptr<LocatedBlock> lastBlock(new LocatedBlock); |
| HdfsIOException e("test", "test", 3, "test"); |
| lastBlock->setNumBytes(0); |
| std::pair<shared_ptr<LocatedBlock>, shared_ptr<FileStatus> > lastBlockWithStatus; |
| lastBlockWithStatus.first = lastBlock; |
| lastBlockWithStatus.second = shared_ptr<FileStatus>(new FileStatus(fileinfo)); |
| EXPECT_CALL(*fs, getStandardPath(_)).Times(1).WillOnce(Return("/testheartBeat")); |
| EXPECT_CALL(*fs, getConf()).Times(1).WillOnce(ReturnRef(sessionConf)); |
| |
| if (flag & Append) { |
| EXPECT_CALL(*fs, append(_)).Times(1).WillOnce(Return(lastBlockWithStatus)); |
| } else { |
| EXPECT_CALL(*fs, create(_, _, _, _, _, _)).Times(1); |
| } |
| |
| EXPECT_CALL(*fs, registerOpenedOutputStream()).Times(1); |
| EXPECT_NO_THROW(ous.open(shared_ptr<FileSystemInter>(fs), "testheartBeat", flag, 0644, false, 3, 1024 * 1024)); |
| char buffer[20]; |
| Hdfs::FillBuffer(buffer, sizeof(buffer), 0); |
| EXPECT_NO_THROW(ous.append(buffer, sizeof(buffer))); |
| EXPECT_CALL(stub, getPipeline()).Times(1).WillOnce(Return(pipeline)); |
| EXPECT_CALL(*pipeline, send(_)).Times(2).WillOnce(Return()).WillOnce(Throw(e)); |
| EXPECT_CALL(*pipeline, flush()).Times(1); |
| EXPECT_NO_THROW(ous.flush()); |
| sleep_for(seconds(11)); |
| EXPECT_CALL(*pipeline, close(_)).Times(1).WillOnce(Return(lastBlock)); |
| EXPECT_CALL(*fs, fsync(_)).Times(1); |
| EXPECT_CALL(*fs, complete(_, _)).Times(1).WillOnce(Return(true)); |
| EXPECT_CALL(*fs, unregisterOpenedOutputStream()).Times(1); |
| EXPECT_NO_THROW(ous.close()); |
| } |
| |
| TEST_F(TestOutputStream, LeaseRenewForAppend_Success) { |
| LeaseRenew(Create | Append); |
| } |
| |
| TEST_F(TestOutputStream, LeaseRenewForCreate_Success) { |
| LeaseRenew(Create); |
| } |
| |
| TEST_F(TestOutputStream, DISABLED_heartBeatSenderForAppend_Success) { |
| heartBeatSender(Create | Append); |
| } |
| |
| TEST_F(TestOutputStream, DISABLED_heartBeatSenderForCreate_Success) { |
| heartBeatSender(Create); |
| } |
| |
| TEST_F(TestOutputStream, DISABLED_heartBeatSenderForCreate_Throw) { |
| heartBeatSenderThrow(Create); |
| } |
| |
| TEST_F(TestOutputStream, DISABLED_heartBeatSenderForAppend_Throw) { |
| heartBeatSenderThrow(Create | Append); |
| } |
| |
| TEST_F(TestOutputStream, DISABLED_openForCreate_Success) { |
| OutputStreamImpl ous; |
| MockFileSystemInter * fs = new MockFileSystemInter; |
| Config conf; |
| const SessionConfig sessionConf(conf); |
| EXPECT_CALL(*fs, getStandardPath(_)).Times(1).WillOnce(Return("/testopen")); |
| EXPECT_CALL(*fs, getConf()).Times(1).WillOnce(ReturnRef(sessionConf)); |
| EXPECT_CALL(*fs, create(_, _, _, _, _, _)).Times(1); |
| EXPECT_CALL(GetMockLeaseRenewer(), StartRenew(_)).Times(1); |
| EXPECT_CALL(GetMockLeaseRenewer(), StopRenew(_)).Times(1); |
| EXPECT_NO_THROW(ous.open(shared_ptr<FileSystemInter>(fs), "testopen", Create, 0644, false, 3, 1024 * 1024)); |
| EXPECT_CALL(*fs, complete(_, _)).Times(1).WillOnce(Return(true)); |
| EXPECT_NO_THROW(ous.close()); |
| } |
| |
| TEST_F(TestOutputStream, DISABLED_registerForCreate_Success) { |
| OutputStreamImpl ous; |
| MockFileSystemInter * fs = new MockFileSystemInter; |
| Config conf; |
| const SessionConfig sessionConf(conf); |
| EXPECT_CALL(*fs, getStandardPath(_)).Times(1).WillOnce(Return("/testregiester")); |
| EXPECT_CALL(*fs, getConf()).Times(1).WillOnce(ReturnRef(sessionConf)); |
| EXPECT_CALL(*fs, create(_, _, _, _, _, _)).Times(1); |
| EXPECT_CALL(GetMockLeaseRenewer(), StartRenew(_)).Times(1); |
| EXPECT_CALL(GetMockLeaseRenewer(), StopRenew(_)).Times(1); |
| EXPECT_NO_THROW(ous.open(shared_ptr<FileSystemInter>(fs), "testregiester", Create, 0644, false, 3, 1024 * 1024)); |
| EXPECT_CALL(*fs, complete(_, _)).Times(1).WillOnce(Return(true)); |
| EXPECT_NO_THROW(ous.close()); |
| } |
| |
| TEST_F(TestOutputStream, registerForAppend_Success) { |
| OutputStreamImpl ous; |
| MockFileSystemInter * fs = new MockFileSystemInter; |
| Config conf; |
| const SessionConfig sessionConf(conf); |
| FileStatus fileinfo; |
| fileinfo.setBlocksize(2048); |
| fileinfo.setLength(1024); |
| shared_ptr<LocatedBlock> lastBlock(new LocatedBlock); |
| lastBlock->setNumBytes(0); |
| std::pair<shared_ptr<LocatedBlock>, shared_ptr<FileStatus> > lastBlockWithStatus; |
| lastBlockWithStatus.first = lastBlock; |
| lastBlockWithStatus.second = shared_ptr<FileStatus>(new FileStatus(fileinfo)); |
| EXPECT_CALL(*fs, getStandardPath(_)).Times(1).WillOnce(Return("/testregiester")); |
| EXPECT_CALL(*fs, getConf()).Times(1).WillOnce(ReturnRef(sessionConf)); |
| EXPECT_CALL(*fs, append(_)).Times(1).WillOnce(Return(lastBlockWithStatus)); |
| EXPECT_CALL(*fs, getFileStatus(_)).Times(1).WillOnce(Return(fileinfo)); |
| EXPECT_CALL(GetMockLeaseRenewer(), StartRenew(_)).Times(1); |
| EXPECT_CALL(GetMockLeaseRenewer(), StopRenew(_)).Times(1); |
| EXPECT_NO_THROW(ous.open(shared_ptr<FileSystemInter>(fs), "testregiester", Append, 0644, false, 0, 0)); |
| EXPECT_CALL(*fs, complete(_, _)).Times(1).WillOnce(Return(true)); |
| EXPECT_NO_THROW(ous.close()); |
| } |
| |
| TEST_F(TestOutputStream, openForCreate_Fail) { |
| OutputStreamImpl ous; |
| MockFileSystemInter * fs = new MockFileSystemInter; |
| Config conf; |
| const SessionConfig sessionConf(conf); |
| HdfsIOException e("test", "test", 2, "test"); |
| EXPECT_CALL(*fs, getStandardPath(_)).Times(1).WillOnce(Return("/testopen")); |
| EXPECT_CALL(*fs, getConf()).Times(1).WillOnce(ReturnRef(sessionConf)); |
| EXPECT_CALL(*fs, create(_, _, _, _, _, _)).Times(1).WillOnce(Throw(e)); |
| EXPECT_THROW(ous.open(shared_ptr<FileSystemInter>(fs), "testopen", Create, 0644, false, 3, 1024 * 1024), HdfsIOException); |
| } |
| |
| |
| TEST_F(TestOutputStream, openForAppend_Success) { |
| OutputStreamImpl ous; |
| MockFileSystemInter * fs = new MockFileSystemInter; |
| Config conf; |
| const SessionConfig sessionConf(conf); |
| FileStatus fileinfo; |
| fileinfo.setBlocksize(2048); |
| fileinfo.setLength(1024); |
| shared_ptr<LocatedBlock> lastBlock(new LocatedBlock); |
| lastBlock->setNumBytes(0); |
| std::pair<shared_ptr<LocatedBlock>, shared_ptr<FileStatus> > lastBlockWithStatus; |
| lastBlockWithStatus.first = lastBlock; |
| lastBlockWithStatus.second = shared_ptr<FileStatus>(new FileStatus(fileinfo)); |
| EXPECT_CALL(*fs, getStandardPath(_)).Times(1).WillOnce(Return("/testopen")); |
| EXPECT_CALL(*fs, getConf()).Times(1).WillOnce(ReturnRef(sessionConf)); |
| EXPECT_CALL(*fs, append(_)).Times(1).WillOnce(Return(lastBlockWithStatus)); |
| EXPECT_CALL(*fs, getFileStatus(_)).Times(1).WillOnce(Return(fileinfo)); |
| EXPECT_CALL(GetMockLeaseRenewer(), StartRenew(_)).Times(1); |
| EXPECT_CALL(GetMockLeaseRenewer(), StopRenew(_)).Times(1); |
| EXPECT_NO_THROW(ous.open(shared_ptr<FileSystemInter>(fs), "testopen", Append, 0644, false, 0, 0)); |
| EXPECT_CALL(*fs, complete(_, _)).Times(1).WillOnce(Return(true)); |
| EXPECT_NO_THROW(ous.close()); |
| } |
| |
| TEST_F(TestOutputStream, openForAppend_Fail) { |
| OutputStreamImpl ous; |
| MockFileSystemInter * fs = new MockFileSystemInter; |
| Config conf; |
| const SessionConfig sessionConf(conf); |
| FileStatus fileinfo; |
| fileinfo.setBlocksize(2048); |
| fileinfo.setLength(1024); |
| EXPECT_CALL(*fs, getStandardPath(_)).Times(1).WillOnce(Return("/testopen")); |
| EXPECT_CALL(*fs, getConf()).Times(1).WillOnce(ReturnRef(sessionConf)); |
| EXPECT_CALL(*fs, append(_)).Times(1).WillOnce(Throw(FileNotFoundException("test", "test", 2, "test"))); |
| EXPECT_CALL(*fs, getFileStatus(_)).Times(1).WillOnce(Return(fileinfo)); |
| EXPECT_THROW(ous.open(shared_ptr<FileSystemInter>(fs), "testopen", Append, 0644, false, 0, 0), FileNotFoundException); |
| } |
| |
| TEST_F(TestOutputStream, append_Success) { |
| OutputStreamImpl ous; |
| shared_ptr<MockPipeline> pipelineStub(new MockPipeline()); |
| MockPipelineStub stub; |
| ous.stub = &stub; |
| MockFileSystemInter * fs = new MockFileSystemInter; |
| FileStatus fileinfo; |
| fileinfo.setBlocksize(2048); |
| fileinfo.setLength(1024); |
| Config conf; |
| const SessionConfig sessionConf(conf); |
| shared_ptr<LocatedBlock> lastBlock(new LocatedBlock); |
| lastBlock->setNumBytes(0); |
| std::pair<shared_ptr<LocatedBlock>, shared_ptr<FileStatus> > lastBlockWithStatus; |
| lastBlockWithStatus.first = lastBlock; |
| lastBlockWithStatus.second = shared_ptr<FileStatus>(new FileStatus(fileinfo)); |
| EXPECT_CALL(*fs, getStandardPath(_)).Times(1).WillOnce(Return("/testopen")); |
| EXPECT_CALL(*fs, getConf()).Times(1).WillOnce(ReturnRef(sessionConf)); |
| EXPECT_CALL(*fs, append(_)).Times(1).WillOnce(Return(lastBlockWithStatus)); |
| EXPECT_CALL(*fs, getFileStatus(_)).Times(1).WillOnce(Return(fileinfo)); |
| EXPECT_CALL(GetMockLeaseRenewer(), StartRenew(_)).Times(1); |
| EXPECT_CALL(GetMockLeaseRenewer(), StopRenew(_)).Times(1); |
| EXPECT_NO_THROW(ous.open(shared_ptr<FileSystemInter>(fs), "testopen", Create | Append, 0644, false, 3, 2048)); |
| char buffer[4096 + 523]; |
| Hdfs::FillBuffer(buffer, sizeof(buffer), 0); |
| EXPECT_CALL(stub, getPipeline()).Times(3).WillOnce(Return(pipelineStub)).WillOnce(Return(pipelineStub)).WillOnce(Return(pipelineStub)); |
| EXPECT_CALL(*pipelineStub, send(_)).Times(4); |
| EXPECT_CALL(*pipelineStub, close(_)).Times(2).WillOnce(Return(lastBlock)).WillOnce(Return(lastBlock)); |
| EXPECT_CALL(*fs, fsync(_)).Times(2); |
| EXPECT_NO_THROW(ous.append(buffer, sizeof(buffer))); |
| EXPECT_CALL(*pipelineStub, close(_)).Times(1).WillOnce(Return(lastBlock)); |
| EXPECT_CALL(*fs, fsync(_)).Times(1); |
| EXPECT_CALL(*fs, complete(_, _)).Times(1).WillOnce(Return(true)); |
| EXPECT_NO_THROW(ous.close()); |
| } |
| |
| TEST_F(TestOutputStream, appendEncryption_Success) { |
| OutputStreamImpl ous; |
| shared_ptr<MockPipeline> pipelineStub(new MockPipeline()); |
| MockPipelineStub stub; |
| ous.stub = &stub; |
| FileStatus fileinfo; |
| fileinfo.setBlocksize(2048); |
| fileinfo.setLength(1024); |
| |
| Config conf; |
| conf.set("hadoop.kms.authentication.type", "simple"); |
| conf.set("dfs.encryption.key.provider.uri","kms://http@0.0.0.0:16000/kms"); |
| SessionConfig sconf(conf); |
| shared_ptr<SessionConfig> sessionConf(new SessionConfig(conf)); |
| UserInfo userInfo; |
| userInfo.setRealUser("abai"); |
| shared_ptr<RpcAuth> auth(new RpcAuth(userInfo, RpcAuth::ParseMethod(sessionConf->getKmsMethod()))); |
| FileEncryptionInfo * encryptionInfo = fileinfo.getFileEncryption(); |
| encryptionInfo->setKey("TDE"); |
| encryptionInfo->setKeyName("TDEName"); |
| shared_ptr<KmsClientProvider> kcp(new KmsClientProvider(auth, sessionConf)); |
| int32_t bufSize = 8192; |
| MockCryptoCodec *cryptoC= new MockCryptoCodec(encryptionInfo, kcp, bufSize); |
| ous.setCryptoCodec(shared_ptr<CryptoCodec>(cryptoC)); |
| MockFileSystemInter * fs = new MockFileSystemInter; |
| |
| shared_ptr<LocatedBlock> lastBlock(new LocatedBlock); |
| lastBlock->setNumBytes(0); |
| std::pair<shared_ptr<LocatedBlock>, shared_ptr<FileStatus> > lastBlockWithStatus; |
| lastBlockWithStatus.first = lastBlock; |
| lastBlockWithStatus.second = shared_ptr<FileStatus>(new FileStatus(fileinfo)); |
| EXPECT_CALL(*fs, getStandardPath(_)).Times(1).WillOnce(Return("/testopen")); |
| EXPECT_CALL(*fs, getFileStatus(_)).Times(1).WillOnce(Return(fileinfo)); |
| EXPECT_CALL(*fs, getConf()).Times(1).WillOnce(ReturnRef(sconf)); |
| EXPECT_CALL(*fs, append(_)).Times(1).WillOnce(Return(lastBlockWithStatus)); |
| EXPECT_CALL(GetMockLeaseRenewer(), StartRenew(_)).Times(1); |
| EXPECT_CALL(GetMockLeaseRenewer(), StopRenew(_)).Times(1); |
| EXPECT_NO_THROW(ous.open(shared_ptr<FileSystemInter>(fs), "testopen", Create | Append, 0644, false, 3, 2048)); |
| |
| char buffer[4096 + 523]; |
| Hdfs::FillBuffer(buffer, sizeof(buffer), 0); |
| EXPECT_CALL(stub, getPipeline()).Times(3).WillOnce(Return(pipelineStub)).WillOnce(Return(pipelineStub)).WillOnce(Return(pipelineStub)); |
| EXPECT_CALL(*pipelineStub, send(_)).Times(4); |
| EXPECT_CALL(*pipelineStub, close(_)).Times(2).WillOnce(Return(lastBlock)).WillOnce(Return(lastBlock)); |
| EXPECT_CALL(*fs, fsync(_)).Times(2); |
| std::string bufferEn; |
| EXPECT_CALL(*cryptoC, cipher_wrap(_,_)).Times(1).WillOnce(Return(bufferEn)); |
| EXPECT_NO_THROW(ous.append(buffer, sizeof(buffer))); |
| EXPECT_CALL(*pipelineStub, close(_)).Times(1).WillOnce(Return(lastBlock)); |
| EXPECT_CALL(*fs, fsync(_)).Times(1); |
| EXPECT_CALL(*fs, complete(_, _)).Times(1).WillOnce(Return(true)); |
| EXPECT_NO_THROW(ous.close()); |
| } |
| |
| TEST_F(TestOutputStream, flush_Success) { |
| OutputStreamImpl ous; |
| shared_ptr<MockPipeline> pipelineStub(new MockPipeline()); |
| MockPipelineStub stub; |
| ous.stub = &stub; |
| MockFileSystemInter * fs = new MockFileSystemInter; |
| FileStatus fileinfo; |
| fileinfo.setBlocksize(2048); |
| fileinfo.setLength(1024); |
| Config conf; |
| const SessionConfig sessionConf(conf); |
| shared_ptr<LocatedBlock> lastBlock(new LocatedBlock); |
| HdfsIOException e("test", "test", 3, "test"); |
| lastBlock->setNumBytes(0); |
| std::pair<shared_ptr<LocatedBlock>, shared_ptr<FileStatus> > lastBlockWithStatus; |
| lastBlockWithStatus.first = lastBlock; |
| lastBlockWithStatus.second = shared_ptr<FileStatus>(new FileStatus(fileinfo)); |
| EXPECT_CALL(*fs, getStandardPath(_)).Times(1).WillOnce(Return("/testflush")); |
| EXPECT_CALL(*fs, getConf()).Times(1).WillOnce(ReturnRef(sessionConf)); |
| EXPECT_CALL(*fs, append(_)).Times(1).WillOnce(Return(lastBlockWithStatus)); |
| EXPECT_CALL(*fs, getFileStatus(_)).Times(1).WillOnce(Return(fileinfo)); |
| EXPECT_CALL(GetMockLeaseRenewer(), StartRenew(_)).Times(1); |
| EXPECT_CALL(GetMockLeaseRenewer(), StopRenew(_)).Times(1); |
| EXPECT_NO_THROW(ous.open(shared_ptr<FileSystemInter>(fs), "testflush", Create | Append, 0644, false, 3, 1024 * 1024)); |
| char buffer[20]; |
| Hdfs::FillBuffer(buffer, sizeof(buffer), 0); |
| EXPECT_NO_THROW(ous.append(buffer, sizeof(buffer))); |
| EXPECT_CALL(stub, getPipeline()).Times(1).WillOnce(Return(pipelineStub)); |
| EXPECT_CALL(*pipelineStub, send(_)).Times(1); |
| EXPECT_CALL(*pipelineStub, flush()).Times(1); |
| EXPECT_NO_THROW(ous.flush()); |
| EXPECT_CALL(*pipelineStub, close(_)).Times(1).WillOnce(Return(lastBlock)); |
| EXPECT_CALL(*fs, fsync(_)).Times(1); |
| EXPECT_CALL(*fs, complete(_, _)).Times(1).WillOnce(Return(true)); |
| EXPECT_NO_THROW(ous.close()); |
| } |
| |
| TEST_F(TestOutputStream, ValidateFirstBadLink) { |
| EXPECT_NO_THROW(PipelineImpl::checkBadLinkFormat("")); |
| EXPECT_NO_THROW(PipelineImpl::checkBadLinkFormat("8.8.8.8:1234")); |
| EXPECT_NO_THROW(PipelineImpl::checkBadLinkFormat("2001:0db8:85a3:0000:0000:8a2e:0370:7334:50010")); |
| EXPECT_THROW(PipelineImpl::checkBadLinkFormat("3"), HdfsException); |
| EXPECT_THROW(PipelineImpl::checkBadLinkFormat("8.8.8.8"), HdfsException); |
| EXPECT_THROW(PipelineImpl::checkBadLinkFormat("8.8.8.8:"), HdfsException); |
| EXPECT_THROW(PipelineImpl::checkBadLinkFormat("8.8.8.888:50010"), HdfsException); |
| EXPECT_THROW(PipelineImpl::checkBadLinkFormat("8.8.8.8:500101"), HdfsException); |
| EXPECT_THROW(PipelineImpl::checkBadLinkFormat("8.8.8.8:50010a"), HdfsException); |
| } |
| |