blob: 811db5bdbd7e3fbee93555f23ddb7a856bcfa9a9 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef KUDU_TABLET_DELTAFILE_H
#define KUDU_TABLET_DELTAFILE_H
#include <cstddef>
#include <cstdint>
#include <deque>
#include <memory>
#include <string>
#include <vector>
#include <glog/logging.h>
#include "kudu/cfile/binary_plain_block.h"
#include "kudu/cfile/block_handle.h"
#include "kudu/cfile/block_pointer.h"
#include "kudu/cfile/cfile_reader.h"
#include "kudu/cfile/cfile_writer.h"
#include "kudu/cfile/index_btree.h"
#include "kudu/common/rowid.h"
#include "kudu/gutil/gscoped_ptr.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/tablet/delta_key.h"
#include "kudu/tablet/delta_stats.h"
#include "kudu/tablet/delta_store.h"
#include "kudu/tablet/rowset.h"
#include "kudu/util/faststring.h"
#include "kudu/util/once.h"
#include "kudu/util/slice.h"
#include "kudu/util/status.h"
namespace kudu {
class Arena;
class BlockId;
class ColumnBlock;
class FsManager;
class MemTracker;
class RowChangeList;
class ScanSpec;
class SelectionVector;
struct ColumnId;
namespace tablet {
class MvccSnapshot;
} // namespace tablet
namespace cfile {
struct ReaderOptions;
} // namespace cfile
namespace fs {
class BlockCreationTransaction;
class ReadableBlock;
class WritableBlock;
struct IOContext;
} // namespace fs
namespace tablet {
class Mutation;
template<DeltaType Type>
struct ApplyingVisitor;
template<DeltaType Type>
struct CollectingVisitor;
template<DeltaType Type>
struct LivenessVisitor;
// Serializes a stream of deltas into a single CFile-backed block.
class DeltaFileWriter {
 public:
  // Builds a writer on top of 'block'. Ownership of the block moves into the
  // writer, which closes it as part of Finish().
  explicit DeltaFileWriter(std::unique_ptr<fs::WritableBlock> block);

  // NOTE(review): presumably starts the underlying CFile writer and must be
  // called before any AppendDelta(); confirm against deltafile.cc.
  Status Start();

  // Closes the delta file along with the underlying writable block.
  //
  // Yields Status::Aborted() when nothing was ever appended to this writer.
  Status Finish();

  // Closes the delta file, finalizes the underlying block and hands it off
  // to 'transaction'.
  //
  // Yields Status::Aborted() when nothing was ever appended to this writer.
  Status FinishAndReleaseBlock(fs::BlockCreationTransaction* transaction);

  // Appends one delta. Callers must supply deltas sorted by:
  //   - REDO: ascending key, then ascending timestamp;
  //   - UNDO: ascending key, then descending timestamp.
  template<DeltaType Type>
  Status AppendDelta(const DeltaKey& key, const RowChangeList& delta);

  // NOTE(review): presumably persists 'stats' into the file so that
  // DeltaFileReader can read them back; confirm against deltafile.cc.
  void WriteDeltaStats(const DeltaStats& stats);

  // Size written so far, as reported by the underlying CFileWriter.
  size_t written_size() const { return writer_->written_size(); }

 private:
  // NOTE(review): presumably the delta-type-independent half of
  // AppendDelta(); confirm against deltafile.cc.
  Status DoAppendDelta(const DeltaKey& key, const RowChangeList& delta);

  std::unique_ptr<cfile::CFileWriter> writer_;

  // Scratch space holding the serialized form of a delta before it is
  // handed to 'writer_'.
  faststring tmp_buf_;

#ifndef NDEBUG
  // Debug-only bookkeeping used to verify that appends arrive in order:
  // the key of the most recently appended delta ...
  DeltaKey last_key_;
  // ... and whether any delta has been appended yet.
  bool has_appended_;
#endif

