blob: 484637294106c601e3921607ce8d589b5e56e0b6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.planner.fragment;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.physical.base.Exchange;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.config.AbstractMuxExchange;
import org.apache.drill.exec.planner.AbstractOpWrapperVisitor;
import org.apache.drill.exec.planner.cost.NodeResource;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.HashMap;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* A visitor to compute memory requirements for each operator in a minor fragment.
* This visitor will be called for each major fragment. It traverses the physical operators
* in that major fragment and computes the memory for each operator per each minor fragment.
* The minor fragment memory resources are further grouped into per Drillbit resource
* requirements.
*/
public class MemoryCalculator extends AbstractOpWrapperVisitor<Void, RuntimeException> {
private final PlanningSet planningSet;
// List of all the buffered operators and their memory requirement per drillbit.
private final Map<DrillbitEndpoint, List<Pair<PhysicalOperator, Long>>> bufferedOperators;
private final QueryContext queryContext;
public MemoryCalculator(PlanningSet planningSet, QueryContext context) {
this.planningSet = planningSet;
this.bufferedOperators = new HashMap<>();
this.queryContext = context;
}
// Helper method to compute the minor fragment count per drillbit. This method returns
// a map with key as DrillbitEndpoint and value as the width (i.e #minorFragments)
// per Drillbit.
private Map<DrillbitEndpoint, Integer> getMinorFragCountPerDrillbit(Wrapper currFragment) {
return currFragment.getAssignedEndpoints().stream()
.collect(Collectors.groupingBy(Function.identity(),
Collectors.summingInt(x -> 1)));
}
// Helper method to merge the memory computations for each operator given memory per operator
// and the number of minor fragments per Drillbit.
private void merge(Wrapper currFrag,
Map<DrillbitEndpoint, Integer> minorFragsPerDrillBit,
Function<Entry<DrillbitEndpoint, Integer>, Long> getMemory) {
NodeResource.merge(currFrag.getResourceMap(),
minorFragsPerDrillBit.entrySet()
.stream()
.collect(Collectors.toMap((x) -> x.getKey(),
(x) -> NodeResource.create(0,
getMemory.apply(x)))));
}
@Override
public Void visitSendingExchange(Exchange exchange, Wrapper fragment) throws RuntimeException {
Wrapper receivingFragment = planningSet.get(fragment.getNode().getSendingExchangePair().getNode());
merge(fragment,
getMinorFragCountPerDrillbit(fragment),
// get the memory requirements for the sender operator.
(x) -> exchange.getSenderMemory(receivingFragment.getWidth(), x.getValue()));
return visitOp(exchange, fragment);
}
@Override
public Void visitReceivingExchange(Exchange exchange, Wrapper fragment) throws RuntimeException {
final List<Fragment.ExchangeFragmentPair> receivingExchangePairs = fragment.getNode().getReceivingExchangePairs();
final Map<DrillbitEndpoint, Integer> sendingFragsPerDrillBit = new HashMap<>();
for(Fragment.ExchangeFragmentPair pair : receivingExchangePairs) {
if (pair.getExchange() == exchange) {
Wrapper sendingFragment = planningSet.get(pair.getNode());
Preconditions.checkArgument(sendingFragment.isEndpointsAssignmentDone());
for (DrillbitEndpoint endpoint : sendingFragment.getAssignedEndpoints()) {
sendingFragsPerDrillBit.putIfAbsent(endpoint, 0);
sendingFragsPerDrillBit.put(endpoint, sendingFragsPerDrillBit.get(endpoint)+1);
}
}
}
final int totalSendingFrags = sendingFragsPerDrillBit.entrySet().stream()
.mapToInt((x) -> x.getValue()).reduce(0, (x, y) -> x+y);
merge(fragment,
getMinorFragCountPerDrillbit(fragment),
(x) -> exchange.getReceiverMemory(fragment.getWidth(),
// If the exchange is a MuxExchange then the sending fragments are from that particular drillbit otherwise
// sending fragments are from across the cluster.
exchange instanceof AbstractMuxExchange ? sendingFragsPerDrillBit.get(x.getKey()) : totalSendingFrags));
return null;
}
public List<Pair<PhysicalOperator, Long>> getBufferedOperators(DrillbitEndpoint endpoint) {
return this.bufferedOperators.getOrDefault(endpoint, new ArrayList<>());
}
@Override
public Void visitOp(PhysicalOperator op, Wrapper fragment) {
long memoryCost = (int) Math.ceil(op.getCost().getMemoryCost());
if (op.isBufferedOperator(queryContext)) {
// If the operator is a buffered operator then get the memory estimates of the optimizer.
// The memory estimates of the optimizer are for the whole operator spread across all the
// minor fragments. Divide this memory estimation by fragment width to get the memory
// requirement per minor fragment.
long memoryCostPerMinorFrag = (int) Math.ceil(memoryCost/fragment.getAssignedEndpoints().size());
Map<DrillbitEndpoint, Integer> drillbitEndpointMinorFragMap = getMinorFragCountPerDrillbit(fragment);
Map<DrillbitEndpoint,
Pair<PhysicalOperator, Long>> bufferedOperatorsPerDrillbit =
drillbitEndpointMinorFragMap.entrySet().stream()
.collect(Collectors.toMap((x) -> x.getKey(),
(x) -> Pair.of(op,
memoryCostPerMinorFrag * x.getValue())));
bufferedOperatorsPerDrillbit.entrySet().forEach((x) -> {
bufferedOperators.putIfAbsent(x.getKey(), new ArrayList<>());
bufferedOperators.get(x.getKey()).add(x.getValue());
});
merge(fragment,
drillbitEndpointMinorFragMap,
(x) -> memoryCostPerMinorFrag * x.getValue());
} else {
// Memory requirement for the non-buffered operators is just the batch size.
merge(fragment,
getMinorFragCountPerDrillbit(fragment),
(x) -> memoryCost * x.getValue());
}
for (PhysicalOperator child : op) {
child.accept(this, fragment);
}
return null;
}
}