blob: de36eac399f4b835b9c959d84020d38d6c2bacaf [file] [log] [blame]
/********************************************************************
* 2014 -
* open source under Apache License Version 2.0
********************************************************************/
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "gtest/gtest.h"
#include "gmock/gmock.h"
#include "client/FileSystem.h"
#include "client/FileSystemImpl.h"
#include "client/FileSystemInter.h"
#include "client/OutputStream.h"
#include "client/OutputStreamImpl.h"
#include "client/Packet.h"
#include "client/Pipeline.h"
#include "DateTime.h"
#include "MockFileSystemInter.h"
#include "MockCryptoCodec.h"
#include "MockLeaseRenewer.h"
#include "MockPipeline.h"
#include "NamenodeStub.h"
#include "server/ExtendedBlock.h"
#include "TestDatanodeStub.h"
#include "TestUtil.h"
#include "Thread.h"
#include "XmlConfig.h"
#include <string>
using namespace Hdfs;
using namespace Internal;
using namespace Hdfs::Mock;
using namespace testing;
using ::testing::AtLeast;
#define BASE_DIR "test/"
/**
 * Fixture used by every TEST_F in this file.
 *
 * gtest builds one fixture object per test, so each test is bracketed by a
 * MakeMockLeaseRenewer() call (constructor) and a ResetMockLeaseRenewer()
 * call (destructor). Tests reach the mock itself via GetMockLeaseRenewer().
 */
class TestOutputStream : public ::testing::Test {
public:
    TestOutputStream() {
        renewer = MakeMockLeaseRenewer();
    }

    virtual ~TestOutputStream() {
        ResetMockLeaseRenewer(renewer);
    }

protected:
    // Value returned by MakeMockLeaseRenewer() and handed back on teardown;
    // presumably the renewer that was installed before the mock -- TODO confirm.
    shared_ptr<LeaseRenewer> renewer;
    // The tests in this file declare their own locals named fs / ous, which
    // shadow these two members.
    MockFileSystemInter fs;
    OutputStreamImpl ous;
};
/**
 * gMock implementation of PipelineStub.
 *
 * Tests assign an instance to OutputStreamImpl::stub and script
 * getPipeline() so that it returns a MockPipeline they control.
 */
class MockPipelineStub : public PipelineStub {
public:
    MOCK_METHOD0(getPipeline, shared_ptr<MockPipeline> ());
};
/**
 * gMock implementation of NamenodeStub.
 *
 * No test in this file sets an expectation on getNamenode().
 */
class MockNamenodeStub : public NamenodeStub {
public:
    MOCK_METHOD0(getNamenode, MockNamenode * ());
};
/**
 * Opens and then closes an OutputStreamImpl on a mocked file system and
 * expects exactly one StartRenew() and one StopRenew() on the mock lease
 * renewer over that open/close cycle.
 *
 * All expectations are installed before the stream is touched.
 *
 * @param flag Open flags. If the Append bit is set the mock expects a single
 *             append() call, otherwise a single create() call.
 */
