blob: 603ef3a9361c6b1d1411507e956b2661a315a00e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hugegraph.backend.store.hstore;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.hugegraph.backend.id.Id;
import org.apache.hugegraph.backend.id.IdGenerator;
import org.apache.hugegraph.backend.query.IdPrefixQuery;
import org.apache.hugegraph.backend.query.IdQuery;
import org.apache.hugegraph.backend.query.Query;
import org.apache.hugegraph.backend.serializer.BinaryBackendEntry;
import org.apache.hugegraph.backend.serializer.BytesBuffer;
import org.apache.hugegraph.backend.serializer.MergeIterator;
import org.apache.hugegraph.backend.store.AbstractBackendStore;
import org.apache.hugegraph.backend.store.BackendAction;
import org.apache.hugegraph.backend.store.BackendEntry;
import org.apache.hugegraph.backend.store.BackendFeatures;
import org.apache.hugegraph.backend.store.BackendMutation;
import org.apache.hugegraph.backend.store.BackendStoreProvider;
import org.apache.hugegraph.backend.store.BackendTable;
import org.apache.hugegraph.backend.store.hstore.HstoreSessions.Session;
import org.apache.hugegraph.config.CoreOptions;
import org.apache.hugegraph.config.HugeConfig;
import org.apache.hugegraph.iterator.CIter;
import org.apache.hugegraph.type.HugeTableType;
import org.apache.hugegraph.type.HugeType;
import org.apache.hugegraph.type.define.Action;
import org.apache.hugegraph.type.define.GraphMode;
import org.apache.hugegraph.util.E;
import org.apache.hugegraph.util.Log;
import org.slf4j.Logger;
import com.google.common.collect.ImmutableSet;
public abstract class HstoreStore extends AbstractBackendStore<Session> {
private static final Logger LOG = Log.logger(HstoreStore.class);
private static final Set<HugeType> INDEX_TYPES = ImmutableSet.of(
HugeType.SECONDARY_INDEX, HugeType.VERTEX_LABEL_INDEX,
HugeType.EDGE_LABEL_INDEX, HugeType.RANGE_INT_INDEX,
HugeType.RANGE_FLOAT_INDEX, HugeType.RANGE_LONG_INDEX,
HugeType.RANGE_DOUBLE_INDEX, HugeType.SEARCH_INDEX,
HugeType.SHARD_INDEX, HugeType.UNIQUE_INDEX
);
private static final BackendFeatures FEATURES = new HstoreFeatures();
private final String store, namespace;
private final BackendStoreProvider provider;
private final Map<Integer, HstoreTable> tables;
private final ReadWriteLock storeLock;
private boolean isGraphStore;
private HstoreSessions sessions;
public HstoreStore(final BackendStoreProvider provider,
String namespace, String store) {
this.tables = new HashMap<>();
this.provider = provider;
this.namespace = namespace;
this.store = store;
this.sessions = null;
this.storeLock = new ReentrantReadWriteLock();
this.registerMetaHandlers();
LOG.debug("Store loaded: {}", store);
}
private void registerMetaHandlers() {
Supplier<List<HstoreSessions>> dbsGet = () -> {
List<HstoreSessions> dbs = new ArrayList<>();
dbs.add(this.sessions);
return dbs;
};
this.registerMetaHandler("metrics", (session, meta, args) -> {
HstoreMetrics metrics = new HstoreMetrics(dbsGet.get(), session);
return metrics.metrics();
});
this.registerMetaHandler("mode", (session, meta, args) -> {
E.checkArgument(args.length == 1,
"The args count of %s must be 1", meta);
session.setMode((GraphMode) args[0]);
return null;
});
}
protected void registerTableManager(HugeTableType type, HstoreTable table) {
this.tables.put((int) type.code(), table);
}
@Override
protected final HstoreTable table(HugeType type) {
assert type != null;
HugeTableType table;
switch (type) {
case VERTEX:
table = HugeTableType.VERTEX;
break;
case EDGE_OUT:
table = HugeTableType.OUT_EDGE;
break;
case EDGE_IN:
table = HugeTableType.IN_EDGE;
break;
case OLAP:
table = HugeTableType.OLAP_TABLE;
break;
case TASK:
table = HugeTableType.TASK_INFO_TABLE;
break;
case SERVER:
table = HugeTableType.SERVER_INFO_TABLE;
break;
case SEARCH_INDEX:
case SHARD_INDEX:
case SECONDARY_INDEX:
case RANGE_INT_INDEX:
case RANGE_LONG_INDEX:
case RANGE_FLOAT_INDEX:
case RANGE_DOUBLE_INDEX:
case EDGE_LABEL_INDEX:
case VERTEX_LABEL_INDEX:
case UNIQUE_INDEX:
table = HugeTableType.ALL_INDEX_TABLE;
break;
default:
throw new AssertionError(String.format(
"Invalid type: %s", type));
}
return this.tables.get((int) table.code());
}
protected List<String> tableNames() {
return this.tables.values().stream()
.map(BackendTable::table)
.collect(Collectors.toList());
}
@Override
protected Session session(HugeType type) {
this.checkOpened();
return this.sessions.session();
}
public String namespace() {
return this.namespace;
}
@Override
public String store() {
return this.store;
}
@Override
public String database() {
return this.namespace;
}
@Override
public BackendStoreProvider provider() {
return this.provider;
}
@Override
public BackendFeatures features() {
return FEATURES;
}
@Override
public synchronized void open(HugeConfig config) {
E.checkNotNull(config, "config");
if (this.sessions == null) {
this.sessions = new HstoreSessionsImpl(config, this.namespace,
this.store);
}
String graphStore = config.get(CoreOptions.STORE_GRAPH);
this.isGraphStore = this.store.equals(graphStore);
assert this.sessions != null;
if (!this.sessions.closed()) {
LOG.debug("Store {} has been opened before", this.store);
this.sessions.useSession();
return;
}
try {
// NOTE: won't throw error even if connection refused
this.sessions.open();
} catch (Exception e) {
LOG.error("Failed to open Hstore '{}':{}", this.store, e);
}
this.sessions.session();
LOG.debug("Store opened: {}", this.store);
}
@Override
public void close() {
this.checkOpened();
this.sessions.close();
LOG.debug("Store closed: {}", this.store);
}
@Override
public boolean opened() {
this.checkConnectionOpened();
return this.sessions.session().opened();
}
@Override
public void mutate(BackendMutation mutation) {
Session session = this.sessions.session();
assert session.opened();
Map<HugeType, Map<Id, List<BackendAction>>> mutations = mutation.mutations();
Set<Map.Entry<HugeType, Map<Id, List<BackendAction>>>> entries = mutations.entrySet();
for (Map.Entry<HugeType, Map<Id, List<BackendAction>>> entry : entries) {
HugeType key = entry.getKey();
// in order to obtain the owner efficiently, special for edge
boolean isEdge = key.isEdge();
HstoreTable hTable = this.table(key);
Map<Id, List<BackendAction>> table = entry.getValue();
Collection<List<BackendAction>> values = table.values();
for (List<BackendAction> items : values) {
for (int i = 0; i < items.size(); i++) {
BackendAction item = items.get(i);
// set to ArrayList, use index to get item
this.mutate(session, item, hTable, isEdge);
}
}
}
}
private void mutate(Session session, BackendAction item,
HstoreTable hTable, boolean isEdge) {
BackendEntry entry = item.entry();
HstoreTable table;
if (!entry.olap()) {
// Oltp table
table = hTable;
} else {
if (entry.type().isIndex()) {
// Olap index
table = this.table(entry.type());
} else {
// Olap vertex
table = this.table(HugeType.OLAP);
}
session = this.session(HugeType.OLAP);
}
if (item.action().code() == Action.INSERT.code()) {
table.insert(session, entry, isEdge);
} else {
if (item.action().code() == Action.APPEND.code()) {
table.append(session, entry);
} else {
switch (item.action()) {
case DELETE:
table.delete(session, entry);
break;
case ELIMINATE:
table.eliminate(session, entry);
break;
case UPDATE_IF_PRESENT:
table.updateIfPresent(session, entry);
break;
case UPDATE_IF_ABSENT:
table.updateIfAbsent(session, entry);
break;
default:
throw new AssertionError(String.format(
"Unsupported mutate action: %s",
item.action()));
}
}
}
}
private HstoreTable getTableByQuery(Query query) {
HugeType tableType = HstoreTable.tableType(query);
HstoreTable table;
if (query.olap()) {
if (query.resultType().isIndex()) {
// Any index type is ok here
table = this.table(HugeType.SECONDARY_INDEX);
} else {
table = this.table(HugeType.OLAP);
}
} else {
table = this.table(tableType);
}
return table;
}
@Override
public Iterator<BackendEntry> query(Query query) {
Lock readLock = this.storeLock.readLock();
readLock.lock();
try {
this.checkOpened();
Session session = this.sessions.session();
HstoreTable table = getTableByQuery(query);
Iterator<BackendEntry> entries = table.query(session, query);
// Merge olap results as needed
entries = getBackendEntryIterator(entries, query);
return entries;
} finally {
readLock.unlock();
}
}
// TODO: uncomment later - sub edge labels
//@Override
//public Iterator<Iterator<BackendEntry>> query(Iterator<Query> queries,
// Function<Query, Query> queryWriter,
// HugeGraph hugeGraph) {
// if (queries == null || !queries.hasNext()) {
// return Collections.emptyIterator();
// }
//
// class QueryWrapper implements Iterator<IdPrefixQuery> {
// Query first;
// final Iterator<Query> queries;
// Iterator<Id> subEls;
// Query preQuery;
// Iterator<IdPrefixQuery> queryListIterator;
//
// QueryWrapper(Iterator<Query> queries, Query first) {
// this.queries = queries;
// this.first = first;
// }
//
// @Override
// public boolean hasNext() {
// return first != null || (this.subEls != null && this.subEls.hasNext())
// || (queryListIterator != null && queryListIterator.hasNext()) ||
// queries.hasNext();
// }
//
// @Override
// public IdPrefixQuery next() {
// if (queryListIterator != null && queryListIterator.hasNext()) {
// return queryListIterator.next();
// }
//
// Query q;
// if (first != null) {
// q = first;
// preQuery = q.copy();
// first = null;
// } else {
// if (this.subEls == null || !this.subEls.hasNext()) {
// q = queries.next();
// preQuery = q.copy();
// } else {
// q = preQuery.copy();
// }
// }
//
// assert q instanceof ConditionQuery;
// ConditionQuery cq = (ConditionQuery) q;
// ConditionQuery originQuery = (ConditionQuery) q.copy();
//
// List<IdPrefixQuery> queryList = Lists.newArrayList();
// if (hugeGraph != null) {
// for (ConditionQuery conditionQuery :
// ConditionQueryFlatten.flatten(cq)) {
// Id label = conditionQuery.condition(HugeKeys.LABEL);
// /* 父类型 + sortKeys: g.V("V.id").outE("parentLabel").has
// ("sortKey","value")转成 所有子类型 + sortKeys*/
// if ((this.subEls == null ||
// !this.subEls.hasNext()) && label != null &&
// hugeGraph.edgeLabel(label).isFather() &&
// conditionQuery.condition(HugeKeys.SUB_LABEL) ==
// null &&
// conditionQuery.condition(HugeKeys.OWNER_VERTEX) !=
// null &&
// conditionQuery.condition(HugeKeys.DIRECTION) !=
// null &&
// matchEdgeSortKeys(conditionQuery, false,
// hugeGraph)) {
// this.subEls =
// getSubLabelsOfParentEl(
// hugeGraph.edgeLabels(),
// label);
// }
//
// if (this.subEls != null &&
// this.subEls.hasNext()) {
// conditionQuery.eq(HugeKeys.SUB_LABEL,
// subEls.next());
// }
//
// HugeType hugeType = conditionQuery.resultType();
// if (hugeType != null && hugeType.isEdge() &&
// !conditionQuery.conditions().isEmpty()) {
// IdPrefixQuery idPrefixQuery =
// (IdPrefixQuery) queryWriter.apply(
// conditionQuery);
// idPrefixQuery.setOriginQuery(originQuery);
// queryList.add(idPrefixQuery);
// }
// }
//
// queryListIterator = queryList.iterator();
// if (queryListIterator.hasNext()) {
// return queryListIterator.next();
// }
// }
//
// Id ownerId = cq.condition(HugeKeys.OWNER_VERTEX);
// assert ownerId != null;
// BytesBuffer buffer =
// BytesBuffer.allocate(BytesBuffer.BUF_EDGE_ID);
// buffer.writeId(ownerId);
// return new IdPrefixQuery(cq, new BinaryBackendEntry.BinaryId(
// buffer.bytes(), ownerId));
// }
//
// private boolean matchEdgeSortKeys(ConditionQuery query,
// boolean matchAll,
// HugeGraph graph) {
// assert query.resultType().isEdge();
// Id label = query.condition(HugeKeys.LABEL);
// if (label == null) {
// return false;
// }
// List<Id> sortKeys = graph.edgeLabel(label).sortKeys();
// if (sortKeys.isEmpty()) {
// return false;
// }
// Set<Id> queryKeys = query.userpropKeys();
// for (int i = sortKeys.size(); i > 0; i--) {
// List<Id> subFields = sortKeys.subList(0, i);
// if (queryKeys.containsAll(subFields)) {
// if (queryKeys.size() == subFields.size() || !matchAll) {
// /*
// * Return true if:
// * matchAll=true and all queryKeys are in sortKeys
// * or
// * partial queryKeys are in sortKeys
// */
// return true;
// }
// }
// }
// return false;
// }
// }
// Query first = queries.next();
// List<HugeType> typeList = getHugeTypes(first);
// QueryWrapper idPrefixQueries = new QueryWrapper(queries, first);
//
// return query(typeList, idPrefixQueries);
//}
//private Iterator<Id> getSubLabelsOfParentEl(Collection<EdgeLabel> allEls,
// Id label) {
// List<Id> list = new ArrayList<>();
// for (EdgeLabel el : allEls) {
// if (el.edgeLabelType().sub() && el.fatherId().equals(label)) {
// list.add(el.id());
// }
// }
// return list.iterator();
//}
public List<CIter<BackendEntry>> query(List<HugeType> typeList,
List<IdPrefixQuery> queries) {
Lock readLock = this.storeLock.readLock();
readLock.lock();
LinkedList<CIter<BackendEntry>> results = new LinkedList<>();
try {
this.checkOpened();
Session session = this.sessions.session();
E.checkState(!CollectionUtils.isEmpty(queries) &&
!CollectionUtils.isEmpty(typeList),
"Please check query list or type list.");
HstoreTable table = null;
StringBuilder builder = new StringBuilder();
for (HugeType type : typeList) {
builder.append((table = this.table(type)).table()).append(",");
}
List<Iterator<BackendEntry>> iteratorList =
table.query(session, queries,
builder.substring(0, builder.length() - 1));
for (int i = 0; i < iteratorList.size(); i++) {
Iterator<BackendEntry> entries = iteratorList.get(i);
// Merge olap results as needed
Query query = queries.get(i);
entries = getBackendEntryIterator(entries, query);
if (entries instanceof CIter) {
results.add((CIter) entries);
}
}
return results;
} finally {
readLock.unlock();
}
}
public Iterator<Iterator<BackendEntry>> query(List<HugeType> typeList,
Iterator<IdPrefixQuery> queries) {
Lock readLock = this.storeLock.readLock();
readLock.lock();
try {
this.checkOpened();
Session session = this.sessions.session();
E.checkState(queries.hasNext() &&
!CollectionUtils.isEmpty(typeList),
"Please check query list or type list.");
HstoreTable table = null;
StringBuilder builder = new StringBuilder();
for (HugeType type : typeList) {
builder.append((table = this.table(type)).table()).append(",");
}
Iterator<Iterator<BackendEntry>> iterators =
table.query(session, queries,
builder.substring(0, builder.length() - 1));
return iterators;
} finally {
readLock.unlock();
}
}
private Iterator<BackendEntry> getBackendEntryIterator(
Iterator<BackendEntry> entries,
Query query) {
HstoreTable table;
Set<Id> olapPks = query.olapPks();
if (this.isGraphStore && !olapPks.isEmpty()) {
List<Iterator<BackendEntry>> iterators = new ArrayList<>();
for (Id pk : olapPks) {
// 构造olap表查询query condition
Query q = this.constructOlapQueryCondition(pk, query);
table = this.table(HugeType.OLAP);
iterators.add(table.queryOlap(this.session(HugeType.OLAP), q));
}
entries = new MergeIterator<>(entries, iterators,
BackendEntry::mergeable);
}
return entries;
}
/**
* 重新构造 查询olap表 query
* 由于 olap合并成一张表, 在写入olap数据, key在后面增加了pk
* 所以在此进行查询的时候,需要重新构造pk前缀
* 写入参考 BinarySerializer.writeOlapVertex
*
* @param pk
* @param query
* @return
*/
private Query constructOlapQueryCondition(Id pk, Query query) {
if (query instanceof IdQuery && !CollectionUtils.isEmpty((query).ids())) {
IdQuery q = (IdQuery) query.copy();
Iterator<Id> iterator = q.ids().iterator();
LinkedHashSet<Id> linkedHashSet = new LinkedHashSet<>();
while (iterator.hasNext()) {
Id id = iterator.next();
if (id instanceof BinaryBackendEntry.BinaryId) {
id = ((BinaryBackendEntry.BinaryId) id).origin();
}
// create binary id
BytesBuffer buffer =
BytesBuffer.allocate(1 + pk.length() + 1 + id.length());
buffer.writeId(pk);
id = new BinaryBackendEntry.BinaryId(
buffer.writeId(id).bytes(), id);
linkedHashSet.add(id);
}
q.resetIds();
q.query(linkedHashSet);
return q;
} else {
// create binary id
BytesBuffer buffer = BytesBuffer.allocate(1 + pk.length());
pk = new BinaryBackendEntry.BinaryId(
buffer.writeId(pk).bytes(), pk);
IdPrefixQuery idPrefixQuery = new IdPrefixQuery(HugeType.OLAP, pk);
return idPrefixQuery;
}
}
@Override
public Number queryNumber(Query query) {
this.checkOpened();
Session session = this.sessions.session();
HstoreTable table = this.table(HstoreTable.tableType(query));
return table.queryNumber(session, query);
}
@Override
public synchronized void init() {
Lock writeLock = this.storeLock.writeLock();
writeLock.lock();
try {
// Create tables with main disk
this.sessions.createTable(this.tableNames().toArray(new String[0]));
LOG.debug("Store initialized: {}", this.store);
} finally {
writeLock.unlock();
}
}
@Override
public void clear(boolean clearSpace) {
Lock writeLock = this.storeLock.writeLock();
writeLock.lock();
try {
// Drop tables with main disk
this.sessions.dropTable(this.tableNames().toArray(new String[0]));
if (clearSpace) {
this.sessions.clear();
}
LOG.debug("Store cleared: {}", this.store);
} finally {
writeLock.unlock();
}
}
@Override
public boolean initialized() {
return true;
}
@Override
public void truncate() {
try {
this.sessions.session().truncate();
} catch (Exception e) {
LOG.error("Store truncated failed", e);
return;
}
LOG.debug("Store truncated: {}", this.store);
}
@Override
public void beginTx() {
this.sessions.session().beginTx();
}
@Override
public void commitTx() {
this.checkOpened();
Session session = this.sessions.session();
session.commit();
}
@Override
public void rollbackTx() {
this.checkOpened();
Session session = this.sessions.session();
session.rollback();
}
private void checkConnectionOpened() {
}
@Override
public Id nextId(HugeType type) {
long counter = 0L;
counter = this.getCounter(type);
E.checkState(counter != 0L, "Please check whether '%s' is OK",
this.provider().type());
return IdGenerator.of(counter);
}
@Override
public void setCounterLowest(HugeType type, long lowest) {
this.increaseCounter(type, lowest);
}
@Override
public String storedVersion() {
return "1.13";
}
/***************************** Store defines *****************************/
public static class HstoreSchemaStore extends HstoreStore {
public HstoreSchemaStore(BackendStoreProvider provider, String namespace, String store) {
super(provider, namespace, store);
}
@Override
public boolean isSchemaStore() {
return true;
}
@Override
public void increaseCounter(HugeType type, long num) {
throw new UnsupportedOperationException(
"HstoreSchemaStore.increaseCounter()");
}
@Override
public long getCounter(HugeType type) {
throw new UnsupportedOperationException(
"HstoreSchemaStore.getCounter()");
}
}
public static class HstoreGraphStore extends HstoreStore {
public HstoreGraphStore(BackendStoreProvider provider,
String namespace, String store) {
super(provider, namespace, store);
registerTableManager(HugeTableType.VERTEX,
new HstoreTables.Vertex(store));
registerTableManager(HugeTableType.OUT_EDGE,
HstoreTables.Edge.out(store));
registerTableManager(HugeTableType.IN_EDGE,
HstoreTables.Edge.in(store));
registerTableManager(HugeTableType.ALL_INDEX_TABLE,
new HstoreTables.IndexTable(store));
registerTableManager(HugeTableType.OLAP_TABLE,
new HstoreTables.OlapTable(store));
registerTableManager(HugeTableType.TASK_INFO_TABLE,
new HstoreTables.TaskInfo(store));
registerTableManager(HugeTableType.SERVER_INFO_TABLE,
new HstoreTables.ServerInfo(store));
}
@Override
public boolean isSchemaStore() {
return false;
}
@Override
public Id nextId(HugeType type) {
throw new UnsupportedOperationException(
"HstoreGraphStore.nextId()");
}
@Override
public void increaseCounter(HugeType type, long num) {
throw new UnsupportedOperationException(
"HstoreGraphStore.increaseCounter()");
}
@Override
public long getCounter(HugeType type) {
throw new UnsupportedOperationException(
"HstoreGraphStore.getCounter()");
}
@Override
public void createOlapTable(Id pkId) {
HstoreTable table = new HstoreTables.OlapTable(this.store());
LOG.info("Hstore create olap table {}", table.table());
super.sessions.createTable(table.table());
LOG.info("Hstore finish create olap table");
registerTableManager(HugeTableType.OLAP_TABLE, table);
LOG.info("OLAP table {} has been created", table.table());
}
@Override
public void checkAndRegisterOlapTable(Id pkId) {
HstoreTable table = new HstoreTables.OlapTable(this.store());
if (!super.sessions.existsTable(table.table())) {
LOG.error("Found exception: Table '{}' doesn't exist, we'll " +
"recreate it now. Please carefully check the recent" +
"operation in server and computer, then ensure the " +
"integrity of store file.", table.table());
this.createOlapTable(pkId);
} else {
registerTableManager(HugeTableType.OLAP_TABLE, table);
}
}
@Override
public void clearOlapTable(Id pkId) {
}
@Override
public void removeOlapTable(Id pkId) {
}
@Override
public boolean existOlapTable(Id pkId) {
String tableName = this.olapTableName(pkId);
return super.sessions.existsTable(tableName);
}
}
}