/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * License); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

#include "chunk_reader.h"

#include <limits>

#include "compress/compressor_factory.h"
#include "encoding/decoder_factory.h"

using namespace common;
namespace storage {

// Binds the reader to a file and a measurement and puts every decode-related
// member into its "nothing loaded yet" state.
//
// @param read_file   file handle stored for the later chunk reads
// @param m_name      measurement name, copied with shallow_copy_from()
// @param data_type   not referenced by this function
// @param time_filter default filter used by get_next_page() when the caller
//                    passes no one-shot filter
// @return E_OOM when the time decoder cannot be allocated, E_OK otherwise
int ChunkReader::init(ReadFile* read_file, String m_name, TSDataType data_type,
                      Filter* time_filter) {
    // Handles supplied by the caller.
    read_file_ = read_file;
    time_filter_ = time_filter;
    measurement_name_.shallow_copy_from(m_name);

    // These are created by load_by_meta() once a chunk header has been read.
    value_decoder_ = nullptr;
    compressor_ = nullptr;
    uncompressed_buf_ = nullptr;

    // The time decoder is the only object allocated up front.
    time_decoder_ = DecoderFactory::alloc_time_decoder();
    return IS_NULL(time_decoder_) ? E_OOM : E_OK;
}

void ChunkReader::reset() {
    chunk_meta_ = nullptr;
    chunk_header_.reset();
    cur_page_header_.reset();

    if (uncompressed_buf_ != nullptr && compressor_ != nullptr) {
        compressor_->after_uncompress(uncompressed_buf_);
        uncompressed_buf_ = nullptr;
    }
    time_in_.reset();
    value_in_.reset();

    char* file_data_buf = in_stream_.get_wrapped_buf();
    if (file_data_buf != nullptr) {
        mem_free(file_data_buf);
    }
    in_stream_.reset();
    file_data_buf_size_ = 0;
    chunk_visit_offset_ = 0;
}

// Tears the reader down: returns a pending decompressed page buffer to the
// compressor, destroys and frees both decoders and the compressor through
// their factories, frees the file buffer wrapped by in_stream_, and finally
// resets the page header and destroys the chunk header.
// read_file_, time_filter_ and chunk_meta_ are not touched here.
void ChunkReader::destroy() {
    // Must happen before the compressor is destroyed below, because the
    // buffer is released through the compressor.
    if (uncompressed_buf_ != nullptr && compressor_ != nullptr) {
        compressor_->after_uncompress(uncompressed_buf_);
        uncompressed_buf_ = nullptr;
    }
    time_in_.reset();
    value_in_.reset();

    // Each factory-made object is destructed explicitly and then handed back
    // to its factory; the pointer is nulled so a second destroy() is a no-op
    // for that object.
    if (time_decoder_ != nullptr) {
        time_decoder_->~Decoder();
        DecoderFactory::free(time_decoder_);
        time_decoder_ = nullptr;
    }
    if (value_decoder_ != nullptr) {
        value_decoder_->~Decoder();
        DecoderFactory::free(value_decoder_);
        value_decoder_ = nullptr;
    }
    if (compressor_ != nullptr) {
        compressor_->~Compressor();
        CompressorFactory::free(compressor_);
        compressor_ = nullptr;
    }
    // Free the raw file bytes and detach them from the stream so the pointer
    // cannot be freed a second time.
    char* buf = in_stream_.get_wrapped_buf();
    if (buf != nullptr) {
        mem_free(buf);
        in_stream_.clear_wrapped_buf();
    }
    cur_page_header_.reset();
    // NOTE(review): explicit destructor call on a member. Presumably
    // ChunkReader instances are released without running ~ChunkReader(), so
    // this is what frees the header's resources — confirm against the
    // allocation sites before changing it.
    chunk_header_.~ChunkHeader();
}

// Loads the chunk described by @meta and prepares the reader for page
// iteration.
//
// The first 1024 bytes starting at the chunk header offset are read into a
// freshly allocated buffer, which is then wrapped by in_stream_. From that
// stream the chunk header is deserialized, and the compressor and value
// decoder are set up for the header's encoding, data type and compression
// type. On success chunk_visit_offset_ points just past the chunk header.
//
// Buffer ownership: until the buffer is wrapped by in_stream_ it belongs to
// this function and is freed on every failure path; once wrapped it is freed
// by reset()/destroy().
//
// @param meta chunk metadata; stored in chunk_meta_ and dereferenced here
// @return E_OK on success; E_OOM if the read buffer cannot be allocated;
//         E_TSFILE_CORRUPTED if fewer than ChunkHeader::MIN_SERIALIZED_SIZE
//         bytes could be read; otherwise the error reported by the file
//         read, the header deserialization or the decoder/compressor setup.
int ChunkReader::load_by_meta(ChunkMeta* meta) {
    int ret = E_OK;
    chunk_meta_ = meta;
#if DEBUG_SE
    std::cout << "ChunkReader::load_by_meta, meta=" << *meta << std::endl;
#endif
    /* ================ Step 1: read data from file ================*/
    // The initial read only needs to cover the chunk header and the first
    // page header.
    // TODO configurable
    file_data_buf_size_ = 1024;
    int32_t ret_read_len = 0;
    char* file_data_buf =
        (char*)mem_alloc(file_data_buf_size_, MOD_CHUNK_READER);
    if (IS_NULL(file_data_buf)) {
        return E_OOM;
    }
    ret = read_file_->read(chunk_meta_->offset_of_chunk_header_, file_data_buf,
                           file_data_buf_size_, ret_read_len);
    if (IS_SUCC(ret) && ret_read_len < ChunkHeader::MIN_SERIALIZED_SIZE) {
        ret = E_TSFILE_CORRUPTED;
        LOGE("file corrupted, ret=" << ret << ", offset="
                                    << chunk_meta_->offset_of_chunk_header_
                                    << "read_len=" << ret_read_len);
    }
    if (!IS_SUCC(ret)) {
        // Nothing has taken ownership of the buffer yet, so it has to be
        // released here for a failed read as well as for a short one.
        mem_free(file_data_buf);
        return ret;
    }

    /* ================ Step 2: deserialize chunk_header ================*/
    in_stream_.wrap_from(file_data_buf, ret_read_len);
    if (RET_FAIL(chunk_header_.deserialize_from(in_stream_))) {
    } else if (RET_FAIL(alloc_compressor_and_value_decoder(
                   chunk_header_.encoding_type_, chunk_header_.data_type_,
                   chunk_header_.compression_type_))) {
    } else {
        chunk_visit_offset_ =
            in_stream_.read_pos();  // point to end of chunk_header_
#if DEBUG_SE
        std::cout << "ChunkReader::load_by_meta, chunk_header="
                  << chunk_header_ << std::endl;
#endif
    }
    return ret;
}

// Makes sure a value decoder and a compressor are available for the chunk
// that is about to be read. An object that already exists is reset and
// reused; a missing one is allocated from its factory.
//
// @param encoding    value encoding passed to the decoder factory
// @param data_type   value data type passed to the decoder factory
// @param compression compression type passed to the compressor factory
// @return E_OOM if either factory returns null, E_OK otherwise. When the
//         value decoder cannot be allocated the compressor is not touched.
int ChunkReader::alloc_compressor_and_value_decoder(
    TSEncoding encoding, TSDataType data_type, CompressionType compression) {
    // Value decoder: allocate on first use, otherwise recycle.
    if (value_decoder_ == nullptr) {
        value_decoder_ =
            DecoderFactory::alloc_value_decoder(encoding, data_type);
        if (IS_NULL(value_decoder_)) {
            return E_OOM;
        }
    } else {
        value_decoder_->reset();
    }

    // Compressor: same policy; an existing one is reset for decompression.
    if (compressor_ == nullptr) {
        compressor_ = CompressorFactory::alloc_compressor(compression);
        if (compressor_ == nullptr) {
            return E_OOM;
        }
    } else {
        compressor_->reset(/*for_compress*/ false);
    }
    return E_OK;
}

// Fills @ret_tsblock with the next batch of rows from the current chunk.
//
// If the previous call stopped in the middle of a page, decoding resumes
// from that page. Otherwise page headers are read one after another, pages
// rejected by the filter are skipped, and the first accepted page is
// decoded.
//
// @param ret_tsblock     destination block
// @param oneshoot_filter filter for this call only; when null the filter
//                        given to init() is used
// @param pa              arena handed to the value decoding routine
// @return E_OK when rows were produced (including the case where the block
//         filled up before the page was finished), E_NO_MORE_DATA when
//         has_more_data() reports the chunk is exhausted, otherwise the
//         error from header reading, skipping or decoding.
int ChunkReader::get_next_page(TsBlock* ret_tsblock, Filter* oneshoot_filter,
                               PageArena& pa) {
    int ret = E_OK;
    Filter* filter = time_filter_;
    if (oneshoot_filter != nullptr) {
        filter = oneshoot_filter;
    }

    // Resume a page that did not fit into the previous block.
    if (prev_page_not_finish()) {
        ret = decode_tv_buf_into_tsblock_by_datatype(time_in_, value_in_,
                                                     ret_tsblock, filter, &pa);
        if (ret == E_OVERFLOW) {
            // Block is full again; the page buffers stay alive for the next
            // call.
            return E_OK;
        }
        if (uncompressed_buf_ != nullptr) {
            compressor_->after_uncompress(uncompressed_buf_);
            uncompressed_buf_ = nullptr;
        }
        time_in_.reset();
        value_in_.reset();
        return ret;
    }

    // Advance to the first page whose header passes the filter.
    for (;;) {
        if (!has_more_data()) {
            return E_NO_MORE_DATA;
        }
        if (RET_FAIL(get_cur_page_header())) {
            return ret;
        }
        if (cur_page_statisify_filter(filter)) {
            break;
        }
        if (RET_FAIL(skip_cur_page())) {
            return ret;
        }
    }

    return decode_cur_page_data(ret_tsblock, filter, pa);
}

// Deserializes the page header at the current position of in_stream_ into
// cur_page_header_.
//
// If the bytes buffered so far do not cover the whole header, the stream is
// refilled from the file once (starting at chunk_visit_offset_) and the
// header is parsed again. On success chunk_visit_offset_ is advanced by the
// number of header bytes consumed.
//
// @return E_OK on success, otherwise the error from deserialization or from
//         the refill.
int ChunkReader::get_cur_page_header() {
    int ret = E_OK;
    int cur_page_header_serialized_size = 0;
    bool retry = true;  // becomes false once the single refill has been used
    for (;;) {
        // Remember where the header starts so its length can be measured.
        in_stream_.mark_read_pos();
        cur_page_header_.reset();
        ret = cur_page_header_.deserialize_from(
            in_stream_, !chunk_has_only_one_page(), chunk_header_.data_type_);
        cur_page_header_serialized_size = in_stream_.get_mark_len();

        const bool need_refill = deserialize_buf_not_enough(ret) && retry;
        if (!need_refill) {
            break;
        }
        retry = false;
        ret = read_from_file_and_rewrap();
        if (E_OK != ret) {
            break;
        }
    }
    if (IS_SUCC(ret)) {
        // visit a header
        chunk_visit_offset_ += cur_page_header_serialized_size;
    }
#if DEBUG_SE
    std::cout << "get_cur_page_header, ret=" << ret << ", retry=" << retry
              << ", cur_page_header_=" << cur_page_header_
              << ", chunk_meta_->offset_of_chunk_header_="
              << chunk_meta_->offset_of_chunk_header_
              << ", cur_page_header_serialized_size="
              << cur_page_header_serialized_size << std::endl;
#endif
    return ret;
}

// Reads at least @want_size bytes from the file, starting at the current
// visit position of the chunk (chunk header offset + chunk_visit_offset_),
// and wraps the result in @in_stream_.
//
// The buffer currently wrapped by in_stream_ is reused. It is reallocated
// when it is smaller than the read size, or when the read size is less than
// a tenth of the buffer's current size.
//
// @param want_size minimum number of bytes wanted; requests smaller than
//                  4096 are rounded up to 4096
// @return E_OK on success, E_OOM if the buffer cannot be resized, otherwise
//         the error reported by the file read. After a failed read
//         in_stream_ is left empty but still wraps the live buffer.
int ChunkReader::read_from_file_and_rewrap(int want_size) {
    int ret = E_OK;
    const int DEFAULT_READ_SIZE = 4096;  // may use page_size + page_header_size
    char* file_data_buf = in_stream_.get_wrapped_buf();
    int64_t offset = chunk_meta_->offset_of_chunk_header_ + chunk_visit_offset_;
    int read_size =
        (want_size < DEFAULT_READ_SIZE ? DEFAULT_READ_SIZE : want_size);
    if (file_data_buf_size_ < read_size ||
        read_size < file_data_buf_size_ / 10) {
        char* resized_buf = (char*)mem_realloc(file_data_buf, read_size);
        if (IS_NULL(resized_buf)) {
            // in_stream_ keeps wrapping the previous buffer.
            return E_OOM;
        }
        file_data_buf = resized_buf;
        file_data_buf_size_ = read_size;
    }
    int ret_read_len = 0;
    if (RET_FAIL(
            read_file_->read(offset, file_data_buf, read_size, ret_read_len))) {
        // The reallocation above may have moved the buffer, and the failed
        // read may have clobbered its contents. Re-wrap with length 0 so
        // in_stream_ never holds a stale pointer (which reset()/destroy()
        // would later free) and never exposes partial data.
        in_stream_.wrap_from(file_data_buf, 0);
    } else {
        in_stream_.wrap_from(file_data_buf, ret_read_len);
    }
    return ret;
}

// Decides from the page header alone whether the current page has to be
// decoded. A page is accepted when there is no filter, when the header
// carries no statistic to test, or when the filter accepts the statistic.
bool ChunkReader::cur_page_statisify_filter(Filter* filter) {
    if (filter == nullptr) {
        return true;
    }
    if (cur_page_header_.statistic_ == nullptr) {
        return true;
    }
    return filter->satisfy(cur_page_header_.statistic_);
}

// Steps over the data of the current page without decoding it: both the
// read position of in_stream_ and chunk_visit_offset_ move forward by the
// page's compressed size.
//
// @return always E_OK
int ChunkReader::skip_cur_page() {
    in_stream_.wrapped_buf_advance_read_pos(cur_page_header_.compressed_size_);
    // Keep the chunk-relative offset in step with the stream position.
    chunk_visit_offset_ += cur_page_header_.compressed_size_;
    return E_OK;
}

// Decompresses the page described by cur_page_header_ and decodes its
// time/value pairs into @ret_tsblock.
//
// Steps:
//   1. make sure the whole compressed page is available in in_stream_,
//      refilling from the file when necessary;
//   2. decompress it and check the size against the page header;
//   3. split the decompressed bytes into the time buffer and value buffer
//      (layout: var-uint time-buffer length, time bytes, value bytes);
//   4. decode into @ret_tsblock.
//
// When the block fills up before the page is finished, the decompressed
// buffer and the two input streams are kept so that get_next_page() can
// resume; in every other case the decompressed buffer is released before
// returning.
//
// @param ret_tsblock destination block
// @param filter      row filter, may be null
// @param pa          arena handed to the value decoding routine
// @return E_OK when rows were decoded (including the "block full" case),
//         E_TSFILE_CORRUPTED when the page cannot be read completely, fails
//         to decompress, or has inconsistent sizes; otherwise the error
//         from the file read or from decoding.
int ChunkReader::decode_cur_page_data(TsBlock*& ret_tsblock, Filter* filter,
                                      PageArena& pa) {
    int ret = E_OK;

    // Step 1: make sure we load the whole page data in @in_stream_
    if (in_stream_.remaining_size() < cur_page_header_.compressed_size_) {
        if (RET_FAIL(
                read_from_file_and_rewrap(cur_page_header_.compressed_size_))) {
        } else if (in_stream_.remaining_size() <
                   cur_page_header_.compressed_size_) {
            // The file ended before the page did; decompressing now would
            // read bytes that were never filled in.
            ret = E_TSFILE_CORRUPTED;
        }
    }

    char* compressed_buf = nullptr;
    char* uncompressed_buf = nullptr;
    uint32_t compressed_buf_size = 0;  // cppcheck-suppress unreadVariable
    uint32_t uncompressed_buf_size = 0;
    char* time_buf = nullptr;
    char* value_buf = nullptr;
    uint32_t time_buf_size = 0;
    uint32_t value_buf_size = 0;

    // Step 2: do uncompress
    if (IS_SUCC(ret)) {
        compressed_buf = in_stream_.get_wrapped_buf() + in_stream_.read_pos();
        compressed_buf_size = cur_page_header_.compressed_size_;
        // The page counts as visited from here on, whatever happens next.
        in_stream_.wrapped_buf_advance_read_pos(compressed_buf_size);
        chunk_visit_offset_ += compressed_buf_size;
        if (RET_FAIL(compressor_->reset(false))) {
        } else if (RET_FAIL(compressor_->uncompress(
                       compressed_buf, compressed_buf_size, uncompressed_buf,
                       uncompressed_buf_size))) {
        } else {
            uncompressed_buf_ = uncompressed_buf;
        }
        if (ret != E_OK ||
            uncompressed_buf_size != cur_page_header_.uncompressed_size_) {
            ret = E_TSFILE_CORRUPTED;
            ASSERT(false);
        }
    }

    // Step 3: get time_buf & value_buf
    if (IS_SUCC(ret)) {
        int var_size = 0;
        if (RET_FAIL(SerializationUtil::read_var_uint(
                time_buf_size, uncompressed_buf, uncompressed_buf_size,
                &var_size))) {
        } else {
#if DEBUG_SE
            std::cout << "ChunkReader uncompress: compressed_buf_size="
                      << compressed_buf_size
                      << ", uncompressed_buf_size=" << uncompressed_buf_size
                      << ", var_size=" << var_size
                      << ", time_buf_size=" << time_buf_size << std::endl;
#endif
            // The length prefix plus the time buffer must leave at least one
            // byte for values. The comparison is arranged so that no sum can
            // wrap around for a corrupted time_buf_size.
            const uint32_t prefix_len = static_cast<uint32_t>(var_size);
            if (var_size < 0 || prefix_len >= uncompressed_buf_size ||
                time_buf_size >= uncompressed_buf_size - prefix_len) {
                ret = E_TSFILE_CORRUPTED;
                ASSERT(false);
            } else {
                time_buf = uncompressed_buf + prefix_len;
                value_buf = time_buf + time_buf_size;
                value_buf_size =
                    uncompressed_buf_size - prefix_len - time_buf_size;
            }
        }
    }

    // A page rejected in step 2 or 3 is never resumed, so its decompressed
    // buffer is returned now instead of being overwritten by the next page.
    if (!IS_SUCC(ret) && uncompressed_buf_ != nullptr) {
        compressor_->after_uncompress(uncompressed_buf_);
        uncompressed_buf_ = nullptr;
    }

    // Step 4: decode time-value buffer into @ret_tsblock
    if (IS_SUCC(ret)) {
        time_decoder_->reset();
        value_decoder_->reset();
        time_in_.wrap_from(time_buf, time_buf_size);
        value_in_.wrap_from(value_buf, value_buf_size);
        ret = decode_tv_buf_into_tsblock_by_datatype(time_in_, value_in_,
                                                     ret_tsblock, filter, &pa);
        // If decoding stopped because the block is full, @uncompressed_buf_
        // must stay valid until all TV pairs of the page are decoded.
        if (ret != E_OVERFLOW) {
            if (uncompressed_buf_ != nullptr) {
                compressor_->after_uncompress(uncompressed_buf_);
                uncompressed_buf_ = nullptr;
            }
            time_in_.reset();
            value_in_.reset();
        } else {
            ret = E_OK;
        }
    }
    return ret;
}

// Decode loop shared by the fixed-width value types.
//
// Reads (time, value) pairs from @time_in / @value_in while the time decoder
// reports remaining data and appends the pairs accepted by `filter` to
// @row_appender (column 0 = time, column 1 = value).
//
// The expansion site must provide `ret`, `filter`, `time_decoder_` and
// `value_decoder_`. After the loop `ret` is:
//   - E_OVERFLOW if the row appender could not take another row,
//   - the decoder's error if reading a time or a value failed (the row that
//     was reserved for the pair is given back and the loop stops, so the
//     error is not overwritten by a later iteration),
//   - otherwise whatever the last successful read left in it.
#define DECODE_TYPED_TV_INTO_TSBLOCK(CppType, ReadType, time_in, value_in,     \
                                     row_appender)                             \
    do {                                                                       \
        int64_t time = 0;                                                      \
        CppType value = CppType();                                             \
        while (time_decoder_->has_remaining(time_in)) {                        \
            ASSERT(value_decoder_->has_remaining(value_in));                   \
            if (UNLIKELY(!row_appender.add_row())) {                           \
                ret = E_OVERFLOW;                                              \
                break;                                                         \
            } else if (RET_FAIL(time_decoder_->read_int64(time, time_in))) {   \
                /* give the reserved row back and stop on a decode error */    \
                row_appender.backoff_add_row();                                \
                break;                                                         \
            } else if (RET_FAIL(value_decoder_->read_##ReadType(value,         \
                                                                value_in))) {  \
                row_appender.backoff_add_row();                                \
                break;                                                         \
            } else if (filter != nullptr && !filter->satisfy(time, value)) {   \
                row_appender.backoff_add_row();                                \
                continue;                                                      \
            } else {                                                           \
                row_appender.append(0, (char*)&time, sizeof(time));            \
                row_appender.append(1, (char*)&value, sizeof(value));          \
            }                                                                  \
        }                                                                      \
    } while (false)