static void LeaseRenew(int flag) {
    Config conf;
    FileStatus fileinfo;
    fileinfo.setBlocksize(2048);
    fileinfo.setLength(1024);
    shared_ptr<LocatedBlock> lastBlock(new LocatedBlock);
    lastBlock->setNumBytes(0);
    // Value returned by the mocked append(): last block plus file status.
    std::pair<shared_ptr<LocatedBlock>, shared_ptr<FileStatus> > lastBlockWithStatus;
    lastBlockWithStatus.first = lastBlock;
    lastBlockWithStatus.second = shared_ptr<FileStatus>(new FileStatus(fileinfo));
    SessionConfig sconf(conf);
    shared_ptr<MockFileSystemInter> myfs(new MockFileSystemInter());
    // getFileStatus() may or may not be consulted, hence AtMost(1).
    EXPECT_CALL(*myfs, getFileStatus(_)).Times(AtMost(1)).WillOnce(Return(fileinfo));
    EXPECT_CALL(*myfs, getConf()).Times(1).WillOnce(ReturnRef(sconf));
    OutputStreamImpl leaseous;

    if (flag & Append) {
        EXPECT_CALL(*myfs, append(_)).Times(1).WillOnce(Return(lastBlockWithStatus));
    } else {
        EXPECT_CALL(*myfs, create(_, _, _, _, _, _)).Times(1);
    }

    EXPECT_CALL(*myfs, complete(_, _)).Times(1).WillOnce(Return(true));
    EXPECT_CALL(GetMockLeaseRenewer(), StartRenew(_)).Times(1);
    EXPECT_CALL(GetMockLeaseRenewer(), StopRenew(_)).Times(1);
    EXPECT_CALL(*myfs, getStandardPath(_)).Times(1);
    EXPECT_NO_THROW(DebugException(leaseous.open(myfs, BASE_DIR"testrenewlease", flag, 0644, true, 0, 2048)));
    EXPECT_NO_THROW(leaseous.close());
}
static void heartBeatSender(int flag) {
OutputStreamImpl ous;
shared_ptr<MockPipeline> pipeline(new MockPipeline());
MockPipelineStub stub;
ous.stub = &stub;
MockFileSystemInter * fs = new MockFileSystemInter;
FileStatus fileinfo;
fileinfo.setBlocksize(2048);
fileinfo.setLength(1024);
Config conf;
const SessionConfig sessionConf(conf);
shared_ptr<LocatedBlock> lastBlock(new LocatedBlock);
lastBlock->setNumBytes(0);
std::pair<shared_ptr<LocatedBlock>, shared_ptr<FileStatus> > lastBlockWithStatus;
lastBlockWithStatus.first = lastBlock;
lastBlockWithStatus.second = shared_ptr<FileStatus>(new FileStatus(fileinfo));
EXPECT_CALL(*fs, getStandardPath(_)).Times(1).WillOnce(Return("/testheartBeat"));
EXPECT_CALL(*fs, getConf()).Times(1).WillOnce(ReturnRef(sessionConf));
if (flag & Append) {
EXPECT_CALL(*fs, append(_)).Times(1).WillOnce(Return(lastBlockWithStatus));
} else {
EXPECT_CALL(*fs, create(_, _, _, _, _, _)).Times(1);
}
EXPECT_CALL(*fs, registerOpenedOutputStream()).Times(1);
EXPECT_NO_THROW(ous.open(shared_ptr<FileSystemInter>(fs), "testheartBeat", flag, 0644, false, 3, 1024 * 1024));
char buffer[20];
Hdfs::FillBuffer(buffer, sizeof(buffer), 0);
EXPECT_NO_THROW(ous.append(buffer, sizeof(buffer)));
EXPECT_CALL(stub, getPipeline()).Times(1).WillOnce(Return(pipeline));
EXPECT_CALL(*pipeline, send(_)).Times(3);
EXPECT_CALL(*pipeline, flush()).Times(1);
EXPECT_NO_THROW(ous.flush());
sleep_for(seconds(21));
EXPECT_CALL(*pipeline, close(_)).Times(1).WillOnce(Return(lastBlock));
EXPECT_CALL(*fs, fsync(_)).Times(1);
EXPECT_CALL(*fs, complete(_, _)).Times(1).WillOnce(Return(true));
EXPECT_CALL(*fs, unregisterOpenedOutputStream()).Times(1);
EXPECT_NO_THROW(ous.close());
}
/**
 * Scenario behind the (disabled) heartbeat failure tests.
 *
 * Same flow as the success scenario, except that the mocked pipeline's
 * second send() throws an HdfsIOException and the idle period is 11 seconds.
 * close() is still expected to complete without throwing.
 *
 * NOTE(review): several EXPECT_CALLs below are added after the mocks have
 * already been exercised, which gMock documents as unsupported.
 *
 * @param flag Open flags. If the Append bit is set the mock expects a single
 *             append() call, otherwise a single create() call.
 */
