blob: 7697d11f006dd0f79decae6762501d3616a92d12 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef STORAGE_SRC_STORAGE_FORMAT_ORC_SEEKABLE_INPUT_STREAM_H_
#define STORAGE_SRC_STORAGE_FORMAT_ORC_SEEKABLE_INPUT_STREAM_H_
#include <google/protobuf/io/zero_copy_stream.h>
#include <snappy.h>
#include <algorithm>
#include <fstream>
#include <iomanip>
#include <iostream>
#include <list>
#include <memory>
#include <sstream>
#include <string>
#include <vector>
#include "storage/format/orc/input-stream.h"
#include "storage/format/orc/orc-proto-definition.h"
#include "zlib.h" //NOLINT
namespace orc {
// Debugging helper that is only declared here; the definition lives outside
// this header.
// @param out the stream that receives the output
// @param buffer start of the bytes to print
// @param length number of bytes at `buffer`
// NOTE(review): the output format is decided by the definition -- confirm
// there before relying on it.
void printBuffer(std::ostream& out, const char* buffer, // NOLINT
uint64_t length);
// Cursor over a list of stream positions; an instance is what
// SeekableInputStream::seek() receives. Both member functions are defined
// outside this header.
class PositionProvider {
 public:
  // Builds a provider over `positions`. Only an iterator is stored, so the
  // list presumably has to outlive the provider -- confirm against the
  // definition.
  explicit PositionProvider(const std::list<uint64_t>& positions);

  // Returns the next position value (presumably advancing the cursor; the
  // behaviour at the end of the list is decided by the definition).
  uint64_t next();

