// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include "olap/rowset/segment_v2/segment.h"

#include <gtest/gtest.h>

#include <boost/filesystem.hpp>
#include <functional>
#include <iostream>

#include "common/logging.h"
#include "gutil/strings/substitute.h"
#include "olap/comparison_predicate.h"
#include "olap/fs/block_manager.h"
#include "olap/fs/fs_util.h"
#include "olap/in_list_predicate.h"
#include "olap/olap_common.h"
#include "olap/row_block.h"
#include "olap/row_block2.h"
#include "olap/row_cursor.h"
#include "olap/rowset/segment_v2/segment_iterator.h"
#include "olap/rowset/segment_v2/segment_writer.h"
#include "olap/tablet_schema.h"
#include "olap/tablet_schema_helper.h"
#include "olap/types.h"
#include "runtime/mem_pool.h"
#include "runtime/mem_tracker.h"
#include "util/file_utils.h"
#include "test_util/test_util.h"

namespace doris {
namespace segment_v2 {

using std::string;
using std::shared_ptr;

using std::vector;

using ValueGenerator = std::function<void(size_t rid, int cid, int block_id, RowCursorCell& cell)>;

// 0,  1,  2,  3
// 10, 11, 12, 13
// 20, 21, 22, 23
static void DefaultIntGenerator(size_t rid, int cid, int block_id, RowCursorCell& cell) {
    cell.set_not_null();
    *(int*)cell.mutable_cell_ptr() = rid * 10 + cid;
}

static bool column_contains_index(ColumnMetaPB column_meta, ColumnIndexTypePB type) {
    for (int i = 0; i < column_meta.indexes_size(); ++i) {
        if (column_meta.indexes(i).type() == type) {
            return true;
        }
    }
    return false;
}

class SegmentReaderWriterTest : public ::testing::Test {
protected:
    void SetUp() override {
        if (FileUtils::check_exist(kSegmentDir)) {
            ASSERT_TRUE(FileUtils::remove_all(kSegmentDir).ok());
        }
        ASSERT_TRUE(FileUtils::create_dir(kSegmentDir).ok());
    }

    void TearDown() override {
        if (FileUtils::check_exist(kSegmentDir)) {
            ASSERT_TRUE(FileUtils::remove_all(kSegmentDir).ok());
        }
    }

    TabletSchema create_schema(const std::vector<TabletColumn>& columns,
                               int num_short_key_columns = -1) {
        TabletSchema res;
        int num_key_columns = 0;
        for (auto& col : columns) {
            if (col.is_key()) {
                num_key_columns++;
            }
            res._cols.push_back(col);
        }
        res._num_columns = columns.size();
        res._num_key_columns = num_key_columns;
        res._num_short_key_columns =
                num_short_key_columns != -1 ? num_short_key_columns : num_key_columns;
        res.init_field_index_for_test();
        return res;
    }

