// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include "kudu/common/wire_protocol.h"

#include <algorithm>
#include <cstdint>
#include <cstring>
#include <list>
#include <numeric>
#include <optional>
#include <ostream>
#include <random>
#include <string>
#include <tuple>
#include <type_traits>
#include <vector>

#include <glog/logging.h>
#include <gtest/gtest.h>

#include "kudu/common/column_predicate.h"
#include "kudu/common/columnar_serialization.h"
#include "kudu/common/common.pb.h"
#include "kudu/common/row.h"
#include "kudu/common/rowblock.h"
#include "kudu/common/rowblock_memory.h"
#include "kudu/common/schema.h"
#include "kudu/common/types.h"
#include "kudu/common/wire_protocol.pb.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/gutil/walltime.h"
#include "kudu/util/bitmap.h"
#include "kudu/util/block_bloom_filter.h"
#include "kudu/util/faststring.h"
#include "kudu/util/hash.pb.h"
#include "kudu/util/hexdump.h"
#include "kudu/util/int128.h"
#include "kudu/util/memory/arena.h"
#include "kudu/util/random.h"
#include "kudu/util/slice.h"
#include "kudu/util/status.h"
#include "kudu/util/stopwatch.h"  // IWYU pragma: keep
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"

using std::optional;
using std::string;
using std::tuple;
using std::vector;
using strings::Substitute;

namespace kudu {

class WireProtocolTest : public KuduTest {
 public:
  WireProtocolTest()
      : schema_({ ColumnSchema("string", STRING),
                  ColumnSchema("nullable_string", STRING, /* is_nullable=*/true),
                  ColumnSchema("int", INT32),
                  ColumnSchema("nullable_int", INT32, /* is_nullable=*/true),
                  ColumnSchema("int64", INT64) },
        1),
        test_data_arena_(4096) {
  }

  static void FillRowBlockWithTestRows(RowBlock* block) {
    Random rng(SeedRandom());

    block->selection_vector()->SetAllTrue();

    for (int i = 0; i < block->nrows(); i++) {
      if (rng.OneIn(10)) {
        block->selection_vector()->SetRowUnselected(i);
        continue;
      }

      RowBlockRow row = block->row(i);

      // We make new copies of these strings into the Arena for each row so that
      // the workload is more realistic. If we just re-use the same Slice object
      // for each row, the memory accesses fit entirely into a smaller number of
      // cache lines and we may micro-optimize for the wrong thing.
      CHECK(block->arena()->RelocateSlice(
          "hello world col0",
          reinterpret_cast<Slice*>(row.mutable_cell_ptr(0))));

      if (rng.OneIn(3)) {
        row.cell(1).set_null(true);
      } else {
        row.cell(1).set_null(false);
        CHECK(block->arena()->RelocateSlice(
            "hello world col1",
            reinterpret_cast<Slice*>(row.mutable_cell_ptr(1))));
      }

      *reinterpret_cast<int32_t*>(row.mutable_cell_ptr(2)) = i;
      *reinterpret_cast<int32_t*>(row.mutable_cell_ptr(3)) = i;
      row.cell(3).set_null(rng.OneIn(7));

      *reinterpret_cast<int64_t*>(row.mutable_cell_ptr(4)) = i;
    }
  }

