// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include "kudu/common/schema.h"

#include <algorithm>
#include <unordered_set>

#include "kudu/common/row.h"
#include "kudu/common/rowblock.h" // IWYU pragma: keep
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/strings/join.h"
#include "kudu/gutil/strings/strcat.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/util/malloc.h"
#include "kudu/util/memory/arena.h"
#include "kudu/util/status.h"
#include "kudu/util/string_case.h"

using std::string;
using std::unordered_set;
using std::vector;
using strings::Substitute;

namespace kudu {

// In a new schema, we typically would start assigning column IDs at 0. However, this
// makes it likely that in many test cases, the column IDs and the column indexes are
// equal to each other, and it's easy to accidentally pass an index where we meant to pass
// an ID, without having any issues. So, in DEBUG builds, we start assigning columns at ID
// 10, ensuring that if we accidentally mix up IDs and indexes, we're likely to fire an
// assertion or bad memory access.
#ifdef NDEBUG
static const ColumnId kFirstColumnId(0);
#else
static const ColumnId  kFirstColumnId(10);
#endif

namespace {

Status FindFirstIsDeletedVirtualColumnIdx(
    const vector<ColumnSchema>& cols, int* idx) {
  for (int i = 0; i < cols.size(); i++) {
    const auto& col = cols[i];
    if (col.type_info()->type() == IS_DELETED) {
      // Enforce some properties on the virtual column that simplify our
      // implementation.
      // TODO(KUDU-2692): Consider removing these requirements.
      if (col.is_nullable()) {
        return Status::InvalidArgument(Substitute(
            "Virtual column $0 $1 must not be nullable",
            col.name(), col.TypeToString()));
      }
      if (!col.has_read_default()) {
        return Status::InvalidArgument(Substitute(
            "Virtual column $0 $1 must have a default value for read",
            col.name(), col.TypeToString()));
      }
      *idx = i;
      return Status::OK();
    }
  }
  *idx = Schema::kColumnNotFound;
  return Status::OK();
}

} // anonymous namespace

bool ColumnTypeAttributes::EqualsForType(ColumnTypeAttributes other,
                                         DataType type) const {
  switch (type) {
    case DECIMAL32:
    case DECIMAL64:
    case DECIMAL128:
      return precision == other.precision && scale == other.scale;
    case VARCHAR:
      return length == other.length;
    default:
      return true; // true because unhandled types don't use ColumnTypeAttributes.
  }
}

string ColumnTypeAttributes::ToStringForType(DataType type) const {
  switch (type) {
    case DECIMAL32:
    case DECIMAL64:
    case DECIMAL128:
      return Substitute("($0, $1)", precision, scale);
    case VARCHAR:
      return Substitute("($0)", length);
    default:
      return "";
  }
}

string ColumnStorageAttributes::ToString() const {
  const string cfile_block_size_str =
      cfile_block_size == 0 ? "" : Substitute(" $0", cfile_block_size);
  return Substitute("$0 $1$2",
                    EncodingType_Name(encoding),
                    CompressionType_Name(compression),
                    cfile_block_size_str);
}

Status ColumnSchema::ApplyDelta(const ColumnSchemaDelta& col_delta) {
  // This method does all validation up-front before making any changes to
  // the schema, so that if we return an error then we are guaranteed to
  // have had no effect
  if (type_info()->physical_type() != BINARY) {
    if (col_delta.default_value && col_delta.default_value->size() < type_info()->size()) {
      return Status::InvalidArgument("wrong size for default value");
    }
  }

  if (col_delta.new_name) {
    name_ = *col_delta.new_name;
  }

  if (col_delta.default_value) {
    const void* value = type_info()->physical_type() == BINARY ?
                        reinterpret_cast<const void*>(&(*col_delta.default_value)) :
                        reinterpret_cast<const void*>(col_delta.default_value->data());
    write_default_ = std::make_shared<Variant>(type_info()->type(), value);
  }

  if (col_delta.remove_default) {
    write_default_ = nullptr;
  }

  if (col_delta.encoding) {
    attributes_.encoding = *col_delta.encoding;
  }
  if (col_delta.compression) {
    attributes_.compression = *col_delta.compression;
  }
  if (col_delta.cfile_block_size) {
    attributes_.cfile_block_size = *col_delta.cfile_block_size;
  }
  if (col_delta.new_comment) {
    comment_ = col_delta.new_comment.value();
  }
  if (col_delta.immutable) {
    is_immutable_ = col_delta.immutable.value();
  }
  return Status::OK();
}

string ColumnSchema::ToString(uint8_t mode) const {
  return Substitute("$0 $1$2$3",
                    name_,
                    TypeToString(),
                    mode & ToStringMode::WITH_ATTRIBUTES ?
                    " " + AttrToString() : "",
                    mode & ToStringMode::WITH_COMMENTS
                    && comment_.length() ? " " + comment_ : "");
}

string ColumnSchema::TypeToString() const {
  string type_name = type_info_->name();
  ToUpperCase(type_name, &type_name);
  return Substitute("$0$1 $2$3",
                    type_name,
                    type_attributes().ToStringForType(type_info()->type()),
                    is_nullable_ ? "NULLABLE" : "NOT NULL",
                    is_immutable_ ? " IMMUTABLE" : "");
}

string ColumnSchema::AttrToString() const {
  return Substitute("$0 $1 $2",
                    attributes_.ToString(),
                    has_read_default() ? Stringify(read_default_value()) : "-",
                    has_write_default() ? Stringify(write_default_value()) : "-");
}

size_t ColumnSchema::memory_footprint_excluding_this() const {
  // Rough approximation.
  return name_.capacity();
}

size_t ColumnSchema::memory_footprint_including_this() const {
  return kudu_malloc_usable_size(this) + memory_footprint_excluding_this();
}

Schema::Schema(const Schema& other)
    : name_to_index_(other.num_columns()) {
      name_to_index_.set_empty_key(StringPiece());
  CopyFrom(other);
}

Schema& Schema::operator=(const Schema& other) {
  if (&other != this) {
    CopyFrom(other);
  }
  return *this;
}

void Schema::CopyFrom(const Schema& other) {
  DCHECK_NE(this, &other);
  num_key_columns_ = other.num_key_columns_;
  cols_ = other.cols_;
  col_ids_ = other.col_ids_;
  max_col_id_ = other.max_col_id_;
  col_offsets_ = other.col_offsets_;
  id_to_index_ = other.id_to_index_;

  // We can't simply copy name_to_index_ since the StringPiece keys
  // reference the other Schema's ColumnSchema objects.
  name_to_index_.clear_no_resize();
  size_t i = 0;
  for (const auto& col : cols_) {
    // The map uses the 'name' string from within the ColumnSchema object.
    name_to_index_[col.name()] = i++;
  }

  first_is_deleted_virtual_column_idx_ = other.first_is_deleted_virtual_column_idx_;
  has_nullables_ = other.has_nullables_;
}

Schema::Schema(Schema&& other) noexcept
    : cols_(std::move(other.cols_)),
      num_key_columns_(other.num_key_columns_),
      col_ids_(std::move(other.col_ids_)),
      max_col_id_(other.max_col_id_),
      col_offsets_(std::move(other.col_offsets_)),
      name_to_index_(std::move(other.name_to_index_)),
      id_to_index_(std::move(other.id_to_index_)),
      first_is_deleted_virtual_column_idx_(other.first_is_deleted_virtual_column_idx_),
      has_nullables_(other.has_nullables_) {
}

Schema& Schema::operator=(Schema&& other) noexcept {
  if (&other != this) {
    cols_ = std::move(other.cols_);
    num_key_columns_ = other.num_key_columns_;
    col_ids_ = std::move(other.col_ids_);
    max_col_id_ = other.max_col_id_;
    col_offsets_ = std::move(other.col_offsets_);
    id_to_index_ = std::move(other.id_to_index_);
    first_is_deleted_virtual_column_idx_ = other.first_is_deleted_virtual_column_idx_;
    has_nullables_ = other.has_nullables_;
    name_to_index_ = std::move(other.name_to_index_);
  }
  return *this;
}

Status Schema::Reset(vector<ColumnSchema> cols,
                     vector<ColumnId> ids,
                     int key_columns) {
  cols_ = std::move(cols);
  num_key_columns_ = key_columns;

  if (PREDICT_FALSE(key_columns > cols_.size())) {
    return Status::InvalidArgument(
      "Bad schema", "More key columns than columns");
  }

  if (PREDICT_FALSE(key_columns < 0)) {
    return Status::InvalidArgument(
      "Bad schema", "Cannot specify a negative number of key columns");
  }

  if (PREDICT_FALSE(!ids.empty() && ids.size() != cols_.size())) {
    return Status::InvalidArgument("Bad schema",
      "The number of ids does not match with the number of columns");
  }

  // Verify that the key columns are not nullable
  for (int i = 0; i < key_columns; ++i) {
    if (PREDICT_FALSE(cols_[i].is_nullable())) {
      return Status::InvalidArgument(
        "Bad schema", Substitute("Nullable key columns are not supported: $0",
                                 cols_[i].name()));
    }
  }

  // Calculate the offset of each column in the row format.
  col_offsets_.clear();
  col_offsets_.reserve(cols_.size() + 1);  // Include space for total byte size at the end.
  size_t off = 0;
  size_t i = 0;
  name_to_index_.clear_no_resize();
  for (const ColumnSchema& col : cols_) {
    if (col.name().empty()) {
      return Status::InvalidArgument("column names must be non-empty");
    }
    // The map uses the 'name' string from within the ColumnSchema object.
    if (!InsertIfNotPresent(&name_to_index_, col.name(), i++)) {
      return Status::InvalidArgument("Duplicate column name", col.name());
    }

    col_offsets_.push_back(off);
    off += col.type_info()->size();
  }

  // Add an extra element on the end for the total
  // byte size
  col_offsets_.push_back(off);

  // Initialize IDs mapping
  col_ids_ = std::move(ids);
  id_to_index_.clear();
  max_col_id_ = 0;
  for (size_t i = 0; i < col_ids_.size(); ++i) {
    if (col_ids_[i] > max_col_id_) {
      max_col_id_ = col_ids_[i];
    }
    id_to_index_.set(col_ids_[i], i);
  }

  RETURN_NOT_OK(FindFirstIsDeletedVirtualColumnIdx(
      cols_, &first_is_deleted_virtual_column_idx_));

  // Determine whether any column is nullable
  has_nullables_ = false;
  for (const ColumnSchema& col : cols_) {
    if (col.is_nullable()) {
      has_nullables_ = true;
      break;
    }
  }

  return Status::OK();
}

Status Schema::FindColumn(Slice col_name, int* idx) const {
  DCHECK(idx);
  StringPiece sp(reinterpret_cast<const char*>(col_name.data()), col_name.size());
  *idx = find_column(sp);
  if (PREDICT_FALSE(*idx == kColumnNotFound)) {
    return Status::NotFound("No such column", col_name);
  }
  return Status::OK();
}

Status Schema::CreateProjectionByNames(const vector<StringPiece>& col_names,
                                       Schema* out) const {
  vector<ColumnId> ids;
  vector<ColumnSchema> cols;
  for (const StringPiece& name : col_names) {
    const int idx = find_column(name);
    if (idx == kColumnNotFound) {
      return Status::NotFound("column not found", name);
    }
    if (has_column_ids()) {
      ids.push_back(column_id(idx));
    }
    cols.push_back(column(idx));
  }
  return out->Reset(std::move(cols), std::move(ids), 0);
}

Status Schema::CreateProjectionByIdsIgnoreMissing(const vector<ColumnId>& col_ids,
                                                  Schema* out) const {
  vector<ColumnSchema> cols;
  vector<ColumnId> filtered_col_ids;
  for (ColumnId id : col_ids) {
    const auto idx = find_column_by_id(id);
    if (idx == kColumnNotFound) {
      continue;
    }
    cols.push_back(column(idx));
    filtered_col_ids.push_back(id);
  }
  return out->Reset(std::move(cols), std::move(filtered_col_ids), 0);
}

Schema Schema::CopyWithColumnIds() const {
  CHECK(!has_column_ids());
  vector<ColumnId> ids;
  ids.reserve(num_columns());
  for (int32_t i = 0; i < num_columns(); i++) {
    ids.emplace_back(kFirstColumnId + i);
  }
  return Schema(cols_, ids, num_key_columns_);
}

Schema Schema::CopyWithoutColumnIds() const {
  return Schema(cols_, num_key_columns_);
}

Status Schema::VerifyProjectionCompatibility(const Schema& projection) const {
  DCHECK(has_column_ids()) "The server schema must have IDs";

  if (projection.has_column_ids()) {
    return Status::InvalidArgument("User requests should not have Column IDs");
  }

  vector<string> missing_columns;
  for (const ColumnSchema& pcol : projection.columns()) {
    if (pcol.type_info()->is_virtual()) {
      // Virtual columns may appear in a projection without appearing in the
      // schema being projected onto.
      continue;
    }
    int index = find_column(pcol.name());
    if (index < 0) {
      missing_columns.push_back(pcol.name());
    } else if (!pcol.EqualsType(cols_[index])) {
      // TODO(matteo): We don't support query with type adaptors yet.
      return Status::InvalidArgument("The column '" + pcol.name() + "' must have type " +
                                     cols_[index].TypeToString() + " found " + pcol.TypeToString());
    }
  }

  if (!missing_columns.empty()) {
    return Status::InvalidArgument("Some columns are not present in the current schema",
                                   JoinStrings(missing_columns, ", "));
  }
  return Status::OK();
}


Status Schema::GetMappedReadProjection(const Schema& projection,
                                       Schema* mapped_projection) const {
  // - The user projection may have different columns from the ones on the tablet
  // - User columns non present in the tablet are considered errors
  // - The user projection is not supposed to have the defaults or the nullable
  //   information on each field. The current tablet schema is supposed to.
  // - Each CFileSet may have a different schema and each CFileSet::Iterator
  //   must use projection from the CFileSet schema to the mapped user schema.
  RETURN_NOT_OK(VerifyProjectionCompatibility(projection));

  // Get the Projection Mapping
  vector<ColumnSchema> mapped_cols;
  vector<ColumnId> mapped_ids;

  mapped_cols.reserve(projection.num_columns());
  mapped_ids.reserve(projection.num_columns());

  int32_t proj_max_col_id = max_col_id_;
  for (const ColumnSchema& col : projection.columns()) {
    int index = find_column(col.name());
    if (col.type_info()->is_virtual()) {
      DCHECK_EQ(kColumnNotFound, index) << "virtual column not expected in tablet schema";
      DCHECK(!col.is_nullable()); // enforced by Schema constructor
      DCHECK(col.has_read_default()); // enforced by Schema constructor
      mapped_cols.push_back(col);
      // Generate a "fake" column id for virtual columns.
      mapped_ids.emplace_back(++proj_max_col_id);
      continue;
    }
    DCHECK_GE(index, 0) << col.name();
    mapped_cols.push_back(cols_[index]);
    mapped_ids.push_back(col_ids_[index]);
  }

  CHECK_OK(mapped_projection->Reset(mapped_cols, mapped_ids, projection.num_key_columns()));
  return Status::OK();
}

string Schema::ToString(uint8_t mode) const {
  if (cols_.empty()) return "()";

  vector<string> pk_strs;
  pk_strs.reserve(num_key_columns_);
  for (size_t i = 0; i < num_key_columns_; ++i) {
    pk_strs.push_back(cols_[i].name());
  }

  uint8_t col_mode = ColumnSchema::ToStringMode::WITHOUT_ATTRIBUTES;
  if (mode & ToStringMode::WITH_COLUMN_ATTRIBUTES) {
    col_mode |= ColumnSchema::ToStringMode::WITH_ATTRIBUTES;
  }
  if (mode & ToStringMode::WITH_COLUMN_COMMENTS) {
    col_mode |= ColumnSchema::ToStringMode::WITH_COMMENTS;
  }
  vector<string> col_strs;
  if (has_column_ids() && mode & ToStringMode::WITH_COLUMN_IDS) {
    for (size_t i = 0; i < cols_.size(); ++i) {
      col_strs.push_back(Substitute("$0:$1", col_ids_[i], cols_[i].ToString(col_mode)));
    }
  } else {
    for (const ColumnSchema& col : cols_) {
      col_strs.push_back(col.ToString(col_mode));
    }
  }

  return StrCat("(\n    ",
                JoinStrings(col_strs, ",\n    "),
                ",\n    ",
                "PRIMARY KEY (",
                JoinStrings(pk_strs, ", "),
                ")",
                "\n)");
}

template <class RowType>
Status Schema::DecodeRowKey(Slice encoded_key,
                            RowType* row,
                            Arena* arena) const {
  for (size_t col_idx = 0; col_idx < num_key_columns(); ++col_idx) {
    const ColumnSchema& col = column(col_idx);
    const KeyEncoder<faststring>& key_encoder = GetKeyEncoder<faststring>(col.type_info());
    bool is_last = col_idx == (num_key_columns() - 1);
    RETURN_NOT_OK_PREPEND(key_encoder.Decode(&encoded_key,
                                             is_last,
                                             arena,
                                             row->mutable_cell_ptr(col_idx)),
                          Substitute("Error decoding composite key component '$0'",
                                     col.name()));
  }
  return Status::OK();
}

string Schema::DebugEncodedRowKey(Slice encoded_key, StartOrEnd start_or_end) const {
  if (encoded_key.empty()) {
    switch (start_or_end) {
      case START_KEY: return "<start of table>";
      case END_KEY:   return "<end of table>";
    }
  }

  Arena arena(256);
  uint8_t* buf = reinterpret_cast<uint8_t*>(arena.AllocateBytes(key_byte_size()));
  ContiguousRow row(this, buf);
  Status s = DecodeRowKey(encoded_key, &row, &arena);
  if (!s.ok()) {
    return "<invalid key: " + s.ToString() + ">";
  }
  return DebugRowKey(row);
}

size_t Schema::memory_footprint_excluding_this() const {
  size_t size = 0;
  for (const ColumnSchema& col : cols_) {
    size += col.memory_footprint_excluding_this();
  }

  if (cols_.capacity() > 0) {
    size += kudu_malloc_usable_size(cols_.data());
  }
  if (col_ids_.capacity() > 0) {
    size += kudu_malloc_usable_size(col_ids_.data());
  }
  if (col_offsets_.capacity() > 0) {
    size += kudu_malloc_usable_size(col_offsets_.data());
  }
  size += name_to_index_.bucket_count() * sizeof(NameToIndexMap::value_type);
  size += id_to_index_.memory_footprint_excluding_this();

  return size;
}

size_t Schema::memory_footprint_including_this() const {
  return kudu_malloc_usable_size(this) + memory_footprint_excluding_this();
}

// Explicit specialization for callers outside this compilation unit.
template
Status Schema::DecodeRowKey(Slice encoded_key, RowBlockRow* row, Arena* arena) const;

// ============================================================================
//  Schema Builder
// ============================================================================
void SchemaBuilder::Reset() {
  cols_.clear();
  col_ids_.clear();
  col_names_.clear();
  num_key_columns_ = 0;
  next_id_ = kFirstColumnId;
}

void SchemaBuilder::Reset(const Schema& schema) {
  cols_ = schema.cols_;
  col_ids_ = schema.col_ids_;
  num_key_columns_ = schema.num_key_columns_;
  for (const auto& column : cols_) {
    col_names_.insert(column.name());
  }

  if (col_ids_.empty()) {
    for (int32_t i = 0; i < cols_.size(); ++i) {
      col_ids_.emplace_back(kFirstColumnId + i);
    }
  }
  if (col_ids_.empty()) {
    next_id_ = kFirstColumnId;
  } else {
    next_id_ = *std::max_element(col_ids_.begin(), col_ids_.end()) + 1;
  }
}

Status SchemaBuilder::AddKeyColumn(const string& name, DataType type) {
  return AddColumn(ColumnSchema(name, type), true);
}

Status SchemaBuilder::AddColumn(const string& name,
                                DataType type,
                                bool is_nullable,
                                const void* read_default,
                                const void* write_default) {
  return AddColumn(name, type, is_nullable, false, read_default, write_default);
}

Status SchemaBuilder::AddColumn(const std::string& name,
                                DataType type,
                                bool is_nullable,
                                bool is_immutable,
                                const void* read_default,
                                const void* write_default) {
  if (name.empty()) {
    return Status::InvalidArgument("column name must be non-empty");
  }

  return AddColumn(ColumnSchema(name, type, is_nullable, is_immutable,
                                read_default, write_default), false);
}

Status SchemaBuilder::RemoveColumn(const string& name) {
  unordered_set<string>::const_iterator it_names = col_names_.find(name);
  if (it_names == col_names_.end()) {
    return Status::NotFound("The specified column does not exist", name);
  }

  col_names_.erase(it_names);
  for (int i = 0; i < cols_.size(); ++i) {
    if (name == cols_[i].name()) {
      cols_.erase(cols_.begin() + i);
      col_ids_.erase(col_ids_.begin() + i);
      if (i < num_key_columns_) {
        num_key_columns_--;
      }
      return Status::OK();
    }
  }

  LOG(FATAL) << "Should not reach here";
  return Status::Corruption("Unable to remove existing column");
}

Status SchemaBuilder::RenameColumn(const string& old_name, const string& new_name) {
  if (new_name.empty()) {
    return Status::InvalidArgument("column name must be non-empty");
  }
  // check if 'new_name' is already in use
  if (col_names_.find(new_name) != col_names_.end()) {
    return Status::AlreadyPresent("The column already exists", new_name);
  }

  // check if the 'old_name' column exists
  unordered_set<string>::const_iterator it_names = col_names_.find(old_name);
  if (it_names == col_names_.end()) {
    return Status::NotFound("The specified column does not exist", old_name);
  }

  col_names_.erase(it_names);   // TODO(wdb): Should this one stay and marked as alias?
  col_names_.insert(new_name);

  for (ColumnSchema& col_schema : cols_) {
    if (old_name == col_schema.name()) {
      col_schema.set_name(new_name);
      return Status::OK();
    }
  }

  LOG(FATAL) << "Should not reach here";
  return Status::IllegalState("Unable to rename existing column");
}

Status SchemaBuilder::AddColumn(const ColumnSchema& column, bool is_key) {
  if (ContainsKey(col_names_, column.name())) {
    return Status::AlreadyPresent("The column already exists", column.name());
  }

  col_names_.insert(column.name());
  if (is_key) {
    cols_.insert(cols_.begin() + num_key_columns_, column);
    col_ids_.insert(col_ids_.begin() + num_key_columns_, next_id_);
    ++num_key_columns_;
  } else {
    cols_.push_back(column);
    col_ids_.push_back(next_id_);
  }

  next_id_ = ColumnId(next_id_ + 1);
  return Status::OK();
}

Status SchemaBuilder::ApplyColumnSchemaDelta(const ColumnSchemaDelta& col_delta) {
  // if the column will be renamed, check if 'new_name' is already in use
  if (col_delta.new_name && ContainsKey(col_names_, *col_delta.new_name)) {
    return Status::AlreadyPresent("The column already exists", *col_delta.new_name);
  }

  // check if the column exists
  unordered_set<string>::const_iterator it_names = col_names_.find(col_delta.name);
  if (it_names == col_names_.end()) {
    return Status::NotFound("The specified column does not exist", col_delta.name);
  }

  for (ColumnSchema& col_schema : cols_) {
    if (col_delta.name == col_schema.name()) {
      RETURN_NOT_OK(col_schema.ApplyDelta(col_delta));
      if (col_delta.new_name) {
        // TODO(wdb): Should the old one stay, marked as an alias?
        col_names_.erase(it_names);
        col_names_.insert(*col_delta.new_name);
      }
      return Status::OK();
    }
  }

  LOG(FATAL) << "Should not reach here";
}

} // namespace kudu