static void heartBeatSenderThrow(int flag) {
    OutputStreamImpl ous;
    shared_ptr<MockPipeline> pipeline(new MockPipeline());
    MockPipelineStub stub;
    ous.stub = &stub;
    // Held in a shared_ptr from creation so the mock cannot leak and the
    // handle used by the later EXPECT_CALLs stays valid.
    shared_ptr<MockFileSystemInter> fs(new MockFileSystemInter);
    FileStatus fileinfo;
    fileinfo.setBlocksize(2048);
    fileinfo.setLength(1024);
    Config conf;
    const SessionConfig sessionConf(conf);
    shared_ptr<LocatedBlock> lastBlock(new LocatedBlock);
    // Thrown by the second send() on the mocked pipeline.
    HdfsIOException e("test", "test", 3, "test");
    lastBlock->setNumBytes(0);
    std::pair<shared_ptr<LocatedBlock>, shared_ptr<FileStatus> > lastBlockWithStatus;
    lastBlockWithStatus.first = lastBlock;
    lastBlockWithStatus.second = shared_ptr<FileStatus>(new FileStatus(fileinfo));
    EXPECT_CALL(*fs, getStandardPath(_)).Times(1).WillOnce(Return("/testheartBeat"));
    EXPECT_CALL(*fs, getConf()).Times(1).WillOnce(ReturnRef(sessionConf));

    if (flag & Append) {
        EXPECT_CALL(*fs, append(_)).Times(1).WillOnce(Return(lastBlockWithStatus));
    } else {
        EXPECT_CALL(*fs, create(_, _, _, _, _, _)).Times(1);
    }

    EXPECT_CALL(*fs, registerOpenedOutputStream()).Times(1);
    EXPECT_NO_THROW(ous.open(fs, "testheartBeat", flag, 0644, false, 3, 1024 * 1024));
    char buffer[20];
    Hdfs::FillBuffer(buffer, sizeof(buffer), 0);
    EXPECT_NO_THROW(ous.append(buffer, sizeof(buffer)));
    EXPECT_CALL(stub, getPipeline()).Times(1).WillOnce(Return(pipeline));
    EXPECT_CALL(*pipeline, send(_)).Times(2).WillOnce(Return()).WillOnce(Throw(e));
    EXPECT_CALL(*pipeline, flush()).Times(1);
    EXPECT_NO_THROW(ous.flush());
    // Idle period during which the throwing send() is expected to happen.
    sleep_for(seconds(11));
    EXPECT_CALL(*pipeline, close(_)).Times(1).WillOnce(Return(lastBlock));
    EXPECT_CALL(*fs, fsync(_)).Times(1);
    EXPECT_CALL(*fs, complete(_, _)).Times(1).WillOnce(Return(true));
    EXPECT_CALL(*fs, unregisterOpenedOutputStream()).Times(1);
    EXPECT_NO_THROW(ous.close());
}
// Runs the LeaseRenew scenario with the Append bit set, i.e. through the
// mocked append() path.
TEST_F(TestOutputStream, LeaseRenewForAppend_Success) {
    const int openFlags = Create | Append;
    LeaseRenew(openFlags);
}
// Runs the LeaseRenew scenario without the Append bit, i.e. through the
// mocked create() path.
TEST_F(TestOutputStream, LeaseRenewForCreate_Success) {
    const int openFlags = Create;
    LeaseRenew(openFlags);
}
// Heartbeat success scenario on the append() path. The DISABLED_ prefix
// keeps gtest from running it by default.
TEST_F(TestOutputStream, DISABLED_heartBeatSenderForAppend_Success) {
    const int openFlags = Create | Append;
    heartBeatSender(openFlags);
}
// Heartbeat success scenario on the create() path. The DISABLED_ prefix
// keeps gtest from running it by default.
TEST_F(TestOutputStream, DISABLED_heartBeatSenderForCreate_Success) {
    const int openFlags = Create;
    heartBeatSender(openFlags);
}
// Heartbeat failure scenario (second send() throws) on the create() path.
// The DISABLED_ prefix keeps gtest from running it by default.
TEST_F(TestOutputStream, DISABLED_heartBeatSenderForCreate_Throw) {
    const int openFlags = Create;
    heartBeatSenderThrow(openFlags);
}
// Heartbeat failure scenario (second send() throws) on the append() path.
// The DISABLED_ prefix keeps gtest from running it by default.
TEST_F(TestOutputStream, DISABLED_heartBeatSenderForAppend_Throw) {
    const int openFlags = Create | Append;
    heartBeatSenderThrow(openFlags);
}
/**
 * open() with the Create flag followed by close() on a mocked file system.
 * Expects one create(), one StartRenew()/StopRenew() pair on the mock lease
 * renewer and one complete(). Not run by default (DISABLED_ prefix).
 */
