Share the degree of parallelism (DOP) among all fragment instances placed on the same DataNode
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
index f810bc1..b299dd9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
@@ -20,6 +20,7 @@
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.storagegroup.IDataRegionForQuery;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
@@ -27,6 +28,7 @@
import org.apache.iotdb.db.metadata.idtable.IDTable;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.SessionInfo;
+import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
@@ -75,6 +77,12 @@
// session info
private SessionInfo sessionInfo;
+  /**
+   * Degree of parallelism of this instance. The {@code createFragmentInstanceContext} factories
+   * overwrite it with the result of {@code calculateDegreeOfParallelism}; otherwise it stays 1.
+   */
+  private int degreeOfParallelism = 1;
+
// private final GcMonitor gcMonitor;
// private final AtomicLong startNanos = new AtomicLong();
// private final AtomicLong startFullGcCount = new AtomicLong(-1);
@@ -84,9 +92,18 @@
// private final AtomicLong endFullGcTimeNanos = new AtomicLong(-1);
+  /**
+   * Creates, initializes and starts a context for {@code fragmentInstance}.
+   *
+   * @param fragmentInstance source of the session info and of the per-DataNode instance count
+   *     from which this context's degree of parallelism is derived
+   */
public static FragmentInstanceContext createFragmentInstanceContext(
- FragmentInstanceId id, FragmentInstanceStateMachine stateMachine, SessionInfo sessionInfo) {
+      FragmentInstanceId id,
+      FragmentInstanceStateMachine stateMachine,
+      FragmentInstance fragmentInstance) {
FragmentInstanceContext instanceContext =
- new FragmentInstanceContext(id, stateMachine, sessionInfo);
+        new FragmentInstanceContext(id, stateMachine, fragmentInstance.getSessionInfo());
+    instanceContext.setDegreeOfParallelism(calculateDegreeOfParallelism(fragmentInstance));
instanceContext.initialize();
instanceContext.start();
return instanceContext;
@@ -95,11 +112,22 @@
+  /**
+   * Same as the factory without a data region, but also hands {@code dataRegion} to the context.
+   *
+   * @param fragmentInstance source of the session info, the time filter and the per-DataNode
+   *     instance count from which this context's degree of parallelism is derived
+   */
public static FragmentInstanceContext createFragmentInstanceContext(
FragmentInstanceId id,
FragmentInstanceStateMachine stateMachine,
- SessionInfo sessionInfo,
IDataRegionForQuery dataRegion,
- Filter timeFilter) {
+      FragmentInstance fragmentInstance) {
FragmentInstanceContext instanceContext =
- new FragmentInstanceContext(id, stateMachine, sessionInfo, dataRegion, timeFilter);
+        new FragmentInstanceContext(
+            id,
+            stateMachine,
+            fragmentInstance.getSessionInfo(),
+            dataRegion,
+            fragmentInstance.getTimeFilter());
+    instanceContext.setDegreeOfParallelism(calculateDegreeOfParallelism(fragmentInstance));
instanceContext.initialize();
instanceContext.start();
return instanceContext;
@@ -368,4 +396,44 @@
sourcePaths = null;
sharedQueryDataSource = null;
}
+
+  /**
+   * Computes this instance's share of the configured degree of parallelism (DOP).
+   *
+   * <p>The configured DOP is split evenly among the fragment instances of the same plan that
+   * are hosted on this DataNode; the root instance additionally receives the remainder. When
+   * there are at least as many instances as the configured DOP, every instance gets 1.
+   *
+   * <p>A non-positive count cannot result from counting instances, so it means the count was
+   * never set. Rather than dividing by zero, such an instance is treated as the only one on its
+   * DataNode and receives the full configured DOP.
+   *
+   * @param fragmentInstance instance carrying the per-DataNode instance count
+   * @return this instance's degree of parallelism, always at least 1
+   */
+  private static int calculateDegreeOfParallelism(FragmentInstance fragmentInstance) {
+    // Clamp so that a non-positive configuration still yields a usable DOP.
+    int systemDop =
+        Math.max(1, IoTDBDescriptor.getInstance().getConfig().getDegreeOfParallelism());
+    int instanceNumInDataNode = fragmentInstance.getInstanceNumInDataNode();
+    if (instanceNumInDataNode <= 0) {
+      return systemDop;
+    }
+    if (instanceNumInDataNode >= systemDop) {
+      return 1;
+    }
+    int evenShare = systemDop / instanceNumInDataNode;
+    // The root instance also takes the remainder of the division.
+    return fragmentInstance.isRoot() ? evenShare + systemDop % instanceNumInDataNode : evenShare;
+  }
+
+  /** Sets the degree of parallelism read by the local execution planner. */
+  public void setDegreeOfParallelism(int degreeOfParallelism) {
+    this.degreeOfParallelism = degreeOfParallelism;
+  }
+
+  /** Returns this instance's degree of parallelism; 1 unless it has been set. */
+  public int getDegreeOfParallelism() {
+    return degreeOfParallelism;
+  }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
index 6e5390f..47fbd43 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
@@ -123,11 +123,11 @@
instanceId,
fragmentInstanceId ->
createFragmentInstanceContext(
- fragmentInstanceId,
- stateMachine,
- instance.getSessionInfo(),
- dataRegion,
- instance.getTimeFilter()));
+                    // Session info, time filter and DOP input are read from the instance.
+                    fragmentInstanceId,
+                    stateMachine,
+                    dataRegion,
+                    instance));
try {
List<PipelineDriverFactory> driverFactories =
@@ -190,7 +190,10 @@
instanceId,
fragmentInstanceId ->
createFragmentInstanceContext(
- fragmentInstanceId, stateMachine, instance.getSessionInfo()));
+                    // Session info and DOP input are read from the instance.
+                    fragmentInstanceId,
+                    stateMachine,
+                    instance));
try {
List<PipelineDriverFactory> driverFactories =
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
index 3dfde55..f6e4eb4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
@@ -89,6 +89,8 @@
this.nextPipelineId = new AtomicInteger(0);
this.driverContext = new DataDriverContext(instanceContext, getNextPipelineId());
this.pipelineDriverFactories = new ArrayList<>();
+    // Inherit this fragment instance's DOP from its FragmentInstanceContext.
+    this.degreeOfParallelism = instanceContext.getDegreeOfParallelism();
}
// For creating subContext, differ from parent context mainly in driver context
@@ -119,6 +121,8 @@
this.dataRegionTTL = Long.MAX_VALUE;
this.driverContext =
new SchemaDriverContext(instanceContext, schemaRegion, getNextPipelineId());
this.pipelineDriverFactories = new ArrayList<>();
+    // Inherit this fragment instance's DOP from its FragmentInstanceContext.
+    this.degreeOfParallelism = instanceContext.getDegreeOfParallelism();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
index 6361a91..9ccf34b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
@@ -57,15 +57,17 @@
public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
private static final Logger logger = LoggerFactory.getLogger(SimpleFragmentParallelPlanner.class);
- private SubPlan subPlan;
- private Analysis analysis;
- private MPPQueryContext queryContext;
+  private final SubPlan subPlan;
+  private final Analysis analysis;
+  private final MPPQueryContext queryContext;
// Record all the FragmentInstances belonged to same PlanFragment
- Map<PlanFragmentId, FragmentInstance> instanceMap;
+  private final Map<PlanFragmentId, FragmentInstance> instanceMap;
// Record which PlanFragment the PlanNode belongs
- Map<PlanNodeId, Pair<PlanFragmentId, PlanNode>> planNodeMap;
- List<FragmentInstance> fragmentInstanceList;
+  private final Map<PlanNodeId, Pair<PlanFragmentId, PlanNode>> planNodeMap;
+  private final List<FragmentInstance> fragmentInstanceList;
+  // Produced instances, grouped by the DataNode that hosts them
+  private final Map<TDataNodeLocation, List<FragmentInstance>> locationToInstanceMap;
public SimpleFragmentParallelPlanner(
SubPlan subPlan, Analysis analysis, MPPQueryContext context) {
@@ -75,6 +77,7 @@
this.instanceMap = new HashMap<>();
this.planNodeMap = new HashMap<>();
this.fragmentInstanceList = new ArrayList<>();
+    this.locationToInstanceMap = new HashMap<>();
}
@Override
@@ -90,6 +93,13 @@
recordPlanNodeRelation(fragment.getPlanNodeTree(), fragment.getId());
produceFragmentInstance(fragment);
}
+    // Tell every instance how many instances of this plan share its DataNode.
+    for (List<FragmentInstance> instancesOnSameNode : locationToInstanceMap.values()) {
+      int instanceCountOnNode = instancesOnSameNode.size();
+      for (FragmentInstance instance : instancesOnSameNode) {
+        instance.setInstanceNumInDataNode(instanceCountOnNode);
+      }
+    }
}
private void produceFragmentInstance(PlanFragment fragment) {
@@ -137,6 +147,11 @@
}
instanceMap.putIfAbsent(fragment.getId(), fragmentInstance);
fragmentInstanceList.add(fragmentInstance);
+    // Group instances by host; the per-DataNode counts are filled in once planning is done.
+    List<FragmentInstance> instancesOnHostDataNode =
+        locationToInstanceMap.computeIfAbsent(
+            fragmentInstance.getHostDataNode(), x -> new ArrayList<>());
+    instancesOnHostDataNode.add(fragmentInstance);
}
private TDataNodeLocation selectTargetDataNode(TRegionReplicaSet regionReplicaSet) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/FragmentInstance.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/FragmentInstance.java
index 97bd7e1..310b3f7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/FragmentInstance.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/FragmentInstance.java
@@ -68,6 +68,12 @@
private final SessionInfo sessionInfo;
+  /**
+   * Number of fragment instances of the same plan hosted on this instance's DataNode, this one
+   * included. Set by SimpleFragmentParallelPlanner; defaults to 1, i.e. assume it is alone.
+   */
+  private int instanceNumInDataNode = 1;
+
// We can add some more params for a specific FragmentInstance
// So that we can make different FragmentInstance owns different data range.
@@ -126,6 +132,16 @@
return executorType.getRegionReplicaSet();
}
+  /** Records how many fragment instances of the same plan share this instance's DataNode. */
+  public void setInstanceNumInDataNode(int instanceNumInDataNode) {
+    this.instanceNumInDataNode = instanceNumInDataNode;
+  }
+
+  /** Returns how many fragment instances of the same plan share this instance's DataNode. */
+  public int getInstanceNumInDataNode() {
+    return instanceNumInDataNode;
+  }
+
public PlanFragment getFragment() {
return fragment;
}
@@ -183,6 +199,8 @@
boolean hasHostDataNode = ReadWriteIOUtils.readBool(buffer);
fragmentInstance.hostDataNode =
hasHostDataNode ? ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(buffer) : null;
+    // Read at the position the serializer writes it: right after the optional host DataNode.
+    fragmentInstance.setInstanceNumInDataNode(ReadWriteIOUtils.readInt(buffer));
return fragmentInstance;
}
@@ -205,6 +223,8 @@
if (hostDataNode != null) {
ThriftCommonsSerDeUtils.serializeTDataNodeLocation(hostDataNode, outputStream);
}
+      // Written after the optional host DataNode; deserialization reads it in the same order.
+      ReadWriteIOUtils.write(instanceNumInDataNode, outputStream);
return ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size());
} catch (IOException e) {
logger.error("Unexpected error occurs when serializing this FragmentInstance.", e);