// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include "kudu/cfile/binary_plain_block.h"

#include <algorithm>
#include <cstdint>
#include <functional>
#include <ostream>
#include <vector>

#include <glog/logging.h>

#include "kudu/cfile/block_handle.h"
#include "kudu/cfile/cfile_util.h"
#include "kudu/common/column_materialization_context.h"
#include "kudu/common/column_predicate.h"
#include "kudu/common/columnblock.h"
#include "kudu/common/common.pb.h"
#include "kudu/common/rowblock.h"
#include "kudu/common/rowblock_memory.h"
#include "kudu/common/schema.h"
#include "kudu/common/types.h"
#include "kudu/gutil/stringprintf.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/util/coding.h"
#include "kudu/util/coding-inl.h"
#include "kudu/util/group_varint-inl.h"
#include "kudu/util/hexdump.h"

using std::vector;

namespace kudu {
namespace cfile {

BinaryPlainBlockBuilder::BinaryPlainBlockBuilder(const WriterOptions *options)
  : end_of_data_offset_(0),
    size_estimate_(0),
    options_(options) {
  Reset();
}
BinaryPlainBlockBuilder::~BinaryPlainBlockBuilder() = default;

void BinaryPlainBlockBuilder::Reset() {
  offsets_.clear();
  buffer_.clear();
  buffer_.reserve(options_->storage_attributes.cfile_block_size);
  buffer_.resize(kHeaderSize);

  size_estimate_ = kHeaderSize;
  end_of_data_offset_ = kHeaderSize;
  finished_ = false;
}

bool BinaryPlainBlockBuilder::IsBlockFull() const {
  return size_estimate_ > options_->storage_attributes.cfile_block_size;
}

void BinaryPlainBlockBuilder::Finish(rowid_t ordinal_pos, vector<Slice>* slices) {
  finished_ = true;

  size_t offsets_pos = buffer_.size();

  // Set up the header
  InlineEncodeFixed32(&buffer_[0], ordinal_pos);
  InlineEncodeFixed32(&buffer_[4], offsets_.size());
  InlineEncodeFixed32(&buffer_[8], offsets_pos);

  // append the offsets, if non-empty
  if (!offsets_.empty()) {
    coding::AppendGroupVarInt32Sequence(&buffer_, 0, &offsets_[0], offsets_.size());
  }

  *slices = { Slice(buffer_) };
}

int BinaryPlainBlockBuilder::Add(const uint8_t *vals, size_t count) {
  DCHECK(!finished_);
  DCHECK_GT(count, 0);
  size_t i = 0;

  // If the block is full, should stop adding more items.
  while (!IsBlockFull() && i < count) {

    // Every fourth entry needs a gvint selector byte
    // TODO(todd): does it cost a lot to account these things specifically?
    // maybe cheaper to just over-estimate - allocation is cheaper than math?
    if (offsets_.size() % 4 == 0) {
      size_estimate_++;
    }

    const Slice *src = reinterpret_cast<const Slice *>(vals);
    size_t offset = buffer_.size();
    offsets_.push_back(offset);
    size_estimate_ += coding::CalcRequiredBytes32(offset);

    buffer_.append(src->data(), src->size());
    size_estimate_ += src->size();

    i++;
    vals += sizeof(Slice);
  }

  end_of_data_offset_ = buffer_.size();

  return i;
}


size_t BinaryPlainBlockBuilder::Count() const {
  return offsets_.size();
}

Status BinaryPlainBlockBuilder::GetKeyAtIdx(void *key_void, int idx) const {
  Slice *slice = reinterpret_cast<Slice *>(key_void);

  if (idx >= offsets_.size()) {
    return Status::InvalidArgument("index too large");
  }

  if (offsets_.empty()) {
    return Status::NotFound("no keys in data block");
  }

  if (PREDICT_FALSE(offsets_.size() == 1)) {
    *slice = Slice(&buffer_[kHeaderSize],
                   end_of_data_offset_ - kHeaderSize);
  } else if (idx + 1 == offsets_.size()) {
    *slice = Slice(&buffer_[offsets_[idx]],
                   end_of_data_offset_ - offsets_[idx]);
  } else {
    *slice = Slice(&buffer_[offsets_[idx]],
                   offsets_[idx + 1] - offsets_[idx]);
  }
  return Status::OK();
}

Status BinaryPlainBlockBuilder::GetFirstKey(void *key_void) const {
  CHECK(finished_);
  return GetKeyAtIdx(key_void, 0);
}

Status BinaryPlainBlockBuilder::GetLastKey(void *key_void) const {
  CHECK(finished_);
  return GetKeyAtIdx(key_void, offsets_.size() - 1);
}

////////////////////////////////////////////////////////////
// Decoding
////////////////////////////////////////////////////////////

BinaryPlainBlockDecoder::BinaryPlainBlockDecoder(scoped_refptr<BlockHandle> block)
    : block_(std::move(block)),
      data_(block_->data()),
      parsed_(false),
      num_elems_(0),
      ordinal_pos_base_(0),
      cur_idx_(0) {
}
BinaryPlainBlockDecoder::~BinaryPlainBlockDecoder() = default;

Status BinaryPlainBlockDecoder::ParseHeader() {
  CHECK(!parsed_);

  if (data_.size() < kMinHeaderSize) {
    return Status::Corruption(
      strings::Substitute("not enough bytes for header: string block header "
        "size ($0) less than minimum possible header length ($1)",
        data_.size(), kMinHeaderSize));
  }

  // Decode header.
  ordinal_pos_base_  = DecodeFixed32(&data_[0]);
  num_elems_         = DecodeFixed32(&data_[4]);
  size_t offsets_pos = DecodeFixed32(&data_[8]);

  // Sanity check.
  if (offsets_pos > data_.size()) {
    return Status::Corruption(
      StringPrintf("offsets_pos %ld > block size %ld in plain string block",
                   offsets_pos, data_.size()));
  }

  // Decode the string offsets themselves
  const uint8_t *p = data_.data() + offsets_pos;
  const uint8_t *limit = data_.data() + data_.size();

  // Reserve one extra element, which we'll fill in at the end
  // with an offset past the last element.
  offsets_buf_.resize(sizeof(uint32_t) * (num_elems_ + 1));
  uint32_t* dst_ptr = reinterpret_cast<uint32_t*>(offsets_buf_.data());
  size_t rem = num_elems_;
  while (rem >= 4) {
    if (PREDICT_TRUE(p + 16 < limit)) {
      #ifndef __aarch64__
      p = coding::DecodeGroupVarInt32_SSE(
          p, &dst_ptr[0], &dst_ptr[1], &dst_ptr[2], &dst_ptr[3]);
      #else
      p = coding::DecodeGroupVarInt32_SlowButSafe(
          p, &dst_ptr[0], &dst_ptr[1], &dst_ptr[2], &dst_ptr[3]);
      #endif //__aarch64__
      // The above function should add at most 17 (4 32-bit ints plus a selector byte) to
      // 'p'. Thus, since we checked that (p + 16 < limit) above, we are guaranteed that
      // (p <= limit) now.

      DCHECK_LE(p, limit);
    } else {
      p = coding::DecodeGroupVarInt32_SlowButSafe(
          p, &dst_ptr[0], &dst_ptr[1], &dst_ptr[2], &dst_ptr[3]);
      if (PREDICT_FALSE(p > limit)) {
        // Only need to check 'p' overrun in the slow path, because 'p' may have
        // been within 16 bytes of 'limit'.
        LOG(WARNING) << "bad block: " << HexDump(data_);
        return Status::Corruption(StringPrintf("unable to decode offsets in block"));
      }
    }
    dst_ptr += 4;
    rem -= 4;
  }

  if (rem > 0) {
    uint32_t ints[4];
    p = coding::DecodeGroupVarInt32_SlowButSafe(p, &ints[0], &ints[1], &ints[2], &ints[3]);
    if (PREDICT_FALSE(p > limit)) {
      LOG(WARNING) << "bad block: " << HexDump(data_);
      return Status::Corruption(
        StringPrintf("unable to decode offsets in block"));
    }

    for (int i = 0; i < rem; i++) {
      *dst_ptr++ = ints[i];
    }
  }

  // Add one extra entry pointing after the last item to make the indexing easier.
  *dst_ptr++ = offsets_pos;

  parsed_ = true;

  return Status::OK();
}

void BinaryPlainBlockDecoder::SeekToPositionInBlock(uint pos) {
  if (PREDICT_FALSE(num_elems_ == 0)) {
    DCHECK_EQ(0, pos);
    return;
  }

  DCHECK_LE(pos, num_elems_);
  cur_idx_ = pos;
}

Status BinaryPlainBlockDecoder::SeekAtOrAfterValue(const void *value_void, bool *exact) {
  DCHECK(value_void != nullptr);

  const Slice &target = *reinterpret_cast<const Slice *>(value_void);

  // Binary search in restart array to find the first restart point
  // with a key >= target
  uint32_t left = 0;
  uint32_t right = num_elems_;
  while (left != right) {
    uint32_t mid = (left + right) / 2;
    Slice mid_key(string_at_index(mid));
    int c = mid_key.compare(target);
    if (c < 0) {
      left = mid + 1;
    } else if (c > 0) {
      right = mid;
    } else {
      cur_idx_ = mid;
      *exact = true;
      return Status::OK();
    }
  }
  *exact = false;
  cur_idx_ = left;
  if (cur_idx_ == num_elems_) {
    return Status::NotFound("after last key in block");
  }

  return Status::OK();
}

template <typename CellHandler>
Status BinaryPlainBlockDecoder::HandleBatch(size_t* n, ColumnDataView* dst, CellHandler c) {
  DCHECK(parsed_);
  CHECK_EQ(dst->type_info()->physical_type(), BINARY);
  DCHECK_LE(*n, dst->nrows());
  DCHECK_EQ(dst->stride(), sizeof(Slice));
  if (PREDICT_FALSE(*n == 0 || cur_idx_ >= num_elems_)) {
    *n = 0;
    return Status::OK();
  }
  size_t max_fetch = std::min(*n, static_cast<size_t>(num_elems_ - cur_idx_));

  Slice *out = reinterpret_cast<Slice*>(dst->data());
  for (size_t i = 0; i < max_fetch; i++, out++, cur_idx_++) {
    Slice elem(string_at_index(cur_idx_));
    c(i, elem, out);
  }
  *n = max_fetch;
  return Status::OK();
}

Status BinaryPlainBlockDecoder::CopyNextValues(size_t* n, ColumnDataView* dst) {
  dst->memory()->RetainReference(block_);
  return HandleBatch(n, dst, [&](size_t /*i*/, Slice elem, Slice* out) {
                               *out = elem;
  });
}

Status BinaryPlainBlockDecoder::CopyNextAndEval(size_t* n,
                                                ColumnMaterializationContext* ctx,
                                                SelectionVectorView* sel,
                                                ColumnDataView* dst) {
  bool retain_block = false;
  ctx->SetDecoderEvalSupported();
  Status s = HandleBatch(n, dst, [&](size_t i, Slice elem, Slice* out) {
    if (!sel->TestBit(i)) {
      return;
    }
    if (ctx->pred()->EvaluateCell<BINARY>(static_cast<const void*>(&elem))) {
      *out = elem;
      retain_block = true;
    } else {
      sel->ClearBit(i);
    }
  });
  if (PREDICT_TRUE(s.ok() && retain_block)) {
    dst->memory()->RetainReference(block_);
  }
  return s;
}


} // namespace cfile
} // namespace kudu
