blob: 5496de59a882f0bf8d3899ab1d4a1d41497e3286 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#pragma once
#include <rocksdb/status.h>
#include <optional>
#include <string>
#include <vector>
#include "storage/redis_db.h"
#include "storage/redis_metadata.h"
using rocksdb::Slice;
namespace redis {
class Stream : public SubKeyScanner {
public:
explicit Stream(engine::Storage *storage, const std::string &ns)
: SubKeyScanner(storage, ns), stream_cf_handle_(storage->GetCFHandle(ColumnFamilyID::Stream)) {}
rocksdb::Status Add(engine::Context &ctx, const Slice &stream_name, const StreamAddOptions &options,
const std::vector<std::string> &values, StreamEntryID *id);
rocksdb::Status CreateGroup(engine::Context &ctx, const Slice &stream_name, const StreamXGroupCreateOptions &options,
const std::string &group_name);
rocksdb::Status DestroyGroup(engine::Context &ctx, const Slice &stream_name, const std::string &group_name,
uint64_t *delete_cnt);
rocksdb::Status CreateConsumer(engine::Context &ctx, const Slice &stream_name, const std::string &group_name,
const std::string &consumer_name, int *created_number);
rocksdb::Status DestroyConsumer(engine::Context &ctx, const Slice &stream_name, const std::string &group_name,
const std::string &consumer_name, uint64_t &deleted_pel);
rocksdb::Status GroupSetId(engine::Context &ctx, const Slice &stream_name, const std::string &group_name,
const StreamXGroupCreateOptions &options);
rocksdb::Status DeleteEntries(engine::Context &ctx, const Slice &stream_name, const std::vector<StreamEntryID> &ids,
uint64_t *deleted_cnt);
rocksdb::Status DeletePelEntries(engine::Context &ctx, const Slice &stream_name, const std::string &group_name,
const std::vector<StreamEntryID> &entry_ids, uint64_t *acknowledged);
rocksdb::Status ClaimPelEntries(engine::Context &ctx, const Slice &stream_name, const std::string &group_name,
const std::string &consumer_name, uint64_t min_idle_time_ms,
const std::vector<StreamEntryID> &entry_ids, const StreamClaimOptions &options,
StreamClaimResult *result);
rocksdb::Status AutoClaim(engine::Context &ctx, const Slice &stream_name, const std::string &group_name,
const std::string &consumer_name, const StreamAutoClaimOptions &options,
StreamAutoClaimResult *result);
rocksdb::Status Len(engine::Context &ctx, const Slice &stream_name, const StreamLenOptions &options, uint64_t *size);
rocksdb::Status GetStreamInfo(engine::Context &ctx, const Slice &stream_name, bool full, uint64_t count,
StreamInfo *info);
rocksdb::Status GetGroupInfo(engine::Context &ctx, const Slice &stream_name,
std::vector<std::pair<std::string, StreamConsumerGroupMetadata>> &group_metadata);
rocksdb::Status GetConsumerInfo(engine::Context &ctx, const Slice &stream_name, const std::string &group_name,
std::vector<std::pair<std::string, StreamConsumerMetadata>> &consumer_metadata);
rocksdb::Status GetPendingEntries(engine::Context &ctx, StreamPendingOptions &options,
StreamGetPendingEntryResult &pending_infos, std::vector<StreamNACK> &ext_results);
rocksdb::Status Range(engine::Context &ctx, const Slice &stream_name, const StreamRangeOptions &options,
std::vector<StreamEntry> *entries);
rocksdb::Status RangeWithPending(engine::Context &ctx, const Slice &stream_name, StreamRangeOptions &options,
std::vector<StreamEntry> *entries, std::string &group_name,
std::string &consumer_name, bool noack, bool latest);
rocksdb::Status Trim(engine::Context &ctx, const Slice &stream_name, const StreamTrimOptions &options,
uint64_t *delete_cnt);
rocksdb::Status GetMetadata(engine::Context &ctx, const Slice &stream_name, StreamMetadata *metadata);
rocksdb::Status GetLastGeneratedID(engine::Context &ctx, const Slice &stream_name, StreamEntryID *id);
rocksdb::Status SetId(engine::Context &ctx, const Slice &stream_name, const StreamEntryID &last_generated_id,
std::optional<uint64_t> entries_added, std::optional<StreamEntryID> max_deleted_id);
private:
rocksdb::ColumnFamilyHandle *stream_cf_handle_;
rocksdb::Status range(engine::Context &ctx, const std::string &ns_key, const StreamMetadata &metadata,
const StreamRangeOptions &options, std::vector<StreamEntry> *entries) const;
rocksdb::Status getEntryRawValue(engine::Context &ctx, const std::string &ns_key, const StreamMetadata &metadata,
const StreamEntryID &id, std::string *value) const;
StreamEntryID entryIDFromInternalKey(const rocksdb::Slice &key) const;
std::string internalKeyFromEntryID(const std::string &ns_key, const StreamMetadata &metadata,
const StreamEntryID &id) const;
rocksdb::Status trim(engine::Context &ctx, const std::string &ns_key, const StreamTrimOptions &options,
StreamMetadata *metadata, rocksdb::WriteBatch *batch, uint64_t &delete_cnt);
std::string internalKeyFromGroupName(const std::string &ns_key, const StreamMetadata &metadata,
const std::string &group_name) const;
std::string groupNameFromInternalKey(rocksdb::Slice key) const;
static std::string encodeStreamConsumerGroupMetadataValue(const StreamConsumerGroupMetadata &consumer_group_metadata);
static StreamConsumerGroupMetadata decodeStreamConsumerGroupMetadataValue(const std::string &value);
std::string internalKeyFromConsumerName(const std::string &ns_key, const StreamMetadata &metadata,
const std::string &group_name, const std::string &consumer_name) const;
std::string consumerNameFromInternalKey(rocksdb::Slice key) const;
static std::string encodeStreamConsumerMetadataValue(const StreamConsumerMetadata &consumer_metadata);
static StreamConsumerMetadata decodeStreamConsumerMetadataValue(const std::string &value);
rocksdb::Status createConsumerWithoutLock(engine::Context &ctx, const Slice &stream_name,
const std::string &group_name, const std::string &consumer_name,
int *created_number);
std::string internalPelKeyFromGroupAndEntryId(const std::string &ns_key, const StreamMetadata &metadata,
const std::string &group_name, const StreamEntryID &id);
StreamEntryID groupAndEntryIdFromPelInternalKey(rocksdb::Slice key, std::string &group_name);
static std::string encodeStreamPelEntryValue(const StreamPelEntry &pel_entry);
static StreamPelEntry decodeStreamPelEntryValue(const std::string &value);
StreamSubkeyType identifySubkeyType(const rocksdb::Slice &key) const;
};
} // namespace redis