/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

#pragma once

#include <getopt.h>
#include <thread>
#include <iomanip>
#include <fstream>
#include <queue>
#include <boost/algorithm/string.hpp>
#include <rocksdb/db.h>
#include <rocksdb/sst_dump_tool.h>
#include <rocksdb/env.h>
#include <rocksdb/statistics.h>
#include <dsn/cpp/json_helper.h>
#include <dsn/dist/remote_command.h>
#include <dsn/dist/replication/replication_ddl_client.h>
#include <dsn/dist/replication/mutation_log_tool.h>
#include <dsn/perf_counter/perf_counter_utils.h>
#include <dsn/utility/string_view.h>
#include <dsn/utils/time_utils.h>

#include <rrdb/rrdb.code.definition.h>
#include <rrdb/rrdb_types.h>
#include <pegasus/version.h>
#include <pegasus/git_commit.h>
#include <pegasus/error.h>
#include <geo/lib/geo_client.h>

#include "base/pegasus_key_schema.h"
#include "base/pegasus_value_schema.h"
#include "base/pegasus_utils.h"

#include "command_executor.h"
#include "command_utils.h"

using namespace dsn::replication;

#define STR_I(var) #var
#define STR(var) STR_I(var)
#ifndef DSN_BUILD_TYPE
#define PEGASUS_BUILD_TYPE ""
#else
#define PEGASUS_BUILD_TYPE STR(DSN_BUILD_TYPE)
#endif

DEFINE_TASK_CODE(LPC_SCAN_DATA, TASK_PRIORITY_COMMON, ::dsn::THREAD_POOL_DEFAULT)
enum scan_data_operator
{
    SCAN_COPY,
    SCAN_CLEAR,
    SCAN_COUNT,
    SCAN_GEN_GEO
};
class top_container
{
public:
    struct top_heap_item
    {
        std::string hash_key;
        std::string sort_key;
        long row_size;
        top_heap_item(std::string &&hash_key_, std::string &&sort_key_, long row_size_)
            : hash_key(std::move(hash_key_)), sort_key(std::move(sort_key_)), row_size(row_size_)
        {
        }
    };
    struct top_heap_compare
    {
        bool operator()(top_heap_item i1, top_heap_item i2) { return i1.row_size < i2.row_size; }
    };
    typedef std::priority_queue<top_heap_item, std::vector<top_heap_item>, top_heap_compare>
        top_heap;

    top_container(int count) : _count(count) {}

    void push(std::string &&hash_key, std::string &&sort_key, long row_size)
    {
        dsn::utils::auto_lock<dsn::utils::ex_lock_nr> l(_lock);
        if (_heap.size() < _count) {
            _heap.emplace(std::move(hash_key), std::move(sort_key), row_size);
        } else {
            const top_heap_item &top = _heap.top();
            if (top.row_size < row_size) {
                _heap.pop();
                _heap.emplace(std::move(hash_key), std::move(sort_key), row_size);
            }
        }
    }

    top_heap &all() { return _heap; }

private:
    int _count;
    top_heap _heap;
    dsn::utils::ex_lock_nr _lock;
};

enum class histogram_type
{
    HASH_KEY_SIZE,
    SORT_KEY_SIZE,
    VALUE_SIZE,
    ROW_SIZE
};

