KYLIN-1528 Create a branch for v1.5 with HBase 1.x API
diff --git a/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java b/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
index da25143..5d2cfc4 100644
--- a/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
+++ b/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
@@ -38,6 +38,7 @@
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.kylin.common.util.FIFOIterable;
import org.apache.kylin.common.util.LocalFileMetadataTestCase;
import org.apache.kylin.common.util.Pair;
@@ -222,6 +223,11 @@
}
@Override
+ public int getBatch() {
+ return -1;
+ }
+
+ @Override
public boolean nextRaw(List<Cell> result) throws IOException {
if (iiRowIterator.hasNext()) {
IIRow iiRow = iiRowIterator.next();
@@ -233,7 +239,7 @@
}
@Override
- public boolean nextRaw(List<Cell> result, int limit) throws IOException {
+ public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException {
throw new NotImplementedException();
}
@@ -243,7 +249,7 @@
}
@Override
- public boolean next(List<Cell> result, int limit) throws IOException {
+ public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
throw new NotImplementedException();
}
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 1062749..9090068 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -645,5 +645,8 @@
public int getCubeStatsHLLPrecision() {
return Integer.parseInt(getOptional("kylin.job.cubing.inmem.sampling.hll.precision", "14"));
}
-
+
+ public String getPatchedFuzzyRowFilterVersion() {
+ return this.getOptional("kylin.hbase.filter.fuzzy.row.filter.version", "1.1.3");
+ }
}
diff --git a/examples/test_case_data/sandbox/hbase-site.xml b/examples/test_case_data/sandbox/hbase-site.xml
index 46d5345..734908e 100644
--- a/examples/test_case_data/sandbox/hbase-site.xml
+++ b/examples/test_case_data/sandbox/hbase-site.xml
@@ -190,22 +190,5 @@
<name>zookeeper.znode.parent</name>
<value>/hbase-unsecure</value>
</property>
- <property>
- <name>hbase.client.pause</name>
- <value>100</value>
- <description>General client pause value. Used mostly as value to wait
- before running a retry of a failed get, region lookup, etc.
- See hbase.client.retries.number for description of how we backoff from
- this initial pause amount and how this pause works w/ retries.</description>
- </property>
- <property>
- <name>hbase.client.retries.number</name>
- <value>5</value>
- <description>Maximum retries. Used as maximum for all retryable
- operations such as the getting of a cell's value, starting a row update,
- etc. Retry interval is a rough function based on hbase.client.pause. At
- first we retry at this interval but then with backoff, we pretty quickly reach
- retrying every ten seconds. See HConstants#RETRY_BACKOFF for how the backup
- ramps up. Change this setting and hbase.client.pause to suit your workload.</description>
- </property>
+
</configuration>
diff --git a/examples/test_case_data/sandbox/kylin_job_conf.xml b/examples/test_case_data/sandbox/kylin_job_conf.xml
index bd947af..6082fa9 100644
--- a/examples/test_case_data/sandbox/kylin_job_conf.xml
+++ b/examples/test_case_data/sandbox/kylin_job_conf.xml
@@ -1,20 +1,18 @@
<?xml version="1.0"?>
<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
+http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License. See accompanying LICENSE file.
-->
+
<configuration>
<property>
@@ -26,44 +24,41 @@
</description>
</property>
- <property>
- <name>mapreduce.map.maxattempts</name>
- <value>2</value>
- </property>
+ <!-- uncomment the following 5 properties to enable lzo compressing
- <!--
- <property>
- <name>mapred.compress.map.output</name>
- <value>true</value>
- <description>Compress map outputs</description>
- </property>
+ <property>
+ <name>mapred.compress.map.output</name>
+ <value>true</value>
+ <description>Compress map outputs</description>
+ </property>
- <property>
- <name>mapred.map.output.compression.codec</name>
- <value>org.apache.hadoop.io.compress.SnappyCodec</value>
- <description>The compression codec to use for map outputs
- </description>
- </property>
+ <property>
+ <name>mapred.map.output.compression.codec</name>
+ <value>com.hadoop.compression.lzo.LzoCodec</value>
+ <description>The compression codec to use for map outputs
+ </description>
+ </property>
- <property>
- <name>mapred.output.compress</name>
- <value>true</value>
- <description>Compress the output of a MapReduce job</description>
- </property>
+ <property>
+ <name>mapred.output.compress</name>
+ <value>true</value>
+ <description>Compress the output of a MapReduce job</description>
+ </property>
- <property>
- <name>mapred.output.compression.codec</name>
- <value>org.apache.hadoop.io.compress.SnappyCodec</value>
- <description>The compression codec to use for job outputs
- </description>
- </property>
+ <property>
+ <name>mapred.output.compression.codec</name>
+ <value>com.hadoop.compression.lzo.LzoCodec</value>
+ <description>The compression codec to use for job outputs
+ </description>
+ </property>
- <property>
- <name>mapred.output.compression.type</name>
- <value>BLOCK</value>
- <description>The compression type to use for job outputs</description>
- </property>
--->
+ <property>
+ <name>mapred.output.compression.type</name>
+ <value>BLOCK</value>
+ <description>The compression type to use for job outputs</description>
+ </property>
+
+ !-->
<property>
<name>mapreduce.job.max.split.locations</name>
@@ -76,5 +71,4 @@
<value>2</value>
<description>Block replication</description>
</property>
-
</configuration>
\ No newline at end of file
diff --git a/examples/test_case_data/sandbox/mapred-site.xml b/examples/test_case_data/sandbox/mapred-site.xml
index 18f6feb..ff1c7eb 100644
--- a/examples/test_case_data/sandbox/mapred-site.xml
+++ b/examples/test_case_data/sandbox/mapred-site.xml
@@ -18,7 +18,7 @@
<property>
<name>io.sort.mb</name>
- <value>128</value>
+ <value>64</value>
</property>
<property>
@@ -28,12 +28,12 @@
<property>
<name>mapred.job.map.memory.mb</name>
- <value>512</value>
+ <value>250</value>
</property>
<property>
<name>mapred.job.reduce.memory.mb</name>
- <value>512</value>
+ <value>250</value>
</property>
<property>
@@ -58,7 +58,7 @@
<property>
<name>mapreduce.application.classpath</name>
- <value>/tmp/kylin/*,$HADOOP_CONF_DIR,/usr/hdp/${hdp.version}/hbase/lib/hbase-common.jar,/usr/hdp/current/hive-client/conf/,$PWD/mr-framework/hadoop/share/hadoop/mapreduce/*:$PWD/mr-framework/hadoop/share/hadoop/mapreduce/lib/*:$PWD/mr-framework/hadoop/share/hadoop/common/*:$PWD/mr-framework/hadoop/share/hadoop/common/lib/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/lib/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/lib/*:/usr/hdp/${hdp.version}/hadoop/lib/hadoop-lzo-0.6.0.${hdp.version}.jar:/usr/hdp/${hdp.version}/hadoop/lib/snappy-java-1.0.4.1.jar:/etc/hadoop/conf/secure</value>
+ <value>/tmp/kylin/*,$HADOOP_CONF_DIR,/usr/hdp/${hdp.version}/hbase/lib/hbase-common.jar,/usr/hdp/current/hive-client/conf/,$PWD/mr-framework/hadoop/share/hadoop/mapreduce/*:$PWD/mr-framework/hadoop/share/hadoop/mapreduce/lib/*:$PWD/mr-framework/hadoop/share/hadoop/common/*:$PWD/mr-framework/hadoop/share/hadoop/common/lib/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/lib/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/lib/*:/usr/hdp/${hdp.version}/hadoop/lib/hadoop-lzo-0.6.0.${hdp.version}.jar:/etc/hadoop/conf/secure</value>
</property>
<property>
@@ -81,10 +81,9 @@
<value>false</value>
</property>
- <!--the default value on hdp is 0.05, however for test environments we need to be conservative on resource -->
<property>
<name>mapreduce.job.reduce.slowstart.completedmaps</name>
- <value>1</value>
+ <value>0.05</value>
</property>
<property>
@@ -114,7 +113,7 @@
<property>
<name>mapreduce.map.java.opts</name>
- <value>-Xmx512m</value>
+ <value>-Xmx200m</value>
</property>
<property>
@@ -124,7 +123,7 @@
<property>
<name>mapreduce.map.memory.mb</name>
- <value>512</value>
+ <value>250</value>
</property>
<property>
@@ -169,7 +168,7 @@
<property>
<name>mapreduce.reduce.memory.mb</name>
- <value>512</value>
+ <value>250</value>
</property>
<property>
@@ -219,7 +218,7 @@
<property>
<name>mapreduce.task.io.sort.mb</name>
- <value>128</value>
+ <value>64</value>
</property>
<property>
@@ -234,7 +233,7 @@
<property>
<name>yarn.app.mapreduce.am.command-opts</name>
- <value>-Xmx512m</value>
+ <value>-Xmx200m</value>
</property>
<property>
@@ -244,7 +243,7 @@
<property>
<name>yarn.app.mapreduce.am.resource.mb</name>
- <value>512</value>
+ <value>250</value>
</property>
<property>
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
index 07de5d7..a83a754 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
@@ -37,8 +37,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.util.ToolRunner;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.ClassUtil;
@@ -60,6 +59,7 @@
import org.apache.kylin.job.manager.ExecutableManager;
import org.apache.kylin.metadata.model.IEngineAware;
import org.apache.kylin.metadata.model.IStorageAware;
+import org.apache.kylin.storage.hbase.HBaseConnection;
import org.apache.kylin.storage.hbase.util.HBaseRegionSizeCalculator;
import org.apache.kylin.storage.hbase.util.StorageCleanupJob;
import org.apache.kylin.storage.hbase.util.ZookeeperJobLock;
@@ -404,33 +404,33 @@
}
private void checkHFilesInHBase(CubeSegment segment) throws IOException {
- Configuration conf = HBaseConfiguration.create(HadoopUtil.getCurrentConfiguration());
+ Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl());
String tableName = segment.getStorageLocationIdentifier();
- try (HTable table = new HTable(conf, tableName)) {
- HBaseRegionSizeCalculator cal = new HBaseRegionSizeCalculator(table);
- Map<byte[], Long> sizeMap = cal.getRegionSizeMap();
- long totalSize = 0;
- for (Long size : sizeMap.values()) {
- totalSize += size;
+
+ HBaseRegionSizeCalculator cal = new HBaseRegionSizeCalculator(tableName, conn);
+ Map<byte[], Long> sizeMap = cal.getRegionSizeMap();
+ long totalSize = 0;
+ for (Long size : sizeMap.values()) {
+ totalSize += size;
+ }
+ if (totalSize == 0) {
+ return;
+ }
+
+ Map<byte[], Pair<Integer, Integer>> countMap = cal.getRegionHFileCountMap();
+ // check if there's region contains more than one hfile, which means the hfile config take effects
+ boolean hasMultiHFileRegions = false;
+ for (Pair<Integer, Integer> count : countMap.values()) {
+ // check if hfile count is greater than store count
+ if (count.getSecond() > count.getFirst()) {
+ hasMultiHFileRegions = true;
+ break;
}
- if (totalSize == 0) {
- return;
- }
- Map<byte[], Pair<Integer, Integer>> countMap = cal.getRegionHFileCountMap();
- // check if there's region contains more than one hfile, which means the hfile config take effects
- boolean hasMultiHFileRegions = false;
- for (Pair<Integer, Integer> count : countMap.values()) {
- // check if hfile count is greater than store count
- if (count.getSecond() > count.getFirst()) {
- hasMultiHFileRegions = true;
- break;
- }
- }
- if (KylinConfig.getInstanceFromEnv().getHBaseHFileSizeGB() == 0 && hasMultiHFileRegions) {
- throw new IOException("hfile size set to 0, but found region contains more than one hfiles");
- } else if (KylinConfig.getInstanceFromEnv().getHBaseHFileSizeGB() > 0 && !hasMultiHFileRegions) {
- throw new IOException("hfile size set greater than 0, but all regions still has only one hfile");
- }
+ }
+ if (KylinConfig.getInstanceFromEnv().getHBaseHFileSizeGB() == 0 && hasMultiHFileRegions) {
+ throw new IOException("hfile size set to 0, but found region contains more than one hfiles");
+ } else if (KylinConfig.getInstanceFromEnv().getHBaseHFileSizeGB() > 0 && !hasMultiHFileRegions) {
+ throw new IOException("hfile size set greater than 0, but all regions still has only one hfile");
}
}
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithStream.java
index c0f6f63..a74ecb9 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithStream.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithStream.java
@@ -45,7 +45,8 @@
import java.util.UUID;
import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.util.ToolRunner;
@@ -208,7 +209,7 @@
}
}
final IISegment segment = createSegment(iiName);
- final HTableInterface htable = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()).getTable(segment.getStorageLocationIdentifier());
+ final Table htable = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()).getTable(TableName.valueOf(segment.getStorageLocationIdentifier()));
String[] args = new String[] { "-iiname", iiName, "-htablename", segment.getStorageLocationIdentifier() };
ToolRunner.run(new IICreateHTableJob(), args);
@@ -246,7 +247,7 @@
}
}
- private void build(SliceBuilder sliceBuilder, StreamingBatch batch, HTableInterface htable) throws IOException {
+ private void build(SliceBuilder sliceBuilder, StreamingBatch batch, Table htable) throws IOException {
final Slice slice = sliceBuilder.buildSlice(batch);
try {
loadToHBase(htable, slice, new IIKeyValueCodec(slice.getInfo()));
@@ -255,17 +256,17 @@
}
}
- private void loadToHBase(HTableInterface hTable, Slice slice, IIKeyValueCodec codec) throws IOException {
+ private void loadToHBase(Table hTable, Slice slice, IIKeyValueCodec codec) throws IOException {
List<Put> data = Lists.newArrayList();
for (IIRow row : codec.encodeKeyValue(slice)) {
final byte[] key = row.getKey().get();
final byte[] value = row.getValue().get();
Put put = new Put(key);
- put.add(IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_QUALIFIER_BYTES, value);
+ put.addColumn(IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_QUALIFIER_BYTES, value);
final ImmutableBytesWritable dictionary = row.getDictionary();
final byte[] dictBytes = dictionary.get();
if (dictionary.getOffset() == 0 && dictionary.getLength() == dictBytes.length) {
- put.add(IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_DICTIONARY_BYTES, dictBytes);
+ put.addColumn(IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_DICTIONARY_BYTES, dictBytes);
} else {
throw new RuntimeException("dict offset should be 0, and dict length should be " + dictBytes.length + " but they are" + dictionary.getOffset() + " " + dictionary.getLength());
}
diff --git a/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ii/ITInvertedIndexHBaseTest.java b/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ii/ITInvertedIndexHBaseTest.java
index b5be703..5a591a1 100644
--- a/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ii/ITInvertedIndexHBaseTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ii/ITInvertedIndexHBaseTest.java
@@ -20,9 +20,7 @@
import java.util.List;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.Connection;
import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.invertedindex.IIInstance;
import org.apache.kylin.invertedindex.IIManager;
@@ -49,7 +47,7 @@
IIInstance ii;
IISegment seg;
- HConnection hconn;
+ Connection hconn;
TableRecordInfo info;
@@ -60,8 +58,7 @@
this.ii = IIManager.getInstance(getTestConfig()).getII("test_kylin_ii_left_join");
this.seg = ii.getFirstSegment();
- Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration();
- hconn = HConnectionManager.createConnection(hconf);
+ hconn = HBaseConnection.get(getTestConfig().getStorageUrl());
this.info = new TableRecordInfo(seg);
}
diff --git a/pom.xml b/pom.xml
index fb8606b..ef635ee 100644
--- a/pom.xml
+++ b/pom.xml
@@ -47,12 +47,12 @@
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<!-- Hadoop versions -->
- <hadoop2.version>2.6.0</hadoop2.version>
- <yarn.version>2.6.0</yarn.version>
+ <hadoop2.version>2.7.1</hadoop2.version>
+ <yarn.version>2.7.1</yarn.version>
<zookeeper.version>3.4.6</zookeeper.version>
- <hive.version>0.14.0</hive.version>
- <hive-hcatalog.version>0.14.0</hive-hcatalog.version>
- <hbase-hadoop2.version>0.98.8-hadoop2</hbase-hadoop2.version>
+ <hive.version>1.2.1</hive.version>
+ <hive-hcatalog.version>1.2.1</hive-hcatalog.version>
+ <hbase-hadoop2.version>1.1.1</hbase-hadoop2.version>
<kafka.version>0.8.1</kafka.version>
<!-- Dependency versions -->
@@ -64,6 +64,7 @@
<!-- Commons -->
<commons-cli.version>1.2</commons-cli.version>
+ <commons-codec.version>1.4</commons-codec.version>
<commons-lang.version>2.6</commons-lang.version>
<commons-lang3.version>3.1</commons-lang3.version>
<commons-collections.version>3.2.1</commons-collections.version>
@@ -103,7 +104,7 @@
<calcite.version>1.6.0</calcite.version>
<!-- Curator.version Version -->
- <curator.version>2.6.0</curator.version>
+ <curator.version>2.7.1</curator.version>
<!-- Sonar -->
<sonar.java.coveragePlugin>jacoco</sonar.java.coveragePlugin>
@@ -324,6 +325,11 @@
<version>${commons-cli.version}</version>
</dependency>
<dependency>
+ <groupId>commons-codec</groupId>
+ <artifactId>commons-codec</artifactId>
+ <version>${commons-codec.version}</version>
+ </dependency>
+ <dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>${commons-lang.version}</version>
diff --git a/server/src/main/java/org/apache/kylin/rest/security/AclHBaseStorage.java b/server/src/main/java/org/apache/kylin/rest/security/AclHBaseStorage.java
index 2220113..0e71cd5 100644
--- a/server/src/main/java/org/apache/kylin/rest/security/AclHBaseStorage.java
+++ b/server/src/main/java/org/apache/kylin/rest/security/AclHBaseStorage.java
@@ -20,7 +20,7 @@
import java.io.IOException;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Table;
/**
*/
@@ -37,6 +37,6 @@
String prepareHBaseTable(Class clazz) throws IOException;
- HTableInterface getTable(String tableName) throws IOException;
+ Table getTable(String tableName) throws IOException;
}
diff --git a/server/src/main/java/org/apache/kylin/rest/security/MockAclHBaseStorage.java b/server/src/main/java/org/apache/kylin/rest/security/MockAclHBaseStorage.java
index 193f5a8..933c49d 100644
--- a/server/src/main/java/org/apache/kylin/rest/security/MockAclHBaseStorage.java
+++ b/server/src/main/java/org/apache/kylin/rest/security/MockAclHBaseStorage.java
@@ -21,7 +21,7 @@
import java.io.IOException;
import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.kylin.rest.service.AclService;
import org.apache.kylin.rest.service.UserService;
@@ -29,8 +29,8 @@
*/
public class MockAclHBaseStorage implements AclHBaseStorage {
- private HTableInterface mockedAclTable;
- private HTableInterface mockedUserTable;
+ private Table mockedAclTable;
+ private Table mockedUserTable;
private static final String aclTableName = "MOCK-ACL-TABLE";
private static final String userTableName = "MOCK-USER-TABLE";
@@ -49,7 +49,7 @@
}
@Override
- public HTableInterface getTable(String tableName) throws IOException {
+ public Table getTable(String tableName) throws IOException {
if (StringUtils.equals(tableName, aclTableName)) {
return mockedAclTable;
} else if (StringUtils.equals(tableName, userTableName)) {
diff --git a/server/src/main/java/org/apache/kylin/rest/security/MockHTable.java b/server/src/main/java/org/apache/kylin/rest/security/MockHTable.java
index d0aa0ed..972eea9 100644
--- a/server/src/main/java/org/apache/kylin/rest/security/MockHTable.java
+++ b/server/src/main/java/org/apache/kylin/rest/security/MockHTable.java
@@ -51,7 +51,7 @@
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
@@ -91,7 +91,7 @@
* <li>remove some methods for loading data, checking values ...</li>
* </ul>
*/
-public class MockHTable implements HTableInterface {
+public class MockHTable implements Table {
private final String tableName;
private final List<String> columnFamilies = new ArrayList<>();
@@ -114,14 +114,6 @@
this.columnFamilies.add(columnFamily);
}
- /**
- * {@inheritDoc}
- */
- @Override
- public byte[] getTableName() {
- return tableName.getBytes();
- }
-
@Override
public TableName getName() {
return null;
@@ -200,8 +192,8 @@
}
@Override
- public Boolean[] exists(List<Get> gets) throws IOException {
- return new Boolean[0];
+ public boolean[] existsAll(List<Get> list) throws IOException {
+ return new boolean[0];
}
/**
@@ -306,15 +298,6 @@
* {@inheritDoc}
*/
@Override
- public Result getRowOrBefore(byte[] row, byte[] family) throws IOException {
- // FIXME: implement
- return null;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
public ResultScanner getScanner(Scan scan) throws IOException {
final List<Result> ret = new ArrayList<Result>();
byte[] st = scan.getStartRow();
@@ -446,7 +429,7 @@
*/
}
if (filter.hasFilterRow() && !filteredOnRowKey) {
- filter.filterRow(nkvs);
+ filter.filterRow();
}
if (filter.filterRow() || filteredOnRowKey) {
nkvs.clear();
@@ -535,6 +518,11 @@
return false;
}
+ @Override
+ public boolean checkAndPut(byte[] bytes, byte[] bytes1, byte[] bytes2, CompareFilter.CompareOp compareOp, byte[] bytes3, Put put) throws IOException {
+ return false;
+ }
+
/**
* {@inheritDoc}
*/
@@ -555,7 +543,7 @@
continue;
}
for (KeyValue kv : delete.getFamilyMap().get(family)) {
- if (kv.isDeleteFamily()) {
+ if (kv.isDelete()) {
data.get(row).get(kv.getFamily()).clear();
} else {
data.get(row).get(kv.getFamily()).remove(kv.getQualifier());
@@ -592,6 +580,11 @@
return false;
}
+ @Override
+ public boolean checkAndDelete(byte[] bytes, byte[] bytes1, byte[] bytes2, CompareFilter.CompareOp compareOp, byte[] bytes3, Delete delete) throws IOException {
+ return false;
+ }
+
/**
* {@inheritDoc}
*/
@@ -605,7 +598,7 @@
*/
@Override
public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount) throws IOException {
- return incrementColumnValue(row, family, qualifier, amount, true);
+ return incrementColumnValue(row, family, qualifier, amount, null);
}
@Override
@@ -617,37 +610,6 @@
* {@inheritDoc}
*/
@Override
- public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, boolean writeToWAL) throws IOException {
- if (check(row, family, qualifier, null)) {
- Put put = new Put(row);
- put.add(family, qualifier, Bytes.toBytes(amount));
- put(put);
- return amount;
- }
- long newValue = Bytes.toLong(data.get(row).get(family).get(qualifier).lastEntry().getValue()) + amount;
- data.get(row).get(family).get(qualifier).put(System.currentTimeMillis(), Bytes.toBytes(newValue));
- return newValue;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public boolean isAutoFlush() {
- return true;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void flushCommits() throws IOException {
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
public void close() throws IOException {
}
@@ -673,29 +635,6 @@
* {@inheritDoc}
*/
@Override
- public void setAutoFlush(boolean autoFlush) {
- throw new NotImplementedException();
-
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
- throw new NotImplementedException();
-
- }
-
- @Override
- public void setAutoFlushTo(boolean autoFlush) {
- throw new NotImplementedException();
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
public long getWriteBufferSize() {
throw new NotImplementedException();
}
diff --git a/server/src/main/java/org/apache/kylin/rest/security/RealAclHBaseStorage.java b/server/src/main/java/org/apache/kylin/rest/security/RealAclHBaseStorage.java
index 5a48e83..d40bdf3 100644
--- a/server/src/main/java/org/apache/kylin/rest/security/RealAclHBaseStorage.java
+++ b/server/src/main/java/org/apache/kylin/rest/security/RealAclHBaseStorage.java
@@ -21,7 +21,8 @@
import java.io.IOException;
import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.rest.service.AclService;
import org.apache.kylin.rest.service.UserService;
@@ -56,11 +57,11 @@
}
@Override
- public HTableInterface getTable(String tableName) throws IOException {
+ public Table getTable(String tableName) throws IOException {
if (StringUtils.equals(tableName, aclTableName)) {
- return HBaseConnection.get(hbaseUrl).getTable(aclTableName);
+ return HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(aclTableName));
} else if (StringUtils.equals(tableName, userTableName)) {
- return HBaseConnection.get(hbaseUrl).getTable(userTableName);
+ return HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(userTableName));
} else {
throw new IllegalStateException("getTable failed" + tableName);
}
diff --git a/server/src/main/java/org/apache/kylin/rest/service/AclService.java b/server/src/main/java/org/apache/kylin/rest/service/AclService.java
index 58e093c..a03ff5e 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/AclService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/AclService.java
@@ -33,7 +33,7 @@
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
@@ -124,7 +124,7 @@
@Override
public List<ObjectIdentity> findChildren(ObjectIdentity parentIdentity) {
List<ObjectIdentity> oids = new ArrayList<ObjectIdentity>();
- HTableInterface htable = null;
+ Table htable = null;
try {
htable = aclHBaseStorage.getTable(aclTableName);
@@ -173,7 +173,7 @@
@Override
public Map<ObjectIdentity, Acl> readAclsById(List<ObjectIdentity> oids, List<Sid> sids) throws NotFoundException {
Map<ObjectIdentity, Acl> aclMaps = new HashMap<ObjectIdentity, Acl>();
- HTableInterface htable = null;
+ Table htable = null;
Result result = null;
try {
htable = aclHBaseStorage.getTable(aclTableName);
@@ -225,17 +225,16 @@
Authentication auth = SecurityContextHolder.getContext().getAuthentication();
PrincipalSid sid = new PrincipalSid(auth);
- HTableInterface htable = null;
+ Table htable = null;
try {
htable = aclHBaseStorage.getTable(aclTableName);
Put put = new Put(Bytes.toBytes(String.valueOf(objectIdentity.getIdentifier())));
- put.add(Bytes.toBytes(AclHBaseStorage.ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_TYPE_COLUMN), Bytes.toBytes(objectIdentity.getType()));
- put.add(Bytes.toBytes(AclHBaseStorage.ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_OWNER_COLUMN), sidSerializer.serialize(new SidInfo(sid)));
- put.add(Bytes.toBytes(AclHBaseStorage.ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_ENTRY_INHERIT_COLUMN), Bytes.toBytes(true));
+ put.addColumn(Bytes.toBytes(AclHBaseStorage.ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_TYPE_COLUMN), Bytes.toBytes(objectIdentity.getType()));
+ put.addColumn(Bytes.toBytes(AclHBaseStorage.ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_OWNER_COLUMN), sidSerializer.serialize(new SidInfo(sid)));
+ put.addColumn(Bytes.toBytes(AclHBaseStorage.ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_ENTRY_INHERIT_COLUMN), Bytes.toBytes(true));
htable.put(put);
- htable.flushCommits();
logger.debug("ACL of " + objectIdentity + " created successfully.");
} catch (IOException e) {
@@ -249,7 +248,7 @@
@Override
public void deleteAcl(ObjectIdentity objectIdentity, boolean deleteChildren) throws ChildrenExistException {
- HTableInterface htable = null;
+ Table htable = null;
try {
htable = aclHBaseStorage.getTable(aclTableName);
@@ -265,7 +264,6 @@
}
htable.delete(delete);
- htable.flushCommits();
logger.debug("ACL of " + objectIdentity + " deleted successfully.");
} catch (IOException e) {
@@ -283,7 +281,7 @@
throw e;
}
- HTableInterface htable = null;
+ Table htable = null;
try {
htable = aclHBaseStorage.getTable(aclTableName);
@@ -294,17 +292,16 @@
Put put = new Put(Bytes.toBytes(String.valueOf(acl.getObjectIdentity().getIdentifier())));
if (null != acl.getParentAcl()) {
- put.add(Bytes.toBytes(AclHBaseStorage.ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_PARENT_COLUMN), domainObjSerializer.serialize(new DomainObjectInfo(acl.getParentAcl().getObjectIdentity())));
+ put.addColumn(Bytes.toBytes(AclHBaseStorage.ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_PARENT_COLUMN), domainObjSerializer.serialize(new DomainObjectInfo(acl.getParentAcl().getObjectIdentity())));
}
for (AccessControlEntry ace : acl.getEntries()) {
AceInfo aceInfo = new AceInfo(ace);
- put.add(Bytes.toBytes(AclHBaseStorage.ACL_ACES_FAMILY), Bytes.toBytes(aceInfo.getSidInfo().getSid()), aceSerializer.serialize(aceInfo));
+ put.addColumn(Bytes.toBytes(AclHBaseStorage.ACL_ACES_FAMILY), Bytes.toBytes(aceInfo.getSidInfo().getSid()), aceSerializer.serialize(aceInfo));
}
if (!put.isEmpty()) {
htable.put(put);
- htable.flushCommits();
logger.debug("ACL of " + acl.getObjectIdentity() + " updated successfully.");
}
diff --git a/server/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
index 8ed0802..6c47fde 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
@@ -20,10 +20,7 @@
import java.io.IOException;
import java.util.*;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.mapred.Merger;
+import org.apache.hadoop.hbase.client.Connection;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
@@ -431,35 +428,24 @@
if (htableInfoCache.containsKey(tableName)) {
return htableInfoCache.get(tableName);
}
-
- Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration();
- HTable table = null;
+ Connection conn = HBaseConnection.get(this.getConfig().getStorageUrl());
HBaseResponse hr = null;
long tableSize = 0;
int regionCount = 0;
- try {
- table = new HTable(hconf, tableName);
+ HBaseRegionSizeCalculator cal = new HBaseRegionSizeCalculator(tableName, conn);
+ Map<byte[], Long> sizeMap = cal.getRegionSizeMap();
- HBaseRegionSizeCalculator cal = new HBaseRegionSizeCalculator(table);
- Map<byte[], Long> sizeMap = cal.getRegionSizeMap();
-
- for (long s : sizeMap.values()) {
- tableSize += s;
- }
-
- regionCount = sizeMap.size();
-
- // Set response.
- hr = new HBaseResponse();
- hr.setTableSize(tableSize);
- hr.setRegionCount(regionCount);
- } finally {
- if (null != table) {
- table.close();
- }
+ for (long s : sizeMap.values()) {
+ tableSize += s;
}
+ regionCount = sizeMap.size();
+
+ // Set response.
+ hr = new HBaseResponse();
+ hr.setTableSize(tableSize);
+ hr.setRegionCount(regionCount);
htableInfoCache.put(tableName, hr);
return hr;
diff --git a/server/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server/src/main/java/org/apache/kylin/rest/service/QueryService.java
index 08b338c..6691784 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -44,8 +44,9 @@
import org.apache.calcite.avatica.ColumnMetaData.Rep;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.kylin.common.KylinConfig;
@@ -130,14 +131,13 @@
Query[] queryArray = new Query[queries.size()];
byte[] bytes = querySerializer.serialize(queries.toArray(queryArray));
- HTableInterface htable = null;
+ Table htable = null;
try {
- htable = HBaseConnection.get(hbaseUrl).getTable(userTableName);
+ htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(userTableName));
Put put = new Put(Bytes.toBytes(creator));
- put.add(Bytes.toBytes(USER_QUERY_FAMILY), Bytes.toBytes(USER_QUERY_COLUMN), bytes);
+ put.addColumn(Bytes.toBytes(USER_QUERY_FAMILY), Bytes.toBytes(USER_QUERY_COLUMN), bytes);
htable.put(put);
- htable.flushCommits();
} finally {
IOUtils.closeQuietly(htable);
}
@@ -163,14 +163,13 @@
Query[] queryArray = new Query[queries.size()];
byte[] bytes = querySerializer.serialize(queries.toArray(queryArray));
- HTableInterface htable = null;
+ Table htable = null;
try {
- htable = HBaseConnection.get(hbaseUrl).getTable(userTableName);
+ htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(userTableName));
Put put = new Put(Bytes.toBytes(creator));
- put.add(Bytes.toBytes(USER_QUERY_FAMILY), Bytes.toBytes(USER_QUERY_COLUMN), bytes);
+ put.addColumn(Bytes.toBytes(USER_QUERY_FAMILY), Bytes.toBytes(USER_QUERY_COLUMN), bytes);
htable.put(put);
- htable.flushCommits();
} finally {
IOUtils.closeQuietly(htable);
}
@@ -182,9 +181,9 @@
}
List<Query> queries = new ArrayList<Query>();
- HTableInterface htable = null;
+ Table htable = null;
try {
- htable = HBaseConnection.get(hbaseUrl).getTable(userTableName);
+ htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(userTableName));
Get get = new Get(Bytes.toBytes(creator));
get.addFamily(Bytes.toBytes(USER_QUERY_FAMILY));
Result result = htable.get(get);
diff --git a/server/src/main/java/org/apache/kylin/rest/service/UserService.java b/server/src/main/java/org/apache/kylin/rest/service/UserService.java
index e4e2de3..624c49c 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/UserService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/UserService.java
@@ -29,7 +29,7 @@
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
@@ -69,7 +69,7 @@
@Override
public UserDetails loadUserByUsername(String username) throws UsernameNotFoundException {
- HTableInterface htable = null;
+ Table htable = null;
try {
htable = aclHBaseStorage.getTable(userTableName);
@@ -100,16 +100,15 @@
@Override
public void updateUser(UserDetails user) {
- HTableInterface htable = null;
+ Table htable = null;
try {
byte[] userAuthorities = serialize(user.getAuthorities());
htable = aclHBaseStorage.getTable(userTableName);
Put put = new Put(Bytes.toBytes(user.getUsername()));
- put.add(Bytes.toBytes(AclHBaseStorage.USER_AUTHORITY_FAMILY), Bytes.toBytes(AclHBaseStorage.USER_AUTHORITY_COLUMN), userAuthorities);
+ put.addColumn(Bytes.toBytes(AclHBaseStorage.USER_AUTHORITY_FAMILY), Bytes.toBytes(AclHBaseStorage.USER_AUTHORITY_COLUMN), userAuthorities);
htable.put(put);
- htable.flushCommits();
} catch (IOException e) {
throw new RuntimeException(e.getMessage(), e);
} finally {
@@ -119,14 +118,13 @@
@Override
public void deleteUser(String username) {
- HTableInterface htable = null;
+ Table htable = null;
try {
htable = aclHBaseStorage.getTable(userTableName);
Delete delete = new Delete(Bytes.toBytes(username));
htable.delete(delete);
- htable.flushCommits();
} catch (IOException e) {
throw new RuntimeException(e.getMessage(), e);
} finally {
@@ -141,7 +139,7 @@
@Override
public boolean userExists(String username) {
- HTableInterface htable = null;
+ Table htable = null;
try {
htable = aclHBaseStorage.getTable(userTableName);
@@ -161,7 +159,7 @@
s.addColumn(Bytes.toBytes(AclHBaseStorage.USER_AUTHORITY_FAMILY), Bytes.toBytes(AclHBaseStorage.USER_AUTHORITY_COLUMN));
List<String> authorities = new ArrayList<String>();
- HTableInterface htable = null;
+ Table htable = null;
ResultScanner scanner = null;
try {
htable = aclHBaseStorage.getTable(userTableName);
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
index e2f4f3a..a7ffa0d 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
@@ -32,9 +32,9 @@
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.StorageException;
import org.apache.kylin.engine.mr.HadoopUtil;
@@ -52,13 +52,13 @@
private static final Logger logger = LoggerFactory.getLogger(HBaseConnection.class);
private static final Map<String, Configuration> ConfigCache = new ConcurrentHashMap<String, Configuration>();
- private static final Map<String, HConnection> ConnPool = new ConcurrentHashMap<String, HConnection>();
+ private static final Map<String, Connection> ConnPool = new ConcurrentHashMap<String, Connection>();
static {
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
- for (HConnection conn : ConnPool.values()) {
+ for (Connection conn : ConnPool.values()) {
try {
conn.close();
} catch (IOException e) {
@@ -124,9 +124,9 @@
// ============================================================================
- // returned HConnection can be shared by multiple threads and does not require close()
+ // returned Connection can be shared by multiple threads and does not require close()
@SuppressWarnings("resource")
- public static HConnection get(String url) {
+ public static Connection get(String url) {
// find configuration
Configuration conf = ConfigCache.get(url);
if (conf == null) {
@@ -134,13 +134,13 @@
ConfigCache.put(url, conf);
}
- HConnection connection = ConnPool.get(url);
+ Connection connection = ConnPool.get(url);
try {
while (true) {
// I don't use DCL since recreate a connection is not a big issue.
if (connection == null || connection.isClosed()) {
logger.info("connection is null or closed, creating a new one");
- connection = HConnectionManager.createConnection(conf);
+ connection = ConnectionFactory.createConnection(conf);
ConnPool.put(url, connection);
}
@@ -159,8 +159,8 @@
return connection;
}
- public static boolean tableExists(HConnection conn, String tableName) throws IOException {
- HBaseAdmin hbase = new HBaseAdmin(conn);
+ public static boolean tableExists(Connection conn, String tableName) throws IOException {
+ Admin hbase = conn.getAdmin();
try {
return hbase.tableExists(TableName.valueOf(tableName));
} finally {
@@ -180,8 +180,8 @@
deleteTable(HBaseConnection.get(hbaseUrl), tableName);
}
- public static void createHTableIfNeeded(HConnection conn, String tableName, String... families) throws IOException {
- HBaseAdmin hbase = new HBaseAdmin(conn);
+ public static void createHTableIfNeeded(Connection conn, String tableName, String... families) throws IOException {
+ Admin hbase = conn.getAdmin();
try {
if (tableExists(conn, tableName)) {
@@ -210,8 +210,8 @@
}
}
- public static void deleteTable(HConnection conn, String tableName) throws IOException {
- HBaseAdmin hbase = new HBaseAdmin(conn);
+ public static void deleteTable(Connection conn, String tableName) throws IOException {
+ Admin hbase = conn.getAdmin();
try {
if (!tableExists(conn, tableName)) {
@@ -221,10 +221,10 @@
logger.debug("delete HTable '" + tableName + "'");
- if (hbase.isTableEnabled(tableName)) {
- hbase.disableTable(tableName);
+ if (hbase.isTableEnabled(TableName.valueOf(tableName))) {
+ hbase.disableTable(TableName.valueOf(tableName));
}
- hbase.deleteTable(tableName);
+ hbase.deleteTable(TableName.valueOf(tableName));
logger.debug("HTable '" + tableName + "' deleted");
} finally {
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
index 2262482..07a5586 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
@@ -31,10 +31,11 @@
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
@@ -69,7 +70,7 @@
final String tableNameBase;
final String hbaseUrl;
- private HConnection getConnection() throws IOException {
+ private Connection getConnection() throws IOException {
return HBaseConnection.get(hbaseUrl);
}
@@ -120,7 +121,7 @@
byte[] endRow = Bytes.toBytes(lookForPrefix);
endRow[endRow.length - 1]++;
- HTableInterface table = getConnection().getTable(getAllInOneTableName());
+ Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName()));
Scan scan = new Scan(startRow, endRow);
if ((filter != null && filter instanceof KeyOnlyFilter) == false) {
scan.addColumn(B_FAMILY, B_COLUMN_TS);
@@ -238,13 +239,12 @@
IOUtils.copy(content, bout);
bout.close();
- HTableInterface table = getConnection().getTable(getAllInOneTableName());
+ Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName()));
try {
byte[] row = Bytes.toBytes(resPath);
Put put = buildPut(resPath, ts, row, bout.toByteArray(), table);
table.put(put);
- table.flushCommits();
} finally {
IOUtils.closeQuietly(table);
}
@@ -252,7 +252,7 @@
@Override
protected long checkAndPutResourceImpl(String resPath, byte[] content, long oldTS, long newTS) throws IOException, IllegalStateException {
- HTableInterface table = getConnection().getTable(getAllInOneTableName());
+ Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName()));
try {
byte[] row = Bytes.toBytes(resPath);
byte[] bOldTS = oldTS == 0 ? null : Bytes.toBytes(oldTS);
@@ -265,8 +265,6 @@
throw new IllegalStateException("Overwriting conflict " + resPath + ", expect old TS " + oldTS + ", but it is " + real);
}
- table.flushCommits();
-
return newTS;
} finally {
IOUtils.closeQuietly(table);
@@ -275,11 +273,10 @@
@Override
protected void deleteResourceImpl(String resPath) throws IOException {
- HTableInterface table = getConnection().getTable(getAllInOneTableName());
+ Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName()));
try {
Delete del = new Delete(Bytes.toBytes(resPath));
table.delete(del);
- table.flushCommits();
} finally {
IOUtils.closeQuietly(table);
}
@@ -304,7 +301,7 @@
get.addColumn(B_FAMILY, B_COLUMN_TS);
}
- HTableInterface table = getConnection().getTable(getAllInOneTableName());
+ Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName()));
try {
Result result = table.get(get);
boolean exists = result != null && (!result.isEmpty() || (result.getExists() != null && result.getExists()));
@@ -314,7 +311,7 @@
}
}
- private Path writeLargeCellToHdfs(String resPath, byte[] largeColumn, HTableInterface table) throws IOException {
+ private Path writeLargeCellToHdfs(String resPath, byte[] largeColumn, Table table) throws IOException {
Path redirectPath = bigCellHDFSPath(resPath);
Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration();
FileSystem fileSystem = FileSystem.get(hconf);
@@ -340,7 +337,7 @@
return redirectPath;
}
- private Put buildPut(String resPath, long ts, byte[] row, byte[] content, HTableInterface table) throws IOException {
+ private Put buildPut(String resPath, long ts, byte[] row, byte[] content, Table table) throws IOException {
int kvSizeLimit = this.kylinConfig.getHBaseKeyValueSize();
if (content.length > kvSizeLimit) {
writeLargeCellToHdfs(resPath, content, table);
@@ -348,8 +345,8 @@
}
Put put = new Put(row);
- put.add(B_FAMILY, B_COLUMN, content);
- put.add(B_FAMILY, B_COLUMN_TS, Bytes.toBytes(ts));
+ put.addColumn(B_FAMILY, B_COLUMN, content);
+ put.addColumn(B_FAMILY, B_COLUMN_TS, Bytes.toBytes(ts));
return put;
}
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/SimpleHBaseStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/SimpleHBaseStore.java
index bbdb542..81f205e 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/SimpleHBaseStore.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/SimpleHBaseStore.java
@@ -26,12 +26,13 @@
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.kv.RowConstants;
@@ -86,14 +87,13 @@
}
private class Writer implements IGTWriter {
- final HTableInterface table;
+ final BufferedMutator table;
final ByteBuffer rowkey = ByteBuffer.allocate(50);
final ByteBuffer value = ByteBuffer.allocate(50);
Writer() throws IOException {
- HConnection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl());
- table = conn.getTable(htableName);
- table.setAutoFlush(false, true);
+ Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl());
+ table = conn.getBufferedMutator(htableName);
}
@Override
@@ -113,24 +113,24 @@
Put put = new Put(rowkey);
put.addImmutable(CF_B, ByteBuffer.wrap(COL_B), HConstants.LATEST_TIMESTAMP, value);
- table.put(put);
+ table.mutate(put);
}
@Override
public void close() throws IOException {
- table.flushCommits();
+ table.flush();
table.close();
}
}
class Reader implements IGTScanner {
- final HTableInterface table;
+ final Table table;
final ResultScanner scanner;
int count = 0;
Reader() throws IOException {
- HConnection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl());
+ Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl());
table = conn.getTable(htableName);
Scan scan = new Scan();
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java
index 9eb05d2..7104da8 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java
@@ -25,8 +25,9 @@
import java.util.NoSuchElementException;
import java.util.Set;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
@@ -70,7 +71,7 @@
protected final List<RowValueDecoder> rowValueDecoders;
private final StorageContext context;
private final String tableName;
- private final HTableInterface table;
+ private final Table table;
protected CubeTupleConverter tupleConverter;
protected final Iterator<HBaseKeyRange> rangeIterator;
@@ -88,7 +89,7 @@
private int advMeasureRowsRemaining;
private int advMeasureRowIndex;
- public CubeSegmentTupleIterator(CubeSegment cubeSeg, List<HBaseKeyRange> keyRanges, HConnection conn, //
+ public CubeSegmentTupleIterator(CubeSegment cubeSeg, List<HBaseKeyRange> keyRanges, Connection conn, //
Set<TblColRef> dimensions, TupleFilter filter, Set<TblColRef> groupBy, //
List<RowValueDecoder> rowValueDecoders, StorageContext context, TupleInfo returnTupleInfo) {
this.cubeSeg = cubeSeg;
@@ -108,7 +109,7 @@
this.rangeIterator = keyRanges.iterator();
try {
- this.table = conn.getTable(tableName);
+ this.table = conn.getTable(TableName.valueOf(tableName));
} catch (Throwable t) {
throw new StorageException("Error when open connection to table " + tableName, t);
}
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
index 47e94de..7211ec1 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
@@ -33,7 +33,7 @@
import java.util.Set;
import java.util.TreeSet;
-import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.Connection;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.common.util.Pair;
@@ -147,7 +147,7 @@
setCoprocessor(groupsCopD, valueDecoders, context); // enable coprocessor if beneficial
setLimit(filter, context);
- HConnection conn = HBaseConnection.get(context.getConnUrl());
+ Connection conn = HBaseConnection.get(context.getConnUrl());
// notice we're passing filterD down to storage instead of flatFilter
return new SerializedHBaseTupleIterator(conn, scans, cubeInstance, dimensionsD, filterD, groupsCopD, valueDecoders, context, returnTupleInfo);
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/HBaseClientKVIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/HBaseClientKVIterator.java
index 8aace22..aef3963 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/HBaseClientKVIterator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/HBaseClientKVIterator.java
@@ -24,8 +24,9 @@
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -40,14 +41,14 @@
byte[] family;
- HTableInterface table;
+ Table table;
ResultScanner scanner;
Iterator<Result> iterator;
- public HBaseClientKVIterator(HConnection hconn, String tableName, byte[] family) throws IOException {
+ public HBaseClientKVIterator(Connection hconn, String tableName, byte[] family) throws IOException {
this.family = family;
- this.table = hconn.getTable(tableName);
+ this.table = hconn.getTable(TableName.valueOf(tableName));
this.scanner = table.getScanner(family);
this.iterator = scanner.iterator();
}
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/RegionScannerAdapter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/RegionScannerAdapter.java
index 6342c5c..ae442fe 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/RegionScannerAdapter.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/RegionScannerAdapter.java
@@ -23,9 +23,11 @@
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
/**
* @author yangli9
@@ -50,7 +52,7 @@
}
@Override
- public boolean next(List<Cell> result, int limit) throws IOException {
+ public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
return next(result);
}
@@ -60,7 +62,7 @@
}
@Override
- public boolean nextRaw(List<Cell> result, int limit) throws IOException {
+ public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException {
return next(result);
}
@@ -94,4 +96,9 @@
return Long.MAX_VALUE;
}
+ @Override
+ public int getBatch() {
+ return -1;
+ }
+
}
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java
index e8dd5b9..d033c77 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java
@@ -25,7 +25,7 @@
import java.util.NoSuchElementException;
import java.util.Set;
-import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.Connection;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.metadata.filter.TupleFilter;
@@ -57,7 +57,7 @@
private int scanCount;
private ITuple next;
- public SerializedHBaseTupleIterator(HConnection conn, List<HBaseKeyRange> segmentKeyRanges, CubeInstance cube, //
+ public SerializedHBaseTupleIterator(Connection conn, List<HBaseKeyRange> segmentKeyRanges, CubeInstance cube, //
Set<TblColRef> dimensions, TupleFilter filter, Set<TblColRef> groupBy, List<RowValueDecoder> rowValueDecoders, //
StorageContext context, TupleInfo returnTupleInfo) {
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserver.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserver.java
index c7b650a..8dba1b1 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserver.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserver.java
@@ -26,7 +26,7 @@
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorBehavior;
@@ -99,7 +99,7 @@
// start/end region operation & sync on scanner is suggested by the
// javadoc of RegionScanner.nextRaw()
// FIXME: will the lock still work when a iterator is returned? is it safe? Is readonly attribute helping here? by mhb
- HRegion region = ctxt.getEnvironment().getRegion();
+ Region region = ctxt.getEnvironment().getRegion();
region.startRegionOperation();
try {
synchronized (innerScanner) {
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregationScanner.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregationScanner.java
index d8b61b3..635a32f 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregationScanner.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregationScanner.java
@@ -24,7 +24,9 @@
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.kylin.measure.MeasureAggregator;
import org.apache.kylin.storage.hbase.common.coprocessor.AggrKey;
import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorBehavior;
@@ -116,8 +118,8 @@
}
@Override
- public boolean next(List<Cell> result, int limit) throws IOException {
- return outerScanner.next(result, limit);
+ public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
+ return outerScanner.next(result, scannerContext);
}
@Override
@@ -126,8 +128,8 @@
}
@Override
- public boolean nextRaw(List<Cell> result, int limit) throws IOException {
- return outerScanner.nextRaw(result, limit);
+ public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException {
+ return outerScanner.nextRaw(result, scannerContext);
}
@Override
@@ -160,6 +162,11 @@
return outerScanner.getMvccReadPoint();
}
+ @Override
+ public int getBatch() {
+ return outerScanner.getBatch();
+ }
+
private static class Stats {
long inputRows = 0;
long inputBytes = 0;
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverAggregationCache.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverAggregationCache.java
index 8404262..1809a44 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverAggregationCache.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverAggregationCache.java
@@ -24,12 +24,10 @@
import java.util.List;
import java.util.Map.Entry;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.kylin.measure.MeasureAggregator;
import org.apache.kylin.storage.hbase.common.coprocessor.AggrKey;
import org.apache.kylin.storage.hbase.common.coprocessor.AggregationCache;
@@ -112,7 +110,7 @@
}
@Override
- public boolean next(List<Cell> result, int limit) throws IOException {
+ public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
return next(result);
}
@@ -122,7 +120,7 @@
}
@Override
- public boolean nextRaw(List<Cell> result, int limit) throws IOException {
+ public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException {
return next(result);
}
@@ -161,6 +159,11 @@
// AggregateRegionObserver.LOG.info("Kylin Scanner getMvccReadPoint()");
return Long.MAX_VALUE;
}
+
+ @Override
+ public int getBatch() {
+ return innerScanner.getBatch();
+ }
}
}
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverEnabler.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverEnabler.java
index f0e9bed..c69fd8b 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverEnabler.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverEnabler.java
@@ -23,7 +23,7 @@
import java.util.Map;
import java.util.Set;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
@@ -60,7 +60,7 @@
static final Map<String, Boolean> CUBE_OVERRIDES = Maps.newConcurrentMap();
public static ResultScanner scanWithCoprocessorIfBeneficial(CubeSegment segment, Cuboid cuboid, TupleFilter tupleFiler, //
- Collection<TblColRef> groupBy, Collection<RowValueDecoder> rowValueDecoders, StorageContext context, HTableInterface table, Scan scan) throws IOException {
+ Collection<TblColRef> groupBy, Collection<RowValueDecoder> rowValueDecoders, StorageContext context, Table table, Scan scan) throws IOException {
if (context.isCoprocessorEnabled() == false) {
return table.getScanner(scan);
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index f04d46e..f2573b4 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -35,8 +35,9 @@
import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
@@ -267,7 +268,7 @@
final ImmutableBitSet selectedColBlocks = scanRequest.getSelectedColBlocks().set(0);
// globally shared connection, does not require close
- final HConnection conn = HBaseConnection.get(cubeSeg.getCubeInstance().getConfig().getStorageUrl());
+ final Connection conn = HBaseConnection.get(cubeSeg.getCubeInstance().getConfig().getStorageUrl());
final List<IntList> hbaseColumnsToGTIntList = Lists.newArrayList();
List<List<Integer>> hbaseColumnsToGT = getHBaseColumnsGTMapping(selectedColBlocks);
@@ -339,7 +340,7 @@
Map<byte[], CubeVisitProtos.CubeVisitResponse> results;
try {
- results = getResults(builder.build(), conn.getTable(cubeSeg.getStorageLocationIdentifier()), epRange.getFirst(), epRange.getSecond());
+ results = getResults(builder.build(), conn.getTable(TableName.valueOf(cubeSeg.getStorageLocationIdentifier())), epRange.getFirst(), epRange.getSecond());
} catch (Throwable throwable) {
throw new RuntimeException(logHeader + "Error when visiting cubes by endpoint", throwable);
}
@@ -384,7 +385,7 @@
}
- private Map<byte[], CubeVisitProtos.CubeVisitResponse> getResults(final CubeVisitProtos.CubeVisitRequest request, HTableInterface table, byte[] startKey, byte[] endKey) throws Throwable {
+ private Map<byte[], CubeVisitProtos.CubeVisitResponse> getResults(final CubeVisitProtos.CubeVisitRequest request, Table table, byte[] startKey, byte[] endKey) throws Throwable {
Map<byte[], CubeVisitProtos.CubeVisitResponse> results = table.coprocessorService(CubeVisitProtos.CubeVisitService.class, startKey, endKey, new Batch.Call<CubeVisitProtos.CubeVisitService, CubeVisitProtos.CubeVisitResponse>() {
public CubeVisitProtos.CubeVisitResponse call(CubeVisitProtos.CubeVisitService rowsService) throws IOException {
ServerRpcController controller = new ServerRpcController();
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
index 938145b..9b8a3de 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
@@ -24,8 +24,9 @@
import java.util.List;
import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
@@ -164,8 +165,8 @@
// primary key (also the 0th column block) is always selected
final ImmutableBitSet selectedColBlocks = scanRequest.getSelectedColBlocks().set(0);
// globally shared connection, does not require close
- HConnection hbaseConn = HBaseConnection.get(cubeSeg.getCubeInstance().getConfig().getStorageUrl());
- final HTableInterface hbaseTable = hbaseConn.getTable(cubeSeg.getStorageLocationIdentifier());
+ Connection hbaseConn = HBaseConnection.get(cubeSeg.getCubeInstance().getConfig().getStorageUrl());
+ final Table hbaseTable = hbaseConn.getTable(TableName.valueOf(cubeSeg.getStorageLocationIdentifier()));
List<RawScan> rawScans = preparedHBaseScans(scanRequest.getGTScanRanges(), selectedColBlocks);
List<List<Integer>> hbaseColumnsToGT = getHBaseColumnsGTMapping(selectedColBlocks);
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
index 0cd35f1..c96d706 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
@@ -138,7 +138,7 @@
if (shardLength == 0) {
return;
}
- byte[] regionStartKey = ArrayUtils.isEmpty(region.getStartKey()) ? new byte[shardLength] : region.getStartKey();
+ byte[] regionStartKey = ArrayUtils.isEmpty(region.getRegionInfo().getStartKey()) ? new byte[shardLength] : region.getRegionInfo().getStartKey();
Bytes.putBytes(rawScan.startKey, 0, regionStartKey, 0, shardLength);
Bytes.putBytes(rawScan.endKey, 0, regionStartKey, 0, shardLength);
}
@@ -174,7 +174,7 @@
try {
this.serviceStartTime = System.currentTimeMillis();
- region = env.getRegion();
+ region = (HRegion)env.getRegion();
region.startRegionOperation();
debugGitTag = region.getTableDesc().getValue(IRealizationConstants.HTableGitTag);
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHTableJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHTableJob.java
index 9c96f21..a277eeb 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHTableJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHTableJob.java
@@ -24,7 +24,8 @@
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
@@ -32,6 +33,7 @@
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.storage.hbase.HBaseConnection;
import org.apache.kylin.storage.hbase.util.IIDeployCoprocessorCLI;
import org.apache.kylin.invertedindex.IIInstance;
import org.apache.kylin.invertedindex.IIManager;
@@ -47,7 +49,7 @@
@Override
public int run(String[] args) throws Exception {
Options options = new Options();
- HBaseAdmin admin = null;
+ Admin admin = null;
try {
options.addOption(OPTION_II_NAME);
options.addOption(OPTION_HTABLE_NAME);
@@ -63,10 +65,11 @@
Configuration conf = HBaseConfiguration.create(getConf());
+ Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl());
// check if the table already exists
- admin = new HBaseAdmin(conf);
- if (admin.tableExists(tableName)) {
- if (admin.isTableEnabled(tableName)) {
+ admin = conn.getAdmin();
+ if (admin.tableExists(TableName.valueOf(tableName))) {
+ if (admin.isTableEnabled(TableName.valueOf(tableName))) {
logger.info("Table " + tableName + " already exists and is enabled, no need to create.");
return 0;
} else {
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/InvertedIndexStorageQuery.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/InvertedIndexStorageQuery.java
index fef9662..fa782bf 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/InvertedIndexStorageQuery.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/InvertedIndexStorageQuery.java
@@ -20,7 +20,7 @@
import java.util.ArrayList;
-import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.Connection;
import org.apache.kylin.invertedindex.IIInstance;
import org.apache.kylin.invertedindex.IISegment;
import org.apache.kylin.metadata.realization.SQLDigest;
@@ -53,8 +53,8 @@
public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) {
String tableName = seg.getStorageLocationIdentifier();
- //HConnection is cached, so need not be closed
- HConnection conn = HBaseConnection.get(context.getConnUrl());
+ //Connection is cached, so need not be closed
+ Connection conn = HBaseConnection.get(context.getConnUrl());
try {
dataIterator = new EndpointTupleIterator(seg, sqlDigest.filter, sqlDigest.groupbyColumns, new ArrayList<>(sqlDigest.aggregations), context, conn, returnTupleInfo);
return dataIterator;
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleIterator.java
index 4ec421b..8da8e59 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleIterator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleIterator.java
@@ -31,8 +31,9 @@
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.SerializationUtils;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
@@ -93,16 +94,16 @@
private Iterator<List<IIProtos.IIResponseInternal.IIRow>> regionResponsesIterator = null;
private ITupleIterator tupleIterator = null;
- private HTableInterface table = null;
+ private Table table = null;
private TblColRef partitionCol;
private long lastDataTime = -1;
private int rowsInAllMetric = 0;
- public EndpointTupleIterator(IISegment segment, TupleFilter rootFilter, Collection<TblColRef> groupBy, List<FunctionDesc> measures, StorageContext context, HConnection conn, TupleInfo returnTupleInfo) throws Throwable {
+ public EndpointTupleIterator(IISegment segment, TupleFilter rootFilter, Collection<TblColRef> groupBy, List<FunctionDesc> measures, StorageContext context, Connection conn, TupleInfo returnTupleInfo) throws Throwable {
String tableName = segment.getStorageLocationIdentifier();
- table = conn.getTable(tableName);
+ table = conn.getTable(TableName.valueOf(tableName));
factTableName = segment.getIIDesc().getFactTableName();
if (rootFilter == null) {
@@ -290,7 +291,7 @@
return request;
}
- private Collection<IIProtos.IIResponse> getResults(final IIProtos.IIRequest request, HTableInterface table) throws Throwable {
+ private Collection<IIProtos.IIResponse> getResults(final IIProtos.IIRequest request, Table table) throws Throwable {
Map<byte[], IIProtos.IIResponse> results = table.coprocessorService(IIProtos.RowsService.class, null, null, new Batch.Call<IIProtos.RowsService, IIProtos.IIResponse>() {
public IIProtos.IIResponse call(IIProtos.RowsService rowsService) throws IOException {
ServerRpcController controller = new ServerRpcController();
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/IIEndpoint.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/IIEndpoint.java
index 48802cb..0b5a963 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/IIEndpoint.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/IIEndpoint.java
@@ -91,7 +91,7 @@
if (request.hasTsRange()) {
Range<Long> tsRange = (Range<Long>) SerializationUtils.deserialize(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getTsRange()));
- byte[] regionStartKey = region.getStartKey();
+ byte[] regionStartKey = region.getRegionInfo().getStartKey();
if (!ArrayUtils.isEmpty(regionStartKey)) {
shard = BytesUtil.readUnsigned(regionStartKey, 0, IIKeyValueCodec.SHARD_LEN);
} else {
@@ -145,7 +145,7 @@
HRegion region = null;
try {
- region = env.getRegion();
+ region = (HRegion)env.getRegion();
region.startRegionOperation();
innerScanner = region.getScanner(prepareScan(request, region));
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHTableUtil.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHTableUtil.java
index 14f47ef..455d203 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHTableUtil.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHTableUtil.java
@@ -25,7 +25,8 @@
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.regionserver.BloomType;
@@ -79,7 +80,8 @@
tableDesc.setValue(IRealizationConstants.HTableSegmentTag, cubeSegment.toString());
Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
- HBaseAdmin admin = new HBaseAdmin(conf);
+ Connection conn = HBaseConnection.get(kylinConfig.getStorageUrl());
+ Admin admin = conn.getAdmin();
try {
if (User.isHBaseSecurityEnabled(conf)) {
@@ -92,7 +94,7 @@
tableDesc.addFamily(cf);
}
- if (admin.tableExists(tableName)) {
+ if (admin.tableExists(TableName.valueOf(tableName))) {
// admin.disableTable(tableName);
// admin.deleteTable(tableName);
throw new RuntimeException("HBase table " + tableName + " exists!");
@@ -101,7 +103,7 @@
DeployCoprocessorCLI.deployCoprocessor(tableDesc);
admin.createTable(tableDesc, splitKeys);
- Preconditions.checkArgument(admin.isTableAvailable(tableName), "table " + tableName + " created, but is not available due to some reasons");
+ Preconditions.checkArgument(admin.isTableAvailable(TableName.valueOf(tableName)), "table " + tableName + " created, but is not available due to some reasons");
logger.info("create hbase table " + tableName + " done.");
} finally {
admin.close();
@@ -110,8 +112,7 @@
}
public static void deleteHTable(TableName tableName) throws IOException {
- Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
- HBaseAdmin admin = new HBaseAdmin(conf);
+ Admin admin = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()).getAdmin();
try {
if (admin.tableExists(tableName)) {
logger.info("disabling hbase table " + tableName);
@@ -126,8 +127,7 @@
/** create a HTable that has the same performance settings as normal cube table, for benchmark purpose */
public static void createBenchmarkHTable(TableName tableName, String cfName) throws IOException {
- Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
- HBaseAdmin admin = new HBaseAdmin(conf);
+ Admin admin = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()).getAdmin();
try {
if (admin.tableExists(tableName)) {
logger.info("disabling hbase table " + tableName);
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/DeprecatedGCStep.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/DeprecatedGCStep.java
index 472b638..9dc9715 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/DeprecatedGCStep.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/DeprecatedGCStep.java
@@ -27,11 +27,11 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.engine.mr.HadoopUtil;
import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.AbstractExecutable;
@@ -100,19 +100,21 @@
List<String> oldTables = getOldHTables();
if (oldTables != null && oldTables.size() > 0) {
String metadataUrlPrefix = KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix();
- Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
- HBaseAdmin admin = null;
+ Admin admin = null;
try {
- admin = new HBaseAdmin(conf);
+
+ Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl());
+ admin = conn.getAdmin();
+
for (String table : oldTables) {
- if (admin.tableExists(table)) {
- HTableDescriptor tableDescriptor = admin.getTableDescriptor(Bytes.toBytes(table));
+ if (admin.tableExists(TableName.valueOf(table))) {
+ HTableDescriptor tableDescriptor = admin.getTableDescriptor(TableName.valueOf(table));
String host = tableDescriptor.getValue(IRealizationConstants.HTableTag);
if (metadataUrlPrefix.equalsIgnoreCase(host)) {
- if (admin.isTableEnabled(table)) {
- admin.disableTable(table);
+ if (admin.isTableEnabled(TableName.valueOf(table))) {
+ admin.disableTable(TableName.valueOf(table));
}
- admin.deleteTable(table);
+ admin.deleteTable(TableName.valueOf(table));
logger.debug("Dropped HBase table " + table);
output.append("Dropped HBase table " + table + " \n");
} else {
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java
index ddc868d..5063e00 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java
@@ -36,7 +36,7 @@
import com.google.common.collect.Lists;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.Put;
import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.cube.CubeSegment;
@@ -63,7 +63,7 @@
private final List<KeyValueCreator> keyValueCreators;
private final int nColumns;
- private final HTableInterface hTable;
+ private final Table hTable;
private final CubeDesc cubeDesc;
private final CubeSegment cubeSegment;
private final Object[] measureValues;
@@ -72,7 +72,7 @@
private AbstractRowKeyEncoder rowKeyEncoder;
private byte[] keybuf;
- public HBaseCuboidWriter(CubeSegment segment, HTableInterface hTable) {
+ public HBaseCuboidWriter(CubeSegment segment, Table hTable) {
this.keyValueCreators = Lists.newArrayList();
this.cubeSegment = segment;
this.cubeDesc = cubeSegment.getCubeDesc();
@@ -131,7 +131,6 @@
long t = System.currentTimeMillis();
if (hTable != null) {
hTable.put(puts);
- hTable.flushCommits();
}
logger.info("commit total " + puts.size() + " puts, totally cost:" + (System.currentTimeMillis() - t) + "ms");
puts.clear();
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java
index fa62a62..cf5c9d4 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java
@@ -28,7 +28,8 @@
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
import org.apache.kylin.common.persistence.ResourceStore;
@@ -55,7 +56,7 @@
try {
CubeSegment cubeSegment = (CubeSegment) buildable;
- final HTableInterface hTable;
+ final Table hTable;
hTable = createHTable(cubeSegment);
List<ICuboidWriter> cuboidWriters = Lists.newArrayList();
cuboidWriters.add(new HBaseCuboidWriter(cubeSegment, hTable));
@@ -87,10 +88,10 @@
}
}
- private HTableInterface createHTable(final CubeSegment cubeSegment) throws IOException {
+ private Table createHTable(final CubeSegment cubeSegment) throws IOException {
final String hTableName = cubeSegment.getStorageLocationIdentifier();
CubeHTableUtil.createHTable(cubeSegment, null);
- final HTableInterface hTable = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()).getTable(hTableName);
+ final Table hTable = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()).getTable(TableName.valueOf(hTableName));
logger.info("hTable:" + hTableName + " for segment:" + cubeSegment.getName() + " created!");
return hTable;
}
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java
index a1377f2..dfde2dd 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java
@@ -24,12 +24,11 @@
import java.util.List;
import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableContext;
@@ -70,19 +69,20 @@
List<String> oldTables = getOldHTables();
if (oldTables != null && oldTables.size() > 0) {
String metadataUrlPrefix = KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix();
- Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
- HBaseAdmin admin = null;
+ Admin admin = null;
try {
- admin = new HBaseAdmin(conf);
+ Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl());
+ admin = conn.getAdmin();
+
for (String table : oldTables) {
- if (admin.tableExists(table)) {
- HTableDescriptor tableDescriptor = admin.getTableDescriptor(Bytes.toBytes(table));
+ if (admin.tableExists(TableName.valueOf(table))) {
+ HTableDescriptor tableDescriptor = admin.getTableDescriptor(TableName.valueOf((table)));
String host = tableDescriptor.getValue(IRealizationConstants.HTableTag);
if (metadataUrlPrefix.equalsIgnoreCase(host)) {
- if (admin.isTableEnabled(table)) {
- admin.disableTable(table);
+ if (admin.isTableEnabled(TableName.valueOf(table))) {
+ admin.disableTable(TableName.valueOf(table));
}
- admin.deleteTable(table);
+ admin.deleteTable(TableName.valueOf(table));
logger.debug("Dropped htable: " + table);
output.append("HBase table " + table + " is dropped. \n");
} else {
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CleanHtableCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CleanHtableCLI.java
index 1ba96e9..8c19b9f 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CleanHtableCLI.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CleanHtableCLI.java
@@ -21,11 +21,12 @@
import java.io.IOException;
import org.apache.commons.cli.Options;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.metadata.realization.IRealizationConstants;
import org.apache.kylin.storage.hbase.HBaseConnection;
@@ -53,8 +54,8 @@
}
private void clean() throws IOException {
- Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
- HBaseAdmin hbaseAdmin = new HBaseAdmin(conf);
+ Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl());
+ Admin hbaseAdmin = conn.getAdmin();
for (HTableDescriptor descriptor : hbaseAdmin.listTables()) {
String name = descriptor.getNameAsString().toLowerCase();
@@ -65,7 +66,7 @@
System.out.println();
descriptor.setValue(IRealizationConstants.HTableOwner, "DL-eBay-Kylin@ebay.com");
- hbaseAdmin.modifyTable(descriptor.getNameAsString(), descriptor);
+ hbaseAdmin.modifyTable(TableName.valueOf(descriptor.getNameAsString()), descriptor);
}
}
hbaseAdmin.close();
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java
index 5acecbc..05eef7a 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java
@@ -73,7 +73,7 @@
private static ResourceStore srcStore;
private static ResourceStore dstStore;
private static FileSystem hdfsFS;
- private static HBaseAdmin hbaseAdmin;
+ private static Admin hbaseAdmin;
public static final String ACL_INFO_FAMILY = "i";
private static final String ACL_TABLE_NAME = "_acl";
@@ -117,8 +117,8 @@
checkAndGetHbaseUrl();
- Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
- hbaseAdmin = new HBaseAdmin(conf);
+ Connection conn = HBaseConnection.get(srcConfig.getStorageUrl());
+ hbaseAdmin = conn.getAdmin();
hdfsFS = FileSystem.get(new Configuration());
@@ -142,6 +142,10 @@
} else {
showOpts();
}
+
+ checkMigrationSuccess(dstConfig, cubeName, true);
+
+ IOUtils.closeQuietly(hbaseAdmin);
}
public static void moveCube(String srcCfgUri, String dstCfgUri, String cubeName, String projectName, String copyAcl, String purgeAndDisable, String overwriteIfExists, String realExecute) throws IOException, InterruptedException {
@@ -311,10 +315,10 @@
case CHANGE_HTABLE_HOST: {
String tableName = (String) opt.params[0];
HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
- hbaseAdmin.disableTable(tableName);
+ hbaseAdmin.disableTable(TableName.valueOf(tableName));
desc.setValue(IRealizationConstants.HTableTag, dstConfig.getMetadataUrlPrefix());
- hbaseAdmin.modifyTable(tableName, desc);
- hbaseAdmin.enableTable(tableName);
+ hbaseAdmin.modifyTable(TableName.valueOf(tableName), desc);
+ hbaseAdmin.enableTable(TableName.valueOf(tableName));
logger.info("CHANGE_HTABLE_HOST is completed");
break;
}
@@ -425,11 +429,11 @@
Serializer<ProjectInstance> projectSerializer = new JsonSerializer<ProjectInstance>(ProjectInstance.class);
ProjectInstance project = dstStore.getResource(projectResPath, ProjectInstance.class, projectSerializer);
String projUUID = project.getUuid();
- HTableInterface srcAclHtable = null;
- HTableInterface destAclHtable = null;
+ Table srcAclHtable = null;
+ Table destAclHtable = null;
try {
- srcAclHtable = HBaseConnection.get(srcConfig.getStorageUrl()).getTable(srcConfig.getMetadataUrlPrefix() + ACL_TABLE_NAME);
- destAclHtable = HBaseConnection.get(dstConfig.getStorageUrl()).getTable(dstConfig.getMetadataUrlPrefix() + ACL_TABLE_NAME);
+ srcAclHtable = HBaseConnection.get(srcConfig.getStorageUrl()).getTable(TableName.valueOf(srcConfig.getMetadataUrlPrefix() + ACL_TABLE_NAME));
+ destAclHtable = HBaseConnection.get(dstConfig.getStorageUrl()).getTable(TableName.valueOf(dstConfig.getMetadataUrlPrefix() + ACL_TABLE_NAME));
// cube acl
Result result = srcAclHtable.get(new Get(Bytes.toBytes(cubeId)));
@@ -445,11 +449,10 @@
value = Bytes.toBytes(valueString);
}
Put put = new Put(Bytes.toBytes(cubeId));
- put.add(family, column, value);
+ put.addColumn(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell), CellUtil.cloneValue(cell));
destAclHtable.put(put);
}
}
- destAclHtable.flushCommits();
} finally {
IOUtils.closeQuietly(srcAclHtable);
IOUtils.closeQuietly(destAclHtable);
@@ -476,10 +479,10 @@
case CHANGE_HTABLE_HOST: {
String tableName = (String) opt.params[0];
HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
- hbaseAdmin.disableTable(tableName);
+ hbaseAdmin.disableTable(TableName.valueOf(tableName));
desc.setValue(IRealizationConstants.HTableTag, srcConfig.getMetadataUrlPrefix());
- hbaseAdmin.modifyTable(tableName, desc);
- hbaseAdmin.enableTable(tableName);
+ hbaseAdmin.modifyTable(TableName.valueOf(tableName), desc);
+ hbaseAdmin.enableTable(TableName.valueOf(tableName));
break;
}
case COPY_FILE_IN_META: {
@@ -509,13 +512,12 @@
case COPY_ACL: {
String cubeId = (String) opt.params[0];
String modelId = (String) opt.params[1];
- HTableInterface destAclHtable = null;
+ Table destAclHtable = null;
try {
- destAclHtable = HBaseConnection.get(dstConfig.getStorageUrl()).getTable(dstConfig.getMetadataUrlPrefix() + ACL_TABLE_NAME);
+ destAclHtable = HBaseConnection.get(dstConfig.getStorageUrl()).getTable(TableName.valueOf(dstConfig.getMetadataUrlPrefix() + ACL_TABLE_NAME));
destAclHtable.delete(new Delete(Bytes.toBytes(cubeId)));
destAclHtable.delete(new Delete(Bytes.toBytes(modelId)));
- destAclHtable.flushCommits();
} finally {
IOUtils.closeQuietly(destAclHtable);
}
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCheckCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCheckCLI.java
index f1fc8e1..b735864 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCheckCLI.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCheckCLI.java
@@ -27,7 +27,9 @@
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.OptionsHelper;
import org.apache.kylin.cube.CubeInstance;
@@ -61,7 +63,7 @@
private KylinConfig dstCfg;
- private HBaseAdmin hbaseAdmin;
+ private Admin hbaseAdmin;
private List<String> issueExistHTables;
private List<String> inconsistentHTables;
@@ -129,9 +131,8 @@
this.dstCfg = kylinConfig;
this.ifFix = isFix;
- Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
- hbaseAdmin = new HBaseAdmin(conf);
-
+ Connection conn = HBaseConnection.get(kylinConfig.getStorageUrl());
+ hbaseAdmin = conn.getAdmin();
issueExistHTables = Lists.newArrayList();
inconsistentHTables = Lists.newArrayList();
}
@@ -188,10 +189,10 @@
String[] sepNameList = segFullName.split(",");
HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(sepNameList[0]));
logger.info("Change the host of htable "+sepNameList[0]+"belonging to cube "+sepNameList[1]+" from "+desc.getValue(IRealizationConstants.HTableTag)+" to "+dstCfg.getMetadataUrlPrefix());
- hbaseAdmin.disableTable(sepNameList[0]);
+ hbaseAdmin.disableTable(TableName.valueOf(sepNameList[0]));
desc.setValue(IRealizationConstants.HTableTag, dstCfg.getMetadataUrlPrefix());
- hbaseAdmin.modifyTable(sepNameList[0], desc);
- hbaseAdmin.enableTable(sepNameList[0]);
+ hbaseAdmin.modifyTable(TableName.valueOf(sepNameList[0]), desc);
+ hbaseAdmin.enableTable(TableName.valueOf(sepNameList[0]));
}
}else{
logger.info("------ Inconsistent HTables Needed To Be Fixed ------");
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
index 270da13..f742e3e 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
@@ -40,7 +40,9 @@
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.KylinVersion;
@@ -80,7 +82,8 @@
KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration();
FileSystem fileSystem = FileSystem.get(hconf);
- HBaseAdmin hbaseAdmin = new HBaseAdmin(hconf);
+ Connection conn = HBaseConnection.get(kylinConfig.getStorageUrl());
+ Admin hbaseAdmin = conn.getAdmin();
String localCoprocessorJar;
if ("default".equals(args[0])) {
@@ -163,10 +166,10 @@
public static void deployCoprocessor(HTableDescriptor tableDesc) {
try {
initHTableCoprocessor(tableDesc);
- logger.info("hbase table " + tableDesc.getName() + " deployed with coprocessor.");
+ logger.info("hbase table " + tableDesc.getTableName() + " deployed with coprocessor.");
} catch (Exception ex) {
- logger.error("Error deploying coprocessor on " + tableDesc.getName(), ex);
+ logger.error("Error deploying coprocessor on " + tableDesc.getTableName(), ex);
logger.error("Will try creating the table without coprocessor.");
}
}
@@ -189,9 +192,9 @@
desc.addCoprocessor(CubeObserverClass, hdfsCoprocessorJar, 1002, null);
}
- public static void resetCoprocessor(String tableName, HBaseAdmin hbaseAdmin, Path hdfsCoprocessorJar) throws IOException {
+ public static void resetCoprocessor(String tableName, Admin hbaseAdmin, Path hdfsCoprocessorJar) throws IOException {
logger.info("Disable " + tableName);
- hbaseAdmin.disableTable(tableName);
+ hbaseAdmin.disableTable(TableName.valueOf(tableName));
logger.info("Unset coprocessor on " + tableName);
HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
@@ -219,13 +222,13 @@
desc.setValue(IRealizationConstants.HTableGitTag, commitInfo);
}
- hbaseAdmin.modifyTable(tableName, desc);
+ hbaseAdmin.modifyTable(TableName.valueOf(tableName), desc);
logger.info("Enable " + tableName);
- hbaseAdmin.enableTable(tableName);
+ hbaseAdmin.enableTable(TableName.valueOf(tableName));
}
- private static List<String> resetCoprocessorOnHTables(HBaseAdmin hbaseAdmin, Path hdfsCoprocessorJar, List<String> tableNames) throws IOException {
+ private static List<String> resetCoprocessorOnHTables(Admin hbaseAdmin, Path hdfsCoprocessorJar, List<String> tableNames) throws IOException {
List<String> processed = new ArrayList<String>();
for (String tableName : tableNames) {
@@ -336,7 +339,7 @@
return coprocessorDir;
}
- private static Set<String> getCoprocessorJarPaths(HBaseAdmin hbaseAdmin, List<String> tableNames) throws IOException {
+ private static Set<String> getCoprocessorJarPaths(Admin hbaseAdmin, List<String> tableNames) throws IOException {
HashSet<String> result = new HashSet<String>();
for (String tableName : tableNames) {
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ExtendCubeToHybridCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ExtendCubeToHybridCLI.java
index c55bde4..ad5b647 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ExtendCubeToHybridCLI.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ExtendCubeToHybridCLI.java
@@ -26,8 +26,9 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.kylin.common.KylinConfig;
@@ -232,9 +233,9 @@
Serializer<ProjectInstance> projectSerializer = new JsonSerializer<ProjectInstance>(ProjectInstance.class);
ProjectInstance project = store.getResource(projectResPath, ProjectInstance.class, projectSerializer);
String projUUID = project.getUuid();
- HTableInterface aclHtable = null;
+ Table aclHtable = null;
try {
- aclHtable = HBaseConnection.get(kylinConfig.getStorageUrl()).getTable(kylinConfig.getMetadataUrlPrefix() + "_acl");
+ aclHtable = HBaseConnection.get(kylinConfig.getStorageUrl()).getTable(TableName.valueOf(kylinConfig.getMetadataUrlPrefix() + "_acl"));
// cube acl
Result result = aclHtable.get(new Get(Bytes.toBytes(origCubeId)));
@@ -254,7 +255,6 @@
aclHtable.put(put);
}
}
- aclHtable.flushCommits();
} finally {
IOUtils.closeQuietly(aclHtable);
}
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/GridTableHBaseBenchmark.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/GridTableHBaseBenchmark.java
index 8a88b6d..760ed3a 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/GridTableHBaseBenchmark.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/GridTableHBaseBenchmark.java
@@ -28,9 +28,9 @@
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
@@ -75,7 +75,7 @@
System.out.println("Testing grid table scanning, hit ratio " + hitRatio + ", index ratio " + indexRatio);
String hbaseUrl = "hbase"; // use hbase-site.xml on classpath
- HConnection conn = HBaseConnection.get(hbaseUrl);
+ Connection conn = HBaseConnection.get(hbaseUrl);
createHTableIfNeeded(conn, TEST_TABLE);
prepareData(conn);
@@ -91,10 +91,10 @@
}
- private static void testColumnScan(HConnection conn, List<Pair<Integer, Integer>> colScans) throws IOException {
+ private static void testColumnScan(Connection conn, List<Pair<Integer, Integer>> colScans) throws IOException {
Stats stats = new Stats("COLUMN_SCAN");
- HTableInterface table = conn.getTable(TEST_TABLE);
+ Table table = conn.getTable(TableName.valueOf(TEST_TABLE));
try {
stats.markStart();
@@ -122,20 +122,20 @@
}
}
- private static void testRowScanNoIndexFullScan(HConnection conn, boolean[] hits) throws IOException {
+ private static void testRowScanNoIndexFullScan(Connection conn, boolean[] hits) throws IOException {
fullScan(conn, hits, new Stats("ROW_SCAN_NO_IDX_FULL"));
}
- private static void testRowScanNoIndexSkipScan(HConnection conn, boolean[] hits) throws IOException {
+ private static void testRowScanNoIndexSkipScan(Connection conn, boolean[] hits) throws IOException {
jumpScan(conn, hits, new Stats("ROW_SCAN_NO_IDX_SKIP"));
}
- private static void testRowScanWithIndex(HConnection conn, boolean[] hits) throws IOException {
+ private static void testRowScanWithIndex(Connection conn, boolean[] hits) throws IOException {
jumpScan(conn, hits, new Stats("ROW_SCAN_IDX"));
}
- private static void fullScan(HConnection conn, boolean[] hits, Stats stats) throws IOException {
- HTableInterface table = conn.getTable(TEST_TABLE);
+ private static void fullScan(Connection conn, boolean[] hits, Stats stats) throws IOException {
+ Table table = conn.getTable(TableName.valueOf(TEST_TABLE));
try {
stats.markStart();
@@ -156,11 +156,11 @@
}
}
- private static void jumpScan(HConnection conn, boolean[] hits, Stats stats) throws IOException {
+ private static void jumpScan(Connection conn, boolean[] hits, Stats stats) throws IOException {
final int jumpThreshold = 6; // compensate for Scan() overhead, totally by experience
- HTableInterface table = conn.getTable(TEST_TABLE);
+ Table table = conn.getTable(TableName.valueOf(TEST_TABLE));
try {
stats.markStart();
@@ -204,8 +204,8 @@
}
}
- private static void prepareData(HConnection conn) throws IOException {
- HTableInterface table = conn.getTable(TEST_TABLE);
+ private static void prepareData(Connection conn) throws IOException {
+ Table table = conn.getTable(TableName.valueOf(TEST_TABLE));
try {
// check how many rows existing
@@ -258,8 +258,8 @@
return bytes;
}
- private static void createHTableIfNeeded(HConnection conn, String tableName) throws IOException {
- HBaseAdmin hbase = new HBaseAdmin(conn);
+ private static void createHTableIfNeeded(Connection conn, String tableName) throws IOException {
+ Admin hbase = conn.getAdmin();
try {
boolean tableExist = false;
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseClean.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseClean.java
index 4ce0dac..f5b315d 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseClean.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseClean.java
@@ -29,8 +29,12 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.metadata.realization.IRealizationConstants;
import org.apache.kylin.storage.hbase.HBaseConnection;
@@ -79,8 +83,8 @@
private void cleanUp() {
try {
// get all kylin hbase tables
- Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
- HBaseAdmin hbaseAdmin = new HBaseAdmin(conf);
+ Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl());
+ Admin hbaseAdmin = conn.getAdmin();
String tableNamePrefix = IRealizationConstants.SharedHbaseStorageLocationPrefix;
HTableDescriptor[] tableDescriptors = hbaseAdmin.listTables(tableNamePrefix + ".*");
List<String> allTablesNeedToBeDropped = Lists.newArrayList();
@@ -95,12 +99,12 @@
// drop tables
for (String htableName : allTablesNeedToBeDropped) {
logger.info("Deleting HBase table " + htableName);
- if (hbaseAdmin.tableExists(htableName)) {
- if (hbaseAdmin.isTableEnabled(htableName)) {
- hbaseAdmin.disableTable(htableName);
+ if (hbaseAdmin.tableExists(TableName.valueOf(htableName))) {
+ if (hbaseAdmin.isTableEnabled(TableName.valueOf(htableName))) {
+ hbaseAdmin.disableTable(TableName.valueOf(htableName));
}
- hbaseAdmin.deleteTable(htableName);
+ hbaseAdmin.deleteTable(TableName.valueOf(htableName));
logger.info("Deleted HBase table " + htableName);
} else {
logger.info("HBase table" + htableName + " does not exist");
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseRegionSizeCalculator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseRegionSizeCalculator.java
index 346c3a2..58aa8fd 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseRegionSizeCalculator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseRegionSizeCalculator.java
@@ -20,22 +20,11 @@
package org.apache.kylin.storage.hbase.util;
import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.TreeSet;
+import java.util.*;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.ClusterStatus;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.RegionLoad;
-import org.apache.hadoop.hbase.ServerLoad;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kylin.common.util.Pair;
import org.slf4j.Logger;
@@ -57,30 +46,31 @@
/**
* Computes size of each region for table and given column families.
* */
- public HBaseRegionSizeCalculator(HTable table) throws IOException {
- this(table, new HBaseAdmin(table.getConfiguration()));
- }
+ public HBaseRegionSizeCalculator(String tableName, Connection hbaseConnection) throws IOException {
- /** Constructor for unit testing */
- HBaseRegionSizeCalculator(HTable table, HBaseAdmin hBaseAdmin) throws IOException {
-
+ Table table = null;
+ Admin admin = null;
try {
+ table = hbaseConnection.getTable(TableName.valueOf(tableName));
+ admin = hbaseConnection.getAdmin();
+
if (!enabled(table.getConfiguration())) {
logger.info("Region size calculation disabled.");
return;
}
- logger.info("Calculating region sizes for table \"" + new String(table.getTableName()) + "\".");
+ logger.info("Calculating region sizes for table \"" + table.getName() + "\".");
// Get regions for table.
- Set<HRegionInfo> tableRegionInfos = table.getRegionLocations().keySet();
+ RegionLocator regionLocator = hbaseConnection.getRegionLocator(table.getName());
+ List<HRegionLocation> regionLocationList = regionLocator.getAllRegionLocations();
Set<byte[]> tableRegions = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
- for (HRegionInfo regionInfo : tableRegionInfos) {
- tableRegions.add(regionInfo.getRegionName());
+ for (HRegionLocation hRegionLocation : regionLocationList) {
+ tableRegions.add(hRegionLocation.getRegionInfo().getRegionName());
}
- ClusterStatus clusterStatus = hBaseAdmin.getClusterStatus();
+ ClusterStatus clusterStatus = admin.getClusterStatus();
Collection<ServerName> servers = clusterStatus.getServers();
final long megaByte = 1024L * 1024L;
@@ -104,7 +94,7 @@
}
}
} finally {
- hBaseAdmin.close();
+ admin.close();
}
}
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseUsage.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseUsage.java
index 0a791b7..8fb2f58 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseUsage.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseUsage.java
@@ -26,12 +26,16 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.metadata.realization.IRealizationConstants;
import org.apache.kylin.storage.hbase.HBaseConnection;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import org.apache.kylin.storage.hbase.HBaseConnection;
public class HBaseUsage {
@@ -43,8 +47,8 @@
Map<String, List<String>> envs = Maps.newHashMap();
// get all kylin hbase tables
- Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
- HBaseAdmin hbaseAdmin = new HBaseAdmin(conf);
+ Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl());
+ Admin hbaseAdmin = conn.getAdmin();
String tableNamePrefix = IRealizationConstants.SharedHbaseStorageLocationPrefix;
HTableDescriptor[] tableDescriptors = hbaseAdmin.listTables(tableNamePrefix + ".*");
for (HTableDescriptor desc : tableDescriptors) {
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HbaseStreamingInput.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HbaseStreamingInput.java
index deb8da1..79f9fb3 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HbaseStreamingInput.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HbaseStreamingInput.java
@@ -32,15 +32,16 @@
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HConnectionManager;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
+import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.storage.hbase.HBaseConnection;
import org.slf4j.Logger;
@@ -58,11 +59,11 @@
private static final byte[] QN = "C".getBytes();
public static void createTable(String tableName) throws IOException {
- HConnection conn = getConnection();
- HBaseAdmin hadmin = new HBaseAdmin(conn);
+ Connection conn = getConnection();
+ Admin hadmin = conn.getAdmin();
try {
- boolean tableExist = hadmin.tableExists(tableName);
+ boolean tableExist = hadmin.tableExists(TableName.valueOf(tableName));
if (tableExist) {
logger.info("HTable '" + tableName + "' already exists");
return;
@@ -119,8 +120,8 @@
e.printStackTrace();
}
- HConnection conn = getConnection();
- HTableInterface table = conn.getTable(tableName);
+ Connection conn = getConnection();
+ Table table = conn.getTable(TableName.valueOf(tableName));
byte[] key = new byte[8 + 4];//time + id
@@ -135,7 +136,7 @@
Bytes.putInt(key, 8, i);
Put put = new Put(key);
byte[] cell = randomBytes(CELL_SIZE);
- put.add(CF, QN, cell);
+ put.addColumn(CF, QN, cell);
buffer.add(put);
}
table.put(buffer);
@@ -170,8 +171,8 @@
}
Random r = new Random();
- HConnection conn = getConnection();
- HTableInterface table = conn.getTable(tableName);
+ Connection conn = getConnection();
+ Table table = conn.getTable(TableName.valueOf(tableName));
long leftBound = getFirstKeyTime(table);
long rightBound = System.currentTimeMillis();
@@ -206,7 +207,7 @@
}
}
- private static long getFirstKeyTime(HTableInterface table) throws IOException {
+ private static long getFirstKeyTime(Table table) throws IOException {
long startTime = 0;
Scan scan = new Scan();
@@ -224,8 +225,8 @@
}
- private static HConnection getConnection() throws IOException {
- return HConnectionManager.createConnection(HBaseConnection.getCurrentHBaseConfiguration());
+ private static Connection getConnection() throws IOException {
+ return HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl());
}
private static String formatTime(long time) {
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HtableAlterMetadataCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HtableAlterMetadataCLI.java
index 2b4a9a7..095743f 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HtableAlterMetadataCLI.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HtableAlterMetadataCLI.java
@@ -27,8 +27,11 @@
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.storage.hbase.HBaseConnection;
import org.slf4j.Logger;
@@ -71,8 +74,8 @@
}
private void alter() throws IOException {
- Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
- HBaseAdmin hbaseAdmin = new HBaseAdmin(conf);
+ Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl());
+ Admin hbaseAdmin = conn.getAdmin();
HTableDescriptor table = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
hbaseAdmin.disableTable(table.getTableName());
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/OrphanHBaseCleanJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/OrphanHBaseCleanJob.java
index f0618c9..b7303a5 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/OrphanHBaseCleanJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/OrphanHBaseCleanJob.java
@@ -31,10 +31,15 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.metadata.realization.IRealizationConstants;
+import org.apache.kylin.storage.hbase.HBaseConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -86,7 +91,8 @@
private void cleanUnusedHBaseTables(Configuration conf) throws IOException {
// get all kylin hbase tables
- HBaseAdmin hbaseAdmin = new HBaseAdmin(conf);
+ Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl());
+ Admin hbaseAdmin = conn.getAdmin();
String tableNamePrefix = IRealizationConstants.SharedHbaseStorageLocationPrefix;
HTableDescriptor[] tableDescriptors = hbaseAdmin.listTables(tableNamePrefix + ".*");
List<String> allTablesNeedToBeDropped = new ArrayList<String>();
@@ -105,12 +111,12 @@
// drop tables
for (String htableName : allTablesNeedToBeDropped) {
logger.info("Deleting HBase table " + htableName);
- if (hbaseAdmin.tableExists(htableName)) {
- if (hbaseAdmin.isTableEnabled(htableName)) {
- hbaseAdmin.disableTable(htableName);
+ if (hbaseAdmin.tableExists(TableName.valueOf(htableName))) {
+ if (hbaseAdmin.isTableEnabled(TableName.valueOf(htableName))) {
+ hbaseAdmin.disableTable(TableName.valueOf(htableName));
}
- hbaseAdmin.deleteTable(htableName);
+ hbaseAdmin.deleteTable(TableName.valueOf(htableName));
logger.info("Deleted HBase table " + htableName);
} else {
logger.info("HBase table" + htableName + " does not exist");
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/PingHBaseCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/PingHBaseCLI.java
index 58ef7cb..b86b561 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/PingHBaseCLI.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/PingHBaseCLI.java
@@ -21,9 +21,10 @@
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HConnectionManager;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
@@ -57,12 +58,12 @@
Scan scan = new Scan();
int limit = 20;
- HConnection conn = null;
- HTableInterface table = null;
+ Connection conn = null;
+ Table table = null;
ResultScanner scanner = null;
try {
- conn = HConnectionManager.createConnection(hconf);
- table = conn.getTable(hbaseTable);
+ conn = ConnectionFactory.createConnection(hconf);
+ table = conn.getTable(TableName.valueOf(hbaseTable));
scanner = table.getScanner(scan);
int count = 0;
for (Result r : scanner) {
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/RowCounterCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/RowCounterCLI.java
index b1a5ba4..52f71cf 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/RowCounterCLI.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/RowCounterCLI.java
@@ -23,9 +23,10 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HConnectionManager;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.kylin.common.util.Bytes;
@@ -71,8 +72,8 @@
logger.info("My Scan " + scan.toString());
- HConnection conn = HConnectionManager.createConnection(conf);
- HTableInterface tableInterface = conn.getTable(htableName);
+ Connection conn = ConnectionFactory.createConnection(conf);
+ Table tableInterface = conn.getTable(TableName.valueOf(htableName));
Iterator<Result> iterator = tableInterface.getScanner(scan).iterator();
int counter = 0;
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
index c010d51..fd39ee6 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
@@ -35,7 +35,9 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.util.ToolRunner;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.CliCommandExecutor;
@@ -52,6 +54,7 @@
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.manager.ExecutableManager;
import org.apache.kylin.metadata.realization.IRealizationConstants;
+import org.apache.kylin.storage.hbase.HBaseConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -104,7 +107,8 @@
IIManager iiManager = IIManager.getInstance(KylinConfig.getInstanceFromEnv());
// get all kylin hbase tables
- HBaseAdmin hbaseAdmin = new HBaseAdmin(conf);
+ Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl());
+ Admin hbaseAdmin = conn.getAdmin();
String tableNamePrefix = IRealizationConstants.SharedHbaseStorageLocationPrefix;
HTableDescriptor[] tableDescriptors = hbaseAdmin.listTables(tableNamePrefix + ".*");
List<String> allTablesNeedToBeDropped = new ArrayList<String>();
@@ -148,12 +152,12 @@
// drop tables
for (String htableName : allTablesNeedToBeDropped) {
logger.info("Deleting HBase table " + htableName);
- if (hbaseAdmin.tableExists(htableName)) {
- if (hbaseAdmin.isTableEnabled(htableName)) {
- hbaseAdmin.disableTable(htableName);
+ if (hbaseAdmin.tableExists(TableName.valueOf(htableName))) {
+ if (hbaseAdmin.isTableEnabled(TableName.valueOf(htableName))) {
+ hbaseAdmin.disableTable(TableName.valueOf(htableName));
}
- hbaseAdmin.deleteTable(htableName);
+ hbaseAdmin.deleteTable(TableName.valueOf(htableName));
logger.info("Deleted HBase table " + htableName);
} else {
logger.info("HBase table" + htableName + " does not exist");
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/UpdateHTableHostCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/UpdateHTableHostCLI.java
index 23357f5..3fec98c 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/UpdateHTableHostCLI.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/UpdateHTableHostCLI.java
@@ -26,9 +26,12 @@
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
@@ -39,7 +42,6 @@
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.realization.IRealizationConstants;
import org.apache.kylin.metadata.realization.RealizationStatusEnum;
-import org.apache.kylin.storage.hbase.HBaseConnection;
import com.google.common.collect.Lists;
@@ -52,14 +54,15 @@
private List<String> errorMsgs = Lists.newArrayList();
private List<String> htables;
- private HBaseAdmin hbaseAdmin;
+ private Admin hbaseAdmin;
private KylinConfig kylinConfig;
private String oldHostValue;
public UpdateHTableHostCLI(List<String> htables, String oldHostValue) throws IOException {
this.htables = htables;
this.oldHostValue = oldHostValue;
- this.hbaseAdmin = new HBaseAdmin(HBaseConnection.getCurrentHBaseConfiguration());
+ Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
+ hbaseAdmin = conn.getAdmin();
this.kylinConfig = KylinConfig.getInstanceFromEnv();
}
@@ -181,9 +184,9 @@
HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
if (oldHostValue.equals(desc.getValue(IRealizationConstants.HTableTag))) {
desc.setValue(IRealizationConstants.HTableTag, kylinConfig.getMetadataUrlPrefix());
- hbaseAdmin.disableTable(tableName);
- hbaseAdmin.modifyTable(tableName, desc);
- hbaseAdmin.enableTable(tableName);
+ hbaseAdmin.disableTable(TableName.valueOf(tableName));
+ hbaseAdmin.modifyTable(TableName.valueOf(tableName), desc);
+ hbaseAdmin.enableTable(TableName.valueOf(tableName));
updatedResources.add(tableName);
}
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserverTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserverTest.java
index cd4e33d..9585575 100644
--- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserverTest.java
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserverTest.java
@@ -29,12 +29,10 @@
import java.util.HashSet;
import java.util.List;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.cube.kv.RowConstants;
import org.apache.kylin.metadata.datatype.LongMutable;
@@ -233,15 +231,8 @@
return nextRaw(results);
}
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.hadoop.hbase.regionserver.InternalScanner#next(java.util
- * .List, int)
- */
@Override
- public boolean next(List<Cell> result, int limit) throws IOException {
+ public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
return next(result);
}
@@ -310,6 +301,11 @@
return 0;
}
+ @Override
+ public int getBatch() {
+ return 0;
+ }
+
/*
* (non-Javadoc)
*
@@ -326,16 +322,9 @@
return i < input.size();
}
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.hadoop.hbase.regionserver.RegionScanner#nextRaw(java.util
- * .List, int)
- */
@Override
- public boolean nextRaw(List<Cell> result, int limit) throws IOException {
- return nextRaw(result);
+ public boolean nextRaw(List<Cell> list, ScannerContext scannerContext) throws IOException {
+ return false;
}
}
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/filter/TestFuzzyRowFilterV2EndToEnd.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/filter/TestFuzzyRowFilterV2EndToEnd.java
index 1d85922..04e2e8b 100644
--- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/filter/TestFuzzyRowFilterV2EndToEnd.java
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/filter/TestFuzzyRowFilterV2EndToEnd.java
@@ -44,6 +44,7 @@
import org.apache.hadoop.hbase.filter.FilterList.Operator;
import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
@@ -136,7 +137,7 @@
Put p = new Put(rk);
p.setDurability(Durability.SKIP_WAL);
- p.add(cf.getBytes(), cq, Bytes.toBytes(c));
+ p.addColumn(cf.getBytes(), cq, Bytes.toBytes(c));
ht.put(p);
}
}
@@ -224,7 +225,7 @@
scan.addFamily(cf.getBytes());
scan.setFilter(filter);
List<HRegion> regions = TEST_UTIL.getHBaseCluster().getRegions(table.getBytes());
- HRegion first = regions.get(0);
+ Region first = regions.get(0);
first.getScanner(scan);
RegionScanner scanner = first.getScanner(scan);
List<Cell> results = new ArrayList<Cell>();