// Decodes (time, int32 value) pairs from the two streams and appends the
// pairs accepted by @filter to @row_appender (column 0 = time, column 1 =
// value).
//
// @param time_in      stream holding the encoded timestamps
// @param value_in     stream holding the encoded int32 values
// @param row_appender destination rows
// @param filter       row filter, may be null (every pair is kept)
// @return E_OVERFLOW if the row appender could not take another row;
//         the decoder's error if reading a time or a value failed (the row
//         reserved for that pair is given back and decoding stops);
//         E_OK otherwise.
int ChunkReader::i32_DECODE_TYPED_TV_INTO_TSBLOCK(ByteStream& time_in,
                                                  ByteStream& value_in,
                                                  RowAppender& row_appender,
                                                  Filter* filter) {
    int ret = E_OK;
    int64_t time = 0;
    int32_t value = 0;
    while (time_decoder_->has_remaining(time_in)) {
        ASSERT(value_decoder_->has_remaining(value_in));
        if (UNLIKELY(!row_appender.add_row())) {
            ret = E_OVERFLOW;
            break;
        } else if (RET_FAIL(time_decoder_->read_int64(time, time_in))) {
            // Stop on a decode error so that a later iteration cannot
            // overwrite it, and do not leave the unfilled row behind.
            row_appender.backoff_add_row();
            break;
        } else if (RET_FAIL(value_decoder_->read_int32(value, value_in))) {
            row_appender.backoff_add_row();
            break;
        } else if (filter != nullptr && !filter->satisfy(time, value)) {
            row_appender.backoff_add_row();
            continue;
        } else {
            row_appender.append(0, (char*)&time, sizeof(time));
            row_appender.append(1, (char*)&value, sizeof(value));
        }
    }
    return ret;
}

