blob: 894964aa46432bb36c09e00d89e4677bb71765be [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hugegraph.backend.store.hstore;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hugegraph.backend.id.EdgeId;
import org.apache.hugegraph.backend.id.Id;
import org.apache.hugegraph.backend.page.PageState;
import org.apache.hugegraph.backend.query.Aggregate;
import org.apache.hugegraph.backend.query.Aggregate.AggregateFunc;
import org.apache.hugegraph.backend.query.Condition;
import org.apache.hugegraph.backend.query.Condition.Relation;
import org.apache.hugegraph.backend.query.ConditionQuery;
import org.apache.hugegraph.backend.query.IdPrefixQuery;
import org.apache.hugegraph.backend.query.IdRangeQuery;
import org.apache.hugegraph.backend.query.Query;
import org.apache.hugegraph.backend.serializer.BinaryBackendEntry;
import org.apache.hugegraph.backend.serializer.BinaryEntryIterator;
import org.apache.hugegraph.backend.store.BackendEntry;
import org.apache.hugegraph.backend.store.BackendEntry.BackendColumn;
import org.apache.hugegraph.backend.store.BackendEntry.BackendColumnIterator;
import org.apache.hugegraph.backend.store.BackendEntryIterator;
import org.apache.hugegraph.backend.store.BackendTable;
import org.apache.hugegraph.backend.store.Shard;
import org.apache.hugegraph.backend.store.hstore.HstoreSessions.Countable;
import org.apache.hugegraph.backend.store.hstore.HstoreSessions.Session;
import org.apache.hugegraph.exception.NotSupportException;
import org.apache.hugegraph.pd.client.PDClient;
import org.apache.hugegraph.pd.common.PDException;
import org.apache.hugegraph.pd.grpc.Metapb;
import org.apache.hugegraph.store.HgOwnerKey;
import org.apache.hugegraph.store.client.util.HgStoreClientConst;
import org.apache.hugegraph.type.HugeType;
import org.apache.hugegraph.type.define.HugeKeys;
import org.apache.hugegraph.util.E;
import org.apache.hugegraph.util.Log;
import org.apache.hugegraph.util.StringEncoding;
import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
import org.slf4j.Logger;
/**
* This class provides the implementation for the HStore table in the backend store.
* It provides methods for querying, inserting, deleting, and updating entries in the table.
* It also provides methods for handling metadata and managing shards.
*/
public class HstoreTable extends BackendTable<Session, BackendEntry> {
private static final Logger LOG = Log.logger(HstoreStore.class);
private final HstoreShardSplitter shardSpliter;
Function<BackendEntry, byte[]> ownerDelegate = this::getOwner;
Function<Id, byte[]> ownerByIdDelegate = this::getOwnerId;
BiFunction<HugeType, Id, byte[]> ownerByQueryDelegate = this::getOwnerId;
Supplier<byte[]> ownerScanDelegate = () -> HgStoreClientConst.ALL_PARTITION_OWNER;
public HstoreTable(String database, String table) {
super(String.format("%s+%s", database, table));
this.shardSpliter = new HstoreShardSplitter(this.table());
}
public static ConditionQuery removeDirectionCondition(ConditionQuery conditionQuery) {
Collection<Condition> conditions = conditionQuery.conditions();
List<Condition> newConditions = new ArrayList<>();
for (Condition condition : conditions) {
if (!direction(condition)) {
newConditions.add(condition);
}
}
if (newConditions.size() > 0) {
conditionQuery.resetConditions(newConditions);
return conditionQuery;
} else {
return null;
}
}
private static boolean direction(Condition condition) {
boolean direction = true;
List<? extends Relation> relations = condition.relations();
for (Relation r : relations) {
if (!r.key().equals(HugeKeys.DIRECTION)) {
direction = false;
break;
}
}
return direction;
}
protected static BackendEntryIterator newEntryIterator(
BackendColumnIterator cols, Query query) {
return new BinaryEntryIterator<>(cols, query, (entry, col) -> {
if (entry == null || !entry.belongToMe(col)) {
HugeType type = query.resultType();
// NOTE: only support BinaryBackendEntry currently
entry = new BinaryBackendEntry(type, col.name);
}
entry.columns(col);
return entry;
});
}
protected static BackendEntryIterator newEntryIteratorOlap(
BackendColumnIterator cols, Query query, boolean isOlap) {
return new BinaryEntryIterator<>(cols, query, (entry, col) -> {
if (entry == null || !entry.belongToMe(col)) {
HugeType type = query.resultType();
// NOTE: only support BinaryBackendEntry currently
entry = new BinaryBackendEntry(type, col.name, false, isOlap);
}
entry.columns(col);
return entry;
});
}
public static String bytes2String(byte[] bytes) {
StringBuilder result = new StringBuilder();
for (byte b : bytes) {
String st = String.format("%02x", b);
result.append(st);
}
return result.toString();
}
@Override
protected void registerMetaHandlers() {
this.registerMetaHandler("splits", (session, meta, args) -> {
E.checkArgument(args.length == 1,
"The args count of %s must be 1", meta);
long splitSize = (long) args[0];
return this.shardSpliter.getSplits(session, splitSize);
});
}
@Override
public void init(Session session) {
// pass
}
@Override
public void clear(Session session) {
// pass
}
public boolean isOlap() {
return false;
}
private byte[] getOwner(BackendEntry entry) {
if (entry == null) {
return HgStoreClientConst.ALL_PARTITION_OWNER;
}
Id id = entry.type().isIndex() ? entry.id() : entry.originId();
return getOwnerId(id);
}
public Supplier<byte[]> getOwnerScanDelegate() {
return ownerScanDelegate;
}
public byte[] getInsertEdgeOwner(BackendEntry entry) {
Id id = entry.originId();
id = ((EdgeId) id).ownerVertexId();
return id.asBytes();
}
public byte[] getInsertOwner(BackendEntry entry) {
// 为适应 label 索引散列,不聚焦在一个分区
if (entry.type().isLabelIndex() && (entry.columns().size() == 1)) {
Iterator<BackendColumn> iterator = entry.columns().iterator();
while (iterator.hasNext()) {
BackendColumn next = iterator.next();
return next.name;
}
}
Id id = entry.type().isIndex() ? entry.id() : entry.originId();
return getOwnerId(id);
}
/**
* 返回 Id 所属的点 ID
*
* @param id
* @return
*/
protected byte[] getOwnerId(Id id) {
if (id instanceof BinaryBackendEntry.BinaryId) {
id = ((BinaryBackendEntry.BinaryId) id).origin();
}
if (id != null && id.edge()) {
id = ((EdgeId) id).ownerVertexId();
}
return id != null ? id.asBytes() :
HgStoreClientConst.ALL_PARTITION_OWNER;
}
/**
* 返回 Id 所属的点 ID
*
* @param id
* @return
*/
protected byte[] getOwnerId(HugeType type, Id id) {
if (type.equals(HugeType.VERTEX) || type.equals(HugeType.EDGE) ||
type.equals(HugeType.EDGE_OUT) || type.equals(HugeType.EDGE_IN) ||
type.equals(HugeType.COUNTER)) {
return getOwnerId(id);
} else {
return HgStoreClientConst.ALL_PARTITION_OWNER;
}
}
@Override
public void insert(Session session, BackendEntry entry) {
byte[] owner = entry.type().isEdge() ? getInsertEdgeOwner(entry) : getInsertOwner(entry);
ArrayList<BackendColumn> columns = new ArrayList<>(entry.columns());
for (int i = 0; i < columns.size(); i++) {
BackendColumn col = columns.get(i);
session.put(this.table(), owner, col.name, col.value);
}
}
public void insert(Session session, BackendEntry entry, boolean isEdge) {
byte[] owner = isEdge ? getInsertEdgeOwner(entry) : getInsertOwner(entry);
ArrayList<BackendColumn> columns = new ArrayList<>(entry.columns());
for (int i = 0; i < columns.size(); i++) {
BackendColumn col = columns.get(i);
session.put(this.table(), owner, col.name, col.value);
}
}
@Override
public void delete(Session session, BackendEntry entry) {
byte[] ownerKey = ownerDelegate.apply(entry);
if (entry.columns().isEmpty()) {
byte[] idBytes = entry.id().asBytes();
// LOG.debug("Delete from {} with owner {}, id: {}",
// this.table(), bytes2String(ownerKey), idBytes);
session.delete(this.table(), ownerKey, idBytes);
} else {
for (BackendColumn col : entry.columns()) {
// LOG.debug("Delete from {} with owner {}, id: {}",
// this.table(), bytes2String(ownerKey),
// bytes2String(col.name));
assert entry.belongToMe(col) : entry;
session.delete(this.table(), ownerKey, col.name);
}
}
}
@Override
public void append(Session session, BackendEntry entry) {
assert entry.columns().size() == 1;
this.insert(session, entry);
}
@Override
public void eliminate(Session session, BackendEntry entry) {
assert entry.columns().size() == 1;
this.delete(session, entry);
}
@Override
public boolean queryExist(Session session, BackendEntry entry) {
Id id = entry.id();
try (BackendColumnIterator iter = this.queryById(session, id)) {
return iter.hasNext();
}
}
@Override
public Number queryNumber(Session session, Query query) {
Aggregate aggregate = query.aggregateNotNull();
if (aggregate.func() != AggregateFunc.COUNT) {
throw new NotSupportException(aggregate.toString());
}
assert aggregate.func() == AggregateFunc.COUNT;
assert query.noLimit();
Iterator<BackendColumn> results = this.queryBy(session, query);
if (results instanceof Countable) {
return ((Countable) results).count();
}
return IteratorUtils.count(results);
}
@Override
public Iterator<BackendEntry> query(Session session, Query query) {
if (query.limit() == 0L && !query.noLimit()) {
// LOG.debug("Return empty result(limit=0) for query {}", query);
return Collections.emptyIterator();
}
return newEntryIterator(this.queryBy(session, query), query);
}
@Override
public Iterator<BackendEntry> queryOlap(Session session, Query query) {
if (query.limit() == 0L && !query.noLimit()) {
// LOG.debug("Return empty result(limit=0) for query {}", query);
return Collections.emptyIterator();
}
return newEntryIteratorOlap(this.queryBy(session, query), query, true);
}
public List<Iterator<BackendEntry>> query(Session session,
List<IdPrefixQuery> queries,
String tableName) {
List<BackendColumnIterator> queryByPrefixList =
this.queryByPrefixList(session, queries, tableName);
LinkedList<Iterator<BackendEntry>> iterators = new LinkedList<>();
for (int i = 0; i < queryByPrefixList.size(); i++) {
IdPrefixQuery q = queries.get(i).copy();
q.capacity(Query.NO_CAPACITY);
q.limit(Query.NO_LIMIT);
BackendEntryIterator iterator =
newEntryIterator(queryByPrefixList.get(i), q);
iterators.add(iterator);
}
return iterators;
}
public BackendEntry.BackendIterator<Iterator<BackendEntry>> query(Session session,
Iterator<IdPrefixQuery> queries,
String tableName) {
final IdPrefixQuery[] first = {queries.next()};
int type = first[0].withProperties() ? 0 : Session.SCAN_KEY_ONLY;
IdPrefixQuery queryTmpl = first[0].copy();
queryTmpl.capacity(Query.NO_CAPACITY);
queryTmpl.limit(Query.NO_LIMIT);
ConditionQuery originQuery = (ConditionQuery) first[0].originQuery();
if (originQuery != null) {
originQuery = prepareConditionQueryList(originQuery);
}
byte[] queryBytes = originQuery == null ? null : originQuery.bytes();
BackendEntry.BackendIterator<BackendColumnIterator> it
= session.scan(tableName, new Iterator<HgOwnerKey>() {
@Override
public boolean hasNext() {
if (first[0] != null) {
return true;
}
return queries.hasNext();
}
@Override
public HgOwnerKey next() {
IdPrefixQuery query = first[0] != null ? first[0] : queries.next();
first[0] = null;
byte[] prefix = ownerByQueryDelegate.apply(query.resultType(),
query.prefix());
return HgOwnerKey.of(prefix, query.prefix().asBytes());
}
}, type, first[0], queryBytes);
return new BackendEntry.BackendIterator<Iterator<BackendEntry>>() {
@Override
public boolean hasNext() {
return it.hasNext();
}
@Override
public Iterator<BackendEntry> next() {
BackendEntryIterator iterator = newEntryIterator(it.next(), queryTmpl);
return iterator;
}
@Override
public void close() {
it.close();
}
@Override
public byte[] position() {
return new byte[0];
}
};
}
protected BackendColumnIterator queryBy(Session session, Query query) {
// Query all
if (query.empty()) {
return this.queryAll(session, query);
}
// Query by prefix
if (query instanceof IdPrefixQuery) {
IdPrefixQuery pq = (IdPrefixQuery) query;
return this.queryByPrefix(session, pq);
}
// Query by range
if (query instanceof IdRangeQuery) {
IdRangeQuery rq = (IdRangeQuery) query;
return this.queryByRange(session, rq);
}
// Query by id
if (query.conditions().isEmpty()) {
assert !query.ids().isEmpty();
// 单个 id 查询 走 get 接口查询
if (query.ids().size() == 1) {
return this.getById(session, query.ids().iterator().next());
}
// NOTE: this will lead to lazy create rocksdb iterator
LinkedList<HgOwnerKey> hgOwnerKeys = new LinkedList<>();
for (Id id : query.ids()) {
hgOwnerKeys.add(HgOwnerKey.of(this.ownerByIdDelegate.apply(id),
id.asBytes()));
}
BackendColumnIterator withBatch = session.getWithBatch(this.table(),
hgOwnerKeys);
return BackendColumnIterator.wrap(withBatch);
}
// Query by condition (or condition + id)
ConditionQuery cq = (ConditionQuery) query;
return this.queryByCond(session, cq);
}
protected BackendColumnIterator queryAll(Session session, Query query) {
if (query.paging()) {
PageState page = PageState.fromString(query.page());
byte[] ownerKey = this.getOwnerScanDelegate().get();
int scanType = Session.SCAN_ANY |
(query.withProperties() ? 0 : Session.SCAN_KEY_ONLY);
byte[] queryBytes = query instanceof ConditionQuery ?
((ConditionQuery) query).bytes() : null;
// LOG.debug("query {} with ownerKeyFrom: {}, ownerKeyTo: {}, " +
// "keyFrom: null, keyTo: null, scanType: {}, " +
// "conditionQuery: {}, position: {}",
// this.table(), bytes2String(ownerKey),
// bytes2String(ownerKey), scanType,
// queryBytes, page.position());
return session.scan(this.table(), ownerKey, ownerKey, null,
null, scanType, queryBytes,
page.position());
}
return session.scan(this.table(),
query instanceof ConditionQuery ?
((ConditionQuery) query).bytes() : null);
}
protected BackendColumnIterator queryById(Session session, Id id) {
// TODO: change to get() after vertex and schema don't use id prefix
return session.scan(this.table(), this.ownerByIdDelegate.apply(id),
id.asBytes());
}
protected BackendColumnIterator getById(Session session, Id id) {
byte[] value = session.get(this.table(),
this.ownerByIdDelegate.apply(id),
id.asBytes());
if (value.length == 0) {
return BackendColumnIterator.empty();
}
BackendColumn col = BackendColumn.of(id.asBytes(), value);
return BackendColumnIterator.iterator(col);
}
protected BackendColumnIterator queryByPrefix(Session session,
IdPrefixQuery query) {
int type = query.inclusiveStart() ?
Session.SCAN_GTE_BEGIN : Session.SCAN_GT_BEGIN;
type |= Session.SCAN_PREFIX_END;
byte[] position = null;
if (query.paging()) {
position = PageState.fromString(query.page()).position();
}
ConditionQuery originQuery = (ConditionQuery) query.originQuery();
if (originQuery != null) {
originQuery = prepareConditionQuery(originQuery);
}
byte[] ownerKeyFrom = this.ownerByQueryDelegate.apply(query.resultType(),
query.start());
byte[] ownerKeyTo = this.ownerByQueryDelegate.apply(query.resultType(),
query.prefix());
byte[] keyFrom = query.start().asBytes();
// 前缀分页查询中,start 为最初的位置。因为在不同的分区 都是从 start 位置开始查询
if (query.paging()) {
keyFrom = query.prefix().asBytes();
}
byte[] keyTo = query.prefix().asBytes();
byte[] queryBytes = originQuery == null ?
null :
originQuery.bytes();
// LOG.debug("query {} with ownerKeyFrom: {}, ownerKeyTo: {}," +
// "keyFrom: {}, keyTo: {}, scanType: {}, conditionQuery: {}," +
// "position: {}",
// this.table(), bytes2String(ownerKeyFrom),
// bytes2String(ownerKeyTo), bytes2String(keyFrom),
// bytes2String(keyTo), type, originQuery, position);
return session.scan(this.table(), ownerKeyFrom, ownerKeyTo, keyFrom,
keyTo, type, queryBytes, position);
}
protected List<BackendColumnIterator> queryByPrefixList(
Session session,
List<IdPrefixQuery> queries,
String tableName) {
E.checkArgument(queries.size() > 0,
"The size of queries must be greater than zero");
IdPrefixQuery query = queries.get(0);
int type = 0;
LinkedList<HgOwnerKey> ownerKey = new LinkedList<>();
queries.forEach((item) -> {
byte[] prefix = this.ownerByQueryDelegate.apply(item.resultType(),
item.prefix());
ownerKey.add(HgOwnerKey.of(prefix, item.prefix().asBytes()));
});
ConditionQuery originQuery = (ConditionQuery) query.originQuery();
if (originQuery != null) {
originQuery = prepareConditionQueryList(originQuery);
}
byte[] queryBytes = originQuery == null ? null : originQuery.bytes();
// LOG.debug("query {} with scanType: {}, limit: {}, conditionQuery:
// {}", this.table(), type, query.limit(), queryBytes);
return session.scan(tableName, ownerKey, type,
query.limit(), queryBytes);
}
/***
* Prepare ConditionQuery to do operator sinking, because some scenes do not need to be
* preserved
* @param conditionQuery
* @return
*/
private ConditionQuery prepareConditionQuery(ConditionQuery conditionQuery) {
if (CollectionUtils.isEmpty(conditionQuery.userpropConditions())) {
return null;
}
// only userpropConditions can send to store
Collection<Condition> conditions = conditionQuery.conditions();
List<Condition> newConditions = new ArrayList<>();
for (Condition condition : conditions) {
if (!onlyOwnerVertex(condition)) {
newConditions.add(condition);
}
}
if (newConditions.size() > 0) {
conditionQuery.resetConditions(newConditions);
return conditionQuery;
} else {
return null;
}
}
/***
* Prepare ConditionQuery to do operator sinking, because some scenes do not need to be
* preserved
* @param conditionQuery
* @return
*/
private ConditionQuery prepareConditionQueryList(ConditionQuery conditionQuery) {
if (!conditionQuery.containsLabelOrUserpropRelation()) {
return null;
}
// only userpropConditions can send to store
Collection<Condition> conditions = conditionQuery.conditions();
List<Condition> newConditions = new ArrayList<>();
for (Condition condition : conditions) {
if (!onlyOwnerVertex(condition)) {
newConditions.add(condition);
}
}
if (newConditions.size() > 0) {
conditionQuery.resetConditions(newConditions);
return conditionQuery;
} else {
return null;
}
}
private boolean onlyOwnerVertex(Condition condition) {
boolean onlyOwnerVertex = true;
List<? extends Relation> relations = condition.relations();
for (Relation r : relations) {
if (!r.key().equals(HugeKeys.OWNER_VERTEX)) {
onlyOwnerVertex = false;
break;
}
}
return onlyOwnerVertex;
}
protected BackendColumnIterator queryByRange(Session session,
IdRangeQuery query) {
byte[] start = query.start().asBytes();
byte[] end = query.end() == null ? null : query.end().asBytes();
int type = query.inclusiveStart() ?
Session.SCAN_GTE_BEGIN : Session.SCAN_GT_BEGIN;
if (end != null) {
type |= query.inclusiveEnd() ?
Session.SCAN_LTE_END : Session.SCAN_LT_END;
}
ConditionQuery cq;
Query origin = query.originQuery();
byte[] position = null;
if (query.paging() && !query.page().isEmpty()) {
position = PageState.fromString(query.page()).position();
}
byte[] ownerStart = this.ownerByQueryDelegate.apply(query.resultType(),
query.start());
byte[] ownerEnd = this.ownerByQueryDelegate.apply(query.resultType(),
query.end());
if (origin instanceof ConditionQuery &&
(query.resultType().isEdge() || query.resultType().isVertex())) {
cq = (ConditionQuery) query.originQuery();
// LOG.debug("query {} with ownerKeyFrom: {}, ownerKeyTo: {}, " +
// "keyFrom: {}, keyTo: {}, " +
// "scanType: {}, conditionQuery: {}",
// this.table(), bytes2String(ownerStart),
// bytes2String(ownerEnd), bytes2String(start),
// bytes2String(end), type, cq.bytes());
return session.scan(this.table(), ownerStart,
ownerEnd, start, end, type, cq.bytes(), position);
}
return session.scan(this.table(), ownerStart,
ownerEnd, start, end, type, null, position);
}
protected BackendColumnIterator queryByCond(Session session,
ConditionQuery query) {
if (query.containsScanCondition()) {
E.checkArgument(query.relations().size() == 1,
"Invalid scan with multi conditions: %s", query);
Relation scan = query.relations().iterator().next();
Shard shard = (Shard) scan.value();
return this.queryByRange(session, shard, query);
}
// throw new NotSupportException("query: %s", query);
return this.queryAll(session, query);
}
protected BackendColumnIterator queryByRange(Session session, Shard shard,
ConditionQuery query) {
int type = Session.SCAN_GTE_BEGIN;
type |= Session.SCAN_LT_END;
type |= Session.SCAN_HASHCODE;
type |= query.withProperties() ? 0 : Session.SCAN_KEY_ONLY;
int start = Integer.parseInt(StringUtils.isEmpty(shard.start()) ?
"0" : shard.start());
int end = Integer.parseInt(StringUtils.isEmpty(shard.end()) ?
"0" : shard.end());
byte[] queryBytes = query.bytes();
String page = query.page();
if (page != null && !page.isEmpty()) {
byte[] position = PageState.fromString(page).position();
return session.scan(this.table(), start, end, type, queryBytes,
position);
}
return session.scan(this.table(), start, end, type, queryBytes);
}
private static class HstoreShardSplitter extends ShardSplitter<Session> {
public HstoreShardSplitter(String table) {
super(table);
}
@Override
public List<Shard> getSplits(Session session, long splitSize) {
E.checkArgument(splitSize >= MIN_SHARD_SIZE,
"The split-size must be >= %s bytes, but got %s",
MIN_SHARD_SIZE, splitSize);
List<Shard> splits = new ArrayList<>();
try {
PDClient pdClient = HstoreSessionsImpl.getDefaultPdClient();
List<Metapb.Partition> partitions = pdClient.getPartitions(0,
session.getGraphName());
for (Metapb.Partition partition : partitions) {
String start = String.valueOf(partition.getStartKey());
String end = String.valueOf(partition.getEndKey());
splits.add(new Shard(start, end, 0));
}
} catch (PDException e) {
e.printStackTrace();
}
return splits.size() != 0 ?
splits : super.getSplits(session, splitSize);
}
@Override
public long estimateDataSize(Session session) {
return 1L;
}
@Override
public long estimateNumKeys(Session session) {
return 1L;
}
@Override
public byte[] position(String position) {
if (END.equals(position)) {
return null;
}
return StringEncoding.decodeBase64(position);
}
}
}