 protected:
  Schema schema_;
  Arena test_data_arena_;
};

TEST_F(WireProtocolTest, TestOKStatus) {
  Status s = Status::OK();
  AppStatusPB pb;
  StatusToPB(s, &pb);
  EXPECT_EQ(AppStatusPB::OK, pb.code());
  EXPECT_FALSE(pb.has_message());
  EXPECT_FALSE(pb.has_posix_code());

  Status s2 = StatusFromPB(pb);
  ASSERT_OK(s2);
}

TEST_F(WireProtocolTest, TestBadStatus) {
  Status s = Status::NotFound("foo", "bar");
  AppStatusPB pb;
  StatusToPB(s, &pb);
  EXPECT_EQ(AppStatusPB::NOT_FOUND, pb.code());
  EXPECT_TRUE(pb.has_message());
  EXPECT_EQ("foo: bar", pb.message());
  EXPECT_FALSE(pb.has_posix_code());

  Status s2 = StatusFromPB(pb);
  EXPECT_TRUE(s2.IsNotFound());
  EXPECT_EQ(s.ToString(), s2.ToString());
}

TEST_F(WireProtocolTest, TestBadStatusWithPosixCode) {
  Status s = Status::NotFound("foo", "bar", 1234);
  AppStatusPB pb;
  StatusToPB(s, &pb);
  EXPECT_EQ(AppStatusPB::NOT_FOUND, pb.code());
  EXPECT_TRUE(pb.has_message());
  EXPECT_EQ("foo: bar", pb.message());
  EXPECT_TRUE(pb.has_posix_code());
  EXPECT_EQ(1234, pb.posix_code());

  Status s2 = StatusFromPB(pb);
  EXPECT_TRUE(s2.IsNotFound());
  EXPECT_EQ(1234, s2.posix_code());
  EXPECT_EQ(s.ToString(), s2.ToString());
}

TEST_F(WireProtocolTest, TestSchemaRoundTrip) {
  google::protobuf::RepeatedPtrField<ColumnSchemaPB> pbs;

  ASSERT_OK(SchemaToColumnPBs(schema_, &pbs));
  ASSERT_EQ(5, pbs.size());

  // Column 0.
  EXPECT_TRUE(pbs.Get(0).is_key());
  EXPECT_EQ("string", pbs.Get(0).name());
  EXPECT_EQ(STRING, pbs.Get(0).type());
  EXPECT_FALSE(pbs.Get(0).is_nullable());

  // Column 1.
  EXPECT_FALSE(pbs.Get(1).is_key());
  EXPECT_EQ("nullable_string", pbs.Get(1).name());
  EXPECT_EQ(STRING, pbs.Get(1).type());
  EXPECT_TRUE(pbs.Get(1).is_nullable());

  // Column 2.
  EXPECT_FALSE(pbs.Get(2).is_key());
  EXPECT_EQ("int", pbs.Get(2).name());
  EXPECT_EQ(INT32, pbs.Get(2).type());
  EXPECT_FALSE(pbs.Get(2).is_nullable());


  // Column 3.
  EXPECT_FALSE(pbs.Get(3).is_key());
  EXPECT_EQ("nullable_int", pbs.Get(3).name());
  EXPECT_EQ(INT32, pbs.Get(3).type());
  EXPECT_TRUE(pbs.Get(3).is_nullable());

  // Column 4.
  EXPECT_FALSE(pbs.Get(4).is_key());
  EXPECT_EQ("int64", pbs.Get(4).name());
  EXPECT_EQ(INT64, pbs.Get(4).type());
  EXPECT_FALSE(pbs.Get(4).is_nullable());

  // Convert back to a Schema object and verify they're identical.
  Schema schema2;
  ASSERT_OK(ColumnPBsToSchema(pbs, &schema2));
  EXPECT_EQ(schema_.ToString(), schema2.ToString());
  EXPECT_EQ(schema_.num_key_columns(), schema2.num_key_columns());
}

// Test that, when non-contiguous key columns are passed, an error Status
// is returned.
TEST_F(WireProtocolTest, TestBadSchema_NonContiguousKey) {
  google::protobuf::RepeatedPtrField<ColumnSchemaPB> pbs;

  // Column 0: key
  ColumnSchemaPB* col_pb = pbs.Add();
  col_pb->set_name("c0");
  col_pb->set_type(STRING);
  col_pb->set_is_key(true);

  // Column 1: not a key
  col_pb = pbs.Add();
  col_pb->set_name("c1");
  col_pb->set_type(STRING);
  col_pb->set_is_key(false);

  // Column 2: marked as key. This is an error.
  col_pb = pbs.Add();
  col_pb->set_name("c2");
  col_pb->set_type(STRING);
  col_pb->set_is_key(true);

  Schema schema;
  Status s = ColumnPBsToSchema(pbs, &schema);
  ASSERT_STR_CONTAINS(s.ToString(), "Got out-of-order key column");
}

// Test that, when multiple columns with the same name are passed, an
// error Status is returned.
TEST_F(WireProtocolTest, TestBadSchema_DuplicateColumnName) {
  google::protobuf::RepeatedPtrField<ColumnSchemaPB> pbs;

  // Column 0:
  ColumnSchemaPB* col_pb = pbs.Add();
  col_pb->set_name("c0");
  col_pb->set_type(STRING);
  col_pb->set_is_key(true);

  // Column 1:
  col_pb = pbs.Add();
  col_pb->set_name("c1");
  col_pb->set_type(STRING);
  col_pb->set_is_key(false);

  // Column 2: same name as column 0
  col_pb = pbs.Add();
  col_pb->set_name("c0");
  col_pb->set_type(STRING);
  col_pb->set_is_key(false);

  Schema schema;
  Status s = ColumnPBsToSchema(pbs, &schema);
  ASSERT_EQ("Invalid argument: Duplicate column name: c0", s.ToString());
}

// Create a block of rows and ensure that it can be converted to and from protobuf.
TEST_F(WireProtocolTest, TestRowBlockToRowwisePB) {
  RowBlockMemory mem(1024);
  RowBlock block(&schema_, 30, &mem);
  FillRowBlockWithTestRows(&block);

  // Convert to PB.
  faststring direct, indirect;
  int num_rows = SerializeRowBlock(block, nullptr, &direct, &indirect);
  SCOPED_TRACE("Row data: " + direct.ToString());
  SCOPED_TRACE("Indirect data: " + indirect.ToString());

  // Convert back to a row, ensure that the resulting row is the same
  // as the one we put in.
  RowwiseRowBlockPB pb;
  pb.set_num_rows(num_rows);

  vector<const uint8_t*> row_ptrs;
  Slice direct_sidecar = direct;
  ASSERT_OK(ExtractRowsFromRowBlockPB(schema_, pb, indirect,
                                      &direct_sidecar, &row_ptrs));
  ASSERT_EQ(block.selection_vector()->CountSelected(), row_ptrs.size());
  int dst_row_idx = 0;
  for (int i = 0; i < block.nrows(); ++i) {
    if (!block.selection_vector()->IsRowSelected(i)) {
      continue;
    }
    ConstContiguousRow row_roundtripped(&schema_, row_ptrs[dst_row_idx]);
    EXPECT_EQ(schema_.DebugRow(block.row(i)),
              schema_.DebugRow(row_roundtripped));
    dst_row_idx++;
  }
}

// Create blocks of rows and ensure that they can be converted to the columnar serialized
// layout.
TEST_F(WireProtocolTest, TestRowBlockToColumnarPB) {
  // Generate several blocks of random data.
  static constexpr int kNumBlocks = 3;
  static constexpr int kBatchSizeBytes = 8192 * 1024;
  RowBlockMemory mem(1024);
  std::list<RowBlock> blocks;
  for (int i = 0; i < kNumBlocks; i++) {
    blocks.emplace_back(&schema_, 30, &mem);
    FillRowBlockWithTestRows(&blocks.back());
  }

  // Convert all of the RowBlocks to a single serialized (concatenated) columnar format.
  ColumnarSerializedBatch batch(schema_, schema_, kBatchSizeBytes);
  for (const auto& block : blocks) {
    batch.AddRowBlock(block);
  }

  // Verify that the resulting serialized data matches the concatenated original data blocks.
  ASSERT_EQ(5, batch.columns().size());
  int dst_row_idx = 0;
  for (const auto& block : blocks) {
    for (int src_row_idx = 0; src_row_idx < block.nrows(); src_row_idx++) {
      if (!block.selection_vector()->IsRowSelected(src_row_idx)) {
        continue;
      }
      SCOPED_TRACE(src_row_idx);
      SCOPED_TRACE(dst_row_idx);
      const auto& row = block.row(src_row_idx);
      for (int c = 0; c < schema_.num_columns(); c++) {
        SCOPED_TRACE(c);
        const auto& col = schema_.column(c);
        const auto& serialized_col = batch.columns()[c];
        if (col.is_nullable()) {
          bool expect_null = row.is_null(c);;
          EXPECT_EQ(!BitmapTest(serialized_col.non_null_bitmap->data(), dst_row_idx),
                    expect_null);
          if (expect_null) {
            continue;
          }
        }
        int type_size = col.type_info()->size();
        Slice serialized_val;
        Slice orig_val;
        if (col.type_info()->physical_type() == BINARY) {
          const uint8_t* offset_ptr = serialized_col.data.data() + sizeof(uint32_t) * dst_row_idx;
          uint32_t start_offset = UnalignedLoad<uint32_t>(offset_ptr);
          uint32_t end_offset = UnalignedLoad<uint32_t>(offset_ptr + sizeof(uint32_t));
          ASSERT_GE(end_offset, start_offset);
          serialized_val = Slice(serialized_col.varlen_data->data() + start_offset,
                                 end_offset - start_offset);
          memcpy(&orig_val, row.cell_ptr(c), type_size);
        } else {
          serialized_val = Slice(serialized_col.data.data() + type_size * dst_row_idx,
                                 type_size);
          orig_val = Slice(row.cell_ptr(c), type_size);
        }
        EXPECT_EQ(orig_val, serialized_val);
      }
      dst_row_idx++;
    }
  }
}


// Create a block of rows in columnar layout and ensure that it can be
// converted to and from protobuf.
TEST_F(WireProtocolTest, TestColumnarRowBlockToPBWithPadding) {
  int kNumRows = 10;
  RowBlockMemory mem(1024);
  // Create a schema with multiple UNIXTIME_MICROS columns in different
  // positions.
  Schema tablet_schema({ ColumnSchema("key", UNIXTIME_MICROS),
                         ColumnSchema("col1", STRING),
                         ColumnSchema("col2", UNIXTIME_MICROS),
                         ColumnSchema("col3", INT32, true /* nullable */),
                         ColumnSchema("col4", UNIXTIME_MICROS, true /* nullable */)}, 1);
  RowBlock block(&tablet_schema, kNumRows, &mem);
  block.selection_vector()->SetAllTrue();

  for (int i = 0; i < block.nrows(); i++) {
    RowBlockRow row = block.row(i);

    *reinterpret_cast<int64_t*>(row.mutable_cell_ptr(0)) = i;
    Slice col1;
    // See: FillRowBlockWithTestRows() for the reason why we relocate these
    // to 'test_data_arena_'.
    CHECK(test_data_arena_.RelocateSlice("hello world col1", &col1));
    *reinterpret_cast<Slice*>(row.mutable_cell_ptr(1)) = col1;
    *reinterpret_cast<int64_t*>(row.mutable_cell_ptr(2)) = i;
    *reinterpret_cast<int32_t*>(row.mutable_cell_ptr(3)) = i;
    row.cell(3).set_null(false);
    *reinterpret_cast<int64_t*>(row.mutable_cell_ptr(4)) = i;
    row.cell(4).set_null(true);
  }

  // Have the projection schema have columns in a different order from the table schema.
  Schema proj_schema({ ColumnSchema("col1", STRING),
                       ColumnSchema("key",  UNIXTIME_MICROS),
                       ColumnSchema("col2", UNIXTIME_MICROS),
                       ColumnSchema("col4", UNIXTIME_MICROS, true /* nullable */),
                       ColumnSchema("col3", INT32, true /* nullable */)}, 0);

  // Convert to PB.
  faststring direct, indirect;
  int num_rows = SerializeRowBlock(block, &proj_schema, &direct, &indirect,
                                   true /* pad timestamps */);
  SCOPED_TRACE("Row data: " + HexDump(direct));
  SCOPED_TRACE("Indirect data: " + HexDump(indirect));

  // Convert back to a row, ensure that the resulting row is the same
  // as the one we put in. Can't reuse the decoding methods since we
  // won't support decoding padded rows within Kudu.
  RowwiseRowBlockPB pb;
  pb.set_num_rows(num_rows);
  vector<const uint8_t*> row_ptrs;
  Slice direct_sidecar = direct;
  Slice indirect_sidecar = indirect;
  ASSERT_OK(RewriteRowBlockPointers(proj_schema, pb, indirect_sidecar, &direct_sidecar, true));

  // Row stride is the normal size for the schema + the number of UNIXTIME_MICROS columns * 8,
  // the size of the padding per column.
  size_t row_stride = ContiguousRowHelper::row_size(proj_schema) + 3 * 8;
  ASSERT_EQ(direct_sidecar.size(), row_stride * kNumRows);
  const uint8_t* base_data;
  for (int i = 0; i < kNumRows; i++) {
    base_data = direct_sidecar.data() + i * row_stride;
    // With padding, the null bitmap is at offset 68.
    // See the calculations below to understand why.
    const uint8_t* non_null_bitmap = base_data + 68;

    // 'col1' comes at 0 bytes offset in the projection schema.
    const Slice* col1 = reinterpret_cast<const Slice*>(base_data);
    ASSERT_EQ(Slice("hello world col1"), *col1)
        << "Unexpected val for the " << i << "th row:" << col1->ToDebugString();
    // 'key' comes at 16 bytes offset.
    const int64_t key = *reinterpret_cast<const int64_t*>(base_data + 16);
    EXPECT_EQ(key, i);

    // 'col2' comes at 32 bytes offset: 16 bytes previous, 16 bytes 'key'
    const int64_t col2 = *reinterpret_cast<const int64_t*>(base_data + 32);
    EXPECT_EQ(col2, i);

    // 'col4' is supposed to be null, but should also read 0 since we memsetted the
    // memory to 0. It should come at 48 bytes offset:  32 bytes previous + 8 bytes 'col2' +
    // 8 bytes padding.
    const int64_t col4 = *reinterpret_cast<const int64_t*>(base_data + 48);
    EXPECT_EQ(col4, 0);
    EXPECT_TRUE(BitmapTest(non_null_bitmap, 3));

    // 'col3' comes at 64 bytes offset: 48 bytes previous, 8 bytes 'col4', 8 bytes padding
    const int32_t col3 = *reinterpret_cast<const int32_t*>(base_data + 64);
    EXPECT_EQ(col3, i);
    EXPECT_FALSE(BitmapTest(non_null_bitmap, 4));
  }
}

struct RowwiseConverter {
  static void Run(const RowBlock& block) {
    faststring direct;
    faststring indirect;
    SerializeRowBlock(block, nullptr, &direct, &indirect);
  }