// Decodes (time, string value) pairs from the two streams and appends the
// pairs accepted by @filter to @row_appender (column 0 = time, column 1 =
// the value's bytes).
//
// @param time_in      stream holding the encoded timestamps
// @param value_in     stream holding the encoded string values
// @param row_appender destination rows
// @param pa           arena passed to the value decoder's read_String()
// @param filter       row filter, may be null (every pair is kept)
// @return E_OVERFLOW if the row appender could not take another row;
//         the decoder's error if reading a time or a value failed (the row
//         reserved for that pair is given back and decoding stops);
//         E_OK otherwise.
int ChunkReader::STRING_DECODE_TYPED_TV_INTO_TSBLOCK(ByteStream& time_in,
                                                     ByteStream& value_in,
                                                     RowAppender& row_appender,
                                                     PageArena& pa,
                                                     Filter* filter) {
    int ret = E_OK;
    int64_t time = 0;
    common::String value;
    while (time_decoder_->has_remaining(time_in)) {
        ASSERT(value_decoder_->has_remaining(value_in));
        if (UNLIKELY(!row_appender.add_row())) {
            ret = E_OVERFLOW;
            break;
        } else if (RET_FAIL(time_decoder_->read_int64(time, time_in))) {
            // Stop on a decode error so that a later iteration cannot
            // overwrite it, and do not leave the unfilled row behind.
            row_appender.backoff_add_row();
            break;
        } else if (RET_FAIL(value_decoder_->read_String(value, pa, value_in))) {
            row_appender.backoff_add_row();
            break;
        } else if (filter != nullptr && !filter->satisfy(time, value)) {
            row_appender.backoff_add_row();
            continue;
        } else {
            row_appender.append(0, (char*)&time, sizeof(time));
            row_appender.append(1, value.buf_, value.len_);
        }
    }
    return ret;
}