TEST_F(TestOutputStream, DISABLED_openForCreate_Success) {
    OutputStreamImpl ous;
    // Held in a shared_ptr from creation so the mock cannot leak and the
    // handle used by the later EXPECT_CALL stays valid.
    shared_ptr<MockFileSystemInter> fs(new MockFileSystemInter);
    Config conf;
    const SessionConfig sessionConf(conf);
    EXPECT_CALL(*fs, getStandardPath(_)).Times(1).WillOnce(Return("/testopen"));
    EXPECT_CALL(*fs, getConf()).Times(1).WillOnce(ReturnRef(sessionConf));
    EXPECT_CALL(*fs, create(_, _, _, _, _, _)).Times(1);
    EXPECT_CALL(GetMockLeaseRenewer(), StartRenew(_)).Times(1);
    EXPECT_CALL(GetMockLeaseRenewer(), StopRenew(_)).Times(1);
    EXPECT_NO_THROW(ous.open(fs, "testopen", Create, 0644, false, 3, 1024 * 1024));
    EXPECT_CALL(*fs, complete(_, _)).Times(1).WillOnce(Return(true));
    EXPECT_NO_THROW(ous.close());
}
/**
 * Create-path open()/close() cycle that expects the stream to start and
 * stop lease renewal exactly once each on the mock lease renewer.
 * Not run by default (DISABLED_ prefix).
 */
TEST_F(TestOutputStream, DISABLED_registerForCreate_Success) {
    OutputStreamImpl ous;
    // Held in a shared_ptr from creation so the mock cannot leak and the
    // handle used by the later EXPECT_CALL stays valid.
    shared_ptr<MockFileSystemInter> fs(new MockFileSystemInter);
    Config conf;
    const SessionConfig sessionConf(conf);
    EXPECT_CALL(*fs, getStandardPath(_)).Times(1).WillOnce(Return("/testregiester"));
    EXPECT_CALL(*fs, getConf()).Times(1).WillOnce(ReturnRef(sessionConf));
    EXPECT_CALL(*fs, create(_, _, _, _, _, _)).Times(1);
    EXPECT_CALL(GetMockLeaseRenewer(), StartRenew(_)).Times(1);
    EXPECT_CALL(GetMockLeaseRenewer(), StopRenew(_)).Times(1);
    EXPECT_NO_THROW(ous.open(fs, "testregiester", Create, 0644, false, 3, 1024 * 1024));
    EXPECT_CALL(*fs, complete(_, _)).Times(1).WillOnce(Return(true));
    EXPECT_NO_THROW(ous.close());
}
/**
 * Append-path open()/close() cycle that expects the stream to start and
 * stop lease renewal exactly once each on the mock lease renewer. The
 * mocked append() returns an empty last block plus a 1024-byte file status.
 */
TEST_F(TestOutputStream, registerForAppend_Success) {
    OutputStreamImpl ous;
    // Held in a shared_ptr from creation so the mock cannot leak and the
    // handle used by the later EXPECT_CALL stays valid.
    shared_ptr<MockFileSystemInter> fs(new MockFileSystemInter);
    Config conf;
    const SessionConfig sessionConf(conf);
    FileStatus fileinfo;
    fileinfo.setBlocksize(2048);
    fileinfo.setLength(1024);
    shared_ptr<LocatedBlock> lastBlock(new LocatedBlock);
    lastBlock->setNumBytes(0);
    std::pair<shared_ptr<LocatedBlock>, shared_ptr<FileStatus> > lastBlockWithStatus;
    lastBlockWithStatus.first = lastBlock;
    lastBlockWithStatus.second = shared_ptr<FileStatus>(new FileStatus(fileinfo));
    EXPECT_CALL(*fs, getStandardPath(_)).Times(1).WillOnce(Return("/testregiester"));
    EXPECT_CALL(*fs, getConf()).Times(1).WillOnce(ReturnRef(sessionConf));
    EXPECT_CALL(*fs, append(_)).Times(1).WillOnce(Return(lastBlockWithStatus));
    EXPECT_CALL(*fs, getFileStatus(_)).Times(1).WillOnce(Return(fileinfo));
    EXPECT_CALL(GetMockLeaseRenewer(), StartRenew(_)).Times(1);
    EXPECT_CALL(GetMockLeaseRenewer(), StopRenew(_)).Times(1);
    EXPECT_NO_THROW(ous.open(fs, "testregiester", Append, 0644, false, 0, 0));
    EXPECT_CALL(*fs, complete(_, _)).Times(1).WillOnce(Return(true));
    EXPECT_NO_THROW(ous.close());
}
/**
 * open() with the Create flag must propagate the HdfsIOException thrown by
 * the mocked create().
 */
