KUDU-2671 test for unbounded range with custom empty hash schema

This patch adds a couple of new test scenarios to cover a case
of a table with a range partition without the upper bound,
where the range partition has a custom empty hash schema in the presence
of non-empty table-wide hash schema.  This is to make sure:
  * it's possible to create such a table
  * the range and partition keys for the table are encoded as expected
  * it's possible to insert rows into the table

This is a follow-up to 03451904a20123ca27eaa4e9773b94c0532fd342.

Change-Id: I2198714a8025ae4b132afc4cd4700de18df226cb
Reviewed-on: http://gerrit.cloudera.org:8080/17836
Reviewed-by: Mahesh Reddy <mreddy@cloudera.com>
Tested-by: Kudu Jenkins
Reviewed-by: Andrew Wong <awong@cloudera.com>
diff --git a/src/kudu/client/flex_partitioning_client-test.cc b/src/kudu/client/flex_partitioning_client-test.cc
index cc00baa..9f23eda 100644
--- a/src/kudu/client/flex_partitioning_client-test.cc
+++ b/src/kudu/client/flex_partitioning_client-test.cc
@@ -181,17 +181,25 @@
     return table_creator->Create();
   }
 
-  RangePartition CreateRangePartition(int32_t lower_boundary = 0,
-                                      int32_t upper_boundary = 100) {
+  RangePartition CreateRangePartition(int32_t lower_bound = 0,
+                                      int32_t upper_bound = 100) {
     unique_ptr<KuduPartialRow> lower(schema_.NewRow());
-    CHECK_OK(lower->SetInt32(kKeyColumn, lower_boundary));
+    CHECK_OK(lower->SetInt32(kKeyColumn, lower_bound));
     unique_ptr<KuduPartialRow> upper(schema_.NewRow());
-    CHECK_OK(upper->SetInt32(kKeyColumn, upper_boundary));
+    CHECK_OK(upper->SetInt32(kKeyColumn, upper_bound));
     return unique_ptr<KuduTableCreator::KuduRangePartition>(
         new KuduTableCreator::KuduRangePartition(lower.release(),
                                                  upper.release()));
   }
 
+  RangePartition CreateRangePartitionNoUpperBound(int32_t lower_bound) {
+    unique_ptr<KuduPartialRow> lower(schema_.NewRow());
+    CHECK_OK(lower->SetInt32(kKeyColumn, lower_bound));
+    return unique_ptr<KuduTableCreator::KuduRangePartition>(
+        new KuduTableCreator::KuduRangePartition(lower.release(),
+                                                 schema_.NewRow()));
+  }
+
   void CheckTabletCount(const char* table_name, int expected_count) {
     shared_ptr<KuduTable> table;
     ASSERT_OK(client_->OpenTable(table_name, &table));
@@ -427,7 +435,6 @@
   // Make sure it's possible to insert rows into the table for all the existing
   // the paritions: first check the range of table-wide schema, then check
   // the ranges with custom hash schemas.
-  // TODO(aserbin): uncomment CheckTableRowsNum() once partition pruning works
   ASSERT_OK(InsertTestRows(kTableName, -111, 0));
   NO_FATALS(CheckLiveRowCount(kTableName, 111));
   ASSERT_OK(InsertTestRows(kTableName, 111, 555));
@@ -468,6 +475,59 @@
   }
 }
 