  DISALLOW_COPY_AND_ASSIGN(DeltaFileWriter);
};
// DeltaStore implementation backed by a delta file on disk.
class DeltaFileReader : public DeltaStore,
                        public std::enable_shared_from_this<DeltaFileReader> {
 public:
  // NOTE(review): presumably the name of the file metadata entry that holds
  // the serialized DeltaStats; confirm against deltafile.cc.
  static const char * const kDeltaStatsEntryName;

  // Opens a delta file from an already-opened block and fully initializes
  // it; the returned reader can be used immediately.
  static Status Open(std::unique_ptr<fs::ReadableBlock> block,
                     DeltaType delta_type,
                     cfile::ReaderOptions options,
                     std::shared_ptr<DeltaFileReader>* reader_out);

  // Opens a delta file from an already-opened block without initializing
  // it. No extra I/O is performed and the file contents are not validated.
  //
  // Init() has to be called before the file's stats are used.
  static Status OpenNoInit(std::unique_ptr<fs::ReadableBlock> block,
                           DeltaType delta_type,
                           cfile::ReaderOptions options,
                           std::shared_ptr<DeltaFileReader>* reader_out);

  Status Init(const fs::IOContext* io_context) OVERRIDE;

  // True once initialization through 'init_once_' has succeeded.
  bool Initted() OVERRIDE { return init_once_.init_succeeded(); }

  // Refer to DeltaStore::NewDeltaIterator(...).
  Status NewDeltaIterator(const RowIteratorOptions& opts,
                          DeltaIterator** iterator) const OVERRIDE;

  // Refer to DeltaStore::CheckRowDeleted.
  Status CheckRowDeleted(rowid_t row_idx,
                         const fs::IOContext* io_context,
                         bool* deleted) const OVERRIDE;

  uint64_t EstimateSize() const OVERRIDE;

  // Identifier of the block this reader was opened on.
  const BlockId& block_id() const {
    return reader_->block_id();
  }

  // Stats for this delta file. Only valid after a successful Init();
  // debug builds check this.
  const DeltaStats& delta_stats() const OVERRIDE {
    DCHECK(init_once_.init_succeeded());
    return *delta_stats_;
  }

  // Describes the underlying CFile reader, followed by the delta stats in
  // parentheses when the file has been initialized.
  std::string ToString() const OVERRIDE {
    const std::string base = reader_->ToString();
    return init_once_.init_succeeded()
        ? strings::Substitute("$0 ($1)", base, delta_stats_->ToString())
        : base;
  }

  // Reports whether scanning 'snap' may need deltas from this file. Also
  // returns true while the file is not yet fully initialized.
  bool IsRelevantForSnapshot(const MvccSnapshot& snap) const;

  // Produces a copy of this reader intended for tests and validation (for
  // example in DEBUG builds). The copy is not Initted().
  Status CloneForDebugging(FsManager* fs_manager,
                           const std::shared_ptr<MemTracker>& parent_mem_tracker,
                           std::shared_ptr<DeltaFileReader>* out) const;

 private:
  friend class DeltaFileIterator;

  const std::shared_ptr<cfile::CFileReader>& cfile_reader() const {
    return reader_;
  }

  DeltaFileReader(std::unique_ptr<cfile::CFileReader> cf_reader,
                  DeltaType delta_type);

  // Initialization routine run through 'init_once_'.
  Status InitOnce(const fs::IOContext* io_context);

  Status ReadDeltaStats();

  std::shared_ptr<cfile::CFileReader> reader_;
  gscoped_ptr<DeltaStats> delta_stats_;

  // Whether this file holds UNDO or REDO deltas.
  const DeltaType delta_type_;

  KuduOnceLambda init_once_;

  DISALLOW_COPY_AND_ASSIGN(DeltaFileReader);
};
// Iterator over the deltas contained in a delta file.
//
// See DeltaIterator for details.
class DeltaFileIterator : public DeltaIterator {
 public:
  Status Init(ScanSpec *spec) OVERRIDE;

  Status SeekToOrdinal(rowid_t idx) OVERRIDE;

  Status PrepareBatch(size_t nrows, PrepareFlag flag) OVERRIDE;

  Status ApplyUpdates(size_t col_to_apply, ColumnBlock *dst) OVERRIDE;

  Status ApplyDeletes(SelectionVector *sel_vec) OVERRIDE;

  Status CollectMutations(std::vector<Mutation *> *dst, Arena *arena) OVERRIDE;

  Status FilterColumnIdsAndCollectDeltas(const std::vector<ColumnId>& col_ids,
                                         std::vector<DeltaKeyAndUpdate>* out,
                                         Arena* arena) OVERRIDE;

  std::string ToString() const OVERRIDE;

  // Uses the file's OVERRIDE macro like every other member here; the
  // redundant 'virtual' is dropped since OVERRIDE already requires a
  // virtual base declaration.
  bool HasNext() OVERRIDE;

  bool MayHaveDeltas() OVERRIDE;

