blob: 661670b8a8010a91ee5b8db01ba06ba92b6033c8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hugegraph.pd.meta;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hugegraph.pd.common.PDException;
import org.apache.hugegraph.pd.config.PDConfig;
import org.apache.hugegraph.pd.store.KV;
import com.caucho.hessian.io.Hessian2Input;
import com.caucho.hessian.io.Hessian2Output;
import lombok.extern.slf4j.Slf4j;
/**
 * Implementation class for auto-increment IDs.
 *
 * <p>Two kinds of identifiers are served, both persisted through the key-value operations
 * this class calls on itself ({@code getOne}, {@code put}, {@code remove},
 * {@code scanRange}, ...; presumably inherited from {@link MetadataRocksDBStore}):
 * <ul>
 *   <li>plain counters, see {@link #getId(String, int)};</li>
 *   <li>cyclic ids ("cid") drawn from {@code [0, max)}, where every id in use is recorded as
 *       a slot entry and can be released again, either immediately
 *       ({@link #delCId(String, long)}) or after a grace period
 *       ({@link #delCIdDelay(String, String, long)}).</li>
 * </ul>
 *
 * <p>NOTE(review): all string keys are encoded with {@link Charset#defaultCharset()}, which
 * depends on the JVM/platform configuration. Processes with different defaults would derive
 * different keys for non-ASCII names. Consider pinning UTF-8 after checking existing data.
 */
@Slf4j
public class IdMetaStore extends MetadataRocksDBStore {

    private static final String ID_PREFIX = "@ID@";
    private static final String CID_PREFIX = "@CID@";
    private static final String CID_SLOT_PREFIX = "@CID_SLOT@";
    private static final String CID_DEL_SLOT_PREFIX = "@CID_DEL_SLOT@";
    private static final String SEPARATOR = "@";
    /** Lock objects, one per key, shared by every instance because the map is static. */
    private static final ConcurrentHashMap<String, Object> SEQUENCES = new ConcurrentHashMap<>();
    /** Grace period (24 hours, in milliseconds) before a delay-deleted cid is released. */
    private static final long CID_DEL_TIMEOUT = 24 * 3600 * 1000;
    /**
     * Number of bytes that follow the textual prefix in a cid slot key. Slot keys have always
     * been written as the 8 big-endian bytes of the id followed by 56 zero bytes (the width
     * was taken from {@code Long.SIZE}, i.e. bits). The width is kept so that slot keys
     * already persisted still match and range scans keep their ordering.
     */
    private static final int CID_SLOT_KEY_SUFFIX_LENGTH = Long.SIZE;

    private final long clusterId;

    public IdMetaStore(PDConfig pdConfig) {
        super(pdConfig);
        this.clusterId = pdConfig.getClusterId();
    }

    /**
     * Decodes a long from the first 8 bytes of {@code b} (big-endian, the default byte order
     * of {@link ByteBuffer}). Trailing bytes are ignored, so both the compact 8-byte encoding
     * and the padded 64-byte arrays produced by earlier versions of
     * {@link #longToBytes(long)} are accepted.
     *
     * @param b encoded value, at least 8 bytes long
     * @return the decoded value
     */
    public static long bytesToLong(byte[] b) {
        ByteBuffer buf = ByteBuffer.wrap(b);
        return buf.getLong();
    }

    /**
     * Encodes a long as 8 big-endian bytes.
     *
     * <p>The buffer used to be sized with {@code Long.SIZE} (64, the width in bits), which
     * produced a 64-byte array with 56 bytes of zero padding. {@code Long.BYTES} is the
     * intended size; {@link #bytesToLong(byte[])} reads either form.
     *
     * @param l value to encode
     * @return a new 8-byte array
     */
    public static byte[] longToBytes(long l) {
        return ByteBuffer.allocate(Long.BYTES).putLong(l).array();
    }

