blob: eb45bd60b47804f3458048ef3658f88c6be1bf10 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.RegionMetrics;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Size;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.slf4j.Logger;
import org.apache.hugegraph.backend.BackendException;
import org.apache.hugegraph.backend.query.Aggregate;
import org.apache.hugegraph.backend.query.Aggregate.AggregateFunc;
import org.apache.hugegraph.backend.query.Condition.Relation;
import org.apache.hugegraph.backend.query.ConditionQuery;
import org.apache.hugegraph.backend.query.IdPrefixQuery;
import org.apache.hugegraph.backend.query.IdRangeQuery;
import org.apache.hugegraph.backend.query.Query;
import org.apache.hugegraph.backend.serializer.BinaryBackendEntry;
import org.apache.hugegraph.backend.serializer.BinaryEntryIterator;
import org.apache.hugegraph.exception.NotSupportException;
import org.apache.hugegraph.type.HugeType;
import org.apache.hugegraph.util.Bytes;
import org.apache.hugegraph.util.E;
import org.apache.hugegraph.util.InsertionOrderUtil;
import org.apache.hugegraph.util.Log;
import org.apache.hugegraph.util.StringEncoding;
public class HbaseTable extends BackendTable<HbaseSessions.Session, BackendEntry> {
private static final Logger LOG = Log.logger(HbaseStore.class);
protected static final byte[] CF = "f".getBytes();
private final HbaseShardSplitter shardSplitter;
private final boolean enablePartition;
public HbaseTable(String table, boolean enablePartition) {
this.enablePartition = enablePartition;
this.shardSplitter = new HbaseShardSplitter(this.table());
public HbaseTable(String table) {
this(table, false);
public static List<byte[]> cfs() {
return ImmutableList.of(CF);
protected void registerMetaHandlers() {
this.registerMetaHandler("splits", (session, meta, args) -> {
E.checkArgument(args.length == 1,
"The args count of %s must be 1", meta);
long splitSize = (long) args[0];
return this.shardSplitter.getSplits(session, splitSize);
public void init(HbaseSessions.Session session) {
// pass
public void clear(HbaseSessions.Session session) {
// pass
public void insert(HbaseSessions.Session session, BackendEntry entry) {
assert !entry.columns().isEmpty();
session.put(this.table(), CF,, entry.columns());
public void delete(HbaseSessions.Session session, BackendEntry entry) {
if (entry.columns().isEmpty()) {
session.delete(this.table(), CF,;
} else {
for (BackendColumn col : entry.columns()) {
session.remove(table(), CF,,;
public void append(HbaseSessions.Session session, BackendEntry entry) {
assert entry.columns().size() == 1;
this.insert(session, entry);
public void eliminate(HbaseSessions.Session session, BackendEntry entry) {
assert entry.columns().size() == 1;
this.delete(session, entry);
public boolean queryExist(HbaseSessions.Session session, BackendEntry entry) {
Id id =;
try (HbaseSessions.RowIterator iter = this.queryById(session, id)) {
return iter.hasNext();
public Number queryNumber(HbaseSessions.Session session, Query query) {
Aggregate aggregate = query.aggregateNotNull();
if (aggregate.func() != AggregateFunc.COUNT) {
throw new NotSupportException(aggregate.toString());
assert aggregate.func() == AggregateFunc.COUNT;
try (HbaseSessions.CountSession countSession = session.countSession()) {
return this.query(countSession, query);
} catch (IOException e) {
throw new BackendException(e);
public Iterator<BackendEntry> query(HbaseSessions.Session session, Query query) {
if (query.limit() == 0L && !query.noLimit()) {
LOG.debug("Return empty result(limit=0) for query {}", query);
return Collections.emptyIterator();
HbaseSessions.HbaseSession<HbaseSessions.RowIterator> hbaseSession = session;
return this.newEntryIterator(query, this.query(hbaseSession, query));
protected <R> R query(HbaseSessions.HbaseSession<R> session, Query query) {
// Query all
if (query.empty()) {
return this.queryAll(session, query);
// Query by prefix
if (query instanceof IdPrefixQuery) {
IdPrefixQuery pq = (IdPrefixQuery) query;
return this.queryByPrefix(session, pq);
// Query by range
if (query instanceof IdRangeQuery) {
IdRangeQuery rq = (IdRangeQuery) query;
return this.queryByRange(session, rq);
// Query by id
if (query.conditionsSize() == 0) {
assert query.idsSize() > 0;
if (query.idsSize() == 1) {
Id id = query.ids().iterator().next();
return this.queryById(session, id);
} else {
return this.queryByIds(session, query.ids());
// Query by condition (or condition + id)
ConditionQuery cq = (ConditionQuery) query;
return this.queryByCond(session, cq);
protected <R> R queryAll(HbaseSessions.HbaseSession<R> session, Query query) {
if (query.paging()) {
PageState page = PageState.fromString(;
byte[] begin = page.position();
return session.scan(this.table(), begin, null);
} else {
return session.scan(this.table(), -1);
protected <R> R queryById(HbaseSessions.HbaseSession<R> session, Id id) {
return session.get(this.table(), null, id.asBytes());
protected <R> R queryByIds(HbaseSessions.HbaseSession<R> session, Collection<Id> ids) {
Set<byte[]> rowkeys = InsertionOrderUtil.newSet();
for (Id id : ids) {
return session.get(this.table(), null, rowkeys);
protected <R> R queryByPrefix(HbaseSessions.HbaseSession<R> session,
IdPrefixQuery query) {
return session.scan(this.table(), query.start().asBytes(),
query.inclusiveStart(), query.prefix().asBytes());
protected <R> R queryByRange(HbaseSessions.HbaseSession<R> session, IdRangeQuery query) {
byte[] start = query.start().asBytes();
byte[] end = query.end() == null ? null : query.end().asBytes();
return session.scan(this.table(), start, query.inclusiveStart(),
end, query.inclusiveEnd());
protected <R> R queryByCond(HbaseSessions.HbaseSession<R> session, ConditionQuery query) {
if (query.containsScanRelation()) {
E.checkArgument(query.relations().size() == 1,
"Invalid scan with multi conditions: %s", query);
Relation scan = query.relations().iterator().next();
Shard shard = (Shard) scan.value();
return this.queryByRange(session, shard,;
throw new NotSupportException("query: %s", query);
protected <R> R queryByRange(HbaseSessions.HbaseSession<R> session,
Shard shard, String page) {
byte[] start = this.shardSplitter.position(shard.start());
byte[] end = this.shardSplitter.position(shard.end());
if (page != null && !page.isEmpty()) {
byte[] position = PageState.fromString(page).position();
E.checkArgument(start == null ||, start) >= 0,
"Invalid page out of lower bound");
start = position;
return session.scan(this.table(), start, end);
protected BackendEntryIterator newEntryIterator(Query query,
HbaseSessions.RowIterator rows) {
return new BinaryEntryIterator<>(rows, query, (entry, row) -> {
E.checkState(!row.isEmpty(), "Can't parse empty HBase result");
byte[] id = row.getRow();
if (entry == null || !Bytes.prefixWith(id, {
HugeType type = query.resultType();
// NOTE: only support BinaryBackendEntry currently
entry = new BinaryBackendEntry(type, id, this.enablePartition);
try {
this.parseRowColumns(row, entry, query, this.enablePartition);
} catch (IOException e) {
throw new BackendException("Failed to read HBase columns", e);
return entry;
protected void parseRowColumns(Result row, BackendEntry entry, Query query,
boolean enablePartition)
throws IOException {
CellScanner cellScanner = row.cellScanner();
while (cellScanner.advance()) {
Cell cell = cellScanner.current();
private static class HbaseShardSplitter extends ShardSplitter<HbaseSessions.Session> {
public HbaseShardSplitter(String table) {
public List<Shard> getSplits(HbaseSessions.Session session, long splitSize) {
E.checkArgument(splitSize >= MIN_SHARD_SIZE,
"The split-size must be >= %s bytes, but got %s",
MIN_SHARD_SIZE, splitSize);
List<Shard> shards = new ArrayList<>();
String namespace = session.namespace();
String table = this.table();
// Calc data size for each region
Map<String, Double> regionSizes = regionSizes(session, namespace,
// Get token range of each region
Map<String, Range> regionRanges = regionRanges(session, namespace,
// Split regions to shards
for (Map.Entry<String, Double> rs : regionSizes.entrySet()) {
String region = rs.getKey();
double size = rs.getValue();
Range range = regionRanges.get(region);
int count = calcSplitCount(size, splitSize);
return shards;
private static Map<String, Double> regionSizes(HbaseSessions.Session session,
String namespace,
String table) {
Map<String, Double> regionSizes = new HashMap<>();
try (Admin admin = session.hbase().getAdmin()) {
TableName tableName = TableName.valueOf(namespace, table);
for (ServerName serverName : admin.getRegionServers()) {
List<RegionMetrics> metrics = admin.getRegionMetrics(
serverName, tableName);
for (RegionMetrics metric : metrics) {
double size = metric.getStoreFileSize()
size += metric.getMemStoreSize().get(Size.Unit.BYTE);
regionSizes.put(metric.getNameAsString(), size);
} catch (Throwable e) {
throw new BackendException(String.format(
"Failed to get region sizes of %s(%s)",
table, namespace), e);
return regionSizes;
private static Map<String, Range> regionRanges(HbaseSessions.Session session,
String namespace,
String table) {
Map<String, Range> regionRanges = InsertionOrderUtil.newMap();
TableName tableName = TableName.valueOf(namespace, table);
try (Admin admin = session.hbase().getAdmin()) {
for (RegionInfo regionInfo : admin.getRegions(tableName)) {
byte[] start = regionInfo.getStartKey();
byte[] end = regionInfo.getEndKey();
new Range(start, end));
} catch (Throwable e) {
throw new BackendException(String.format(
"Failed to get region ranges of %s(%s)",
table, namespace), e);
return regionRanges;
private static int calcSplitCount(double totalSize, long splitSize) {
return (int) Math.ceil(totalSize / splitSize);
public byte[] position(String position) {
if (START.equals(position) || END.equals(position)) {
return null;
return StringEncoding.decodeBase64(position);
public long estimateDataSize(HbaseSessions.Session session) {
try {
return session.storeSize(this.table());
} catch (IOException ignored) {
return -1L;
public long estimateNumKeys(HbaseSessions.Session session) {
// TODO: improve
return 100000L;