blob: b61f07ac1de91418d9b65c096f144f073aefab70 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hugegraph.pd.store;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.hugegraph.pd.common.PDException;
import org.apache.hugegraph.pd.config.PDConfig;
import org.apache.hugegraph.pd.grpc.Pdpb;
import org.apache.hugegraph.pd.raft.KVOperation;
import org.apache.hugegraph.pd.raft.KVStoreClosure;
import org.apache.hugegraph.pd.raft.RaftEngine;
import org.apache.hugegraph.pd.raft.RaftStateMachine;
import org.apache.hugegraph.pd.raft.RaftTaskHandler;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.entity.Task;
import com.alipay.sofa.jraft.error.RaftError;
import lombok.extern.slf4j.Slf4j;
/**
 * {@link HgKVStore} decorator that sends every mutating operation through the
 * {@link RaftEngine} before it reaches the wrapped store.
 *
 * <p>Write methods ({@code put}, {@code remove}, {@code clear}, ...) encode the
 * request as a {@link KVOperation}, submit it as a Raft {@link Task} and block
 * until the task's closure reports the outcome. Read methods and the snapshot
 * methods delegate straight to the wrapped store. The actual mutation is
 * performed by {@link #invoke(KVOperation, KVStoreClosure)}, which this class
 * registers with the engine in {@link #init(PDConfig)}.
 */
@Slf4j
public class RaftKVStore implements HgKVStore, RaftTaskHandler {

    private final RaftEngine engine;
    private final HgKVStore store;

    /**
     * @param engine Raft engine used to submit write operations
     * @param store  underlying store that serves reads and receives applied writes
     */
    public RaftKVStore(RaftEngine engine, HgKVStore store) {
        this.engine = engine;
        this.store = store;
    }

    /**
     * Initializes the wrapped store and registers this instance as a task
     * handler on the Raft engine.
     */
    @Override
    public void init(PDConfig config) {
        this.store.init(config);
        this.engine.addTaskHandler(this);
    }

    /**
     * Builds a closure that only logs the outcome of a Raft task.
     * NOTE(review): not referenced anywhere inside this class.
     */
    private BaseKVStoreClosure createClosure() {
        return new BaseKVStoreClosure() {
            @Override
            public void run(Status status) {
                if (!status.isOk()) {
                    log.error("An exception occurred while performing the RAFT,{}",
                              status.getErrorMsg());
                } else {
                    log.info("RAFT done!");
                }
            }
        };
    }

    /**
     * Submits a put through Raft and waits for the result.
     *
     * @throws PDException if the Raft task fails or the wait is interrupted
     */
    @Override
    public void put(byte[] key, byte[] value) throws PDException {
        KVOperation operation = KVOperation.createPut(key, value);
        try {
            applyOperation(operation).get();
        } catch (Exception e) {
            throw toPDException(e);
        }
    }

    /**
     * Queries can be read without rafting
     */
    @Override
    public byte[] get(byte[] key) throws PDException {
        return this.store.get(key);
    }

    /** Reads directly from the wrapped store; no Raft round trip. */
    @Override
    public List<KV> scanPrefix(byte[] prefix) {
        return this.store.scanPrefix(prefix);
    }

    /**
     * Submits a remove through Raft and waits for the result.
     *
     * @return always {@code 0}; the count returned by the wrapped store is not
     *         propagated back to the caller
     * @throws PDException if the Raft task fails or the wait is interrupted
     */
    @Override
    public long remove(byte[] bytes) throws PDException {
        try {
            applyOperation(KVOperation.createRemove(bytes)).get();
        } catch (Exception e) {
            throw toPDException(e);
        }
        return 0;
    }

    /**
     * Submits a remove-by-prefix through Raft and waits for the result.
     *
     * @return always {@code 0}; the count returned by the wrapped store is not
     *         propagated back to the caller
     * @throws PDException if the Raft task fails or the wait is interrupted
     */
    @Override
    public long removeByPrefix(byte[] bytes) throws PDException {
        try {
            applyOperation(KVOperation.createRemoveByPrefix(bytes)).get();
        } catch (Exception e) {
            throw toPDException(e);
        }
        return 0;
    }

    /**
     * Submits a clear through Raft and waits for the result.
     *
     * @throws PDException if the Raft task fails or the wait is interrupted
     */
    @Override
    public void clear() throws PDException {
        try {
            applyOperation(KVOperation.createClear()).get();
        } catch (Exception e) {
            throw toPDException(e);
        }
    }

