blob: 07d5c78294383acdf4906da8459ce0e41f488f10 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "redis_set.h"
#include <map>
#include <memory>
#include <optional>
#include "db_util.h"
#include "sample_helper.h"
namespace redis {
// Fetches the metadata stored under `ns_key` into `metadata` by delegating to
// Database::GetMetadata, with kRedisSet as the only type passed for the lookup.
rocksdb::Status Set::GetMetadata(engine::Context &ctx, const Slice &ns_key, SetMetadata *metadata) {
  auto lookup_status = Database::GetMetadata(ctx, {kRedisSet}, ns_key, metadata);
  return lookup_status;
}
// Stores `members` as the content of the set at `user_key`, using a freshly
// constructed SetMetadata (any existing metadata is not read here).
//
// The caller must pass members that are already unique: the recorded size is
// simply members.size(), so duplicates would make it disagree with the number
// of distinct sub-keys written.
rocksdb::Status Set::Overwrite(engine::Context &ctx, Slice user_key, const std::vector<std::string> &members) {
  const std::string ns_key = AppendNamespacePrefix(user_key);
  SetMetadata metadata;

  auto batch = storage_->GetWriteBatchBase();
  WriteBatchLogData log_data(kRedisSet);
  rocksdb::Status status = batch->PutLogData(log_data.Encode());
  if (!status.ok()) return status;

  // One sub-key with an empty value per member.
  for (const std::string &element : members) {
    const std::string element_key =
        InternalKey(ns_key, element, metadata.version, storage_->IsSlotIdEncoded()).Encode();
    status = batch->Put(element_key, Slice());
    if (!status.ok()) return status;
  }

  // The metadata entry is appended after all member entries.
  metadata.size = static_cast<uint32_t>(members.size());
  std::string encoded_metadata;
  metadata.Encode(&encoded_metadata);
  status = batch->Put(metadata_cf_handle_, ns_key, encoded_metadata);
  if (!status.ok()) return status;

  return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}
// Adds `members` to the set stored at `user_key`.
//
// A missing key is not an error: the default-constructed metadata is used and
// written back once at least one member has been added.
// Members repeated in `members`, or already present in the set, are skipped.
//
// @param added_cnt  receives the number of members that were newly added.
// @return a non-OK status when reading metadata, reading a member, or writing fails.
rocksdb::Status Set::Add(engine::Context &ctx, const Slice &user_key, const std::vector<Slice> &members,
                         uint64_t *added_cnt) {
  *added_cnt = 0;

  std::string ns_key = AppendNamespacePrefix(user_key);
  SetMetadata metadata;
  rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
  if (!s.ok() && !s.IsNotFound()) return s;

  std::string value;
  auto batch = storage_->GetWriteBatchBase();
  WriteBatchLogData log_data(kRedisSet);
  s = batch->PutLogData(log_data.Encode());
  if (!s.ok()) return s;

  // De-duplicate the request so a repeated member is counted only once.
  std::unordered_set<std::string_view> mset;
  for (const auto &member : members) {
    if (!mset.insert(member.ToStringView()).second) {
      continue;
    }
    std::string sub_key = InternalKey(ns_key, member, metadata.version, storage_->IsSlotIdEncoded()).Encode();
    s = storage_->Get(ctx, ctx.GetReadOptions(), sub_key, &value);
    if (s.ok()) continue;  // already a member
    // Only NotFound means "absent". Any other failure must not be mistaken for
    // absence, otherwise the member would be re-inserted and the size inflated.
    if (!s.IsNotFound()) return s;
    s = batch->Put(sub_key, Slice());
    if (!s.ok()) return s;
    *added_cnt += 1;
  }

  if (*added_cnt > 0) {
    metadata.size += *added_cnt;
    std::string bytes;
    metadata.Encode(&bytes);
    s = batch->Put(metadata_cf_handle_, ns_key, bytes);
    if (!s.ok()) return s;
  }
  return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}
// Removes `members` from the set stored at `user_key`.
//
// A missing key yields OK with `*removed_cnt == 0`.
// Members repeated in `members`, or not present in the set, are skipped.
// When every remaining member is removed the metadata entry is deleted;
// otherwise the metadata is rewritten with the reduced size.
//
// @param removed_cnt  receives the number of members that were actually removed.
// @return a non-OK status when reading metadata, reading a member, or writing fails.
rocksdb::Status Set::Remove(engine::Context &ctx, const Slice &user_key, const std::vector<Slice> &members,
                            uint64_t *removed_cnt) {
  *removed_cnt = 0;

  std::string ns_key = AppendNamespacePrefix(user_key);
  SetMetadata metadata(false);
  rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
  if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;

  std::string value;
  auto batch = storage_->GetWriteBatchBase();
  WriteBatchLogData log_data(kRedisSet);
  s = batch->PutLogData(log_data.Encode());
  if (!s.ok()) return s;

  // De-duplicate the request so a repeated member is counted only once.
  std::unordered_set<std::string_view> mset;
  for (const auto &member : members) {
    if (!mset.insert(member.ToStringView()).second) {
      continue;
    }
    std::string sub_key = InternalKey(ns_key, member, metadata.version, storage_->IsSlotIdEncoded()).Encode();
    s = storage_->Get(ctx, ctx.GetReadOptions(), sub_key, &value);
    if (s.IsNotFound()) continue;  // not a member, nothing to remove
    // Any other read failure is reported instead of being treated as "absent".
    if (!s.ok()) return s;
    s = batch->Delete(sub_key);
    if (!s.ok()) return s;
    *removed_cnt += 1;
  }

  if (*removed_cnt > 0) {
    if (metadata.size != *removed_cnt) {
      metadata.size -= *removed_cnt;
      std::string bytes;
      metadata.Encode(&bytes);
      s = batch->Put(metadata_cf_handle_, ns_key, bytes);
      if (!s.ok()) return s;
    } else {
      // The set became empty: drop the metadata entry as well.
      s = batch->Delete(metadata_cf_handle_, ns_key);
      if (!s.ok()) return s;
    }
  }
  return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}
// Reports the size recorded in the metadata of the set at `user_key`.
// A missing key is reported as OK with `*size == 0`.
rocksdb::Status Set::Card(engine::Context &ctx, const Slice &user_key, uint64_t *size) {
  *size = 0;

  const std::string ns_key = AppendNamespacePrefix(user_key);
  SetMetadata metadata(false);
  const rocksdb::Status lookup = GetMetadata(ctx, ns_key, &metadata);
  if (lookup.IsNotFound()) return rocksdb::Status::OK();
  if (!lookup.ok()) return lookup;

  *size = metadata.size;
  return rocksdb::Status::OK();
}
// Collects every member of the set at `user_key` into `members`.
//
// `members` is cleared first. A missing key yields OK with an empty result.
// The scan covers the sub-keys of the current metadata version; the encoded
// prefix of `version + 1` is installed as the iterator's upper bound.
//
// @return a non-OK status when the metadata lookup or the iteration fails;
//         in the latter case `members` is left empty rather than partial.
rocksdb::Status Set::Members(engine::Context &ctx, const Slice &user_key, std::vector<std::string> *members) {
  members->clear();

  std::string ns_key = AppendNamespacePrefix(user_key);
  SetMetadata metadata(false);
  rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
  if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;

  std::string prefix = InternalKey(ns_key, "", metadata.version, storage_->IsSlotIdEncoded()).Encode();
  std::string next_version_prefix = InternalKey(ns_key, "", metadata.version + 1, storage_->IsSlotIdEncoded()).Encode();

  rocksdb::ReadOptions read_options = ctx.DefaultScanOptions();
  // `upper_bound` must outlive the iterator because the options keep a pointer to it.
  rocksdb::Slice upper_bound(next_version_prefix);
  read_options.iterate_upper_bound = &upper_bound;

  auto iter = util::UniqueIterator(ctx, read_options);
  for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix); iter->Next()) {
    InternalKey ikey(iter->key(), storage_->IsSlotIdEncoded());
    members->emplace_back(ikey.GetSubKey().ToString());
  }

  // Valid() also becomes false when the iterator hits an error, so the loop
  // ending does not prove the scan was complete. Surface the error instead of
  // returning a silently truncated member list.
  s = iter->status();
  if (!s.ok()) {
    members->clear();
    return s;
  }
  return rocksdb::Status::OK();
}
// Checks whether `member` belongs to the set at `user_key`.
//
// Implemented on top of MIsMember with a single-element request, and returns
// whatever status MIsMember returns.
//
// @param flag  always written: false up front, then the membership result
//              when the lookup succeeds.
rocksdb::Status Set::IsMember(engine::Context &ctx, const Slice &user_key, const Slice &member, bool *flag) {
  // Give the out-parameter a defined value on every path, including failures.
  *flag = false;

  std::vector<int> exists;
  rocksdb::Status s = MIsMember(ctx, user_key, {member}, &exists);
  if (!s.ok()) return s;

  *flag = !exists.empty() && exists.front() != 0;
  return s;
}
// Checks several members of the set at `user_key` in one call.
//
// `exists` is cleared, then receives one entry per element of `members`, in
// the same order: 1 when the member is stored, 0 when it is not.
// Any non-OK status from the metadata lookup (a missing key included) is
// returned as-is, leaving `exists` empty.
rocksdb::Status Set::MIsMember(engine::Context &ctx, const Slice &user_key, const std::vector<Slice> &members,
                               std::vector<int> *exists) {
  exists->clear();

  const std::string ns_key = AppendNamespacePrefix(user_key);
  SetMetadata metadata(false);
  rocksdb::Status status = GetMetadata(ctx, ns_key, &metadata);
  if (!status.ok()) return status;

  std::string unused_value;
  for (const Slice &candidate : members) {
    const std::string candidate_key =
        InternalKey(ns_key, candidate, metadata.version, storage_->IsSlotIdEncoded()).Encode();
    status = storage_->Get(ctx, ctx.GetReadOptions(), candidate_key, &unused_value);
    if (status.IsNotFound()) {
      exists->emplace_back(0);
      continue;
    }
    if (!status.ok()) return status;
    exists->emplace_back(1);
  }
  return rocksdb::Status::OK();
}
// Randomly selects members of the set at `user_key` and optionally removes them.
//
// - count == 0: returns OK with an empty result.
// - count  > 0: sampling is requested with unique == true.
// - count  < 0: only allowed when `pop` is false (enforced with CHECK);
//               |count| samples are requested with unique == false.
//
// Selection is delegated to ExtractRandMemberFromSet, which is handed a
// callback that loads the whole set through Members().
// When `pop` is true, the selected members are deleted and the metadata size
// is reduced by the number of selected members.
// A missing key yields OK with an empty result.
rocksdb::Status Set::Take(engine::Context &ctx, const Slice &user_key, std::vector<std::string> *members, int count,
                          bool pop) {
  members->clear();
  if (count == 0) return rocksdb::Status::OK();

  const bool unique = count > 0;
  if (!unique) {
    CHECK(!pop);
    count = -count;
  }

  const std::string ns_key = AppendNamespacePrefix(user_key);
  SetMetadata metadata(false);
  rocksdb::Status status = GetMetadata(ctx, ns_key, &metadata);
  if (status.IsNotFound()) return rocksdb::Status::OK();
  if (!status.ok()) return status;

  ObserverOrUniquePtr<rocksdb::WriteBatchBase> batch = storage_->GetWriteBatchBase();
  if (pop) {
    WriteBatchLogData log_data(kRedisSet);
    status = batch->PutLogData(log_data.Encode());
    if (!status.ok()) return status;
  }

  auto load_all_members = [this, user_key, &ctx](std::vector<std::string> *samples) {
    return this->Members(ctx, user_key, samples);
  };
  status = ExtractRandMemberFromSet<std::string>(unique, count, load_all_members, members);
  if (!status.ok()) return status;

  // Nothing is written for a pure random selection, or when nothing was
  // selected; this avoids emitting an empty op-log.
  if (!pop || members->empty()) return rocksdb::Status::OK();

  for (const std::string &picked : *members) {
    const std::string picked_key =
        InternalKey(ns_key, picked, metadata.version, storage_->IsSlotIdEncoded()).Encode();
    status = batch->Delete(picked_key);
    if (!status.ok()) return status;
  }

  metadata.size -= members->size();
  std::string encoded_metadata;
  metadata.Encode(&encoded_metadata);
  status = batch->Put(metadata_cf_handle_, ns_key, encoded_metadata);
  if (!status.ok()) return status;

  return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}