TEST_F(TestOutputStream, openForCreate_Fail) {
    OutputStreamImpl ous;
    // Held in a shared_ptr from creation so the mock cannot leak if anything
    // throws before open() takes its reference.
    shared_ptr<MockFileSystemInter> fs(new MockFileSystemInter);
    Config conf;
    const SessionConfig sessionConf(conf);
    HdfsIOException e("test", "test", 2, "test");
    EXPECT_CALL(*fs, getStandardPath(_)).Times(1).WillOnce(Return("/testopen"));
    EXPECT_CALL(*fs, getConf()).Times(1).WillOnce(ReturnRef(sessionConf));
    EXPECT_CALL(*fs, create(_, _, _, _, _, _)).Times(1).WillOnce(Throw(e));
    EXPECT_THROW(ous.open(fs, "testopen", Create, 0644, false, 3, 1024 * 1024), HdfsIOException);
}
/**
 * open() with the Append flag followed by close() on a mocked file system.
 * Expects one append(), one getFileStatus(), one StartRenew()/StopRenew()
 * pair on the mock lease renewer and one complete().
 */
TEST_F(TestOutputStream, openForAppend_Success) {
    OutputStreamImpl ous;
    // Held in a shared_ptr from creation so the mock cannot leak and the
    // handle used by the later EXPECT_CALL stays valid.
    shared_ptr<MockFileSystemInter> fs(new MockFileSystemInter);
    Config conf;
    const SessionConfig sessionConf(conf);
    FileStatus fileinfo;
    fileinfo.setBlocksize(2048);
    fileinfo.setLength(1024);
    shared_ptr<LocatedBlock> lastBlock(new LocatedBlock);
    lastBlock->setNumBytes(0);
    std::pair<shared_ptr<LocatedBlock>, shared_ptr<FileStatus> > lastBlockWithStatus;
    lastBlockWithStatus.first = lastBlock;
    lastBlockWithStatus.second = shared_ptr<FileStatus>(new FileStatus(fileinfo));
    EXPECT_CALL(*fs, getStandardPath(_)).Times(1).WillOnce(Return("/testopen"));
    EXPECT_CALL(*fs, getConf()).Times(1).WillOnce(ReturnRef(sessionConf));
    EXPECT_CALL(*fs, append(_)).Times(1).WillOnce(Return(lastBlockWithStatus));
    EXPECT_CALL(*fs, getFileStatus(_)).Times(1).WillOnce(Return(fileinfo));
    EXPECT_CALL(GetMockLeaseRenewer(), StartRenew(_)).Times(1);
    EXPECT_CALL(GetMockLeaseRenewer(), StopRenew(_)).Times(1);
    EXPECT_NO_THROW(ous.open(fs, "testopen", Append, 0644, false, 0, 0));
    EXPECT_CALL(*fs, complete(_, _)).Times(1).WillOnce(Return(true));
    EXPECT_NO_THROW(ous.close());
}
/**
 * open() with the Append flag only must propagate the FileNotFoundException
 * thrown by the mocked append().
 */
