TAJO-1940: Implement HBaseTablespace::getTableVolume() method.
Closes #910
diff --git a/CHANGES b/CHANGES
index 008b987..62c0504 100644
--- a/CHANGES
+++ b/CHANGES
@@ -7,6 +7,9 @@
IMPROVEMENT
+ TAJO-1940: Implement HBaseTablespace::getTableVolume() method.
+ (hyunsik)
+
TAJO-1991: Tablespace::getVolume should take filter predication.
(hyunsik)
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java
index 7454927..ffa48aa 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java
@@ -54,7 +54,6 @@
import org.apache.tajo.storage.hbase.*;
import org.apache.tajo.util.Bytes;
import org.apache.tajo.util.KeyValueSet;
-import org.apache.tajo.util.TUtil;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -65,10 +64,7 @@
import java.net.URI;
import java.sql.ResultSet;
import java.text.DecimalFormat;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import static org.junit.Assert.*;
@@ -221,7 +217,62 @@
} finally {
TablespaceManager.addTableSpaceForTest(existing.get());
}
+ }
+ private void putData(HTableInterface htable, int rownum) throws IOException {
+ for (int i = 0; i < rownum; i++) {
+ Put put = new Put(String.valueOf(i).getBytes());
+ put.add("col1".getBytes(), "a".getBytes(), ("a-" + i).getBytes());
+ put.add("col1".getBytes(), "b".getBytes(), ("b-" + i).getBytes());
+ put.add("col2".getBytes(), "k1".getBytes(), ("k1-" + i).getBytes());
+ put.add("col2".getBytes(), "k2".getBytes(), ("k2-" + i).getBytes());
+ put.add("col3".getBytes(), "b".getBytes(), ("b-" + i).getBytes());
+ htable.put(put);
+ }
+ }
+
+ @Test
+ public void testGetTableVolume() throws Exception {
+ final String tableName = "external_hbase_table";
+
+ Optional<Tablespace> existing = TablespaceManager.removeTablespaceForTest("cluster1");
+ assertTrue(existing.isPresent());
+
+ try {
+ HTableDescriptor hTableDesc = new HTableDescriptor(TableName.valueOf(tableName));
+ hTableDesc.addFamily(new HColumnDescriptor("col1"));
+ hTableDesc.addFamily(new HColumnDescriptor("col2"));
+ hTableDesc.addFamily(new HColumnDescriptor("col3"));
+ testingCluster.getHBaseUtil().createTable(hTableDesc);
+
+ String sql = String.format(
+ "CREATE EXTERNAL TABLE external_hbase_mapped_table (rk text, col1 text, col2 text, col3 text) " +
+ "USING hbase WITH ('table'='external_hbase_table', 'columns'=':key,col1:a,col2:,col3:b') " +
+ "LOCATION '%s/external_hbase_table'", tableSpaceUri);
+ executeString(sql).close();
+
+ assertTableExists("external_hbase_mapped_table");
+
+ HBaseTablespace tablespace = (HBaseTablespace)existing.get();
+ HConnection hconn = ((HBaseTablespace)existing.get()).getConnection();
+
+ try (HTableInterface htable = hconn.getTable(tableName)) {
+ htable.setAutoFlushTo(true);
+ putData(htable, 4000);
+ }
+ hconn.close();
+
+      Thread.sleep(3000); // sleep to let the region server load stats become up to date; this may not be a problem in a real cluster.
+
+ TableDesc createdTable = client.getTableDesc("external_hbase_mapped_table");
+ assertNotNull(tablespace);
+ long volume = tablespace.getTableVolume(createdTable, Optional.<EvalNode>absent());
+ assertTrue(volume > 0 || volume == -1);
+ executeString("DROP TABLE external_hbase_mapped_table PURGE").close();
+
+ } finally {
+ TablespaceManager.addTableSpaceForTest(existing.get());
+ }
}
@Test
@@ -248,22 +299,14 @@
HTableInterface htable = hconn.getTable("external_hbase_table");
try {
- for (int i = 0; i < 100; i++) {
- Put put = new Put(String.valueOf(i).getBytes());
- put.add("col1".getBytes(), "a".getBytes(), ("a-" + i).getBytes());
- put.add("col1".getBytes(), "b".getBytes(), ("b-" + i).getBytes());
- put.add("col2".getBytes(), "k1".getBytes(), ("k1-" + i).getBytes());
- put.add("col2".getBytes(), "k2".getBytes(), ("k2-" + i).getBytes());
- put.add("col3".getBytes(), "b".getBytes(), ("b-" + i).getBytes());
- htable.put(put);
- }
-
+ putData(htable, 100);
ResultSet res = executeString("select * from external_hbase_mapped_table where rk > '20'");
assertResultSet(res);
cleanupQuery(res);
} finally {
executeString("DROP TABLE external_hbase_mapped_table PURGE").close();
htable.close();
+ hconn.close();
}
} finally {
TablespaceManager.addTableSpaceForTest(existing.get());
@@ -793,13 +836,13 @@
Schema schema = new Schema();
schema.addColumn("id", Type.TEXT);
schema.addColumn("name", Type.TEXT);
- List<String> datas = new ArrayList<String>();
+ List<String> datas = new ArrayList<>();
DecimalFormat df = new DecimalFormat("000");
for (int i = 99; i >= 0; i--) {
datas.add(df.format(i) + "|value" + i);
}
TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table",
- schema, tableOptions, datas.toArray(new String[]{}), 2);
+ schema, tableOptions, datas.toArray(new String[datas.size()]), 2);
executeString("insert into hbase_mapped_table " +
"select id, name from base_table ").close();
@@ -849,12 +892,12 @@
Schema schema = new Schema();
schema.addColumn("id", Type.TEXT);
schema.addColumn("name", Type.TEXT);
- List<String> datas = new ArrayList<String>();
+ List<String> datas = new ArrayList<>();
for (int i = 99; i >= 0; i--) {
datas.add(i + "|value" + i);
}
TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table",
- schema, tableOptions, datas.toArray(new String[]{}), 2);
+ schema, tableOptions, datas.toArray(new String[datas.size()]), 2);
executeString("insert into hbase_mapped_table " +
"select id, name from base_table ").close();
@@ -907,13 +950,13 @@
Schema schema = new Schema();
schema.addColumn("id", Type.TEXT);
schema.addColumn("name", Type.TEXT);
- List<String> datas = new ArrayList<String>();
+ List<String> datas = new ArrayList<>();
DecimalFormat df = new DecimalFormat("000");
for (int i = 99; i >= 0; i--) {
datas.add(df.format(i) + "|value" + i);
}
TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table",
- schema, tableOptions, datas.toArray(new String[]{}), 2);
+ schema, tableOptions, datas.toArray(new String[datas.size()]), 2);
executeString("insert into hbase_mapped_table " +
"select id, name from base_table ").close();
@@ -967,12 +1010,12 @@
schema.addColumn("id2", Type.TEXT);
schema.addColumn("name", Type.TEXT);
DecimalFormat df = new DecimalFormat("000");
- List<String> datas = new ArrayList<String>();
+ List<String> datas = new ArrayList<>();
for (int i = 99; i >= 0; i--) {
datas.add(df.format(i) + "|" + (i + 100) + "|value" + i);
}
TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table",
- schema, tableOptions, datas.toArray(new String[]{}), 2);
+ schema, tableOptions, datas.toArray(new String[datas.size()]), 2);
executeString("insert into hbase_mapped_table " +
"select id1, id2, name from base_table ").close();
@@ -1022,12 +1065,12 @@
Schema schema = new Schema();
schema.addColumn("id", Type.INT4);
schema.addColumn("name", Type.TEXT);
- List<String> datas = new ArrayList<String>();
+ List<String> datas = new ArrayList<>();
for (int i = 99; i >= 0; i--) {
datas.add(i + "|value" + i);
}
TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table",
- schema, tableOptions, datas.toArray(new String[]{}), 2);
+ schema, tableOptions, datas.toArray(new String[datas.size()]), 2);
executeString("insert into hbase_mapped_table " +
"select id, name from base_table ").close();
@@ -1080,14 +1123,14 @@
schema.addColumn("col2_key", Type.TEXT);
schema.addColumn("col2_value", Type.TEXT);
schema.addColumn("col3", Type.TEXT);
- List<String> datas = new ArrayList<String>();
+ List<String> datas = new ArrayList<>();
for (int i = 20; i >= 0; i--) {
for (int j = 0; j < 3; j++) {
datas.add(i + "|ck-" + j + "|value-" + j + "|col3-" + i);
}
}
TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table",
- schema, tableOptions, datas.toArray(new String[]{}), 2);
+ schema, tableOptions, datas.toArray(new String[datas.size()]), 2);
executeString("insert into hbase_mapped_table " +
"select rk, col2_key, col2_value, col3 from base_table ").close();
@@ -1167,12 +1210,12 @@
Schema schema = new Schema();
schema.addColumn("id", Type.INT4);
schema.addColumn("name", Type.TEXT);
- List<String> datas = new ArrayList<String>();
+ List<String> datas = new ArrayList<>();
for (int i = 99; i >= 0; i--) {
datas.add(i + "|value" + i);
}
TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table",
- schema, tableOptions, datas.toArray(new String[]{}), 2);
+ schema, tableOptions, datas.toArray(new String[datas.size()]), 2);
try {
executeString("insert into hbase_mapped_table " +
@@ -1245,7 +1288,7 @@
datas.add(df.format(i) + "|value" + i);
}
TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table",
- schema, tableOptions, datas.toArray(new String[]{}), 2);
+ schema, tableOptions, datas.toArray(new String[datas.size()]), 2);
executeString(
"CREATE TABLE hbase_mapped_table (rk text, col1 text) TABLESPACE cluster1 " +
@@ -1330,7 +1373,7 @@
} finally {
executeString("DROP TABLE hbase_mapped_table PURGE").close();
- client.unsetSessionVariables(TUtil.newList(HBaseStorageConstants.INSERT_PUT_MODE));
+ client.unsetSessionVariables(Arrays.asList(HBaseStorageConstants.INSERT_PUT_MODE));
if (scanner != null) {
scanner.close();
@@ -1367,7 +1410,7 @@
datas.add(df.format(i) + "|value" + i + "|comment-" + i);
}
TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table",
- schema, tableOptions, datas.toArray(new String[]{}), 2);
+ schema, tableOptions, datas.toArray(new String[datas.size()]), 2);
executeString("insert into location '/tmp/hfile_test' " +
"select id, name, comment from base_table ").close();
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/StorageService.java b/tajo-plan/src/main/java/org/apache/tajo/plan/StorageService.java
index f27aeff..3720dca 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/StorageService.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/StorageService.java
@@ -19,6 +19,7 @@
package org.apache.tajo.plan;
import com.google.common.base.Optional;
+import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.exception.UnsupportedException;
import org.apache.tajo.plan.expr.EvalNode;
@@ -40,5 +41,5 @@
*/
URI getTableURI(@Nullable String spaceName, String databaseName, String tableName);
- long getTableVolumn(URI uri, Optional<EvalNode> filter) throws UnsupportedException;
+ long getTableVolumn(TableDesc table, Optional<EvalNode> filter) throws UnsupportedException;
}
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/TableStatUpdateRewriter.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/TableStatUpdateRewriter.java
index a1e9a6d..df65482 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/TableStatUpdateRewriter.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/TableStatUpdateRewriter.java
@@ -110,7 +110,7 @@
private long getTableVolume(TableDesc table, Optional<EvalNode> filter) {
try {
if (table.getStats() != null) {
- return storage.getTableVolumn(table.getUri(), filter);
+ return storage.getTableVolumn(table, filter);
}
} catch (UnsupportedException t) {
LOG.warn(table.getName() + " does not support Tablespace::getTableVolume()");
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java
index 01ef9de..8919d87 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java
@@ -99,7 +99,7 @@
return name + "=" + uri.toString();
}
- public abstract long getTableVolume(URI uri, Optional<EvalNode> filter) throws UnsupportedException;
+ public abstract long getTableVolume(TableDesc table, Optional<EvalNode> filter) throws UnsupportedException;
/**
* if {@link StorageProperty#isArbitraryPathAllowed} is true,
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java
index 33cd7a3..798de1d 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java
@@ -32,6 +32,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.tajo.TajoConstants;
import org.apache.tajo.catalog.MetadataProvider;
+import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.exception.TajoRuntimeException;
import org.apache.tajo.exception.UndefinedTablespaceException;
@@ -439,9 +440,9 @@
}
@Override
- public long getTableVolumn(URI tableUri, Optional<EvalNode> filter)
+ public long getTableVolumn(TableDesc table, Optional<EvalNode> filter)
throws UnsupportedException {
- return get(tableUri).getTableVolume(tableUri, filter);
+ return get(table.getUri()).getTableVolume(table, filter);
}
public static Iterable<Tablespace> getAllTablespaces() {
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
index 8aa51e6..d522fe2 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
@@ -19,6 +19,7 @@
package org.apache.tajo.storage.hbase;
import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import net.minidev.json.JSONObject;
import org.apache.commons.logging.Log;
@@ -34,7 +35,6 @@
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
-import org.apache.hadoop.hbase.util.RegionSizeCalculator;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
@@ -74,6 +74,8 @@
new StorageProperty("hbase", false, true, false, false);
public static final FormatProperty HFILE_FORMAT_PROPERTIES = new FormatProperty(true, false, true);
public static final FormatProperty PUT_MODE_PROPERTIES = new FormatProperty(true, true, false);
+ public static final List<byte []> EMPTY_START_ROW_KEY = Arrays.asList(new byte [0]);
+ public static final List<byte []> EMPTY_END_ROW_KEY = Arrays.asList(new byte [0]);
private Configuration hbaseConf;
@@ -99,8 +101,20 @@
}
@Override
- public long getTableVolume(URI uri, Optional<EvalNode> filter) throws UnsupportedException {
- throw new UnsupportedException();
+ public long getTableVolume(TableDesc table, Optional<EvalNode> filter) {
+ long totalVolume = 0;
+ try {
+ for (HBaseFragment f : getRawSplits("", table, filter.orNull())) {
+ if (f.getLength() > 0) {
+ totalVolume += f.getLength();
+ }
+ }
+ } catch (TajoException e) {
+ throw new TajoRuntimeException(e);
+ } catch (Throwable ioe) {
+ throw new TajoInternalError(ioe);
+ }
+ return totalVolume;
}
@Override
@@ -141,8 +155,8 @@
ColumnMapping columnMapping = new ColumnMapping(schema, tableMeta.getOptions());
int numRowKeys = 0;
boolean[] isRowKeyMappings = columnMapping.getIsRowKeyMappings();
- for (int i = 0; i < isRowKeyMappings.length; i++) {
- if (isRowKeyMappings[i]) {
+ for (boolean isRowKeyMapping : isRowKeyMappings) {
+ if (isRowKeyMapping) {
numRowKeys++;
}
}
@@ -180,7 +194,7 @@
tableColumnFamilies.add(eachColumn.getNameAsString());
}
- Collection<String> mappingColumnFamilies =columnMapping.getColumnFamilyNames();
+ Collection<String> mappingColumnFamilies = columnMapping.getColumnFamilyNames();
if (mappingColumnFamilies.isEmpty()) {
throw new MissingTablePropertyException(HBaseStorageConstants.META_COLUMNS_KEY, hbaseTableName);
}
@@ -272,13 +286,15 @@
// If there is many split keys, Tajo allows to define in the file.
Path path = new Path(splitRowKeysFile);
FileSystem fs = path.getFileSystem(conf);
+
if (!fs.exists(path)) {
throw new MissingTablePropertyException("hbase.split.rowkeys.file=" + path.toString() + " not exists.",
hbaseTableName);
}
- SortedSet<String> splitKeySet = new TreeSet<String>();
+ SortedSet<String> splitKeySet = new TreeSet<>();
BufferedReader reader = null;
+
try {
reader = new BufferedReader(new InputStreamReader(fs.open(path)));
String line = null;
@@ -415,147 +431,166 @@
return new Column[]{indexColumn};
}
- @Override
- public List<Fragment> getSplits(String inputSourceId,
- TableDesc tableDesc,
- @Nullable EvalNode filterCondition)
- throws IOException, TajoException {
+ private Pair<List<byte []>, List<byte []>> getSelectedKeyRange(
+ ColumnMapping columnMap,
+ List<IndexPredication> predicates) {
- ColumnMapping columnMapping = new ColumnMapping(tableDesc.getSchema(), tableDesc.getMeta().getOptions());
+ final List<byte[]> startRows;
+ final List<byte[]> stopRows;
- List<IndexPredication> indexPredications = getIndexPredications(columnMapping, tableDesc, filterCondition);
- HTable htable = null;
+ if (predicates != null && !predicates.isEmpty()) {
+ startRows = new ArrayList<>();
+ stopRows = new ArrayList<>();
- try {
-
- htable = new HTable(hbaseConf, tableDesc.getMeta().getOption(HBaseStorageConstants.META_TABLE_KEY));
- RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(htable);
-
- org.apache.hadoop.hbase.util.Pair<byte[][], byte[][]> keys = htable.getStartEndKeys();
- if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
- HRegionLocation regLoc = htable.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY, false);
- if (null == regLoc) {
- throw new IOException("Expecting at least one region.");
- }
-
- List<Fragment> fragments = new ArrayList<Fragment>(1);
- HBaseFragment fragment = new HBaseFragment(
- tableDesc.getUri(),
- inputSourceId, htable.getName().getNameAsString(),
- HConstants.EMPTY_BYTE_ARRAY,
- HConstants.EMPTY_BYTE_ARRAY,
- regLoc.getHostname());
- long regionSize = sizeCalculator.getRegionSize(regLoc.getRegionInfo().getRegionName());
- if (regionSize == 0) {
- fragment.setLength(TajoConstants.UNKNOWN_LENGTH);
+      // indexPredications is a disjunctive set
+ for (IndexPredication pred: predicates) {
+ if (pred.getStartValue() != null) {
+ startRows.add(serialize(columnMap, pred, pred.getStartValue()));
} else {
- fragment.setLength(regionSize);
+ startRows.add(HConstants.EMPTY_START_ROW);
}
- fragments.add(fragment);
- return fragments;
+
+ if (pred.getStopValue() != null) {
+ stopRows.add(serialize(columnMap, pred, pred.getStopValue()));
+ } else {
+ stopRows.add(HConstants.EMPTY_START_ROW);
+ }
+ }
+ } else {
+ startRows = EMPTY_START_ROW_KEY;
+ stopRows = EMPTY_END_ROW_KEY;
+ }
+
+ return new Pair(startRows, stopRows);
+ }
+
+ private boolean isEmptyRegion(org.apache.hadoop.hbase.util.Pair<byte[][], byte[][]> tableRange) {
+ return tableRange == null || tableRange.getFirst() == null || tableRange.getFirst().length == 0;
+ }
+
+ private long getRegionSize(RegionSizeCalculator calculator, byte [] regionName) {
+ long regionSize = calculator.getRegionSize(regionName);
+ if (regionSize == 0) {
+ return TajoConstants.UNKNOWN_LENGTH;
+ } else {
+ return regionSize;
+ }
+ }
+
+ private List<HBaseFragment> createEmptyFragment(TableDesc table, String sourceId, HTable htable,
+ RegionSizeCalculator sizeCalculator) throws IOException {
+ HRegionLocation regLoc = htable.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY, false);
+ if (null == regLoc) {
+ throw new IOException("Expecting at least one region.");
+ }
+
+ HBaseFragment fragment = new HBaseFragment(
+ table.getUri(),
+ sourceId, htable.getName().getNameAsString(),
+ HConstants.EMPTY_BYTE_ARRAY,
+ HConstants.EMPTY_BYTE_ARRAY,
+ regLoc.getHostname());
+
+ fragment.setLength(getRegionSize(sizeCalculator, regLoc.getRegionInfo().getRegionName()));
+ return ImmutableList.of(fragment);
+ }
+
+ private Collection<HBaseFragment> convertRangeToFragment(
+ TableDesc table, String inputSourceId, HTable htable, RegionSizeCalculator sizeCalculator,
+ org.apache.hadoop.hbase.util.Pair<byte[][], byte[][]> tableRange,
+ Pair<List<byte[]>, List<byte[]>> selectedRange) throws IOException {
+
+ final Map<byte[], HBaseFragment> fragmentMap = new HashMap<>();
+
+ for (int i = 0; i < tableRange.getFirst().length; i++) {
+ HRegionLocation location = htable.getRegionLocation(tableRange.getFirst()[i], false);
+ if (location == null) {
+ throw new IOException("Can't find the region of the key: " + Bytes.toStringBinary(tableRange.getFirst()[i]));
}
- List<byte[]> startRows;
- List<byte[]> stopRows;
+ final byte[] regionStartKey = tableRange.getFirst()[i];
+ final byte[] regionStopKey = tableRange.getSecond()[i];
- if (indexPredications != null && !indexPredications.isEmpty()) {
- // indexPredications is Disjunctive set
- startRows = new ArrayList<byte[]>();
- stopRows = new ArrayList<byte[]>();
- for (IndexPredication indexPredication: indexPredications) {
- byte[] startRow;
- byte[] stopRow;
- if (indexPredication.getStartValue() != null) {
- startRow = serialize(columnMapping, indexPredication, indexPredication.getStartValue());
+ int startRowsSize = selectedRange.getFirst().size();
+ for (int j = 0; j < startRowsSize; j++) {
+ byte[] startRow = selectedRange.getFirst().get(j);
+ byte[] stopRow = selectedRange.getSecond().get(j);
+        // determine if the given start and stop keys fall into the region
+ if ((startRow.length == 0 || regionStopKey.length == 0 || Bytes.compareTo(startRow, regionStopKey) < 0)
+ && (stopRow.length == 0 || Bytes.compareTo(stopRow, regionStartKey) > 0)) {
+ final byte[] fragmentStart = (startRow.length == 0 || Bytes.compareTo(regionStartKey, startRow) >= 0) ?
+ regionStartKey : startRow;
+
+ final byte[] fragmentStop = (stopRow.length == 0 || Bytes.compareTo(regionStopKey, stopRow) <= 0) &&
+ regionStopKey.length > 0 ? regionStopKey : stopRow;
+
+ if (fragmentMap.containsKey(regionStartKey)) {
+ final HBaseFragment prevFragment = fragmentMap.get(regionStartKey);
+ if (Bytes.compareTo(fragmentStart, prevFragment.getStartRow()) < 0) {
+ prevFragment.setStartRow(fragmentStart);
+ }
+ if (Bytes.compareTo(fragmentStop, prevFragment.getStopRow()) > 0) {
+ prevFragment.setStopRow(fragmentStop);
+ }
} else {
- startRow = HConstants.EMPTY_START_ROW;
- }
- if (indexPredication.getStopValue() != null) {
- stopRow = serialize(columnMapping, indexPredication, indexPredication.getStopValue());
- } else {
- stopRow = HConstants.EMPTY_END_ROW;
- }
- startRows.add(startRow);
- stopRows.add(stopRow);
- }
- } else {
- startRows = TUtil.newList(HConstants.EMPTY_START_ROW);
- stopRows = TUtil.newList(HConstants.EMPTY_END_ROW);
- }
- // reference: org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getSplits(JobContext)
- // region startkey -> HBaseFragment
- Map<byte[], HBaseFragment> fragmentMap = new HashMap<byte[], HBaseFragment>();
- for (int i = 0; i < keys.getFirst().length; i++) {
- HRegionLocation location = htable.getRegionLocation(keys.getFirst()[i], false);
- if (null == location) {
- throw new IOException("Can't find the region of the key: " + Bytes.toStringBinary(keys.getFirst()[i]));
- }
+ final HBaseFragment fragment = new HBaseFragment(table.getUri(),
+ inputSourceId,
+ htable.getName().getNameAsString(),
+ fragmentStart,
+ fragmentStop,
+ location.getHostname());
- byte[] regionStartKey = keys.getFirst()[i];
- byte[] regionStopKey = keys.getSecond()[i];
+ fragment.setLength(getRegionSize(sizeCalculator, location.getRegionInfo().getRegionName()));
+ fragmentMap.put(regionStartKey, fragment);
- int startRowsSize = startRows.size();
- for (int j = 0; j < startRowsSize; j++) {
- byte[] startRow = startRows.get(j);
- byte[] stopRow = stopRows.get(j);
- // determine if the given start an stop key fall into the region
- if ((startRow.length == 0 || regionStopKey.length == 0 || Bytes.compareTo(startRow, regionStopKey) < 0)
- && (stopRow.length == 0 || Bytes.compareTo(stopRow, regionStartKey) > 0)) {
- byte[] fragmentStart = (startRow.length == 0 || Bytes.compareTo(regionStartKey, startRow) >= 0) ?
- regionStartKey : startRow;
-
- byte[] fragmentStop = (stopRow.length == 0 || Bytes.compareTo(regionStopKey, stopRow) <= 0) &&
- regionStopKey.length > 0 ? regionStopKey : stopRow;
-
- if (fragmentMap.containsKey(regionStartKey)) {
- HBaseFragment prevFragment = fragmentMap.get(regionStartKey);
- if (Bytes.compareTo(fragmentStart, prevFragment.getStartRow()) < 0) {
- prevFragment.setStartRow(fragmentStart);
- }
- if (Bytes.compareTo(fragmentStop, prevFragment.getStopRow()) > 0) {
- prevFragment.setStopRow(fragmentStop);
- }
- } else {
- byte[] regionName = location.getRegionInfo().getRegionName();
- long regionSize = sizeCalculator.getRegionSize(regionName);
-
- HBaseFragment fragment = new HBaseFragment(tableDesc.getUri(),
- inputSourceId,
- htable.getName().getNameAsString(),
- fragmentStart,
- fragmentStop,
- location.getHostname());
- if (regionSize == 0) {
- fragment.setLength(TajoConstants.UNKNOWN_LENGTH);
- } else {
- fragment.setLength(regionSize);
- }
-
- fragmentMap.put(regionStartKey, fragment);
- if (LOG.isDebugEnabled()) {
- LOG.debug("getFragments: fragment -> " + i + " -> " + fragment);
- }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("getFragments: fragment -> " + i + " -> " + fragment);
}
}
}
}
+ }
- List<HBaseFragment> fragments = new ArrayList<HBaseFragment>(fragmentMap.values());
+ return fragmentMap.values();
+ }
+
+ @Override
+ public List<Fragment> getSplits(String inputSourceId,
+ TableDesc table,
+ @Nullable EvalNode filterCondition) throws IOException, TajoException {
+ return (List<Fragment>) (List) getRawSplits(inputSourceId, table, filterCondition);
+ }
+
+ private List<HBaseFragment> getRawSplits(String inputSourceId,
+ TableDesc table,
+ @Nullable EvalNode filterCondition) throws IOException, TajoException {
+ final ColumnMapping columnMapping = new ColumnMapping(table.getSchema(), table.getMeta().getOptions());
+
+ try (final HTable htable = new HTable(hbaseConf, table.getMeta().getOption(HBaseStorageConstants.META_TABLE_KEY))) {
+ final RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(htable);
+ final org.apache.hadoop.hbase.util.Pair<byte[][], byte[][]> tableRange = htable.getStartEndKeys();
+ if (isEmptyRegion(tableRange)) {
+ return createEmptyFragment(table, inputSourceId, htable, sizeCalculator);
+ }
+
+ final Pair<List<byte []>, List<byte []>> selectedRange = getSelectedKeyRange(
+ columnMapping,
+ getIndexPredications(columnMapping, table, filterCondition));
+
+ // region startkey -> HBaseFragment
+ List<HBaseFragment> fragments = new ArrayList<>(convertRangeToFragment(table, inputSourceId, htable, sizeCalculator, tableRange, selectedRange));
Collections.sort(fragments);
if (!fragments.isEmpty()) {
fragments.get(fragments.size() - 1).setLast(true);
}
- return (ArrayList<Fragment>) (ArrayList) fragments;
- } finally {
- if (htable != null) {
- htable.close();
- }
+
+ return ImmutableList.copyOf(fragments);
}
}
private byte[] serialize(ColumnMapping columnMapping,
- IndexPredication indexPredication, Datum datum) throws IOException {
+ IndexPredication indexPredication, Datum datum) {
if (columnMapping.getIsBinaryColumns()[indexPredication.getColumnId()]) {
return HBaseBinarySerializerDeserializer.serialize(indexPredication.getColumn(), datum);
} else {
@@ -611,7 +646,7 @@
private String username;
HConnectionKey(Configuration conf) {
- Map<String, String> m = new HashMap<String, String>();
+ Map<String, String> m = new HashMap<>();
if (conf != null) {
for (String property : CONNECTION_PROPERTIES) {
String value = conf.get(property);
@@ -689,10 +724,7 @@
@Override
public String toString() {
- return "HConnectionKey{" +
- "properties=" + properties +
- ", username='" + username + '\'' +
- '}';
+ return "HConnectionKey{ properties=" + properties + ", username='" + username + '\'' + '}';
}
}
@@ -724,7 +756,7 @@
public List<Set<EvalNode>> findIndexablePredicateSet(@Nullable EvalNode qual,
Column[] indexableColumns) throws IOException {
- List<Set<EvalNode>> indexablePredicateList = new ArrayList<Set<EvalNode>>();
+ List<Set<EvalNode>> indexablePredicateList = new ArrayList<>();
// if a query statement has a search condition, try to find indexable predicates
if (indexableColumns != null && qual != null) {
@@ -874,7 +906,7 @@
new String(new char[]{columnMapping.getRowKeyDelimiter(), Character.MAX_VALUE}));
}
if (startDatum != null || endDatum != null) {
- return new Pair<Datum, Datum>(startDatum, endDatum);
+ return new Pair<>(startDatum, endDatum);
} else {
return null;
}
@@ -944,7 +976,7 @@
if (endKeys.length == 1) {
return new TupleRange[]{dataRange};
}
- List<TupleRange> tupleRanges = new ArrayList<TupleRange>(endKeys.length);
+ List<TupleRange> tupleRanges = new ArrayList<>(endKeys.length);
TupleComparator comparator = new BaseTupleComparator(inputSchema, sortSpecs);
Tuple previousTuple = dataRange.getStart();
@@ -976,12 +1008,12 @@
for (int i = 0; i < sortSpecs.length; i++) {
if (columnMapping.getIsBinaryColumns()[sortKeyIndexes[i]]) {
endTuple.put(i,
- HBaseBinarySerializerDeserializer.deserialize(inputSchema.getColumn(sortKeyIndexes[i]),
- rowKeyFields[i]));
+ HBaseBinarySerializerDeserializer.deserialize(inputSchema.getColumn(sortKeyIndexes[i]),
+ rowKeyFields[i]));
} else {
endTuple.put(i,
- HBaseTextSerializerDeserializer.deserialize(inputSchema.getColumn(sortKeyIndexes[i]),
- rowKeyFields[i]));
+ HBaseTextSerializerDeserializer.deserialize(inputSchema.getColumn(sortKeyIndexes[i]),
+ rowKeyFields[i]));
}
}
tupleRanges.add(new TupleRange(sortSpecs, previousTuple, endTuple));
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/RegionSizeCalculator.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/RegionSizeCalculator.java
new file mode 100644
index 0000000..806320d
--- /dev/null
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/RegionSizeCalculator.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.hbase;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ClusterStatus;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionLoad;
+import org.apache.hadoop.hbase.ServerLoad;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * This class is borrowed from HBase, but modified so that the size of a region
+ * also includes its MemStore size, not only its on-disk store files.
+ *
+ * Computes the size in bytes of each region of a given table, based on the
+ * cluster status reported by the HBase master; callers use it for scheduling.
+ * */
+@InterfaceStability.Evolving
+@InterfaceAudience.Private
+public class RegionSizeCalculator {
+
+ private static final Log LOG = LogFactory.getLog(RegionSizeCalculator.class);
+
+ /**
+ * Maps each region name (raw bytes) to its size in bytes; ordered by Bytes.BYTES_COMPARATOR.
+ * */
+ private final Map<byte[], Long> sizeMap = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
+
+ static final String ENABLE_REGIONSIZECALCULATOR = "hbase.regionsizecalculator.enable";
+
+ /**
+ * Computes the size of each region of the given table, creating (and closing) its own HBaseAdmin.
+ *
+ * @deprecated Use {@link #RegionSizeCalculator(RegionLocator, Admin)} instead.
+ */
+ @Deprecated
+ public RegionSizeCalculator(HTable table) throws IOException {
+ HBaseAdmin admin = new HBaseAdmin(table.getConfiguration());
+ try {
+ init(table.getRegionLocator(), admin);
+ } finally {
+ admin.close();
+ }
+ }
+
+ /**
+ * Computes the size of each region of the table addressed by {@code regionLocator}.
+ * */
+ public RegionSizeCalculator(RegionLocator regionLocator, Admin admin) throws IOException {
+ init(regionLocator, admin);
+ }
+
+ private void init(RegionLocator regionLocator, Admin admin)
+ throws IOException {
+ if (!enabled(admin.getConfiguration())) {
+ LOG.info("Region size calculation disabled.");
+ return;
+ }
+
+ LOG.info("Calculating region sizes for table \"" + regionLocator.getName() + "\".");
+
+ // collect the names of the regions that belong to this table
+ List<HRegionLocation> tableRegionInfos = regionLocator.getAllRegionLocations();
+ Set<byte[]> tableRegions = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
+ for (HRegionLocation regionInfo : tableRegionInfos) {
+ tableRegions.add(regionInfo.getRegionInfo().getRegionName());
+ }
+
+ ClusterStatus clusterStatus = admin.getClusterStatus();
+ Collection<ServerName> servers = clusterStatus.getServers();
+ final long megaByte = 1024L * 1024L;
+
+ // walk all regions known to the cluster, keep this table's, and record store file + MemStore size
+ for (ServerName serverName: servers) {
+ ServerLoad serverLoad = clusterStatus.getLoad(serverName);
+
+ for (RegionLoad regionLoad: serverLoad.getRegionsLoad().values()) {
+ byte[] regionId = regionLoad.getName();
+
+ if (tableRegions.contains(regionId)) {
+
+ long regionSizeBytes = (regionLoad.getStorefileSizeMB() + regionLoad.getMemStoreSizeMB()) * megaByte;
+ sizeMap.put(regionId, regionSizeBytes);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Region " + regionLoad.getNameAsString() + " has size " + regionSizeBytes);
+ }
+ }
+ }
+ }
+ LOG.debug("Region sizes calculated");
+ }
+
+ boolean enabled(Configuration configuration) {
+ return configuration.getBoolean(ENABLE_REGIONSIZECALCULATOR, true);
+ }
+
+ /**
+ * Returns the size of the given region in bytes, or 0 if the region is unknown
+ * */
+ public long getRegionSize(byte[] regionId) {
+ Long size = sizeMap.get(regionId);
+ if (size == null) {
+ LOG.debug("Unknown region:" + Arrays.toString(regionId));
+ return 0;
+ } else {
+ return size;
+ }
+ }
+
+ public Map<byte[], Long> getRegionSizeMap() {
+ return Collections.unmodifiableMap(sizeMap);
+ }
+}
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
index dc4502c..61ecab8 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
@@ -127,8 +127,8 @@
}
@Override
- public long getTableVolume(URI uri, Optional<EvalNode> filter) throws UnsupportedException {
- Path path = new Path(uri);
+ public long getTableVolume(TableDesc table, Optional<EvalNode> filter) throws UnsupportedException {
+ Path path = new Path(table.getUri());
ContentSummary summary;
try {
summary = fs.getContentSummary(path);
diff --git a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java
index da0b5a7..4c62523 100644
--- a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java
+++ b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java
@@ -110,7 +110,7 @@
}
@Override
- public long getTableVolume(URI uri, Optional<EvalNode> filter) throws UnsupportedException {
+ public long getTableVolume(TableDesc table, Optional<EvalNode> filter) throws UnsupportedException {
throw new UnsupportedException();
}