struct scan_data_context
{
    scan_data_operator op;
    int split_id;
    int max_batch_count;
    int timeout_ms;
    bool no_overwrite; // if set true, then use check_and_set() instead of set()
                       // when inserting data to destination table for copy_data,
                       // to not overwrite old data if it aleady exist.
    pegasus::pegasus_client::filter_type sort_key_filter_type;
    std::string sort_key_filter_pattern;
    pegasus::pegasus_client::filter_type value_filter_type;
    std::string value_filter_pattern;
    pegasus::pegasus_client::pegasus_scanner_wrapper scanner;
    pegasus::pegasus_client *client;
    pegasus::geo::geo_client *geoclient;
    std::atomic_bool *error_occurred;
    std::atomic_long split_rows;
    std::atomic_long split_request_count;
    std::atomic_bool split_completed;
    bool stat_size;
    std::shared_ptr<rocksdb::Statistics> statistics;
    int top_count;
    top_container top_rows;
    bool count_hash_key;
    std::string last_hash_key;
    std::atomic_long split_hash_key_count;
    scan_data_context(scan_data_operator op_,
                      int split_id_,
                      int max_batch_count_,
                      int timeout_ms_,
                      pegasus::pegasus_client::pegasus_scanner_wrapper scanner_,
                      pegasus::pegasus_client *client_,
                      pegasus::geo::geo_client *geoclient_,
                      std::atomic_bool *error_occurred_,
                      bool stat_size_ = false,
                      std::shared_ptr<rocksdb::Statistics> statistics_ = nullptr,
                      int top_count_ = 0,
                      bool count_hash_key_ = false)
        : op(op_),
          split_id(split_id_),
          max_batch_count(max_batch_count_),
          timeout_ms(timeout_ms_),
          no_overwrite(false),
          sort_key_filter_type(pegasus::pegasus_client::FT_NO_FILTER),
          value_filter_type(pegasus::pegasus_client::FT_NO_FILTER),
          scanner(scanner_),
          client(client_),
          geoclient(geoclient_),
          error_occurred(error_occurred_),
          split_rows(0),
          split_request_count(0),
          split_completed(false),
          stat_size(stat_size_),
          statistics(statistics_),
          top_count(top_count_),
          top_rows(top_count_),
          count_hash_key(count_hash_key_),
          split_hash_key_count(0)
    {
        // max_batch_count should > 1 because scan may be terminated
        // when split_request_count = 1
        dassert(max_batch_count > 1, "");
    }
    void set_sort_key_filter(pegasus::pegasus_client::filter_type type, const std::string &pattern)
    {
        sort_key_filter_type = type;
        sort_key_filter_pattern = pattern;
    }
    void set_value_filter(pegasus::pegasus_client::filter_type type, const std::string &pattern)
    {
        value_filter_type = type;
        value_filter_pattern = pattern;
    }
    void set_no_overwrite() { no_overwrite = true; }
};
inline void update_atomic_max(std::atomic_long &max, long value)
{
    while (true) {
        long old = max.load();
        if (value <= old || max.compare_exchange_weak(old, value)) {
            break;
        }
    }
}
inline pegasus::pegasus_client::filter_type parse_filter_type(const std::string &name,
                                                              bool include_exact)
{
    if (include_exact && name == "exact")
        return pegasus::pegasus_client::FT_MATCH_EXACT;
    else
        return (pegasus::pegasus_client::filter_type)type_from_string(
            dsn::apps::_filter_type_VALUES_TO_NAMES,
            std::string("ft_match_") + name,
            ::dsn::apps::filter_type::FT_NO_FILTER);
}
// return true if the data is valid for the filter
inline bool validate_filter(pegasus::pegasus_client::filter_type filter_type,
                            const std::string &filter_pattern,
                            const std::string &value)
{
    switch (filter_type) {
    case pegasus::pegasus_client::FT_NO_FILTER:
        return true;
    case pegasus::pegasus_client::FT_MATCH_EXACT:
        return filter_pattern == value;
    case pegasus::pegasus_client::FT_MATCH_ANYWHERE:
    case pegasus::pegasus_client::FT_MATCH_PREFIX:
    case pegasus::pegasus_client::FT_MATCH_POSTFIX: {
        if (filter_pattern.length() == 0)
            return true;
        if (value.length() < filter_pattern.length())
            return false;
        if (filter_type == pegasus::pegasus_client::FT_MATCH_ANYWHERE) {
            return dsn::string_view(value).find(filter_pattern) != dsn::string_view::npos;
        } else if (filter_type == pegasus::pegasus_client::FT_MATCH_PREFIX) {
            return ::memcmp(value.data(), filter_pattern.data(), filter_pattern.length()) == 0;
        } else { // filter_type == pegasus::pegasus_client::FT_MATCH_POSTFIX
            return ::memcmp(value.data() + value.length() - filter_pattern.length(),
                            filter_pattern.data(),
                            filter_pattern.length()) == 0;
        }
    }
    default:
        dassert(false, "unsupported filter type: %d", filter_type);
    }
    return false;
}
// return true if the data is valid for the filter
inline bool
validate_filter(scan_data_context *context, const std::string &sort_key, const std::string &value)
{
    // for sort key, we only need to check MATCH_EXACT, because it is not supported
    // on the server side, but MATCH_PREFIX is already satisified.
    if (context->sort_key_filter_type == pegasus::pegasus_client::FT_MATCH_EXACT &&
        sort_key.length() > context->sort_key_filter_pattern.length())
        return false;
    return validate_filter(context->value_filter_type, context->value_filter_pattern, value);
}
inline void scan_data_next(scan_data_context *context)
{
    while (!context->split_completed.load() && !context->error_occurred->load() &&
           context->split_request_count.load() < context->max_batch_count) {
        context->split_request_count++;
        context->scanner->async_next([context](int ret,
                                               std::string &&hash_key,
                                               std::string &&sort_key,
                                               std::string &&value,
                                               pegasus::pegasus_client::internal_info &&info) {
            if (ret == pegasus::PERR_OK) {
                if (validate_filter(context, sort_key, value)) {
                    switch (context->op) {
                    case SCAN_COPY:
                        context->split_request_count++;
                        if (context->no_overwrite) {
                            auto callback = [context](
                                int err,
                                pegasus::pegasus_client::check_and_set_results &&results,
                                pegasus::pegasus_client::internal_info &&info) {
                                if (err != pegasus::PERR_OK) {
                                    if (!context->split_completed.exchange(true)) {
                                        fprintf(stderr,
                                                "ERROR: split[%d] async check and set failed: %s\n",
                                                context->split_id,
                                                context->client->get_error_string(err));
                                        context->error_occurred->store(true);
                                    }
                                } else {
                                    if (results.set_succeed) {
                                        context->split_rows++;
                                    }
                                    scan_data_next(context);
                                }
                                // should put "split_request_count--" at end of the scope,
                                // to prevent that split_request_count becomes 0 in the middle.
                                context->split_request_count--;
                            };
                            pegasus::pegasus_client::check_and_set_options options;
                            context->client->async_check_and_set(
                                hash_key,
                                sort_key,
                                pegasus::pegasus_client::cas_check_type::CT_VALUE_NOT_EXIST,
                                "",
                                sort_key,
                                value,
                                options,
                                std::move(callback),
                                context->timeout_ms);
                        } else {
                            auto callback =
                                [context](int err, pegasus::pegasus_client::internal_info &&info) {
                                    if (err != pegasus::PERR_OK) {
                                        if (!context->split_completed.exchange(true)) {
                                            fprintf(stderr,
                                                    "ERROR: split[%d] async set failed: %s\n",
                                                    context->split_id,
                                                    context->client->get_error_string(err));
                                            context->error_occurred->store(true);
                                        }
                                    } else {
                                        context->split_rows++;
                                        scan_data_next(context);
                                    }
                                    // should put "split_request_count--" at end of the scope,
                                    // to prevent that split_request_count becomes 0 in the middle.
                                    context->split_request_count--;
                                };
                            context->client->async_set(hash_key,
                                                       sort_key,
                                                       value,
                                                       std::move(callback),
                                                       context->timeout_ms);
                        }
                        break;
                    case SCAN_CLEAR:
                        context->split_request_count++;
                        context->client->async_del(
                            hash_key,
                            sort_key,
                            [context](int err, pegasus::pegasus_client::internal_info &&info) {
                                if (err != pegasus::PERR_OK) {
                                    if (!context->split_completed.exchange(true)) {
                                        fprintf(stderr,
                                                "ERROR: split[%d] async del failed: %s\n",
                                                context->split_id,
                                                context->client->get_error_string(err));
                                        context->error_occurred->store(true);
                                    }
                                } else {
                                    context->split_rows++;
                                    scan_data_next(context);
                                }
                                // should put "split_request_count--" at end of the scope,
                                // to prevent that split_request_count becomes 0 in the middle.
                                context->split_request_count--;
                            },
                            context->timeout_ms);
                        break;
                    case SCAN_COUNT:
                        context->split_rows++;
                        if (context->stat_size && context->statistics) {
                            long hash_key_size = hash_key.size();
                            context->statistics->measureTime(
                                static_cast<uint32_t>(histogram_type::HASH_KEY_SIZE),
                                hash_key_size);

                            long sort_key_size = sort_key.size();
                            context->statistics->measureTime(
                                static_cast<uint32_t>(histogram_type::SORT_KEY_SIZE),
                                sort_key_size);

                            long value_size = value.size();
                            context->statistics->measureTime(
                                static_cast<uint32_t>(histogram_type::VALUE_SIZE), value_size);

                            long row_size = hash_key_size + sort_key_size + value_size;
                            context->statistics->measureTime(
                                static_cast<uint32_t>(histogram_type::ROW_SIZE), row_size);

                            if (context->top_count > 0) {
                                context->top_rows.push(
                                    std::move(hash_key), std::move(sort_key), row_size);
                            }
                        }
                        if (context->count_hash_key) {
                            if (hash_key != context->last_hash_key) {
                                context->split_hash_key_count++;
                                context->last_hash_key = std::move(hash_key);
                            }
                        }
                        scan_data_next(context);
                        break;
                    case SCAN_GEN_GEO:
                        context->split_request_count++;
                        context->geoclient->async_set(
                            hash_key,
                            sort_key,
                            value,
                            [context](int err, pegasus::pegasus_client::internal_info &&info) {
                                if (err != pegasus::PERR_OK) {
                                    if (!context->split_completed.exchange(true)) {
                                        fprintf(stderr,
                                                "ERROR: split[%d] async set failed: %s\n",
                                                context->split_id,
                                                context->client->get_error_string(err));
                                        context->error_occurred->store(true);
                                    }
                                } else {
                                    context->split_rows++;
                                    scan_data_next(context);
                                }
                                // should put "split_request_count--" at end of the scope,
                                // to prevent that split_request_count becomes 0 in the middle.
                                context->split_request_count--;
                            },
                            context->timeout_ms);
                        break;
                    default:
                        dassert(false, "op = %d", context->op);
                        break;
                    }
                } else {
                    scan_data_next(context);
                }
            } else if (ret == pegasus::PERR_SCAN_COMPLETE) {
                context->split_completed.store(true);
            } else {
                if (!context->split_completed.exchange(true)) {
                    fprintf(stderr,
                            "ERROR: split[%d] scan next failed: %s\n",
                            context->split_id,
                            context->client->get_error_string(ret));
                    context->error_occurred->store(true);
                }
            }
            // should put "split_request_count--" at end of the scope,
            // to prevent that split_request_count becomes 0 in the middle.
            context->split_request_count--;
        });

        if (context->count_hash_key) {
            // disable parallel scan if count_hash_key == true
            break;
        }
    }
}