TEST_F(TestOutputStream, openForAppend_Fail) {
    OutputStreamImpl ous;
    // Held in a shared_ptr from creation so the mock cannot leak if anything
    // throws before open() takes its reference.
    shared_ptr<MockFileSystemInter> fs(new MockFileSystemInter);
    Config conf;
    const SessionConfig sessionConf(conf);
    FileStatus fileinfo;
    fileinfo.setBlocksize(2048);
    fileinfo.setLength(1024);
    EXPECT_CALL(*fs, getStandardPath(_)).Times(1).WillOnce(Return("/testopen"));
    EXPECT_CALL(*fs, getConf()).Times(1).WillOnce(ReturnRef(sessionConf));
    EXPECT_CALL(*fs, append(_)).Times(1).WillOnce(Throw(FileNotFoundException("test", "test", 2, "test")));
    EXPECT_CALL(*fs, getFileStatus(_)).Times(1).WillOnce(Return(fileinfo));
    EXPECT_THROW(ous.open(fs, "testopen", Append, 0644, false, 0, 0), FileNotFoundException);
}
/**
 * Appends 4096 + 523 bytes to a stream opened with a 2048-byte block size.
 *
 * During append() the mocks expect three getPipeline() calls, four send()
 * calls, two pipeline close() calls and two fsync() calls; close() on the
 * stream then expects one more pipeline close(), one fsync() and one
 * complete().
 *
 * NOTE(review): expectations are added in phases, after the mocks have
 * already been exercised, which gMock documents as unsupported.
 */
TEST_F(TestOutputStream, append_Success) {
    OutputStreamImpl ous;
    shared_ptr<MockPipeline> pipelineStub(new MockPipeline());
    MockPipelineStub stub;
    ous.stub = &stub;
    // Held in a shared_ptr from creation so the mock cannot leak and the
    // handle used by the later EXPECT_CALLs stays valid.
    shared_ptr<MockFileSystemInter> fs(new MockFileSystemInter);
    FileStatus fileinfo;
    fileinfo.setBlocksize(2048);
    fileinfo.setLength(1024);
    Config conf;
    const SessionConfig sessionConf(conf);
    shared_ptr<LocatedBlock> lastBlock(new LocatedBlock);
    lastBlock->setNumBytes(0);
    std::pair<shared_ptr<LocatedBlock>, shared_ptr<FileStatus> > lastBlockWithStatus;
    lastBlockWithStatus.first = lastBlock;
    lastBlockWithStatus.second = shared_ptr<FileStatus>(new FileStatus(fileinfo));
    EXPECT_CALL(*fs, getStandardPath(_)).Times(1).WillOnce(Return("/testopen"));
    EXPECT_CALL(*fs, getConf()).Times(1).WillOnce(ReturnRef(sessionConf));
    EXPECT_CALL(*fs, append(_)).Times(1).WillOnce(Return(lastBlockWithStatus));
    EXPECT_CALL(*fs, getFileStatus(_)).Times(1).WillOnce(Return(fileinfo));
    EXPECT_CALL(GetMockLeaseRenewer(), StartRenew(_)).Times(1);
    EXPECT_CALL(GetMockLeaseRenewer(), StopRenew(_)).Times(1);
    EXPECT_NO_THROW(ous.open(fs, "testopen", Create | Append, 0644, false, 3, 2048));
    char buffer[4096 + 523];
    Hdfs::FillBuffer(buffer, sizeof(buffer), 0);
    // Expectations for the append() phase.
    EXPECT_CALL(stub, getPipeline()).Times(3).WillOnce(Return(pipelineStub)).WillOnce(Return(pipelineStub)).WillOnce(Return(pipelineStub));
    EXPECT_CALL(*pipelineStub, send(_)).Times(4);
    EXPECT_CALL(*pipelineStub, close(_)).Times(2).WillOnce(Return(lastBlock)).WillOnce(Return(lastBlock));
    EXPECT_CALL(*fs, fsync(_)).Times(2);
    EXPECT_NO_THROW(ous.append(buffer, sizeof(buffer)));
    // Expectations for the close() phase.
    EXPECT_CALL(*pipelineStub, close(_)).Times(1).WillOnce(Return(lastBlock));
    EXPECT_CALL(*fs, fsync(_)).Times(1);
    EXPECT_CALL(*fs, complete(_, _)).Times(1).WillOnce(Return(true));
    EXPECT_NO_THROW(ous.close());
}
/**
 * Same multi-block append as append_Success, but with a MockCryptoCodec
 * installed on the stream via setCryptoCodec(). In addition to the pipeline
 * and file system expectations, the codec's cipher_wrap() is expected to be
 * called exactly once during append().
 *
 * NOTE(review): expectations are added in phases, after the mocks have
 * already been exercised, which gMock documents as unsupported.
 */
