blob: f83730b187bbd3647fb7dd8beac9562d32172e41 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.mpp.plan.planner.distribution;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
import org.apache.iotdb.db.mpp.plan.planner.IFragmentParallelPlaner;
import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.mpp.plan.planner.plan.PlanFragment;
import org.apache.iotdb.db.mpp.plan.planner.plan.SubPlan;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeUtil;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.FragmentSinkNode;
import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
/**
 * A simple implementation of IFragmentParallelPlaner. This planner will transform one PlanFragment
 * into only one FragmentInstance.
 *
 * <p>Planning runs in two passes over the fragments of the given {@link SubPlan}:
 *
 * <ol>
 *   <li>{@code prepare()} records which fragment every plan node belongs to and creates one
 *       {@link FragmentInstance} per {@link PlanFragment}, choosing a host DataNode for it.
 *   <li>{@code calculateNodeTopologyBetweenInstance()} links each instance whose root is a {@link
 *       FragmentSinkNode} with the {@link ExchangeNode} it feeds in the downstream instance.
 * </ol>
 */
public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {

  private static final Logger logger = LoggerFactory.getLogger(SimpleFragmentParallelPlanner.class);

  private final SubPlan subPlan;
  private final Analysis analysis;
  private final MPPQueryContext queryContext;

  // Record the FragmentInstance produced for each PlanFragment
  Map<PlanFragmentId, FragmentInstance> instanceMap;
  // Record which PlanFragment the PlanNode belongs
  Map<PlanNodeId, PlanFragmentId> planNodeMap;
  // All produced FragmentInstances, in the order their PlanFragments were visited
  List<FragmentInstance> fragmentInstanceList;

  /**
   * Creates a planner for the given sub plan.
   *
   * @param subPlan the sub plan whose fragments will be turned into instances
   * @param analysis the analysis result; supplies the global time filter and the statement
   * @param context the query context; supplies query type, timeout, session, type provider and
   *     the endpoint black list
   */
  public SimpleFragmentParallelPlanner(
      SubPlan subPlan, Analysis analysis, MPPQueryContext context) {
    this.subPlan = subPlan;
    this.analysis = analysis;
    this.queryContext = context;
    this.instanceMap = new HashMap<>();
    this.planNodeMap = new HashMap<>();
    this.fragmentInstanceList = new ArrayList<>();
  }

  /**
   * Produces one FragmentInstance per PlanFragment and wires sink/exchange nodes between them.
   *
   * @return the list of produced instances (the planner's internal list, not a copy)
   */
  @Override
  public List<FragmentInstance> parallelPlan() {
    prepare();
    calculateNodeTopologyBetweenInstance();
    return fragmentInstanceList;
  }

  /** Records the node-to-fragment relation and creates an instance for every fragment. */
  private void prepare() {
    List<PlanFragment> fragments = subPlan.getPlanFragmentList();
    for (PlanFragment fragment : fragments) {
      recordPlanNodeRelation(fragment.getPlanNodeTree(), fragment.getId());
      produceFragmentInstance(fragment);
    }
  }

  /**
   * Creates the single FragmentInstance for {@code fragment}, assigns its region and host, and
   * registers it in {@code instanceMap} and {@code fragmentInstanceList}.
   *
   * @param fragment the fragment to instantiate
   * @throws IllegalArgumentException if no usable DataNode can be selected for the fragment's
   *     target region
   */
  private void produceFragmentInstance(PlanFragment fragment) {
    // The instance works on its own deep copy of the fragment's plan tree.
    PlanNode rootCopy = PlanNodeUtil.deepCopy(fragment.getPlanNodeTree());
    Filter timeFilter = analysis.getGlobalTimeFilter();
    FragmentInstance fragmentInstance =
        new FragmentInstance(
            new PlanFragment(fragment.getId(), rootCopy),
            fragment.getId().genFragmentInstanceId(),
            timeFilter,
            queryContext.getQueryType(),
            queryContext.getTimeOut(),
            fragment.isRoot());

    // Get the target region for origin PlanFragment, then its instance will be distributed one
    // of them.
    TRegionReplicaSet regionReplicaSet = fragment.getTargetRegion();

    // Set DataRegion and target host for the instance.
    // We need to store all the replica host in case of the scenario that the instance need to be
    // redirected to another host when scheduling.
    fragmentInstance.setDataRegionAndHost(regionReplicaSet);
    fragmentInstance.setHostDataNode(selectTargetDataNode(regionReplicaSet));

    // The type provider is only generated for query statements.
    if (analysis.getStatement() instanceof QueryStatement) {
      fragmentInstance.getFragment().generateTypeProvider(queryContext.getTypeProvider());
    }
    instanceMap.putIfAbsent(fragment.getId(), fragmentInstance);
    fragmentInstanceList.add(fragmentInstance);
  }

  /**
   * Chooses the DataNode that will host an instance reading from {@code regionReplicaSet}.
   *
   * <p>Replicas whose internal endpoint is in the query's black list are skipped. When the
   * configured read consistency level is {@code "weak"} and a session is present, the replica is
   * picked by the session id modulo the number of available replicas; otherwise the first
   * available replica is used.
   *
   * @param regionReplicaSet the replica set of the target region
   * @return the selected DataNode location
   * @throws IllegalArgumentException if the replica set is null or empty, or if every replica is
   *     black-listed
   */
  private TDataNodeLocation selectTargetDataNode(TRegionReplicaSet regionReplicaSet) {
    if (regionReplicaSet == null
        || regionReplicaSet.getDataNodeLocations() == null
        || regionReplicaSet.getDataNodeLocations().isEmpty()) {
      throw new IllegalArgumentException(
          String.format("regionReplicaSet is invalid: %s", regionReplicaSet));
    }
    String readConsistencyLevel =
        IoTDBDescriptor.getInstance().getConfig().getReadConsistencyLevel();
    // TODO: (Chen Rongzhao) need to make the values of ReadConsistencyLevel as static variable or
    // enums
    boolean selectRandomDataNode = "weak".equals(readConsistencyLevel);

    // When planning fragment onto specific DataNode, the DataNode whose endPoint is in
    // black list won't be considered because it may have connection issue now.
    List<TDataNodeLocation> availableDataNodes =
        filterAvailableTDataNode(regionReplicaSet.getDataNodeLocations());
    if (availableDataNodes.isEmpty()) {
      String errorMsg =
          String.format(
              "all replicas for region[%s] are not available in these DataNodes[%s]",
              regionReplicaSet.getRegionId(), regionReplicaSet.getDataNodeLocations());
      throw new IllegalArgumentException(errorMsg);
    }
    if (regionReplicaSet.getDataNodeLocationsSize() != availableDataNodes.size()) {
      logger.info("available replicas: {}", availableDataNodes);
    }

    int targetIndex;
    if (!selectRandomDataNode || queryContext.getSession() == null) {
      targetIndex = 0;
    } else {
      // floorMod keeps the index in [0, size) even for a negative session id, where the plain
      // '%' operator would yield a negative index and make List.get throw.
      targetIndex =
          (int)
              Math.floorMod(
                  queryContext.getSession().getSessionId(), (long) availableDataNodes.size());
    }
    return availableDataNodes.get(targetIndex);
  }

  /**
   * Returns the DataNodes of {@code originalDataNodeList} that are not black-listed, preserving
   * their original order.
   *
   * @param originalDataNodeList the candidate DataNode locations
   * @return a new list holding only the available locations; may be empty
   */
  private List<TDataNodeLocation> filterAvailableTDataNode(
      List<TDataNodeLocation> originalDataNodeList) {
    List<TDataNodeLocation> result = new LinkedList<>();
    for (TDataNodeLocation dataNodeLocation : originalDataNodeList) {
      if (isAvailableDataNode(dataNodeLocation)) {
        result.add(dataNodeLocation);
      }
    }
    return result;
  }

  /**
   * Checks whether {@code dataNodeLocation} may be used as a host.
   *
   * @param dataNodeLocation the DataNode to check
   * @return {@code false} if its internal endpoint equals any endpoint in the query context's
   *     black list, {@code true} otherwise
   */
  private boolean isAvailableDataNode(TDataNodeLocation dataNodeLocation) {
    for (TEndPoint endPoint : queryContext.getEndPointBlackList()) {
      if (endPoint.equals(dataNodeLocation.internalEndPoint)) {
        return false;
      }
    }
    return true;
  }

  /**
   * Connects every instance rooted at a FragmentSinkNode with its downstream instance: the sink
   * receives the downstream endpoint/instance id, and the downstream ExchangeNode receives the
   * upstream endpoint/instance id/sink node id.
   */
  private void calculateNodeTopologyBetweenInstance() {
    for (FragmentInstance instance : fragmentInstanceList) {
      PlanNode rootNode = instance.getFragment().getPlanNodeTree();
      if (rootNode instanceof FragmentSinkNode) {
        // Set target Endpoint for FragmentSinkNode
        FragmentSinkNode sinkNode = (FragmentSinkNode) rootNode;
        PlanNodeId downStreamNodeId = sinkNode.getDownStreamPlanNodeId();
        FragmentInstance downStreamInstance = findDownStreamInstance(downStreamNodeId);
        sinkNode.setDownStream(
            downStreamInstance.getHostDataNode().getMPPDataExchangeEndPoint(),
            downStreamInstance.getId(),
            downStreamNodeId);

        // Set upstream info for corresponding ExchangeNode in downstream FragmentInstance
        PlanNode downStreamExchangeNode =
            downStreamInstance.getFragment().getPlanNodeById(downStreamNodeId);
        ((ExchangeNode) downStreamExchangeNode)
            .setUpstream(
                instance.getHostDataNode().getMPPDataExchangeEndPoint(),
                instance.getId(),
                sinkNode.getPlanNodeId());
      }
    }
  }

  /**
   * Looks up the instance whose fragment contains the given exchange node.
   *
   * @param exchangeNodeId the id of the downstream exchange node
   * @return the instance registered for that node's fragment, or {@code null} if the node or its
   *     fragment was never recorded
   */
  private FragmentInstance findDownStreamInstance(PlanNodeId exchangeNodeId) {
    return instanceMap.get(planNodeMap.get(exchangeNodeId));
  }

  /**
   * Recursively records, for {@code root} and all of its descendants, that they belong to {@code
   * planFragmentId}.
   *
   * @param root the root of the (sub)tree to record
   * @param planFragmentId the id of the fragment owning the tree
   */
  private void recordPlanNodeRelation(PlanNode root, PlanFragmentId planFragmentId) {
    planNodeMap.put(root.getPlanNodeId(), planFragmentId);
    for (PlanNode child : root.getChildren()) {
      recordPlanNodeRelation(child, planFragmentId);
    }
  }
}