/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hugegraph.backend.tx;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.commons.collections.CollectionUtils;
import org.apache.hugegraph.HugeException;
import org.apache.hugegraph.HugeGraph;
import org.apache.hugegraph.HugeGraphParams;
import org.apache.hugegraph.backend.BackendException;
import org.apache.hugegraph.backend.id.EdgeId;
import org.apache.hugegraph.backend.id.Id;
import org.apache.hugegraph.backend.id.SplicingIdGenerator;
import org.apache.hugegraph.backend.page.IdHolderList;
import org.apache.hugegraph.backend.page.PageInfo;
import org.apache.hugegraph.backend.page.QueryList;
import org.apache.hugegraph.backend.query.Aggregate;
import org.apache.hugegraph.backend.query.Aggregate.AggregateFunc;
import org.apache.hugegraph.backend.query.Condition;
import org.apache.hugegraph.backend.query.ConditionQuery;
import org.apache.hugegraph.backend.query.ConditionQuery.OptimizedType;
import org.apache.hugegraph.backend.query.ConditionQueryFlatten;
import org.apache.hugegraph.backend.query.IdQuery;
import org.apache.hugegraph.backend.query.Query;
import org.apache.hugegraph.backend.query.QueryResults;
import org.apache.hugegraph.backend.store.BackendEntry;
import org.apache.hugegraph.backend.store.BackendMutation;
import org.apache.hugegraph.backend.store.BackendStore;
import org.apache.hugegraph.config.CoreOptions;
import org.apache.hugegraph.config.HugeConfig;
import org.apache.hugegraph.exception.LimitExceedException;
import org.apache.hugegraph.exception.NotFoundException;
import org.apache.hugegraph.iterator.BatchMapperIterator;
import org.apache.hugegraph.iterator.ExtendableIterator;
import org.apache.hugegraph.iterator.FilterIterator;
import org.apache.hugegraph.iterator.FlatMapperIterator;
import org.apache.hugegraph.iterator.LimitIterator;
import org.apache.hugegraph.iterator.ListIterator;
import org.apache.hugegraph.iterator.MapperIterator;
import org.apache.hugegraph.job.system.DeleteExpiredJob;
import org.apache.hugegraph.perf.PerfUtil.Watched;
import org.apache.hugegraph.schema.EdgeLabel;
import org.apache.hugegraph.schema.IndexLabel;
import org.apache.hugegraph.schema.PropertyKey;
import org.apache.hugegraph.schema.SchemaLabel;
import org.apache.hugegraph.schema.VertexLabel;
import org.apache.hugegraph.structure.HugeEdge;
import org.apache.hugegraph.structure.HugeEdgeProperty;
import org.apache.hugegraph.structure.HugeElement;
import org.apache.hugegraph.structure.HugeFeatures.HugeVertexFeatures;
import org.apache.hugegraph.structure.HugeIndex;
import org.apache.hugegraph.structure.HugeProperty;
import org.apache.hugegraph.structure.HugeVertex;
import org.apache.hugegraph.structure.HugeVertexProperty;
import org.apache.hugegraph.type.HugeType;
import org.apache.hugegraph.type.define.Action;
import org.apache.hugegraph.type.define.Directions;
import org.apache.hugegraph.type.define.HugeKeys;
import org.apache.hugegraph.type.define.IdStrategy;
import org.apache.hugegraph.util.E;
import org.apache.hugegraph.util.InsertionOrderUtil;
import org.apache.hugegraph.util.LockUtil;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Element;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.util.CloseableIterator;
import org.apache.tinkerpop.gremlin.structure.util.ElementHelper;
import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
import com.google.common.collect.ImmutableList;
import jakarta.ws.rs.ForbiddenException;
public class GraphTransaction extends IndexableTransaction {
public static final int COMMIT_BATCH = (int) Query.COMMIT_BATCH;
private final GraphIndexTransaction indexTx;
private Map<Id, HugeVertex> addedVertices;
private Map<Id, HugeVertex> removedVertices;
private Map<Id, HugeEdge> addedEdges;
private Map<Id, HugeEdge> removedEdges;
private Set<HugeProperty<?>> addedProps;
private Set<HugeProperty<?>> removedProps;
// These are used to roll back the tx state
private Map<Id, HugeVertex> updatedVertices;
private Map<Id, HugeEdge> updatedEdges;
private Set<HugeProperty<?>> updatedOldestProps; // Oldest props
private final LockUtil.LocksTable locksTable;
private final boolean checkCustomVertexExist;
private final boolean checkAdjacentVertexExist;
private final boolean lazyLoadAdjacentVertex;
private final boolean removeLeftIndexOnOverwrite;
private final boolean ignoreInvalidEntry;
private final boolean optimizeAggrByIndex;
private final int commitPartOfAdjacentEdges;
private final int batchSize;
private final int pageSize;
private final int verticesCapacity;
private final int edgesCapacity;
public GraphTransaction(HugeGraphParams graph, BackendStore store) {
super(graph, store);
this.indexTx = new GraphIndexTransaction(graph, store);
assert !this.indexTx.autoCommit();
this.locksTable = new LockUtil.LocksTable(graph.name());
final HugeConfig conf = graph.configuration();
this.checkCustomVertexExist =
conf.get(CoreOptions.VERTEX_CHECK_CUSTOMIZED_ID_EXIST);
this.checkAdjacentVertexExist =
conf.get(CoreOptions.VERTEX_ADJACENT_VERTEX_EXIST);
this.lazyLoadAdjacentVertex =
conf.get(CoreOptions.VERTEX_ADJACENT_VERTEX_LAZY);
this.removeLeftIndexOnOverwrite =
conf.get(CoreOptions.VERTEX_REMOVE_LEFT_INDEX);
this.commitPartOfAdjacentEdges =
conf.get(CoreOptions.VERTEX_PART_EDGE_COMMIT_SIZE);
this.ignoreInvalidEntry =
conf.get(CoreOptions.QUERY_IGNORE_INVALID_DATA);
this.optimizeAggrByIndex =
conf.get(CoreOptions.QUERY_OPTIMIZE_AGGR_BY_INDEX);
this.batchSize = conf.get(CoreOptions.QUERY_BATCH_SIZE);
this.pageSize = conf.get(CoreOptions.QUERY_PAGE_SIZE);
this.verticesCapacity = conf.get(CoreOptions.VERTEX_TX_CAPACITY);
this.edgesCapacity = conf.get(CoreOptions.EDGE_TX_CAPACITY);
E.checkArgument(this.commitPartOfAdjacentEdges < this.edgesCapacity,
"Option value of %s(%s) must be < %s(%s)",
CoreOptions.VERTEX_PART_EDGE_COMMIT_SIZE.name(),
this.commitPartOfAdjacentEdges,
CoreOptions.EDGE_TX_CAPACITY.name(),
this.edgesCapacity);
}
@Override
public boolean hasUpdate() {
return this.mutationSize() > 0 || super.hasUpdate();
}
@Override
public boolean hasUpdate(HugeType type, Action action) {
if (type.isVertex()) {
if (action == Action.DELETE) {
if (this.removedVertices.size() > 0) {
return true;
}
} else {
if (this.addedVertices.size() > 0 ||
this.updatedVertices.size() > 0) {
return true;
}
}
} else if (type.isEdge()) {
if (action == Action.DELETE) {
if (this.removedEdges.size() > 0) {
return true;
}
} else {
if (this.addedEdges.size() > 0 ||
this.updatedEdges.size() > 0) {
return true;
}
}
}
return super.hasUpdate(type, action);
}
@Override
public int mutationSize() {
return this.verticesInTxSize() + this.edgesInTxSize();
}
public boolean checkAdjacentVertexExist() {
return this.checkAdjacentVertexExist;
}
@Override
protected void reset() {
super.reset();
// Clear mutation
if (this.addedVertices == null || !this.addedVertices.isEmpty()) {
this.addedVertices = InsertionOrderUtil.newMap();
}
if (this.removedVertices == null || !this.removedVertices.isEmpty()) {
this.removedVertices = InsertionOrderUtil.newMap();
}
if (this.updatedVertices == null || !this.updatedVertices.isEmpty()) {
this.updatedVertices = InsertionOrderUtil.newMap();
}
if (this.addedEdges == null || !this.addedEdges.isEmpty()) {
this.addedEdges = InsertionOrderUtil.newMap();
}
if (this.removedEdges == null || !this.removedEdges.isEmpty()) {
this.removedEdges = InsertionOrderUtil.newMap();
}
if (this.updatedEdges == null || !this.updatedEdges.isEmpty()) {
this.updatedEdges = InsertionOrderUtil.newMap();
}
if (this.addedProps == null || !this.addedProps.isEmpty()) {
this.addedProps = InsertionOrderUtil.newSet();
}
if (this.removedProps == null || !this.removedProps.isEmpty()) {
this.removedProps = InsertionOrderUtil.newSet();
}
if (this.updatedOldestProps == null ||
!this.updatedOldestProps.isEmpty()) {
this.updatedOldestProps = InsertionOrderUtil.newSet();
}
}
@Override
protected GraphIndexTransaction indexTransaction() {
return this.indexTx;
}
@Override
protected void beforeWrite() {
this.checkTxVerticesCapacity();
this.checkTxEdgesCapacity();
super.beforeWrite();
}
protected final int verticesInTxSize() {
return this.addedVertices.size() +
this.removedVertices.size() +
this.updatedVertices.size();
}
protected final int edgesInTxSize() {
return this.addedEdges.size() +
this.removedEdges.size() +
this.updatedEdges.size();
}
protected final Collection<HugeVertex> verticesInTxUpdated() {
int size = this.addedVertices.size() + this.updatedVertices.size();
List<HugeVertex> vertices = new ArrayList<>(size);
vertices.addAll(this.addedVertices.values());
vertices.addAll(this.updatedVertices.values());
return vertices;
}
protected final Collection<HugeVertex> verticesInTxRemoved() {
return new ArrayList<>(this.removedVertices.values());
}
protected final boolean removingEdgeOwner(HugeEdge edge) {
for (HugeVertex vertex : this.removedVertices.values()) {
if (edge.belongToVertex(vertex)) {
return true;
}
}
return false;
}
@Watched(prefix = "tx")
@Override
protected BackendMutation prepareCommit() {
// Serialize and add updates into super.deletions
if (this.removedVertices.size() > 0 || this.removedEdges.size() > 0) {
this.prepareDeletions(this.removedVertices, this.removedEdges);
}
if (this.addedProps.size() > 0 || this.removedProps.size() > 0) {
this.prepareUpdates(this.addedProps, this.removedProps);
}
// Serialize and add updates into super.additions
if (this.addedVertices.size() > 0 || this.addedEdges.size() > 0) {
this.prepareAdditions(this.addedVertices, this.addedEdges);
}
return this.mutation();
}
protected void prepareAdditions(Map<Id, HugeVertex> addedVertices,
Map<Id, HugeEdge> addedEdges) {
if (this.checkCustomVertexExist) {
this.checkVertexExistIfCustomizedId(addedVertices);
}
if (this.removeLeftIndexOnOverwrite) {
this.removeLeftIndexIfNeeded(addedVertices);
}
// Do vertex update
for (HugeVertex v : addedVertices.values()) {
assert !v.removed();
v.committed();
this.checkAggregateProperty(v);
// Check whether passed all non-null properties
if (!this.graphMode().loading()) {
this.checkNonnullProperty(v);
}
// Add vertex entry
this.doInsert(this.serializer.writeVertex(v));
// Update index of vertex(only include props)
this.indexTx.updateVertexIndex(v, false);
this.indexTx.updateLabelIndex(v, false);
}
// Do edge update
for (HugeEdge e : addedEdges.values()) {
assert !e.removed();
e.committed();
// Skip edge if its owner has been removed
if (this.removingEdgeOwner(e)) {
continue;
}
this.checkAggregateProperty(e);
// Add edge entry of OUT and IN
this.doInsert(this.serializer.writeEdge(e));
this.doInsert(this.serializer.writeEdge(e.switchOwner()));
// Update index of edge
this.indexTx.updateEdgeIndex(e, false);
this.indexTx.updateLabelIndex(e, false);
}
}
protected void prepareDeletions(Map<Id, HugeVertex> removedVertices,
Map<Id, HugeEdge> removedEdges) {
// Remove related edges of each vertex
for (HugeVertex v : removedVertices.values()) {
if (!v.schemaLabel().existsLinkLabel()) {
continue;
}
// Query all edges of the vertex and remove them
Query query = constructEdgesQuery(v.id(), Directions.BOTH);
Iterator<HugeEdge> vedges = this.queryEdgesFromBackend(query);
try {
while (vedges.hasNext()) {
this.checkTxEdgesCapacity();
HugeEdge edge = vedges.next();
// NOTE: this modifies the input parameter removedEdges
removedEdges.put(edge.id(), edge);
// Commit first if enabled commit-part mode
if (this.commitPartOfAdjacentEdges > 0 &&
removedEdges.size() >= this.commitPartOfAdjacentEdges) {
this.commitPartOfEdgeDeletions(removedEdges);
}
}
} finally {
CloseableIterator.closeIterator(vedges);
}
}
// Remove vertices
for (HugeVertex v : removedVertices.values()) {
this.checkAggregateProperty(v);
/*
* If the backend stores a vertex together with its edges, the edges
* will be removed along with the vertex. Otherwise, if the backend
* stores vertices separately from edges, the edges must be removed
* manually when removing the vertex.
*/
this.doRemove(this.serializer.writeVertex(v.prepareRemoved()));
this.indexTx.updateVertexIndex(v, true);
this.indexTx.updateLabelIndex(v, true);
}
// Remove edges
this.prepareDeletions(removedEdges);
}
protected void prepareDeletions(Map<Id, HugeEdge> removedEdges) {
// Remove edges
for (HugeEdge e : removedEdges.values()) {
this.checkAggregateProperty(e);
// Update edge index
this.indexTx.updateEdgeIndex(e, true);
this.indexTx.updateLabelIndex(e, true);
// Remove edge of OUT and IN
e = e.prepareRemoved();
this.doRemove(this.serializer.writeEdge(e));
this.doRemove(this.serializer.writeEdge(e.switchOwner()));
}
}
protected void prepareUpdates(Set<HugeProperty<?>> addedProps,
Set<HugeProperty<?>> removedProps) {
for (HugeProperty<?> p : removedProps) {
this.checkAggregateProperty(p);
if (p.element().type().isVertex()) {
HugeVertexProperty<?> prop = (HugeVertexProperty<?>) p;
if (this.store().features().supportsUpdateVertexProperty()) {
// Update vertex index without removed property
this.indexTx.updateVertexIndex(prop.element(), false);
// Eliminate the removed vertex property
this.doEliminate(this.serializer.writeVertexProperty(prop));
} else {
// Override vertex
this.addVertex(prop.element());
}
} else {
assert p.element().type().isEdge();
HugeEdgeProperty<?> prop = (HugeEdgeProperty<?>) p;
if (this.store().features().supportsUpdateEdgeProperty()) {
// Update edge index without removed property
this.indexTx.updateEdgeIndex(prop.element(), false);
// Eliminate the property(OUT and IN owner edge)
this.doEliminate(this.serializer.writeEdgeProperty(prop));
this.doEliminate(this.serializer.writeEdgeProperty(
prop.switchEdgeOwner()));
} else {
// Override edge(it will be in addedEdges & updatedEdges)
this.addEdge(prop.element());
}
}
}
for (HugeProperty<?> p : addedProps) {
this.checkAggregateProperty(p);
if (p.element().type().isVertex()) {
HugeVertexProperty<?> prop = (HugeVertexProperty<?>) p;
if (this.store().features().supportsUpdateVertexProperty()) {
// Update vertex index with new added property
this.indexTx.updateVertexIndex(prop.element(), false);
// Append the new vertex property
this.doAppend(this.serializer.writeVertexProperty(prop));
} else {
// Override vertex
this.addVertex(prop.element());
}
} else {
assert p.element().type().isEdge();
HugeEdgeProperty<?> prop = (HugeEdgeProperty<?>) p;
if (this.store().features().supportsUpdateEdgeProperty()) {
// Update edge index with new added property
this.indexTx.updateEdgeIndex(prop.element(), false);
// Append new property(OUT and IN owner edge)
this.doAppend(this.serializer.writeEdgeProperty(prop));
this.doAppend(this.serializer.writeEdgeProperty(
prop.switchEdgeOwner()));
} else {
// Override edge (it will be in addedEdges & updatedEdges)
this.addEdge(prop.element());
}
}
}
}
private void commitPartOfEdgeDeletions(Map<Id, HugeEdge> removedEdges) {
assert this.commitPartOfAdjacentEdges > 0;
this.prepareDeletions(removedEdges);
BackendMutation mutation = this.mutation();
BackendMutation idxMutation = this.indexTransaction().mutation();
try {
this.commitMutation2Backend(mutation, idxMutation);
} catch (Throwable e) {
this.rollbackBackend();
} finally {
mutation.clear();
idxMutation.clear();
}
removedEdges.clear();
}
@Override
public void commit() throws BackendException {
try {
super.commit();
} finally {
this.locksTable.unlock();
}
}
@Override
public void rollback() throws BackendException {
// Rollback properties changes
for (HugeProperty<?> prop : this.updatedOldestProps) {
prop.element().setProperty(prop);
}
try {
super.rollback();
} finally {
this.locksTable.unlock();
}
}
@Override
public QueryResults<BackendEntry> query(Query query) {
if (!(query instanceof ConditionQuery)) {
// It's a sysprop-query, don't need to optimize
LOG.debug("Query{final:{}}", query);
return super.query(query);
}
QueryList<BackendEntry> queries = this.optimizeQueries(query, super::query);
LOG.debug("{}", queries);
return queries.empty() ? QueryResults.empty() :
queries.fetch(this.pageSize);
}
@Override
public Number queryNumber(Query query) {
boolean isConditionQuery = query instanceof ConditionQuery;
boolean hasUpdate = this.hasUpdate();
Aggregate aggregate = query.aggregateNotNull();
// TODO: we can concat index-query results and tx uncommitted records.
if (hasUpdate) {
E.checkArgument(!isConditionQuery,
"It's not allowed to query by index when " +
"there are uncommitted records.");
}
QueryList<Number> queries = this.optimizeQueries(query, q -> {
boolean isIndexQuery = q instanceof IdQuery;
assert isIndexQuery || isConditionQuery || q == query;
// Need to fall back if there are uncommitted records
boolean fallback = hasUpdate;
Number result;
if (fallback) {
// Here just ignore it, and do fallback later
result = null;
} else if (!isIndexQuery || !isConditionQuery) {
// It's a sysprop-query, let parent tx do it
assert !fallback;
result = super.queryNumber(q);
} else {
E.checkArgument(aggregate.func() == AggregateFunc.COUNT,
"The %s operator on index is not supported now",
aggregate.func().string());
assert query instanceof ConditionQuery;
OptimizedType optimized = ((ConditionQuery) query).optimized();
if (this.optimizeAggrByIndex && optimized == OptimizedType.INDEX) {
// The ids size means results count (assume no left index)
result = q.idsSize();
} else {
assert !fallback;
fallback = true;
result = null;
}
}
// Can't be optimized, then do fallback
if (fallback) {
assert result == null;
assert q.resultType().isVertex() || q.resultType().isEdge();
// Reset aggregate to fallback and scan
q.aggregate(null);
result = IteratorUtils.count(q.resultType().isVertex() ?
this.queryVertices(q) :
this.queryEdges(q));
}
return new QueryResults<>(IteratorUtils.of(result), q);
});
QueryResults<Number> results = queries.empty() ?
QueryResults.empty() :
queries.fetch(this.pageSize);
return aggregate.reduce(results.iterator());
}
@Watched(prefix = "graph")
public HugeVertex addVertex(Object... keyValues) {
return this.addVertex(this.constructVertex(true, keyValues));
}
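/*
* A minimal usage sketch of the var-args form above, assuming a
* GraphTransaction `tx` and a schema that already defines a "person"
* vertex label with a "name" property (illustrative names only):
*   HugeVertex marko = tx.addVertex(T.label, "person", "name", "marko");
* The key-value pairs follow the TinkerPop convention parsed by
* HugeElement.classifyKeys(), with T.label/T.id as optional keys.
*/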
@Watched("graph.addVertex-instance")
public HugeVertex addVertex(HugeVertex vertex) {
this.checkOwnerThread();
assert !vertex.removed();
// Override vertices in local `removedVertices`
this.removedVertices.remove(vertex.id());
try {
this.locksTable.lockReads(LockUtil.VERTEX_LABEL_DELETE,
vertex.schemaLabel().id());
this.locksTable.lockReads(LockUtil.INDEX_LABEL_DELETE,
vertex.schemaLabel().indexLabels());
// Ensure vertex label still exists from vertex-construct to lock
this.graph().vertexLabel(vertex.schemaLabel().id());
/*
* No need to lock VERTEX_LABEL_ADD_UPDATE, because a vertex label
* update can only add nullable properties and user data, which
* does not affect adding a vertex
*/
this.beforeWrite();
this.addedVertices.put(vertex.id(), vertex);
this.afterWrite();
} catch (Throwable e) {
this.locksTable.unlock();
throw e;
}
return vertex;
}
@Watched(prefix = "graph")
public HugeVertex constructVertex(boolean verifyVL, Object... keyValues) {
HugeElement.ElementKeys elemKeys = HugeElement.classifyKeys(keyValues);
if (possibleOlapVertex(elemKeys)) {
Id id = HugeVertex.getIdValue(elemKeys.id());
HugeVertex vertex = HugeVertex.create(this, id,
VertexLabel.OLAP_VL);
ElementHelper.attachProperties(vertex, keyValues);
Iterator<HugeProperty<?>> iterator = vertex.getProperties()
.iterator();
assert iterator.hasNext();
if (iterator.next().propertyKey().olap()) {
return vertex;
}
}
VertexLabel vertexLabel = this.checkVertexLabel(elemKeys.label(),
verifyVL);
Id id = HugeVertex.getIdValue(elemKeys.id());
List<Id> keys = this.graph().mapPkName2Id(elemKeys.keys());
// Check whether id match with id strategy
this.checkId(id, keys, vertexLabel);
// Create HugeVertex
HugeVertex vertex = HugeVertex.create(this, null, vertexLabel);
// Set properties
ElementHelper.attachProperties(vertex, keyValues);
// Assign vertex id
if (this.params().mode().maintaining() &&
vertexLabel.idStrategy() == IdStrategy.AUTOMATIC) {
// Resume id for AUTOMATIC id strategy in restoring mode
vertex.assignId(id, true);
} else {
vertex.assignId(id);
}
return vertex;
}
private boolean possibleOlapVertex(HugeElement.ElementKeys elemKeys) {
return elemKeys.id() != null && elemKeys.label() == null &&
elemKeys.keys().size() == 1;
}
@Watched(prefix = "graph")
public void removeVertex(HugeVertex vertex) {
this.checkOwnerThread();
this.beforeWrite();
// Override vertices in local `addedVertices`
this.addedVertices.remove(vertex.id());
// Force load vertex to ensure all properties are loaded (refer to #2181)
if (vertex.schemaLabel().indexLabels().size() > 0) {
vertex.forceLoad();
}
// Collect the removed vertex
this.removedVertices.put(vertex.id(), vertex);
this.afterWrite();
}
public Iterator<Vertex> queryAdjacentVertices(Iterator<Edge> edges) {
if (this.lazyLoadAdjacentVertex) {
return new MapperIterator<>(edges, edge -> {
return ((HugeEdge) edge).otherVertex();
});
}
return new BatchMapperIterator<>(this.batchSize, edges, batchEdges -> {
List<Id> vertexIds = new ArrayList<>(batchEdges.size());
for (Edge edge : batchEdges) {
vertexIds.add(((HugeEdge) edge).otherVertex().id());
}
assert vertexIds.size() > 0;
return this.queryAdjacentVertices(vertexIds.toArray());
});
}
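/*
* A minimal sketch of feeding edges into queryAdjacentVertices(), assuming
* a GraphTransaction `tx` and an existing vertex id `vid` (illustrative
* names only):
*   Iterator<Edge> edges = tx.queryEdgesByVertex(vid);
*   Iterator<Vertex> neighbors = tx.queryAdjacentVertices(edges);
* With VERTEX_ADJACENT_VERTEX_LAZY enabled, each otherVertex() is returned
* as-is and resolved lazily; otherwise adjacent vertices are fetched in
* batches of QUERY_BATCH_SIZE ids.
*/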
public Iterator<Vertex> queryAdjacentVertices(Object... vertexIds) {
return this.queryVerticesByIds(vertexIds, true,
this.checkAdjacentVertexExist);
}
public Iterator<Vertex> queryVertices(Object... vertexIds) {
return this.queryVerticesByIds(vertexIds, false, false);
}
public Vertex queryVertex(Object vertexId) {
Iterator<Vertex> iter = this.queryVerticesByIds(new Object[]{vertexId},
false, true);
Vertex vertex = QueryResults.one(iter);
if (vertex == null) {
throw new NotFoundException("Vertex '%s' does not exist", vertexId);
}
return vertex;
}
protected Iterator<Vertex> queryVerticesByIds(Object[] vertexIds,
boolean adjacentVertex,
boolean checkMustExist) {
Query.checkForceCapacity(vertexIds.length);
// NOTE: duplicated vertices are allowed when querying by duplicated ids
List<Id> ids = InsertionOrderUtil.newList();
Map<Id, HugeVertex> vertices = new HashMap<>(vertexIds.length);
IdQuery query = new IdQuery(HugeType.VERTEX);
for (Object vertexId : vertexIds) {
HugeVertex vertex;
Id id = HugeVertex.getIdValue(vertexId);
if (id == null || this.removedVertices.containsKey(id)) {
// The record has been deleted
continue;
} else if ((vertex = this.addedVertices.get(id)) != null ||
(vertex = this.updatedVertices.get(id)) != null) {
if (vertex.expired()) {
continue;
}
// Found from local tx
vertices.put(vertex.id(), vertex);
} else {
// Prepare to query from backend store
query.query(id);
}
ids.add(id);
}
if (!query.empty()) {
// Query from backend store
query.mustSortByInput(false);
Iterator<HugeVertex> it = this.queryVerticesFromBackend(query);
QueryResults.fillMap(it, vertices);
}
return new MapperIterator<>(ids.iterator(), id -> {
HugeVertex vertex = vertices.get(id);
if (vertex == null) {
if (checkMustExist) {
throw new NotFoundException(
"Vertex '%s' does not exist", id);
} else if (adjacentVertex) {
assert !checkMustExist;
// Return undefined if adjacentVertex but !checkMustExist
vertex = HugeVertex.undefined(this.graph(), id);
} else {
// Return null
assert vertex == null;
}
}
return vertex;
});
}
public Iterator<Vertex> queryVertices() {
Query q = new Query(HugeType.VERTEX);
return this.queryVertices(q);
}
public Iterator<Vertex> queryVertices(Query query) {
if (this.hasUpdate()) {
E.checkArgument(query.noLimitAndOffset(),
"It's not allowed to query with offser/limit " +
"when there are uncommitted records.");
// TODO: also add check: no SCAN, no OLAP
E.checkArgument(!query.paging(),
"It's not allowed to query by paging when " +
"there are uncommitted records.");
}
query.resetActualOffset();
Iterator<HugeVertex> results = this.queryVerticesFromBackend(query);
results = this.filterUnmatchedRecords(results, query);
@SuppressWarnings("unchecked")
Iterator<Vertex> r = (Iterator<Vertex>) joinTxVertices(query, results);
return this.skipOffsetOrStopLimit(r, query);
}
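/*
* A query-by-condition sketch for the method above, assuming a
* GraphTransaction `tx`, its HugeGraph `graph`, and a "person" vertex
* label in the schema (illustrative names only):
*   ConditionQuery q = new ConditionQuery(HugeType.VERTEX);
*   q.eq(HugeKeys.LABEL, graph.vertexLabel("person").id());
*   Iterator<Vertex> persons = tx.queryVertices(q);
* The query is optimized by optimizeQueries() and joined with uncommitted
* vertices in this tx before being returned.
*/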
protected Iterator<HugeVertex> queryVerticesFromBackend(Query query) {
assert query.resultType().isVertex();
QueryResults<BackendEntry> results = this.query(query);
Iterator<BackendEntry> entries = results.iterator();
Iterator<HugeVertex> vertices = new MapperIterator<>(entries,
this::parseEntry);
vertices = this.filterExpiredResultFromBackend(query, vertices);
if (!this.store().features().supportsQuerySortByInputIds()) {
// There is no id in BackendEntry, so sort after deserialization
vertices = results.keepInputOrderIfNeeded(vertices);
}
return vertices;
}
@Watched(prefix = "graph")
public HugeEdge addEdge(HugeEdge edge) {
this.checkOwnerThread();
assert !edge.removed();
// Override edges in local `removedEdges`
this.removedEdges.remove(edge.id());
try {
this.locksTable.lockReads(LockUtil.EDGE_LABEL_DELETE,
edge.schemaLabel().id());
this.locksTable.lockReads(LockUtil.INDEX_LABEL_DELETE,
edge.schemaLabel().indexLabels());
// Ensure edge label still exists from edge-construct to lock
this.graph().edgeLabel(edge.schemaLabel().id());
/*
* No need to lock EDGE_LABEL_ADD_UPDATE, because an edge label
* update can only add nullable properties and user data, which
* does not affect adding an edge
*/
this.beforeWrite();
this.addedEdges.put(edge.id(), edge);
this.afterWrite();
} catch (Throwable e) {
this.locksTable.unlock();
throw e;
}
return edge;
}
@Watched(prefix = "graph")
public void removeEdge(HugeEdge edge) {
this.checkOwnerThread();
this.beforeWrite();
// Override edges in local `addedEdges`
this.addedEdges.remove(edge.id());
// Collect the removed edge
this.removedEdges.put(edge.id(), edge);
this.afterWrite();
}
public Iterator<Edge> queryEdgesByVertex(Id id) {
return this.queryEdges(constructEdgesQuery(id, Directions.BOTH));
}
public Iterator<Edge> queryEdges(Object... edgeIds) {
return this.queryEdgesByIds(edgeIds, false);
}
public Edge queryEdge(Object edgeId) {
Iterator<Edge> iter = this.queryEdgesByIds(new Object[]{edgeId}, true);
Edge edge = QueryResults.one(iter);
if (edge == null) {
throw new NotFoundException("Edge '%s' does not exist", edgeId);
}
return edge;
}
protected Iterator<Edge> queryEdgesByIds(Object[] edgeIds,
boolean verifyId) {
Query.checkForceCapacity(edgeIds.length);
// NOTE: duplicated edges are allowed when querying by duplicated ids
List<Id> ids = InsertionOrderUtil.newList();
Map<Id, HugeEdge> edges = new HashMap<>(edgeIds.length);
IdQuery query = new IdQuery(HugeType.EDGE);
for (Object edgeId : edgeIds) {
HugeEdge edge;
EdgeId id = HugeEdge.getIdValue(edgeId, !verifyId);
if (id == null) {
continue;
}
if (id.direction() == Directions.IN) {
id = id.switchDirection();
}
if (this.removedEdges.containsKey(id)) {
// The record has been deleted
continue;
} else if ((edge = this.addedEdges.get(id)) != null ||
(edge = this.updatedEdges.get(id)) != null) {
if (edge.expired()) {
continue;
}
// Found from local tx
edges.put(edge.id(), edge);
} else {
// Prepare to query from backend store
query.query(id);
}
ids.add(id);
}
if (!query.empty()) {
// Query from backend store
if (edges.isEmpty() && query.idsSize() == ids.size()) {
/*
* Sort at the lower layer and return directly if there is no
* edge in the local tx and no duplicated ids.
*/
Iterator<HugeEdge> it = this.queryEdgesFromBackend(query);
@SuppressWarnings({"unchecked", "rawtypes"})
Iterator<Edge> r = (Iterator) it;
return r;
}
query.mustSortByInput(false);
Iterator<HugeEdge> it = this.queryEdgesFromBackend(query);
QueryResults.fillMap(it, edges);
}
return new MapperIterator<>(ids.iterator(), id -> {
Edge edge = edges.get(id);
return edge;
});
}
public Iterator<Edge> queryEdges() {
Query q = new Query(HugeType.EDGE);
return this.queryEdges(q);
}
@Watched
public Iterator<Edge> queryEdges(Query query) {
if (this.hasUpdate()) {
E.checkArgument(query.noLimitAndOffset(),
"It's not allowed to query with offser/limit " +
"when there are uncommitted records.");
// TODO: also add check: no SCAN, no OLAP
E.checkArgument(!query.paging(),
"It's not allowed to query by paging when " +
"there are uncommitted records.");
}
query.resetActualOffset();
Iterator<HugeEdge> results = this.queryEdgesFromBackend(query);
results = this.filterUnmatchedRecords(results, query);
/*
* No duplicated edges are returned unless querying all edges in BOTH
* directions, where an edge may appear twice
* TODO: any unconsidered case, maybe a query with OR conditions?
*/
boolean dedupEdge = false;
if (dedupEdge) {
Set<Id> returnedEdges = new HashSet<>();
results = new FilterIterator<>(results, edge -> {
// Filter duplicated edges (an edge may appear twice when querying both directions)
if (!returnedEdges.contains(edge.id())) {
/*
* NOTE: Some edges may be IN and others OUT when querying edges
* in both directions. It might look better to convert all edges
* in the results to OUT, but that would break the logic when
* querying IN edges.
*/
returnedEdges.add(edge.id());
return true;
} else {
LOG.debug("Result contains duplicated edge: {}", edge);
return false;
}
});
}
@SuppressWarnings("unchecked")
Iterator<Edge> r = (Iterator<Edge>) joinTxEdges(query, results,
this.removedVertices);
return this.skipOffsetOrStopLimit(r, query);
}
protected Iterator<HugeEdge> queryEdgesFromBackend(Query query) {
assert query.resultType().isEdge();
QueryResults<BackendEntry> results = this.query(query);
Iterator<BackendEntry> entries = results.iterator();
Iterator<HugeEdge> edges = new FlatMapperIterator<>(entries, entry -> {
// Edges are stored within their owner vertex entry
HugeVertex vertex = this.parseEntry(entry);
if (vertex == null) {
return null;
}
if (query.idsSize() == 1) {
assert vertex.getEdges().size() == 1;
}
/*
* Copy to avoid ConcurrentModificationException when removing edge
* because HugeEdge.remove() will update edges in owner vertex
*/
return new ListIterator<>(ImmutableList.copyOf(vertex.getEdges()));
});
edges = this.filterExpiredResultFromBackend(query, edges);
if (!this.store().features().supportsQuerySortByInputIds()) {
// There is no id in BackendEntry, so sort after deserialization
edges = results.keepInputOrderIfNeeded(edges);
}
return edges;
}
@Watched(prefix = "graph")
public <V> void addVertexProperty(HugeVertexProperty<V> prop) {
// NOTE: this method can also be used to update property
HugeVertex vertex = prop.element();
E.checkState(vertex != null,
"No owner for updating property '%s'", prop.key());
// Add property in memory for a newly created vertex
if (vertex.fresh()) {
// The owner will do property update
vertex.setProperty(prop);
return;
}
// Check whether updating a property of an added/removed vertex
E.checkArgument(!this.addedVertices.containsKey(vertex.id()) ||
this.updatedVertices.containsKey(vertex.id()),
"Can't update property '%s' for adding-state vertex",
prop.key());
E.checkArgument(!vertex.removed() &&
!this.removedVertices.containsKey(vertex.id()),
"Can't update property '%s' for removing-state vertex",
prop.key());
// Check whether updating a primary key
List<Id> primaryKeyIds = vertex.schemaLabel().primaryKeys();
E.checkArgument(!primaryKeyIds.contains(prop.propertyKey().id()),
"Can't update primary key: '%s'", prop.key());
// Do property update
this.lockForUpdateProperty(vertex.schemaLabel(), prop, () -> {
// Update old vertex to remove index (without new property)
this.indexTx.updateVertexIndex(vertex, true);
// Update(add) vertex property
this.propertyUpdated(vertex, prop, vertex.setProperty(prop));
});
}
@Watched(prefix = "graph")
public <V> void removeVertexProperty(HugeVertexProperty<V> prop) {
HugeVertex vertex = prop.element();
PropertyKey propKey = prop.propertyKey();
E.checkState(vertex != null,
"No owner for removing property '%s'", prop.key());
// May have already been removed (compatible with TinkerPop)
if (!vertex.hasProperty(propKey.id())) {
// PropertyTest shouldAllowRemovalFromVertexWhenAlreadyRemoved()
return;
}
// Check whether removing a primary key
List<Id> primaryKeyIds = vertex.schemaLabel().primaryKeys();
E.checkArgument(!primaryKeyIds.contains(propKey.id()),
"Can't remove primary key '%s'", prop.key());
// Remove property in memory for a newly created vertex
if (vertex.fresh()) {
// The owner will do property update
vertex.removeProperty(propKey.id());
return;
}
// Check whether updating a property of an added/removed vertex
E.checkArgument(!this.addedVertices.containsKey(vertex.id()) ||
this.updatedVertices.containsKey(vertex.id()),
"Can't remove property '%s' for adding-state vertex",
prop.key());
E.checkArgument(!this.removedVertices.containsKey(vertex.id()),
"Can't remove property '%s' for removing-state vertex",
prop.key());
// Do property update
this.lockForUpdateProperty(vertex.schemaLabel(), prop, () -> {
// Update old vertex to remove index (with the property)
this.indexTx.updateVertexIndex(vertex, true);
// Update(remove) vertex property
HugeProperty<?> removed = vertex.removeProperty(propKey.id());
this.propertyUpdated(vertex, null, removed);
});
}
@Watched(prefix = "graph")
public <V> void addEdgeProperty(HugeEdgeProperty<V> prop) {
// NOTE: this method can also be used to update property
HugeEdge edge = prop.element();
E.checkState(edge != null,
"No owner for updating property '%s'", prop.key());
// Add property in memory for a newly created edge
if (edge.fresh()) {
// The owner will do property update
edge.setProperty(prop);
return;
}
// Check whether updating a property of an added/removed edge
E.checkArgument(!this.addedEdges.containsKey(edge.id()) ||
this.updatedEdges.containsKey(edge.id()),
"Can't update property '%s' for adding-state edge",
prop.key());
E.checkArgument(!edge.removed() &&
!this.removedEdges.containsKey(edge.id()),
"Can't update property '%s' for removing-state edge",
prop.key());
// Check whether updating a sort key
List<Id> sortKeys = edge.schemaLabel().sortKeys();
E.checkArgument(!sortKeys.contains(prop.propertyKey().id()),
"Can't update sort key '%s'", prop.key());
// Do property update
this.lockForUpdateProperty(edge.schemaLabel(), prop, () -> {
// Update old edge to remove index (without new property)
this.indexTx.updateEdgeIndex(edge, true);
// Update(add) edge property
this.propertyUpdated(edge, prop, edge.setProperty(prop));
});
}
@Watched(prefix = "graph")
public <V> void removeEdgeProperty(HugeEdgeProperty<V> prop) {
HugeEdge edge = prop.element();
PropertyKey propKey = prop.propertyKey();
E.checkState(edge != null,
"No owner for removing property '%s'", prop.key());
// May have already been removed
if (!edge.hasProperty(propKey.id())) {
return;
}
// Check whether removing a sort key
List<Id> sortKeyIds = edge.schemaLabel().sortKeys();
E.checkArgument(!sortKeyIds.contains(prop.propertyKey().id()),
"Can't remove sort key '%s'", prop.key());
// Remove property in memory for a newly created edge
if (edge.fresh()) {
// The owner will do property update
edge.removeProperty(propKey.id());
return;
}
// Check whether updating a property of an added/removed edge
E.checkArgument(!this.addedEdges.containsKey(edge.id()) ||
this.updatedEdges.containsKey(edge.id()),
"Can't remove property '%s' for adding-state edge",
prop.key());
E.checkArgument(!this.removedEdges.containsKey(edge.id()),
"Can't remove property '%s' for removing-state edge",
prop.key());
// Do property update
this.lockForUpdateProperty(edge.schemaLabel(), prop, () -> {
// Update old edge to remove index (with the property)
this.indexTx.updateEdgeIndex(edge, true);
// Update(remove) edge property
this.propertyUpdated(edge, null,
edge.removeProperty(propKey.id()));
});
}
/**
* Construct an edge condition query based on the source vertex, direction
* and edge labels
*
* @param sourceVertex source vertex of the edge
* @param direction must be IN, OUT or BOTH
* @param edgeLabels edge labels of the queried edges
* @return the constructed condition query
*/
@Watched
public static ConditionQuery constructEdgesQuery(Id sourceVertex,
Directions direction,
Id... edgeLabels) {
E.checkState(sourceVertex != null,
"The edge query must contain source vertex");
E.checkState(direction != null,
"The edge query must contain direction");
ConditionQuery query = new ConditionQuery(HugeType.EDGE);
// Edge source vertex
query.eq(HugeKeys.OWNER_VERTEX, sourceVertex);
// Edge direction
if (direction == Directions.BOTH) {
query.query(Condition.or(
Condition.eq(HugeKeys.DIRECTION, Directions.OUT),
Condition.eq(HugeKeys.DIRECTION, Directions.IN)));
} else {
assert direction == Directions.OUT || direction == Directions.IN;
query.eq(HugeKeys.DIRECTION, direction);
}
// Edge labels
if (edgeLabels.length == 1) {
query.eq(HugeKeys.LABEL, edgeLabels[0]);
} else if (edgeLabels.length > 1) {
query.query(Condition.in(HugeKeys.LABEL,
Arrays.asList(edgeLabels)));
}
return query;
}
public static ConditionQuery constructEdgesQuery(Id sourceVertex,
Directions direction,
List<Id> edgeLabels) {
E.checkState(sourceVertex != null,
"The edge query must contain source vertex");
E.checkState(direction != null,
"The edge query must contain direction");
ConditionQuery query = new ConditionQuery(HugeType.EDGE);
// Edge source vertex
query.eq(HugeKeys.OWNER_VERTEX, sourceVertex);
// Edge direction
if (direction == Directions.BOTH) {
query.query(Condition.or(
Condition.eq(HugeKeys.DIRECTION, Directions.OUT),
Condition.eq(HugeKeys.DIRECTION, Directions.IN)));
} else {
assert direction == Directions.OUT || direction == Directions.IN;
query.eq(HugeKeys.DIRECTION, direction);
}
// Edge labels
if (edgeLabels.size() == 1) {
query.eq(HugeKeys.LABEL, edgeLabels.get(0));
} else if (edgeLabels.size() > 1) {
query.query(Condition.in(HugeKeys.LABEL, edgeLabels));
}
return query;
}
public static boolean matchFullEdgeSortKeys(ConditionQuery query,
HugeGraph graph) {
// All queryKeys in sortKeys
return matchEdgeSortKeys(query, true, graph);
}
public static boolean matchPartialEdgeSortKeys(ConditionQuery query,
HugeGraph graph) {
// Partial queryKeys in sortKeys
return matchEdgeSortKeys(query, false, graph);
}
private static boolean matchEdgeSortKeys(ConditionQuery query,
boolean matchAll,
HugeGraph graph) {
assert query.resultType().isEdge();
Id label = query.condition(HugeKeys.LABEL);
if (label == null) {
return false;
}
List<Id> sortKeys = graph.edgeLabel(label).sortKeys();
if (sortKeys.isEmpty()) {
return false;
}
Set<Id> queryKeys = query.userpropKeys();
for (int i = sortKeys.size(); i > 0; i--) {
List<Id> subFields = sortKeys.subList(0, i);
if (queryKeys.containsAll(subFields)) {
if (queryKeys.size() == subFields.size() || !matchAll) {
/*
* Return true if:
* matchAll=true and all queryKeys are in sortKeys
* or
* partial queryKeys are in sortKeys
*/
return true;
}
}
}
return false;
}
private static void verifyVerticesConditionQuery(ConditionQuery query) {
assert query.resultType().isVertex();
int total = query.conditionsSize();
if (total == 1) {
/*
* Supported query:
* 1.query just by vertex label
* 2.query just by PROPERTIES (like containsKey,containsValue)
* 3.query with scan
*/
if (query.containsCondition(HugeKeys.LABEL) ||
query.containsCondition(HugeKeys.PROPERTIES) ||
query.containsScanRelation()) {
return;
}
}
int matched = 0;
if (query.containsCondition(HugeKeys.PROPERTIES)) {
matched++;
if (query.containsCondition(HugeKeys.LABEL)) {
matched++;
}
}
if (matched != total) {
throw new HugeException("Not supported querying vertices by %s",
query.conditions());
}
}
private static void verifyEdgesConditionQuery(ConditionQuery query) {
assert query.resultType().isEdge();
int total = query.conditionsSize();
if (total == 1) {
/*
* Supported query:
* 1.query just by edge label
* 2.query just by PROPERTIES (like containsKey,containsValue)
* 3.query with scan
*/
if (query.containsCondition(HugeKeys.LABEL) ||
query.containsCondition(HugeKeys.PROPERTIES) ||
query.containsScanRelation()) {
return;
}
}
int matched = 0;
for (HugeKeys key : EdgeId.KEYS) {
Object value = query.condition(key);
if (value == null) {
break;
}
matched++;
}
int count = matched;
if (query.containsCondition(HugeKeys.PROPERTIES)) {
matched++;
if (count < 3 && query.containsCondition(HugeKeys.LABEL)) {
matched++;
}
}
if (matched != total) {
throw new HugeException(
"Not supported querying edges by %s, expect %s",
query.conditions(), EdgeId.KEYS[count]);
}
}
private <R> QueryList<R> optimizeQueries(Query query,
QueryResults.Fetcher<R> fetcher) {
QueryList<R> queries = new QueryList<>(query, fetcher);
if (!(query instanceof ConditionQuery)) {
// It's a sysprop-query, add it as a sub-query directly, no need to flatten
queries.add(query);
return queries;
}
boolean supportIn = this.storeFeatures().supportsQueryWithInCondition();
for (ConditionQuery cq : ConditionQueryFlatten.flatten(
(ConditionQuery) query, supportIn)) {
// Optimize by sysprop
Query q = this.optimizeQuery(cq);
/*
* NOTE: There are two possibilities for the returned q:
* 1.sysprop-query, which would not be empty.
* 2.index-query result(ids after optimization), which may be empty.
*/
if (q == null) {
queries.add(this.indexQuery(cq), this.batchSize);
} else if (!q.empty()) {
queries.add(q);
}
}
return queries;
}
private Query optimizeQuery(ConditionQuery query) {
if (query.idsSize() > 0) {
throw new HugeException(
"Not supported querying by id and conditions: %s", query);
}
Id label = query.condition(HugeKeys.LABEL);
// Optimize vertex query
if (label != null && query.resultType().isVertex()) {
VertexLabel vertexLabel = this.graph().vertexLabel(label);
if (vertexLabel.idStrategy() == IdStrategy.PRIMARY_KEY) {
List<Id> keys = vertexLabel.primaryKeys();
E.checkState(!keys.isEmpty(),
"The primary keys can't be empty when using " +
"'%s' id strategy for vertex label '%s'",
IdStrategy.PRIMARY_KEY, vertexLabel.name());
if (query.matchUserpropKeys(keys)) {
// Query vertex by label + primary-values
query.optimized(OptimizedType.PRIMARY_KEY);
String primaryValues = query.userpropValuesString(keys);
LOG.debug("Query vertices by primaryKeys: {}", query);
// Convert {vertex-label + primary-key} to vertex-id
Id id = SplicingIdGenerator.splicing(label.asString(),
primaryValues);
/*
* Just query by primary-key(id) and ignore other userprops
* (if any); they will be filtered by queryVertices(Query)
*/
return new IdQuery(query, id);
}
}
}
// Optimize edge query
if (query.resultType().isEdge() && label != null &&
query.condition(HugeKeys.OWNER_VERTEX) != null &&
query.condition(HugeKeys.DIRECTION) != null &&
matchEdgeSortKeys(query, false, this.graph())) {
// Query edge by sourceVertex + direction + label + sort-values
query.optimized(OptimizedType.SORT_KEYS);
query = query.copy();
// Serialize sort-values
List<Id> keys = this.graph().edgeLabel(label).sortKeys();
List<Condition> conditions =
GraphIndexTransaction.constructShardConditions(
query, keys, HugeKeys.SORT_VALUES);
query.query(conditions);
/*
* Reset all userprops since they have been transferred to sort-keys;
* other userprops (if any) will be filtered by queryEdges(Query)
*/
query.resetUserpropConditions();
LOG.debug("Query edges by sortKeys: {}", query);
return query;
}
/*
* Query only by sysprops, like: by vertex label, by edge label.
* NOTE: we assume sysprops would be indexed by backend store
* but we don't support querying edges only by direction/target-vertex.
*/
if (query.allSysprop()) {
if (query.resultType().isVertex()) {
verifyVerticesConditionQuery(query);
} else if (query.resultType().isEdge()) {
verifyEdgesConditionQuery(query);
}
/*
* Just support:
* 1.not query by label
* 2.or query by label and store supports this feature
*/
boolean byLabel = (label != null && query.conditionsSize() == 1);
if (!byLabel || this.store().features().supportsQueryByLabel()) {
return query;
}
}
return null;
}
private IdHolderList indexQuery(ConditionQuery query) {
/*
* Optimize by index-query
* It will return a list of ids (maybe empty) on success,
* or throw an exception if there is no index for the query properties.
*/
this.beforeRead();
try {
return this.indexTx.queryIndex(query);
} finally {
this.afterRead();
}
}
private VertexLabel checkVertexLabel(Object label, boolean verifyLabel) {
HugeVertexFeatures features = graph().features().vertex();
// Check Vertex label
if (label == null && features.supportsDefaultLabel()) {
label = features.defaultLabel();
}
if (label == null) {
throw Element.Exceptions.labelCanNotBeNull();
}
E.checkArgument(label instanceof String || label instanceof VertexLabel,
"Expect a string or a VertexLabel object " +
"as the vertex label argument, but got: '%s'", label);
// The label must be an instance of String or VertexLabel
if (label instanceof String) {
if (verifyLabel) {
ElementHelper.validateLabel((String) label);
}
label = graph().vertexLabel((String) label);
}
assert (label instanceof VertexLabel);
return (VertexLabel) label;
}
private void checkId(Id id, List<Id> keys, VertexLabel vertexLabel) {
// Check whether id match with id strategy
IdStrategy strategy = vertexLabel.idStrategy();
switch (strategy) {
case PRIMARY_KEY:
E.checkArgument(id == null,
"Can't customize vertex id when " +
"id strategy is '%s' for vertex label '%s'",
strategy, vertexLabel.name());
// Check whether primaryKey exists
List<Id> primaryKeys = vertexLabel.primaryKeys();
E.checkArgument(keys.containsAll(primaryKeys),
"The primary keys: %s of vertex label '%s' " +
"must be set when using '%s' id strategy",
this.graph().mapPkId2Name(primaryKeys),
vertexLabel.name(), strategy);
break;
case AUTOMATIC:
if (this.params().mode().maintaining()) {
E.checkArgument(id != null && id.number(),
"Must customize vertex number id when " +
"id strategy is '%s' for vertex label " +
"'%s' in restoring mode",
strategy, vertexLabel.name());
} else {
E.checkArgument(id == null,
"Can't customize vertex id when " +
"id strategy is '%s' for vertex label '%s'",
strategy, vertexLabel.name());
}
break;
case CUSTOMIZE_STRING:
case CUSTOMIZE_UUID:
E.checkArgument(id != null && !id.number(),
"Must customize vertex string id when " +
"id strategy is '%s' for vertex label '%s'",
strategy, vertexLabel.name());
break;
case CUSTOMIZE_NUMBER:
E.checkArgument(id != null && id.number(),
"Must customize vertex number id when " +
"id strategy is '%s' for vertex label '%s'",
strategy, vertexLabel.name());
break;
default:
throw new AssertionError("Unknown id strategy: " + strategy);
}
}
private void checkAggregateProperty(HugeElement element) {
E.checkArgument(element.getAggregateProperties().isEmpty() ||
this.store().features().supportsAggregateProperty(),
"The %s store does not support aggregate property",
this.store().provider().type());
}
private void checkAggregateProperty(HugeProperty<?> property) {
E.checkArgument(!property.isAggregateType() ||
this.store().features().supportsAggregateProperty(),
"The %s store does not support aggregate property",
this.store().provider().type());
}
private void checkNonnullProperty(HugeVertex vertex) {
Set<Id> keys = vertex.getPropertyKeys();
VertexLabel vertexLabel = vertex.schemaLabel();
// Check whether all non-null properties have been passed
@SuppressWarnings("unchecked")
Collection<Id> nonNullKeys = CollectionUtils.subtract(
vertexLabel.properties(),
vertexLabel.nullableKeys());
if (!keys.containsAll(nonNullKeys)) {
@SuppressWarnings("unchecked")
Collection<Id> missed = CollectionUtils.subtract(nonNullKeys, keys);
HugeGraph graph = this.graph();
E.checkArgument(false, "All non-null property keys %s of " +
"vertex label '%s' must be set, missed keys %s",
graph.mapPkId2Name(nonNullKeys), vertexLabel.name(),
graph.mapPkId2Name(missed));
}
}
private void checkVertexExistIfCustomizedId(Map<Id, HugeVertex> vertices) {
Set<Id> ids = new HashSet<>();
for (HugeVertex vertex : vertices.values()) {
VertexLabel vl = vertex.schemaLabel();
if (!vl.hidden() && vl.idStrategy().isCustomized()) {
ids.add(vertex.id());
}
}
if (ids.isEmpty()) {
return;
}
IdQuery idQuery = new IdQuery(HugeType.VERTEX, ids);
Iterator<HugeVertex> results = this.queryVerticesFromBackend(idQuery);
try {
if (!results.hasNext()) {
return;
}
HugeVertex existedVertex = results.next();
HugeVertex newVertex = vertices.get(existedVertex.id());
if (!existedVertex.label().equals(newVertex.label())) {
throw new HugeException(
"The newly added vertex with id:'%s' label:'%s' " +
"is not allowed to insert, because already exist " +
"a vertex with same id and different label:'%s'",
newVertex.id(), newVertex.label(),
existedVertex.label());
}
} finally {
CloseableIterator.closeIterator(results);
}
}
private void lockForUpdateProperty(SchemaLabel schemaLabel,
HugeProperty<?> prop,
Runnable callback) {
this.checkOwnerThread();
Id pkey = prop.propertyKey().id();
Set<Id> indexIds = new HashSet<>();
for (Id il : schemaLabel.indexLabels()) {
if (graph().indexLabel(il).indexFields().contains(pkey)) {
indexIds.add(il);
}
}
String group = schemaLabel.type() == HugeType.VERTEX_LABEL ?
LockUtil.VERTEX_LABEL_DELETE :
LockUtil.EDGE_LABEL_DELETE;
try {
this.locksTable.lockReads(group, schemaLabel.id());
this.locksTable.lockReads(LockUtil.INDEX_LABEL_DELETE, indexIds);
// Ensure schema label still exists
if (schemaLabel.type() == HugeType.VERTEX_LABEL) {
this.graph().vertexLabel(schemaLabel.id());
} else {
assert schemaLabel.type() == HugeType.EDGE_LABEL;
this.graph().edgeLabel(schemaLabel.id());
}
/*
* No need to lock INDEX_LABEL_ADD_UPDATE, because an index label
* update can only add user data, which does not affect updating
* a property
*/
this.beforeWrite();
callback.run();
this.afterWrite();
} catch (Throwable e) {
this.locksTable.unlock();
throw e;
}
}
private void removeLeftIndexIfNeeded(Map<Id, HugeVertex> vertices) {
Set<Id> ids = vertices.keySet();
if (ids.isEmpty()) {
return;
}
IdQuery idQuery = new IdQuery(HugeType.VERTEX, ids);
Iterator<HugeVertex> results = this.queryVerticesFromBackend(idQuery);
try {
while (results.hasNext()) {
HugeVertex existedVertex = results.next();
this.indexTx.updateVertexIndex(existedVertex, true);
}
} finally {
CloseableIterator.closeIterator(results);
}
}
private <T extends HugeElement> Iterator<T> filterUnmatchedRecords(
Iterator<T> results,
Query query) {
// Filter unused or incorrect records
return new FilterIterator<T>(results, elem -> {
// TODO: Left vertex/edge should be auto-removed via async task
if (elem.schemaLabel().undefined()) {
LOG.warn("Left record is found: id={}, label={}, properties={}",
elem.id(), elem.schemaLabel().id(),
elem.getPropertiesMap());
}
// Filter hidden results
if (!query.showHidden() && Graph.Hidden.isHidden(elem.label())) {
return false;
}
// Filter vertices/edges of deleting label
if (elem.schemaLabel().status().deleting() &&
!query.showDeleting()) {
return false;
}
// Process results queried from left index or primary-key
if (query.resultType().isVertex() == elem.type().isVertex() &&
!rightResultFromIndexQuery(query, elem)) {
// Only index query will come here
return false;
}
return true;
});
}
private boolean rightResultFromIndexQuery(Query query, HugeElement elem) {
/*
* If the query is a ConditionQuery, or query.originQuery() is a
* ConditionQuery, then it's an index query
*/
if (!(query instanceof ConditionQuery)) {
if (query.originQuery() instanceof ConditionQuery) {
query = query.originQuery();
} else {
return true;
}
}
ConditionQuery cq = (ConditionQuery) query;
if (cq.optimized() == OptimizedType.NONE || cq.test(elem)) {
if (cq.existLeftIndex(elem.id())) {
/*
* There are both correct and left index entries: we should return
* true, but the left index also needs to be cleaned up
*/
try {
this.indexTx.asyncRemoveIndexLeft(cq, elem);
} catch (Throwable e) {
LOG.warn("Failed to remove left index for query '{}', " +
"element '{}'", cq, elem, e);
}
}
/* Return true if:
* 1.not query by index or by primary-key/sort-key
*   (cq.optimized() == NONE means query just by sysprops)
* 2.the result matches all conditions
*/
return true;
}
if (cq.optimized() == OptimizedType.INDEX) {
try {
this.indexTx.asyncRemoveIndexLeft(cq, elem);
} catch (Throwable e) {
LOG.warn("Failed to remove left index for query '{}', " +
"element '{}'", cq, elem, e);
}
}
return false;
}
private <T extends HugeElement> Iterator<T> filterExpiredResultFromBackend(
Query query, Iterator<T> results) {
if (this.store().features().supportsTtl() || query.showExpired()) {
return results;
}
// Filter expired vertices/edges with TTL
return new FilterIterator<>(results, elem -> {
if (elem.expired()) {
DeleteExpiredJob.asyncDeleteExpiredObject(this.graph(), elem);
return false;
}
return true;
});
}
private <T> Iterator<T> skipOffsetOrStopLimit(Iterator<T> results,
Query query) {
if (query.noLimitAndOffset()) {
return results;
}
// Skip offset
long offset = query.offset();
if (offset > 0L && results.hasNext()) {
/*
* Must call results.hasNext() before query.actualOffset() because
* some backends apply the offset themselves and update query.actualOffset
*/
long current = query.actualOffset();
for (; current < offset && results.hasNext(); current++) {
results.next();
query.goOffset(1L);
}
}
// Stop if reach limit
return new LimitIterator<>(results, elem -> {
long count = query.goOffset(1L);
return query.reachLimit(count - 1L);
});
}
private Iterator<?> joinTxVertices(Query query,
Iterator<HugeVertex> vertices) {
assert query.resultType().isVertex();
BiFunction<Query, HugeVertex, HugeVertex> matchTxFunc = (q, v) -> {
if (v.expired() && !q.showExpired()) {
// Filter expired vertices with TTL
return null;
}
// Filter vertices matched conditions
return q.test(v) ? v : null;
};
vertices = this.joinTxRecords(query, vertices, matchTxFunc,
this.addedVertices, this.removedVertices,
this.updatedVertices);
return vertices;
}
private Iterator<?> joinTxEdges(Query query, Iterator<HugeEdge> edges,
Map<Id, HugeVertex> removingVertices) {
assert query.resultType().isEdge();
BiFunction<Query, HugeEdge, HugeEdge> matchTxFunc = (q, e) -> {
assert q.resultType() == HugeType.EDGE;
if (e.expired() && !q.showExpired()) {
// Filter expired edges with TTL
return null;
}
// Filter edges matched conditions
return q.test(e) ? e : q.test(e = e.switchOwner()) ? e : null;
};
edges = this.joinTxRecords(query, edges, matchTxFunc,
this.addedEdges, this.removedEdges,
this.updatedEdges);
if (removingVertices.isEmpty()) {
return edges;
}
// Filter edges that belong to deleted vertex
return new FilterIterator<HugeEdge>(edges, edge -> {
for (HugeVertex v : removingVertices.values()) {
if (edge.belongToVertex(v)) {
return false;
}
}
return true;
});
}
private <V extends HugeElement> Iterator<V> joinTxRecords(
Query query,
Iterator<V> records,
BiFunction<Query, V, V> matchFunc,
Map<Id, V> addedTxRecords,
Map<Id, V> removedTxRecords,
Map<Id, V> updatedTxRecords) {
this.checkOwnerThread();
// Return the origin results if there is no change in tx
if (addedTxRecords.isEmpty() &&
removedTxRecords.isEmpty() &&
updatedTxRecords.isEmpty()) {
return records;
}
Set<V> txResults = InsertionOrderUtil.newSet();
/*
* Collect added/updated records
* Records in memory have higher priority than records queried from the backend store
*/
for (V elem : addedTxRecords.values()) {
if (query.reachLimit(txResults.size())) {
break;
}
if ((elem = matchFunc.apply(query, elem)) != null) {
txResults.add(elem);
}
}
for (V elem : updatedTxRecords.values()) {
if (query.reachLimit(txResults.size())) {
break;
}
if ((elem = matchFunc.apply(query, elem)) != null) {
txResults.add(elem);
}
}
// Filter out backend records that are changed (added/updated/removed) in the tx
Iterator<V> backendResults = new FilterIterator<>(records, elem -> {
Id id = elem.id();
return !addedTxRecords.containsKey(id) &&
!updatedTxRecords.containsKey(id) &&
!removedTxRecords.containsKey(id);
});
return new ExtendableIterator<V>(txResults.iterator(), backendResults);
}
private void checkTxVerticesCapacity() throws LimitExceedException {
if (this.verticesInTxSize() >= this.verticesCapacity) {
throw new LimitExceedException(
"Vertices size has reached tx capacity %d",
this.verticesCapacity);
}
}
private void checkTxEdgesCapacity() throws LimitExceedException {
if (this.edgesInTxSize() >= this.edgesCapacity) {
throw new LimitExceedException(
"Edges size has reached tx capacity %d",
this.edgesCapacity);
}
}
private void propertyUpdated(HugeElement element, HugeProperty<?> property,
HugeProperty<?> oldProperty) {
if (element.type().isVertex()) {
this.updatedVertices.put(element.id(), (HugeVertex) element);
} else {
assert element.type().isEdge();
this.updatedEdges.put(element.id(), (HugeEdge) element);
}
if (oldProperty != null) {
this.updatedOldestProps.add(oldProperty);
}
if (property == null) {
this.removedProps.add(oldProperty);
} else {
this.addedProps.remove(property);
this.addedProps.add(property);
}
}
private HugeVertex parseEntry(BackendEntry entry) {
try {
HugeVertex vertex = this.serializer.readVertex(graph(), entry);
assert vertex != null;
return vertex;
} catch (ForbiddenException | SecurityException e) {
/*
* Can't ignore permission exceptions here, otherwise users may
* mistakenly assume the record does not exist.
*/
throw e;
} catch (Throwable e) {
LOG.error("Failed to parse entry: {}", entry, e);
if (this.ignoreInvalidEntry) {
return null;
}
throw e;
}
}
/*
* TODO: set these methods to protected
*/
public void removeIndex(IndexLabel indexLabel) {
// TODO: use event to replace direct call
this.checkOwnerThread();
this.beforeWrite();
this.indexTx.removeIndex(indexLabel);
this.afterWrite();
}
public void updateIndex(Id ilId, HugeElement element, boolean removed) {
// TODO: use event to replace direct call
this.checkOwnerThread();
this.indexTx.updateIndex(ilId, element, removed);
}
public void removeIndex(HugeIndex index) {
// TODO: use event to replace direct call
this.checkOwnerThread();
this.beforeWrite();
this.indexTx.doEliminate(this.serializer.writeIndex(index));
this.afterWrite();
}
public void removeVertices(VertexLabel vertexLabel) {
if (this.hasUpdate()) {
throw new HugeException("There are still changes to commit");
}
boolean autoCommit = this.autoCommit();
this.autoCommit(false);
// Commit the data already in the tx first
this.commit();
try {
this.traverseVerticesByLabel(vertexLabel, vertex -> {
this.removeVertex((HugeVertex) vertex);
this.commitIfGtSize(COMMIT_BATCH);
}, true);
this.commit();
} catch (Exception e) {
LOG.error("Failed to remove vertices", e);
throw new HugeException("Failed to remove vertices", e);
} finally {
this.autoCommit(autoCommit);
}
}
public void removeEdges(EdgeLabel edgeLabel) {
if (this.hasUpdate()) {
throw new HugeException("There are still changes to commit");
}
boolean autoCommit = this.autoCommit();
this.autoCommit(false);
// Commit the data already in the tx first
this.commit();
try {
if (this.store().features().supportsDeleteEdgeByLabel()) {
// TODO: Need to change to writeQuery!
this.doRemove(this.serializer.writeId(HugeType.EDGE_OUT,
edgeLabel.id()));
this.doRemove(this.serializer.writeId(HugeType.EDGE_IN,
edgeLabel.id()));
} else {
this.traverseEdgesByLabel(edgeLabel, edge -> {
this.removeEdge((HugeEdge) edge);
this.commitIfGtSize(COMMIT_BATCH);
}, true);
}
this.commit();
} catch (Exception e) {
LOG.error("Failed to remove edges", e);
throw new HugeException("Failed to remove edges", e);
} finally {
this.autoCommit(autoCommit);
}
}
public void traverseVerticesByLabel(VertexLabel label,
Consumer<Vertex> consumer,
boolean deleting) {
this.traverseByLabel(label, this::queryVertices, consumer, deleting);
}
public void traverseEdgesByLabel(EdgeLabel label, Consumer<Edge> consumer,
boolean deleting) {
this.traverseByLabel(label, this::queryEdges, consumer, deleting);
}
private <T> void traverseByLabel(SchemaLabel label,
Function<Query, Iterator<T>> fetcher,
Consumer<T> consumer, boolean deleting) {
HugeType type = label.type() == HugeType.VERTEX_LABEL ?
HugeType.VERTEX : HugeType.EDGE;
Query query = label.enableLabelIndex() ? new ConditionQuery(type) :
new Query(type);
query.capacity(Query.NO_CAPACITY);
query.limit(Query.NO_LIMIT);
if (this.store().features().supportsQueryByPage()) {
query.page(PageInfo.PAGE_NONE);
}
if (label.hidden()) {
query.showHidden(true);
}
query.showDeleting(deleting);
query.showExpired(deleting);
if (label.enableLabelIndex()) {
// Label index is supported: query by label index with paging
assert query instanceof ConditionQuery;
((ConditionQuery) query).eq(HugeKeys.LABEL, label.id());
Iterator<T> iter = fetcher.apply(query);
try {
// Fetch by paging automatically
while (iter.hasNext()) {
consumer.accept(iter.next());
/*
* Commit per batch to avoid too much data in single commit,
* especially for Cassandra backend
*/
this.commitIfGtSize(GraphTransaction.COMMIT_BATCH);
}
// Commit changes if exists
this.commit();
} finally {
CloseableIterator.closeIterator(iter);
}
} else {
// Label index is not supported: query all and filter by label
if (query.paging()) {
query.limit(this.pageSize);
}
String page = null;
do {
Iterator<T> iter = fetcher.apply(query);
try {
while (iter.hasNext()) {
T e = iter.next();
SchemaLabel elemLabel = ((HugeElement) e).schemaLabel();
if (label.equals(elemLabel)) {
consumer.accept(e);
/*
* Commit per batch to avoid too much data in single
* commit, especially for Cassandra backend
*/
this.commitIfGtSize(GraphTransaction.COMMIT_BATCH);
}
}
// Commit changes of every page before next page query
this.commit();
if (query.paging()) {
page = PageInfo.pageState(iter).toString();
query.page(page);
}
} finally {
CloseableIterator.closeIterator(iter);
}
} while (page != null);
}
}
public void createOlapPk(Id pkId) {
this.store().createOlapTable(pkId);
}
public void initAndRegisterOlapTable(Id pkId) {
this.store().checkAndRegisterOlapTable(pkId);
}
public void clearOlapPk(Id pkId) {
this.store().clearOlapTable(pkId);
}
public void removeOlapPk(Id pkId) {
this.store().removeOlapTable(pkId);
}
}