HBASE-16288 HFile intermediate block level indexes might recurse forever creating multi TB files
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
index 222e146..46e3261 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
@@ -77,6 +77,16 @@
   public static final String MAX_CHUNK_SIZE_KEY = "hfile.index.block.max.size";
 
   /**
+   * Minimum number of entries in a single index block. Even if we are above the
+   * hfile.index.block.max.size we will keep writing to the same block unless we have that many
+   * entries. We should have at least a few entries so that we don't have too many levels in the
+   * multi-level index. This should be at least 2 to make sure there is no infinite recursion.
+   */
+  public static final String MIN_INDEX_NUM_ENTRIES_KEY = "hfile.index.block.min.entries";
+
+  static final int DEFAULT_MIN_INDEX_NUM_ENTRIES = 16;
+
+  /**
    * The number of bytes stored in each "secondary index" entry in addition to
    * key bytes in the non-root index block format. The first long is the file
    * offset of the deeper-level block the entry points to, and the int that
@@ -120,6 +130,7 @@
       searchTreeLevel = treeLevel;
     }
 
+    @Override
     protected long calculateHeapSizeForBlockKeys(long heapSize) {
       // Calculating the size of blockKeys
       if (blockKeys != null) {
@@ -162,10 +173,12 @@
       return null;
     }
 
+    @Override
     protected void initialize(int numEntries) {
       blockKeys = new byte[numEntries][];
     }
 
+    @Override
     protected void add(final byte[] key, final long offset, final int dataSize) {
       blockOffsets[rootCount] = offset;
       blockKeys[rootCount] = key;
@@ -242,6 +255,7 @@
       searchTreeLevel = treeLevel;
     }
 
+    @Override
     protected long calculateHeapSizeForBlockKeys(long heapSize) {
       if (blockKeys != null) {
         heapSize += ClassSize.REFERENCE;
@@ -430,6 +444,7 @@
       return targetMidKey;
     }
 
+    @Override
     protected void initialize(int numEntries) {
       blockKeys = new Cell[numEntries];
     }
@@ -441,6 +456,7 @@
      * @param offset file offset where the block is stored
      * @param dataSize the uncompressed data size
      */
+    @Override
     protected void add(final byte[] key, final long offset, final int dataSize) {
       blockOffsets[rootCount] = offset;
       // Create the blockKeys as Cells once when the reader is opened
@@ -569,7 +585,7 @@
      * Return the BlockWithScanInfo which contains the DataBlock with other scan
      * info such as nextIndexedKey. This function will only be called when the
      * HFile version is larger than 1.
-     * 
+     *
      * @param key
      *          the key we are looking for
      * @param currentBlock
@@ -623,7 +639,7 @@
 
     /**
      * Finds the root-level index block containing the given key.
-     * 
+     *
      * @param key
      *          Key to find
      * @param comp
@@ -701,7 +717,7 @@
      * Performs a binary search over a non-root level index block. Utilizes the
      * secondary index, which records the offsets of (offset, onDiskSize,
      * firstKey) tuples of all entries.
-     * 
+     *
      * @param key
      *          the key we are searching for offsets to individual entries in
      *          the blockIndex buffer
@@ -985,6 +1001,9 @@
     /** The maximum size guideline of all multi-level index blocks. */
     private int maxChunkSize;
 
+    /** The maximum level of multi-level index blocks */
+    private int minIndexNumEntries;
+
     /** Whether we require this block index to always be single-level. */
     private boolean singleLevelOnly;
 
@@ -1017,15 +1036,23 @@
       this.cacheConf = cacheConf;
       this.nameForCaching = nameForCaching;
       this.maxChunkSize = HFileBlockIndex.DEFAULT_MAX_CHUNK_SIZE;
+      this.minIndexNumEntries = HFileBlockIndex.DEFAULT_MIN_INDEX_NUM_ENTRIES;
     }
 
     public void setMaxChunkSize(int maxChunkSize) {
       if (maxChunkSize <= 0) {
-        throw new IllegalArgumentException("Invald maximum index block size");
+        throw new IllegalArgumentException("Invalid maximum index block size");
       }
       this.maxChunkSize = maxChunkSize;
     }
 
