Make local dispatch concurrent: queue local data-region write instances on per-region executors
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
index 72ad599..6194996 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
@@ -37,7 +37,6 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -230,9 +229,7 @@
     }
 
     analysis.setRedirectNodeList(redirectInfo);
-    List<WritePlanNode> res = new ArrayList<>(splitMap.values());
-    Collections.shuffle(res);
-    return res;
+    return new ArrayList<>(splitMap.values()); // not shuffled: order follows splitMap iteration
   }
 
   @Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
index 2e89294..de756c7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -19,12 +19,15 @@
 
 package org.apache.iotdb.db.queryengine.plan.scheduler;
 
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
 import org.apache.iotdb.commons.client.exception.ClientManagerException;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -55,6 +58,10 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 
@@ -85,6 +92,18 @@
 
   private static final String UNEXPECTED_ERRORS = "Unexpected errors: ";
 
+  /**
+   * One single-threaded executor per data region id; each acts as that region's queue for local
+   * write dispatch.
+   *
+   * <p>Static on purpose: a dispatcher is built with an {@code MPPQueryContext}, i.e. presumably
+   * once per query. An instance-level map would create new executors for every dispatcher and
+   * never shut them down (leaking threads), and would not serialize writes to one region across
+   * queries.
+   */
+  private static final Map<Integer, ExecutorService> DATA_REGION_QUEUE_MAP =
+      new ConcurrentHashMap<>();
+
   public FragmentInstanceDispatcherImpl(
       QueryType type,
       MPPQueryContext queryContext,
@@ -192,18 +211,38 @@
     if (!localInstances.isEmpty()) {
       // sync dispatch to local
       long localScheduleStartTime = System.nanoTime();
-      for (FragmentInstance localInstance : localInstances) {
-        try (SetThreadName threadName = new SetThreadName(localInstance.getId().getFullId())) {
-          dispatchLocally(localInstance);
-        } catch (FragmentInstanceDispatchException e) {
-          dataNodeFailureList.add(e.getFailureStatus());
-        } catch (Throwable t) {
-          logger.warn(DISPATCH_FAILED, t);
-          dataNodeFailureList.add(
-              RpcUtils.getStatus(
-                  TSStatusCode.INTERNAL_SERVER_ERROR, UNEXPECTED_ERRORS + t.getMessage()));
-        }
-      }
+      // Data-region instances are queued on their region's single-threaded executor, so writes
+      // to different regions run concurrently while writes to one region stay serialized. Any
+      // other local instance is still dispatched inline on the calling thread.
+      List<Future<Void>> dataRegionFutures = new ArrayList<>(localInstances.size());
+      for (FragmentInstance localInstance : localInstances) {
+        if (isDataRegionInstance(localInstance)) {
+          try {
+            int regionId = localInstance.getRegionReplicaSet().getRegionId().getId();
+            dataRegionFutures.add(
+                getDataRegionQueue(regionId).submit(new WriteFITask(localInstance)));
+          } catch (Throwable t) {
+            logger.warn(DISPATCH_FAILED, t);
+            dataNodeFailureList.add(
+                RpcUtils.getStatus(
+                    TSStatusCode.INTERNAL_SERVER_ERROR, UNEXPECTED_ERRORS + t.getMessage()));
+          }
+        } else {
+          try (SetThreadName threadName = new SetThreadName(localInstance.getId().getFullId())) {
+            dispatchLocally(localInstance);
+          } catch (FragmentInstanceDispatchException e) {
+            dataNodeFailureList.add(e.getFailureStatus());
+          } catch (Throwable t) {
+            logger.warn(DISPATCH_FAILED, t);
+            dataNodeFailureList.add(
+                RpcUtils.getStatus(
+                    TSStatusCode.INTERNAL_SERVER_ERROR, UNEXPECTED_ERRORS + t.getMessage()));
+          }
+        }
+      }
+      // Block until the queued writes finish so that local dispatch stays synchronous.
+      waitForDataRegionWrites(dataRegionFutures, dataNodeFailureList);
+
       PERFORMANCE_OVERVIEW_METRICS.recordScheduleLocalCost(
           System.nanoTime() - localScheduleStartTime);
     }
@@ -239,6 +278,134 @@
     }
   }
 
