KUDU-2671: Adds compatibility for per range hash schemas with unbounded ranges.

This patch updates the logic at the end of PartitionSchema::CreatePartitions()
to allow per range hash schemas to be compatible with unbounded ranges. Some
additional context about that block of code is given below.

For the start partition key, the code iterates in reverse order through the
partition's hash buckets. It breaks out of the loop at the first bucket that
is not equal to 0; if the bucket is equal to 0, it erases that bucket's
portion of the partition key. Essentially, if all hash buckets are equal to
0, the entire key is erased.
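In simplified form, the start-key handling looks roughly like the following
excerpt of the existing loop in partition.cc (surrounding declarations
elided):

  if (partition.range_key_start().empty()) {
    for (int i = static_cast<int>(partition.hash_buckets().size()) - 1; i >= 0; i--) {
      if (partition.hash_buckets()[i] != 0) {
        break;  // Stop at the first non-zero bucket from the end.
      }
      // Erase the encoded bucket at index 'i' (and everything after it).
      partition.partition_key_start_.erase(kEncodedBucketSize * i);
    }
  }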

For the end partition key, the code also iterates in reverse order through
the partition's hash buckets. It first erases that bucket's portion of the
partition key. It then checks whether the current hash bucket is the maximum
bucket of the current hash schema. If it is not the maximum, it encodes the
current hash bucket + 1 into that portion of the key and breaks out of the
loop; if it is the maximum, it continues with the next iteration.
Essentially, if all the hash buckets are at the maximum, the entire key is
erased.
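In simplified form, the end-key handling after this patch looks roughly as
follows, where 'hash_bucket_schema' points at the hash schema in effect for
this partition (its selection is sketched after the next paragraph):

  if (partition.range_key_end().empty()) {
    for (int i = static_cast<int>(partition.hash_buckets().size()) - 1; i >= 0; i--) {
      // Erase the encoded bucket at index 'i' (and everything after it).
      partition.partition_key_end_.erase(kEncodedBucketSize * i);
      int32_t hash_bucket = partition.hash_buckets()[i] + 1;
      if (hash_bucket != hash_bucket_schema->num_buckets) {
        // Not the max bucket: encode bucket + 1 and stop.
        hash_encoder.Encode(&hash_bucket, &partition.partition_key_end_);
        break;
      }
      // Max bucket: keep truncating.
    }
  }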

Prior to this change, this block of code assumed the same hash bucket
schema for every partition. With per range hash schemas, that is no longer
necessarily the case. The new vector 'partition_idx_to_hash_schemas_idx'
maps each partition to its index in 'bounds_with_hash_schemas' so that the
correct hash bucket schema is used; a value of '-1' signifies the use of the
table-wide hash schema.
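In simplified form, the schema selection looks roughly as follows, where 'j'
is the partition index and 'i' is the hash bucket index:

  if (range_hash_schemas.empty() || partition_idx_to_hash_schemas_idx[j] == -1) {
    // No per range hash schemas at all, or this partition uses the table-wide schema.
    hash_bucket_schema = &hash_bucket_schemas_[i];
  } else {
    // Per range hash schema: look up the bound this partition was generated from.
    const auto& hash_schemas_idx = partition_idx_to_hash_schemas_idx[j];
    hash_bucket_schema = &bounds_with_hash_schemas[hash_schemas_idx].hash_schemas[i];
  }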

Change-Id: I5f6c709e211359b04f7597af5f670c787bda7481
Reviewed-on: http://gerrit.cloudera.org:8080/17090
Reviewed-by: Andrew Wong <awong@cloudera.com>
Tested-by: Andrew Wong <awong@cloudera.com>
diff --git a/src/kudu/common/partition-test.cc b/src/kudu/common/partition-test.cc
index 03670a4..70b63f8 100644
--- a/src/kudu/common/partition-test.cc
+++ b/src/kudu/common/partition-test.cc
@@ -1089,4 +1089,143 @@
             "range hash schemas.", s1.ToString());
 
 }
+
+TEST_F(PartitionTest, TestVaryingHashSchemasPerUnboundedRanges) {
+  // CREATE TABLE t (a VARCHAR, b VARCHAR, c VARCHAR, PRIMARY KEY (a, b, c))
+  // PARTITION BY [HASH BUCKET (a, c), HASH BUCKET (b), RANGE (a, b, c)];
+  Schema schema({ ColumnSchema("a", STRING),
+                  ColumnSchema("b", STRING),
+                  ColumnSchema("c", STRING) },
+                { ColumnId(0), ColumnId(1), ColumnId(2) }, 3);
+
+  PartitionSchemaPB schema_builder;
+  // Table-wide hash schema defined below.
+  AddHashBucketComponent(&schema_builder, { "b" }, 2, 0);
+  PartitionSchema partition_schema;
+  ASSERT_OK(PartitionSchema::FromPB(schema_builder, schema, &partition_schema));
+
+  ASSERT_EQ("HASH (b) PARTITIONS 2, RANGE (a, b, c)",
+            partition_schema.DebugString(schema));
+
+  vector<pair<KuduPartialRow, KuduPartialRow>> bounds;
+  PartitionSchema::RangeHashSchema range_hash_schemas;
+
+  { // [(_, _, _), (a1, _, c1))
+    KuduPartialRow lower(&schema);
+    KuduPartialRow upper(&schema);
+    ASSERT_OK(upper.SetStringCopy("a", "a1"));
+    ASSERT_OK(upper.SetStringCopy("c", "c1"));
+    PartitionSchema::HashBucketSchemas hash_schema_4_buckets = {{{ColumnId(0)}, 4, 0}};
+    bounds.emplace_back(lower, upper);
+    range_hash_schemas.emplace_back(hash_schema_4_buckets);
+  }
+
+  { // [(a2, b2, _), (a3, b3, _))
+    KuduPartialRow lower(&schema);
+    KuduPartialRow upper(&schema);
+    ASSERT_OK(lower.SetStringCopy("a", "a2"));
+    ASSERT_OK(lower.SetStringCopy("b", "b2"));
+    ASSERT_OK(upper.SetStringCopy("a", "a3"));
+    ASSERT_OK(upper.SetStringCopy("b", "b3"));
+    bounds.emplace_back(lower, upper);
+    range_hash_schemas.emplace_back(PartitionSchema::HashBucketSchemas());
+  }
+
+  { // [(a4, b4, _), (_, _, _))
+    KuduPartialRow lower(&schema);
+    KuduPartialRow upper(&schema);
+    ASSERT_OK(lower.SetStringCopy("a", "a4"));
+    ASSERT_OK(lower.SetStringCopy("b", "b4"));
+    PartitionSchema::HashBucketSchemas hash_schema_2_buckets_by_3 = {
+        {{ColumnId(0)}, 2, 0},
+        {{ColumnId(2)}, 3, 0}
+    };
+    bounds.emplace_back(lower, upper);
+    range_hash_schemas.emplace_back(hash_schema_2_buckets_by_3);
+  }
+
+  vector<Partition> partitions;
+  ASSERT_OK(partition_schema.CreatePartitions({}, bounds, range_hash_schemas, schema, &partitions));
+  ASSERT_EQ(12, partitions.size());
+  // The partitions below are sorted by range. Verify the partition keyspace is filled by checking
+  // that the start key of the first partition and the end key of the last partition are cleared.
+
+  EXPECT_EQ(0, partitions[0].hash_buckets()[0]);
+  EXPECT_EQ("", partitions[0].range_key_start());
+  EXPECT_EQ(string("a1\0\0\0\0c1", 8), partitions[0].range_key_end());
+  EXPECT_EQ("", partitions[0].partition_key_start());
+  EXPECT_EQ(string("\0\0\0\0" "a1\0\0\0\0c1", 12), partitions[0].partition_key_end());
+
+  EXPECT_EQ(1, partitions[1].hash_buckets()[0]);
+  EXPECT_EQ("", partitions[1].range_key_start());
+  EXPECT_EQ(string("a1\0\0\0\0c1", 8), partitions[1].range_key_end());
+  EXPECT_EQ(string("\0\0\0\1", 4),partitions[1].partition_key_start());
+  EXPECT_EQ(string("\0\0\0\1" "a1\0\0\0\0c1", 12), partitions[1].partition_key_end());
+
+  EXPECT_EQ(2, partitions[2].hash_buckets()[0]);
+  EXPECT_EQ("", partitions[2].range_key_start());
+  EXPECT_EQ(string("a1\0\0\0\0c1", 8), partitions[2].range_key_end());
+  EXPECT_EQ(string("\0\0\0\2", 4), partitions[2].partition_key_start());
+  EXPECT_EQ(string("\0\0\0\2" "a1\0\0\0\0c1", 12), partitions[2].partition_key_end());
+
+  EXPECT_EQ(3, partitions[3].hash_buckets()[0]);
+  EXPECT_EQ("", partitions[3].range_key_start());
+  EXPECT_EQ(string("a1\0\0\0\0c1", 8), partitions[3].range_key_end());
+  EXPECT_EQ(string("\0\0\0\3", 4), partitions[3].partition_key_start());
+  EXPECT_EQ(string("\0\0\0\3" "a1\0\0\0\0c1", 12), partitions[3].partition_key_end());
+
+  EXPECT_EQ(0, partitions[4].hash_buckets()[0]);
+  EXPECT_EQ(string("a2\0\0b2\0\0", 8), partitions[4].range_key_start());
+  EXPECT_EQ(string("a3\0\0b3\0\0", 8), partitions[4].range_key_end());
+  EXPECT_EQ(string("\0\0\0\0" "a2\0\0b2\0\0", 12), partitions[4].partition_key_start());
+  EXPECT_EQ(string("\0\0\0\0" "a3\0\0b3\0\0", 12), partitions[4].partition_key_end());
+
+  EXPECT_EQ(1, partitions[5].hash_buckets()[0]);
+  EXPECT_EQ(string("a2\0\0b2\0\0", 8), partitions[5].range_key_start());
+  EXPECT_EQ(string("a3\0\0b3\0\0", 8), partitions[5].range_key_end());
+  EXPECT_EQ(string("\0\0\0\1" "a2\0\0b2\0\0", 12), partitions[5].partition_key_start());
+  EXPECT_EQ(string("\0\0\0\1" "a3\0\0b3\0\0", 12), partitions[5].partition_key_end());
+
+  EXPECT_EQ(0, partitions[6].hash_buckets()[0]);
+  EXPECT_EQ(0, partitions[6].hash_buckets()[1]);
+  EXPECT_EQ(string("a4\0\0b4\0\0", 8), partitions[6].range_key_start());
+  EXPECT_EQ("", partitions[6].range_key_end());
+  EXPECT_EQ(string("\0\0\0\0" "\0\0\0\0" "a4\0\0b4\0\0", 16), partitions[6].partition_key_start());
+  EXPECT_EQ(string("\0\0\0\0" "\0\0\0\1", 8), partitions[6].partition_key_end());
+
+  EXPECT_EQ(0, partitions[7].hash_buckets()[0]);
+  EXPECT_EQ(1, partitions[7].hash_buckets()[1]);
+  EXPECT_EQ(string("a4\0\0b4\0\0", 8), partitions[7].range_key_start());
+  EXPECT_EQ("", partitions[7].range_key_end());
+  EXPECT_EQ(string("\0\0\0\0" "\0\0\0\1" "a4\0\0b4\0\0", 16),partitions[7].partition_key_start());
+  EXPECT_EQ(string("\0\0\0\0" "\0\0\0\2", 8), partitions[7].partition_key_end());
+
+  EXPECT_EQ(0, partitions[8].hash_buckets()[0]);
+  EXPECT_EQ(2, partitions[8].hash_buckets()[1]);
+  EXPECT_EQ(string("a4\0\0b4\0\0", 8), partitions[8].range_key_start());
+  EXPECT_EQ("", partitions[8].range_key_end());
+  EXPECT_EQ(string("\0\0\0\0" "\0\0\0\2" "a4\0\0b4\0\0", 16), partitions[8].partition_key_start());
+  EXPECT_EQ(string("\0\0\0\1", 4), partitions[8].partition_key_end());
+
+  EXPECT_EQ(1, partitions[9].hash_buckets()[0]);
+  EXPECT_EQ(0, partitions[9].hash_buckets()[1]);
+  EXPECT_EQ(string("a4\0\0b4\0\0", 8), partitions[9].range_key_start());
+  EXPECT_EQ("", partitions[9].range_key_end());
+  EXPECT_EQ(string("\0\0\0\1" "\0\0\0\0" "a4\0\0b4\0\0", 16), partitions[9].partition_key_start());
+  EXPECT_EQ(string("\0\0\0\1" "\0\0\0\1", 8), partitions[9].partition_key_end());
+
+  EXPECT_EQ(1, partitions[10].hash_buckets()[0]);
+  EXPECT_EQ(1, partitions[10].hash_buckets()[1]);
+  EXPECT_EQ(string("a4\0\0b4\0\0", 8), partitions[10].range_key_start());
+  EXPECT_EQ("", partitions[10].range_key_end());
+  EXPECT_EQ(string("\0\0\0\1" "\0\0\0\1" "a4\0\0b4\0\0", 16),partitions[10].partition_key_start());
+  EXPECT_EQ(string("\0\0\0\1" "\0\0\0\2", 8), partitions[10].partition_key_end());
+
+  EXPECT_EQ(1, partitions[11].hash_buckets()[0]);
+  EXPECT_EQ(2, partitions[11].hash_buckets()[1]);
+  EXPECT_EQ(string("a4\0\0b4\0\0", 8), partitions[11].range_key_start());
+  EXPECT_EQ("", partitions[11].range_key_end());
+  EXPECT_EQ(string("\0\0\0\1" "\0\0\0\2" "a4\0\0b4\0\0", 16), partitions[11].partition_key_start());
+  EXPECT_EQ("", partitions[11].partition_key_end());
+}
 } // namespace kudu
diff --git a/src/kudu/common/partition.cc b/src/kudu/common/partition.cc
index 6b2018e..ae276c1 100644
--- a/src/kudu/common/partition.cc
+++ b/src/kudu/common/partition.cc
@@ -449,6 +449,11 @@
   RETURN_NOT_OK(EncodeRangeSplits(split_rows, schema, &splits));
   RETURN_NOT_OK(SplitRangeBounds(schema, std::move(splits), &bounds_with_hash_schemas));
 
+  // Maps each partition to the index of its hash schemas within 'bounds_with_hash_schemas',
+  // needed by the logic later in this function that fills in holes in the partition key
+  // space. Will be empty if no per range hash schemas are used.
+  vector<int> partition_idx_to_hash_schemas_idx;
+
   if (!range_hash_schemas.empty()) {
     // The number of ranges should match the size of range_hash_schemas.
     DCHECK_EQ(range_hash_schemas.size(), bounds_with_hash_schemas.size());
@@ -456,7 +461,8 @@
     DCHECK(split_rows.empty());
     vector<Partition> result_partitions;
     // Iterate through each bound and its hash schemas to generate hash partitions.
-    for (const auto& bound : bounds_with_hash_schemas) {
+    for (int i = 0; i < bounds_with_hash_schemas.size(); i++) {
+      const auto& bound = bounds_with_hash_schemas[i];
       const auto& current_range_hash_schemas = bound.hash_schemas;
       vector<Partition> current_bound_hash_partitions;
       // If current bound's HashBucketSchema is empty, implies use of default table-wide schema.
@@ -467,20 +473,23 @@
         current_bound_hash_partitions = GenerateHashPartitions(current_range_hash_schemas,
                                                                hash_encoder);
       }
-      // Adds range part to partition key.
+      // Add range part to partition key.
       for (Partition& partition : current_bound_hash_partitions) {
         partition.partition_key_start_.append(bound.lower);
         partition.partition_key_end_.append(bound.upper);
+        int index = current_range_hash_schemas.empty() ? -1 : i;
+        partition_idx_to_hash_schemas_idx.emplace_back(index);
       }
       result_partitions.insert(result_partitions.end(),
                                std::make_move_iterator(current_bound_hash_partitions.begin()),
                                std::make_move_iterator(current_bound_hash_partitions.end()));
     }
+    DCHECK_EQ(partition_idx_to_hash_schemas_idx.size(), result_partitions.size());
     *partitions = std::move(result_partitions);
   } else {
     // Create a partition per range bound and hash bucket combination.
     vector<Partition> new_partitions;
-    for (const Partition &base_partition : base_hash_partitions) {
+    for (const Partition& base_partition : base_hash_partitions) {
       for (const auto& bound : bounds_with_hash_schemas) {
         Partition partition = base_partition;
         partition.partition_key_start_.append(bound.lower);
@@ -508,7 +517,11 @@
   // the absolute start and end case, these holes are filled by clearing the
   // partition key beginning at the hash component. For a concrete example,
   // see PartitionTest::TestCreatePartitions.
-  for (Partition& partition : *partitions) {
+  const HashBucketSchema* hash_bucket_schema = nullptr;
+  for (int j = 0; j < partitions->size(); j++) {
+    Partition& partition = (*partitions)[j];
+    // Starting from the last hash bucket, truncate the partition key for each zero-valued
+    // bucket, stopping at the first non-zero bucket.
     if (partition.range_key_start().empty()) {
       for (int i = static_cast<int>(partition.hash_buckets().size()) - 1; i >= 0; i--) {
         if (partition.hash_buckets()[i] != 0) {
@@ -517,11 +530,26 @@
         partition.partition_key_start_.erase(kEncodedBucketSize * i);
       }
     }
+    // Starting from the last hash bucket, truncate the partition key until we hit the first
+    // non-max-valued bucket, at which point, replace the encoding with the next-incremented
+    // bucket value. For example, the following range end partition keys should be transformed,
+    // where the key is (hash_comp1, hash_comp2, range_comp):
+    //
+    // [ (0, 0, "a2b2") -> (0, 1, "a2b2") ]
+    // [ (0, 1, "a2b2") -> (1, _, "a2b2") ]
+    // [ (1, 0, "a2b2") -> (1, 1, "a2b2") ]
+    // [ (1, 1, "a2b2") -> (_, _, "a2b2") ]
     if (partition.range_key_end().empty()) {
       for (int i = static_cast<int>(partition.hash_buckets().size()) - 1; i >= 0; i--) {
         partition.partition_key_end_.erase(kEncodedBucketSize * i);
         int32_t hash_bucket = partition.hash_buckets()[i] + 1;
-        if (hash_bucket != hash_bucket_schemas_[i].num_buckets) {
+        if (range_hash_schemas.empty() || partition_idx_to_hash_schemas_idx[j] == -1) {
+          hash_bucket_schema = &hash_bucket_schemas_[i];
+        } else {
+          const auto& hash_schemas_idx = partition_idx_to_hash_schemas_idx[j];
+          hash_bucket_schema = &bounds_with_hash_schemas[hash_schemas_idx].hash_schemas[i];
+        }
+        if (hash_bucket != hash_bucket_schema->num_buckets) {
           hash_encoder.Encode(&hash_bucket, &partition.partition_key_end_);
           break;
         }