    void build_segment(SegmentWriterOptions opts, const TabletSchema& build_schema,
                       const TabletSchema& query_schema, size_t nrows,
                       const ValueGenerator& generator, shared_ptr<Segment>* res) {
        static int seg_id = 0;
        // must use unique filename for each segment, otherwise page cache kicks in and produces
        // the wrong answer (it use (filename,offset) as cache key)
        std::string filename = strings::Substitute("$0/seg_$1.dat", kSegmentDir, seg_id++);
        std::unique_ptr<fs::WritableBlock> wblock;
        fs::CreateBlockOptions block_opts({filename});
        Status st = fs::fs_util::block_manager()->create_block(block_opts, &wblock);
        ASSERT_TRUE(st.ok());
        SegmentWriter writer(wblock.get(), 0, &build_schema, opts);
        st = writer.init(10);
        ASSERT_TRUE(st.ok());

        RowCursor row;
        auto olap_st = row.init(build_schema);
        ASSERT_EQ(OLAP_SUCCESS, olap_st);

        for (size_t rid = 0; rid < nrows; ++rid) {
            for (int cid = 0; cid < build_schema.num_columns(); ++cid) {
                int row_block_id = rid / opts.num_rows_per_block;
                RowCursorCell cell = row.cell(cid);
                generator(rid, cid, row_block_id, cell);
            }
            ASSERT_TRUE(writer.append_row(row).ok());
        }

        uint64_t file_size, index_size;
        st = writer.finalize(&file_size, &index_size);
        ASSERT_TRUE(st.ok());
        ASSERT_TRUE(wblock->close().ok());

        st = Segment::open(filename, 0, &query_schema, res);
        ASSERT_TRUE(st.ok());
        ASSERT_EQ(nrows, (*res)->num_rows());
    }

private:
    const std::string kSegmentDir = "./ut_dir/segment_test";
};

TEST_F(SegmentReaderWriterTest, normal) {
    TabletSchema tablet_schema = create_schema(
            {create_int_key(1), create_int_key(2), create_int_value(3), create_int_value(4)});

    SegmentWriterOptions opts;
    opts.num_rows_per_block = 10;

    shared_ptr<Segment> segment;
    build_segment(opts, tablet_schema, tablet_schema, 4096, DefaultIntGenerator, &segment);

    // reader
    {
        Schema schema(tablet_schema);
        OlapReaderStatistics stats;
        // scan all rows
        {
            StorageReadOptions read_opts;
            read_opts.stats = &stats;
            std::unique_ptr<RowwiseIterator> iter;
            segment->new_iterator(schema, read_opts, &iter);

            RowBlockV2 block(schema, 1024);

            int left = 4096;

            int rowid = 0;
            while (left > 0) {
                int rows_read = left > 1024 ? 1024 : left;
                block.clear();
                ASSERT_TRUE(iter->next_batch(&block).ok());
                ASSERT_EQ(DEL_NOT_SATISFIED, block.delete_state());
                ASSERT_EQ(rows_read, block.num_rows());
                left -= rows_read;

                for (int j = 0; j < block.schema()->column_ids().size(); ++j) {
                    auto cid = block.schema()->column_ids()[j];
                    auto column_block = block.column_block(j);
                    for (int i = 0; i < rows_read; ++i) {
                        int rid = rowid + i;
                        ASSERT_FALSE(column_block.is_null(i));
                        ASSERT_EQ(rid * 10 + cid, *(int*)column_block.cell_ptr(i));
                    }
                }
                rowid += rows_read;
            }
        }
        // test seek, key
        {
            // lower bound
            std::unique_ptr<RowCursor> lower_bound(new RowCursor());
            lower_bound->init(tablet_schema, 2);
            {
                auto cell = lower_bound->cell(0);
                cell.set_not_null();
                *(int*)cell.mutable_cell_ptr() = 100;
            }
            {
                auto cell = lower_bound->cell(1);
                cell.set_not_null();
                *(int*)cell.mutable_cell_ptr() = 100;
            }

            // upper bound
            std::unique_ptr<RowCursor> upper_bound(new RowCursor());
            upper_bound->init(tablet_schema, 1);
            {
                auto cell = upper_bound->cell(0);
                cell.set_not_null();
                *(int*)cell.mutable_cell_ptr() = 200;
            }

            StorageReadOptions read_opts;
            read_opts.stats = &stats;
            read_opts.key_ranges.emplace_back(lower_bound.get(), false, upper_bound.get(), true);
            std::unique_ptr<RowwiseIterator> iter;
            segment->new_iterator(schema, read_opts, &iter);

            RowBlockV2 block(schema, 100);
            ASSERT_TRUE(iter->next_batch(&block).ok());
            ASSERT_EQ(DEL_NOT_SATISFIED, block.delete_state());
            ASSERT_EQ(11, block.num_rows());
            auto column_block = block.column_block(0);
            for (int i = 0; i < 11; ++i) {
                ASSERT_EQ(100 + i * 10, *(int*)column_block.cell_ptr(i));
            }
        }
        // test seek, key
        {
            // lower bound
            std::unique_ptr<RowCursor> lower_bound(new RowCursor());
            lower_bound->init(tablet_schema, 1);
            {
                auto cell = lower_bound->cell(0);
                cell.set_not_null();
                *(int*)cell.mutable_cell_ptr() = 40970;
            }

            StorageReadOptions read_opts;
            read_opts.stats = &stats;
            read_opts.key_ranges.emplace_back(lower_bound.get(), false, nullptr, false);
            std::unique_ptr<RowwiseIterator> iter;
            segment->new_iterator(schema, read_opts, &iter);

            RowBlockV2 block(schema, 100);
            ASSERT_TRUE(iter->next_batch(&block).is_end_of_file());
            ASSERT_EQ(0, block.num_rows());
        }
        // test seek, key (-2, -1)
        {
            // lower bound
            std::unique_ptr<RowCursor> lower_bound(new RowCursor());
            lower_bound->init(tablet_schema, 1);
            {
                auto cell = lower_bound->cell(0);
                cell.set_not_null();
                *(int*)cell.mutable_cell_ptr() = -2;
            }

            std::unique_ptr<RowCursor> upper_bound(new RowCursor());
            upper_bound->init(tablet_schema, 1);
            {
                auto cell = upper_bound->cell(0);
                cell.set_not_null();
                *(int*)cell.mutable_cell_ptr() = -1;
            }

            StorageReadOptions read_opts;
            read_opts.stats = &stats;
            read_opts.key_ranges.emplace_back(lower_bound.get(), false, upper_bound.get(), false);
            std::unique_ptr<RowwiseIterator> iter;
            segment->new_iterator(schema, read_opts, &iter);

            RowBlockV2 block(schema, 100);
            ASSERT_TRUE(iter->next_batch(&block).is_end_of_file());
            ASSERT_EQ(0, block.num_rows());
        }
    }
}

TEST_F(SegmentReaderWriterTest, LazyMaterialization) {
    TabletSchema tablet_schema = create_schema({create_int_key(1), create_int_value(2)});
    ValueGenerator data_gen = [](size_t rid, int cid, int block_id, RowCursorCell& cell) {
        cell.set_not_null();
        if (cid == 0) {
            *(int*)(cell.mutable_cell_ptr()) = rid;
        } else if (cid == 1) {
            *(int*)(cell.mutable_cell_ptr()) = rid * 10;
        }
    };

    {
        shared_ptr<Segment> segment;
        build_segment(SegmentWriterOptions(), tablet_schema, tablet_schema, 100, data_gen,
                      &segment);
        {
            // lazy enabled when predicate is subset of returned columns:
            // select c1, c2 where c2 = 30;
            Schema read_schema(tablet_schema);
            std::unique_ptr<ColumnPredicate> predicate(new EqualPredicate<int32_t>(1, 30));
            const std::vector<ColumnPredicate*> predicates = {predicate.get()};

            OlapReaderStatistics stats;
            StorageReadOptions read_opts;
            read_opts.column_predicates = predicates;
            read_opts.stats = &stats;

            std::unique_ptr<RowwiseIterator> iter;
            ASSERT_TRUE(segment->new_iterator(read_schema, read_opts, &iter).ok());

            RowBlockV2 block(read_schema, 1024);
            ASSERT_TRUE(iter->next_batch(&block).ok());
            ASSERT_TRUE(iter->is_lazy_materialization_read());
            ASSERT_EQ(1, block.selected_size());
            ASSERT_EQ(99, stats.rows_vec_cond_filtered);
            auto row = block.row(block.selection_vector()[0]);
            ASSERT_EQ("[3,30]", row.debug_string());
        }
        {
            // lazy disabled when all return columns have predicates:
            // select c1, c2 where c1 = 10 and c2 = 100;
            Schema read_schema(tablet_schema);
            std::unique_ptr<ColumnPredicate> p0(new EqualPredicate<int32_t>(0, 10));
            std::unique_ptr<ColumnPredicate> p1(new EqualPredicate<int32_t>(1, 100));
            const std::vector<ColumnPredicate*> predicates = {p0.get(), p1.get()};

            OlapReaderStatistics stats;
            StorageReadOptions read_opts;
            read_opts.column_predicates = predicates;
            read_opts.stats = &stats;

            std::unique_ptr<RowwiseIterator> iter;
            ASSERT_TRUE(segment->new_iterator(read_schema, read_opts, &iter).ok());

            RowBlockV2 block(read_schema, 1024);
            ASSERT_TRUE(iter->next_batch(&block).ok());
            ASSERT_FALSE(iter->is_lazy_materialization_read());
            ASSERT_EQ(1, block.selected_size());
            ASSERT_EQ(99, stats.rows_vec_cond_filtered);
            auto row = block.row(block.selection_vector()[0]);
            ASSERT_EQ("[10,100]", row.debug_string());
        }
        {
            // lazy disabled when no predicate:
            // select c2
            std::vector<ColumnId> read_cols = {1};
            Schema read_schema(tablet_schema.columns(), read_cols);
            OlapReaderStatistics stats;
            StorageReadOptions read_opts;
            read_opts.stats = &stats;

            std::unique_ptr<RowwiseIterator> iter;
            ASSERT_TRUE(segment->new_iterator(read_schema, read_opts, &iter).ok());

            RowBlockV2 block(read_schema, 1024);
            ASSERT_TRUE(iter->next_batch(&block).ok());
            ASSERT_FALSE(iter->is_lazy_materialization_read());
            ASSERT_EQ(100, block.selected_size());
            for (int i = 0; i < block.selected_size(); ++i) {
                auto row = block.row(block.selection_vector()[i]);
                ASSERT_EQ(strings::Substitute("[$0]", i * 10), row.debug_string());
            }
        }
    }

    {
        tablet_schema = create_schema({create_int_key(1, true, false, true), create_int_value(2)});
        shared_ptr<Segment> segment;
        SegmentWriterOptions write_opts;
        build_segment(write_opts, tablet_schema, tablet_schema, 100, data_gen, &segment);
        ASSERT_TRUE(column_contains_index(segment->footer().columns(0), BITMAP_INDEX));
        {
            // lazy disabled when all predicates are removed by bitmap index:
            // select c1, c2 where c2 = 30;
            Schema read_schema(tablet_schema);
            std::unique_ptr<ColumnPredicate> predicate(new EqualPredicate<int32_t>(0, 20));
            const std::vector<ColumnPredicate*> predicates = {predicate.get()};

            OlapReaderStatistics stats;
            StorageReadOptions read_opts;
            read_opts.column_predicates = predicates;
            read_opts.stats = &stats;

            std::unique_ptr<RowwiseIterator> iter;
            ASSERT_TRUE(segment->new_iterator(read_schema, read_opts, &iter).ok());

            RowBlockV2 block(read_schema, 1024);
            ASSERT_TRUE(iter->next_batch(&block).ok());
            ASSERT_FALSE(iter->is_lazy_materialization_read());
            ASSERT_EQ(1, block.selected_size());
            ASSERT_EQ(99, stats.rows_bitmap_index_filtered);
            ASSERT_EQ(0, stats.rows_vec_cond_filtered);
            auto row = block.row(block.selection_vector()[0]);
            ASSERT_EQ("[20,200]", row.debug_string());
        }
    }
}

TEST_F(SegmentReaderWriterTest, TestIndex) {
    TabletSchema tablet_schema = create_schema({create_int_key(1), create_int_key(2, true, true),
                                                create_int_key(3), create_int_value(4)});

    SegmentWriterOptions opts;
    opts.num_rows_per_block = 10;

    std::shared_ptr<Segment> segment;
    // 0, 1, 2, 3
    // 10, 11, 12, 13
    // 20, 21, 22, 23
    // ...
    // 64k int will generate 4 pages
    build_segment(
            opts, tablet_schema, tablet_schema, 64 * 1024,
            [](size_t rid, int cid, int block_id, RowCursorCell& cell) {
                cell.set_not_null();
                if (rid >= 16 * 1024 && rid < 32 * 1024) {
                    // make second page all rows equal
                    *(int*)cell.mutable_cell_ptr() = 164000 + cid;

                } else {
                    *(int*)cell.mutable_cell_ptr() = rid * 10 + cid;
                }
            },
            &segment);

    // reader with condition
    {
        Schema schema(tablet_schema);
        OlapReaderStatistics stats;
        // test empty segment iterator
        {
            // the first two page will be read by this condition
            TCondition condition;
            condition.__set_column_name("3");
            condition.__set_condition_op("<");
            std::vector<std::string> vals = {"2"};
            condition.__set_condition_values(vals);
            std::shared_ptr<Conditions> conditions(new Conditions());
            conditions->set_tablet_schema(&tablet_schema);
            ASSERT_EQ(OLAP_SUCCESS, conditions->append_condition(condition));

            StorageReadOptions read_opts;
            read_opts.stats = &stats;
            read_opts.conditions = conditions.get();

            std::unique_ptr<RowwiseIterator> iter;
            segment->new_iterator(schema, read_opts, &iter);

            RowBlockV2 block(schema, 1);

            ASSERT_TRUE(iter->next_batch(&block).is_end_of_file());
            ASSERT_EQ(0, block.num_rows());
        }
        // scan all rows
        {
            TCondition condition;
            condition.__set_column_name("2");
            condition.__set_condition_op("<");
            std::vector<std::string> vals = {"100"};
            condition.__set_condition_values(vals);
            std::shared_ptr<Conditions> conditions(new Conditions());
            conditions->set_tablet_schema(&tablet_schema);
            ASSERT_EQ(OLAP_SUCCESS, conditions->append_condition(condition));

            StorageReadOptions read_opts;
            read_opts.stats = &stats;
            read_opts.conditions = conditions.get();

            std::unique_ptr<RowwiseIterator> iter;
            segment->new_iterator(schema, read_opts, &iter);

            RowBlockV2 block(schema, 1024);

            // only first page will be read because of zone map
            int left = 16 * 1024;

            int rowid = 0;
            while (left > 0) {
                int rows_read = left > 1024 ? 1024 : left;
                block.clear();
                ASSERT_TRUE(iter->next_batch(&block).ok());
                ASSERT_EQ(DEL_NOT_SATISFIED, block.delete_state());
                ASSERT_EQ(rows_read, block.num_rows());
                left -= rows_read;

                for (int j = 0; j < block.schema()->column_ids().size(); ++j) {
                    auto cid = block.schema()->column_ids()[j];
                    auto column_block = block.column_block(j);
                    for (int i = 0; i < rows_read; ++i) {
                        int rid = rowid + i;
                        ASSERT_FALSE(column_block.is_null(i));
                        ASSERT_EQ(rid * 10 + cid, *(int*)column_block.cell_ptr(i))
                                << "rid:" << rid << ", i:" << i;
                    }
                }
                rowid += rows_read;
            }
            ASSERT_EQ(16 * 1024, rowid);
            ASSERT_TRUE(iter->next_batch(&block).is_end_of_file());
            ASSERT_EQ(0, block.num_rows());
        }
        // test zone map with query predicate an delete predicate
        {
            // the first two page will be read by this condition
            TCondition condition;
            condition.__set_column_name("2");
            condition.__set_condition_op("<");
            std::vector<std::string> vals = {"165000"};
            condition.__set_condition_values(vals);
            std::shared_ptr<Conditions> conditions(new Conditions());
            conditions->set_tablet_schema(&tablet_schema);
            ASSERT_EQ(OLAP_SUCCESS, conditions->append_condition(condition));

            // the second page read will be pruned by the following delete predicate
            TCondition delete_condition;
            delete_condition.__set_column_name("2");
            delete_condition.__set_condition_op("=");
            std::vector<std::string> vals2 = {"164001"};
            delete_condition.__set_condition_values(vals2);
            std::shared_ptr<Conditions> delete_conditions(new Conditions());
            delete_conditions->set_tablet_schema(&tablet_schema);
            ASSERT_EQ(OLAP_SUCCESS, delete_conditions->append_condition(delete_condition));

            StorageReadOptions read_opts;
            read_opts.stats = &stats;
            read_opts.conditions = conditions.get();
            read_opts.delete_conditions.push_back(delete_conditions.get());

            std::unique_ptr<RowwiseIterator> iter;
            segment->new_iterator(schema, read_opts, &iter);

            RowBlockV2 block(schema, 1024);

            // so the first page will be read because of zone map
            int left = 16 * 1024;

            int rowid = 0;
            while (left > 0) {
                int rows_read = left > 1024 ? 1024 : left;
                block.clear();
                auto s = iter->next_batch(&block);
                ASSERT_TRUE(s.ok()) << s.to_string();
                ASSERT_EQ(rows_read, block.num_rows());
                ASSERT_EQ(DEL_NOT_SATISFIED, block.delete_state());
                left -= rows_read;

                for (int j = 0; j < block.schema()->column_ids().size(); ++j) {
                    auto cid = block.schema()->column_ids()[j];
                    auto column_block = block.column_block(j);
                    for (int i = 0; i < rows_read; ++i) {
                        int rid = rowid + i;
                        ASSERT_FALSE(column_block.is_null(i));
                        ASSERT_EQ(rid * 10 + cid, *(int*)column_block.cell_ptr(i))
                                << "rid:" << rid << ", i:" << i;
                    }
                }
                rowid += rows_read;
            }
            ASSERT_EQ(16 * 1024, rowid);
            ASSERT_TRUE(iter->next_batch(&block).is_end_of_file());
            ASSERT_EQ(0, block.num_rows());
        }
        // test bloom filter
        {
            StorageReadOptions read_opts;
            read_opts.stats = &stats;
            TCondition condition;
            condition.__set_column_name("2");
            condition.__set_condition_op("=");
            // 102 is not in page 1
            std::vector<std::string> vals = {"102"};
            condition.__set_condition_values(vals);
            std::shared_ptr<Conditions> conditions(new Conditions());
            conditions->set_tablet_schema(&tablet_schema);
            ASSERT_EQ(OLAP_SUCCESS, conditions->append_condition(condition));
            read_opts.conditions = conditions.get();
            std::unique_ptr<RowwiseIterator> iter;
            segment->new_iterator(schema, read_opts, &iter);

            RowBlockV2 block(schema, 1024);
            ASSERT_TRUE(iter->next_batch(&block).is_end_of_file());
            ASSERT_EQ(0, block.num_rows());
        }
    }
}

TEST_F(SegmentReaderWriterTest, estimate_segment_size) {
    size_t num_rows_per_block = 10;

    std::shared_ptr<TabletSchema> tablet_schema(new TabletSchema());
    tablet_schema->_num_columns = 4;
    tablet_schema->_num_key_columns = 3;
    tablet_schema->_num_short_key_columns = 2;
    tablet_schema->_num_rows_per_row_block = num_rows_per_block;
    tablet_schema->_cols.push_back(create_int_key(1));
    tablet_schema->_cols.push_back(create_int_key(2));
    tablet_schema->_cols.push_back(create_int_key(3));
    tablet_schema->_cols.push_back(create_int_value(4));

    // segment write
    std::string dname = "./ut_dir/segment_write_size";
    FileUtils::remove_all(dname);
    FileUtils::create_dir(dname);

    SegmentWriterOptions opts;
    opts.num_rows_per_block = num_rows_per_block;

    std::string fname = dname + "/int_case";
    std::unique_ptr<fs::WritableBlock> wblock;
    fs::CreateBlockOptions wblock_opts({fname});
    Status st = fs::fs_util::block_manager()->create_block(wblock_opts, &wblock);
    ASSERT_TRUE(st.ok()) << st.to_string();
    SegmentWriter writer(wblock.get(), 0, tablet_schema.get(), opts);
    st = writer.init(10);
    ASSERT_TRUE(st.ok()) << st.to_string();

    RowCursor row;
    auto olap_st = row.init(*tablet_schema);
    ASSERT_EQ(OLAP_SUCCESS, olap_st);

    // 0, 1, 2, 3
    // 10, 11, 12, 13
    // 20, 21, 22, 23
    for (int i = 0; i < LOOP_LESS_OR_MORE(1024, 1048576); ++i) {
        for (int j = 0; j < 4; ++j) {
            auto cell = row.cell(j);
            cell.set_not_null();
            *(int*)cell.mutable_cell_ptr() = i * 10 + j;
        }
        writer.append_row(row);
    }

    uint32_t segment_size = writer.estimate_segment_size();
    LOG(INFO) << "estimate segment size is:" << segment_size;

    uint64_t file_size = 0;
    uint64_t index_size;
    ASSERT_TRUE(writer.finalize(&file_size, &index_size).ok());
    ASSERT_TRUE(wblock->close().ok());

    file_size = boost::filesystem::file_size(fname);
    LOG(INFO) << "segment file size is:" << file_size;

    ASSERT_NE(segment_size, 0);

    FileUtils::remove_all(dname);
}

TEST_F(SegmentReaderWriterTest, TestDefaultValueColumn) {
    std::vector<TabletColumn> columns = {create_int_key(1), create_int_key(2), create_int_value(3),
                                         create_int_value(4)};
    TabletSchema build_schema = create_schema(columns);

    // add a column with null default value
    {
        std::vector<TabletColumn> read_columns = columns;
        read_columns.push_back(create_int_value(5, OLAP_FIELD_AGGREGATION_SUM, true, "NULL"));
        TabletSchema query_schema = create_schema(read_columns);

        std::shared_ptr<Segment> segment;
        build_segment(SegmentWriterOptions(), build_schema, query_schema, 4096, DefaultIntGenerator,
                      &segment);

        Schema schema(query_schema);
        OlapReaderStatistics stats;
        // scan all rows
        {
            StorageReadOptions read_opts;
            read_opts.stats = &stats;
            std::unique_ptr<RowwiseIterator> iter;
            segment->new_iterator(schema, read_opts, &iter);

            RowBlockV2 block(schema, 1024);

            int left = 4096;

            int rowid = 0;
            while (left > 0) {
                int rows_read = left > 1024 ? 1024 : left;
                block.clear();
                ASSERT_TRUE(iter->next_batch(&block).ok());
                ASSERT_EQ(DEL_NOT_SATISFIED, block.delete_state());
                ASSERT_EQ(rows_read, block.num_rows());
                left -= rows_read;

                for (int j = 0; j < block.schema()->column_ids().size(); ++j) {
                    auto cid = block.schema()->column_ids()[j];
                    auto column_block = block.column_block(j);
                    for (int i = 0; i < rows_read; ++i) {
                        int rid = rowid + i;
                        if (cid == 4) {
                            ASSERT_TRUE(column_block.is_null(i));
                        } else {
                            ASSERT_FALSE(column_block.is_null(i));
                            ASSERT_EQ(rid * 10 + cid, *(int*)column_block.cell_ptr(i));
                        }
                    }
                }
                rowid += rows_read;
            }
        }
    }

    // add a column with non-null default value
    {
        std::vector<TabletColumn> read_columns = columns;
        read_columns.push_back(create_int_value(5, OLAP_FIELD_AGGREGATION_SUM, true, "10086"));
        TabletSchema query_schema = create_schema(read_columns);

        std::shared_ptr<Segment> segment;
        build_segment(SegmentWriterOptions(), build_schema, query_schema, 4096, DefaultIntGenerator,
                      &segment);

        Schema schema(query_schema);
        OlapReaderStatistics stats;
        // scan all rows
        {
            StorageReadOptions read_opts;
            read_opts.stats = &stats;
            std::unique_ptr<RowwiseIterator> iter;
            segment->new_iterator(schema, read_opts, &iter);

            RowBlockV2 block(schema, 1024);

            int left = 4096;

            int rowid = 0;
            while (left > 0) {
                int rows_read = left > 1024 ? 1024 : left;
                block.clear();
                ASSERT_TRUE(iter->next_batch(&block).ok());
                ASSERT_EQ(rows_read, block.num_rows());
                left -= rows_read;

                for (int j = 0; j < block.schema()->column_ids().size(); ++j) {
                    auto cid = block.schema()->column_ids()[j];
                    auto column_block = block.column_block(j);
                    for (int i = 0; i < rows_read; ++i) {
                        int rid = rowid + i;
                        if (cid == 4) {
                            ASSERT_FALSE(column_block.is_null(i));
                            ASSERT_EQ(10086, *(int*)column_block.cell_ptr(i));
                        } else {
                            ASSERT_FALSE(column_block.is_null(i));
                            ASSERT_EQ(rid * 10 + cid, *(int*)column_block.cell_ptr(i));
                        }
                    }
                }
                rowid += rows_read;
            }
        }
    }
}

TEST_F(SegmentReaderWriterTest, TestStringDict) {
    size_t num_rows_per_block = 10;
    auto tracker = std::make_shared<MemTracker>();
    MemPool pool(tracker.get());

    std::shared_ptr<TabletSchema> tablet_schema(new TabletSchema());
    tablet_schema->_num_columns = 4;
    tablet_schema->_num_key_columns = 3;
    tablet_schema->_num_short_key_columns = 2;
    tablet_schema->_num_rows_per_row_block = num_rows_per_block;
    tablet_schema->_cols.push_back(create_char_key(1));
    tablet_schema->_cols.push_back(create_char_key(2));
    tablet_schema->_cols.push_back(create_varchar_key(3));
    tablet_schema->_cols.push_back(create_varchar_key(4));
    tablet_schema->init_field_index_for_test();

    //    segment write
    std::string dname = "./ut_dir/segment_test";
    FileUtils::create_dir(dname);

    SegmentWriterOptions opts;
    opts.num_rows_per_block = num_rows_per_block;

    std::string fname = dname + "/string_case";
    std::unique_ptr<fs::WritableBlock> wblock;
    fs::CreateBlockOptions wblock_opts({fname});
    Status st = fs::fs_util::block_manager()->create_block(wblock_opts, &wblock);
    ASSERT_TRUE(st.ok());
    SegmentWriter writer(wblock.get(), 0, tablet_schema.get(), opts);
    st = writer.init(10);
    ASSERT_TRUE(st.ok());

    RowCursor row;
    auto olap_st = row.init(*tablet_schema);
    ASSERT_EQ(OLAP_SUCCESS, olap_st);

    // 0, 1, 2, 3
    // 10, 11, 12, 13
    // 20, 21, 22, 23
    // convert int to string
    for (int i = 0; i < 4096; ++i) {
        for (int j = 0; j < 4; ++j) {
            auto cell = row.cell(j);
            cell.set_not_null();
            set_column_value_by_type(tablet_schema->_cols[j]._type, i * 10 + j,
                                     (char*)cell.mutable_cell_ptr(), &pool,
                                     tablet_schema->_cols[j]._length);
        }
        Status status = writer.append_row(row);
        ASSERT_TRUE(status.ok());
    }

    uint64_t file_size = 0;
    uint64_t index_size;
    ASSERT_TRUE(writer.finalize(&file_size, &index_size).ok());
    ASSERT_TRUE(wblock->close().ok());

    {
        std::shared_ptr<Segment> segment;
        st = Segment::open(fname, 0, tablet_schema.get(), &segment);
        ASSERT_TRUE(st.ok());
        ASSERT_EQ(4096, segment->num_rows());
        Schema schema(*tablet_schema);
        OlapReaderStatistics stats;
        // scan all rows
        {
            StorageReadOptions read_opts;
            read_opts.stats = &stats;
            std::unique_ptr<RowwiseIterator> iter;
            segment->new_iterator(schema, read_opts, &iter);

            RowBlockV2 block(schema, 1024);

            int left = 4096;
            int rowid = 0;

            while (left > 0) {
                int rows_read = left > 1024 ? 1024 : left;
                block.clear();
                st = iter->next_batch(&block);
                ASSERT_TRUE(st.ok());
                ASSERT_EQ(DEL_NOT_SATISFIED, block.delete_state());
                ASSERT_EQ(rows_read, block.num_rows());
                left -= rows_read;

                for (int j = 0; j < block.schema()->column_ids().size(); ++j) {
                    auto cid = block.schema()->column_ids()[j];
                    auto column_block = block.column_block(j);
                    for (int i = 0; i < rows_read; ++i) {
                        int rid = rowid + i;
                        ASSERT_FALSE(column_block.is_null(i));
                        const Slice* actual =
                                reinterpret_cast<const Slice*>(column_block.cell_ptr(i));

                        Slice expect;
                        set_column_value_by_type(tablet_schema->_cols[j]._type, rid * 10 + cid,
                                                 reinterpret_cast<char*>(&expect), &pool,
                                                 tablet_schema->_cols[j]._length);
                        ASSERT_EQ(expect.to_string(), actual->to_string());
                    }
                }
                rowid += rows_read;
            }
        }

        // test seek, key
        {
            // lower bound
            std::unique_ptr<RowCursor> lower_bound(new RowCursor());
            lower_bound->init(*tablet_schema, 1);
            {
                auto cell = lower_bound->cell(0);
                cell.set_not_null();
                set_column_value_by_type(OLAP_FIELD_TYPE_CHAR, 40970,
                                         (char*)cell.mutable_cell_ptr(), &pool,
                                         tablet_schema->_cols[0]._length);
            }

            StorageReadOptions read_opts;
            read_opts.stats = &stats;
            read_opts.key_ranges.emplace_back(lower_bound.get(), false, nullptr, false);
            std::unique_ptr<RowwiseIterator> iter;
            segment->new_iterator(schema, read_opts, &iter);

            RowBlockV2 block(schema, 100);
            st = iter->next_batch(&block);
            ASSERT_TRUE(st.is_end_of_file());
            ASSERT_EQ(0, block.num_rows());
        }

        // test seek, key (-2, -1)
        {
            // lower bound
            std::unique_ptr<RowCursor> lower_bound(new RowCursor());
            lower_bound->init(*tablet_schema, 1);
            {
                auto cell = lower_bound->cell(0);
                cell.set_not_null();
                set_column_value_by_type(OLAP_FIELD_TYPE_CHAR, -2, (char*)cell.mutable_cell_ptr(),
                                         &pool, tablet_schema->_cols[0]._length);
            }

            std::unique_ptr<RowCursor> upper_bound(new RowCursor());
            upper_bound->init(*tablet_schema, 1);
            {
                auto cell = upper_bound->cell(0);
                cell.set_not_null();
                set_column_value_by_type(OLAP_FIELD_TYPE_CHAR, -1, (char*)cell.mutable_cell_ptr(),
                                         &pool, tablet_schema->_cols[0]._length);
            }

            StorageReadOptions read_opts;
            read_opts.stats = &stats;
            read_opts.key_ranges.emplace_back(lower_bound.get(), false, upper_bound.get(), false);
            std::unique_ptr<RowwiseIterator> iter;
            segment->new_iterator(schema, read_opts, &iter);

            RowBlockV2 block(schema, 100);
            st = iter->next_batch(&block);
            ASSERT_TRUE(st.is_end_of_file());
            ASSERT_EQ(0, block.num_rows());
        }

        // test char zone_map query hit;should read whole page
        {
            TCondition condition;
            condition.__set_column_name("1");
            condition.__set_condition_op(">");
            std::vector<std::string> vals = {"100"};
            condition.__set_condition_values(vals);
            std::shared_ptr<Conditions> conditions(new Conditions());
            conditions->set_tablet_schema(tablet_schema.get());
            ASSERT_EQ(OLAP_SUCCESS, conditions->append_condition(condition));

            StorageReadOptions read_opts;
            read_opts.stats = &stats;
            read_opts.conditions = conditions.get();

            std::unique_ptr<RowwiseIterator> iter;
            segment->new_iterator(schema, read_opts, &iter);

            RowBlockV2 block(schema, 1024);
            int left = 4 * 1024;
            int rowid = 0;

            while (left > 0) {
                int rows_read = left > 1024 ? 1024 : left;
                block.clear();
                st = iter->next_batch(&block);
                ASSERT_TRUE(st.ok());
                ASSERT_EQ(DEL_NOT_SATISFIED, block.delete_state());
                ASSERT_EQ(rows_read, block.num_rows());
                left -= rows_read;

                for (int j = 0; j < block.schema()->column_ids().size(); ++j) {
                    auto cid = block.schema()->column_ids()[j];
                    auto column_block = block.column_block(j);
                    for (int i = 0; i < rows_read; ++i) {
                        int rid = rowid + i;
                        ASSERT_FALSE(column_block.is_null(i));

                        const Slice* actual =
                                reinterpret_cast<const Slice*>(column_block.cell_ptr(i));
                        Slice expect;
                        set_column_value_by_type(tablet_schema->_cols[j]._type, rid * 10 + cid,
                                                 reinterpret_cast<char*>(&expect), &pool,
                                                 tablet_schema->_cols[j]._length);
                        ASSERT_EQ(expect.to_string(), actual->to_string())
                                << "rid:" << rid << ", i:" << i;
                        ;
                    }
                }
                rowid += rows_read;
            }
            ASSERT_EQ(4 * 1024, rowid);
            st = iter->next_batch(&block);
            ASSERT_TRUE(st.is_end_of_file());
            ASSERT_EQ(0, block.num_rows());
        }

        // test char zone_map query miss;col < -1
        {
            TCondition condition;
            condition.__set_column_name("1");
            condition.__set_condition_op("<");
            std::vector<std::string> vals = {"-2"};
            condition.__set_condition_values(vals);
            std::shared_ptr<Conditions> conditions(new Conditions());
            conditions->set_tablet_schema(tablet_schema.get());
            ASSERT_EQ(OLAP_SUCCESS, conditions->append_condition(condition));

            StorageReadOptions read_opts;
            read_opts.stats = &stats;
            read_opts.conditions = conditions.get();

            std::unique_ptr<RowwiseIterator> iter;
            segment->new_iterator(schema, read_opts, &iter);

            RowBlockV2 block(schema, 1024);

            st = iter->next_batch(&block);
            ASSERT_TRUE(st.is_end_of_file());
            ASSERT_EQ(0, block.num_rows());
        }
    }

    FileUtils::remove_all(dname);
}

TEST_F(SegmentReaderWriterTest, TestBitmapPredicate) {
    TabletSchema tablet_schema = create_schema({create_int_key(1, true, false, true),
                                                create_int_key(2, true, false, true),
                                                create_int_value(3), create_int_value(4)});

    SegmentWriterOptions opts;
    shared_ptr<Segment> segment;
    build_segment(opts, tablet_schema, tablet_schema, 4096, DefaultIntGenerator, &segment);
    ASSERT_TRUE(column_contains_index(segment->footer().columns(0), BITMAP_INDEX));
    ASSERT_TRUE(column_contains_index(segment->footer().columns(1), BITMAP_INDEX));

    {
        Schema schema(tablet_schema);

        // test where v1=10
        {
            std::vector<ColumnPredicate*> column_predicates;
            std::unique_ptr<ColumnPredicate> predicate(new EqualPredicate<int32_t>(0, 10));
            column_predicates.emplace_back(predicate.get());

            StorageReadOptions read_opts;
            OlapReaderStatistics stats;
            read_opts.column_predicates = column_predicates;
            read_opts.stats = &stats;

            std::unique_ptr<RowwiseIterator> iter;
            segment->new_iterator(schema, read_opts, &iter);

            RowBlockV2 block(schema, 1024);
            ASSERT_TRUE(iter->next_batch(&block).ok());
            ASSERT_EQ(block.num_rows(), 1);
            ASSERT_EQ(read_opts.stats->raw_rows_read, 1);
        }

        // test where v1=10 and v2=11
        {
            std::vector<ColumnPredicate*> column_predicates;
            std::unique_ptr<ColumnPredicate> predicate(new EqualPredicate<int32_t>(0, 10));
            std::unique_ptr<ColumnPredicate> predicate2(new EqualPredicate<int32_t>(1, 11));
            column_predicates.emplace_back(predicate.get());
            column_predicates.emplace_back(predicate2.get());

            StorageReadOptions read_opts;
            OlapReaderStatistics stats;
            read_opts.column_predicates = column_predicates;
            read_opts.stats = &stats;

            std::unique_ptr<RowwiseIterator> iter;
            segment->new_iterator(schema, read_opts, &iter);

            RowBlockV2 block(schema, 1024);
            ASSERT_TRUE(iter->next_batch(&block).ok());
            ASSERT_EQ(block.num_rows(), 1);
            ASSERT_EQ(read_opts.stats->raw_rows_read, 1);
        }

        // test where v1=10 and v2=15
        {
            std::vector<ColumnPredicate*> column_predicates;
            std::unique_ptr<ColumnPredicate> predicate(new EqualPredicate<int32_t>(0, 10));
            std::unique_ptr<ColumnPredicate> predicate2(new EqualPredicate<int32_t>(1, 15));
            column_predicates.emplace_back(predicate.get());
            column_predicates.emplace_back(predicate2.get());

            StorageReadOptions read_opts;
            OlapReaderStatistics stats;
            read_opts.column_predicates = column_predicates;
            read_opts.stats = &stats;

            std::unique_ptr<RowwiseIterator> iter;
            segment->new_iterator(schema, read_opts, &iter);

            RowBlockV2 block(schema, 1024);
            ASSERT_FALSE(iter->next_batch(&block).ok());
            ASSERT_EQ(read_opts.stats->raw_rows_read, 0);
        }

        // test where v1 in (10,20,1)
        {
            std::vector<ColumnPredicate*> column_predicates;
            std::set<int32_t> values;
            values.insert(10);
            values.insert(20);
            values.insert(1);
            std::unique_ptr<ColumnPredicate> predicate(
                    new InListPredicate<int32_t>(0, std::move(values)));
            column_predicates.emplace_back(predicate.get());

            StorageReadOptions read_opts;
            OlapReaderStatistics stats;
            read_opts.column_predicates = column_predicates;
            read_opts.stats = &stats;

            std::unique_ptr<RowwiseIterator> iter;
            segment->new_iterator(schema, read_opts, &iter);

            RowBlockV2 block(schema, 1024);
            ASSERT_TRUE(iter->next_batch(&block).ok());
            ASSERT_EQ(read_opts.stats->raw_rows_read, 2);
        }

        // test where v1 not in (10,20)
        {
            std::vector<ColumnPredicate*> column_predicates;
            std::set<int32_t> values;
            values.insert(10);
            values.insert(20);
            std::unique_ptr<ColumnPredicate> predicate(
                    new NotInListPredicate<int32_t>(0, std::move(values)));
            column_predicates.emplace_back(predicate.get());

            StorageReadOptions read_opts;
            OlapReaderStatistics stats;
            read_opts.column_predicates = column_predicates;
            read_opts.stats = &stats;

            std::unique_ptr<RowwiseIterator> iter;
            segment->new_iterator(schema, read_opts, &iter);

            RowBlockV2 block(schema, 1024);

            Status st;
            do {
                block.clear();
                st = iter->next_batch(&block);
            } while (st.ok());
            ASSERT_EQ(read_opts.stats->raw_rows_read, 4094);
        }
    }
}

TEST_F(SegmentReaderWriterTest, TestBloomFilterIndexUniqueModel) {
    TabletSchema schema =
            create_schema({create_int_key(1), create_int_key(2), create_int_key(3),
                           create_int_value(4, OLAP_FIELD_AGGREGATION_REPLACE, true, "", true)});

    // for not base segment
    SegmentWriterOptions opts1;
    shared_ptr<Segment> seg1;
    build_segment(opts1, schema, schema, 100, DefaultIntGenerator, &seg1);
    ASSERT_TRUE(column_contains_index(seg1->footer().columns(3), BLOOM_FILTER_INDEX));

    // for base segment
    SegmentWriterOptions opts2;
    shared_ptr<Segment> seg2;
    build_segment(opts2, schema, schema, 100, DefaultIntGenerator, &seg2);
    ASSERT_TRUE(column_contains_index(seg2->footer().columns(3), BLOOM_FILTER_INDEX));
}

} // namespace segment_v2
} // namespace doris

int main(int argc, char** argv) {
    doris::StoragePageCache::create_global_cache(1 << 30, 0.1);
    ::testing::InitGoogleTest(&argc, argv);
    return RUN_ALL_TESTS();
}