+  /** Returns true if the instance targets a data region (and so goes through its queue). */
+  private static boolean isDataRegionInstance(FragmentInstance instance) {
+    if (instance.getRegionReplicaSet() == null
+        || instance.getRegionReplicaSet().getRegionId() == null) {
+      return false;
+    }
+    return instance.getRegionReplicaSet().getRegionId().getType()
+        == TConsensusGroupType.DataRegion;
+  }
+
+  /**
+   * Returns the single-threaded executor that serializes local writes to the given data region,
+   * creating it atomically on first use. The executors are shared by all dispatcher instances and
+   * are never shut down here.
+   *
+   * <p>NOTE(review): executors of data regions that are later deleted are not reclaimed; consider
+   * removing and shutting them down when a data region is removed from this node.
+   */
+  private static ExecutorService getDataRegionQueue(int regionId) {
+    return DATA_REGION_QUEUE_MAP.computeIfAbsent(
+        regionId,
+        id ->
+            IoTDBThreadPoolFactory.newFixedThreadPool(
+                1, ThreadName.DATA_REGION_CONSUMER.getName() + "-" + id));
+  }
+
+  /**
+   * Waits for all queued data-region writes and appends one failure status per failed write to
+   * {@code failureList}. If the waiting thread is interrupted, its interrupt flag is restored, a
+   * failure is recorded and the remaining futures are not awaited (those writes may still run).
+   */
+  private static void waitForDataRegionWrites(
+      List<Future<Void>> futures, List<TSStatus> failureList) {
+    for (Future<Void> future : futures) {
+      try {
+        future.get();
+      } catch (ExecutionException e) {
+        Throwable cause = e.getCause() == null ? e : e.getCause();
+        if (cause instanceof FragmentInstanceDispatchException) {
+          failureList.add(((FragmentInstanceDispatchException) cause).getFailureStatus());
+        } else {
+          logger.warn(DISPATCH_FAILED, cause);
+          failureList.add(
+              RpcUtils.getStatus(
+                  TSStatusCode.INTERNAL_SERVER_ERROR, UNEXPECTED_ERRORS + cause.getMessage()));
+        }
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        logger.warn(DISPATCH_FAILED, e);
+        failureList.add(
+            RpcUtils.getStatus(
+                TSStatusCode.INTERNAL_SERVER_ERROR, UNEXPECTED_ERRORS + e.getMessage()));
+        return;
+      } catch (Throwable t) {
+        logger.warn(DISPATCH_FAILED, t);
+        failureList.add(
+            RpcUtils.getStatus(
+                TSStatusCode.INTERNAL_SERVER_ERROR, UNEXPECTED_ERRORS + t.getMessage()));
+      }
+    }
+  }
+
+  /**
+   * Writes one local fragment instance; instances are submitted to their data region's queue.
+   * Throws {@link FragmentInstanceDispatchException} carrying the failure status when the write
+   * is not accepted, or is accepted with a non-success status.
+   *
+   * <p>NOTE(review): this appears to duplicate the local write handling that is also reachable
+   * through {@code dispatchLocally}; consider sharing a single implementation.
+   */
+  private static class WriteFITask implements Callable<Void> {
+
+    private final FragmentInstance instance;
+
+    WriteFITask(FragmentInstance instance) {
+      this.instance = instance;
+    }
+
+    @Override
+    public Void call() throws Exception {
+      // Name the worker thread after the instance while it runs, like the inline path.
+      try (SetThreadName threadName = new SetThreadName(instance.getId().getFullId())) {
+        write();
+      }
+      return null;
+    }
+
+    private void write() throws FragmentInstanceDispatchException {
+      // deserialize ConsensusGroupId
+      ConsensusGroupId groupId = null;
+      if (instance.getExecutorType().isStorageExecutor()) {
+        try {
+          groupId =
+              ConsensusGroupId.Factory.createFromTConsensusGroupId(
+                  instance.getRegionReplicaSet().getRegionId());
+        } catch (Throwable t) {
+          logger.warn("Deserialize ConsensusGroupId failed. ", t);
+          throw new FragmentInstanceDispatchException(
+              RpcUtils.getStatus(
+                  TSStatusCode.EXECUTE_STATEMENT_ERROR,
+                  "Deserialize ConsensusGroupId failed: " + t.getMessage()));
+        }
+      }
+
+      PlanNode planNode = instance.getFragment().getPlanNodeTree();
+      RegionWriteExecutor writeExecutor = new RegionWriteExecutor();
+      RegionExecutionResult writeResult = writeExecutor.execute(groupId, planNode);
+      if (!writeResult.isAccepted()) {
+        logger.warn(
+            "write locally failed. TSStatus: {}, message: {}",
+            writeResult.getStatus(),
+            writeResult.getMessage());
+        if (writeResult.getStatus() == null) {
+          throw new FragmentInstanceDispatchException(
+              RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, writeResult.getMessage()));
+        } else {
+          throw new FragmentInstanceDispatchException(writeResult.getStatus());
+        }
+      } else {
+        // some expected and accepted status except SUCCESS_STATUS need to be returned
+        TSStatus status = writeResult.getStatus();
+        if (status != null && status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+          throw new FragmentInstanceDispatchException(status);
+        }
+      }
+    }
+  }
+
   private void dispatchOneInstance(FragmentInstance instance)
       throws FragmentInstanceDispatchException {
     TEndPoint endPoint = instance.getHostDataNode().getInternalEndPoint();
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index aff32b2..29f7d83 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -45,6 +45,7 @@
   DATANODE_INTERNAL_RPC_PROCESSOR("DataNodeInternalRPC-Processor"),
   MPP_COORDINATOR_WRITE_EXECUTOR("MPP-Coordinator-Write-Executor"),
   ASYNC_DATANODE_MPP_DATA_EXCHANGE_CLIENT_POOL("AsyncDataNodeMPPDataExchangeServiceClientPool"),
+  DATA_REGION_CONSUMER("Data-Region-Consumer"), // per-data-region local write dispatch queues
 
   // -------------------------- Compaction --------------------------
   COMPACTION_WORKER("Compaction-Worker"),
@@ -198,7 +199,8 @@
               DATANODE_INTERNAL_RPC_SERVICE,
               DATANODE_INTERNAL_RPC_PROCESSOR,
               MPP_COORDINATOR_WRITE_EXECUTOR,
-              ASYNC_DATANODE_MPP_DATA_EXCHANGE_CLIENT_POOL));
+              ASYNC_DATANODE_MPP_DATA_EXCHANGE_CLIENT_POOL,
+              DATA_REGION_CONSUMER));
 
   private static final Set<ThreadName> compactionThreadNames =
       new HashSet<>(Arrays.asList(COMPACTION_WORKER, COMPACTION_SUB_TASK, COMPACTION_SCHEDULE));