// Moves `member` from the set at `src` to the set at `dst`.
//
// `dst` must either not exist or be a set; otherwise InvalidArgument with
// kErrMsgWrongType is returned and nothing is modified.
//
// @param flag  true when `member` was removed from `src` and the following
//              Add on `dst` succeeded. This holds even if `dst` already
//              contained the member (it is then only removed from `src`),
//              which is how Redis SMOVE reports the result.
//              false when `member` was not in `src`, or when a step failed.
//
// NOTE: the removal and the addition are issued as two separate writes.
rocksdb::Status Set::Move(engine::Context &ctx, const Slice &src, const Slice &dst, const Slice &member, bool *flag) {
  *flag = false;

  RedisType type = kRedisNone;
  rocksdb::Status s = Type(ctx, dst, &type);
  if (!s.ok()) return s;
  if (type != kRedisNone && type != kRedisSet) {
    return rocksdb::Status::InvalidArgument(kErrMsgWrongType);
  }

  uint64_t ret = 0;
  std::vector<Slice> members{member};
  s = Remove(ctx, src, members, &ret);
  *flag = (ret != 0);
  if (!s.ok() || !*flag) {
    return s;
  }

  // The member has left `src`, so the move counts as done regardless of
  // whether `dst` already held it. Do not overwrite the flag with the
  // "newly added" count; only a failed Add invalidates the result.
  s = Add(ctx, dst, members, &ret);
  if (!s.ok()) *flag = false;
  return s;
}
// Incrementally scans the set at `user_key`: all arguments are forwarded
// unchanged to SubKeyScanner::Scan together with the kRedisSet type tag.
rocksdb::Status Set::Scan(engine::Context &ctx, const Slice &user_key, const std::string &cursor, uint64_t limit,
                          const std::string &member_prefix, std::vector<std::string> *members) {
  auto scan_status = SubKeyScanner::Scan(ctx, kRedisSet, user_key, cursor, limit, member_prefix, members);
  return scan_status;
}
/*
 * Computes the difference between the first set and all the following sets:
 * the members of keys[0] that appear in none of keys[1..]. For example:
 * key1 = {a,b,c,d}
 * key2 = {c}
 * key3 = {a,c,e}
 * DIFF key1 key2 key3 = {b,d}
 *
 * `members` is cleared first; the result keeps the order in which Members()
 * returned the first set. `keys` must contain at least one key.
 */