    /**
     * Submits a put-with-TTL through Raft and waits for the result.
     *
     * @throws PDException if the Raft task fails or the wait is interrupted
     */
    @Override
    public void putWithTTL(byte[] key, byte[] value, long ttl) throws PDException {
        try {
            applyOperation(KVOperation.createPutWithTTL(key, value, ttl)).get();
        } catch (Exception e) {
            throw toPDException(e);
        }
    }

    /**
     * Submits a put-with-TTL (explicit time unit) through Raft and waits for
     * the result.
     *
     * @throws PDException if the Raft task fails or the wait is interrupted
     */
    @Override
    public void putWithTTL(byte[] key, byte[] value, long ttl, TimeUnit timeUnit)
            throws PDException {
        try {
            applyOperation(KVOperation.createPutWithTTL(key, value, ttl, timeUnit)).get();
        } catch (Exception e) {
            throw toPDException(e);
        }
    }

    /** Reads directly from the wrapped store; no Raft round trip. */
    @Override
    public List<byte[]> getListWithTTL(byte[] key) throws PDException {
        return this.store.getListWithTTL(key);
    }

    /** Reads directly from the wrapped store; no Raft round trip. */
    @Override
    public byte[] getWithTTL(byte[] key) throws PDException {
        return this.store.getWithTTL(key);
    }

    /**
     * Submits a remove-with-TTL through Raft and waits for the result.
     *
     * @throws PDException if the Raft task fails or the wait is interrupted
     */
    @Override
    public void removeWithTTL(byte[] key) throws PDException {
        try {
            applyOperation(KVOperation.createRemoveWithTTL(key)).get();
        } catch (Exception e) {
            throw toPDException(e);
        }
    }

    /** Delegates directly to the wrapped store. */
    @Override
    public void saveSnapshot(String snapshotPath) throws PDException {
        this.store.saveSnapshot(snapshotPath);
    }

    /** Delegates directly to the wrapped store. */
    @Override
    public void loadSnapshot(String snapshotPath) throws PDException {
        this.store.loadSnapshot(snapshotPath);
    }

    /** Reads directly from the wrapped store; no Raft round trip. */
    @Override
    public List<KV> scanRange(byte[] start, byte[] end) {
        return this.store.scanRange(start, end);
    }

    /** Closes the wrapped store. */
    @Override
    public void close() {
        this.store.close();
    }

    /*
     * The do* methods below perform the real operation on the wrapped store.
     * Within this class they are only called from invoke().
     */

    private void doPut(byte[] key, byte[] value) throws PDException {
        this.store.put(key, value);
    }

    public long doRemove(byte[] bytes) throws PDException {
        return this.store.remove(bytes);
    }

    public long doRemoveByPrefix(byte[] bytes) throws PDException {
        return this.store.removeByPrefix(bytes);
    }

    public void doRemoveWithTTL(byte[] key) throws PDException {
        this.store.removeWithTTL(key);
    }

    public void doClear() throws PDException {
        this.store.clear();
    }

    public void doPutWithTTL(byte[] key, byte[] value, long ttl) throws PDException {
        this.store.putWithTTL(key, value, ttl);
    }

    public void doPutWithTTL(byte[] key, byte[] value, long ttl, TimeUnit timeUnit)
            throws PDException {
        this.store.putWithTTL(key, value, ttl, timeUnit);
    }

    public void doSaveSnapshot(String snapshotPath) throws PDException {
        this.store.saveSnapshot(snapshotPath);
    }

    public void doLoadSnapshot(String snapshotPath) throws PDException {
        this.store.loadSnapshot(snapshotPath);
    }

