blob: 8ffb5d1c840954b2ee1a7e8005fb7b1a78389752 [file] [log] [blame]
/********************************************************************
* 2014 -
* open source under Apache License Version 2.0
********************************************************************/
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef _HDFS_LIBHDFS3_CLIENT_OUTPUTSTREAMIMPL_H_
#define _HDFS_LIBHDFS3_CLIENT_OUTPUTSTREAMIMPL_H_
#include "Atomic.h"
#include "Checksum.h"
#include "DateTime.h"
#include "ExceptionInternal.h"
#include "FileSystem.h"
#include "Memory.h"
#include "OutputStreamInter.h"
#include "PacketPool.h"
#include "Permission.h"
#include "Pipeline.h"
#include "server/LocatedBlock.h"
#include "SessionConfig.h"
#include "Thread.h"
#include "CryptoCodec.h"
#include "KmsClientProvider.h"
#ifdef MOCK
#include "PipelineStub.h"
#endif
namespace Hdfs {
namespace Internal {
/**
* A output stream used to write data to hdfs.
*/
class OutputStreamImpl: public OutputStreamInter {
public:
OutputStreamImpl();
~OutputStreamImpl();
/**
* To create or append a file.
* @param fs hdfs file system.
* @param path the file path.
* @param flag creation flag, can be Create, Append or Create|Overwrite.
* @param permission create a new file with given permission.
* @param createParent if the parent does not exist, create it.
* @param replication create a file with given number of replication.
* @param blockSize create a file with given block size.
*/
void open(shared_ptr<FileSystemInter> fs, const char * path, int flag,
const Permission & permission, bool createParent, int replication,
int64_t blockSize);
/**
* To append data to file.
* @param buf the data used to append.
* @param size the data size.
*/
void append(const char * buf, int64_t size);
/**
* Flush all data in buffer and waiting for ack.
* Will block until get all acks.
*/
void flush();
/**
* return the current file length.
* @return current file length.
*/
int64_t tell();
/**
* @ref OutputStream::sync
*/
void sync();
/**
* close the stream.
*/
void close();
/**
* Output a readable string of this output stream.
*/
std::string toString();
/**
* Keep the last error of this stream.
* @error the error to be kept.
*/
void setError(const exception_ptr & error);
/**
* Get KmsClientProvider.
*/
shared_ptr<KmsClientProvider> getKmsClientProvider();
/**
* Set KmsClientProvider.
*/
void setKmsClientProvider(shared_ptr<KmsClientProvider> kcp);
/**
* Get CryptoCodec.
*/
shared_ptr<CryptoCodec> getCryptoCodec();
/**
* Set CryptoCodec.
*/
void setCryptoCodec(shared_ptr<CryptoCodec> cryptoCodec);
private:
void appendChunkToPacket(const char * buf, int size);
void appendInternal(const char * buf, int64_t size);
void checkStatus();
void closePipeline();
void completeFile(bool throwError);
void computePacketChunkSize();
void flushInternal(bool needSync);
//void heartBeatSenderRoutine();
void initAppend();
void openInternal(shared_ptr<FileSystemInter> fs, const char * path, int flag,
const Permission & permission, bool createParent, int replication,
int64_t blockSize);
void reset();
void sendPacket(shared_ptr<Packet> packet);
void setupPipeline();
private:
//atomic<bool> heartBeatStop;
bool closed;
bool isAppend;
bool syncBlock;
//condition_variable condHeartBeatSender;
exception_ptr lastError;
int checksumSize;
int chunkSize;
int chunksPerPacket;
int closeTimeout;
int heartBeatInterval;
int packetSize;
int position; //cursor in buffer
int replication;
int64_t blockSize; //max size of block
int64_t bytesWritten; //the size of bytes has be written into packet (not include the data in chunk buffer).
int64_t cursor; //cursor in file.
int64_t lastFlushed; //the position last flushed
int64_t nextSeqNo;
mutex mut;
PacketPool packets;
shared_ptr<Checksum> checksum;
shared_ptr<FileSystemInter> filesystem;
shared_ptr<LocatedBlock> lastBlock;
shared_ptr<Packet> currentPacket;
shared_ptr<Pipeline> pipeline;
shared_ptr<SessionConfig> conf;
std::string path;
std::vector<char> buffer;
steady_clock::time_point lastSend;
//thread heartBeatSender;
FileStatus fileStatus;
shared_ptr<CryptoCodec> cryptoCodec;
shared_ptr<KmsClientProvider> kcp;
shared_ptr<RpcAuth> auth;
friend class Pipeline;
#ifdef MOCK
private:
Hdfs::Mock::PipelineStub * stub;
#endif
};
}
}
#endif /* _HDFS_LIBHDFS3_CLIENT_OUTPUTSTREAMIMPL_H_ */