struct node_desc
{
    std::string desc;
    dsn::rpc_address address;
    node_desc(const std::string &s, const dsn::rpc_address &n) : desc(s), address(n) {}
};
// type: all | replica-server | meta-server
inline bool fill_nodes(shell_context *sc, const std::string &type, std::vector<node_desc> &nodes)
{
    if (type == "all" || type == "meta-server") {
        for (auto &addr : sc->meta_list) {
            nodes.emplace_back("meta-server", addr);
        }
    }

    if (type == "all" || type == "replica-server") {
        std::map<dsn::rpc_address, dsn::replication::node_status::type> rs_nodes;
        ::dsn::error_code err =
            sc->ddl_client->list_nodes(dsn::replication::node_status::NS_ALIVE, rs_nodes);
        if (err != ::dsn::ERR_OK) {
            fprintf(stderr, "ERROR: list node failed: %s\n", err.to_string());
            return false;
        }
        for (auto &kv : rs_nodes) {
            nodes.emplace_back("replica-server", kv.first);
        }
    }

    return true;
}

inline std::vector<std::pair<bool, std::string>>
call_remote_command(shell_context *sc,
                    const std::vector<node_desc> &nodes,
                    const std::string &cmd,
                    const std::vector<std::string> &arguments)
{
    std::vector<std::pair<bool, std::string>> results;
    std::vector<dsn::task_ptr> tasks;
    tasks.resize(nodes.size());
    results.resize(nodes.size());
    for (int i = 0; i < nodes.size(); ++i) {
        auto callback = [&results, i](::dsn::error_code err, const std::string &resp) {
            if (err == ::dsn::ERR_OK) {
                results[i].first = true;
                results[i].second = resp;
            } else {
                results[i].first = false;
                results[i].second = err.to_string();
            }
        };
        tasks[i] = dsn::dist::cmd::async_call_remote(
            nodes[i].address, cmd, arguments, callback, std::chrono::milliseconds(5000));
    }
    for (int i = 0; i < nodes.size(); ++i) {
        tasks[i]->wait();
    }
    return results;
}

