blob: f0109a6235120a7bbfa5195be7a0bff2872943f6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hugegraph.pd.watch;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BiPredicate;
import org.apache.hugegraph.pd.KvService;
import org.apache.hugegraph.pd.common.PDException;
import org.apache.hugegraph.pd.config.PDConfig;
import org.apache.hugegraph.pd.grpc.kv.WatchEvent;
import org.apache.hugegraph.pd.grpc.kv.WatchKv;
import org.apache.hugegraph.pd.grpc.kv.WatchResponse;
import org.apache.hugegraph.pd.grpc.kv.WatchState;
import org.apache.hugegraph.pd.grpc.kv.WatchType;
import org.apache.hugegraph.pd.raft.RaftEngine;
import org.apache.hugegraph.pd.store.RaftKVStore;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
/**
* watch订阅、响应处理类
**/
@Slf4j
public class KvWatchSubject {
public static final String KEY_DELIMITER = "KW";
public static final String PREFIX_DELIMITER = "PW";
public static final String ALL_PREFIX = "W";
public static final long WATCH_TTL = 20000L;
private static final ConcurrentMap<String, StreamObserver<WatchResponse>> clients =
new ConcurrentHashMap<>();
private final KvService kvService;
BiPredicate<String, String> equal = String::equals;
BiPredicate<String, String> startWith = String::startsWith;
/**
* 会使用以下三组key:
* clients -> W@KW@key@clientId
* rocksdb key1 ->W@KW@key@clientId
* rocksdb key2 ->W@clientId@KW@key@clientId
**/
public KvWatchSubject(PDConfig pdConfig) {
this.kvService = new KvService(pdConfig);
}
public String getWatchKey(String key, String watchDelimiter) {
return KvService.getKeyWithoutPrefix(ALL_PREFIX, watchDelimiter, key);
}
private void addWatchKey(String key, String delimiter, long clientId) throws PDException {
String watchKey = KvService.getKeyWithoutPrefix(ALL_PREFIX, delimiter, key, clientId);
kvService.put(watchKey, "", WATCH_TTL);
String clientFirstKey =
KvService.getKeyWithoutPrefix(ALL_PREFIX, clientId, delimiter, key, clientId);
kvService.put(clientFirstKey, "", WATCH_TTL);
}
private void removeWatchKey(String key, String delimiter, long clientId) throws PDException {
String watchKey = KvService.getKeyWithoutPrefix(ALL_PREFIX, delimiter, key, clientId);
kvService.delete(watchKey);
String clientFirstKey = KvService.getKeyWithoutPrefix(ALL_PREFIX, clientId, delimiter, key);
kvService.deleteWithPrefix(clientFirstKey);
}
/**
* 增加观察者
*
* @param key 观察的key
* @param clientId 客户端标识
* @param observer
* @param delimiter 观察类型标识符,对前缀监听或者对key的监听可以通过此参数区分
* @throws PDException
*/
public void addObserver(String key, long clientId, StreamObserver<WatchResponse> observer,
String delimiter) throws PDException {
String keyWithoutPrefix =
KvService.getKeyWithoutPrefix(ALL_PREFIX, delimiter, key, clientId);
clients.putIfAbsent(keyWithoutPrefix, observer);
addWatchKey(key, delimiter, clientId);
log.info("client:{},start to watch key:{}", clientId, key);
}
public void removeObserver(String key, long clientId, String delimiter) throws PDException {
removeWatchKey(key, delimiter, clientId);
String keyWithoutPrefix =
KvService.getKeyWithoutPrefix(ALL_PREFIX, delimiter, key, clientId);
clients.remove(keyWithoutPrefix);
}
/**
* 通知观察者方法,key和prefix都使用此方法,predicate不同
*
* @param key
* @param watchType 观察类型,一般是增加和删除
* @param predicate 判断等于或者是前匹配,用来适配key或prefix观察
* @param kvs
* @throws PDException
*/
public void notifyObserver(String key, WatchType watchType,
BiPredicate<String, String> predicate,
WatchKv... kvs) throws PDException {
boolean isEqual = predicate.equals(equal);
String watchDelimiter = isEqual ? KEY_DELIMITER : PREFIX_DELIMITER;
String watchKeyPrefix = isEqual ? key : "";
String storeKey = getWatchKey(watchKeyPrefix, watchDelimiter);
Map<String, String> map = kvService.scanWithPrefix(storeKey);
String delimiter = String.valueOf(KvService.KV_DELIMITER);
WatchResponse watchResponse;
for (String keyAndClient : map.keySet()) {
String[] values = keyAndClient.split(delimiter);
assert values.length == 4;
String watchKey = values[2];
String c = values[3];
long clientId = Long.parseLong(c);
LinkedList<WatchEvent> watchEvents = new LinkedList<>();
for (WatchKv kv : kvs) {
String kvKey = kv.getKey();
boolean match = predicate.test(kvKey, watchKey);
if (!match) {
continue;
}
WatchKv watchKv =
WatchKv.newBuilder().setKey(kvKey).setValue(kv.getValue()).build();
WatchEvent event =
WatchEvent.newBuilder().setCurrent(watchKv).setType(watchType).build();
watchEvents.add(event);
}
StreamObserver<WatchResponse> observer = clients.get(keyAndClient);
watchResponse =
WatchResponse.newBuilder().setState(WatchState.Started).setClientId(clientId)
.addAllEvents(watchEvents).build();
try {
if (observer != null) {
synchronized (observer) {
// log.info("notifyObserver for clientId:{}", clientId);
observer.onNext(watchResponse);
}
} else {
log.info("cannot find StreamObserver for clientId:{}", clientId);
}
} catch (StatusRuntimeException ignored) {
} catch (Exception e) {
log.warn("notifyObserver with error:{}", clientId, e);
}
}
}
public void notifyAllObserver(String key, WatchType watchType, WatchKv[] kvs) throws
PDException {
notifyObserver(key, watchType, equal, kvs);
notifyObserver(key, watchType, startWith, kvs);
}
/**
* 续活客户端
* 1.往客户端发一个alive的消息,带重试哈
* 2.如果有响应,则续活之前保存的那两组key
* 3.如果多次都失败,则删除内存和rocksdb的数据
*/
public void keepClientAlive() {
WatchResponse testAlive = WatchResponse.newBuilder().setState(WatchState.Alive).build();
Set<Map.Entry<String, StreamObserver<WatchResponse>>> entries = clients.entrySet();
Map.Entry<String, StreamObserver<WatchResponse>>[] array =
entries.toArray(new Map.Entry[0]);
Arrays.stream(array).parallel().forEach(entry -> {
StreamObserver<WatchResponse> value = entry.getValue();
String key = entry.getKey();
String delimiter = KvService.getDelimiter();
String client = key.split(delimiter)[3];
String clientKey = KvService.getKeyWithoutPrefix(ALL_PREFIX, client);
if (value == null) {
removeClient(null, key, clientKey);
}
boolean done = false;
String removes = client + KvService.KV_DELIMITER;
for (int i = 0; i < 3; i++) {
try {
synchronized (value) {
value.onNext(testAlive);
}
Map<String, String> clientKeys = kvService.scanWithPrefix(clientKey);
for (Map.Entry<String, String> keyEntry : clientKeys.entrySet()) {
String entryKey = keyEntry.getKey();
String aliveKey = entryKey.replaceFirst(removes, "");
boolean keepAliveKey = kvService.keepAlive(aliveKey);
boolean keepAliveEntry = kvService.keepAlive(entryKey);
// log.info("keep alive client:{},{}:{},{}:{}", client, aliveKey,
// keepAliveKey,
// entryKey,
// keepAliveEntry);
done = true;
}
break;
} catch (Exception e) {
try {
Thread.sleep(100);
} catch (InterruptedException ex) {
log.info("keep alive client {} with error:{}", client, e);
}
}
}
if (!done) {
log.info("remove client {} for no data", client);
removeClient(value, key, clientKey);
}
});
}
private void removeClient(StreamObserver<WatchResponse> value, String key, String clientKey) {
try {
log.info("remove null observer,client:", clientKey);
if (RaftEngine.getInstance().isLeader()) {
kvService.deleteWithPrefix(clientKey);
} else {
// todo: delete records via client
var store = kvService.getMeta().getStore();
if (store instanceof RaftKVStore) {
((RaftKVStore) store).doRemoveByPrefix(kvService.getStoreKey(clientKey));
}
}
if (value != null) {
synchronized (value) {
value.onCompleted();
}
}
clients.remove(key);
} catch (PDException e) {
log.error("remove client with error:", e);
}
}
/**
* 通知客户端leader切换了,重连
*/
public void notifyClientChangeLeader() {
WatchResponse response =
WatchResponse.newBuilder().setState(WatchState.Leader_Changed).build();
for (Map.Entry<String, StreamObserver<WatchResponse>> entry : clients.entrySet()) {
StreamObserver<WatchResponse> value = entry.getValue();
String key = entry.getKey();
String client = key.split(KvService.getDelimiter())[3];
String clientKey = KvService.getKeyWithoutPrefix(ALL_PREFIX, client);
if (value == null) {
removeClient(null, key, clientKey);
}
for (int i = 0; i < 3; i++) {
try {
synchronized (value) {
value.onNext(response);
}
removeClient(value, key, clientKey);
break;
} catch (Exception e) {
try {
Thread.sleep(100);
} catch (InterruptedException ignored) {
}
}
}
}
}
}