TEST_F(TestOutputStream, appendEncryption_Success) {
    OutputStreamImpl ous;
    shared_ptr<MockPipeline> pipelineStub(new MockPipeline());
    MockPipelineStub stub;
    ous.stub = &stub;
    FileStatus fileinfo;
    fileinfo.setBlocksize(2048);
    fileinfo.setLength(1024);
    Config conf;
    conf.set("hadoop.kms.authentication.type", "simple");
    conf.set("dfs.encryption.key.provider.uri","kms://http@0.0.0.0:16000/kms");
    SessionConfig sconf(conf);
    shared_ptr<SessionConfig> sessionConf(new SessionConfig(conf));
    UserInfo userInfo;
    userInfo.setRealUser("abai");
    shared_ptr<RpcAuth> auth(new RpcAuth(userInfo, RpcAuth::ParseMethod(sessionConf->getKmsMethod())));
    // Points into fileinfo; the key material is set before fileinfo is copied
    // into the mocked append() result below.
    FileEncryptionInfo * encryptionInfo = fileinfo.getFileEncryption();
    encryptionInfo->setKey("TDE");
    encryptionInfo->setKeyName("TDEName");
    shared_ptr<KmsClientProvider> kcp(new KmsClientProvider(auth, sessionConf));
    int32_t bufSize = 8192;
    // Both mocks are held in shared_ptrs from creation so they cannot leak
    // and the handles used by the later EXPECT_CALLs stay valid.
    shared_ptr<MockCryptoCodec> cryptoC(new MockCryptoCodec(encryptionInfo, kcp, bufSize));
    ous.setCryptoCodec(cryptoC);
    shared_ptr<MockFileSystemInter> fs(new MockFileSystemInter);
    shared_ptr<LocatedBlock> lastBlock(new LocatedBlock);
    lastBlock->setNumBytes(0);
    std::pair<shared_ptr<LocatedBlock>, shared_ptr<FileStatus> > lastBlockWithStatus;
    lastBlockWithStatus.first = lastBlock;
    lastBlockWithStatus.second = shared_ptr<FileStatus>(new FileStatus(fileinfo));
    EXPECT_CALL(*fs, getStandardPath(_)).Times(1).WillOnce(Return("/testopen"));
    EXPECT_CALL(*fs, getFileStatus(_)).Times(1).WillOnce(Return(fileinfo));
    EXPECT_CALL(*fs, getConf()).Times(1).WillOnce(ReturnRef(sconf));
    EXPECT_CALL(*fs, append(_)).Times(1).WillOnce(Return(lastBlockWithStatus));
    EXPECT_CALL(GetMockLeaseRenewer(), StartRenew(_)).Times(1);
    EXPECT_CALL(GetMockLeaseRenewer(), StopRenew(_)).Times(1);
    EXPECT_NO_THROW(ous.open(fs, "testopen", Create | Append, 0644, false, 3, 2048));
    char buffer[4096 + 523];
    Hdfs::FillBuffer(buffer, sizeof(buffer), 0);
    // Expectations for the append() phase.
    EXPECT_CALL(stub, getPipeline()).Times(3).WillOnce(Return(pipelineStub)).WillOnce(Return(pipelineStub)).WillOnce(Return(pipelineStub));
    EXPECT_CALL(*pipelineStub, send(_)).Times(4);
    EXPECT_CALL(*pipelineStub, close(_)).Times(2).WillOnce(Return(lastBlock)).WillOnce(Return(lastBlock));
    EXPECT_CALL(*fs, fsync(_)).Times(2);
    // The mocked codec hands back an empty string as the "encrypted" data.
    std::string bufferEn;
    EXPECT_CALL(*cryptoC, cipher_wrap(_,_)).Times(1).WillOnce(Return(bufferEn));
    EXPECT_NO_THROW(ous.append(buffer, sizeof(buffer)));
    // Expectations for the close() phase.
    EXPECT_CALL(*pipelineStub, close(_)).Times(1).WillOnce(Return(lastBlock));
    EXPECT_CALL(*fs, fsync(_)).Times(1);
    EXPECT_CALL(*fs, complete(_, _)).Times(1).WillOnce(Return(true));
    EXPECT_NO_THROW(ous.close());
}
/**
 * Appends 20 bytes and calls flush(): the mocks expect one getPipeline(),
 * one send() and one pipeline flush(). Closing the stream then expects one
 * pipeline close(), one fsync() and one complete().
 *
 * NOTE(review): expectations are added in phases, after the mocks have
 * already been exercised, which gMock documents as unsupported.
 */