+    public void setMinIndexNumEntries(int minIndexNumEntries) {
+      if (minIndexNumEntries <= 1) {
+        throw new IllegalArgumentException("Invalid maximum index level, should be >= 2");
+      }
+      this.minIndexNumEntries = minIndexNumEntries;
+    }
+
     /**
      * Writes the root level and intermediate levels of the block index into
      * the output stream, generating the tree from bottom up. Assumes that the
@@ -1057,7 +1084,11 @@
           : null;
 
       if (curInlineChunk != null) {
-        while (rootChunk.getRootSize() > maxChunkSize) {
+        while (rootChunk.getRootSize() > maxChunkSize
+            // HBASE-16288: if firstKey is larger than maxChunkSize we will loop indefinitely
+            && rootChunk.getNumEntries() > minIndexNumEntries
+            // Sanity check. We will not hit this (minIndexNumEntries ^ 16) blocks can be addressed
+            && numLevels < 16) {
           rootChunk = writeIntermediateLevel(out, rootChunk);
           numLevels += 1;
         }
@@ -1153,8 +1184,12 @@
         curChunk.add(currentLevel.getBlockKey(i),
             currentLevel.getBlockOffset(i), currentLevel.getOnDiskDataSize(i));
 
-        if (curChunk.getRootSize() >= maxChunkSize)
+        // HBASE-16288: We have to have at least minIndexNumEntries(16) items in the index so that
+        // we won't end up with too-many levels for a index with very large rowKeys. Also, if the
+        // first key is larger than maxChunkSize this will cause infinite recursion.
+        if (i >= minIndexNumEntries && curChunk.getRootSize() >= maxChunkSize) {
           writeIntermediateBlock(out, parent, curChunk);
+        }
       }
 
       if (curChunk.getNumEntries() > 0) {
@@ -1647,4 +1682,8 @@
   public static int getMaxChunkSize(Configuration conf) {
     return conf.getInt(MAX_CHUNK_SIZE_KEY, DEFAULT_MAX_CHUNK_SIZE);
   }
+
+  public static int getMinIndexNumEntries(Configuration conf) {
+    return conf.getInt(MIN_INDEX_NUM_ENTRIES_KEY, DEFAULT_MIN_INDEX_NUM_ENTRIES);
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
index 5769744..c57ecf7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
@@ -287,6 +287,8 @@
         cacheIndexesOnWrite ? name : null);
     dataBlockIndexWriter.setMaxChunkSize(
         HFileBlockIndex.getMaxChunkSize(conf));
+    dataBlockIndexWriter.setMinIndexNumEntries(
+        HFileBlockIndex.getMinIndexNumEntries(conf));
     inlineBlockWriters.add(dataBlockIndexWriter);
 
     // Meta data block index writer
@@ -327,13 +329,13 @@
       doCacheOnWrite(lastDataBlockOffset);
     }
   }
-  
+
   /**
    * Try to return a Cell that falls between <code>left</code> and
    * <code>right</code> but that is shorter; i.e. takes up less space. This
    * trick is used building HFile block index. Its an optimization. It does not
    * always work. In this case we'll just return the <code>right</code> cell.
-   * 
+   *
    * @param comparator
    *          Comparator to use.
    * @param left
@@ -764,4 +766,4 @@
       outputStream = null;
     }
   }
-}
\ No newline at end of file
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java
index 470d483..810dd9f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java
@@ -348,7 +348,7 @@
 
     for (int i = 0; i < numTotalKeys; ++i) {
       byte[] k = RandomKeyValueUtil.randomOrderedKey(rand, i * 2);
-      KeyValue cell = new KeyValue(k, Bytes.toBytes("f"), Bytes.toBytes("q"), 
+      KeyValue cell = new KeyValue(k, Bytes.toBytes("f"), Bytes.toBytes("q"),
           Bytes.toBytes("val"));
       //KeyValue cell = new KeyValue.KeyOnlyKeyValue(k, 0, k.length);
       keys.add(cell.getKey());
@@ -671,6 +671,48 @@
         valueRead);
   }
 
+  @Test(timeout=10000)
+  public void testIntermediateLevelIndicesWithLargeKeys() throws IOException {
+    testIntermediateLevelIndicesWithLargeKeys(16);
+  }
 
+  @Test(timeout=10000)
+  public void testIntermediateLevelIndicesWithLargeKeysWithMinNumEntries() throws IOException {
+    // because of the large rowKeys, we will end up with a 50-level block index without sanity check
+    testIntermediateLevelIndicesWithLargeKeys(2);
+  }
+
+  public void testIntermediateLevelIndicesWithLargeKeys(int minNumEntries) throws IOException {
+    Path hfPath = new Path(TEST_UTIL.getDataTestDir(),
+      "testIntermediateLevelIndicesWithLargeKeys.hfile");
+    int maxChunkSize = 1024;
+    FileSystem fs = FileSystem.get(conf);
+    CacheConfig cacheConf = new CacheConfig(conf);
+    conf.setInt(HFileBlockIndex.MAX_CHUNK_SIZE_KEY, maxChunkSize);
+    conf.setInt(HFileBlockIndex.MIN_INDEX_NUM_ENTRIES_KEY, minNumEntries);
+    HFileContext context = new HFileContextBuilder().withBlockSize(16).build();
+    HFile.Writer hfw = new HFile.WriterFactory(conf, cacheConf)
+            .withFileContext(context)
+            .withPath(fs, hfPath).create();
+    List<byte[]> keys = new ArrayList<byte[]>();
+
+    // This should result in leaf-level indices and a root level index
+    for (int i=0; i < 100; i++) {
+      byte[] rowkey = new byte[maxChunkSize + 1];
+      byte[] b = Bytes.toBytes(i);
+      System.arraycopy(b, 0, rowkey, rowkey.length - b.length, b.length);
+      keys.add(rowkey);
+      hfw.append(CellUtil.createCell(rowkey));
+    }
+    hfw.close();
+
+    HFile.Reader reader = HFile.createReader(fs, hfPath, cacheConf, conf);
+    // Scanner doesn't do Cells yet.  Fix.
+    HFileScanner scanner = reader.getScanner(true, true);
+    for (int i = 0; i < keys.size(); ++i) {
+      scanner.seekTo(CellUtil.createCell(keys.get(i)));
+    }
+    reader.close();
+  }
 }