+// This test scenario creates a table with a range partition having no upper
+// bound. The range partition has a custom empty hash schema (i.e. no hash
+// bucketing for the range) in the presence of non-empty table-wide hash schema.
+TEST_F(FlexPartitioningCreateTableTest, NoUpperBoundRangeCustomHashSchema) {
+  // Create a table with the following partitions:
+  //
+  //            hash bucket
+  //   key    0           1           2
+  //         --------------------------------
+  //   0-111  x:{key}     x:{key}     x:{key}
+  // 111-222  x:{key}     x:{key}     -
+  //  >222    -           -           -
+  constexpr const char* const kTableName = "NoUpperBoundRangeCustomHashSchema";
+
+  unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
+  table_creator->table_name(kTableName)
+      .schema(&schema_)
+      .num_replicas(1)
+      .add_hash_partitions({ kKeyColumn }, 3)
+      .set_range_partition_columns({ kKeyColumn });
+
+  // Add a range partition with the table-wide hash partitioning rules.
+  {
+    unique_ptr<KuduPartialRow> lower(schema_.NewRow());
+    ASSERT_OK(lower->SetInt32(kKeyColumn, 0));
+    unique_ptr<KuduPartialRow> upper(schema_.NewRow());
+    ASSERT_OK(upper->SetInt32(kKeyColumn, 111));
+    table_creator->add_range_partition(lower.release(), upper.release());
+  }
+
+  // Add a range partition with custom hash sub-partitioning rules:
+  // 2 buckets with hash based on the "key" column with hash seed 0.
+  {
+    auto p = CreateRangePartition(111, 222);
+    ASSERT_OK(p->add_hash_partitions({ kKeyColumn }, 2, 0));
+    table_creator->add_custom_range_partition(p.release());
+  }
+
+  // Add unbounded range partition with no hash bucketing.
+  {
+    auto p = CreateRangePartitionNoUpperBound(222);
+    table_creator->add_custom_range_partition(p.release());
+  }
+
+  ASSERT_OK(table_creator->Create());
+  NO_FATALS(CheckTabletCount(kTableName, 6));
+
+  // Make sure it's possible to insert rows into the table for all the existing
+  // partitions.
+  ASSERT_OK(InsertTestRows(kTableName, 0, 555));
+  NO_FATALS(CheckLiveRowCount(kTableName, 555));
+}
+
 // Negative tests scenarios to cover non-OK status codes for various operations
 // related to custom hash bucket schema per range.
 TEST_F(FlexPartitioningCreateTableTest, Negatives) {
diff --git a/src/kudu/common/partition-test.cc b/src/kudu/common/partition-test.cc
index caa32bf..7071810 100644
--- a/src/kudu/common/partition-test.cc
+++ b/src/kudu/common/partition-test.cc
@@ -1371,6 +1371,162 @@
   EXPECT_EQ("", partitions[10].partition_key_end());
 }
 
