blob: ffa8cdaab82d0d87ca338c59bcd192679041466b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hugegraph.pd.service;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.PostConstruct;
import org.apache.hugegraph.pd.KvService;
import org.apache.hugegraph.pd.common.PDException;
import org.apache.hugegraph.pd.config.PDConfig;
import org.apache.hugegraph.pd.grpc.kv.K;
import org.apache.hugegraph.pd.grpc.kv.KResponse;
import org.apache.hugegraph.pd.grpc.kv.Kv;
import org.apache.hugegraph.pd.grpc.kv.KvResponse;
import org.apache.hugegraph.pd.grpc.kv.KvServiceGrpc;
import org.apache.hugegraph.pd.grpc.kv.LockRequest;
import org.apache.hugegraph.pd.grpc.kv.LockResponse;
import org.apache.hugegraph.pd.grpc.kv.ScanPrefixResponse;
import org.apache.hugegraph.pd.grpc.kv.TTLRequest;
import org.apache.hugegraph.pd.grpc.kv.TTLResponse;
import org.apache.hugegraph.pd.grpc.kv.WatchKv;
import org.apache.hugegraph.pd.grpc.kv.WatchRequest;
import org.apache.hugegraph.pd.grpc.kv.WatchResponse;
import org.apache.hugegraph.pd.grpc.kv.WatchState;
import org.apache.hugegraph.pd.grpc.kv.WatchType;
import org.apache.hugegraph.pd.raft.RaftEngine;
import org.apache.hugegraph.pd.raft.RaftStateListener;
import org.apache.hugegraph.pd.watch.KvWatchSubject;
import org.lognet.springboot.grpc.GRpcService;
import org.springframework.beans.factory.annotation.Autowired;
import io.grpc.ManagedChannel;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
/**
* Core implementation class of the kv storage service.
*/
@Slf4j
@GRpcService
public class KvServiceGrpcImpl extends KvServiceGrpc.KvServiceImplBase implements RaftStateListener,
ServiceGrpc {
// Channel argument passed to redirectToLeader(...) by the RPC methods of this class. It is
// final and initialised to null, so those calls always pass null here.
private final ManagedChannel channel = null;
// KV store facade; assigned in init().
KvService kvService;
// Counter that is not referenced anywhere else in the visible part of this class.
AtomicLong count = new AtomicLong();
// Error text sent to watch clients when this node is not the leader.
String msg = "node is not leader,it is necessary to redirect to the leader on the client";
// PD configuration, injected through @Autowired.
@Autowired
private PDConfig pdConfig;
// Registry of watch observers; assigned in init().
private KvWatchSubject subjects;
// Scheduler that runs the periodic keep-alive task started in init().
private ScheduledExecutorService executor;
/**
 * Initializes the service after dependency injection.
 * <p>
 * Starts the Raft engine with the configured Raft settings, creates the KV store facade and
 * the watch registry, registers this service as a Raft state listener and schedules the
 * periodic task that keeps watch clients alive while this node is the leader.
 */
@PostConstruct
public void init() {
    RaftEngine.getInstance().init(pdConfig.getRaft());
    kvService = new KvService(pdConfig);
    subjects = new KvWatchSubject(pdConfig);
    // Register the listener only after 'subjects' exists: onRaftLeaderChanged() dereferences
    // that field, so a leader change delivered earlier would hit a null reference.
    RaftEngine.getInstance().addStateListener(this);
    executor = Executors.newScheduledThreadPool(1);
    executor.scheduleWithFixedDelay(() -> {
        // An exception escaping a periodic task suppresses all of its subsequent runs, so
        // failures are logged here and the schedule keeps going.
        try {
            if (isLeader()) {
                subjects.keepClientAlive();
            }
        } catch (Exception e) {
            log.error("keep watch clients alive with error: ", e);
        }
    }, 0, KvWatchSubject.WATCH_TTL / 2, TimeUnit.MILLISECONDS);
}
/**
 * Stores a key/value pair and notifies the observers registered for that key.
 * <p>
 * A non-leader node hands the call to {@code redirectToLeader}. On the leader the reply
 * header reports success, or the {@link PDException} raised while handling the request.
 *
 * @param request          key and value to store
 * @param responseObserver stream that receives the single reply
 */
@Override
public void put(Kv request, StreamObserver<KvResponse> responseObserver) {
    if (!isLeader()) {
        redirectToLeader(channel, KvServiceGrpc.getPutMethod(), request, responseObserver);
        return;
    }
    final KvResponse.Builder reply = KvResponse.newBuilder();
    try {
        final String k = request.getKey();
        final String v = request.getValue();
        this.kvService.put(k, v);
        final WatchKv[] changed = new WatchKv[]{getWatchKv(k, v)};
        subjects.notifyAllObserver(k, WatchType.Put, changed);
        reply.setHeader(getResponseHeader());
    } catch (PDException ex) {
        // Leadership may have been lost while the request was in flight.
        if (!isLeader()) {
            redirectToLeader(channel, KvServiceGrpc.getPutMethod(), request, responseObserver);
            return;
        }
        reply.setHeader(getResponseHeader(ex));
    }
    responseObserver.onNext(reply.build());
    responseObserver.onCompleted();
}
/**
 * Looks up the value stored under a key.
 * <p>
 * The value is only set on the reply when the store returns a non-null result. A non-leader
 * node hands the call to {@code redirectToLeader}.
 *
 * @param request          carries the key to read
 * @param responseObserver stream that receives the single reply
 */
@Override
public void get(K request, StreamObserver<KResponse> responseObserver) {
    if (!isLeader()) {
        redirectToLeader(channel, KvServiceGrpc.getGetMethod(), request, responseObserver);
        return;
    }
    final KResponse.Builder reply = KResponse.newBuilder();
    try {
        final String found = this.kvService.get(request.getKey());
        reply.setHeader(getResponseHeader());
        if (found != null) {
            reply.setValue(found);
        }
    } catch (PDException ex) {
        // Leadership may have been lost while the request was in flight.
        if (!isLeader()) {
            redirectToLeader(channel, KvServiceGrpc.getGetMethod(), request, responseObserver);
            return;
        }
        reply.setHeader(getResponseHeader(ex));
    }
    responseObserver.onNext(reply.build());
    responseObserver.onCompleted();
}
/**
 * Deletes a single key and notifies the observers registered for it.
 * <p>
 * A non-leader node hands the call to {@code redirectToLeader}. On the leader the reply
 * header reports success, or the {@link PDException} raised while handling the request.
 *
 * @param request          carries the key to delete
 * @param responseObserver stream that receives the single reply
 */
@Override
public void delete(K request, StreamObserver<KvResponse> responseObserver) {
    if (!isLeader()) {
        redirectToLeader(channel, KvServiceGrpc.getDeleteMethod(), request, responseObserver);
        return;
    }
    final KvResponse.Builder reply = KvResponse.newBuilder();
    try {
        final String target = request.getKey();
        final Kv removed = this.kvService.delete(target);
        final String removedValue = removed.getValue();
        // NOTE(review): if Kv is a protobuf-generated message its string getter presumably
        // never returns null, which would make this guard always true - confirm.
        if (removedValue != null) {
            final WatchKv[] changed = new WatchKv[]{getWatchKv(removed.getKey(), removedValue)};
            subjects.notifyAllObserver(target, WatchType.Delete, changed);
        }
        reply.setHeader(getResponseHeader());
    } catch (PDException ex) {
        // Leadership may have been lost while the request was in flight.
        if (!isLeader()) {
            redirectToLeader(channel, KvServiceGrpc.getDeleteMethod(), request,
                             responseObserver);
            return;
        }
        reply.setHeader(getResponseHeader(ex));
    }
    responseObserver.onNext(reply.build());
    responseObserver.onCompleted();
}
/**
 * Deletes every entry whose key starts with the given prefix.
 * <p>
 * All deleted entries are sent to the observers in one {@code Delete} notification keyed by
 * the requested prefix. A non-leader node hands the call to {@code redirectToLeader}.
 *
 * @param request          carries the key prefix to delete
 * @param responseObserver stream that receives the single reply
 */
@Override
public void deletePrefix(K request, StreamObserver<KvResponse> responseObserver) {
    if (!isLeader()) {
        redirectToLeader(channel, KvServiceGrpc.getDeletePrefixMethod(), request,
                         responseObserver);
        return;
    }
    final KvResponse.Builder reply = KvResponse.newBuilder();
    try {
        final String prefix = request.getKey();
        final List<Kv> removed = this.kvService.deleteWithPrefix(prefix);
        // One WatchKv per deleted entry, in the order returned by the store.
        final WatchKv[] changed = removed.stream()
                                         .map(kv -> getWatchKv(kv.getKey(), kv.getValue()))
                                         .toArray(WatchKv[]::new);
        subjects.notifyAllObserver(prefix, WatchType.Delete, changed);
        reply.setHeader(getResponseHeader());
    } catch (PDException ex) {
        // Leadership may have been lost while the request was in flight.
        if (!isLeader()) {
            redirectToLeader(channel, KvServiceGrpc.getDeletePrefixMethod(), request,
                             responseObserver);
            return;
        }
        reply.setHeader(getResponseHeader(ex));
    }
    responseObserver.onNext(reply.build());
    responseObserver.onCompleted();
}
/**
 * Returns every entry whose key starts with the given prefix.
 * <p>
 * A non-leader node hands the call to {@code redirectToLeader}. On the leader the reply
 * carries the matching entries, or a header describing the {@link PDException} raised.
 *
 * @param request          carries the key prefix to scan
 * @param responseObserver stream that receives the single reply
 */
@Override
public void scanPrefix(K request, StreamObserver<ScanPrefixResponse> responseObserver) {
    if (!isLeader()) {
        redirectToLeader(channel, KvServiceGrpc.getScanPrefixMethod(), request,
                         responseObserver);
        return;
    }
    final ScanPrefixResponse.Builder reply = ScanPrefixResponse.newBuilder();
    try {
        // NOTE(review): raw Map kept because the declared return type of scanWithPrefix is
        // not visible here; parameterize it once that signature is confirmed.
        final Map entries = this.kvService.scanWithPrefix(request.getKey());
        reply.setHeader(getResponseHeader());
        reply.putAllKvs(entries);
    } catch (PDException ex) {
        // Leadership may have been lost while the request was in flight.
        if (!isLeader()) {
            redirectToLeader(channel, KvServiceGrpc.getScanPrefixMethod(), request,
                             responseObserver);
            return;
        }
        reply.setHeader(getResponseHeader(ex));
    }
    responseObserver.onNext(reply.build());
    responseObserver.onCompleted();
}
/**
 * Generates a random, non-zero {@code long} to use as a client id.
 * <p>
 * Zero is skipped because the callers in this class treat a client id of 0 as "no id
 * supplied" and request a fresh one in that case.
 *
 * @return a random value that is never 0
 */
private long getRandomLong() {
    // ThreadLocalRandom avoids allocating and seeding a new generator on every call.
    final ThreadLocalRandom random = ThreadLocalRandom.current();
    long result;
    do {
        result = random.nextLong();
    } while (result == 0);
    return result;
}
/**
 * Registers a watch on a single key.
 * <p>
 * The stream is not completed here, so it stays available for later notifications. A
 * non-leader node rejects the call with an error carrying the redirect message. A failure
 * while registering is always reported to the client through {@code onError}: the redirect
 * message if leadership was lost in the meantime, otherwise the original exception.
 *
 * @param request          key, client id and watch state
 * @param responseObserver stream that receives watch responses
 */
@Override
public void watch(WatchRequest request, StreamObserver<WatchResponse> responseObserver) {
    if (!isLeader()) {
        responseObserver.onError(new PDException(-1, msg));
        return;
    }
    try {
        clientWatch(request, responseObserver, false);
    } catch (PDException e) {
        final boolean stillLeader = isLeader();
        if (stillLeader) {
            // Previously this case was dropped silently, leaving the client without any
            // response; log it and surface the real cause.
            log.error("watch with error: ", e);
        }
        final PDException reported = stillLeader ? e : new PDException(-1, msg);
        try {
            responseObserver.onError(reported);
        } catch (IllegalStateException ignored) {
            // Presumably the call was already closed or cancelled, so there is nobody left
            // to notify.
        } catch (Exception e1) {
            log.error("redirect with error: ", e1);
        }
    }
}
/**
 * Registers a watch on every key that starts with the given prefix.
 * <p>
 * The stream is not completed here, so it stays available for later notifications. A
 * non-leader node rejects the call with an error carrying the redirect message. A failure
 * while registering is always reported to the client through {@code onError}: the redirect
 * message if leadership was lost in the meantime, otherwise the original exception.
 *
 * @param request          key prefix, client id and watch state
 * @param responseObserver stream that receives watch responses
 */
@Override
public void watchPrefix(WatchRequest request, StreamObserver<WatchResponse> responseObserver) {
    if (!isLeader()) {
        responseObserver.onError(new PDException(-1, msg));
        return;
    }
    try {
        clientWatch(request, responseObserver, true);
    } catch (PDException e) {
        final boolean stillLeader = isLeader();
        if (stillLeader) {
            // Previously this case was dropped silently, leaving the client without any
            // response; log it and surface the real cause.
            log.error("watch prefix with error: ", e);
        }
        final PDException reported = stillLeader ? e : new PDException(-1, msg);
        try {
            responseObserver.onError(reported);
        } catch (IllegalStateException ignored) {
            // Presumably the call was already closed or cancelled, so there is nobody left
            // to notify.
        } catch (Exception e1) {
            log.error("redirect with error: ", e1);
        }
    }
}
/**
 * Shared implementation behind {@code watch} and {@code watchPrefix}.
 * <p>
 * A request in state {@code Starting} with client id 0 gets a freshly generated id, which is
 * returned in a {@code Starting} response. Every other request is answered with a
 * {@code Started} response. The observer is registered before the response is sent.
 *
 * @param request          key (or prefix), client id and watch state
 * @param responseObserver stream that receives watch responses
 * @param isPrefix         {@code true} to register with the prefix delimiter, {@code false}
 *                         to register with the key delimiter
 * @throws PDException with code -1 and the redirect message if this node is no longer the
 *                     leader when registration fails, otherwise wrapping the original failure
 */
private void clientWatch(WatchRequest request, StreamObserver<WatchResponse> responseObserver,
                         boolean isPrefix) throws PDException {
    try {
        final String watchedKey = request.getKey();
        long id = request.getClientId();
        final WatchResponse.Builder reply = WatchResponse.newBuilder();
        final boolean needsNewId = request.getState().equals(WatchState.Starting) && id == 0;
        if (needsNewId) {
            id = getRandomLong();
            reply.setClientId(id).setState(WatchState.Starting);
        } else {
            reply.setState(WatchState.Started);
        }
        final WatchResponse first = reply.build();
        final String delimiter;
        if (isPrefix) {
            delimiter = KvWatchSubject.PREFIX_DELIMITER;
        } else {
            delimiter = KvWatchSubject.KEY_DELIMITER;
        }
        subjects.addObserver(watchedKey, id, responseObserver, delimiter);
        // The observer is locked while the response is written to it.
        synchronized (responseObserver) {
            responseObserver.onNext(first);
        }
    } catch (PDException cause) {
        if (isLeader()) {
            throw new PDException(cause.getErrorCode(), cause);
        }
        throw new PDException(-1, msg);
    }
}
/**
 * Tries to acquire the lock named by the request key.
 * <p>
 * A request with client id 0 gets a freshly generated id, which is returned in the reply
 * together with the outcome. A non-leader node hands the call to {@code redirectToLeader}.
 *
 * @param request          lock key, ttl and client id
 * @param responseObserver stream that receives the single reply
 */
@Override
public void lock(LockRequest request, StreamObserver<LockResponse> responseObserver) {
    if (!isLeader()) {
        redirectToLeader(channel, KvServiceGrpc.getLockMethod(), request, responseObserver);
        return;
    }
    final LockResponse.Builder reply = LockResponse.newBuilder();
    try {
        final long requested = request.getClientId();
        final long owner = requested != 0 ? requested : getRandomLong();
        final boolean acquired = this.kvService.lock(request.getKey(), request.getTtl(), owner);
        reply.setHeader(getResponseHeader()).setSucceed(acquired).setClientId(owner);
    } catch (PDException ex) {
        // Leadership may have been lost while the request was in flight.
        if (!isLeader()) {
            redirectToLeader(channel, KvServiceGrpc.getLockMethod(), request, responseObserver);
            return;
        }
        log.error("lock with error :", ex);
        reply.setHeader(getResponseHeader(ex));
    }
    responseObserver.onNext(reply.build());
    responseObserver.onCompleted();
}
/**
 * Tries to acquire the lock named by the request key through
 * {@code KvService.lockWithoutReentrant}.
 * <p>
 * A request with client id 0 gets a freshly generated id, which is returned in the reply
 * together with the outcome. A non-leader node hands the call to {@code redirectToLeader}.
 *
 * @param request          lock key, ttl and client id
 * @param responseObserver stream that receives the single reply
 */
@Override
public void lockWithoutReentrant(LockRequest request,
                                 StreamObserver<LockResponse> responseObserver) {
    if (!isLeader()) {
        redirectToLeader(channel, KvServiceGrpc.getLockWithoutReentrantMethod(), request,
                         responseObserver);
        return;
    }
    final LockResponse.Builder reply = LockResponse.newBuilder();
    try {
        final long requested = request.getClientId();
        final long owner = requested != 0 ? requested : getRandomLong();
        final boolean acquired =
                this.kvService.lockWithoutReentrant(request.getKey(), request.getTtl(), owner);
        reply.setHeader(getResponseHeader()).setSucceed(acquired).setClientId(owner);
    } catch (PDException ex) {
        // Leadership may have been lost while the request was in flight.
        if (!isLeader()) {
            redirectToLeader(channel, KvServiceGrpc.getLockWithoutReentrantMethod(), request,
                             responseObserver);
            return;
        }
        log.error("lock with error :", ex);
        reply.setHeader(getResponseHeader(ex));
    }
    responseObserver.onNext(reply.build());
    responseObserver.onCompleted();
}
/**
 * Reports whether the lock named by the request key is currently held.
 * <p>
 * A non-leader node hands the call to {@code redirectToLeader}. A failure is logged before
 * the leadership re-check, so it is logged even when the call ends up being redirected.
 *
 * @param request          carries the lock key
 * @param responseObserver stream that receives the single reply
 */
@Override
public void isLocked(LockRequest request, StreamObserver<LockResponse> responseObserver) {
    if (!isLeader()) {
        redirectToLeader(channel, KvServiceGrpc.getIsLockedMethod(), request, responseObserver);
        return;
    }
    final LockResponse.Builder reply = LockResponse.newBuilder();
    try {
        final boolean held = this.kvService.locked(request.getKey());
        reply.setHeader(getResponseHeader()).setSucceed(held);
    } catch (PDException ex) {
        log.error("lock with error :", ex);
        if (!isLeader()) {
            redirectToLeader(channel, KvServiceGrpc.getIsLockedMethod(), request,
                             responseObserver);
            return;
        }
        reply.setHeader(getResponseHeader(ex));
    }
    responseObserver.onNext(reply.build());
    responseObserver.onCompleted();
}
/**
 * Releases the lock named by the request key on behalf of the given client.
 * <p>
 * A client id of 0 is rejected with an error header. A non-leader node hands the call to
 * {@code redirectToLeader}.
 *
 * @param request          lock key and the client id that holds the lock
 * @param responseObserver stream that receives the single reply
 */
@Override
public void unlock(LockRequest request, StreamObserver<LockResponse> responseObserver) {
    if (!isLeader()) {
        redirectToLeader(channel, KvServiceGrpc.getUnlockMethod(), request, responseObserver);
        return;
    }
    final LockResponse.Builder reply = LockResponse.newBuilder();
    try {
        final long owner = request.getClientId();
        if (owner == 0) {
            throw new PDException(-1, "incorrect clientId: 0");
        }
        final boolean released = this.kvService.unlock(request.getKey(), owner);
        reply.setHeader(getResponseHeader()).setSucceed(released).setClientId(owner);
    } catch (PDException ex) {
        // Leadership may have been lost while the request was in flight.
        if (!isLeader()) {
            redirectToLeader(channel, KvServiceGrpc.getUnlockMethod(), request,
                             responseObserver);
            return;
        }
        reply.setHeader(getResponseHeader(ex));
    }
    responseObserver.onNext(reply.build());
    responseObserver.onCompleted();
}
/**
 * Renews the lock named by the request key on behalf of the given client.
 * <p>
 * A client id of 0 is rejected with an error header. A non-leader node hands the call to
 * {@code redirectToLeader}.
 *
 * @param request          lock key and the client id that holds the lock
 * @param responseObserver stream that receives the single reply
 */
@Override
public void keepAlive(LockRequest request, StreamObserver<LockResponse> responseObserver) {
    if (!isLeader()) {
        redirectToLeader(channel, KvServiceGrpc.getKeepAliveMethod(), request,
                         responseObserver);
        return;
    }
    final LockResponse.Builder reply = LockResponse.newBuilder();
    try {
        final long owner = request.getClientId();
        if (owner == 0) {
            throw new PDException(-1, "incorrect clientId: 0");
        }
        final boolean renewed = this.kvService.keepAlive(request.getKey(), owner);
        reply.setHeader(getResponseHeader()).setSucceed(renewed).setClientId(owner);
    } catch (PDException ex) {
        // Leadership may have been lost while the request was in flight.
        if (!isLeader()) {
            redirectToLeader(channel, KvServiceGrpc.getKeepAliveMethod(), request,
                             responseObserver);
            return;
        }
        reply.setHeader(getResponseHeader(ex));
    }
    responseObserver.onNext(reply.build());
    responseObserver.onCompleted();
}
/**
 * Stores a key/value pair together with the ttl given in the request.
 * <p>
 * A non-leader node hands the call to {@code redirectToLeader}. On the leader the reply is
 * marked as succeeded, or carries a header describing the {@link PDException} raised.
 *
 * @param request          key, value and ttl
 * @param responseObserver stream that receives the single reply
 */
@Override
public void putTTL(TTLRequest request, StreamObserver<TTLResponse> responseObserver) {
    if (!isLeader()) {
        redirectToLeader(channel, KvServiceGrpc.getPutTTLMethod(), request, responseObserver);
        return;
    }
    final TTLResponse.Builder reply = TTLResponse.newBuilder();
    try {
        this.kvService.put(request.getKey(), request.getValue(), request.getTtl());
        reply.setHeader(getResponseHeader()).setSucceed(true);
    } catch (PDException ex) {
        // Leadership may have been lost while the request was in flight.
        if (!isLeader()) {
            redirectToLeader(channel, KvServiceGrpc.getPutTTLMethod(), request,
                             responseObserver);
            return;
        }
        reply.setHeader(getResponseHeader(ex));
    }
    responseObserver.onNext(reply.build());
    responseObserver.onCompleted();
}
/**
 * Renews a key that was stored with a ttl.
 * <p>
 * A non-leader node hands the call to {@code redirectToLeader}. On the leader the reply is
 * marked as succeeded, or carries a header describing the {@link PDException} raised.
 *
 * @param request          carries the key to renew
 * @param responseObserver stream that receives the single reply
 */
@Override
public void keepTTLAlive(TTLRequest request, StreamObserver<TTLResponse> responseObserver) {
    if (!isLeader()) {
        redirectToLeader(channel, KvServiceGrpc.getKeepTTLAliveMethod(), request,
                         responseObserver);
        return;
    }
    final TTLResponse.Builder reply = TTLResponse.newBuilder();
    try {
        this.kvService.keepAlive(request.getKey());
        reply.setHeader(getResponseHeader()).setSucceed(true);
    } catch (PDException ex) {
        // Leadership may have been lost while the request was in flight.
        if (!isLeader()) {
            redirectToLeader(channel, KvServiceGrpc.getKeepTTLAliveMethod(), request,
                             responseObserver);
            return;
        }
        reply.setHeader(getResponseHeader(ex));
    }
    responseObserver.onNext(reply.build());
    responseObserver.onCompleted();
}
/**
 * Builds the {@link WatchKv} message used in watch notifications.
 *
 * @param key   key of the affected entry
 * @param value value of the affected entry
 * @return a message carrying the given key and value
 */
private WatchKv getWatchKv(String key, String value) {
    return WatchKv.newBuilder()
                  .setKey(key)
                  .setValue(value)
                  .build();
}
@Override
public void onRaftLeaderChanged() {
subjects.notifyClientChangeLeader();
}
}