/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hugegraph.store.client;
import static java.util.stream.Collectors.groupingBy;
import static org.apache.hugegraph.store.client.util.HgAssert.isArgumentNotNull;
import static org.apache.hugegraph.store.client.util.HgAssert.isArgumentValid;
import static org.apache.hugegraph.store.client.util.HgAssert.isFalse;
import static org.apache.hugegraph.store.client.util.HgStoreClientConst.EMPTY_STRING;
import static org.apache.hugegraph.store.client.util.HgStoreClientUtil.err;
import static org.apache.hugegraph.store.client.util.HgStoreClientUtil.toStr;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.hugegraph.store.HgKvEntry;
import org.apache.hugegraph.store.HgKvIterator;
import org.apache.hugegraph.store.HgKvOrderedIterator;
import org.apache.hugegraph.store.HgOwnerKey;
import org.apache.hugegraph.store.HgScanQuery;
import org.apache.hugegraph.store.HgStoreSession;
import org.apache.hugegraph.store.client.grpc.KvBatchScanner;
import org.apache.hugegraph.store.client.grpc.KvCloseableIterator;
import org.apache.hugegraph.store.client.util.HgAssert;
import org.apache.hugegraph.store.client.util.HgStoreClientConst;
import org.apache.hugegraph.store.client.util.HgStoreClientUtil;
import org.apache.hugegraph.store.grpc.stream.ScanStreamReq.Builder;
import org.apache.hugegraph.store.term.HgPair;
import org.apache.hugegraph.store.term.HgTriple;
import lombok.extern.slf4j.Slf4j;
/**
* created on 2022/01/19
*
* @version 0.6.0 added batch scan on 2022/03/03
*/
@Slf4j
@NotThreadSafe
class NodeTxSessionProxy implements HgStoreSession {
private final HgStoreNodeManager nodeManager;
private final HgStoreNodePartitioner nodePartitioner;
private final String graphName;
private final NodeTxExecutor txExecutor;
NodeTxSessionProxy(String graphName, HgStoreNodeManager nodeManager) {
this.nodeManager = nodeManager;
this.graphName = graphName;
this.nodePartitioner = this.nodeManager.getNodePartitioner();
this.txExecutor = NodeTxExecutor.graphOf(this.graphName, this);
isFalse(this.nodePartitioner == null,
"Failed to retrieve the node-partitioner from node-manager.");
}
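    /*
     * Minimal usage sketch (illustrative only; how an instance of this proxy is
     * obtained depends on the surrounding client code, and "example-table",
     * ownerKey and value are placeholders):
     *
     *   HgStoreSession session = ...;   // an instance of this proxy
     *   session.beginTx();
     *   session.put("example-table", ownerKey, value);
     *   session.commit();               // or session.rollback() on failure
     */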
@Override
public void beginTx() {
this.txExecutor.setTx(true);
}
@Override
public void commit() {
this.txExecutor.commitTx();
}
@Override
public void rollback() {
this.txExecutor.rollbackTx();
}
@Override
public boolean isTx() {
return this.txExecutor.isTx();
}
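    /**
     * Prepares a put mutation in the current transaction context; the target
     * store node is resolved from the owner key by the node partitioner.
     */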
@Override
public boolean put(String table, HgOwnerKey ownerKey, byte[] value) {
        isArgumentValid(table, "table");
        isArgumentNotNull(ownerKey, "ownerKey");
        return this.txExecutor.prepareTx(new HgTriple(table, ownerKey, null),
                                         e -> e.getSession().put(table,
                                                                 e.getKey(),
                                                                 value));
}
@Override
public boolean directPut(String table, int partitionId, HgOwnerKey ownerKey, byte[] value) {
isArgumentValid(table, "table");
isArgumentNotNull(ownerKey, "ownerKey");
return this.txExecutor.prepareTx(
new HgTriple(table, ownerKey, partitionId),
e -> e.getSession().put(table, e.getKey(), value)
);
}
@Override
public boolean delete(String table, HgOwnerKey ownerKey) {
HgAssert.isFalse(HgAssert.isInvalid(table), "The argument is invalid: table");
HgAssert.isFalse(ownerKey == null, "The argument is invalid: ownerKey");
if (log.isDebugEnabled()) {
log.debug("delete -> graph: {}, table: {}, key: {}"
, graphName, table, toStr(ownerKey));
}
return this.txExecutor
.prepareTx(
new HgTriple(table, ownerKey, null),
e -> e.getSession().delete(table, e.getKey())
);
}
@Override
public boolean deleteSingle(String table, HgOwnerKey ownerKey) {
HgAssert.isFalse(HgAssert.isInvalid(table), "The argument is invalid: table");
HgAssert.isFalse(ownerKey == null, "The argument is invalid: ownerKey");
if (log.isDebugEnabled()) {
log.debug("deleteSingle -> graph: {}, table: {}, key: {}"
, graphName, table, toStr(ownerKey));
}
return this.txExecutor
.prepareTx(
new HgTriple(table, ownerKey, null),
e -> e.getSession().deleteSingle(table, e.getKey())
);
}
@Override
public boolean deletePrefix(String table, HgOwnerKey prefix) {
HgAssert.isFalse(HgAssert.isInvalid(table), "The argument is invalid: table");
HgAssert.isFalse(prefix == null, "The argument is invalid: prefix");
if (log.isDebugEnabled()) {
log.debug("deletePrefix -> graph: {}, table: {}, prefix: {}"
, graphName, table, toStr(prefix));
}
return this.txExecutor
.prepareTx(
new HgTriple(table, prefix, null),
e -> e.getSession().deletePrefix(table, e.getKey())
);
}
@Override
public boolean deleteRange(String table, HgOwnerKey start, HgOwnerKey end) {
HgAssert.isFalse(HgAssert.isInvalid(table), "The argument is invalid: table");
HgAssert.isFalse(start == null, "The argument is invalid: start");
HgAssert.isFalse(end == null, "The argument is invalid: end");
if (log.isDebugEnabled()) {
log.debug("deleteRange -> graph: {}, table: {}, start: {}, end: {}"
, graphName, table, toStr(start), toStr(end));
}
return this.txExecutor
.prepareTx(
new HgTriple(table, start, end),
e -> e.getSession().deleteRange(table, e.getKey(), e.getEndKey())
);
}
@Override
public boolean merge(String table, HgOwnerKey key, byte[] value) {
HgAssert.isFalse(HgAssert.isInvalid(table), "The argument is invalid: table");
HgAssert.isFalse(key == null, "The argument is invalid: key");
HgAssert.isFalse(value == null, "The argument is invalid: value");
if (log.isDebugEnabled()) {
log.debug("merge -> graph: {}, table: {}, key: {}, value: {}"
, graphName, table, toStr(key), toStr(value));
}
return this.txExecutor
.prepareTx(
new HgTriple(table, key, value),
e -> e.getSession().merge(table, e.getKey(), value)
);
}
/*--- tx end ---*/
@Override
public byte[] get(String table, HgOwnerKey ownerKey) {
isArgumentValid(table, "table");
isArgumentNotNull(ownerKey, "ownerKey");
return this.txExecutor
.limitOne(
() -> this.getNodeStream(table, ownerKey),
e -> e.session.get(table, e.data.getKey()), HgStoreClientConst.EMPTY_BYTES
);
}
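    /**
     * Cleans the given partition on the store node(s) that own it, as resolved
     * by {@link #doPartition(String, int)}.
     */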
@Override
public boolean clean(int partId) {
Collection<HgNodePartition> nodes = this.doPartition("", partId);
return nodes.parallelStream()
.map(
e -> this.getStoreNode(e.getNodeId()).openSession(this.graphName)
.clean(partId)
).findFirst().get();
}
@Override
@Deprecated
public List<HgKvEntry> batchGetOwner(String table, List<HgOwnerKey> keyList) {
HgAssert.isFalse(HgAssert.isInvalid(table), "The argument is invalid: table");
HgAssert.isFalse(HgAssert.isInvalid(keyList), "The argument is invalid: keyList");
return this.txExecutor
.toList(
(l) -> this.getStoreNode(l),
keyList,
key -> this.toNodeTkvList(table, key, key).stream(),
e -> e.session.batchGetOwner(table, e.data)
);
}
@Override
public HgKvIterator<HgKvEntry> batchPrefix(String table, List<HgOwnerKey> keyList) {
HgAssert.isFalse(HgAssert.isInvalid(table), "The argument is invalid: table");
HgAssert.isFalse(HgAssert.isInvalid(keyList), "The argument is invalid: keyList");
return this.toHgKvIteratorProxy(
this.txExecutor
.toList(
(l) -> this.getStoreNode(l),
keyList,
key -> this.toNodeTkvList(table, key, key).stream(),
e -> Collections.singletonList(e.session.batchPrefix(table, e.data))
)
, Long.MAX_VALUE);
}
@Override
public boolean truncate() {
return this.txExecutor
.isAllTrue(
() -> this.getNodeStream(EMPTY_STRING),
e -> e.session.truncate()
);
}
@Override
public boolean existsTable(String table) {
return this.txExecutor
.ifAnyTrue(
() -> this.getNodeStream(EMPTY_STRING),
e -> e.session.existsTable(table)
);
}
@Override
public boolean createTable(String table) {
return this.txExecutor
.isAllTrue(
() -> this.getNodeStream(EMPTY_STRING),
e -> e.session.createTable(table)
);
}
@Override
public boolean deleteTable(String table) {
return this.txExecutor
.isAllTrue(
() -> this.getNodeStream(EMPTY_STRING),
e -> e.session.deleteTable(table)
);
}
@Override
public boolean dropTable(String table) {
return this.txExecutor
.isAllTrue(
() -> this.getNodeStream(table),
e -> e.session.dropTable(table)
);
}
@Override
public boolean deleteGraph(String graph) {
return this.txExecutor
.isAllTrue(
() -> this.getNodeStream(EMPTY_STRING),
e -> e.session.deleteGraph(graph)
);
}
@Override
public HgKvIterator<HgKvEntry> scanIterator(String table) {
return scanIterator(table, 0);
}
@Override
public HgKvIterator<HgKvEntry> scanIterator(String table, byte[] query) {
return scanIterator(table, 0, query);
}
@Override
public HgKvIterator<HgKvEntry> scanIterator(String table, long limit) {
HgAssert.isFalse(HgAssert.isInvalid(table), "The argument is invalid: table");
return this.toHgKvIteratorProxy(
this.toNodeTkvList(table)
.parallelStream()
.map(
e -> this.getStoreNode(e.getNodeId()).openSession(this.graphName)
.scanIterator(e.getTable(), limit)
)
.collect(Collectors.toList())
, limit);
}
@Override
public HgKvIterator<HgKvEntry> scanIterator(String table, long limit, byte[] query) {
HgAssert.isFalse(HgAssert.isInvalid(table), "The argument is invalid: table");
return this.toHgKvIteratorProxy(
this.toNodeTkvList(table)
.parallelStream()
.map(
e -> this.getStoreNode(e.getNodeId()).openSession(this.graphName)
.scanIterator(e.getTable(), e.getKey(), limit, query)
)
.collect(Collectors.toList())
, limit);
}
@Override
public HgKvIterator<HgKvEntry> scanIterator(String table, HgOwnerKey keyPrefix) {
return scanIterator(table, keyPrefix, 0);
}
@Override
public HgKvIterator<HgKvEntry> scanIterator(String table, HgOwnerKey keyPrefix, long limit) {
HgAssert.isFalse(HgAssert.isInvalid(table), "The argument is invalid: table");
HgAssert.isFalse(keyPrefix == null, "The argument is invalid: keyPrefix");
return this.toHgKvIteratorProxy(
this.toNodeTkvList(table, keyPrefix)
.parallelStream()
.map(
e -> this.getStoreNode(e.getNodeId()).openSession(this.graphName)
.scanIterator(e.getTable(), e.getKey(), limit)
)
.collect(Collectors.toList())
, limit);
}
@Override
public HgKvIterator<HgKvEntry> scanIterator(String table, HgOwnerKey keyPrefix, long limit,
byte[] query) {
HgAssert.isFalse(HgAssert.isInvalid(table), "The argument is invalid: table");
HgAssert.isFalse(keyPrefix == null, "The argument is invalid: keyPrefix");
return this.toHgKvIteratorProxy(
this.toNodeTkvList(table, keyPrefix)
.parallelStream()
.map(
e -> this.getStoreNode(e.getNodeId()).openSession(this.graphName)
.scanIterator(e.getTable(), e.getKey(), limit, query)
)
.collect(Collectors.toList())
, limit);
}
@Override
public HgKvIterator<HgKvEntry> scanIterator(String table, HgOwnerKey startKey,
HgOwnerKey endKey) {
return this.scanIterator(table, startKey, endKey, 0, null);
}
@Override
public HgKvIterator<HgKvEntry> scanIterator(String table, HgOwnerKey startKey,
HgOwnerKey endKey, long limit) {
HgAssert.isFalse(HgAssert.isInvalid(table), "The argument is invalid: table");
HgAssert.isFalse(startKey == null, "The argument is invalid: startKey");
HgAssert.isFalse(endKey == null, "The argument is invalid: endKey");
return this.toHgKvIteratorProxy(
this.toNodeTkvList(table, startKey, endKey)
.parallelStream()
.map(
e -> this.getStoreNode(e.getNodeId()).openSession(this.graphName)
.scanIterator(e.getTable(), e.getKey(), e.getEndKey(), limit)
)
.collect(Collectors.toList())
, limit);
}
@Override
public HgKvIterator<HgKvEntry> scanIterator(String table, HgOwnerKey startKey, HgOwnerKey endKey
, long limit, byte[] query) {
HgAssert.isFalse(HgAssert.isInvalid(table), "The argument is invalid: table");
HgAssert.isFalse(startKey == null, "The argument is invalid: startKey");
HgAssert.isFalse(endKey == null, "The argument is invalid: endKey");
return this.toHgKvIteratorProxy(
this.toNodeTkvList(table, startKey, endKey)
.parallelStream()
.map(
e -> this.getStoreNode(e.getNodeId()).openSession(this.graphName)
.scanIterator(e.getTable(), e.getKey(), e.getEndKey(), limit,
query)
)
.collect(Collectors.toList())
, limit);
}
@Override
public HgKvIterator<HgKvEntry> scanIterator(String table, HgOwnerKey startKey, HgOwnerKey endKey
, long limit, int scanType, byte[] query) {
HgAssert.isFalse(HgAssert.isInvalid(table), "The argument is invalid: table");
HgAssert.isFalse(startKey == null, "The argument is invalid: startKey");
HgAssert.isFalse(endKey == null, "The argument is invalid: endKey");
return this.toHgKvIteratorProxy(
this.toNodeTkvList(table, startKey, endKey)
.parallelStream()
.map(
e -> this.getStoreNode(e.getNodeId()).openSession(this.graphName)
.scanIterator(e.getTable(), e.getKey(), e.getEndKey(), limit,
scanType, query)
)
.collect(Collectors.toList())
, limit);
}
@Override
public HgKvIterator<HgKvEntry> scanIterator(String table, int codeFrom, int codeTo,
int scanType, byte[] query) {
if (log.isDebugEnabled()) {
log.debug("graph: {}, table: {}, codeFrom: {}, codeTo: {}, scanType: {}, query: {}"
, graphName, table, codeFrom, codeTo, scanType, HgStoreClientUtil.toStr(query));
}
HgAssert.isFalse(HgAssert.isInvalid(table), "The argument is invalid: table");
return this.toHgKvIteratorProxy(
this.toNodeTkvList(table, codeFrom, codeTo)
.parallelStream()
.map(
e -> this.getStoreNode(e.getNodeId()).openSession(this.graphName)
.scanIterator(e.getTable()
, e.getKey().getKeyCode()
, e.getEndKey().getKeyCode(),
scanType, query)
)
.collect(Collectors.toList())
, 0);
}
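    /**
     * Scans with a pre-built {@code ScanStreamReq.Builder}: the request is fanned
     * out to every partition of the table and the per-node iterators are merged
     * by {@link #toHgKvIteratorProxy(List, long)}.
     */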
@Override
public HgKvIterator<HgKvEntry> scanIterator(Builder scanReqBuilder) {
List<NodeTkv> nodeTKvs = this.toNodeTkvList(scanReqBuilder);
Function<NodeTkv, HgKvIterator<HgKvEntry>> hgKvIteratorFunction = e -> {
HgStoreSession session = this.getStoreNode(e.getNodeId())
.openSession(this.graphName);
return session.scanIterator(scanReqBuilder);
};
List<HgKvIterator> iterators = nodeTKvs.parallelStream()
.map(hgKvIteratorFunction)
.collect(Collectors.toList());
return this.toHgKvIteratorProxy(iterators, scanReqBuilder.getLimit());
}
@Override
public long count(String table) {
return this.toNodeTkvList(table)
.parallelStream()
.map(
e -> this.getStoreNode(e.getNodeId()).openSession(this.graphName)
.count(e.getTable())
)
.collect(Collectors.summingLong(l -> l));
}
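    /**
     * Batch scan: the query keys are grouped by owning store node, one scanBatch
     * request is sent per node, and the resulting iterators are concatenated.
     */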
@Override
public List<HgKvIterator<HgKvEntry>> scanBatch(HgScanQuery scanQuery) {
HgAssert.isArgumentNotNull(scanQuery, "scanQuery");
return this.toTkvMapFunc(scanQuery.getScanMethod())
.apply(scanQuery)
.entrySet()
.stream()
.map(e ->
this.getStoreNode(e.getKey())
.openSession(this.graphName)
.scanBatch(toScanQueryFunc(scanQuery.getScanMethod())
.apply(scanQuery.getTable(), e.getValue())
.setQuery(scanQuery.getQuery())
.setLimit(scanQuery.getLimit())
.setPerKeyLimit(scanQuery.getPerKeyLimit())
                                           .setPerKeyMax(scanQuery.getPerKeyMax())
.setScanType(scanQuery.getScanType())
.build()
)
)
.flatMap(List::stream)
.collect(Collectors.toList());
}
@Override
public KvCloseableIterator<HgKvIterator<HgKvEntry>> scanBatch2(HgScanQuery scanQuery) {
return scanBatch3(scanQuery, null);
}
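    /**
     * Streaming batch scan: a {@link KvBatchScanner} merger drives the supplied
     * queries, grouping keys by store node and forwarding each group to that
     * node's scanBatch3 together with the result notifier.
     */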
@Override
public KvCloseableIterator<HgKvIterator<HgKvEntry>> scanBatch3(HgScanQuery scanQuery,
KvCloseableIterator iterator) {
KvCloseableIterator notifierWrap = KvBatchScanner.ofMerger(scanQuery, (query, notifier) -> {
Map<Long, List<NodeTkv>> nodeTkvs = this.toTkvMapFunc(scanQuery.getScanMethod())
.apply(query);
nodeTkvs.forEach((storeId, tkvs) -> {
this.getStoreNode(storeId)
.openSession(this.graphName)
.scanBatch3(toScanQueryFunc(scanQuery.getScanMethod())
.apply(scanQuery.getTable(), tkvs)
.setQuery(scanQuery.getQuery())
.setLimit(scanQuery.getLimit())
.setSkipDegree(scanQuery.getSkipDegree())
.setPerKeyLimit(scanQuery.getPerKeyLimit())
                                 .setPerKeyMax(scanQuery.getPerKeyMax())
.setScanType(scanQuery.getScanType())
.setOrderType(scanQuery.getOrderType())
.build(), notifier
);
});
return true;
});
return notifierWrap;
}
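    /**
     * Builds a function that maps a scan query to its per-node key groups:
     * RANGE pairs up start/end keys, PREFIX expands the prefix keys, and the
     * default case covers the whole table; results are grouped by node id.
     */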
private Function<HgScanQuery, Map<Long, List<NodeTkv>>> toTkvMapFunc(
HgScanQuery.ScanMethod scanMethod) {
switch (scanMethod) {
case RANGE:
return scanQuery -> {
List<HgOwnerKey> starts = scanQuery.getStartList();
List<HgOwnerKey> ends = scanQuery.getEndList();
int size = starts.size();
return IntStream.range(0, size)
.boxed()
.map(i -> this.toNodeTkvList(scanQuery.getTable(),
starts.get(i), ends.get(i)))
.flatMap(List::stream)
.collect(groupingBy(NodeTkv::getNodeId));
};
case PREFIX:
return scanQuery ->
scanQuery.getPrefixList()
.stream()
.map(keyPrefix -> this.toNodeTkvList(scanQuery.getTable(),
keyPrefix))
.flatMap(List::stream)
.collect(groupingBy(NodeTkv::getNodeId));
default:
return scanQuery -> this.toNodeTkvList(scanQuery.getTable())
.stream()
.collect(groupingBy(NodeTkv::getNodeId));
}
}
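    /**
     * Builds a function that rebuilds a per-node scan query from a node's key
     * group, matching the original scan method (RANGE, PREFIX, or whole table).
     */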
private BiFunction<String, List<NodeTkv>, HgScanQuery.ScanBuilder> toScanQueryFunc(
HgScanQuery.ScanMethod scanMethod) {
switch (scanMethod) {
case RANGE:
return (table, tkvList) -> {
                    List<HgOwnerKey> startList = new LinkedList<>();
                    List<HgOwnerKey> endList = new LinkedList<>();
                    tkvList.forEach(e -> {
                        startList.add(e.getKey());
                        endList.add(e.getEndKey());
                    });
return HgScanQuery.ScanBuilder.rangeOf(table, startList, endList);
};
case PREFIX:
return (table, tkvList) ->
HgScanQuery.ScanBuilder.prefixOf(table,
tkvList.stream()
                                                           .map(NodeTkv::getKey)
.collect(Collectors.toList())
);
default:
return (table, tkvList) -> HgScanQuery.ScanBuilder.tableOf(table);
}
}
/*-- common --*/
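    /**
     * Wraps the per-node iterators behind a single iterator: a merging
     * {@link SequencedIterator} when all of them are ordered, otherwise a
     * {@link TopWorkIteratorProxy}.
     */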
    private HgKvIterator<HgKvEntry> toHgKvIteratorProxy(List<HgKvIterator> iteratorList,
                                                        long limit) {
boolean isAllOrderedLimiter = iteratorList.stream()
.allMatch(
e -> e instanceof HgKvOrderedIterator);
HgKvIterator<HgKvEntry> iterator;
if (isAllOrderedLimiter) {
iterator = new SequencedIterator(iteratorList.stream()
.map(e -> (HgKvOrderedIterator) e)
.collect(Collectors.toList()), limit);
} else {
iterator = new TopWorkIteratorProxy(iteratorList, limit);
}
return iterator;
}
HgStoreNode getStoreNode(Long nodeId) {
HgStoreNode res = this.nodeManager.applyNode(this.graphName, nodeId);
if (res == null) {
throw err("Failed to apply for an instance of HgStoreNode from node-manager.");
}
return res;
}
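    /**
     * Applies the given action to every partition covered by the key range,
     * opening a node session per partition; stops and returns false on the
     * first failed action.
     */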
public boolean doAction(String table, HgOwnerKey startKey, HgOwnerKey endKey,
Function<NodeTkv, Boolean> action) {
Collection<HgNodePartition> partitions =
doPartition(table, startKey.getOwner(), endKey.getOwner());
for (HgNodePartition partition : partitions) {
HgStoreNode storeNode = this.getStoreNode(partition.getNodeId());
HgStoreSession session = this.txExecutor.openNodeSession(storeNode);
NodeTkv data = new NodeTkv(partition, table, startKey, endKey);
data.setSession(session);
if (!action.apply(data)) {
return false;
}
}
return true;
}
public boolean doAction(String table, HgOwnerKey startKey, Integer code,
Function<NodeTkv, Boolean> action) {
Collection<HgNodePartition> partitions = this.doPartition(table, code);
for (HgNodePartition partition : partitions) {
HgStoreNode storeNode = this.getStoreNode(partition.getNodeId());
HgStoreSession session = this.txExecutor.openNodeSession(storeNode);
NodeTkv data = new NodeTkv(partition, table, startKey, code);
data.setSession(session);
if (!action.apply(data)) {
return false;
}
}
return true;
}
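    /**
     * Resolves the partitions for a builder-based scan request; currently the
     * request always targets all partitions of the table (see the TODO below).
     */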
private List<NodeTkv> toNodeTkvList(Builder scanReqBuilder) {
        // TODO: use the builder to get the owner
String table = scanReqBuilder.getTable();
HgOwnerKey ownerKey = HgStoreClientConst.ALL_PARTITION_OWNER_KEY;
byte[] allOwner = ownerKey.getOwner();
Collection<HgNodePartition> partitions = doPartition(table,
allOwner,
allOwner);
List<NodeTkv> nodeTkvs = new ArrayList<>(partitions.size());
for (HgNodePartition partition : partitions) {
nodeTkvs.add(new NodeTkv(partition, table, ownerKey, ownerKey));
}
return nodeTkvs;
}
private List<NodeTkv> toNodeTkvList(String table) {
Collection<HgNodePartition> partitions = doPartition(table,
HgStoreClientConst.ALL_PARTITION_OWNER_KEY.getOwner(),
HgStoreClientConst.ALL_PARTITION_OWNER_KEY.getOwner());
ArrayList<NodeTkv> nodeTkvs = new ArrayList<>(partitions.size());
for (HgNodePartition partition : partitions) {
nodeTkvs.add(new NodeTkv(partition, table, HgStoreClientConst.ALL_PARTITION_OWNER_KEY,
HgStoreClientConst.ALL_PARTITION_OWNER_KEY));
}
return nodeTkvs;
}
private List<NodeTkv> toNodeTkvList(String table, HgOwnerKey ownerKey) {
Collection<HgNodePartition> partitions =
doPartition(table, ownerKey.getOwner(), ownerKey.getOwner());
ArrayList<NodeTkv> nodeTkvs = new ArrayList<>(partitions.size());
for (HgNodePartition partition : partitions) {
nodeTkvs.add(new NodeTkv(partition, table, ownerKey, ownerKey));
}
return nodeTkvs;
}
private List<NodeTkv> toNodeTkvList(String table, HgOwnerKey startKey, HgOwnerKey endKey) {
Collection<HgNodePartition> partitions =
doPartition(table, startKey.getOwner(), endKey.getOwner());
ArrayList<NodeTkv> nodeTkvs = new ArrayList<>(partitions.size());
for (HgNodePartition partition : partitions) {
nodeTkvs.add(new NodeTkv(partition, table, startKey, endKey));
}
return nodeTkvs;
}
private List<NodeTkv> toNodeTkvList(String table, int startCode, int endCode) {
Collection<HgNodePartition> partitions = this.doPartition(table, startCode, endCode);
ArrayList<NodeTkv> nodeTkvs = new ArrayList<>(partitions.size());
for (HgNodePartition partition : partitions) {
nodeTkvs.add(
new NodeTkv(partition, table, HgOwnerKey.codeOf(startCode),
HgOwnerKey.codeOf(endCode)));
}
return nodeTkvs;
}
/**
* @return not null
*/
private Collection<HgNodePartition> doPartition(String table, byte[] startKey, byte[] endKey) {
HgNodePartitionerBuilder partitionerBuilder = HgNodePartitionerBuilder.resetAndGet();
int status = this.nodePartitioner.partition(partitionerBuilder, this.graphName, startKey,
endKey);
if (status != 0) {
throw err("The node-partitioner is not work.");
}
Collection<HgNodePartition> partitions = partitionerBuilder.getPartitions();
if (partitions.isEmpty()) {
throw err("Failed to get the collection of HgNodePartition from node-partitioner.");
}
return partitions;
}
/**
     * @return not null
*/
private Collection<HgNodePartition> doPartition(String table, int startCode, int endCode) {
HgNodePartitionerBuilder partitionerBuilder = HgNodePartitionerBuilder.resetAndGet();
int status = this.nodePartitioner.partition(partitionerBuilder, this.graphName, startCode,
endCode);
if (status != 0) {
throw err("The node-partitioner is not work.");
}
Collection<HgNodePartition> partitions = partitionerBuilder.getPartitions();
if (partitions.isEmpty()) {
throw err("Failed to get the collection of HgNodePartition from node-partitioner.");
}
return partitions;
}
Collection<HgNodePartition> doPartition(String table, int partitionId) {
HgNodePartitionerBuilder partitionerBuilder = HgNodePartitionerBuilder.resetAndGet();
int status =
this.nodePartitioner.partition(partitionerBuilder, this.graphName, partitionId);
if (status != 0) {
throw err("The node-partitioner is not work.");
}
Collection<HgNodePartition> partitions = partitionerBuilder.getPartitions();
if (partitions.isEmpty()) {
throw err("Failed to get the collection of HgNodePartition from node-partitioner.");
}
return partitions;
}
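    /**
     * Resolves the partitions for the given table/keys and pairs each resulting
     * {@link NodeTkv} with its owning {@link HgStoreNode}.
     */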
private Stream<HgPair<HgStoreNode, NodeTkv>> getNodeStream(String table) {
return this.toNodeTkvList(table)
.stream()
.map(
e -> new HgPair<>(this.getStoreNode(e.getNodeId()), e)
);
}
Stream<HgPair<HgStoreNode, NodeTkv>> getNodeStream(String table,
HgOwnerKey ownerKey) {
return this.toNodeTkvList(table, ownerKey)
.stream()
.map(
e -> new HgPair<>(this.getStoreNode(e.getNodeId()), e)
);
}
Stream<HgPair<HgStoreNode, NodeTkv>> getNodeStream(String table, HgOwnerKey startKey,
HgOwnerKey endKey) {
return this.toNodeTkvList(table, startKey, endKey)
.stream()
.map(
e -> new HgPair<>(this.getStoreNode(e.getNodeId()), e)
);
}
List<HgPair<HgStoreNode, NodeTkv>> getNode(String table, HgOwnerKey ownerKey) {
List<NodeTkv> nodeTkvList = this.toNodeTkvList(table, ownerKey);
return nodeTkv2Node(nodeTkvList);
}
List<HgPair<HgStoreNode, NodeTkv>> getNode(String table, HgOwnerKey startKey,
HgOwnerKey endKey) {
List<NodeTkv> nodeTkvList = this.toNodeTkvList(table, startKey, endKey);
return nodeTkv2Node(nodeTkvList);
}
private List<HgPair<HgStoreNode, NodeTkv>> nodeTkv2Node(Collection<NodeTkv> nodeTkvList) {
ArrayList<HgPair<HgStoreNode, NodeTkv>> hgPairs = new ArrayList<>(nodeTkvList.size());
for (NodeTkv nodeTkv : nodeTkvList) {
hgPairs.add(new HgPair<>(this.getStoreNode(nodeTkv.getNodeId()), nodeTkv));
}
return hgPairs;
}
}