| // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
| // This source code is licensed under both the GPLv2 (found in the |
| // COPYING file in the root directory) and Apache 2.0 License |
| // (found in the LICENSE.Apache file in the root directory). |
| |
| #include "table/get_context.h" |
| #include "db/merge_helper.h" |
| #include "db/pinned_iterators_manager.h" |
| #include "monitoring/file_read_sample.h" |
| #include "monitoring/perf_context_imp.h" |
| #include "monitoring/statistics.h" |
| #include "rocksdb/env.h" |
| #include "rocksdb/merge_operator.h" |
| #include "rocksdb/statistics.h" |
| |
| namespace rocksdb { |
| |
| namespace { |
| |
| void appendToReplayLog(std::string* replay_log, ValueType type, Slice value) { |
| #ifndef ROCKSDB_LITE |
| if (replay_log) { |
| if (replay_log->empty()) { |
| // Optimization: in the common case of only one operation in the |
| // log, we allocate the exact amount of space needed. |
| replay_log->reserve(1 + VarintLength(value.size()) + value.size()); |
| } |
| replay_log->push_back(type); |
| PutLengthPrefixedSlice(replay_log, value); |
| } |
| #endif // ROCKSDB_LITE |
| } |
| |
| } // namespace |
| |
| GetContext::GetContext( |
| const Comparator* ucmp, const MergeOperator* merge_operator, Logger* logger, |
| Statistics* statistics, GetState init_state, const Slice& user_key, |
| PinnableSlice* pinnable_val, bool* value_found, MergeContext* merge_context, |
| RangeDelAggregator* _range_del_agg, Env* env, SequenceNumber* seq, |
| PinnedIteratorsManager* _pinned_iters_mgr, bool* is_blob_index) |
| : ucmp_(ucmp), |
| merge_operator_(merge_operator), |
| logger_(logger), |
| statistics_(statistics), |
| state_(init_state), |
| user_key_(user_key), |
| pinnable_val_(pinnable_val), |
| value_found_(value_found), |
| merge_context_(merge_context), |
| range_del_agg_(_range_del_agg), |
| env_(env), |
| seq_(seq), |
| replay_log_(nullptr), |
| pinned_iters_mgr_(_pinned_iters_mgr), |
| is_blob_index_(is_blob_index) { |
| if (seq_) { |
| *seq_ = kMaxSequenceNumber; |
| } |
| sample_ = should_sample_file_read(); |
| } |
| |
| // Called from TableCache::Get and Table::Get when file/block in which |
| // key may exist are not there in TableCache/BlockCache respectively. In this |
| // case we can't guarantee that key does not exist and are not permitted to do |
| // IO to be certain.Set the status=kFound and value_found=false to let the |
| // caller know that key may exist but is not there in memory |
| void GetContext::MarkKeyMayExist() { |
| state_ = kFound; |
| if (value_found_ != nullptr) { |
| *value_found_ = false; |
| } |
| } |
| |
| void GetContext::SaveValue(const Slice& value, SequenceNumber seq) { |
| assert(state_ == kNotFound); |
| appendToReplayLog(replay_log_, kTypeValue, value); |
| |
| state_ = kFound; |
| if (LIKELY(pinnable_val_ != nullptr)) { |
| pinnable_val_->PinSelf(value); |
| } |
| } |
| |
| bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, |
| const Slice& value, Cleanable* value_pinner) { |
| assert((state_ != kMerge && parsed_key.type != kTypeMerge) || |
| merge_context_ != nullptr); |
| if (ucmp_->Equal(parsed_key.user_key, user_key_)) { |
| appendToReplayLog(replay_log_, parsed_key.type, value); |
| |
| if (seq_ != nullptr) { |
| // Set the sequence number if it is uninitialized |
| if (*seq_ == kMaxSequenceNumber) { |
| *seq_ = parsed_key.sequence; |
| } |
| } |
| |
| auto type = parsed_key.type; |
| // Key matches. Process it |
| if ((type == kTypeValue || type == kTypeMerge || type == kTypeBlobIndex) && |
| range_del_agg_ != nullptr && range_del_agg_->ShouldDelete(parsed_key)) { |
| type = kTypeRangeDeletion; |
| } |
| switch (type) { |
| case kTypeValue: |
| case kTypeBlobIndex: |
| assert(state_ == kNotFound || state_ == kMerge); |
| if (type == kTypeBlobIndex && is_blob_index_ == nullptr) { |
| // Blob value not supported. Stop. |
| state_ = kBlobIndex; |
| return false; |
| } |
| if (kNotFound == state_) { |
| state_ = kFound; |
| if (LIKELY(pinnable_val_ != nullptr)) { |
| if (LIKELY(value_pinner != nullptr)) { |
| // If the backing resources for the value are provided, pin them |
| pinnable_val_->PinSlice(value, value_pinner); |
| } else { |
| // Otherwise copy the value |
| pinnable_val_->PinSelf(value); |
| } |
| } |
| } else if (kMerge == state_) { |
| assert(merge_operator_ != nullptr); |
| state_ = kFound; |
| if (LIKELY(pinnable_val_ != nullptr)) { |
| Status merge_status = MergeHelper::TimedFullMerge( |
| merge_operator_, user_key_, &value, |
| merge_context_->GetOperands(), pinnable_val_->GetSelf(), |
| logger_, statistics_, env_); |
| pinnable_val_->PinSelf(); |
| if (!merge_status.ok()) { |
| state_ = kCorrupt; |
| } |
| } |
| } |
| if (is_blob_index_ != nullptr) { |
| *is_blob_index_ = (type == kTypeBlobIndex); |
| } |
| return false; |
| |
| case kTypeDeletion: |
| case kTypeSingleDeletion: |
| case kTypeRangeDeletion: |
| // TODO(noetzli): Verify correctness once merge of single-deletes |
| // is supported |
| assert(state_ == kNotFound || state_ == kMerge); |
| if (kNotFound == state_) { |
| state_ = kDeleted; |
| } else if (kMerge == state_) { |
| state_ = kFound; |
| if (LIKELY(pinnable_val_ != nullptr)) { |
| Status merge_status = MergeHelper::TimedFullMerge( |
| merge_operator_, user_key_, nullptr, |
| merge_context_->GetOperands(), pinnable_val_->GetSelf(), |
| logger_, statistics_, env_); |
| pinnable_val_->PinSelf(); |
| if (!merge_status.ok()) { |
| state_ = kCorrupt; |
| } |
| } |
| } |
| return false; |
| |
| case kTypeMerge: |
| assert(state_ == kNotFound || state_ == kMerge); |
| state_ = kMerge; |
| // value_pinner is not set from plain_table_reader.cc for example. |
| if (pinned_iters_mgr() && pinned_iters_mgr()->PinningEnabled() && |
| value_pinner != nullptr) { |
| value_pinner->DelegateCleanupsTo(pinned_iters_mgr()); |
| merge_context_->PushOperand(value, true /*value_pinned*/); |
| } else { |
| merge_context_->PushOperand(value, false); |
| } |
| return true; |
| |
| default: |
| assert(false); |
| break; |
| } |
| } |
| |
| // state_ could be Corrupt, merge or notfound |
| return false; |
| } |
| |
| void replayGetContextLog(const Slice& replay_log, const Slice& user_key, |
| GetContext* get_context, Cleanable* value_pinner) { |
| #ifndef ROCKSDB_LITE |
| Slice s = replay_log; |
| while (s.size()) { |
| auto type = static_cast<ValueType>(*s.data()); |
| s.remove_prefix(1); |
| Slice value; |
| bool ret = GetLengthPrefixedSlice(&s, &value); |
| assert(ret); |
| (void)ret; |
| |
| // Since SequenceNumber is not stored and unknown, we will use |
| // kMaxSequenceNumber. |
| get_context->SaveValue( |
| ParsedInternalKey(user_key, kMaxSequenceNumber, type), value, |
| value_pinner); |
| } |
| #else // ROCKSDB_LITE |
| assert(false); |
| #endif // ROCKSDB_LITE |
| } |
| |
| } // namespace rocksdb |