    /**
     * Get auto-increment ID.
     *
     * <p>Reads the counter stored for {@code key} (0 when nothing is stored), stores
     * {@code counter + delta} and returns the value read, all while holding the lock of
     * {@code key}.
     *
     * @param key   name of the counter
     * @param delta amount added to the stored counter
     * @return the counter value before {@code delta} was added
     * @throws PDException if the underlying store operation fails
     */
    public long getId(String key, int delta) throws PDException {
        Object lock = getLock(key);
        byte[] keyBs = (ID_PREFIX + key).getBytes(Charset.defaultCharset());
        synchronized (lock) {
            byte[] bs = getOne(keyBs);
            long current = bs != null ? bytesToLong(bs) : 0L;
            long next = current + delta;
            put(keyBs, longToBytes(next));
            return current;
        }
    }

    /**
     * Returns the lock object registered for {@code key}, registering a new one on first use.
     */
    private Object getLock(String key) {
        return SEQUENCES.computeIfAbsent(key, k -> new Object());
    }

    /**
     * Resets the counter of {@code key} by calling {@code removeByPrefix} with the counter's
     * storage key while holding the lock of {@code key}.
     *
     * @param key name of the counter
     * @throws PDException if the underlying store operation fails
     */
    public void resetId(String key) throws PDException {
        Object lock = getLock(key);
        byte[] keyBs = (ID_PREFIX + key).getBytes(Charset.defaultCharset());
        synchronized (lock) {
            removeByPrefix(keyBs);
        }
    }

    /**
     * Within 24 hours of deleting the cid identified by the name,
     * repeat applying for the same name's cid to keep the same value.
     * This design is to prevent inconsistent caching, causing data errors.
     *
     * <p>Expired entries of the delayed-deletion queue are purged first. The frequency of
     * deleting graphs is relatively low, so this has little performance impact.
     *
     * @param key  name of the cid pool
     * @param name cid identifier
     * @param max  exclusive upper limit of the cid, used when a new one has to be allocated
     * @return the cid restored from the delayed-deletion queue, otherwise the result of
     *         {@link #getCId(String, long)}
     * @throws PDException if the underlying store operation fails
     */
    public long getCId(String key, String name, long max) throws PDException {
        synchronized (this) {
            purgeExpiredCIds(key);
            // Restore key from delayed deletion queue
            byte[] cidDelayKey = getCIDDelayKey(key, name);
            byte[] value = getOne(cidDelayKey);
            if (value == null) {
                return getCId(key, max);
            }
            // Remove from delayed deletion queue
            remove(cidDelayKey);
            return ((long[]) deserialize(value))[0];
        }
    }

    /**
     * Releases every cid of {@code key} that has been in the delayed-deletion queue for more
     * than {@link #CID_DEL_TIMEOUT} and drops its queue entry. A failure while releasing one
     * entry is logged and does not stop the remaining entries from being processed.
     */
    private void purgeExpiredCIds(String key) throws PDException {
        byte[] delKeyPrefix = (CID_DEL_SLOT_PREFIX +
                               key + SEPARATOR).getBytes(Charset.defaultCharset());
        scanPrefix(delKeyPrefix).forEach(kv -> {
            // Queue entries are {cid, deletion timestamp in ms}, see delCIdDelay
            long[] value = (long[]) deserialize(kv.getValue());
            if (value.length >= 2 &&
                System.currentTimeMillis() - value[1] > CID_DEL_TIMEOUT) {
                try {
                    delCId(key, value[0]);
                    remove(kv.getKey());
                } catch (Exception e) {
                    log.error("Exception ", e);
                }
            }
        });
    }

    /**
     * Add to the deletion queue for delayed deletion.
     *
     * <p>Stores {@code {cid, current time in ms}} under the queue key of {@code name}; the
     * slot itself stays occupied until the entry expires or is restored by
     * {@link #getCId(String, String, long)}.
     *
     * @param key  name of the cid pool
     * @param name cid identifier
     * @param cid  the cid to release later
     * @return {@code cid}
     * @throws PDException if the underlying store operation fails
     */
    public long delCIdDelay(String key, String name, long cid) throws PDException {
        byte[] delKey = getCIDDelayKey(key, name);
        put(delKey, serialize(new long[]{cid, System.currentTimeMillis()}));
        return cid;
    }