inline bool parse_app_pegasus_perf_counter_name(const std::string &name,
                                                int32_t &app_id,
                                                int32_t &partition_index,
                                                std::string &counter_name)
{
    std::string::size_type find = name.find_last_of('@');
    if (find == std::string::npos)
        return false;
    int n = sscanf(name.c_str() + find + 1, "%d.%d", &app_id, &partition_index);
    if (n != 2)
        return false;
    std::string::size_type find2 = name.find_last_of('*');
    if (find2 == std::string::npos)
        return false;
    counter_name = name.substr(find2 + 1, find - find2 - 1);
    return true;
}

inline bool parse_app_perf_counter_name(const std::string &name,
                                        std::string &app_name,
                                        std::string &counter_name)
{
    /**
     * name format:
     *   1.{node}*{section}*{counter_name}@{app_name}.{percent_line}
     *   2.{node}*{section}*{counter_name}@{app_name}
     */
    std::string::size_type find = name.find_last_of('@');
    if (find == std::string::npos)
        return false;

    std::string::size_type find2 = name.find_last_of('.');
    if (find2 == std::string::npos) {
        app_name = name.substr(find + 1);
    } else {
        app_name = name.substr(find + 1, find2 - find - 1);
    }

    std::string::size_type find3 = name.find_last_of('*');
    if (find3 == std::string::npos)
        return false;
    counter_name = name.substr(find3 + 1, find - find3 - 1);
    return true;
}

struct row_data
{
    double get_total_qps() const
    {
        return get_qps + multi_get_qps + scan_qps + put_qps + multi_put_qps + remove_qps +
               multi_remove_qps + incr_qps + check_and_set_qps + check_and_mutate_qps +
               duplicate_qps;
    }

    double get_total_cu() const { return recent_read_cu + recent_write_cu; }

    double get_total_read_qps() const { return get_qps + multi_get_qps + scan_qps; }

    double get_total_write_qps() const
    {
        return put_qps + remove_qps + multi_put_qps + multi_remove_qps + check_and_set_qps +
               check_and_mutate_qps;
    }