  static constexpr const char* kName = "row-wise";
};


struct ColumnarConverter {
  static void Run(const RowBlock& block) {
    constexpr int kBatchSizeBytes = 8192 * 1024;
    ColumnarSerializedBatch batch(*block.schema(), *block.schema(), kBatchSizeBytes);
    batch.AddRowBlock(block);
  }

  static constexpr const char* kName = "columnar";
};

struct BenchmarkColumnsSpec {
  struct Col {
    DataType type;
    double null_fraction; // negative for non-null
  };
  vector<Col> columns;
  string name;
};

class WireProtocolBenchmark :
      public WireProtocolTest,
      public testing::WithParamInterface<tuple<BenchmarkColumnsSpec, double>> {
 public:

  void ResetBenchmarkSchema(const BenchmarkColumnsSpec& spec) {
    vector<ColumnSchema> column_schemas;
    int i = 0;
    for (const auto& c : spec.columns) {
      column_schemas.emplace_back(Substitute("col$0", i++),
                                  c.type,
                                  /*nullable=*/c.null_fraction >= 0);
    }
    CHECK_OK(benchmark_schema_.Reset(std::move(column_schemas), 0));
  }

  void FillRowBlockForBenchmark(const BenchmarkColumnsSpec& spec,
                                RowBlock* block) {
    Random rng(SeedRandom());

    test_data_arena_.Reset();
    for (int i = 0; i < block->nrows(); i++) {
      RowBlockRow row = block->row(i);
      for (int j = 0; j < benchmark_schema_.num_columns(); j++) {
        const ColumnSchema& column_schema = benchmark_schema_.column(j);
        DataType type = spec.columns[j].type;
        bool is_null = rng.NextDoubleFraction() <= spec.columns[j].null_fraction;
        if (column_schema.is_nullable()) {
          row.cell(j).set_null(is_null);
        }
        if (!is_null) {
          switch (type) {
            case STRING: {
              Slice col;
              CHECK(test_data_arena_.RelocateSlice(Substitute("hello world $0",
                                                              column_schema.name()), &col));
              memcpy(row.mutable_cell_ptr(j), &col, sizeof(Slice));
              break;
            }
            case INT128:
              UnalignedStore<int128_t>(row.mutable_cell_ptr(j), i);
              break;
            case INT64:
              UnalignedStore<int64_t>(row.mutable_cell_ptr(j), i);
              break;
            case INT32:
              UnalignedStore<int32_t>(row.mutable_cell_ptr(j), i);
              break;
            case INT16:
              UnalignedStore<int16_t>(row.mutable_cell_ptr(j), i);
              break;
            case INT8:
              UnalignedStore<int8_t>(row.mutable_cell_ptr(j), i);
              break;
            default:
              LOG(FATAL) << "Unexpected type: " << type;
          }
        }
      }
    }
  }

  static void SelectRandomRowsWithRate(RowBlock* block, double rate) {
    CHECK_LE(rate, 1.0);
    CHECK_GE(rate, 0.0);
    auto select_count = block->nrows() * rate;
    SelectionVector* select_vector = block->selection_vector();
    if (rate == 1.0) {
      select_vector->SetAllTrue();
    } else if (rate == 0.0) {
      select_vector->SetAllFalse();
    } else {
      vector<int> indexes(block->nrows());
      std::iota(indexes.begin(), indexes.end(), 0);
      std::mt19937 gen(SeedRandom());
      std::shuffle(indexes.begin(), indexes.end(), gen);
      indexes.resize(select_count);
      select_vector->SetAllFalse();
      for (auto index : indexes) {
        select_vector->SetRowSelected(index);
      }
    }
    CHECK_EQ(select_vector->CountSelected(), select_count);
  }


  // Use column_count to control the schema scale.
  // Use select_rate to control the number of selected rows.
  template<class Converter>
  double RunBenchmark(const BenchmarkColumnsSpec& spec,
                    double select_rate) {
    ResetBenchmarkSchema(spec);
    RowBlockMemory mem(1024);
    RowBlock block(&benchmark_schema_, 1000, &mem);
    // Regardless of the config, use a constant number of selected cells for the test by
    // looping the conversion an appropriate number of times.
    const int64_t kNumCellsToConvert = AllowSlowTests() ? 100000000 : 1000000;
    const int64_t kCellsPerBlock = block.nrows() * spec.columns.size();
    const double kSelectedCellsPerBlock = kCellsPerBlock * select_rate;
    const int kNumTrials = static_cast<int>(kNumCellsToConvert /  kSelectedCellsPerBlock);
    FillRowBlockForBenchmark(spec, &block);
    SelectRandomRowsWithRate(&block, select_rate);

    int64_t cycle_start = CycleClock::Now();
    for (int i = 0; i < kNumTrials; ++i) {
      Converter::Run(block);
    }
    int64_t cycle_end = CycleClock::Now();
    double cycles_per_cell = static_cast<double>(cycle_end - cycle_start) / kNumCellsToConvert;
    LOG(INFO) << Substitute(
        "Converting $0 to PB (method $3) row select rate $1: $2 cycles/cell",
        spec.name, select_rate, cycles_per_cell,
        Converter::kName);
    return cycles_per_cell;
  }

 protected:
  Schema benchmark_schema_;
};

TEST_P(WireProtocolBenchmark, TestRowBlockToPBBenchmark) {
  const auto& spec = std::get<0>(GetParam());
  double select_rate = std::get<1>(GetParam());
  double cycles_per_cell_rowwise = RunBenchmark<RowwiseConverter>(spec, select_rate);
  double cycles_per_cell_columnar = RunBenchmark<ColumnarConverter>(spec, select_rate);
  double ratio = cycles_per_cell_rowwise / cycles_per_cell_columnar;
  LOG(INFO) << Substitute(
      "Converting $0 to PB row select rate $1: columnar/rowwise throughput ratio: $2x",
      spec.name, select_rate, ratio);
}

BenchmarkColumnsSpec UniformColumns(int n_cols, DataType type, double null_fraction) {
  vector<BenchmarkColumnsSpec::Col> cols(n_cols);
  for (int i = 0; i < n_cols; i++) {
    cols[i] = {type, null_fraction};
  }
  string null_str;
  if (null_fraction >= 0) {
    null_str = Substitute("$0pct_null", static_cast<int>(null_fraction * 100));
  } else {
    null_str = "non_null";
  }
  return {cols, Substitute("$0_$1_$2",
                           n_cols,
                           GetTypeInfo(type)->name(),
                           null_str) };
}

INSTANTIATE_TEST_SUITE_P(
    ColumnarRowBlockToPBBenchmarkParams, WireProtocolBenchmark,
    testing::Combine(
        testing::Values(
            UniformColumns(10, INT64, -1),
            UniformColumns(10, INT32, -1),
            UniformColumns(10, STRING, -1),

            UniformColumns(10, INT128, 0),
            UniformColumns(10, INT64, 0),
            UniformColumns(10, INT32, 0),
            UniformColumns(10, INT16, 0),
            UniformColumns(10, INT8, 0),
            UniformColumns(10, STRING, 0),

            UniformColumns(10, INT128, 0.1),
            UniformColumns(10, INT64, 0.1),
            UniformColumns(10, INT32, 0.1),
            UniformColumns(10, INT16, 0.1),
            UniformColumns(10, INT8, 0.1),
            UniformColumns(10, STRING, 0.1)),
        // Selection rates.
        testing::Values(1.0, 0.8, 0.5, 0.2)),
    [](const testing::TestParamInfo<WireProtocolBenchmark::ParamType>& info) {
      return Substitute("$0_sel_$1pct",
                        std::get<0>(info.param).name,
                        static_cast<int>(std::get<1>(info.param)*100));
    });


// Test that trying to extract rows from an invalid block correctly returns
// Corruption statuses.
TEST_F(WireProtocolTest, TestInvalidRowBlock) {
  Schema schema({ ColumnSchema("col1", STRING) }, 1);
  RowwiseRowBlockPB pb;
  vector<const uint8_t*> row_ptrs;

  // Too short to be valid data.
  const char* shortstr = "x";
  pb.set_num_rows(1);
  Slice direct = shortstr;
  Status s = ExtractRowsFromRowBlockPB(schema, pb, Slice(), &direct, &row_ptrs);
  ASSERT_STR_CONTAINS(s.ToString(), "Corruption: Row block has 1 bytes of data");

  // Bad pointer into indirect data.
  shortstr = "xxxxxxxxxxxxxxxx";
  pb.set_num_rows(1);
  direct = Slice(shortstr);
  s = ExtractRowsFromRowBlockPB(schema, pb, Slice(), &direct, &row_ptrs);
  ASSERT_STR_CONTAINS(s.ToString(),
                      "Corruption: Row #0 contained bad indirect slice");
}

// Test serializing a block which has a selection vector but no columns.
// This is the sort of result that is returned from a scan with an empty
// projection (a COUNT(*) query).
TEST_F(WireProtocolTest, TestBlockWithNoColumns) {
  Schema empty(std::vector<ColumnSchema>(), 0);
  RowBlockMemory mem(1024);
  RowBlock block(&empty, 1000, &mem);
  block.selection_vector()->SetAllTrue();
  // Unselect 100 rows
  for (int i = 0; i < 100; i++) {
    block.selection_vector()->SetRowUnselected(i * 2);
  }
  ASSERT_EQ(900, block.selection_vector()->CountSelected());

  // Convert it to protobuf, ensure that the results look right.
  faststring direct, indirect;
  int num_rows = SerializeRowBlock(block, nullptr, &direct, &indirect);
  ASSERT_EQ(900, num_rows);
}

TEST_F(WireProtocolTest, TestColumnDefaultValue) {
  Slice write_default_str("Hello Write");
  Slice read_default_str("Hello Read");
  uint32_t write_default_u32 = 512;
  uint32_t read_default_u32 = 256;
  ColumnSchemaPB pb;

  ColumnSchema col1("col1", STRING);
  ColumnSchemaToPB(col1, &pb);
  optional<ColumnSchema> col1fpb;
  ASSERT_OK(ColumnSchemaFromPB(pb, &col1fpb));
  ASSERT_FALSE(col1fpb->has_read_default());
  ASSERT_FALSE(col1fpb->has_write_default());
  ASSERT_TRUE(col1fpb->read_default_value() == nullptr);

  ColumnSchema col2("col2", STRING, false, false, &read_default_str);
  ColumnSchemaToPB(col2, &pb);
  optional<ColumnSchema> col2fpb;
  ASSERT_OK(ColumnSchemaFromPB(pb, &col2fpb));
  ASSERT_TRUE(col2fpb->has_read_default());
  ASSERT_FALSE(col2fpb->has_write_default());
  ASSERT_EQ(read_default_str, *static_cast<const Slice *>(col2fpb->read_default_value()));
  ASSERT_EQ(nullptr, static_cast<const Slice *>(col2fpb->write_default_value()));

  ColumnSchema col3("col3", STRING, false, false, &read_default_str, &write_default_str);
  ColumnSchemaToPB(col3, &pb);
  optional<ColumnSchema> col3fpb;
  ASSERT_OK(ColumnSchemaFromPB(pb, &col3fpb));
  ASSERT_TRUE(col3fpb->has_read_default());
  ASSERT_TRUE(col3fpb->has_write_default());
  ASSERT_EQ(read_default_str, *static_cast<const Slice *>(col3fpb->read_default_value()));
  ASSERT_EQ(write_default_str, *static_cast<const Slice *>(col3fpb->write_default_value()));

  ColumnSchema col4("col4", UINT32, false, false, &read_default_u32);
  ColumnSchemaToPB(col4, &pb);
  optional<ColumnSchema> col4fpb;
  ASSERT_OK(ColumnSchemaFromPB(pb, &col4fpb));
  ASSERT_TRUE(col4fpb->has_read_default());
  ASSERT_FALSE(col4fpb->has_write_default());
  ASSERT_EQ(read_default_u32, *static_cast<const uint32_t *>(col4fpb->read_default_value()));
  ASSERT_EQ(nullptr, static_cast<const uint32_t *>(col4fpb->write_default_value()));

  ColumnSchema col5("col5", UINT32, false, false, &read_default_u32, &write_default_u32);
  ColumnSchemaToPB(col5, &pb);
  optional<ColumnSchema> col5fpb;
  ASSERT_OK(ColumnSchemaFromPB(pb, &col5fpb));
  ASSERT_TRUE(col5fpb->has_read_default());
  ASSERT_TRUE(col5fpb->has_write_default());
  ASSERT_EQ(read_default_u32, *static_cast<const uint32_t *>(col5fpb->read_default_value()));
  ASSERT_EQ(write_default_u32, *static_cast<const uint32_t *>(col5fpb->write_default_value()));
}

// Regression test for KUDU-2378; the call to ColumnSchemaFromPB yielded a crash.
TEST_F(WireProtocolTest, TestCrashOnAlignedLoadOf128BitReadDefault) {
  ColumnSchemaPB pb;
  pb.set_name("col");
  pb.set_type(DECIMAL128);
  pb.set_read_default_value(string(16, 'a'));
  optional<ColumnSchema> col;
  ASSERT_OK(ColumnSchemaFromPB(pb, &col));
}

// Regression test for KUDU-2622; Validate read and write default value sizes.
TEST_F(WireProtocolTest, TestInvalidReadAndWriteDefault) {
  {
    ColumnSchemaPB pb;
    pb.set_name("col");
    pb.set_type(DECIMAL128);
    pb.set_read_default_value(string(8, 'a'));
    optional<ColumnSchema> col;
    Status s = ColumnSchemaFromPB(pb, &col);
    EXPECT_TRUE(s.IsCorruption());
    ASSERT_STR_CONTAINS(s.ToString(), "Corruption: Not enough bytes for decimal: read default");
  }
  {
    ColumnSchemaPB pb;
    pb.set_name("col");
    pb.set_type(DECIMAL128);
    pb.set_write_default_value(string(8, 'a'));
    optional<ColumnSchema> col;
    Status s = ColumnSchemaFromPB(pb, &col);
    EXPECT_TRUE(s.IsCorruption());
    ASSERT_STR_CONTAINS(s.ToString(), "Corruption: Not enough bytes for decimal: write default");
  }
}

TEST_F(WireProtocolTest, TestColumnPredicateInList) {
  ColumnSchema col1("col1", INT32);
  vector<ColumnSchema> cols = { col1 };
  Schema schema(cols, 1);
  RowBlockMemory mem(1024);
  optional<ColumnPredicate> predicate;

  { // col1 IN (5, 6, 10)
    int five = 5;
    int six = 6;
    int ten = 10;
    vector<const void*> values { &five, &six, &ten };

    kudu::ColumnPredicate cp = kudu::ColumnPredicate::InList(col1, &values);
    ColumnPredicatePB pb;
    NO_FATALS(ColumnPredicateToPB(cp, &pb));

    ASSERT_OK(ColumnPredicateFromPB(schema, &mem.arena, pb, &predicate));
    ASSERT_EQ(predicate->predicate_type(), PredicateType::InList);
    ASSERT_EQ(3, predicate->raw_values().size());
  }

  { // col1 IN (0, 0)
    // We can't construct a single element IN list directly since it would be
    // simplified to an equality predicate, so we hack around it by directly
    // constructing it as a protobuf message.
    ColumnPredicatePB pb;
    pb.set_column("col1");
    *pb.mutable_in_list()->mutable_values()->Add() = string("\0\0\0\0", 4);
    *pb.mutable_in_list()->mutable_values()->Add() = string("\0\0\0\0", 4);

    ASSERT_OK(ColumnPredicateFromPB(schema, &mem.arena, pb, &predicate));
    ASSERT_EQ(PredicateType::Equality, predicate->predicate_type());
  }

  { // col1 IN ()
    ColumnPredicatePB pb;
    pb.set_column("col1");
    pb.mutable_in_list();

    RowBlockMemory mem(1024);
    optional<ColumnPredicate> predicate;
    ASSERT_OK(ColumnPredicateFromPB(schema, &mem.arena, pb, &predicate));
    ASSERT_EQ(PredicateType::None, predicate->predicate_type());
  }

  { // IN list corruption
    ColumnPredicatePB pb;
    pb.set_column("col1");
    pb.mutable_in_list();
    *pb.mutable_in_list()->mutable_values()->Add() = string("\0", 1);

    RowBlockMemory mem(1024);
    optional<ColumnPredicate> predicate;
    ASSERT_TRUE(ColumnPredicateFromPB(schema, &mem.arena, pb, &predicate).IsInvalidArgument());
  }
}

class BFWireProtocolTest : public KuduTest {
 public:
  BFWireProtocolTest()
      : schema_({ ColumnSchema("col1", INT32)}, 1),
        arena_(1024),
        allocator_(&arena_),
        n_keys_(100),
        b1_(&allocator_),
        b2_(&allocator_) {}

