// blob: 385d16fe695365b1913d170e89eaf1bb9d3e541c [file] [log] [blame]
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#ifndef ROCKSDB_LITE
#include "utilities/write_batch_with_index/write_batch_with_index_internal.h"
#include "db/column_family.h"
#include "db/merge_context.h"
#include "db/merge_helper.h"
#include "rocksdb/comparator.h"
#include "rocksdb/db.h"
#include "rocksdb/utilities/write_batch_with_index.h"
#include "util/coding.h"
#include "util/string_util.h"
namespace rocksdb {
class Env;
class Logger;
class Statistics;
// Decodes the single record that starts `data_offset` bytes into rep_ and
// reports what kind of write it is.
//
// On success *type is set to the WriteType matching the record's tag. Key,
// value, blob and xid are handed to ReadRecordFromWriteBatch(); which of them
// are populated for a given tag is decided there (not visible in this file).
//
// Returns:
//   InvalidArgument - any output pointer is null, or data_offset is greater
//                     than GetDataSize().
//   NotFound        - data_offset equals GetDataSize() (end of batch).
//   Corruption      - the tag has no WriteType mapping in the switch below.
//   Any non-OK status produced by ReadRecordFromWriteBatch() is returned
//   unchanged.
Status ReadableWriteBatch::GetEntryFromDataOffset(size_t data_offset,
                                                  WriteType* type, Slice* Key,
                                                  Slice* value, Slice* blob,
                                                  Slice* xid) const {
  if (type == nullptr || Key == nullptr || value == nullptr ||
      blob == nullptr || xid == nullptr) {
    return Status::InvalidArgument("Output parameters cannot be null");
  }

  if (data_offset == GetDataSize()) {
    // reached end of batch.
    return Status::NotFound();
  }

  if (data_offset > GetDataSize()) {
    return Status::InvalidArgument("data offset exceed write batch size");
  }

  Slice input = Slice(rep_.data() + data_offset, rep_.size() - data_offset);
  // Initialized so the switch below never reads an indeterminate value.
  char tag = 0;
  uint32_t column_family;
  Status s = ReadRecordFromWriteBatch(&input, &tag, &column_family, Key, value,
                                      blob, xid);
  if (!s.ok()) {
    // The record could not be decoded; the outputs are not trustworthy, so do
    // not report success based on the tag alone.
    return s;
  }

  switch (tag) {
    case kTypeColumnFamilyValue:
    case kTypeValue:
      *type = kPutRecord;
      break;
    case kTypeColumnFamilyDeletion:
    case kTypeDeletion:
      *type = kDeleteRecord;
      break;
    case kTypeColumnFamilySingleDeletion:
    case kTypeSingleDeletion:
      *type = kSingleDeleteRecord;
      break;
    case kTypeColumnFamilyRangeDeletion:
    case kTypeRangeDeletion:
      *type = kDeleteRangeRecord;
      break;
    case kTypeColumnFamilyMerge:
    case kTypeMerge:
      *type = kMergeRecord;
      break;
    case kTypeLogData:
      *type = kLogDataRecord;
      break;
    case kTypeBeginPrepareXID:
    case kTypeEndPrepareXID:
    case kTypeCommitXID:
    case kTypeRollbackXID:
      *type = kXIDRecord;
      break;
    default:
      return Status::Corruption("unknown WriteBatch tag");
  }
  return Status::OK();
}
// Three-way ordering of two index entries. Entries are ordered by column
// family id first, then by user key (via CompareKey), and finally by offset.
// An entry whose offset equals WriteBatchIndexEntry::kFlagMin sorts before any
// other entry of the same column family (entry1 is checked first, so it wins
// when both carry the flag).
//
// Returns -1 or 1 for the column-family, kFlagMin and offset decisions, the
// raw CompareKey() result when the keys differ, and 0 when everything ties.
int WriteBatchEntryComparator::operator()(
    const WriteBatchIndexEntry* entry1,
    const WriteBatchIndexEntry* entry2) const {
  if (entry1->column_family != entry2->column_family) {
    return entry1->column_family > entry2->column_family ? 1 : -1;
  }

  if (entry1->offset == WriteBatchIndexEntry::kFlagMin) {
    return -1;
  }
  if (entry2->offset == WriteBatchIndexEntry::kFlagMin) {
    return 1;
  }

  // An entry either carries an explicit search key or refers to key bytes
  // stored inside the write batch's data buffer.
  const auto key_of = [this](const WriteBatchIndexEntry* entry) -> Slice {
    if (entry->search_key != nullptr) {
      return *(entry->search_key);
    }
    return Slice(write_batch_->Data().data() + entry->key_offset,
                 entry->key_size);
  };
  const Slice lhs_key = key_of(entry1);
  const Slice rhs_key = key_of(entry2);

  const int key_order = CompareKey(entry1->column_family, lhs_key, rhs_key);
  if (key_order != 0) {
    return key_order;
  }

  // Same column family and same key: fall back to position in the batch.
  if (entry1->offset != entry2->offset) {
    return entry1->offset > entry2->offset ? 1 : -1;
  }
  return 0;
}
int WriteBatchEntryComparator::CompareKey(uint32_t column_family,
const Slice& key1,
const Slice& key2) const {
if (column_family < cf_comparators_.size() &&
cf_comparators_[column_family] != nullptr) {
return cf_comparators_[column_family]->Compare(key1, key2);
} else {
return default_comparator_->Compare(key1, key2);
}
}
// Resolves the current value of `key` using only the updates recorded in
// `batch` for `column_family`.
//
// The entries for `key` are walked from the most recently added one backwards:
//   - Put            -> kFound; the put value becomes the base value.
//   - Delete /
//     SingleDelete   -> kDeleted; there is no base value.
//   - Merge          -> the operand is pushed onto merge_context and the walk
//                       continues (kMergeInProgress), unless overwrite_key is
//                       set, in which case the walk stops immediately.
//   - LogData / XID  -> ignored.
//   - anything else  -> kError with *s = Corruption.
//
// If the walk ends on a Put or Delete and merge operands were collected, the
// column family's merge operator is applied to produce *value and the result
// becomes kFound. This requires a non-null column_family with a merge operator
// configured; otherwise *s = InvalidArgument and kError is returned.
//
// *s is always assigned (OK unless stated above). *value is written only when
// a Put is found without pending operands, or by the merge step.
WriteBatchWithIndexInternal::Result WriteBatchWithIndexInternal::GetFromBatch(
    const ImmutableDBOptions& immuable_db_options, WriteBatchWithIndex* batch,
    ColumnFamilyHandle* column_family, const Slice& key,
    MergeContext* merge_context, WriteBatchEntryComparator* cmp,
    std::string* value, bool overwrite_key, Status* s) {
  uint32_t cf_id = GetColumnFamilyID(column_family);
  *s = Status::OK();
  WriteBatchWithIndexInternal::Result result =
      WriteBatchWithIndexInternal::Result::kNotFound;

  std::unique_ptr<WBWIIterator> iter(batch->NewIterator(column_family));

  // We want to iterate in the reverse order that the writes were added to the
  // batch.  Since we don't have a reverse iterator, we must seek past the end.
  // TODO(agiardullo): consider adding support for reverse iteration
  iter->Seek(key);
  while (iter->Valid()) {
    const WriteEntry entry = iter->Entry();
    if (cmp->CompareKey(cf_id, entry.key, key) != 0) {
      break;
    }
    iter->Next();
  }

  if (!iter->Valid()) {
    // Read past end of results.  Reposition on last result.
    iter->SeekToLast();
  } else {
    iter->Prev();
  }

  Slice entry_value;
  while (iter->Valid()) {
    const WriteEntry entry = iter->Entry();
    if (cmp->CompareKey(cf_id, entry.key, key) != 0) {
      // Unexpected error or we've reached a different next key
      break;
    }

    switch (entry.type) {
      case kPutRecord: {
        result = WriteBatchWithIndexInternal::Result::kFound;
        entry_value = entry.value;
        break;
      }
      case kMergeRecord: {
        result = WriteBatchWithIndexInternal::Result::kMergeInProgress;
        merge_context->PushOperand(entry.value);
        break;
      }
      case kDeleteRecord:
      case kSingleDeleteRecord: {
        result = WriteBatchWithIndexInternal::Result::kDeleted;
        break;
      }
      case kLogDataRecord:
      case kXIDRecord: {
        // ignore
        break;
      }
      default: {
        result = WriteBatchWithIndexInternal::Result::kError;
        (*s) = Status::Corruption("Unexpected entry in WriteBatchWithIndex:",
                                  ToString(entry.type));
        break;
      }
    }

    if (result == WriteBatchWithIndexInternal::Result::kFound ||
        result == WriteBatchWithIndexInternal::Result::kDeleted ||
        result == WriteBatchWithIndexInternal::Result::kError) {
      // We can stop iterating once we find a PUT or DELETE
      break;
    }
    if (result == WriteBatchWithIndexInternal::Result::kMergeInProgress &&
        overwrite_key) {
      // Since we've overwritten keys, we do not know what other operations are
      // in this batch for this key, so we cannot do a Merge to compute the
      // result.  Instead, we will simply return MergeInProgress.
      break;
    }

    iter->Prev();
  }

  if (!s->ok()) {
    return result;
  }

  const bool found_put =
      result == WriteBatchWithIndexInternal::Result::kFound;
  const bool found_delete =
      result == WriteBatchWithIndexInternal::Result::kDeleted;
  if (!found_put && !found_delete) {
    // kNotFound or kMergeInProgress: nothing to finalize from the batch alone.
    return result;
  }

  if (merge_context->GetNumOperands() == 0) {
    // Nothing to merge.
    if (found_put) {
      value->assign(entry_value.data(), entry_value.size());
    }
    return result;
  }

  // Found a Put or Delete underneath one or more merge operands.
  if (column_family == nullptr) {
    *s = Status::InvalidArgument("Must provide a column_family");
    return WriteBatchWithIndexInternal::Result::kError;
  }
  auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  const MergeOperator* merge_operator =
      cfh->cfd()->ioptions()->merge_operator;
  if (merge_operator == nullptr) {
    *s = Status::InvalidArgument("Options::merge_operator must be set");
    return WriteBatchWithIndexInternal::Result::kError;
  }

  Statistics* statistics = immuable_db_options.statistics.get();
  Env* env = immuable_db_options.env;
  Logger* logger = immuable_db_options.info_log.get();

  // After a Delete there is no existing value, which must be signalled with a
  // null pointer. Passing the (empty) entry_value here would make the merge
  // operator treat the key as present with an empty value.
  const Slice* existing_value = found_put ? &entry_value : nullptr;
  *s = MergeHelper::TimedFullMerge(merge_operator, key, existing_value,
                                   merge_context->GetOperands(), value, logger,
                                   statistics, env);
  return s->ok() ? WriteBatchWithIndexInternal::Result::kFound
                 : WriteBatchWithIndexInternal::Result::kError;
}
} // namespace rocksdb
#endif // !ROCKSDB_LITE