    std::string row_name;
    int32_t app_id = 0;
    int32_t partition_count = 0;
    double get_qps = 0;
    double multi_get_qps = 0;
    double put_qps = 0;
    double multi_put_qps = 0;
    double remove_qps = 0;
    double multi_remove_qps = 0;
    double incr_qps = 0;
    double check_and_set_qps = 0;
    double check_and_mutate_qps = 0;
    double scan_qps = 0;
    double duplicate_qps = 0;
    double dup_shipped_ops = 0;
    double dup_failed_shipping_ops = 0;
    double recent_read_cu = 0;
    double recent_write_cu = 0;
    double recent_expire_count = 0;
    double recent_filter_count = 0;
    double recent_abnormal_count = 0;
    double recent_write_throttling_delay_count = 0;
    double recent_write_throttling_reject_count = 0;
    double storage_mb = 0;
    double storage_count = 0;
    double rdb_block_cache_hit_count = 0;
    double rdb_block_cache_total_count = 0;
    double rdb_index_and_filter_blocks_mem_usage = 0;
    double rdb_memtable_mem_usage = 0;
    double rdb_estimate_num_keys = 0;
    double rdb_bf_seek_negatives = 0;
    double rdb_bf_seek_total = 0;
    double rdb_bf_point_positive_true = 0;
    double rdb_bf_point_positive_total = 0;
    double rdb_bf_point_negatives = 0;
    double backup_request_qps = 0;
    double get_bytes = 0;
    double multi_get_bytes = 0;
    double scan_bytes = 0;
    double put_bytes = 0;
    double multi_put_bytes = 0;
    double check_and_set_bytes = 0;
    double check_and_mutate_bytes = 0;
};

inline bool
update_app_pegasus_perf_counter(row_data &row, const std::string &counter_name, double value)
{
    if (counter_name == "get_qps")
        row.get_qps += value;
    else if (counter_name == "multi_get_qps")
        row.multi_get_qps += value;
    else if (counter_name == "put_qps")
        row.put_qps += value;
    else if (counter_name == "multi_put_qps")
        row.multi_put_qps += value;
    else if (counter_name == "remove_qps")
        row.remove_qps += value;
    else if (counter_name == "multi_remove_qps")
        row.multi_remove_qps += value;
    else if (counter_name == "incr_qps")
        row.incr_qps += value;
    else if (counter_name == "check_and_set_qps")
        row.check_and_set_qps += value;
    else if (counter_name == "check_and_mutate_qps")
        row.check_and_mutate_qps += value;
    else if (counter_name == "scan_qps")
        row.scan_qps += value;
    else if (counter_name == "duplicate_qps")
        row.duplicate_qps += value;
    else if (counter_name == "dup_shipped_ops")
        row.dup_shipped_ops += value;
    else if (counter_name == "dup_failed_shipping_ops")
        row.dup_failed_shipping_ops += value;
    else if (counter_name == "recent.read.cu")
        row.recent_read_cu += value;
    else if (counter_name == "recent.write.cu")
        row.recent_write_cu += value;
    else if (counter_name == "recent.expire.count")
        row.recent_expire_count += value;
    else if (counter_name == "recent.filter.count")
        row.recent_filter_count += value;
    else if (counter_name == "recent.abnormal.count")
        row.recent_abnormal_count += value;
    else if (counter_name == "recent.write.throttling.delay.count")
        row.recent_write_throttling_delay_count += value;
    else if (counter_name == "recent.write.throttling.reject.count")
        row.recent_write_throttling_reject_count += value;
    else if (counter_name == "disk.storage.sst(MB)")
        row.storage_mb += value;
    else if (counter_name == "disk.storage.sst.count")
        row.storage_count += value;
    else if (counter_name == "rdb.block_cache.hit_count")
        row.rdb_block_cache_hit_count += value;
    else if (counter_name == "rdb.block_cache.total_count")
        row.rdb_block_cache_total_count += value;
    else if (counter_name == "rdb.index_and_filter_blocks.memory_usage")
        row.rdb_index_and_filter_blocks_mem_usage += value;
    else if (counter_name == "rdb.memtable.memory_usage")
        row.rdb_memtable_mem_usage += value;
    else if (counter_name == "rdb.estimate_num_keys")
        row.rdb_estimate_num_keys += value;
    else if (counter_name == "rdb.bf_seek_negatives")
        row.rdb_bf_seek_negatives += value;
    else if (counter_name == "rdb.bf_seek_total")
        row.rdb_bf_seek_total += value;
    else if (counter_name == "rdb.bf_point_positive_true")
        row.rdb_bf_point_positive_true += value;
    else if (counter_name == "rdb.bf_point_positive_total")
        row.rdb_bf_point_positive_total += value;
    else if (counter_name == "rdb.bf_point_negatives")
        row.rdb_bf_point_negatives += value;
    else if (counter_name == "backup_request_qps")
        row.backup_request_qps += value;
    else if (counter_name == "get_bytes")
        row.get_bytes += value;
    else if (counter_name == "multi_get_bytes")
        row.multi_get_bytes += value;
    else if (counter_name == "scan_bytes")
        row.scan_bytes += value;
    else if (counter_name == "put_bytes")
        row.put_bytes += value;
    else if (counter_name == "multi_put_bytes")
        row.multi_put_bytes += value;
    else if (counter_name == "check_and_set_bytes")
        row.check_and_set_bytes += value;
    else if (counter_name == "check_and_mutate_bytes")
        row.check_and_mutate_bytes += value;
    else
        return false;
    return true;
}

