// blob: 066c7df5666a1f2f1f9ae5ca7ed05a0fdcffed13 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <dsn/dist/replication/replication_types.h>
#include <concurrentqueue/concurrentqueue.h>
#include <dsn/dist/replication/replica_base.h>
#include "hotkey_collector_state.h"
namespace pegasus {
namespace server {
class internal_collector_base;
// Result record that collectors fill in: it is handed by reference to
// internal_collector_base::analyse_data().
struct detect_hotkey_result
{
    // Bucket index reported by the analysis; stays -1 until something assigns it.
    // NOTE(review): presumably written by the coarse stage -- confirm in the .cpp.
    int coarse_bucket_index{-1};

    // Hash key reported as hot; empty until something assigns it.
    std::string hot_hash_key;
};
// Maps `data` to a bucket id. Only declared here; the definition lives in another
// translation unit.
// NOTE(review): presumably a hash of the key reduced to the bucket range -- confirm.
int get_bucket_id(dsn::string_view data);

// Looks for an outlier among `captured_keys` with respect to `threshold`.
// Only declared here; the definition lives in another translation unit.
// NOTE(review): judging by the signature, `hot_index` is an output that receives the
// position of the outlier when the function returns true -- confirm the exact
// criterion against the definition.
bool find_outlier_index(const std::vector<uint64_t> &captured_keys,
                        int threshold,
                        int &hot_index);
// hotkey_collector is responsible for finding the hot keys after the partition
// has been detected to be hot. The two types of hotkey, READ & WRITE, are detected
// separately.
//
// +--------------------+ +----------------------------------------------------+
// | Replica server | | Hotkey collector |
// | | | +-----------------------------------------------+ |
// | +----------------+ | | | Coarse capture | |
// | | | |--> | +----------+ | |
// | | RPC received | || | | | Data | | |
// | | | || | | +-----+----+ | |
// | +-------+--------+ || | | | | |
// | | || | | +---------------+----v--+-------+---------+ | |
// | v || | | | |Hot | | | | | |
// | +-------+--------+ || | | |Bucket |Bucket |Bucket |Bucket |Bucket | | |
// | | Replication | || | | +-----------+-----------------------------+ | |
// | | (only on the | || | | | | |
// | | write path) | || | +--------------|--------------------------------+ |
// | +-------+--------+ || | +--v---+ |
// | | || | | Data | |
// | v || | +------+ |
// | +-------+--------+ || | +-----|-------+-------------+ |
// | | | || | +------|-------------|-------------|---------+ |
// | | Capture data ---| | | Fine |capture | | | |
// | | | | | | | | | | |
// | +-------+--------+ | | | +----v----+ +----v----+ +----v----+ | |
// | | | | | | queue | | queue | | queue | | |
// | v | | | +----+----+ +----+----+ +----+----+ | |
// | +-------+--------+ | | | | | | | |
// | | | | | | +----v-------------v-------------v------+ | |
// | | Place data | | | | | Analysis pool | | |
// | | to the disk | | | | +-----------------|---------------------+ | |
// | | | | | +-------------------|------------------------+ |
// | +----------------+ | | v |
// | | | Hotkey |
// +--------------------+ +----------------------------------------------------+
class hotkey_collector : public dsn::replication::replica_base
{
public:
    // Creates a collector for a single kind of hotkey (`hotkey_type`).
    // NOTE(review): `r_base` is presumably forwarded to the replica_base base class --
    // the definition is not in this header.
    hotkey_collector(dsn::replication::hotkey_type::type hotkey_type,
                     dsn::replication::replica_base *r_base);

    // TODO: (Tangyanzhao) capture_*_key should be consistent with hotspot detection
    //
    // Entry points for feeding an observed key into the collector, given either as a
    // raw key or as a hash key.
    // weight: calculate the weight according to the specific situation
    void capture_raw_key(const dsn::blob &raw_key, int64_t weight);
    void capture_hash_key(const dsn::blob &hash_key, int64_t weight);

    // Runs the analysis step over what has been captured.
    // NOTE(review): presumably delegates to _internal_collector and stores the outcome
    // in _result -- confirm in the .cpp.
    void analyse_data();

    // Handles a detect_hotkey request; the reply is written into `resp`.
    void handle_rpc(const dsn::replication::detect_hotkey_request &req,
                    /*out*/ dsn::replication::detect_hotkey_response &resp);

private:
    // Start / stop handlers that report through `resp`.
    // NOTE(review): presumably dispatched from handle_rpc() -- confirm in the .cpp.
    void on_start_detect(dsn::replication::detect_hotkey_response &resp);
    void on_stop_detect(dsn::replication::detect_hotkey_response &resp);

