blob: 1f7843fad7d6aaedc7148581b36458d40d22ebbd [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "BlockBuffer.hh"
#include "orc/OrcFile.hh"
#include "orc/Writer.hh"
#include <algorithm>
#include <cstring>
#include <memory>
#include <new>
#include <stdexcept>
namespace orc {
/**
 * Construct an empty buffer whose storage is carved out of fixed-size blocks
 * obtained from the given memory pool, and reserve the first block up front.
 *
 * @param pool        pool used for every block allocation made by this buffer
 * @param _blockSize  size in bytes of each block; must be non-zero
 * @throws std::logic_error if _blockSize is zero
 */
BlockBuffer::BlockBuffer(MemoryPool& pool, uint64_t _blockSize)
    : memoryPool(pool), currentSize(0), currentCapacity(0), blockSize(_blockSize) {
  // Reject a zero block size early: other members divide by blockSize.
  const bool blockSizeIsZero = (blockSize == 0);
  if (blockSizeIsZero) {
    throw std::logic_error("Block size cannot be zero");
  }
  // Make sure at least one block's worth of capacity is requested.
  reserve(blockSize);
}
/**
 * Hand every block back to the memory pool and reset the bookkeeping
 * counters to zero.
 */
BlockBuffer::~BlockBuffer() {
  // Blocks are released in the order they were appended.
  for (auto& blockPtr : blocks) {
    memoryPool.free(blockPtr);
  }
  blocks.clear();
  currentCapacity = 0;
  currentSize = 0;
}
/**
 * Return a view of the block at the given index.
 *
 * The reported size is the number of bytes between the start of that block
 * and currentSize, capped at blockSize, so only the last block can be
 * reported as partially filled.
 *
 * @param blockIndex  zero-based index of the requested block
 * @return Block pointing at the start of the block with its used byte count
 * @throws std::out_of_range if blockIndex is not below getBlockNumber()
 */
BlockBuffer::Block BlockBuffer::getBlock(uint64_t blockIndex) const {
  if (!(blockIndex < getBlockNumber())) {
    throw std::out_of_range("Block index out of range");
  }
  // Bytes of buffered data that lie at or after the start of this block.
  const uint64_t bytesFromBlockStart = currentSize - blockIndex * blockSize;
  const uint64_t usedBytes = (bytesFromBlockStart < blockSize) ? bytesFromBlockStart : blockSize;
  return Block(blocks[blockIndex], usedBytes);
}
/**
 * Hand out the next writable region of the buffer and mark it as used.
 *
 * If the buffer has no spare capacity, it is grown by one block and that
 * whole new block is returned. Otherwise the unused tail of the block that
 * currentSize falls in is returned, and currentSize is advanced to the end
 * of that block.
 *
 * @return Block describing the writable region (pointer and byte count)
 * @throws std::logic_error propagated from resize() if growing fails
 */
BlockBuffer::Block BlockBuffer::getNextBlock() {
  if (currentSize >= currentCapacity) {
    // No spare capacity: append a fresh block and expose all of it.
    resize(currentSize + blockSize);
    return Block(blocks.back(), blockSize);
  }
  // Spare capacity exists: expose what is left of the current block.
  const uint64_t blockIndex = currentSize / blockSize;
  const uint64_t usedInBlock = currentSize % blockSize;
  Block remainder(blocks[blockIndex] + usedInBlock, blockSize - usedInBlock);
  currentSize = (blockIndex + 1) * blockSize;
  return remainder;
}
/**
 * Set the logical size of the buffer, growing capacity first if required.
 *
 * @param size  new logical size in bytes
 * @throws std::logic_error if capacity is still below size after reserve()
 */
void BlockBuffer::resize(uint64_t size) {
  reserve(size);
  // reserve() stops silently when an allocation fails, so verify the result.
  if (currentCapacity < size) {
    throw std::logic_error("Block buffer resize error");
  }
  currentSize = size;
}
/**
 * Append blocks until the capacity reaches at least newCapacity.
 *
 * Capacity grows in whole blocks of blockSize bytes. If the pool returns a
 * null pointer, growth stops and the function returns without reporting an
 * error; callers must compare currentCapacity against their request.
 *
 * @param newCapacity  minimum capacity in bytes requested by the caller
 */
void BlockBuffer::reserve(uint64_t newCapacity) {
  while (currentCapacity < newCapacity) {
    char* const freshBlock = memoryPool.malloc(blockSize);
    if (freshBlock == nullptr) {
      // Allocation failed: give up quietly and keep what we already have.
      return;
    }
    blocks.push_back(freshBlock);
    currentCapacity += blockSize;
  }
}
/**
 * Write the buffered bytes [0, currentSize) to the given output stream.
 *
 * Data is written in chunks of at most min(output->getNaturalWriteSize(),
 * 1 GiB) bytes. When the data occupies a single block that fits in one
 * chunk, the block is written directly. Otherwise the blocks are copied into
 * a temporary staging buffer taken from the memory pool and flushed each time
 * it fills up, followed by one final partial flush if anything is left over.
 *
 * The staging buffer is owned by a scope guard, so it is returned to the pool
 * even when output->write() throws.
 *
 * @param output   destination stream; must not be null
 * @param metrics  optional metrics sink; when non-null, the number of
 *                 output->write() calls made here is added to IOCount
 * @throws std::logic_error if the natural write size of output is zero
 * @throws std::bad_alloc if the staging buffer cannot be allocated
 */
void BlockBuffer::writeTo(OutputStream* output, WriterMetrics* metrics) {
  if (currentSize == 0) {
    return;
  }
  static constexpr uint64_t MAX_CHUNK_SIZE = 1024 * 1024 * 1024;
  const uint64_t chunkSize = std::min(output->getNaturalWriteSize(), MAX_CHUNK_SIZE);
  if (chunkSize == 0) {
    throw std::logic_error("Natural write size cannot be zero");
  }
  uint64_t ioCount = 0;
  const uint64_t blockNumber = getBlockNumber();
  // With exactly one block, currentSize equals the used size of that block,
  // so it can be written as-is when it fits in a single chunk.
  if (blockNumber == 1 && currentSize <= chunkSize) {
    const Block block = getBlock(0);
    output->write(block.data, block.size);
    ++ioCount;
  } else {
    // Returns the staging buffer to the pool on every exit path.
    struct PoolDeleter {
      MemoryPool& pool;
      void operator()(char* ptr) const {
        pool.free(ptr);
      }
    };
    char* const rawChunk = memoryPool.malloc(chunkSize);
    if (rawChunk == nullptr) {
      // reserve() also treats a null result from the pool as a failed
      // allocation; copying into a null buffer would be undefined behaviour.
      throw std::bad_alloc();
    }
    const std::unique_ptr<char, PoolDeleter> chunkGuard(rawChunk, PoolDeleter{memoryPool});
    char* const chunk = chunkGuard.get();

    uint64_t chunkOffset = 0;
    for (uint64_t i = 0; i < blockNumber; ++i) {
      const Block block = getBlock(i);
      uint64_t blockOffset = 0;
      while (blockOffset < block.size) {
        // Copy as much of the current block as still fits into the chunk.
        const uint64_t copySize = std::min(chunkSize - chunkOffset, block.size - blockOffset);
        std::memcpy(chunk + chunkOffset, block.data + blockOffset, copySize);
        chunkOffset += copySize;
        blockOffset += copySize;
        // Chunk is full: flush it and start filling from the beginning.
        if (chunkOffset >= chunkSize) {
          output->write(chunk, chunkSize);
          chunkOffset = 0;
          ++ioCount;
        }
      }
    }
    // Flush whatever is left in a partially filled chunk.
    if (chunkOffset != 0) {
      output->write(chunk, chunkOffset);
      ++ioCount;
    }
  }
  if (metrics != nullptr) {
    metrics->IOCount.fetch_add(ioCount);
  }
}
} // namespace orc