reduce rpc count for dispatch and thread count for internal service

Signed-off-by: OneSizeFitQuorum <tanxinyu@apache.org>
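The dispatch path previously issued one sendPlanNode RPC per FragmentInstance. This change groups all instances whose host DataNode shares the same internal endpoint into a single TSendPlanNodeBatchReq, so a write that fans out to N instances across M nodes now costs M RPCs instead of N. The sketch below shows only the grouping step, with plain stand-in types (strings instead of TEndPoint, list indices instead of fragment instance ids); it is not the IoTDB code itself:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Stand-in for the per-endpoint grouping done in AsyncPlanNodeSender's constructor.
public class BatchGroupingSketch {
  public static void main(String[] args) {
    List<String> endpointOfInstance = List.of("node-a", "node-b", "node-a", "node-a");

    // endpoint -> indices of the instances routed to it (mirrors batchRequests)
    Map<String, List<Integer>> batches = new HashMap<>();
    for (int i = 0; i < endpointOfInstance.size(); i++) {
      batches.computeIfAbsent(endpointOfInstance.get(i), x -> new ArrayList<>()).add(i);
    }

    // 2 RPCs (one per endpoint) instead of 4 (one per instance).
    System.out.println(batches); // e.g. {node-a=[0, 2, 3], node-b=[1]}
  }
}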
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncPlanNodeSender.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncPlanNodeSender.java
index 29a5f80..0d6406c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncPlanNodeSender.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncPlanNodeSender.java
@@ -25,15 +25,18 @@
 import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
 import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.mpp.rpc.thrift.TPlanNode;
-import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeReq;
-import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeResp;
+import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeBatchReq;
+import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeSingleReq;
+import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeSingleResp;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.utils.Pair;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -43,38 +46,53 @@
 import static com.google.common.util.concurrent.Futures.immediateFuture;
 
 public class AsyncPlanNodeSender {
+
   private static final Logger logger = LoggerFactory.getLogger(AsyncPlanNodeSender.class);
   private final IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>
       asyncInternalServiceClientManager;
   private final List<FragmentInstance> instances;
-  private final Map<Integer, TSendPlanNodeResp> instanceId2RespMap;
+
+  private final Map<TEndPoint, Pair<List<Integer>, TSendPlanNodeBatchReq>> batchRequests;
+  private final Map<Integer, TSendPlanNodeSingleResp> instanceId2RespMap;
   private final AtomicLong pendingNumber;
+  private final long startSendTime;
 
   public AsyncPlanNodeSender(
       IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>
          asyncInternalServiceClientManager,
       List<FragmentInstance> instances) {
+    this.startSendTime = System.nanoTime();
     this.asyncInternalServiceClientManager = asyncInternalServiceClientManager;
     this.instances = instances;
-    this.instanceId2RespMap = new ConcurrentHashMap<>();
-    this.pendingNumber = new AtomicLong(instances.size());
+    this.batchRequests = new HashMap<>(instances.size());
+    for (int i = 0; i < instances.size(); i++) {
+      Pair<List<Integer>, TSendPlanNodeBatchReq> value =
+          this.batchRequests.computeIfAbsent(
+              instances.get(i).getHostDataNode().getInternalEndPoint(),
+              x -> new Pair<>(new ArrayList<>(), new TSendPlanNodeBatchReq()));
+      value.getLeft().add(i);
+      value
+          .getRight()
+          .addToRequests(
+              new TSendPlanNodeSingleReq(
+                  new TPlanNode(
+                      instances.get(i).getFragment().getPlanNodeTree().serializeToByteBuffer()),
+                  instances.get(i).getRegionReplicaSet().getRegionId()));
+    }
+    this.instanceId2RespMap = new ConcurrentHashMap<>(instances.size());
+    this.pendingNumber = new AtomicLong(batchRequests.keySet().size());
   }
 
   public void sendAll() {
-    long startSendTime = System.nanoTime();
-    for (int i = 0; i < instances.size(); ++i) {
-      FragmentInstance instance = instances.get(i);
+    for (Map.Entry<TEndPoint, Pair<List<Integer>, TSendPlanNodeBatchReq>> entry :
+        batchRequests.entrySet()) {
       AsyncSendPlanNodeHandler handler =
-          new AsyncSendPlanNodeHandler(i, pendingNumber, instanceId2RespMap, startSendTime);
+          new AsyncSendPlanNodeHandler(
+              entry.getValue().getLeft(), pendingNumber, instanceId2RespMap, startSendTime);
       try {
-        TSendPlanNodeReq sendPlanNodeReq =
-            new TSendPlanNodeReq(
-                new TPlanNode(instance.getFragment().getPlanNodeTree().serializeToByteBuffer()),
-                instance.getRegionReplicaSet().getRegionId());
         AsyncDataNodeInternalServiceClient client =
-            asyncInternalServiceClientManager.borrowClient(
-                instance.getHostDataNode().getInternalEndPoint());
-        client.sendPlanNode(sendPlanNodeReq, handler);
+            asyncInternalServiceClientManager.borrowClient(entry.getKey());
+        client.sendPlanNode(entry.getValue().getRight(), handler);
       } catch (Exception e) {
         handler.onError(e);
       }
@@ -92,7 +110,7 @@
   public List<TSStatus> getFailureStatusList() {
     List<TSStatus> failureStatusList = new ArrayList<>();
     TSStatus status;
-    for (Map.Entry<Integer, TSendPlanNodeResp> entry : instanceId2RespMap.entrySet()) {
+    for (Map.Entry<Integer, TSendPlanNodeSingleResp> entry : instanceId2RespMap.entrySet()) {
       status = entry.getValue().getStatus();
       if (!entry.getValue().accepted) {
         if (status == null) {
@@ -122,7 +140,7 @@
   }
 
   public Future<FragInstanceDispatchResult> getResult() {
-    for (Map.Entry<Integer, TSendPlanNodeResp> entry : instanceId2RespMap.entrySet()) {
+    for (Map.Entry<Integer, TSendPlanNodeSingleResp> entry : instanceId2RespMap.entrySet()) {
       if (!entry.getValue().accepted) {
         logger.warn(
             "dispatch write failed. status: {}, code: {}, message: {}, node {}",
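Two details worth noting in AsyncPlanNodeSender: pendingNumber is now initialized to the number of distinct target endpoints rather than the number of instances, and startSendTime moves into the constructor so the recorded remote cost also covers batch assembly. A minimal sketch of the count-down/notify pattern, assuming a waitUntilCompleted() method like the one the unchanged remainder of the class presumably provides:

import java.util.concurrent.atomic.AtomicLong;

// Whichever batch finishes last (successfully or via onError) wakes the
// thread blocked on the counter.
public class PendingCounterSketch {
  private final AtomicLong pendingNumber;

  public PendingCounterSketch(int batchCount) {
    // One pending slot per target endpoint, not per fragment instance.
    this.pendingNumber = new AtomicLong(batchCount);
  }

  public void onBatchFinished() {
    if (pendingNumber.decrementAndGet() == 0) {
      synchronized (pendingNumber) {
        pendingNumber.notifyAll();
      }
    }
  }

  public void waitUntilCompleted() throws InterruptedException {
    synchronized (pendingNumber) {
      while (pendingNumber.get() != 0) {
        pendingNumber.wait();
      }
    }
  }
}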
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncSendPlanNodeHandler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncSendPlanNodeHandler.java
index 2bd50a6..eab4bb5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncSendPlanNodeHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncSendPlanNodeHandler.java
@@ -19,37 +19,42 @@
 package org.apache.iotdb.db.mpp.plan.scheduler;
 
 import org.apache.iotdb.commons.service.metric.enums.PerformanceOverviewMetrics;
-import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeResp;
+import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeBatchResp;
+import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeSingleResp;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.thrift.async.AsyncMethodCallback;
 
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 
-public class AsyncSendPlanNodeHandler implements AsyncMethodCallback<TSendPlanNodeResp> {
-  private final int instanceId;
+public class AsyncSendPlanNodeHandler implements AsyncMethodCallback<TSendPlanNodeBatchResp> {
+
+  private final List<Integer> instanceIds;
   private final AtomicLong pendingNumber;
-  private final Map<Integer, TSendPlanNodeResp> instanceId2RespMap;
+  private final Map<Integer, TSendPlanNodeSingleResp> instanceId2RespMap;
   private final long sendTime;
   private static final PerformanceOverviewMetrics PERFORMANCE_OVERVIEW_METRICS =
       PerformanceOverviewMetrics.getInstance();
 
   public AsyncSendPlanNodeHandler(
-      int instanceId,
+      List<Integer> instanceIds,
       AtomicLong pendingNumber,
-      Map<Integer, TSendPlanNodeResp> instanceId2RespMap,
+      Map<Integer, TSendPlanNodeSingleResp> instanceId2RespMap,
       long sendTime) {
-    this.instanceId = instanceId;
+    this.instanceIds = instanceIds;
    this.pendingNumber = pendingNumber;
    this.instanceId2RespMap = instanceId2RespMap;
    this.sendTime = sendTime;
   }
 
   @Override
-  public void onComplete(TSendPlanNodeResp tSendPlanNodeResp) {
-    instanceId2RespMap.put(instanceId, tSendPlanNodeResp);
+  public void onComplete(TSendPlanNodeBatchResp tSendPlanNodeResp) {
+    for (int i = 0; i < tSendPlanNodeResp.getResponses().size(); i++) {
+      instanceId2RespMap.put(instanceIds.get(i), tSendPlanNodeResp.getResponses().get(i));
+    }
     if (pendingNumber.decrementAndGet() == 0) {
       PERFORMANCE_OVERVIEW_METRICS.recordScheduleRemoteCost(System.nanoTime() - sendTime);
       synchronized (pendingNumber) {
@@ -60,13 +65,13 @@
 
   @Override
   public void onError(Exception e) {
-    TSendPlanNodeResp resp = new TSendPlanNodeResp();
+    TSendPlanNodeSingleResp resp = new TSendPlanNodeSingleResp();
     String errorMsg = String.format("Fail to send plan node, exception message: %s", e);
     resp.setAccepted(false);
     resp.setMessage(errorMsg);
     resp.setStatus(
         RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(), errorMsg));
-    instanceId2RespMap.put(instanceId, resp);
+    instanceIds.forEach(instanceId -> instanceId2RespMap.put(instanceId, resp));
     if (pendingNumber.decrementAndGet() == 0) {
       PERFORMANCE_OVERVIEW_METRICS.recordScheduleRemoteCost(System.nanoTime() - sendTime);
       synchronized (pendingNumber) {
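onComplete zips the batch responses back to instance ids purely by position, so it relies on the server returning exactly one response per request, in request order. A toy illustration of that invariant, with plain lists standing in for the thrift types:

import java.util.HashMap;
import java.util.List;
import java.util.Map;

// The i-th response in a batch answers the i-th request, so responses can be
// re-keyed by the instance ids recorded when the batch was assembled.
public class PositionalZipSketch {
  public static void main(String[] args) {
    List<Integer> instanceIds = List.of(0, 2, 3);        // ids recorded per endpoint
    List<String> responses = List.of("ok", "ok", "err"); // arrives in request order

    Map<Integer, String> instanceId2Resp = new HashMap<>();
    for (int i = 0; i < responses.size(); i++) {
      instanceId2Resp.put(instanceIds.get(i), responses.get(i));
    }
    System.out.println(instanceId2Resp); // {0=ok, 2=ok, 3=err}
  }
}

On the failure path, one exception fails the whole batch: onError stores the same rejected response under every instance id in the batch.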
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
index a471e92..9ba7e1d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -42,8 +42,9 @@
 import org.apache.iotdb.mpp.rpc.thrift.TPlanNode;
 import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceResp;
-import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeReq;
-import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeResp;
+import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeBatchReq;
+import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeSingleReq;
+import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeSingleResp;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
@@ -52,6 +53,7 @@
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
@@ -268,11 +270,15 @@
         }
         break;
       case WRITE:
-        TSendPlanNodeReq sendPlanNodeReq =
-            new TSendPlanNodeReq(
-                new TPlanNode(instance.getFragment().getPlanNodeTree().serializeToByteBuffer()),
-                instance.getRegionReplicaSet().getRegionId());
-        TSendPlanNodeResp sendPlanNodeResp = client.sendPlanNode(sendPlanNodeReq);
+        TSendPlanNodeBatchReq sendPlanNodeReq =
+            new TSendPlanNodeBatchReq(
+                Collections.singletonList(
+                    new TSendPlanNodeSingleReq(
+                        new TPlanNode(
+                            instance.getFragment().getPlanNodeTree().serializeToByteBuffer()),
+                        instance.getRegionReplicaSet().getRegionId())));
+        TSendPlanNodeSingleResp sendPlanNodeResp =
+            client.sendPlanNode(sendPlanNodeReq).getResponses().get(0);
         if (!sendPlanNodeResp.accepted) {
           logger.warn(
               "dispatch write failed. status: {}, code: {}, message: {}, node {}",
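The synchronous write path keeps the single server entry point by wrapping its lone request in a batch of one and unwrapping the first response. Schematically, with placeholder types rather than the IoTDB client API:

import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

public class SingletonBatchSketch {
  // Placeholder for the remote endpoint: one response per request, in order.
  static List<String> sendPlanNode(List<String> requests) {
    return requests.stream().map(r -> "accepted:" + r).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    // A batch of exactly one request keeps the old single-shot semantics.
    String response = sendPlanNode(Collections.singletonList("write-plan")).get(0);
    System.out.println(response); // accepted:write-plan
  }
}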
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 7f45d9b..7bd9795 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -175,8 +175,9 @@
 import org.apache.iotdb.mpp.rpc.thrift.TSchemaFetchResponse;
 import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceResp;
-import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeReq;
-import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeResp;
+import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeBatchReq;
+import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeBatchResp;
+import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeSingleResp;
 import org.apache.iotdb.mpp.rpc.thrift.TTsFilePieceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TUpdateConfigNodeGroupReq;
 import org.apache.iotdb.mpp.rpc.thrift.TUpdateTemplateReq;
@@ -289,18 +290,24 @@
   }
 
   @Override
-  public TSendPlanNodeResp sendPlanNode(TSendPlanNodeReq req) {
-    LOGGER.debug("receive PlanNode to group[{}]", req.getConsensusGroupId());
-    ConsensusGroupId groupId =
-        ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
-    PlanNode planNode = PlanNodeType.deserialize(req.planNode.body);
-    RegionWriteExecutor executor = new RegionWriteExecutor();
-    TSendPlanNodeResp resp = new TSendPlanNodeResp();
-    RegionExecutionResult executionResult = executor.execute(groupId, planNode);
-    resp.setAccepted(executionResult.isAccepted());
-    resp.setMessage(executionResult.getMessage());
-    resp.setStatus(executionResult.getStatus());
-    return resp;
+  public TSendPlanNodeBatchResp sendPlanNode(TSendPlanNodeBatchReq req) {
+    TSendPlanNodeBatchResp responses = new TSendPlanNodeBatchResp();
+    req.getRequests()
+        .forEach(
+            request -> {
+              ConsensusGroupId groupId =
+                  ConsensusGroupId.Factory.createFromTConsensusGroupId(
+                      request.getConsensusGroupId());
+              PlanNode planNode = PlanNodeType.deserialize(request.planNode.body);
+              RegionWriteExecutor executor = new RegionWriteExecutor();
+              TSendPlanNodeSingleResp resp = new TSendPlanNodeSingleResp();
+              RegionExecutionResult executionResult = executor.execute(groupId, planNode);
+              resp.setAccepted(executionResult.isAccepted());
+              resp.setMessage(executionResult.getMessage());
+              resp.setStatus(executionResult.getStatus());
+              responses.addToResponses(resp);
+            });
+    return responses;
   }
 
   @Override
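The server side executes the requests one by one and appends each response in request order, which is exactly the ordering contract the client-side positional mapping depends on. In the abstract:

import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Server-side shape of the batched handler: execute sequentially, append in
// request order, so position i of the response list answers request i.
public class OrderPreservingBatchSketch {
  static <Q, R> List<R> handleBatch(List<Q> requests, Function<Q, R> executeOne) {
    List<R> responses = new ArrayList<>(requests.size());
    for (Q request : requests) {
      responses.add(executeOne.apply(request));
    }
    return responses;
  }

  public static void main(String[] args) {
    System.out.println(handleBatch(List.of(1, 2, 3), q -> "resp-" + q)); // [resp-1, resp-2, resp-3]
  }
}

Note that a RegionWriteExecutor is still instantiated per request inside the loop, so batching changes the RPC count but not the per-plan-node execution work.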
diff --git a/server/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java b/server/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java
index c06fcf6..43e9cc6 100644
--- a/server/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java
@@ -43,8 +43,9 @@
 import org.apache.iotdb.db.service.thrift.impl.DataNodeRegionManager;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.mpp.rpc.thrift.TPlanNode;
-import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeReq;
-import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeResp;
+import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeBatchReq;
+import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeBatchResp;
+import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeSingleReq;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
@@ -61,11 +62,13 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 public class DataNodeInternalRPCServiceImplTest {
 
+  private static final IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig();
   DataNodeInternalRPCServiceImpl dataNodeInternalRPCServiceImpl;
   private static final int dataNodeId = 0;
@@ -145,16 +148,18 @@
     ByteBuffer byteBuffer = createTimeSeriesNode.serializeToByteBuffer();
 
     // put serialized planNode to TSendPlanNodeReq
-    TSendPlanNodeReq request = new TSendPlanNodeReq();
+    TSendPlanNodeSingleReq request = new TSendPlanNodeSingleReq();
     TPlanNode tPlanNode = new TPlanNode();
     tPlanNode.setBody(byteBuffer);
     request.setPlanNode(tPlanNode);
     request.setConsensusGroupId(regionReplicaSet.getRegionId());
 
     // Use consensus layer to execute request
-    TSendPlanNodeResp response = dataNodeInternalRPCServiceImpl.sendPlanNode(request);
+    TSendPlanNodeBatchResp response =
+        dataNodeInternalRPCServiceImpl.sendPlanNode(
+            new TSendPlanNodeBatchReq(Collections.singletonList(request)));
 
-    Assert.assertTrue(response.accepted);
+    Assert.assertTrue(response.getResponses().get(0).accepted);
   }
 
   @Test
@@ -221,16 +226,18 @@
     ByteBuffer byteBuffer = createAlignedTimeSeriesNode.serializeToByteBuffer();
 
     // put serialized planNode to TSendPlanNodeReq
-    TSendPlanNodeReq request = new TSendPlanNodeReq();
+    TSendPlanNodeSingleReq request = new TSendPlanNodeSingleReq();
     TPlanNode tPlanNode = new TPlanNode();
     tPlanNode.setBody(byteBuffer);
     request.setPlanNode(tPlanNode);
     request.setConsensusGroupId(regionReplicaSet.getRegionId());
 
     // Use consensus layer to execute request
-    TSendPlanNodeResp response = dataNodeInternalRPCServiceImpl.sendPlanNode(request);
+    TSendPlanNodeBatchResp response =
+        dataNodeInternalRPCServiceImpl.sendPlanNode(
+            new TSendPlanNodeBatchReq(Collections.singletonList(request)));
 
-    Assert.assertTrue(response.accepted);
+    Assert.assertTrue(response.getResponses().get(0).accepted);
   }
 
   @Test
@@ -308,16 +315,18 @@
     ByteBuffer byteBuffer = createMultiTimeSeriesNode.serializeToByteBuffer();
 
     // put serialized planNode to TSendPlanNodeReq
-    TSendPlanNodeReq request = new TSendPlanNodeReq();
+    TSendPlanNodeSingleReq request = new TSendPlanNodeSingleReq();
     TPlanNode tPlanNode = new TPlanNode();
     tPlanNode.setBody(byteBuffer);
     request.setPlanNode(tPlanNode);
     request.setConsensusGroupId(regionReplicaSet.getRegionId());
 
     // Use consensus layer to execute request
-    TSendPlanNodeResp response = dataNodeInternalRPCServiceImpl.sendPlanNode(request);
+    TSendPlanNodeBatchResp response =
+        dataNodeInternalRPCServiceImpl.sendPlanNode(
+            new TSendPlanNodeBatchReq(Collections.singletonList(request)));
 
-    Assert.assertTrue(response.accepted);
+    Assert.assertTrue(response.getResponses().get(0).accepted);
   }
 
   private TRegionReplicaSet genRegionReplicaSet() {
diff --git a/thrift/src/main/thrift/datanode.thrift b/thrift/src/main/thrift/datanode.thrift
index 73b6314..c4c40ee 100644
--- a/thrift/src/main/thrift/datanode.thrift
+++ b/thrift/src/main/thrift/datanode.thrift
@@ -118,12 +118,20 @@
   2: optional string message
 }
 
-struct TSendPlanNodeReq {
+struct TSendPlanNodeBatchReq {
+  1: required list<TSendPlanNodeSingleReq> requests;
+}
+
+struct TSendPlanNodeSingleReq {
   1: required TPlanNode planNode
   2: required common.TConsensusGroupId consensusGroupId
 }
 
-struct TSendPlanNodeResp {
+struct TSendPlanNodeBatchResp {
+  1: required list<TSendPlanNodeSingleResp> responses;
+}
+
+struct TSendPlanNodeSingleResp {
   1: required bool accepted
   2: optional string message
   3: optional common.TSStatus status
@@ -472,7 +480,7 @@
 
   /**
   * dispatch PlanNode to remote node for write request in order to save resource
   */
-  TSendPlanNodeResp sendPlanNode(TSendPlanNodeReq req);
+  TSendPlanNodeBatchResp sendPlanNode(TSendPlanNodeBatchReq req);
 
   TFragmentInstanceInfoResp fetchFragmentInstanceInfo(TFetchFragmentInstanceInfoReq req);
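For reference, building the new request from the generated thrift types should look roughly like this; field population is elided, and the no-arg constructors and setters are assumed to follow standard thrift codegen (the setters are the same ones the updated test already uses):

import java.nio.ByteBuffer;
import java.util.Collections;

import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.mpp.rpc.thrift.TPlanNode;
import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeBatchReq;
import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeSingleReq;

public class BuildBatchReqExample {
  public static void main(String[] args) {
    TPlanNode planNode = new TPlanNode();
    planNode.setBody(ByteBuffer.allocate(0)); // real callers serialize the plan node tree

    TSendPlanNodeSingleReq single = new TSendPlanNodeSingleReq();
    single.setPlanNode(planNode);
    single.setConsensusGroupId(new TConsensusGroupId()); // filled from the region replica set

    TSendPlanNodeBatchReq batch = new TSendPlanNodeBatchReq(Collections.singletonList(single));
    System.out.println(batch.getRequests().size()); // 1
  }
}

Since the sendPlanNode signature itself changes, old and new DataNodes presumably cannot exchange this RPC during a rolling upgrade; both sides need the regenerated stubs.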