blob: 448412d6513ab8877566a10f9c7d12f80cd91de6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.planner.fragment;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.util.DrillStringUtils;
import org.apache.drill.common.util.function.CheckedConsumer;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.physical.MinorFragmentEndpoint;
import org.apache.drill.exec.physical.PhysicalOperatorSetupException;
import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
import org.apache.drill.exec.physical.base.Exchange.ParallelizationDependency;
import org.apache.drill.exec.physical.base.FragmentRoot;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.Receiver;
import org.apache.drill.exec.planner.PhysicalPlanReader;
import org.apache.drill.exec.planner.fragment.Fragment.ExchangeFragmentPair;
import org.apache.drill.exec.planner.fragment.Materializer.IndexedFragmentNode;
import org.apache.drill.exec.proto.BitControl.Collector;
import org.apache.drill.exec.proto.BitControl.PlanFragment;
import org.apache.drill.exec.proto.BitControl.QueryContextInformation;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.rpc.user.UserSession;
import org.apache.drill.exec.server.options.OptionList;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.work.QueryWorkUnit;
import org.apache.drill.exec.work.QueryWorkUnit.MinorFragmentDefn;
import org.apache.drill.exec.work.foreman.ForemanSetupException;
/**
* The simple parallelizer determines the level of parallelization of a plan
* based on the cost of the underlying operations. It doesn't take into account
* system load or other factors. Based on the cost of the query, the
* parallelization for each major fragment will be determined. Once the amount
* of parallelization is done, assignment is done based on round robin
* assignment ordered by operator affinity (locality) to available execution
* Drillbits.
*/
public abstract class SimpleParallelizer implements QueryParallelizer {
static final Logger logger = LoggerFactory.getLogger(SimpleParallelizer.class);
private final long parallelizationThreshold;
private final int maxWidthPerNode;
private final int maxGlobalWidth;
private final double affinityFactor;
private boolean enableDynamicFC;
protected SimpleParallelizer(QueryContext context) {
OptionManager optionManager = context.getOptions();
long sliceTarget = optionManager.getOption(ExecConstants.SLICE_TARGET_OPTION);
this.parallelizationThreshold = sliceTarget > 0 ? sliceTarget : 1;
double cpu_load_average = optionManager.getOption(ExecConstants.CPU_LOAD_AVERAGE);
final long maxWidth = optionManager.getOption(ExecConstants.MAX_WIDTH_PER_NODE);
// compute the maxwidth
this.maxWidthPerNode = ExecConstants.MAX_WIDTH_PER_NODE.computeMaxWidth(cpu_load_average, maxWidth);
this.maxGlobalWidth = optionManager.getOption(ExecConstants.MAX_WIDTH_GLOBAL_KEY).num_val.intValue();
this.affinityFactor = optionManager.getOption(ExecConstants.AFFINITY_FACTOR_KEY).float_val.intValue();
this.enableDynamicFC = optionManager.getBoolean(ExecConstants.ENABLE_DYNAMIC_CREDIT_BASED_FC);
}
protected SimpleParallelizer(long parallelizationThreshold, int maxWidthPerNode, int maxGlobalWidth, double affinityFactor) {
this.parallelizationThreshold = parallelizationThreshold;
this.maxWidthPerNode = maxWidthPerNode;
this.maxGlobalWidth = maxGlobalWidth;
this.affinityFactor = affinityFactor;
}
@Override
public long getSliceTarget() {
return parallelizationThreshold;
}
@Override
public int getMaxWidthPerNode() {
return maxWidthPerNode;
}
@Override
public int getMaxGlobalWidth() {
return maxGlobalWidth;
}
@Override
public double getAffinityFactor() {
return affinityFactor;
}
public Set<Wrapper> getRootFragments(PlanningSet planningSet) {
// Gets the root fragment by removing all the dependent fragments on which
// root fragments depend upon. This is fine because the later parallelizer
// code traverses from these root fragments to their respective dependent
// fragments.
final Set<Wrapper> roots = Sets.newHashSet();
for (Wrapper w : planningSet) {
roots.add(w);
}
// Roots will be left over with the fragments which are not depended upon by any other fragments.
for (Wrapper wrapper : planningSet) {
final List<Wrapper> fragmentDependencies = wrapper.getFragmentDependencies();
if (fragmentDependencies != null && fragmentDependencies.size() > 0) {
for (Wrapper dependency : fragmentDependencies) {
if (roots.contains(dependency)) {
roots.remove(dependency);
}
}
}
}
return roots;
}
public PlanningSet prepareFragmentTree(Fragment rootFragment) {
PlanningSet planningSet = new PlanningSet();
initFragmentWrappers(rootFragment, planningSet);
constructFragmentDependencyGraph(rootFragment, planningSet);
return planningSet;
}
/**
* Traverse all the major fragments and parallelize each major fragment based on
* collected stats. The children fragments are parallelized before a parent
* fragment.
* @param planningSet Set of all major fragments and their context.
* @param roots Root nodes of the plan.
* @param activeEndpoints currently active drillbit endpoints.
* @throws PhysicalOperatorSetupException
*/
public void collectStatsAndParallelizeFragments(PlanningSet planningSet, Set<Wrapper> roots,
Collection<DrillbitEndpoint> activeEndpoints) throws PhysicalOperatorSetupException {
for (Wrapper wrapper : roots) {
traverse(wrapper, CheckedConsumer.throwingConsumerWrapper((Wrapper fragmentWrapper) -> {
// If this fragment is already parallelized then no need do it again.
// This happens in the case of fragments which have MUX operators.
if (fragmentWrapper.isEndpointsAssignmentDone()) {
return;
}
fragmentWrapper.getNode().getRoot().accept(new StatsCollector(planningSet), fragmentWrapper);
fragmentWrapper.getStats()
.getDistributionAffinity()
.getFragmentParallelizer()
.parallelizeFragment(fragmentWrapper, this, activeEndpoints);
//consolidate the cpu resources required by this major fragment per drillbit.
fragmentWrapper.computeCpuResources();
}));
}
}
public abstract void adjustMemory(PlanningSet planningSet, Set<Wrapper> roots,
Collection<DrillbitEndpoint> activeEndpoints) throws PhysicalOperatorSetupException;
/**
* The starting function for the whole parallelization and memory computation logic.
* 1) Initially a fragment tree is prepared which contains a wrapper for each fragment.
* The topology of this tree is same as that of the major fragment tree.
* 2) Traverse this fragment tree to collect stats for each major fragment and then
* parallelize each fragment. At this stage minor fragments are not created but all
* the required information to create minor fragment are computed.
* 3) Memory is computed for each operator and for the minor fragment.
* 4) Lastly all the above computed information is used to create the minor fragments
* for each major fragment.
*
* @param options List of options set by the user.
* @param foremanNode foreman node for this query plan.
* @param queryId Query ID.
* @param activeEndpoints currently active endpoints on which this plan will run.
* @param rootFragment Root major fragment.
* @param session session context.
* @param queryContextInfo query context.
* @return
* @throws ExecutionSetupException
*/
@Override
public final QueryWorkUnit generateWorkUnit(OptionList options, DrillbitEndpoint foremanNode, QueryId queryId,
Collection<DrillbitEndpoint> activeEndpoints, Fragment rootFragment,
UserSession session, QueryContextInformation queryContextInfo) throws ExecutionSetupException {
PlanningSet planningSet = prepareFragmentTree(rootFragment);
Set<Wrapper> rootFragments = getRootFragments(planningSet);
collectStatsAndParallelizeFragments(planningSet, rootFragments, activeEndpoints);
adjustMemory(planningSet, rootFragments, activeEndpoints);
return generateWorkUnit(options, foremanNode, queryId, rootFragment, planningSet, session, queryContextInfo);
}
/**
* Create multiple physical plans from original query planning, it will allow execute them eventually independently
* @param options
* @param foremanNode
* @param queryId
* @param activeEndpoints
* @param reader
* @param rootFragment
* @param session
* @param queryContextInfo
* @return The {@link QueryWorkUnit}s.
* @throws ExecutionSetupException
*/
public List<QueryWorkUnit> getSplitFragments(OptionList options, DrillbitEndpoint foremanNode, QueryId queryId,
Collection<DrillbitEndpoint> activeEndpoints, PhysicalPlanReader reader, Fragment rootFragment,
UserSession session, QueryContextInformation queryContextInfo) throws ExecutionSetupException {
// no op
throw new UnsupportedOperationException("Use children classes");
}
// For every fragment, create a Wrapper in PlanningSet.
@VisibleForTesting
public void initFragmentWrappers(Fragment rootFragment, PlanningSet planningSet) {
planningSet.get(rootFragment);
for (ExchangeFragmentPair fragmentPair : rootFragment) {
initFragmentWrappers(fragmentPair.getNode(), planningSet);
}
}
/**
* Based on the affinity of the Exchange that separates two fragments, setup fragment dependencies.
*
* @param planningSet
* @return Returns a list of leaf fragments in fragment dependency graph.
*/
private void constructFragmentDependencyGraph(Fragment rootFragment, PlanningSet planningSet) {
// Set up dependency of fragments based on the affinity of exchange that separates the fragments.
for (Wrapper currentFragment : planningSet) {
ExchangeFragmentPair sendingXchgForCurrFrag = currentFragment.getNode().getSendingExchangePair();
if (sendingXchgForCurrFrag != null) {
ParallelizationDependency dependency = sendingXchgForCurrFrag.getExchange().getParallelizationDependency();
Wrapper receivingFragmentWrapper = planningSet.get(sendingXchgForCurrFrag.getNode());
// Mostly Receivers of the current fragment depend on the sender of the child fragments.
// However there is a special case for DeMux Exchanges where the Sender of the current
// fragment depends on the receiver of the parent fragment.
if (dependency == ParallelizationDependency.RECEIVER_DEPENDS_ON_SENDER) {
receivingFragmentWrapper.addFragmentDependency(currentFragment);
} else if (dependency == ParallelizationDependency.SENDER_DEPENDS_ON_RECEIVER) {
currentFragment.addFragmentDependency(receivingFragmentWrapper);
}
}
}
planningSet.findRootWrapper(rootFragment);
}
/**
* Call operation on each fragment. Traversal calls operation
* on child fragments before calling it on the parent fragment.
*/
protected void traverse(Wrapper fragmentWrapper, Consumer<Wrapper> operation) throws PhysicalOperatorSetupException {
final List<Wrapper> fragmentDependencies = fragmentWrapper.getFragmentDependencies();
if (fragmentDependencies != null && fragmentDependencies.size() > 0) {
for (Wrapper dependency : fragmentDependencies) {
traverse(dependency, operation);
}
}
operation.accept(fragmentWrapper);
}
// A function which returns the memory assigned for a particular physical operator.
protected abstract BiFunction<DrillbitEndpoint, PhysicalOperator, Long> getMemory();
protected QueryWorkUnit generateWorkUnit(OptionList options, DrillbitEndpoint foremanNode, QueryId queryId,
Fragment rootNode, PlanningSet planningSet, UserSession session,
QueryContextInformation queryContextInfo) throws ExecutionSetupException {
List<MinorFragmentDefn> fragmentDefns = new ArrayList<>( );
MinorFragmentDefn rootFragmentDefn = null;
FragmentRoot rootOperator = null;
// Generate all the individual plan fragments and associated assignments. Note, we need all endpoints
// assigned before we can materialize.
for (Wrapper wrapper : planningSet) {
Fragment node = wrapper.getNode();
final PhysicalOperator physicalOperatorRoot = node.getRoot();
boolean isRootNode = rootNode == node;
if (isRootNode && wrapper.getWidth() != 1) {
throw new ForemanSetupException(String.format("Failure while trying to setup fragment. " +
"The root fragment must always have parallelization one. In the current case, the width was set to %d.",
wrapper.getWidth()));
}
// A fragment is self-driven if it doesn't rely on any other exchanges.
boolean isLeafFragment = node.getReceivingExchangePairs().size() == 0;
// Create a minorFragment for each major fragment.
for (int minorFragmentId = 0; minorFragmentId < wrapper.getWidth(); minorFragmentId++) {
IndexedFragmentNode iNode = new IndexedFragmentNode(minorFragmentId, wrapper,
(fragmentWrapper, minorFragment) -> fragmentWrapper.getAssignedEndpoint(minorFragment), getMemory());
wrapper.resetAllocation();
PhysicalOperator op = physicalOperatorRoot.accept(Materializer.INSTANCE, iNode);
Preconditions.checkArgument(op instanceof FragmentRoot);
FragmentRoot root = (FragmentRoot) op;
FragmentHandle handle = FragmentHandle
.newBuilder()
.setMajorFragmentId(wrapper.getMajorFragmentId())
.setMinorFragmentId(minorFragmentId)
.setQueryId(queryId)
.build();
PlanFragment fragment = PlanFragment.newBuilder()
.setForeman(foremanNode)
.setHandle(handle)
.setAssignment(wrapper.getAssignedEndpoint(minorFragmentId))
.setLeafFragment(isLeafFragment)
.setContext(queryContextInfo)
.setMemInitial(wrapper.getInitialAllocation())
.setMemMax(wrapper.getMaxAllocation())
.setCredentials(session.getCredentials())
.addAllCollector(CountRequiredFragments.getCollectors(root, enableDynamicFC))
.build();
MinorFragmentDefn fragmentDefn = new MinorFragmentDefn(fragment, root, options);
if (isRootNode) {
logger.debug("Root fragment:\n {}", DrillStringUtils.unescapeJava(fragment.toString()));
rootFragmentDefn = fragmentDefn;
rootOperator = root;
} else {
logger.debug("Remote fragment:\n {}", DrillStringUtils.unescapeJava(fragment.toString()));
fragmentDefns.add(fragmentDefn);
}
}
}
Wrapper rootWrapper = planningSet.getRootWrapper();
return new QueryWorkUnit(rootOperator, rootFragmentDefn, fragmentDefns, rootWrapper);
}
/**
* Designed to setup initial values for arriving fragment accounting.
*/
protected static class CountRequiredFragments extends AbstractPhysicalVisitor<Void, List<Collector>, RuntimeException> {
private boolean enableDynamicFC;
CountRequiredFragments(boolean enableDynamicFC) {
this.enableDynamicFC = enableDynamicFC;
}
public static List<Collector> getCollectors(PhysicalOperator root, boolean enableDynamicFC) {
List<Collector> collectors = Lists.newArrayList();
CountRequiredFragments countRequiredFragments = new CountRequiredFragments(enableDynamicFC);
root.accept(countRequiredFragments, collectors);
return collectors;
}
@Override
public Void visitReceiver(Receiver receiver, List<Collector> collectors) throws RuntimeException {
List<MinorFragmentEndpoint> endpoints = receiver.getProvidingEndpoints();
List<Integer> list = new ArrayList<>(endpoints.size());
for (MinorFragmentEndpoint ep : endpoints) {
list.add(ep.getId());
}
collectors.add(Collector.newBuilder()
.setIsSpooling(receiver.isSpooling())
.setOppositeMajorFragmentId(receiver.getOppositeMajorFragmentId())
.setSupportsOutOfOrder(receiver.supportsOutOfOrderExchange())
.setEnableDynamicFc(enableDynamicFC)
.addAllIncomingMinorFragment(list)
.build());
return null;
}
@Override
public Void visitOp(PhysicalOperator op, List<Collector> collectors) throws RuntimeException {
for (PhysicalOperator o : op) {
o.accept(this, collectors);
}
return null;
}
}
}