// Decodes time/value pairs from @time_in / @value_in into @ret_tsblock,
// choosing the decode routine from the data type in the chunk header.
//
// Every branch reads from the streams passed in as parameters.
//
// @param time_in     stream holding the encoded timestamps
// @param value_in    stream holding the encoded values
// @param ret_tsblock destination block
// @param filter      row filter, may be null
// @param pa          arena used for string-like values; dereferenced only
//                    for TEXT, BLOB and STRING
// @return E_OVERFLOW when the block filled up before the streams were
//         drained; E_NOT_SUPPORT for an unknown data type;
//         E_NO_MORE_DATA when decoding succeeded but the block holds no
//         rows afterwards; otherwise E_OK or the decoder's error.
int ChunkReader::decode_tv_buf_into_tsblock_by_datatype(ByteStream& time_in,
                                                        ByteStream& value_in,
                                                        TsBlock* ret_tsblock,
                                                        Filter* filter,
                                                        common::PageArena* pa) {
    int ret = E_OK;
    RowAppender row_appender(ret_tsblock);
    switch (chunk_header_.data_type_) {
        case common::BOOLEAN:
            DECODE_TYPED_TV_INTO_TSBLOCK(bool, boolean, time_in, value_in,
                                         row_appender);
            break;
        case common::DATE:
        case common::INT32:
            ret = i32_DECODE_TYPED_TV_INTO_TSBLOCK(time_in, value_in,
                                                   row_appender, filter);
            break;
        case TIMESTAMP:
        case common::INT64:
            DECODE_TYPED_TV_INTO_TSBLOCK(int64_t, int64, time_in, value_in,
                                         row_appender);
            break;
        case common::FLOAT:
            DECODE_TYPED_TV_INTO_TSBLOCK(float, float, time_in, value_in,
                                         row_appender);
            break;
        case common::DOUBLE:
            DECODE_TYPED_TV_INTO_TSBLOCK(double, double, time_in, value_in,
                                         row_appender);
            break;
        case common::TEXT:
        case common::BLOB:
        case common::STRING:
            ret = STRING_DECODE_TYPED_TV_INTO_TSBLOCK(
                time_in, value_in, row_appender, *pa, filter);
            break;
        default:
            ret = E_NOT_SUPPORT;
            ASSERT(false);
    }
    // A successful pass that left the block empty is reported as "no data".
    if (ret_tsblock->get_row_count() == 0 && ret == E_OK) {
        ret = E_NO_MORE_DATA;
    }
    return ret;
}