inline bool get_apps_and_nodes(shell_context *sc,
                               std::vector<::dsn::app_info> &apps,
                               std::vector<node_desc> &nodes)
{
    dsn::error_code err = sc->ddl_client->list_apps(dsn::app_status::AS_AVAILABLE, apps);
    if (err != dsn::ERR_OK) {
        derror("list apps failed, error = %s", err.to_string());
        return false;
    }
    if (!fill_nodes(sc, "replica-server", nodes)) {
        derror("get replica server node list failed");
        return false;
    }
    return true;
}

inline bool
get_app_partitions(shell_context *sc,
                   const std::vector<::dsn::app_info> &apps,
                   std::map<int32_t, std::vector<dsn::partition_configuration>> &app_partitions)
{
    for (const ::dsn::app_info &app : apps) {
        int32_t app_id = 0;
        int32_t partition_count = 0;
        dsn::error_code err = sc->ddl_client->list_app(
            app.app_name, app_id, partition_count, app_partitions[app.app_id]);
        if (err != ::dsn::ERR_OK) {
            derror("list app %s failed, error = %s", app.app_name.c_str(), err.to_string());
            return false;
        }
        dassert(app_id == app.app_id, "%d VS %d", app_id, app.app_id);
        dassert(partition_count == app.partition_count,
                "%d VS %d",
                partition_count,
                app.partition_count);
    }
    return true;
}

inline bool decode_node_perf_counter_info(const dsn::rpc_address &node_addr,
                                          const std::pair<bool, std::string> &result,
                                          dsn::perf_counter_info &info)
{
    if (!result.first) {
        derror("query perf counter info from node %s failed", node_addr.to_string());
        return false;
    }
    dsn::blob bb(result.second.data(), 0, result.second.size());
    if (!dsn::json::json_forwarder<dsn::perf_counter_info>::decode(bb, info)) {
        derror("decode perf counter info from node %s failed, result = %s",
               node_addr.to_string(),
               result.second.c_str());
        return false;
    }
    if (info.result != "OK") {
        derror("query perf counter info from node %s returns error, error = %s",
               node_addr.to_string(),
               info.result.c_str());
        return false;
    }
    return true;
}

// rows: key-app name, value-perf counters for each partition
inline bool get_app_partition_stat(shell_context *sc,
                                   std::map<std::string, std::vector<row_data>> &rows)
{
    // get apps and nodes
    std::vector<::dsn::app_info> apps;
    std::vector<node_desc> nodes;
    if (!get_apps_and_nodes(sc, apps, nodes)) {
        return false;
    }

    // get the relationship between app_id and app_name
    std::map<int32_t, std::string> app_id_name;
    std::map<std::string, int32_t> app_name_id;
    for (::dsn::app_info &app : apps) {
        app_id_name[app.app_id] = app.app_name;
        app_name_id[app.app_name] = app.app_id;
        rows[app.app_name].resize(app.partition_count);
    }

    // get app_id --> partitions
    std::map<int32_t, std::vector<dsn::partition_configuration>> app_partitions;
    if (!get_app_partitions(sc, apps, app_partitions)) {
        return false;
    }

    // get all of the perf counters with format ".*@.*"
    std::vector<std::pair<bool, std::string>> results =
        call_remote_command(sc, nodes, "perf-counters", {".*@.*"});

    for (int i = 0; i < nodes.size(); ++i) {
        // decode info of perf-counters on node i
        dsn::perf_counter_info info;
        if (!decode_node_perf_counter_info(nodes[i].address, results[i], info)) {
            return false;
        }

        for (dsn::perf_counter_metric &m : info.counters) {
            // get app_id/partition_id/counter_name/app_name from the name of perf-counter
            int32_t app_id_x, partition_index_x;
            std::string counter_name;
            std::string app_name;

            if (parse_app_pegasus_perf_counter_name(
                    m.name, app_id_x, partition_index_x, counter_name)) {
                // only primary partition will be counted
                auto find = app_partitions.find(app_id_x);
                if (find != app_partitions.end() &&
                    find->second[partition_index_x].primary == nodes[i].address) {
                    row_data &row = rows[app_id_name[app_id_x]][partition_index_x];
                    row.row_name = std::to_string(partition_index_x);
                    row.app_id = app_id_x;
                    update_app_pegasus_perf_counter(row, counter_name, m.value);
                }
            } else if (parse_app_perf_counter_name(m.name, app_name, counter_name)) {
                // if the app_name from perf-counter isn't existed(maybe the app was dropped), it
                // will be ignored.
                if (app_name_id.find(app_name) == app_name_id.end()) {
                    continue;
                }
                // perf-counter value will be set into partition index 0.
                row_data &row = rows[app_name][0];
                row.app_id = app_name_id[app_name];
                update_app_pegasus_perf_counter(row, counter_name, m.value);
            }
        }
    }
    return true;
}