    /**
     * Get an auto-incrementing cyclic non-repeating ID. When the upper limit is reached, it
     * starts from 0 again.
     *
     * <p>The search starts at the stored cursor (one past the previously allocated id) and
     * skips ids whose slot is occupied. When it runs into {@code max} it continues from 0 up
     * to the previously allocated id.
     *
     * @param key name of the cid pool
     * @param max the upper limit of the ID. After reaching this value, it starts incrementing
     *            from 0 again.
     * @return the allocated id, or -1 when the search ends on the previously allocated id,
     *         which is treated as "no free id"
     * @throws PDException if the underlying store operation fails
     */
    public long getCId(String key, long max) throws PDException {
        Object lock = getLock(key);
        byte[] keyBs = (CID_PREFIX + key).getBytes(Charset.defaultCharset());
        synchronized (lock) {
            byte[] bs = getOne(keyBs);
            long start = bs != null ? bytesToLong(bs) : 0L;
            // The id handed out by the previous call (the stored cursor is that id + 1)
            long last = start == 0 ? max - 1 : start - 1;
            // Find an unused cid
            long current = firstFreeCId(key, start, max);
            if (current == max) {
                // Ran off the end of the range: wrap around and search [0, last)
                current = firstFreeCId(key, 0, last);
            }
            if (current == last) {
                return -1;
            }
            put(genCIDSlotKey(key, current), longToBytes(current));
            put(keyBs, longToBytes(current + 1));
            return current;
        }
    }

    /**
     * Walks the slot entries returned by {@code scanRange} between the slot keys of
     * {@code from} and {@code to} and returns the first id, counting up from {@code from},
     * that is not matched by the next entry. Returns {@code from} advanced by the number of
     * entries when every entry matches.
     */
    private long firstFreeCId(String key, long from, long to) throws PDException {
        long candidate = from;
        List<KV> kvs = scanRange(genCIDSlotKey(key, from), genCIDSlotKey(key, to));
        for (KV kv : kvs) {
            if (candidate != bytesToLong(kv.getValue())) {
                break;
            }
            candidate++;
        }
        return candidate;
    }

    /**
     * Builds the slot key of {@code value} in the pool {@code key}: the textual prefix
     * followed by {@link #CID_SLOT_KEY_SUFFIX_LENGTH} bytes, of which the first 8 hold the
     * big-endian id and the rest stay zero.
     */
    private byte[] genCIDSlotKey(String key, long value) {
        byte[] keySlot = (CID_SLOT_PREFIX + key + SEPARATOR).getBytes(Charset.defaultCharset());
        ByteBuffer buf = ByteBuffer.allocate(keySlot.length + CID_SLOT_KEY_SUFFIX_LENGTH);
        buf.put(keySlot);
        buf.putLong(value);
        return buf.array();
    }

    /**
     * Builds the delayed-deletion queue key of {@code name} in the pool {@code key}.
     */
    private byte[] getCIDDelayKey(String key, String name) {
        byte[] bsKey = (CID_DEL_SLOT_PREFIX +
                        key + SEPARATOR +
                        name).getBytes(Charset.defaultCharset());
        return bsKey;
    }

    /**
     * Delete a cyclic ID and release its value.
     *
     * @param key name of the cid pool
     * @param cid the id whose slot entry is removed
     * @return the result of removing the slot entry from the store
     * @throws PDException if the underlying store operation fails
     */
    public long delCId(String key, long cid) throws PDException {
        return remove(genCIDSlotKey(key, cid));
    }

    /**
     * Serializes {@code obj} with Hessian 2.
     *
     * @throws RuntimeException wrapping the {@link IOException} if writing fails
     */
    private byte[] serialize(Object obj) {
        try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
            Hessian2Output output = new Hessian2Output(bos);
            output.writeObject(obj);
            output.flush();
            return bos.toByteArray();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Deserializes an object previously written by {@link #serialize(Object)}.
     *
     * @throws RuntimeException wrapping the {@link IOException} if reading fails
     */
    private Object deserialize(byte[] bytes) {
        try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes)) {
            Hessian2Input input = new Hessian2Input(bis);
            Object obj = input.readObject();
            input.close();
            return obj;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}