 private:
  // Read-only iterator into the caller's position list.
  std::list<uint64_t>::const_iterator position;
};
// States shared by the decompression streams declared in this header.
// The numeric values are spelled out; they match the implicit numbering.
enum DecompressState {
  // Presumably "a chunk header has to be read next"; the transition into
  // this state is made outside this header -- confirm in the .cc file.
  DECOMPRESS_HEADER = 0,
  // Chosen by readHeader() when the header's low bit is 0.
  DECOMPRESS_START = 1,
  // Not referenced by the inline helpers in this header; presumably
  // "continue with the current chunk" -- confirm in the .cc file.
  DECOMPRESS_CONTINUE = 2,
  // Chosen by readHeader() when the header's low bit is 1.
  DECOMPRESS_ORIGINAL = 3,
  // Set by readBuffer() when the underlying stream's Next() returns false.
  DECOMPRESS_EOF = 4
};
// Abstract base class for the seekable input streams in this header.
// It extends Google's ZeroCopyInputStream, so any concrete stream can be
// passed directly to the protobuf readers, and adds seek() and getName()
// on top of that interface.
class SeekableInputStream : public google::protobuf::io::ZeroCopyInputStream {
public:
virtual ~SeekableInputStream();
// Repositions the stream according to `position` (presumably by consuming
// one or more values through PositionProvider::next() -- how many is up to
// each implementation).
virtual void seek(PositionProvider& position) = 0; // NOLINT
// Returns a name for this stream. The decompression streams in this header
// embed the wrapped stream's name in their own, e.g. "snappy(<name>)".
virtual std::string getName() const = 0;
};
// A seekable input stream backed by a range of memory.
// All member functions are defined outside this header.
class SeekableArrayInputStream : public SeekableInputStream {
private:
// start of the memory range (raw pointer; presumably not owned -- confirm
// against the destructor)
const char* data;
// size of the memory range in bytes
uint64_t length;
// presumably the current read offset within the range -- confirm
uint64_t position;
// presumably the largest buffer handed out per Next() call -- confirm how
// a block_size of 0 is treated in the constructor definition
uint64_t blockSize;
public:
// Two overloads that differ only in the pointer type of `list`.
// @param list start of the memory range
// @param length number of bytes at `list`
// @param block_size optional block size, defaults to 0
SeekableArrayInputStream(const unsigned char* list, uint64_t length,
uint64_t block_size = 0);
SeekableArrayInputStream(const char* list, uint64_t length,
uint64_t block_size = 0);
virtual ~SeekableArrayInputStream();
// google::protobuf::io::ZeroCopyInputStream interface
bool Next(const void** data, int* size) override;
void BackUp(int count) override;
bool Skip(int count) override;
// NOTE(review): spelled google::protobuf::int64 here, while the other
// streams in this header declare the same override with int64_t.
google::protobuf::int64 ByteCount() const override;
// SeekableInputStream interface
void seek(PositionProvider& position) override;
std::string getName() const override;
};
// A seekable input stream that reads from an orc InputStream.
// All member functions are defined outside this header; the data members are
// protected so that subclasses such as SeekableFileBloomFilterInputStream can
// use them.
class SeekableFileInputStream : public SeekableInputStream {
protected:
// memory pool passed to the constructor
dbcommon::MemoryPool& memoryPool;
// underlying input (raw pointer; presumably not owned -- confirm against
// the destructor)
InputStream* const input;
// presumably the `offset` constructor argument, i.e. where this stream
// begins inside `input` -- confirm
const uint64_t start;
// presumably the `byteCount` constructor argument -- confirm
const uint64_t length;
// block size; see the constructor definition for how 0 is handled
const uint64_t blockSize;
// owned read buffer
std::unique_ptr<DataBuffer<char> > buffer;
// presumably the current offset within the stream -- confirm
uint64_t position;
// presumably the number of bytes returned by BackUp() that still have to
// be handed out again -- confirm
uint64_t pushBack;
public:
// @param input the underlying input
// @param offset first constructor offset argument (see `start`)
// @param byteCount byte count argument (see `length`)
// @param pool the memory pool
// @param blockSize optional block size, defaults to 0
SeekableFileInputStream(InputStream* input, uint64_t offset,
uint64_t byteCount,
dbcommon::MemoryPool& pool, // NOLINT
uint64_t blockSize = 0);
// Overload without offset/byteCount/blockSize; how those are chosen is
// decided by the definition (presumably the whole input is covered).
explicit SeekableFileInputStream(InputStream* input,
dbcommon::MemoryPool& pool); // NOLINT
virtual ~SeekableFileInputStream();
// google::protobuf::io::ZeroCopyInputStream interface
bool Next(const void** data, int* size) override;
void BackUp(int count) override;
bool Skip(int count) override;
int64_t ByteCount() const override;
// SeekableInputStream interface
void seek(PositionProvider& position) override;
std::string getName() const override;
};
// Variant of SeekableFileInputStream that takes the same constructor
// arguments and replaces only Next(); every other operation is inherited.
// NOTE(review): the name suggests it is used for reading bloom filter data;
// how its Next() differs from the base class is defined outside this header.
class SeekableFileBloomFilterInputStream : public SeekableFileInputStream {
public:
// Same parameters as the five-argument SeekableFileInputStream constructor.
SeekableFileBloomFilterInputStream(InputStream* input, uint64_t offset,
uint64_t byteCount,
dbcommon::MemoryPool& pool, // NOLINT
uint64_t blockSize = 0);
virtual ~SeekableFileBloomFilterInputStream();
// Overrides the base class implementation of ZeroCopyInputStream::Next().
bool Next(const void** data, int* size) override;
};
// Seekable stream that reads compressed chunks from an underlying
// SeekableInputStream and, going by its name and the z_stream member,
// inflates them with zlib. Only the chunk-header helpers are defined inline
// here; the remaining members are defined outside this header.
class ZlibDecompressionStream : public SeekableInputStream {
 public:
  // @param inStream the underlying compressed stream; passed by unique_ptr,
  //   so ownership moves into the constructor
  // @param blockSize block size argument (see the `blockSize` member)
  // @param pool the memory pool
  ZlibDecompressionStream(std::unique_ptr<SeekableInputStream> inStream,
                          size_t blockSize,
                          dbcommon::MemoryPool& pool);  // NOLINT
  virtual ~ZlibDecompressionStream();

