blob: 9948efcf332628555fde6ffd66d8406b6e2b1375 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hugegraph.backend.tx;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.hugegraph.backend.query.IdQuery;
import org.apache.hugegraph.backend.query.Query;
import org.apache.hugegraph.backend.query.QueryResults;
import org.slf4j.Logger;
import org.apache.hugegraph.HugeException;
import org.apache.hugegraph.HugeGraph;
import org.apache.hugegraph.HugeGraphParams;
import org.apache.hugegraph.backend.BackendException;
import org.apache.hugegraph.backend.Transaction;
import org.apache.hugegraph.backend.id.Id;
import org.apache.hugegraph.backend.serializer.AbstractSerializer;
import org.apache.hugegraph.backend.store.BackendEntry;
import org.apache.hugegraph.backend.store.BackendEntryIterator;
import org.apache.hugegraph.backend.store.BackendFeatures;
import org.apache.hugegraph.backend.store.BackendMutation;
import org.apache.hugegraph.backend.store.BackendStore;
import org.apache.hugegraph.exception.NotFoundException;
import org.apache.hugegraph.perf.PerfUtil.Watched;
import org.apache.hugegraph.schema.PropertyKey;
import org.apache.hugegraph.type.HugeType;
import org.apache.hugegraph.type.define.Action;
import org.apache.hugegraph.type.define.CollectionType;
import org.apache.hugegraph.type.define.GraphMode;
import org.apache.hugegraph.util.E;
import org.apache.hugegraph.util.Log;
import org.apache.hugegraph.util.collection.IdSet;
import com.google.common.util.concurrent.RateLimiter;
public abstract class AbstractTransaction implements Transaction {
protected static final Logger LOG = Log.logger(AbstractTransaction.class);
private final Thread ownerThread = Thread.currentThread();
private boolean autoCommit = false;
private boolean closed = false;
private boolean committing = false;
private boolean committing2Backend = false;
private final HugeGraphParams graph;
private final BackendStore store;
private BackendMutation mutation;
protected final AbstractSerializer serializer;
public AbstractTransaction(HugeGraphParams graph, BackendStore store) {
E.checkNotNull(graph, "graph");
E.checkNotNull(store, "store");
this.graph = graph;
this.serializer = this.graph.serializer();
this.store = store;
this.reset();
store.open(this.graph.configuration());
}
public HugeGraph graph() {
E.checkNotNull(this.graph, "graph");
return this.graph.graph();
}
protected HugeGraphParams params() {
E.checkNotNull(this.graph, "graph");
return this.graph;
}
protected BackendStore store() {
E.checkNotNull(this.store, "store");
return this.store;
}
public BackendFeatures storeFeatures() {
return this.store.features();
}
public boolean storeInitialized() {
return this.store.initialized();
}
public <R> R metadata(HugeType type, String meta, Object... args) {
return this.store().metadata(type, meta, args);
}
public String graphName() {
return this.params().name();
}
public GraphMode graphMode() {
return this.params().mode();
}
@Watched(prefix = "tx")
public Number queryNumber(Query query) {
LOG.debug("Transaction queryNumber: {}", query);
E.checkArgument(query.aggregate() != null,
"The aggregate must be set for number query: %s",
query);
Query squery = this.serializer.writeQuery(query);
this.beforeRead();
try {
return this.store.queryNumber(squery);
} finally {
this.afterRead();
}
}
@Watched(prefix = "tx")
public QueryResults<BackendEntry> query(Query query) {
LOG.debug("Transaction query: {}", query);
/*
* NOTE: it's dangerous if an IdQuery/ConditionQuery is empty
* check if the query is empty and its class is not the Query itself
*/
if (query.empty() && !query.getClass().equals(Query.class)) {
throw new BackendException("Query without any id or condition");
}
Query squery = this.serializer.writeQuery(query);
// Do rate limit if needed
RateLimiter rateLimiter = this.graph.readRateLimiter();
if (rateLimiter != null && query.resultType().isGraph()) {
double time = rateLimiter.acquire(1);
if (time > 0) {
LOG.debug("Waited for {}s to query", time);
}
BackendEntryIterator.checkInterrupted();
}
this.beforeRead();
try {
this.injectOlapPkIfNeeded(squery);
return new QueryResults<>(this.store.query(squery), query);
} finally {
this.afterRead(); // TODO: not complete the iteration currently
}
}
private void injectOlapPkIfNeeded(Query query) {
if (!query.resultType().isVertex() ||
!this.graph.readMode().showOlap()) {
return;
}
/*
* Control olap access by auth, only accessible olap property key
* will be queried
*/
Set<Id> olapPks = new IdSet(CollectionType.EC);
for (PropertyKey propertyKey : this.graph.graph().propertyKeys()) {
if (propertyKey.olap()) {
olapPks.add(propertyKey.id());
}
}
query.olapPks(olapPks);
}
@Watched(prefix = "tx")
public BackendEntry query(HugeType type, Id id) {
IdQuery idQuery = new IdQuery.OneIdQuery(type, id);
return this.query(idQuery).one();
}
public BackendEntry get(HugeType type, Id id) {
BackendEntry entry = this.query(type, id);
if (entry == null) {
throw new NotFoundException(
"Not found the %s entry with id '%s'",
type.readableName(), id);
}
return entry;
}
@Watched(prefix = "tx")
@Override
public void commit() throws BackendException {
LOG.debug("Transaction commit() [auto: {}]...", this.autoCommit);
this.checkOwnerThread();
if (this.closed) {
throw new BackendException("Transaction has been closed");
}
if (this.committing) {
// It is not allowed to recursively commit in a transaction
return;
}
if (!this.hasUpdate()) {
LOG.debug("Transaction has no data to commit({})", store());
return;
}
// Do rate limit if needed
RateLimiter rateLimiter = this.graph.writeRateLimiter();
if (rateLimiter != null) {
int size = this.mutationSize();
double time = size > 0 ? rateLimiter.acquire(size) : 0.0;
if (time > 0) {
LOG.debug("Waited for {}s to mutate {} item(s)", time, size);
}
BackendEntryIterator.checkInterrupted();
}
// Do commit
assert !this.committing : "Not allowed to commit when it's committing";
this.committing = true;
try {
this.commit2Backend();
} finally {
this.committing = false;
this.reset();
}
}
@Override
public void commitIfGtSize(int size) throws BackendException {
if (this.mutationSize() >= size) {
this.commit();
}
}
@Watched(prefix = "tx")
@Override
public void rollback() throws BackendException {
LOG.debug("Transaction rollback()...");
this.reset();
if (this.committing2Backend) {
this.rollbackBackend();
}
}
@Watched(prefix = "tx")
@Override
public void close() {
if (this.hasUpdate()) {
throw new BackendException("There are still changes to commit");
}
if (this.closed) {
return;
}
this.closed = true;
this.autoCommit = true; /* Let call after close() fail to commit */
this.store().close();
}
@Override
public boolean autoCommit() {
return this.autoCommit;
}
public boolean hasUpdate() {
return !this.mutation.isEmpty();
}
public boolean hasUpdate(HugeType type, Action action) {
return this.mutation.contains(type, action);
}
public int mutationSize() {
return this.mutation.size();
}
protected void autoCommit(boolean autoCommit) {
this.autoCommit = autoCommit;
}
protected void reset() {
if (this.mutation == null || !this.mutation.isEmpty()) {
this.mutation = new BackendMutation();
}
}
protected BackendMutation mutation() {
return this.mutation;
}
protected void commit2Backend() {
BackendMutation mutation = this.prepareCommit();
assert !mutation.isEmpty();
this.commitMutation2Backend(mutation);
}
protected void commitMutation2Backend(BackendMutation... mutations) {
assert mutations.length > 0;
this.committing2Backend = true;
// If an exception occurred, catch in the upper layer and rollback
this.store.beginTx();
for (BackendMutation mutation : mutations) {
this.store.mutate(mutation);
}
this.store.commitTx();
this.committing2Backend = false;
}
protected void rollbackBackend() {
this.committing2Backend = false;
this.store.rollbackTx();
}
protected BackendMutation prepareCommit() {
// For sub-class preparing data, nothing to do here
LOG.debug("Transaction prepareCommit()...");
return this.mutation();
}
protected void beforeWrite() {
// TODO: auto open()
}
protected void afterWrite() {
if (this.autoCommit()) {
this.commitOrRollback();
}
}
protected void beforeRead() {
if (this.autoCommit() && this.hasUpdate()) {
this.commitOrRollback();
}
}
protected void afterRead() {
// pass
}
protected void checkOwnerThread() {
if (Thread.currentThread() != this.ownerThread) {
throw new BackendException("Can't operate a tx in other threads");
}
}
@Watched(prefix = "tx")
public void commitOrRollback() {
LOG.debug("Transaction commitOrRollback()");
this.checkOwnerThread();
/*
* The mutation will be reset after commit, in order to log the
* mutation after failure, let's save it to a local variable.
*/
BackendMutation mutation = this.mutation();
try {
// Do commit
this.commit();
} catch (Throwable e1) {
LOG.error("Failed to commit changes:", e1);
// Do rollback
try {
this.rollback();
} catch (Throwable e2) {
LOG.error("Failed to rollback changes:\n {}", mutation, e2);
}
/*
* Rethrow the commit exception
* The e.getMessage maybe too long to see key information,
*/
throw new BackendException("Failed to commit changes: %s(%s)",
StringUtils.abbreviateMiddle(
e1.getMessage(), ".", 256),
HugeException.rootCause(e1));
}
}
@Watched(prefix = "tx")
public void doInsert(BackendEntry entry) {
this.doAction(Action.INSERT, entry);
}
@Watched(prefix = "tx")
public void doAppend(BackendEntry entry) {
this.doAction(Action.APPEND, entry);
}
@Watched(prefix = "tx")
public void doEliminate(BackendEntry entry) {
this.doAction(Action.ELIMINATE, entry);
}
@Watched(prefix = "tx")
public void doRemove(BackendEntry entry) {
this.doAction(Action.DELETE, entry);
}
@Watched(prefix = "tx")
public void doUpdateIfPresent(BackendEntry entry) {
this.doAction(Action.UPDATE_IF_PRESENT, entry);
}
@Watched(prefix = "tx")
public void doUpdateIfAbsent(BackendEntry entry) {
this.doAction(Action.UPDATE_IF_ABSENT, entry);
}
protected void doAction(Action action, BackendEntry entry) {
LOG.debug("Transaction {} entry {}", action, entry);
E.checkNotNull(entry, "entry");
this.mutation.add(entry, action);
}
}