| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with this |
| * work for additional information regarding copyright ownership. The ASF |
| * licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| * License for the specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.hugegraph.backend.store.hbase; |
| |
| import java.io.IOException; |
| import java.io.InterruptedIOException; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Set; |
| import java.util.concurrent.Future; |
| import org.apache.hugegraph.util.Log; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.hbase.Cell; |
| import org.apache.hadoop.hbase.CellScanner; |
| import org.apache.hadoop.hbase.CellUtil; |
| import org.apache.hadoop.hbase.HBaseConfiguration; |
| import org.apache.hadoop.hbase.HConstants; |
| import org.apache.hadoop.hbase.NamespaceDescriptor; |
| import org.apache.hadoop.hbase.RegionMetrics; |
| import org.apache.hadoop.hbase.ServerName; |
| import org.apache.hadoop.hbase.Size; |
| import org.apache.hadoop.hbase.TableName; |
| import org.apache.hadoop.hbase.TableNotDisabledException; |
| import org.apache.hadoop.hbase.TableNotEnabledException; |
| import org.apache.hadoop.hbase.client.Admin; |
| import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; |
| import org.apache.hadoop.hbase.client.Connection; |
| import org.apache.hadoop.hbase.client.ConnectionFactory; |
| import org.apache.hadoop.hbase.client.Delete; |
| import org.apache.hadoop.hbase.client.Get; |
| import org.apache.hadoop.hbase.client.Put; |
| import org.apache.hadoop.hbase.client.Result; |
| import org.apache.hadoop.hbase.client.ResultScanner; |
| import org.apache.hadoop.hbase.client.Row; |
| import org.apache.hadoop.hbase.client.Scan; |
| import org.apache.hadoop.hbase.client.Table; |
| import org.apache.hadoop.hbase.client.TableDescriptorBuilder; |
| import org.apache.hadoop.hbase.client.coprocessor.AggregationClient; |
| import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter; |
| import org.apache.hadoop.hbase.filter.FilterList; |
| import org.apache.hadoop.hbase.filter.FilterList.Operator; |
| import org.apache.hadoop.hbase.filter.MultiRowRangeFilter; |
| import org.apache.hadoop.hbase.filter.MultiRowRangeFilter.RowRange; |
| import org.apache.hadoop.hbase.filter.PageFilter; |
| import org.apache.hadoop.hbase.filter.PrefixFilter; |
| import org.apache.hadoop.hbase.util.VersionInfo; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.slf4j.Logger; |
| |
| import org.apache.hugegraph.backend.BackendException; |
| import org.apache.hugegraph.backend.store.BackendEntry.BackendColumn; |
| import org.apache.hugegraph.backend.store.BackendEntry.BackendIterator; |
| import org.apache.hugegraph.backend.store.BackendSession.AbstractBackendSession; |
| import org.apache.hugegraph.backend.store.BackendSessionPool; |
| import org.apache.hugegraph.config.HugeConfig; |
| import org.apache.hugegraph.exception.NotSupportException; |
| import org.apache.hugegraph.util.Bytes; |
| import org.apache.hugegraph.util.E; |
| import org.apache.hugegraph.util.StringEncoding; |
| import org.apache.hugegraph.util.VersionUtil; |
| import com.google.common.util.concurrent.Futures; |
| |
| public class HbaseSessions extends BackendSessionPool { |
| |
| private static final Logger LOG = Log.logger(HbaseSessions.class); |
| |
| private static final String COPROCESSOR_AGGR = |
| "org.apache.hadoop.hbase.coprocessor.AggregateImplementation"; |
| private static final long SCANNER_CACHING = 1000L; |
| |
| private final String namespace; |
| private Connection hbase; |
| |
/**
 * Create a session pool bound to one HBase namespace.
 *
 * @param config    the configuration handed to the parent pool
 * @param namespace the HBase namespace used for all table lookups
 * @param store     the store name; joined with the namespace by "/" to
 *                  form the name handed to the parent pool
 */
public HbaseSessions(HugeConfig config, String namespace, String store) {
    super(config, String.join("/", namespace, store));
    this.namespace = namespace;
}
| |
| protected Connection hbase() { |
| E.checkState(this.hbase != null, "HBase connection is not opened"); |
| return this.hbase; |
| } |
| |
| private Table table(String table) throws IOException { |
| E.checkState(this.hbase != null, "HBase connection is not opened"); |
| TableName tableName = TableName.valueOf(this.namespace, table); |
| return this.hbase.getTable(tableName); |
| } |
| |
| private AggregationClient aggregationClient() { |
| Configuration hConfig = this.hbase.getConfiguration(); |
| hConfig = HBaseConfiguration.create(hConfig); |
| long timeout = this.config().get(HbaseOptions.AGGR_TIMEOUT); |
| hConfig.setLong("hbase.rpc.timeout", timeout * 1000L); |
| hConfig.setLong("hbase.client.scanner.caching", SCANNER_CACHING); |
| return new AggregationClient(hConfig); |
| } |
| |
| @Override |
| public synchronized void open() throws IOException { |
| HugeConfig config = this.config(); |
| String hosts = config.get(HbaseOptions.HBASE_HOSTS); |
| int port = config.get(HbaseOptions.HBASE_PORT); |
| String znodeParent = config.get(HbaseOptions.HBASE_ZNODE_PARENT); |
| boolean isEnableKerberos = config.get(HbaseOptions.HBASE_KERBEROS_ENABLE); |
| Configuration hConfig = HBaseConfiguration.create(); |
| hConfig.set(HConstants.ZOOKEEPER_QUORUM, hosts); |
| hConfig.set(HConstants.ZOOKEEPER_CLIENT_PORT, String.valueOf(port)); |
| hConfig.set(HConstants.ZOOKEEPER_ZNODE_PARENT, znodeParent); |
| |
| hConfig.setInt("zookeeper.recovery.retry", |
| config.get(HbaseOptions.HBASE_ZK_RETRY)); |
| |
| // Set hbase.hconnection.threads.max 64 to avoid OOM(default value: 256) |
| hConfig.setInt("hbase.hconnection.threads.max", |
| config.get(HbaseOptions.HBASE_THREADS_MAX)); |
| |
| String hbaseSite = config.get(HbaseOptions.HBASE_HBASE_SITE); |
| hConfig.addResource(new Path(hbaseSite)); |
| |
| if (isEnableKerberos) { |
| String krb5Conf = config.get(HbaseOptions.HBASE_KRB5_CONF); |
| System.setProperty("java.security.krb5.conf", krb5Conf); |
| String principal = config.get(HbaseOptions.HBASE_KERBEROS_PRINCIPAL); |
| String keyTab = config.get(HbaseOptions.HBASE_KERBEROS_KEYTAB); |
| hConfig.set("hadoop.security.authentication", "kerberos"); |
| hConfig.set("hbase.security.authentication", "kerberos"); |
| |
| // login/authenticate using keytab |
| UserGroupInformation.setConfiguration(hConfig); |
| UserGroupInformation.loginUserFromKeytab(principal, keyTab); |
| } |
| this.hbase = ConnectionFactory.createConnection(hConfig); |
| } |
| |
| @Override |
| protected synchronized boolean opened() { |
| // NOTE: isClosed() seems to always return true even if not connected |
| return this.hbase != null && !this.hbase.isClosed(); |
| } |
| |
| @Override |
| public final Session session() { |
| return (Session) super.getOrNewSession(); |
| } |
| |
| @Override |
| protected Session newSession() { |
| return new Session(); |
| } |
| |
| @Override |
| protected synchronized void doClose() { |
| if (this.hbase == null || this.hbase.isClosed()) { |
| return; |
| } |
| try { |
| this.hbase.close(); |
| } catch (IOException e) { |
| throw new BackendException("Failed to close HBase connection", e); |
| } |
| } |
| |
| public boolean existsNamespace() throws IOException { |
| try (Admin admin = this.hbase.getAdmin()) { |
| for (NamespaceDescriptor ns : admin.listNamespaceDescriptors()) { |
| if (this.namespace.equals(ns.getName())) { |
| return true; |
| } |
| } |
| } |
| return false; |
| } |
| |
| public void createNamespace() throws IOException { |
| NamespaceDescriptor ns = NamespaceDescriptor.create(this.namespace) |
| .build(); |
| try (Admin admin = this.hbase.getAdmin()) { |
| admin.createNamespace(ns); |
| } |
| } |
| |
| public void dropNamespace() throws IOException { |
| try (Admin admin = this.hbase.getAdmin()) { |
| admin.deleteNamespace(this.namespace); |
| } |
| } |
| |
| public boolean existsTable(String table) throws IOException { |
| TableName tableName = TableName.valueOf(this.namespace, table); |
| try (Admin admin = this.hbase.getAdmin()) { |
| return admin.tableExists(tableName); |
| } |
| } |
| |
| public void createTable(String table, List<byte[]> cfs) throws IOException { |
| TableDescriptorBuilder tdb = TableDescriptorBuilder.newBuilder( |
| TableName.valueOf(this.namespace, table)); |
| for (byte[] cf : cfs) { |
| tdb.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(cf) |
| .build()); |
| } |
| tdb.setCoprocessor(COPROCESSOR_AGGR); |
| try (Admin admin = this.hbase.getAdmin()) { |
| admin.createTable(tdb.build()); |
| } |
| } |
| |
| public void createPreSplitTable(String table, List<byte[]> cfs, |
| short numOfPartitions) throws IOException { |
| TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder( |
| TableName.valueOf(this.namespace, table)); |
| for (byte[] cf : cfs) { |
| builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(cf).build()); |
| } |
| byte[][] splits = new byte[numOfPartitions - 1] |
| [org.apache.hadoop.hbase.util.Bytes.SIZEOF_SHORT]; |
| for (short split = 1; split < numOfPartitions; split++) { |
| splits[split - 1] = org.apache.hadoop.hbase.util.Bytes.toBytes(split); |
| } |
| builder.setCoprocessor(COPROCESSOR_AGGR); |
| try (Admin admin = this.hbase.getAdmin()) { |
| admin.createTable(builder.build(), splits); |
| } |
| } |
| |
| public void dropTable(String table) throws IOException { |
| TableName tableName = TableName.valueOf(this.namespace, table); |
| try (Admin admin = this.hbase.getAdmin()) { |
| try { |
| admin.disableTable(tableName); |
| } catch (TableNotEnabledException ignored) { |
| // pass |
| } |
| admin.deleteTable(tableName); |
| } |
| } |
| |
| public void enableTable(String table) throws IOException { |
| assert this.existsTable(table); |
| TableName tableName = TableName.valueOf(this.namespace, table); |
| try (Admin admin = this.hbase.getAdmin()) { |
| if (admin.isTableEnabled(tableName)) { |
| return; |
| } |
| try { |
| admin.enableTable(tableName); |
| } catch (TableNotDisabledException ignored) { |
| // pass |
| } |
| } |
| } |
| |
| public Future<Void> disableTableAsync(String table) throws IOException { |
| assert this.existsTable(table); |
| TableName tableName = TableName.valueOf(this.namespace, table); |
| try (Admin admin = this.hbase.getAdmin()) { |
| try { |
| return admin.disableTableAsync(tableName); |
| } catch (TableNotEnabledException ignored) { |
| // Ignore if it's disabled |
| return Futures.immediateFuture(null); |
| } |
| } |
| } |
| |
| public Future<Void> truncateTableAsync(String table) throws IOException { |
| assert this.existsTable(table); |
| TableName tableName = TableName.valueOf(this.namespace, table); |
| try (Admin admin = this.hbase.getAdmin()) { |
| return admin.truncateTableAsync(tableName, true); |
| } |
| } |
| |
| public void compactTables(List<String> tableNames) throws IOException { |
| try (Admin admin = this.hbase.getAdmin()) { |
| for (String table : tableNames) { |
| admin.compact(TableName.valueOf(this.namespace, table)); |
| } |
| } |
| } |
| |
| public long storeSize(String table) throws IOException { |
| long total = 0L; |
| try (Admin admin = this.hbase.getAdmin()) { |
| for (ServerName rs : admin.getRegionServers()) { |
| // NOTE: we can use getLoad() before hbase 2.0 |
| //ServerLoad load = admin.getClusterStatus().getLoad(rs); |
| //total += load.getStorefileSizeMB() * Bytes.MB; |
| //total += load.getMemStoreSizeMB() * Bytes.MB; |
| TableName tableName = TableName.valueOf(this.namespace, table); |
| for (RegionMetrics m : admin.getRegionMetrics(rs, tableName)) { |
| Double storeFileSize = m.getStoreFileSize().get(Size.Unit.BYTE); |
| total += storeFileSize.longValue(); |
| |
| Double memStoreFileSize = m.getMemStoreSize().get(Size.Unit.BYTE); |
| total += memStoreFileSize.longValue(); |
| } |
| } |
| } |
| return total; |
| } |
| |
| /** |
| * Session interface for HBase |
| */ |
| public interface HbaseSession<R> { |
| |
| /** |
| * Add a row record to a table |
| */ |
| void put(String table, byte[] family, byte[] rowkey, |
| Collection<BackendColumn> columns); |
| |
| /** |
| * Add a row record to a table(can be used when adding an index) |
| */ |
| void put(String table, byte[] family, byte[] rowkey, byte[] qualifier, |
| byte[] value); |
| |
| /** |
| * Delete a record by rowkey and qualifier from a table |
| */ |
| default void remove(String table, byte[] family, byte[] rowkey, |
| byte[] qualifier) { |
| this.remove(table, family, rowkey, qualifier, false); |
| } |
| |
| /** |
| * Delete a record by rowkey and qualifier from a table, |
| * just delete the latest version of the specified column if need |
| */ |
| void remove(String table, byte[] family, byte[] rowkey, |
| byte[] qualifier, boolean latestVersion); |
| |
| /** |
| * Delete a record by rowkey from a table |
| */ |
| void delete(String table, byte[] family, byte[] rowkey); |
| |
| /** |
| * Get a record by rowkey and qualifier from a table |
| */ |
| R get(String table, byte[] family, byte[] rowkey, byte[] qualifier); |
| |
| /** |
| * Get a record by rowkey from a table |
| */ |
| R get(String table, byte[] family, byte[] rowkey); |
| |
| /** |
| * Get multi records by rowkeys from a table |
| */ |
| R get(String table, byte[] family, Set<byte[]> rowkeys); |
| |
| /** |
| * Scan all records from a table |
| */ |
| default R scan(String table, long limit) { |
| Scan scan = new Scan(); |
| if (limit >= 0) { |
| scan.setFilter(new PageFilter(limit)); |
| } |
| return this.scan(table, scan); |
| } |
| |
| /** |
| * Scan records by rowkey prefix from a table |
| */ |
| default R scan(String table, byte[] prefix) { |
| return this.scan(table, prefix, true, prefix); |
| } |
| |
| /** |
| * Scan records by multi rowkey prefixes from a table |
| */ |
| default R scan(String table, Set<byte[]> prefixes) { |
| FilterList orFilters = new FilterList(Operator.MUST_PASS_ONE); |
| for (byte[] prefix : prefixes) { |
| FilterList andFilters = new FilterList(Operator.MUST_PASS_ALL); |
| List<RowRange> ranges = new ArrayList<>(); |
| ranges.add(new RowRange(prefix, true, null, true)); |
| andFilters.addFilter(new MultiRowRangeFilter(ranges)); |
| andFilters.addFilter(new PrefixFilter(prefix)); |
| |
| orFilters.addFilter(andFilters); |
| } |
| |
| Scan scan = new Scan().setFilter(orFilters); |
| return this.scan(table, scan); |
| } |
| |
| /** |
| * Scan records by rowkey start and prefix from a table |
| */ |
| default R scan(String table, byte[] startRow, boolean inclusiveStart, |
| byte[] prefix) { |
| Scan scan = new Scan().withStartRow(startRow, inclusiveStart) |
| .setFilter(new PrefixFilter(prefix)); |
| return this.scan(table, scan); |
| } |
| |
| /** |
| * Scan records by rowkey range from a table |
| */ |
| default R scan(String table, byte[] startRow, byte[] stopRow) { |
| return this.scan(table, startRow, true, stopRow, false); |
| } |
| |
| /** |
| * Scan records by rowkey range from a table |
| */ |
| default R scan(String table, byte[] startRow, boolean inclusiveStart, |
| byte[] stopRow, boolean inclusiveStop) { |
| Scan scan = new Scan().withStartRow(startRow, inclusiveStart); |
| if (stopRow != null) { |
| scan.withStopRow(stopRow, inclusiveStop); |
| } |
| return this.scan(table, scan); |
| } |
| |
| /** |
| * Inner scan: send scan request to HBase and get iterator |
| */ |
| R scan(String table, Scan scan); |
| |
| /** |
| * Increase a counter by rowkey and qualifier to a table |
| */ |
| long increase(String table, byte[] family, byte[] rowkey, |
| byte[] qualifier, long value); |
| } |
| |
| /** |
| * Session implement for HBase |
| */ |
| public class Session extends AbstractBackendSession |
| implements HbaseSession<RowIterator> { |
| |
| private final Map<String, List<Row>> batch; |
| |
| public Session() { |
| this.batch = new HashMap<>(); |
| } |
| |
| private void batch(String table, Row row) { |
| List<Row> rows = this.batch.get(table); |
| if (rows == null) { |
| rows = new ArrayList<>(); |
| this.batch.put(table, rows); |
| } |
| rows.add(row); |
| } |
| |
| private int batchSize() { |
| int size = 0; |
| for (List<Row> puts : this.batch.values()) { |
| size += puts.size(); |
| } |
| return size; |
| } |
| |
| private void checkBatchResults(Object[] results, List<Row> rows) |
| throws Throwable { |
| assert rows.size() == results.length; |
| for (int i = 0; i < results.length; i++) { |
| Object result = results[i]; |
| if (result instanceof Throwable) { |
| throw (Throwable) result; |
| } |
| if (result == null || !((Result) result).isEmpty()) { |
| throw new BackendException("Failed batch for row: %s", |
| rows.get(i)); |
| } |
| } |
| } |
| |
| public Connection hbase() { |
| return HbaseSessions.this.hbase(); |
| } |
| |
| public String namespace() { |
| return HbaseSessions.this.namespace; |
| } |
| |
| @Override |
| public void open() { |
| this.opened = true; |
| } |
| |
| @Override |
| public void close() { |
| assert this.closeable(); |
| this.opened = false; |
| } |
| |
| @Override |
| public boolean closed() { |
| return !this.opened || !HbaseSessions.this.opened(); |
| } |
| |
| /** |
| * Any change in the session |
| */ |
| @Override |
| public boolean hasChanges() { |
| return this.batch.size() > 0; |
| } |
| |
| /** |
| * Commit all updates(put/delete) to DB |
| */ |
| @Override |
| public Integer commit() { |
| int count = this.batchSize(); |
| if (count <= 0) { |
| return 0; |
| } |
| |
| // TODO: this will not be atomic, to be improved |
| for (Entry<String, List<Row>> action : this.batch.entrySet()) { |
| List<Row> rows = action.getValue(); |
| Object[] results = new Object[rows.size()]; |
| try (Table table = table(action.getKey())) { |
| table.batch(rows, results); |
| checkBatchResults(results, rows); |
| } catch (InterruptedIOException e) { |
| throw new BackendException("Interrupted, " + |
| "maybe it is timed out", e); |
| } catch (Throwable e) { |
| // TODO: Mark and delete committed records |
| throw new BackendException("Failed to commit, " + |
| "there may be inconsistent states for HBase", e); |
| } |
| } |
| |
| // Clear batch if write() successfully (retained if failed) |
| this.batch.clear(); |
| |
| return count; |
| } |
| |
| /** |
| * Rollback all updates(put/delete) not committed |
| */ |
| @Override |
| public void rollback() { |
| this.batch.clear(); |
| } |
| |
| /** |
| * Add a row record to a table |
| */ |
| @Override |
| public void put(String table, byte[] family, byte[] rowkey, |
| Collection<BackendColumn> columns) { |
| Put put = new Put(rowkey); |
| for (BackendColumn column : columns) { |
| put.addColumn(family, column.name, column.value); |
| } |
| this.batch(table, put); |
| } |
| |
| /** |
| * Add a row record to a table with ttl |
| */ |
| public void put(String table, byte[] family, byte[] rowkey, |
| Collection<BackendColumn> columns, long ttl) { |
| Put put = new Put(rowkey); |
| for (BackendColumn column : columns) { |
| put.addColumn(family, column.name, column.value); |
| } |
| put.setTTL(ttl); |
| this.batch(table, put); |
| } |
| |
| /** |
| * Add a row record to a table(can be used when adding an index) |
| */ |
| @Override |
| public void put(String table, byte[] family, |
| byte[] rowkey, byte[] qualifier, byte[] value) { |
| Put put = new Put(rowkey); |
| put.addColumn(family, qualifier, value); |
| this.batch(table, put); |
| } |
| |
| /** |
| * Add a row record to a table with ttl for index |
| */ |
| public void put(String table, byte[] family, byte[] rowkey, |
| byte[] qualifier, byte[] value, long ttl) { |
| Put put = new Put(rowkey); |
| put.addColumn(family, qualifier, value); |
| put.setTTL(ttl); |
| this.batch(table, put); |
| } |
| |
| /** |
| * Delete a record by rowkey and qualifier from a table |
| */ |
| @Override |
| public void remove(String table, byte[] family, |
| byte[] rowkey, byte[] qualifier) { |
| this.remove(table, family, rowkey, qualifier, false); |
| } |
| |
| /** |
| * Delete a record by rowkey and qualifier from a table, |
| * just delete the latest version of the specified column if need |
| */ |
| @Override |
| public void remove(String table, byte[] family, byte[] rowkey, |
| byte[] qualifier, boolean latestVersion) { |
| assert family != null; |
| assert rowkey != null; |
| E.checkArgument(qualifier != null, |
| "HBase qualifier can't be null when removing"); |
| Delete delete = new Delete(rowkey); |
| if (latestVersion) { |
| // Just delete the latest version of the specified column |
| delete.addColumn(family, qualifier); |
| } else { |
| // Delete all versions of the specified column |
| delete.addColumns(family, qualifier); |
| } |
| this.batch(table, delete); |
| } |
| |
| /** |
| * Delete a record by rowkey from a table |
| */ |
| @Override |
| public void delete(String table, byte[] family, byte[] rowkey) { |
| assert rowkey != null; |
| Delete delete = new Delete(rowkey); |
| if (family != null) { |
| delete.addFamily(family); |
| } |
| this.batch(table, delete); |
| } |
| |
| /** |
| * Get a record by rowkey and qualifier from a table |
| */ |
| @Override |
| public RowIterator get(String table, byte[] family, |
| byte[] rowkey, byte[] qualifier) { |
| assert !this.hasChanges(); |
| |
| Get get = new Get(rowkey); |
| get.addColumn(family, qualifier); |
| |
| try (Table htable = table(table)) { |
| return new RowIterator(htable.get(get)); |
| } catch (IOException e) { |
| throw new BackendException(e); |
| } |
| } |
| |
| /** |
| * Get a record by rowkey from a table |
| */ |
| @Override |
| public RowIterator get(String table, byte[] family, byte[] rowkey) { |
| assert !this.hasChanges(); |
| |
| Get get = new Get(rowkey); |
| if (family != null) { |
| get.addFamily(family); |
| } |
| |
| try (Table htable = table(table)) { |
| return new RowIterator(htable.get(get)); |
| } catch (IOException e) { |
| throw new BackendException(e); |
| } |
| } |
| |
| /** |
| * Get multi records by rowkeys from a table |
| */ |
| @Override |
| public RowIterator get(String table, byte[] family, |
| Set<byte[]> rowkeys) { |
| assert !this.hasChanges(); |
| |
| List<Get> gets = new ArrayList<>(rowkeys.size()); |
| for (byte[] rowkey : rowkeys) { |
| Get get = new Get(rowkey); |
| if (family != null) { |
| get.addFamily(family); |
| } |
| gets.add(get); |
| } |
| |
| try (Table htable = table(table)) { |
| return new RowIterator(htable.get(gets)); |
| } catch (IOException e) { |
| throw new BackendException(e); |
| } |
| } |
| |
| /** |
| * Scan records by rowkey range from a table |
| */ |
| @Override |
| public RowIterator scan(String table, |
| byte[] startRow, boolean inclusiveStart, |
| byte[] stopRow, boolean inclusiveStop) { |
| assert !this.hasChanges(); |
| |
| Scan scan = new Scan(); |
| if (startRow != null) { |
| // Refer: https://issues.apache.org/jira/browse/HBASE-16498 (Bug Fix) |
| scan.withStartRow(startRow, inclusiveStart); |
| } |
| if (stopRow != null) { |
| String version = VersionInfo.getVersion(); |
| if (inclusiveStop && !VersionUtil.gte(version, "2.0")) { |
| // The parameter stoprow-inclusive doesn't work before v2.0 |
| // https://issues.apache.org/jira/browse/HBASE-20675 |
| inclusiveStop = false; |
| // Add a trailing 0 byte to stopRow |
| stopRow = Arrays.copyOf(stopRow, stopRow.length + 1); |
| } |
| if (Bytes.equals(startRow, stopRow) && |
| inclusiveStart && !inclusiveStop) { |
| // Bug https://issues.apache.org/jira/browse/HBASE-21618 |
| return new RowIterator(); |
| } |
| scan.withStopRow(stopRow, inclusiveStop); |
| } |
| return this.scan(table, scan); |
| } |
| |
| /** |
| * Inner scan: send scan request to HBase and get iterator |
| */ |
| @Override |
| public RowIterator scan(String table, Scan scan) { |
| assert !this.hasChanges(); |
| |
| try (Table htable = table(table)) { |
| return new RowIterator(htable.getScanner(scan)); |
| } catch (IOException e) { |
| throw new BackendException(e); |
| } |
| } |
| |
| /** |
| * Increase a counter by rowkey and qualifier to a table |
| */ |
| @Override |
| public long increase(String table, byte[] family, byte[] rowkey, |
| byte[] qualifier, long value) { |
| try (Table htable = table(table)) { |
| return htable.incrementColumnValue(rowkey, family, |
| qualifier, value); |
| } catch (IOException e) { |
| throw new BackendException(e); |
| } |
| } |
| |
| /** |
| * Get store size of specified table |
| */ |
| public long storeSize(String table) throws IOException { |
| return HbaseSessions.this.storeSize(table); |
| } |
| |
| /** |
| * Just for debug |
| */ |
| @SuppressWarnings("unused") |
| private void dump(String table, Scan scan) throws IOException { |
| LOG.info(String.format(">>>> scan table {} with {}", table, scan)); |
| RowIterator iterator = this.scan(table, scan); |
| while (iterator.hasNext()) { |
| Result row = iterator.next(); |
| LOG.info(StringEncoding.format(row.getRow())); |
| CellScanner cellScanner = row.cellScanner(); |
| while (cellScanner.advance()) { |
| Cell cell = cellScanner.current(); |
| byte[] key = CellUtil.cloneQualifier(cell); |
| byte[] val = CellUtil.cloneValue(cell); |
| LOG.info(" {}={}", |
| StringEncoding.format(key), |
| StringEncoding.format(val)); |
| } |
| } |
| } |
| |
| public CountSession countSession() { |
| return new CountSession(this); |
| } |
| } |
| |
| public class CountSession implements HbaseSession<Number>, AutoCloseable { |
| |
| private final Session origin; |
| private final AggregationClient aggrClient; |
| |
| public CountSession(Session origin) { |
| this.origin = origin; |
| this.aggrClient = aggregationClient(); |
| } |
| |
| @Override |
| public void put(String table, byte[] family, byte[] rowkey, |
| Collection<BackendColumn> columns) { |
| throw new NotSupportException("AggrSession.put"); |
| } |
| |
| @Override |
| public void put(String table, byte[] family, byte[] rowkey, |
| byte[] qualifier, byte[] value) { |
| throw new NotSupportException("AggrSession.put"); |
| } |
| |
| @Override |
| public void remove(String table, byte[] family, byte[] rowkey, |
| byte[] qualifier, boolean latestVersion) { |
| throw new NotSupportException("AggrSession.remove"); |
| } |
| |
| @Override |
| public void delete(String table, byte[] family, byte[] rowkey) { |
| throw new NotSupportException("AggrSession.delete"); |
| } |
| |
| @Override |
| public Number get(String table, byte[] family, byte[] rowkey, |
| byte[] qualifier) { |
| return count(this.origin.get(table, family, rowkey, qualifier)); |
| } |
| |
| @Override |
| public Number get(String table, byte[] family, byte[] rowkey) { |
| return count(this.origin.get(table, family, rowkey)); |
| } |
| |
| @Override |
| public Number get(String table, byte[] family, Set<byte[]> rowkeys) { |
| return count(this.origin.get(table, family, rowkeys)); |
| } |
| |
| private long count(RowIterator iter) { |
| long count = 0L; |
| while (iter.hasNext()) { |
| if (!iter.next().isEmpty()) { |
| count++; |
| } |
| } |
| return count; |
| } |
| |
| @Override |
| public Number scan(String table, Scan scan) { |
| LongColumnInterpreter ci = new LongColumnInterpreter(); |
| try { |
| return this.aggrClient.rowCount(table(table), ci, scan); |
| } catch (Throwable e) { |
| throw new BackendException(e); |
| } |
| } |
| |
| @Override |
| public long increase(String table, byte[] family, byte[] rowkey, |
| byte[] qualifier, long value) { |
| throw new NotSupportException("AggrSession.increase"); |
| } |
| |
| @Override |
| public void close() throws IOException { |
| this.aggrClient.close(); |
| } |
| } |
| |
| protected static class RowIterator implements BackendIterator<Result> { |
| |
| private final ResultScanner resultScanner; |
| private final Iterator<Result> results; |
| |
| private byte[] position = null; |
| |
| public RowIterator(ResultScanner resultScanner) { |
| this.resultScanner = resultScanner; |
| this.results = resultScanner.iterator(); |
| } |
| |
| public RowIterator(Result... results) { |
| this.resultScanner = null; |
| List<Result> rs = new ArrayList<>(results.length); |
| for (Result result : results) { |
| // Get by Ids may return empty result |
| if (!result.isEmpty()) { |
| rs.add(result); |
| } |
| } |
| this.results = rs.iterator(); |
| } |
| |
| @Override |
| public boolean hasNext() { |
| boolean has = this.results.hasNext(); |
| if (!has) { |
| this.position = null; |
| this.close(); |
| } |
| return has; |
| } |
| |
| @Override |
| public Result next() { |
| // Reset position due to results.next() may throw ex |
| this.position = null; |
| |
| Result next = this.results.next(); |
| |
| // Update position for paging |
| this.position = next.getRow(); |
| |
| return next; |
| } |
| |
| @Override |
| public void close() { |
| if (this.resultScanner != null) { |
| this.resultScanner.close(); |
| } |
| } |
| |
| @Override |
| public byte[] position() { |
| return this.position; |
| } |
| } |
| } |