  // google::protobuf::io::ZeroCopyInputStream interface
  bool Next(const void** data, int* size) override;
  void BackUp(int count) override;
  bool Skip(int count) override;
  int64_t ByteCount() const override;

  // SeekableInputStream interface
  void seek(PositionProvider& position) override;
  std::string getName() const override;

 private:
  // Fetches the next buffer from the underlying stream into
  // [inputBuffer, inputBufferEnd). When the underlying stream is exhausted,
  // the state becomes DECOMPRESS_EOF and both pointers are reset to nullptr;
  // if `failOnEof` is set, an internal error is reported through LOG_ERROR
  // first.
  void readBuffer(bool failOnEof) {
    int length = 0;
    if (!input->Next(reinterpret_cast<const void**>(&inputBuffer), &length)) {
      if (failOnEof) {
        LOG_ERROR(ERRCODE_INTERNAL_ERROR,
                  "Read past EOF in "
                  "ZlibDecompressionStream::readBuffer");
      }
      state = DECOMPRESS_EOF;
      inputBuffer = nullptr;
      inputBufferEnd = nullptr;
    } else {
      inputBufferEnd = inputBuffer + length;
    }
  }

  // Returns the next input byte, refilling the input buffer when it is empty.
  // Returns 0 when the state is DECOMPRESS_EOF after a refill.
  uint32_t readByte(bool failOnEof) {
    // ZeroCopyInputStream::Next() is allowed to hand back an empty buffer,
    // so keep refilling until a byte is available instead of dereferencing
    // inputBuffer when it still equals inputBufferEnd.
    while (inputBuffer == inputBufferEnd) {
      readBuffer(failOnEof);
      if (state == DECOMPRESS_EOF) {
        return 0;
      }
    }
    return static_cast<unsigned char>(*(inputBuffer++));
  }

  // Reads the 3-byte chunk header. The bytes are combined little-endian;
  // bit 0 selects DECOMPRESS_ORIGINAL (1) or DECOMPRESS_START (0) and the
  // remaining bits become remainingLength. Running out of input on the first
  // byte is not an error: the state is DECOMPRESS_EOF and remainingLength is
  // set to 0. Running out of input on the second or third byte is reported
  // by readBuffer().
  void readHeader() {
    uint32_t header = readByte(false);
    if (state != DECOMPRESS_EOF) {
      header |= readByte(true) << 8;
      header |= readByte(true) << 16;
      if (header & 1) {
        state = DECOMPRESS_ORIGINAL;
      } else {
        state = DECOMPRESS_START;
      }
      remainingLength = header >> 1;
    } else {
      remainingLength = 0;
    }
  }

  // the memory pool passed to the constructor
  dbcommon::MemoryPool& memoryPool;
  // the block size passed to the constructor
  const size_t blockSize;
  // the underlying compressed stream (owned)
  std::unique_ptr<SeekableInputStream> input;
  // zlib inflate state
  z_stream zstream;
  // buffer owned by this stream (presumably holds the inflated output --
  // confirm in the .cc file)
  DataBuffer<char> buffer;

  // the current state
  DecompressState state;

  // the start of the current buffer
  // This pointer is not owned by us. It is either owned by zstream or
  // the underlying stream.
  const char* outputBuffer;

  // the size of the current buffer
  size_t outputBufferLength;

  // the size of the current chunk
  size_t remainingLength;

  // the last buffer returned from the input
  const char* inputBuffer;
  const char* inputBufferEnd;

  // roughly the number of bytes returned
  off_t bytesReturned;
};
// Common base for decompression streams whose codec works on whole blocks
// (the snappy, lzo and lz4 streams in this header derive from it). It reads
// chunks from an underlying SeekableInputStream and delegates the actual
// decoding to the pure virtual decompress(). Only the chunk-header helpers
// are defined inline here; the remaining members are defined outside this
// header.
class BlockDecompressionStream : public SeekableInputStream {
 public:
  // @param inStream the underlying compressed stream; passed by unique_ptr,
  //   so ownership moves into the constructor
  // @param blockSize block size argument (its use is decided by the
  //   constructor definition; no member of that name is kept)
  // @param pool the memory pool
  BlockDecompressionStream(std::unique_ptr<SeekableInputStream> inStream,
                           size_t blockSize,
                           dbcommon::MemoryPool& pool);  // NOLINT
  virtual ~BlockDecompressionStream() {}