    /**
     * Converts a failure observed while submitting or awaiting a Raft task into
     * the {@link PDException} that is reported to the caller.
     *
     * <ul>
     * <li>An {@link InterruptedException} restores the thread's interrupt flag
     *     so callers further up the stack can still observe it.</li>
     * <li>An {@link ExecutionException} is unwrapped; when its cause is already
     *     a {@link PDException} (as produced by {@link #applyOperation}), that
     *     exception is returned unchanged so its error code, e.g.
     *     {@code NOT_LEADER}, is not replaced by {@code UNKNOWN_VALUE}.</li>
     * <li>Anything else becomes a {@code PDException} with
     *     {@code UNKNOWN_VALUE} and the original message.</li>
     * </ul>
     *
     * @param e the exception that was caught
     * @return the exception to throw to the caller
     */
    private static PDException toPDException(Exception e) {
        if (e instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
        Throwable root = e;
        if (e instanceof ExecutionException && e.getCause() != null) {
            root = e.getCause();
        }
        if (root instanceof PDException) {
            return (PDException) root;
        }
        return new PDException(Pdpb.ErrorType.UNKNOWN_VALUE, root.getMessage());
    }

    /**
     * Wraps {@code op} in a Raft {@link Task} and hands it to the engine.
     *
     * <p>No exception escapes this method: a failure while building or
     * submitting the task, as well as a non-OK {@link Status} delivered to the
     * closure, completes the returned future exceptionally.
     *
     * @param op  operation to replicate
     * @param <T> type the closure's data is cast to on success
     * @return future completed with the closure's data on success, or
     *         exceptionally with a {@link PDException} carrying
     *         {@code NOT_LEADER} (for {@link RaftError#EPERM}) or
     *         {@code UNKNOWN} otherwise
     */
    private <T> CompletableFuture<T> applyOperation(final KVOperation op) {
        final CompletableFuture<T> future = new CompletableFuture<>();
        try {
            final Task task = new Task();
            task.setData(ByteBuffer.wrap(op.toByteArray()));
            task.setDone(new RaftStateMachine.RaftClosureAdapter(op, new KVStoreClosure() {

                Object data;
                Pdpb.Error error;

                @Override
                public Pdpb.Error getError() {
                    return this.error;
                }

                @Override
                public void setError(Pdpb.Error error) {
                    this.error = error;
                }

                @Override
                public Object getData() {
                    return this.data;
                }

                @Override
                public void setData(Object data) {
                    this.data = data;
                }

                @Override
                @SuppressWarnings("unchecked") // T is chosen by the caller; data is opaque here
                public void run(Status status) {
                    if (status.isOk()) {
                        future.complete((T) this.data);
                        return;
                    }
                    Pdpb.ErrorType type = RaftError.EPERM.equals(status.getRaftError())
                                          ? Pdpb.ErrorType.NOT_LEADER
                                          : Pdpb.ErrorType.UNKNOWN;
                    // Protobuf string setters reject null, and an exception thrown
                    // here would leave the future (and the blocked caller) hanging.
                    String message = status.getErrorMsg();
                    if (message == null) {
                        message = String.valueOf(status);
                    }
                    this.error = Pdpb.Error.newBuilder()
                                           .setType(type)
                                           .setMessage(message)
                                           .build();
                    // Keep the Raft error text in the exception, not only the code.
                    future.completeExceptionally(
                            new PDException(this.error.getTypeValue(), message));
                }
            }));
            this.engine.addTask(task);
        } catch (Exception e) {
            future.completeExceptionally(e);
        }
        return future;
    }

    /** NOTE(review): not referenced anywhere inside this class. */
    private boolean isLeader() {
        return this.engine.isLeader();
    }

    /**
     * Applies a replicated operation to the wrapped store.
     *
     * <p>{@code GET} is a no-op here; an unrecognised op code is only logged.
     * The {@code response} closure is not used by this implementation.
     * NOTE(review): presumably called by the Raft state machine once the entry
     * is committed - confirm in RaftStateMachine.
     *
     * @param op       operation to apply
     * @param response closure associated with the operation (unused)
     * @return always {@code false}
     * @throws PDException if the wrapped store rejects the operation
     */
    @Override
    public boolean invoke(KVOperation op, KVStoreClosure response) throws PDException {
        switch (op.getOp()) {
            case KVOperation.GET:
                break;
            case KVOperation.PUT:
                doPut(op.getKey(), op.getValue());
                break;
            case KVOperation.REMOVE:
                doRemove(op.getKey());
                break;
            case KVOperation.PUT_WITH_TTL:
                doPutWithTTL(op.getKey(), op.getValue(), (long) op.getArg());
                break;
            case KVOperation.PUT_WITH_TTL_UNIT: {
                // arg holds {ttl, timeUnit}; braces keep the local out of other cases
                Object[] ttlAndUnit = (Object[]) op.getArg();
                doPutWithTTL(op.getKey(), op.getValue(),
                             (long) ttlAndUnit[0], (TimeUnit) ttlAndUnit[1]);
                break;
            }
            case KVOperation.REMOVE_BY_PREFIX:
                doRemoveByPrefix(op.getKey());
                break;
            case KVOperation.REMOVE_WITH_TTL:
                doRemoveWithTTL(op.getKey());
                break;
            case KVOperation.CLEAR:
                doClear();
                break;
            case KVOperation.SAVE_SNAPSHOT:
                doSaveSnapshot((String) op.getAttach());
                break;
            case KVOperation.LOAD_SNAPSHOT:
                doLoadSnapshot((String) op.getAttach());
                break;
            default:
                log.error("Err op {}", op.getOp());
        }
        return false;
    }
}