TAJO-1940: Implement HBaseTablespace::getTableVolume() method.

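This change replaces the URI parameter of Tablespace::getTableVolume() and
StorageService::getTableVolumn() with a TableDesc, implements the method for
HBaseTablespace by summing the region sizes of the table's fragments, and adds
a copy of HBase's RegionSizeCalculator that also takes the memstore size into
account.
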
Closes #910
diff --git a/CHANGES b/CHANGES
index 008b987..62c0504 100644
--- a/CHANGES
+++ b/CHANGES
@@ -7,6 +7,9 @@
 
   IMPROVEMENT
 
+    TAJO-1940: Implement HBaseTablespace::getTableVolume() method.
+    (hyunsik)
+
     TAJO-1991: Tablespace::getVolume should take filter predication.
     (hyunsik)
 
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java
index 7454927..ffa48aa 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java
@@ -54,7 +54,6 @@
 import org.apache.tajo.storage.hbase.*;
 import org.apache.tajo.util.Bytes;
 import org.apache.tajo.util.KeyValueSet;
-import org.apache.tajo.util.TUtil;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -65,10 +64,7 @@
 import java.net.URI;
 import java.sql.ResultSet;
 import java.text.DecimalFormat;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 import static org.junit.Assert.*;
 
@@ -221,7 +217,62 @@
     } finally {
       TablespaceManager.addTableSpaceForTest(existing.get());
     }
+  }
 
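+  /** Inserts {@code rownum} rows spanning the column families col1, col2, and col3. */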
+  private void putData(HTableInterface htable, int rownum) throws IOException {
+    for (int i = 0; i < rownum; i++) {
+      Put put = new Put(String.valueOf(i).getBytes());
+      put.add("col1".getBytes(), "a".getBytes(), ("a-" + i).getBytes());
+      put.add("col1".getBytes(), "b".getBytes(), ("b-" + i).getBytes());
+      put.add("col2".getBytes(), "k1".getBytes(), ("k1-" + i).getBytes());
+      put.add("col2".getBytes(), "k2".getBytes(), ("k2-" + i).getBytes());
+      put.add("col3".getBytes(), "b".getBytes(), ("b-" + i).getBytes());
+      htable.put(put);
+    }
+  }
+
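+  /**
+   * Verifies that HBaseTablespace::getTableVolume() returns a meaningful volume
+   * for an external HBase table populated with rows.
+   */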
+  @Test
+  public void testGetTableVolume() throws Exception {
+    final String tableName = "external_hbase_table";
+
+    Optional<Tablespace> existing = TablespaceManager.removeTablespaceForTest("cluster1");
+    assertTrue(existing.isPresent());
+
+    try {
+      HTableDescriptor hTableDesc = new HTableDescriptor(TableName.valueOf(tableName));
+      hTableDesc.addFamily(new HColumnDescriptor("col1"));
+      hTableDesc.addFamily(new HColumnDescriptor("col2"));
+      hTableDesc.addFamily(new HColumnDescriptor("col3"));
+      testingCluster.getHBaseUtil().createTable(hTableDesc);
+
+      String sql = String.format(
+          "CREATE EXTERNAL TABLE external_hbase_mapped_table (rk text, col1 text, col2 text, col3 text) " +
+              "USING hbase WITH ('table'='external_hbase_table', 'columns'=':key,col1:a,col2:,col3:b') " +
+              "LOCATION '%s/external_hbase_table'", tableSpaceUri);
+      executeString(sql).close();
+
+      assertTableExists("external_hbase_mapped_table");
+
+      HBaseTablespace tablespace = (HBaseTablespace) existing.get();
+      HConnection hconn = tablespace.getConnection();
+
+      try (HTableInterface htable = hconn.getTable(tableName)) {
+        htable.setAutoFlushTo(true);
+        putData(htable, 4000);
+      }
+      hconn.close();
+
+      Thread.sleep(3000); // Wait for the region server load report to catch up; this should not be a problem on a real cluster.
+
+      TableDesc createdTable = client.getTableDesc("external_hbase_mapped_table");
+      assertNotNull(createdTable);
+      long volume = tablespace.getTableVolume(createdTable, Optional.<EvalNode>absent());
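+      // A positive volume is expected; -1 (unknown) is tolerated in case the
+      // region load has not been reported yet.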
+      assertTrue(volume > 0 || volume == -1);
+      executeString("DROP TABLE external_hbase_mapped_table PURGE").close();
+
+    } finally {
+      TablespaceManager.addTableSpaceForTest(existing.get());
+    }
   }
 
   @Test
@@ -248,22 +299,14 @@
       HTableInterface htable = hconn.getTable("external_hbase_table");
 
       try {
-        for (int i = 0; i < 100; i++) {
-          Put put = new Put(String.valueOf(i).getBytes());
-          put.add("col1".getBytes(), "a".getBytes(), ("a-" + i).getBytes());
-          put.add("col1".getBytes(), "b".getBytes(), ("b-" + i).getBytes());
-          put.add("col2".getBytes(), "k1".getBytes(), ("k1-" + i).getBytes());
-          put.add("col2".getBytes(), "k2".getBytes(), ("k2-" + i).getBytes());
-          put.add("col3".getBytes(), "b".getBytes(), ("b-" + i).getBytes());
-          htable.put(put);
-        }
-
+        putData(htable, 100);
         ResultSet res = executeString("select * from external_hbase_mapped_table where rk > '20'");
         assertResultSet(res);
         cleanupQuery(res);
       } finally {
         executeString("DROP TABLE external_hbase_mapped_table PURGE").close();
         htable.close();
+        hconn.close();
       }
     } finally {
       TablespaceManager.addTableSpaceForTest(existing.get());
@@ -793,13 +836,13 @@
     Schema schema = new Schema();
     schema.addColumn("id", Type.TEXT);
     schema.addColumn("name", Type.TEXT);
-    List<String> datas = new ArrayList<String>();
+    List<String> datas = new ArrayList<>();
     DecimalFormat df = new DecimalFormat("000");
     for (int i = 99; i >= 0; i--) {
       datas.add(df.format(i) + "|value" + i);
     }
     TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table",
-        schema, tableOptions, datas.toArray(new String[]{}), 2);
+        schema, tableOptions, datas.toArray(new String[datas.size()]), 2);
 
     executeString("insert into hbase_mapped_table " +
         "select id, name from base_table ").close();
@@ -849,12 +892,12 @@
     Schema schema = new Schema();
     schema.addColumn("id", Type.TEXT);
     schema.addColumn("name", Type.TEXT);
-    List<String> datas = new ArrayList<String>();
+    List<String> datas = new ArrayList<>();
     for (int i = 99; i >= 0; i--) {
       datas.add(i + "|value" + i);
     }
     TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table",
-        schema, tableOptions, datas.toArray(new String[]{}), 2);
+        schema, tableOptions, datas.toArray(new String[datas.size()]), 2);
 
     executeString("insert into hbase_mapped_table " +
         "select id, name from base_table ").close();
@@ -907,13 +950,13 @@
     Schema schema = new Schema();
     schema.addColumn("id", Type.TEXT);
     schema.addColumn("name", Type.TEXT);
-    List<String> datas = new ArrayList<String>();
+    List<String> datas = new ArrayList<>();
     DecimalFormat df = new DecimalFormat("000");
     for (int i = 99; i >= 0; i--) {
       datas.add(df.format(i) + "|value" + i);
     }
     TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table",
-        schema, tableOptions, datas.toArray(new String[]{}), 2);
+        schema, tableOptions, datas.toArray(new String[datas.size()]), 2);
 
     executeString("insert into hbase_mapped_table " +
         "select id, name from base_table ").close();
@@ -967,12 +1010,12 @@
     schema.addColumn("id2", Type.TEXT);
     schema.addColumn("name", Type.TEXT);
     DecimalFormat df = new DecimalFormat("000");
-    List<String> datas = new ArrayList<String>();
+    List<String> datas = new ArrayList<>();
     for (int i = 99; i >= 0; i--) {
       datas.add(df.format(i) + "|" + (i + 100) + "|value" + i);
     }
     TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table",
-        schema, tableOptions, datas.toArray(new String[]{}), 2);
+        schema, tableOptions, datas.toArray(new String[datas.size()]), 2);
 
     executeString("insert into hbase_mapped_table " +
         "select id1, id2, name from base_table ").close();
@@ -1022,12 +1065,12 @@
     Schema schema = new Schema();
     schema.addColumn("id", Type.INT4);
     schema.addColumn("name", Type.TEXT);
-    List<String> datas = new ArrayList<String>();
+    List<String> datas = new ArrayList<>();
     for (int i = 99; i >= 0; i--) {
       datas.add(i + "|value" + i);
     }
     TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table",
-        schema, tableOptions, datas.toArray(new String[]{}), 2);
+        schema, tableOptions, datas.toArray(new String[datas.size()]), 2);
 
     executeString("insert into hbase_mapped_table " +
         "select id, name from base_table ").close();
@@ -1080,14 +1123,14 @@
     schema.addColumn("col2_key", Type.TEXT);
     schema.addColumn("col2_value", Type.TEXT);
     schema.addColumn("col3", Type.TEXT);
-    List<String> datas = new ArrayList<String>();
+    List<String> datas = new ArrayList<>();
     for (int i = 20; i >= 0; i--) {
       for (int j = 0; j < 3; j++) {
         datas.add(i + "|ck-" + j + "|value-" + j + "|col3-" + i);
       }
     }
     TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table",
-        schema, tableOptions, datas.toArray(new String[]{}), 2);
+        schema, tableOptions, datas.toArray(new String[datas.size()]), 2);
 
     executeString("insert into hbase_mapped_table " +
         "select rk, col2_key, col2_value, col3 from base_table ").close();
@@ -1167,12 +1210,12 @@
     Schema schema = new Schema();
     schema.addColumn("id", Type.INT4);
     schema.addColumn("name", Type.TEXT);
-    List<String> datas = new ArrayList<String>();
+    List<String> datas = new ArrayList<>();
     for (int i = 99; i >= 0; i--) {
       datas.add(i + "|value" + i);
     }
     TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table",
-        schema, tableOptions, datas.toArray(new String[]{}), 2);
+        schema, tableOptions, datas.toArray(new String[datas.size()]), 2);
 
     try {
       executeString("insert into hbase_mapped_table " +
@@ -1245,7 +1288,7 @@
       datas.add(df.format(i) + "|value" + i);
     }
     TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table",
-        schema, tableOptions, datas.toArray(new String[]{}), 2);
+        schema, tableOptions, datas.toArray(new String[datas.size()]), 2);
 
     executeString(
         "CREATE TABLE hbase_mapped_table (rk text, col1 text) TABLESPACE cluster1 " +
@@ -1330,7 +1373,7 @@
     } finally {
       executeString("DROP TABLE hbase_mapped_table PURGE").close();
 
-      client.unsetSessionVariables(TUtil.newList(HBaseStorageConstants.INSERT_PUT_MODE));
+      client.unsetSessionVariables(Arrays.asList(HBaseStorageConstants.INSERT_PUT_MODE));
 
       if (scanner != null) {
         scanner.close();
@@ -1367,7 +1410,7 @@
         datas.add(df.format(i) + "|value" + i + "|comment-" + i);
       }
       TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table",
-          schema, tableOptions, datas.toArray(new String[]{}), 2);
+          schema, tableOptions, datas.toArray(new String[datas.size()]), 2);
 
       executeString("insert into location '/tmp/hfile_test' " +
           "select id, name, comment from base_table ").close();
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/StorageService.java b/tajo-plan/src/main/java/org/apache/tajo/plan/StorageService.java
index f27aeff..3720dca 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/StorageService.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/StorageService.java
@@ -19,6 +19,7 @@
 package org.apache.tajo.plan;
 
 import com.google.common.base.Optional;
+import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.exception.UnsupportedException;
 import org.apache.tajo.plan.expr.EvalNode;
 
@@ -40,5 +41,5 @@
    */
   URI getTableURI(@Nullable String spaceName, String databaseName, String tableName);
 
-  long getTableVolumn(URI uri, Optional<EvalNode> filter) throws UnsupportedException;
+  long getTableVolumn(TableDesc table, Optional<EvalNode> filter) throws UnsupportedException;
 }
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/TableStatUpdateRewriter.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/TableStatUpdateRewriter.java
index a1e9a6d..df65482 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/TableStatUpdateRewriter.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/TableStatUpdateRewriter.java
@@ -110,7 +110,7 @@
     private long getTableVolume(TableDesc table, Optional<EvalNode> filter) {
       try {
         if (table.getStats() != null) {
-          return storage.getTableVolumn(table.getUri(), filter);
+          return storage.getTableVolumn(table, filter);
         }
       } catch (UnsupportedException t) {
         LOG.warn(table.getName() + " does not support Tablespace::getTableVolume()");
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java
index 01ef9de..8919d87 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java
@@ -99,7 +99,7 @@
     return name + "=" + uri.toString();
   }
 
-  public abstract long getTableVolume(URI uri, Optional<EvalNode> filter) throws UnsupportedException;
+  public abstract long getTableVolume(TableDesc table, Optional<EvalNode> filter) throws UnsupportedException;
 
   /**
    * if {@link StorageProperty#isArbitraryPathAllowed} is true,
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java
index 33cd7a3..798de1d 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java
@@ -32,6 +32,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.TajoConstants;
 import org.apache.tajo.catalog.MetadataProvider;
+import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.exception.TajoRuntimeException;
 import org.apache.tajo.exception.UndefinedTablespaceException;
@@ -439,9 +440,9 @@
   }
 
   @Override
-  public long getTableVolumn(URI tableUri, Optional<EvalNode> filter)
+  public long getTableVolumn(TableDesc table, Optional<EvalNode> filter)
       throws UnsupportedException {
-    return get(tableUri).getTableVolume(tableUri, filter);
+    return get(table.getUri()).getTableVolume(table, filter);
   }
 
   public static Iterable<Tablespace> getAllTablespaces() {
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
index 8aa51e6..d522fe2 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
@@ -19,6 +19,7 @@
 package org.apache.tajo.storage.hbase;
 
 import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Sets;
 import net.minidev.json.JSONObject;
 import org.apache.commons.logging.Log;
@@ -34,7 +35,6 @@
 import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.UserProvider;
-import org.apache.hadoop.hbase.util.RegionSizeCalculator;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 import org.apache.hadoop.mapreduce.task.JobContextImpl;
@@ -74,6 +74,8 @@
       new StorageProperty("hbase", false, true, false, false);
   public static final FormatProperty HFILE_FORMAT_PROPERTIES = new FormatProperty(true, false, true);
   public static final FormatProperty PUT_MODE_PROPERTIES = new FormatProperty(true, true, false);
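+  // Key ranges denoting a full-table scan, used when no index predicate is given.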
+  public static final List<byte[]> EMPTY_START_ROW_KEY = Arrays.asList(new byte[0]);
+  public static final List<byte[]> EMPTY_END_ROW_KEY   = Arrays.asList(new byte[0]);
 
   private Configuration hbaseConf;
 
@@ -99,8 +101,20 @@
   }
 
   @Override
-  public long getTableVolume(URI uri, Optional<EvalNode> filter) throws UnsupportedException {
-    throw new UnsupportedException();
+  public long getTableVolume(TableDesc table, Optional<EvalNode> filter) {
+    long totalVolume = 0;
+    try {
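+      // Sum the lengths of all fragments; fragments with unknown length are skipped.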
+      for (HBaseFragment f : getRawSplits("", table, filter.orNull())) {
+        if (f.getLength() > 0) {
+          totalVolume += f.getLength();
+        }
+      }
+    } catch (TajoException e) {
+      throw new TajoRuntimeException(e);
+    } catch (Throwable t) {
+      throw new TajoInternalError(t);
+    }
+    return totalVolume;
   }
 
   @Override
@@ -141,8 +155,8 @@
     ColumnMapping columnMapping = new ColumnMapping(schema, tableMeta.getOptions());
     int numRowKeys = 0;
     boolean[] isRowKeyMappings = columnMapping.getIsRowKeyMappings();
-    for (int i = 0; i < isRowKeyMappings.length; i++) {
-      if (isRowKeyMappings[i]) {
+    for (boolean isRowKeyMapping : isRowKeyMappings) {
+      if (isRowKeyMapping) {
         numRowKeys++;
       }
     }
@@ -180,7 +194,7 @@
           tableColumnFamilies.add(eachColumn.getNameAsString());
         }
 
-        Collection<String> mappingColumnFamilies =columnMapping.getColumnFamilyNames();
+        Collection<String> mappingColumnFamilies = columnMapping.getColumnFamilyNames();
         if (mappingColumnFamilies.isEmpty()) {
           throw new MissingTablePropertyException(HBaseStorageConstants.META_COLUMNS_KEY, hbaseTableName);
         }
@@ -272,13 +286,15 @@
       // If there is many split keys, Tajo allows to define in the file.
       Path path = new Path(splitRowKeysFile);
       FileSystem fs = path.getFileSystem(conf);
+
       if (!fs.exists(path)) {
         throw new MissingTablePropertyException("hbase.split.rowkeys.file=" + path.toString() + " not exists.",
             hbaseTableName);
       }
 
-      SortedSet<String> splitKeySet = new TreeSet<String>();
+      SortedSet<String> splitKeySet = new TreeSet<>();
       BufferedReader reader = null;
+
       try {
         reader = new BufferedReader(new InputStreamReader(fs.open(path)));
         String line = null;
@@ -415,147 +431,166 @@
     return new Column[]{indexColumn};
   }
 
-  @Override
-  public List<Fragment> getSplits(String inputSourceId,
-                                  TableDesc tableDesc,
-                                  @Nullable EvalNode filterCondition)
-      throws IOException, TajoException {
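+  /**
+   * Computes the disjunctive set of (start, stop) row-key ranges selected by the
+   * given index predicates. An empty predicate list selects the whole table.
+   */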
+  private Pair<List<byte[]>, List<byte[]>> getSelectedKeyRange(
+      ColumnMapping columnMap,
+      List<IndexPredication> predicates) {
 
-    ColumnMapping columnMapping = new ColumnMapping(tableDesc.getSchema(), tableDesc.getMeta().getOptions());
+    final List<byte[]> startRows;
+    final List<byte[]> stopRows;
 
-    List<IndexPredication> indexPredications = getIndexPredications(columnMapping, tableDesc, filterCondition);
-    HTable htable = null;
+    if (predicates != null && !predicates.isEmpty()) {
+      startRows = new ArrayList<>();
+      stopRows = new ArrayList<>();
 
-    try {
-
-      htable = new HTable(hbaseConf, tableDesc.getMeta().getOption(HBaseStorageConstants.META_TABLE_KEY));
-      RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(htable);
-
-      org.apache.hadoop.hbase.util.Pair<byte[][], byte[][]> keys = htable.getStartEndKeys();
-      if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
-        HRegionLocation regLoc = htable.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY, false);
-        if (null == regLoc) {
-          throw new IOException("Expecting at least one region.");
-        }
-
-        List<Fragment> fragments = new ArrayList<Fragment>(1);
-        HBaseFragment fragment = new HBaseFragment(
-            tableDesc.getUri(),
-            inputSourceId, htable.getName().getNameAsString(),
-            HConstants.EMPTY_BYTE_ARRAY,
-            HConstants.EMPTY_BYTE_ARRAY,
-            regLoc.getHostname());
-        long regionSize = sizeCalculator.getRegionSize(regLoc.getRegionInfo().getRegionName());
-        if (regionSize == 0) {
-          fragment.setLength(TajoConstants.UNKNOWN_LENGTH);
+      // The predicates form a disjunctive set: each contributes one (start, stop) range.
+      for (IndexPredication pred: predicates) {
+        if (pred.getStartValue() != null) {
+          startRows.add(serialize(columnMap, pred, pred.getStartValue()));
         } else {
-          fragment.setLength(regionSize);
+          startRows.add(HConstants.EMPTY_START_ROW);
         }
-        fragments.add(fragment);
-        return fragments;
+
+        if (pred.getStopValue() != null) {
+          stopRows.add(serialize(columnMap, pred, pred.getStopValue()));
+        } else {
+          stopRows.add(HConstants.EMPTY_END_ROW);
+        }
+      }
+    } else {
+      startRows = EMPTY_START_ROW_KEY;
+      stopRows  = EMPTY_END_ROW_KEY;
+    }
+
+    return new Pair<>(startRows, stopRows);
+  }
+
+  private boolean isEmptyRegion(org.apache.hadoop.hbase.util.Pair<byte[][], byte[][]> tableRange) {
+    return tableRange == null || tableRange.getFirst() == null || tableRange.getFirst().length == 0;
+  }
+
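+  /** Returns the region size in bytes, or UNKNOWN_LENGTH if the calculator reports zero. */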
+  private long getRegionSize(RegionSizeCalculator calculator, byte[] regionName) {
+    long regionSize = calculator.getRegionSize(regionName);
+    if (regionSize == 0) {
+      return TajoConstants.UNKNOWN_LENGTH;
+    } else {
+      return regionSize;
+    }
+  }
+
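+  /**
+   * Creates a single fragment covering the entire table. Used when the table has
+   * no region split keys, i.e. it consists of a single region.
+   */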
+  private List<HBaseFragment> createEmptyFragment(TableDesc table, String sourceId, HTable htable,
+                                                  RegionSizeCalculator sizeCalculator) throws IOException {
+    HRegionLocation regLoc = htable.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY, false);
+    if (null == regLoc) {
+      throw new IOException("Expecting at least one region.");
+    }
+
+    HBaseFragment fragment = new HBaseFragment(
+        table.getUri(),
+        sourceId, htable.getName().getNameAsString(),
+        HConstants.EMPTY_BYTE_ARRAY,
+        HConstants.EMPTY_BYTE_ARRAY,
+        regLoc.getHostname());
+
+    fragment.setLength(getRegionSize(sizeCalculator, regLoc.getRegionInfo().getRegionName()));
+    return ImmutableList.of(fragment);
+  }
+
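+  /**
+   * Intersects the selected key ranges with the region boundaries and builds one
+   * fragment per overlapping region, merging ranges that fall into the same region.
+   */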
+  private Collection<HBaseFragment> convertRangeToFragment(
+      TableDesc table, String inputSourceId, HTable htable, RegionSizeCalculator sizeCalculator,
+      org.apache.hadoop.hbase.util.Pair<byte[][], byte[][]> tableRange,
+      Pair<List<byte[]>, List<byte[]>> selectedRange) throws IOException {
+
+    final Map<byte[], HBaseFragment> fragmentMap = new HashMap<>();
+
+    for (int i = 0; i < tableRange.getFirst().length; i++) {
+      HRegionLocation location = htable.getRegionLocation(tableRange.getFirst()[i], false);
+      if (location == null) {
+        throw new IOException("Can't find the region of the key: " + Bytes.toStringBinary(tableRange.getFirst()[i]));
       }
 
-      List<byte[]> startRows;
-      List<byte[]> stopRows;
+      final byte[] regionStartKey = tableRange.getFirst()[i];
+      final byte[] regionStopKey = tableRange.getSecond()[i];
 
-      if (indexPredications != null && !indexPredications.isEmpty()) {
-        // indexPredications is Disjunctive set
-        startRows = new ArrayList<byte[]>();
-        stopRows = new ArrayList<byte[]>();
-        for (IndexPredication indexPredication: indexPredications) {
-          byte[] startRow;
-          byte[] stopRow;
-          if (indexPredication.getStartValue() != null) {
-            startRow = serialize(columnMapping, indexPredication, indexPredication.getStartValue());
+      int startRowsSize = selectedRange.getFirst().size();
+      for (int j = 0; j < startRowsSize; j++) {
+        byte[] startRow = selectedRange.getFirst().get(j);
+        byte[] stopRow = selectedRange.getSecond().get(j);
+        // determine whether the given start and stop keys fall into the region
+        if ((startRow.length == 0 || regionStopKey.length == 0 || Bytes.compareTo(startRow, regionStopKey) < 0)
+            && (stopRow.length == 0 || Bytes.compareTo(stopRow, regionStartKey) > 0)) {
+          final byte[] fragmentStart = (startRow.length == 0 || Bytes.compareTo(regionStartKey, startRow) >= 0) ?
+              regionStartKey : startRow;
+
+          final byte[] fragmentStop = (stopRow.length == 0 || Bytes.compareTo(regionStopKey, stopRow) <= 0) &&
+              regionStopKey.length > 0 ? regionStopKey : stopRow;
+
+          if (fragmentMap.containsKey(regionStartKey)) {
+            final HBaseFragment prevFragment = fragmentMap.get(regionStartKey);
+            if (Bytes.compareTo(fragmentStart, prevFragment.getStartRow()) < 0) {
+              prevFragment.setStartRow(fragmentStart);
+            }
+            if (Bytes.compareTo(fragmentStop, prevFragment.getStopRow()) > 0) {
+              prevFragment.setStopRow(fragmentStop);
+            }
           } else {
-            startRow = HConstants.EMPTY_START_ROW;
-          }
-          if (indexPredication.getStopValue() != null) {
-            stopRow = serialize(columnMapping, indexPredication, indexPredication.getStopValue());
-          } else {
-            stopRow = HConstants.EMPTY_END_ROW;
-          }
-          startRows.add(startRow);
-          stopRows.add(stopRow);
-        }
-      } else {
-        startRows = TUtil.newList(HConstants.EMPTY_START_ROW);
-        stopRows = TUtil.newList(HConstants.EMPTY_END_ROW);
-      }
 
-      // reference: org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getSplits(JobContext)
-      // region startkey -> HBaseFragment
-      Map<byte[], HBaseFragment> fragmentMap = new HashMap<byte[], HBaseFragment>();
-      for (int i = 0; i < keys.getFirst().length; i++) {
-        HRegionLocation location = htable.getRegionLocation(keys.getFirst()[i], false);
-        if (null == location) {
-          throw new IOException("Can't find the region of the key: " +  Bytes.toStringBinary(keys.getFirst()[i]));
-        }
+            final HBaseFragment fragment = new HBaseFragment(table.getUri(),
+                inputSourceId,
+                htable.getName().getNameAsString(),
+                fragmentStart,
+                fragmentStop,
+                location.getHostname());
 
-        byte[] regionStartKey = keys.getFirst()[i];
-        byte[] regionStopKey = keys.getSecond()[i];
+            fragment.setLength(getRegionSize(sizeCalculator, location.getRegionInfo().getRegionName()));
+            fragmentMap.put(regionStartKey, fragment);
 
-        int startRowsSize = startRows.size();
-        for (int j = 0; j < startRowsSize; j++) {
-          byte[] startRow = startRows.get(j);
-          byte[] stopRow = stopRows.get(j);
-          // determine if the given start an stop key fall into the region
-          if ((startRow.length == 0 || regionStopKey.length == 0 || Bytes.compareTo(startRow, regionStopKey) < 0)
-              && (stopRow.length == 0 || Bytes.compareTo(stopRow, regionStartKey) > 0)) {
-            byte[] fragmentStart = (startRow.length == 0 || Bytes.compareTo(regionStartKey, startRow) >= 0) ?
-                regionStartKey : startRow;
-
-            byte[] fragmentStop = (stopRow.length == 0 || Bytes.compareTo(regionStopKey, stopRow) <= 0) &&
-                regionStopKey.length > 0 ? regionStopKey : stopRow;
-
-            if (fragmentMap.containsKey(regionStartKey)) {
-              HBaseFragment prevFragment = fragmentMap.get(regionStartKey);
-              if (Bytes.compareTo(fragmentStart, prevFragment.getStartRow()) < 0) {
-                prevFragment.setStartRow(fragmentStart);
-              }
-              if (Bytes.compareTo(fragmentStop, prevFragment.getStopRow()) > 0) {
-                prevFragment.setStopRow(fragmentStop);
-              }
-            } else {
-              byte[] regionName = location.getRegionInfo().getRegionName();
-              long regionSize = sizeCalculator.getRegionSize(regionName);
-
-              HBaseFragment fragment = new HBaseFragment(tableDesc.getUri(),
-                  inputSourceId,
-                  htable.getName().getNameAsString(),
-                  fragmentStart,
-                  fragmentStop,
-                  location.getHostname());
-              if (regionSize == 0) {
-                fragment.setLength(TajoConstants.UNKNOWN_LENGTH);
-              } else {
-                fragment.setLength(regionSize);
-              }
-
-              fragmentMap.put(regionStartKey, fragment);
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("getFragments: fragment -> " + i + " -> " + fragment);
-              }
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("getFragments: fragment -> " + i + " -> " + fragment);
             }
           }
         }
       }
+    }
 
-      List<HBaseFragment> fragments = new ArrayList<HBaseFragment>(fragmentMap.values());
+    return fragmentMap.values();
+  }
+
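+  // getSplits() delegates to getRawSplits(); the double cast below is needed
+  // because List<HBaseFragment> is not a subtype of List<Fragment>.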
+  @Override
+  public List<Fragment> getSplits(String inputSourceId,
+                                  TableDesc table,
+                                  @Nullable EvalNode filterCondition) throws IOException, TajoException {
+    return (List<Fragment>) (List) getRawSplits(inputSourceId, table, filterCondition);
+  }
+
+  private List<HBaseFragment> getRawSplits(String inputSourceId,
+                                           TableDesc table,
+                                           @Nullable EvalNode filterCondition) throws IOException, TajoException {
+    final ColumnMapping columnMapping = new ColumnMapping(table.getSchema(), table.getMeta().getOptions());
+
+    try (final HTable htable = new HTable(hbaseConf, table.getMeta().getOption(HBaseStorageConstants.META_TABLE_KEY))) {
+      final RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(htable);
+      final org.apache.hadoop.hbase.util.Pair<byte[][], byte[][]> tableRange = htable.getStartEndKeys();
+      if (isEmptyRegion(tableRange)) {
+        return createEmptyFragment(table, inputSourceId, htable, sizeCalculator);
+      }
+
+      final Pair<List<byte[]>, List<byte[]>> selectedRange = getSelectedKeyRange(
+          columnMapping,
+          getIndexPredications(columnMapping, table, filterCondition));
+
+      // Build fragments from the selected ranges, then sort them and mark the last one.
+      List<HBaseFragment> fragments = new ArrayList<>(
+          convertRangeToFragment(table, inputSourceId, htable, sizeCalculator, tableRange, selectedRange));
       Collections.sort(fragments);
       if (!fragments.isEmpty()) {
         fragments.get(fragments.size() - 1).setLast(true);
       }
-      return (ArrayList<Fragment>) (ArrayList) fragments;
-    } finally {
-      if (htable != null) {
-        htable.close();
-      }
+
+      return ImmutableList.copyOf(fragments);
     }
   }
 
   private byte[] serialize(ColumnMapping columnMapping,
-                           IndexPredication indexPredication, Datum datum) throws IOException {
+                           IndexPredication indexPredication, Datum datum) {
     if (columnMapping.getIsBinaryColumns()[indexPredication.getColumnId()]) {
       return HBaseBinarySerializerDeserializer.serialize(indexPredication.getColumn(), datum);
     } else {
@@ -611,7 +646,7 @@
     private String username;
 
     HConnectionKey(Configuration conf) {
-      Map<String, String> m = new HashMap<String, String>();
+      Map<String, String> m = new HashMap<>();
       if (conf != null) {
         for (String property : CONNECTION_PROPERTIES) {
           String value = conf.get(property);
@@ -689,10 +724,7 @@
 
     @Override
     public String toString() {
-      return "HConnectionKey{" +
-          "properties=" + properties +
-          ", username='" + username + '\'' +
-          '}';
+      return "HConnectionKey{ properties=" + properties + ", username='" + username + '\'' + '}';
     }
   }
 
@@ -724,7 +756,7 @@
 
   public List<Set<EvalNode>> findIndexablePredicateSet(@Nullable EvalNode qual,
                                                        Column[] indexableColumns) throws IOException {
-    List<Set<EvalNode>> indexablePredicateList = new ArrayList<Set<EvalNode>>();
+    List<Set<EvalNode>> indexablePredicateList = new ArrayList<>();
 
     // if a query statement has a search condition, try to find indexable predicates
     if (indexableColumns != null && qual != null) {
@@ -874,7 +906,7 @@
           new String(new char[]{columnMapping.getRowKeyDelimiter(), Character.MAX_VALUE}));
     }
     if (startDatum != null || endDatum != null) {
-      return new Pair<Datum, Datum>(startDatum, endDatum);
+      return new Pair<>(startDatum, endDatum);
     } else {
       return null;
     }
@@ -944,7 +976,7 @@
         if (endKeys.length == 1) {
           return new TupleRange[]{dataRange};
         }
-        List<TupleRange> tupleRanges = new ArrayList<TupleRange>(endKeys.length);
+        List<TupleRange> tupleRanges = new ArrayList<>(endKeys.length);
 
         TupleComparator comparator = new BaseTupleComparator(inputSchema, sortSpecs);
         Tuple previousTuple = dataRange.getStart();
@@ -976,12 +1008,12 @@
           for (int i = 0; i < sortSpecs.length; i++) {
             if (columnMapping.getIsBinaryColumns()[sortKeyIndexes[i]]) {
               endTuple.put(i,
-                  HBaseBinarySerializerDeserializer.deserialize(inputSchema.getColumn(sortKeyIndexes[i]),
-                      rowKeyFields[i]));
+                      HBaseBinarySerializerDeserializer.deserialize(inputSchema.getColumn(sortKeyIndexes[i]),
+                              rowKeyFields[i]));
             } else {
               endTuple.put(i,
-                  HBaseTextSerializerDeserializer.deserialize(inputSchema.getColumn(sortKeyIndexes[i]),
-                      rowKeyFields[i]));
+                      HBaseTextSerializerDeserializer.deserialize(inputSchema.getColumn(sortKeyIndexes[i]),
+                              rowKeyFields[i]));
             }
           }
           tupleRanges.add(new TupleRange(sortSpecs, previousTuple, endTuple));
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/RegionSizeCalculator.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/RegionSizeCalculator.java
new file mode 100644
index 0000000..806320d
--- /dev/null
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/RegionSizeCalculator.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.hbase;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ClusterStatus;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionLoad;
+import org.apache.hadoop.hbase.ServerLoad;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * This class is borrowed from HBase, but modified to take the memstore size
+ * into account.
+ *
+ * Computes the size of each region for a given table and its column families.
+ * The value is used by MapReduce for better scheduling.
+ */
+@InterfaceStability.Evolving
+@InterfaceAudience.Private
+public class RegionSizeCalculator {
+
+  private static final Log LOG = LogFactory.getLog(RegionSizeCalculator.class);
+
+  /**
+   * Maps each region to its size in bytes.
+   */
+  private final Map<byte[], Long> sizeMap = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
+
+  static final String ENABLE_REGIONSIZECALCULATOR = "hbase.regionsizecalculator.enable";
+
+  /**
+   * Computes the size of each region for the table and the given column families.
+   *
+   * @deprecated Use {@link #RegionSizeCalculator(RegionLocator, Admin)} instead.
+   */
+  @Deprecated
+  public RegionSizeCalculator(HTable table) throws IOException {
+    HBaseAdmin admin = new HBaseAdmin(table.getConfiguration());
+    try {
+      init(table.getRegionLocator(), admin);
+    } finally {
+      admin.close();
+    }
+  }
+
+  /**
+   * Computes the size of each region for the table and the given column families.
+   */
+  public RegionSizeCalculator(RegionLocator regionLocator, Admin admin) throws IOException {
+    init(regionLocator, admin);
+  }
+
+  private void init(RegionLocator regionLocator, Admin admin)
+      throws IOException {
+    if (!enabled(admin.getConfiguration())) {
+      LOG.info("Region size calculation disabled.");
+      return;
+    }
+
+    LOG.info("Calculating region sizes for table \"" + regionLocator.getName() + "\".");
+
+    // Get the regions of the table.
+    List<HRegionLocation> tableRegionInfos = regionLocator.getAllRegionLocations();
+    Set<byte[]> tableRegions = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
+    for (HRegionLocation regionInfo : tableRegionInfos) {
+      tableRegions.add(regionInfo.getRegionInfo().getRegionName());
+    }
+
+    ClusterStatus clusterStatus = admin.getClusterStatus();
+    Collection<ServerName> servers = clusterStatus.getServers();
+    final long megaByte = 1024L * 1024L;
+
+    // Iterate over all cluster regions, keep those belonging to our table, and compute their sizes.
+    for (ServerName serverName: servers) {
+      ServerLoad serverLoad = clusterStatus.getLoad(serverName);
+
+      for (RegionLoad regionLoad: serverLoad.getRegionsLoad().values()) {
+        byte[] regionId = regionLoad.getName();
+
+        if (tableRegions.contains(regionId)) {
+
+          long regionSizeBytes = (regionLoad.getStorefileSizeMB() + regionLoad.getMemStoreSizeMB()) * megaByte;
+          sizeMap.put(regionId, regionSizeBytes);
+
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Region " + regionLoad.getNameAsString() + " has size " + regionSizeBytes);
+          }
+        }
+      }
+    }
+    LOG.debug("Region sizes calculated");
+  }
+
+  boolean enabled(Configuration configuration) {
+    return configuration.getBoolean(ENABLE_REGIONSIZECALCULATOR, true);
+  }
+
+  /**
+   * Returns the size of the given region in bytes, or 0 if the region was not found.
+   */
+  public long getRegionSize(byte[] regionId) {
+    Long size = sizeMap.get(regionId);
+    if (size == null) {
+      LOG.debug("Unknown region: " + Arrays.toString(regionId));
+      return 0;
+    } else {
+      return size;
+    }
+  }
+
+  public Map<byte[], Long> getRegionSizeMap() {
+    return Collections.unmodifiableMap(sizeMap);
+  }
+}
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
index dc4502c..61ecab8 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
@@ -127,8 +127,8 @@
   }
 
   @Override
-  public long getTableVolume(URI uri, Optional<EvalNode> filter) throws UnsupportedException {
-    Path path = new Path(uri);
+  public long getTableVolume(TableDesc table, Optional<EvalNode> filter) throws UnsupportedException {
+    Path path = new Path(table.getUri());
     ContentSummary summary;
     try {
       summary = fs.getContentSummary(path);
diff --git a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java
index da0b5a7..4c62523 100644
--- a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java
+++ b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java
@@ -110,7 +110,7 @@
   }
 
   @Override
-  public long getTableVolume(URI uri, Optional<EvalNode> filter) throws UnsupportedException {
+  public long getTableVolume(TableDesc table, Optional<EvalNode> filter) throws UnsupportedException {
     throw new UnsupportedException();
   }