  // google::protobuf::io::ZeroCopyInputStream interface
  bool Next(const void** data, int* size) override;
  void BackUp(int count) override;
  bool Skip(int count) override;
  int64_t ByteCount() const override;

  // SeekableInputStream interface
  void seek(PositionProvider& position) override;
  // Stays pure virtual: each codec subclass supplies its own name.
  std::string getName() const override = 0;

 protected:
  // Codec hook implemented by each subclass.
  // @param input start of the compressed bytes
  // @param length number of compressed bytes
  // @param output destination buffer
  // @param maxOutputLength capacity of `output`
  // @return presumably the number of bytes written to `output` -- confirm
  //   against the implementations
  virtual uint64_t decompress(const char* input, uint64_t length, char* output,
                              size_t maxOutputLength) = 0;

  // Name of the wrapped stream, for subclasses to embed in getName().
  std::string getStreamName() const { return input->getName(); }

 private:
  // Fetches the next buffer from the underlying stream into
  // [inputBufferPtr, inputBufferPtrEnd). When the underlying stream is
  // exhausted, the state becomes DECOMPRESS_EOF and both pointers are reset
  // to nullptr; if `failOnEof` is set, an internal error is reported through
  // LOG_ERROR first.
  void readBuffer(bool failOnEof) {
    int length = 0;
    if (!input->Next(reinterpret_cast<const void**>(&inputBufferPtr),
                     &length)) {
      if (failOnEof) {
        // NOTE(review): the message prints the stream name followed by the
        // literal text "getName()"; kept as is in case anything matches on
        // the exact wording.
        LOG_ERROR(ERRCODE_INTERNAL_ERROR, "%s getName() read past EOF",
                  getName().c_str());
      }
      state = DECOMPRESS_EOF;
      inputBufferPtr = nullptr;
      inputBufferPtrEnd = nullptr;
    } else {
      inputBufferPtrEnd = inputBufferPtr + length;
    }
  }

  // Returns the next input byte, refilling the input buffer when it is empty.
  // Returns 0 when the state is DECOMPRESS_EOF after a refill.
  uint32_t readByte(bool failOnEof) {
    // ZeroCopyInputStream::Next() is allowed to hand back an empty buffer,
    // so keep refilling until a byte is available instead of dereferencing
    // inputBufferPtr when it still equals inputBufferPtrEnd.
    while (inputBufferPtr == inputBufferPtrEnd) {
      readBuffer(failOnEof);
      if (state == DECOMPRESS_EOF) {
        return 0;
      }
    }
    return static_cast<unsigned char>(*(inputBufferPtr++));
  }

  // Reads the 3-byte chunk header. The bytes are combined little-endian;
  // bit 0 selects DECOMPRESS_ORIGINAL (1) or DECOMPRESS_START (0) and the
  // remaining bits become remainingLength. Running out of input on the first
  // byte is not an error: the state is DECOMPRESS_EOF and remainingLength is
  // set to 0. Running out of input on the second or third byte is reported
  // by readBuffer().
  void readHeader() {
    uint32_t header = readByte(false);
    if (state != DECOMPRESS_EOF) {
      header |= readByte(true) << 8;
      header |= readByte(true) << 16;
      if (header & 1) {
        state = DECOMPRESS_ORIGINAL;
      } else {
        state = DECOMPRESS_START;
      }
      remainingLength = header >> 1;
    } else {
      remainingLength = 0;
    }
  }

  // the underlying compressed stream (owned)
  std::unique_ptr<SeekableInputStream> input;
  // the memory pool passed to the constructor
  dbcommon::MemoryPool& memoryPool;

  // may need to stitch together multiple input buffers;
  // to give snappy a contiguous block
  DataBuffer<char> inputBuffer;