// Tells whether the current page can be skipped because it ends before
// @min_time_hint.
//
// @param min_time_hint lower time bound of interest; the minimum int64_t
//                      value means "no hint" and disables the check
// @return true only when the page header has a statistic whose end time is
//         strictly smaller than the hint
bool ChunkReader::should_skip_page_by_time(int64_t min_time_hint) {
    const bool hint_unset =
        (min_time_hint == std::numeric_limits<int64_t>::min());
    if (hint_unset || cur_page_header_.statistic_ == nullptr) {
        return false;
    }
    return cur_page_header_.statistic_->end_time_ < min_time_hint;
}

// Tells whether the whole current page falls inside the rows that still
// have to be skipped, and if so consumes the page's row count from
// @row_offset.
//
// @param row_offset number of rows still to skip; reduced by the page's
//                   statistic count when the page is skipped
// @return true when the page should be skipped; false when there is nothing
//         left to skip, when the page has no statistic or a zero count, or
//         when the page holds more rows than remain to be skipped
bool ChunkReader::should_skip_page_by_offset(int& row_offset) {
    if (row_offset <= 0) {
        return false;
    }
    if (cur_page_header_.statistic_ == nullptr) {
        return false;
    }
    if (cur_page_header_.statistic_->count_ == 0) {
        return false;
    }

    const int32_t page_rows = cur_page_header_.statistic_->count_;
    if (row_offset < page_rows) {
        return false;
    }
    row_offset -= page_rows;
    return true;
}

