blob: c693a67b493630d23a6e49899176fd0dbb7133d7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hugegraph.pd;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.hugegraph.pd.common.PDException;
import org.apache.hugegraph.pd.config.PDConfig;
import org.apache.hugegraph.pd.grpc.kv.Kv;
import org.apache.hugegraph.pd.grpc.kv.V;
import org.apache.hugegraph.pd.meta.MetadataKeyHelper;
import org.apache.hugegraph.pd.meta.MetadataRocksDBStore;
import org.apache.hugegraph.pd.store.KV;
import org.springframework.stereotype.Service;
import com.google.protobuf.InvalidProtocolBufferException;
import lombok.extern.slf4j.Slf4j;
/**
*
**/
@Slf4j
@Service
public class KvService {
public static final char KV_DELIMITER = '@';
private static final String TTL_PREFIX = "T";
private static final String KV_PREFIX = "K";
private static final String LOCK_PREFIX = "L";
private static final String KV_PREFIX_DELIMITER = KV_PREFIX + KV_DELIMITER;
private static final byte[] EMPTY_VALUE = new byte[0];
private final MetadataRocksDBStore meta;
private PDConfig pdConfig;
public KvService(PDConfig config) {
this.pdConfig = config;
meta = new MetadataRocksDBStore(config);
}
public static String getKey(Object... keys) {
StringBuilder builder = MetadataKeyHelper.getStringBuilderHelper();
builder.append(KV_PREFIX).append(KV_DELIMITER);
for (Object key : keys) {
builder.append(key == null ? "" : key).append(KV_DELIMITER);
}
return builder.substring(0, builder.length() - 1);
}
public static byte[] getKeyBytes(Object... keys) {
String key = getKey(keys);
return key.getBytes(Charset.defaultCharset());
}
public static String getKeyWithoutPrefix(Object... keys) {
StringBuilder builder = MetadataKeyHelper.getStringBuilderHelper();
for (Object key : keys) {
builder.append(key == null ? "" : key).append(KV_DELIMITER);
}
return builder.substring(0, builder.length() - 1);
}
public static String getDelimiter() {
return String.valueOf(KV_DELIMITER);
}
public PDConfig getPdConfig() {
return pdConfig;
}
public void setPdConfig(PDConfig pdConfig) {
this.pdConfig = pdConfig;
}
public void put(String key, String value) throws PDException {
V storeValue = V.newBuilder().setValue(value).setTtl(0).build();
meta.put(getStoreKey(key), storeValue.toByteArray());
// log.warn("add key with key-{}:value-{}", key, value);
}
public void put(String key, String value, long ttl) throws PDException {
long curTime = System.currentTimeMillis();
curTime += ttl;
V storeValue = V.newBuilder().setValue(value).setSt(ttl).setTtl(curTime).build();
meta.put(getStoreKey(key), storeValue.toByteArray());
meta.put(getTTLStoreKey(key, curTime), EMPTY_VALUE);
// log.warn("add key with key-{}:value-{}:ttl-{}", key, value, ttl);
}
public String get(String key) throws PDException {
byte[] storeKey = getStoreKey(key);
return get(storeKey);
}
public String get(byte[] keyBytes) throws PDException {
byte[] bytes = meta.getOne(keyBytes);
String v = getValue(keyBytes, bytes);
return v;
}
private String getValue(byte[] keyBytes, byte[] valueBytes) throws PDException {
if (valueBytes == null || valueBytes.length == 0) {
return "";
}
try {
V v = V.parseFrom(valueBytes);
if (v.getTtl() == 0 || v.getTtl() >= System.currentTimeMillis()) {
return v.getValue();
} else {
meta.remove(keyBytes);
meta.remove(getTTLStoreKey(new String(keyBytes), v.getTtl()));
}
} catch (Exception e) {
log.error("parse value with error:{}", e.getMessage());
throw new PDException(-1, e.getMessage());
}
return null;
}
public boolean keepAlive(String key) throws PDException {
byte[] bytes = meta.getOne(getStoreKey(key));
try {
if (bytes == null || bytes.length == 0) {
return false;
}
V v = V.parseFrom(bytes);
if (v != null) {
long ttl = v.getTtl();
long st = v.getSt();
meta.remove(getTTLStoreKey(key, ttl));
put(key, v.getValue(), st);
return true;
} else {
return false;
}
} catch (InvalidProtocolBufferException e) {
throw new PDException(-1, e.getMessage());
}
}
public Kv delete(String key) throws PDException {
byte[] storeKey = getStoreKey(key);
String value = this.get(storeKey);
meta.remove(storeKey);
Kv.Builder builder = Kv.newBuilder().setKey(key);
if (value != null) {
builder.setValue(value);
}
Kv kv = builder.build();
// log.warn("delete kv with key :{}", key);
return kv;
}
public List<Kv> deleteWithPrefix(String key) throws PDException {
byte[] storeKey = getStoreKey(key);
//TODO to many rows for scan
List<KV> kvList = meta.scanPrefix(storeKey);
LinkedList<Kv> kvs = new LinkedList<>();
for (KV kv : kvList) {
String kvKey = new String(kv.getKey()).replaceFirst(KV_PREFIX_DELIMITER, "");
String kvValue = getValue(kv.getKey(), kv.getValue());
if (kvValue != null) {
kvs.add(Kv.newBuilder().setKey(kvKey).setValue(kvValue).build());
}
}
meta.removeByPrefix(storeKey);
// log.warn("delete kv with key prefix :{}", key);
return kvs;
}
/**
* scan result ranged from key start and key end
*
* @param keyStart
* @param keyEnd
* @return Records
* @throws PDException
*/
public Map<String, String> scanRange(String keyStart, String keyEnd) throws PDException {
List<KV> list = meta.scanRange(getStoreKey(keyStart), getStoreKey(keyEnd));
Map<String, String> map = new HashMap<>();
for (KV kv : list) {
String kvKey = new String(kv.getKey()).replaceFirst(KV_PREFIX_DELIMITER, "");
String kvValue = getValue(kv.getKey(), kv.getValue());
if (kvValue != null) {
map.put(kvKey, kvValue);
}
}
return map;
}
public Map<String, String> scanWithPrefix(String key) throws PDException {
List<KV> kvList = meta.scanPrefix(getStoreKey(key));
HashMap<String, String> map = new HashMap<>();
for (KV kv : kvList) {
String kvKey = new String(kv.getKey()).replaceFirst(KV_PREFIX_DELIMITER, "");
String kvValue = getValue(kv.getKey(), kv.getValue());
if (kvValue != null) {
map.put(kvKey, kvValue);
}
}
return map;
}
public boolean locked(String key) throws PDException {
String lockKey = KvService.getKeyWithoutPrefix(KvService.LOCK_PREFIX, key);
Map<String, String> allLock = scanWithPrefix(lockKey);
return allLock != null && allLock.size() != 0;
}
private boolean owned(String key, long clientId) throws PDException {
String lockKey = KvService.getKeyWithoutPrefix(KvService.LOCK_PREFIX, key);
Map<String, String> allLock = scanWithPrefix(lockKey);
if (allLock.size() == 0) {
return true;
}
for (Map.Entry<String, String> entry : allLock.entrySet()) {
String entryKey = entry.getKey();
String[] split = entryKey.split(String.valueOf(KV_DELIMITER));
if (Long.valueOf(split[split.length - 1]).equals(clientId)) {
return true;
}
}
return false;
}
public boolean lock(String key, long ttl, long clientId) throws PDException {
//TODO lock improvement
synchronized (KvService.class) {
if (!owned(key, clientId)) {
return false;
}
put(getLockKey(key, clientId), " ", ttl);
return true;
}
}
public boolean lockWithoutReentrant(String key, long ttl,
long clientId) throws PDException {
synchronized (KvService.class) {
if (locked(key)) {
return false;
}
put(getLockKey(key, clientId), " ", ttl);
return true;
}
}
public boolean unlock(String key, long clientId) throws PDException {
synchronized (KvService.class) {
if (!owned(key, clientId)) {
return false;
}
delete(getLockKey(key, clientId));
return true;
}
}
public boolean keepAlive(String key, long clientId) throws PDException {
String lockKey = getLockKey(key, clientId);
return keepAlive(lockKey);
}
public String getLockKey(String key, long clientId) {
return getKeyWithoutPrefix(LOCK_PREFIX, key, clientId);
}
public byte[] getStoreKey(String key) {
return getKeyBytes(key);
}
public byte[] getTTLStoreKey(String key, long time) {
return getKeyBytes(TTL_PREFIX, time, key);
}
public void clearTTLData() {
try {
byte[] ttlStartKey = getTTLStoreKey("", 0);
byte[] ttlEndKey = getTTLStoreKey("", System.currentTimeMillis());
List<KV> kvList = meta.scanRange(ttlStartKey, ttlEndKey);
for (KV kv : kvList) {
String key = new String(kv.getKey());
int index = key.indexOf(KV_DELIMITER, 2);
String delKey = key.substring(index + 1);
delete(delKey);
meta.remove(kv.getKey());
}
} catch (Exception e) {
log.error("clear ttl data with error :", e);
}
}
public MetadataRocksDBStore getMeta() {
return meta;
}
}