 private:
  friend class DeltaFileReader;
  friend struct ApplyingVisitor<REDO>;
  friend struct ApplyingVisitor<UNDO>;
  friend struct CollectingVisitor<REDO>;
  friend struct CollectingVisitor<UNDO>;
  friend struct LivenessVisitor<REDO>;
  friend struct LivenessVisitor<UNDO>;
  friend struct FilterAndAppendVisitor;

  DISALLOW_COPY_AND_ASSIGN(DeltaFileIterator);

  // PrepareBatch() will read forward all blocks from the deltafile
  // which overlap with the block being prepared, enqueueing them onto
  // the 'delta_blocks_' deque. The prepared blocks are then used to
  // actually apply deltas in ApplyUpdates().
  struct PreparedDeltaBlock {
    // The pointer from which this block was read. This is only used for
    // logging, etc.
    cfile::BlockPointer block_ptr_;

    // Handle to the block, so it doesn't get freed from underneath us.
    cfile::BlockHandle block_;

    // The block decoder, to avoid having to re-parse the block header
    // on every ApplyUpdates() call
    gscoped_ptr<cfile::BinaryPlainBlockDecoder> decoder_;

    // The first row index for which there is an update in this delta block.
    rowid_t first_updated_idx_;

    // The last row index for which there is an update in this delta block.
    rowid_t last_updated_idx_;

    // Within this block, the index of the update which is the first one that
    // needs to be consulted. This allows deltas to be skipped at the beginning
    // of the block when the row block starts towards the end of the delta block.
    // For example:
    // <-- delta block ---->
    //                   <--- prepared row block --->
    // Here, we can skip a bunch of deltas at the beginning of the delta block
    // which we know don't apply to the prepared row block.
    rowid_t prepared_block_start_idx_;

    // Return a string description of this prepared block, for logging.
    std::string ToString() const;
  };

  // Any pointers held inside 'opts' must remain valid for the lifetime of the
  // iterator. 'dfr' is a shared_ptr, so the iterator shares ownership of the
  // reader and keeps it alive on its own.
  DeltaFileIterator(std::shared_ptr<DeltaFileReader> dfr,
                    RowIteratorOptions opts,
                    DeltaType delta_type);

  // Determine the row index of the first update in the block currently
  // pointed to by index_iter_.
  Status GetFirstRowIndexInCurrentBlock(rowid_t *idx);

  // Determine the last updated row index contained in the given decoded block.
  static Status GetLastRowIndexInDecodedBlock(
    const cfile::BinaryPlainBlockDecoder &dec, rowid_t *idx);

  // Read the current block of data from the current position in the file
  // onto the end of the delta_blocks_ queue.
  Status ReadCurrentBlockOntoQueue();

  // Visit all mutations in the currently prepared row range with the specified
  // visitor class.
  template<class Visitor>
  Status VisitMutations(Visitor *visitor);

  // Log a FATAL error message about a bad delta.
  void FatalUnexpectedDelta(const DeltaKey &key, const Slice &deltas,
                            const std::string &msg);

  // The reader this iterator was created from; shared so it outlives us.
  std::shared_ptr<DeltaFileReader> dfr_;

  const RowIteratorOptions opts_;

  gscoped_ptr<cfile::IndexTreeIterator> index_iter_;

  // State describing the current prepared row range.
  // NOTE(review): the semantics below are inferred from the member names and
  // the PrepareBatch(nrows, ...)/SeekToOrdinal(idx)/Init() signatures;
  // confirm against deltafile.cc.
  //
  // Presumably the first row index of the range prepared by PrepareBatch().
  rowid_t prepared_idx_;
  // Presumably the number of rows in that prepared range.
  uint32_t prepared_count_;
  // Presumably whether a batch is currently prepared.
  bool prepared_;
  // Presumably whether the index iterator has run past the last delta block.
  bool exhausted_;
  // Presumably set once Init() has completed.
  bool initted_;

  // After PrepareBatch(), the delta blocks from the delta file that overlap
  // the row range being prepared (see PreparedDeltaBlock above).
  std::deque<std::unique_ptr<PreparedDeltaBlock>> delta_blocks_;

  // Temporary buffer used in seeking.
  faststring tmp_buf_;

  // Temporary buffer used for RowChangeList projection.
  faststring delta_buf_;

  // The type of this delta iterator, i.e. UNDO or REDO.
  const DeltaType delta_type_;

  // NOTE(review): presumably controls whether blocks read by this iterator
  // are inserted into the block cache; confirm against CFileReader.
  cfile::CFileReader::CacheControl cache_blocks_;
};
} // namespace tablet
} // namespace kudu
#endif