blob: 1ab3ca0acd3b73e107781e18330d960fcbeb5a10 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.cluster.server;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.exception.CheckConsistencyException;
import org.apache.iotdb.cluster.exception.NoHeaderNodeException;
import org.apache.iotdb.cluster.exception.NotInSameGroupException;
import org.apache.iotdb.cluster.exception.PartitionTableUnavailableException;
import org.apache.iotdb.cluster.partition.NodeAdditionResult;
import org.apache.iotdb.cluster.partition.NodeRemovalResult;
import org.apache.iotdb.cluster.partition.PartitionGroup;
import org.apache.iotdb.cluster.partition.PartitionTable;
import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
import org.apache.iotdb.cluster.rpc.thrift.ElectionRequest;
import org.apache.iotdb.cluster.rpc.thrift.ExecutNonQueryReq;
import org.apache.iotdb.cluster.rpc.thrift.GetAggrResultRequest;
import org.apache.iotdb.cluster.rpc.thrift.GetAllPathsResult;
import org.apache.iotdb.cluster.rpc.thrift.GroupByRequest;
import org.apache.iotdb.cluster.rpc.thrift.HeartBeatRequest;
import org.apache.iotdb.cluster.rpc.thrift.HeartBeatResponse;
import org.apache.iotdb.cluster.rpc.thrift.LastQueryRequest;
import org.apache.iotdb.cluster.rpc.thrift.MultSeriesQueryRequest;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.PreviousFillRequest;
import org.apache.iotdb.cluster.rpc.thrift.PullSchemaRequest;
import org.apache.iotdb.cluster.rpc.thrift.PullSchemaResp;
import org.apache.iotdb.cluster.rpc.thrift.PullSnapshotRequest;
import org.apache.iotdb.cluster.rpc.thrift.PullSnapshotResp;
import org.apache.iotdb.cluster.rpc.thrift.RefreshReuqest;
import org.apache.iotdb.cluster.rpc.thrift.RequestCommitIndexResponse;
import org.apache.iotdb.cluster.rpc.thrift.SendSnapshotRequest;
import org.apache.iotdb.cluster.rpc.thrift.SingleSeriesQueryRequest;
import org.apache.iotdb.cluster.rpc.thrift.TSDataService;
import org.apache.iotdb.cluster.rpc.thrift.TSDataService.AsyncProcessor;
import org.apache.iotdb.cluster.rpc.thrift.TSDataService.Processor;
import org.apache.iotdb.cluster.server.member.DataGroupMember;
import org.apache.iotdb.cluster.server.member.MetaGroupMember;
import org.apache.iotdb.cluster.server.monitor.NodeReport.DataMemberReport;
import org.apache.iotdb.cluster.server.service.DataAsyncService;
import org.apache.iotdb.cluster.server.service.DataSyncService;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.thrift.TException;
import org.apache.thrift.TProcessor;
import org.apache.thrift.async.AsyncMethodCallback;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TServerTransport;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
public class DataClusterServer extends RaftServer
implements TSDataService.AsyncIface, TSDataService.Iface {
private static final Logger logger = LoggerFactory.getLogger(DataClusterServer.class);
// key: the header of a data group, value: the member representing this node in this group and
// it is currently at service
private Map<Node, DataGroupMember> headerGroupMap = new ConcurrentHashMap<>();
private Map<Node, DataAsyncService> asyncServiceMap = new ConcurrentHashMap<>();
private Map<Node, DataSyncService> syncServiceMap = new ConcurrentHashMap<>();
// key: the header of a data group, value: the member representing this node in this group but
// it is out of service because another node has joined the group and expelled this node, or
// the node itself is removed, but it is still stored to provide snapshot for other nodes
private StoppedMemberManager stoppedMemberManager;
private PartitionTable partitionTable;
private DataGroupMember.Factory dataMemberFactory;
private MetaGroupMember metaGroupMember;
public DataClusterServer(
Node thisNode, DataGroupMember.Factory dataMemberFactory, MetaGroupMember metaGroupMember) {
super(thisNode);
this.dataMemberFactory = dataMemberFactory;
this.metaGroupMember = metaGroupMember;
this.stoppedMemberManager = new StoppedMemberManager(dataMemberFactory, thisNode);
}
@Override
public void stop() {
closeLogManagers();
for (DataGroupMember member : headerGroupMap.values()) {
member.stop();
}
super.stop();
}
/**
* Add a DataGroupMember into this server, if a member with the same header exists, the old member
* will be stopped and replaced by the new one.
*
* @param dataGroupMember
*/
public void addDataGroupMember(DataGroupMember dataGroupMember) {
DataGroupMember removedMember = headerGroupMap.remove(dataGroupMember.getHeader());
if (removedMember != null) {
removedMember.stop();
asyncServiceMap.remove(dataGroupMember.getHeader());
syncServiceMap.remove(dataGroupMember.getHeader());
}
stoppedMemberManager.remove(dataGroupMember.getHeader());
headerGroupMap.put(dataGroupMember.getHeader(), dataGroupMember);
}
private <T> DataAsyncService getDataAsyncService(
Node header, AsyncMethodCallback<T> resultHandler, Object request) {
return asyncServiceMap.computeIfAbsent(
header,
h -> {
DataGroupMember dataMember = getDataMember(h, resultHandler, request);
return dataMember != null ? new DataAsyncService(dataMember) : null;
});
}
private DataSyncService getDataSyncService(Node header) {
return syncServiceMap.computeIfAbsent(
header,
h -> {
DataGroupMember dataMember = getDataMember(h, null, null);
return dataMember != null ? new DataSyncService(dataMember) : null;
});
}
/**
* @param header the header of the group which the local node is in
* @param resultHandler can be set to null if the request is an internal request
* @param request the toString() of this parameter should explain what the request is and it is
* only used in logs for tracing
* @return
*/
public <T> DataGroupMember getDataMember(
Node header, AsyncMethodCallback<T> resultHandler, Object request) {
// if the resultHandler is not null, then the request is a external one and must be with a
// header
if (header == null) {
if (resultHandler != null) {
resultHandler.onError(new NoHeaderNodeException());
}
return null;
}
DataGroupMember member = stoppedMemberManager.get(header);
if (member != null) {
return member;
}
// avoid creating two members for a header
Exception ex = null;
synchronized (headerGroupMap) {
member = headerGroupMap.get(header);
if (member != null) {
return member;
}
logger.info("Received a request \"{}\" from unregistered header {}", request, header);
if (partitionTable != null) {
try {
member = createNewMember(header);
} catch (NotInSameGroupException | CheckConsistencyException e) {
ex = e;
}
} else {
logger.info("Partition is not ready, cannot create member");
ex = new PartitionTableUnavailableException(thisNode);
}
if (ex != null && resultHandler != null) {
resultHandler.onError(ex);
}
return member;
}
}
/**
* @param header
* @return A DataGroupMember representing this node in the data group of the header.
* @throws NotInSameGroupException If this node is not in the group of the header.
*/
private DataGroupMember createNewMember(Node header)
throws NotInSameGroupException, CheckConsistencyException {
DataGroupMember member;
PartitionGroup partitionGroup;
partitionGroup = partitionTable.getHeaderGroup(header);
if (partitionGroup == null || !partitionGroup.contains(thisNode)) {
// if the partition table is old, this node may have not been moved to the new group
metaGroupMember.syncLeaderWithConsistencyCheck(true);
partitionGroup = partitionTable.getHeaderGroup(header);
}
if (partitionGroup != null && partitionGroup.contains(thisNode)) {
// the two nodes are in the same group, create a new data member
member = dataMemberFactory.create(partitionGroup, thisNode);
DataGroupMember prevMember = headerGroupMap.put(header, member);
if (prevMember != null) {
prevMember.stop();
}
logger.info("Created a member for header {}", header);
member.start();
} else {
// the member may have been stopped after syncLeader
member = stoppedMemberManager.get(header);
if (member != null) {
return member;
}
logger.info(
"This node {} does not belong to the group {}, header {}",
thisNode,
partitionGroup,
header);
throw new NotInSameGroupException(partitionGroup, thisNode);
}
return member;
}
// Forward requests. Find the DataGroupMember that is in the group of the header of the
// request, and forward the request to it. See methods in DataGroupMember for details.
@Override
public void sendHeartbeat(
HeartBeatRequest request, AsyncMethodCallback<HeartBeatResponse> resultHandler) {
Node header = request.getHeader();
DataAsyncService service = getDataAsyncService(header, resultHandler, request);
if (service != null) {
service.sendHeartbeat(request, resultHandler);
}
}
@Override
public void startElection(ElectionRequest request, AsyncMethodCallback<Long> resultHandler) {
Node header = request.getHeader();
DataAsyncService service = getDataAsyncService(header, resultHandler, request);
if (service != null) {
service.startElection(request, resultHandler);
}
}
@Override
public void appendEntries(AppendEntriesRequest request, AsyncMethodCallback<Long> resultHandler) {
Node header = request.getHeader();
DataAsyncService service = getDataAsyncService(header, resultHandler, request);
if (service != null) {
service.appendEntries(request, resultHandler);
}
}
@Override
public void appendEntry(AppendEntryRequest request, AsyncMethodCallback<Long> resultHandler) {
Node header = request.getHeader();
DataAsyncService service = getDataAsyncService(header, resultHandler, request);
if (service != null) {
service.appendEntry(request, resultHandler);
}
}
@Override
public void sendSnapshot(SendSnapshotRequest request, AsyncMethodCallback<Void> resultHandler) {
Node header = request.getHeader();
DataAsyncService service = getDataAsyncService(header, resultHandler, request);
if (service != null) {
service.sendSnapshot(request, resultHandler);
}
}
@Override
public void pullSnapshot(
PullSnapshotRequest request, AsyncMethodCallback<PullSnapshotResp> resultHandler) {
Node header = request.getHeader();
DataAsyncService service = getDataAsyncService(header, resultHandler, request);
if (service != null) {
service.pullSnapshot(request, resultHandler);
}
}
@Override
public void executeNonQueryPlan(
ExecutNonQueryReq request, AsyncMethodCallback<TSStatus> resultHandler) {
Node header = request.getHeader();
DataAsyncService service = getDataAsyncService(header, resultHandler, request);
if (service != null) {
service.executeNonQueryPlan(request, resultHandler);
}
}
@Override
public void refreshConnection(RefreshReuqest request, AsyncMethodCallback<Void> resultHandler) {
resultHandler.onComplete(null);
}
@Override
public void requestCommitIndex(
Node header, AsyncMethodCallback<RequestCommitIndexResponse> resultHandler) {
DataAsyncService service = getDataAsyncService(header, resultHandler, "Request commit index");
if (service != null) {
service.requestCommitIndex(header, resultHandler);
}
}
@Override
public void readFile(
String filePath, long offset, int length, AsyncMethodCallback<ByteBuffer> resultHandler) {
DataAsyncService service =
getDataAsyncService(thisNode, resultHandler, "Read file:" + filePath);
if (service != null) {
service.readFile(filePath, offset, length, resultHandler);
}
}
@Override
public void querySingleSeries(
SingleSeriesQueryRequest request, AsyncMethodCallback<Long> resultHandler) {
DataAsyncService service =
getDataAsyncService(
request.getHeader(), resultHandler, "Query series:" + request.getPath());
if (service != null) {
service.querySingleSeries(request, resultHandler);
}
}
@Override
public void queryMultSeries(
MultSeriesQueryRequest request, AsyncMethodCallback<Long> resultHandler) throws TException {
DataAsyncService service =
getDataAsyncService(
request.getHeader(), resultHandler, "Query series:" + request.getPath());
if (service != null) {
service.queryMultSeries(request, resultHandler);
}
}
@Override
public void fetchSingleSeries(
Node header, long readerId, AsyncMethodCallback<ByteBuffer> resultHandler) {
DataAsyncService service =
getDataAsyncService(header, resultHandler, "Fetch reader:" + readerId);
if (service != null) {
service.fetchSingleSeries(header, readerId, resultHandler);
}
}
@Override
public void fetchMultSeries(
Node header,
long readerId,
List<String> paths,
AsyncMethodCallback<Map<String, ByteBuffer>> resultHandler)
throws TException {
DataAsyncService service =
getDataAsyncService(header, resultHandler, "Fetch reader:" + readerId);
if (service != null) {
service.fetchMultSeries(header, readerId, paths, resultHandler);
}
}
@Override
public void getAllPaths(
Node header,
List<String> paths,
boolean withAlias,
AsyncMethodCallback<GetAllPathsResult> resultHandler) {
DataAsyncService service = getDataAsyncService(header, resultHandler, "Find path:" + paths);
if (service != null) {
service.getAllPaths(header, paths, withAlias, resultHandler);
}
}
@Override
public void endQuery(
Node header, Node thisNode, long queryId, AsyncMethodCallback<Void> resultHandler) {
DataAsyncService service = getDataAsyncService(header, resultHandler, "End query");
if (service != null) {
service.endQuery(header, thisNode, queryId, resultHandler);
}
}
@Override
public void querySingleSeriesByTimestamp(
SingleSeriesQueryRequest request, AsyncMethodCallback<Long> resultHandler) {
DataAsyncService service =
getDataAsyncService(
request.getHeader(),
resultHandler,
"Query by timestamp:"
+ request.getQueryId()
+ "#"
+ request.getPath()
+ " of "
+ request.getRequester());
if (service != null) {
service.querySingleSeriesByTimestamp(request, resultHandler);
}
}
@Override
public void fetchSingleSeriesByTimestamps(
Node header,
long readerId,
List<Long> timestamps,
AsyncMethodCallback<ByteBuffer> resultHandler) {
DataAsyncService service =
getDataAsyncService(header, resultHandler, "Fetch by timestamp:" + readerId);
if (service != null) {
service.fetchSingleSeriesByTimestamps(header, readerId, timestamps, resultHandler);
}
}
@Override
public void pullTimeSeriesSchema(
PullSchemaRequest request, AsyncMethodCallback<PullSchemaResp> resultHandler) {
Node header = request.getHeader();
DataAsyncService service = getDataAsyncService(header, resultHandler, request);
if (service != null) {
service.pullTimeSeriesSchema(request, resultHandler);
}
}
@Override
public void pullMeasurementSchema(
PullSchemaRequest request, AsyncMethodCallback<PullSchemaResp> resultHandler) {
DataAsyncService service =
getDataAsyncService(request.getHeader(), resultHandler, "Pull measurement schema");
service.pullMeasurementSchema(request, resultHandler);
}
@Override
public void getAllDevices(
Node header, List<String> paths, AsyncMethodCallback<Set<String>> resultHandler) {
DataAsyncService service = getDataAsyncService(header, resultHandler, "Get all devices");
service.getAllDevices(header, paths, resultHandler);
}
@Override
public void getDevices(
Node header, ByteBuffer planBytes, AsyncMethodCallback<ByteBuffer> resultHandler)
throws TException {
DataAsyncService service = getDataAsyncService(header, resultHandler, "Get devices");
service.getDevices(header, planBytes, resultHandler);
}
@Override
public void getNodeList(
Node header, String path, int nodeLevel, AsyncMethodCallback<List<String>> resultHandler) {
DataAsyncService service = getDataAsyncService(header, resultHandler, "Get node list");
service.getNodeList(header, path, nodeLevel, resultHandler);
}
@Override
public void getChildNodeInNextLevel(
Node header, String path, AsyncMethodCallback<Set<String>> resultHandler) {
DataAsyncService service =
getDataAsyncService(header, resultHandler, "Get child node in next level");
service.getChildNodeInNextLevel(header, path, resultHandler);
}
@Override
public void getChildNodePathInNextLevel(
Node header, String path, AsyncMethodCallback<Set<String>> resultHandler) {
DataAsyncService service =
getDataAsyncService(header, resultHandler, "Get child node path in next level");
service.getChildNodePathInNextLevel(header, path, resultHandler);
}
@Override
public void getAllMeasurementSchema(
Node header, ByteBuffer planBytes, AsyncMethodCallback<ByteBuffer> resultHandler) {
DataAsyncService service =
getDataAsyncService(header, resultHandler, "Get all measurement schema");
service.getAllMeasurementSchema(header, planBytes, resultHandler);
}
@Override
public void getAggrResult(
GetAggrResultRequest request, AsyncMethodCallback<List<ByteBuffer>> resultHandler) {
DataAsyncService service = getDataAsyncService(request.getHeader(), resultHandler, request);
service.getAggrResult(request, resultHandler);
}
@Override
public void getUnregisteredTimeseries(
Node header, List<String> timeseriesList, AsyncMethodCallback<List<String>> resultHandler) {
DataAsyncService service =
getDataAsyncService(header, resultHandler, "Check if measurements are registered");
service.getUnregisteredTimeseries(header, timeseriesList, resultHandler);
}
@Override
public void getGroupByExecutor(GroupByRequest request, AsyncMethodCallback<Long> resultHandler) {
DataAsyncService service = getDataAsyncService(request.getHeader(), resultHandler, request);
service.getGroupByExecutor(request, resultHandler);
}
@Override
public void getGroupByResult(
Node header,
long executorId,
long startTime,
long endTime,
AsyncMethodCallback<List<ByteBuffer>> resultHandler) {
DataAsyncService service = getDataAsyncService(header, resultHandler, "Fetch group by");
service.getGroupByResult(header, executorId, startTime, endTime, resultHandler);
}
@Override
TProcessor getProcessor() {
if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
return new AsyncProcessor<>(this);
} else {
return new Processor<>(this);
}
}
@Override
TServerTransport getServerSocket() throws TTransportException {
logger.info(
"[{}] Cluster node will listen {}:{}",
getServerClientName(),
config.getInternalIp(),
config.getInternalDataPort());
if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
return new TNonblockingServerSocket(
new InetSocketAddress(config.getInternalIp(), thisNode.getDataPort()),
getConnectionTimeoutInMS());
} else {
return new TServerSocket(
new InetSocketAddress(config.getInternalIp(), thisNode.getDataPort()));
}
}
@Override
String getClientThreadPrefix() {
return "DataClientThread-";
}
@Override
String getServerClientName() {
return "DataServerThread-";
}
/**
* Try adding the node into the group of each DataGroupMember, and if the DataGroupMember no
* longer stays in that group, also remove and stop it. If the new group contains this node, also
* create and add a new DataGroupMember for it.
*
* @param node
* @param result
*/
public void addNode(Node node, NodeAdditionResult result) {
Iterator<Entry<Node, DataGroupMember>> entryIterator = headerGroupMap.entrySet().iterator();
synchronized (headerGroupMap) {
while (entryIterator.hasNext()) {
Entry<Node, DataGroupMember> entry = entryIterator.next();
DataGroupMember dataGroupMember = entry.getValue();
// the member may be extruded from the group, remove and stop it if so
boolean shouldLeave = dataGroupMember.addNode(node, result);
if (shouldLeave) {
logger.info("This node does not belong to {} any more", dataGroupMember.getAllNodes());
entryIterator.remove();
removeMember(entry.getKey(), entry.getValue());
}
}
if (result.getNewGroup().contains(thisNode)) {
logger.info("Adding this node into a new group {}", result.getNewGroup());
DataGroupMember dataGroupMember = dataMemberFactory.create(result.getNewGroup(), thisNode);
addDataGroupMember(dataGroupMember);
dataGroupMember.start();
dataGroupMember.pullNodeAdditionSnapshots(
((SlotPartitionTable) partitionTable).getNodeSlots(node), node);
}
}
}
private void removeMember(Node header, DataGroupMember dataGroupMember) {
try {
dataGroupMember.syncLeader(null);
} catch (CheckConsistencyException e) {
logger.warn("Failed to check consistency.", e);
}
dataGroupMember.setReadOnly();
dataGroupMember.stop();
stoppedMemberManager.put(header, dataGroupMember);
}
/**
* Set the partition table as the in-use one and build a DataGroupMember for each local group (the
* group which the local node is in) and start them.
*
* @param partitionTable
* @throws TTransportException
*/
@SuppressWarnings("java:S1135")
public void buildDataGroupMembers(PartitionTable partitionTable) {
setPartitionTable(partitionTable);
// TODO-Cluster: if there are unchanged members, do not stop and restart them
// clear previous members if the partition table is reloaded
for (DataGroupMember value : headerGroupMap.values()) {
value.stop();
}
for (DataGroupMember value : headerGroupMap.values()) {
value.setUnchanged(false);
}
List<PartitionGroup> partitionGroups = partitionTable.getLocalGroups();
for (PartitionGroup partitionGroup : partitionGroups) {
DataGroupMember prevMember = headerGroupMap.get(partitionGroup.getHeader());
if (prevMember == null || !prevMember.getAllNodes().equals(partitionGroup)) {
logger.info("Building member of data group: {}", partitionGroup);
// no previous member or member changed
DataGroupMember dataGroupMember = dataMemberFactory.create(partitionGroup, thisNode);
dataGroupMember.start();
// the previous member will be replaced here
addDataGroupMember(dataGroupMember);
dataGroupMember.setUnchanged(true);
} else {
prevMember.setUnchanged(true);
}
}
// remove out-dated members of this node
headerGroupMap.entrySet().removeIf(e -> !e.getValue().isUnchanged());
logger.info("Data group members are ready");
}
/**
* Try removing a node from the groups of each DataGroupMember. If the node is the header of some
* group, set the member to read only so that it can still provide data for other nodes that has
* not yet pulled its data. If the node is the local node, remove all members whose group is not
* headed by this node. Otherwise, just change the node list of the member and pull new data. And
* create a new DataGroupMember if this node should join a new group because of this removal.
*
* @param node
* @param removalResult cluster changes due to the node removal
*/
public void removeNode(Node node, NodeRemovalResult removalResult) {
Iterator<Entry<Node, DataGroupMember>> entryIterator = headerGroupMap.entrySet().iterator();
synchronized (headerGroupMap) {
while (entryIterator.hasNext()) {
Entry<Node, DataGroupMember> entry = entryIterator.next();
DataGroupMember dataGroupMember = entry.getValue();
if (dataGroupMember.getHeader().equals(node)) {
// the group is removed as the node is removed, so new writes should be rejected as
// they belong to the new holder, but the member is kept alive for other nodes to pull
// snapshots
entryIterator.remove();
removeMember(entry.getKey(), entry.getValue());
} else {
if (node.equals(thisNode)) {
// this node is removed, it is no more replica of other groups
List<Integer> nodeSlots =
((SlotPartitionTable) partitionTable).getNodeSlots(dataGroupMember.getHeader());
dataGroupMember.removeLocalData(nodeSlots);
entryIterator.remove();
dataGroupMember.stop();
} else {
// the group should be updated and pull new slots from the removed node
dataGroupMember.removeNode(node, removalResult);
}
}
}
PartitionGroup newGroup = removalResult.getNewGroup();
if (newGroup != null) {
logger.info("{} should join a new group {}", thisNode, newGroup);
try {
createNewMember(newGroup.getHeader());
} catch (NotInSameGroupException e) {
// ignored
} catch (CheckConsistencyException ce) {
logger.error("remove node failed, error={}", ce.getMessage());
}
}
}
}
public void setPartitionTable(PartitionTable partitionTable) {
this.partitionTable = partitionTable;
}
/**
* When the node joins a cluster, it also creates a new data group and a corresponding member
* which has no data. This is to make that member pull data from other nodes.
*/
public void pullSnapshots() {
List<Integer> slots = ((SlotPartitionTable) partitionTable).getNodeSlots(thisNode);
DataGroupMember dataGroupMember = headerGroupMap.get(thisNode);
dataGroupMember.pullNodeAdditionSnapshots(slots, thisNode);
}
/** @return The reports of every DataGroupMember in this node. */
public List<DataMemberReport> genMemberReports() {
List<DataMemberReport> dataMemberReports = new ArrayList<>();
for (DataGroupMember value : headerGroupMap.values()) {
dataMemberReports.add(value.genReport());
}
return dataMemberReports;
}
@Override
public void previousFill(
PreviousFillRequest request, AsyncMethodCallback<ByteBuffer> resultHandler) {
DataAsyncService service = getDataAsyncService(request.getHeader(), resultHandler, request);
service.previousFill(request, resultHandler);
}
public void closeLogManagers() {
for (DataGroupMember member : headerGroupMap.values()) {
member.closeLogManager();
}
}
@Override
public void matchTerm(
long index, long term, Node header, AsyncMethodCallback<Boolean> resultHandler) {
DataAsyncService service = getDataAsyncService(header, resultHandler, "Match term");
service.matchTerm(index, term, header, resultHandler);
}
@Override
public void last(LastQueryRequest request, AsyncMethodCallback<ByteBuffer> resultHandler) {
DataAsyncService service = getDataAsyncService(request.getHeader(), resultHandler, "last");
service.last(request, resultHandler);
}
@Override
public void getPathCount(
Node header,
List<String> pathsToQuery,
int level,
AsyncMethodCallback<Integer> resultHandler) {
DataAsyncService service = getDataAsyncService(header, resultHandler, "count path");
service.getPathCount(header, pathsToQuery, level, resultHandler);
}
@Override
public void getDeviceCount(
Node header, List<String> pathsToQuery, AsyncMethodCallback<Integer> resultHandler)
throws TException {
DataAsyncService service = getDataAsyncService(header, resultHandler, "count device");
if (service != null) {
service.getDeviceCount(header, pathsToQuery, resultHandler);
}
}
@Override
public void onSnapshotApplied(
Node header, List<Integer> slots, AsyncMethodCallback<Boolean> resultHandler) {
DataAsyncService service = getDataAsyncService(header, resultHandler, "Snapshot applied");
service.onSnapshotApplied(header, slots, resultHandler);
}
@Override
public long querySingleSeries(SingleSeriesQueryRequest request) throws TException {
return getDataSyncService(request.getHeader()).querySingleSeries(request);
}
@Override
public long queryMultSeries(MultSeriesQueryRequest request) throws TException {
return getDataSyncService(request.getHeader()).queryMultSeries(request);
}
@Override
public ByteBuffer fetchSingleSeries(Node header, long readerId) throws TException {
return getDataSyncService(header).fetchSingleSeries(header, readerId);
}
@Override
public Map<String, ByteBuffer> fetchMultSeries(Node header, long readerId, List<String> paths)
throws TException {
return getDataSyncService(header).fetchMultSeries(header, readerId, paths);
}
@Override
public long querySingleSeriesByTimestamp(SingleSeriesQueryRequest request) throws TException {
return getDataSyncService(request.getHeader()).querySingleSeriesByTimestamp(request);
}
@Override
public ByteBuffer fetchSingleSeriesByTimestamps(Node header, long readerId, List<Long> timestamps)
throws TException {
return getDataSyncService(header).fetchSingleSeriesByTimestamps(header, readerId, timestamps);
}
@Override
public void endQuery(Node header, Node thisNode, long queryId) throws TException {
getDataSyncService(header).endQuery(header, thisNode, queryId);
}
@Override
public GetAllPathsResult getAllPaths(Node header, List<String> path, boolean withAlias)
throws TException {
return getDataSyncService(header).getAllPaths(header, path, withAlias);
}
@Override
public Set<String> getAllDevices(Node header, List<String> path) throws TException {
return getDataSyncService(header).getAllDevices(header, path);
}
@Override
public List<String> getNodeList(Node header, String path, int nodeLevel) throws TException {
return getDataSyncService(header).getNodeList(header, path, nodeLevel);
}
@Override
public Set<String> getChildNodeInNextLevel(Node header, String path) throws TException {
return getDataSyncService(header).getChildNodeInNextLevel(header, path);
}
@Override
public Set<String> getChildNodePathInNextLevel(Node header, String path) throws TException {
return getDataSyncService(header).getChildNodePathInNextLevel(header, path);
}
@Override
public ByteBuffer getAllMeasurementSchema(Node header, ByteBuffer planBinary) throws TException {
return getDataSyncService(header).getAllMeasurementSchema(header, planBinary);
}
@Override
public ByteBuffer getDevices(Node header, ByteBuffer planBinary) throws TException {
return getDataSyncService(header).getDevices(header, planBinary);
}
@Override
public List<ByteBuffer> getAggrResult(GetAggrResultRequest request) throws TException {
return getDataSyncService(request.getHeader()).getAggrResult(request);
}
@Override
public List<String> getUnregisteredTimeseries(Node header, List<String> timeseriesList)
throws TException {
return getDataSyncService(header).getUnregisteredTimeseries(header, timeseriesList);
}
@Override
public PullSnapshotResp pullSnapshot(PullSnapshotRequest request) throws TException {
return getDataSyncService(request.getHeader()).pullSnapshot(request);
}
@Override
public long getGroupByExecutor(GroupByRequest request) throws TException {
return getDataSyncService(request.header).getGroupByExecutor(request);
}
@Override
public List<ByteBuffer> getGroupByResult(
Node header, long executorId, long startTime, long endTime) throws TException {
return getDataSyncService(header).getGroupByResult(header, executorId, startTime, endTime);
}
@Override
public PullSchemaResp pullTimeSeriesSchema(PullSchemaRequest request) throws TException {
return getDataSyncService(request.getHeader()).pullTimeSeriesSchema(request);
}
@Override
public PullSchemaResp pullMeasurementSchema(PullSchemaRequest request) throws TException {
return getDataSyncService(request.getHeader()).pullMeasurementSchema(request);
}
@Override
public ByteBuffer previousFill(PreviousFillRequest request) throws TException {
return getDataSyncService(request.getHeader()).previousFill(request);
}
@Override
public ByteBuffer last(LastQueryRequest request) throws TException {
return getDataSyncService(request.getHeader()).last(request);
}
@Override
public int getPathCount(Node header, List<String> pathsToQuery, int level) throws TException {
return getDataSyncService(header).getPathCount(header, pathsToQuery, level);
}
@Override
public boolean onSnapshotApplied(Node header, List<Integer> slots) {
return getDataSyncService(header).onSnapshotApplied(header, slots);
}
@Override
public int getDeviceCount(Node header, List<String> pathsToQuery) throws TException {
return getDataSyncService(header).getDeviceCount(header, pathsToQuery);
}
@Override
public HeartBeatResponse sendHeartbeat(HeartBeatRequest request) {
return getDataSyncService(request.getHeader()).sendHeartbeat(request);
}
@Override
public long startElection(ElectionRequest request) {
return getDataSyncService(request.getHeader()).startElection(request);
}
@Override
public long appendEntries(AppendEntriesRequest request) throws TException {
return getDataSyncService(request.getHeader()).appendEntries(request);
}
@Override
public long appendEntry(AppendEntryRequest request) throws TException {
return getDataSyncService(request.getHeader()).appendEntry(request);
}
@Override
public void sendSnapshot(SendSnapshotRequest request) throws TException {
getDataSyncService(request.getHeader()).sendSnapshot(request);
}
@Override
public TSStatus executeNonQueryPlan(ExecutNonQueryReq request) throws TException {
return getDataSyncService(request.getHeader()).executeNonQueryPlan(request);
}
@Override
public void refreshConnection(RefreshReuqest request) {}
@Override
public RequestCommitIndexResponse requestCommitIndex(Node header) throws TException {
return getDataSyncService(header).requestCommitIndex(header);
}
@Override
public ByteBuffer readFile(String filePath, long offset, int length) throws TException {
return getDataSyncService(thisNode).readFile(filePath, offset, length);
}
@Override
public boolean matchTerm(long index, long term, Node header) {
return getDataSyncService(header).matchTerm(index, term, header);
}
@Override
public ByteBuffer peekNextNotNullValue(Node header, long executorId, long startTime, long endTime)
throws TException {
return getDataSyncService(header).peekNextNotNullValue(header, executorId, startTime, endTime);
}
@Override
public void peekNextNotNullValue(
Node header,
long executorId,
long startTime,
long endTime,
AsyncMethodCallback<ByteBuffer> resultHandler)
throws TException {
resultHandler.onComplete(
getDataSyncService(header).peekNextNotNullValue(header, executorId, startTime, endTime));
}
@Override
public void removeHardLink(String hardLinkPath) throws TException {
getDataSyncService(thisNode).removeHardLink(hardLinkPath);
}
@Override
public void removeHardLink(String hardLinkPath, AsyncMethodCallback<Void> resultHandler) {
getDataAsyncService(thisNode, resultHandler, hardLinkPath)
.removeHardLink(hardLinkPath, resultHandler);
}
}