inline bool
get_app_stat(shell_context *sc, const std::string &app_name, std::vector<row_data> &rows)
{
    std::vector<::dsn::app_info> apps;
    std::vector<node_desc> nodes;
    if (!get_apps_and_nodes(sc, apps, nodes))
        return false;

    ::dsn::app_info *app_info = nullptr;
    if (!app_name.empty()) {
        for (auto &app : apps) {
            if (app.app_name == app_name) {
                app_info = &app;
                break;
            }
        }
        if (app_info == nullptr) {
            derror("app %s not found", app_name.c_str());
            return false;
        }
    }

    std::vector<std::string> arguments;
    char tmp[256];
    if (app_name.empty()) {
        sprintf(tmp, ".*@.*");
    } else {
        sprintf(tmp, ".*@%d\\..*", app_info->app_id);
    }
    arguments.emplace_back(tmp);
    std::vector<std::pair<bool, std::string>> results =
        call_remote_command(sc, nodes, "perf-counters", arguments);

    if (app_name.empty()) {
        std::map<int32_t, std::vector<dsn::partition_configuration>> app_partitions;
        if (!get_app_partitions(sc, apps, app_partitions))
            return false;

        rows.resize(app_partitions.size());
        int idx = 0;
        std::map<int32_t, int> app_row_idx; // app_id --> row_idx
        for (::dsn::app_info &app : apps) {
            rows[idx].row_name = app.app_name;
            rows[idx].app_id = app.app_id;
            rows[idx].partition_count = app.partition_count;
            app_row_idx[app.app_id] = idx;
            idx++;
        }

        for (int i = 0; i < nodes.size(); ++i) {
            dsn::rpc_address node_addr = nodes[i].address;
            dsn::perf_counter_info info;
            if (!decode_node_perf_counter_info(node_addr, results[i], info))
                return false;
            for (dsn::perf_counter_metric &m : info.counters) {
                int32_t app_id_x, partition_index_x;
                std::string counter_name;
                if (!parse_app_pegasus_perf_counter_name(
                        m.name, app_id_x, partition_index_x, counter_name)) {
                    continue;
                }
                auto find = app_partitions.find(app_id_x);
                if (find == app_partitions.end())
                    continue;
                dsn::partition_configuration &pc = find->second[partition_index_x];
                if (pc.primary != node_addr)
                    continue;
                update_app_pegasus_perf_counter(rows[app_row_idx[app_id_x]], counter_name, m.value);
            }
        }
    } else {
        rows.resize(app_info->partition_count);
        for (int i = 0; i < app_info->partition_count; i++)
            rows[i].row_name = std::to_string(i);
        int32_t app_id = 0;
        int32_t partition_count = 0;
        std::vector<dsn::partition_configuration> partitions;
        dsn::error_code err =
            sc->ddl_client->list_app(app_name, app_id, partition_count, partitions);
        if (err != ::dsn::ERR_OK) {
            derror("list app %s failed, error = %s", app_name.c_str(), err.to_string());
            return false;
        }
        dassert(app_id == app_info->app_id, "%d VS %d", app_id, app_info->app_id);
        dassert(partition_count == app_info->partition_count,
                "%d VS %d",
                partition_count,
                app_info->partition_count);

        for (int i = 0; i < nodes.size(); ++i) {
            dsn::rpc_address node_addr = nodes[i].address;
            dsn::perf_counter_info info;
            if (!decode_node_perf_counter_info(node_addr, results[i], info))
                return false;
            for (dsn::perf_counter_metric &m : info.counters) {
                int32_t app_id_x, partition_index_x;
                std::string counter_name;
                bool parse_ret = parse_app_pegasus_perf_counter_name(
                    m.name, app_id_x, partition_index_x, counter_name);
                dassert(parse_ret, "name = %s", m.name.c_str());
                dassert(app_id_x == app_id, "name = %s", m.name.c_str());
                dassert(partition_index_x < partition_count, "name = %s", m.name.c_str());
                if (partitions[partition_index_x].primary != node_addr)
                    continue;
                update_app_pegasus_perf_counter(rows[partition_index_x], counter_name, m.value);
            }
        }
    }
    return true;
}

struct node_capacity_unit_stat
{
    // timestamp when node perf_counter_info has updated.
    std::string timestamp;
    std::string node_address;
    // mapping: app_id --> (read_cu, write_cu)
    std::map<int32_t, std::pair<int64_t, int64_t>> cu_value_by_app;

    std::string dump_to_json() const
    {
        std::map<int32_t, std::vector<int64_t>> values;
        for (auto &kv : cu_value_by_app) {
            auto &pair = kv.second;
            if (pair.first != 0 || pair.second != 0)
                values.emplace(kv.first, std::vector<int64_t>{pair.first, pair.second});
        }
        std::stringstream out;
        rapidjson::OStreamWrapper wrapper(out);
        dsn::json::JsonWriter writer(wrapper);
        dsn::json::json_encode(writer, values);
        return out.str();
    }
};