    // NOTE(review): presumably ends the current detection; terminate_if_timeout()
    // presumably does so only once a time limit measured from
    // _collector_start_time_second has passed, and returns whether it did -- confirm.
    void terminate();
    bool terminate_if_timeout();

    // Detection result kept by this collector.
    detect_hotkey_result _result;

    // Current collector state, stored in an atomic.
    std::atomic<hotkey_collector_state> _state;

    // Kind of hotkey this collector handles; immutable after construction.
    const dsn::replication::hotkey_type::type _hotkey_type;

    // Collector implementation currently in use, shared via std::shared_ptr.
    // The implementations declared in this header are hotkey_empty_data_collector,
    // hotkey_coarse_data_collector and hotkey_fine_data_collector.
    std::shared_ptr<internal_collector_base> _internal_collector;

    // NOTE(review): by its name, the time (in seconds) at which collection started --
    // confirm the clock and where it is assigned.
    uint64_t _collector_start_time_second;
};
// Be sure every function in internal_collector_base should be thread safe
class internal_collector_base : public dsn::replication::replica_base
{
public:
explicit internal_collector_base(replica_base *base) : replica_base(base){};
virtual void capture_data(const dsn::blob &hash_key, uint64_t weight) = 0;
virtual void analyse_data(detect_hotkey_result &result) = 0;
virtual void clear() = 0;
};
// used in hotkey_collector_state::STOPPED and hotkey_collector_state::FINISHED, avoid null pointers
class hotkey_empty_data_collector : public internal_collector_base
{
public:
explicit hotkey_empty_data_collector(replica_base *base) : internal_collector_base(base) {}
void capture_data(const dsn::blob &hash_key, uint64_t weight) override {}
void analyse_data(detect_hotkey_result &result) override {}
void clear() override {}
};
// TODO: (Tangyanzhao) add a unit test of hotkey_coarse_data_collector
//
// Collector whose only state is a vector of atomic 64-bit counters (_hash_buckets).
// NOTE(review): presumably the "coarse capture" stage, with one counter per hash
// bucket -- confirm against the definitions in the .cpp.
class hotkey_coarse_data_collector : public internal_collector_base
{
public:
    explicit hotkey_coarse_data_collector(replica_base *base);

    // internal_collector_base interface; all three are defined out of line.
    void capture_data(const dsn::blob &hash_key, uint64_t weight) override;
    void analyse_data(detect_hotkey_result &result) override;
    void clear() override;

private:
    // A collector cannot be created without a replica_base.
    hotkey_coarse_data_collector() = delete;

    // Counters stored as atomics.
    // NOTE(review): presumably indexed by get_bucket_id() and incremented by the
    // captured weight -- confirm.
    std::vector<std::atomic<uint64_t>> _hash_buckets;

    // Lets the test class reach the private members.
    friend class coarse_collector_test;
};
// Collector that buffers captured keys in a concurrent queue.
// NOTE(review): presumably the "fine capture" stage, restricted to the bucket chosen by
// the coarse stage (`target_bucket_index`) -- confirm against the .cpp.
class hotkey_fine_data_collector : public internal_collector_base
{
public:
    // NOTE(review): both arguments are `int` while the members that mirror them are
    // `uint32_t`; the conversion happens in the out-of-line definition, so a negative
    // argument would wrap -- confirm callers never pass one.
    hotkey_fine_data_collector(replica_base *base, int target_bucket_index, int max_queue_size);

    // internal_collector_base interface; all three are defined out of line.
    void capture_data(const dsn::blob &hash_key, uint64_t weight) override;
    void analyse_data(detect_hotkey_result &result) override;
    void clear() override;

private:
    // A collector cannot be created without its construction arguments.
    hotkey_fine_data_collector() = delete;

    // Immutable after construction.
    // NOTE(review): by its name, an upper bound on the queue length -- confirm how it
    // is enforced.
    const uint32_t _max_queue_size;

    // Immutable after construction.
    // NOTE(review): presumably the only bucket whose keys are captured -- confirm.
    const uint32_t _target_bucket_index;

    // ConcurrentQueue is a lock-free queue to capture keys.
    // Each element pairs a dsn::blob with a uint64_t, the same types capture_data()
    // receives (presumably hash key and weight).
    moodycamel::ConcurrentQueue<std::pair<dsn::blob, uint64_t>> _capture_key_queue;

    // Lets the test class reach the private members.
    friend class fine_collector_test;
};
} // namespace server
} // namespace pegasus