rocksdb::Status Set::Diff(engine::Context &ctx, const std::vector<Slice> &keys, std::vector<std::string> *members) {
  members->clear();

  std::vector<std::string> base_members;
  if (auto s = Members(ctx, keys[0], &base_members); !s.ok()) return s;

  // Everything found in any of the other sets is excluded from the result.
  std::map<std::string, bool> excluded;
  std::vector<std::string> other_members;
  for (size_t idx = 1; idx < keys.size(); ++idx) {
    if (auto s = Members(ctx, keys[idx], &other_members); !s.ok()) return s;
    for (const auto &name : other_members) {
      excluded.emplace(name, true);
    }
  }

  for (const auto &name : base_members) {
    if (excluded.count(name) != 0) continue;
    members->push_back(name);
  }
  return rocksdb::Status::OK();
}
/*
 * Computes the union of all the given sets. For example:
 * key1 = {a,b,c,d}
 * key2 = {c}
 * key3 = {a,c,e}
 * UNION key1 key2 key3 = {a,b,c,d,e}
 *
 * `members` is cleared first; the result is emitted in the key order of the
 * std::map used for de-duplication.
 */
rocksdb::Status Set::Union(engine::Context &ctx, const std::vector<Slice> &keys, std::vector<std::string> *members) {
  members->clear();

  std::map<std::string, bool> seen;
  std::vector<std::string> current;
  for (const Slice &key : keys) {
    auto s = Members(ctx, key, &current);
    if (!s.ok()) return s;
    for (const auto &name : current) {
      seen.emplace(name, true);
    }
  }

  for (auto it = seen.cbegin(); it != seen.cend(); ++it) {
    members->emplace_back(it->first);
  }
  return rocksdb::Status::OK();
}
/*
 * Computes the intersection of all the given sets. For example:
 * key1 = {a,b,c,d}
 * key2 = {c}
 * key3 = {a,c,e}
 * INTER key1 key2 key3 = {c}
 *
 * `members` is cleared first. As soon as one of the sets turns out to be
 * empty the function returns with an empty result. `keys` must contain at
 * least one key.
 */
