blob: ce36239ff7b265ad64923b8e56e4a6406c494462 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "lib/commons.h"
#include "NativeTask.h"
#include "BlockCodec.h"
namespace NativeTask {
/**
 * Builds a block compressor on top of `stream`.
 *
 * No scratch buffer is allocated here; that happens in init().
 * `_hint` keeps the requested size and `_blockMax` is set to one and a
 * half times it (integer arithmetic: halve first, then triple).
 */
BlockCompressStream::BlockCompressStream(OutputStream * stream, uint32_t bufferSizeHint)
    : CompressStream(stream),
      _tempBuffer(NULL),
      _tempBufferSize(0),
      _compressedBytesWritten(0) {
  _blockMax = (bufferSizeHint / 2) * 3;
  _hint = bufferSizeHint;
}
/**
 * Allocates the scratch buffer used while compressing.
 *
 * The buffer holds maxCompressedLength(_blockMax) bytes plus 8 extra bytes
 * (presumably room for the two 32-bit block header fields; confirm against
 * compressOneBlock()).
 *
 * Calling init() more than once is safe: the buffer from the previous call
 * is released before a new one is allocated, so it is not leaked.
 */
void BlockCompressStream::init() {
  // delete[] on NULL is a no-op, so this is fine on the first call too.
  delete[] _tempBuffer;
  // Clear the pointer before allocating: if new[] throws, the destructor
  // must not delete the old (already released) buffer a second time.
  _tempBuffer = NULL;
  _tempBufferSize = maxCompressedLength(_blockMax) + 8;
  _tempBuffer = new char[_tempBufferSize];
}
/**
 * Releases the scratch buffer allocated by init() (if any) and resets the
 * bookkeeping fields.
 */
BlockCompressStream::~BlockCompressStream() {
  _tempBufferSize = 0;
  // delete[] accepts NULL, so no check is needed when init() never ran.
  delete[] _tempBuffer;
  _tempBuffer = NULL;
}
void BlockCompressStream::write(const void * buff, uint32_t length) {
while (length > 0) {
uint32_t take = length < _blockMax ? length : _hint;
compressOneBlock(buff, take);
buff = ((const char *)buff) + take;
length -= take;
}
}
/**
 * Forwards the flush request to the wrapped output stream.
 */
void BlockCompressStream::flush() {
  (*_stream).flush();
}
void BlockCompressStream::close() {
flush();
}
/**
 * Writes `length` bytes from `buff` straight to the wrapped stream, without
 * compression, and adds them to the compressed-bytes counter.
 *
 * The counter is updated only after the underlying write returns.
 */
void BlockCompressStream::writeDirect(const void * buff, uint32_t length) {
  (*_stream).write(buff, length);
  _compressedBytesWritten = _compressedBytesWritten + length;
}
uint64_t BlockCompressStream::compressedBytesWritten() {
return _compressedBytesWritten;
}
//////////////////////////////////////////////////////////////
/**
 * Builds a block decompressor on top of `stream`.
 *
 * All buffers start out unallocated; init() creates the scratch buffer and
 * read() grows the spill buffer on demand. `_blockMax` is one and a half
 * times the size hint (integer arithmetic: halve first, then triple).
 */
BlockDecompressStream::BlockDecompressStream(InputStream * stream, uint32_t bufferSizeHint)
    : DecompressStream(stream), _tempBuffer(NULL), _tempBufferSize(0) {
  // Block sizing.
  _hint = bufferSizeHint;
  _blockMax = (bufferSizeHint / 2) * 3;

  // Spill buffer for blocks larger than the caller's read buffer.
  _tempDecompressBuffer = NULL;
  _tempDecompressBufferCapacity = 0;
  _tempDecompressBufferSize = 0;
  _tempDecompressBufferUsed = 0;

  // Statistics.
  _compressedBytesRead = 0;
}
void BlockDecompressStream::init() {
_tempBufferSize = maxCompressedLength(_blockMax) + 8;
_tempBuffer = (char*)malloc(_tempBufferSize);
}
/**
 * Runs close() to drop the spill buffer, then releases the scratch buffer
 * allocated by init().
 */
BlockDecompressStream::~BlockDecompressStream() {
  close();
  // free() ignores NULL, so no check is needed when init() never ran.
  free(_tempBuffer);
  _tempBuffer = NULL;
  _tempBufferSize = 0;
}
/**
 * Reads up to `length` decompressed bytes into `buff`.
 *
 * When nothing is pending in the spill buffer, the next block header (two
 * byte-swapped 32-bit values) is read from the stream. The first value is
 * the expected decompressed length; the second is passed to
 * decompressOneBlock() as its first argument (presumably the compressed
 * length; confirm against decompressOneBlock()). A block that fits in
 * `buff` is decompressed straight into it. A larger block is decompressed
 * into the spill buffer and handed out piecewise by this and later calls.
 *
 * @param buff   destination buffer
 * @param length capacity of `buff` in bytes
 * @return number of bytes placed in `buff`, or -1 when the header read
 *         returns no data (end of stream)
 * @throws IOException on a truncated header or when a block decompresses to
 *         a different length than its header announced
 * @throws OutOfMemoryException when the spill buffer cannot be grown
 */
int32_t BlockDecompressStream::read(void * buff, uint32_t length) {
  if (_tempDecompressBufferSize == 0) {
    // Spill buffer is empty: fetch the header of the next block.
    uint32_t header[2];
    int32_t got = _stream->readFully(&header, sizeof(header));
    if (got <= 0) {
      // EOF
      return -1;
    }
    if (got != sizeof(header)) {
      THROW_EXCEPTION(IOException, "readFully get incomplete data");
    }
    _compressedBytesRead += got;

    uint32_t rawLength = bswap(header[0]);
    uint32_t packedLength = bswap(header[1]);

    if (rawLength <= length) {
      // Fast path: the whole block fits in the caller's buffer.
      uint32_t produced = decompressOneBlock(packedLength, buff, rawLength);
      if (produced != rawLength) {
        THROW_EXCEPTION(IOException, "Block decompress data error, length not match");
      }
      return produced;
    }

    // Slow path: decompress into the spill buffer, growing it if necessary.
    if (_tempDecompressBufferCapacity < rawLength) {
      char * grown = (char *)realloc(_tempDecompressBuffer, rawLength);
      if (grown == NULL) {
        THROW_EXCEPTION(OutOfMemoryException, "realloc failed");
      }
      _tempDecompressBuffer = grown;
      _tempDecompressBufferCapacity = rawLength;
    }
    uint32_t spilled = decompressOneBlock(packedLength, _tempDecompressBuffer, rawLength);
    if (spilled != rawLength) {
      THROW_EXCEPTION(IOException, "Block decompress data error, length not match");
    }
    _tempDecompressBufferSize = rawLength;
    _tempDecompressBufferUsed = 0;
  }

  if (_tempDecompressBufferSize > 0) {
    // Serve the request from whatever is still pending in the spill buffer.
    uint32_t pending = _tempDecompressBufferSize - _tempDecompressBufferUsed;
    const char * from = _tempDecompressBuffer + _tempDecompressBufferUsed;
    if (length < pending) {
      memcpy(buff, from, length);
      _tempDecompressBufferUsed += length;
      return length;
    }
    // The caller can take everything that is left: drain the spill buffer.
    memcpy(buff, from, pending);
    _tempDecompressBufferSize = 0;
    _tempDecompressBufferUsed = 0;
    return pending;
  }

  // should not get here
  THROW_EXCEPTION(IOException, "Decompress logic error");
  return -1;
}
/**
 * Drops the spill buffer and resets its bookkeeping.
 *
 * Logs a message if decompressed data was still pending, i.e. the caller
 * did not consume everything that read() had buffered.
 */
void BlockDecompressStream::close() {
  if (_tempDecompressBufferSize > 0) {
    LOG("[BlockDecompressStream] Some data left in the _tempDecompressBuffer when close()");
  }

  if (_tempDecompressBuffer != NULL) {
    free(_tempDecompressBuffer);
    _tempDecompressBufferCapacity = 0;
    _tempDecompressBuffer = NULL;
  }

  _tempDecompressBufferUsed = 0;
  _tempDecompressBufferSize = 0;
}
/**
 * Reads `length` raw bytes from the wrapped stream into `buff`, bypassing
 * decompression.
 *
 * @return whatever the underlying readFully() returned; positive results
 *         are also added to the compressed-bytes counter
 * @throws IOException if decompressed data is still pending in the spill
 *         buffer
 */
int32_t BlockDecompressStream::readDirect(void * buff, uint32_t length) {
  if (_tempDecompressBufferSize > 0) {
    THROW_EXCEPTION(IOException, "temp decompress data exists when call readDirect()");
  }

  int32_t got = _stream->readFully(buff, length);
  if (got <= 0) {
    return got;
  }
  _compressedBytesRead += got;
  return got;
}
uint64_t BlockDecompressStream::compressedBytesRead() {
return _compressedBytesRead;
}
} // namespace NativeTask