blob: 62d14782dfe396bc725d9b1a91a9c3c3b748e504 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hugegraph.backend.store;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.apache.hugegraph.backend.query.ConditionQuery;
import org.apache.hugegraph.backend.query.Query;
import org.apache.hugegraph.backend.serializer.BytesBuffer;
import org.apache.hugegraph.type.HugeType;
import org.apache.hugegraph.type.define.Directions;
import org.apache.hugegraph.type.define.HugeKeys;
import org.apache.hugegraph.util.Bytes;
import org.apache.hugegraph.util.E;
import org.apache.hugegraph.util.NumericUtil;
import org.apache.hugegraph.util.StringEncoding;
import com.google.common.collect.ImmutableList;
public abstract class BackendTable<Session extends BackendSession, Entry> {
private final String table;
private final MetaDispatcher<Session> dispatcher;
public BackendTable(String table) {
this.table = table.toLowerCase();
this.dispatcher = new MetaDispatcher<>();
this.registerMetaHandlers();
}
public String table() {
return this.table;
}
public MetaDispatcher<Session> metaDispatcher() {
return this.dispatcher;
}
public void registerMetaHandler(String name, MetaHandler<Session> handler) {
this.dispatcher.registerMetaHandler(name, handler);
}
protected void registerMetaHandlers() {
// pass
}
public void updateIfPresent(Session session, Entry entry) {
// TODO: use fine-grained row lock
synchronized (this.table) {
assert session == null || !session.hasChanges();
if (this.queryExist(session, entry)) {
this.insert(session, entry);
if (session != null) {
session.commit();
}
}
}
}
public void updateIfAbsent(Session session, Entry entry) {
// TODO: use fine-grained row lock
synchronized (this.table) {
assert session == null || !session.hasChanges();
if (!this.queryExist(session, entry)) {
this.insert(session, entry);
if (session != null) {
session.commit();
}
}
}
}
/**
* Mapping query-type to table-type
* @param query origin query
* @return corresponding table type
*/
public static HugeType tableType(Query query) {
HugeType type = query.resultType();
// Mapping EDGE to EDGE_OUT/EDGE_IN
if (type == HugeType.EDGE) {
// We assume query OUT edges
type = HugeType.EDGE_OUT;
while (!(query instanceof ConditionQuery ||
query.originQuery() == null)) {
/*
* Some backends(like RocksDB) may trans ConditionQuery to
* IdQuery or IdPrefixQuery, so we should get the origin query.
*/
query = query.originQuery();
}
if (query.conditionsSize() > 0 && query instanceof ConditionQuery) {
ConditionQuery cq = (ConditionQuery) query;
// Does query IN edges
if (cq.condition(HugeKeys.DIRECTION) == Directions.IN) {
type = HugeType.EDGE_IN;
}
}
}
return type;
}
public static final String joinTableName(String prefix, String table) {
return prefix + "_" + table.toLowerCase();
}
public abstract void init(Session session);
public abstract void clear(Session session);
public abstract Iterator<BackendEntry> query(Session session, Query query);
public abstract Number queryNumber(Session session, Query query);
public abstract boolean queryExist(Session session, Entry entry);
public abstract void insert(Session session, Entry entry);
public abstract void delete(Session session, Entry entry);
public abstract void append(Session session, Entry entry);
public abstract void eliminate(Session session, Entry entry);
/****************************** ShardSplitter ******************************/
public abstract static class ShardSplitter<Session extends BackendSession> {
// The min shard size should >= 1M to prevent too many number of shards
protected static final int MIN_SHARD_SIZE = (int) Bytes.MB;
// We assume the size of each key-value is 100 bytes
protected static final int ESTIMATE_BYTES_PER_KV = 100;
public static final String START = "";
public static final String END = "";
private static final byte[] EMPTY = new byte[0];
public static final byte[] START_BYTES = new byte[]{0x0};
public static final byte[] END_BYTES = new byte[]{-1, -1, -1, -1,
-1, -1, -1, -1,
-1, -1, -1, -1,
-1, -1, -1, -1};
private final String table;
public ShardSplitter(String table) {
this.table = table;
}
public String table() {
return this.table;
}
public List<Shard> getSplits(Session session, long splitSize) {
E.checkArgument(splitSize >= MIN_SHARD_SIZE,
"The split-size must be >= %s bytes, but got %s",
MIN_SHARD_SIZE, splitSize);
long size = this.estimateDataSize(session);
if (size <= 0) {
size = this.estimateNumKeys(session) * ESTIMATE_BYTES_PER_KV;
}
double count = Math.ceil(size / (double) splitSize);
if (count <= 0) {
count = 1;
}
long maxKey = this.maxKey();
Double each = maxKey / count;
List<Shard> splits = new ArrayList<>((int) count);
String last = START;
long offset = 0L;
while (offset < maxKey) {
offset += each.longValue();
if (offset > maxKey) {
splits.add(new Shard(last, END, 0L));
break;
}
String current = this.position(offset);
splits.add(new Shard(last, current, 0L));
last = current;
}
return splits;
}
public final String position(long position) {
return String.valueOf(position);
}
public byte[] position(String position) {
if (END.equals(position)) {
return null;
}
int value = Long.valueOf(position).intValue();
return NumericUtil.intToBytes(value);
}
protected long maxKey() {
return BytesBuffer.UINT32_MAX;
}
protected abstract long estimateDataSize(Session session);
protected abstract long estimateNumKeys(Session session);
public static class Range {
private byte[] startKey;
private byte[] endKey;
public Range(byte[] startKey, byte[] endKey) {
this.startKey = Arrays.equals(EMPTY, startKey) ?
START_BYTES : startKey;
this.endKey = Arrays.equals(EMPTY, endKey) ? END_BYTES : endKey;
}
public List<Shard> splitEven(int count) {
if (count <= 1) {
return ImmutableList.of(new Shard(startKey(this.startKey),
endKey(this.endKey), 0));
}
byte[] start;
byte[] end;
boolean startChanged = false;
boolean endChanged = false;
int length;
if (this.startKey.length < this.endKey.length) {
length = this.endKey.length;
start = new byte[length];
System.arraycopy(this.startKey, 0, start, 0,
this.startKey.length);
end = this.endKey;
startChanged = true;
} else if (this.startKey.length > this.endKey.length) {
length = this.startKey.length;
end = new byte[length];
System.arraycopy(this.endKey, 0, end, 0,
this.endKey.length);
start = this.startKey;
endChanged = true;
} else {
assert this.startKey.length == this.endKey.length;
length = this.startKey.length;
start = this.startKey;
end = this.endKey;
}
assert count > 1;
byte[] each = align(new BigInteger(1, subtract(end, start))
.divide(BigInteger.valueOf(count))
.toByteArray(),
length);
byte[] offset = start;
byte[] last = offset;
boolean finished = false;
List<Shard> shards = new ArrayList<>(count);
while (Bytes.compare(offset, end) < 0 && !finished) {
offset = add(offset, each);
if (offset.length > end.length ||
Bytes.compare(offset, end) > 0) {
offset = end;
}
if (startChanged) {
last = this.startKey;
startChanged = false;
}
if (endChanged && Arrays.equals(offset, end)) {
offset = this.endKey;
finished = true;
}
shards.add(new Shard(startKey(last), endKey(offset), 0));
last = offset;
}
return shards;
}
private static String startKey(byte[] start) {
return Arrays.equals(start, START_BYTES) ?
START : StringEncoding.encodeBase64(start);
}
private static String endKey(byte[] end) {
return Arrays.equals(end, END_BYTES) ?
END : StringEncoding.encodeBase64(end);
}
private static byte[] add(byte[] array1, byte[] array2) {
E.checkArgument(array1.length == array2.length,
"The length of array should be equal");
int length = array1.length;
byte[] result = new byte[length];
int carry = 0;
for (int i = length - 1; i >= 0; i--) {
int i1 = byte2int(array1[i]);
int i2 = byte2int(array2[i]);
int col = i1 + i2 + carry;
carry = (col >> 8);
result[i] = int2byte(col);
}
if (carry == 0) {
return result;
}
byte[] target = new byte[length + 1];
target[0] = 0x1;
System.arraycopy(result, 0, target, 1, length);
return target;
}
private static byte[] subtract(byte[] array1, byte[] array2) {
E.checkArgument(array1.length == array2.length,
"The length of array should be equal");
int length = array1.length;
byte[] result = new byte[length];
int borrow = 0;
for (int i = length - 1; 0 <= i; i--) {
int i1 = byte2int(array1[i]);
int i2 = byte2int(array2[i]);
int col = i1 - i2 + borrow;
borrow = (col >> 8);
result[i] = int2byte(col);
}
E.checkArgument(borrow == 0, "The array1 must >= array2");
return result;
}
public static byte[] increase(byte[] array) {
int length = array.length;
byte[] target = new byte[length + 1];
System.arraycopy(array, 0, target, 0, length);
return target;
}
private static byte[] align(byte[] array, int length) {
int len = array.length;
E.checkArgument(len <= length,
"The length of array '%s' exceed " +
"align length '%s'", len, length);
byte[] target = new byte[length];
System.arraycopy(array, 0, target, length - len, len);
return target;
}
private static int byte2int(byte b) {
return (b & 0x000000ff);
}
private static byte int2byte(int i) {
return (byte) (i & 0x000000ff);
}
}
}
}