rocksdb::Status Set::Inter(engine::Context &ctx, const std::vector<Slice> &keys, std::vector<std::string> *members) {
  members->clear();

  std::vector<std::string> current;
  auto s = Members(ctx, keys[0], &current);
  if (!s.ok() || current.empty()) return s;

  // hits[m] = number of sets seen so far that contain m; only members of the
  // first set are ever tracked.
  std::map<std::string, size_t> hits;
  for (const auto &name : current) {
    hits.emplace(name, 1);
  }

  const size_t total_keys = keys.size();
  for (size_t idx = 1; idx < total_keys; ++idx) {
    s = Members(ctx, keys[idx], &current);
    if (!s.ok() || current.empty()) return s;
    for (const auto &name : current) {
      auto found = hits.find(name);
      if (found != hits.end()) ++found->second;
    }
  }

  for (const auto &entry : hits) {
    // A member belongs to the intersection when every set contained it.
    if (entry.second == total_keys) members->emplace_back(entry.first);
  }
  return rocksdb::Status::OK();
}
// Counts the members shared by all the given sets without materialising them.
//
// @param limit        upper bound on the reported count; 0 means "no limit"
//                     and is replaced by the size of the first set.
// @param cardinality  receives the count; 0 when any of the sets is empty.
//
// With a single key the result is min(size of that set, limit).
// `keys` must contain at least one key.
rocksdb::Status Set::InterCard(engine::Context &ctx, const std::vector<Slice> &keys, uint64_t limit,
                               uint64_t *cardinality) {
  *cardinality = 0;

  std::vector<std::string> current;
  auto s = Members(ctx, keys[0], &current);
  if (!s.ok() || current.empty()) return s;

  // hits[m] = number of sets seen so far that contain m.
  std::map<std::string, size_t> hits;
  for (const auto &name : current) {
    hits.emplace(name, 1);
  }

  if (limit == 0) {
    limit = current.size();
  }

  const size_t total_keys = keys.size();
  if (total_keys == 1) {
    *cardinality = std::min(static_cast<uint64_t>(current.size()), limit);
    return rocksdb::Status::OK();
  }

  for (size_t idx = 1; idx < total_keys; ++idx) {
    s = Members(ctx, keys[idx], &current);
    if (!s.ok() || current.empty()) return s;

    for (const auto &name : current) {
      auto found = hits.find(name);
      if (found == hits.end()) continue;
      if (++found->second != total_keys) continue;

      // This member has now been seen in every set.
      *cardinality += 1;
      // Stop scanning altogether once the limit is used up.
      if (--limit == 0) return rocksdb::Status::OK();
    }
  }
  return rocksdb::Status::OK();
}
// Computes Diff(keys) and stores the result at `dst` through Overwrite.
// `*saved_cnt` is 0 if Diff fails; otherwise it is set to the number of
// resulting members before the write is attempted.
rocksdb::Status Set::DiffStore(engine::Context &ctx, const Slice &dst, const std::vector<Slice> &keys,
                               uint64_t *saved_cnt) {
  *saved_cnt = 0;

  std::vector<std::string> diff_result;
  if (auto s = Diff(ctx, keys, &diff_result); !s.ok()) return s;

  *saved_cnt = diff_result.size();
  return Overwrite(ctx, dst, diff_result);
}
// Computes Union(keys) and stores the result at `dst` through Overwrite.
// `*save_cnt` is 0 if Union fails; otherwise it is set to the number of
// resulting members before the write is attempted.
rocksdb::Status Set::UnionStore(engine::Context &ctx, const Slice &dst, const std::vector<Slice> &keys,
                                uint64_t *save_cnt) {
  *save_cnt = 0;

  std::vector<std::string> union_result;
  if (auto s = Union(ctx, keys, &union_result); !s.ok()) return s;

  *save_cnt = union_result.size();
  return Overwrite(ctx, dst, union_result);
}
// Computes Inter(keys) and stores the result at `dst` through Overwrite.
// `*saved_cnt` is 0 if Inter fails; otherwise it is set to the number of
// resulting members before the write is attempted.
rocksdb::Status Set::InterStore(engine::Context &ctx, const Slice &dst, const std::vector<Slice> &keys,
                                uint64_t *saved_cnt) {
  *saved_cnt = 0;

  std::vector<std::string> inter_result;
  if (auto s = Inter(ctx, keys, &inter_result); !s.ok()) return s;

  *saved_cnt = inter_result.size();
  return Overwrite(ctx, dst, inter_result);
}
} // namespace redis