TEST_F(TestOutputStream, flush_Success) {
    OutputStreamImpl ous;
    shared_ptr<MockPipeline> pipelineStub(new MockPipeline());
    MockPipelineStub stub;
    ous.stub = &stub;
    // Held in a shared_ptr from creation so the mock cannot leak and the
    // handle used by the later EXPECT_CALLs stays valid.
    shared_ptr<MockFileSystemInter> fs(new MockFileSystemInter);
    FileStatus fileinfo;
    fileinfo.setBlocksize(2048);
    fileinfo.setLength(1024);
    Config conf;
    const SessionConfig sessionConf(conf);
    shared_ptr<LocatedBlock> lastBlock(new LocatedBlock);
    lastBlock->setNumBytes(0);
    std::pair<shared_ptr<LocatedBlock>, shared_ptr<FileStatus> > lastBlockWithStatus;
    lastBlockWithStatus.first = lastBlock;
    lastBlockWithStatus.second = shared_ptr<FileStatus>(new FileStatus(fileinfo));
    EXPECT_CALL(*fs, getStandardPath(_)).Times(1).WillOnce(Return("/testflush"));
    EXPECT_CALL(*fs, getConf()).Times(1).WillOnce(ReturnRef(sessionConf));
    EXPECT_CALL(*fs, append(_)).Times(1).WillOnce(Return(lastBlockWithStatus));
    EXPECT_CALL(*fs, getFileStatus(_)).Times(1).WillOnce(Return(fileinfo));
    EXPECT_CALL(GetMockLeaseRenewer(), StartRenew(_)).Times(1);
    EXPECT_CALL(GetMockLeaseRenewer(), StopRenew(_)).Times(1);
    EXPECT_NO_THROW(ous.open(fs, "testflush", Create | Append, 0644, false, 3, 1024 * 1024));
    char buffer[20];
    Hdfs::FillBuffer(buffer, sizeof(buffer), 0);
    EXPECT_NO_THROW(ous.append(buffer, sizeof(buffer)));
    // Expectations for the flush() phase.
    EXPECT_CALL(stub, getPipeline()).Times(1).WillOnce(Return(pipelineStub));
    EXPECT_CALL(*pipelineStub, send(_)).Times(1);
    EXPECT_CALL(*pipelineStub, flush()).Times(1);
    EXPECT_NO_THROW(ous.flush());
    // Expectations for the close() phase.
    EXPECT_CALL(*pipelineStub, close(_)).Times(1).WillOnce(Return(lastBlock));
    EXPECT_CALL(*fs, fsync(_)).Times(1);
    EXPECT_CALL(*fs, complete(_, _)).Times(1).WillOnce(Return(true));
    EXPECT_NO_THROW(ous.close());
}
/**
 * PipelineImpl::checkBadLinkFormat() must accept an empty string and
 * well-formed "host:port" values, and throw HdfsException for anything else
 * listed below.
 */
TEST_F(TestOutputStream, ValidateFirstBadLink) {
    // Inputs that must pass validation without throwing.
    static const char * const accepted[] = {
        "",
        "8.8.8.8:1234",
        "2001:0db8:85a3:0000:0000:8a2e:0370:7334:50010"
    };
    // Inputs that must be rejected with an HdfsException.
    static const char * const rejected[] = {
        "3",
        "8.8.8.8",
        "8.8.8.8:",
        "8.8.8.888:50010",
        "8.8.8.8:500101",
        "8.8.8.8:50010a"
    };

    for (size_t i = 0; i < sizeof(accepted) / sizeof(accepted[0]); ++i) {
        SCOPED_TRACE(accepted[i]);
        EXPECT_NO_THROW(PipelineImpl::checkBadLinkFormat(accepted[i]));
    }

    for (size_t i = 0; i < sizeof(rejected) / sizeof(rejected[0]); ++i) {
        SCOPED_TRACE(rejected[i]);
        EXPECT_THROW(PipelineImpl::checkBadLinkFormat(rejected[i]), HdfsException);
    }
}