blob: bb74d20ec3343622f2466cf6d408d7f9d21020ec [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "vec/spill/spill_reader.h"
#include <glog/logging.h>
#include <algorithm>
#include "common/cast_set.h"
#include "common/exception.h"
#include "io/file_factory.h"
#include "io/fs/file_reader.h"
#include "io/fs/local_file_system.h"
#include "runtime/exec_env.h"
#include "util/slice.h"
#include "vec/core/block.h"
#include "vec/spill/spill_stream_manager.h"
namespace doris {
#include "common/compile_check_begin.h"
namespace io {
class FileSystem;
} // namespace io
namespace vectorized {
Status SpillReader::open() {
if (file_reader_) {
return Status::OK();
}
SCOPED_TIMER(_read_file_timer);
COUNTER_UPDATE(_read_file_count, 1);
RETURN_IF_ERROR(io::global_local_filesystem()->open_file(file_path_, &file_reader_));
size_t file_size = file_reader_->size();
DCHECK(file_size >= 16); // max_sub_block_size, block count
Slice result((char*)&block_count_, sizeof(size_t));
size_t total_read_bytes = 0;
// read block count
size_t bytes_read = 0;
RETURN_IF_ERROR(file_reader_->read_at(file_size - sizeof(size_t), result, &bytes_read));
DCHECK(bytes_read == 8); // max_sub_block_size, block count
total_read_bytes += bytes_read;
if (_resource_ctx) {
_resource_ctx->io_context()->update_spill_write_bytes_to_local_storage(bytes_read);
}
// read max sub block size
bytes_read = 0;
result.data = (char*)&max_sub_block_size_;
RETURN_IF_ERROR(file_reader_->read_at(file_size - sizeof(size_t) * 2, result, &bytes_read));
DCHECK(bytes_read == 8); // max_sub_block_size, block count
total_read_bytes += bytes_read;
if (_resource_ctx) {
_resource_ctx->io_context()->update_spill_write_bytes_to_local_storage(bytes_read);
}
size_t buff_size = std::max(block_count_ * sizeof(size_t), max_sub_block_size_);
read_buff_.reserve(buff_size);
// read block start offsets
size_t read_offset = file_size - (block_count_ + 2) * sizeof(size_t);
result.data = read_buff_.data();
result.size = block_count_ * sizeof(size_t);
RETURN_IF_ERROR(file_reader_->read_at(read_offset, result, &bytes_read));
DCHECK(bytes_read == block_count_ * sizeof(size_t));
total_read_bytes += bytes_read;
COUNTER_UPDATE(_read_file_size, total_read_bytes);
ExecEnv::GetInstance()->spill_stream_mgr()->update_spill_read_bytes(total_read_bytes);
if (_resource_ctx) {
_resource_ctx->io_context()->update_spill_read_bytes_from_local_storage(bytes_read);
}
block_start_offsets_.resize(block_count_ + 1);
for (size_t i = 0; i < block_count_; ++i) {
block_start_offsets_[i] = *(size_t*)(result.data + i * sizeof(size_t));
}
block_start_offsets_[block_count_] = file_size - (block_count_ + 2) * sizeof(size_t);
return Status::OK();
}
void SpillReader::seek(size_t block_index) {
DCHECK_LT(block_index, block_count_);
read_block_index_ = block_index;
}
Status SpillReader::read(Block* block, bool* eos) {
DCHECK(file_reader_);
block->clear_column_data();
if (read_block_index_ >= block_count_) {
*eos = true;
return Status::OK();
}
size_t bytes_to_read =
block_start_offsets_[read_block_index_ + 1] - block_start_offsets_[read_block_index_];
if (bytes_to_read == 0) {
++read_block_index_;
return Status::OK();
}
Slice result(read_buff_.data(), bytes_to_read);
size_t bytes_read = 0;
{
SCOPED_TIMER(_read_file_timer);
RETURN_IF_ERROR(file_reader_->read_at(block_start_offsets_[read_block_index_], result,
&bytes_read));
}
DCHECK(bytes_read == bytes_to_read);
if (bytes_read > 0) {
COUNTER_UPDATE(_read_file_size, bytes_read);
ExecEnv::GetInstance()->spill_stream_mgr()->update_spill_read_bytes(bytes_read);
if (_resource_ctx) {
_resource_ctx->io_context()->update_spill_read_bytes_from_local_storage(bytes_read);
}
COUNTER_UPDATE(_read_block_count, 1);
{
SCOPED_TIMER(_deserialize_timer);
if (!pb_block_.ParseFromArray(result.data, cast_set<int>(result.size))) {
return Status::InternalError("Failed to read spilled block");
}
size_t uncompressed_size = 0;
int64_t uncompressed_time = 0;
RETURN_IF_ERROR(block->deserialize(pb_block_, &uncompressed_size, &uncompressed_time));
}
COUNTER_UPDATE(_read_block_data_size, block->bytes());
COUNTER_UPDATE(_read_rows_count, block->rows());
} else {
block->clear_column_data();
}
++read_block_index_;
return Status::OK();
}
Status SpillReader::close() {
if (!file_reader_) {
return Status::OK();
}
(void)file_reader_->close();
file_reader_.reset();
return Status::OK();
}
} // namespace vectorized
} // namespace doris