blob: 04efecb9f0ae41c0f5f1c63e0e398d00abffdab2 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/tablet/multi_column_writer.h"
#include <memory>
#include <ostream>
#include <string>
#include <utility>
#include "kudu/cfile/cfile_util.h"
#include "kudu/cfile/cfile_writer.h"
#include "kudu/common/columnblock.h"
#include "kudu/common/rowblock.h"
#include "kudu/common/schema.h"
#include "kudu/fs/block_id.h"
#include "kudu/fs/block_manager.h"
#include "kudu/fs/fs_manager.h"
#include "kudu/gutil/stl_util.h"
#include "kudu/gutil/strings/substitute.h"
namespace kudu {
namespace tablet {
using cfile::CFileWriter;
using fs::BlockCreationTransaction;
using fs::CreateBlockOptions;
using fs::WritableBlock;
using std::unique_ptr;
MultiColumnWriter::MultiColumnWriter(FsManager* fs,
const Schema* schema,
std::string tablet_id)
: fs_(fs),
schema_(schema),
finished_(false),
tablet_id_(std::move(tablet_id)) {
}
MultiColumnWriter::~MultiColumnWriter() {
STLDeleteElements(&cfile_writers_);
}
Status MultiColumnWriter::Open() {
CHECK(cfile_writers_.empty());
// Open columns.
const CreateBlockOptions block_opts({ tablet_id_ });
for (int i = 0; i < schema_->num_columns(); i++) {
const ColumnSchema &col = schema_->column(i);
cfile::WriterOptions opts;
// Index all columns by ordinal position, so we can match up
// the corresponding rows.
opts.write_posidx = true;
/// Set the column storage attributes.
opts.storage_attributes = col.attributes();
// If the schema has a single PK and this is the PK col
if (i == 0 && schema_->num_key_columns() == 1) {
opts.write_validx = true;
}
// Open file for write.
unique_ptr<WritableBlock> block;
RETURN_NOT_OK_PREPEND(fs_->CreateNewBlock(block_opts, &block),
"Unable to open output file for column " + col.ToString());
BlockId block_id(block->id());
// Create the CFile writer itself.
unique_ptr<CFileWriter> writer(new CFileWriter(
std::move(opts),
col.type_info(),
col.is_nullable(),
std::move(block)));
RETURN_NOT_OK_PREPEND(writer->Start(),
"Unable to Start() writer for column " + col.ToString());
cfile_writers_.push_back(writer.release());
block_ids_.push_back(block_id);
}
VLOG(1) << strings::Substitute("Opened CFile writers for $0 column(s)",
cfile_writers_.size());
return Status::OK();
}
Status MultiColumnWriter::AppendBlock(const RowBlock& block) {
for (int i = 0; i < schema_->num_columns(); i++) {
ColumnBlock column = block.column_block(i);
if (column.is_nullable()) {
RETURN_NOT_OK(cfile_writers_[i]->AppendNullableEntries(column.non_null_bitmap(),
column.data(), column.nrows()));
} else {
RETURN_NOT_OK(cfile_writers_[i]->AppendEntries(column.data(), column.nrows()));
}
}
return Status::OK();
}
Status MultiColumnWriter::FinishAndReleaseBlocks(
BlockCreationTransaction* transaction) {
CHECK(!finished_);
for (int i = 0; i < schema_->num_columns(); i++) {
CFileWriter *writer = cfile_writers_[i];
Status s = writer->FinishAndReleaseBlock(transaction);
if (!s.ok()) {
LOG(WARNING) << "Unable to Finish writer for column " <<
schema_->column(i).ToString() << ": " << s.ToString();
return s;
}
}
finished_ = true;
return Status::OK();
}
void MultiColumnWriter::GetFlushedBlocksByColumnId(std::map<ColumnId, BlockId>* ret) const {
CHECK(finished_);
ret->clear();
for (int i = 0; i < schema_->num_columns(); i++) {
(*ret)[schema_->column_id(i)] = block_ids_[i];
}
}
size_t MultiColumnWriter::written_size() const {
size_t size = 0;
for (const CFileWriter *writer : cfile_writers_) {
size += writer->written_size();
}
return size;
}
} // namespace tablet
} // namespace kudu