HBASE-28523 Use a single get call in REST multiget endpoint (#5862)

Signed-off-by: Ankit Singhal <ankit@apache.org>
diff --git a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/MultiRowResource.java b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/MultiRowResource.java
index 4c2f6c3..99fc0c8 100644
--- a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/MultiRowResource.java
+++ b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/MultiRowResource.java
@@ -18,10 +18,13 @@
 package org.apache.hadoop.hbase.rest;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Base64;
 import java.util.Base64.Decoder;
+import java.util.List;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.ParseFilter;
 import org.apache.hadoop.hbase.rest.model.CellModel;
@@ -98,8 +101,7 @@
         ParseFilter pf = new ParseFilter();
         parsedParamFilter = pf.parseFilterString(filterBytes);
       }
-      CellSetModel model = new CellSetModel();
-      // TODO map this to a Table.get(List<Get> gets) call instead of multiple get calls
+      List<RowSpec> rowSpecs = new ArrayList<>();
       for (String rk : params.get(ROW_KEYS_PARAM_NAME)) {
         RowSpec rowSpec = new RowSpec(rk, keyEncoding);
 
@@ -112,24 +114,24 @@
             rowSpec.addColumn(Bytes.toBytes(this.columns[i]));
           }
         }
-
-        ResultGenerator generator = ResultGenerator.fromRowSpec(this.tableResource.getName(),
-          rowSpec, parsedParamFilter, !params.containsKey(NOCACHE_PARAM_NAME));
-        Cell value = null;
-        RowModel rowModel = new RowModel(rowSpec.getRow());
-        if (generator.hasNext()) {
-          while ((value = generator.next()) != null) {
-            rowModel.addCell(new CellModel(CellUtil.cloneFamily(value),
-              CellUtil.cloneQualifier(value), value.getTimestamp(), CellUtil.cloneValue(value)));
-          }
-          model.addRow(rowModel);
-        } else {
-          if (LOG.isTraceEnabled()) {
-            LOG.trace("The row : " + rk + " not found in the table.");
-          }
-        }
+        rowSpecs.add(rowSpec);
       }
 
+      MultiRowResultReader reader = new MultiRowResultReader(this.tableResource.getName(), rowSpecs,
+        parsedParamFilter, !params.containsKey(NOCACHE_PARAM_NAME));
+
+      CellSetModel model = new CellSetModel();
+      for (Result r : reader.getResults()) {
+        if (r.isEmpty()) {
+          continue;
+        }
+        RowModel rowModel = new RowModel(r.getRow());
+        for (Cell c : r.listCells()) {
+          rowModel.addCell(new CellModel(CellUtil.cloneFamily(c), CellUtil.cloneQualifier(c),
+            c.getTimestamp(), CellUtil.cloneValue(c)));
+        }
+        model.addRow(rowModel);
+      }
       if (model.getRows().isEmpty()) {
         // If no rows found.
         servlet.getMetrics().incrementFailedGetRequests(1);
diff --git a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/MultiRowResultReader.java b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/MultiRowResultReader.java
new file mode 100644
index 0000000..2903c37
--- /dev/null
+++ b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/MultiRowResultReader.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.rest;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.security.AccessDeniedException;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@InterfaceAudience.Private
+public class MultiRowResultReader {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MultiRowResultReader.class);
+
+  private Result[] results;
+
+  public MultiRowResultReader(final String tableName, final Collection<RowSpec> rowspecs,
+    final Filter filter, final boolean cacheBlocks) throws IOException {
+    try (Table table = RESTServlet.getInstance().getTable(tableName)) {
+      List<Get> gets = new ArrayList<>(rowspecs.size());
+      for (RowSpec rowspec : rowspecs) {
+        Get get = new Get(rowspec.getRow());
+        if (rowspec.hasColumns()) {
+          for (byte[] col : rowspec.getColumns()) {
+            byte[][] split = CellUtil.parseColumn(col);
+            if (split.length == 1) {
+              get.addFamily(split[0]);
+            } else if (split.length == 2) {
+              get.addColumn(split[0], split[1]);
+            } else {
+              throw new IllegalArgumentException("Invalid column specifier.");
+            }
+          }
+        }
+        get.setTimeRange(rowspec.getStartTime(), rowspec.getEndTime());
+        get.readVersions(rowspec.getMaxVersions());
+        if (filter != null) {
+          get.setFilter(filter);
+        }
+        get.setCacheBlocks(cacheBlocks);
+        gets.add(get);
+      }
+      results = table.get(gets);
+    } catch (DoNotRetryIOException e) {
+      // TODO this is copied from RowResultGenerator, but we probably shouldn't swallow
+      // every type of exception but AccessDeniedException
+      LOG.warn(StringUtils.stringifyException(e));
+      // Lets get the exception rethrown to get a more meaningful error message than 404
+      if (e instanceof AccessDeniedException) {
+        throw e;
+      }
+    }
+  }
+
+  public Result[] getResults() {
+    return results;
+  }
+
+}