blob: 3515f12b9ec562b4af0643f26b42481066ff82f0 [file] [log] [blame]
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package com.baidu.hugegraph.backend.store.raft;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;
import org.slf4j.Logger;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.closure.ReadIndexClosure;
import com.alipay.sofa.jraft.util.BytesUtil;
import com.baidu.hugegraph.backend.BackendException;
import com.baidu.hugegraph.backend.query.Query;
import com.baidu.hugegraph.backend.store.BackendEntry;
import com.baidu.hugegraph.backend.store.BackendFeatures;
import com.baidu.hugegraph.backend.store.BackendMutation;
import com.baidu.hugegraph.backend.store.BackendStore;
import com.baidu.hugegraph.backend.store.BackendStoreProvider;
import com.baidu.hugegraph.backend.store.raft.rpc.RaftRequests.StoreAction;
import com.baidu.hugegraph.backend.store.raft.rpc.RaftRequests.StoreType;
import com.baidu.hugegraph.config.HugeConfig;
import com.baidu.hugegraph.type.HugeType;
import com.baidu.hugegraph.util.Log;
public class RaftBackendStore implements BackendStore {
private static final Logger LOG = Log.logger(RaftBackendStore.class);
private final BackendStore store;
private final RaftSharedContext context;
private final ThreadLocal<MutationBatch> threadLocalBatch;
public RaftBackendStore(BackendStore store, RaftSharedContext context) {
this.store = store;
this.context = context;
this.threadLocalBatch = new ThreadLocal<>();
}
public BackendStore originStore() {
return this.store;
}
private RaftNode node() {
return this.context.node();
}
@Override
public String store() {
return this.store.store();
}
@Override
public String database() {
return this.store.database();
}
@Override
public BackendStoreProvider provider() {
return this.store.provider();
}
@Override
public boolean isSchemaStore() {
return this.store.isSchemaStore();
}
@Override
public synchronized void open(HugeConfig config) {
this.store.open(config);
}
@Override
public void close() {
this.store.close();
}
@Override
public boolean opened() {
return this.store.opened();
}
@Override
public void init() {
// this.submitAndWait(StoreCommand.INIT);
this.store.init();
}
@Override
public void clear(boolean clearSpace) {
byte value = clearSpace ? (byte) 1 : (byte) 0;
byte[] bytes = StoreCommand.wrap(value);
this.submitAndWait(StoreAction.CLEAR, bytes);
}
@Override
public boolean initialized() {
return this.store.initialized();
}
@Override
public void truncate() {
this.submitAndWait(StoreAction.TRUNCATE, null);
}
@Override
public void mutate(BackendMutation mutation) {
if (mutation.isEmpty()) {
return;
}
// Just add to local buffer
this.getOrNewBatch().add(mutation);
}
@Override
@SuppressWarnings("unchecked")
public Iterator<BackendEntry> query(Query query) {
return (Iterator<BackendEntry>) this.queryByRaft(
query, o -> store.query(query));
}
@Override
public Number queryNumber(Query query) {
return (Number) this.queryByRaft(query, o -> store.queryNumber(query));
}
@Override
public void beginTx() {
// Don't write raft log, commitTx(in statemachine) will call beginTx
}
@Override
public void commitTx() {
MutationBatch batch = this.getOrNewBatch();
try {
byte[] bytes = StoreSerializer.writeMutations(batch.mutations);
this.submitAndWait(StoreAction.COMMIT_TX, bytes);
} finally {
batch.clear();
}
}
@Override
public void rollbackTx() {
this.submitAndWait(StoreAction.ROLLBACK_TX, null);
}
@Override
public <R> R metadata(HugeType type, String meta, Object[] args) {
return this.store.metadata(type, meta, args);
}
@Override
public BackendFeatures features() {
return this.store.features();
}
@Override
public void increaseCounter(HugeType type, long increment) {
IncrCounter incrCounter = new IncrCounter(type, increment);
byte[] bytes = StoreSerializer.writeIncrCounter(incrCounter);
this.submitAndWait(StoreAction.INCR_COUNTER, bytes);
}
@Override
public long getCounter(HugeType type) {
return (Long) this.queryByRaft(type, o -> this.store.getCounter(type));
}
private Object submitAndWait(StoreAction action, byte[] data) {
StoreType type = this.context.storeType(this.store());
return this.submitAndWait(new StoreCommand(type, action, data));
}
private Object submitAndWait(StoreCommand command) {
StoreClosure closure = new StoreClosure(command);
return this.node().submitAndWait(command, closure);
}
private Object queryByRaft(Object query, Function<Object, Object> func) {
if (this.node().selfIsLeader() || !this.context.isSafeRead()) {
return func.apply(query);
}
RaftClosure<Object> future = new RaftClosure<>();
ReadIndexClosure readIndexClosure = new ReadIndexClosure() {
@Override
public void run(Status status, long index, byte[] reqCtx) {
if (status.isOk()) {
future.complete(status, () -> func.apply(query));
} else {
future.failure(status, new BackendException(
"Failed to execute query '%s' with read-index: %s",
query, status));
}
}
};
this.node().node().readIndex(BytesUtil.EMPTY_BYTES, readIndexClosure);
try {
return future.waitFinished();
} catch (Throwable e) {
LOG.warn("Failed to execute query {} with read-index: {}",
query, future.status());
throw new BackendException("Failed to execute query", e);
}
}
private static class MutationBatch {
private List<BackendMutation> mutations;
public MutationBatch() {
this.mutations = new ArrayList<>();
}
public void add(BackendMutation mutation) {
this.mutations.add(mutation);
}
public void clear() {
this.mutations = new ArrayList<>();
}
}
private MutationBatch getOrNewBatch() {
MutationBatch batch = this.threadLocalBatch.get();
if (batch == null) {
batch = new MutationBatch();
this.threadLocalBatch.set(batch);
}
return batch;
}
public static class IncrCounter {
private HugeType type;
private long increment;
public IncrCounter(HugeType type, long increment) {
this.type = type;
this.increment = increment;
}
public HugeType type() {
return this.type;
}
public long increment() {
return this.increment;
}
}
}