inline bool get_capacity_unit_stat(shell_context *sc,
                                   std::vector<node_capacity_unit_stat> &nodes_stat)
{
    std::vector<node_desc> nodes;
    if (!fill_nodes(sc, "replica-server", nodes)) {
        derror("get replica server node list failed");
        return false;
    }

    std::vector<std::pair<bool, std::string>> results =
        call_remote_command(sc, nodes, "perf-counters-by-substr", {".cu@"});

    nodes_stat.resize(nodes.size());
    for (int i = 0; i < nodes.size(); ++i) {
        dsn::rpc_address node_addr = nodes[i].address;
        dsn::perf_counter_info info;
        if (!decode_node_perf_counter_info(node_addr, results[i], info)) {
            dwarn("decode perf counter from node(%s) failed, just ignore it",
                  node_addr.to_string());
            continue;
        }
        nodes_stat[i].timestamp = info.timestamp_str;
        nodes_stat[i].node_address = node_addr.to_string();
        for (dsn::perf_counter_metric &m : info.counters) {
            int32_t app_id, pidx;
            std::string counter_name;
            bool r = parse_app_pegasus_perf_counter_name(m.name, app_id, pidx, counter_name);
            dassert(r, "name = %s", m.name.c_str());
            if (counter_name == "recent.read.cu") {
                nodes_stat[i].cu_value_by_app[app_id].first += (int64_t)m.value;
            } else if (counter_name == "recent.write.cu") {
                nodes_stat[i].cu_value_by_app[app_id].second += (int64_t)m.value;
            }
        }
    }
    return true;
}

struct app_storage_size_stat
{
    // timestamp when this stat is generated.
    std::string timestamp;
    // mapping: app_id --> [app_partition_count, stat_partition_count, storage_size_in_mb]
    std::map<int32_t, std::vector<int64_t>> st_value_by_app;

    std::string dump_to_json() const
    {
        std::stringstream out;
        rapidjson::OStreamWrapper wrapper(out);
        dsn::json::JsonWriter writer(wrapper);
        dsn::json::json_encode(writer, st_value_by_app);
        return out.str();
    }
};

inline bool get_storage_size_stat(shell_context *sc, app_storage_size_stat &st_stat)
{
    std::vector<::dsn::app_info> apps;
    std::vector<node_desc> nodes;
    if (!get_apps_and_nodes(sc, apps, nodes)) {
        derror("get apps and nodes failed");
        return false;
    }

    std::map<int32_t, std::vector<dsn::partition_configuration>> app_partitions;
    if (!get_app_partitions(sc, apps, app_partitions)) {
        derror("get app partitions failed");
        return false;
    }
    for (auto &kv : app_partitions) {
        auto &v = kv.second;
        for (auto &c : v) {
            // use partition_flags to record if this partition's storage size is calculated,
            // because `app_partitions' is a temporary variable, so we can re-use partition_flags.
            c.partition_flags = 0;
        }
    }

    std::vector<std::pair<bool, std::string>> results = call_remote_command(
        sc, nodes, "perf-counters-by-prefix", {"replica*app.pegasus*disk.storage.sst(MB)"});

    for (int i = 0; i < nodes.size(); ++i) {
        dsn::rpc_address node_addr = nodes[i].address;
        dsn::perf_counter_info info;
        if (!decode_node_perf_counter_info(node_addr, results[i], info)) {
            dwarn("decode perf counter from node(%s) failed, just ignore it",
                  node_addr.to_string());
            continue;
        }
        for (dsn::perf_counter_metric &m : info.counters) {
            int32_t app_id_x, partition_index_x;
            std::string counter_name;
            bool parse_ret = parse_app_pegasus_perf_counter_name(
                m.name, app_id_x, partition_index_x, counter_name);
            dassert(parse_ret, "name = %s", m.name.c_str());
            if (counter_name != "disk.storage.sst(MB)")
                continue;
            auto find = app_partitions.find(app_id_x);
            if (find == app_partitions.end()) // app id not found
                continue;
            dsn::partition_configuration &pc = find->second[partition_index_x];
            if (pc.primary != node_addr) // not primary replica
                continue;
            if (pc.partition_flags != 0) // already calculated
                continue;
            pc.partition_flags = 1;
            int64_t app_partition_count = find->second.size();
            auto st_it = st_stat.st_value_by_app
                             .emplace(app_id_x, std::vector<int64_t>{app_partition_count, 0, 0})
                             .first;
            st_it->second[1]++;          // stat_partition_count
            st_it->second[2] += m.value; // storage_size_in_mb
        }
    }

    char buf[20];
    dsn::utils::time_ms_to_date_time(dsn_now_ms(), buf, sizeof(buf));
    st_stat.timestamp = buf;
    return true;
}

inline configuration_proposal_action new_proposal_action(const dsn::rpc_address &target,
                                                         const dsn::rpc_address &node,
                                                         config_type::type type)
{
    configuration_proposal_action act;
    act.__set_target(target);
    act.__set_node(node);
    act.__set_type(type);
    return act;
}