  void SetUp() override {
    int log_space_bytes1 = BlockBloomFilter::MinLogSpace(n_keys_, 0.01);
    ASSERT_OK(b1_.Init(log_space_bytes1, FAST_HASH, 0));
    ASSERT_LE(BlockBloomFilter::FalsePositiveProb(n_keys_, log_space_bytes1), 0.01);

    int log_space_bytes2 = BlockBloomFilter::MinLogSpace(n_keys_, 0.01);
    ASSERT_OK(b2_.Init(log_space_bytes2, FAST_HASH, 0));
    ASSERT_LE(BlockBloomFilter::FalsePositiveProb(n_keys_, log_space_bytes2), 0.01);

    for (int i = 0; i < n_keys_; ++i) {
      Slice key_slice(reinterpret_cast<const uint8_t*>(&i), sizeof(i));
      b1_.Insert(key_slice);
      b2_.Insert(key_slice);
    }
  }

protected:
  Schema schema_;
  Arena arena_;
  ArenaBlockBloomFilterBufferAllocator allocator_;
  int n_keys_;
  BlockBloomFilter b1_;
  BlockBloomFilter b2_;
};

TEST_F(BFWireProtocolTest, TestColumnPredicateBloomFilter) {
  optional<ColumnPredicate> predicate;
  ColumnSchema col1 = schema_.column(0);
  { // Single BloomFilter predicate.
    kudu::ColumnPredicate ibf =
        kudu::ColumnPredicate::InBloomFilter(col1, {&b1_}, nullptr, nullptr);
    ColumnPredicatePB pb;
    NO_FATALS(ColumnPredicateToPB(ibf, &pb));
    ASSERT_OK(ColumnPredicateFromPB(schema_, &arena_, pb, &predicate));
    ASSERT_EQ(predicate->predicate_type(), PredicateType::InBloomFilter);
    ASSERT_EQ(predicate, ibf);
  }

  { // Multi BloomFilter predicate.
    kudu::ColumnPredicate ibf =
        kudu::ColumnPredicate::InBloomFilter(col1, {&b1_, &b2_}, nullptr, nullptr);
    ColumnPredicatePB pb;
    NO_FATALS(ColumnPredicateToPB(ibf, &pb));
    ASSERT_OK(ColumnPredicateFromPB(schema_, &arena_, pb, &predicate));
    ASSERT_EQ(predicate->predicate_type(), PredicateType::InBloomFilter);
    ASSERT_EQ(predicate, ibf);
  }
}

TEST_F(BFWireProtocolTest, TestColumnPredicateBloomFilterWithBound) {
  optional<ColumnPredicate> predicate;
  ColumnSchema col1 = schema_.column(0);
  { // Simply BloomFilter with lower bound.
    int lower = 1;
    auto ibf = kudu::ColumnPredicate::InBloomFilter(col1, {&b1_}, &lower, nullptr);
    ColumnPredicatePB pb;
    NO_FATALS(ColumnPredicateToPB(ibf, &pb));
    ASSERT_OK(ColumnPredicateFromPB(schema_, &arena_, pb, &predicate));
    ASSERT_EQ(predicate->predicate_type(), PredicateType::InBloomFilter);
    ASSERT_EQ(predicate, ibf);
  }

  { // Single bloom filter with upper bound.
    int upper = 4;
    auto ibf = kudu::ColumnPredicate::InBloomFilter(col1, {&b1_}, nullptr, &upper);
    ColumnPredicatePB pb;
    NO_FATALS(ColumnPredicateToPB(ibf, &pb));
    ASSERT_OK(ColumnPredicateFromPB(schema_, &arena_, pb, &predicate));
    ASSERT_EQ(predicate->predicate_type(), PredicateType::InBloomFilter);
    ASSERT_EQ(predicate, ibf);
  }

  { // Single bloom filter with both lower and upper bound.
    int lower = 1;
    int upper = 4;
    auto ibf = kudu::ColumnPredicate::InBloomFilter(col1, {&b1_}, &lower, &upper);
    ColumnPredicatePB pb;
    NO_FATALS(ColumnPredicateToPB(ibf, &pb));
    ASSERT_OK(ColumnPredicateFromPB(schema_, &arena_, pb, &predicate));
    ASSERT_EQ(predicate->predicate_type(), PredicateType::InBloomFilter);
    ASSERT_EQ(predicate, ibf);
  }

  { // Multi bloom filter with both lower and upper bound.
    int lower = 1;
    int upper = 4;
    auto ibf = kudu::ColumnPredicate::InBloomFilter(col1, {&b1_, &b2_}, &lower,
                                                    &upper);
    ColumnPredicatePB pb;
    NO_FATALS(ColumnPredicateToPB(ibf, &pb));
    ASSERT_OK(ColumnPredicateFromPB(schema_, &arena_, pb, &predicate));
    ASSERT_EQ(predicate->predicate_type(), PredicateType::InBloomFilter);
    ASSERT_EQ(predicate->bloom_filters().size(), ibf.bloom_filters().size());
    ASSERT_EQ(predicate, ibf);
  }
}

} // namespace kudu