  // uncompressed output
  DataBuffer<char> outputBuffer;

  // the current state
  DecompressState state;

  // the start of the current output buffer
  const char* outputBufferPtr;
  // the size of the current output buffer
  size_t outputBufferLength;

  // the size of the current chunk
  size_t remainingLength;

  // the last buffer returned from the input
  const char* inputBufferPtr;
  const char* inputBufferPtrEnd;

  // bytes returned by this stream
  off_t bytesReturned;
};
// BlockDecompressionStream specialisation for snappy: it supplies the stream
// name and the decompress() hook (defined outside this header).
class SnappyDecompressionStream : public BlockDecompressionStream {
 public:
  // Forwards all arguments unchanged to BlockDecompressionStream.
  SnappyDecompressionStream(std::unique_ptr<SeekableInputStream> inStream,
                            size_t blockSize,
                            dbcommon::MemoryPool& pool)  // NOLINT
      : BlockDecompressionStream(std::move(inStream), blockSize, pool) {}

  // Returns "snappy(<name of the wrapped stream>)".
  std::string getName() const override {
    return "snappy(" + getStreamName() + ")";
  }

 protected:
  // Codec hook required by BlockDecompressionStream; see the declaration
  // there for the parameters.
  uint64_t decompress(const char* input, uint64_t length, char* output,
                      size_t maxOutputLength) override;
};
// BlockDecompressionStream specialisation for lzo: it supplies the stream
// name and the decompress() hook (defined outside this header).
class LzoDecompressionStream : public BlockDecompressionStream {
 public:
  // Forwards all arguments unchanged to BlockDecompressionStream.
  LzoDecompressionStream(std::unique_ptr<SeekableInputStream> inStream,
                         size_t blockSize,
                         dbcommon::MemoryPool& pool)  // NOLINT
      : BlockDecompressionStream(std::move(inStream), blockSize, pool) {}

  // Returns "lzo(<name of the wrapped stream>)".
  std::string getName() const override {
    return "lzo(" + getStreamName() + ")";
  }

 protected:
  // Codec hook required by BlockDecompressionStream; see the declaration
  // there for the parameters.
  uint64_t decompress(const char* input, uint64_t length, char* output,
                      size_t maxOutputLength) override;
};
// BlockDecompressionStream specialisation for lz4: it supplies the stream
// name and the decompress() hook (defined outside this header).
class Lz4DecompressionStream : public BlockDecompressionStream {
 public:
  // Forwards all arguments unchanged to BlockDecompressionStream.
  Lz4DecompressionStream(std::unique_ptr<SeekableInputStream> inStream,
                         size_t blockSize,
                         dbcommon::MemoryPool& pool)  // NOLINT
      : BlockDecompressionStream(std::move(inStream), blockSize, pool) {}

  // Returns "lz4(<name of the wrapped stream>)".
  std::string getName() const override {
    return "lz4(" + getStreamName() + ")";
  }

 protected:
  // Codec hook required by BlockDecompressionStream; see the declaration
  // there for the parameters.
  uint64_t decompress(const char* input, uint64_t length, char* output,
                      size_t maxOutputLength) override;
};
// Creates a decompressor for the given compression kind.
// Declared here; the definition lives outside this header.
// @param kind the compression kind to decode
// @param input the input stream that is the underlying source; it is passed
//   by unique_ptr, so ownership is handed over to this function
// @param bufferSize the maximum size of the buffer
// @param pool the memory pool
// @return the stream callers should read from.
//   NOTE(review): what is returned for an uncompressed kind, and how an
//   unsupported kind is handled, is decided by the definition -- confirm
//   there.
std::unique_ptr<SeekableInputStream> createDecompressor(
CompressionKind kind, std::unique_ptr<SeekableInputStream> input,
uint64_t bufferSize, dbcommon::MemoryPool& pool); // NOLINT
} // namespace orc
#endif // STORAGE_SRC_STORAGE_FORMAT_ORC_SEEKABLE_INPUT_STREAM_H_