+TEST_F(PartitionTest, NoHashSchemasForLastUnboundedRange) {
+  // CREATE TABLE t (a VARCHAR, b VARCHAR, PRIMARY KEY (a, b))
+  // PARTITION BY [HASH BUCKET (b), RANGE (a, b)];
+  Schema schema({ ColumnSchema("a", STRING),
+                  ColumnSchema("b", STRING) },
+                { ColumnId(0), ColumnId(1) }, 2);
+
+  PartitionSchemaPB schema_builder;
+  // Table-wide hash schema defined below.
+  AddHashDimension(&schema_builder, { "b" }, 2, 0);
+  PartitionSchema partition_schema;
+  ASSERT_OK(PartitionSchema::FromPB(schema_builder, schema, &partition_schema));
+  CheckSerializationFunctions(schema_builder, partition_schema, schema);
+
+  ASSERT_EQ("HASH (b) PARTITIONS 2, RANGE (a, b)",
+            partition_schema.DebugString(schema));
+
+  vector<pair<KuduPartialRow, KuduPartialRow>> bounds;
+  vector<PartitionSchema::HashSchema> range_hash_schemas;
+
+  // [(_, _), (a1, _))
+  {
+    KuduPartialRow lower(&schema);
+    KuduPartialRow upper(&schema);
+    ASSERT_OK(upper.SetStringCopy("a", "a1"));
+    PartitionSchema::HashSchema hash_schema_3_buckets = {{{ColumnId(0)}, 3, 0}};
+    bounds.emplace_back(lower, upper);
+    range_hash_schemas.emplace_back(hash_schema_3_buckets);
+  }
+
+  // [(a2, _), (a3, _))
+  {
+    KuduPartialRow lower(&schema);
+    KuduPartialRow upper(&schema);
+    ASSERT_OK(lower.SetStringCopy("a", "a2"));
+    ASSERT_OK(upper.SetStringCopy("a", "a3"));
+    bounds.emplace_back(lower, upper);
+    PartitionSchema::HashSchema hash_schema_3_buckets_by_2 = {
+        {{ColumnId(0)}, 3, 0},
+        {{ColumnId(1)}, 2, 0}
+    };
+    range_hash_schemas.emplace_back(hash_schema_3_buckets_by_2);
+  }
+
+  // [(a4, _), (_, _))
+  {
+    KuduPartialRow lower(&schema);
+    KuduPartialRow upper(&schema);
+    ASSERT_OK(lower.SetStringCopy("a", "a4"));
+    bounds.emplace_back(lower, upper);
+    range_hash_schemas.emplace_back(PartitionSchema::HashSchema());
+  }
+
+  vector<Partition> partitions;
+  ASSERT_OK(partition_schema.CreatePartitions(
+      {}, bounds, range_hash_schemas, schema, &partitions));
+  ASSERT_EQ(10, partitions.size());
+
+  {
+    const auto& p = partitions[0];
+    ASSERT_EQ(1, p.hash_buckets().size());
+    EXPECT_EQ(0, p.hash_buckets()[0]);
+    EXPECT_EQ("", p.range_key_start());
+    EXPECT_EQ(string("a1\0\0", 4), p.range_key_end());
+    EXPECT_EQ("", p.partition_key_start());
+    EXPECT_EQ(string("\0\0\0\0" "a1\0\0", 8), p.partition_key_end());
+  }
+  {
+    const auto& p = partitions[1];
+    ASSERT_EQ(1, p.hash_buckets().size());
+    ASSERT_EQ(1, p.hash_buckets().size());
+    EXPECT_EQ(1, p.hash_buckets()[0]);
+    EXPECT_EQ("", p.range_key_start());
+    EXPECT_EQ(string("a1\0\0", 4), p.range_key_end());
+    EXPECT_EQ(string("\0\0\0\1", 4), p.partition_key_start());
+    EXPECT_EQ(string("\0\0\0\1" "a1\0\0", 8), p.partition_key_end());
+  }
+  {
+    const auto& p = partitions[2];
+    ASSERT_EQ(1, p.hash_buckets().size());
+    EXPECT_EQ(2, p.hash_buckets()[0]);
+    EXPECT_EQ("", p.range_key_start());
+    EXPECT_EQ(string("a1\0\0", 4), p.range_key_end());
+    EXPECT_EQ(string("\0\0\0\2", 4), p.partition_key_start());
+    EXPECT_EQ(string("\0\0\0\2" "a1\0\0", 8), p.partition_key_end());
+  }
+  {
+    const auto& p = partitions[3];
+    ASSERT_EQ(2, p.hash_buckets().size());
+    EXPECT_EQ(0, p.hash_buckets()[0]);
+    EXPECT_EQ(0, p.hash_buckets()[1]);
+    EXPECT_EQ(string("a2\0\0", 4), p.range_key_start());
+    EXPECT_EQ(string("a3\0\0", 4), p.range_key_end());
+    EXPECT_EQ(string("\0\0\0\0" "\0\0\0\0" "a2\0\0", 12), p.partition_key_start());
+    EXPECT_EQ(string("\0\0\0\0" "\0\0\0\0" "a3\0\0", 12), p.partition_key_end());
+  }
+  {
+    const auto& p = partitions[4];
+    ASSERT_EQ(2, p.hash_buckets().size());
+    EXPECT_EQ(0, p.hash_buckets()[0]);
+    EXPECT_EQ(1, p.hash_buckets()[1]);
+    EXPECT_EQ(string("a2\0\0", 4), p.range_key_start());
+    EXPECT_EQ(string("a3\0\0", 4), p.range_key_end());
+    EXPECT_EQ(string("\0\0\0\0" "\0\0\0\1" "a2\0\0", 12), p.partition_key_start());
+    EXPECT_EQ(string("\0\0\0\0" "\0\0\0\1" "a3\0\0", 12), p.partition_key_end());
+  }
+  {
+    const auto& p = partitions[5];
+    ASSERT_EQ(2, p.hash_buckets().size());
+    EXPECT_EQ(1, p.hash_buckets()[0]);
+    EXPECT_EQ(0, p.hash_buckets()[1]);
+    EXPECT_EQ(string("a2\0\0", 4), p.range_key_start());
+    EXPECT_EQ(string("a3\0\0", 4), p.range_key_end());
+    EXPECT_EQ(string("\0\0\0\1" "\0\0\0\0" "a2\0\0", 12), p.partition_key_start());
+    EXPECT_EQ(string("\0\0\0\1" "\0\0\0\0" "a3\0\0", 12), p.partition_key_end());
+  }
+  {
+    const auto& p = partitions[6];
+    ASSERT_EQ(2, p.hash_buckets().size());
+    EXPECT_EQ(1, p.hash_buckets()[0]);
+    EXPECT_EQ(1, p.hash_buckets()[1]);
+    EXPECT_EQ(string("a2\0\0", 4), p.range_key_start());
+    EXPECT_EQ(string("a3\0\0", 4), p.range_key_end());
+    EXPECT_EQ(string("\0\0\0\1" "\0\0\0\1" "a2\0\0", 12), p.partition_key_start());
+    EXPECT_EQ(string("\0\0\0\1" "\0\0\0\1" "a3\0\0", 12), p.partition_key_end());
+  }
+  {
+    const auto& p = partitions[7];
+    ASSERT_EQ(2, p.hash_buckets().size());
+    EXPECT_EQ(2, p.hash_buckets()[0]);
+    EXPECT_EQ(0, p.hash_buckets()[1]);
+    EXPECT_EQ(string("a2\0\0", 4), p.range_key_start());
+    EXPECT_EQ(string("a3\0\0", 4), p.range_key_end());
+    EXPECT_EQ(string("\0\0\0\2" "\0\0\0\0" "a2\0\0", 12), p.partition_key_start());
+    EXPECT_EQ(string("\0\0\0\2" "\0\0\0\0" "a3\0\0", 12), p.partition_key_end());
+  }
+  {
+    const auto& p = partitions[8];
+    ASSERT_EQ(2, p.hash_buckets().size());
+    EXPECT_EQ(2, p.hash_buckets()[0]);
+    EXPECT_EQ(1, p.hash_buckets()[1]);
+    EXPECT_EQ(string("a2\0\0", 4), p.range_key_start());
+    EXPECT_EQ(string("a3\0\0", 4), p.range_key_end());
+    EXPECT_EQ(string("\0\0\0\2" "\0\0\0\1" "a2\0\0", 12), p.partition_key_start());
+    EXPECT_EQ(string("\0\0\0\2" "\0\0\0\1" "a3\0\0", 12), p.partition_key_end());
+  }
+  {
+    const auto& p = partitions[9];
+    ASSERT_EQ(0, p.hash_buckets().size());
+    EXPECT_EQ(string("a4\0\0", 4), p.range_key_start());
+    EXPECT_EQ("", p.range_key_end());
+    EXPECT_EQ(string("a4\0\0", 4), p.partition_key_start());
+    EXPECT_EQ("", p.partition_key_end());
+  }
+}
+
 TEST_F(PartitionTest, TestPartitionSchemaPB) {
   // CREATE TABLE t (a VARCHAR, b VARCHAR, c VARCHAR, PRIMARY KEY (a, b, c))
   // PARTITION BY [HASH BUCKET (b), RANGE (a, b, c)];