// Variant of get_next_page() that can additionally skip whole pages using a
// minimum-time hint and a row offset.
//
// If the previous call stopped in the middle of a page, decoding resumes
// from that page. Otherwise page headers are read one after another and a
// page is skipped when, checked in this order, the filter rejects it, it
// ends before @min_time_hint, or it lies completely inside the rows still to
// be skipped. The first page that survives all three checks is decoded.
//
// @param ret_tsblock     destination block
// @param oneshoot_filter filter for this call only; when null the filter
//                        given to init() is used
// @param pa              arena handed to the value decoding routine
// @param min_time_hint   see should_skip_page_by_time()
// @param row_offset      rows still to skip; reduced for every page skipped
//                        by should_skip_page_by_offset()
// @param row_limit       only tested against zero here, in which case
//                        nothing is read
// @return E_NO_MORE_DATA when @row_limit is zero or has_more_data() reports
//         the chunk is exhausted; E_OK when rows were produced (including
//         the case where the block filled up before the page was finished);
//         otherwise the error from header reading, skipping or decoding.
int ChunkReader::get_next_page(TsBlock* ret_tsblock, Filter* oneshoot_filter,
                               PageArena& pa, int64_t min_time_hint,
                               int& row_offset, int& row_limit) {
    int ret = E_OK;
    Filter* filter = time_filter_;
    if (oneshoot_filter != nullptr) {
        filter = oneshoot_filter;
    }

    if (row_limit == 0) {
        return E_NO_MORE_DATA;
    }

    // Resume a page that did not fit into the previous block.
    if (prev_page_not_finish()) {
        ret = decode_tv_buf_into_tsblock_by_datatype(time_in_, value_in_,
                                                     ret_tsblock, filter, &pa);
        if (ret == E_OVERFLOW) {
            // Block is full again; the page buffers stay alive for the next
            // call.
            return E_OK;
        }
        if (uncompressed_buf_ != nullptr) {
            compressor_->after_uncompress(uncompressed_buf_);
            uncompressed_buf_ = nullptr;
        }
        time_in_.reset();
        value_in_.reset();
        return ret;
    }

    for (;;) {
        if (!has_more_data()) {
            return E_NO_MORE_DATA;
        }
        if (RET_FAIL(get_cur_page_header())) {
            return ret;
        }
        // Short-circuit evaluation keeps the original check order, so
        // @row_offset is only consumed for pages that passed the filter and
        // the time hint.
        const bool skip_page = !cur_page_statisify_filter(filter) ||
                               should_skip_page_by_time(min_time_hint) ||
                               should_skip_page_by_offset(row_offset);
        if (!skip_page) {
            break;
        }
        if (RET_FAIL(skip_cur_page())) {
            return ret;
        }
    }

    return decode_cur_page_data(ret_tsblock, filter, pa);
}

}  // end namespace storage